
I'm a Spark newbie.

I'm trying to read a Kafka topic using Spark Structured Streaming.

The 'value' field of the data streamed from Kafka is a JSON string. I want to convert this 'value' field to a DataFrame and write it out as a Parquet file.

I want to infer the schema from the JSON string contained in the 'value' field, because fields keep being added to the JSON data.

For example, the Kafka data looks like this:

key  value                      ...
0    "{a:1, b:2, c:3}.."        ...
1    "{a:1, b:2, c:3, d:4}.."   ...
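To make the drift concrete, here is a plain-Python sketch (standard library only, not Spark; the keys are quoted here so the strings are valid JSON):

```python
import json

# two 'value' strings from successive Kafka messages;
# the second message adds a new field "d"
first = json.loads('{"a": 1, "b": 2, "c": 3}')
second = json.loads('{"a": 1, "b": 2, "c": 3, "d": 4}')

# the field set grows over time, so any fixed schema
# chosen up front would silently drop the new field
new_fields = sorted(set(second) - set(first))
print(new_fields)  # ['d']
```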

I'm trying this code:

source_df = streaming_data.selectExpr("CAST(value AS STRING)").alias("value") \
    .select(from_json("value", schema_of_json(streaming_data.select('value').first().getString(0)))).alias("data") \
    .select("data.*")

I get this error:

pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

Please help.

1 Answer


Option 1: Hard-code the schema and use it in F.from_json().

from pyspark.sql import functions as F, types as T

# fixed schema covering all currently known fields
my_schema = T.StructType([
    T.StructField('a', T.IntegerType()),
    T.StructField('b', T.IntegerType()),
    T.StructField('c', T.IntegerType()),
    T.StructField('d', T.IntegerType()),
])

# cast the Kafka value bytes to a string, then parse the JSON
value = F.col('value').cast(T.StringType())
data = F.from_json(value, my_schema).alias('data')
source_df = streaming_data.select(data).select('data.*')

Option 2: If you want to infer the schema dynamically, you can use foreachBatch. But note that this is risky: a breaking schema change will fail the streaming query, and there is no guarantee the schema will be inferred correctly.

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F, types as T

def parse_and_process(df: DataFrame, epoch_id: int) -> None:
    # cache the current micro-batch; it will be scanned more than once
    df.persist()

    # infer the schema of the current batch
    spark = SparkSession.getActiveSession()

    value = F.col('value').cast(T.StringType())
    inferred_df = spark.read.json(
        df.select(value).rdd.map(lambda x: x[0]),
        dropFieldIfAllNull=True
    )
    inferred_schema = inferred_df.schema

    # parse the JSON with the inferred schema
    res_df = df.withColumn('data', F.from_json(value, inferred_schema))

    # process the DataFrame; it's not a streaming DataFrame anymore
    res_df.write....

    df.unpersist()


streaming_data.writeStream.foreachBatch(parse_and_process).start()
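To see why per-batch inference can break, here is a plain-Python sketch (standard library only, not Spark) that mimics schema inference by mapping each JSON field to the Python type of its value. A field that changes type between messages makes the inferred schemas conflict, just as a breaking change would fail the streaming query:

```python
import json

def infer_schema(records):
    """Map each JSON field to the Python type of its value,
    raising if two records disagree on a field's type."""
    schema = {}
    for rec in records:
        for key, val in json.loads(rec).items():
            seen = schema.setdefault(key, type(val))
            if seen is not type(val):
                raise TypeError(f"conflicting types for field {key!r}")
    return schema

# fields may be added between messages: inference handles this
print(infer_schema(['{"a": 1, "b": 2}', '{"a": 1, "b": 2, "c": 3}']))
# {'a': <class 'int'>, 'b': <class 'int'>, 'c': <class 'int'>}

# but a field changing type between messages is a breaking change
try:
    infer_schema(['{"a": 1, "b": 2}', '{"a": 1, "b": "2"}'])
except TypeError as exc:
    print(exc)  # conflicting types for field 'b'
```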

1 Comment

It was a great help, thanks!
