
I have data in Elasticsearch that I want to use with Spark. The problem is that my Elasticsearch documents contain array-typed fields.

Here is a sample of my Elasticsearch data:

{  
   "took":4,
   "timed_out":false,
   "_shards":{  
      "total":36,
      "successful":36,
      "skipped":0,
      "failed":0
   },
   "hits":{  
      "total":2586638,
      "max_score":1,
      "hits":[  
         {  
            "_index":"Index_Name",
            "_type":"Type_Name",
            "_id":"l-hplmIBgpUzwNjPutjY",
            "_score":1,
            "_source":{  
               "currentTime":1518339120000,
               "location":{  
                  "lat":25.13,
                  "lon":55.18
               },
               "radius":65.935,
               "myArray":[  
                  {  
                     "id":"1154",
                     "field2":8,
                     "field3":16.39,
                     "myInnerArray":[  
                        [  
                           55.18,
                           25.13
                        ],
                        [  
                           55.18,
                           25.13
                        ],
                        ...
                     ]
                  }
               ],
               "field4":0.512,
               "field5":123.47,
               "time":"2018-02-11T08:52:00+0000"
            }
         },
         {  
            "_index":"Index_Name",
            "_type":"Type_Name",
            "_id":"4OhplmIBgpUzwNjPutjY",
            "_score":1,
            "_source":{  
               "currentTime":1518491400000,
               "location":{  
                  "lat":25.16,
                  "lon":55.22
               },
               "radius":6.02,
               "myArray":[  
                  {  
                     "id":"1158",
                     "field2":14,
                     "field3":32.455,
                     "myInnerArray":[  
                        [  
                           55.227,
                           25.169
                        ],
                        [  
                           55.2277,
                           25.169
                        ],
                       ...
                     ]
                  }
               ],
               "field4":0.5686,
               "field5":11.681,
               "time":"2018-02-13T03:10:00+0000"
            }
         },
         ...
      ]
   }
}

I managed to query Elasticsearch with the following code:

val df= spark.read.format("org.elasticsearch.spark.sql")
             // Some options
             .option("es.read.field.exclude","myArray")
             .option("es.query", DSL_QUERY)
             .load("Index_Name/Type_Name")

That returns a DataFrame with all my data except the array. I now want a DataFrame that includes the array as well. I tried this:

val df= spark.read.format("org.elasticsearch.spark.sql")
        // Some options
        .option("es.read.field.as.array.include","myArray")
        .option("es.query", DSL_QUERY)
        .load("Index_Name/Type_Name")

But I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 1389, 10.139.64.5, executor 0): java.lang.ClassCastException: scala.collection.convert.Wrappers$JListWrapper cannot be cast to java.lang.Float
    at scala.runtime.BoxesRunTime.unboxToFloat(BoxesRunTime.java:109)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getFloat(rows.scala:43)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getFloat(rows.scala:194)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:423)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:49)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:126)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:125)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:349)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

What am I missing?

EDIT:

The problem seems to come from the nested array. If I add the option

.option("es.read.field.as.array.include","myArray")

the field myArray is recognized as an array, but myInnerArray is not. So I added

.option("es.read.field.as.array.include","myArray.myInnerArray")

This time, myInnerArray is recognized as an array, but myArray no longer is.
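This "undo" behavior is consistent with reader options being stored in a map keyed by option name: a later `.option` call with the same key replaces the earlier value rather than adding to it. A minimal pure-Scala model of that behavior (illustrative only, not the connector's actual implementation):

```scala
// Model the reader's options as a map keyed by option name.
val afterFirstCall  = Map("es.read.field.as.array.include" -> "myArray")

// A second .option call with the same key replaces the first value.
val afterSecondCall = afterFirstCall + ("es.read.field.as.array.include" -> "myArray.myInnerArray")

// Only the last value survives, so "myArray" itself is no longer marked as an array.
println(afterSecondCall("es.read.field.as.array.include"))
```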

  • You are adding both options right? Commented Apr 17, 2018 at 12:18
  • Yes, of course, but when I add the second one, it 'undoes' the first one Commented Apr 17, 2018 at 13:56

1 Answer


It seems that the second option overwrites the first, because you set them in two separate calls with the same option key.

Try combining them into a single option value, like below:

.option("es.read.field.as.array.include","myArray,myArray.myInnerArray")
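
Putting it together, the reader from the question would become something like the sketch below (DSL_QUERY, the index name, and the elided "Some options" are taken from the question as-is):

```scala
val df = spark.read.format("org.elasticsearch.spark.sql")
        // Some options
        // One comma-separated value marks both the outer field and the
        // nested field as arrays, so neither setting overwrites the other:
        .option("es.read.field.as.array.include", "myArray,myArray.myInnerArray")
        .option("es.query", DSL_QUERY)
        .load("Index_Name/Type_Name")
```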
