
I have PySpark streaming code that reads from a socket and writes to a Kafka topic.
When I write to the console it prints the data fine, but when I write to the Kafka topic I get org.apache.kafka.common.errors.TimeoutException: Topic RANDOM_STRING not present in metadata after 60000 ms.
The log is:

2022-12-13 09:18:22,631 - my_module_lib_io.streaming.kafka.spark.producer - INFO - Open Socket on localhost 9999 by using command on another shell: nc -l localhost 9999
22/12/13 09:18:22 WARN sources.TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
22/12/13 09:18:25 WARN sources.TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
[Stage 1:>                                                          (0 + 3) / 3]
22/12/13 09:19:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, server.com, executor 1): org.apache.kafka.common.errors.TimeoutException: Topic RANDOM_STRING not present in metadata after 60000 ms.

I am running Apache Kafka (kafka_2.11-2.0.0) on localhost:9092 with no SSL enabled. server.properties looks like this:

broker.id=1
group.initial.rebalance.delay.ms=0
listeners=PLAINTEXT://:9092
log.dirs=/home/aiman/kafka_local/kafka_2.11-2.0.0/kafka-logs
log.retention.check.interval.ms=300000
log.retention.hours=168
log.segment.bytes=1073741824
max.poll.interval.ms=5
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
session.timeout.ms=3
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
transaction.state.log.min.isr=1
transaction.state.log.replication.factor=1
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

The topic (RANDOM_STRING) exists:

./kafka-topics.sh --zookeeper localhost:2181 --list
TEST_TOPIC
RANDOM_STRING
__consumer_offsets

My code is:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# checkpointLocation and topic are writer options; the kafka.* entry goes to the Kafka client
kafka_config = {
    "kafka.bootstrap.servers": "localhost:9092",
    "checkpointLocation": "/user/aiman/checkpoint/kafka_local/random_string",
    "topic": "RANDOM_STRING"
}
# The socket source emits one string column named "value", which is what the Kafka sink expects
data = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
# This prints the dataframe to the console:
# data.writeStream \
#     .format("console") \
#     .outputMode("append") \
#     .option("truncate", False) \
#     .start() \
#     .awaitTermination()
data.writeStream \
    .format("kafka") \
    .outputMode("append") \
    .options(**kafka_config) \
    .start() \
    .awaitTermination()

I am using the following Spark SQL Kafka connector: spark-sql-kafka-0-10_2.11-2.4.0-cdh6.3.4.jar.
The Spark version is 2.4.0-cdh6.3.4, and the Apache Kafka version is kafka_2.11-2.0.0.

UPDATE:
I changed the Apache Kafka version to kafka_2.11-0.10.0.0.

Is there any configuration I am missing?

2 Comments

  • Could you produce to this topic with the CLI? Commented Dec 13, 2022 at 15:33
  • Yes, I did, using the console shell utility; it writes to the topic, but from Spark Streaming I am facing this issue. Commented Dec 13, 2022 at 15:37
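(For reference, the CLI produce test mentioned above would use Kafka's bundled console producer; a sketch, assuming the kafka_2.11-2.0.0 tooling from the question:)

./kafka-console-producer.sh --broker-list localhost:9092 --topic RANDOM_STRING
>hello from the shell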

1 Answer

Cannot reproduce. Is RANDOM_STRING really a randomly generated name? If so, I suspect that is the problem. Otherwise, Spark is distributed but you're referring to localhost for Kafka, so do you have more than one Kafka cluster, one of which doesn't have the topic you're using?

I'm using the latest Spark (that probably isn't your issue, although I'd advise against using a CDH-specific build) and Kafka 3.3 (again, the version shouldn't really matter).

Create and check topics

$ kafka-topics --list --bootstrap-server localhost:9092 | grep RANDOM_STRING
RANDOM_STRING
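If the topic doesn't exist yet, creating it is one command (a sketch using the same Kafka 3.3 tooling; the partition count of 3 is an assumption matching the kcat output further down):

$ kafka-topics --create --bootstrap-server localhost:9092 --topic RANDOM_STRING --partitions 3 --replication-factor 1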

Start a server

$ nc -lk 9999

Run the code, using a --packages dependency matching my Spark version. Note: the master is local, rather than yarn as in CDH.

spark-submit --master=local --packages 'org.apache.spark:spark-sql-kafka-0-10_2.13:3.3.1' example.py
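example.py itself isn't shown here; a minimal sketch of it, mirroring the question's code (the local checkpoint path is an assumption, chosen because this run is local rather than on HDFS):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The socket source yields one string column named "value" per line
data = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# The Kafka sink uses the "value" column as the record value
data.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "RANDOM_STRING") \
    .option("checkpointLocation", "/tmp/checkpoint/random_string") \
    .start() \
    .awaitTermination()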

(type something in the nc terminal)

Then consume it.

kcat -C -b localhost:9092 -t 'RANDOM_STRING' -o earliest
hello world
% Reached end of topic RANDOM_STRING [2] at offset 1

Note: Python has libraries for TCP socket servers and Kafka producers, so if your code is just forwarding data from one to the other, Spark is unnecessary; see the sketch below.
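A sketch of that Spark-free alternative, using the stdlib socketserver module and the third-party kafka-python client (the client choice is an assumption; confluent-kafka would work the same way):

import socketserver

from kafka import KafkaProducer  # pip install kafka-python (assumed client)

producer = KafkaProducer(bootstrap_servers="localhost:9092")

class ForwardHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # Forward each line received on the socket to the Kafka topic
        for line in self.rfile:
            producer.send("RANDOM_STRING", line.rstrip(b"\n"))
            producer.flush()  # deliver immediately; fine for low volume

with socketserver.TCPServer(("localhost", 9999), ForwardHandler) as server:
    server.serve_forever()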


1 Comment

Thanks @OneCricketeer, the reply really gave me an idea when you said --master=local. Since in my case Spark is running on a cluster and Kafka locally, the executors were not able to reach localhost. So I reconfigured Kafka, replacing all the "localhost" references with my hostname, and now I am able to connect.
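(In server.properties terms, that fix amounts to advertising an address the executors can resolve; a sketch, where the hostname is a placeholder:)

# Bind on all interfaces, but advertise a hostname that Spark executors can resolve
# (my-kafka-host.example.com is a placeholder)
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://my-kafka-host.example.com:9092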
