
I am trying to use a Queue in Python that will be shared by multiple threads. I just want to know whether the approach I am using is correct, whether I am doing something redundant, and whether there is a better approach I should use.

I fetch new requests from a database table and schedule them, using some logic, to perform some operation such as running a query.

So here, from the main thread, I spawn a separate thread for the queue.

if __name__ == '__main__':
    request_queue = SetQueue(maxsize=-1)
    worker = Thread(target=request_queue.process_queue)
    worker.setDaemon(True)
    worker.start()

    while True:
        try:
            # Connect to the database and fetch all new requests to be verified.
            db = Database(username_testschema, password_testschema, mother_host_testschema,
                          mother_port_testschema, mother_sid_testschema, 0)
            verify_these = db.query("SELECT JOB_ID FROM %s.table WHERE JOB_STATUS='%s' ORDER BY JOB_ID" %
                                    (username_testschema, 'INITIATED'))

            # If there are requests to be verified, put them in the queue.
            for row in verify_these:
                print "verifying : %s" % row[0]
                verify_id = row[0]
                request_queue.put(verify_id)
        except Exception as e:
            logger.exception(e)
        finally:
            time.sleep(10)

Now in the SetQueue class I have a process_queue function that processes the top two requests added to the queue on every run.

'''
Overriding the Queue class to use a set as all_items instead of a list,
to ensure unique items are added and processed at all times.
'''

class SetQueue(Queue.Queue):
    def _init(self, maxsize):
        Queue.Queue._init(self, maxsize)
        self.all_items = set()

    def _put(self, item):
        # Only enqueue items that have never been seen before. Note that
        # items are never removed from all_items, so a given job_id can
        # only ever be enqueued once per process lifetime.
        if item not in self.all_items:
            Queue.Queue._put(self, item)
            self.all_items.add(item)

    '''
    The multithreaded queue for the verification process. Takes up to two
    items per run, verifies each in a separate thread, and sleeps for 10
    seconds, so at most two requests are processed per run.
    '''
    def process_queue(self):
        while True:
            scheduler_obj = Scheduler()

            try:
                # Take up to two items per run. get(block=False) avoids
                # blocking forever when fewer than two items are queued.
                threads = []
                for i in range(2):
                    try:
                        job_id = self.get(block=False)
                    except Queue.Empty:
                        break
                    t = Thread(target=scheduler_obj.verify_func, args=(job_id,))
                    t.start()
                    threads.append(t)

                # Join every spawned thread; joining only the loop variable
                # `t` would wait on the last thread twice and skip the first.
                for t in threads:
                    t.join(timeout=1)
                    self.task_done()
            except Exception as e:
                logger.exception(
                    "QUEUE EXCEPTION : Exception occurred while processing requests in the VERIFICATION QUEUE")
            finally:
                time.sleep(10)

I want to check whether my understanding is correct and whether there can be any issues with it.

So the main thread, running the while True loop in the main function, connects to the database, gets new requests, and puts them in the queue. The worker thread (daemon) for the queue keeps getting new requests from the queue and forks non-daemon threads that do the processing, and since the timeout for the join is 1 second, the worker thread will keep taking new requests without getting blocked while its child threads keep processing in the background. Correct?

So if the main process exits, these won't be killed until they finish their work, but the worker daemon thread would exit. Doubt: if the parent thread is a daemon and the child is non-daemon, and the parent exits, does the child exit?
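
From what I have read, Python threads have no real parent/child relationship once started: a non-daemon thread keeps the process alive even if the thread that created it has finished. The catch is that the daemon flag is inherited from the creating thread, so a thread spawned from my daemon worker is itself daemonic unless I clear the flag explicitly. A quick sketch to check this:

import threading
import time

def child_target():
    time.sleep(0.5)
    print "child finished"

def parent_target():
    t = threading.Thread(target=child_target)
    print "child daemon flag: %s" % t.daemon  # True: inherited from the daemon parent
    t.setDaemon(False)  # opt out explicitly so the child can outlive main
    t.start()

parent = threading.Thread(target=parent_target)
parent.setDaemon(True)
parent.start()
parent.join()
# Main exits here; daemon threads are killed, but the non-daemon child
# keeps the process alive until "child finished" is printed.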


I also read here: David Beazley - multiprocessing

In the "Using a Pool as a Thread Coprocessor" section he is trying to solve a similar problem. So should I follow his steps:

1. Create a pool of processes.
2. Open a thread, like I am doing for request_queue.
3. In that thread:

    def process_verification_queue(self):
        while True:
            try:
                if self.qsize() > 0:
                    job_id = self.get()
                    pool.apply_async(Scheduler.verify_func, args=(job_id,))
            except Exception as e:
                logger.exception("QUEUE EXCEPTION : Exception occurred while processing requests in the VERIFICATION QUEUE")

Use a process from the pool and run verify_func in parallel. Will this give me more performance?
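
For reference, a minimal sketch of how I understand that wiring (the pool size, the blocking get with timeout, and the module-level verify_func are my own placeholders, not Beazley's code):

import logging
import multiprocessing
import threading
import Queue  # 'queue' on Python 3

logger = logging.getLogger(__name__)

def verify_func(job_id):
    # Module-level so it pickles cleanly for apply_async on Python 2;
    # methods like Scheduler.verify_func may not.
    pass

def process_verification_queue(request_queue, pool):
    while True:
        try:
            # A blocking get() replaces polling qsize(), which is racy.
            job_id = request_queue.get(timeout=10)
        except Queue.Empty:
            continue
        try:
            pool.apply_async(verify_func, args=(job_id,))
        finally:
            request_queue.task_done()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    request_queue = Queue.Queue()
    worker = threading.Thread(target=process_verification_queue,
                              args=(request_queue, pool))
    worker.setDaemon(True)
    worker.start()

My understanding is that this only helps if verify_func is CPU-bound; if it mostly waits on the database, plain threads should be about as fast.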

  • I don't think I can adequately answer all your questions but you may consider following some architectural patterns found in the popular Celery Project. I don't mean to recommend it as a solution over what you are creating but it is a mature project which could highlight a decent approach which you can copy. Commented Jun 9, 2015 at 15:03
  • I'll look through it. Thanks @erik-e Commented Jun 9, 2015 at 16:53

2 Answers


While it's possible to create a new independent thread for the queue and process that data separately the way you are doing it, I believe it is more common for each independent worker thread to post messages to a queue that it already "knows" about, and for that queue to be processed from some other thread by pulling messages out of it.

Design Idea

The way I envision your application, there would be three threads: the main thread and two worker threads. One worker thread would get requests from the database and put them in the queue; the other worker thread would process the data from the queue.

The main thread would just wait for the other threads to finish by calling their .join() methods.

You would protect the queue that the threads have access to, and make it thread-safe, by using a mutex (in Python, Queue.Queue already does this locking internally). I have seen this pattern in many other designs in other languages as well.
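
A minimal sketch of that three-thread shape, with the database poll and the verification stubbed out as hypothetical fetch_new_job_ids() and verify() helpers:

import threading
import time
import Queue  # 'queue' on Python 3

request_queue = Queue.Queue()

def fetch_worker():
    while True:
        for job_id in fetch_new_job_ids():  # hypothetical DB helper
            request_queue.put(job_id)
        time.sleep(10)

def verify_worker():
    while True:
        job_id = request_queue.get()  # blocks until an item arrives
        try:
            verify(job_id)  # hypothetical verification helper
        finally:
            request_queue.task_done()

fetcher = threading.Thread(target=fetch_worker)
verifier = threading.Thread(target=verify_worker)
for t in (fetcher, verifier):
    t.start()

# The main thread just waits on the workers; since these loops never
# return, a real service would wait on a shutdown event instead.
fetcher.join()
verifier.join()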

Suggested Reading

"Effective Python" by Brett Slatkin has a great example of this very question.

Instead of inheriting from Queue, he just creates a wrapper in a class called MyQueue, adding get() and put(message) functions.

He even provides the source code in his GitHub repo:

https://github.com/bslatkin/effectivepython/blob/master/example_code/item_39.py
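
From memory, the shape of his wrapper is roughly this (a paraphrase of the pattern, not the book's exact code):

from collections import deque
from threading import Lock

class MyQueue(object):
    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    def put(self, item):
        # Producer side: append under the lock.
        with self.lock:
            self.items.append(item)

    def get(self):
        # Consumer side: pop under the lock; raises IndexError when empty.
        with self.lock:
            return self.items.popleft()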

I'm not affiliated with the book or its author, but I highly recommend it as I learned quite a few things from it :)



I like this explanation of the advantages and differences between using threads and processes: "...But there's a silver lining: processes can make progress on multiple threads of execution simultaneously. Since a parent process doesn't share the GIL with its child processes, all processes can execute simultaneously (subject to the constraints of the hardware and OS)..."

He has some great explanations of how to get around the GIL and how to improve performance.

Read more here:

http://jeffknupp.com/blog/2013/06/30/pythons-hardest-problem-revisited/
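
A toy comparison that illustrates the point (multiprocessing.dummy provides a thread-backed Pool with the same API; the work sizes are arbitrary):

import time
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool  # same API, backed by threads

def cpu_bound(n):
    # Pure-Python arithmetic: holds the GIL for the whole call.
    total = 0
    for i in xrange(n):  # range() on Python 3
        total += i * i
    return total

if __name__ == '__main__':
    work = [5 * 10 ** 6] * 4
    for label, pool_cls in (('threads', ThreadPool), ('processes', Pool)):
        pool = pool_cls(4)
        start = time.time()
        pool.map(cpu_bound, work)
        pool.close()
        pool.join()
        print "%s: %.2fs" % (label, time.time() - start)

On a multi-core machine the process pool should finish these tasks in parallel, while the thread pool serializes on the GIL.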

4 Comments

  • But overall the snippet provided won't have any issues, right, except the GIL? The only benefit I might get from migrating to processes is performance.
  • I don't think so! Since threads share the same memory, precautions have to be taken so you don't introduce errors there, but as long as you're careful in that regard, you're only looking at possible performance gains from using processes. It's also possible to lose performance if you use them unnecessarily, though: spawning processes can take longer than spawning threads, but once they're running they'll go at about the same speed.
  • If you're using multiple cores, processes will at least help you take advantage of that!
  • Threads vs. processes is much more complicated than that. The GIL is only acquired, more or less, while a thread is running Python bytecode. In this particular case, the main thread would be waiting on other threads (GIL released), the first worker thread is mostly waiting on the database (GIL released), and the second worker thread is the only one that needs the GIL, and the GIL is available to it. Also, the mutexes needed are much cheaper for threads (usually the cost of a spinlock) than for processes, which require OS-level shared-memory mutexes or IPC.
