
I am trying to create a DataFrame in a Kafka-Spark stream. I have successfully mapped the values to a case class, but whenever I call the toDF method it gives me this error:

value toDF is not a member of Array[WeatherEvent]
[error] possible cause: maybe a semicolon is missing before `value toDF'?
[error] }).toDF("longitude", "latitude", "country", "sunrise", "sunset", "temperature", "temperatureMin", "temperatureMax",
[error]    ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 2 s, completed Sep 27, 2017 11:49:23 AM

Here is my code

 val inputStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String ](Array("test"), kafkaParams))
 //  val json = parse(inputStream)


  val processedStream = inputStream
  .flatMap(record => record.value.split(" ").map(payload => {
        //val ts = Timestamp.valueOf(payload(3))
        WeatherEvent(payload(0).toDouble, payload(1).toDouble, payload(2).toString , payload(3).toInt,
                    payload(4).toInt, payload(5).toDouble, payload(6).toDouble, payload(7).toDouble, 
                    payload(8).toDouble, payload(9).toInt, payload(10).toInt, payload(11).toInt, 
                    payload(12).toDouble, payload(13).toDouble)
      }).toDF("longitude", "latitude", "country", "sunrise", "sunset", "temperature", "temperatureMin", "temperatureMax", 
              "pressure", "humidity", "cloudiness", "id", "wind_speed", "wind_deg")
 )

Thanks.

  • I guess WeatherEvent is a case class. If that's correct, then you don't need to give header names in toDF; just calling .toDF is enough. Commented Sep 27, 2017 at 7:05
  • @RameshMaharjan thanks, but it's still the same: value toDF is not a member of Array[WeatherEvent] [error] possible cause: maybe a semicolon is missing before `value toDF'? [error] }).toDF() [error] ^ [error] one error found Commented Sep 27, 2017 at 7:17
  • 1
    @matesio can you try adding this import: import ssc.implicits._ Commented Sep 27, 2017 at 7:25
  • yes exactly. I agree with Akash that you need implicits._ to be imported to apply .toDF Commented Sep 27, 2017 at 7:27
  • Yeah, I tried val sqlContext = new org.apache.spark.sql.SQLContext(sc); import sqlContext.implicits._ Commented Sep 27, 2017 at 7:29

1 Answer


toDF() is an implicit method made available through sqlContext.implicits._; it converts an RDD to a DataFrame. Here you are getting a stream from Kafka, i.e. a DStream, and the result of the map inside your flatMap is a plain Array, not an RDD, so toDF is not defined on it. To convert to a DataFrame, you need to process each RDD in the DStream using either the transform API or the foreachRDD API. Below I am using foreachRDD to convert each RDD to a DataFrame:

val data = KafkaUtils.createStream(ssc, zkQuorum, "GroupName", topics).map(_._2)
data.foreachRDD { rdd =>
  import sqlContext.implicits._                  // brings toDF into scope
  val df = rdd.map(_.split(","))                 // split each record into its fields
              .map(fields => (fields(0), fields(1)))
              .toDF()
  // use df here, e.g. df.show() or register it as a temp view
}
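Independent of Spark, the per-record parsing from the question can be pulled into a plain function and checked on its own. This is a hypothetical sketch (the field order and types are copied from the question's flatMap; the sample line is made-up data), so it runs without a Spark cluster:

```scala
// Hypothetical standalone sketch: the field parsing from the question's
// flatMap, factored into a plain function so it can be tested without Spark.
case class WeatherEvent(
    longitude: Double, latitude: Double, country: String,
    sunrise: Int, sunset: Int,
    temperature: Double, temperatureMin: Double, temperatureMax: Double,
    pressure: Double, humidity: Int, cloudiness: Int, id: Int,
    windSpeed: Double, windDeg: Double)

// Parse one space-separated record into a WeatherEvent.
def parseWeatherEvent(line: String): WeatherEvent = {
  val p = line.split(" ")
  WeatherEvent(
    p(0).toDouble, p(1).toDouble, p(2), p(3).toInt, p(4).toInt,
    p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8).toDouble,
    p(9).toInt, p(10).toInt, p(11).toInt, p(12).toDouble, p(13).toDouble)
}

// Made-up sample record with 14 space-separated fields.
val ev = parseWeatherEvent(
  "13.4 52.5 DE 1506484800 1506527000 15.2 10.1 18.3 1013.0 70 40 2950159 3.5 180.0")
```

Inside foreachRDD, rdd.map(parseWeatherEvent).toDF() would then produce the DataFrame, provided the implicits import is in scope at that point.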