
I need to get a lot of data from Elasticsearch (ES), so I'm using the scan helper, which is a wrapper around the native ES scroll API. As a result I get the following generator object: <generator object scan at 0x000001BF5A25E518>. Furthermore, I'd like to load all the data into a pandas DataFrame object so I can easily process it.

Code goes as follows:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan as escan
import pandas as pd

es = Elasticsearch(dpl_server, verify_certs=False)

body = {
  "size": 1000,
  "query": {
    "match_all": {}
  }
}
response = escan(client=es,
                 index="index-*,
                 query=body, request_timeout=30, size=1000)

print(response)
#<generator object scan at 0x000001BF5A25E518>

What I want to do is put all the results into a pandas DataFrame. If I print each element of the generator as follows:

for res in response:
    print(res['_source'])
# { .... }
# { .... }
# { .... }

I get a lot of dictionaries. My naive solution so far is to add them one by one, like so:

df = None
for res in response:
    if (df is None):
        df = pd.DataFrame([res['_source']])
    else:
        df = pd.concat([df, pd.DataFrame([res['_source']])], sort=True)

I'd like to know if there's a better way of doing this (first in terms of speed, second in terms of clean code). For instance, would it be better to accumulate all the results from the generator into a list and then build the complete DataFrame at once?
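For comparison, a minimal sketch of the list-accumulation approach the question suggests, using simulated scan hits instead of a live Elasticsearch connection. Building the DataFrame once from a list avoids the repeated pd.concat calls of the loop above, which copy the whole frame on every iteration:

```python
import pandas as pd

# Simulated scan output: each hit wraps its payload in a "_source" dict,
# as the elasticsearch scan helper does.
response = ({"_source": {"field_a": i, "field_b": str(i)}} for i in range(3))

# Accumulate all _source dicts first, then construct the DataFrame once.
rows = [hit["_source"] for hit in response]
df = pd.DataFrame(rows)

print(df.shape)  # (3, 2)
```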

1 Answer

You can use pandas' json_normalize.

from collections import deque
from pandas.io.json import json_normalize  # pd.json_normalize in pandas >= 1.0
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan as escan
import pandas as pd

es = Elasticsearch(dpl_server, verify_certs=False)

body = {
  "size": 1000,
  "query": {
    "match_all": {}
  }
}
response = escan(client=es,
                 index="index",
                 query=body, request_timeout=30, size=1000)

# Initialize a double ended queue
output_all = deque()
# Extend deque with iterator
output_all.extend(response)
# Convert deque to DataFrame
output_df = json_normalize(output_all)

You can find more info on the double-ended queue in the collections.deque documentation.
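As a follow-up to the comments below: if you only want the _source payload of each hit, a sketch (with hypothetical simulated hits, and using pd.json_normalize as available in pandas 1.0+) is to extract _source before normalizing, which also flattens nested fields into dotted column names without the _index/_id metadata columns:

```python
import pandas as pd

# Simulated scan hits: Elasticsearch wraps each document's payload in "_source".
hits = [
    {"_index": "index", "_id": "1", "_source": {"user": {"name": "a"}, "n": 1}},
    {"_index": "index", "_id": "2", "_source": {"user": {"name": "b"}, "n": 2}},
]

# Normalize only the payloads; nested dicts become dotted columns
# such as "user.name", and the metadata fields never appear.
df = pd.json_normalize([hit["_source"] for hit in hits])

print(sorted(df.columns))
```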


4 Comments

That is a great answer! But it raises some questions for me. Why did we choose deque()? Is there a better (faster) data structure? Additionally, I want to add res['_source'] (for each "hit" I want to add the object under '_source') and not the whole response object.
@EranMoshe deque() just allows for faster appending/extending. To select only the fields belonging to _source, I have a small function that selects only the columns starting with "_source.", something along the lines of output_df = output_df[[x for x in output_df.columns if "_source." in x]]
Hey, if you come up with a better solution, multi-processed or multi-threaded or something of that kind, let me know! I'll also do the same if I develop one.
I tried both your solution and the answer's solution, but in both cases I am stuck in an infinite loop; I am unable to break out of the for loop, even though I have very few elements in the response (around 500). Any clue about this?
