I would like to process a temporal graph (essentially, a list of networkx graphs) in parallel using asynchronous parallelism on a shared-memory machine. To achieve this, I use Pool.apply_async() from the multiprocessing module. The temporal graph consists of 5 unit (snapshot) graphs. For each unit graph, I perform multiple computationally expensive matrix operations.

Consider a simple sequential example first:

#------------------------------------
# Constants
#------------------------------------
NV  = 100    # No. of vertices
NE  =  25    # No. of edges
NG  =   5    # No. of unit graphs

#------------------------------------
# Generate random time-varying graph
#------------------------------------
Gt = gen_time_graph(NV, NE, NG)

# Snapshot index
k = 0

# for each unit graph
for Gk in Gt:

    # Temporal adjacency matrix
    Atk = adj_mtrx(Gk)

    # Temporal weight matrix
    # ...

    # Temporal eigenvector centrality
    # ...

    k += 1

It works flawlessly. Next, I attempt to assign each matrix operation to a worker from a pool:

#------------------------------------
# Constants
#------------------------------------
NV  = 100    # No. of vertices
NE  =  25    # No. of edges
NG  =   5    # No. of unit graphs
NP  =   2    # No. of processes

#------------------------------------
# Generate random time-varying graph
#------------------------------------
Gt = gen_time_graph(NV, NE, NG)

# Snapshot index
k = 0

if __name__ == '__main__':

    with Pool(processes=NP) as pool:

        # for each unit graph
        for Gk in Gt:
    
            # Temporal adjacency matrix
            Atk = pool.apply_async( adj_mtrx, (Gk) ).get()
    
            # Temporal weight matrix
            # ...

            # Temporal eigenvector centrality
            # ...

            k += 1

However, here the code crashes with the following error:

multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
TypeError: adj_mtrx() takes 1 positional argument but 100 were given
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "./aggr_vs_time_dat_par_mini.py", line 100, in <module>
    Atk = pool.apply_async( adj_mtrx, (Gk) ).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
TypeError: adj_mtrx() takes 1 positional argument but 100 were given

I need help debugging the problem. It seems that the graph Gk is decomposed by the Pool and passed to the function as a set of vertices. I would also be grateful if you could comment on the appropriateness of my general parallelisation approach with Pool.apply_async() from multiprocessing.

You may find all the necessary code for the minimal working example below:

import networkx as nx
import random   as rnd
import numpy    as np

from multiprocessing import Pool

# Generates random graph
def gen_rnd_graph(nv, ne):
    
    # Create random list of sources
    Vsrc = [rnd.randint(0,nv-1) for iter in range(ne)]
    
    # Create random list of sinks
    Vsnk = [rnd.randint(0,nv-1) for iter in range(ne)]
    
    # Create random list of edge weights
    U = [rnd.random() for iter in range(ne)]
    
    # Create list of tuples {Vsrc, Vsnk, U}
    T = list(zip(Vsrc,Vsnk,U))
    
    # Create graph
    G = nx.Graph()
    
    # Create list of vertices
    V = list(range(nv))
    
    # Add nodes to graph
    G.add_nodes_from(V)
    
    # Add edges between random vertices with random edge weights
    G.add_weighted_edges_from(T)
    
    return G

# Generates time-varying graph
def gen_time_graph(nv, ne, ng):

    # Initialise list of graphs
    l = []

    for i in range(ng):
        gi = gen_rnd_graph(nv, ne)
        l.append(gi)

    return l

# Computes adjacency matrix for snapshot of time-varying graph
def adj_mtrx(Gk):

    # no. of vertices
    n = Gk.number_of_nodes()

    # adjacency matrix
    Ak = np.zeros([n,n])

    # for each vertex
    for i in range(n):
        for j in range(n):
            if Gk.has_edge(i,j): Ak[i,j] = 1
        
    return Ak

#------------------------------------
# Constants
#------------------------------------
NV  = 100    # No. of vertices
NE  =  25    # No. of edges
NG  =   5    # No. of unit graphs
NP  =   2    # No. of processes

#------------------------------------
# Generate random time-varying graph
#------------------------------------
Gt = gen_time_graph(NV, NE, NG)

# Snapshot index
k = 0

if __name__ == '__main__':

    with Pool(processes=NP) as pool:

        # for each unit graph
        for Gk in Gt:
        
            # Temporal adjacency matrix
            Atk = pool.apply_async( adj_mtrx, (Gk) ).get()
        
            k += 1

I guess it's only a minor typo. Can you try replacing (Gk) with (Gk,)? Commented Jul 9, 2020 at 16:45

1 Answer

According to the documentation, the signature of apply_async is

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

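The second parameter, args, is the tuple of positional arguments for func. Because (Gk) is just Gk in parentheses rather than a tuple, you need to pass a one-element tuple, i.e. (Gk,):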

Atk = pool.apply_async( adj_mtrx, (Gk,) ).get()
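
To see why the trailing comma matters, here is a minimal sketch that mimics the call the pool worker makes, func(*args, **kwds), as shown in the traceback. TinyGraph, count_nodes and call_like_worker are hypothetical stand-ins for illustration only; they are not part of networkx or multiprocessing. TinyGraph merely imitates the fact that iterating over a graph yields its nodes.

```python
class TinyGraph:
    """Hypothetical stand-in for a graph: iterating over it yields its nodes."""

    def __init__(self, n_nodes):
        self.nodes = list(range(n_nodes))

    def __iter__(self):
        return iter(self.nodes)


def count_nodes(graph):
    """Takes exactly one positional argument, like adj_mtrx(Gk)."""
    return len(list(graph))


def call_like_worker(func, args=(), kwds=None):
    """Mimics the worker's call from the traceback: func(*args, **kwds)."""
    return func(*args, **(kwds or {}))


g = TinyGraph(100)

# (g) is just g in parentheses, so the graph itself is unpacked
# into 100 separate positional arguments.
try:
    call_like_worker(count_nodes, (g))
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# (g,) is a one-element tuple, so the function receives the graph
# as its single argument.
n = call_like_worker(count_nodes, (g,))

print(error_message)
print(n)
```

The first call fails with the same kind of TypeError as in the question, while the second one passes the graph through intact.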

Background

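The worker calls func(*args), so with args set to Gk your function receives *Gk as input, i.e. the graph unpacked by iteration. Iterating over a networkx graph yields its nodes, which is why adj_mtrx was given 100 positional arguments: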

import networkx as nx
g = nx.karate_club_graph()
print(*g)
# 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33

1 and 0 length tuples

For more details about creating tuples with 0 or 1 elements, see "How to create a tuple with only one element" or, directly, the relevant section in the Python documentation.

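Basically, () creates a tuple of length 0 and (Gk,) creates a tuple of length 1. For any larger number of elements you can use either (x_1, ..., x_n) or (x_1, ..., x_n,); the trailing comma is optional there, so (Gk, Gl) and (Gk, Gl,) are equivalent. It is the comma, not the parentheses, that makes a tuple.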
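
As a quick illustration of these rules, the sketch below builds each variant and inspects it. The strings "Gk" and "Gl" are placeholder values standing in for two graphs.

```python
a, b = "Gk", "Gl"  # placeholder values standing in for two graphs

empty = ()               # tuple of length 0
single = (a,)            # tuple of length 1: the comma makes the tuple
not_a_tuple = (a)        # just a in parentheses, still a str
pair = (a, b)            # tuple of length 2
pair_trailing = (a, b,)  # same tuple, the trailing comma is optional

print(type(empty).__name__, len(empty))    # tuple 0
print(type(single).__name__, len(single))  # tuple 1
print(type(not_a_tuple).__name__)          # str
print(pair == pair_trailing)               # True
```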

*-operator

In a function call, the *-operator unpacks an iterable into separate positional arguments; in a function definition, it collects an arbitrary number of positional arguments. See the Python documentation and the section before. Similarly, you can use ** for an arbitrary number of keyword arguments. For more details, take a look at "What does the star operator mean, in a function call?" and the duplicates listed within that question.
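
Both uses of * and ** can be sketched in a few lines. The functions describe and add and the variables below are made-up names for illustration.

```python
def describe(*args, **kwargs):
    """In a definition, *args collects extra positional arguments into a
    tuple and **kwargs collects extra keyword arguments into a dict."""
    return args, kwargs


def add(x, y, scale=1):
    """Ordinary function with two positional and one keyword parameter."""
    return scale * (x + y)


numbers = [2, 3]
options = {"scale": 10}

# In a call, * unpacks an iterable into positional arguments and
# ** unpacks a mapping into keyword arguments.
total = add(*numbers, **options)  # same as add(2, 3, scale=10)

collected = describe(1, 2, key="value")

print(total)      # 50
print(collected)  # ((1, 2), {'key': 'value'})
```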

2 Comments

It works! Would you please expand your answer on the (Gk,) "comma" notation? What should I pass when there are two arguments, for example: (Gk, Gl, ) or (Gk, Gl)? Also, it is hard to tell from the apply_async() function signature that the input parameters should be a tuple. If possible, would you also say a few words about *g? What is it?
I've added some references for the single element tuple and the *-operator
