
I'm trying to stream data from a Kafka topic using a Scala application. I'm able to get the data from the topic, but how do I create a DataFrame out of it?

Here is the data (in (String, String) format):

{
  "action": "AppEvent",
  "tenantid": 298,
  "lat": 0.0,
  "lon": 0.0,
  "memberid": 16390,
  "event_name": "CATEGORY_CLICK",
  "productUpccd": 0,
  "device_type": "iPhone",
  "device_os_ver": "10.1",
  "item_name": "CHICKEN"
}

I tried a few ways to do it, but none of them yielded satisfactory results.

+--------------------+
|                  _1|
+--------------------+
|{"action":"AppEve...|
|{"action":"AppEve...|
|{"action":"AppEve...|
|{"action":"AppEve...|
|{"action":"AppEve...|
|{"action":"AppEve...|
|{"action":"AppEve...|
|{"action":"AppEve...|
|{"action":"AppEve...|
|{"action":"AppEve...|
+--------------------+

Can anyone tell me how to do the mapping so that each field goes into a separate column, like a table? The data is in Avro format.

Here is the code that gets the data from the topic:

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(2))
val kafkaConf = Map[String, String]("metadata.broker.list" -> "####",
     "zookeeper.connect" -> "########",
     "group.id" -> "KafkaConsumer",
     "zookeeper.connection.timeout.ms" -> "1000000")
val topicMaps = Map("fishbowl" -> 1)
// StringDecoder matches the String type parameters (DefaultDecoder yields Array[Byte])
val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER).map(_._2)

Please guide me on how to use foreachRDD and map() to create a proper DataFrame.

  • Did you try to do a search? spark-streaming + dataframe Commented Dec 19, 2016 at 13:33
  • It is not helpful, as I'm new to Scala. I couldn't figure out how to convert an Avro [String, String] to a DataFrame: stackoverflow.com/questions/41237929/… Commented Dec 20, 2016 at 8:39
  • Here is my answer. Thanks Maasg, found the answer. Commented Dec 20, 2016 at 15:52

1 Answer


To create a DataFrame out of an RDD, irrespective of its case class schema, use the logic below:

stream.foreachRDD { rdd =>
  val dataFrame = sqlContext.read.json(rdd.map(_._2))
  dataFrame.show()
}

Here stream is the DStream returned by KafkaUtils.createStream().
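
For completeness, a minimal end-to-end sketch of the same idea, written against the messages DStream from the question (which already maps to the JSON value strings, so no ._2 extraction is needed here); the isEmpty guard and SQLContext.getOrCreate are assumptions added to keep empty micro-batches from breaking schema inference:

import org.apache.spark.sql.SQLContext

messages.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Reuse one SQLContext per SparkContext instead of creating a new one per batch
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    // read.json infers the schema from the JSON strings, so each field
    // (action, tenantid, lat, lon, memberid, ...) becomes its own column
    val df = sqlContext.read.json(rdd)
    df.show()
  }
}
ssc.start()
ssc.awaitTermination()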


2 Comments

Well done. Regarding the comment "irrespective of its format or case class schema" that's not exactly correct => This works only for JSON-formatted records.
@maasg Thanks, sir; edited my comment. I worked it out with Avro (still, its schema is in JSON).
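
A possible refinement given the JSON-only caveat above: since the record layout is known from the sample in the question, passing an explicit schema skips per-batch inference and keeps the columns stable even for empty batches. A sketch, with field types guessed from the sample record:

import org.apache.spark.sql.types._

// Field names and types assumed from the sample record in the question
val eventSchema = StructType(Seq(
  StructField("action", StringType),
  StructField("tenantid", LongType),
  StructField("lat", DoubleType),
  StructField("lon", DoubleType),
  StructField("memberid", LongType),
  StructField("event_name", StringType),
  StructField("productUpccd", LongType),
  StructField("device_type", StringType),
  StructField("device_os_ver", StringType),
  StructField("item_name", StringType)))

messages.foreachRDD { rdd =>
  // With an explicit schema there is nothing to infer, so empty RDDs are safe
  val df = sqlContext.read.schema(eventSchema).json(rdd)
  df.show()
}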
