I tried to run the following code:

import multiprocessing
import time

def init_queue():
    print("init g_queue start")
    while not g_queue.empty():
        g_queue.get()
    for _index in range(10):
        g_queue.put(_index)
    print("init g_queue end")
    return

def task_io(task_id):
    print("IOTask[%s] start" % task_id)
    print("the size of queue is %s" % g_queue.qsize())
    while not g_queue.empty():
        time.sleep(1)
        try:
            data = g_queue.get(block=True, timeout=1)
            print("IOTask[%s] get data: %s" % (task_id, data))
        except Exception as excep:
            print("IOTask[%s] error: %s" % (task_id, str(excep)))
    print("IOTask[%s] end" % task_id)
    return

g_queue = multiprocessing.Queue()

if __name__ == '__main__':
    print("the size of queue is %s" % g_queue.qsize())
    init_queue()
    print("the size of queue is %s" % g_queue.qsize())
    time_0 = time.time()
    process_list = [multiprocessing.Process(target=task_io, args=(i,)) for i in range(multiprocessing.cpu_count())]
    for p in process_list:
        p.start()
    for p in process_list:
        if p.is_alive():
            p.join()
    print("End:", time.time() - time_0, "\n")

What I got was the following:

the size of queue is 0
init g_queue start
init g_queue end
the size of queue is 10
IOTask[0] start
the size of queue is 0
IOTask[0] end
IOTask[1] start
the size of queue is 0
IOTask[1] end
('End:', 0.6480000019073486, '\n')

What I was expecting was:

IOTask[0] start
the size of queue is 10

After init_queue() runs, the size of the queue is supposed to be 10, not 0. It seems the queue is not in shared memory: when a subprocess starts, it gets its own copy of g_queue, whose size is 0.

Why is multiprocessing.Queue not in shared memory? Please advise. Many thanks!

1 Answer

You should pass g_queue to each worker as a parameter; then it will work. A queue created at module level is re-created, empty, in every child process when the start method is spawn (the default on Windows and macOS): the child re-imports your module, and the line g_queue = multiprocessing.Queue() runs again, producing a brand-new empty queue. Passing the queue through Process(..., args=...) makes every process share the one underlying queue.
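For instance, here is a minimal sketch of the question's worker with the queue passed explicitly (names adapted; note that qsize() is not implemented on every platform, e.g. it raises NotImplementedError on macOS):

```python
import multiprocessing
import queue
import time


def task_io(task_id, task_queue):
    # The queue arrives as an argument, so every worker shares the same one.
    # Caveat: qsize() is not implemented on macOS.
    print("IOTask[%s] start, queue size: %s" % (task_id, task_queue.qsize()))
    while True:
        try:
            data = task_queue.get(block=True, timeout=1)
            print("IOTask[%s] get data: %s" % (task_id, data))
        except queue.Empty:
            # No item arrived within the timeout: the queue is drained.
            break
    print("IOTask[%s] end" % task_id)


if __name__ == '__main__':
    g_queue = multiprocessing.Queue()
    for index in range(10):
        g_queue.put(index)
    processes = [multiprocessing.Process(target=task_io, args=(i, g_queue))
                 for i in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```

With this change, the first worker to start reports the size it actually sees (up to 10), and the items are consumed across the workers instead of each child looking at its own empty copy.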

A demo of using multiprocessing with a queue:

import multiprocessing
import time


def long_time_calculate(n, result_queue):
    time.sleep(1)
    result_queue.put(n)


if __name__ == '__main__':
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)

    # A plain multiprocessing.Queue cannot be passed through Pool's
    # argument pickling, so use a manager-backed queue instead.
    manager = multiprocessing.Manager()
    result_queue = manager.Queue()

    inputs = [(1, result_queue), (2, result_queue), (3, result_queue), (4, result_queue)]

    for args in inputs:
        pool.apply_async(long_time_calculate, args)

    pool.close()
    pool.join()

    print([result_queue.get() for _ in inputs])
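An aside on why the demo uses manager.Queue(): a plain multiprocessing.Queue raises RuntimeError when passed through Pool.apply_async arguments, because such queues may only be shared through inheritance at process creation. If you want the raw (faster) queue with a pool anyway, a common pattern is to hand it to each worker via the Pool initializer; this is a sketch of that pattern, not the answer's code:

```python
import multiprocessing
import time


def init_worker(shared_queue):
    # Runs once in each worker process at startup; stashes the inherited
    # queue in a module-level global that the worker functions can reach.
    global result_queue
    result_queue = shared_queue


def long_time_calculate(n):
    time.sleep(0.1)
    result_queue.put(n)


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    # initargs is delivered at worker creation, which counts as
    # inheritance, so passing the queue here is allowed.
    pool = multiprocessing.Pool(processes=4, initializer=init_worker,
                                initargs=(queue,))
    for n in (1, 2, 3, 4):
        pool.apply_async(long_time_calculate, (n,))
    pool.close()
    pool.join()
    print(sorted(queue.get() for _ in range(4)))  # [1, 2, 3, 4]
```

The trade-off: the manager queue is simpler to pass around but proxies every operation through a server process, while the initializer pattern keeps the direct pipe-backed queue at the cost of a module-level global in the workers.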