I have a Spark DataFrame which I would like to write to Kafka. I have tried the snippet below:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=util.get_broker_metadata())
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2), ("baz", 3)], ('k', 'v'))
for row in df.rdd.collect():
    producer.send('topic', str(row.asDict()))
producer.flush()
This works, but the problem with this snippet is that it is not scalable: every time collect() runs, all the data is aggregated on the driver node, which can slow down all operations.
Since a foreach operation on a DataFrame can run in parallel on the worker nodes, I tried the approach below.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=util.get_broker_metadata())
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2), ("baz", 3)], ('k', 'v'))

def custom_fun(row):
    producer.send('topic', str(row.asDict()))
    producer.flush()

df.foreach(custom_fun)
This doesn't work and fails with a pickling error:

PicklingError: Cannot pickle objects of type <type 'itertools.count'>

I am not able to understand the reason behind this error. Can anyone help me understand this error, or provide another parallel solution?
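For reference, the kind of fix I was imagining (untested) is to create the producer inside the function that runs on the workers, e.g. via foreachPartition, so the producer object itself never has to be pickled. send_partition is just a name I made up, and I haven't verified that this actually works:

from kafka import KafkaProducer

brokers = util.get_broker_metadata()  # a plain string/list, so it pickles fine

def send_partition(rows):
    # The producer is constructed on the worker, so it is never serialized
    # on the driver; one producer handles all rows of the partition.
    producer = KafkaProducer(bootstrap_servers=brokers)
    for row in rows:
        producer.send('topic', str(row.asDict()))
    producer.flush()

df.rdd.foreachPartition(send_partition)

Is something like this the right direction, or is there a better way?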