
I have to map the values of a column to new values in a Spark dataset. Think something like this:

val translationMap: Map[Column, Column] = Map(
  lit("foo") -> lit("bar"),
  lit("baz") -> lit("bab")
)

And I have a dataframe like this one:

val df = Seq("foo", "baz").toDF("mov")

So I intend to perform the translation like this:

df.select(
  col("mov"),
  translationMap(col("mov"))
)

but this piece of code throws the following error:

java.util.NoSuchElementException: key not found: mov

Is there a way to perform such a translation without chaining hundreds of whens? Note that translationMap could have lots of key-value pairs.

2 Answers


Instead of Map[Column, Column] you should use a Column containing a map literal. Your original code fails because translationMap(col("mov")) is an ordinary Scala Map lookup evaluated on the driver, and col("mov") is not equal to any of the lit(...) keys:

import org.apache.spark.sql.functions.typedLit

val translationMap: Column = typedLit(Map(
  "foo" -> "bar",
  "baz" -> "bab"
))

The rest of your code can stay as-is:

df.select(
  col("mov"),
  translationMap(col("mov"))
).show
+---+---------------------------------------+
|mov|keys: [foo,baz], values: [bar,bab][mov]|
+---+---------------------------------------+
|foo|                                    bar|
|baz|                                    bab|
+---+---------------------------------------+
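
The auto-generated column name is unwieldy; if you prefer, you can alias the lookup (the name translated here is just illustrative):

df.select(
  col("mov"),
  translationMap(col("mov")).as("translated")
).show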

2 Comments

By the way, how does this handle the case of an unknown key? For example, what if I feed a column with a value different from foo or baz into translationMap? Ideally I'd like to return the unknown value unmodified.
@mrbolichi, coalesce(translationMap(col("mov")), col("mov")) would handle non-matching keys the way you want.
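
A minimal sketch of that fallback, reusing the typedLit map from the answer (the output column name translated is illustrative):

import org.apache.spark.sql.functions.{coalesce, col}

// A map lookup returns null for unknown keys, so coalesce falls back
// to the original value in that case.
df.select(
  col("mov"),
  coalesce(translationMap(col("mov")), col("mov")).as("translated")
).show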

You cannot reference a Scala collection declared on the driver like this inside a distributed DataFrame. An alternative is to use a UDF, which will not be efficient if you have a large dataset, since UDFs are not optimized by Spark.

import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

val translationMap = Map("foo" -> "bar", "baz" -> "bab")

// Returns null for keys missing from the map.
val getTranslationValue = udf((x: String) => translationMap.get(x).orNull)

df.select(col("mov"), getTranslationValue($"mov").as("value")).show

//+---+-----+
//|mov|value|
//+---+-----+
//|foo|  bar|
//|baz|  bab|
//+---+-----+

Another solution would be to load the map as a Dataset[(String, String)] and then join the two datasets, taking mov as the key; a sketch follows.
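
A sketch of that join, assuming a SparkSession in scope as spark (the column name value is illustrative):

import org.apache.spark.sql.functions.broadcast
import spark.implicits._

val translationDF = Seq("foo" -> "bar", "baz" -> "bab").toDF("mov", "value")

// A broadcast hint keeps the join cheap when the map is small; the left
// join keeps rows whose key is missing from the map (value will be null).
df.join(broadcast(translationDF), Seq("mov"), "left").show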

2 Comments

If possible, I'd rather avoid UDFs because of the performance concerns you mentioned. Also, this will be a very common operation, so I'd like to avoid joins; it would be convenient to have a function that returns a column I can insert into my selects.
@philantrovert I am using the same approach, but on the cluster translationMap comes out as null. How do I handle that? Is it possible to pass translationMap into the UDF as a parameter? stackoverflow.com/questions/63935600/…
