0

I'm trying to create a dataframe new_df and load the DataFrame to Kafka using pyspark. However, I'm getting few exception. Couldn't figure what exactly is the issue. Any help would be appreciated.

>>> dict = [{'name': 'Alice', 'age': 1},{'name': 'Again', 'age': 2}]
>>> df = spark.createDataFrame(dict)

>>> import time
>>> import datetime
>>> from pyspark.streaming.kafka import KafkaUtils
>>> timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
>>> type(timestamp)
<class 'str'>

>>> from pyspark.sql.functions import lit,unix_timestamp
>>> timestamp
'2017-08-02 16:16:14'
>>> new_df = df.withColumn('time',unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
>>> new_df.show(truncate = False)
+---+-----+---------------------+
|age|name |time                 |
+---+-----+---------------------+
|1  |Alice|2017-08-02 16:16:14.0|
|2  |Again|2017-08-02 16:16:14.0|
+---+-----+---------------------+

Now I'm trying to wrtie the dataframe to a Kafka topic

def writeToKafka(outputDF):
    outputDF.selectExpr("CAST(time AS STRING) AS key", "to_json(struct(*)) AS value") \
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "kafka-svc:9092") \
                .option("topic", "test_topic") \
                .save()



writeToKafka(new_df)

Exceptions(picked from error):

org.apache.spark.SparkException: Job aborted due to stage failure: 
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers

Full Error:

Py4JJavaError: An error occurred while calling o1811.save. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 76.0 failed 1 times, most recent failure: Lost task 8.0 in stage 76.0 (TID 110, localhost, executor driver): org.apache.kafka.common.KafkaException: Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:270) at org.apache.spark.sql.kafka010.CachedKafkaProducer$.org$apache$spark$sql$kafka010$CachedKafkaProducer$$createKafkaProducer(CachedKafkaProducer.scala:67) at org.apache.spark.sql.kafka010.CachedKafkaProducer$$anon$1.load(CachedKafkaProducer.scala:46) at org.apache.spark.sql.kafka010.CachedKafkaProducer$$anon$1.load(CachedKafkaProducer.scala:43) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) at org.apache.spark.sql.kafka010.CachedKafkaProducer$.getOrCreate(CachedKafkaProducer.scala:80) at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:44) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:89) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:89) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:87) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88) at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:407) ... 31 more

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:980) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:978) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:385) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:978) at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:87) at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:254) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:268) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:270) at org.apache.spark.sql.kafka010.CachedKafkaProducer$.org$apache$spark$sql$kafka010$CachedKafkaProducer$$createKafkaProducer(CachedKafkaProducer.scala:67) at org.apache.spark.sql.kafka010.CachedKafkaProducer$$anon$1.load(CachedKafkaProducer.scala:46) at org.apache.spark.sql.kafka010.CachedKafkaProducer$$anon$1.load(CachedKafkaProducer.scala:43) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) at org.apache.spark.sql.kafka010.CachedKafkaProducer$.getOrCreate(CachedKafkaProducer.scala:80) at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:44) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:89) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:89) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:87) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88) at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:407) ... 31 more

Note:

I have 3 kafka brokers, 3 kafka zookeepers which are hosted on Kubernetes Cluster.

2
  • 1
    did you take a look to this answer : stackoverflow.com/questions/47969955/… Commented Mar 24, 2021 at 22:07
  • 1
    No resolvable bootstrap urls given in bootstrap.servers... We cannot fix your networking issues, but have you tried any other Kafka client using the same address? Commented Mar 25, 2021 at 3:02

1 Answer 1

0

New code:

def writeToKafka(outputDF):
    outputDF.selectExpr("CAST(time AS STRING) AS key", "to_json(struct(*)) AS value") \
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "kafka-svc.test_namespace:9092") \
                .option("topic", "test_topic") \
                .save()

The kafka brokers are on another namespace in kubernetes cluster. And, my jupyter notebook was on another namespace.

Once I tried with "kafka_service.namespace:portno" for the kafka.bootstrap.servers (i.e.,kafka-svc.test_namespace:9092 ), it worked

kafka-svc - is the kafka service name.
test_namespace - is the name of the namespace where kafka brokers are hosted
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.