
I am facing an issue while writing a stream to a Kafka topic from Spark.

import org.apache.spark.sql.types._

val mySchema = StructType(Array(
  StructField("ID", IntegerType),
  StructField("ACCOUNT_NUMBER", StringType)
))

val streamingDataFrame = spark.readStream.schema(mySchema).option("delimiter",",")
                              .csv("file:///opt/files")


streamingDataFrame.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
                  .writeStream.format("kafka")
                  .option("topic", "testing")
                  .option("kafka.bootstrap.servers", "10.55.55.55:9092")
                  .option("checkpointLocation", "file:///opt/")
                  .start().awaitTermination()

Error:

2018-09-12 11:09:04,344 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.kafka.common.errors.TimeoutException: Expiring 38 record(s) for testing-0: 30016 ms has passed since batch creation plus linger time
2018-09-12 11:09:04,358 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.kafka.common.errors.TimeoutException: Expiring 38 record(s) for testing-0: 30016 ms has passed since batch creation plus linger time
2018-09-12 11:09:04,359 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
2018-09-12 11:09:04,370 ERROR streaming.StreamExecution: Query [id = 866e4416-138a-42b6-82fd-04b6ee1aa638, runId = 4dd10740-29dd-4275-97e2-a43104d71cf5] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.kafka.common.errors.TimeoutException: Expiring 38 record(s) for testing-0: 30016 ms has passed since batch creation plus linger time

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

My sbt details:

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.0.0"

However, when I use bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh on the server, I can send and receive messages without any problem.

  • Matthias J. Sax, did you solve this issue? Is it possible that you need to upgrade the kafka-client version to 1.0.0? Commented Oct 10, 2018 at 11:04

1 Answer

You need to increase the value of request.timeout.ms on the client side.

Kafka groups records into batches to increase throughput. Once a record has been added to a batch, that batch must be sent within a time limit. request.timeout.ms is a configurable parameter (default: 30 seconds) that controls this limit, which matches the "30016 ms has passed since batch creation" in your error.

When a batch stays queued for longer than this limit, a TimeoutException is thrown and its records are removed from the queue, so those messages are not delivered.
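As a minimal sketch of what "on the client side" could look like for the job in the question: the Spark Kafka sink passes options prefixed with `kafka.` through to the underlying producer, so the timeout can be set next to `kafka.bootstrap.servers`. The object name `CsvToKafka`, the app name, and the 60000 ms value are assumptions chosen for illustration; the paths, topic, and broker address are copied from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Hypothetical wrapper object; the question ran the same statements in a shell.
object CsvToKafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("csv-to-kafka").getOrCreate()

    val mySchema = StructType(Array(
      StructField("ID", IntegerType),
      StructField("ACCOUNT_NUMBER", StringType)
    ))

    val streamingDataFrame = spark.readStream
      .schema(mySchema)
      .option("delimiter", ",")
      .csv("file:///opt/files")

    streamingDataFrame
      .selectExpr("CAST(ID AS STRING) AS key", "to_json(struct(*)) AS value")
      .writeStream
      .format("kafka")
      .option("topic", "testing")
      .option("kafka.bootstrap.servers", "10.55.55.55:9092")
      // "kafka."-prefixed options are handed to the Kafka producer.
      // 60000 ms is an illustrative value (assumption), double the 30 s default.
      .option("kafka.request.timeout.ms", "60000")
      .option("checkpointLocation", "file:///opt/")
      .start()
      .awaitTermination()
  }
}
```

If batches still expire with a larger timeout, the producer is probably unable to reach the partition leader at all, so it may be worth checking that the broker address it advertises is reachable from the machine running Spark.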


2 Comments

Does this parameter need to be added on the client side or in the Kafka server configuration?
@aravinth On the client side.
