I'm trying to run a computation over a large amount of data. The computation is a simple correlation, but the data set is large enough that I stared at my computer for more than 10 minutes with no output at all.

Then I tried to use multiprocessing.Pool. This is my code now:

import collections
import numpy
from multiprocessing import Pool
from haversine import haversine

def calculateCorrelation(data_1, data_2, dist):
    """
    Fill the correlation matrix between data_1 and data_2
    :param data_1: dictionary {key : [coordinates]}
    :param data_2: dictionary {key : [coordinates]}
    :param dist: maximum distance between coordinates for a pair to be counted, in kilometers.
    :return: numpy array containing the correlation between each complaint category.
    """
    pool = Pool(processes=20)

    data_1 = collections.OrderedDict(sorted(data_1.items()))
    data_2 = collections.OrderedDict(sorted(data_2.items()))
    data_1_size = len(data_1)                                          
    data_2_size = len(data_2)

    corr = numpy.zeros((data_1_size, data_2_size))

    for index_1, key_1 in enumerate(data_1):
        for index_2, key_2 in enumerate(data_2):  # Forming pairs
            type_1 = data_1[key_1]  # List of data in data_1 of type *i*
            type_2 = data_2[key_2]  # List of data in data_2 of type *j*
            result = pool.apply_async(correlation, args=[type_1, type_2, dist])
            corr[index_1, index_2] = result.get()
    pool.close()
    pool.join()
    return corr


def correlation(type_1, type_2, dist):
    in_range = 0
    for l1 in type_1:      # Coordinates of a data in data_1
        for l2 in type_2:  # Coordinates of a data in data_2
            p1 = (float(l1[0]), float(l1[1]))
            p2 = (float(l2[0]), float(l2[1]))
            if haversine(p1, p2) <= dist:  # Distance between two data of types *i* and *j*
                in_range += 1              # Number of data in data_2 inside area of data in data_1
    total = float(len(type_1) * len(type_2))
    if total != 0:
        return in_range / total  # Correlation between category *i* and *j*
    return 0.0  # No pairs to compare

corr = calculateCorrelation(permiters_per_region, complaints_per_region, 20)

However, speed hasn't improved. It seems that no parallel processing is being done, as just one thread does almost all of the work. At some point, all Python workers are using 0.0% of the CPU while one thread is using 100%.

Am I missing something?

  • data_2 = collections.OrderedDict(sorted(data_1.items())): is that supposed to be data_2.items()? Commented Mar 2, 2016 at 19:46
  • Yes, thank you, @GarrettR! Commented Mar 2, 2016 at 19:47

1 Answer

In the loop where you generate the jobs, you call apply_async and then immediately wait for the job to complete with result.get(), which effectively serializes the work: each job has to finish before the next one is submitted. You could instead append each result object to a list and collect the results only after all the jobs have been dispatched (see below), or even move to the map method.

def calculateCorrelation(data_1, data_2, dist):
    """
    Fill the correlation matrix between data_1 and data_2
    :param data_1: dictionary {key : [coordinates]}
    :param data_2: dictionary {key : [coordinates]}
    :param dist: maximum distance between coordinates for a pair to be counted, in kilometers.
    :return: numpy array containing the correlation between each complaint category.
    """
    pool = Pool(processes=20)
    results = []

    data_1 = collections.OrderedDict(sorted(data_1.items()))
    data_2 = collections.OrderedDict(sorted(data_2.items()))
    data_1_size = len(data_1)                                          
    data_2_size = len(data_2)

    corr = numpy.zeros((data_1_size, data_2_size))

    for index_1, key_1 in enumerate(data_1):
        for index_2, key_2 in enumerate(data_2):  # Forming pairs
            type_1 = data_1[key_1]  # List of data in data_1 of type *i*
            type_2 = data_2[key_2]  # List of data in data_2 of type *j*
            result = pool.apply_async(correlation, args=[type_1, type_2, dist])
            results.append((result, index_1, index_2))
    for result, index_1, index_2 in results:
        corr[index_1, index_2] = result.get()
    pool.close()
    pool.join()
    return corr
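
The map-style alternative mentioned above could look roughly like the sketch below. It is not the asker's code: it assumes Python 3.3+ (for starmap), takes the pool as a parameter, returns a plain list of rows rather than a numpy array, and swaps in a small hand-written haversine_km helper so the example has no third-party dependency. The names correlation_matrix and haversine_km and the toy data are hypothetical.

```python
import math
from itertools import product
from multiprocessing import Pool


def haversine_km(p1, p2):
    """Great-circle distance in km between two (lat, lon) points in degrees."""
    lat1, lon1, lat2, lon2 = map(math.radians, (p1[0], p1[1], p2[0], p2[1]))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(a))


def correlation(type_1, type_2, dist):
    """Fraction of (type_1, type_2) point pairs that lie within dist km."""
    total = len(type_1) * len(type_2)
    if total == 0:
        return 0.0
    in_range = sum(1 for p1 in type_1 for p2 in type_2
                   if haversine_km(p1, p2) <= dist)
    return in_range / total


def correlation_matrix(pool, data_1, data_2, dist):
    """Compute every cell of the matrix with a single starmap call."""
    keys_1, keys_2 = sorted(data_1), sorted(data_2)
    # One argument tuple per matrix cell, in row-major order.
    jobs = [(data_1[k1], data_2[k2], dist)
            for k1, k2 in product(keys_1, keys_2)]
    # starmap hands all jobs to the workers and blocks once, until all are done.
    flat = pool.starmap(correlation, jobs)
    width = len(keys_2)
    return [flat[i * width:(i + 1) * width] for i in range(len(keys_1))]


if __name__ == "__main__":
    # Hypothetical toy data in the same shape as the question: {key: [(lat, lon), ...]}
    a = {"x": [(0.0, 0.0)], "y": [(10.0, 10.0)]}
    b = {"p": [(0.0, 0.1)], "q": [(10.0, 10.1)]}
    with Pool() as pool:
        print(correlation_matrix(pool, a, b, 20))
```

Because the jobs are all submitted before anything waits on a result, the workers can run them concurrently. If a numpy array is needed, numpy.array(rows) on the returned list should give the same shape as corr in the code above.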

4 Comments

When you refer to "wait for it to complete", is that the result.get() call? Is that a blocking call?
Yes! That's the problem. result.get() blocks until the job is complete, then returns the result.
docs.python.org/2/library/… Is it implied? The docs just say: "Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get()." I'm just curious.
I should have noted that. I followed an answer from a user with > 30k reputation and thought it was correct (stackoverflow.com/questions/25888255/…)
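
The blocking behaviour of get() discussed in these comments can be observed with a small timing experiment. This is only a sketch under stated assumptions: it uses multiprocessing.pool.ThreadPool (which exposes the same apply_async interface as Pool) and time.sleep as a stand-in for real work, so that it runs anywhere without a main guard. slow_square, get_immediately, get_after_dispatch and timed are hypothetical names.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_square(x):
    time.sleep(0.2)  # stand-in for an expensive computation
    return x * x


def get_immediately(pool, values):
    """Submit one job and wait for its result before submitting the next."""
    return [pool.apply_async(slow_square, (v,)).get() for v in values]


def get_after_dispatch(pool, values):
    """Submit every job first, then collect the results."""
    pending = [pool.apply_async(slow_square, (v,)) for v in values]
    return [r.get() for r in pending]


def timed(fn, *args):
    """Return (result, elapsed seconds) for a single call of fn."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


with ThreadPool(4) as pool:
    serial_result, serial_time = timed(get_immediately, pool, range(4))
    parallel_result, parallel_time = timed(get_after_dispatch, pool, range(4))

print(serial_result == parallel_result)
print(f"get() inside the loop: {serial_time:.2f}s, "
      f"get() after dispatch: {parallel_time:.2f}s")
```

Both variants return the same values; the first should take roughly the sum of the individual job times, while the second should take closer to the time of a single job when there are at least as many workers as jobs.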
