I've been trying to find an implementation that looks like mine but I can't seem to find one.
Specifics: I retrieve some database records and want to process all of them using at most 5 threads. I want these threads to report any errors they hit (by printing or logging them) and then close. So I want to push all the records onto a queue and have the threads fetch from it.
So far I have this.
```python
import logging
import queue
import threading

logger = logging.getLogger(__name__)

class DatabaseRecordImporterThread(threading.Thread):
    def __init__(self, record_queue):
        super(DatabaseRecordImporterThread, self).__init__()
        self.record_queue = record_queue

    def run(self):
        try:
            record = self.record_queue.get()
            force_key_error(record)
        except Exception as e:
            print("Thread failed: ", e)  # I want this to print to the main thread stdout
            logger.error(e)  # I want this to log to a shared log file (with appending)


MAX_THREAD_COUNT = 5
jobs = queue.Queue()
workers = []
database_records_retrieved = database.get_records(query)  # unimportant

# this is where I put all records on a queue
for record in database_records_retrieved:
    jobs.put(record)

for _ in range(MAX_THREAD_COUNT):
    worker = DatabaseRecordImporterThread(jobs)
    worker.start()
    workers.append(worker)

print('*** Main thread waiting')
jobs.join()
print('*** Done')
```
The idea is that every thread gets the jobs queue and keeps retrieving records from it and printing them. Since the number of records per thread isn't fixed in advance (e.g. k records at a time), each thread should simply process whatever is left on the queue. However, when I force an error, the output looks like this:
```
Thread failed: 'KeyError'
Thread failed: 'KeyError'
Thread failed: 'KeyError'
Thread failed: 'KeyError'
Thread failed: 'KeyError'
*** Main thread waiting
```
When no errors are raised, each thread reads only one record:
```
(record)
(record)
(record)
(record)
(record)
*** Main thread waiting
```
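For context on both outputs: `Thread.run()` is called exactly once per thread, so the `run()` above performs a single `get()` and returns. `Queue.join()` only unblocks after `task_done()` has been called once for every item that was `put()`, and that `run()` never calls it. A small sketch of this counting behaviour, with sample strings standing in for records and a helper thread with a timeout so the demo itself cannot hang:

```python
import queue
import threading

jobs = queue.Queue()
for record in ["a", "b", "c"]:  # sample items standing in for database records
    jobs.put(record)

# Take one item and acknowledge it; the other two are never marked done yet.
jobs.get()
jobs.task_done()

# Run jobs.join() in a daemon helper thread so we can time out instead of blocking forever.
waiter = threading.Thread(target=jobs.join, daemon=True)
waiter.start()
waiter.join(timeout=0.5)
blocked_before = waiter.is_alive()  # still waiting: two items are unfinished

# Acknowledge the remaining items; join() can now return.
for _ in range(2):
    jobs.get()
    jobs.task_done()
waiter.join(timeout=0.5)
blocked_after = waiter.is_alive()

print(blocked_before, blocked_after)  # True False
```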
With a plain `threading.Thread`, I understand that you can pass a queue to the worker function like this:

```python
Thread(target=function, args=(parameters, queue))
```
But when you use a class that inherits from `Thread`, how do you set this up properly? I can't seem to figure it out. One of my assumptions is that the queue object is passed by reference rather than copied, so every new thread object actually refers to the same queue in memory. Is this true?
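On the subclass question: a `Thread` subclass can take the queue as an ordinary constructor argument, as the class above already does. Python passes references to objects, not copies, so every worker ends up holding the very same `Queue`. A minimal sketch checking this with `is` (the `Worker` class is illustrative; its `run()` is left empty because only the shared attribute matters here):

```python
import queue
import threading

class Worker(threading.Thread):
    def __init__(self, record_queue):
        super().__init__()
        # Stores a reference to the caller's queue; no copy is made.
        self.record_queue = record_queue

    def run(self):
        pass  # processing omitted; this sketch only inspects the attribute

jobs = queue.Queue()
workers = [Worker(jobs) for _ in range(5)]

# Every worker's attribute is the identical object that was passed in.
all_shared = all(w.record_queue is jobs for w in workers)
print(all_shared)  # True
```

The `target=..., args=(queue,)` form behaves the same way: the tuple holds a reference to the one queue object.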
The program hangs, presumably because the threads are not daemon threads(?). On top of that, each thread seems to read only one record and then stop. Here is what I want to do but don't really understand how to:
1. If all threads fail, the main thread should still move on and print "*** Done".
2. The threads should keep processing the queue until it is empty.
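One common way to get both behaviours, sketched under the assumption that all records are enqueued before the workers start (as in the code above): each worker loops, pulls records with `get_nowait()` until `queue.Empty` is raised, reports each record's exception without leaving the loop, and calls `task_done()` in a `finally` block so `jobs.join()` returns even when every record fails. `process_record` and the integer records are hypothetical stand-ins for `force_key_error` and the database rows.

```python
import queue
import threading

MAX_THREAD_COUNT = 5

def process_record(record):
    # Hypothetical stand-in: odd records fail with a KeyError.
    if record % 2:
        raise KeyError(record)
    return record * 10

class DatabaseRecordImporterThread(threading.Thread):
    def __init__(self, record_queue, results, errors, lock):
        super().__init__()
        self.record_queue = record_queue
        self.results = results
        self.errors = errors
        self.lock = lock

    def run(self):
        while True:
            try:
                record = self.record_queue.get_nowait()
            except queue.Empty:
                return  # queue drained: this worker exits
            try:
                value = process_record(record)
                with self.lock:
                    self.results.append(value)
            except Exception as e:
                # Report the bad record, then continue with the next one.
                print("Thread failed:", repr(e))
                with self.lock:
                    self.errors.append(record)
            finally:
                # Always acknowledge the item, or jobs.join() would block forever.
                self.record_queue.task_done()

jobs = queue.Queue()
for record in range(20):
    jobs.put(record)

results, errors, lock = [], [], threading.Lock()
workers = [DatabaseRecordImporterThread(jobs, results, errors, lock)
           for _ in range(MAX_THREAD_COUNT)]
for w in workers:
    w.start()

print('*** Main thread waiting')
jobs.join()
print('*** Done')
```

Using `get_nowait()` and catching `queue.Empty` avoids the race in checking `empty()` first and then calling a blocking `get()`: another thread can take the last item in between, leaving the blocking `get()` waiting forever.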
To achieve (2), I probably need something like `while not queue.empty()` in the main thread, but then how would I make sure that no more than 5 threads run at once?

Alternatively, I could loop with `while not self.record_queue.empty()` inside `run()`, but I'm not sure that is best practice, and it doesn't resolve issue (1) either.
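On capping the thread count: if subclassing `Thread` is not a hard requirement, the standard library's `concurrent.futures.ThreadPoolExecutor(max_workers=5)` runs at most five worker threads and manages the task queue internally. This swaps in a different API from the `Thread` subclass above. An exception raised inside a task is stored and re-raised by `future.result()`, so the main thread can print or log each failure and still reach "*** Done". A sketch, again with a hypothetical `process_record`:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_record(record):
    # Hypothetical stand-in: odd records fail with a KeyError.
    if record % 2:
        raise KeyError(record)
    return record * 10

records = list(range(20))
results, errors = [], []

# At most 5 threads run process_record at any one time.
with ThreadPoolExecutor(max_workers=5) as pool:
    futures = {pool.submit(process_record, r): r for r in records}
    for future in as_completed(futures):
        record = futures[future]
        try:
            results.append(future.result())  # re-raises the task's exception, if any
        except KeyError as e:
            print("Record", record, "failed:", repr(e))
            errors.append(record)

print('*** Done')
```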