
I have a JSON file that I am reading into a Spark DataFrame using Scala 2.10 with

val df = sqlContext.read.json("file_path")

JSON looks like below:

{ "data": [{ "id":"20180218","parent": [{"name": "Market"}]}, { "id":"20180219","parent": [{"name": "Client"},{"name": "Market" }]}, { "id":"20180220","parent": [{"name": "Client"}]},{ "id":"20180221","parent": []}]}

data is an array of structs. Each struct has a parent key, and parent is itself an array of structs that can hold zero or more values.

I need to filter each parent array so that it holds only the structs whose name is "Market" (which may leave the array empty). My output should look like:

{ "data": [{ "id":"20180218","parent": [{"name": "Market"}]}, { "id":"20180219","parent": [{"name": "Market" }]}, { "id":"20180220","parent": []},{ "id":"20180221","parent": []}]}

So, basically, filter out every struct whose name is anything other than "Market", and keep empty parent arrays (whether they become empty as a result of the filtering or were already empty).

Can somebody help out here?

Thanks

    What have you tried to do so far? Can you share some code examples of things you tried? Commented Feb 21, 2018 at 15:42

2 Answers


We need to use the explode function to handle this sort of nested JSON struct and array query.

scala> val df = spark.read.json("/Users/pavithranrao/Desktop/test.json")

scala> df.printSchema
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- parent: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)

scala> import org.apache.spark.sql.functions.{col, explode}
scala> val oneDF = df.select(col("data"), explode(col("data"))).toDF("data", "element").select(col("data"), col("element.parent"))
scala> oneDF.show
"""
+--------------------+--------------------+
|                data|              parent|
+--------------------+--------------------+
|[[20180218,Wrappe...|          [[Market]]|
|[[20180218,Wrappe...|[[Client], [Market]]|
|[[20180218,Wrappe...|          [[Client]]|
|[[20180218,Wrappe...|                  []|
+--------------------+--------------------+
"""

scala> val twoDF = oneDF.select(col("data"), explode(col("parent"))).toDF("data", "names")
scala> twoDF.printSchema
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- parent: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)
 |-- names: struct (nullable = true)
 |    |-- name: string (nullable = true)

scala> twoDF.show
"""
+--------------------+--------+
|                data|   names|
+--------------------+--------+
|[[20180218,Wrappe...|[Market]|
|[[20180218,Wrappe...|[Client]|
|[[20180218,Wrappe...|[Market]|
|[[20180218,Wrappe...|[Client]|
+--------------------+--------+
"""

scala> import org.apache.spark.sql.functions.length

// Check for names structs that are empty
scala> twoDF.select(length(col("names.name")) === 0).show
+------------------------+
|(length(names.name) = 0)|
+------------------------+
|                   false|
|                   false|
|                   false|
|                   false|
+------------------------+

// Check for names structs that don't contain Market
scala> twoDF.select(!col("names.name").contains("Market")).show()
+----------------------------------+
|(NOT contains(names.name, Market))|
+----------------------------------+
|                             false|
|                              true|
|                             false|
|                              true|
+----------------------------------+

// Combining these two

scala> val ansDF = twoDF.select("data").filter(!col("names.name").contains("Market") || length(col("names.name")) === 0)
scala> ansDF.printSchema

// Schema same as input df
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- parent: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)

scala> ansDF.show(false)
+----------------------------------------------------------------------------------------------------------------------------------------------+
|data                                                                                                                                          |
+----------------------------------------------------------------------------------------------------------------------------------------------+
|[[20180218,WrappedArray([Market])], [20180219,WrappedArray([Client], [Market])], [20180220,WrappedArray([Client])], [20180221,WrappedArray()]]|
|[[20180218,WrappedArray([Market])], [20180219,WrappedArray([Client], [Market])], [20180220,WrappedArray([Client])], [20180221,WrappedArray()]]|
+----------------------------------------------------------------------------------------------------------------------------------------------+

The final ansDF has the filtered records that satisfy the condition: name does not contain 'Market', or name is empty.

PS: If I have missed the exact filter scenario, adjust the filter function in the code above accordingly.

Hope this helps!


5 Comments

On a side note, if you are using Spark v2.0+, this can be achieved very easily by using Datasets instead of DataFrames, with proper case classes for the nested structure. Datasets allow us to use RDD-like operations such as filter, and we needn't use explode to peek into the struct or array (a sketch of this approach follows these comments).
Hi Pavithran, this is not solving the problem. The filter condition was not working because of incorrect de-referencing. I tried twoDF.where(length(col("names.name")) === 0 || !col("names.name").contains("Market")).show, which gave the result +--------------------+--------+ | data| names| +--------------------+--------+ |[[20180218,Wrappe...| [Client]| |[[20180218,Wrappe...| [Client]| +--------------------+--------+
Did twoDF.where(length(col("names.name")) === 0 || !col("names.name").contains("Market")) give the correct result? Please edit the answer if need be.
No, it didn't. It gives back only the rows with parent name as Client and filters out the nulls/empty parents.
This may come a little late, but using filter should be the answer. You just need to know how to extract the "name" field from each struct in the column and compare it to "Market". This is what you should do: ``` df.withColumn("filtered_by_market", filter(col("parent"), (c: Column) => c.apply("name") === "Market")) ``` and it's as simple as that.
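
For the Dataset route mentioned in the first comment above, a minimal sketch could look like the following (Spark 2.0+ assumed; the case class names are hypothetical and simply mirror the question's JSON schema):

// Hypothetical case classes mirroring the nested JSON structure.
case class Parent(name: String)
case class Record(id: String, parent: Seq[Parent])
case class Root(data: Seq[Record])

import spark.implicits._

// Read the file as a typed Dataset and filter each parent array with plain Scala collection operations.
val ds = spark.read.json("file_path").as[Root]
val filtered = ds.map { root =>
  Root(root.data.map(r => r.copy(parent = r.parent.filter(_.name == "Market"))))
}
filtered.show(false)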

Assuming you have a column with an array of structs, in your case parent, what you need to do is use the filter function. I believe some people already said that. The trick is that the filtering function needs to work on a struct.

According to the documentation at https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Column.html#apply(extraction:Any):org.apache.spark.sql.Column, Column.apply:

Extracts a value or values from a complex type. The following types of extraction are supported:

  • Given an Array, an integer ordinal can be used to retrieve a single value.
  • Given a Map, a key of the correct type can be used to retrieve an individual value.
  • Given a Struct, a string fieldName can be used to extract that field.
  • Given an Array of Structs, a string fieldName can be used to extract that field of every struct in that array, and return an Array of fields.
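
As a quick, hypothetical illustration of that last rule (assuming a DataFrame with an exploded parent column, like the oneDF built in the other answer), extracting a fieldName from an array-of-structs column returns an array of that field:

// Selecting "name" from the array-of-structs column `parent` yields an array of names,
// e.g. [Market], [Client, Market], [Client], [] for the question's data.
oneDF.select(col("parent")("name").as("parent_names")).show(false)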

Therefore filtering is as simple as:

import org.apache.spark.sql.Column, org.apache.spark.sql.functions.{col, filter}
df.withColumn("filtered", filter(col("parent"), (c: Column) => c.apply("name") === "Market"))

I believe this is the most efficient and clean way.
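
Applied to the question's full nested layout, where parent sits inside the data array, a minimal end-to-end sketch could look like this (assuming Spark 3.0+, where the transform and filter higher-order functions are exposed in the Scala functions API):

import org.apache.spark.sql.functions.{col, filter, struct, transform}

// Rebuild every element of `data`, keeping only the parent entries whose name is "Market".
val result = df.select(
  transform(col("data"), d =>
    struct(
      d("id").as("id"),
      filter(d("parent"), p => p("name") === "Market").as("parent")
    )
  ).as("data")
)

result.toJSON.show(false)

This preserves the original shape of data, and parent arrays end up empty when nothing matches (or stay empty if they already were), which is exactly the output the question asks for.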

1 Comment

Do you have a PySpark equivalent?
