
So, I'm trying to speed up a routine using Python's multiprocessing module. I want to read several .csv files in parallel by splitting the work among several cores. For that I have:

import numpy as np

def csvreader(string):
    # Read a two-column CSV file and return its columns as two arrays
    time, signal = np.genfromtxt(string, delimiter=',', unpack=True)
    return time, signal

I then call this function with:

import multiprocessing
import CSVReader  # module that defines csvreader()

if __name__ == '__main__':
    for i in range(0, 2):
        p = multiprocessing.Process(target=CSVReader.csvreader, args=(string_array[i],))
        p.start()

The problem is that this doesn't store any output. I have read through the forums and seen that there might be a way to do it with multiprocessing.Queue, but I don't quite understand it. Is there a simple and straightforward method?

  • Have you looked at the introduction to the documentation? Commented Mar 11, 2016 at 15:27
  • Indeed I have. Call me dense, but I was unable to fix my program with that info. Commented Mar 11, 2016 at 15:38

2 Answers


Your best bet is multiprocessing.Queue or multiprocessing.Pipe, which are designed for exactly this problem. They let you send data between processes safely and easily.
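Since Pipe is mentioned but not shown, here is a minimal sketch of it, assuming a hypothetical `worker` function that just doubles a number. `multiprocessing.Pipe()` returns two connected `Connection` objects: the child writes to one end with `send()`, and the parent reads from the other with `recv()`, which blocks until data arrives.

```python
import multiprocessing


def worker(conn, value):
    # Hypothetical worker: compute something and send it back over the pipe.
    conn.send(value * 2)
    conn.close()


if __name__ == "__main__":
    # Pipe() returns two Connection objects; by default both ends can send and receive.
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=worker, args=(child_conn, 21))
    p.start()
    result = parent_conn.recv()  # blocks until the child sends something
    p.join()
    print(result)  # 42
```

A Pipe links exactly two endpoints, so a Queue is usually the simpler choice when several worker processes report back to one parent, as in this question.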

If you'd like to get the output of your csvreader function back, pass it another argument: the multiprocessing.Queue through which the data will be sent to the main process. Instead of returning the values, the function places them on the queue, and the main process retrieves them later. If they're not ready when the main process tries to get them, get() by default blocks (waits) until they are available.
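To see that blocking behaviour in isolation, here is a small sketch with a hypothetical `slow_worker` that sleeps briefly before putting its result. A `get()` with a short timeout raises `queue.Empty` because nothing has arrived yet, while a plain `get()` simply waits for the item.

```python
import multiprocessing
import queue  # multiprocessing.Queue.get raises queue.Empty on timeout
import time


def slow_worker(q):
    # Hypothetical worker: pretend to do some work, then report back.
    time.sleep(0.5)
    q.put("done")


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=slow_worker, args=(q,))
    p.start()
    try:
        status = q.get(timeout=0.05)  # too short: the worker is still sleeping
    except queue.Empty:
        status = "not ready yet"
    print(status)
    result = q.get()  # default: block until the worker's item arrives
    print(result)
    p.join()
```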

Your function would now look like this:

import numpy as np

def csvreader(string, q):
    q.put(np.genfromtxt(string, delimiter=',', unpack=True))  # send (time, signal) back
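It may help to see what actually goes onto the queue. This sketch assumes a two-column (time, signal) CSV, held in an `io.StringIO` instead of a file purely for illustration. With `unpack=True`, `genfromtxt` returns the transposed array of shape (2, N), so the single object put on the queue can later be unpacked into two 1-D arrays.

```python
import io

import numpy as np

# Illustrative two-column CSV (time, signal) kept in memory instead of on disk.
csv_text = io.StringIO("0.0,1.5\n0.1,2.5\n0.2,3.5\n")

data = np.genfromtxt(csv_text, delimiter=",", unpack=True)
print(data.shape)  # (2, 3): one row per column of the file

# Unpacking iterates over the first axis, so each name gets one column.
time, signal = data
print(time)
print(signal)
```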

The main routine would be:

import multiprocessing

import numpy as np

if __name__ == '__main__':
    q = multiprocessing.Queue()  # one queue shared by all workers
    procs = []
    for i in range(2):
        p = multiprocessing.Process(target=csvreader, args=(string_array[i], q))
        p.start()
        procs.append(p)

    # Do anything else you need in here

    time = np.empty(2, dtype='object')
    signal = np.empty(2, dtype='object')
    for i in range(2):
        time[i], signal[i] = q.get()  # Returns output or blocks until ready
        # Process my output

    for p in procs:
        p.join()  # join only after the queue has been drained

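Note that you have to call q.get() once for each item you expect back, and that items arrive in the order the processes finish, not necessarily the order they were started.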
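If the results need to line up with string_array, one option is to tag each item with its input index. The sketch below is self-contained under some assumptions: the helper `read_in_order` and the sample files `signal0.csv`/`signal1.csv` are hypothetical and are written to a temporary directory. It also joins the processes only after draining the queue, because the multiprocessing documentation warns that joining a process which still has items buffered in a queue can deadlock.

```python
import multiprocessing
import os
import tempfile

import numpy as np


def csvreader(index, path, q):
    # Tag the result with its input index so the parent can restore the order.
    time, signal = np.genfromtxt(path, delimiter=",", unpack=True)
    q.put((index, (time, signal)))


def read_in_order(paths):
    q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=csvreader, args=(i, path, q))
             for i, path in enumerate(paths)]
    for p in procs:
        p.start()
    results = [None] * len(paths)
    for _ in procs:
        index, data = q.get()  # items arrive in completion order...
        results[index] = data  # ...but are stored in input order
    for p in procs:
        p.join()  # join only after every item has been taken off the queue
    return results


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmpdir:
        # Hypothetical sample data: two small two-column (time, signal) files.
        paths = []
        for i in range(2):
            path = os.path.join(tmpdir, f"signal{i}.csv")
            with open(path, "w") as f:
                f.write(f"0.0,{i}.0\n0.1,{i}.5\n")
            paths.append(path)
        results = read_in_order(paths)
    for i, (time, signal) in enumerate(results):
        print(i, signal)
```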

Have a look at the documentation on the multiprocessing module for more examples and information.


5 Comments

There's gotta be something missing in there, because every time I run it, it gets stuck.
You're right, see my edit. You have to call the process's start() method.
Now it's stuck in an infinite loop, because the items in the queue are not being consumed. Any suggestions?
Yep, I made another mistake. The Queue needs to be created outside the first loop.
I had been scratching my head for two hours because of that, hahaha. I made some minor changes to the code, which should now be perfect for anyone else. Thanks again!

Using the example from the introduction to the documentation:

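from multiprocessing import Pool
import CSVReader  # module that defines csvreader()

if __name__ == '__main__':
    with Pool(2) as pool:
        # pool.map returns the workers' return values in input order
        results = pool.map(CSVReader.csvreader, string_array[:2])
    print(results)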
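The snippet above relies on the asker's CSVReader module and string_array. A self-contained variant, assuming two small hypothetical sample files in a temporary directory, could look like this. Pool.map collects each worker's return value and returns them in the same order as the inputs, so no Queue is needed.

```python
import os
import tempfile
from multiprocessing import Pool

import numpy as np


def csvreader(path):
    # Module-level function, so Pool can pickle it and send it to the workers.
    time, signal = np.genfromtxt(path, delimiter=",", unpack=True)
    return time, signal


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmpdir:
        # Hypothetical sample data standing in for string_array.
        string_array = []
        for i in range(2):
            path = os.path.join(tmpdir, f"signal{i}.csv")
            with open(path, "w") as f:
                f.write(f"0.0,{i}.0\n0.1,{i}.5\n")
            string_array.append(path)

        with Pool(2) as pool:
            # map blocks until all results are back, in input order.
            results = pool.map(csvreader, string_array[:2])

    for time, signal in results:
        print(signal)
```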
