
I am generating a large number of Elasticsearch documents with random content using Python and indexing them with elasticsearch-py.

Simplified working example (document with just one field):

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    es_client.index(index='my_index', document=document)

Since this makes one request per document, I tried to speed it up by sending chunks of 1000 documents per request via the _bulk API. However, my attempts so far have been unsuccessful.

My understanding from the docs is that you can pass an iterable to bulk(), so I tried:

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

document_list = []
for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    document_list.append(document)
    if i % 1000 == 0:
        es_client.bulk(operations=document_list, index='my_index')
        document_list = []

but this results in the following error:

elasticsearch.BadRequestError: BadRequestError(400, 'illegal_argument_exception', 'Malformed action/metadata line [1], expected START_OBJECT or END_OBJECT but found [VALUE_STRING]')

1 Answer

OK, it seems I mixed up two different functions: helpers.bulk() and Elasticsearch.bulk(). Either one can be used to achieve what I intended, but they have slightly different signatures.

The helpers.bulk() function takes an Elasticsearch() object and an iterable of documents as parameters. The operation can be specified per document as _op_type and can be one of index, create, delete, or update. Since _op_type defaults to index, we can simply omit it and pass the plain list of documents:

from elasticsearch import Elasticsearch, helpers
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

document_list = []
for i in range(1, 10000000):
    document = {'my_field': getrandbits(32)}
    document_list.append(document)
    if i % 1000 == 0:
        # send the accumulated chunk of 1000 documents in one bulk request
        helpers.bulk(es_client, document_list, index='my_index')
        document_list = []

# flush any remainder that did not fill a complete chunk
if document_list:
    helpers.bulk(es_client, document_list, index='my_index')

This works fine.
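
For illustration, here is a minimal sketch of what explicit actions for the other _op_type values could look like; the _id values are hypothetical and only serve to show the action format the helpers accept:

from elasticsearch import Elasticsearch, helpers
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

# sketch only: _op_type selects the operation, _id pins the document;
# for update, the partial document goes under the 'doc' key
actions = [
    {'_op_type': 'index', '_id': 1, 'my_field': getrandbits(32)},
    {'_op_type': 'update', '_id': 1, 'doc': {'my_field': getrandbits(32)}},
    {'_op_type': 'delete', '_id': 1},
]
helpers.bulk(es_client, actions, index='my_index')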

The Elasticsearch.bulk() function can be used alternatively, but here the actions/operations are mandatory as part of the iterable, and the syntax mirrors the raw _bulk API: instead of just a dict with the document contents, each document has to be preceded by a separate dict specifying the action (in this case "index": {}), with the document body following as the next element. See also the _bulk documentation:

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

actions_list = []
for i in range(1, 10000000):
    document = {'my_field': getrandbits(32)}
    # each document is preceded by its action/metadata dict
    actions_list.append({"index": {}})
    actions_list.append(document)
    if i % 1000 == 0:
        es_client.bulk(operations=actions_list, index='my_index')
        actions_list = []

# flush any remainder that did not fill a complete chunk
if actions_list:
    es_client.bulk(operations=actions_list, index='my_index')

This works fine as well.

I assume that both of the above generate the same _bulk REST API request internally, so they should be equivalent in the end.
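
To make the equivalence concrete, this is a sketch of the newline-delimited JSON body such a request carries on the wire (two documents shown, with illustrative values); both variants above produce chunks of this shape:

POST /my_index/_bulk
{ "index": {} }
{ "my_field": 2103896512 }
{ "index": {} }
{ "my_field": 1287414057 }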


UPDATE:

As pointed out by Johan in the comments, the helpers.bulk() function takes care of the chunking internally (it actually calls helpers.streaming_bulk() under the hood), so there is no need to manually hand it action lists of size 1000. For my final solution, I ended up writing a generator function that yields one document/action at a time anyway. This can then simply be passed directly to helpers.streaming_bulk(), along with a chunk_size of your choosing (the default value is 500):

from elasticsearch import Elasticsearch, helpers
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

def doc_stream():
    ''' generator function for a stream of actions '''
    for i in range(1, 10000000):
        yield {'_index': 'my_index',
               '_source': {'my_field': getrandbits(32)}}

for status_ok, response in helpers.streaming_bulk(es_client,
                                                  actions=doc_stream(),
                                                  chunk_size=1000):
    if not status_ok:
        # if a document failed to index, log the response
        print(response)
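
As a commenter notes below, if you do not need the per-document status loop, the same generator can also be handed straight to helpers.bulk(), which forwards chunk_size to streaming_bulk() internally:

helpers.bulk(es_client, doc_stream(), chunk_size=1000)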

5 Comments

Thanks for sharing this. But when my_index does not exist, ES detects my_field as a long integer and then throws an error like: "failed to parse field [my_field] of type [long] in document with id 'NfaL7YIBbLcHbjIwQM39'. Preview of field's value: '17554611566426620742'". One fix is to use str(getrandbits(64)) to force ES to detect the keyword type.
Nice question and answer! One question though: why do you assign action lists of size 1000 to helpers.bulk()? Doesn't that method (in contrast to es.bulk()) internally make chunks of docs ? And it seems only es.bulk() takes index as an arg.
@Johan, you're correct, helpers.bulk() internally passes everything on to helpers.streaming_bulk(), which does the chunking. So helpers.bulk() also accepts a parameter chunk_size which has a default value of 500. I guess back when I wrote the answer, I may have wanted to keep the list of generated documents small, but I really don't know right now. I ended up doing this properly by defining a generator function and passing that to streaming_bulk() directly. I've updated my answer with the respective code for reference.
@Jean-PierreMatsumoto in the process of updating my answer, I also adjusted it to getrandbits(32) which will not cause errors in the minimal examples anymore. It was no issue for me, as most of the time I explicitly set the mapping beforehand anyway.
Found the helpers.bulk(es_client, document_list, index='my_index') piece quite easy to use. To add chunking, use it like this: helpers.bulk(es_client, document_list, index='my_index', chunk_size=1000). Thanks.
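
Picking up the mapping point from the comments above, a minimal sketch (index and field names taken from the examples; the keyword type is just one possible choice) of creating the index with an explicit mapping beforehand, so dynamic detection never infers long for the random values:

from elasticsearch import Elasticsearch

es_client = Elasticsearch('https://elastic.host:9200')

# sketch only: define the mapping up front so 64-bit values sent as strings
# (e.g. str(getrandbits(64))) are stored as keywords instead of inferred longs
es_client.indices.create(
    index='my_index',
    mappings={'properties': {'my_field': {'type': 'keyword'}}},
)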
