4

Let's say I have some asyncio coroutine which fetches some data and returns it. Like this:

async def fetch_data(*args):
  result = await some_io()
  return result

Basically this coroutine is called from the chain of coroutines, and initial coroutine is runned by creating a task. But what if for test purposes I want to run only one coroutine this way just when running some file:

if __name__ == '__main__':
  result = await fetch_data(*args)
  print(result)

And obviously I can't do this since I'm trying to run and await coroutine from not coroutine function. So the question is: is there some correct way to get data from coroutine without calling it function? I can make some Future object for result and await it, but maybe there are some other more simple and clearer ways?

2 Answers 2

2

You will need to create an event loop to run your coroutine:

import asyncio

async def async_func():
    return "hello"

loop = asyncio.get_event_loop()
result = loop.run_until_complete(async_func())
loop.close()

print(result)

Or as a function:

def run_coroutine(f, *args, **kwargs):
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(f(*args, **kwargs))
    loop.close()
    return result

Use like this:

print(run_coroutine(async_func))

Or:

assert "expected" == run_coroutine(fetch_data, "param1", param2="foo")
Sign up to request clarification or add additional context in comments.

Comments

0

For Python 3.7+, you may use asyncio.run():

import asyncio

async def fetch_data():
  return "sample_data"

asyncio.run(fetch_data())

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.