
I have files coming in from an external system into my DB, and for each new file I process it by passing it through 4 functions in sequence. My code can process one file at a time.

Currently, I am trying to process files in parallel using Pool. I am not sure whether my code is actually processing in parallel, because parallel processing is new to me and I can't figure out a way to see details in my console like:

file 1 processing with thread 1
file 2 processing with thread 2
file 1 processing complete with thread 1
file 2 processing complete with thread 2
...so on.

Can anyone help me get this kind of output in the console?

My Python code:

import os
import threading
import subprocess
import pyodbc
import time
from multiprocessing.dummy import Pool as ThreadPool

class Workflow:

    def sql_connection(self):
        conn = pyodbc.connect('Driver={SQL Server};'
                              'Server=MSSQLSERVER01;'
                              'Database=TEST;'
                              'Trusted_Connection=yes;')
        print("DB Connected..")
        return conn

    def Function1(self):
        print ("function 1 Started..")


    def Function2(self):
        print ("function 2 Started..")

    def Function3(self):
        print ("function 3 Started..")


    def Function4(self):
        print ("function 4 Started..")

    def ProcessFile(self):
        print (" Processs %s\tWaiting %s seconds" )
        self.Function1()
        self.Function2()
        self.Function3()
        self.Function4()
        print (" Process %s\tDONE" )


    def Start(self):

        #Get number of files in REQUESTED STATE.
        connsql = self.sql_connection()
        query = "select count(*) from [TEST].[dbo].[files] where Status ='REQUESTED'"
        files = connsql.cursor().execute(query).fetchone()
        print(str(files[0]) + " files to be processed..")

        # Get filing ids of files in REQUESTED STATE.
        query = "select distinct filing_id from [TEST].[dbo].[files] where Status ='REQUESTED'"
        resultset = connsql.cursor().execute(query).fetchall()

        filingIds = []

        for id in resultset:
            filingIds.append(id[0])

        connsql.cursor().commit()
        connsql.close()

        #Create Threads based on number of file ids to be processed.
        pool = ThreadPool(len(filingIds))

        results = pool.map(self.ProcessFile(),filingIds) ## Process the FilingIds in parallel.

        print(results)

        # close the pool and wait for the work to finish
        pool.close()
        pool.join()

A = Workflow()
A.Start()

2 Answers


I think the issue is simply that you are using ThreadPool.map incorrectly: you have to pass self.ProcessFile instead of self.ProcessFile(). Why?

map expects a callable, but self.ProcessFile() is the result of calling ProcessFile, which is None. So map tries to call None, which probably fails silently.
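To see the difference in isolation, here is a minimal standalone example (the square function is purely illustrative):

from multiprocessing.dummy import Pool as ThreadPool

def square(x):
    return x * x

pool = ThreadPool(3)
print(pool.map(square, [1, 2, 3]))       # pass the function object itself -> [1, 4, 9]
# pool.map(square(2), [1, 2, 3])         # passing a call result (here, the int 4) would
                                         # make map try to call 4, which is not callable
pool.close()
pool.join()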


3 Comments

Updated to pool.map(self.ProcessFile, filingIds) and got this exception: TypeError: ProcessFile() takes 1 positional argument but 2 were given. Any idea about this?
Your function definition of ProcessFile is missing an argument (e.g. def ProcessFile(self, arg):).
Thanks! Got the idea of calling my functions in parallel.
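Putting together the fix from this answer (pass the method itself) and the fix from the comments (give ProcessFile a parameter), a minimal sketch of the corrected pattern might look like the following. The filing_id name and the placeholder ID list are mine; threading.current_thread().name supplies the per-thread label the question asks for:

import threading
from multiprocessing.dummy import Pool as ThreadPool

class Workflow:
    def ProcessFile(self, filing_id):  # now accepts the mapped argument
        name = threading.current_thread().name
        print("file %s processing with thread %s" % (filing_id, name))
        # Function1 .. Function4 would run here, in sequence, for this file
        print("file %s processing complete with thread %s" % (filing_id, name))
        return filing_id

filingIds = [1, 2, 3]  # placeholder; in the real code these come from the DB query
pool = ThreadPool(len(filingIds))
results = pool.map(Workflow().ProcessFile, filingIds)  # note: no parentheses after ProcessFile
pool.close()
pool.join()
print(results)

Because multiprocessing.dummy runs workers as threads in the same process, the bound method can be passed to map directly without any pickling concerns.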
from multiprocessing import Process

import time
class WorkFlow:
    def __init__(self):
        pass

    def func1(self, *args):
        print('Func1 : {}'.format(args))
        time.sleep(5)
        print('Func1 Completed!')

    def func2(self, *args):
        print('Func2 : {}'.format(args))
        time.sleep(10)
        print('Func2 Completed!')

if __name__ == '__main__':
    wf = WorkFlow()
    # create one Process per function
    processes = [Process(target=wf.func1), Process(target=wf.func2)]

    # start both processes; they run concurrently
    for p in processes:
        p.start()

    # wait for both to finish
    for p in processes:
        p.join()

The above code will start 3 Python processes (1 master process, 2 slave processes). The first slave process terminates after 5 seconds and the second after 10 seconds.

This can be seen using the top command on Linux:

PID   COMMAND      %CPU TIME     #TH   #WQ  #PORT MEM    PURG   CMPRS
9918  Python       0.0  00:00.00 1     0    8     2148K  0B     0B
9917  Python       0.0  00:00.00 1     0    8     2144K  0B     0B
9916  Python       0.0  00:00.05 1     0    14    6680K  0B     0B

2 Comments

Does for p in processes: p.start() initiate functions 1 and 2 in parallel?
Yes. This initiates func1 and func2 in parallel in 2 different Python processes. If you used threading instead, it would be 1 Python process with 2 threads, but because of Python's GIL we couldn't achieve true parallelism that way.
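For comparison, here is a minimal sketch of the threading variant mentioned above: one Python process, two threads. This is fine for I/O-bound work (sleeping, DB calls), but the GIL keeps the two threads from executing Python bytecode truly in parallel:

import threading
import time

def func1():
    print('Func1 started in {}'.format(threading.current_thread().name))
    time.sleep(5)
    print('Func1 Completed!')

def func2():
    print('Func2 started in {}'.format(threading.current_thread().name))
    time.sleep(10)
    print('Func2 Completed!')

threads = [threading.Thread(target=func1), threading.Thread(target=func2)]
for t in threads:
    t.start()
for t in threads:
    t.join()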
