I know, it's a mouthful.
I am using pytest-asyncio, which gives you an async event loop for running async code in your tests.
I want to use factory-boy with an async ORM. The only problem is that factory-boy does not support async at all. I want to override the _create function on a factory, which is a synchronous function, but I need that function to run asynchronous code. It doesn't matter if it blocks; it's just tests.
Something like this:
import asyncio

import factory


class AsyncModelFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        # async_session is the async SQLAlchemy session, defined elsewhere
        sqlalchemy_session = async_session
        abstract = True

    @classmethod
    def _create(cls, model_class, *args, **kwargs):
        # run_until_complete only works if there isn't already an event loop running
        return asyncio.get_event_loop().run_until_complete(
            cls._async_create(model_class, *args, **kwargs)
        )

    @classmethod
    async def _async_create(cls, model_class, *args, **kwargs):
        obj = model_class(*args, **kwargs)
        # a classmethod receives cls, not self
        session = cls._meta.sqlalchemy_session
        session.add(obj)
        await session.commit()  # needs to be awaited
        return obj
The above actually works, except in the case where an async event loop is already running. Ironically, that factory works from synchronous tests, but not from asynchronous tests.
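To make the failure concrete, here is a minimal reproduction that uses only the standard library, with no factory-boy or ORM involved. The names `save`, `sync_create`, and `async_caller` are hypothetical stand-ins for `_async_create`, `_create`, and an async test respectively.

```python
import asyncio


async def save():
    # Stand-in for the awaited session.commit()
    await asyncio.sleep(0)
    return "saved"


def sync_create():
    # Stand-in for the synchronous _create hook
    loop = asyncio.get_event_loop()
    coro = save()
    try:
        return loop.run_until_complete(coro)
    except RuntimeError:
        # Close the coroutine so it is not reported as never awaited
        coro.close()
        raise


async def async_caller():
    # Stand-in for an async test: the event loop is already running here
    try:
        sync_create()
    except RuntimeError as exc:
        return str(exc)
    return None


# Holds the RuntimeError message raised because the loop is already running
error_message = asyncio.run(async_caller())
print(error_message)
```

Calling `run_until_complete` on the loop that is currently executing the test raises `RuntimeError`, which is the same failure the factory hits from an async test.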
Is there a way to await _async_create from _create in an asynchronous context? All of factory-boy, including factories that create sub-factories, is synchronous, so this can only work if _create remains a synchronous call.
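For reference, one general pattern for calling a coroutine from synchronous code while a loop is already running is to hand it to a separate thread that runs its own event loop. The sketch below uses only the standard library. `run_coroutine_sync`, `save`, and `sync_create` are hypothetical names, and it is an assumption, not something verified here, that an async SQLAlchemy session or its database driver would tolerate being driven from a second loop on another thread.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def save(value):
    # Stand-in for the async work done in _async_create
    await asyncio.sleep(0)
    return value


def run_coroutine_sync(coro):
    # Run the coroutine to completion on a worker thread with a fresh
    # event loop, blocking the calling thread until it finishes.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


def sync_create(value):
    # Stand-in for a synchronous _create hook
    return run_coroutine_sync(save(value))


async def async_test():
    # The outer loop is running here, yet sync_create still completes
    return sync_create("obj")


result_from_async = asyncio.run(async_test())
result_from_sync = sync_create("obj")
```

This blocks the outer event loop while the worker thread runs, which matches the "it doesn't matter if it blocks" constraint, but any resource bound to the outer loop (connections, locks) may not be usable from the second one.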