
I have started learning Spark Streaming and am very new to data analytics and Spark in general. I want to create a small IoT application in which I forecast future sensor data.

I have Tiva hardware which sends real-time sensor data as JSON, like this:

[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}]

Here t is the Unix timestamp at which the data was posted, and sensors is an array of sensors, each with an id 's' and a reading 'd'.

What I want to do is consume this data with Spark Streaming, then pass it through Spark's MLlib (or an equivalent library) to forecast future data.

I would like a general idea of:

  1. Whether this will be possible with the technology choices I have decided to use?
  2. How can I consume the nested JSON? I tried using SQLContext but had no success.
  3. General guidelines to achieve what I am trying to do here.

Here is the code I am using to consume messages from Kafka:

    SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

    // TODO: processing pipeline
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092");
    Set<String> topics = Collections.singleton("RAH");

    JavaPairInputDStream<String, String> directKafkaStream =
            KafkaUtils.createDirectStream(ssc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, kafkaParams, topics);

    // Extract the message value (the JSON payload) from each (key, value) pair
    JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> message) throws Exception {
            System.out.println(message._2());
            return message._2();
        }
    });

    json.foreachRDD(rdd -> {
        rdd.foreach(record -> System.out.println(record));
    });

    ssc.start();
    ssc.awaitTermination();

PS: I want to do this in Java, to keep the codebase uniform and get good performance.

  • Can you post the code you have tried so far? It is possible using Spark SQL and Streaming. Commented Nov 4, 2016 at 10:00
  • Posted code in question. Commented Nov 4, 2016 at 10:43
  • When you try SQLContext to read the JSON string, what problem did you face? Is it a "Task not serializable" issue? Commented Nov 4, 2016 at 10:59

2 Answers


Since you are using Spark 2.0, you can read the JSON from a SparkSession:

    json.foreachRDD(rdd -> {
        // In the Java API, DataFrame is represented as Dataset<Row>
        Dataset<Row> df = spark.read().json(rdd);
        // process the JSON with this DF
    });

Or you can convert the RDD to an RDD of Row and then use the createDataFrame method:

    json.foreachRDD(rdd -> {
        // rowRDD must be a JavaRDD<Row>; a schema (StructType) is also required
        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        // process the JSON with this DF
    });

Nested JSON processing is possible from the DF; you can follow this article.

Also, once you convert your JSON to a DF, you can use it in any Spark module (such as Spark SQL or ML).
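To make the nested-JSON handling concrete, here is a minimal, self-contained sketch of the explode approach (the class name, the local master, and the hard-coded sample record are my own for illustration, not from the question):

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

import java.util.Collections;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FlattenSensors {

    // Turn a micro-batch of JSON strings into one row per sensor reading.
    static Dataset<Row> flatten(SparkSession spark, JavaRDD<String> jsonRdd) {
        Dataset<Row> df = spark.read().json(jsonRdd);
        // explode the sensors array, then pull the struct fields out
        return df
                .withColumn("sensor", explode(col("sensors")))
                .select(col("t"),
                        col("sensor.s").as("sensor_id"),
                        col("sensor.d").cast("double").as("value"));
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("FlattenSensors").master("local[2]").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // One sample record in the same shape the Tiva device sends.
        String sample = "{\"t\":1478091719000,\"sensors\":"
                + "[{\"s\":\"s1\",\"d\":\"+253.437\"},{\"s\":\"s2\",\"d\":\"+129.750\"}]}";

        Dataset<Row> flat = flatten(spark, jsc.parallelize(Collections.singletonList(sample)));
        flat.show(); // one row per (t, sensor_id, value)
        spark.stop();
    }
}
```

Inside your streaming job you would call flatten(spark, rdd) from foreachRDD on each micro-batch instead of parallelizing a sample.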


4 Comments

In my case the SQLContext constructor I tried was deprecated. And I am not getting how to obtain the SparkContext ('sc') from a JavaSparkContext.
@RahulBorkar: You can pass JavaSparkContext to SQLContext(javasparkContext)
It's deprecated in Spark 2.11. Also, when trying your code I am getting "The method transform(Function<JavaRDD<String>,JavaRDD<Object>>) is ambiguous for the type JavaDStream<String>"
There is no 'DataFrame' class I am able to find. Do I need to add any other library? Also, how can I get the same SparkSession?

Answer to your questions:

1) Whether this will be possible with all technology choices I have decided to use?

`Ans: Yes, it can be done and is quite a normal use-case for Spark.`

2) How can I consume the nested JSON? I tried using SQLContext but got no success.

`Ans: Nested JSON with SQLContext is a little tricky. You may want to use Jackson or some other JSON library.`
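A minimal Jackson sketch for the payload shown in the question (the class, field-accessor, and method names are my own; the JSON field names match the device output):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

public class SensorMessage {
    public long t;                    // Unix timestamp of the reading
    public List<Reading> sensors;     // nested array of per-sensor values

    public static class Reading {
        public String s;              // sensor id, e.g. "s1"
        public String d;              // reading as a signed decimal string
        public double value() {       // convenience: parse "+253.437" into a double
            return Double.parseDouble(d);
        }
    }

    // The device posts a JSON array, so bind to SensorMessage[]
    public static SensorMessage[] parse(String json) throws Exception {
        return new ObjectMapper().readValue(json, SensorMessage[].class);
    }

    public static void main(String[] args) throws Exception {
        String json = "[{\"t\":1478091719000,\"sensors\":"
                + "[{\"s\":\"s1\",\"d\":\"+253.437\"},{\"s\":\"s2\",\"d\":\"+129.750\"}]}]";
        SensorMessage msg = parse(json)[0];
        System.out.println(msg.t + " " + msg.sensors.get(0).s + " " + msg.sensors.get(0).value());
        // prints: 1478091719000 s1 253.437
    }
}
```

You could call parse inside the map over the Kafka DStream to turn each message into typed objects before any feature extraction.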

3) General guidelines to achieve what I am trying to do here.

Ans: Consuming messages through Kafka seems fine, but only a limited set of machine learning algorithms is supported in streaming.

If you want to use other machine learning algorithms or a third-party library, you should consider creating the model in a batch job that emits the model at the end. The streaming job should then load the model, receive the stream of data, and only predict.
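A rough shape of that batch-train / stream-predict split, assuming Spark ML's LinearRegression as the batch-trained model (the class name, model path, and tiny in-code training set are placeholders for illustration):

```java
import java.util.Arrays;

import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TrainThenServe {
    static final StructType SCHEMA = new StructType(new StructField[]{
            new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
            new StructField("features", new VectorUDT(), false, Metadata.empty())
    });

    // Batch side: fit on historical readings and persist the model.
    static void trainBatch(SparkSession spark, String modelPath) throws Exception {
        Dataset<Row> training = spark.createDataFrame(Arrays.asList(
                RowFactory.create(10.0, Vectors.dense(1.0)),
                RowFactory.create(20.0, Vectors.dense(2.0)),
                RowFactory.create(30.0, Vectors.dense(3.0))), SCHEMA);
        LinearRegressionModel model = new LinearRegression().setMaxIter(10).fit(training);
        model.write().overwrite().save(modelPath);
    }

    // Streaming side: load the persisted model, then score each micro-batch.
    static Dataset<Row> score(Dataset<Row> microBatch, String modelPath) {
        LinearRegressionModel model = LinearRegressionModel.load(modelPath);
        return model.transform(microBatch); // adds a "prediction" column
    }

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("TrainThenServe").master("local[2]").getOrCreate();
        String modelPath = "/tmp/sensor-lr-model"; // placeholder path
        trainBatch(spark, modelPath);
        Dataset<Row> batch = spark.createDataFrame(Arrays.asList(
                RowFactory.create(0.0, Vectors.dense(4.0))), SCHEMA);
        score(batch, modelPath).show(); // prediction should be close to 40 for x = 4
        spark.stop();
    }
}
```

In a real deployment you would re-run trainBatch periodically, and call score from foreachRDD so the streaming job only predicts (in production you would also load the model once outside the loop rather than per batch).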

1 Comment

Can you point me to proper documentation for a use case like this? It would be very helpful.
