
I am trying to read a huge CSV file, do some processing on each row, and store the result of each processed row in a queue. I wrote the code below based on this answer on SO.

Unfortunately it does not run in parallel as I expected; any help would be appreciated.

Below is my code.

import csv
from time import sleep
from multiprocessing import Pool

from dateparser.search import search_dates  # third-party: dateparser

# q is a queue defined elsewhere at module level (not shown)

def process_line(drow):
    print('sleeping')
    sleep(10)
    g_descript = drow['desc']
    date_str = g_descript.split('—')

    if len(date_str) > 1:
        ndt = search_dates(date_str[0])
    else:
        ndt = None
    drow['ndt'] = ndt
    q.put(drow)




def get_n_line():
    file_path = '/home/ztcUK.csv'
    with open(file_path,'r') as f:
        cdict = csv.DictReader(f)
        for row in cdict:
            yield row

if __name__ == '__main__':
    f = get_n_line()
    p = Pool(processes=10)

    n = 0 
    for i in f:
        n = n + 1
        print(n)
        p.map(process_line,(i,))
    p.close()
    p.join()

I expect 10 workers to run in parallel, but only 1 runs at a time. I put the sleep() in for debugging purposes.


Comments:

  • "it is not working" is not a good question or problem description. What is not working? What is the expected output? Have you tried to create a minimal reproducible example? Commented Nov 29, 2021 at 7:24
  • It is not working as "I expect 10 concurrent threads", but only 1 is running. Commented Nov 29, 2021 at 7:25
  • How do you conclude that only 1 thread is running? I cannot derive that from what you wrote. Commented Nov 29, 2021 at 7:26
  • By the way, please do not upload images of code/errors when asking a question. Commented Nov 29, 2021 at 7:27
  • What makes you think there is only one thread running at a time? How would you expect the output to be? Commented Nov 29, 2021 at 7:38

1 Answer


You are misusing pool.map(). This method sends a batch of tasks to the pool's worker processes, then blocks until they have all finished so it can return the results as a list.

You're calling pool.map() with a single task at a time, so each call waits until that one task is done before the for loop continues to the next row.

What you want is either to use pool.map() correctly:

with Pool() as pool:
    pool.map(process_line, f)

so that all of your rows are dispatched to the pool as part of a single map() call.

Alternatively, keep your current code, but replace pool.map() with pool.apply_async().

with Pool() as pool:
    for i in f:
        ....
        pool.apply_async(process_line, (i,))

As an aside, I highly recommend using with Pool() as pool: ..., which guarantees the pool is cleaned up when the block exits. (Note that the context manager calls terminate() on exit, so with apply_async() you still need pool.close() and pool.join() inside the block to let pending tasks finish.)

