
I'm running Kafka producer and consumer code for testing purposes on CDH 5.12, and I'm hitting the error below when running the consumer code.

dataSet: org.apache.spark.sql.Dataset[(String, String)] = [key: string, value: string]
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@109a5573
2018-10-25 10:08:37 ERROR MicroBatchExecution:91 - Query [id = 70bc4f7a-cc41-470d-afd0-d46e5aebf3db, runId = 4d974468-6c6b-47e5-976b-8b9aa98114e2] terminated with error
java.lang.AbstractMethodError
        at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.initializeLogIfNecessary(KafkaSourceProvider.scala:369)
        at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.log(KafkaSourceProvider.scala:369)
        at org.apache.spark.internal.Logging$class.logDebug(Logging.scala:58)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.logDebug(KafkaSourceProvider.scala:369)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:439)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:394)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:90)
        at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:277)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:80)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:77)
        at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
        at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:75)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:75)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Exception in thread "stream execution thread for [id = 70bc4f7a-cc41-470d-afd0-d46e5aebf3db, runId = 4d974468-6c6b-47e5-976b-8b9aa98114e2]" java.lang.AbstractMethodError
        at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.initializeLogIfNecessary(KafkaSourceProvider.scala:369)
        at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.log(KafkaSourceProvider.scala:369)
        at org.apache.spark.internal.Logging$class.logDebug(Logging.scala:58)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.logDebug(KafkaSourceProvider.scala:369)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:439)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:394)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:90)
        at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:277)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:80)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:77)
        at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
        at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:75)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:75)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 70bc4f7a-cc41-470d-afd0-d46e5aebf3db, runId = 4d974468-6c6b-47e5-976b-8b9aa98114e2] terminated with exception: null
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.AbstractMethodError
  at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.initializeLogIfNecessary(KafkaSourceProvider.scala:369)
  at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.log(KafkaSourceProvider.scala:369)
  at org.apache.spark.internal.Logging$class.logDebug(Logging.scala:58)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.logDebug(KafkaSourceProvider.scala:369)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:439)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:394)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:90)
  at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:277)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:80)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:77)
  at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
  at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:77)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:75)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:75)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)

Below is the Scala code I'm running:

import org.apache.kafka.clients.consumer.KafkaConsumer

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}



val dataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9093,host:9093,host:9093")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("kafka.ssl.truststore.location", "/opt/cloudera/security/jks/truststore.jks")
  .option("kafka.ssl.truststore.password", "password")
  .option("subscribe", "SampleTopic")
  .load()

// dataFrame.writeStream.format("console").option("truncate","false").start().awaitTermination()

dataFrame.printSchema()

val dataSet = dataFrame.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
val query = dataSet.writeStream.outputMode("append").format("console").start()

query.awaitTermination()
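
Before anything else, it is worth confirming which Spark and Scala versions the shell is actually running, since the --packages coordinate has to match both. A minimal check (run in the same spark2-shell session):

// Print the Spark and Scala versions this shell actually runs.
// The Scala suffix (_2.10 / _2.11 / _2.12) and the Spark version in the
// --packages coordinate must match these values exactly.
println(s"Spark version: ${spark.version}")
println(s"Scala version: ${scala.util.Properties.versionNumberString}")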

Below is the command I'm running to execute the above code:

spark2-shell --files /tmp/jaas.conf,/path/to/.keytab  --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf --conf spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/tmp/jaas.conf --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0  -i /path/to/file.scala
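
For reference, the jaas.conf shipped via --files typically contains a KafkaClient section along these lines (a sketch only; the keytab filename and principal below are placeholders, not values from the question):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="./user.keytab"
  principal="user@EXAMPLE.COM"
  serviceName="kafka";
};

Because --files localizes the keytab into each container's working directory, a relative keytab path is commonly used on the executors.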

Thanks

Comments:
  • Why do you think this happens? (Oct 25, 2018)
  • Your code looks fine, but the error comes from the Spark logging code inside the Spark Kafka package, not from Kafka itself. (Oct 26, 2018)

2 Answers


I hit a similar issue, and it turned out the problem was an incompatibility between the Spark version and the versions of the packages being used.

In your case: according to the Cloudera documentation, CDH 5.12 ships with Spark 1.6, which in turn requires Scala 2.10, while the package you are using, org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0, is compiled against Scala 2.11. You can try org.apache.spark:spark-streaming-kafka_2.10:1.6.1 instead.
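
For example (treat the version below as a placeholder to be checked against what the shell reports via spark.version): if spark2-shell actually runs Spark 2.3.0 with Scala 2.11, the matching flag would be

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0

whereas a Spark 1.6 / Scala 2.10 shell would need the 2.10 artifact mentioned above.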

Credits: https://community.hortonworks.com/articles/197922/spark-23-structured-streaming-integration-with-apa.html


1 Comment

Same here. I had some things pinned to Spark 2.2, but I was using Spark 2.3.

For me this happened because I had not included spark-streaming in my dependency POM. Adding

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming_2.12</artifactId>
   <version>{whatEverOtherDependencyAreThere}</version>
</dependency>

resolved the issue. (The Scala suffix, _2.12 here, and the version still have to match the Spark build you run against; a mismatch produces exactly this kind of AbstractMethodError at runtime.)
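
One way to keep those versions aligned (a sketch; the property names and version numbers are illustrative, not taken from the question) is to factor them into Maven properties:

<properties>
    <scala.binary.version>2.12</scala.binary.version>
    <spark.version>2.4.0</spark.version>
</properties>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>

The provided scope keeps the cluster's own Spark jars from being shadowed by a second copy bundled into your application jar.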

