
I am generating negative pairs from positive pairs, and I would like to speed up the process by using all cores of the CPU. On a single core it takes almost five days, running day and night.

I want to convert the code below to use multiprocessing. I do not yet have a "positives_negatives.csv" file.

if Path("positives_negatives.csv").exists():
    df = pd.read_csv("positives_negatives.csv")
else:
    for combo in tqdm(itertools.combinations(identities.values(), 2), desc="Negatives"):
        for cross_sample in itertools.product(combo[0], combo[1]):
            negatives = negatives.append(pd.Series({"file_x": cross_sample[0], "file_y": cross_sample[1]}).T,
                                         ignore_index=True)
    negatives["decision"] = "No"
    negatives = negatives.sample(positives.shape[0])
    df = pd.concat([positives, negatives]).reset_index(drop=True)
    df.to_csv("positives_negatives.csv", index=False)
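Incidentally, much of the runtime is likely the `DataFrame.append` inside the double loop: each call copies the entire frame, and the method was removed in pandas 2.0. Collecting plain tuples and building one DataFrame at the end avoids that cost. A minimal sketch on toy data (the identity names and file names here are made up):

```python
import itertools
import pandas as pd

# Toy stand-in for the real identities dict: identity -> list of files
identities = {"a": ["a1.jpg", "a2.jpg"], "b": ["b1.jpg", "b2.jpg"]}

rows = []
for combo in itertools.combinations(identities.values(), 2):
    # accumulate plain tuples instead of appending to a DataFrame
    rows.extend(itertools.product(combo[0], combo[1]))

# build the frame once, after the loop
negatives = pd.DataFrame(rows, columns=["file_x", "file_y"])
print(len(negatives))  # 1 identity pair x 2 x 2 files = 4 rows
```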

Modified code

def multi_func(iden, negatives):
    for combo in tqdm(itertools.combinations(iden.values(), 2), desc="Negatives"):
        for cross_sample in itertools.product(combo[0], combo[1]):
            negatives = negatives.append(pd.Series({"file_x": cross_sample[0], "file_y": cross_sample[1]}).T,
                                         ignore_index=True)

Used

if Path("positives_negatives.csv").exists():
    df = pd.read_csv("positives_negatives.csv")
else:
    with concurrent.futures.ProcessPoolExecutor() as executor:
        secs = [5, 4, 3, 2, 1]
        results = executor.map(multi_func(identities, negatives), secs)

    negatives["decision"] = "No"
    negatives = negatives.sample(positives.shape[0])
    df = pd.concat([positives, negatives]).reset_index(drop=True)
    df.to_csv("positives_negatives.csv", index=False)
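A note on the attempt above: `executor.map(multi_func(identities, negatives), secs)` calls `multi_func` immediately in the parent process and hands its return value (`None`) to `map`, so nothing runs in parallel. `executor.map` expects the callable itself plus an iterable of arguments. A minimal sketch of the correct pattern, using a toy `square` function:

```python
from concurrent.futures import ProcessPoolExecutor

def square(n):
    return n * n

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        # pass the function itself, not the result of calling it;
        # the executor invokes square(2), square(3), square(4) in worker processes
        results = list(executor.map(square, [2, 3, 4]))
    print(results)  # [4, 9, 16]
```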
  • Your best bet would be to break up the work into subgroups, then use multiprocessing from there. Commented Jan 30, 2021 at 4:59
  • If possible, please give me an example related to the "else" clause. Commented Jan 30, 2021 at 5:02
  • Not really... Start with this maybe? Commented Jan 30, 2021 at 5:09
  • Actually, I already tried it twice; it never works. Commented Jan 30, 2021 at 6:11

1 Answer


The best way is to use the ProcessPoolExecutor class together with a separate worker function. You can achieve it like this:

Libraries

from concurrent.futures.process import ProcessPoolExecutor
from pathlib import Path
from os import cpu_count
import itertools

import more_itertools
import pandas as pd
from tqdm import tqdm

def compute_cross_samples(x):
    # Cartesian product of two identities' file lists -> one DataFrame of negative pairs
    return pd.DataFrame(itertools.product(*x), columns=["file_x", "file_y"])

Modified code

if Path("positives_negatives.csv").exists():
    df = pd.read_csv("positives_negatives.csv")
else:
    frames = []
    with ProcessPoolExecutor() as pool:
        # take cpu_count() combinations at a time from identities.values()
        for combos in tqdm(more_itertools.ichunked(itertools.combinations(identities.values(), 2), cpu_count())):
            # compute the cross products of each chunk in parallel
            frames.extend(pool.map(compute_cross_samples, combos))
    # one concat at the end is far cheaper than appending inside the loop
    # (DataFrame.append was removed in pandas 2.0)
    negatives = pd.concat(frames, ignore_index=True)

    negatives["decision"] = "No"

    negatives = negatives.sample(positives.shape[0])
    df = pd.concat([positives, negatives]).reset_index(drop=True)
    df.to_csv("positives_negatives.csv", index=False)
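As a sanity check, the per-pair computation can be exercised sequentially on toy data (the identities and file names below are hypothetical) before wiring in the pool; the parallel version should produce the same rows:

```python
import itertools
import pandas as pd

# Toy identities: two files per identity
identities = {
    "alice": ["a1.jpg", "a2.jpg"],
    "bob":   ["b1.jpg", "b2.jpg"],
    "carol": ["c1.jpg", "c2.jpg"],
}

def compute_cross_samples(pair):
    # Cartesian product of two identities' file lists -> negative pairs
    return pd.DataFrame(itertools.product(*pair), columns=["file_x", "file_y"])

# Sequential equivalent of the pool.map step, for illustration
frames = [compute_cross_samples(c)
          for c in itertools.combinations(identities.values(), 2)]
negatives = pd.concat(frames, ignore_index=True)
print(len(negatives))  # 3 identity pairs x (2 x 2) products = 12 rows
```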

3 Comments

Can you add some more about this? i.e., what sort of speedup did you see?
Additionally, you should be able to mark your own answer as the Answer with the check to its left after 2-ish days!
@ti7 Yes. Later, I will add more details
