0

I have following code:

#!/usr/bin/env python

def do_job(row):
  # COMPUTING INTENSIVE OPERATION
  sleep(1)
  row.append(int(row[0])**2)

  # WRITING TO FILE - ATOMICITY ENSURED
  semaphore.acquire()
  print "Inside semaphore before writing to file: (%s,%s,%s)" % (row[0], row[1], row[2])
  csvWriter.writerow(row)
  print "Inside semaphore after writing to file"
  semaphore.release()

  # RETURNING VALUE
  return row

def parallel_csv_processing(inputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,  "rb")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # COMPUTING INTENSIVE OPERATIONS
  try:
    p = Pool(processes = cpuCount)
    # results = p.map(do_job, csvReader, chunksize = 10)
    results = p.map_async(do_job, csvReader, chunksize = 10)

  except KeyboardInterrupt:
    p.close()
    p.terminate()
    p.join()

  # WAIT FOR RESULTS
  # results.get()
  p.close()
  p.join()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

if __name__ == '__main__':
  import csv
  from time import sleep
  from multiprocessing import Pool
  from multiprocessing import cpu_count
  from multiprocessing import current_process
  from multiprocessing import Semaphore
  from pprint import pprint as pp
  import calendar
  import time

  SCRIPT_START_TIME  = calendar.timegm(time.gmtime())
  inputFile  = "input.csv"
  outputFile = "output.csv"
  semaphore = Semaphore(1)

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')
  csvWriter.writerow(["before","calling","multiprocessing"])
  parallel_csv_processing(inputFile, cpuCount = cpu_count())
  csvWriter.writerow(["after","calling","multiprocessing"])

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  SCRIPT_STOP_TIME   = calendar.timegm(time.gmtime())
  SCRIPT_DURATION    = SCRIPT_STOP_TIME - SCRIPT_START_TIME
  print "Script duration:    %s seconds" % SCRIPT_DURATION

After running the output on terminal is following:

Inside semaphore before writing to file: (0,0,0)
Inside semaphore after writing to file
Inside semaphore before writing to file: (1,3,1)
Inside semaphore after writing to file
Inside semaphore before writing to file: (2,6,4)
Inside semaphore after writing to file
Inside semaphore before writing to file: (3,9,9)
Inside semaphore after writing to file
Inside semaphore before writing to file: (4,12,16)
Inside semaphore after writing to file
Inside semaphore before writing to file: (5,15,25)
Inside semaphore after writing to file
Inside semaphore before writing to file: (6,18,36)
Inside semaphore after writing to file
Inside semaphore before writing to file: (7,21,49)
Inside semaphore after writing to file
Inside semaphore before writing to file: (8,24,64)
Inside semaphore after writing to file
Inside semaphore before writing to file: (9,27,81)
Inside semaphore after writing to file
Script duration:    10 seconds

content of input.csv is following:

0,0
1,3
2,6
3,9
4,12
5,15
6,18
7,21
8,24
9,27

created content of output.csv is following:

before,calling,multiprocessing
Default,header,please,change
after,calling,multiprocessing

Why Is nothing written to output.csv from parallel_csv_processing resp. do_job method?

1 Answer 1

1

Your processes are silently failing with an exception - specifically, in the spawned processes the script doesn't have a value for csvWriter because they are each in a separate python interpreter, and haven't run main() - this is deliberate, you don't want the subprocesses to run main. The do_job() function can only access values you pass to it explicitly in the map_async() call, and you aren't passing csvWriter. Even if you were I'm not sure it would work, don't know if file handles are shared between main and the processes created by multiprocessing.

Put a try/except around the code in do_job and you will see the exception.

def do_job(row):
  try:
      # COMPUTING INTENSIVE OPERATION
      sleep(1)
      row.append(int(row[0])**2)

      # WRITING TO FILE - ATOMICITY ENSURED
      semaphore.acquire()
      print "Inside semaphore before writing to file: (%s,%s,%s)" % (row[0], row[1], row[2])
      csvWriter.writerow(row)
      print "Inside semaphore after writing to file"
      semaphore.release()

      # RETURNING VALUE
      return row
  except:
      print "exception"

Obviously in real code the exception should be handled properly, but if you run this you'll now see exception printed for every invocation of do_job.

Look in the documentation for multiprocessing for more guidance - under the heading "16.6.1.4. Sharing state between processes" in the Python 2.7 Standard Library docs.

Sign up to request clarification or add additional context in comments.

6 Comments

Thank you for reply. So you are saying that the objects that are not directly passed to map_async() are not visible inside do_job() right? Then how is possible that the semaphore is visible? Also when I declare global_variable = "global variable" inside __main__ I'm able to print it from do_job with print global_variable. Isn't this the same scenario?
I would like to ask why is this exception silent? From my previous experiences when exception raised I was notified immediately. Is possible to explicitly say not to suppress exceptions silently? I can use try, except but how can I know which part of code is causing problems? Is there some global settings for this purpose?
There's no cPython implementation of a way of getting the exception back from the different process to the main() code - you have to implement that yourself see maybe stackoverflow.com/questions/6728236/… and stackoverflow.com/questions/16943404/…
Thank you, I will check the links. Can you please also react on passing variables that I've mentioned? One possible explanation is that file handlers are not allowed to share between processes while other types are. Is this correct? Thank you
The processes created by multiprocessing are running in completely separate memory spaces (e.g. they could be a different CPU core) - if you want a single interpreter/memory space use Threading which has very similar API but doesn't use OS processes, so must run in a single CPU core. Read the Python Standard Library docs for multiprocessing, try it out.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.