This is the structure of my input DataFrame:

root
|--Name (String)
|--Version (int)
|--Details (array)

The data looks something like this:

"Name":"json",
"Version":1,
"Details":[
"{
    \"Id\":\"123\",
    \"TaxDetails\":[\"TaxDetail1\":\"val1\", \"TaxDetail2\":\"val2\"]
}",
"{
    \"Id\":\"234\",
    \"TaxDetails\":[\"TaxDetail3\":\"val3\", \"TaxDetail4\":\"val4\"]
}"
]

I want to explode this at the TaxDetails level, producing something like this:

"Name":"json",
"Version":1,
"TaxDetail":{\"TaxDetail1\":\"val1\"}   

"Name":"json",
"Version":1,
"TaxDetail":{\"TaxDetail2\":\"val2\"}   

"Name":"json",
"Version":1,
"TaxDetail":{\"TaxDetail3\":\"val3\"}   

"Name":"json",
"Version":1,
"TaxDetail":{\"TaxDetail4\":\"val4\"}

I exploded Details with the explode function like this:

val explodedDetailDf = inputDf.withColumn("Detail", explode($"Details"))

Now the data type of column 'Detail' is string, and when I try to do this:

val explodedTaxDetail = explodedDetailDf.withColumn("TaxDetail", explode($"Detail.TaxDetails"))

the operation fails with the error "AnalysisException due to data type mismatch: input to function explode should be array or map type, not String".

How can I explode a nested JSON array based on its name?

  • You have to use the from_json() function from org.apache.spark.sql.functions to turn the JSON string column into a structure column first. Commented May 2, 2020 at 21:12
  • Your JSON is corrupted. Commented May 2, 2020 at 21:34
  • Hi, can you check whether my answer suits you? I did not create a schema manually; I formatted the existing corrupted JSON and used it. Commented May 4, 2020 at 6:24
  • Also, I used the explode function twice without using from_json, which is the common way of doing this; just have a look. If needed, we can discuss further. Commented May 4, 2020 at 6:32

2 Answers

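explode accepts values of type map or array, but not string.

In your sample JSON, each element of Details is a string, so after the first explode the Detail column is a string rather than a struct, and Detail.TaxDetails is not an array.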
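One way to confirm this before calling explode is to inspect the column's data type. A minimal sketch, assuming a local SparkSession and a small DataFrame shaped like the question's input; the names TypeCheck and isExplodable are illustrative only:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.types.{ArrayType, MapType}

object TypeCheck {
  // explode accepts only array or map columns; any other type (e.g. StringType)
  // makes the analyzer throw the "data type mismatch" AnalysisException.
  def isExplodable(df: DataFrame, column: String): Boolean =
    df.schema(column).dataType match {
      case _: ArrayType | _: MapType => true
      case _                         => false
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Details holds JSON *strings*, as in the question (contents shortened)
    val inputDf = Seq(("json", 1, Seq("""{"Id":"123"}""", """{"Id":"234"}""")))
      .toDF("Name", "Version", "Details")
    val explodedDetailDf = inputDf.withColumn("Detail", explode($"Details"))

    println(isExplodable(inputDf, "Details"))         // true: array<string>
    println(isExplodable(explodedDetailDf, "Detail")) // false: string
    spark.stop()
  }
}
```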

To turn these JSON strings into structs so that Detail.TaxDetails can be extracted, use:

def from_json(e: org.apache.spark.sql.Column,schema: org.apache.spark.sql.types.StructType): org.apache.spark.sql.Column

Note: your JSON is corrupted, so I modified it as below.

scala> val json = """{
     |   "Name": "json",
     |   "Version": 1,
     |   "Details": [
     |     "{\"Id\":\"123\",\"TaxDetails\":[{\"TaxDetail1\":\"val1\", \"TaxDetail2\":\"val2\"}]}",
     |     "{\"Id\":\"234\",\"TaxDetails\":[{\"TaxDetail3\":\"val3\", \"TaxDetail4\":\"val4\"}]}"
     |   ]
     | }"""

json: String =
{
  "Name": "json",
  "Version": 1,
  "Details": [
    "{\"Id\":\"123\",\"TaxDetails\":[{\"TaxDetail1\":\"val1\", \"TaxDetail2\":\"val2\"}]}",
    "{\"Id\":\"234\",\"TaxDetails\":[{\"TaxDetail3\":\"val3\", \"TaxDetail4\":\"val4\"}]}"
  ]
}

The code below shows how to extract the values of Detail.TaxDetails:


scala> val df = spark.read.json(Seq(json).toDS)
df: org.apache.spark.sql.DataFrame = [Details: array<string>, Name: string ... 1 more field]

scala> df.printSchema
root
 |-- Details: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Name: string (nullable = true)
 |-- Version: long (nullable = true)

scala> df.withColumn("details",explode($"details").as("details")).show(false) // inside details array has string values.
+----------------------------------------------------------------------+----+-------+
|details                                                               |Name|Version|
+----------------------------------------------------------------------+----+-------+
|{"Id":"123","TaxDetails":[{"TaxDetail1":"val1", "TaxDetail2":"val2"}]}|json|1      |
|{"Id":"234","TaxDetails":[{"TaxDetail3":"val3", "TaxDetail4":"val4"}]}|json|1      |
+----------------------------------------------------------------------+----+-------+

scala> val json = spark.read.json(Seq("""[{"Id": "123","TaxDetails": [{"TaxDetail1": "val1","TaxDetail2": "val2"}]},{"Id": "234","TaxDetails": [{"TaxDetail3": "val3","TaxDetail4": "val4"}]}]""").toDS).schema.json
json: String = {"type":"struct","fields":[{"name":"Id","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetails","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"TaxDetail1","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetail2","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetail3","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetail4","type":"string","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}}]}

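scala> import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.types.{DataType, StructType}

scala> val schema = DataType.fromJson(json).asInstanceOf[StructType] // Creating schema for inner string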
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Id,StringType,true), StructField(TaxDetails,ArrayType(StructType(StructField(TaxDetail1,StringType,true), StructField(TaxDetail2,StringType,true), StructField(TaxDetail3,StringType,true), StructField(TaxDetail4,StringType,true)),true),true))

scala> spark.time(df.withColumn("details",explode($"details")).withColumn("details",from_json($"details",schema)).withColumn("id",$"details.id").withColumn("taxdetails",explode($"details.taxdetails")).select($"name",$"version",$"id",$"taxdetails.*").show(false))
+----+-------+---+----------+----------+----------+----------+
|name|version|id |TaxDetail1|TaxDetail2|TaxDetail3|TaxDetail4|
+----+-------+---+----------+----------+----------+----------+
|json|1      |123|val1      |val2      |null      |null      |
|json|1      |234|null      |null      |val3      |val4      |
+----+-------+---+----------+----------+----------+----------+


scala>
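The table above has one column per tax key, whereas the question asks for one row per key. A possible variation, not part of this answer as posted and assuming Spark 2.4 or later (for to_json on a map column): declare each TaxDetails element as map<string,string> in the from_json schema, so the keys never have to be listed, and then explode the map into key/value rows. The names TaxDetailRows, detailSchema and explodeTaxDetails are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, from_json, map, to_json}
import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructField, StructType}

object TaxDetailRows {
  // Assumed shape of each string in Details: TaxDetails elements are read as
  // map<string,string>, so TaxDetail1..TaxDetail4 do not have to be enumerated.
  val detailSchema: StructType = StructType(Seq(
    StructField("Id", StringType),
    StructField("TaxDetails", ArrayType(MapType(StringType, StringType)))
  ))

  def explodeTaxDetails(df: DataFrame): DataFrame =
    df.withColumn("Detail", explode(col("Details")))                // one row per JSON string
      .withColumn("Detail", from_json(col("Detail"), detailSchema)) // string -> struct
      .withColumn("TaxMap", explode(col("Detail.TaxDetails")))      // one row per map in the array
      .select(col("Name"), col("Version"), explode(col("TaxMap")))  // map -> key, value columns
      .select(col("Name"), col("Version"),
        to_json(map(col("key"), col("value"))).as("TaxDetail"))     // single-key JSON object

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Same data as the corrected JSON above: Details is an array of JSON strings
    val df = Seq(("json", 1L, Seq(
      """{"Id":"123","TaxDetails":[{"TaxDetail1":"val1","TaxDetail2":"val2"}]}""",
      """{"Id":"234","TaxDetails":[{"TaxDetail3":"val3","TaxDetail4":"val4"}]}"""
    ))).toDF("Name", "Version", "Details")

    explodeTaxDetails(df).show(false)
    spark.stop()
  }
}
```

With this input it should produce four rows, each carrying Name and Version next to a TaxDetail string such as {"TaxDetail1":"val1"}.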

Update

Above, I typed the JSON manually to create the schema. The code below derives the schema from the data already in the DataFrame instead.

scala> spark.read.json(df.withColumn("details",explode($"details").as("details")).select("details").map(_.getAs[String](0))).printSchema
root
 |-- Id: string (nullable = true)
 |-- TaxDetails: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- TaxDetail1: string (nullable = true)
 |    |    |-- TaxDetail2: string (nullable = true)
 |    |    |-- TaxDetail3: string (nullable = true)
 |    |    |-- TaxDetail4: string (nullable = true)


scala> spark.read.json(df.withColumn("details",explode($"details").as("details")).select("details").map(_.getAs[String](0))).schema
res12: org.apache.spark.sql.types.StructType = StructType(StructField(Id,StringType,true), StructField(TaxDetails,ArrayType(StructType(StructField(TaxDetail1,StringType,true), StructField(TaxDetail2,StringType,true), StructField(TaxDetail3,StringType,true), StructField(TaxDetail4,StringType,true)),true),true))

scala> val schema = spark.read.json(df.withColumn("details",explode($"details").as("details")).select("details").map(_.getAs[String](0))).schema
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Id,StringType,true), StructField(TaxDetails,ArrayType(StructType(StructField(TaxDetail1,StringType,true), StructField(TaxDetail2,StringType,true), StructField(TaxDetail3,StringType,true), StructField(TaxDetail4,StringType,true)),true),true))

scala> spark.time(df.withColumn("details",explode($"details")).withColumn("details",from_json($"details",schema)).withColumn("id",$"details.id").withColumn("taxdetails",explode($"details.taxdetails")).select($"name",$"version",$"id",$"taxdetails.*").show(false))
+----+-------+---+----------+----------+----------+----------+
|name|version|id |TaxDetail1|TaxDetail2|TaxDetail3|TaxDetail4|
+----+-------+---+----------+----------+----------+----------+
|json|1      |123|val1      |val2      |null      |null      |
|json|1      |234|null      |null      |val3      |val4      |
+----+-------+---+----------+----------+----------+----------+

Time taken: 212 ms

scala>
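The schema inference above can be wrapped in a small reusable helper. A sketch assuming Spark 2.2 or later (for DataFrameReader.json on a Dataset[String]); InferDetailSchema and inferSchema are illustrative names. It reads the exploded strings with .as[String] instead of mapping over Row. Because the JSON reader samples every record by default, keys that appear in only some rows (TaxDetail3, TaxDetail4) still end up in the schema, at the cost of an extra pass over the data.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.StructType

object InferDetailSchema {
  // Infers one StructType from all JSON strings stored in an array<string> column.
  def inferSchema(df: DataFrame, arrayCol: String): StructType = {
    val spark = df.sparkSession
    import spark.implicits._
    // explode yields a single string column; .as[String] uses that first column
    val jsonStrings = df.select(explode(col(arrayCol))).as[String]
    spark.read.json(jsonStrings).schema
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._
    val df = Seq(Seq(
      """{"Id":"123","TaxDetails":[{"TaxDetail1":"val1","TaxDetail2":"val2"}]}""",
      """{"Id":"234","TaxDetails":[{"TaxDetail3":"val3","TaxDetail4":"val4"}]}"""
    )).toDF("Details")
    inferSchema(df, "Details").printTreeString()
    spark.stop()
  }
}
```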

4 Comments

Sorry for the corrupted JSON. I created this data from my actual data and posted it without checking. Thanks @Srinivas for the answer. You created the JSON manually when building the schema. Can you please explain how to get the string from the DataFrame directly?
I had taken those strings manually and created a DataFrame to get the schema. I have now updated the code to get the schema from the available DataFrame.
@Srinivas I did it without the from_json function; you can have a look at it. Someone downvoted without mentioning the reason.
@RamGhadiyaram Sure, I will check that. Details is an array of strings, so I have to use the from_json function to convert each string to an object. In your JSON, Details is an array of objects, so there is no need to use from_json there.
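Following up on that distinction: if you want to keep the original array of strings but still explode twice, the strings can be parsed in place first. A sketch assuming Spark 3.0 or later (for the Scala functions.transform API) and a hand-written schema; ParseDetailsInPlace and detailSchema are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, from_json, transform}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object ParseDetailsInPlace {
  // Assumed schema of each JSON string inside Details
  val detailSchema: StructType = StructType(Seq(
    StructField("Id", StringType),
    StructField("TaxDetails", ArrayType(StructType(
      Seq("TaxDetail1", "TaxDetail2", "TaxDetail3", "TaxDetail4").map(StructField(_, StringType))
    )))
  ))

  // array<string> -> array<struct>: from_json is applied to each element, no explode yet
  def parseDetails(df: DataFrame): DataFrame =
    df.withColumn("Details", transform(col("Details"), d => from_json(d, detailSchema)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._
    val df = Seq(("json", 1L, Seq(
      """{"Id":"123","TaxDetails":[{"TaxDetail1":"val1","TaxDetail2":"val2"}]}""",
      """{"Id":"234","TaxDetails":[{"TaxDetail3":"val3","TaxDetail4":"val4"}]}"""
    ))).toDF("Name", "Version", "Details")

    // Once Details holds structs, exploding twice works as with an array of objects
    parseDetails(df)
      .withColumn("Detail", explode(col("Details")))
      .withColumn("TaxDetail", explode(col("Detail.TaxDetails")))
      .select(col("Name"), col("Version"), col("Detail.Id"), col("TaxDetail.*"))
      .show(false)
    spark.stop()
  }
}
```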
Since the JSON you gave earlier was corrupted, I reformatted it as below. In this form you can apply explode twice and flatten the DataFrame.

Implemented like below:

package examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object JsonTest extends App {
  Logger.getLogger("org").setLevel(Level.OFF)

  private[this] implicit val spark = SparkSession.builder().master("local[*]").getOrCreate()

  import spark.implicits._

  val jsonString =
    """
      |{
      |  "Name": "json",
      |  "Version": "1",
      |  "Details": [
      |    {
      |      "Id": "123",
      |      "TaxDetails": [
      |        {
      |          "TaxDetail1": "val1",
      |          "TaxDetail2": "val2"
      |        }
      |      ]
      |    },
      |    {
      |    "Id":"234",
      |    "TaxDetails":[
      |    {
      |    "TaxDetail3":"val3"
      |    , "TaxDetail4":"val4"
      |    }
      |    ]
      |}
      |  ]
      |}
    """.stripMargin
  val df3 = spark.read.json(Seq(jsonString).toDS)
  df3.printSchema()
  df3.show(false)
  val explodedDetailDf = df3.withColumn("Detail", explode($"Details"))
  // explodedDetailDf.show(false)
  val explodedTaxDetail = explodedDetailDf.withColumn("TaxDetail", explode($"Detail.TaxDetails"))
  explodedTaxDetail.show(false)

  val finaldf = explodedTaxDetail.select($"Name", $"Version"
    , to_json(struct
    (col("TaxDetail.TaxDetail1").as("TaxDetail1"))
    ).as("TaxDetails"))
    .union(
      explodedTaxDetail.select($"Name", $"Version"
        , to_json(struct
        (col("TaxDetail.TaxDetail2").as("TaxDetail2"))
        ).as("TaxDetails"))
    )
    .union(
      explodedTaxDetail.select($"Name", $"Version"
        , to_json(struct
        (col("TaxDetail.TaxDetail3").as("TaxDetail3"))
        ).as("TaxDetails"))
    )
    .union(
      explodedTaxDetail.select($"Name", $"Version"
        , to_json(struct
        (col("TaxDetail.TaxDetail4").as("TaxDetail4"))
        ).as("TaxDetails"))
    ).filter(!($"TaxDetails" === "{}"))

  finaldf.show(false)
  finaldf.toJSON.show(false)
}

Result:

root
 |-- Details: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Id: string (nullable = true)
 |    |    |-- TaxDetails: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- TaxDetail1: string (nullable = true)
 |    |    |    |    |-- TaxDetail2: string (nullable = true)
 |    |    |    |    |-- TaxDetail3: string (nullable = true)
 |    |    |    |    |-- TaxDetail4: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Version: string (nullable = true)

+---------------------------------------------------+----+-------+
|Details                                            |Name|Version|
+---------------------------------------------------+----+-------+
|[[123, [[val1, val2,,]]], [234, [[,, val3, val4]]]]|json|1      |
+---------------------------------------------------+----+-------+

+---------------------------------------------------+----+-------+------------------------+---------------+
|Details                                            |Name|Version|Detail                  |TaxDetail      |
+---------------------------------------------------+----+-------+------------------------+---------------+
|[[123, [[val1, val2,,]]], [234, [[,, val3, val4]]]]|json|1      |[123, [[val1, val2,,]]] |[val1, val2,,] |
|[[123, [[val1, val2,,]]], [234, [[,, val3, val4]]]]|json|1      |[234, [[,, val3, val4]]]|[,, val3, val4]|
+---------------------------------------------------+----+-------+------------------------+---------------+

+----+-------+---------------------+
|Name|Version|TaxDetails           |
+----+-------+---------------------+
|json|1      |{"TaxDetail1":"val1"}|
|json|1      |{"TaxDetail2":"val2"}|
|json|1      |{"TaxDetail3":"val3"}|
|json|1      |{"TaxDetail4":"val4"}|
+----+-------+---------------------+

Final output, as you expected:

+----------------------------------------------------------------------+
|value                                                                 |
+----------------------------------------------------------------------+
|{"Name":"json","Version":"1","TaxDetails":"{\"TaxDetail1\":\"val1\"}"}|
|{"Name":"json","Version":"1","TaxDetails":"{\"TaxDetail2\":\"val2\"}"}|
|{"Name":"json","Version":"1","TaxDetails":"{\"TaxDetail3\":\"val3\"}"}|
|{"Name":"json","Version":"1","TaxDetails":"{\"TaxDetail4\":\"val4\"}"}|
+----------------------------------------------------------------------+
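The four union branches in finaldf differ only in the field name. A sketch that derives the branches from the TaxDetail struct's schema instead of hard-coding TaxDetail1 to TaxDetail4; TaxDetailUnion and toTaxDetailRows are illustrative names, and it assumes a DataFrame shaped like explodedTaxDetail (columns Name, Version and a struct column TaxDetail).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.types.StructType

object TaxDetailUnion {
  def toTaxDetailRows(exploded: DataFrame): DataFrame = {
    // Field names of the TaxDetail struct, e.g. TaxDetail1 ... TaxDetail4
    val fields = exploded.schema("TaxDetail").dataType.asInstanceOf[StructType].fieldNames

    fields
      .map { f =>
        // One single-key JSON object per field; a null field serializes as "{}"
        exploded.select(col("Name"), col("Version"),
          to_json(struct(col(s"TaxDetail.$f").as(f))).as("TaxDetails"))
      }
      .reduce(_ union _)                  // same as the chained .union calls
      .filter(col("TaxDetails") =!= "{}") // drop keys absent from that row
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._
    // Tiny illustrative stand-in for explodedTaxDetail
    val exploded = Seq(("json", "1", "val1", "val2")).toDF("Name", "Version", "t1", "t2")
      .select($"Name", $"Version", struct($"t1".as("TaxDetail1"), $"t2".as("TaxDetail2")).as("TaxDetail"))
    toTaxDetailRows(exploded).show(false)
    spark.stop()
  }
}
```

Calling TaxDetailUnion.toTaxDetailRows(explodedTaxDetail) in place of the finaldf expression should give the same four rows.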
