
I have a function with three parameters: iterable_token, dataframe, and label_array. However, only iterable_token is iterated over inside the function.

def cross_tab(label,token_presence):
    A_token=0
    B_token=0
    C_token=0
    D_token=0
    for i,j in zip(list(label),list(token_presence)):
        if i==True and j==True:
            A_token+=1
        elif i==False and j==False:
            D_token+=1
        elif i==True and j==False:
            C_token+=1
        elif i==False and j==True:
            B_token+=1
    return A_token,B_token,C_token,D_token
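For reference, cross_tab above just counts the four cells of the 2x2 contingency table between the two boolean sequences; a self-contained run on toy inputs (the function copied verbatim, the toy data made up for illustration):

```python
def cross_tab(label, token_presence):
    # Copy of the function above: counts the four cells of the
    # 2x2 contingency table between label and token_presence.
    A_token = 0
    B_token = 0
    C_token = 0
    D_token = 0
    for i, j in zip(list(label), list(token_presence)):
        if i == True and j == True:
            A_token += 1
        elif i == False and j == False:
            D_token += 1
        elif i == True and j == False:
            C_token += 1
        elif i == False and j == True:
            B_token += 1
    return A_token, B_token, C_token, D_token

label = [True, True, False, False, True]
token_presence = [True, False, True, False, False]
print(cross_tab(label, token_presence))  # (1, 1, 2, 1)
```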

def My_ParallelFunction(iterable_token,dataframe,label_array):
    A={}
    B={}
    C={}
    D={}
    token_count={}
    token_list=[]
    token_presence_sum=0
    i=0
    
    for token in iterable_token:
        try:
            token_presence=dataframe['Master'].str.contains('\\b'+token+'\\b')
            token_presence_sum=sum(token_presence)
            if token_presence_sum:
                A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
                A[token]=A_token
                B[token]=B_token
                C[token]=C_token
                D[token]=D_token
                token_count[token]=token_presence_sum
                token_list.append(token)
        except Exception as e:
            pass
    return (A,B,C,D,token_count,token_list)

How do I parallelize the My_ParallelFunction function?

Edit1: I tried the method suggested in example 1, since parallelizing the whole function is what I am looking for.

import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as p:
    results = p.starmap(My_ParallelFunction, (iterable_token, dataframe,label_array))

but the error message is:

RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 47, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
"""

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
<timed exec> in <module>

/usr/lib/python3.6/multiprocessing/pool.py in starmap(self, func, iterable, chunksize)
    272         `func` and (a, b) becomes func(a, b).
    273         '''
--> 274         return self._map_async(func, iterable, starmapstar, chunksize).get()
    275 
    276     def starmap_async(self, func, iterable, chunksize=None, callback=None,

/usr/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
    642             return self._value
    643         else:
--> 644             raise self._value
    645 
    646     def _set(self, i, obj):

TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given

Edit2: Here is the file I am using. You can download it from here and unzip it. Also, run the script below to get the required input parameters. Make sure to install nltk, pandas and numpy, and change the path to the file TokenFile.csv.

from nltk import word_tokenize,sent_tokenize
import pandas as pd
import numpy as np

dataframe=pd.read_csv('/home/user/TokenFile.csv',nrows=100)

def get_uniquetoken(stop_words,input_doc_list):
    ##get unique words across all documents
    if stop_words:
        unique_words=[word for doc in input_doc_list for sent in sent_tokenize(doc) for word in word_tokenize(sent) if word not in stop_words]
    else:
        unique_words=[word for doc in input_doc_list for sent in sent_tokenize(doc) for word in word_tokenize(sent)]
    unique_words=set(unique_words)
    print('unique_words done! length is:',len(unique_words) )
    return unique_words


input_token_list=dataframe['Master'].tolist()
label_array=dataframe['label_array'].tolist()
iterable_token=get_uniquetoken(None,input_token_list)

Edit 3: This is the solution I am using

def My_ParallelFunction(iterable_token,dataframe,label_array):
    A={}
    B={}
    C={}
    D={}
    token_count={}
    token_list=[]
    i=0
    
    with mp.Pool(4) as p:
        token_result = p.starmap(_loop,[(token, dataframe, label_array,A,B,C,D,token_count,token_list) for token in iterable_token])
    #print(token_result[0])
    return token_result#(A,B,C,D,token_count,token_list)


def _loop(token, dataframe, label_array,A,B,C,D,token_count,token_list):
    #print(token)
    try:
        token_presence=dataframe['Master'].str.contains('\\b'+token+'\\b')
        token_presence_sum=sum(token_presence)
        #print(token_presence_sum)
        if token_presence_sum:
            A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
            #print('token,A_token,B_token,C_token,D_token',token,A_token,B_token,C_token,D_token)
            A[token]=A_token
            B[token]=B_token
            C[token]=C_token
            D[token]=D_token
            token_count[token]=token_presence_sum
            token_list.append(token)
#             print('token_list:',token_list)
    except Exception as e:
        pass
    return A,B,C,D,token_count,token_list

However, it is not giving me the result I want. The output is a 949 x 6 x different_sizes structure, one (A, B, C, D, token_count, token_list) tuple per token, instead of six aggregate structures.
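That shape follows from how multiprocessing passes arguments: the dicts handed to each _loop call are pickled into the worker process, so mutations there never reach the parent, and every task returns its own copy. A minimal demonstration, independent of the data above:

```python
import multiprocessing as mp

def worker(d, key):
    # d arrives as a pickled copy; mutating it only affects
    # this task's copy, never the parent's dict.
    d[key] = True
    return d

if __name__ == '__main__':
    shared = {}
    with mp.Pool(2) as p:
        results = p.starmap(worker, [(shared, 'a'), (shared, 'b')])
    print(shared)   # still {} in the parent
    print(results)  # [{'a': True}, {'b': True}] -- one independent copy per task
```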

4 Comments
  • Do you want to parallelize the entire function or only the for token ... loop? Commented Aug 3, 2020 at 7:18
  • @alec_djinn, I want to parallelize only the for token ... loop, but ultimately this will be part of this function. Commented Aug 3, 2020 at 7:25
  • Then you should make a function out of the for loop and call Pool.map() inside My_ParallelFunction. I will make an example of it. Commented Aug 3, 2020 at 7:27
  • @alec_djinn ok, sure Commented Aug 3, 2020 at 7:33

2 Answers


Here are two toy examples to show how you can parallelize a similar function.

First option: parallelize the whole function. You can do that with Pool.starmap(), which works like map() but lets you pass multiple arguments per call.

from multiprocessing import Pool
import time


#Example 1 Simple function parallelization
def f(a,b,c,_list):
    x = a+b+c
    time.sleep(1)
    _list.append(x)
    return _list

inputs = [
    (1,2,3,['a','b','c']),
    (1,2,3,['d','e','f']),
    (1,2,3,['x','y','z']),
    (1,2,3,['A','B','C']),
]

start = time.time()
with Pool(4) as p:
    results = p.starmap(f, inputs)
end = time.time()

for r in results:
    print(r)
    
print(f'done in {round(end-start, 3)} seconds')

Output:

['a', 'b', 'c', 6]
['d', 'e', 'f', 6]
['x', 'y', 'z', 6]
['A', 'B', 'C', 6]
done in 1.084 seconds

Second option: parallelize only the for loop inside the function. In that case, rewrite the loop body as a function and call it with Pool.map() or Pool.starmap().

#Example 2. Function calling a parallel function

#loop
def g(_string):
    time.sleep(1)
    return _string + '*'

#outer function
def f(a,b,c,_list):
    x = a+b+c
    _list.append(str(x))
    #loop parallelization
    with Pool(4) as p:
        new_list = p.map(g, _list)
    return new_list

start = time.time()
result = f(1,2,3,['a','b','c'])
end = time.time()

print(result)
print(f'done in {round(end-start, 3)} seconds')

Output:

['a*', 'b*', 'c*', '6*']
done in 1.048 seconds

Note that the "loop function" contains the logic to deal with a single element of the iterable; Pool.map() takes care of running it for all the elements.

The time.sleep(1) calls simulate a time-consuming calculation. If the parallelization works, you should be able to process 4 inputs in about 1 second rather than 4 seconds.

Here is an example using your code:

def My_ParallelFunction(iterable_token, dataframe, label_array):

    with mp.Pool(4) as p:
        token_result = p.starmap(
            _loop,
            [(token, dataframe, label_array) for token in iterable_token]
        )
    return token_result


def _loop(token, dataframe, label_array):
    A = {}
    B = {}
    C = {}
    D = {}
    token_count = {}
    token_list = []
    try:
        token_presence = dataframe['Master'].str.contains(r'\b' + token + r'\b')
        token_presence_sum = sum(token_presence)
        if token_presence_sum:
            A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
            A[token] = A_token
            B[token] = B_token
            C[token] = C_token
            D[token] = D_token
            token_count[token] = token_presence_sum
            token_list.append(token)
            return A, B, C, D, token_count, token_list
    except Exception as e:
        print(e)
    # Tokens with no matches (or that raised) fall through and return None.
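token_result then comes back as a list holding one (A, B, C, D, token_count, token_list) tuple per matching token, and None for tokens that never matched or raised. A sketch of folding those per-token tuples into six aggregate structures (merge_results is a hypothetical helper, not part of the answer's code):

```python
def merge_results(token_result):
    # Fold the per-token tuples returned by starmap into six
    # aggregate structures, skipping tokens that matched nothing.
    A, B, C, D, token_count, token_list = {}, {}, {}, {}, {}, []
    for res in token_result:
        if res is None:  # _loop returned None: no match, or an exception
            continue
        a, b, c, d, count, tokens = res
        A.update(a)
        B.update(b)
        C.update(c)
        D.update(d)
        token_count.update(count)
        token_list.extend(tokens)
    return A, B, C, D, token_count, token_list
```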

11 Comments

Not all the items are iterable. Only the first item is to be iterated. I followed your code in the first scenario, as with mp.Pool(mp.cpu_count()) as p: results = p.starmap(My_ParallelFunction, (iterable_token, dataframe, label_array)), but the error message is TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given. I added my response in the edited details.
Of course, you have to pass a single element of the iterable in that case. In your case, the best option is to rewrite the loop. If you share your dataframe I can make a proper example.
@StatguyUser I have updated my answer to include an example using your code. You can increase the number of processes you want to use by incrementing the number in Pool(). As a rule of thumb, you should not use more processes than the number of CPU cores you have.
There is a logical flaw in the _loop function: dataframe and label_array are not input parameters but are used inside the function in the line token_presence=dataframe['Master'].str.contains('\\b'+token+'\\b'). Please read the question title again to get an idea of what exactly I am looking for.
@StatguyUser Take my code as an example. You have to rewrite your function in discrete blocks. You also use other variables like A, B, C, D and token_count, which would need to be passed into the _loop function. But it would be better to rewrite that part instead.

Something along these lines should work if you wish to multiprocess only the for loop and not the entire function.

from multiprocessing import Pool

def get_token_counts(token, dataframe, label_array):
    # Worker: handles a single token. It must be defined at module
    # level (not nested inside My_ParallelFunction), otherwise it
    # cannot be pickled and the pool will fail to dispatch it.
    try:
        token_presence = dataframe['Master'].str.contains(r'\b' + token + r'\b')
        token_presence_sum = sum(token_presence)
        if token_presence_sum:
            A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
            return token, A_token, B_token, C_token, D_token, token_presence_sum
    except Exception as e:
        print(e)
    # Falls through and returns None when the token never appears.

def My_ParallelFunction(iterable_token, dataframe, label_array):
    A = {}
    B = {}
    C = {}
    D = {}
    token_count = {}
    token_list = []

    with Pool() as p:
        p_results = p.starmap(get_token_counts, [(token, dataframe, label_array) for token in iterable_token])

    for res in p_results:
        if res is None:
            continue
        token, A_token, B_token, C_token, D_token, token_presence_sum = res
        A[token] = A_token
        B[token] = B_token
        C[token] = C_token
        D[token] = D_token
        token_count[token] = token_presence_sum
        token_list.append(token)
    return (A, B, C, D, token_count, token_list)

I removed the part where you add elements to lists and dictionaries from the worker function, since you'd have to look into queues or shared objects in order to append to a list or dict from multiple processes. It's more work, but it should make your code run slightly faster (it all depends on how many elements are in your iterable and what takes a long time to compute).
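If you did want workers to write into shared containers directly, multiprocessing.Manager provides proxy dicts and lists; a minimal sketch (typically slower than returning values, since every access goes through the manager process):

```python
from multiprocessing import Manager, Pool

def record(shared_dict, shared_list, token, value):
    # Proxy objects forward every mutation to the manager process,
    # so all workers update the same underlying dict/list.
    shared_dict[token] = value
    shared_list.append(token)

if __name__ == '__main__':
    with Manager() as m, Pool(2) as p:
        d, lst = m.dict(), m.list()
        p.starmap(record, [(d, lst, 'a', 1), (d, lst, 'b', 2)])
        print(dict(d))  # {'a': 1, 'b': 2}
```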

The idea behind this code is that you create a worker function, get_token_counts, that runs in a separate process for each (token, dataframe, label_array) tuple. The function returns everything needed to add an entry to the dictionaries; since you can't know which process finishes first, returning token alongside the counts solves the indexing problem. (Pool.starmap() does in fact return results in input order, so strictly the token isn't necessary, but it keeps the bookkeeping explicit.)
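Pool.map() and Pool.starmap() return results aligned with their inputs; only the execution order across workers is nondeterministic. A quick check (toy function, made up for illustration):

```python
from multiprocessing import Pool
import time

def slow_square(x):
    # Later inputs sleep less and may finish first, but map()
    # still returns results aligned with the input order.
    time.sleep(0.01 * (5 - x))
    return x * x

if __name__ == '__main__':
    with Pool(4) as p:
        print(p.map(slow_square, range(6)))  # [0, 1, 4, 9, 16, 25]
```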

Once all elements have been computed, you add them to your lists and dicts.

This is basically multiprocessing some of the dataframe functions along with cross_tab, not exactly My_ParallelFunction.

Since you gave no example, I can't really test the code or come up with something better.

1 Comment

I added the data and the code to get the input parameters. Check the details under Edit2.
