
I'm looking for a generic solution to extract all the JSON fields as columns from a JSON string column.

df =  spark.read.load(path)
df.show()

The files under 'path' are in Parquet format.

Sample data

| id | json_data                                                                |
| 1  | {"name":"abc", "depts":["dep01", "dep02"]}                               |
| 2  | {"name":"xyz", "depts":["dep03"], "sal":100}                             |
| 3  | {"name":"pqr", "depts":["dep02"], "address":{"city":"SF","state":"CA"}}  |

Expected output

| id | name  | depts              | sal  | address_city | address_state |
| 1  | "abc" | ["dep01", "dep02"] | null | null         | null          |
| 2  | "xyz" | ["dep03"]          | 100  | null         | null          |
| 3  | "pqr" | ["dep02"]          | null | "SF"         | "CA"          |

I know that I can extract the columns by defining a StructType for the schema and using the 'from_json' function.

But this approach requires defining the schema manually.

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Manually defined schema for the JSON string column
val myStruct = StructType(
  Seq(
    StructField("name", StringType),
    StructField("depts", ArrayType(StringType)),
    StructField("sal", IntegerType)
  ))

// Parse the 'json_data' string column into a struct using the manual schema
val newDf = df.withColumn("json_data", from_json(col("json_data"), myStruct))
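
For completeness, once the JSON string is parsed into a struct this way, its fields can be expanded into top-level columns (continuing the snippet above; defining myStruct by hand is exactly the part I want to avoid):

// Expand the parsed struct's fields (name, depts, sal) into top-level columns
val flatDf = newDf.select(col("id"), col("json_data.*"))
flatDf.show()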

Is there a better way to flatten the JSON column without manually defining the schema? In the example above I can see which JSON fields are present, but in reality I can't traverse every row to find all the fields.

So I'm looking for a way to split all the fields into columns without specifying their names or types.

  • If the data is pure JSON across the rows, then the schema can be inferred automatically. Commented Sep 3, 2019 at 23:24
  • It is a pure JSON column, but not all fields are present in every row. For example, the first row in my sample is missing the 'sal' field. Commented Sep 3, 2019 at 23:44

3 Answers


If it's a CSV file and only one column contains JSON data, you can use the following solution.

import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._

// Read the pipe-delimited CSV; note the header names keep their surrounding spaces (e.g. " json_data", "id ")
val csvDF = spark.read.option("delimiter", "|").option("inferSchema", true).option("header", true).csv("test.csv")
// Extract the JSON strings and let Spark infer their schema
val rdd = csvDF.select(" json_data").rdd.map(_.getString(0))
val ds = rdd.toDS
val jsonDF = spark.read.json(ds)
// Add a synthetic id to both sides and join them back together
val jsonDFWithID = jsonDF.withColumn("id", monotonically_increasing_id())
val csvDFWithID = csvDF.select($"id ").withColumn("id", monotonically_increasing_id())
val joinDF = jsonDFWithID.join(csvDFWithID, "id").drop("id")

This is what the final DataFrame looks like:

scala> joinDF.printSchema()
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- depts: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- id : double (nullable = true)

The following solution works if the whole file is JSON; for me, the schema inference works fine.

JSON file

~/Downloads ▶ cat test.json
{"id": 1, "name":"abc", "depts":["dep01", "dep02"]},
{"id": 2, "name":"xyz", "depts" :["dep03"],"sal":100}

code

scala> spark.read.format("json").option("inferSchema", true).load("Downloads/test.json").show()
+--------------+---+----+----+
|         depts| id|name| sal|
+--------------+---+----+----+
|[dep01, dep02]|  1| abc|null|
|       [dep03]|  2| xyz| 100|
+--------------+---+----+----+

3 Comments

My data/file format is not JSON. Only one of the columns is a JSON string.
@Munesh I have modified the answer. Is this what you want?
Thanks Gaurang Shah, your approach helped me reach my solution. It didn't work with nested JSON like my example, though, and monotonically_increasing_id didn't work because it is not sequential, so the join returned only the first few rows.

Assuming json_data is of type map (which you can always convert to map if it's not), you can use getItem:

df = spark.createDataFrame([
    [1, {"name": "abc", "depts": ["dep01", "dep02"]}],
    [2, {"name": "xyz", "depts": ["dep03"], "sal": 100}]
],
    ['id', 'json_data']
)

df.select(
    df.id, 
    df.json_data.getItem('name').alias('name'), 
    df.json_data.getItem('depts').alias('depts'), 
    df.json_data.getItem('sal').alias('sal')
).show()

+---+----+--------------+----+
| id|name|         depts| sal|
+---+----+--------------+----+
|  1| abc|[dep01, dep02]|null|
|  2| xyz|       [dep03]| 100|
+---+----+--------------+----+

A more dynamic way to extract columns:

cols = ['name', 'depts', 'sal']
df.select(df.id, *(df.json_data.getItem(col).alias(col) for col in cols)).show()

1 Comment

I can't specify the column names in the variable 'cols' as I don't know all the fields available in the JSON
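
Following up on that comment: if json_data really is a MapType column, as this answer assumes, the keys that actually occur can be collected from the data first and then fed into the dynamic select. A Scala sketch (assumes Spark 2.3+ for map_keys; not part of the original answer):

import org.apache.spark.sql.functions.{col, explode, map_keys}

// Collect every distinct key that occurs in the map column across all rows
// (only the key names are brought to the driver, not the data)
val keys = df.select(explode(map_keys(col("json_data")))).distinct().collect().map(_.getString(0))

// Build the select dynamically from the discovered keys
val flattened = df.select(col("id") +: keys.map(k => col("json_data").getItem(k).alias(k)): _*)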

Based on @Gaurang Shah's answer, I implemented a solution that handles nested JSON structures and fixes the issue with monotonically_increasing_id (it is not sequential).

In this approach, the 'populateColumnName' function recursively walks StructType columns and builds the fully qualified column names.

The 'renameColumns' function renames the columns by replacing '.' with '_' so the nested JSON fields stay identifiable.

The 'addIndex' function adds a sequential index to the DataFrame so it can be joined back after parsing the JSON column.

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import spark.implicits._

def flattenJSON(df : DataFrame, columnName: String) : DataFrame = {

    val indexCol = "internal_temp_id"

    // Recursively build fully qualified names ("parent.child") for nested StructType fields
    def populateColumnName(col : StructField) : Array[String] = {
        col.dataType match {
          case struct: StructType => struct.fields.flatMap(populateColumnName).map(col.name + "." + _)
          case _                  => Array(col.name)
        }
    }

    // Alias nested columns by replacing '.' with '_' so they stay identifiable after flattening
    def renameColumns(name : String) : String = {
        if (name contains ".") {
            name + " as " + name.replaceAll("\\.", "_")
        }
        else name
    }

    // Add a sequential index column so the parsed JSON can be joined back row by row
    def addIndex(df : DataFrame) : DataFrame = {

        // Append the index column of type Long to the schema
        val newSchema = StructType(df.schema.fields ++ Array(StructField(indexCol, LongType, false)))

        // Zip on RDD level
        val rddWithId = df.rdd.zipWithIndex
        // Convert back to DataFrame
        spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
    }

    val dfWithID = addIndex(df)

    // Parse the JSON string column with an inferred schema
    val jsonDF = df.select(columnName)
    val ds = jsonDF.rdd.map(_.getString(0)).toDS
    val parseDF = spark.read.option("inferSchema", true).json(ds)

    // Flatten nested structs and rename the resulting columns
    val columnNames = parseDF.schema.fields.flatMap(populateColumnName).map(renameColumns)
    val resultDF = parseDF.selectExpr(columnNames:_*)

    // Join the flattened columns back to the original DataFrame on the index
    val jsonDFWithID = addIndex(resultDF)
    val joinDF = dfWithID.join(jsonDFWithID, indexCol).drop(indexCol)

    joinDF
}

val res = flattenJSON(df, "json_data")
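
As a footnote, on Spark 2.2+ the same schema inference can be combined with from_json to skip the index/join step entirely. A minimal sketch of that variant (assuming spark.implicits._ is in scope; unlike flattenJSON above, it only flattens one level, so nested structs such as 'address' would still need the recursive renaming):

import org.apache.spark.sql.functions.{col, from_json}

// Infer the schema of the JSON column once by reading the strings as a JSON dataset
val inferredSchema = spark.read.json(df.select("json_data").as[String]).schema

// Parse the column in place and expand the top-level fields into columns
val flatDF = df
  .withColumn("json_data", from_json(col("json_data"), inferredSchema))
  .select(col("id"), col("json_data.*"))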
