Problem To Solve
Need to extract a JSON string from a Delta table and eventually parse it. The show function can be used to look at the data, but it needs to be extracted into a Map or a case class for processing.
The data is inserted from a JSON file into the Delta table. The column in the table is of type String. In this process only the values are stored in the Delta table column, not the keys from the JSON array, so a select query returns only partial data, so to speak. Is this the default behavior of a Delta table?
e.g.
Data to store: { "name" : "Sample" }
What actually gets stored: { "Sample" }
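For what it's worth, this looks like Spark's struct-to-string cast rather than anything Delta-specific. A minimal sketch (toy data, not the actual table) that seems to reproduce the behavior:

import org.apache.spark.sql.functions.{col, to_json}

val sample = spark.sql("""SELECT named_struct('name', 'Sample') AS Config""")
sample.select(col("Config").cast("string")).show(false) // value only, roughly {Sample} -- keys dropped
sample.select(to_json(col("Config"))).show(false)       // {"name":"Sample"} -- keys kept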
Background
I have already referred to the following SO questions and followed the instructions there, but could not find a solution.
- Insertion Step
Data was inserted into the Delta table from a JSON file.
One of the fields is a JSON array and it is supposed to be inserted as is.
import org.apache.spark.sql.functions.{col, explode}

spark.read
  .option("multiLine", true)
  .option("mode", "PERMISSIVE")
  .option("dropFieldIfAllNull", false)
  .json(FILE_PATH)
  .createOrReplaceTempView("data")

val data = spark.sql("""SELECT * FROM data""")

val inputDataFrame = data
  .select(
    col("Id"),
    col("Version"),
    col("StartDate"),
    col("EndDate"),
    explode(col("Configuration")).alias("Config")
  )
  .withColumn("ConfigId", col("Config.Id"))
  // ... further transformations elided ...
  .toDF
import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forPath(.....)

deltaTable
  .as("Config")
  .merge(
    inputDataFrame.as("input"),
    ... // match condition elided
  )
  .whenMatched
  .update(
    Map(
      "ConfigData" -> col("input.Config")
    )
  )
  .whenNotMatched
  .insert(
    Map(
      "ConfigData" -> col("input.Config")
    )
  )
  .execute()
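Since ConfigData is a String column while input.Config is a struct after the explode, the merge presumably ends up casting the struct to a string. A quick sanity check (a sketch, run against the source frame) of what text would actually get stored:

inputDataFrame.select(col("Config").cast("string")).show(false) // likely the value-only text that ends up in ConfigData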
- Select Step
After running a select query, the data shows up without keys; only the values from the inserted JSON string appear.
- Read Step
Method 1
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, StringType}
import spark.implicits._

val arrayOfStringSchema = ArrayType(StringType)
var configData = df.select("Config").as[String]
configData.show(false) // shows the Config column, data without keys
var desiredData = configData.withColumn("Config", from_json(col("Config"), arrayOfStringSchema))
desiredData.show(false) // shows null
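A note on the null: from_json returns null for rows whose text cannot be parsed with the given schema, and here the schema does not fit either, since the stored text is not a JSON array of strings. One way to probe whether the stored text is valid JSON at all is get_json_object; the $.Id path below is just a hypothetical field name:

import org.apache.spark.sql.functions.get_json_object

df.select(get_json_object(col("Config"), "$.Id")).show(false) // null per row if the text is not parseable JSON or the path is absent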
Method 2
val json_schema = spark.read
  .option("multiLine", true)
  .option("mode", "PERMISSIVE")
  .option("dropFieldIfAllNull", false)
  .json(df.select("Config").as[String])
  .schema

var config = configData.withColumn("Config", from_json(col("Config"), json_schema))
config.show(false)   // shows the Config column, data without keys
config.printSchema() // shows the following:
/*
root
 |-- Config: struct (nullable = true)
 |    |-- _corrupt_record: string (nullable = true)
*/
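The _corrupt_record column is the giveaway: when spark.read.json cannot parse an input string, the default PERMISSIVE mode puts the raw text into _corrupt_record and infers nothing else. A toy illustration (not the actual table data):

import spark.implicits._

spark.read.json(Seq("""{ "Sample"}""").toDS()).printSchema()          // only _corrupt_record
spark.read.json(Seq("""{ "name" : "Sample"}""").toDS()).printSchema() // name: string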
Platform
Scala 2.12.18
Apache Spark 3.5.1
What is missing here? Is there a problem in the insertion, or is there some other way to achieve this?
EDIT 1: Apparently the schema is inferred automatically when the data is stored, and it is not persisted in the Delta table, so when fetching the data it is not clear where the schema should come from. Basically a Map is needed that makes it possible to fetch fields from the JSON string stored as a column in the Delta table.
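If the stored string is valid JSON (see the correction below), from_json with a MapType gives exactly that kind of Map. A minimal sketch, assuming the column holds a flat JSON object with string values ("name" below is a hypothetical key):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{MapType, StringType}

val asMap = df.withColumn("ConfigMap", from_json(col("Config"), MapType(StringType, StringType)))
asMap.select(col("ConfigMap")("name")).show(false)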
The to_json and from_json methods came in handy for me in this situation. The Delta table column is of type String, and the JSON as read from a file was being stored there; in that process the schema was being stripped off, so reading with from_json was tricky because there was no schema. So I corrected the insert mechanism using to_json, and now the JSON schema is available, hence from_json gives me a dataframe from which the required fields can be chosen.
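A minimal sketch of what that correction might look like, reusing the names from the question; the match condition and the Id field below are assumptions:

import org.apache.spark.sql.functions.{col, from_json, to_json}
import spark.implicits._

// Insert side: serialize the struct to a JSON string so the keys survive in the String column.
deltaTable
  .as("Config")
  .merge(inputDataFrame.as("input"), "Config.ConfigId = input.ConfigId") // hypothetical match condition
  .whenMatched
  .update(Map("ConfigData" -> to_json(col("input.Config"))))
  .whenNotMatched
  .insert(Map("ConfigData" -> to_json(col("input.Config"))))
  .execute()

// Read side: infer the schema from the stored JSON strings, then parse and pick fields.
val jsonSchema = spark.read.json(df.select("ConfigData").as[String]).schema
val parsed = df.withColumn("ConfigData", from_json(col("ConfigData"), jsonSchema))
parsed.select(col("ConfigData.Id")).show(false) // Id is an assumed field inside the stored JSON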