
I am new to Python. I have a program that loads one big CSV file with over 100k lines, each line having 4 columns. In a for loop I check every row of a list (dlist) for duplicates; dlist is a list of DsRef class objects that I load with another function.

DsRef class:

from tqdm import tqdm
from multiprocessing import Pool, cpu_count, freeze_support

class DsRef:
    def __init__(self, pn, comp, comp_name, type, diff):
        self.pn = pn
        self.comp = comp
        self.comp_name = comp_name
        self.type = type
        self.diff = diff

    def __str__(self):
        return f'{self.pn} {get_red("|")} {self.comp} {get_red("|")} {self.comp_name} {get_red("|")} {self.type} {get_red("|")} {self.diff}\n'

    def __repr__(self):
        return str(self)  

    def __iter__(self):
        return iter(self.__dict__.items())

Duplication class:

class Duplication:
    def __init__(self, pn, comp, cnt):
        self.pn = pn
        self.comp = comp
        self.cnt = cnt

    def __str__(self):
        return f'{self.pn};{self.comp};{self.cnt}\n'

    def __repr__(self):
        return str(self)

    def __hash__(self):
        return hash(('pn', self.pn,
                 'comp', self.comp))

    def __eq__(self, other):
        return self.pn == other.pn and self.comp == other.comp 

Sample data for testing:

dlist= []
dlist.append(DsRef(
                    "TTT_XXX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_XCX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_XXX", "CCC_VCV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_XXX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_XYX", "CCC_YYY", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TAT_XQX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "ATT_XXX", "CCC_VQV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_EEE", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_XWX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_XXX", "CCC_VWV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_EEE", "CCC_VVV", "CTYPE", "CTYPE", "text"))

Function that should find and return the rows with duplicated values:

def FindDuplications(dlist):
    duplicates = []
    for pn, comp in enumerate(dlist):            
        matches = [xpn for xpn, xcomp in enumerate(dlist) if pn == xpn and comp == xcomp]
        duplicates.append(Duplication(pn, comp, len(matches)))
    return duplicates

If `r.pn == x.pn and r.comp == x.comp` is true, I have found a duplication; I compare the first 2 attributes of each object with every other object in the list.

Now I am trying to use something like the following to use all processor cores for a faster result; currently it takes over 15 minutes:

if __name__ == '__main__':
    freeze_support()
    p = Pool(cpu_count())
    duplicates = p.map(FindDuplications, dlist)
    p.close()
    p.join()

At first I got an error that the class is not iterable, so I added the `__iter__` method to the first class. After that I got an error that a tuple object has no `pn` or `comp` attribute, so I switched to `enumerate(dlist)` in the for loop, but it still does not work.

Could you please help me?

I would also like to use tqdm to check the progress of the duplicate-finding function.


Here is the original working function without multiprocessing:

def CheckDuplications(dlist):
    print(get_yellow("========= CHECK CROSS DUPLICATIONS ========="))
    duplicates = []
    for r in tqdm(dlist):
        matches = [x for x in dlist if r.pn == x.pn and r.comp == x.comp]
        duplicates.append(Duplication(r.pn, r.comp, len(matches)))

    results = [d for d in duplicates if d.cnt > 1]
    results = set(results)
    return results
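As an aside, the quadratic scan in the function above can be collapsed into a single counting pass with `collections.Counter`, which on 100k rows is typically fast enough to make multiprocessing unnecessary. A minimal sketch keyed on the `(pn, comp)` pairs (the sample strings are taken from the test data above):

```python
from collections import Counter

# Count (pn, comp) pairs in one pass instead of rescanning the list per row.
rows = [("TTT_XXX", "CCC_VVV"), ("TTT_XCX", "CCC_VVV"), ("TTT_XXX", "CCC_VVV")]
counts = Counter(rows)
duplicates = {key + (cnt,) for key, cnt in counts.items() if cnt > 1}
print(duplicates)  # {('TTT_XXX', 'CCC_VVV', 2)}
```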

The function FindDuplications returns a list of DsRef objects (a plain copy), but it must return a list of Duplication objects; something is wrong.

Thank you

  • what is the problem exactly? I just tried running your code, and it seems to execute fine at least. Is the output not what you expect? Commented Dec 12, 2019 at 17:43
  • When I run the CheckDuplications function it works fine, but it uses only one of 12 logical processor cores. With FindDuplications I would like to use all logical cores (or at least more than one) for a faster result. When I run the multiprocessing function FindDuplications, the script ends after 3 seconds, but the function CheckDuplications takes over 17 minutes, since I have over 100k rows. Commented Dec 12, 2019 at 18:05
  • Ooh, I think I see your problem: pool.map() calls the given function on every item independently. FindDuplications doesn't receive the full list, so it can't access the rest of the list to find the other duplicates. Commented Dec 12, 2019 at 18:22
  • btw, Python convention uses snake_case for functions; it should be find_duplications. Commented Dec 12, 2019 at 18:23
  • Ok, snake_case is fine, but do you have any idea how to solve or fix this problem, please? Commented Dec 12, 2019 at 18:53

1 Answer


There were a few problems in the code: you didn't actually parallelize it, and you can't just hand single-threaded code with a heavy task to multiple cores. The code needs some adaptation.

Ok, anyway, here we are :)

from math import ceil
from multiprocessing import Pool, cpu_count, freeze_support


def get_red(val):
    return val


class DsRef:
    def __init__(self, pn, comp, comp_name, type, diff):
        self.pn = pn
        self.comp = comp
        self.comp_name = comp_name
        self.type = type
        self.diff = diff

    def __str__(self):
        return f'{self.pn} {get_red("|")} {self.comp} {get_red("|")} {self.comp_name} {get_red("|")} {self.type} {get_red("|")} {self.diff}\n'

    def __repr__(self):
        return str(self)


class Duplication:
    def __init__(self, pn, comp, cnt):
        self.pn = pn
        self.comp = comp
        self.cnt = cnt

    def __str__(self):
        return f'{self.pn};{self.comp};{self.cnt}\n'

    def __repr__(self):
        return str(self)

    def __hash__(self):
        return hash(('pn', self.pn,
                     'comp', self.comp))

    def __eq__(self, other):
        return self.pn == other.pn and self.comp == other.comp


dlist = []
dlist.append(DsRef(
    "TTT_XXX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_XCX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_XXX", "CCC_VCV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_XXX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_XYX", "CCC_YYY", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TAT_XQX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "ATT_XXX", "CCC_VQV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_EEE", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_XWX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_XXX", "CCC_VWV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_EEE", "CCC_VVV", "CTYPE", "CTYPE", "text"))


def FindDuplications(task):
    dlist, start, count = task

    duplicates = []
    for r in dlist[start:start + count]:
        matches = [x for x in dlist if r.pn == x.pn and r.comp == x.comp]
        duplicates.append(Duplication(r.pn, r.comp, len(matches)))

    return {d for d in duplicates if d.cnt > 1}


if __name__ == '__main__':
    freeze_support()

    threads = cpu_count()
    tasks_per_thread = ceil(len(dlist) / threads)

    tasks = [(dlist, tasks_per_thread * i, tasks_per_thread) for i in range(threads)]

    p = Pool(threads)
    duplicates = p.map(FindDuplications, tasks)
    p.close()
    p.join()

    duplicates = {item for sublist in duplicates for item in sublist}

    print(duplicates)
    print(type(duplicates))

It works well for me, returns the same results as the single-threaded function, and runs on all available cores in parallel.

Output

python test.py
{TTT_EEE;CCC_VVV;2
, TTT_XXX;CCC_VVV;2
}
<class 'set'>

6 Comments

Hi Alexander, thanks, now it works in parallel on all cores, but I need it to return only Duplication objects, not DsRef. Into the Duplication class I need to load all the data, and afterwards I select only rows where the class attribute cnt is larger than 1, to keep only the duplications. That means each item from the task list (a DsRef class object) must be checked against all rows in dlist. Is it possible to debug the parallel function with a VS Code breakpoint?
The source code works with your function, just modified for multiprocessing usage. I didn't change anything in the logic around duplicates.append(Duplication(pn, comp, len(matches))).
The original (non-multiprocessing) function returns a list of Duplication objects, but now it returns Duplication objects whose comp attribute holds a DsRef instance, while the Duplication class should have only string attributes plus the cnt attribute for the number of duplications.
Woof, I didn't see that you had changed the original function. Ok, I updated it to the logic the original had. Please check now.
I didn't change anything, but it doesn't matter. I need to pass 2 parameters into the function (the first is the task list per core, and also the full dlist), because the task list must be checked against all data in the original list.
