I have to run a query against Elasticsearch from Python to extract a large volume of data. I'm using the elasticsearch 7.9.1 client with Python 3.
The problem is that my script doesn't return the same number of results from one run to another. Sometimes I get ~300,000 results, sometimes more (1 million), sometimes zero. It seems to happen especially when two runs are close together. I'm not changing the query or the time range being searched.
I noticed that changing the scroll argument of the es.search method changes the script's behaviour.
With scroll='1s', for example, page['hits']['hits'] comes back empty, which doesn't really make sense to me.
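My understanding of the scroll argument (and this may be where I'm wrong) is that it's the keep-alive of the search context between two consecutive calls, not a total time budget for the whole export, so with '1s' the context would expire before the next call. A minimal version of what I believe the intended pattern is (es and 'my-index' are placeholders, not my real client/index):

    # minimal scroll loop; 'es' is an already-connected client,
    # 'my-index' is a placeholder index name
    page = es.search(index='my-index', scroll='2m', size=1000,
                     body={"query": {"match_all": {}}})
    sid = page['_scroll_id']
    hits = page['hits']['hits']
    while hits:
        # each scroll call must arrive within the 2m keep-alive,
        # otherwise the server drops the search context
        page = es.scroll(scroll_id=sid, scroll='2m')
        sid = page['_scroll_id']
        hits = page['hits']['hits']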
Here is the class I'm using in my script:
import datetime
import logging
import ssl

from elasticsearch import Elasticsearch
from elasticsearch.connection import create_ssl_context

logger = logging.getLogger(__name__)


class ElasticsearchFinder:
    def __init__(self, cfg):
        logger.info('ElasticsearchFinder.__init__ : initiate parameters')
        try:
            ######### ES configuration #######
            self.port = cfg.get('Elasticsearch', 'port')
            self.hostnames = cfg.get('Elasticsearch', 'hostnames')
            self.username = cfg.get('Elasticsearch', 'username')
            self.password = cfg.get('Elasticsearch', 'password')
            self.index = cfg.get('Elasticsearch', 'index')
            self.delay = cfg.get('Elasticsearch', 'delay')
            self.folder = cfg.get('Elasticsearch', 'root_csv_folder')
            self.filename = cfg.get('Elasticsearch', 'csv_filename')
            self.ssl_es_context = create_ssl_context()
            self.ssl_es_context.check_hostname = False
            self.ssl_es_context.verify_mode = ssl.CERT_NONE
            # start_time / end_time are optional in the config file
            try:
                self.start_time_conf = cfg.get('Elasticsearch', 'start_time')
            except Exception:
                self.start_time_conf = None
            try:
                self.end_time_conf = cfg.get('Elasticsearch', 'end_time')
            except Exception:
                self.end_time_conf = None
        except Exception as e:
            logger.error('ElasticsearchFinder.__init__ : initiate parameters failed, please verify fetch_qradar.conf : %s', str(e))
        # NB: now() is local time but gets formatted with a 'Z' (UTC) suffix
        now = datetime.datetime.now()
        if self.delay != "0":
            start_date = now - datetime.timedelta(hours=int(self.delay))
        else:
            start_date = now - datetime.timedelta(weeks=52)
        self.end_time = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        self.start_time = start_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        # explicit timestamps from the config override the computed window
        if self.start_time_conf:
            self.start_time = datetime.datetime.strptime(self.start_time_conf, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        if self.end_time_conf:
            self.end_time = datetime.datetime.strptime(self.end_time_conf, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        try:
            self.es = Elasticsearch(self.hostnames, port=self.port, scheme="https",
                                    http_auth=(self.username, self.password),
                                    ssl_context=self.ssl_es_context)
        except Exception as e:
            logger.error('ElasticsearchFinder.__init__ : Connect to remote Elasticsearch %s:%s failed : %s', self.hostnames, self.port, str(e))

    def search_elastic(self, start_time=None, end_time=None):
        try:
            # collects every extracted document
            data = []
            # start_time / end_time passed as arguments override the configured window
            if start_time:
                self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            if end_time:
                self.end_time = datetime.datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            logger.info('ElasticsearchFinder.search_elastic : QUERY = field1: "value1" and (field2: "value2" or "value3")')
            query = {
                "query": {
                    "bool": {
                        "must": [
                            {"match": {"field1": "value1"}},
                            {"range": {"timestamp": {"lt": self.end_time, "gte": self.start_time}}},
                            {
                                "bool": {
                                    "should": [
                                        {"match": {"field2": "value2"}},
                                        {"match": {"field2": "value3"}}
                                    ]
                                }
                            }
                        ]
                    }
                }
            }
            # Initialize the scroll
            page = self.es.search(
                index=self.index,
                scroll='30m',
                size=1000,
                body=query)
            sid = page['_scroll_id']
            scroll_size = page['hits']['total']['value']
            logger.info('ElasticsearchFinder.search_elastic : search for qradar from {} to {}. Total hits {}'.format(self.start_time, self.end_time, scroll_size))
            # fetch the first page of hits
            for i in page['hits']['hits']:
                data.append({'some_field': i['_source']['some_field']})
            # Keep scrolling until a page comes back empty
            while scroll_size > 0:
                page = self.es.scroll(scroll_id=sid, scroll='30m')
                # number of results returned by the last scroll call
                scroll_size = len(page['hits']['hits'])
                for i in page['hits']['hits']:
                    data.append({'some_field': i['_source']['some_field']})
                # Update the scroll ID (to move to the next page)
                sid = page['_scroll_id']
            logger.info('ElasticsearchFinder.search_elastic : Total stored data {}'.format(len(data)))
            # write CSV file (writeToCSV is defined elsewhere in the class, not shown)
            self.writeToCSV(self.folder + self.filename, data)
        except Exception as e:
            logger.error('ElasticsearchFinder.search_elastic : search failed : %s', str(e))
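For reference, I'm aware of the scan helper in elasticsearch.helpers, which drives the scroll loop itself; this is roughly how I think the body of search_elastic would translate (an untested sketch with the same placeholder names as above, not what I'm currently running):

    from elasticsearch.helpers import scan

    # inside search_elastic, replacing the manual scroll loop:
    # scan() keeps the scroll context alive between pages and clears it
    # when the iteration finishes (clear_scroll=True is the default)
    data = [
        {'some_field': hit['_source']['some_field']}
        for hit in scan(self.es, query=query, index=self.index,
                        scroll='30m', size=1000)
    ]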
The query has since been checked directly in Kibana and is correct. (My script also doesn't return the same number of results as Kibana.)
I've had to set large timeouts, otherwise I would get timeout errors.
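Concretely, I set the timeout on the client itself, something like this (120 is just the arbitrary large value I landed on; the variable names match the class above):

    # larger default client-side timeout, in seconds;
    # request_timeout can also be passed per call (e.g. to es.search) instead
    self.es = Elasticsearch(self.hostnames, port=self.port, scheme="https",
                            http_auth=(self.username, self.password),
                            ssl_context=self.ssl_es_context,
                            timeout=120)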
Any idea why it isn't returning the same number of results each time?
I thought maybe the scroll_id was kept somewhere, so that Elasticsearch wasn't returning results it had already returned on a previous run; but the scroll_id changes from one run to the next, so that seems unlikely.
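To rule that out, I suppose I could also clear the scroll context explicitly at the end of each run; my understanding is that clear_scroll does this (untested on my side):

    # release the server-side search context as soon as the export is done,
    # rather than letting the 30m keep-alive expire on its own
    self.es.clear_scroll(scroll_id=sid)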