
I have a dynamic Dask Kubernetes cluster. I want to load 35 parquet files (about 1.2 GB) from Google Cloud Storage into a Dask DataFrame, process it with apply(), and then save the result back to Google Cloud Storage as a parquet file.

While loading the files from Google Cloud Storage, cluster memory usage increases to about 3-4 GB. Then the workers (each with 2 GB of RAM) are terminated/restarted and some tasks get lost, so the cluster starts computing the same things over and over. I removed the apply() operation and left only read_parquet() to test whether my custom code was causing the trouble, but the problem was the same, even with just a single read_parquet() operation. This is the code:

from dask.distributed import Client, get_client
import dask.dataframe as dd

client = Client('<ip>:8786')
client.restart()

def command():
    client = get_client()
    df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token': 'cloud'}, engine='fastparquet')
    df = df.compute()

x = client.submit(command)
x.result()

Note: I'm submitting a single command function to run all the necessary commands at once, to avoid problems with gcsfs authentication inside the cluster.

After some investigation, I understood that the problem could be .compute(), which returns all the data to the calling process. But this process (my command function) is running on a worker, so the worker doesn't have enough RAM, crashes, and loses all its computed tasks, which triggers the task re-runs.

My goal is:

  • to read from parquet files
  • perform some computations with apply()
  • and, without ever returning the data from the cluster, write it back to Google Cloud Storage in parquet format.

So, simply put, I want to keep the data on the cluster and not bring it back: just compute it and save it somewhere else.

After reading the Dask distributed docs, I found the client.persist()/compute() and .scatter() methods. They look like what I need, but I don't really understand how to use them.

Could you please help me with the client.persist() and client.compute() methods for my example, or suggest another way to do it? Thank you very much!

Dask version: 0.19.1

Dask distributed version: 1.23.1

Python version: 3.5.1

1 Answer

df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token': 'cloud'}, engine='fastparquet')
df = df.compute()  # triggers computation, but brings all of the data to one machine as a single pandas DataFrame

df = df.persist()  # triggers computation, but keeps the data as many pandas DataFrames spread across the workers

2 Comments

Thanks for the answer. As I understand it, df.persist() is async. The main problem is that the command() function finishes before the computations are performed, so the other tasks are cancelled. Is there a way to wait for df.persist() to finish its computations, but without gathering the data?
Okay, never mind my last question. I found the wait() method in the docs. Thanks once more.
