
I have to run a query on Elasticsearch from Python to extract a large amount of data. I'm using elasticsearch 7.9.1 with Python 3.

The problem is that my script doesn't return the same number of results from one run to another. Sometimes I get ~300,000 results, sometimes more (1 million), sometimes zero. This seems to happen especially when two runs are close together. I'm not changing the query or the time range being searched.

I noticed that changing the scroll argument of the es.search method changes the behaviour of the script. With scroll='1s', for example, page['hits']['hits'] is empty, which doesn't really make sense to me.

Here is the class I'm using in my script:

import datetime
import logging
import ssl

from elasticsearch import Elasticsearch
from elasticsearch.connection import create_ssl_context

logger = logging.getLogger(__name__)


class ElasticsearchFinder():

    def __init__(self, cfg):
        logger.info('ElasticsearchFinder.__init__ : initiate parameters')
        try:
            ######### ES configuration #######
            self.port = cfg.get('Elasticsearch', 'port')
            self.hostnames = cfg.get('Elasticsearch', 'hostnames')
            self.username = cfg.get('Elasticsearch', 'username')
            self.password = cfg.get('Elasticsearch', 'password')
            self.index = cfg.get('Elasticsearch', 'index')
            self.delay = cfg.get('Elasticsearch', 'delay')
            self.folder = cfg.get('Elasticsearch', 'root_csv_folder')
            self.filename = cfg.get('Elasticsearch', 'csv_filename')

            self.ssl_es_context = create_ssl_context()
            self.ssl_es_context.check_hostname = False
            self.ssl_es_context.verify_mode = ssl.CERT_NONE
        

            try:
                self.start_time_conf = cfg.get('Elasticsearch', 'start_time')
            except Exception:
                self.start_time_conf = None
            try:
                self.end_time_conf = cfg.get('Elasticsearch', 'end_time')
            except Exception:
                self.end_time_conf = None

        except Exception as e:
            logger.error('ElasticsearchFinder.__init__ : initiate parameters failed, please verify fetch_qradar.conf : %s', str(e))

        now = datetime.datetime.now()

        if self.delay != "0":
            start_date = now - datetime.timedelta(hours=int(self.delay))
        else:
            start_date = now - datetime.timedelta(weeks=52)

        self.end_time = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        self.start_time = start_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        if self.start_time_conf:
            self.start_time = datetime.datetime.strptime(self.start_time_conf, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        if self.end_time_conf:
            self.end_time = datetime.datetime.strptime(self.end_time_conf, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        try:
            self.es = Elasticsearch(self.hostnames, port = self.port, scheme="https",
                                    http_auth=(self.username, self.password), ssl_context=self.ssl_es_context)

        except Exception as e:
            logger.error('ElasticsearchFinder.__init__ : Connect to remote Elasticsearch %s:%s failed : %s', self.hostnames, self.port, str(e))


    def search_elastic(self, start_time=None, end_time=None):
        try:
            # field to store all data
            data = []

            # parameters end_time and start_time as ARGS
            if start_time:
                self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")

            if end_time:
                self.end_time = datetime.datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")

            logger.info('ElasticsearchFinder.search_elastic : QUERY = field1: "value1" and (field2: "value2" or "value3")')
            query = {
                        "query": {
                            "bool": {
                                "must": [
                                    {"match" : {"field1" : "value1"}},
                                    {"range": {"timestamp": {"lt": self.end_time, "gte": self.start_time}}},
                                    {
                                        "bool": {
                                            "should": [
                                                {"match" : {"field2" : "value2"}},
                                                {"match" : {"field2" : "value3"}}
                                            ]
                                        }
                                    }
                                ]
                            }
                        }
                    }
            
            # Initialize the scroll
            page = self.es.search(
                index=self.index,
                scroll='30m',
                size=1000,
                body=query)

            sid = page['_scroll_id']
            scroll_size = page['hits']['total']['value']
            logger.info('ElasticsearchFinder.search_elastic : search for qradar from {} to {}. Total hits {}'.format(self.start_time, self.end_time, scroll_size))

            # fetch data
            for i in page['hits']['hits']:
                data.append({'some_field': i['_source']['some_field']})

            # Start scrolling
            while (scroll_size > 0):
                # Get the number of results that we returned in the last scroll
                page = self.es.scroll(scroll_id=sid, scroll='30m')
                scroll_size = len(page['hits']['hits'])
                for i in page['hits']['hits']:
                    data.append({'some_field': i['_source']['some_field']})

                # Update the scroll ID (to move to next page)
                sid = page['_scroll_id']

            logger.info('ElasticsearchFinder.search_elastic : Total stored data {}'.format(len(data)))
           
            # write CSV file
            self.writeToCSV(self.folder+self.filename, data)

The query has since been checked directly in Kibana and is correct. (My script also doesn't return the same number of results as Kibana.)

I've had to set large timeouts or else I would get timeout errors.

Any idea why it isn't returning the same number of results each time? I thought maybe the scroll_id was being kept somewhere, so that Elasticsearch wasn't returning results it had already returned on a previous run, but the scroll_id changes from one run to the next, so that seems unlikely.

  • Hi, may I ask about your Elasticsearch cluster? Do you have a single node or a cluster? If it's a cluster, what are the configs of your machines? Commented Oct 13, 2020 at 21:24
  • @saeednasehi It is a cluster, but I don't know the configs; I was only given the hostnames. Commented Oct 14, 2020 at 7:35
  • In my view, this problem is caused by your cluster. I think one or more nodes have a problem such as network latency or a slow hard disk. I'm not sure about this, but this behaviour of your cluster points to something like that. Commented Oct 14, 2020 at 7:41
  • @saeednasehi The cluster has been checked by my colleagues and doesn't appear to have any problems. I'll try playing with the timeout parameters in my script. Commented Oct 15, 2020 at 9:14

1 Answer

It turns out the script was creating too many scroll contexts (more than the default limit of 500). This explains why the issue appeared especially when the script was run several times in a row.

This could be seen by looking at the _shards part of the result (page['_shards'] in my case), which contained many error messages like:

Trying to create too many scroll contexts. Must be less than or equal to: [500]

Reducing the scroll argument in es.search() helps, since scroll contexts then expire and are freed more quickly. Explicitly clearing the scroll context after the search with es.clear_scroll() helps as well.
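Putting those two fixes together, here is a minimal sketch of a scroll loop that surfaces shard failures and always releases its context. The scroll_all helper is hypothetical (not from the question); it only assumes a client exposing search/scroll/clear_scroll, as elasticsearch-py 7.x does.

```python
def scroll_all(es, index, query, scroll='2m', size=1000):
    """Scroll through every hit for `query`, always clearing the context."""
    page = es.search(index=index, scroll=scroll, size=size, body=query)
    # Shard-level errors such as "Trying to create too many scroll contexts"
    # show up here rather than as an exception, so check them explicitly.
    failures = page.get('_shards', {}).get('failures', [])
    if failures:
        print('shard failures: %s' % failures)
    sid = page['_scroll_id']
    hits = list(page['hits']['hits'])
    try:
        while page['hits']['hits']:
            page = es.scroll(scroll_id=sid, scroll=scroll)
            sid = page['_scroll_id']
            hits.extend(page['hits']['hits'])
    finally:
        # Release the context so repeated runs don't pile up past the
        # default limit of 500 open scroll contexts.
        es.clear_scroll(scroll_id=sid)
    return hits
```

A short keep-alive like '2m' plus the finally-clause clear means a crashed or re-run script leaves no long-lived contexts behind, unlike the original scroll='30m'.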
