3

I am currently working on a flask application (Python 3.6) and wanted to integrate celery because I have multiple long running background tasks.

Edit: Celery 4.1

The integration was no problem and the celery tasks are executed correctly, but I cannot access the current state of a running task.

Celery, Flask setup:

def make_celery(app):
    celery = Celery(app.import_name,
                    backend=app.config["result_backend"],
                    broker=app.config["broker_url"])

    celery.conf.update(app.config)

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery


app = Flask(__name__)
app.config["broker_url"] = "redis://localhost:6379"
app.config["result_backend"] = "redis://localhost:6379"
app.config["DB"] = "../pyhodl.sqlite"

celery_app = make_celery(app)

Celery task:

@celery_app.task(bind=True, name="server.tasks.update_trading_pair")
def update_trading_pair(self, exchange, currency_a, currency_b):
    print(exchange, currency_a, currency_b)
    time.sleep(50)

Call the task and store the value in a dictionary:

task_id = update_trading_pair.delay(exchange, currency_a, currency_b)
print("NEW TASK")
print(task_id)
id = exchange_mnemonic + "_" + currency_a + "_" + currency_b
TASK_STATES[id] = task_id

Get state of a task:

result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])
print(result.state)
print(result) # works but only prints the task_id

This is were the error is raised. When i print only the result object it simply prints the task_id. And if I try to retrieve the current state i raises following exception:

TypeError: sequence item 1: expected a bytes-like object, AsyncResult found
4
  • what about print(result.status) Commented Mar 6, 2018 at 9:26
  • raises the same error Commented Mar 6, 2018 at 9:29
  • Have a look into this question stackoverflow.com/questions/9034091/… Hope it helps Commented Mar 6, 2018 at 9:41
  • I allready read that question. My problem is that I can't even access the properties of the AsyncResult because the error is raised. I can't even call result.ready(), result.get() or any other Method. Although when I check for the result type with type(result) it say it is a "celery.result.AsyncResult". The error says the same AsyncResult found therefore the exception doesn't even make sense. Commented Mar 6, 2018 at 9:45

1 Answer 1

3

EXPLANATION :

When you call your task :

task_id = update_trading_pair.delay(exchange, currency_a, currency_b)

your variable task_id is an instance of AsyncResult, it is not a string.

So, your variable TASK_STATES[market.__id__()] is also an instance of AsyncResult whereas it should be a string.

And then your are trying to instantiate an AsyncResult object with it

result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])

So you are instantiating an AsyncResult object with another AsyncResult object whereas it should be instantiated with a string.

Maybe your confusion comes from your print(task_id) which shows you a string, but when you do this, under the hood the __str__ method of the AsyncResult object is called and if you look at it in the source code here,

def __str__(self):
    """`str(self) -> self.id`."""
    return str(self.id)

it justs print the id attribute of your task_id object.

SOLUTION :

You can fix it with either by doing TASK_STATES[id] = task_id.id or by doing

result = update_trading_pair.AsyncResult(str(TASK_STATES[market.__id__()]))
Sign up to request clarification or add additional context in comments.

1 Comment

Thank you! That was the exact problem. I thought that somewhere I read that delay() returns only the id. A look into the documentation would have solved the problem. docs.celeryproject.org/en/latest/reference/celery.app.task.html

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.