I use Spark 2.4.3 and want to do structured streaming with data from a Kafka source. The following code works so far:

from pyspark.sql import SparkSession
from ast import literal_eval

spark = SparkSession.builder \
    .appName("streamer") \
    .getOrCreate()

# Create DataFrame representing the stream
dsraw = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .option("startingOffsets", """{"test":{"0":2707422}}""") \
  .load()

# Convert Kafka stream to something readable
ds = dsraw.selectExpr("CAST(value AS STRING)")

# Do query on the raw data
rawQuery = dsraw \
     .writeStream \
     .queryName("qraw") \
     .format("memory") \
     .start()
raw = spark.sql("select * from qraw")

# Do query on the converted data
dsQuery = ds \
     .writeStream \
     .queryName("qds") \
     .format("memory") \
     .start()
sdf = spark.sql("select * from qds")

# I have to access raw, otherwise I get errors...
raw.select("value").show()

sdf.show()

# Make the JSON payload accessible
sdf2 = sdf.rdd.map(lambda val: literal_eval(val['value']))
print(sdf2.first())

But I really wonder whether the conversion in the second-to-last line is the most useful/fastest one. Do you have other ideas? Can I stay with (Spark) DataFrames instead of switching to the RDD API?

The output of the script is

+--------------------+
|               value|
+--------------------+
|{
  "Signal": "[...|
|{
  "Signal": "[...|
+--------------------+
only showing top 20 rows

{'Signal': '[1234]', 'Value': 0.0, 'Timestamp': '2019-08-27T13:51:43.7146327Z'}
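Note that the raw payload shown in the qraw output is plain JSON (double-quoted keys), so Python's json.loads is a stricter and typically faster drop-in for ast.literal_eval in that map step. A minimal sketch, using the sample record from the output above:

```python
import json

# Sample record as it arrives from Kafka (valid JSON, double-quoted keys)
raw = '{"Signal": "[1234]", "Value": 0.0, "Timestamp": "2019-08-27T13:51:43.7146327Z"}'

# json.loads parses JSON directly; literal_eval only works here by accident,
# because this particular payload also happens to be a valid Python literal
parsed = json.loads(raw)
print(parsed["Signal"], parsed["Value"])  # [1234] 0.0
```

In the query above this would read `sdf.rdd.map(lambda row: json.loads(row['value']))`, though any per-row Python function still forces serialization out of the JVM, which is what the DataFrame-only approach avoids.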

1 Answer

There are several solutions out there, but only this adapted one works (credit goes to https://stackoverflow.com/a/51070457/3021134):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructField, StructType, StringType, DoubleType

schema = StructType(
        [
                StructField("Signal", StringType()),
                StructField("Value", DoubleType()),
                StructField("Timestamp", StringType())
        ]
)

sdf.withColumn("value", from_json("value", schema))\
    .select(col('value.*'))\
    .show()

with the output:

+--------+-----------+--------------------+
|  Signal|      Value|           Timestamp|
+--------+-----------+--------------------+
|[123456]|        0.0|2019-08-27T13:51:...|
|[123457]|        0.0|2019-08-27T13:51:...|
|[123458]| 318.880859|2019-08-27T13:51:...|
|[123459]|   285.5808|2019-08-27T13:51:...|
+--------+-----------+--------------------+
