
Many of the tutorials on multiprocessing don't seem to fully address why the technique below works for threading but not for multiprocessing.

Why doesn't this work for multiprocessing, and what is the implementation for what I am trying to do? Thank you!

Threading implementation (works fine, makes sense to me):

from threading import Thread
from Queue import Queue
from time import sleep    

"""threading functions"""
def producer_thread(n):
    for x in range(10):
        thread_q.put(n)

def consumer_thread():
    while True:
        item = thread_q.get()
        print item

if __name__ == '__main__':
    thread_q = Queue()

    """works fine"""
    p_thread = Thread(target=producer_thread, args=(10,))
    c_thread = Thread(target=consumer_thread)
    c_thread.daemon=True
    p_thread.start(); c_thread.start()
    p_thread.join()
    """prevents c_thread daemon process from cancelling prematurely"""
    sleep(.001)

Output:

10
10
10
10
10
10
10
10
10
10

Multiprocessing implementation (seems to be identical to threading but doesn't work at all):

from multiprocessing import Process, freeze_support
from Queue import Queue

"""multiprocessing functions"""
def producer_process(n):
    for x in range(10):
        process_q.put(n)

def consumer_process():
    while True:
        item = process_q.get()
        print item
#            
if __name__ == '__main__':
    freeze_support()
    process_q = Queue()        
    """computer explodes"""
    p_process = Process(target=producer_process, args=(10,))
    c_process = Process(target=consumer_process)
    c_process.daemon=True
    p_process.start(); c_process.start()
    p_process.join()

Output:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'get_successors'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'get_successors'
Process Process-33:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 18, in consumer
    item = q.get()
NameError: global name 'q' is not defined
Process Process-32:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 14, in producer
    q.put(n)
NameError: global name 'q' is not defined
Process Process-34:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 14, in producer
    q.put(n)
NameError: global name 'q' is not defined
Process Process-35:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 18, in consumer
    item = q.get()
NameError: global name 'q' is not defined
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'consumer'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'producer'
2
  • When I run the Thread version, I got: ` File "test.py", line 18, in <module> p_thread = Thread(target=producer, args=(10,)) NameError: name 'producer' is not defined ` Commented Feb 2, 2015 at 1:49
  • Ahh, yes, you are correct -- typos fixed. Commented Feb 2, 2015 at 1:57

3 Answers

2

import Queue is for multithreaded apps (https://docs.python.org/2/library/queue.html), not for multiprocess apps.

from multiprocessing import Queue is for multiprocess apps: https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes

According to the documentation, multiprocessing.Queue "is a near clone of Queue.Queue".

Besides multiprocessing.Queue, there is JoinableQueue, which adds task_done() and join() methods in case you need them.

In your example I don't think you need JoinableQueue. Did you try this:

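from multiprocessing import Process, Queue, freeze_support

def producer(q, n):
    # Put the numbers 0..n-1 on the queue, then a sentinel to signal the end.
    for x in range(n):
        q.put(x)
    q.put("end")


def consumer(q):
    # Print items until the sentinel arrives.
    while True:
        item = q.get()
        if item == "end":
            break
        print item

if __name__ == '__main__':
    freeze_support()
    q = Queue()  # multiprocessing.Queue, passed explicitly to both processes
    c = Process(target=consumer, args=(q,))
    c.start()
    p = Process(target=producer, args=(q, 10))
    p.start()
    c.join()  # returns once the consumer has seen the sentinel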

Tested in Linux and Windows.


0

When I run the Thread version, I got:

File "test.py", line 18, in <module> 
  p_thread = Thread(target=producer, args=(10,)) 
    NameError: name 'producer' is not defined 

Also, I think one error in the multiprocessing version,

NameError: global name 'q' is not defined

must come from a typo. It seems that nothing named "q" is defined.

EDIT: I have now run the thread version and found that fewer than ten "10"s are printed: typically three or four, and the number changes randomly between runs. I'm using Python 2.7.5. Can you check this issue?

EDIT: I ran the mp version; there is no output or error message, and the program terminates quickly. I believe there are some issues with the logic that cannot be ignored. I think fixing the thread version first would be of great help to you.
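One possible way to make the thread version deterministic is to replace the sleep with the queue's own task_done() and join() methods. The sketch below is only an illustration under that assumption: run_threads and the seen list are hypothetical names, the list stands in for the print statement so the result can be checked, and the import fallback is there so the same code loads on Python 2 and Python 3.

```python
import threading

try:
    from queue import Queue  # Python 3 module name
except ImportError:
    from Queue import Queue  # Python 2 module name, as in the question


def producer_thread(q, n, count=10):
    """Put the value n on the queue count times."""
    for _ in range(count):
        q.put(n)


def consumer_thread(q, seen):
    """Consume items forever, acknowledging each one with task_done()."""
    while True:
        item = q.get()
        seen.append(item)  # stands in for "print item"
        q.task_done()


def run_threads(n=10, count=10):
    """Hypothetical driver: returns the items the consumer handled."""
    q = Queue()
    seen = []
    c = threading.Thread(target=consumer_thread, args=(q, seen))
    c.daemon = True  # the consumer never returns, so let it die with main
    p = threading.Thread(target=producer_thread, args=(q, n, count))
    c.start()
    p.start()
    p.join()  # every item has been put on the queue
    q.join()  # every item has been acknowledged with task_done()
    return seen
```

Because q.join() only returns after the consumer has called task_done() once per item, the main thread no longer depends on timing to see all ten values.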

3 Comments

Sure, done. Although I don't know what to say about the typos, the code still works fine for me :/
I get the same issue using Python 2.7.8. To be honest, I have no idea why -- I realize this isn't the best threading implementation in the world, it's just supposed to be simple enough to get the idea across. But yes, I don't know why this is happening either -- sometimes I get the right output, other times I don't, but that's less important to me than why the same implementation doesn't work for multiprocessing
Alright, hacked the output of the threaded version to make it work -- still has no bearing on the answer, in my opinion, but hopefully it helps to clarify things
0

Okay, (sorry(?)) to answer my own question, I found a working implementation of what I was trying to do. There seems to be quite a bit of nuance going on.

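First of all, multiprocessing requires a queue from the multiprocessing module rather than the standard Queue.Queue. I used JoinableQueue here because it adds task_done() and join().

Secondly, the queue needs to be passed as an argument to the functions. On Windows each child process re-imports the script without running the if __name__ == '__main__' block, so a global queue created there is never defined in the child, which is what the NameError in the traceback shows -- maybe this should have been obvious, but I overlooked it.

Thirdly, and perhaps most importantly, the child processes don't print to the stdout of the interpreter -- they print to the Windows stdout, so you MUST run the script from the command line if you want to see the output.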

"""multiprocessing functions"""
from multiprocessing import Process, freeze_support, JoinableQueue

def producer_process(n, q):
    for x in range(10):
        q.put(n)

def consumer_process(q):
    while True:
        item = q.get()
        print item
        q.task_done()  # tells process_q.join() that this item is finished

if __name__ == '__main__':
    freeze_support()
    process_q = JoinableQueue()

    # launch consumer process (daemon, so it is stopped when the parent exits)
    c_process = Process(target=consumer_process, args=(process_q,))
    c_process.daemon = True
    c_process.start()

    # launch producer process
    p_process = Process(target=producer_process, args=(10, process_q))
    p_process.start()
    p_process.join()

    # block until every queued item has been marked done
    process_q.join()
    print "Done"
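If running from the command line is inconvenient, a possible workaround for the third point is to have the child send its output back through a second queue and let the parent do the printing. This is a rough sketch, not a tested recipe for any particular IDE: SENTINEL, result_q, and drain are hypothetical names introduced for illustration, and a plain multiprocessing.Queue with an end marker is used in place of JoinableQueue.

```python
from multiprocessing import Process, Queue, freeze_support

SENTINEL = None  # hypothetical end-of-stream marker


def producer_process(n, work_q, count=10):
    """Put n on the work queue count times, then signal the end."""
    for _ in range(count):
        work_q.put(n)
    work_q.put(SENTINEL)


def consumer_process(work_q, result_q):
    """Forward a line of text per item instead of printing in the child."""
    while True:
        item = work_q.get()
        if item is SENTINEL:
            result_q.put(SENTINEL)  # pass the end marker on to the parent
            break
        result_q.put("got %r" % (item,))


def drain(result_q):
    """Collect lines from the result queue until the end marker arrives."""
    lines = []
    while True:
        line = result_q.get()
        if line is SENTINEL:
            return lines
        lines.append(line)


if __name__ == '__main__':
    freeze_support()
    work_q, result_q = Queue(), Queue()
    c = Process(target=consumer_process, args=(work_q, result_q))
    p = Process(target=producer_process, args=(10, work_q))
    c.start()
    p.start()
    # The parent prints, so the output goes wherever the parent's stdout goes.
    for line in drain(result_q):
        print(line)
    p.join()
    c.join()
```

The parent drains result_q before joining the children, so neither child is left waiting to flush queued data when join() is called.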
