
I would like to pass a NumPy array through a multiprocessing queue. The program works with small arrays (20x20), but larger arrays do not work. Ultimately, I would like to pass a 4D tensor with dimensions (100,1,16,12000). I am running Python 3.6 on a Mac.

Code example:

import numpy as np
from multiprocessing import JoinableQueue, Process


class Writer(Process):
    def __init__(self,que):
        Process.__init__(self)
        self.queue=que

    def run(self):
        for i in range(10):
            data=np.random.randn(30,30)
            self.queue.put(data)
            print(i)


class Reader(Process):
    def __init__(self,que):
        Process.__init__(self)
        self.queue=que

    def run(self):
        while not(self.queue.empty()):
            result=self.queue.get()
            print(result)


def main():
    q = JoinableQueue()
    w=Writer(q)
    r=Reader(q)

    w.start()
    w.join()
    print("DONE WRITING")

    r.start()
    r.join()
    print("DONE READING")




if __name__ == "__main__":
    main()
  • try with queue.put(data, block=False)? Commented Apr 22, 2018 at 16:52

1 Answer


The Python multiprocessing queue is poorly suited to large arrays: every array has to be pickled when it is put into the queue and unpickled when it is taken out, which adds both processing and memory overhead.
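
To get a feel for that overhead, here is a rough sketch that computes the raw size of the tensor from the question and does a pickle round trip on a small array. The helper names payload_bytes and pickle_roundtrip are made up for this illustration, and the round trip only approximates what the queue does internally.

```python
import pickle

import numpy as np


def payload_bytes(shape, dtype=np.float64):
    """Raw size in bytes of an array with the given shape and dtype."""
    return int(np.prod(shape)) * np.dtype(dtype).itemsize


def pickle_roundtrip(arr):
    """Serialize and deserialize an array, roughly as a queue would.

    Returns the size of the pickled blob and the reconstructed copy.
    """
    blob = pickle.dumps(arr, protocol=pickle.HIGHEST_PROTOCOL)
    return len(blob), pickle.loads(blob)


if __name__ == "__main__":
    # The tensor from the question, assuming float64 elements.
    print(payload_bytes((100, 1, 16, 12000)) / 1e6, "MB per tensor")

    small = np.random.randn(30, 30)
    blob_size, copy = pickle_roundtrip(small)
    # The blob holds all of the array data plus a small header,
    # and the receiver ends up with an independent copy.
    print(small.nbytes, blob_size, np.array_equal(small, copy))
```

Each put/get pair therefore moves at least the full array through a pipe and leaves a second copy in the receiving process.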

I developed a small package that instead uses Python's built-in multiprocessing Array class to store the data, with a queue in the background to pass the metadata around. Unlike other solutions I encountered, it works on Mac, Windows and Linux. You can install it with

pip install arrayqueues

The instructions, source and issues are on github: https://github.com/portugueslab/arrayqueues

For simple use-cases it works as a drop-in replacement for the multiprocessing Queue, with the major difference of having to specify the amount of memory the queue will take.
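
The underlying idea (a shared buffer for the data, a queue that carries only metadata) can be sketched with the standard library and NumPy alone. This is not the arrayqueues API or its implementation; make_shared, store, load and consumer are hypothetical helpers, and the sketch assumes float64 data and a single array in flight.

```python
import multiprocessing as mp

import numpy as np


def make_shared(n_items):
    """Allocate a shared, lock-protected buffer of C doubles (float64)."""
    return mp.Array("d", n_items)


def as_numpy(shared):
    """View the shared buffer as a flat float64 array without copying."""
    return np.frombuffer(shared.get_obj(), dtype=np.float64)


def store(shared, arr):
    """Copy arr into the shared buffer and return its shape as metadata."""
    flat = as_numpy(shared)
    if arr.size > flat.size:
        raise ValueError("array does not fit into the shared buffer")
    with shared.get_lock():
        flat[: arr.size] = arr.ravel()
    return arr.shape


def load(shared, shape):
    """Rebuild an array of the given shape from the shared buffer."""
    n_items = int(np.prod(shape))
    with shared.get_lock():
        return as_numpy(shared)[:n_items].reshape(shape).copy()


def consumer(shared, meta_queue, result_queue):
    """Child process: receive only the shape, read the data from shared memory."""
    shape = meta_queue.get()
    result_queue.put(float(load(shared, shape).sum()))


if __name__ == "__main__":
    data = np.arange(12, dtype=np.float64).reshape(3, 4)
    shared = make_shared(data.size)
    meta_q, result_q = mp.Queue(), mp.Queue()

    p = mp.Process(target=consumer, args=(shared, meta_q, result_q))
    p.start()
    meta_q.put(store(shared, data))  # only the small shape tuple is pickled
    print(result_q.get())            # sum computed in the child process
    p.join()
```

The buffer size has to be fixed up front, which mirrors the need to specify how much memory the queue will take.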

Regarding the reader process: as far as I know, queue.empty() is not considered reliable for multiprocessing queues, and the following pattern is encouraged instead:

from queue import Empty  # in Python 3 the Empty exception lives in the standard queue module

# inside the process
while True:
    try:
        # a blocking get() without a timeout would wait forever and never raise Empty
        item = queue.get(timeout=1)
    except Empty:
        break
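
Separately, the hang with larger arrays in the question is probably caused by joining the writer before anything has read from the queue: the multiprocessing documentation warns that a process which has put items on a queue waits for them to be flushed to the underlying pipe before it exits. A minimal sketch of a safer ordering follows, assuming a None sentinel marks the end of the stream; drain and writer are illustrative names, not part of any library.

```python
import queue
from multiprocessing import Process, Queue

import numpy as np

SENTINEL = None  # assumed end-of-stream marker for this sketch


def drain(q, timeout=1.0):
    """Collect items until the sentinel arrives or the queue stays empty."""
    items = []
    while True:
        try:
            item = q.get(timeout=timeout)
        except queue.Empty:
            break
        if item is SENTINEL:
            break
        items.append(item)
    return items


def writer(q, n_items, shape):
    """Put n_items random arrays on the queue, then signal completion."""
    for _ in range(n_items):
        q.put(np.random.randn(*shape))
    q.put(SENTINEL)


if __name__ == "__main__":
    q = Queue()
    w = Process(target=writer, args=(q, 3, (200, 200)))
    w.start()
    arrays = drain(q, timeout=10.0)  # consume first ...
    w.join()                         # ... and join the writer afterwards
    print(len(arrays), arrays[0].shape)
```

The same ordering applies when the reader is its own process: start both, and join the writer only once the reader has had a chance to empty the queue.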