I'm using Spark 2.4.3 and want to do Structured Streaming with data from a Kafka source. The following code works so far:
from pyspark.sql import SparkSession
from ast import literal_eval
spark = SparkSession.builder \
    .appName("streamer") \
    .getOrCreate()
# Create DataFrame representing the stream
dsraw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .option("startingOffsets", """{"test":{"0":2707422}}""") \
    .load()
# Convert Kafka stream to something readable
ds = dsraw.selectExpr("CAST(value AS STRING)")
# Do query on the raw data
rawQuery = dsraw \
    .writeStream \
    .queryName("qraw") \
    .format("memory") \
    .start()
raw = spark.sql("select * from qraw")
# Do query on the converted data
dsQuery = ds \
    .writeStream \
    .queryName("qds") \
    .format("memory") \
    .start()
sdf = spark.sql("select * from qds")
# I have to access raw otherwise I get errors...
raw.select("value").show()
sdf.show()
# Make the JSON payload accessible
sdf2 = sdf.rdd.map(lambda val: literal_eval(val['value']))
print(sdf2.first())
But I really wonder whether the conversion in the next-to-last line is the most useful/fastest option. Do you have other ideas? Can I stay with (Spark) DataFrames instead of dropping down to the RDD API?
The output of the script is:
+--------------------+
| value|
+--------------------+
|{
"Signal": "[...|
|{
"Signal": "[...|
+--------------------+
only showing top 20 rows
{'Signal': '[1234]', 'Value': 0.0, 'Timestamp': '2019-08-27T13:51:43.7146327Z'}
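One thing I noticed while writing this: `literal_eval` only happens to work because this particular payload contains no JSON-specific literals. For real JSON, `json.loads` would be the safer choice if I stay on the RDD path. A minimal stand-alone comparison (not tied to Spark, record copied from the output above):

```python
import json
from ast import literal_eval

record = '{"Signal": "[1234]", "Value": 0.0, "Timestamp": "2019-08-27T13:51:43.7146327Z"}'

# Both parse this particular record to the same dict...
assert literal_eval(record) == json.loads(record)

# ...but JSON-specific literals (true/false/null) break literal_eval:
json.loads('{"ok": true}')  # fine, gives {'ok': True}
try:
    literal_eval('{"ok": true}')
except ValueError:
    print("literal_eval cannot parse JSON booleans")
```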