
I'm trying to perform calculations on a large network object to predict links appearing between nodes. I am able to do this in serial, but not in parallel using Python's multiprocessing. The parallel implementation never returns, and judging by my task manager it does not take up much memory or CPU power either.

import networkx as nx
import multiprocessing as mp

def jaccard_serial_predictions(G):
    """
    Create a ranked list of possible new links based on the Jaccard similarity,
    defined as the size of the intersection of the neighbor sets divided by
    the size of their union.

    Parameters
    G: directed or undirected nx graph
    Returns
    list of link bunches with the score as an attribute
    """
    potential_edges = []
    G_undirected = nx.Graph(G)
    for non_edge in nx.non_edges(G_undirected):
        u = set(G.neighbors(non_edge[0]))
        v = set(G.neighbors(non_edge[1]))
        uv_un = len(u.union(v))
        uv_int = len(u.intersection(v))
        if uv_int == 0 or uv_un == 0:
            continue
        else:
            s = (1.0*uv_int)/uv_un

        potential_edges.append(non_edge + ({'score': s},))

    return potential_edges

def jaccard_prediction(non_edge):
    # Relies on G being available as a module-level global in every worker
    # process (inherited from the parent on fork-based start methods).
    u = set(G.neighbors(non_edge[0]))
    v = set(G.neighbors(non_edge[1]))
    uv_un = len(u.union(v))
    uv_int = len(u.intersection(v))
    if uv_int == 0 or uv_un == 0:
        return
    else:
        s = (1.0*uv_int)/uv_un
    return non_edge + ({'score': s},)

def jaccard_mp_predictions(G):
    """
    Create a ranked list of possible new links based on the Jaccard similarity,
    defined as the size of the intersection of the neighbor sets divided by
    the size of their union.

    Parameters
    G: directed or undirected nx graph
    Returns
    list of link bunches with the score as an attribute
    """
    pool = mp.Pool(processes=4)
    G_undirected = nx.Graph(G)
    results = pool.map(jaccard_prediction, nx.non_edges(G_undirected))
    pool.close()
    pool.join()
    return results

Calling jaccard_serial_predictions(G), with G being a graph with 95,000,000 potential edges, returns in 4.5 minutes, but jaccard_mp_predictions(G) does not return even after running for half an hour.
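As an aside (a sketch, not from the original post): one way to rule out problems with the worker depending on a global G is to precompute the neighbor sets and ship them with each task, so the worker needs no graph at all. The jaccard_from_sets helper below is hypothetical, shown here with plain sets standing in for the (non-edge, neighbor-set) tasks:

```python
from multiprocessing import Pool

def jaccard_from_sets(task):
    # Each task carries everything the worker needs; no global graph required.
    (a, b), (nbrs_a, nbrs_b) = task
    union = len(nbrs_a | nbrs_b)
    inter = len(nbrs_a & nbrs_b)
    if inter == 0 or union == 0:
        return None
    return (a, b, {'score': inter / union})

if __name__ == '__main__':
    # Tiny stand-in for (non_edge, neighbor-set) tasks.
    tasks = [
        ((0, 1), ({2, 3}, {3, 4})),   # shared neighbor 3 -> score 1/3
        ((0, 2), ({1}, {3})),         # disjoint neighborhoods -> None
    ]
    with Pool(processes=2) as pool:
        results = [r for r in pool.map(jaccard_from_sets, tasks)
                   if r is not None]
    # results == [(0, 1, {'score': 1/3})]
```

The trade-off is that every task now carries two neighbor sets through the pickling pipe, so for a dense graph this costs more IPC than sharing the graph via fork.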

  • What is G? Some global object which you intend to use from multiple processes? That won't work. Try this approach: change jaccard_serial_predictions so that it only runs jaccard_prediction multiple times (sequentially), providing it all needed data without shared objects. Once that works, try multiprocessing. Also, google about differences between threads and processes - you are not using threads here. Commented Dec 28, 2015 at 21:42
  • Do your workers even start? There are some problems when big objects are transferred. It's worth noting that no error is reported; the data is simply not received. Commented Dec 28, 2015 at 22:15
  • Not related to question: you can save some lines if you rewrite if not(uv_int == 0 or uv_un == 0): potential_edges.append(non_edge + ({'score': (1.0*uv_int)/uv_un},)) Commented Dec 29, 2015 at 5:31
  • @zvone: G is a Graph object which contains the necessary functions to return the neighborhoods of nodes. The way I understand the multiprocessing documentation the context is shared between processes and so it should be OK to read from global objects as long as you do not use them for sharing state. See docs, section 17.2.1.2 @memoselyk: The serial implementation only uses about 200 MB when running, so I expect the parallel implementation to be a bit lower than 800 MB. I don't think this is big enough to pose a problem. Commented Dec 29, 2015 at 10:11
  • @SimonThordal My point about G was that if one process modifies it, others will not see the change. If it is just an immutable object which provides some functions for calculation, that should be fine. Commented Dec 29, 2015 at 11:21

1 Answer


I'm not sure about this, but I think I see a potential slowdown. Compare the code for the serial operation on each non-edge:

u = set(G.neighbors(non_edge[0]))
v = set(G.neighbors(non_edge[1]))
uv_un = len(u.union(v))
uv_int = len(u.intersection(v))
if uv_int == 0 or uv_un == 0:
    continue
else:
    s = (1.0*uv_int)/uv_un
potential_edges.append(non_edge + ({'score': s},))

With that for the parallel operation:

u = set(G.neighbors(non_edge[0]))
v = set(G.neighbors(non_edge[1]))
uv_un = len(u.union(v))
uv_int = len(u.intersection(v))
if uv_int == 0 or uv_un == 0:
    return
else:
    s = (1.0*uv_int)/uv_un
return non_edge + ({'score': s},)

In the serial version, whenever this condition uv_int == 0 or uv_un == 0 is true, you skip adding to the list. But in the parallelized version, you return None.

The mapping operation isn't smart enough to skip None values, while the serial version simply never appends those elements. That means an extra entry in the result list for every non-scoreable pair in the parallel version. If you have a lot of those, the slowdown could be huge!
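If that is the cause, filtering the None entries out of the mapped results is cheap. A sketch of the idea, with the built-in map standing in for pool.map and a hypothetical score helper in place of jaccard_prediction:

```python
def score(pair):
    # Hypothetical stand-in for jaccard_prediction: takes a pair of
    # neighbor sets instead of a non-edge.
    a, b = pair
    inter, union = len(a & b), len(a | b)
    if inter == 0 or union == 0:
        return None              # map() keeps this None in its output
    return inter / union

pairs = [({1, 2}, {2, 3}), ({1}, {2}), ({1, 2, 3}, {2, 3})]
mapped = list(map(score, pairs))                 # None stays in the list
filtered = [s for s in mapped if s is not None]  # drop unscoreable pairs
```

The same comprehension works on the list returned by pool.map, so the parallel version can end up with the same result list the serial version builds.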
