
I found several ideas for Scala, but I was unable to implement them in Java successfully, hence posting this as a new question.

I need to parse the JSON in the "value" column of a stream coming from a Kafka topic. I defined the schema as:

StructType schema = new StructType();
schema.add("Id", DataTypes.StringType);
schema.add("Type", DataTypes.StringType);
schema.add("KEY", DataTypes.StringType);
schema.add("condition", DataTypes.IntegerType);
schema.add("seller_Id", DataTypes.IntegerType);
schema.add("seller_Name", DataTypes.StringType);
schema.add("isActive", DataTypes.BooleanType);

and applied it with:

Dataset<Row> output = df.select(functions.from_json(df.col("value"), schema));

I got as far as seeing the following printed on the console sink:

StreamingQuery query = output.writeStream().format("console").start();

+--------------------+
|jsontostructs(value)|
+--------------------+
|                  []|
+--------------------+

Please advise how to get the individual columns out of this struct.

2 Answers

So basically you need to use the from_json function together with schema.json() to get the schema as a JSON string (similar to what Filip showed above in Scala). Hope it helps someone.

// StructType.add returns a new StructType rather than mutating in place,
// so chain the calls. (Calling schema.add(...) and discarding the result
// leaves the schema empty, which is why the console showed [].)
StructType schema = new StructType()
        .add("Id", DataTypes.StringType)
        .add("Type", DataTypes.StringType)
        .add("KEY", DataTypes.StringType)
        .add("condition", DataTypes.IntegerType)
        .add("seller_Id", DataTypes.IntegerType)
        .add("seller_Name", DataTypes.StringType)
        .add("isActive", DataTypes.BooleanType);

Dataset<Row> output = df
        .select(from_json(df.col("value").cast("string"), // Kafka's value column is binary, so cast it first
                DataType.fromJson(schema.json())).as("data"))
        .select("data.*");

The last select flattens the struct so that the fields defined in the schema become top-level columns.
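
For completeness, here is a minimal end-to-end Java sketch of the same approach; the broker address and topic name are placeholders for illustration, not values from the question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class KafkaJsonStream {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("kafka-json")
                .getOrCreate();

        // Chained builder so the fields actually end up in the schema
        StructType schema = new StructType()
                .add("Id", DataTypes.StringType)
                .add("Type", DataTypes.StringType)
                .add("KEY", DataTypes.StringType)
                .add("condition", DataTypes.IntegerType)
                .add("seller_Id", DataTypes.IntegerType)
                .add("seller_Name", DataTypes.StringType)
                .add("isActive", DataTypes.BooleanType);

        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
                .option("subscribe", "my-topic")                     // placeholder
                .load();

        // Kafka delivers value as binary; cast to string before parsing
        Dataset<Row> output = df
                .select(from_json(col("value").cast("string"), schema).as("data"))
                .select("data.*");

        StreamingQuery query = output.writeStream()
                .format("console")
                .start();
        query.awaitTermination();
    }
}

Note that the functions class also has a from_json(Column, StructType) overload, so passing the StructType directly (as above) works too; the DataType.fromJson(schema.json()) round-trip is only needed if your Spark version lacks a suitable overload.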


You already have a schema defined for your JSON message...

val sparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("test")
    .getOrCreate()

val df: DataFrame = sparkSession
    .readStream
    .format("kafka")...

import org.apache.spark.sql.functions._
import sparkSession.implicits._

val ds = df.select($"value" cast "string" as "json")
        .select(from_json($"json", schema) as "data")
        .select("data.*")

Note that append output mode isn't supported when you use streaming aggregations on a streaming DataFrame/Dataset without watermarking, so if you want to add aggregations, remember to change the output mode to something along the following lines:

val query = aggregations
           .writeStream
           .outputMode("complete")
           .format("console")
           .start()

query.awaitTermination()
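
For the Java side, here is a sketch of the watermarked variant that makes append mode legal for aggregations; the event-time column name, watermark delay, and window size are illustrative assumptions, not part of the original question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

// "ds" is the parsed stream from above; "eventTime" is a hypothetical
// event-time (timestamp) column your JSON would need to carry.
Dataset<Row> aggregations = ds
        .withWatermark("eventTime", "10 minutes") // allows append mode
        .groupBy(window(col("eventTime"), "5 minutes"), col("seller_Id"))
        .count();

StreamingQuery query = aggregations.writeStream()
        .outputMode("append") // or "complete" if you skip the watermark
        .format("console")
        .start();
query.awaitTermination();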

2 Comments

The APIs are slightly different between Scala and Java; I posted the solution I found later as a separate answer.
@AbhishekN +1 to the solution you've shared on this
