
I've got more than 6 billion social media records in HBase (including content/time/author and other fields), spread across 4,100 regions on 48 servers, and I need to load all of this data into Elasticsearch now.

I'm familiar with the bulk API of ES, but bulk indexing from Java with MapReduce still takes many days (at least a week or so). I could use Spark instead, but I don't think it would help much.

I'm wondering whether there are any other tricks for writing this much data into Elasticsearch. For example, could I manually write ES index files and use some kind of recovery mechanism to load the files from the local file system?

Appreciate any possible advice, thanks.

==============

Some details about my cluster environments:

Spark 1.3.1 standalone (I can switch it to YARN to use Spark 1.6.2 or 1.6.3)

Hadoop 2.7.1 (HDP 2.4.2.258)

ElasticSearch 2.3.3

1 Comment

I have experience with Spark + HBase + Solr, as well as Solr + HBase + MapReduce indexing. I'm not aware of any technique other than Spark or MapReduce that will drastically improve performance; if you're okay with Spark, that's the best option in my experience. Commented Dec 15, 2016 at 16:49

2 Answers


AFAIK, Spark is the best option for indexing out of the two options below. In addition, here are the approaches I'd offer:

Divide (the input scan criteria) and conquer the 6 billion social media records:

I'd recommend creating multiple Spark/MapReduce jobs with different scan criteria (dividing the 6 billion records into, say, 6 pieces based on category or something else) and triggering them in parallel. For example, you could split on the data capture time range (scan.setTimeRange(t1, t2)) or use some fuzzy row logic (FuzzyRowFilter). That should definitely speed things up.
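The time-range split can be sketched as follows. `TimeRangeSplitter` is a hypothetical helper (not an HBase API); each resulting range would be handed to a separate job, which sets it via `scan.setTimeRange(start, end)`:

```java
// Hypothetical helper (not part of HBase): split [start, end) into n
// contiguous sub-ranges, one per parallel scan job.
public class TimeRangeSplitter {
    public static long[][] split(long start, long end, int n) {
        long[][] ranges = new long[n][2];
        long span = (end - start) / n;
        for (int i = 0; i < n; i++) {
            ranges[i][0] = start + i * span;
            // Give the last range any remainder so the full span is covered.
            ranges[i][1] = (i == n - 1) ? end : start + (i + 1) * span;
        }
        return ranges;
    }
}
```

Job `i` would then call `scan.setTimeRange(ranges[i][0], ranges[i][1])` before creating its input splits, so the jobs scan disjoint slices of the table in parallel.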

OR

A kind of streaming approach:

You can also consider creating the indexes simultaneously as you insert data through Spark or MapReduce.

For example, in the case of Solr: Cloudera has the NRT HBase Lily Indexer, i.e., as the HBase table is populated, it simultaneously creates Solr indexes based on WAL (write-ahead log) entries. Check whether anything like that exists for Elasticsearch.

Even if nothing like that exists for ES, don't worry; you can do it yourself in the Spark/MapReduce program while ingesting the data.

Option 1 :

I'd suggest Spark if you are okay with it; it's a good solution, since elasticsearch-hadoop has supported native Spark integration since version 2.1. See:

elasticsearch-hadoop provides native integration between Elasticsearch and Apache Spark, in the form of an RDD (Resilient Distributed Dataset) (or Pair RDD to be precise) that can read data from Elasticsearch. The RDD is offered in two flavors: one for Scala (which returns the data as Tuple2 with Scala collections) and one for Java (which returns the data as Tuple2 containing java.util collections).

Option 2: As you are aware, a bit slower than Spark

Writing data to Elasticsearch With elasticsearch-hadoop, Map/Reduce jobs can write data to Elasticsearch making it searchable through indexes. elasticsearch-hadoop supports both (so-called) old and new Hadoop APIs.
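Whichever integration you pick, the es-hadoop connector's batching settings matter at this scale. A sketch of a starting configuration: the property names are real es-hadoop settings, but the host, index name, and values are illustrative assumptions, and in a real job they would go into the Hadoop `Configuration` or `SparkConf` rather than a plain map:

```java
import java.util.HashMap;
import java.util.Map;

public class EsHadoopConf {
    // Illustrative es-hadoop job settings; tune batch sizes against your
    // cluster rather than copying these values.
    public static Map<String, String> build() {
        Map<String, String> conf = new HashMap<>();
        conf.put("es.nodes", "es-node1:9200");       // illustrative ES host
        conf.put("es.resource", "social/media");     // target index/type
        conf.put("es.batch.size.entries", "5000");   // docs per bulk request
        conf.put("es.batch.size.bytes", "5mb");      // bytes per bulk request
        conf.put("es.batch.write.refresh", "false"); // no refresh after bulks
        return conf;
    }
}
```

Raising `es.batch.size.entries`/`es.batch.size.bytes` from their defaults and disabling the per-bulk refresh are the usual first knobs for large one-off loads.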


2 Comments

Thanks! es-hadoop may be an option. The main reason I'd have to look into the es-hadoop source code is to see how it calculates the output-format splits. I believe that if I can ensure each record in an MR split / Spark partition maps to the correct shard in ES, the cost of request forwarding and bulk thread-pool occupation can be cut down. My concern is that es-hadoop still uses the bulk API, and if so, performance won't actually improve.
Yep, OK! Your answer provides several possible optimization methods, and I think it's worth an accept. I appreciate it very much.

I found a practical trick to improve bulk indexing performance myself.

I can calculate the routing hash on the client and make sure that each bulk request contains only index requests with the same routing. Using the routing result and the shard info (with IPs), I send each bulk request directly to the corresponding shard's node. This trick avoids the bulk reroute cost and cuts down bulk thread-pool occupation, which can otherwise cause EsRejectedExecutionException.

For example, I have 48 nodes on different machines. Suppose I send a bulk request containing 3,000 index requests to an arbitrary node: those index requests will be rerouted to other nodes (usually all of them) according to their routing, and the client thread has to wait for the whole process to finish, including processing the local bulk and waiting for the other nodes' bulk responses. Without the reroute phase, however, that network cost disappears (except for forwarding to the replica nodes), and the client waits much less. Meanwhile, assuming I have only 1 replica, the total bulk-thread occupation is only 2 (client -> primary shard, and primary shard -> replica shard).
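The grouping step of this trick can be sketched as follows, assuming a list of routing keys and a known primary shard count. Note that `shardOf` here is only a stand-in: a real client must reproduce Elasticsearch's Murmur3 routing hash exactly, or the pre-grouped bulks will be sent to the wrong nodes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BulkGrouper {
    // Stand-in for ES's routing hash; a real client must match
    // Murmur3HashFunction exactly for this trick to work.
    static int shardOf(String routing, int numPrimaryShards) {
        return Math.floorMod(routing.hashCode(), numPrimaryShards);
    }

    // Group routing keys by target shard, so each resulting batch can be
    // sent as one bulk request directly to the node holding that primary.
    public static Map<Integer, List<String>> groupByShard(
            List<String> routings, int numPrimaryShards) {
        Map<Integer, List<String>> batches = new HashMap<>();
        for (String r : routings) {
            batches.computeIfAbsent(shardOf(r, numPrimaryShards),
                                    k -> new ArrayList<>()).add(r);
        }
        return batches;
    }
}
```

Each entry of the resulting map becomes one bulk request aimed at a single primary shard's node, which is what removes the server-side reroute.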

Routing hash:

shard_num = murmur3_hash(_routing) % num_primary_shards

Take a look at org.elasticsearch.cluster.routing.Murmur3HashFunction.
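For illustration, here is a generic MurmurHash3 (x86, 32-bit) sketch together with the shard computation. Be aware that Elasticsearch's Murmur3HashFunction also fixes how the routing string is converted to bytes and which seed is used, and that can differ between versions, so verify against the source of your ES version before relying on this:

```java
public class Murmur3Routing {
    // Generic MurmurHash3 x86_32 over a byte array (reference algorithm).
    public static int murmur3_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51, c2 = 0x1b873593;
        int h = seed;
        int i = 0;
        for (; i + 4 <= data.length; i += 4) {   // 4-byte body blocks
            int k = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
                  | ((data[i + 2] & 0xff) << 16) | ((data[i + 3] & 0xff) << 24);
            k *= c1; k = Integer.rotateLeft(k, 15); k *= c2;
            h ^= k; h = Integer.rotateLeft(h, 13); h = h * 5 + 0xe6546b64;
        }
        int k = 0;
        switch (data.length & 3) {               // 1-3 trailing bytes,
            case 3: k ^= (data[i + 2] & 0xff) << 16; // intentional fall-through
            case 2: k ^= (data[i + 1] & 0xff) << 8;
            case 1: k ^= (data[i] & 0xff);
                    k *= c1; k = Integer.rotateLeft(k, 15); k *= c2; h ^= k;
        }
        h ^= data.length;                        // finalization mix
        h ^= h >>> 16; h *= 0x85ebca6b;
        h ^= h >>> 13; h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // shard_num = murmur3_hash(_routing) % num_primary_shards, using a
    // non-negative mod; the string-to-bytes step must match your ES version.
    public static int shardNum(byte[] routingBytes, int numPrimaryShards) {
        return Math.floorMod(murmur3_32(routingBytes, 0), numPrimaryShards);
    }
}
```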

The client can get the shard and index-alias information from the cat APIs:

shard info: GET /_cat/shards

alias mapping: GET /_cat/aliases

Some caveats:

  1. ES may change the default hash function between versions, which means the client code may not be version-compatible.
  2. This trick is based on the assumption that the hash results are basically balanced.
  3. The client should handle fault tolerance, such as connection timeouts to the corresponding shard node.
