3

I have a DB with a queue table, new entries are inserted continuously in the queue.

I want a Python script to execute the queue as fast as possible, and I think I need some threaded code to do so, running like a daemon.

But I can't figure out how to use the DB as the queue.

I am looking at this example:

import MySQLdb
from Queue import Queue
from threading import Thread

def do_stuff(q):
    while True:
        print q.get()
        q.task_done()

q = Queue(maxsize=0)
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=do_stuff, args=(q,))
    worker.setDaemon(True)
    worker.start()

// TODO:  Use the DB
db = MySQLdb.connect(...)
cursor = db.cursor()
q = cursor.execute("SELECT * FROM queue")

for x in range(100):
    q.put(x)
q.join()
1
  • I can't understand what you are trying to do with the db. Anyway Python has GIL, that means that most likely you will get zero performance boost for parallelising this operation. Commented Sep 26, 2014 at 11:25

2 Answers 2

3
+50

2 quick points :

  1. Assuming you are using cPython, The GIL will effectively render threading useless, allowing only 1 thread through the interpreter at one time. Couple of workarounds are :

    • The Gevent library [source]

      gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.

    • The multiprocessing module, you can spawn multiple processes - this is true concurrency in python.

    • The concurrent.futures module - new in python 3, port available for python 2. [source]

      This is a new high-level library that operates only at a “job” level, which means that you no longer have to fuss with
      synchronization, or managing threads or processes. you just specify a thread or process pool with a certain number of “workers,” submit
      jobs, and collate the results. It’s new in Python 3.2, but a port for Python 2.6+ is available at http://code.google.com/p/pythonfutures.

You can use the SSDictCursor() of MySQLdb and do a fetchone().This is a streaming cursor and you can run this in an infinite while() loop to resemble a queue:

cur = MySQLdb.cursors.SSDictCursor()

cur.execute(query)

while True:

row = cursor.fetchone()

if not row : break # (or sleep()!)

else: # other
  1. Having said all that, I would suggest you look at implementing tools like celery or mongodb to emulate queues and workers. Relational databases are just not cut out for that kind of a job and suffer unnecessary fragmentation. Here's a great source if you want to know more about fragmentation in mysql.
Sign up to request clarification or add additional context in comments.

2 Comments

gevent doesn't allow you to bypass the GIL, and neither does concurrent.futures.ThreadPoolExecutor. Only multiprocessing and concurrent.futures.ProcessPoolExecutor do that. Note that threads and gevent are still useful for I/O bound operations, because doing I/O releases the GIL.
Thats correct. Its only by spawning multiple processes that will we be able to achieve true concurrency here. However, many times, what we are trying to achieve by concurrency is actually asynchronous processing ,and gevent fits in beautifully there.
1

I am not sure if its the best solution but I think of a structure of a main-thread which reads the db and fill the Queue. Make sure to avoid doublets. Maybe by using primary key of increasing numbers would be easy to check.

The Worker-Structure is nice, but like mentioned in comments: the GIL will avoid any boost. But you could use multiprocessing if your "do_stuff" is independent from the script himself (f.e. the tasks are pictures and the "do_stuff" is "rotate ervery picture 90°"). Afaik it doesn't suffer from GIL

https://docs.python.org/2/library/subprocess.html get you some informations about that.

PS: English isn't my native language.

1 Comment

I think the multiprocessing module will be more relevant than subprocess in this case.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.