
I wrote a Spark Java client using the Spark Structured Streaming API. The code extracts CSV-formatted strings from Kafka:

SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("KafkaMongoStream")
        .getOrCreate();

Dataset<Row> df = spark.read().format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topicForMongoDB")
        .option("startingOffsets", "earliest")
        .load()
        .selectExpr("CAST(value AS STRING)");
            
df.show();

This works as expected: the code prints the CSV-formatted strings.

+--------------------+
|               value|
+--------------------+
|realtime_start,re...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|

Then I try to transform these strings into a Spark DataFrame using Spark SQL. First, here is the Java POJO class:

import java.io.Serializable;
import java.util.Date;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class EntityMongoDB implements Serializable {

    private Date date;
    private float value;
    private String id;
    private String title;
    private String state;
    private String frequency_short;
    private String units_short;
    private String seasonal_adjustment_short;
    
    private static StructType structType = DataTypes.createStructType(new StructField[] {
              
              DataTypes.createStructField("date", DataTypes.DateType, false),
              DataTypes.createStructField("value", DataTypes.FloatType, false),
              DataTypes.createStructField("id", DataTypes.StringType, false),
              DataTypes.createStructField("title", DataTypes.StringType, false),
              DataTypes.createStructField("state", DataTypes.StringType, false),
              DataTypes.createStructField("frequency_short", DataTypes.StringType, false),
              DataTypes.createStructField("units_short", DataTypes.StringType, false),
              DataTypes.createStructField("seasonal_adjustment_short", DataTypes.StringType, false)
    });
    
    public static StructType getStructType() {
        return structType;
    }
}

And here is the code that transforms those CSV strings into a DataFrame:

Dataset<Row> dfs = df.select(from_json(col("value"), EntityMongoDB.getStructType())
        .as("entityMongoDB"))
        .selectExpr("entityMongoDB.date", "entityMongoDB.value", "entityMongoDB.id", 
                "entityMongoDB.title", "entityMongoDB.state", "entityMongoDB.frequency_short", 
                "entityMongoDB.units_short", "entityMongoDB.seasonal_adjustment_short").toDF();

dfs.show();
dfs.printSchema();

The printed schema is correct.

 |-- date: date (nullable = true)
 |-- value: float (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- state: string (nullable = true)
 |-- frequency_short: string (nullable = true)
 |-- units_short: string (nullable = true)
 |-- seasonal_adjustment_short: string (nullable = true)

But the generated columns are all null:

+----+-----+----+-----+-----+---------------+-----------+-------------------------+
|date|value|  id|title|state|frequency_short|units_short|seasonal_adjustment_short|
+----+-----+----+-----+-----+---------------+-----------+-------------------------+
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|

I think the schema of the DataFrame is generated correctly, but the data-extraction part has a problem.

Comments:
  • use from_csv, not from_json (Feb 6, 2021 at 8:33)

1 Answer

The strings you have in the value column aren't valid JSON, so from_json won't work here.
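As a quick plain-Java illustration (the class and method names here are made up for this sketch, and this is not Spark code): from_json tries to parse each value as a JSON document, and a malformed value simply produces nulls for every field, which matches the all-null output above. A CSV row fails even the crudest structural check for a JSON object:

```java
// Plain-Java sketch (hypothetical class): a CSV row does not even pass
// the most basic shape check for a JSON object, so from_json has nothing
// to parse and yields null for every field.
public class JsonVsCsv {
    static boolean looksLikeJsonObject(String s) {
        String t = s.trim();
        return t.startsWith("{") && t.endsWith("}");
    }

    public static void main(String[] args) {
        // A JSON object passes the check; a CSV row does not.
        System.out.println(looksLikeJsonObject("{\"date\":\"2021-01-27\",\"value\":6.7}"));
        System.out.println(looksLikeJsonObject("2021-01-27,2021-01-27,6.7,some-id"));
    }
}
```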

For Spark 3+, you can use from_csv, as pointed out in the comments by @mck:

Dataset<Row> dfs = df.select(from_csv(col("value"), EntityMongoDB.getStructType())
        .as("entityMongoDB"))
        .selectExpr("entityMongoDB.*").toDF(); 

For Spark versions prior to 3, you can split the values by comma, then transform the resulting array into multiple columns:

Dataset<Row> dfs = df.select(split(col("value"), ",").as("values"))
        .select(IntStream.range(0, 8)
                .mapToObj(i -> col("values").getItem(i))
                .toArray(Column[]::new))
        .toDF("date", "value", "id", "title", "state", "frequency_short",
                "units_short", "seasonal_adjustment_short");

Also, since the first line of the data contains the column names, you should filter that row out.
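In plain Java, the same split-and-skip-header logic looks roughly like this (a sketch, not Spark code; the header check against the first column name and the sample values are hypothetical):

```java
import java.util.Arrays;

// Plain-Java sketch of the fallback logic: split each Kafka value on
// commas and skip the header row. Matching on the first column name
// ("realtime_start") is one possible heuristic, not part of the answer.
public class CsvRowDemo {
    static boolean isHeader(String line) {
        return line.startsWith("realtime_start");
    }

    static String[] parseLine(String line) {
        // limit = -1 keeps trailing empty fields, which split(",") would drop
        return line.split(",", -1);
    }

    public static void main(String[] args) {
        String[] lines = {
                "realtime_start,date,value",        // header row to be filtered
                "2021-01-27,2020-12-01,6.7"         // illustrative data row
        };
        for (String line : lines) {
            if (isHeader(line)) continue;           // filter out the header
            System.out.println(Arrays.toString(parseLine(line)));
        }
    }
}
```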
