
I am trying to use Dask Distributed's LocalCluster to run code in parallel using all the cores of a single machine.

Consider a sample Python data pipeline with the folder structure below.

sample_dask_program
├── main.py
├── parallel_process_1.py
├── parallel_process_2.py
├── process_1.py
├── process_2.py
└── process_3.py

main.py is the entry point, which executes the whole pipeline sequentially.

Eg:

def run_pipeline():
    stage_one_run_util()
    stage_two_run_util()

    ...

    stage_six_run_util()


if __name__ == '__main__':

    ...

    run_pipeline()

parallel_process_1.py and parallel_process_2.py are modules which create a Client() and use futures to achieve parallelism.

with Client() as client:
    # list to store futures after they are submitted
    futures = []

    for item in items:
        future = client.submit(
            ...
        )
        futures.append(future)

    results = client.gather(futures)
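A self-contained sketch of such a module might look like the following, where `transform` and `run_parallel` are hypothetical names standing in for the real per-item computation:

```python
# parallel_process_1.py -- hypothetical, runnable version of the pattern
# above; transform() stands in for the real per-item computation.
from dask.distributed import Client


def transform(item):
    # placeholder for the actual work done per item
    return item * 2


def run_parallel(items):
    with Client() as client:  # starts a LocalCluster by default
        futures = [client.submit(transform, item) for item in items]
        return client.gather(futures)


if __name__ == "__main__":
    print(run_parallel([1, 2, 3]))
```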

process_1.py, process_2.py and process_3.py are modules which do simple computations that do not need to run in parallel across all the CPU cores.

Traceback:

  File "/sm/src/calculation/parallel.py", line 140, in convert_qty_to_float
    results = client.gather(futures)
  File "/home/iouser/.local/lib/python3.7/site-packages/distributed/client.py", line 1894, in gather
    asynchronous=asynchronous,
  File "/home/iouser/.local/lib/python3.7/site-packages/distributed/client.py", line 778, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/iouser/.local/lib/python3.7/site-packages/distributed/utils.py", line 348, in sync
    raise exc.with_traceback(tb)
  File "/home/iouser/.local/lib/python3.7/site-packages/distributed/utils.py", line 332, in f
    result[0] = yield future
  File "/home/iouser/.local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
concurrent.futures._base.CancelledError

This is the error thrown by the workers:

distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:33901 -> tcp://127.0.0.1:38821
Traceback (most recent call last):
  File "/home/iouser/.local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 248, in write
    future = stream.write(frame)
  File "/home/iouser/.local/lib/python3.7/site-packages/tornado/iostream.py", line 546, in write
    self._check_closed()
  File "/home/iouser/.local/lib/python3.7/site-packages/tornado/iostream.py", line 1035, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/iouser/.local/lib/python3.7/site-packages/distributed/worker.py", line 1248, in get_data
    compressed = await comm.write(msg, serializers=serializers)
  File "/home/iouser/.local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 255, in write
    convert_stream_closed_error(self, e)
  File "/home/iouser/.local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 121, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe

I am not able to reproduce this error locally or find a minimal reproducible example, as the error occurs unpredictably.

Is this the right way to use Dask LocalCluster in a modular python program?

EDIT

I have observed that these errors come up when the LocalCluster is created with a relatively high number of threads and processes. My computations use NumPy and Pandas, and this combination is not good practice, as described here.

At times, when the LocalCluster is created with 4 workers and 16 processes, no error is thrown. When it is created with 8 workers and 40 processes, the error described above is thrown.

As far as I understand, Dask selects this combination arbitrarily (is this an issue with Dask?), since I tested on the same AWS Batch instance (8 cores, 16 vCPUs).

The issue does not pop up when I forcefully create the cluster with only threads.

Eg:

cluster = LocalCluster(processes=False)
with Client(cluster) as client:
    client.submit(...)
    ...

But creating the LocalCluster with only threads slows execution down by a factor of 2-3.

So, is the solution to this problem to find the right number of processes/threads for the program?
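If explicit sizing turns out to be the answer, one way to experiment is to pin the worker/thread mix yourself instead of letting Dask choose. The numbers below are illustrative only, not a recommendation:

```python
from dask.distributed import Client, LocalCluster


def run(n_workers=2, threads_per_worker=2, processes=True):
    # Pin the worker/thread mix explicitly instead of letting Dask choose.
    cluster = LocalCluster(n_workers=n_workers,
                           threads_per_worker=threads_per_worker,
                           processes=processes)
    with Client(cluster) as client:
        futures = [client.submit(pow, i, 2) for i in range(8)]
        results = client.gather(futures)
    cluster.close()
    return results


if __name__ == "__main__":
    print(run())
```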

2 Comments
  • distributed.dask.org/en/latest/killed.html may help – Commented Apr 9, 2020 at 13:53
  • Thank you @mdurant, this page helped me get here. The computation I am doing involves NumPy/Pandas. It seems that the error pops up when the number of processes is relatively high. – Commented Apr 10, 2020 at 13:57

1 Answer


It is more common to create a Dask Client once, and then run many workloads on it.

with Client() as client:
    stage_one(client)
    stage_two(client)

That being said, what you're doing should be fine. If you're able to reproduce the error with a minimal example, that would be useful (but no expectations).
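Concretely, that single-client pattern might look like the following sketch, where the stage functions and their workloads are hypothetical:

```python
from dask.distributed import Client


def stage_one(client):
    # each stage receives the shared client and submits its own tasks
    return client.gather([client.submit(abs, x) for x in [-1, -2, 3]])


def stage_two(client):
    return client.gather([client.submit(len, s) for s in ["a", "bb"]])


def run_pipeline():
    # one Client (and one LocalCluster) for the whole pipeline
    with Client() as client:
        return stage_one(client), stage_two(client)


if __name__ == "__main__":
    print(run_pipeline())
```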
