
I am new to Python and Elasticsearch. I wrote a Python script that reads data from a very large JSON file (one JSON object per line) and indexes some of its attributes into Elasticsearch.

import elasticsearch
import json

es = elasticsearch.Elasticsearch()  # use default of localhost, port 9200
with open('j.json') as f:
    n = 0
    for line in f:
        try:
            j_content = json.loads(line)
            event_type = j_content['6000000']
            device_id = j_content['6500048']
            raw_event_msg = j_content['6000012']
            event_id = j_content["0"]
            body = {
                '6000000': str(event_type),
                '6500048': str(device_id),
                '6000012': str(raw_event_msg),
                '6000014': str(event_id),
            }
            n = n + 1
            es.index(index='coredb', doc_type='json_data', body=body)
        except:
            pass

But it's too slow, and I have plenty of free hardware resources. How can I improve the performance of this code with multithreading or the bulk API?

  • As you have a for loop, multithreading is kind of a straightforward answer, given that your Elastic cluster can handle a heavier load. (Instead of multithreading, I'd suggest you use multiprocessing in Python, mainly because of the GIL.) Then, I'd suggest you have a look at this article, which gives good tips to increase the indexing speed: elastic.co/guide/en/elasticsearch/reference/master/… Commented Jul 5, 2017 at 7:48
  • You should use the bulk API instead of single-document indexing within the Python client, in combination with the bulk helpers. Commented Jul 5, 2017 at 9:50
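
For reference, here is a minimal sketch of what that bulk-helpers suggestion could look like applied to the code in the question (the index name, doc type, and field IDs are copied from the question; the chunk size is only illustrative):

import json
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()  # use default of localhost, port 9200

def generate_actions(path):
    # yield one bulk action per JSON line, skipping lines that fail to parse
    with open(path) as f:
        for line in f:
            try:
                j_content = json.loads(line)
                yield {
                    "_index": "coredb",
                    "_type": "json_data",
                    "_source": {
                        '6000000': str(j_content['6000000']),
                        '6500048': str(j_content['6500048']),
                        '6000012': str(j_content['6000012']),
                        '6000014': str(j_content["0"]),
                    },
                }
            except (ValueError, KeyError):
                continue  # malformed or incomplete line

# helpers.bulk consumes the generator and indexes the documents in batches
helpers.bulk(es, generate_actions('j.json'), chunk_size=5000)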

2 Answers


You probably want to look into using the Elasticsearch helpers, one in particular called bulk, which it seems you are aware of. Instead of having Elasticsearch index the data on every loop iteration, collect the actions in a list, and once that list reaches a certain length, send it with the bulk function; this dramatically increases performance.

You can get a rough idea from the following example. I had a very large text file with 72873471 lines (counted from the command line with wc -l big-file.txt), and using the same method you posted, the estimated ETA was about 10 days:

# Slow Method ~ 10 days
from elasticsearch import Elasticsearch
import progressbar # pip3 install progressbar2
import re
es = Elasticsearch()
file = open("/path/to/big-file.txt")
with progressbar.ProgressBar(max_value=72873471) as bar:
    for idx, line in enumerate(file):
        bar.update(idx)
        clean = re.sub("\n","",line).lstrip().rstrip()
        doc = {'tag1': clean, "tag2": "some extra data"}
        es.index(index="my_index", doc_type='index_type', body=doc)

Importing helpers from Elasticsearch and sending the documents in batches cut that time down to 3.5 hours:

# Fast Method ~ 3.5 hours
from elasticsearch import Elasticsearch, helpers
import progressbar # pip3 install progressbar2
import re
es = Elasticsearch()
actions = []
file = open("/path/to/big-file.txt")
with progressbar.ProgressBar(max_value=72873471) as bar:
    for idx, line in enumerate(file):
        bar.update(idx)
        if len(actions) > 10000:
            helpers.bulk(es, actions)  # send the current batch
            actions = []
        clean = re.sub("\n","",line).lstrip().rstrip()
        actions.append({
            "_index": "my_index", # The index on Elasticsearch
            "_type": "index_type", # The document type
            "_source": {'tag1': clean, "tag2": "some extra data"}
        })
    if actions:
        helpers.bulk(es, actions)  # send whatever is left after the loop
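
The question also asks about multithreading; the same helpers module provides parallel_bulk, which batches the actions and sends them from a thread pool. A rough sketch along the same lines (the thread_count and chunk_size values are only examples):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

def generate_actions(path):
    # yield one action per line, in the same shape as the actions list above
    with open(path) as f:
        for line in f:
            clean = line.strip()
            yield {
                "_index": "my_index",
                "_type": "index_type",
                "_source": {'tag1': clean, "tag2": "some extra data"}
            }

# parallel_bulk returns a generator of (ok, info) tuples; it must be consumed
for ok, info in helpers.parallel_bulk(es, generate_actions("/path/to/big-file.txt"),
                                      thread_count=4, chunk_size=5000):
    if not ok:
        print(info)  # report failed actions instead of dropping them silently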



What you want is called Cython ;)

You can speed up your code considerably, sometimes up to 20x, just by enabling static typing for your variables.

The code below should go into Cython; give it a try and you'll see:

try:
    j_content = json.loads(line)       # Here you might want to work with cython structs.
                                       # I can see you have a json per line, so it should be easy
    event_type = j_content['6000000']
    device_id = j_content['6500048']
    raw_event_msg = j_content['6000012']
    event_id = j_content["0"]
    body = {
        '6000000': str(event_type),
        '6500048': str(device_id),
        '6000012': str(raw_event_msg),
        '6000014': str(event_id),
    }
    n = n + 1
except:
    pass
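
As a rough illustration only, a hypothetical events.pyx module with static typing might look like the sketch below (the module name, function name, and the choice of typed variables are invented for the example; how much it helps depends on where the time is actually spent):

# events.pyx -- compile with cythonize; names here are illustrative
import json

def parse_line(str line):
    cdef dict j_content = json.loads(line)
    cdef dict body = {
        '6000000': str(j_content['6000000']),
        '6500048': str(j_content['6500048']),
        '6000012': str(j_content['6000012']),
        '6000014': str(j_content["0"]),
    }
    return body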

2 Comments

The answer is not related to the original question, as it does not mention how to use threads or the bulk API. Also, dictionary initialization is barely a problem in comparison with multiple API calls over the network.
@PyGuy, while it is true that my answer does not mention how to use threads or the bulk API (or asyncio or trio), it is indeed related to the question, which is about performance. And speeding up dictionary initialization would certainly improve performance a lot.
