0

first time trying asyncio and aiohttp. I have the following code that gets urls from the MySQL database for GET requests. Gets the responses and pushes them to MySQL database.

if __name__ == "__main__":
    database_name = 'db_name'
    company_name = 'company_name'

    my_db = Db(database=database_name) # wrapper class for mysql.connector
    urls_dict = my_db.get_rest_api_urls_for_specific_company(company_name=company_name)
    update_id = my_db.get_updateid()
    my_db.get_connection(dictionary=True)

    for url in urls_dict:
        url_id = url['id']
        url = url['url']
        table_name = my_db.make_sql_table_name_by_url(url)
        insert_query = my_db.get_sql_for_insert(table_name)
        r = requests.get(url=url).json() # make the request
        args = [json.dumps(r), update_id, url_id]
        my_db.db_execute_one(insert_query, args, close_conn=False)

    my_db.close_conn()

This works fine but to speed it up How can I run it asynchronously?

I have looked here, here and here but can't seem to get my head around it.

Here is what I have tried based on @Raphael Medaer's answer.

async def fetch(url):
    async with ClientSession() as session:
        async with session.request(method='GET', url=url) as response:
            json = await response.json()
            return json


async def process(url, update_id):
    table_name = await db.make_sql_table_name_by_url(url)
    result = await fetch(url)
    print(url, result)

if __name__ == "__main__":
    """Get urls from DB"""
    db = Db(database="fuse_src")
    urls = db.get_rest_api_urls()  # This returns list of dictionary
    update_id = db.get_updateid()
    url_list = []
    for url in urls:
        url_list.append(url['url'])
    print(update_id)
    asyncio.get_event_loop().run_until_complete(
        asyncio.gather(*[process(url, update_id) for url in url_list]))

I get an error in the process method:

TypeError: object str can't be used in 'await' expression

Not sure whats the problem?

Any code example specific to this would be highly appreciated.

3
  • Try commit all queries and execute only once. Commented Apr 30, 2020 at 18:31
  • thanks @Frank any code fragments would be appreciated? Commented Apr 30, 2020 at 18:32
  • Sorry I really forget how to do it. I had the same problem before, I just remembered what I did is append all the queries and send to server all at once. Commented Apr 30, 2020 at 18:53

1 Answer 1

6

Make this code asynchronous will not speed it up at all. Except if you consider to run a part of your code in "parallel". For instance you can run multiple (SQL or HTTP) queries in "same time". By doing asynchronous programming you will not execute code in "same time". Although you will get benefit of long IO tasks to execute other part of your code while you're waiting for IOs.

First of all, you'll have to use asynchronous libraries (instead of synchronous one).

  • mysql.connector could be replaced by aiomysql from aio-libs.
  • requests could be replaced by aiohttp

To execute multiple asynchronous tasks in "parallel" (for instance to replace your loop for url in urls_dict:), you have to read carefully about asyncio tasks and function gather.

I will not (re)write your code in an asynchronous way, however here are a few lines of pseudo code which could help you:

async def process(url):
    result = await fetch(url)
    await db.commit(result)

if __name__ == "__main__":
    db = MyDbConnection()
    urls = await db.fetch_all_urls()

    asyncio.get_event_loop().run_until_complete(
        asyncio.gather(*[process(url) for url in urls]))
Sign up to request clarification or add additional context in comments.

5 Comments

This is super helpful. Will try and report back. thanks @Raphael Medaer :)
I have good success as far as getting url results but stumped on db side :(
I saw your edit with the error. However without the full stacktrace we cannot help you. Could you also put the code of db.make_sql_table_name_by_url(url) and write a minimal reproducible example ?
The above error was due to the fact that db.make_sql_table_name_by_url(url) method wasn't async lol. I fixed it and it works fine. The problem I have now is if the db.make_sql_table_name_by_url(url) is in the same file, it works. But if I create a wrapper class out of it. It gives AttributeError: module 'asyncio.streams' has no attribute 'IncompleteReadError'
OK. I guess we can consider that off topic or as another question. I hope my answer helped you to get a better idea of the big picture. Could you give a full working example once it's done ? And optionally accept the answer ? Kind regards,

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.