12

Let's say I have a dataframe which looks like this:

+--------------------+--------------------+--------------------------------------------------------------+
|                id  |           Name     |                                                       Payment|
+--------------------+--------------------+--------------------------------------------------------------+
|                1   |           James    |[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]|
+--------------------+--------------------+--------------------------------------------------------------+

And the schema is:

root
 |-- id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Payment: string (nullable = true)

How can I explode the above JSON array into the following?

+--------------------+--------------------+-------------------------------+
|                id  |           Name     |                        Payment|
+--------------------+--------------------+-------------------------------+
|                1   |           James    |   {"@id":1, "currency":"GBP"} |
+--------------------+--------------------+-------------------------------+
|                1   |           James    |   {"@id":2, "currency":"USD"} |
+--------------------+--------------------+-------------------------------+

I've been trying to use explode as shown below, but it's not working. It fails with an error saying it cannot explode string types and expects either a map or an array. That makes sense given that the schema declares Payment as a string rather than an array or map, but I'm not sure how to convert it into an appropriate format.

val newDF = dataframe.withColumn("nestedPayment", explode(dataframe.col("Payment")))

Any help is greatly appreciated!

4 Answers

11

You'll have to parse the JSON string into an array of JSONs, and then use explode on the result (explode expects an array).

To do that (assuming Spark 2.0.*):

  • If you know all Payment values contain JSON representing an array of the same size (e.g. 2 in this case), you can hard-code extraction of the first and second elements, wrap them in an array and explode:

    val newDF = dataframe.withColumn("Payment", explode(array(
      get_json_object($"Payment", "$[0]"),
      get_json_object($"Payment", "$[1]")
    )))
    
  • If you can't guarantee all records have a JSON with a 2-element array, but you can guarantee a maximum length of these arrays, you can use this trick to parse elements up to the maximum size and then filter out the resulting nulls:

    val maxJsonParts = 3 // whatever that number is...
    val jsonElements = (0 until maxJsonParts)
                         .map(i => get_json_object($"Payment", s"$$[$i]"))
    
    val newDF = dataframe
      .withColumn("Payment", explode(array(jsonElements: _*)))
      .where(!isnull($"Payment")) 
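
The s"$$[$i]" interpolation in the second snippet can look odd at first: $$ is an escaped literal dollar sign and $i splices in the index, so each generated path is a JSONPath such as $[0]. A minimal standalone sketch in plain Scala (no Spark needed; the name paths is hypothetical and only used for illustration):

```scala
// Build the JSONPath strings that get_json_object receives for each index.
// `paths` is an illustrative name; 3 mirrors the maxJsonParts value above.
val maxJsonParts = 3
val paths = (0 until maxJsonParts).map(i => s"$$[$i]")

println(paths.mkString(", ")) // prints: $[0], $[1], $[2]
```

Indices beyond the actual array length make get_json_object return null, which is why the snippet filters out nulls afterwards.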
    

4 Comments

is there a way to do this with a while loop? Seems like it would be more efficient
The supposed performance improvement achieved by a while loop would be so small it's probably unmeasurable. This being a Spark application, one can assume that runtime is dominated by the actual DataFrame operations and not the driver-side code that builds them. Such "premature optimizations" only make code harder to read.
Hello, what if I don't know the max length of my array? How can I do something like: val jsonElements = (0 until arrayLength) .map(i => get_json_object($"Payment", s"$$[$i]")) ?
@TzachZohar how can we calculate the size of the JSON array using get_json_object()? I tried get_json_object(col("col_name"), "$.length()"), but it didn't work and returns null.
3
import org.apache.spark.sql.types._

val newDF = dataframe.withColumn("Payment",
  explode(
    from_json(
      get_json_object($"Payment", "$."), ArrayType(StringType)
    )))

1 Comment

Instead of just downvoting, please leave a comment so I know what's wrong with my answer. This is my first post and it's quite discouraging when all you want to do is help. Thank you.
3

My solution is to wrap the JSON array string in a JSON object, so that from_json can parse it with a struct type whose single field is an array of strings:

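import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._ // assumes a SparkSession named spark; needed for toDF
val dataframe = spark.sparkContext.parallelize(Seq(("1", "James", "[ {\"@id\": 1, \"currency\":\"GBP\"},{\"@id\": 2, \"currency\": \"USD\"} ]"))).toDF("id", "Name", "Payment")
// Wrap the array as {"array": [...]}, parse it, then emit one row per array element
val result = dataframe.withColumn("wrapped_json", concat_ws("", lit("{\"array\":"), col("Payment"), lit("}")))
    .withColumn("array_json", from_json(col("wrapped_json"), StructType(Seq(StructField("array", ArrayType(StringType))))))
    .withColumn("result", explode(col("array_json.array")))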

Result:

+---+-----+--------------------------------------------------------------+------------------------------------------------------------------------+----------------------------------------------------------+--------------------------+
|id |Name |Payment                                                       |wrapped_json                                                            |array_json                                                |result                    |
+---+-----+--------------------------------------------------------------+------------------------------------------------------------------------+----------------------------------------------------------+--------------------------+
|1  |James|[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]|{"array":[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]}|[[{"@id":1,"currency":"GBP"}, {"@id":2,"currency":"USD"}]]|{"@id":1,"currency":"GBP"}|
|1  |James|[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]|{"array":[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]}|[[{"@id":1,"currency":"GBP"}, {"@id":2,"currency":"USD"}]]|{"@id":2,"currency":"USD"}|
+---+-----+--------------------------------------------------------------+------------------------------------------------------------------------+----------------------------------------------------------+--------------------------+

I am using Spark 2.3.2, and Kudakwashe Nyatsanza's solution does not work for me. It throws org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(value)' due to data type mismatch: Input schema array<string> must be a struct or an array of structs.


0

You can define the schema of the Payment json array using ArrayType.

import org.apache.spark.sql.types._

val paymentSchema = ArrayType(StructType(
  Array(
    StructField("@id", DataTypes.IntegerType),
    StructField("currency", DataTypes.StringType)
  )
))

Applying from_json with this schema and then exploding the result returns one row per payment, with Payment as a struct rather than a string.

val newDF = dataframe.withColumn("Payment", explode(from_json($"Payment", paymentSchema)))
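
If the exploded values are needed as JSON strings again, as in the question's desired output, to_json can convert each struct back. The following is a minimal end-to-end sketch, not a definitive implementation. It assumes a Spark version in which from_json accepts an array-of-structs schema (the 2.3.2 error message quoted in an earlier answer suggests that one does). The local SparkSession, the sample row, and the name exploded are set up only to make the sketch self-contained.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, from_json, to_json}
import org.apache.spark.sql.types._

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("explode-json-array").getOrCreate()
import spark.implicits._

// Sample row mirroring the question's data.
val dataframe = Seq(
  (1, "James", """[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]""")
).toDF("id", "Name", "Payment")

// Schema of the JSON array: an array of {"@id": int, "currency": string} objects.
val paymentSchema = ArrayType(StructType(Seq(
  StructField("@id", IntegerType),
  StructField("currency", StringType)
)))

val exploded = dataframe
  // Parse the string into array<struct> and emit one row per element.
  .withColumn("Payment", explode(from_json(col("Payment"), paymentSchema)))
  // Serialize each struct back into a JSON string.
  .withColumn("Payment", to_json(col("Payment")))

exploded.show(false)
```

Because the whole array is parsed in one step, this approach does not need a known or maximum array length.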

