
I'm reading a Hive table that has two columns, id and jsonString. I can easily turn the jsonString into a Spark data structure by calling spark.read.json, but I need to carry the id column along as well.

val jsonStr1 = """{"fruits":[{"fruit":"banana"},{"fruid":"apple"},{"fruit":"pera"}],"bar":{"foo":"[\"daniel\",\"pedro\",\"thing\"]"},"daniel":"daniel data random","cars":["montana","bagulho"]}"""
val jsonStr2 = """{"fruits":[{"dt":"banana"},{"fruid":"apple"},{"fruit":"pera"}],"bar":{"foo":"[\"daniel\",\"pedro\",\"thing\"]"},"daniel":"daniel data random","cars":["montana","bagulho"]}"""
val jsonStr3 = """{"fruits":[{"a":"banana"},{"fruid":"apple"},{"fruit":"pera"}],"bar":{"foo":"[\"daniel\",\"pedro\",\"thing\"]"},"daniel":"daniel data random","cars":["montana","bagulho"]}"""


case class Foo(id: Int, json: String)

val ds = Seq(Foo(1, jsonStr1), Foo(2, jsonStr2), Foo(3, jsonStr3)).toDS
// spark.read.json accepts a Dataset[String] directly (since Spark 2.2),
// so there is no need to detour through the RDD API
val jsonDF = spark.read.json(ds.select($"json").as[String])

jsonDF.show
+--------------------+------------------+------------------+--------------------+
|                 bar|              cars|            daniel|              fruits|
+--------------------+------------------+------------------+--------------------+
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[,,, banana], [,...|
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[, banana,,], [,...|
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[banana,,,], [,,...|
+--------------------+------------------+------------------+--------------------+

I would like to add the column id from the Hive table, like this:

+--------------------+------------------+------------------+--------------------+---+
|                 bar|              cars|            daniel|              fruits| id|
+--------------------+------------------+------------------+--------------------+---+
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[,,, banana], [,...|  1|
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[, banana,,], [,...|  2|
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[banana,,,], [,,...|  3|
+--------------------+------------------+------------------+--------------------+---+

I don't want to resort to regular expressions.

I created a UDF that takes these two fields as arguments and, using a proper JSON library, injects the desired field (id) and returns a new JSON string. It works like a charm, but I hope the Spark API offers a better way to do it. I'm using Apache Spark 2.3.0. A sketch of that UDF approach is shown below.
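For reference, a minimal sketch of what such a UDF could look like, assuming json4s as the JSON library (an assumption; the question doesn't say which library was actually used — json4s just happens to ship with Spark):

import org.apache.spark.sql.functions.udf
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical sketch: parse each document, merge in an "id" field,
// and re-serialize, so spark.read.json picks id up like any other field.
val addId = udf { (id: Int, json: String) =>
  compact(render(parse(json) merge JObject("id" -> JInt(id))))
}

val jsonWithId = spark.read.json(ds.select(addId($"id", $"json").as[String]))

This works, but it rewrites every JSON string just to smuggle the id through — the overhead the question hopes the Spark API can avoid.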

2 Answers


I already knew about the from_json function, but in my case it would be "impossible" to write the schema for each JSON by hand; I was hoping Spark had a more "idiomatic" interface. The trick is to reuse the schema that spark.read.json already inferred above (jsonDF.schema) and pass it to from_json, so no schema has to be written manually.

This is my final solution:

ds.select($"id", from_json($"json", jsonDF.schema).alias("_json_path")).select($"_json_path.*", $"id").show

ds.select($"id", from_json($"json", jsonDF.schema).alias("_json_path")).select($"_json_path.*", $"id").show

+--------------------+------------------+------------------+--------------------+---+
|                 bar|              cars|            daniel|              fruits| id|
+--------------------+------------------+------------------+--------------------+---+
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[,,, banana], [,...|  1|
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[, banana,,], [,...|  2|
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[banana,,,], [,,...|  3|
+--------------------+------------------+------------------+--------------------+---+



One way would be to apply from_json to the JSON strings with the corresponding schema, as shown below:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

case class Foo(id: Int, json: String)

val df = Seq(Foo(1, jsonStr1), Foo(2, jsonStr2), Foo(3, jsonStr3)).toDF

val schema = StructType(Seq(
  StructField("bar", StructType(Seq(
    StructField("foo", StringType, true)
    )), true),
  StructField("cars", ArrayType(StringType, true), true),
  StructField("daniel", StringType, true),
  StructField("fruits", ArrayType(StructType(Seq(
    StructField("a", StringType, true),
    StructField("dt", StringType, true),
    StructField("fruid", StringType, true),
    StructField("fruit", StringType, true)
  )), true), true)
))

df.
  withColumn("json_col", from_json($"json", schema)).
  select($"id", $"json_col.*").
  show
// +---+--------------------+------------------+------------------+--------------------+
// | id|                 bar|              cars|            daniel|              fruits|
// +---+--------------------+------------------+------------------+--------------------+
// |  1|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[null,null,null,...|
// |  2|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[null,banana,nul...|
// |  3|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[banana,null,nul...|
// +---+--------------------+------------------+------------------+--------------------+

Comments

I already knew this solution; the JSON documents are insanely huge, so creating the schema manually would be insane. I could use from_json if Spark inferred the schema automatically, as it does with spark.read.json. I was thinking of calling spark.read.json to infer the schema and then passing it as a parameter to from_json, but I'm not sure it's as straightforward as it sounds, besides the overhead of the serialization.
ds.select($"id",from_json($"json",jsonDF.schema).alias("_json_path")).show
@Mantovani, you could certainly take the schema from your jsonDF, though jsonDF itself requires extra transformations to build. For a large dataset with a complex JSON schema, it might be best to write a JSON file containing a single row of the data, run spark.read.json on it, and take .schema from the result, as sketched below.
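A quick sketch of a variant of that idea, sampling the existing column instead of writing out a separate file (this assumes the sampled rows cover all fields; fields absent from the sample would be dropped for every row):

// Infer the schema from a small sample instead of the whole dataset,
// then reuse it to parse every row with from_json.
val sampleSchema = spark.read.json(ds.select($"json").as[String].limit(10)).schema

ds.select($"id", from_json($"json", sampleSchema).alias("j")).select($"id", $"j.*").show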
