
I have a Spark dataframe which I would like to write to Kafka. I have tried the snippet below:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers = util.get_broker_metadata())
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2), ("baz", 3)], ('k', 'v'))
for row in df.rdd.collect():
    producer.send('topic',str(row.asDict()))
    producer.flush()

This works, but the problem with this snippet is that it is not scalable: every time collect runs, the data is aggregated on the driver node, which can slow down all operations.

Since a foreach operation on a dataframe can run in parallel on the worker nodes, I tried the approach below.

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers = util.get_broker_metadata())
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2), ("baz", 3)], ('k', 'v'))
def custom_fun(row):
    producer.send('topic',str(row.asDict()))
    producer.flush()

df.foreach(custom_fun)

This doesn't work and gives a pickling error: PicklingError: Cannot pickle objects of type <type 'itertools.count'>. I am not able to understand the reason behind this error. Can anyone help me understand this error or provide any other parallel solution?

  • What is the Spark version and Python version? Do you get the same error when you run this code with a clean session? Commented Jan 16, 2018 at 13:36
  • Hi, the Spark version is 2.1 and Python is 2.7. Not sure what you mean by a clean session, but I get the same error every time I launch the job on YARN using spark-submit. Commented Jan 16, 2018 at 13:42
  • I mean that the error looks unrelated to Kafka writes. Commented Jan 16, 2018 at 13:50
  • @NachiketKate: were you able to find the answer? I am facing the same issue and am not able to write to a Confluent Kafka topic. Commented May 3, 2021 at 8:38

1 Answer


The error you get looks unrelated to Kafka writes. It looks like somewhere else in your code you use itertools.count (AFAIK it is not used in Spark's source at all, though it may of course come in with KafkaProducer), and for some reason it is being serialized with the cloudpickle module, so changing the Kafka writing code might have no impact at all. If KafkaProducer is the source of the error, you should be able to resolve this with foreachPartition:

from kafka import KafkaProducer


def send_to_kafka(rows):
    # create the producer on the executor, once per partition, instead of on the driver
    producer = KafkaProducer(bootstrap_servers=util.get_broker_metadata())
    for row in rows:
        producer.send('topic', str(row.asDict()))
        producer.flush()

df.foreachPartition(send_to_kafka)
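
Since the function above flushes after every record, one optional refinement (my own variant, not part of the original answer) is to cache a single producer per executor process and flush once per partition; a minimal sketch, assuming the same util.get_broker_metadata() helper:

from kafka import KafkaProducer

_producer = None  # cached per executor process (hypothetical helper, not in the original)

def _get_producer():
    global _producer
    if _producer is None:
        _producer = KafkaProducer(bootstrap_servers=util.get_broker_metadata())
    return _producer

def send_to_kafka(rows):
    producer = _get_producer()
    for row in rows:
        producer.send('topic', str(row.asDict()))
    producer.flush()  # flush once after the whole partition

df.foreachPartition(send_to_kafka)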

That being said:

or provide any other parallel solution?

I would recommend using the Kafka source instead. Include the Kafka SQL package, for example:

spark.jars.packages  org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
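
If you build the session yourself, the same package can be set on the builder before the session starts; a minimal sketch (the app name is just a placeholder, and the artifact's Scala/Spark version must match your cluster):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("kafka-writer")
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0")
    .getOrCreate())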

And:

from pyspark.sql.functions import to_json, col, struct

(df
    .select(to_json(struct([col(c).alias(c) for c in df.columns])).alias("value"))
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("topic", topic)
    .save())
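
To sanity-check the write, you can read the topic back in batch mode (supported with the same package from Spark 2.2 on); a minimal sketch, assuming a SparkSession named spark and the same bootstrap_servers and topic variables:

read_back = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", topic)
    .load())

# Kafka returns key/value as binary; cast value to string to inspect the JSON payloads
read_back.selectExpr("CAST(value AS STRING)").show(truncate=False)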

4 Comments

Thanks for the answer. I'll try this and will let you know.
With dataframe.write() I get a NoSuchMethodError. Looks like a version mismatch between Spark, Kafka, and spark-sql-kafka.
The spark-sql-kafka component has to match the Spark and Scala versions.
How about sending just one column of the dataframe to Kafka instead of the entire record?
