I am on Windows 10, trying to read multiple text lines, separated by '\n', from a TCP socket source (for test purposes so far) using Spark Structured Streaming (Spark 2.4.4). Words should be counted and the current word count regularly displayed on the console. This is a standard streaming word-count test, found in several books and web posts, but it seems to fail with the socket source.

Text strings are sent from a Java program like:

import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;

ServerSocket serverOutSock = new ServerSocket(9999);
// Establish connection; wait for Spark to connect
Socket sockOut = serverOutSock.accept();
// Use UTF-8 as the character encoding
OutputStreamWriter sockOutput = new OutputStreamWriter(sockOut.getOutputStream(), "UTF-8");
// Multiple Java Strings (thousands of them) are then written like this:
sockOutput.write(string + '\n');
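
For a self-contained repro, the same sender logic can be sketched in Scala as below; the sample strings and the explicit flush() are my additions for illustration:

import java.io.OutputStreamWriter
import java.net.ServerSocket

object TestSender {
  def main(args: Array[String]): Unit = {
    val serverOutSock = new ServerSocket(9999)
    // Establish connection; wait for Spark to connect
    val sockOut = serverOutSock.accept()
    val sockOutput = new OutputStreamWriter(sockOut.getOutputStream(), "UTF-8")
    for (string <- Seq("the quick brown fox", "jumps over the lazy dog")) {
      sockOutput.write(string + '\n')
      sockOutput.flush() // push each line out immediately instead of buffering
    }
    sockOutput.close()
    serverOutSock.close()
  }
}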

On the Spark receiving side, the Scala code looks like:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val socketDF = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val words = socketDF.as[String].flatMap(_.split(" ")).coalesce(1)
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
  .trigger(Trigger.Continuous("1 second"))
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()

So, I would like to get a once-per-second console printout of the current word counts.

But I get an error:

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String

and nothing seems to be processed by Spark from the source (presumably because of the cast exception on the source input?). At least nothing is written to the console. What can be the reason for this?

Full stack trace follows:

Exception in thread "null-4" java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at org.apache.spark.sql.execution.streaming.continuous.shuffle.RPCContinuousShuffleWriter.write(RPCContinuousShuffleWriter.scala:51)
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousCoalesceRDD$$anonfun$4$$anon$1.run(ContinuousCoalesceRDD.scala:112)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I have tried removing coalesce(1) and replacing the Continuous trigger with a ProcessingTime trigger (the modified pipeline is sketched after the output below). This makes the error go away, but the console printout becomes:


Batch: 0

+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

That is, there is no output at all, even though many words are indeed injected into the socket. Also, this output is shown only once, and much later than one second after starting.
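
For reference, the ProcessingTime variant of the query looks roughly like this (same pipeline as above, minus coalesce(1)):

val words = socketDF.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
  .trigger(Trigger.ProcessingTime("1 second"))
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()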

  • I think something is wrong with this line: val words = socketDF.as[String].flatMap(_.split(" ")).coalesce(1). Could you post the full stack trace, please? Commented Nov 8, 2019 at 15:34
  • Full stack trace added, along with the effect of some code changes I made (coalesce(1) removed). Commented Nov 8, 2019 at 17:10
  • Any luck solving this? Commented Nov 13, 2020 at 15:36
  • I've answered a similar question concerning the same error from an MQTT stream: stackoverflow.com/a/64851108/6807769. Hope it can help. Commented Nov 16, 2020 at 0:13
  • Thanks, I asked this question a year ago and I am not using Spark right now, but I will write down your hint that works for MQTT (I might use MQTT next year as well; the socket solution above was just for testing)! Commented Nov 18, 2020 at 8:42
