1

I'm trying to rewrite a django management command in an asynchronous way using asyncio and aiohttp. Those are the files involved:

# rest_async.py
async def t_search_coro(token, loop, **kwargs):
    """
    ws T Search Query:
    kwargs:
    - modification_start_date: (str) Format: YYYY-MM-DDTHH:MM:SS (e.g.: 2013-02-26T11:00:00)
    - modification_end_date: (str) Format: YYYY-MM-DDTHH:MM:SS (e.g.: 2013-02-26T11:00:00)
    - lo_type: (str) LO Type. Defaults to 'Event'
    - status: (str) T Status of the LO. Required
    - portal: portal. Default: settings.PORTAL
    - page_nr: PageNumber querystring parameter. Default: 1
    """
    path = '/services/api/TSearch'
    method = 'GET'
    modification_start_date = kwargs.pop('modification_start_date')
    modification_end_date = kwargs.pop('modification_end_date')
    lo_type = kwargs.pop('lo_type', 'Event')
    status = kwargs.pop('status')
    portal = kwargs.pop('portal', settings.PORTAL)
    page_nr = kwargs.pop('page_nr', 1)
    debugging = kwargs.pop('debugging', True)
    signature_kws = get_signature_kwargs(token, path, method)
    headers = signature_kws.get('headers')
    params = {
        'LOType': lo_type,
        'Status': status,
        'PageNumber': page_nr,
        'format': 'JSON'
    }
    if modification_start_date is not None:
        params['ModificationStartDate'] = modification_start_date
    if modification_end_date is not None:
        params['ModificationEndDate'] = modification_end_date

    service_end_point = 'https://{}.example.net{}'.format(portal, path)
    print("fetching data: {} - {}".format(modification_start_date, modification_end_date))
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url=service_end_point, params=params, headers=headers) as resp:
            assert resp.status == 200
            return await resp.read()


# utils_async.py

async def fetch_t_data_coro(
        loop, lo_type='Session', modification_start_date=now()-timedelta(hours=22), modification_end_date=now(),
    status='Completed', **kwargs):
    date_fmt = "%Y-%m-%dT%H:%M:%S"
    if (modification_end_date - modification_start_date).total_seconds() > timedelta(days=1).total_seconds():
        raise Exception("modification start/end datetime interval must be within 24 hrs."
                        "\nmod. start date: {}\nmod. end date: {}".format(
            modification_start_date.strftime(date_fmt), modification_end_date.strftime(date_fmt)
        ))
    debugging = kwargs.pop('debugging', False)
    page_nr = kwargs.get('page_nr', 1)
    modification_start_date = modification_start_date.strftime(date_fmt)
    modification_end_date = modification_end_date.strftime(date_fmt)
    rtn_data = []
    params = {
        'LOType': lo_type, 'Status': status, 'PageNumber': page_nr, 'format': 'JSON'
    }
    already_added = set()
    while True:
        data = await rest_async.t_search_coro(
            token, loop, modification_start_date=modification_start_date, modification_end_date=modification_end_date,
            lo_type=lo_type, status=status, page_nr=page_nr, debugging=debugging
        )
        data_dict = json.loads(data.decode('utf-8'))
        if 'data' not in data_dict:
            break
        total_pages = data_dict['data'][0]['T_Item']['TotalPages']
        t_raw_data = data_dict['data'][0]['T_Item']['T']
        for item in t_raw_data:
            _h = hash(json.dumps(item, sort_keys=True))
            if _h in already_added:
                continue
            already_added.add(_h)
            rtn_data.append(item)
        if page_nr >= total_pages:
            break
        page_nr += 1
    return rtn_data


# load_data_async.py (actual django management command)

import asyncio
from datetime import timedelta, datetime
import argparse
import logging

from django.core.management.base import BaseCommand
from django.utils.timezone import now

from myapp.utils_async import fetch_transcript_data_coro

RUNNING_INTERVAL_MINS = 60
logger = logging.getLogger('my_proj')
MAX_BACKDAYS = 160
BACKDAYS_HOURS = {3, 9, 15, 21}
DEFAULT_TIMEFRAME=24
GO_BACK_DAYS = 30
GO_BACK_DAYS_TIMEFRAME = 24


class Command(BaseCommand):
    help = "fetch data asynchrounously"

    def add_arguments(self, parser):
        parser.add_argument(
            '--timeframe', action='store', dest='timeframe', default=DEFAULT_TIMEFRAME, type=int,
            help='Timeframe hours to be used (default to 24, range: 1 to 24)'
        )
        parser.add_argument(
            '--backdays', action='store', dest='backdays', default=None, type=int,
            help='repeat the command execution (for the same timeframe) n days before the current day'
        )

        parser.add_argument('--start-date', type=valid_date_type)
        parser.add_argument('--end-date', type=valid_date_type)

    def handle(self, *args, **options):
        self.loop = asyncio.get_event_loop()
        self.loop.run_until_complete(self._handle(*args, **options))

    async def _handle(self, *args, **options):
        timeframe = options.get('timeframe')
        backdays = options.get('backdays', None)
        start_date = options.get('start_date')
        end_date = options.get('end_date')
        backdays = backdays + 1 if backdays is not None else 1
        if all([start_date is not None, end_date is not None]):
            days_range = [start_date + timedelta(days=x) for x in range((end_date - start_date).days + 1)]
        else:
            days_range = [now() - timedelta(days=x) for x in range(backdays)]
        for mod_end_datetime in days_range:
            mod_start_datetime = mod_end_datetime - timedelta(minutes=RUNNING_INTERVAL_MINS * timeframe)
            data = await fetch_t_data_coro(
                loop=self.loop, modification_start_date=mod_start_datetime, modification_end_date=mod_end_datetime
            )

def valid_date_type(arg_date_str):
    try:
        return datetime.strptime(arg_date_str, "%Y-%m-%d")
    except ValueError:
        msg = "Given Date ({0}) not valid! Expected format, YYYY-MM-DD!".format(arg_date_str)
        raise argparse.ArgumentTypeError(msg)

I then tried to run the cmd as:

python manage.py load_data_async --start-date 2018-04-20 --end-date 2018-06-6

the command runs without errors, however it seems from the print statement that the coroutines are executed sequentially, in the same way of the original synchrounous code:

# output
fetching data: 2018-04-19T00:00:00 - 2018-04-20T00:00:00
fetching data: 2018-04-19T00:00:00 - 2018-04-20T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-21T00:00:00 - 2018-04-22T00:00:00
fetching data: 2018-04-21T00:00:00 - 2018-04-22T00:00:00
fetching data: 2018-04-21T00:00:00 - 2018-04-22T00:00:00
fetching data: 2018-04-22T00:00:00 - 2018-04-23T00:00:00
fetching data: 2018-04-23T00:00:00 - 2018-04-24T00:00:00
fetching data: 2018-04-24T00:00:00 - 2018-04-25T00:00:00
fetching data: 2018-04-24T00:00:00 - 2018-04-25T00:00:00
fetching data: 2018-04-25T00:00:00 - 2018-04-26T00:00:00
fetching data: 2018-04-25T00:00:00 - 2018-04-26T00:00:00
fetching data: 2018-04-25T00:00:00 - 2018-04-26T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00

...
...
fetching data: 2018-05-22T00:00:00 - 2018-05-23T00:00:00
fetching data: 2018-05-22T00:00:00 - 2018-05-23T00:00:00
fetching data: 2018-05-23T00:00:00 - 2018-05-24T00:00:00
fetching data: 2018-05-23T00:00:00 - 2018-05-24T00:00:00
fetching data: 2018-05-24T00:00:00 - 2018-05-25T00:00:00
fetching data: 2018-05-25T00:00:00 - 2018-05-26T00:00:00
fetching data: 2018-05-25T00:00:00 - 2018-05-26T00:00:00
fetching data: 2018-05-25T00:00:00 - 2018-05-26T00:00:00
fetching data: 2018-05-25T00:00:00 - 2018-05-26T00:00:00
fetching data: 2018-05-26T00:00:00 - 2018-05-27T00:00:00
fetching data: 2018-05-27T00:00:00 - 2018-05-28T00:00:00
fetching data: 2018-05-28T00:00:00 - 2018-05-29T00:00:00
fetching data: 2018-05-29T00:00:00 - 2018-05-30T00:00:00
fetching data: 2018-05-30T00:00:00 - 2018-05-31T00:00:00
fetching data: 2018-05-30T00:00:00 - 2018-05-31T00:00:00
fetching data: 2018-05-30T00:00:00 - 2018-05-31T00:00:00
fetching data: 2018-05-31T00:00:00 - 2018-06-01T00:00:00
fetching data: 2018-05-31T00:00:00 - 2018-06-01T00:00:00
fetching data: 2018-06-01T00:00:00 - 2018-06-02T00:00:00
fetching data: 2018-06-01T00:00:00 - 2018-06-02T00:00:00
fetching data: 2018-06-01T00:00:00 - 2018-06-02T00:00:00
fetching data: 2018-06-01T00:00:00 - 2018-06-02T00:00:00
fetching data: 2018-06-02T00:00:00 - 2018-06-03T00:00:00
fetching data: 2018-06-02T00:00:00 - 2018-06-03T00:00:00
fetching data: 2018-06-02T00:00:00 - 2018-06-03T00:00:00
fetching data: 2018-06-03T00:00:00 - 2018-06-04T00:00:00
fetching data: 2018-06-03T00:00:00 - 2018-06-04T00:00:00
fetching data: 2018-06-04T00:00:00 - 2018-06-05T00:00:00
fetching data: 2018-06-04T00:00:00 - 2018-06-05T00:00:00
fetching data: 2018-06-05T00:00:00 - 2018-06-06T00:00:00
fetching data: 2018-06-05T00:00:00 - 2018-06-06T00:00:00
fetching data: 2018-06-05T00:00:00 - 2018-06-06T00:00:00
fetching data: 2018-06-05T00:00:00 - 2018-06-06T00:00:00

Anyone noticed something wrong? or this is the correct behavior? I have no experience with asyncio but I was expecting not a sequential execution...

python version: 3.6.3

1 Answer 1

2

The code seems to await the fetch_t_data_coro invocations one by one, which forces them to run in sequence.

To run them in parallel, you can use asyncio.gather:

        coros = []
        for mod_end_datetime in days_range:
            mod_start_datetime = mod_end_datetime - timedelta(minutes=RUNNING_INTERVAL_MINS * timeframe)
            coros.append(fetch_t_data_coro(
                loop=self.loop, modification_start_date=mod_start_datetime, modification_end_date=mod_end_datetime
            ))
        data_list = await asyncio.gather(*coros)

Two unrelated notes:

  • The code instantiates aiohttp.ClientSession in each t_search_coro. This is an anti-pattern - you should create a single ClientSession at top-level and pass it down to individual coroutines (even ones running in parallel), so that they all share the same session instance.
  • Beginning with Python 3.5.3, asyncio.get_event_loop() will correctly pick up the running event loop when called from a coroutine. As a result, you don't need to send the loop object down the coroutine invocations, just call get_event_loop when you need it (which in your code you don't, since ClientSession also correctly infers the event loop on its own).
Sign up to request clarification or add additional context in comments.

3 Comments

Thanks for the reply. Does this start separate threads/processes (similar to process pool) or does it runs in the same thread, jumping from one coroutine to another?
@Luke The latter - all of asyncio is single-threaded by default, multiple threads or processes are introduced explicitly using run_in_executor. This allows you to have a large number of tasks running in parallel without exhausting the system resources.
Also, functions like gather and wait are just convenience API allowing you to wait for all the given coroutines to finish, propagate results and exceptions, etc. There is nothing magic about gather that provides parallelization - you could as well use create_task to start the coroutines in parallel, and then wait on a synchronization object.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.