9

When scrolling in Elasticsearch, it is important to pass the latest scroll_id with every scroll request:

The initial search request and each subsequent scroll request returns a new scroll_id — only the most recent scroll_id should be used.

The following example (taken from here) puzzles me. First, the scroll initialization:

rs = es.search(index=['tweets-2014-04-12', 'tweets-2014-04-13'],
               scroll='10s',
               search_type='scan',
               size=100,
               preference='_primary_first',
               body={
                   "fields": ["created_at", "entities.urls.expanded_url", "user.id_str"],
                   "query": {
                       "wildcard": {"entities.urls.expanded_url": "*.ru"}
                   }
               })
sid = rs['_scroll_id']

and then the looping:

tweets = []
while (1):
    try:
        rs = es.scroll(scroll_id=sid, scroll='10s')
        tweets += rs['hits']['hits']
    except:
        break

It works, but I don't see where sid is updated. I suspect it happens internally in the Python client, but I don't understand how.

When does this break out of the loop? I tried this approach, but it kept going and I had to stop it manually. Thanks (commented Jan 27, 2021 at 21:14)

5 Answers

25

This is an old question, but for some reason it came up first when I searched for "elasticsearch python scroll". The Python client provides a helper, elasticsearch.helpers.scan, that does all the work for you: it is a generator that yields each document while managing the underlying scroll IDs.

https://elasticsearch-py.readthedocs.io/en/master/helpers.html#scan

Here is an example of usage:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

query = {
    "query": {"match_all": {}}
}

es = Elasticsearch(...)
for hit in scan(es, index="my-index", query=query):
    print(hit["_source"]["field"])
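For the curious, the idea behind scan can be sketched in a few lines. This is a simplified illustration, not the library's actual source: scroll_hits is a hypothetical name, and client is assumed to expose search(), scroll() and clear_scroll() with the keyword arguments used below (written in the elasticsearch-py 7.x style; check against your client version).

```python
from typing import Any, Dict, Iterator


def scroll_hits(client: Any, index: str, query: Dict[str, Any],
                scroll: str = "1m", size: int = 500) -> Iterator[Dict[str, Any]]:
    """Yield every hit of `query`, always passing the newest scroll_id.

    Hypothetical helper: `client` is assumed to offer search(), scroll()
    and clear_scroll() accepting the keyword arguments used here.
    """
    resp = client.search(index=index, body=query, scroll=scroll, size=size)
    scroll_id = resp["_scroll_id"]
    try:
        hits = resp["hits"]["hits"]
        while hits:  # an empty page means the scroll is exhausted
            yield from hits
            resp = client.scroll(scroll_id=scroll_id, scroll=scroll)
            # Each response may carry a new scroll_id; only the latest one is valid.
            scroll_id = resp["_scroll_id"]
            hits = resp["hits"]["hits"]
    finally:
        # Release the server-side search context instead of waiting for it to expire.
        client.clear_scroll(scroll_id=scroll_id)
```

Because clear_scroll sits in a finally block, the context is also released if the generator is closed early (generator.close(), or garbage collection after breaking out of a loop).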

9

Using Python requests:

import json

import requests

elastic_url = 'http://localhost:9200/my_index/_search?scroll=1m'
scroll_api_url = 'http://localhost:9200/_search/scroll'
headers = {'Content-Type': 'application/json'}

payload = {
    "size": 100,
    "sort": ["_doc"],
    "query": {
        "match": {
            "title": "elasticsearch"
        }
    }
}

r1 = requests.request(
    "POST",
    elastic_url,
    data=json.dumps(payload),
    headers=headers
)

# first batch of data
try:
    res_json = r1.json()
    data = res_json['hits']['hits']
    _scroll_id = res_json['_scroll_id']
except KeyError:
    data = []
    _scroll_id = None
    print('Error: Elastic Search: %s' % str(r1.json()))
while data:
    print(data)
    # scroll to get the next batch, always sending the most recent scroll_id
    scroll_payload = json.dumps({
        'scroll': '1m',
        'scroll_id': _scroll_id
    })
    scroll_res = requests.request(
        "POST", scroll_api_url,
        data=scroll_payload,
        headers=headers
    )
    try:
        res_json = scroll_res.json()
        data = res_json['hits']['hits']
        _scroll_id = res_json['_scroll_id']
    except KeyError:
        data = []
        _scroll_id = None
        err_msg = 'Error: Elastic Search Scroll: %s'
        print(err_msg % str(scroll_res.json()))

Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html#search-request-scroll
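The loop above never frees the server-side search context; it simply expires after the scroll timeout (1m here). To release it as soon as you are done, Elasticsearch's clear-scroll API (DELETE /_search/scroll with the scroll_id in the JSON body) does that. Below is a minimal sketch that builds that request with the standard library's urllib.request, so it can be inspected without a running cluster; build_clear_scroll_request is a hypothetical helper name, and http://localhost:9200 is assumed to match the answer's URLs.

```python
import json
import urllib.request


def build_clear_scroll_request(base_url: str, scroll_id: str) -> urllib.request.Request:
    """Build (but do not send) a DELETE /_search/scroll request for one scroll_id."""
    body = json.dumps({"scroll_id": scroll_id}).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + "/_search/scroll",
        data=body,
        headers={"Content-Type": "application/json"},
        method="DELETE",
    )


# Usage after the scroll loop has finished (this line actually sends the request):
# with urllib.request.urlopen(build_clear_scroll_request("http://localhost:9200", _scroll_id)) as resp:
#     print(resp.status)
```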

3 Comments

Hey @anjan, can we use this with Elasticsearch version 5? I guess scroll is only available in version 6.
@ak3191 I think it is only available in v6, so we can't use it in v5.
I found a way; let me know if you would like to know.
5

In fact, the code has a bug: to use the scroll feature correctly, you are supposed to pass the new scroll_id returned by each call into the next call to scroll(), not reuse the first one:

Important

The initial search request and each subsequent scroll request returns a new scroll_id — only the most recent scroll_id should be used.

http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html

It works because Elasticsearch does not always change the scroll_id between calls; for smaller result sets it can keep returning the originally issued scroll_id for some time. This discussion from last year shows two other users seeing the same behavior, with the same scroll_id returned for a while:

http://elasticsearch-users.115913.n3.nabble.com/Distributing-query-results-using-scrolling-td4036726.html

So while your code works for a smaller result set, it is not correct: you need to capture the scroll_id returned by each call to scroll() and use it in the next call.
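To see why reusing the first id is fragile, here is a toy, in-memory model (ToyScrollServer and read_all are made-up names; this is not Elasticsearch) that enforces the documented contract strictly: every page comes with a fresh scroll_id and only the latest one is accepted. Real Elasticsearch can be more lenient, as described above, which is exactly why the buggy code appears to work.

```python
import itertools


class ToyScrollServer:
    """Toy model of the scroll contract: each page gets a fresh scroll_id,
    and only the most recent one is accepted. Not Elasticsearch."""

    def __init__(self, docs, page_size):
        self._docs = list(docs)
        self._page_size = page_size
        self._counter = itertools.count()
        self._latest_id = None
        self._offset = 0

    def _page(self):
        page = self._docs[self._offset:self._offset + self._page_size]
        self._offset += len(page)
        self._latest_id = "sid-%d" % next(self._counter)
        return {"_scroll_id": self._latest_id, "hits": {"hits": page}}

    def search(self):
        self._offset = 0
        return self._page()

    def scroll(self, scroll_id):
        if scroll_id != self._latest_id:
            raise ValueError("stale scroll_id: %s" % scroll_id)
        return self._page()


def read_all(server, reuse_first_id=False):
    rs = server.search()
    sid = rs["_scroll_id"]
    docs = list(rs["hits"]["hits"])
    while True:
        rs = server.scroll(sid)
        hits = rs["hits"]["hits"]
        if not hits:  # an empty page means the scroll is exhausted
            return docs
        docs.extend(hits)
        if not reuse_first_id:
            sid = rs["_scroll_id"]  # the fix: always carry the newest id forward
```

With reuse_first_id=True the second scroll call is rejected as stale, while the default path reads every document.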

4 Comments

Just to make it clear: the bug is in the code in the example, right? No bug in the Python client, I hope.
If this is indeed the case, as I assume, could you please suggest how to correct the example? Merely adding something like sid=rs['_scroll_id'] inside the try doesn't seem to work.
It runs into an infinite loop; the while loop doesn't end.
That's the second bug in the code from that site: it should not be a while loop waiting for an exception, since I don't believe scroll throws one when it is called correctly. Instead, check whether any hits were received in that iteration of the loop; if not, you are done.
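Putting both fixes together, a minimal sketch of the corrected loop might look like the following. fetch_all is a hypothetical helper name; it assumes es is an elasticsearch-py client (only its scroll(scroll_id=..., scroll=...) method is used) and rs is the response from the initial es.search call in the question. It also keeps the hits of that first response, which, as far as I know, is simply empty with the old search_type='scan'.

```python
def fetch_all(es, rs, scroll="10s"):
    """Collect every hit, starting from the initial search response `rs`.

    Fixes both problems discussed above: the scroll_id is refreshed from
    every response, and the loop ends on the first empty page instead of
    waiting for an exception.
    """
    sid = rs["_scroll_id"]
    tweets = list(rs["hits"]["hits"])
    while True:
        rs = es.scroll(scroll_id=sid, scroll=scroll)
        sid = rs["_scroll_id"]  # always use the most recent scroll_id
        hits = rs["hits"]["hits"]
        if not hits:  # no hits in this iteration: we are done
            break
        tweets += hits
    return tweets
```

Usage: tweets = fetch_all(es, rs)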
2

import logging
import sys

import pandas as pd
import requests

# In the class's __init__ (Hostname is a placeholder):
self._elkUrl = "http://Hostname:9200/logstash-*/_search?scroll=1m"
self._scrollUrl = "http://Hostname:9200/_search/scroll"


    def GetDataFromELK(self):
        """
        Function to get the data from ELK through the scrolling mechanism
        """
        # implementing scroll and retrieving data from ELK to get more than 100000 records in one search
        # ref: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-scroll.html
        try:
            if self._elkUrl is None:
                raise ValueError("_elkUrl is missing")
            if self._username is None:
                raise ValueError("_username for elk is missing")
            if self._password is None:
                raise ValueError("_password for elk is missing")
            url = self._scrollUrl
            if url is None:
                raise ValueError("scroll url is missing")
            response = requests.post(self._elkUrl, json=self.body, auth=(self._username, self._password))
            response = response.json()
            if response is None:
                raise ValueError("response is missing")
            sid = response['_scroll_id']
            data = response['hits']['hits']
            frames = []
            # keep scrolling until a batch comes back empty; the first batch
            # (from the initial search) is processed too
            while len(data) > 0:
                frames.append(self.DataClean(data))
                # keep the search context alive for 2m
                scroll_query = {"scroll": "2m", "scroll_id": sid}
                response1 = requests.post(url, json=scroll_query, auth=(self._username, self._password))
                response1 = response1.json()
                # each response includes a scroll_id, which must be passed to the next scroll call
                sid = response1['_scroll_id']
                data = response1['hits']['hits']
            # DataFrame.append was removed in pandas 2.0; concatenate the batches once instead
            dataFrame = pd.concat(frames) if frames else pd.DataFrame()
            print('Total records received from ELK =', len(dataFrame))
            return dataFrame
        except Exception as e:
            logging.error('Error while getting the data from elk', exc_info=e)
            sys.exit()


2
from elasticsearch import Elasticsearch

elasticsearch_user_name = 'es_username'
elasticsearch_user_password = 'es_password'
es_index = "es_index"

es = Elasticsearch(["127.0.0.1:9200"],
                   http_auth=(elasticsearch_user_name, elasticsearch_user_password))

query = {
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "es_datetime": {
                            "gte": "2021-06-21T09:00:00.356Z",
                            "lte": "2021-06-21T09:01:00.356Z",
                            "format": "strict_date_optional_time"
                        }
                    }
                }
            ]
        }
    },
    "fields": [
        "*"
    ],
    "_source": False,
    "size": 2000,
}
resp = es.search(index=es_index, body=query, scroll="1m")
old_scroll_id = resp['_scroll_id']
results = resp['hits']['hits']
while len(results):
    for i, r in enumerate(results):
        # do something with the data
        pass
    result = es.scroll(
        scroll_id=old_scroll_id,
        scroll='1m'  # length of time to keep the search context alive
    )
    # check if there's a new scroll ID
    if old_scroll_id != result['_scroll_id']:
        print("NEW SCROLL ID:", result['_scroll_id'])
    # keep track of the latest scroll_id
    old_scroll_id = result['_scroll_id']

    results = result['hits']['hits']
