
I have a Spark Scala DataFrame that looks like this:

df.printSchema()

 |-- stock._id: string (nullable = true)
 |-- stock.value: string (nullable = true)

The second column of the DataFrame is a nested JSON string:

[ { ""warehouse"" : ""Type1"" , ""amount"" : ""0.0"" }, { ""warehouse"" : ""Type1"" , ""amount"" : ""25.0"" }]

I need to generate a DataFrame that contains the existing two columns as well as the columns from the JSON, like:

_id, value, warehouse, amount

I've tried to do it using a custom function, but I'm struggling to apply this function to my DataFrame and get the needed result:

import org.json4s._
import org.json4s.jackson.JsonMethods._

def extractWarehouses(value: String): List[(String, String)] = {
  // parse the JSON string held in the column
  val json = parse(value)
  for {
    JObject(fields) <- json
    JField("warehouse", JString(warehouse)) <- fields
    // the sample JSON quotes the amounts, so they parse as JString, not JDouble
    JField("amount", JString(amount)) <- fields
  } yield (warehouse, amount)
}
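
For illustration, this is roughly how I expected to wire it in; the df.rdd / flatMap part below is a sketch on my side, not working code:

// Sketch: apply extractWarehouses to each row and emit one record per
// warehouse entry, keeping the original two columns.
val flattened = df.rdd.flatMap { row =>
  val id    = row.getString(0)
  val value = row.getString(1)
  extractWarehouses(value).map { case (warehouse, amount) =>
    (id, value, warehouse, amount)
  }
}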

1 Answer


As you said, value is a JSON array holding a list of JSON objects. You need to explode it and extract the individual properties as columns, something like below:

import org.apache.spark.sql.functions._

val flattenedDF = df.select(col("_id"), explode(df("value")).as("value"))
val result = flattenedDF.select("_id", "value.warehouse", "value.amount")
result.printSchema()

1 Comment

Unfortunately this doesn't work. I got the error: org.apache.spark.sql.AnalysisException: cannot resolve 'explode(value)' due to data type mismatch: input to function explode should be array or map type, not StringType;
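
A sketch of one way around that type mismatch, assuming Spark 2.2+ (where from_json accepts an array schema) and that the two columns are reachable as _id and value: parse the string into a typed array first, then explode the result.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// The sample JSON quotes the amounts, so both fields are read as strings;
// amount is cast to double afterwards.
val itemSchema = ArrayType(StructType(Seq(
  StructField("warehouse", StringType),
  StructField("amount", StringType)
)))

val parsed = df.withColumn("items", from_json(col("value"), itemSchema))
val result = parsed
  .select(col("_id"), col("value"), explode(col("items")).as("item"))
  .select(
    col("_id"),
    col("value"),
    col("item.warehouse").as("warehouse"),
    col("item.amount").cast("double").as("amount")
  )
result.printSchema()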
