
I am trying to get Spark Streaming and Kafka to work together on Ubuntu, but I've run into a problem.

I'm sure that Kafka itself is working properly.

On the first terminal:

bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties

On the second terminal:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest

Then I produce some test data:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wordsendertest
hello hadoop
hello spark

On the third terminal:

cd /usr/local/spark/mycode/kafka
/usr/local/spark/bin/spark-submit ./kafkaWordCount.py localhost:2181 wordsendertest

Contents of kafkaWordCount.py:

from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys

if __name__ == "__main__":
   if len(sys.argv) != 3:
      print("Usage: kafkaWordCount.py <zk> <topic>", file=sys.stderr)
      sys.exit(-1)
   sc = SparkContext(appName="PythonStreamingKafkaWordCount")
   ssc = StreamingContext(sc,1)
   zkQuorum,topic = sys.argv[1:]
   kvs = KafkaUtils.createStream(ssc,zkQuorum,"spark-streaming-consumer",{topic:1})
   lines = kvs.map(lambda x:x[1])
   counts = lines.flatMap(lambda x:x.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
   counts.pprint
   ssc.start()
   ssc.awaitTermination()
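The transformation chain in the script (take the message value, split it into words, map each word to (word, 1), then sum per key) can be sanity-checked on the two sample messages without Spark. The sketch below is plain Python, with collections.Counter standing in for reduceByKey; it only illustrates the logic, not how Spark executes it. It assumes the Kafka records arrive as (key, value) pairs with a key of None, since the console producer above sends no key.

```python
from collections import Counter

# Assumed shape of the records from KafkaUtils.createStream: (key, value) pairs.
# The console producer sends no key, so the key is assumed to be None here.
records = [(None, "hello hadoop"), (None, "hello spark")]

# kvs.map(lambda x: x[1]): keep only the message value
lines = [value for _key, value in records]

# flatMap(lambda x: x.split(" ")): one flat list of words
words = [word for line in lines for word in line.split(" ")]

# map(lambda word: (word, 1)) then reduceByKey(lambda a, b: a + b)
counts = Counter()
for word, one in ((w, 1) for w in words):
    counts[word] += one

print(sorted(counts.items()))
# [('hadoop', 1), ('hello', 2), ('spark', 1)]
```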

My error:

Traceback (most recent call last):
  File "/usr/local/spark/mycode/kafka/./KafkaWordCount.py", line 20, in <module>
    ssc.start()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 196, in start
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o22.start.
: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

Please help me! Thank you!

Answer:

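You forgot the parentheses on counts.pprint. Without (), that line only references the method and never calls it, so no output operation is registered on the stream, and ssc.start() fails with "No output operations registered, so nothing to execute".

Change counts.pprint to counts.pprint() and it will work.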
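Why the missing () produces this particular error: in Spark Streaming, calling an output operation such as pprint() is what registers work with the StreamingContext, and ssc.start() refuses to run when nothing has been registered. A bare counts.pprint just evaluates to a bound-method object that is immediately discarded. The toy classes below (ToyContext and ToyDStream are hypothetical stand-ins, not PySpark's real implementation) mimic that check in plain Python:

```python
class ToyContext:
    """Hypothetical stand-in for StreamingContext: tracks output operations."""

    def __init__(self):
        self.output_ops = []

    def start(self):
        # Mirrors the validation that produced the error in the question.
        if not self.output_ops:
            raise ValueError(
                "requirement failed: No output operations registered, "
                "so nothing to execute"
            )
        return "started with %d output operation(s)" % len(self.output_ops)


class ToyDStream:
    """Hypothetical stand-in for a DStream."""

    def __init__(self, ctx):
        self.ctx = ctx

    def pprint(self, num=10):
        # Only *calling* the method registers an output operation.
        self.ctx.output_ops.append(("pprint", num))


def run(call_pprint):
    ctx = ToyContext()
    counts = ToyDStream(ctx)
    if call_pprint:
        counts.pprint()  # a call: registers an output operation
    else:
        counts.pprint    # only a bound-method reference, then discarded
    try:
        return ctx.start()
    except ValueError as exc:
        return "error: %s" % exc


print(run(call_pprint=False))
# error: requirement failed: No output operations registered, so nothing to execute
print(run(call_pprint=True))
# started with 1 output operation(s)
```

A linter can catch this kind of slip before it reaches spark-submit; pylint, for instance, should report a bare counts.pprint line as a statement that seems to have no effect (pointless-statement).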
