I'm trying to implement pool of threads in Celery task.
My Celery task calls update_state() function to send info about task state to DB. And it works successfully. But when I'm adding Threads into task and trying to call update_state() function in EACH thread - Celery returns an error.
This is working example (without Threading):
import celery
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
self.update_state(state=states.SUCCESS, meta={'subtask_id': i})
This is not working example (with Threading):
import celery
import threading
lock = threading.Lock()
def run_subtask(celery_task, i):
lock.acquire()
#Error raises here, when update_state calls
celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
lock.release()
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
worker = threading.Thread(target=run_subtask, args=(self, i))
worker.start()
The error is:
[2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py",
line 558, in get_key_for_task self.task_keyprefix, key_t(task_id), key_t(key),
[2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found
What is the reason? Why I can't call update_state() into thread?