
This guy had a very small example that showed how to integrate ElasticSearch and Spark, back when the whole ES ecosystem was around version 0.9. It doesn't work anymore, and finding a current replacement by searching isn't easy. Can someone give a small, self-contained Scala example of:

  1. Opening a file in Spark (in the example above, it was /var/log/syslog);
  2. Doing something with it;
  3. Sending the result into ES;
  4. Opening that result back in Spark.

... that works with ElasticSearch 1.3.4 and Spark 1.1.0.

1 Answer


I gave a talk a while back on Spark and Elasticsearch (around the 0.9 days), and I recently updated some of the examples for present-day versions (i.e. 1.1). I've posted the slides and the example code. Hope that helps!

I've also copied the relevant sections (from my own GitHub repo) here:

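import org.elasticsearch.spark.sql._  // adds saveToEs to SchemaRDD and esRDD to SQLContext
...
// convert the RDD of tweet case classes into a SchemaRDD (Spark 1.1)
val tweetsAsCS =
    sqlCtx.createSchemaRDD(tweetRDD.map(SharedIndex.prepareTweetsCaseClass))
tweetsAsCS.saveToEs(esResource)  // esResource is an "index/type" string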
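The answer doesn't show `SharedIndex.prepareTweetsCaseClass`. For the question's syslog case, step 2 could be a plain function that turns one line into a case class. Below is a minimal sketch, assuming classic BSD-style syslog lines (`Mon DD HH:MM:SS host program[pid]: message`); `LogEntry` and `SyslogParser` are hypothetical names, and real logs may need a looser pattern.

```scala
// Hypothetical record type for one syslog line; the field names are
// illustrative, not taken from the answer's repository.
case class LogEntry(timestamp: String, host: String, program: String, message: String)

object SyslogParser {
  // Assumed BSD-style format, e.g.
  // "Oct 15 10:21:03 myhost sshd[1234]: Accepted publickey for alice"
  // Groups: timestamp, host, program (without the optional [pid]), message.
  private val Line =
    """^(\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2}) (\S+) ([^\[:\s]+)(?:\[\d+\])?: (.*)$""".r

  // Returns None for lines that don't match, so bad lines can be skipped.
  def parse(line: String): Option[LogEntry] = line match {
    case Line(ts, host, program, msg) => Some(LogEntry(ts, host, program, msg))
    case _                            => None
  }
}
```

In a Spark 1.1 job it would take the place of the tweet mapping, e.g. `sqlCtx.createSchemaRDD(sc.textFile("/var/log/syslog").flatMap(line => SyslogParser.parse(line).toList))` followed by `saveToEs("syslog/entry")`, where `"syslog/entry"` is a made-up index/type. Returning `Option` lets unparseable lines be dropped instead of failing the job.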

Note that we didn't specify any ES nodes, so the connector will try to save to a cluster on localhost (port 9200 by default). If we want to use a different cluster, we can add:

// if we want to have a different ES cluster we can add
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

// node: host (or comma-separated hosts) of the cluster to use
val config = new SparkConf()
config.set(ConfigurationOptions.ES_NODES, node) // "es.nodes": node(s) used for discovery
// other config settings
val sc = new SparkContext(config)
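The connector reads its settings as plain `es.*` keys, so `ConfigurationOptions.ES_NODES` is just the string `"es.nodes"`. As a sketch, assuming you want several nodes and possibly a non-default port, the settings can be built as a map; `EsSettings` is a hypothetical helper, while `es.nodes` and `es.port` are the connector's own setting names.

```scala
// Hypothetical helper: builds es-hadoop connection settings as key/value pairs.
object EsSettings {
  def apply(nodes: Seq[String], port: Int = 9200): Map[String, String] = {
    require(nodes.nonEmpty, "at least one Elasticsearch node is needed")
    Map(
      "es.nodes" -> nodes.mkString(","), // comma-separated host list for discovery
      "es.port"  -> port.toString        // REST port; 9200 is the default
    )
  }
}
```

It can then be applied with `new SparkConf().setAll(EsSettings(Seq("es1.example.com", "es2.example.com")))`; the host names are placeholders.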

So that will do the first part (indexing some data).

Querying ES from Spark has also gotten a lot simpler, provided your data types are supported by the connector's mappings. The main one I ran into that wasn't supported was geolocation, but it's easy enough to extend the mapper if you run into this.

// geo_distance filter in ES 1.x "filtered" query syntax; dist, lat and lon are defined elsewhere
val query = s"""{"query": {"filtered": {
  "query": {"match_all": {}},
  "filter": {"geo_distance": {
    "distance": "${dist}km",
    "location": {"lat": $lat, "lon": $lon}}}}}}"""
val tweets = sqlCtx.esRDD(esResource, query)

The esRDD function isn't normally on the SQLContext, but the implicit conversions we imported above make it available to us. `tweets` is now a SchemaRDD, so we can transform it as desired and save the results back as we did in the first part of this example.
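If you would rather not go through Spark SQL for step 4, es-hadoop's core API (`import org.elasticsearch.spark._`) also adds an `esRDD` method to `SparkContext`, which, per the connector documentation as I understand it, yields `(documentId, fieldMap)` pairs. The sketch below assumes that shape and converts one field map back into a case class; `LogEntry`, `LogEntryReader` and the field names are hypothetical.

```scala
// Hypothetical record type for one log line stored in ES.
case class LogEntry(timestamp: String, host: String, program: String, message: String)

object LogEntryReader {
  // Rebuilds a LogEntry from one document's field map; returns None when a
  // field is missing or is not a string, so bad documents can be skipped.
  def fromDoc(doc: scala.collection.Map[String, Any]): Option[LogEntry] = {
    def field(name: String): Option[String] = doc.get(name) match {
      case Some(s: String) => Some(s)
      case _               => None
    }
    for {
      ts      <- field("timestamp")
      host    <- field("host")
      program <- field("program")
      message <- field("message")
    } yield LogEntry(ts, host, program, message)
  }
}
```

Applied to the RDD this would look like `sc.esRDD("syslog/entry").flatMap { case (_, doc) => LogEntryReader.fromDoc(doc).toList }`, with `"syslog/entry"` a made-up index/type.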

Hope this helps!


2 Comments

Thank you, I'll take a look!! As a reviewer of SO, though, I would have to ask you if you could put a code sample here. Links are discouraged because they might become dead, and it's against the spirit of a FAQ-oriented site.
Sure let me copy it over :)
