2

I'm aware of Celery's command-line options:

celery -A my_app purge -Q queue_name

But I am looking for a way to purge queue_name from my Python app with Celery, something along the lines of:

def start_chunk(num_of_objs):
    # clear current queue before starting here
    RELEVANT CODE HERE TO PURGE queue_name
    for num in num_of_objcts:
         some_task.apply_async(kwargs={'num': num}, queue="queue_name")

Note, I'm aware of this:

from proj.celery import app
app.control.purge()

But as I understand this purges all queues.

2 Answers 2

1

I admit, it is little bit difficult to get it right. But the first thing you should look at for hints how to solve this problem is bin/purge.py (that is what I've done). After analysing the mentioned file, I think something like the following should work:

from celery.app.base import Celery
from yourproject import celery_app


def purge_queue(app: Celery, queue_name: str):
    with app.connection_for_write() as conn:
        conn.default_channel.queue_purge(queue_name)


purge_queue(celery_app, "celery")  # purge the "celery" queue

The above is more/less what celery -A yourproject.celery_app purge -Q celery does.

Sign up to request clarification or add additional context in comments.

2 Comments

thanks for your effort. This does seem to work! (haven't tested if it purges all other queues yet) any advantages to using this vs. the answer I just posted?
I have now tested. This doesn't purge tasks in other queues, only the one specified. Nice!
0

One way is to call the purge command through the os, we must use the -f flag:

import os
os.system("celery -A app_name purge -Q queue_name -f")

I don't know if there are any cons to this compared to DejanLekic's answer, which looks cleaner.

5 Comments

How/why is this any different from simply running the purge command in the shell?? I thought you explicitly wanted to purge particular queue programmatically...
@DejanLekic this allows me to schedule the purge right before I send more tasks in.
If you have to schedule purge of the queue(s), then you are doing something wrong, but that is off-topic...
@DejanLekic I wouldn’t be too quick to make this kind of blanket statement. Suppose TASK_A has to process 5000 oldest objs in DB every hour. But sometimes things take a little longer and if it doesn’t finish in time, then it will inevitably process the same obj twice unnecessarily. Can’t we solve this by purging/resetting the queue before firing the next batch?
@DejanLekic I still think your answer is better, I’m curious to hear why mine is so wrong. You obviously know Celery better than me.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.