I'm a Spark newbie.
I'm trying to read a Kafka topic with Spark Structured Streaming.
The `value` field of the data streamed from Kafka is a JSON string, and I want to parse that JSON into a DataFrame and write it out as Parquet files.
I want to derive the schema from the JSON string contained in the `value` field itself, because fields keep being added to the JSON data.
For example, the Kafka data looks like this:
| key | value | ... |
|---|---|---|
| 0 | `{"a": 1, "b": 2, "c": 3}..` | ... |
| 1 | `{"a": 1, "b": 2, "c": 3, "d": 4}..` | ... |
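For context, here is roughly the pipeline I have in mind, with a hard-coded schema (a minimal sketch: the broker address, topic name, and output paths are placeholders):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import LongType, StructField, StructType

spark = SparkSession.builder.appName("kafka-json-to-parquet").getOrCreate()

# Broker address and topic name are placeholders.
streaming_data = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "my_topic")
    .load()
)

# Hard-coded schema matching the example rows above -- this is
# exactly what I want to avoid, since fields keep being added.
schema = StructType([
    StructField("a", LongType()),
    StructField("b", LongType()),
    StructField("c", LongType()),
])

parsed = (
    streaming_data
    .selectExpr("CAST(value AS STRING) AS value")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# Paths are placeholders; the Parquet sink requires a checkpoint location.
query = (
    parsed.writeStream.format("parquet")
    .option("path", "/tmp/kafka_parquet")
    .option("checkpointLocation", "/tmp/kafka_checkpoint")
    .start()
)
```

This works, but the hard-coded `schema` breaks as soon as a new field like `d` shows up.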
So I'm trying to infer the schema from the data instead, with this code:
```python
from pyspark.sql.functions import col, from_json, schema_of_json

source_df = (
    streaming_data
    .selectExpr("CAST(value AS STRING) AS value")
    # Infer the schema from the JSON string of the first record
    .select(from_json(col("value"), schema_of_json(streaming_data.select("value").first()[0])).alias("data"))
    .select("data.*")
)
```
but I get this error:

```
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
```
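One workaround I'm considering is to sample a record with a plain batch read (`spark.read` instead of `readStream`), where actions like `first()` are allowed, and feed the inferred schema into the streaming query. A minimal sketch (broker and topic are placeholders, and I'm assuming a representative record already exists on the topic):

```python
from pyspark.sql.functions import col, from_json, schema_of_json

# Batch read: first() is allowed here, unlike on a streaming DataFrame.
sample_row = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "my_topic")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING) AS value")
    .first()
)

# schema_of_json accepts a literal JSON string, and from_json accepts
# the resulting schema column directly (Spark 2.4+).
source_df = (
    streaming_data
    .selectExpr("CAST(value AS STRING) AS value")
    .select(from_json(col("value"), schema_of_json(sample_row["value"])).alias("data"))
    .select("data.*")
)
```

Since this only samples one record before the stream starts, fields added later would still be missed until the query is restarted.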
Is that a reasonable approach, or is there a better way to handle an evolving JSON schema in a streaming query? Please help.