9

I am using an Apache Spark DataFrame and I want to upsert data into Elasticsearch. I found that I can overwrite the data like this:

val df = spark.read.option("header","true").csv("/mnt/data/akc_breed_info.csv")

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","443")
  .option("es.net.ssl","true")
  .option("es.nodes", esURL)
  .option("es.mapping.id", index)
  .mode("Overwrite")
  .save("index/dogs")

However, what I have noticed so far is that mode("Overwrite") actually deletes all the existing duplicated data and then inserts the new data.

Is there a way to upsert the documents instead of deleting and rewriting them? I need to query this data in almost real time. Thanks in advance.

2 Answers

10

The reason mode("Overwrite") was a problem is that overwriting with the entire DataFrame deletes all the data matching the DataFrame's rows at once, so for a while the whole index looked empty to me. I figured out how to actually upsert instead.

Here is my code:

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.nodes.discovery", "false")
  .option("es.nodes.client.only", "false")
  .option("es.net.ssl","true")
  .option("es.mapping.id", index)
  .option("es.write.operation", "upsert")
  .option("es.nodes", esURL)
  .option("es.port", "443")
  .mode("append")
  .save(path)

Note that you need both .option("es.write.operation", "upsert") and .mode("append").
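To keep those settings in one place, they can be collected into a Map and passed to the writer with .options(...). The following is only a sketch: EsUpsertOptions and build are hypothetical names that are not part of elasticsearch-hadoop, and the default port of 443 simply mirrors the code above.

```scala
// Hypothetical helper (not part of elasticsearch-hadoop): gathers the
// connector settings used in the answer above into one reusable Map.
object EsUpsertOptions {
  def build(esNodes: String, idField: String, port: String = "443"): Map[String, String] =
    Map(
      "es.nodes"           -> esNodes,
      "es.port"            -> port,
      "es.net.ssl"         -> "true",
      "es.nodes.wan.only"  -> "true",
      "es.mapping.id"      -> idField,  // DataFrame column used as the document id
      "es.write.operation" -> "upsert"  // update if the id exists, insert otherwise
    )
}

// Usage sketch, assuming the `df`, `esURL` and `path` values from the answer:
//   df.write
//     .format("org.elasticsearch.spark.sql")
//     .options(EsUpsertOptions.build(esURL, "userId"))
//     .mode("append")   // still required alongside the upsert operation
//     .save(path)
```

The save mode is deliberately left out of the Map because it is set on the writer itself with .mode("append"), not through a connector option.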


2 Comments

What is the value of index?
@Soumendra It's the name of the field used as the ES mapping id, as shown. For me, it's userId.
2

Try setting:

es.write.operation = upsert

This should perform the required operation. You can find more details in https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
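For intuition about what the upsert operation changes compared with an overwrite, here is a small in-memory model. It is a sketch under assumptions, not the connector's implementation: a plain Scala Map stands in for the index, UpsertModel and its methods are hypothetical names, and the field-merging behaviour reflects my understanding that an upsert merges incoming fields into a document with the same id.

```scala
// In-memory model only: a Map keyed by document id stands in for an index.
object UpsertModel {
  type Doc = Map[String, String]
  type Index = Map[String, Doc]

  // Upsert: merge incoming fields into an existing document with the same id,
  // or insert the document if the id is not present yet.
  def upsert(index: Index, incoming: Index): Index =
    incoming.foldLeft(index) { case (acc, (id, doc)) =>
      acc.updated(id, acc.getOrElse(id, Map.empty[String, String]) ++ doc)
    }

  // Overwrite: existing documents are dropped and only the incoming ones remain.
  def overwrite(index: Index, incoming: Index): Index = incoming
}
```

In this model, upserting into an index that already holds other ids leaves those documents untouched, while overwrite discards them, which matches the behaviour described in the question.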

2 Comments

Thanks for answering. I tried that, but it didn't work for me on its own; I needed to add .mode("append") too.
While this is correct, you need to set the mode to "append"; otherwise all the existing documents will be removed from the index.
