
I am trying to convert a program to be parallelizable/multithreaded with the excellent dask library. Here is the program I am working on converting:

Python PANDAS: Stack by Enumerated Date to Create Records Vectorized

import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask.array as da
from io import StringIO

test_data = '''id,transaction_dt,units,measures
               1,2018-01-01,4,30.5
               1,2018-01-03,4,26.3
               2,2018-01-01,3,12.7
               2,2018-01-03,3,8.8'''

df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])

# repeat each row 'units' times; the repeated rows keep the original index label
df_test = df_test.loc[np.repeat(df_test.index, df_test['units'])]
# number the copies of each original row (0, 1, 2, ...) and step the date forward that many days
df_test['transaction_dt'] += pd.to_timedelta(df_test.groupby(level=0).cumcount(), unit='d')
df_test = df_test.reset_index(drop=True)

expected results:

id,transaction_dt,measures
1,2018-01-01,30.5
1,2018-01-02,30.5
1,2018-01-03,30.5
1,2018-01-04,30.5
1,2018-01-03,26.3
1,2018-01-04,26.3
1,2018-01-05,26.3
1,2018-01-06,26.3
2,2018-01-01,12.7
2,2018-01-02,12.7
2,2018-01-03,12.7
2,2018-01-03,8.8
2,2018-01-04,8.8
2,2018-01-05,8.8 

It occurred to me that this might be a good candidate to parallelize, because the separate dask partitions should not need to know anything about each other to accomplish the required operations. Here is a naive representation of how I thought it might work:

dd_test = dd.from_pandas(df_test, npartitions=3)

dd_test = dd_test.loc[da.repeat(dd_test.index, dd_test['units'])]
dd_test['transaction_dt'] += dd.to_timedelta(dd_test.groupby(level=0).cumcount(), unit='d')
dd_test = dd_test.reset_index(drop=True)

So far I have been trying to work through the following errors or idiomatic differences:

  1. "NotImplementedError: Only integer valued repeats supported." I have tried to convert the index into a int column/array to try as well but still run into the issue.

2. dask does not support the mutating operator: "+="

3. No dask .to_timedelta() argument

4. No dask .cumcount() (but I think .cumsum() is interchangable?!)
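
To illustrate point 4, here is a small pandas-only check of the cumcount()/cumsum() equivalence I am assuming (toy data, not my real frame; note the cumsum over a column of ones starts at 1 rather than 0, so it needs a -1):

import pandas as pd

toy = pd.DataFrame({'key': ['a', 'a', 'b', 'a'], 'helper': 1})

counted = toy.groupby('key').cumcount()             # 0, 1, 0, 2
summed = toy.groupby('key')['helper'].cumsum() - 1  # 0, 1, 0, 2

assert (counted == summed).all()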

If there are any dask experts out there who could let me know whether there are fundamental impediments that preclude me from trying this, or who have any tips on implementation, that would be a great help!

Edit:

I think I have made a bit of progress on this since posting the question:

dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test['helper'] = 1

dd_test = dd_test.loc[da.repeat(dd_test.index, dd_test['units'])]
dd_test['transaction_dt'] = dd_test['transaction_dt'] + (dd_test.groupby('id')['helper'].cumsum()).astype('timedelta64[D]')
dd_test = dd_test.reset_index(drop=True)

However, I am still stuck on the dask array repeats error. Any tips still welcome.
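
For anyone trying to reproduce the repeats error, it seems to come down to da.repeat only accepting a scalar repeat count, while np.repeat also accepts an array of per-element counts. A minimal illustration on toy arrays (not my real data):

import numpy as np
import dask.array as da

x = da.arange(4, chunks=2)

# a scalar repeat count works fine in dask
da.repeat(x, 2, axis=0).compute()      # [0 0 1 1 2 2 3 3]

# per-element counts raise "NotImplementedError: Only integer valued repeats supported"
# da.repeat(x, x, axis=0)

# plain numpy handles per-element counts without complaint
np.repeat(np.arange(4), [1, 2, 0, 3])  # [0 1 1 3 3 3]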

  • What is df_test['units'].dtype? Commented Feb 17, 2018 at 5:31
  • @John Zwinck It should be an int. Commented Feb 17, 2018 at 5:53
  • What is df_test['units'].dtype? Please run the code and see. Don't say "it should be" without checking it. Commented Feb 17, 2018 at 5:55
  • To confirm: in both df_test['units'] and dd_test['units'], the dtype is int64. Commented Feb 17, 2018 at 18:47

1 Answer


Not sure if this is exactly what you are looking for, but I replaced the da.repeat with np.repeat, explicitly cast dd_test.index and dd_test['units'] to numpy arrays, and added dd_test['transaction_dt'].astype('M8[us]') to your timedelta calculation (the dates are still strings after the fresh read_csv). I also grouped on the repeated index and subtracted 1 from the cumsum, so that it matches your groupby(level=0).cumcount() and your expected output.

df_test = pd.read_csv(StringIO(test_data), sep=',')

dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test['helper'] = 1

# np.array() materializes the lazy index and units so the repeat can run eagerly in numpy
dd_test = dd_test.loc[np.repeat(np.array(dd_test.index),
                                np.array(dd_test['units']))]
# the dates are still strings here, hence the 'M8[us]' cast; grouping on the repeated
# index and subtracting 1 makes the cumsum equivalent to groupby(level=0).cumcount()
dd_test['transaction_dt'] = dd_test['transaction_dt'].astype('M8[us]') + \
    (dd_test.groupby(dd_test.index)['helper'].cumsum() - 1).astype('timedelta64[D]')
dd_test = dd_test.reset_index(drop=True)

df_expected = dd_test.compute()
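
If you would rather keep the whole pipeline lazy (the np.array() calls above pull the index and units into memory up front), another option might be to push your original pandas logic into each partition with map_partitions, since, as you note, the partitions should not need to know anything about each other. A rough sketch, assuming transaction_dt has already been parsed with pd.to_datetime as in your original code (expand_rows is just an illustrative name):

def expand_rows(pdf):
    # plain-pandas version of the original logic, run on one partition at a time
    pdf = pdf.loc[np.repeat(pdf.index, pdf['units'])]
    pdf['transaction_dt'] = pdf['transaction_dt'] + pd.to_timedelta(
        pdf.groupby(level=0).cumcount(), unit='d')
    return pdf

dd_test = dd.from_pandas(df_test, npartitions=3)
df_expected = dd_test.map_partitions(expand_rows).compute().reset_index(drop=True)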

1 Comment

Wow, thanks for the interesting and very creative solution. I'm going to look into how dask and numpy are inter-operating here, how much overhead is being incurred, and whether this fully takes advantage of dask for multithreading.
