I'm trying to read a Kafka topic and display the data on the console using PySpark. I've defined a schema for from_json and am trying to parse the messages with it. However, the resulting DataFrame contains only nulls.

The original object in the Kafka topic and the schema are below.

{
  "kind": "youtube#videoListResponse",
  "etag": "jUow4VqgbKTDD9d1QI8TBQdM0po",
  "items": [
    {
      "kind": "youtube#video",
      "etag": "SRkYji_KdZvK3LDoACVdkHcm-Og",
      "id": "E4R_WJBqaaQ",
      "snippet": {
        "publishedAt": "2022-03-30T13:51:05Z",
        "channelId": "UCw9DyZg3_F0bIks2jrEgQAA",
        "title": "Brother Job 🔥#sadiqahmed",
        "description": "",
        "thumbnails": {
          "default": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/default.jpg",
            "width": 120,
            "height": 90
          },
          "medium": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/mqdefault.jpg",
            "width": 320,
            "height": 180
          },
          "high": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/hqdefault.jpg",
            "width": 480,
            "height": 360
          },
          "standard": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/sddefault.jpg",
            "width": 640,
            "height": 480
          },
          "maxres": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/maxresdefault.jpg",
            "width": 1280,
            "height": 720
          }
        },
        "channelTitle": "Sadiq Ahmed",
        "categoryId": "22",
        "liveBroadcastContent": "none",
        "localized": {
          "title": "Brother Job 🔥#sadiqahmed",
          "description": ""
        }
      },
      "statistics": {
        "viewCount": "5155911",
        "likeCount": "559596",
        "favoriteCount": "0",
        "commentCount": "844"
      }
    }
  ],
  "nextPageToken": "CAEQAA",
  "pageInfo": {
    "totalResults": 200,
    "resultsPerPage": 1
  }
}

JSON schema definition

from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType, TimestampType)

# Data types must be instances (StringType()), not classes (StringType).
mySchema = StructType([
    StructField("items", ArrayType(StructType([
        StructField("id", StringType(), True),
        StructField("snippet", StructType([
            StructField("channelId", StringType(), True),
            StructField("channelTitle", StringType(), True),
            StructField("categoryId", IntegerType(), True),
            StructField("publishedAt", TimestampType(), True),
            StructField("title", StringType(), True),
        ]), True),
        StructField("statistics", StructType([
            StructField("commentCount", IntegerType(), True),
            StructField("favoriteCount", IntegerType(), True),
            StructField("likeCount", IntegerType(), True),
            StructField("viewCount", IntegerType(), True),
        ]), True),
    ]), True), True),
])

Reading the Kafka topic and parsing the value with from_json

from pyspark.sql import functions as F

# Batch-read the topic, cast the binary Kafka value to a string, then parse it.
df = spark.read.format("kafka")\
    .option("kafka.bootstrap.servers", kafkaServer)\
    .option("subscribePattern", topic_name_read)\
    .option("startingOffsets", "earliest")\
    .load()\
    .selectExpr("CAST(value AS STRING)")\
    .select(F.from_json(F.col("value"), mySchema).alias("data"))\
    .select("data.*")

The current output.

+-----+
|items|
+-----+
|null |
+-----+

What's wrong with my schema definition? Thank you in advance.

1 Answer

Most commonly, from_json returns null because the schema does not match the data. I believe you are required to define every single field, so try adding kind, etag, and anything else you've missed.
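
One way to look for a mismatch is to list how each leaf value in a sample message is actually encoded and compare that against the declared schema. The following is a minimal sketch using only the standard library; the `leaf_types` helper and the trimmed `sample` string are hypothetical illustrations, not part of the original post. In the message from the question, `categoryId` and the `statistics` counts are quoted strings, while the schema declares them as IntegerType, so that is one mismatch worth ruling out.

```python
import json


def leaf_types(node, path="$"):
    """Yield (path, Python type name) for every scalar leaf in parsed JSON."""
    if isinstance(node, dict):
        for key, value in node.items():
            yield from leaf_types(value, f"{path}.{key}")
    elif isinstance(node, list):
        for index, value in enumerate(node):
            yield from leaf_types(value, f"{path}[{index}]")
    else:
        yield path, type(node).__name__


# Trimmed copy of the message from the question (assumption: a few fields
# are enough to illustrate the check).
sample = '''
{"items": [{"id": "E4R_WJBqaaQ",
            "snippet": {"categoryId": "22"},
            "statistics": {"viewCount": "5155911", "commentCount": "844"}}],
 "pageInfo": {"totalResults": 200}}
'''

types_by_path = dict(leaf_types(json.loads(sample)))
for path, type_name in types_by_path.items():
    # "str" means the value is quoted in the JSON; "int" means a bare number.
    print(path, type_name)
```

Any path reported as `str` is a candidate for StringType in the schema, with a cast to a numeric type applied after parsing if needed.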

You could also use get_json_object instead of defining a schema if you only need certain fields from the string.
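
A rough sketch of the get_json_object approach is shown here, assuming the Kafka `value` column holds the JSON message from the question. The `JSON_PATHS` mapping and the `extract_fields` helper are hypothetical names chosen for illustration, and the paths read only the first element of `items`.

```python
# Hypothetical mapping of output column name -> JSONPath into the message.
JSON_PATHS = {
    "id": "$.items[0].id",
    "title": "$.items[0].snippet.title",
    "viewCount": "$.items[0].statistics.viewCount",
}


def extract_fields(df, value_col="value"):
    """Return one string column per entry in JSON_PATHS.

    A column is null for rows where the path is absent or the JSON is invalid.
    """
    # Imported here so the mapping above can be inspected without Spark installed.
    from pyspark.sql import functions as F

    # Kafka delivers the value as binary, so cast it to a string first.
    json_str = F.col(value_col).cast("string")
    return df.select(*[
        F.get_json_object(json_str, path).alias(name)
        for name, path in JSON_PATHS.items()
    ])
```

It would be called on the DataFrame returned by `.load()`, for example `extract_fields(raw_df).show(truncate=False)`, where `raw_df` is an assumed name. get_json_object always returns strings, so a column such as `viewCount` needs an explicit cast if a numeric type is required.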

1 Comment

Thank you, I've defined the entire JSON schema, that works for me!
