
We are developing an IoT application.

We get the following data stream from each device we want to run analysis for:

[{"t":1481368346000,"sensors":[{"s":"s1","d":"+149.625"},{"s":"s2","d":"+23.062"},{"s":"s3","d":"+16.375"},{"s":"s4","d":"+235.937"},{"s":"s5","d":"+271.437"},{"s":"s6","d":"+265.937"},{"s":"s7","d":"+295.562"},{"s":"s8","d":"+301.687"}]}]

At a basic level, I am able to get the schema using Spark Java code as follows:

    root
     |-- sensors: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- d: string (nullable = true)
     |    |    |-- s: string (nullable = true)
     |-- t: long (nullable = true)

The code I have written is:

    JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> message) throws Exception {
            return message._2();
        }
    });

    SQLContext sqlContext = spark.sqlContext();
    json.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> jsonRecord) throws Exception {

            Dataset<Row> row = sqlContext.read().json(jsonRecord).toDF();
            row.createOrReplaceTempView("MyTable");
            row.printSchema();
            row.show();

            Dataset<Row> sensors = row.select("sensors");
            sensors.createOrReplaceTempView("sensors");
            sensors.printSchema();
            sensors.show();

        }
    });

This gives me an error: "org.apache.spark.sql.AnalysisException: cannot resolve 'sensors' given input columns: [];"

I am a beginner with Spark and analytics, and I am not able to find any good Java example for parsing nested JSON.

What I am trying to achieve, and may need suggestions from the experts here on, is the following:

I am going to extract each sensor value and then run regression analysis on it using Spark's ML library. This will help me find out what trend is occurring in each sensor stream, and I also want to detect failures using that data.

I am not sure what the best way to do this is, and any guidance, links, or info would be really helpful.


1 Answer


Here is how your json.foreachRDD should look. The AnalysisException occurs because spark.read().json() infers an empty schema when a micro-batch is empty, so there is no sensors column to resolve; guarding with rdd.isEmpty() avoids that:

    json.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> rdd) {
            // Skip empty micro-batches: reading an empty RDD infers an empty
            // schema, which is what triggers the AnalysisException.
            if (!rdd.isEmpty()) {
                Dataset<Row> data = spark.read().json(rdd).select("sensors");
                data.printSchema();
                data.show(false);
                // Explode the sensors array into one row per reading, then
                // flatten the struct into separate s and d columns.
                Dataset<Row> df = data
                        .select(org.apache.spark.sql.functions.explode(
                                org.apache.spark.sql.functions.col("sensors")))
                        .toDF("sensors")
                        .select("sensors.s", "sensors.d");
                df.show(false);
            }
        }
    });
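
A likely next step, not part of the original answer: the d column is still a string (e.g. "+149.625"), so before running any ML on it you would cast it to a numeric type. A minimal sketch, assuming the df from the snippet above (the value column name is my own):

    // Hypothetical continuation: cast the string reading to a double so it
    // can be used as an ML feature/label. Spark's string-to-double cast
    // accepts the leading '+' sign present in this data.
    Dataset<Row> numeric = df.withColumn("value",
            org.apache.spark.sql.functions.col("d").cast("double"));
    numeric.show(false);

If schema inference across micro-batches ever becomes a problem, you could also pass an explicit StructType via spark.read().schema(...) instead of relying on inference.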

For a regression analysis sample, you can refer to JavaRandomForestRegressorExample.java at https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestRegressorExample.java
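
If you just want the trend of a single sensor stream, a plain linear fit of value against time may be enough. Below is a minimal sketch of that idea, not code from the linked example; fitTrend and the perSensor columns t (long) and value (double) are hypothetical names:

    import org.apache.spark.ml.feature.VectorAssembler;
    import org.apache.spark.ml.regression.LinearRegression;
    import org.apache.spark.ml.regression.LinearRegressionModel;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Hypothetical helper: perSensor holds one sensor's readings with
    // columns t (long, epoch millis) and value (double).
    static LinearRegressionModel fitTrend(Dataset<Row> perSensor) {
        // Spark ML expects the features packed into a single vector column.
        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(new String[] {"t"})
                .setOutputCol("features");
        Dataset<Row> training = assembler.transform(perSensor)
                .withColumnRenamed("value", "label");

        LinearRegressionModel model = new LinearRegression()
                .setMaxIter(10)
                .setRegParam(0.1)
                .fit(training);

        // The coefficient on t is the slope, i.e. the trend of this stream;
        // large residuals against this fit could hint at failures.
        System.out.println("slope=" + model.coefficients()
                + " intercept=" + model.intercept());
        return model;
    }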

For real-time data analysis using Spark machine learning and Spark Streaming, you can refer to the articles below.


2 Comments

Thanks a lot for your help and for pointing me in the right direction. Using this, I am now able to extract a single value. As you know, there are multiple sensors and multiple streams, so there are going to be thousands of sensor data streams coming in. Could you please recommend a way to train a different model for each sensor and then keep running predictions...
I have edited my answer above and added some links for real-time analysis using Spark ML and Spark Streaming.
