I am on Windows 10 trying to multiple read text lines, separated by '\n' from a TCPsocket source (test purpose so far) using Spark Streaming (Spark 2.4.4). Words should be counted and current word count regularly displayed on the console. This is a standard test of Spark streaming, found in several books and web posts, but seems to fail with the socket source:
Text strings are sent from a Java program like:
serverOutSock = new ServerSocket(9999);
// Establish connection; wait for Spark to connect
sockOut = serverOutSock.accept();
// Set UTF-8 as format
sockOutput = new OutputStreamWriter(sockOut.getOutputStream(),"UTF-8");
// Multiple Java Strings are now written (thousands of them) like
sockOutput.write(string+'\n');
On the Spark receiving side, the Scala code looks like:
val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._
val socketDF = spark.readStream.format("socket").option("host","localhost").option("port",9999).load
val words = socketDF.as[String].flatMap(_.split(" ")).coalesce(1)
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.trigger(Trigger.Continuous("1 second"))
.outputMode("complete")
.format("console")
.start
.awaitTermination
So, I would like to get a once-a-second write out on the console of the current word count.
But I get an error:
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
and nothing seems to be processed by Spark from the source (due to cast exception of source input?). At least nothing is written out on the console. What can be the reason for this?
Full stack trace follows:
Exception in thread "null-4" java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.streaming.continuous.shuffle.RPCContinuousShuffleWriter.write(RPCContinuousShuffleWriter.scala:51)
at org.apache.spark.sql.execution.streaming.continuous.ContinuousCoalesceRDD$$anonfun$4$$anon$1.run(ContinuousCoalesceRDD.scala:112)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I have tried to remove coalesque(1) and replaced the Continuous trigger with a ProcessingTime trigger. This makes the error not to happen, but the console printout becomes:
Batch: 0
+-----+-----+ |value|count| +-----+-----+ +-----+-----+
That is, no output, even though many words indeed are injected into the socket. Also, this output is shown only ondce, and much later than after 1 second.