1

I'm having a strange issue with Python queues and threading.

I have a web.py application that schedules jobs, and as such has a global incoming_queue = Queue(maxsize=10).

I have a url and a GET handler that adds to the queue (I also add to a list because I need to know the contents of the queue):

class ProcessRequest:
    # web.py handler for GET /request: enqueue a new job id and return the
    # name of the "drop folder" assigned to it.
    def GET(self):
        global incoming_queue, incoming_jobs
        if incoming_queue.full():
            print "Queue is full"
            # Reject with HTTP 500 when the bounded queue (maxsize=10) is full.
            return web.InternalError("Queue is full, please try submitting later.")
        else:
            job_id = getNextInt()
            req_folder = "req" + str(job_id)
            incoming_queue.put(job_id)
            # The list mirrors the queue contents so other handlers can
            # inspect what is pending (Queue has no public peek/iterate API).
            incoming_jobs.append(job_id)
            print "Received request, assigning Drop Folder {0}".format(req_folder)
            web.header('Drop-Folder', req_folder)
            return req_folder

I also run a thread to handle the jobs:

def processJobs():
    """Worker-thread loop: pull job ids off the queue one at a time, forever."""
    global incoming_queue, incoming_jobs, current_job, finished_jobs
    while True:
        print "Job processor thread active"
        # Blocks until an item is available. Per the question, this never
        # unblocks even after a put() -- the Edit shows this thread holds a
        # different Queue instance than the GET handler.
        current_job = incoming_queue.get(block=True)
        incoming_jobs.remove(current_job)
        print "Processing job {0}".format(current_job)
        # Do stuff here
        print "Job processor thread ready for next job"
    print "Job processor thread finished"  # unreachable: the loop never exits

I run the following when I start the program:

if __name__ == '__main__':
    # Start the background worker before the web server; app.run() blocks
    # the main thread serving HTTP requests.
    job_processor_thread = threading.Thread(target=processJobs)
    job_processor_thread.start()
    app.run()

I then call the URL that adds to the queue. Using another url, I was able to check that the item was indeed added to the list, and adding the following code to the GET method of the original url handler (print incoming_queue.get()), I verified that the item was indeed being added to the queue.

The job handling thread just blocks at current_job = incoming_queue.get(block=True). This is intended. However, it never unblocks, even when an item is added to the queue. It simply stays blocked forever.

Why is that? It's almost like it has a separate copy of the queue object.

Edit: Per Martin's suggestion, I decided to try and see what object was being referenced in the GET method, and the processJobs method.

processJobs(): <Queue.Queue instance at 0x7f32b6958a70>; GET(): <Queue.Queue instance at 0x7f32b5ec5368>

Yes, they are different, but why?

EDIT #2: Here is the entire script for reference:

'''
Created on Apr 20, 2015

@author: chris
'''
import web
import time
import threading
import json
from Queue import Queue, Empty
import os

# URL routing table: maps each path to the handler class name.
urls = (
        '/request', 'ProcessRequest',
        '/status', 'CheckStatus',
    )

# Bare Thread object; unused in the code shown -- presumably a leftover.
current_job_thread = threading.Thread()

app = web.application(urls, globals())

# Shared job-tracking state as module-level globals. This is the root of
# the reported bug: the Edit shows the handler and the worker thread end
# up with different Queue instances.
incoming_jobs = []
incoming_queue = Queue(maxsize=10)

current_job = None

finished_jobs = []

next_int = 0

def getNextInt():
    """Return the next sequential job id (0, 1, 2, ...)."""
    # NOTE(review): the read-then-increment of next_int is not atomic, so
    # concurrent requests could receive duplicate ids. incoming_queue is
    # declared global here but never used in this function.
    global next_int, incoming_queue
    the_int = next_int
    next_int += 1
    return the_int

class ProcessRequest:
    # web.py handler for GET /request: enqueue a new job id and return the
    # name of the "drop folder" assigned to it.
    def GET(self):
        global incoming_queue, incoming_jobs
        if incoming_queue.full():
            print "Queue is full"
            # Reject with HTTP 500 when the bounded queue (maxsize=10) is full.
            return web.InternalError("Queue is full, please try submitting later.")
        else:
            job_id = getNextInt()
            req_folder = "req" + str(job_id)
            # Diagnostic: shows which Queue object this handler sees (the
            # Edit shows it differs from the worker thread's instance).
            print incoming_queue
            incoming_queue.put(job_id)
            # Mirror the queue contents in a list so /status can inspect
            # pending jobs (Queue has no public peek/iterate API).
            incoming_jobs.append(job_id)
            print "Received request, assigning Drop Folder {0}".format(req_folder)
            web.header('Drop-Folder', req_folder)
            return req_folder

class CheckStatus:
    # web.py handler for GET /status?jobid=<n|all>: report where a job is
    # in its lifecycle (queued / processing / finished); for finished jobs,
    # return the result JSON and delete the result file.
    def GET(self):
        global incoming_queue, incoming_jobs, current_job, finished_jobs
        if str(web.input().jobid) == 'all':
            # Construct JSON to return
            web.header('Content-Type', 'application/json')
            return {'In Queue': incoming_jobs,
                            'Currently Processing': current_job,
                            'Finished': finished_jobs
                    }
        try:
            jobid = int(web.input().jobid)
        except ValueError:
            # Sentinel: non-numeric ids fall through to the 'Unknown' branch.
            jobid = -1
        print jobid
        if jobid in finished_jobs:
            file_string = "results{0}.json".format(jobid)
            try:
                # NOTE(review): json_file is never closed (leaks a handle).
                json_file = open(file_string)
                finished_jobs.remove(jobid)
                os.remove(file_string)
                web.header('Process-Status', 'Complete')
                web.header('Content-Type', 'application/json')
                return json.load(json_file)
            except IOError:
                web.header('Process-Status', 'Complete, but failed to retrieve file, saving')
                return ""

        elif jobid is current_job:
            # NOTE(review): 'is' tests object identity, not equality; for
            # ints this only happens to work inside CPython's small-int
            # cache -- should be '=='.
            web.header('Process-Status', 'Processing')
        elif jobid in incoming_jobs:
            web.header('Process-Status', 'In Queue')
        else:
            web.header('Process-Status', 'Unknown')
        return ""

def processJobs():
    """Worker-thread loop: pull job ids off the queue one at a time, forever."""
    global incoming_queue, incoming_jobs, current_job, finished_jobs
    while True:
        # Diagnostic: shows which Queue object this thread sees (the Edit
        # shows it differs from the GET handler's instance).
        print incoming_queue
        print "Job processor thread active"
        # Blocks until an item is available -- this is where the question's
        # hang occurs.
        current_job = incoming_queue.get(block=True)
        incoming_jobs.remove(current_job)
        print "Processing job {0}".format(current_job)
        # Do magical Spark stuff here
        time.sleep(10)  # Simulate a Spark Job
        finished_jobs.append(current_job)
        current_job = None
        print "Job processor thread ready for next job"
    print "Job processor thread finished"  # unreachable: the loop never exits

if __name__ == '__main__':
    # Start the background worker before the web server; app.run() blocks
    # the main thread serving HTTP requests.
    job_processor_thread = threading.Thread(target=processJobs)
    job_processor_thread.start()
    app.run()

2 Answers 2

1

You can test your assumption that they are different queues simply by printing the object:

def processJobs():
    """Diagnostic stub: print which Queue object the worker thread sees."""
    global incoming_queue, incoming_jobs, current_job, finished_jobs
    print incoming_queue # print something like <__main__.Queue instance at 0x7f556d93f830>


class ProcessRequest:
    # Diagnostic stub: print which Queue object the request handler sees,
    # so its address can be compared with the worker thread's.
    def GET(self):
        global incoming_queue, incoming_jobs
        print incoming_queue # print something like <__main__.Queue instance at 0x7f556d93f830>

Ensure the memory addresses (0x7f556d93f830) match.

You never mention if you are using a framework to handle web requests, so it may be that the framework is doing some forking which cause your queues to be separate instances.

On a side note, you may want to consider Redis or beanstalk as a queue - these are really simple to use, and your queues will persist even when you restart your app.

Sign up to request clarification or add additional context in comments.

11 Comments

I mentioned I was using web.py. I'll try this out and report back.
Interesting, they are indeed different instances.
Do you have incoming_queue = Queue() in multiple locations? If only one location, how is that location run?
It is in the main script, I'll include the whole thing in an edit so you can see.
I'm not familiar web.py lifecycle, but it seems that making a GET request will re-import your script, which causes incoming_queue = Queue() to be run again. You can test this out by putting a print statement right above where the Queue instance is created, and making a request. You will see your printed output twice.
|
1

With Martin's direction, I was able to solve the issue using the idea from here: https://groups.google.com/forum/#!topic/webpy/u-cfL7jLywo.

Basically, web.py recreates the global variables when a request is made, so we cannot use global variables if we want to share data between the framework and other threads. The solution is to create another module, create a class in that module, and add the variable definitions there. Here is what I ended up with:

jobqueue.py:

'''
Created on Apr 23, 2015

@author: chris
'''
import Queue

class JobManagement:
    # Holds all shared job state as class attributes in a separate module.
    # Because web.py re-executes the main script per request (per the
    # answer), moving the state here ensures the worker thread and the
    # request handlers reference the same objects.
    incoming_queue = Queue.Queue(maxsize=10)  # bounded queue of pending job ids
    incoming_jobs = []    # ids waiting in the queue (inspectable mirror)
    current_job = None    # id currently being processed, or None
    finished_jobs = []    # ids whose result files are ready

main.py:

'''
Created on Apr 20, 2015

@author: chris
'''
import web
import time
import threading
import json
from Queue import Queue, Empty
import os
from jobqueue import JobManagement

# URL routing table: maps each path to the handler class name.
urls = (
        '/request', 'ProcessRequest',
        '/status', 'CheckStatus',
    )

app = web.application(urls, globals())

# The job-id counter stays module-level; only the shared queue/lists were
# moved into jobqueue.JobManagement.
next_int = 0

def getNextInt():
    """Return the next sequential job id (0, 1, 2, ...)."""
    # NOTE(review): the read-then-increment of next_int is not atomic, so
    # concurrent requests could receive duplicate ids.
    global next_int
    the_int = next_int
    next_int += 1
    return the_int

class ProcessRequest:
    # web.py handler for GET /request: enqueue a new job id and return the
    # name of the "drop folder" assigned to it. All shared state is accessed
    # through JobManagement so it survives web.py's module re-execution.
    def GET(self):
        if JobManagement.incoming_queue.full():
            print "Queue is full"
            # Reject with HTTP 500 when the bounded queue (maxsize=10) is full.
            return web.InternalError("Queue is full, please try submitting later.")
        else:
            job_id = getNextInt()
            req_folder = "req" + str(job_id)
            # Diagnostic: should now print the same Queue address as the
            # worker thread.
            print JobManagement.incoming_queue
            JobManagement.incoming_queue.put(job_id)
            # Mirror the queue contents in a list so /status can inspect
            # pending jobs.
            JobManagement.incoming_jobs.append(job_id)
            print "Received request, assigning Drop Folder {0}".format(req_folder)
            web.header('Drop-Folder', req_folder)
            return req_folder

class CheckStatus:
    # web.py handler for GET /status?jobid=<n|all>: report where a job is
    # in its lifecycle (queued / processing / finished); for finished jobs,
    # return the result JSON and delete the result file.
    def GET(self):
        if str(web.input().jobid) == 'all':
            # Construct JSON to return
            web.header('Content-Type', 'application/json')
            return {'In Queue': JobManagement.incoming_jobs,
                            'Currently Processing': JobManagement.current_job,
                            'Finished': JobManagement.finished_jobs
                    }
        try:
            jobid = int(web.input().jobid)
        except ValueError:
            # Sentinel: non-numeric ids fall through to the 'Unknown' branch.
            jobid = -1
        print jobid
        if jobid in JobManagement.finished_jobs:
            file_string = "results{0}.json".format(jobid)
            try:
                # NOTE(review): json_file is never closed (leaks a handle).
                json_file = open(file_string)
                JobManagement.finished_jobs.remove(jobid)
                os.remove(file_string)
                web.header('Process-Status', 'Complete')
                web.header('Content-Type', 'application/json')
                return json.load(json_file)
            except IOError:
                web.header('Process-Status', 'Complete, but failed to retrieve file, saving')
                return ""

        elif jobid is JobManagement.current_job:
            # NOTE(review): 'is' tests object identity, not equality; for
            # ints this only happens to work inside CPython's small-int
            # cache -- should be '=='.
            web.header('Process-Status', 'Processing')
        elif jobid in JobManagement.incoming_jobs:
            web.header('Process-Status', 'In Queue')
        else:
            web.header('Process-Status', 'Unknown')
        return ""

def processJobs():
    """Worker-thread loop: pull job ids off the shared queue, forever."""
    while True:
        # Diagnostic: should now print the same Queue address as the GET
        # handler, confirming both sides share one instance.
        print JobManagement.incoming_queue
        print "Job processor thread active"
        # Blocks until a job id is available.
        JobManagement.current_job = JobManagement.incoming_queue.get(block=True)
        JobManagement.incoming_jobs.remove(JobManagement.current_job)
        print "Processing job {0}".format(JobManagement.current_job)
        # Do magical Spark stuff here
        time.sleep(10)  # Simulate a Spark Job
        JobManagement.finished_jobs.append(JobManagement.current_job)
        JobManagement.current_job = None
        print "Job processor thread ready for next job"
    print "Job processor thread finished"  # unreachable: the loop never exits

if __name__ == '__main__':
    # Print the shared queue's address for comparison, then start the
    # background worker before the (blocking) web server.
    print JobManagement.incoming_queue
    job_processor_thread = threading.Thread(target=processJobs)
    job_processor_thread.start()
    app.run()

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.