
I'm trying to implement a pool of threads in a Celery task.

My Celery task calls the update_state() function to send information about the task state to the DB, and that works. But when I add threads to the task and call update_state() in EACH thread, Celery raises an error.

This is a working example (without threading):

from celery import shared_task, states

@shared_task(bind=True)
def get_info(self, user):
    for i in range(4):
        self.update_state(state=states.SUCCESS, meta={'subtask_id': i})

This is the non-working example (with threading):

from celery import shared_task, states
import threading

lock = threading.Lock()

def run_subtask(celery_task, i):
    with lock:
        # The error is raised here, when update_state() is called
        celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})

@shared_task(bind=True)
def get_info(self, user):
    for i in range(4):
        worker = threading.Thread(target=run_subtask, args=(self, i))
        worker.start()

The error is:

    [2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py", line 558, in get_key_for_task
        self.task_keyprefix, key_t(task_id), key_t(key),
    [2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found

What is the reason? Why can't I call update_state() from inside a thread?

2 Answers


Celery attaches a sort of context object to the thread so it knows which task the thread belongs to. To associate another thread with the task, you need to do something like:

from celery.app import push_current_task, pop_current_task


def run_subtask(celery_task, i):
    push_current_task(celery_task)
    try:
        ...
    finally:
        pop_current_task()

2 Comments

Thanks a lot! I already found a solution and posted it here. I haven't tested your solution, but I think it goes in the same direction.
Mine is the way the documentation recommends. I would recommend using mine.

I found an answer! It comes from one of the Celery contributors:

task.request is a thread local, so only the thread executing the task can call update_state.

This especially makes sense if you consider that the thread could race with the task post handler storing the result.
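The thread-local point can be seen with the standard library alone. The sketch below is a minimal illustration, assuming a plain `threading.local` as a stand-in for Celery's per-thread request context: an id set in the thread that runs the task is simply absent in a thread started from it. `fake_request`, `current_task_id`, `build_key` and the `celery-task-meta-` prefix are hypothetical names chosen to mirror the traceback, not Celery's actual code; the point is that joining bytes with a `None` task id at position 1 produces the same `TypeError` as in the question.

```python
import threading

# Hypothetical stand-in for Celery's per-thread request context.
fake_request = threading.local()


def current_task_id():
    # A thread that never set the attribute sees None, like an empty request.
    return getattr(fake_request, "id", None)


def key_t(value):
    # Illustrative helper: encode str to bytes, pass anything else through.
    return value.encode() if isinstance(value, str) else value


def build_key(task_id):
    # Illustrative only: joins bytes the way the traceback suggests,
    # so a None task id fails on "sequence item 1".
    return b"".join([b"celery-task-meta-", key_t(task_id), b""])


def run_in_thread(results):
    task_id = current_task_id()
    results["thread_id"] = task_id
    try:
        build_key(task_id)
    except TypeError as exc:
        results["error"] = str(exc)


results = {}
fake_request.id = "abc123"  # set only in the "task" (main) thread
worker = threading.Thread(target=run_in_thread, args=(results,))
worker.start()
worker.join()

print(current_task_id())     # abc123
print(results["thread_id"])  # None
print(results["error"])      # sequence item 1: expected a bytes-like object, NoneType found
```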

You can pass the task_id to the thread:

cp_self.update_state(task_id=task_id, state='PROGRESS', meta={'timeout': to})

But you have to make damn sure the thread is joined and stopped before the task exits (thread.join()). In your example the thread can only be joined after the while loop exits, and since you are sleeping for 1 second the join can be delayed by as much.
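Following that advice, a pool-of-threads version might read the id once in the task's own thread, hand it to every worker, and wait for all workers before the task returns. This is a sketch under stated assumptions: to keep it runnable without a broker, `report` is a hypothetical stand-in for `self.update_state(task_id=..., state=..., meta=...)`, and in a real bound task you would pass `self.update_state` and `self.request.id` instead. `run_task` and the shared `updates` list are also illustrative.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_lock = threading.Lock()
updates = []


def report(task_id, state, meta):
    # Hypothetical stand-in for celery_task.update_state(task_id=..., ...).
    with _lock:
        updates.append((task_id, state, meta))


def run_subtask(update_state, task_id, i):
    # The worker never touches the task's thread-local request;
    # it only uses the task_id it was given explicitly.
    update_state(task_id=task_id, state="PROGRESS", meta={"subtask_id": i})
    return i


def run_task(task_id, update_state, n=4):
    # In Celery, task_id would be self.request.id, read in the task's thread.
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(run_subtask, update_state, task_id, i)
                   for i in range(n)]
        # result() re-raises any worker exception in the task's thread.
        return [f.result() for f in futures]
    # Leaving the with-block waits for every worker thread to finish.


print(run_task("abc123", report))  # [0, 1, 2, 3]
```

Using the pool as a context manager gives the join guarantee the contributor insists on: `ThreadPoolExecutor.__exit__` calls `shutdown(wait=True)`, so no worker can still be reporting progress after the task function has returned.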
