1

I am quite new to Twisted and I really need one thing: to run an arbitrary number of functions (all starting at the same time), collect the results from all of them, and then do some processing.

Here is what I have:

from twisted.internet import defer
import time


# slow computing query
def process_data(num, data):
    time.sleep(5)
    array = []
    # mock the results obtained from processed data
    for i in range(0, 5):
        array.append(num)
    return array

def process_results(arrays):
    # this should collect return arrays of all callbacks 
    print arrays

data = []
callbacks_refs = []
for i in range(0, 5):
   d=defer.Deferred()
   d.addCallback(process_data)
   callbacks_refs.append(d)

callbacks = defer.DeferredList(callbacks_refs)
callbacks.addCallback(process_results)

for i, d in enumerate(callbacks_refs):
    d.callback(i, data)

I was hoping that the last for loop would start all of the callbacks asynchronously (as Promises normally do) and that all of the results would be passed to process_results, which would run once every callback in callbacks_refs had completed. But I feel that I am terribly wrong about this.

2 Answers

3

I don't know how closely your sample resembles your actual code, but the sample shows a misconception about what Twisted does. Twisted does not magically make your synchronous code asynchronous. You are blocking the event loop in time.sleep. If you are doing something CPU-bound (as opposed to I/O-bound), you can use multiple threads or processes.

I will assume that process_data is a blocking call, and provide you a solution based on multithreading:

import time
from twisted.internet import defer, task, threads

# slow computing query
def process_data(num):
    time.sleep(5)
    array = []
    # mock the results obtained from processed data
    for i in range(0, 5):
        array.append(num)
    return array

def process_results(arrays):
    # DeferredList fires with a list of (success, result) tuples,
    # one per Deferred, in the order they were passed in
    print(arrays)

def main(_):
    callbacks_refs = []
    for i in range(0, 5):
        callbacks_refs.append(threads.deferToThread(process_data, i))
    callbacks = defer.DeferredList(callbacks_refs)
    callbacks.addCallback(process_results)
    return callbacks

task.react(main)

I will also give you one piece of general advice about Twisted programming: if you find yourself typing d = defer.Deferred(), something is likely wrong with your design.

0

I don't know if this has a workaround, but with the way you are calling d.callback(), the wrong parameters get passed to your callback: callback() takes a single result, so process_data never receives both num and data.

If you were to attach an errback along with the callback, you might find you are just getting a load of failed results... so it's working, but not working as expected.

I see two fixes.

from functools import partial
for i in range(0, 5):
    d = defer.Deferred()
    d.addCallback(partial(process_data, i, data[i]))
    # This partial is still kinda crooked, but hopefully I have made my point
    callbacks_refs.append(d)

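Or change the way you pass data to your callback functions:

# slow computing query
def process_data(data_dict):
    num = data_dict['num']
    data = data_dict['data']
    # ... process data here; the return value mocks the result as in the question
    return [num] * 5

#...and further down
d.callback({'num': 4, 'data': (1, 2, 3)})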

I am sorry, I am not that familiar with DeferredList, but I think once you fix the Deferreds, the DeferredList may just work automagically.
