
Spark Streaming:

I am receiving a DataFrame that consists of two columns. The first column is of string type and contains a JSON string, and the second column holds the schema for each value in the first column.

Batch: 0
-------------------------------------------
+--------------------+--------------------+
|               value|              schema|
+--------------------+--------------------+
|{"event_time_abc...|`event_time_abc...|
+--------------------+--------------------+

The table is stored in the val input (an immutable variable). I am using the DataType.fromDDL function to convert the string column into a parsed JSON column in the following way:

val out = input.select(from_json(col("value").cast("string"), ddl(col("schema"))))

where ddl wraps the predefined function DataType.fromDDL(_:String):DataType in Spark (Scala), but I have registered it as a UDF so that I can apply it to a whole column instead of a single string. I have done it in the following way:

val ddl: UserDefinedFunction = udf(DataType.fromDDL(_: String): DataType)

and here is the final transformation on both columns, value and schema, of the input table:

val out = input.select(from_json(col("value").cast("string"), ddl(col("schema"))))

However, I get an exception from the registration at this line:

val ddl: UserDefinedFunction = udf(DataType.fromDDL(_: String): DataType)

The error is:

java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.types.DataType is not supported
 

If I use:

val out = input.select(from_json(col("value").cast("string"), DataType.fromDDL("`" + "event_time_human2" + "`" + " STRING")).alias("value"))

then it works, but as you can see I am only using a string (manually typed, taken from the schema column) inside the function DataType.fromDDL(_:String):DataType.

So how can I apply this function to the whole column without registration, or is there another way to register the function?

EDIT: The from_json function's first argument requires a column, while its second argument requires a schema and not a column. Hence, I guess a manual approach is required to parse each value field with its corresponding schema field. After some investigation I found out that DataFrames do not support DataType as a column type.


Since a bounty has been set on this question, I would like to provide additional information regarding the data and schema. The schema is defined in DDL (string type) and can be parsed with the fromDDL function. The value is a simple JSON string that will be parsed with the schema we derive using fromDDL.

The basic idea is that each value has its own schema and needs to be parsed with the corresponding schema. A new column should be created where the result will be stored.

Data: Here is one example of the data:

value = {"event_time_human2":"09:45:00 +0200 09/27/2021"}

schema = "`event_time_human2` STRING"

There is no need to convert to a proper time format; a plain string is fine.

This is in a streaming context, so not all approaches work.

  • col("schema") is literally just a named Column object, not the selected string content of that one cell Commented Oct 1, 2021 at 22:27
  • @OneCricketeer ddl is registered by me and it takes a string but after registration spark passes it a column and not string. Commented Oct 1, 2021 at 22:29
  • "Take a string", okay, but you are giving it a column object which type is undefined. What is the schema of input? Commented Oct 1, 2021 at 22:33
  • Maybe try defining a UDF that accepts two parameters that takes both columns and runs from_json on that? Commented Oct 1, 2021 at 22:34
  • @OneCricketeer i tried that when i pass both columns then the output of from_json function is a column which is again undefined. So i can't pass back the results. Commented Oct 1, 2021 at 22:34

1 Answer


Schemas are applied and validated before runtime, that is, before the Spark code is executed on the executors. Parsed schemas must be part of the execution plan, so schema parsing can't happen dynamically the way you intended. That is why you see the exception java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.types.DataType is not supported only for the UDF. It implies that DataType.fromDDL should be used only in driver code, not in runtime/executor code, which is the code inside your UDF. By the time the UDF runs, Spark has already transformed the imported data by applying the schemas that you specified on the driver side, so calling DataType.fromDDL inside a UDF is essentially useless. All of the above means that inside UDFs we can only use primitive Scala/Java types and some wrappers provided by the Spark API, e.g. WrappedArray.
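To make the distinction concrete, here is a small sketch (names such as normalizeDdl are illustrative, not from the question): fromDDL is fine on the driver because its result is folded into the plan before execution, and a UDF is fine as long as it returns a supported type such as String:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.DataType

// Fine: runs on the driver, the resulting schema becomes part of the query plan.
val driverSideSchema: DataType = DataType.fromDDL("`event_time_human2` STRING")

// Fine: String is a supported UDF return type.
val normalizeDdl = udf((ddl: String) => ddl.trim)

// Not fine: DataType is not a supported UDF return type, so this throws
// java.lang.UnsupportedOperationException at registration time.
// val ddl = udf(DataType.fromDDL(_: String): DataType)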

An alternative could be to collect all the schemas on the driver and then create a map with a (schema, dataframe) pair for each schema.

Keep in mind that collecting data to the driver is an expensive operation, and it makes sense only if you have a reasonable number of unique schemas, i.e. at most a few thousand. Also, applying these schemas to each dataset has to be done sequentially in the driver, which is quite expensive too, so the suggested solution will only work efficiently if you have a limited number of unique schemas.

Up to this point, your code could look as follows:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType

import spark.implicits._

val df = Seq(
  ("""{"event_time_human2":"09:45:00 +0200 09/27/2021", "name":"Pinelopi"}""", "`event_time_human2` STRING, name STRING"),
  ("""{"first_name":"Pin", "last_name":"Halen"}""", "first_name STRING, last_name STRING"),
  ("""{"code":993, "full_name":"Sofia Loren"}""", "code INT, full_name STRING")
).toDF("value", "schema")

val schemaDf = df.select("schema").distinct()

val dfBySchema = schemaDf.collect().map{ row =>
  val schemaValue = row.getString(0)
  val ddl = StructType.fromDDL(schemaValue)
  val filteredDf = df.where($"schema" === schemaValue)
                     .withColumn("value", from_json($"value", ddl))
  
  (schemaValue, filteredDf)
}.toMap

// Map(
//   `event_time_human2` STRING, name STRING -> [value: struct<event_time_human2: string, name: string>, schema: string], 
//   first_name STRING, last_name STRING -> [value: struct<first_name: string, last_name: string>, schema: string], 
//   code INT, full_name STRING -> [value: struct<code: int, full_name: string>, schema: string]
// )

Explanation: first we gather the unique schemas with schemaDf.collect(). Then we iterate through the schemas and filter the initial df based on the current schema. We also use from_json to convert the current string value column to the specific schema.

Note that we can't have one common column with different data types; this is the reason we create a separate df for each schema rather than one final df.
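For example, the parsed frame for a given schema can then be looked up from the map by its DDL string and its struct fields queried directly (a usage sketch based on the code above):

// Retrieve the DataFrame that was parsed with a specific schema
// and read fields out of the resulting struct column.
val eventsDf = dfBySchema("`event_time_human2` STRING, name STRING")
eventsDf.select($"value.event_time_human2", $"value.name").show(false)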


11 Comments

Thanks for your answer, I really appreciate it. I will check whether it works in a streaming context and then accept your answer. As far as I know, each time a new micro-batch comes in (streams) we collect the distinct schemas again. Is it possible to collect all distinct schemas just once (from the first batch) and keep them in memory? Each subsequent batch would be tested against the already existing schemas, and if there is an exception we could extract its corresponding schema and parse it.
I just wanted to let you know that your code does not work in streaming mode. I get the following error: Queries with streaming sources must be executed with writeStream.start()
For streaming you will have to restart your program in order to collect all your schemas on the driver and then reload the collected schemas. I am not aware of any alternative here, unfortunately. Also, please update your question with this specific information because it is an important detail.
It is not important, but if any solution works for streams that will be fine.
@KhanSaab for streaming you will need to follow a completely different approach. As a matter of fact, you should not use the Spark DDL schema representation at all. One solution would be to store the schema in a different format, e.g. Avro, and then parse and validate the JSON value in a UDF. Again, this is a completely different problem from the one described in the question above. I believe it is a better idea to write a new question in which you explain precisely what you are trying to achieve.
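As a side note to the comments above (not part of the answer): Structured Streaming's foreachBatch hands each micro-batch to a callback as a regular batch DataFrame, so the per-schema logic from the answer could in principle run inside it. A minimal, untested sketch, assuming a streaming frame called streamingInput with the same value/schema columns and a hypothetical output path:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType

val query = streamingInput.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    // Inside foreachBatch the micro-batch is a plain batch DataFrame, so collect()
    // no longer triggers "Queries with streaming sources must be executed with writeStream.start()".
    val schemas = batchDf.select("schema").distinct().collect().map(_.getString(0))
    schemas.foreach { schemaValue =>
      val ddl = StructType.fromDDL(schemaValue)
      val parsed = batchDf.where(batchDf("schema") === schemaValue)
                          .withColumn("value", from_json(batchDf("value").cast("string"), ddl))
      // Hypothetical sink: one output directory per (batch, schema) pair.
      parsed.write.mode("append").json(s"/tmp/out/batch_${batchId}_schema_${schemaValue.hashCode}")
    }
  }
  .start()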
