
I'm attempting to write a Spark (Scala) DataFrame column as an array of bytes. I have a DataFrame with two columns: the first is a string, and the second is a map from strings to longs.

For example,

user_id | map
"ac2"   | Map("c2" -> 1, "b3" -> 5)

I want to write the map column as an array of bytes. So far I've attempted to use Jackson with the following UDF:

val writeJackson = udf { x: Map[String, Long] =>
    jacksonWriter.writeValueAsBytes(x)
}

val df2 = df.withColumn("jacksonMap", writeJackson($"map"))

but this fails because of

java.io.NotSerializableException: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer

Is there a way to get this to work with Jackson, and if not, is there a different library that will let me write this Spark column as a byte array?

1 Answer


I was able to convert the map column to a byte array and get the output with the following code (using Spark 1.6.2):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf
import org.apache.spark.{SparkConf, SparkContext}

object DF {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setAppName("DF").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Jackson mapper with the Scala module registered, so Scala Maps
    // serialize as JSON objects
    val mapper: ObjectMapper = new ObjectMapper
    mapper.registerModule(DefaultScalaModule)

    val df = Seq(
      ("ac2", Map("c2" -> 1, "b3" -> 5))
    ).toDF("id", "map")

    df.show(false)
    // output:
    // +---+---------------------+
    // |id |map                  |
    // +---+---------------------+
    // |ac2|Map(c2 -> 1, b3 -> 5)|
    // +---+---------------------+

    // UDF that serializes the map column to a JSON byte array
    val getByteArray = udf((map: Map[String, Int]) => mapper.writeValueAsBytes(map))

    df.withColumn("bytearray", getByteArray($"map")).show(false)
    // output:
    // +---+---------------------+----------------------------------------------+
    // |id |map                  |bytearray                                     |
    // +---+---------------------+----------------------------------------------+
    // |ac2|Map(c2 -> 1, b3 -> 5)|[7B 22 63 32 22 3A 31 2C 22 62 33 22 3A 35 7D]|
    // +---+---------------------+----------------------------------------------+
  }
}
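As an aside on the original error: the NotSerializableException came from a non-serializable Jackson helper being captured in the UDF closure. If Jackson keeps causing trouble, one way to sidestep it entirely is plain Java serialization from the JDK, which needs no extra objects in the closure. This is a sketch of that alternative, not part of the answer above; the Spark usage at the bottom is illustrative and untested against 1.6.2.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Sketch: encode a Map to bytes with plain Java serialization, so nothing
// Jackson-related is captured in the UDF closure.
object MapBytes {
  def toBytes(m: Map[String, Long]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(m)
    oos.close()
    bos.toByteArray
  }

  // Round-trip helper, mainly to verify the bytes decode back to the map
  def fromBytes(bytes: Array[Byte]): Map[String, Long] = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    val m = ois.readObject().asInstanceOf[Map[String, Long]]
    ois.close()
    m
  }
}

// In Spark this would be wired up as (assumed, untested):
// val writeBytes = udf((m: Map[String, Long]) => MapBytes.toBytes(m))
// df.withColumn("bytearray", writeBytes($"map"))
```

Note the bytes are JVM-serialized objects rather than JSON, so they are only readable back from the JVM; use the Jackson approach above if the payload needs to be JSON.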

Comments

How do you import ObjectMapper, and what is DefaultScalaModule?
Add this to pom.xml for ObjectMapper: <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.5</version> </dependency>. DefaultScalaModule comes from the com.fasterxml.jackson.module:jackson-module-scala artifact.
Thank you for that. Can I use this UDF to convert a Spark ArrayType column to a byte array instead of a Map?
Yes, you can just change the parameter type in the UDF to the appropriate type.
I tried:

    val convertToByteArray = udf((map: Array[String]) => mapper.writeValueAsBytes(map))
    val arrayDF = Seq(("x100", Array("p1", "p2", "p3"))).toDF("id", "myarray")
    arrayDF.withColumn("bytearray", convertToByteArray($"myarray")).show(false)

but this throws an error: Exception in thread "main" org.apache.spark.SparkException: Failed to execute user defined function(anonfun$3: (array<string>) => binary)
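The error in the last comment is most likely a type mismatch: Spark hands an ArrayType column to a Scala UDF as a Seq (a WrappedArray under the hood), not as a plain Array, so a UDF parameter typed Array[String] fails at runtime. The pure-Scala sketch below shows the mismatch; the corrected UDF at the bottom is an assumption, not tested against the answer's Spark version.

```scala
// Spark passes ArrayType column values to Scala UDFs as a Seq (a
// WrappedArray), not a plain Array. Modeling that here with the
// standard implicit wrapping of an Array into a Seq:
val fromSpark: Seq[String] = Array("p1", "p2", "p3")

// Typing the parameter as Seq[String] accepts the wrapped value:
def describe(xs: Seq[String]): String = xs.mkString(",")

// So the failing UDF above would become (sketch, untested):
// val convertToByteArray = udf((arr: Seq[String]) => mapper.writeValueAsBytes(arr))
```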
