I have the below data in Elasticsearch (local single-node server).

Search command:

curl -XPOST 'localhost:9200/sparkdemo/_search?pretty' -d '{ "query": { "match_all": {} } }'

OUTPUT:

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 10,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY_H0lYe0cQl--Bin",
      "_score" : 1.0,
      "_source" : {
        "date" : "9/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 86,
        "avg" : 86,
        "stage" : "S1"
      }
    }, {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY_KklYe0cQl--Bir",
      "_score" : 1.0,
      "_source" : {
        "date" : "13/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 86,
        "avg" : 87,
        "stage" : "S1"
      }
    }, {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY-TolYe0cQl--Bii",
      "_score" : 1.0,
      "_source" : {
        "date" : "4/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 82,
        "avg" : 82,
        "stage" : "S0"
      }
    }, 
.......
... Few more records
..........
    }, {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY_KklYe0cQl--Biq",
      "_score" : 1.0,
      "_source" : {
        "date" : "12/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 91,
        "avg" : 89,
        "stage" : "S1"
      }
    } ]
  }
}

I am trying to fetch all the data in a Spark program (local standalone program run from Eclipse):

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.elasticsearch.spark._

object Test1 {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("HRInfo")
    val sc = new SparkContext(conf)

    // esRDD returns an RDD of (documentId, Map[field -> value]) pairs
    val esRdd = sc.esRDD("sparkdemo/hrinfo", "?q=*")

    val searchResultRDD = esRdd.map(t => {
      println("id:" + t._1 + ", map:" + t._2)
      t._2
    })

    searchResultRDD.collect().foreach(map => {
      val stage = map.get("stage")
      val pid = map.get("pid")
      val date = map.get("date")
      val propName = map.get("propName")
      val propValue = map.get("propValue")
      val avg = map.get("avg")
      val variation = map.get("var")

      println("Info(" + stage + "," + pid + "," + date + "," + propName + "," + propValue + "," + avg + "," + variation + ")")
    })
  }
}

But the program is not fetching all fields of the records stored in Elasticsearch.

Program OUTPUT:

id:AVNAY_H0lYe0cQl--Bin, map:Map(date -> 9/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY_KklYe0cQl--Bir, map:Map(date -> 13/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-TolYe0cQl--Bii, map:Map(date -> 4/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY_H0lYe0cQl--Bio, map:Map(date -> 10/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY_KklYe0cQl--Bip, map:Map(date -> 11/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-TolYe0cQl--Bij, map:Map(date -> 5/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-Y9lYe0cQl--Bil, map:Map(date -> 7/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-Y9lYe0cQl--Bim, map:Map(date -> 8/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-Y9lYe0cQl--Bik, map:Map(date -> 6/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY_KklYe0cQl--Biq, map:Map(date -> 12/Mar/2016, pid -> 1, propName -> HEARTRATE)
Info(None,Some(1),Some(9/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(13/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(4/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(10/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(11/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(5/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(7/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(8/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(6/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(12/Mar/2016),Some(HEARTRATE),None,None,None)

The program fetches all records, but within each record it does not fetch the other fields (i.e. stage, propValue, avg and variation). Why? Thanks in advance.
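As an aside, the Some(...)/None values in the output come from Map.get, which returns an Option. These can be unwrapped with getOrElse so missing fields get an explicit default. A minimal pure-Scala sketch of the pattern (no Spark or Elasticsearch needed; the object name OptionDemo is made up, and the map mirrors one _source document from the question with the missing fields left out, as in the observed output):

```scala
object OptionDemo {
  def main(args: Array[String]): Unit = {
    // Mirrors what the connector actually returned for one document:
    // stage, propValue, avg and var are absent from the map.
    val source: Map[String, Any] = Map(
      "date" -> "9/Mar/2016",
      "pid" -> "1",
      "propName" -> "HEARTRATE"
    )

    // getOrElse yields the value if present, otherwise the given default,
    // so the printed line contains plain values instead of Some(...)/None.
    val stage     = source.getOrElse("stage", "UNKNOWN")
    val pid       = source.getOrElse("pid", "?")
    val date      = source.getOrElse("date", "?")
    val propValue = source.getOrElse("propValue", -1)

    println(s"Info($stage,$pid,$date,$propValue)")
  }
}
```

This does not fix the missing fields, but it makes them visible as explicit defaults rather than None.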

2 Answers
This is happening because of the "var": null values in your documents. When a document is deserialised, the "var": null field and all of the values that follow it do not make it into the map in Scala.

You can demonstrate this by replacing one of the "var": null values with a non-null value (e.g. "var": "test"); you will then get all the values back correctly, as you are expecting. Alternatively, place a null value at the start of a document, e.g.

curl -X POST 'http://localhost:9200/sparkdemo/hrinfo/5' -d '{"test":null,"date": "9/Mar/2016","pid": "1","propName": "HEARTRATE","propValue": 86,"avg": 86,"stage": "S1"}'

and the map will be empty for that document:

id:5, map:Map()
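If removing the nulls from the documents is not an option, one possible workaround (a sketch only, assuming the same elasticsearch-spark connector on the classpath; not tested against your cluster) is to read the raw JSON with esJsonRDD, which returns each hit as an unparsed JSON string, so the null fields survive and you can parse the string yourself:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.elasticsearch.spark._

object JsonFetch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("HRInfoJson")
    val sc = new SparkContext(conf)

    // esJsonRDD yields (documentId, rawJsonString) pairs; the raw JSON
    // still contains "var": null, so no trailing fields are dropped.
    sc.esJsonRDD("sparkdemo/hrinfo", "?q=*").collect().foreach {
      case (id, json) => println("id:" + id + ", json:" + json)
    }
  }
}
```

You would then need a JSON parser of your choice to pull the individual fields out of each string.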

Comments

Thanks @Steve Willcock, a non-null value for "var" solved my problem. But one point of confusion: for this to work properly, I should not put null values in Elasticsearch documents, is that right?
Hi Mahendra, great, I'm glad that's working for you. Elasticsearch certainly supports nulls, although there are some caveats (see here for details: elastic.co/guide/en/elasticsearch/guide/current/…). Querying the data via curl shows that the nulls are returned in the JSON document as you would expect, so the problem seems to happen during deserialisation of the JSON document in the Elasticsearch Spark library. I'm not sure if that is the intended behaviour; it does seem a little odd.

Try this:

import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._

val sql = new SQLContext(sc)
val index1 = sql.esDF("index/type")
// Prints the schema inferred from the index, so you can see
// which fields the connector actually picked up.
println(index1.schema.treeString)

