
I would like to execute a set of tasks in parallel. I have defined a method in a class which takes a parameter and executes an operation based on that parameter. The class structure is like below.

from concurrent.futures import ThreadPoolExecutor
class Test(object):

  def process_dataframe(self, id: int):
    print(id*id)


  def run_task(self):
    thd = []
    for i in range(1, 10):
      thd.append("self.process_dataframe({0})".format(i))
    self.run_functions_in_parallel(thd)

  def run_functions_in_parallel(self, fns) -> bool:
    def wrap_function(self, fnToCall):
      try:
        eval(fnToCall)
        return ("0")
      except Exception as e:
        return "{0}".format(e)

    thd = []
    isError = False
    executor = ThreadPoolExecutor(max_workers=len(fns))
    errorMessage = ""

    for fn in fns:
      t = executor.submit(wrap_function, self, fn)
      thd.append(t)

    for td in thd:
      ret = td.result()
      if ret != "0":
        isError = True
        errorMessage = errorMessage + "\n" + ret
    if isError:
      print(errorMessage)
      raise Exception(errorMessage)
    else:
      return True


d = Test()
d.run_task()

I have managed to make it work and the tasks execute properly. I am wondering whether there is a better/simpler way to accomplish the same thing. I would like to keep the run_functions_in_parallel method generic so that it can be used as a common method in a module.

3 Comments

  • If you really want parallelism, then you should be using multiprocessing. Commented Dec 25, 2019 at 8:36
  • It is always a bad idea to use a magic string as an error indicator. Try a function with the body {}[0]. Commented Dec 25, 2019 at 8:53
  • Don't use eval. You can just use function handles, like submit also does. Commented Dec 25, 2019 at 8:55
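The magic-string problem from the second comment can be demonstrated directly: a KeyError whose key is 0 formats to the same "0" the wrapper uses as its success marker, so that failure would be reported as success. A minimal sketch of the collision, using a wrapper shaped like the one in the question:

```python
# Demonstrates why "0" is a bad success sentinel: a KeyError with key 0
# formats to the same string, so this failure looks identical to success.
def wrap_function(fn):
    try:
        fn()
        return "0"                  # "success" marker used in the question
    except Exception as e:
        return "{0}".format(e)      # str(KeyError(0)) is also "0"

print(wrap_function(lambda: {}[0]))   # raises KeyError: 0 internally
print(wrap_function(lambda: 1 + 1))   # succeeds
# both print "0" -- the caller cannot tell the two cases apart
```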

1 Answer

You don't need to use a wrapper, since ThreadPoolExecutor already surfaces errors in a better way. A function that always returns True or raises an error doesn't need a return value, but if you have functions with return values that you want to call in parallel, you should return their results. It is a bad idea to use a magic string as an error indicator: format(e) of a KeyError: 0 also produces "0". Better to collect the exceptions themselves, as below. Don't use eval if you don't have to; in your case you can use functools.partial. And don't use too large a value for max_workers.

from functools import partial
from concurrent.futures import ThreadPoolExecutor

class Test(object):
    def process_dataframe(self, id):
        print(id*id)

    def run_task(self):
        functions = []
        for i in range(1,10): 
            functions.append(partial(self.process_dataframe, i))
        self.run_functions_in_parallel(functions)

    def run_functions_in_parallel(self, functions, max_workers=8):
        executor = ThreadPoolExecutor(max_workers=max_workers)
        futures = [
            executor.submit(function)
            for function in functions
        ]

        errors = []
        results = []
        for future in futures:
            try:
                result = future.result()
            except Exception as e:
                errors.append(e)
            else:
                results.append(result)
        if errors:
            raise Exception(errors)
        return results

d = Test()
d.run_task()
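Since run_functions_in_parallel only needs zero-argument callables, the same pattern also works as a module-level helper, which matches the goal of keeping it generic. A sketch (a hypothetical free-function variant of the method above, here collecting return values):

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def run_functions_in_parallel(functions, max_workers=8):
    # Submit every callable, then gather results in submission order.
    executor = ThreadPoolExecutor(max_workers=max_workers)
    futures = [executor.submit(fn) for fn in functions]
    errors, results = [], []
    for future in futures:
        try:
            results.append(future.result())  # re-raises inside try if fn failed
        except Exception as e:
            errors.append(e)
    if errors:
        raise Exception(errors)
    return results

def square(x):
    return x * x

print(run_functions_in_parallel([partial(square, i) for i in range(1, 5)]))
# [1, 4, 9, 16]
```

Note that for CPU-bound work like this, threads are limited by the GIL; ProcessPoolExecutor exposes the same Executor interface and can be swapped in for true parallelism, per the first comment on the question.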

1 Comment

Thanks for your guidance. The futures approach worked perfectly.
