
I'm trying to pass a shared counter to tasks in multiprocessing via apply_async, but it fails with the error "RuntimeError: Synchronized objects should only be shared between processes through inheritance". What's going on?

def processLine(lines, counter, mutex):
    pass

counter = multiprocessing.Value('i', 0)
mutex = multiprocessing.Lock()
pool = Pool(processes = 8)
lines = []

for line in inputStream:
    lines.append(line)
    if len(lines) >= 5000:
        # don't queue more than 1'000'000 lines
        while counter.value > 1000000:
            time.sleep(0.05)
        mutex.acquire()
        counter.value += len(lines)
        mutex.release()
        pool.apply_async(processLine, args=(lines, counter, ), callback=collectResults)
        lines = []

2 Answers


Let the pool handle the scheduling:

for result in pool.imap(process_single_line, input_stream):
    pass

If order doesn't matter:

for result in pool.imap_unordered(process_single_line, input_stream):
    pass

The pool.*map*() functions have a chunksize argument that you can tune to see whether it affects performance in your case.
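For instance (a toy sketch; `process_single_line` and the data here are stand-ins for your own worker and input, not code from the question):

```python
import multiprocessing

def process_single_line(line):
    # stand-in worker: real code would do the actual per-line work
    return line.upper()

if __name__ == '__main__':
    input_stream = ['a', 'b', 'c', 'd'] * 100
    with multiprocessing.Pool(processes=4) as pool:
        # a larger chunksize means fewer IPC round-trips per item
        results = list(pool.imap(process_single_line, input_stream, chunksize=50))
    print(len(results))  # 400
```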

If your code expects multiple lines to be passed in a single call:

from itertools import zip_longest  # izip_longest on Python 2

chunks = zip_longest(*[iter(inputStream)] * 5000, fillvalue='')  # grouper recipe
for result in pool.imap(process_lines, chunks):
    pass

Some alternatives to limit the number of queued items are:

  • multiprocessing.Queue with a maximum size set (you don't need a pool in this case): queue.put() blocks when the maximum size is reached, until another process calls queue.get()
  • a manual implementation of the producer/consumer pattern using multiprocessing primitives such as Condition or BoundedSemaphore.
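A minimal sketch of the first alternative, with a bounded queue and sentinel-based shutdown (all names are illustrative, and the chunk counts are scaled down from the question's):

```python
import multiprocessing

def worker(task_queue, result_queue):
    # pull chunks of lines until a None sentinel arrives
    while True:
        lines = task_queue.get()
        if lines is None:
            break
        result_queue.put(len(lines))  # stand-in for real processing

def run_demo(n_chunks=100, chunk_len=10, n_workers=4, max_queued=20):
    task_queue = multiprocessing.Queue(maxsize=max_queued)  # put() blocks when full
    result_queue = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker, args=(task_queue, result_queue))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for _ in range(n_chunks):
        task_queue.put(['line'] * chunk_len)  # blocks if max_queued chunks are pending
    for _ in workers:
        task_queue.put(None)  # one sentinel per worker
    # drain results before join() to avoid blocking on a full pipe
    total = sum(result_queue.get() for _ in range(n_chunks))
    for w in workers:
        w.join()
    return total

if __name__ == '__main__':
    print(run_demo())  # 1000
```

The bounded task_queue gives you the back-pressure the question's counter was trying to implement by hand: the producer simply blocks in put() until a worker frees a slot.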

Note: each Value has an associated lock, so you don't need a separate lock.
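For example, a small sketch reusing the question's `counter`; `get_lock()` returns the lock that Value already carries:

```python
import multiprocessing

counter = multiprocessing.Value('i', 0)

# no separate multiprocessing.Lock() needed:
# get_lock() is the lock built into the Value itself
with counter.get_lock():
    counter.value += 5000

print(counter.value)  # 5000
```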




I solved it in this not-so-elegant way:

def processLine(lines):
    pass

def collectResults(result):
    global counter
    counter -= len(result)

counter = 0
pool = Pool(processes = 8)
lines = []

for line in inputStream:
    lines.append(line)
    if len(lines) >= 5000:
        # don't queue more than 1'000'000 lines
        while counter > 1000000:
            time.sleep(0.05)
        counter += len(lines)
        pool.apply_async(processLine, args=(lines,), callback=collectResults)
        lines = []

1 Comment

I understand that this is some minimal snippet, but I'm worried that your callback function is not doing what you expect. Since processLine has no return value, you really shouldn't be calling len(result) as it has no meaning. If you are going to answer your own question you should keep it fully self-contained.
