
I am writing a DataFrame with 30,000 entries into Kafka using the parameters below:

    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("kafka.compression.type","lz4")
    .option("kafka.max.request.size", 1048576)
    .option("kafka.message.max.bytes", 750000)
    .option("kafka.max.request.size",750000)
    .option("kafka.max.partition.fetch.bytes",750000)
    .option("kafka.batch.size", 100)
    .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    .option("value.serializer", "org.apache.kafka.common.serialization.JsonSerializer")
    .option("topic", product_kafka_topic)
    .option("partition",15)

What I am not getting is how Spark partitions the DataFrame. I get the error:

org.apache.kafka.common.errors.RecordTooLargeException: The message is 10540452 bytes when serialized which is larger than 750000, which is the value of the max.request.size configuration.

How can I solve it?

  • You are defining the same option, max.request.size, twice. Also, the error message pretty much explains what the problem is. Commented Jan 29, 2021 at 17:30
  • It's pushing a 10 MB chunk of data, and I have kept the limit at 750 KB. Commented Jan 29, 2021 at 17:37
  • Exactly. So it is denied. Commented Jan 29, 2021 at 17:39

1 Answer


The message is 10540452 bytes when serialized which is larger than 750000

You have explicitly capped the message size

.option("kafka.message.max.bytes", 750000)

What I am not getting is how Spark partitions the DataFrame

It takes your key and value columns (and optionally timestamp and partition), packages each row into a Kafka record, and produces those records in request batches.
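
For illustration, here is a minimal sketch of that mapping in PySpark (the DataFrame name df and the pbpdID key column are assumptions for illustration, not taken from your write code):

    from pyspark.sql.functions import to_json, struct, col

    # Spark only looks at these column names when writing to Kafka;
    # each row becomes exactly one Kafka record.
    kafka_df = df.select(
        col("pbpdID").cast("string").alias("key"),                      # optional record key
        to_json(struct(*[col(c) for c in df.columns])).alias("value")   # required record value
    )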

.option("kafka.max.request.size", 1048576)
.option("kafka.max.request.size", 750000)

Since you repeated the option, the second value is the one that ends up in the producer config.

More than one record can go into a request, but note that batch.size is measured in bytes, not in a number of records:

.option("kafka.batch.size", 100)

This isn't something that can be fixed in Spark alone; the broker will also reject large messages. See the solutions in How can I send large messages with Kafka (over 15MB)?
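
On the Spark side alone, the producer limit can at least be set once, above your largest serialized record; a sketch under that assumption (15728640 is just an example ceiling above the 10540452-byte record from the error, pbpdID is an assumed key column, and the broker and topic limits still need to be raised separately as the linked answer explains):

    (df.selectExpr("CAST(pbpdID AS STRING) AS key", "to_json(struct(*)) AS value")
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_brokers)
        .option("kafka.compression.type", "lz4")
        # set once, comfortably above the largest serialized record
        .option("kafka.max.request.size", 15728640)
        .option("topic", product_kafka_topic)
        .save())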


Aside: Structured Streaming does not use key.serializer, value.serializer, or partition as configuration options.


2 Comments

I cannot change Kafka; it's not in my control. I put .repartition(2500, col("pbpdID"), col("productName"), col("seasonName"), col("createdDate")) and it partitioned the data, but 2% of the data is still problematic, so I guess I need to filter it (see the sketch after these comments).
When you write the data to the topic, only the columns I listed are accepted, not other names.
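
A rough sketch of the filtering idea from the comment above (hypothetical names; length() counts characters, which is only an approximation of the serialized byte size):

    from pyspark.sql.functions import to_json, struct, length, col

    max_record_chars = 750000  # mirrors the producer cap, as a rough character-length proxy

    # keep only rows whose JSON payload stays under the cap; inspect the rest separately
    small_df = df.filter(length(to_json(struct(*[col(c) for c in df.columns]))) < max_record_chars)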
