
I'm trying to build a Kafka consumer in Scala using IntelliJ to read messages from a Kafka topic. I have both Spark and Kafka installed on Windows.

I tried this code:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object connector {
  def main(args: Array[String]) {
    class Kafkaconsumer {
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "host1:port,host2:port2,host3:port3",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "use_a_separate_group_id_for_each_stream",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      val sparkConf = new SparkConf().setMaster("yarn")
        .setAppName("kafka example")
      val streamingContext = new StreamingContext(sparkConf, Seconds(10))
      val topics = Array("topicname")
      val topicsSet = topics.split(",").toSet
      val stream = KafkaUtils.createDirectStream[String, String](
        streamingContext, PreferConsistent, Subscribe[String, String](kafkaParams, topicsSet)
      )
      stream.print()
      stream.map(record => (record.key, record.value))
      streamingContext.start()
      streamingContext.awaitTermination()
    }
  }
}

But I get an error on these two lines:

    val topicsSet = topics.split(",").toSet
    streamingContext, PreferConsistent, Subscribe[String, String](kafkaParams, topicsSet)

The split and Subscribe calls are always highlighted in red.

Any idea?

Thank you

1 Answer

This code works well for me:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object Conector_kafka_spark extends App {
  // Run locally inside IntelliJ; no YARN cluster is needed
  val conf = new SparkConf()
    .setAppName("SparkStreamingKafka010Demo")
    .setMaster("local[*]")
    .set("spark.streaming.kafka.maxRatePerPartition", "100")
  val sc = new SparkContext(conf)
  val streamingContext = new StreamingContext(sc, Seconds(10))

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "kafka_demo_group",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )

  // Subscribe takes the topic collection first, then the Kafka parameters
  val topics = Array("message")
  val stream = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
}
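To explain why the two lines in the question are red: topics is declared as an Array[String], and Array has no split method (split belongs to String), so topics.split(",") does not compile. Subscribe, in turn, expects the topic collection as its first argument and the Kafka parameter map as its second, so Subscribe[String, String](kafkaParams, topicsSet) has the arguments swapped. Here is a minimal corrected sketch of the question's stream setup, keeping its other values unchanged and adding the start calls from the question's own code, without which nothing is actually consumed:

    // topics is already a collection of topic names; no split needed
    val topics = Array("topicname")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams) // topics first, then params
    )

    // Nothing is consumed until the context is started
    stream.map(record => (record.key, record.value)).print()
    streamingContext.start()
    streamingContext.awaitTermination()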
