
Elasticsearch's documentation only covers loading a complete index into Spark.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()

How can you run a query against an Elasticsearch index and load the results into Spark as a DataFrame using PySpark?

3 Answers


Below is how I do it.

General environment settings and command:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar

Code:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

q ="""{
  "query": {
    "filtered": {
      "filter": {
        "exists": {
          "field": "label"
        }
      },
      "query": {
        "match_all": {}
      }
    }
  }
}"""

es_read_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "titanic/passenger",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()

You can also define DataFrame columns explicitly, as sketched below. Refer to the ES-Hadoop documentation for more info.
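For example, a minimal sketch that lifts the (id, document) pairs of es_rdd into named columns ("Age" and "Survived" are assumed field names from the titanic mapping; "label" comes from the query above):

from pyspark.sql import Row

# Each RDD element is a (document id, document dict) pair; pull out
# the fields you want as named Row attributes.
rows = es_rdd.map(lambda kv: Row(age=kv[1].get("Age"),
                                 survived=kv[1].get("Survived"),
                                 label=kv[1].get("label")))
df = sqlContext.createDataFrame(rows)
df.printSchema()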

Hope that it helps!


4 Comments

That's what I've been doing right now, I was hoping there was a way to directly fetch a filtered DataFrame
I am not sure it is possible with the latest API of the ES-Hadoop Spark connector.
Is there a way to write a dataframe to elasticsearch using this API as well?
@ElesinOlalekanFuad yes there is a way: elastic.co/guide/en/elasticsearch/hadoop/current/… Note that you have to translate from the Scala API to PySpark, but it isn't that hard.
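For reference, a minimal write sketch in PySpark (reusing the "titanic/passenger" resource from above; these are the standard connector options, not code from the linked page):

# Write a DataFrame back to Elasticsearch via the ES-Hadoop connector.
df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "localhost") \
    .option("es.port", "9200") \
    .mode("append") \
    .save("titanic/passenger")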

I am running my code in an EMR cluster on Amazon using PySpark. The way I made it work was by following these steps:

1) Put this bootstrap action in the cluster creation (to install a localhost Elasticsearch server):

s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb

2) I ran these commands to populate the Elasticsearch database with some data:

curl -XPUT "http://localhost:9200/movies/movie/1" -d'{
  "title": "The Godfather",
  "director": "Francis Ford Coppola",
  "year": 1972
}'

You can also run other curl commands if you wish, like:

curl -XGET "http://localhost:9200/_search?pretty=true&q=*:*"
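Or the same search with the query in the request body instead of the URL (a sketch; ES 5.x accepts a body on GET _search):

curl -XGET "http://localhost:9200/_search?pretty=true" -d'
{
  "query": { "match_all": {} }
}'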

3) I started pyspark with the following parameters:

pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar

I had downloaded the Elasticsearch Python client previously.
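A typical way to get it, assuming pip is available on the cluster:

pip install elasticsearch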

4) I ran the following code:

from pyspark import SparkConf
from pyspark.sql import SQLContext
# sc and sqlContext are already provided by the pyspark shell started in step 3

q ="""{
  "query": {
    "match_all": {}
  }  
}"""

es_read_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "movies/movie",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()

Then I finally got a successful result from the command.
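For the single movie indexed in step 2, the raw record looks roughly like this (a sketch; the exact key and value types depend on the connector version):

es_rdd.first()
# (u'1', {u'title': u'The Godfather', u'director': u'Francis Ford Coppola', u'year': 1972})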



I faced an issue similar to this, trying to get geo-filtered data into a PySpark DataFrame. I am using elasticsearch-spark-20_2.11-5.2.2.jar with Spark 2.1.1 and ES 5.2. I was able to load the data into a DataFrame by specifying my query as an option while creating the DataFrame.

My geo-query:

q ="""{
  "query": {
        "bool" : {
            "must" : {
                "match_all" : {}
            },
           "filter" : {
                "geo_distance" : {
                    "distance" : "100km",
                    "location" : {
                        "lat" : 35.825,
                        "lon" : -87.99
                    }
                }
            }
        }
    }
}"""

I used the following command to load the data into a DataFrame:

spark_df = spark.read.format("es").option("es.query", q).load("index_name")

The API for this is detailed here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-data-sources
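A slightly fuller sketch of the same read, with the connection options spelled out ("index_name", the host, and the port here are placeholders to adjust for your cluster):

# Read from Elasticsearch through the Spark data source, passing the
# query as an option so only matching documents are loaded.
spark_df = (spark.read.format("es")
            .option("es.nodes", "localhost")
            .option("es.port", "9200")
            .option("es.query", q)
            .load("index_name"))
spark_df.printSchema()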

