
I would like to index a bunch of large pandas dataframes (a few million rows and 50 columns) into Elasticsearch.

When looking for examples of how to do this, most people use elasticsearch-py's bulk helper method, passing it an instance of the Elasticsearch class, which handles the connection, as well as a list of dictionaries created with pandas' dataframe.to_dict(orient='records') method. Metadata can be inserted into the dataframe beforehand as new columns, e.g. df['_index'] = 'my_index' etc.
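For reference, a minimal sketch of that approach might look like the following (the host, index and type names are placeholders):

from elasticsearch import Elasticsearch, helpers
import pandas as pd

es = Elasticsearch(['http://localhost:9200'])  # placeholder host

df = pd.DataFrame.from_records([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}, {'a': 5, 'b': 6}])
df['_index'] = 'my_index'  # metadata added as ordinary columns
df['_type'] = 'my_type'

# to_dict(orient='records') yields one dict per row; the bulk helper treats
# keys starting with an underscore as metadata and everything else as the document
helpers.bulk(es, df.to_dict(orient='records'))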

However, I have reasons not to use the elasticsearch-py library and would like to talk to the Elasticsearch bulk API directly, e.g. via requests or another convenient HTTP library. Besides, df.to_dict() is unfortunately very slow on large dataframes, and converting a dataframe to a list of dicts that is then serialized to JSON by elasticsearch-py sounds like unnecessary overhead when there is something like dataframe.to_json(), which is pretty fast even on large dataframes.

What would be an easy and quick approach to getting a pandas dataframe into the format required by the bulk API? I think a step in the right direction is using dataframe.to_json() as follows:

import pandas as pd
df = pd.DataFrame.from_records([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}, {'a': 5, 'b': 6}])
df
   a  b
0  1  2
1  3  4
2  5  6
df.to_json(orient='records', lines=True)
'{"a":1,"b":2}\n{"a":3,"b":4}\n{"a":5,"b":6}'

This is now a newline-separated JSON string; however, it is still lacking the metadata. What would be a performant way to get it in there?

edit: For completeness, a metadata JSON document would look like this:

{"index": {"_index": "my_index", "_type": "my_type"}}

Hence, in the end the whole JSON expected by the bulk API would look like this (with an additional linebreak after the last line):

{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":1,"b":2}
{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":3,"b":4}
{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":5,"b":6}
  • Can you post the expected metadata for your sample DF? Commented Jan 3, 2017 at 12:12
  • Sure, please see my edit. Commented Jan 3, 2017 at 12:31
  • I don't understand that format (structure); it's not valid JSON. Can you do a little test, trying to load this small "JSON" into Elasticsearch using its bulk API? Commented Jan 3, 2017 at 13:12
  • Yes, that's indeed not valid JSON as a whole, but a linebreak-delimited list of multiple valid JSON documents. Unfortunately, this is what Elasticsearch's bulk API expects. The reason is that the bulk data is split into documents at the linebreaks, and individual documents may then be forwarded to nodes other than the receiving one before they actually get parsed. Commented Jan 5, 2017 at 14:56

2 Answers


In the meantime, I have found multiple ways to do this with at least reasonable speed:

import json
import pandas as pd
import requests

# df is a dataframe or dataframe chunk coming from your reading logic
df['_id'] = df['column_1'] + '_' + df['column_2'] # or whatever makes your _id
df_as_json = df.to_json(orient='records', lines=True)

final_json_string = ''
for json_document in df_as_json.split('\n'):
    jdict = json.loads(json_document)
    metadata = json.dumps({'index': {'_id': jdict['_id']}})
    jdict.pop('_id')
    final_json_string += metadata + '\n' + json.dumps(jdict) + '\n'

headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
r = requests.post('http://elasticsearch.host:9200/my_index/my_type/_bulk', data=final_json_string, headers=headers, timeout=60) 

Instead of using pandas' to_json() method, one could also use to_dict() as follows. In my tests this was slightly slower, but not by much:

dicts = df.to_dict(orient='records')
final_json_string = ''
for document in dicts:
    metadata = {"index": {"_id": document["_id"]}}
    document.pop('_id')
    final_json_string += json.dumps(metadata) + '\n' + json.dumps(document) + '\n'

When running this on large datasets, one can save a couple of minutes by installing ujson or rapidjson as a replacement for Python's default json library and then using import ujson as json or import rapidjson as json, respectively.
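For illustration, a minimal sketch of that drop-in replacement, falling back to the standard library if neither package is installed:

try:
    import ujson as json          # pip install ujson
except ImportError:
    try:
        import rapidjson as json  # pip install python-rapidjson
    except ImportError:
        import json               # standard library fallback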

An even bigger speedup can be achieved by replacing the sequential execution of the steps with a parallel one, so that reading and converting do not stop while requests is waiting for Elasticsearch to process all documents and return a response. This could be done via Threading, Multiprocessing, Asyncio, Task Queues, ... but this is outside the scope of this question.
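For illustration only, a rough sketch using a thread pool; read_chunks() and build_bulk_body() are hypothetical placeholders for the reading and conversion logic above:

from concurrent.futures import ThreadPoolExecutor
import requests

def post_bulk(body):
    # same endpoint and headers as in the sequential version above
    headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
    return requests.post('http://elasticsearch.host:9200/my_index/my_type/_bulk',
                         data=body, headers=headers, timeout=60)

with ThreadPoolExecutor(max_workers=4) as pool:
    # conversion of later chunks keeps running in the main thread while
    # earlier POSTs are still in flight in the worker threads
    futures = [pool.submit(post_bulk, build_bulk_body(chunk)) for chunk in read_chunks()]
    for future in futures:
        future.result().raise_for_status()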

If you happen to find an approach to make the to-JSON conversion even faster, let me know.


1 Comment

Just looking at this code: you're serializing into JSON, then deserializing it again inside the loop. I imagine you could get a simple speedup by using df.iterrows and then only calling to_json on the rows themselves.
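A rough sketch of that suggestion, assuming the same df with an _id column and the json import from the answer above; whether it actually beats the serialize/deserialize round trip would need measuring:

rows = []
for _, row in df.iterrows():
    metadata = json.dumps({'index': {'_id': row['_id']}})
    # Series.to_json() serializes the remaining row as a single JSON object
    rows.append(metadata + '\n' + row.drop('_id').to_json())
final_json_string = '\n'.join(rows) + '\n'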

This function inserts a pandas dataframe into Elasticsearch (chunk by chunk):

import json
import requests

def insertDataframeIntoElastic(dataFrame, index='index', typ='test',
                               server='http://localhost:9200', chunk_size=2000):
    headers = {'content-type': 'application/x-ndjson', 'Accept-Charset': 'UTF-8'}
    records = dataFrame.to_dict(orient='records')
    # one metadata (action) line followed by the document itself, per record
    actions = ['{ "index" : { "_index" : "%s", "_type" : "%s"} }\n' % (index, typ) + json.dumps(record)
               for record in records]
    serverAPI = server + '/_bulk'
    i = 0
    while i < len(actions):
        # join one chunk of action/document pairs and terminate with a final
        # newline, as required by the bulk API
        data = '\n'.join(actions[i:min(i + chunk_size, len(actions))]) + '\n'
        r = requests.post(serverAPI, data=data, headers=headers)
        print(r.content)
        i += chunk_size
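For example, a call against a local node might look like this (the index and type names are placeholders):

insertDataframeIntoElastic(df, index='my_index', typ='my_type',
                           server='http://localhost:9200', chunk_size=5000)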

Comments
