I'm trying to read a Kafka topic and display the data on the console using PySpark. I've defined a schema for from_json and I'm using it to parse the message value and display the result. However, the resulting DataFrame contains only nulls.
The original object in the Kafka topic and the schema are below.
{
  "kind": "youtube#videoListResponse",
  "etag": "jUow4VqgbKTDD9d1QI8TBQdM0po",
  "items": [
    {
      "kind": "youtube#video",
      "etag": "SRkYji_KdZvK3LDoACVdkHcm-Og",
      "id": "E4R_WJBqaaQ",
      "snippet": {
        "publishedAt": "2022-03-30T13:51:05Z",
        "channelId": "UCw9DyZg3_F0bIks2jrEgQAA",
        "title": "Brother Job 🔥#sadiqahmed",
        "description": "",
        "thumbnails": {
          "default": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/default.jpg",
            "width": 120,
            "height": 90
          },
          "medium": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/mqdefault.jpg",
            "width": 320,
            "height": 180
          },
          "high": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/hqdefault.jpg",
            "width": 480,
            "height": 360
          },
          "standard": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/sddefault.jpg",
            "width": 640,
            "height": 480
          },
          "maxres": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/maxresdefault.jpg",
            "width": 1280,
            "height": 720
          }
        },
        "channelTitle": "Sadiq Ahmed",
        "categoryId": "22",
        "liveBroadcastContent": "none",
        "localized": {
          "title": "Brother Job 🔥#sadiqahmed",
          "description": ""
        }
      },
      "statistics": {
        "viewCount": "5155911",
        "likeCount": "559596",
        "favoriteCount": "0",
        "commentCount": "844"
      }
    }
  ],
  "nextPageToken": "CAEQAA",
  "pageInfo": {
    "totalResults": 200,
    "resultsPerPage": 1
  }
}
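To see which JSON types Spark actually receives, the object above can be loaded with Python's standard `json` module. The sketch below uses a trimmed copy of the payload (only the fields referenced by `mySchema`); `payload` and `declared_int` are illustrative names, not part of the original code. It shows that `categoryId` and all four `statistics` counters arrive as quoted strings, not JSON numbers.

```python
import json

# Trimmed copy of the payload above: only the fields referenced by mySchema.
payload = """
{
  "items": [
    {
      "id": "E4R_WJBqaaQ",
      "snippet": {
        "publishedAt": "2022-03-30T13:51:05Z",
        "channelId": "UCw9DyZg3_F0bIks2jrEgQAA",
        "title": "Brother Job 🔥#sadiqahmed",
        "channelTitle": "Sadiq Ahmed",
        "categoryId": "22"
      },
      "statistics": {
        "viewCount": "5155911",
        "likeCount": "559596",
        "favoriteCount": "0",
        "commentCount": "844"
      }
    }
  ]
}
"""

item = json.loads(payload)["items"][0]

# The fields that mySchema declares as IntegerType(), with their raw parsed values.
declared_int = {
    "snippet.categoryId": item["snippet"]["categoryId"],
    "statistics.commentCount": item["statistics"]["commentCount"],
    "statistics.favoriteCount": item["statistics"]["favoriteCount"],
    "statistics.likeCount": item["statistics"]["likeCount"],
    "statistics.viewCount": item["statistics"]["viewCount"],
}

for name, value in declared_int.items():
    # Report the Python type that json.loads produced for each value.
    print(f"{name}: {type(value).__name__} {value!r}")
```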
JSON schema definition:
from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType, TimestampType)

mySchema = StructType([
    StructField("items", ArrayType(StructType([
        StructField("id", StringType(), True),
        StructField("snippet", StructType([
            StructField("channelId", StringType(), True),
            StructField("channelTitle", StringType(), True),
            StructField("categoryId", IntegerType(), True),
            StructField("publishedAt", TimestampType(), True),
            StructField("title", StringType(), True)]),
            True),
        StructField("statistics", StructType([
            StructField("commentCount", IntegerType(), True),
            StructField("favoriteCount", IntegerType(), True),
            StructField("likeCount", IntegerType(), True),
            StructField("viewCount", IntegerType(), True)]),
            True)]), True), True)
])
Reading the Kafka topic and parsing the value with from_json:
from pyspark.sql import functions as F

df = spark.read.format("kafka")\
    .option("kafka.bootstrap.servers", kafkaServer)\
    .option("subscribePattern", topic_name_read)\
    .option("startingOffsets", "earliest")\
    .load()\
    .selectExpr("CAST(value AS STRING)")\
    .select(F.from_json(F.col("value").cast("string"), mySchema).alias("data"))\
    .select("data.*")
The current output:
+-----+
|items|
+-----+
|null |
+-----+
What's wrong with my schema definition? Thank you in advance.