I want to use a Spark job to reformat a JSON structure into one that contains an array of objects. My input file contains the lines:

{ "keyvals" : [[1,"a"], [2, "b"]] }, 
{ "keyvals" : [[3,"c"], [4, "d"]] }

and I want my process to output

{ "keyvals": [{"id": 1, "value": "a"}, {"id": 2, "value": "c"}] },
{ "keyvals": [{"id": 3, "value": "c"}, {"id": 4, "value": "d"}] }

What's the best way to do that?

To reproduce the example input, you can run the following in the Scala spark-shell:

val jsonStrings = Seq("""{ "keyvals" : [[1,"a"], [2, "b"]] }""", """{ "keyvals" : [[3,"c"], [4, "d"]] }""")
val inputRDD = sc.parallelize(jsonStrings)
val df = spark.read.json(inputRDD)
// reformat goes here ?
df.write.json("myfile.json")

thanks

  • Did you try anything? to_json maybe? Please produce a minimal reproducible example. Commented Jun 5, 2018 at 13:25
  • How would to_json transform [[1,"a"], [2, "b"]] => [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]? There needs to be a transformation of the data structure. Commented Jun 5, 2018 at 13:27

1 Answer

If you check the schema, you'll see that this structure is actually mapped to array<array<string>>:

df.printSchema
// root
//  |-- keyvals: array (nullable = true)
//  |    |-- element: array (containsNull = true)
//  |    |    |-- element: string (containsNull = true)

Unless the number of elements is fixed, you'll need a udf:

import org.apache.spark.sql.functions._

case class Record(id: Long, value: String)

// convert each inner [id, value] pair into a Record struct
val parse = udf((xs: Seq[Seq[String]]) => xs.map {
  case Seq(id, value) => Record(id.toLong, value)
})

val result = df.select(parse($"keyvals").alias("keyvals"))

and the result can be converted with toJSON:

result.toJSON.toDF("keyvals").show(false)
// +-------------------------------------------------------+
// |keyvals                                                |
// +-------------------------------------------------------+
// |{"keyvals":[{"id":1,"value":"a"},{"id":2,"value":"b"}]}|
// |{"keyvals":[{"id":3,"value":"c"},{"id":4,"value":"d"}]}|
// +-------------------------------------------------------+

or written using the JSON writer (result.write.json).
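
For example (a minimal sketch, reusing the myfile.json path from the question; note that Spark writes a directory of part files rather than a single file):

// writes line-delimited JSON; "myfile.json" ends up as a directory of part files
result.write.json("myfile.json")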

It is also possible to use a strongly typed Dataset:

// in the spark-shell the required encoders are already in scope;
// in a standalone application add: import spark.implicits._
df.as[Seq[Seq[String]]].map { xs =>
  xs.map { case Seq(id, value) => Record(id.toLong, value) }
}.toDF("keyvals")