Sorry for the long question, but I couldn't find a better way to summarize it.
I have a program that uses Python's multiprocessing module to run some calculations in parallel. The processes communicate through two Queue objects: a work_queue and a result_queue.
The main process fills up the work_queue with the data used for the calculations, and then starts several subprocesses that consume this queue and store the results in result_queue.
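For reference, the same two-queue design can be sketched in Python 3 as below. This is a minimal sketch under assumptions that are not part of the program that follows: workers stop when they read a hypothetical SENTINEL value (one per worker) instead of relying on a get() timeout, the calculation is a deterministic placeholder, and the main process collects exactly num_tests results with blocking get() calls before joining the workers. The names worker, run and SENTINEL are illustrative only.

```python
import multiprocessing

# Hypothetical shutdown marker: main queues one per worker after all the work.
SENTINEL = None


def worker(work_queue, result_queue):
    """Consume (test_id, test_data) items until the sentinel arrives."""
    while True:
        work = work_queue.get()  # blocks; no timeout-based exit
        if work is SENTINEL:
            break
        test_id, test_data = work
        # Placeholder calculation (hypothetical, deterministic for clarity).
        result_queue.put((test_id, [2 * x for x in test_data]))


def run(num_tests, num_procs):
    work_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    for test_id in range(num_tests):
        work_queue.put((test_id, [test_id] * 20))
    for _ in range(num_procs):
        work_queue.put(SENTINEL)  # one stop signal per worker, queued last

    procs = [multiprocessing.Process(target=worker,
                                     args=(work_queue, result_queue))
             for _ in range(num_procs)]
    for p in procs:
        p.start()

    # Collect exactly num_tests results *before* joining the workers: a worker
    # with buffered results waits for them to be flushed to the pipe before it
    # exits, so joining first can deadlock.
    results = dict(result_queue.get() for _ in range(num_tests))
    for p in procs:
        p.join()
    return results
```

run(num_tests, num_procs) is meant to be called from under an if __name__ == '__main__': guard, and returns a dict mapping each test_id to its placeholder result.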
Everything seems to work fine, but when I vary the amount of sample data (i.e. the amount of data that goes into the work_queue) and the number of subprocesses, I start getting an error that has been puzzling me for hours.
The following code illustrates the problem:
# -- queue_bug.py --
import sys
import time
import random
import datetime
import traceback
# Need this to catch the Queue.Empty exception
import Queue
import multiprocessing
from multiprocessing import Process
from multiprocessing import Queue as MultiprocessingQueue

# -------------------------------------------------------------------
# do_calculation
# -------------------------------------------------------------------
def do_calculation(p_name, work_queue, result_queue):

    def log(msg):
        print '%s [%s] %s' % (datetime.datetime.now(), p_name, msg)

    log('Starting up...')

    while True:
        # Get work from queue
        try:
            work = work_queue.get(timeout = 0.1)
            test_id = work[0]
            test_data = work[1]
        except Queue.Empty:
            break

        # this is just a dummy loop
        for i in range(100):
            test_result = [x * random.random() for x in test_data]

        result_queue.put((test_id, test_data, test_result))

    log('Finished')
# -------------------------------------------------------------------
# main
# -------------------------------------------------------------------
def main():

    def log(msg):
        print '%s [main ] %s' % (datetime.datetime.now(), msg)

    try:
        num_tests = int(sys.argv[1])
        num_procs = int(sys.argv[2])
    except Exception:
        print 'usage: <prog> number-of-tests number-of-subprocesses'
        sys.exit()

    log('Initializing queues...')
    work_queue = MultiprocessingQueue()
    result_queue = MultiprocessingQueue()

    log('Creating subprocesses...')
    process_list = []
    for i in range(num_procs):
        p_name = 'PROC_%02d' % (i+1)
        log(' Initializing %s' % p_name)
        p = Process(
            target = do_calculation,
            args = (p_name, work_queue, result_queue),
            name = p_name)
        p.daemon = True
        process_list.append(p)

    log('Populating the work_queue...')
    for test_id in range(num_tests):
        work_queue.put((test_id, [test_id]*20))
    log('Work_queue size is %d' % work_queue.qsize())

    log('Starting the subprocesses...')
    for p in process_list:
        p.start()

    log('Waiting until the work_queue is empty...')
    while True:
        log(' Work_queue size is %d' % work_queue.qsize())
        if work_queue.qsize() > 0:
            time.sleep(0.5)
        else:
            break

    log('Waiting until the result_queue is completely filled...')
    while True:
        log(' Result_queue size is %d' % result_queue.qsize())
        if result_queue.qsize() < num_tests:
            time.sleep(0.5)
        else:
            break

    log('Getting results...')
    result_dict = {}
    while True:
        try:
            queue_data = result_queue.get_nowait()
            test_id = queue_data[0]
            test_data = queue_data[1]
            test_result = queue_data[2]
            result_dict[test_id] = test_result
        except Queue.Empty:
            log(' All results loaded from result_queue')
            break

    log('Storing test results in result_summary...')
    result_summary = []
    for test_id in range(num_tests):
        try:
            test_result = result_dict[test_id]
            result_summary.append((test_id, test_result))
        except KeyError:
            ex = traceback.format_exc()
            log('ERROR: Exception found: %s' % ex)
            sys.exit()

    log('Success.')
    return result_summary

if __name__ == '__main__':
    main()
Now, when I run it:
Attempt 1: 10,000 calculations, 10 subprocesses - OK
$ python queue_bug.py 10000 10
2012-12-04 19:24:25.430667 [main ] Initializing queues...
2012-12-04 19:24:25.440521 [main ] Creating subprocesses...
2012-12-04 19:24:25.440550 [main ] Initializing PROC_01
2012-12-04 19:24:25.440576 [main ] Initializing PROC_02
2012-12-04 19:24:25.440597 [main ] Initializing PROC_03
2012-12-04 19:24:25.440617 [main ] Initializing PROC_04
2012-12-04 19:24:25.440637 [main ] Initializing PROC_05
2012-12-04 19:24:25.440656 [main ] Initializing PROC_06
2012-12-04 19:24:25.440679 [main ] Initializing PROC_07
2012-12-04 19:24:25.440699 [main ] Initializing PROC_08
2012-12-04 19:24:25.440721 [main ] Initializing PROC_09
2012-12-04 19:24:25.440741 [main ] Initializing PROC_10
2012-12-04 19:24:25.440759 [main ] Populating the work_queue...
2012-12-04 19:24:25.494263 [main ] Work_queue size is 10000
2012-12-04 19:24:25.494301 [main ] Starting the subprocesses...
2012-12-04 19:24:25.495515 [PROC_01] Starting up...
2012-12-04 19:24:25.495802 [PROC_02] Starting up...
2012-12-04 19:24:25.496212 [PROC_03] Starting up...
2012-12-04 19:24:25.496557 [PROC_04] Starting up...
2012-12-04 19:24:25.496896 [PROC_05] Starting up...
2012-12-04 19:24:25.497300 [PROC_06] Starting up...
2012-12-04 19:24:25.497705 [PROC_07] Starting up...
2012-12-04 19:24:25.498074 [PROC_08] Starting up...
2012-12-04 19:24:25.498258 [main ] Waiting until the work_queue is empty...
2012-12-04 19:24:25.498349 [main ] Work_queue size is 9974
2012-12-04 19:24:25.498661 [PROC_09] Starting up...
2012-12-04 19:24:25.499765 [PROC_10] Starting up...
2012-12-04 19:24:25.998914 [main ] Work_queue size is 0
2012-12-04 19:24:25.998954 [main ] Waiting until the result_queue is completely filled...
2012-12-04 19:24:25.998976 [main ] Result_queue size is 10000
2012-12-04 19:24:25.998993 [main ] Getting results...
2012-12-04 19:24:26.029774 [PROC_06] Finished
2012-12-04 19:24:26.029798 [PROC_03] Finished
2012-12-04 19:24:26.029824 [PROC_08] Finished
2012-12-04 19:24:26.029853 [PROC_02] Finished
2012-12-04 19:24:26.029868 [PROC_01] Finished
2012-12-04 19:24:26.029898 [PROC_07] Finished
2012-12-04 19:24:26.029921 [PROC_09] Finished
2012-12-04 19:24:26.029942 [PROC_10] Finished
2012-12-04 19:24:26.031040 [PROC_04] Finished
2012-12-04 19:24:26.031057 [PROC_05] Finished
2012-12-04 19:24:26.087804 [main ] All results loaded from result_queue
2012-12-04 19:24:26.087844 [main ] Storing test results in result_summary...
2012-12-04 19:24:26.092477 [main ] Success.
Attempt 2: 70,000 calculations, 10 subprocesses - ERROR
$ python queue_bug.py 70000 10
2012-12-04 19:25:01.083092 [main ] Initializing queues...
2012-12-04 19:25:01.093483 [main ] Creating subprocesses...
2012-12-04 19:25:01.093520 [main ] Initializing PROC_01
2012-12-04 19:25:01.093548 [main ] Initializing PROC_02
2012-12-04 19:25:01.093570 [main ] Initializing PROC_03
2012-12-04 19:25:01.093591 [main ] Initializing PROC_04
2012-12-04 19:25:01.093612 [main ] Initializing PROC_05
2012-12-04 19:25:01.093632 [main ] Initializing PROC_06
2012-12-04 19:25:01.093656 [main ] Initializing PROC_07
2012-12-04 19:25:01.093676 [main ] Initializing PROC_08
2012-12-04 19:25:01.093699 [main ] Initializing PROC_09
2012-12-04 19:25:01.093720 [main ] Initializing PROC_10
2012-12-04 19:25:01.093738 [main ] Populating the work_queue...
2012-12-04 19:25:01.395974 [main ] Work_queue size is 70000
2012-12-04 19:25:01.396012 [main ] Starting the subprocesses...
2012-12-04 19:25:01.397601 [PROC_01] Starting up...
2012-12-04 19:25:01.398183 [PROC_02] Starting up...
2012-12-04 19:25:01.398545 [PROC_03] Starting up...
2012-12-04 19:25:01.399021 [PROC_04] Starting up...
2012-12-04 19:25:01.399621 [PROC_05] Starting up...
2012-12-04 19:25:01.400137 [PROC_06] Starting up...
2012-12-04 19:25:01.400675 [PROC_07] Starting up...
2012-12-04 19:25:01.401200 [PROC_08] Starting up...
2012-12-04 19:25:01.401645 [main ] Waiting until the work_queue is empty...
2012-12-04 19:25:01.401691 [PROC_09] Starting up...
2012-12-04 19:25:01.401738 [main ] Work_queue size is 69959
2012-12-04 19:25:01.402387 [PROC_10] Starting up...
2012-12-04 19:25:01.902063 [main ] Work_queue size is 58415
2012-12-04 19:25:02.402640 [main ] Work_queue size is 47302
2012-12-04 19:25:02.903067 [main ] Work_queue size is 36145
2012-12-04 19:25:03.403650 [main ] Work_queue size is 24992
2012-12-04 19:25:03.904065 [main ] Work_queue size is 13481
2012-12-04 19:25:04.404643 [main ] Work_queue size is 1951
2012-12-04 19:25:04.588562 [PROC_02] Finished
2012-12-04 19:25:04.588580 [PROC_06] Finished
2012-12-04 19:25:04.588611 [PROC_10] Finished
2012-12-04 19:25:04.588631 [PROC_03] Finished
2012-12-04 19:25:04.589705 [PROC_04] Finished
2012-12-04 19:25:04.589741 [PROC_09] Finished
2012-12-04 19:25:04.589764 [PROC_05] Finished
2012-12-04 19:25:04.589791 [PROC_08] Finished
2012-12-04 19:25:04.589814 [PROC_01] Finished
2012-12-04 19:25:04.589844 [PROC_07] Finished
2012-12-04 19:25:04.905065 [main ] Work_queue size is 0
2012-12-04 19:25:04.905098 [main ] Waiting until the result_queue is completely filled...
2012-12-04 19:25:04.905121 [main ] Result_queue size is 70000
2012-12-04 19:25:04.905140 [main ] Getting results...
2012-12-04 19:25:05.012083 [main ] All results loaded from result_queue
2012-12-04 19:25:05.012140 [main ] Storing test results in result_summary...
2012-12-04 19:25:05.020498 [main ] ERROR: Exception found: Traceback (most recent call last):
  File "queue_bug.py", line 95, in main
    test_result = result_dict[test_id]
KeyError: 10647
On the second attempt I get a KeyError when trying to read the data from result_dict.
This dictionary is filled with data retrieved from result_queue, so I suspect the problem is related to that queue.
I also noticed that every time I run it with a combination of arguments that fails (e.g. 70000 / 10), the KeyError is raised on a different key, which seems to indicate some concurrency / synchronization issue.
Last but not least, the error becomes more likely as the amount of sample data or the number of subprocesses increases.
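One hedged way to test the synchronization suspicion: the multiprocessing documentation notes that qsize() is only approximate, and that after an object is put on an empty queue there may be a short delay before get_nowait() can return it without raising Empty, because a background feeder thread writes each item to the underlying pipe. If that is what happens here, the get_nowait() loop in main() can stop early even though qsize() already reported 70000. The minimal Python 3 sketch below (collect_results is a hypothetical helper name) instead reads exactly the expected number of results with blocking get() calls and fails loudly if one does not arrive in time.

```python
import queue  # Python 3 name of the Python 2 "Queue" module; provides queue.Empty


def collect_results(result_queue, expected, timeout=30.0):
    """Read exactly `expected` (test_id, test_data, test_result) tuples.

    Unlike a get_nowait() loop, a blocking get() waits for results that a
    worker's feeder thread has not yet written to the pipe, so a momentarily
    empty pipe does not end the collection early.
    """
    result_dict = {}
    for _ in range(expected):
        try:
            test_id, _test_data, test_result = result_queue.get(timeout=timeout)
        except queue.Empty:
            # Fail loudly instead of silently returning a partial dict.
            raise RuntimeError('only %d of %d results arrived'
                               % (len(result_dict), expected)) from None
        result_dict[test_id] = test_result
    return result_dict
```

In main(), the get_nowait() loop would then become result_dict = collect_results(result_queue, num_tests). Under Python 2, import Queue instead of queue and drop "from None".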
Any ideas?
You could use a JoinableQueue (docs.python.org/2/library/…) for your work queue; you can then drop most of the wait loops. Alternatively, multiprocessing.Pool with its map() would cut out most of the management code here. Still, that doesn't explain what's going on here...
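The multiprocessing.Pool suggestion above could look roughly like this Python 3 sketch. The function names and the chunksize value are illustrative assumptions; the dummy calculation is copied from do_calculation. Pool.map() hands the work items to the worker processes, blocks until all results are back, and returns them in input order, so neither the polling loops nor the result_queue are needed.

```python
import random
from multiprocessing import Pool


def calculate(work):
    """Dummy calculation from do_calculation, applied to one work item."""
    test_id, test_data = work
    for _ in range(100):
        test_result = [x * random.random() for x in test_data]
    return test_id, test_result


def run(num_tests, num_procs):
    work = [(test_id, [test_id] * 20) for test_id in range(num_tests)]
    with Pool(processes=num_procs) as pool:
        # map() blocks until every item is processed and keeps input order.
        results = pool.map(calculate, work, chunksize=100)
    return results  # list of (test_id, test_result), one entry per test
```

As with any multiprocessing code, run(70000, 10) should be called from under an if __name__ == '__main__': guard. The returned list already has the shape of result_summary in the original main().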