
I have a backend application built on asyncio: FastAPI for the web server, and SQLAlchemy 1.4 with asyncpg as the async database driver. I need to dispatch tasks to workers that will run and then update the host application. Currently I am using aio_pika, but I would like something more robust, such as Celery with Flower.

I understand that Celery is not integrated with asyncio. I have also read through answers like this one. My concern is not making the tasks themselves async, which is trivial; I am concerned about launching tasks from within the main event loop.

My primary question: does my_task.delay()/my_task.apply_async() block the running thread at all? If so, would a better approach be to use multiprocessing workers that pull items from a central mp.Queue, or a ProcessPoolExecutor, and then dispatch Celery tasks only from that worker process?

I want to deploy tasks and, ideally, be notified when they are complete. This can be done from within the task itself via the fastapi interface, though. I just want to ensure that deploying tasks does not block the async event loop.

  • There will be some blocking when publishing the task to the broker. You can avoid that by using an executor to call my_task.delay() or other Celery-related functions, as you suggested. Commented Oct 19, 2021 at 16:49
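The executor approach from the comment above could look roughly like the sketch below. The helper name delay_in_executor is made up for illustration, and FakeTask is a hypothetical stand-in whose delay() sleeps to imitate a slow broker publish; with a real Celery task you would pass the task object itself. The heartbeat coroutine is only there to show that the event loop keeps running while the publish happens in a thread.

```python
import asyncio
import functools
import time


async def delay_in_executor(task, *args, **kwargs):
    """Call task.delay(...) in the default thread pool so the event loop stays free."""
    loop = asyncio.get_running_loop()
    call = functools.partial(task.delay, *args, **kwargs)
    return await loop.run_in_executor(None, call)


class FakeTask:
    """Hypothetical stand-in for a Celery task (assumption, not Celery's API)."""

    def delay(self, *args, **kwargs):
        time.sleep(0.2)  # imitate a blocking round trip to the broker
        return ("queued", args, kwargs)


async def demo():
    ticks = 0

    async def heartbeat():
        # Counts how often the loop gets control while the publish is in flight.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    result = await delay_in_executor(FakeTask(), 1, 2, key="value")
    beat.cancel()
    return result, ticks


result, ticks = asyncio.run(demo())
```

If ticks ends up greater than zero, the loop kept servicing other coroutines during the simulated publish. On Python 3.9+, asyncio.to_thread(task.delay, *args, **kwargs) should be an equivalent, shorter spelling of the helper.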

1 Answer


I tried adapting an answer from the post you linked (this one). I took its code and modified it slightly. It seems to work correctly in most cases with simple tasks, but I doubt it is completely safe; it is just a workaround. Here's the code:

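import asyncio
import inspect

from celery import Celery


class AsyncCelery(Celery):
    """Celery app whose tasks may be written with `async def`."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.patch_task()

    def patch_task(self):
        TaskBase = self.Task

        class ContextTask(TaskBase):
            abstract = True

            async def _run(self, *args, **kwargs):
                # Call the task body; an `async def` task returns a coroutine
                # that must be awaited, a plain task returns its value directly.
                result = TaskBase.__call__(self, *args, **kwargs)
                if inspect.isawaitable(result):
                    result = await result
                return result

            def __call__(self, *args, **kwargs):
                # asyncio.run starts a fresh event loop for this invocation and
                # closes it afterwards. This assumes no event loop is already
                # running in the current thread. Unlike a bare `except:` with a
                # retry, a task that raises is not executed a second time.
                return asyncio.run(self._run(*args, **kwargs))

        self.Task = ContextTask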