Spark Streaming:
I am receiving a DataFrame that consists of two columns. The first column is of string type and contains a JSON string; the second column contains the schema for each value (the first column).
Batch: 0
-------------------------------------------
+--------------------+--------------------+
| value| schema|
+--------------------+--------------------+
|{"event_time_abc...|`event_time_abc...|
+--------------------+--------------------+
The table is stored in the val input (an immutable variable). I use the DataType.fromDDL function to turn the schema string into a DataType and parse the JSON column with it, as follows:
val out = input.select(from_json(col("value").cast("string"), ddl(col("schema"))))
where ddl wraps the predefined Spark (Scala) function DataType.fromDDL(_: String): DataType. I registered it as a UDF so that I can apply it to a whole column instead of a single string. I did it this way:
val ddl:UserDefinedFunction = udf(DataType.fromDDL(_:String):DataType)
Here is the final transformation on both columns, value and schema, of the input table:
val out = input.select(from_json(col("value").cast("string"),ddl(col("schema"))))
However, I get an exception from the registration at this line:
val ddl:UserDefinedFunction = udf(DataType.fromDDL(_:String):DataType)
The error is:
java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.types.DataType is not supported
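The exception comes from how udf works: it derives a Spark SQL return type from the function's Scala return type when the UDF is created, and DataType has no such mapping, so a column cannot hold DataType values. The sketch below (object and method names are hypothetical) shows that the same conversion registers fine when it returns a supported type, here the schema's JSON text via DataType.json. It only illustrates the return-type rule; it does not by itself solve per-row parsing, because from_json still needs its schema up front.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.DataType

object UdfReturnTypeSketch {
  // udf(...) derives a Spark SQL type from the Scala return type when it is created.
  // DataType has no such mapping, so udf(DataType.fromDDL(_: String): DataType) fails there.
  // Returning the parsed schema as JSON text (a String) is supported.
  val ddlToJson: UserDefinedFunction =
    udf((ddl: String) => if (ddl == null) null else DataType.fromDDL(ddl).json)

  // Applies the UDF to the whole schema column.
  def schemaAsJson(input: DataFrame): DataFrame =
    input.select(ddlToJson(col("schema")).alias("schema_json"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("udf-return-type").getOrCreate()
    import spark.implicits._
    val input = Seq(
      ("""{"event_time_human2":"09:45:00 +0200 09/27/2021"}""", "`event_time_human2` STRING")
    ).toDF("value", "schema")
    schemaAsJson(input).show(false)
    spark.stop()
  }
}
```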
If I use:
val out = input.select(from_json(col("value").cast("string"),DataType.fromDDL("`" + "event_time_human2"+"`" +" STRING")).alias("value"))
then it works, but as you can see, I am only passing a single string (typed manually from the schema column) to DataType.fromDDL(_: String): DataType.
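For comparison, a self-contained version of the call that works. It succeeds because DataType.fromDDL runs once on the driver and from_json receives one fixed schema, identical for every row. The object name and the local SparkSession are assumptions made for this sketch.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.DataType

object ConstantSchemaSketch {
  // from_json(Column, DataType): the schema is an ordinary driver-side value,
  // fixed when the query is built, so every row is parsed with the same schema.
  def parseWithConstantSchema(input: DataFrame, ddl: String): DataFrame =
    input.select(from_json(col("value").cast("string"), DataType.fromDDL(ddl)).alias("value"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("constant-schema").getOrCreate()
    import spark.implicits._
    val input = Seq(
      ("""{"event_time_human2":"09:45:00 +0200 09/27/2021"}""", "`event_time_human2` STRING")
    ).toDF("value", "schema")
    parseWithConstantSchema(input, "`event_time_human2` STRING")
      .select(col("value.event_time_human2"))
      .show(false)
    spark.stop()
  }
}
```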
So how can I apply this function to a whole column without registering it, or is there another way to register the function?
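EDIT: from_json takes a column as its first argument, but its second argument must be a schema, not a column (as far as I can tell, the overload that accepts a Column only works with a literal schema string, not a per-row value). Hence, I guess a manual approach is required to parse each value with its own schema. After some investigation I found out that a DataFrame column cannot hold values of type DataType, which is why the UDF registration fails.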
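One possible workaround, sketched under the assumption stated in the question that every field can stay a string: parse every value with one constant MapType(StringType, StringType) schema, which from_json does accept, then use a UDF to keep only the fields that each row's own DDL declares. The per-row DataType.fromDDL call happens inside the UDF, so only a Map (a supported type) leaves it. This is a swapped-in technique, not a way to give from_json a per-row schema; nested or non-string fields would need a different approach. All names are hypothetical, and it assumes a Spark version whose from_json accepts a map schema (2.4 or later, to my knowledge).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, from_json, udf}
import org.apache.spark.sql.types.{DataType, MapType, StringType, StructType}

object PerRowSchemaSketch {
  // Field names declared by one DDL string, e.g. "`event_time_human2` STRING".
  // A DDL that does not describe a struct contributes no fields.
  private def declaredFields(ddl: String): Set[String] = DataType.fromDDL(ddl) match {
    case s: StructType => s.fieldNames.toSet
    case _             => Set.empty[String]
  }

  // ASSUMPTION: all fields are top-level strings, so one map<string,string> schema fits every row.
  // The UDF returns a Map, which Spark can store in a column (unlike DataType).
  val keepDeclaredFields: UserDefinedFunction = udf { (parsed: Map[String, String], ddl: String) =>
    if (parsed == null || ddl == null) null
    else {
      val names = declaredFields(ddl)
      parsed.filter { case (name, _) => names.contains(name) }
    }
  }

  // Parses with a constant schema, then trims each row to the fields of its own DDL.
  def parsePerRow(input: DataFrame): DataFrame = {
    val asMap = from_json(col("value").cast("string"), MapType(StringType, StringType))
    input.withColumn("parsed", keepDeclaredFields(asMap, col("schema")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("per-row-schema").getOrCreate()
    import spark.implicits._
    val input = Seq(
      ("""{"event_time_human2":"09:45:00 +0200 09/27/2021"}""", "`event_time_human2` STRING"),
      ("""{"a":"x","b":"y"}""", "`b` STRING") // hypothetical row with a different schema
    ).toDF("value", "schema")
    parsePerRow(input).show(false)
    spark.stop()
  }
}
```

The result column holds a map per row, so a field can be read with col("parsed")("event_time_human2"). Calling DataType.fromDDL once per row has a cost; caching parsed schemas would be a possible refinement.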
Since a bounty has been set on this question, I would like to provide additional information about the data and schema. The schema is defined in DDL (as a string) and can be parsed with the fromDDL function. The value is a simple JSON string that should be parsed with the schema derived via fromDDL.
The basic idea is that each value has its own schema and must be parsed with the corresponding schema. A new column should be created to store the result.
Data: Here is one example of the data:
value = {"event_time_human2":"09:45:00 +0200 09/27/2021"}
schema = "`event_time_human2` STRING"
There is no need to convert the value to a proper time format; keeping it as a string is fine.
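To make the example concrete, a small sketch (hypothetical object name) of what DataType.fromDDL returns for the example schema string: a StructType with a single nullable string field. This call does not need a SparkSession, as far as I know.

```scala
import org.apache.spark.sql.types.{DataType, StructType}

object ExampleSchemaSketch {
  // The schema string from the example row.
  val exampleDdl: String = "`event_time_human2` STRING"

  // A column-list DDL string parses to a StructType with one nullable string field.
  def exampleSchema: DataType = DataType.fromDDL(exampleDdl)

  def main(args: Array[String]): Unit = {
    println(exampleSchema.isInstanceOf[StructType]) // true
    println(exampleSchema.simpleString)             // struct<event_time_human2:string>
  }
}
```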
This runs in a streaming context, so not every approach works (for example, actions such as collect() cannot be called directly on a streaming DataFrame).
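A sketch of the map-based idea in a streaming query, to show that it only uses row-level operations and needs no collect(). It assumes Spark 3.x and uses MemoryStream, an internal class meant for tests, as a stand-in for the real source; the query name parsed_out and all other names are hypothetical. As before, it assumes every field can stay a string.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, from_json, udf}
import org.apache.spark.sql.types.{DataType, MapType, StringType, StructType}

object StreamingPerRowSketch {
  // Keeps only the fields named in the row's own DDL; returns a Map, not a DataType.
  // ASSUMPTION: all fields are top-level strings.
  val keepDeclaredFields: UserDefinedFunction = udf { (parsed: Map[String, String], ddl: String) =>
    if (parsed == null || ddl == null) null
    else {
      val names = DataType.fromDDL(ddl) match {
        case s: StructType => s.fieldNames.toSet
        case _             => Set.empty[String]
      }
      parsed.filter { case (name, _) => names.contains(name) }
    }
  }

  // Feeds (value, schema) pairs through a micro-batch query and returns the parsed maps.
  def run(spark: SparkSession, rows: Seq[(String, String)]): Seq[Map[String, String]] = {
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext
    val source = MemoryStream[(String, String)] // test-only stand-in for the real source

    val parsed = source.toDF().toDF("value", "schema")
      .withColumn("parsed",
        keepDeclaredFields(from_json(col("value"), MapType(StringType, StringType)), col("schema")))
      .select("parsed")

    val query = parsed.writeStream
      .format("memory")
      .queryName("parsed_out") // hypothetical in-memory table name
      .outputMode("append")
      .start()
    try {
      source.addData(rows: _*)
      query.processAllAvailable()
      spark.table("parsed_out").as[Map[String, String]].collect().toSeq
    } finally {
      query.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("streaming-per-row").getOrCreate()
    run(spark, Seq(
      ("""{"event_time_human2":"09:45:00 +0200 09/27/2021"}""", "`event_time_human2` STRING")
    )).foreach(println)
    spark.stop()
  }
}
```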
col("schema") is literally just a named Column object, not the selected string content of that one cell … input? … from_json on that?