
I've been trying to find an implementation that looks like mine but I can't seem to find one.

Specifics: I retrieve some database records and want to process all of them with a maximum of 5 threads. But I want these threads to report any errors (or log them) and then have the individual threads close. So I want to push all the records onto a queue and have the threads fetch from the queue.

So far I have this.

import queue
import threading

class DatabaseRecordImporterThread(threading.Thread):
    def __init__(self, record_queue):
        super(DatabaseRecordImporterThread, self).__init__()
        self.record_queue = record_queue

    def run(self):
        try:
            record = self.record_queue.get()
            force_key_error(record)  # stand-in that deliberately raises a KeyError
        except Exception as e:
            print("Thread failed: ", e)  # I want this to print to the main thread stdout
            logger.log(e)  # I want this to log to a shared log file (with appending)



MAX_THREAD_COUNT = 5
jobs = queue.Queue()
workers = []
database_records_retrieved = database.get_records(query)  # unimportant

# this is where i put all records on a queue
for record in database_records_retrieved:
    jobs.put(record)

for _ in range(MAX_THREAD_COUNT):
    worker = DatabaseRecordImporterThread(jobs)
    worker.start()
    workers.append(worker)

print('*** Main thread waiting')
jobs.join()
print('*** Done')

So the idea is that every thread shares the jobs queue, and each one keeps retrieving records from it and printing them. Since the amount to process isn't predesignated (e.g. "each thread handles k records"), each thread just processes whatever is on the queue. However, when I force an error, the output looks like this:

Thread failed:  'KeyError'
Thread failed:  'KeyError'
Thread failed:  'KeyError'
Thread failed:  'KeyError'
Thread failed:  'KeyError'
*** Main thread waiting

When no errors are reported, the threads only read one record each:

(record)
(record)
(record)
(record)
(record)
*** Main thread waiting

In the normal Threading setup, I understand that you can set up a queue by doing something like this:

Thread(target=function, args=(parameters, queue))

But when you use a class that inherits from Thread, how do you set this up properly? I can't seem to figure it out. One of my assumptions is that the queue object is not copied, so every worker created actually refers to the same queue in memory - is this true?
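For reference, here is a minimal sketch of the two styles side by side (with a toy consume function, not my real importer), which seems to show both threads sharing the one queue object:

import queue
import threading

def consume(name, job_queue):
    print(name, job_queue.get())

jobs = queue.Queue()
jobs.put("record-1")
jobs.put("record-2")

# "normal" setup: the queue is passed through args
t1 = threading.Thread(target=consume, args=("t1", jobs))

# subclass setup: the queue is stored on the instance in __init__
class Consumer(threading.Thread):
    def __init__(self, job_queue):
        super().__init__()
        self.job_queue = job_queue  # just a reference; no copy is made

    def run(self):
        consume(self.name, self.job_queue)

t2 = Consumer(jobs)
for t in (t1, t2):
    t.start()
for t in (t1, t2):
    t.join()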

The threads are hanging, presumably because they are not daemon threads(?). Not only that, but it seems as though the threads each read only one record and then just stop. There are some things I want to do but don't really understand how to do:

  1. If all threads fail, the main thread should move on and say "*** Done."
  2. The threads should continue processing the queue until it is empty

In order to do (2), I probably need something in the main thread like while not queue.empty(), but then how would I make sure that I limit the threads to a maximum of 5?

  • Looks like I figured one part out, which is that in each thread I simply have to loop with something like while not self.queue.empty(), but I'm not sure if this is best practice. And it doesn't resolve issue (1). Commented May 18, 2020 at 3:57

1 Answer


I figured out the answer to the question. After doing a lot of research and some code reading, what needs to happen is the following:

  1. The queue should not be checked for emptiness, since that presents a race condition (another worker can drain it between the empty() check and the get()). Instead, the workers should run an infinite loop and keep retrieving from the Queue.

  2. Whenever a queue item is finished, the queue.task_done() method needs to be called so that the queue.join() call in the main thread can unblock. The queue keeps a count of unfinished tasks: each put() increments it and each task_done() decrements it, and join() returns once the count hits zero (see the sketch after this list).

  3. Using a queue for a fixed-size data set is somewhat suboptimal. Instead of having every thread read off a shared queue, it can be simpler to partition the data into equal-sized chunks and have each thread process its own list subset. That way no thread blocks on queue.get() waiting for an element that will never arrive, and there is no need for busy-wait patterns like while True: if not queue.empty(): do_something().

  4. Exception handling should still call task_done() if we want to move past the failed item. Whether the whole thread should die when an exception is caught is a design choice, but either way the element that raised should still be marked as processed; otherwise queue.join() will never unblock.
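Putting those points together, the worker ends up looking roughly like this. This is a minimal sketch rather than my exact code: process_record stands in for the real per-record work, and database.get_records(query) is the same placeholder as in the question.

import logging
import queue
import threading

logger = logging.getLogger(__name__)

class DatabaseRecordImporterThread(threading.Thread):
    def __init__(self, record_queue):
        super().__init__(daemon=True)  # daemon so the process can exit after join()
        self.record_queue = record_queue

    def run(self):
        while True:                           # (1) loop forever instead of checking empty()
            record = self.record_queue.get()  # blocks until an item is available
            try:
                process_record(record)        # placeholder for the real work
            except Exception as e:
                print("Thread failed: ", e)
                logger.exception(e)           # (4) failures are logged...
            finally:
                self.record_queue.task_done() # ...but the item is still marked done (2)

MAX_THREAD_COUNT = 5
jobs = queue.Queue()

for record in database.get_records(query):   # placeholder, as in the question
    jobs.put(record)

workers = [DatabaseRecordImporterThread(jobs) for _ in range(MAX_THREAD_COUNT)]
for worker in workers:
    worker.start()

print('*** Main thread waiting')
jobs.join()   # returns once every put() has a matching task_done()
print('*** Done')

Because the workers are daemon threads stuck in a blocking get() loop, the main thread can simply exit once jobs.join() returns, and since task_done() is called even when a record raises, join() still returns when every record fails, which takes care of point (1) from the question. For point (3), partitioning instead of queueing might look something like this (again just a sketch; process_chunk is a made-up name):

chunks = [records[i::MAX_THREAD_COUNT] for i in range(MAX_THREAD_COUNT)]
threads = [threading.Thread(target=process_chunk, args=(chunk,)) for chunk in chunks]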
