
I am trying to read JSON data from Kafka and process it in Scala. I am new to Flink and Kafka streaming, so please answer with solution code. I want to convert each record to a Map containing all of its key/value pairs.

map1.get("FC196") should give me Dormant, where map1 is the Map containing the key/value pairs.

The challenge I'm facing is converting the DataStream[ObjectNode] (the st variable in the code) to a map of key/value pairs. I am using JSONDeserializationSchema; if I use SimpleStringSchema I get a DataStream[String] instead. I am open to alternative suggestions.

Input format from Kafka:

{"FC196":"Dormant","FC174":"A262210940","FC195":"","FC176":"40","FC198":"BANKING","FC175":"AHMED","FC197":"2017/04/04","FC178":"1","FC177":"CBS","FC199":"INDIVIDUAL","FC179":"SYSTEM","FC190":"OK","FC192":"osName","FC191":"Completed","FC194":"125","FC193":"7","FC203":"A10SBPUB000000000004439900053575","FC205":"1","FC185":"20","FC184":"Transfer","FC187":"2","FC186":"2121","FC189":"abcdef","FC200":"afs","FC188":"BR08","FC202":"INDIVIDUAL","FC201":"","FC181":"7:00PM","FC180":"2007/04/01","FC183":"11000000","FC182":"INR"}

Code:

import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema



object WordCount {
  def main(args: Array[String]) {

    // kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.*:9092")
    properties.setProperty("zookeeper.connect", "***.**.*.*:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "latest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new JSONDeserializationSchema() , properties))

    st.print()

    env.execute()
  }
}

My code after the changes:

import java.util.Properties

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s.DefaultFormats
import org.json4s._
import org.json4s.native.JsonMethods
import scala.util.Try



object WordCount {
  def main(args: Array[String]) {

    case class CC(key:String)

    implicit val formats = org.json4s.DefaultFormats
    // kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "earliest")
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption)
      .map(_.extract[CC])

    st.print()

    env.execute()
  }
}

And for some reason I cannot put a Try in the flatMap as you described.

error:

INFO [main] (TypeExtractor.java:1804) - No fields detected for class org.json4s.JsonAST$JValue. Cannot be used as a PojoType. Will be handled as GenericType
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:994)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:519)
    at org.apache.flink.quickstart.WordCount$.main(WordCount.scala:36)
    at org.apache.flink.quickstart.WordCount.main(WordCount.scala)
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$$anon$4
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
    ... 6 more

Process finished with exit code 1

1 Answer

There are two tasks that need to be dealt with here:

  1. Parsing the raw json payload to some form of AST
  2. Converting the AST into a format you can use.

If you use SimpleStringSchema, you can pick a JSON parser of your choice and unmarshal the payload in a simple flatMap operator.

Some dependencies for your build.sbt:

"org.json4s" %% "json4s-core" % "3.5.1",
"org.json4s" %% "json4s-native" % "3.5.1"

There are dozens of JSON libraries in Scala to choose from; a nice overview can be found at https://manuel.bernhardt.io/2015/11/06/a-quick-tour-of-json-libraries-in-scala/

Then some parsing:

scala> import org.json4s.native.JsonMethods._
import org.json4s.native.JsonMethods._

scala> val raw = """{"key":"value"}"""
raw: String = {"key":"value"}

scala> parse(raw)
res0: org.json4s.JValue = JObject(List((key,JString(value))))

At this stage an AST is available. This can be converted to a Map as follows:

scala> res0.values
res1: res0.Values = Map(key -> value)

Keep in mind that json4s does not perform exception handling itself, so parsing can throw. That is something you should guard against when consuming from Kafka: a single malformed record would otherwise eventually kill your job.
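Since the payload here is a flat object of string values, the AST-to-Map step can even be sketched without any JSON library. The following is an illustration only (FlatJson, flatJsonToMap, and the regex are made-up names for this sketch, not part of json4s or Flink), but it shows the Map you end up with and the FC196 lookup the question asks for, with the parse wrapped in Try so bad records yield None instead of an exception:

```scala
import scala.util.Try

// Illustration only: a hand-rolled converter for the flat payload shown
// above (string values, no nesting, no escaped quotes). `FlatJson` and
// `flatJsonToMap` are made-up names, not part of json4s or Flink.
object FlatJson {
  // Matches one "key":"value" pair of the flat JSON object.
  private val Pair = "\"([^\"]*)\"\\s*:\\s*\"([^\"]*)\"".r

  // Returns None instead of throwing on malformed input.
  def flatJsonToMap(raw: String): Option[Map[String, String]] =
    Try(Pair.findAllMatchIn(raw).map(m => m.group(1) -> m.group(2)).toMap).toOption
}

object FlatJsonDemo extends App {
  val sample = """{"FC196":"Dormant","FC174":"A262210940","FC195":""}"""
  val map1 = FlatJson.flatJsonToMap(sample).getOrElse(Map.empty)
  println(map1.get("FC196")) // prints Some(Dormant)
}
```

This only works because the record is flat; for anything nested (or with escaped quotes), stick with a real parser such as json4s below.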

In Flink, this would look like this:

env
  .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
  .flatMap(raw => Try(JsonMethods.parse(raw)).toOption) // discards unparsable records; better to log them
  .map(_.values)

However, I would recommend representing your data as a case class. This needs one more step: mapping the AST to the case class.

Continuing the example above:

scala> implicit val formats = org.json4s.DefaultFormats
formats: org.json4s.DefaultFormats.type = org.json4s.DefaultFormats$@341621da

scala> case class CC(key: String)
defined class CC

scala> parse(raw).extract[CC]
res20: CC = CC(value)

Or in flink:

env
  .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
  .flatMap(raw => Try(JsonMethods.parse(raw)).toOption)
  .map(_.extract[CC])

Update:

Just move the implicit formats outside of the main method:

object WordCount {
    implicit val formats = org.json4s.DefaultFormats
    def main(args: Array[String]) = {...}
}
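For context on why this fixes the NotSerializableException: Flink serializes each operator's closure before shipping it to the cluster. A formats value defined inside main is captured into the lambda (dragging along the non-serializable DefaultFormats$$anon$4 from the stack trace), whereas a member of a top-level object is reached via a static access and is not captured at all. A Flink-free sketch of the difference (all names here are made up for illustration):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for org.json4s.DefaultFormats$$anon$4 from the stack trace:
// a value that Java serialization cannot handle.
class NonSerializableFormats

// Object-level member: closures reach it via a static access, so it is
// never captured into the serialized closure.
object Formats { val formats = new NonSerializableFormats }

object SerializationDemo {
  // Captures the local `formats` value -> the closure is NOT serializable.
  def makeBad(): String => String = {
    val formats = new NonSerializableFormats
    s => { val _ = formats; s.toUpperCase }
  }

  // Only references the object member -> nothing bad is captured.
  def makeGood(): String => String =
    s => { val _ = Formats.formats; s.toUpperCase }

  // Mimics what Flink's ClosureCleaner.ensureSerializable checks.
  def serializable(f: AnyRef): Boolean =
    try { new ObjectOutputStream(new ByteArrayOutputStream).writeObject(f); true }
    catch { case _: NotSerializableException => false }
}
```

Here serializable(SerializationDemo.makeBad()) fails while serializable(SerializationDemo.makeGood()) succeeds, which is exactly the difference between defining implicit val formats inside main and on the WordCount object.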

2 Comments

I'm getting an Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable. I have added my updated code to the question at the end.
You will get that a lot in Flink (and in Spark, and in anything else that serializes code before shipping it to remote instances for execution). You just have to move the formats outside of main: object WordCount { implicit val formats = org.json4s.DefaultFormats; def main(args: Array[String]) = {...} }
