I have PySpark Structured Streaming code that reads from a socket and writes to a Kafka topic.
When I write to the console it prints the data fine, but when I write to the Kafka topic it gives me org.apache.kafka.common.errors.TimeoutException: Topic RANDOM_STRING not present in metadata after 60000 ms.
The log is:
2022-12-13 09:18:22,631 - my_module_lib_io.streaming.kafka.spark.producer - INFO - Open Socket on localhost 9999 by using command on another shell: nc -l localhost 9999
22/12/13 09:18:22 WARN sources.TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
22/12/13 09:18:25 WARN sources.TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
[Stage 1:> (0 + 3) / 3]
22/12/13 09:19:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, server.com, executor 1): org.apache.kafka.common.errors.TimeoutException: Topic RANDOM_STRING not present in metadata after 60000 ms.
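To rule out Spark itself, the broker can be probed with a plain Python producer. This is a minimal sketch, assuming the kafka-python package is installed; it is not part of my Spark job:

from kafka import KafkaProducer

# Probe the same broker and topic outside of Spark; a KafkaTimeoutError
# here would mean the metadata problem is not Spark-specific.
producer = KafkaProducer(bootstrap_servers="localhost:9092")
future = producer.send("RANDOM_STRING", b"connectivity-test")
print(future.get(timeout=30))
producer.close()

Since the failing task ran on executor 1 on server.com, the same probe would ideally be run from that host as well, not only from the driver machine.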
I am running Apache Kafka (kafka_2.11-2.0.0) on localhost:9092, with no SSL enabled. server.properties looks like this:
broker.id=1
group.initial.rebalance.delay.ms=0
listeners=PLAINTEXT://:9092
log.dirs=/home/aiman/kafka_local/kafka_2.11-2.0.0/kafka-logs
log.retention.check.interval.ms=300000
log.retention.hours=168
log.segment.bytes=1073741824
max.poll.interval.ms=5
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
session.timeout.ms=3
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
transaction.state.log.min.isr=1
transaction.state.log.replication.factor=1
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
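One thing I am unsure about: listeners=PLAINTEXT://:9092 is set without advertised.listeners, so (if I understand the defaults correctly) the broker advertises its canonical hostname to clients. The failed task above ran on executor 1 on server.com, so that host would need to resolve and reach whatever address the broker advertises, e.g. a hypothetical advertised.listeners=PLAINTEXT://<broker-host>:9092 entry.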
The topic (RANDOM_STRING) exists:
./kafka-topics.sh --zookeeper localhost:2181 --list
TEST_TOPIC
RANDOM_STRING
__consumer_offsets
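Whether the broker itself reports the topic in its metadata can also be checked directly (again a sketch assuming kafka-python; partitions_for_topic returns None when the topic is missing from the broker's metadata, which is exactly what the Spark error complains about):

from kafka import KafkaConsumer

# Ask the broker for the topic's partition metadata.
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
print(consumer.partitions_for_topic("RANDOM_STRING"))
consumer.close()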
My code is:
kafka_config = {
    "kafka.bootstrap.servers": "localhost:9092",
    "checkpointLocation": "/user/aiman/checkpoint/kafka_local/random_string",
    "topic": "RANDOM_STRING"
}

data = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# This prints the dataframe to the console:
# data.writeStream \
#     .format("console") \
#     .outputMode("append") \
#     .option("truncate", False) \
#     .start() \
#     .awaitTermination()

data.writeStream \
    .format("kafka") \
    .outputMode("append") \
    .options(**kafka_config) \
    .start() \
    .awaitTermination()
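The same connector can also be exercised with a one-off batch write, which takes the streaming machinery and checkpointing out of the picture. A minimal sketch (batch writes to Kafka are supported by spark-sql-kafka in Spark 2.4, and the sink only needs a string "value" column):

# Hypothetical smoke test: batch write through the same connector.
# If this raises the same TimeoutException, the streaming query
# itself is not the problem.
test_df = spark.createDataFrame([("smoke-test",)], ["value"])
test_df.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "RANDOM_STRING") \
    .save()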
I am using the following Spark SQL Kafka connector: spark-sql-kafka-0-10_2.11-2.4.0-cdh6.3.4.jar.
Spark version is 2.4.0-cdh6.3.4, and the Apache Kafka version is kafka_2.11-2.0.0.
UPDATE:
I changed the Apache Kafka version to kafka_2.11-0.10.0.0.
Is there any configuration I am missing?