6

I can't figure out how to write a dataframe to elasticsearch using python from spark. I followed the steps from here.

Here is my code:

# Read file
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true') \
    .load('/vagrant/data/input/input.csv', schema = customSchema)

df.registerTempTable("data")

# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")

es_conf = {"es.nodes" : "10.10.10.10","es.port" : "9200","es.resource" : "kpi"}
kpi1.rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)

Above code gives

Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

I also started the script from: spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py to ensure that elasticsearch-hadoop is loaded

2
  • which version of elasticseach are you using ? Commented Sep 20, 2016 at 9:39
  • @eliasah2.4.0, tried also using elasticsearch-hadoop-5.0.0-alpha5.jar for the 2.x versions of es Commented Sep 20, 2016 at 13:19

3 Answers 3

4

For starters saveAsNewAPIHadoopFile expects a RDD of (key, value) pairs and in your case this may happen only accidentally. The same thing applies to the value format you declare.

I am not familiar with Elastic but just based on the arguments you should probably try something similar to this:

kpi1.rdd.map(lambda row: (None, row.asDict()).saveAsNewAPIHadoopFile(...)

Since Elastic-Hadoop provide SQL Data Source you should be also able to skip that and save data directly:

df.write.format("org.elasticsearch.spark.sql").save(...)
Sign up to request clarification or add additional context in comments.

Comments

1

As zero323 said, the easiest way to load a Dataframe from PySpark to Elasticsearch is with the method

Dataframe.write.format("org.elasticsearch.spark.sql").save("index/type")  

Comments

0

You can use something like this:

df.write.mode('overwrite').format("org.elasticsearch.spark.sql").option("es.resource", '%s/%s' % (conf['index'], conf['doc_type'])).option("es.nodes", conf['host']).option("es.port", conf['port']).save()

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.