
I have a pipeline working with LocalCluster:

from distributed import Client
client = Client()

list_of_queries = [...]  # say 1_000 queries

loaded_data = client.map(sql_data_loader, list_of_queries)

processed_data = client.map(data_processor, loaded_data)

writer_results = client.map(data_writer, processed_data)

results = client.gather(writer_results)

Everything works, but not quite as I would expect.

Looking at the dashboard's status page I see something like this:

sql_data_loader             900 / 1000
data_processor                0 / 1000
data_writer                   0 / 1000

I.e. the stages are executed sequentially rather than "in parallel". As a result, data_processor does not start executing until all 1000 queries have been loaded, and data_writer waits until data_processor has finished processing all of its futures.

Based on previous experience with dask, where dask.delayed was used instead of client.map, I would expect behavior something like:

sql_data_loader              50 / 1000
data_processor               10 / 1000
data_writer                   5 / 1000

Is this a false expectation, or is there something I am missing about how to set up the pipeline so it behaves similarly to dask.delayed?

1 Answer

If you run the maps one after the other then everything should pipeline nicely.

There is some tension between two desired objectives:

  1. Tasks should pipeline, as you like
  2. Tasks that are submitted first should have higher priority

To balance these two objectives, Dask assigns policies based on the delay between calls. If two map calls happen right after each other, Dask assumes that they are part of the same computation; however, if they are separated by a significant amount of time, Dask assumes that they are different computations, and so prioritizes the earlier tasks. You can control this with the fifo_timeout keyword:

client.map(f, ..., fifo_timeout='10 minutes')

Here is the relevant documentation page.

Here is an example showing the behavior you want if you bundle map calls together:

In [1]: from dask.distributed import Client

In [2]: client = Client(processes=False)

In [3]: import time

In [4]: def f(x):
   ...:     time.sleep(0.1)
   ...:     print('f', x)
   ...:     return x
   ...: 

In [5]: def g(x):
   ...:     time.sleep(0.1)
   ...:     print('g', x)
   ...:     return x
   ...: 

In [6]: futures = client.map(f, range(20))
   ...: futures = client.map(g, futures)
   ...: 

In [7]: f 0
f 1
f 2
f 3
f 5
f 4
f 6
f 7
g 0
g 1
g 3
g 2
g 4
g 5
g 6
g 7
f 8
f 9
f 10
f 11
f 12
g 8
f 13
g 9
g 10
g 11
g 12
f 14
g 13
f 15
f 16
f 17
g 14
f 18
g 15
f 19
g 16
g 17
g 18
g 19
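Applied to the question's pipeline, the same principle is simply to issue all three map calls back-to-back with no other work in between. A minimal sketch, with toy stand-ins for the asker's real sql_data_loader / data_processor / data_writer functions:

```python
from dask.distributed import Client

def sql_data_loader(q):
    # Stand-in for the real SQL loader: return a few fake rows per query.
    return {'query': q, 'rows': list(range(3))}

def data_processor(d):
    # Stand-in for the real processor: transform the rows.
    return [r * 2 for r in d['rows']]

def data_writer(rows):
    # Stand-in for the real writer: report how many rows were "written".
    return len(rows)

client = Client(processes=False)
queries = list(range(10))

# No other work between the map calls, so Dask treats all three stages
# as one computation and pipelines them rather than running stage by stage.
loaded = client.map(sql_data_loader, queries)
processed = client.map(data_processor, loaded)
written = client.map(data_writer, processed)

results = client.gather(written)
print(results)  # each writer returns the row count, i.e. 3 per query
client.close()
```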

1 Comment

Thank you very much for the illustrative example and the link. I did have partial and logging calls in between the .map commands. After those were moved elsewhere, tasks are executed as expected most of the time. I say "most of the time" because I have not changed the fifo_timeout parameter yet, and sometimes one of the tasks appears to lag behind, waiting for others to complete. But it is clear now where the problem is. Thanks again.
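For anyone hitting the same issue: the fix described in the comment is to prepare any functools.partial wrappers (and do any logging) before the map calls, so that nothing runs between them and they stay within the fifo_timeout window. A sketch with hypothetical loader/processor functions:

```python
import logging
from functools import partial
from dask.distributed import Client

def loader(q, limit):
    # Hypothetical loader: 'limit' stands in for some configuration value.
    return list(range(limit))

def processor(rows, factor):
    # Hypothetical processor parameterized by 'factor'.
    return [r * factor for r in rows]

client = Client(processes=False)

# Do the setup work (partials, logging) up front, not between the maps.
load = partial(loader, limit=3)
process = partial(processor, factor=2)
logging.info('pipeline configured')

# The map calls now happen back-to-back, so Dask pipelines the stages.
loaded = client.map(load, range(5))
processed = client.map(process, loaded)

out = client.gather(processed)
print(out)  # each query loads [0, 1, 2] and is doubled to [0, 2, 4]
client.close()
```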
