I have started learning Spark Streaming and am very new to data analytics and Spark in general. I just want to create a small IoT application in which I want to forecast future data.
I have Tiva hardware which sends real-time sensor data as JSON, as follows:
[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}]
Here t is the Unix timestamp at which the data was posted, and sensors is an array of sensors, each with an id ('s') and a reading ('d').
What I want to do is consume this data with Spark Streaming, create objects from it, and then pass all the data through Spark's MLlib (machine learning) or an equivalent library to forecast future data.
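To make the payload shape concrete, here is how one element of that message could be modelled as plain Java objects. This is only a sketch: the class and field names (SensorReading, SensorBatch, etc.) are my own invention, and the parsing relies on Double.parseDouble accepting the leading '+' in readings like "+253.437".

```java
import java.util.Arrays;
import java.util.List;

// One entry of the "sensors" array: an id ("s") and a signed reading ("d")
class SensorReading {
    final String sensor; // the sensor id, e.g. "s1"
    final double value;  // the numeric reading parsed from "d"

    SensorReading(String sensor, String rawValue) {
        this.sensor = sensor;
        this.value = Double.parseDouble(rawValue); // accepts the leading '+'
    }
}

// One element of the top-level JSON array: a timestamp plus its readings
class SensorBatch {
    final long timestamp;              // "t": Unix timestamp in milliseconds
    final List<SensorReading> sensors; // the "sensors" array

    SensorBatch(long timestamp, List<SensorReading> sensors) {
        this.timestamp = timestamp;
        this.sensors = sensors;
    }
}

public class PayloadModel {
    public static void main(String[] args) {
        SensorBatch batch = new SensorBatch(1478091719000L,
                Arrays.asList(new SensorReading("s1", "+253.437"),
                              new SensorReading("s2", "+129.750")));
        System.out.println(batch.sensors.get(0).value); // prints 253.437
    }
}
```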
I would like a general idea of:
- whether this will be possible with the technology choices I have decided to use;
- how I can consume the nested JSON (I tried using SQLContext but had no success);
- general guidelines for achieving what I am trying to do here.
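For the nested-JSON point, this is roughly the SQLContext route I have in mind (a sketch, not working code). It assumes the Spark 1.x APIs used in the Kafka code below, that `SQLContext.getOrCreate` inside `foreachRDD` is the right way to obtain the context on the driver, and that `functions.explode` can flatten the `sensors` array:

```java
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

public class JsonConsumer {
    // Takes the DStream of raw JSON strings produced by the Kafka map step
    static void consume(JavaDStream<String> json) {
        json.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                // Obtained per micro-batch on the driver, so nothing needs
                // to be serialized into the closure
                SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
                // Each element of the top-level JSON array becomes one row
                // with columns t (long) and sensors (array of structs)
                DataFrame df = sqlContext.read().json(rdd);
                // Flatten the sensors array: one row per (t, s, d)
                DataFrame flat = df
                        .select(col("t"), explode(col("sensors")).as("sensor"))
                        .select(col("t"),
                                col("sensor.s").as("s"),
                                col("sensor.d").as("d"));
                flat.show();
            }
        });
    }
}
```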
Here is the code I am using to consume messages from Kafka:
SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

// TODO: processing pipeline

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092");
Set<String> topics = Collections.singleton("RAH");

JavaPairInputDStream<String, String> directKafkaStream =
        KafkaUtils.createDirectStream(ssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topics);

// Extract the JSON payload (the message value) from each Kafka record
JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> message) throws Exception {
        System.out.println(message._2());
        return message._2();
    }
});

json.foreachRDD(rdd -> {
    rdd.foreach(record -> System.out.println(record));
});

ssc.start();
ssc.awaitTermination();
PS: I want to do this in Java, to keep the codebase uniform and for good performance.
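On the forecasting side, my understanding is that MLlib's streaming regression could be wired in roughly like this. This is only a sketch under the assumption that the JSON has already been parsed into LabeledPoints (e.g. one sensor's reading as the label against a lagged reading as the single feature); the method name `forecast` and the parameter names are my own:

```java
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD;
import org.apache.spark.streaming.api.java.JavaDStream;

public class Forecaster {
    static void forecast(JavaDStream<LabeledPoint> training,
                         JavaDStream<Vector> liveFeatures) {
        StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD()
                .setInitialWeights(Vectors.zeros(1)); // one feature -> one weight
        model.trainOn(training);               // update weights as each micro-batch arrives
        model.predictOn(liveFeatures).print(); // emit a prediction per feature vector
    }
}
```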
Comment: When you used sqlContext to read the JSON string, what was the problem you faced? Was it a "Task not serializable" issue?