I am new to PySpark and have written a pandas UDF. Its purpose is to accept a set of series and fit an ML model on them. My data has these columns: ID, models_name, prices, units_sold, date, ... ID is a product ID and is a string. models_name is LR, XGBoost, SARIMAX, etc. prices and units_sold are time series. In order to pass these as series, I collect them into lists:
grouped_spark_df = spark_df.groupBy("ID").agg(
    F.collect_list("prices").alias("prices"),
    F.collect_list("units_sold").alias("units_sold"),
    F.collect_list("date").alias("date"))
Output:
| ID | prices | units_sold | date |
|---|---|---|---|
| P_1 | [10.5, 11.0, 9.8] | [2, 3, 1] | ["2024-03-01", "2024-03-02", "2024-03-03"] |
| P_2 | [15.0, 16.2] | [5, 7] | ["2024-03-01", "2024-03-02"] |
| P_3 | [7.5] | [10] | ["2024-03-01"] |
Then I add a models_name column by exploding a list of model names, models_list (shown below the output table):
from pyspark.sql.functions import explode, array, lit

exploded_df = grouped_spark_df.withColumn("models_name", explode(array([lit(m) for m in models_list])))
| ID | prices | units_sold | date | models_name |
|---|---|---|---|---|
| P_1 | [10.5, 11.0, 9.8] | [2, 3, 1] | ["2024-03-01", "2024-03-02", "2024-03-03"] | A |
| P_1 | [10.5, 11.0, 9.8] | [2, 3, 1] | ["2024-03-01", "2024-03-02", "2024-03-03"] | B |
| P_1 | [10.5, 11.0, 9.8] | [2, 3, 1] | ["2024-03-01", "2024-03-02", "2024-03-03"] | C |
| P_2 | [15.0, 16.2] | [5, 7] | ["2024-03-01", "2024-03-02"] | A |
| P_2 | [15.0, 16.2] | [5, 7] | ["2024-03-01", "2024-03-02"] | B |
| P_2 | [15.0, 16.2] | [5, 7] | ["2024-03-01", "2024-03-02"] | C |
| P_3 | [7.5] | [10] | ["2024-03-01"] | A |
| P_3 | [7.5] | [10] | ["2024-03-01"] | B |
| P_3 | [7.5] | [10] | ["2024-03-01"] | C |
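For reference, models_list is just the list of model names I want to run for every ID; in this toy example it is:
models_list = ["A", "B", "C"]  # in the real run these are the names the UDF checks for, e.g. "lr", "xgboost", "sarimax", "gam"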
Then I call my pandas UDF like this:
partitioned_df = exploded_df.repartition(50, "Bucket", "ID", "models_name").coalesce(10)
results_spark = partitioned_df.select(
    col("ID"),
    col("models_name"),
    train_model_udf(
        col("ID"), col("prices"), col("units_sold"), col("date"), col("models_name")
    ).alias("model_metrics")
)
Here is my pandas UDF:
import os

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import MapType, StringType, FloatType, ArrayType
from sklearn.model_selection import train_test_split
def create_broadcast_and_udf(feat_cols,model_results_data,feature_set_name,target,dt_string):
#broadcast variables....
@pandas_udf(StringType())
def train_model_udf(
ID, prices, units_sold, date, models_name):
# Convert the input Series to a pandas DataFrame
data = pd.DataFrame({
"ID": ID, "prices": prices, "units_sold": units_sold, "date": date,
"models_name": models_name
})
for col in data.columns:
data[col] = data[col].apply(lambda x: x.tolist() if isinstance(x, np.ndarray) else x)
explode_cols = [col for col in data.columns if col != "ID" and isinstance(data[col][0], list)]
# Explode all selected columns
data = data.explode(explode_cols, ignore_index=True)
try:
new_data = []
ID = str(data["ID"].iloc[0])
# return pd.Series([models_name] * len(ID))
model_directory = f'{model_results_data}/{dt_string}/{models_name}/{feature_set_name}_{target}'
if not os.path.exists(model_directory): # checking if model directory exists or not
os.makedirs(model_directory)
train, test = train_test_split(data, test_size=0.2, shuffle=False)
combined_data = pd.concat([train, test]).sort_values(by=['ID', 'WEEK'])
data.to_csv(f"{model_directory}/{ID}_ID_CombinedData.csv", index=False)
model_write_path = f"{model_directory}/{ID}.pkl"
# training models
if models_name.lower() =='sarimax':
elasticity, y_train_pred, forecast ,mape_hj,wmape_hj,rmse,mse,mae = train_sarimax(train,test,model_write_path,price_feature,feat_cols,target,elasticity_method)
elif models_name.lower() =='lr':
elasticity, y_train_pred, forecast ,mape_hj,wmape_hj,rmse,mse,mae = train_lr(train,test,model_write_path,price_feature,broadcast_features.value,target)
elif models_name.lower() =='xgboost':
elasticity, y_train_pred, forecast ,mape_hj,wmape_hj,rmse,mse,mae = train_xgb(train,test,model_write_path,price_feature,feat_cols,target)
elif models_name.lower() =='gam':
spline_features = feat_cols.copy()
elasticity, y_train_pred, forecast ,mape_hj,wmape_hj,rmse,mse,mae = train_gam(data,test,ID,spline_features,price_feature,target,model_write_path)
print(price_feature)
print(target)
temp_df = data.copy()
predicted_volumes = np.concatenate((y_train_pred, forecast))
# Add new columns to the copy
temp_df['Experiment_Model'] = models_name
temp_df['feat_cols'] = "-".join(feat_cols)
temp_df['MAPE'] = mape_hj
temp_df['WMAPE'] = wmape_hj
temp_df['final_elasticity'] = elasticity
temp_df['predicted_volume'] = predicted_volumes
temp_df = temp_df.reset_index()
# Append the rows of the modified DataFrame to the list
new_data.extend(temp_df.to_dict('records'))
# calculation percent price for calculating simulated volume
test['pct_change_pr'] = test['updated_WAP'].pct_change()
# Initializing list to store simulated volumes
simulated_volumes = []
# Start with the first available volume in the test set as the initial simulated volume
initial_volume = test[target].iloc[0]
simulated_volumes.append(initial_volume)
# Iterate over the test set, starting from the second row
for i in range(1, len(test)):
previous_simulated_volume = simulated_volumes[-1] # Get the last simulated volume
pct_change_pr = test['pct_change_pr'].iloc[i] # Current row's price change
# Calculate the new simulated volume based on the previous simulated volume
simulated_volume = previous_simulated_volume + previous_simulated_volume * (elasticity * pct_change_pr)
# Append the calculated volume to the list
simulated_volumes.append(simulated_volume)
# Add the simulated volumes to the test set as a new column
test['simulated_volume'] = simulated_volumes
test['elasticity'] = elasticity
# saving file to check if simulated volume is correctly calculated
test[['ID','updated_WAP','pct_change_pr',target,'elasticity','simulated_volume']].to_csv(f"{model_directory}/{ID}_simulated.csv")
# for plotting the graphs
temp_test_df = pd.DataFrame()
temp_test_df['date']= test[target].index
temp_test_df['actual']= test[target].values
temp_test_df['predicted']= forecast
temp_test_df['ppg_name']= ID
temp_test_df['features used']= [feat_cols]* len(temp_test_df)
temp_test_df['rmse']= rmse
temp_test_df['mse']= mse
# temp_test_df['mae']= mae
temp_test_df['mape']= mape_hj
temp_test_df['wmape']= wmape_hj
temp_test_df['elasticity']= elasticity
temp_test_df['point_wise_error'] = [abs(g - p) / g * 100 if g != 0 else 0 for g, p in zip(test[target], forecast)]
test = test.reset_index(drop=True)
temp_test_df['simulated_volume'] = test['simulated_volume']
train_temp_df = pd.DataFrame()
train_temp_df['date'] = train.index
train_temp_df['actual'] = train[target].values
train_temp_df['predicted'] = y_train_pred
train_temp_df['point_wise_error'] = [abs(g - p) / g * 100 if g != 0 else 0 for g, p in zip(train[target].values, y_train_pred)]
temp_test_df['set'] = 'Test'
train_temp_df['set'] = 'Train'
combined_df = pd.concat([train_temp_df, temp_test_df])
combined_df = combined_df.reset_index()
data = data.reset_index()
combined_df[price_feature] = data[price_feature]
combined_df.to_csv(f'{model_directory}/{ID}_ttPred.csv', index=False)
row_to_add = {
'Experiment':models_name,
'feat_cols':('-').join(feat_cols),
'ID': ID,
'elasticity':elasticity,
'mape':mape_hj,
'wmape':wmape_hj,
'rmse': rmse,
'mse': mse,
'Train':'MODEL TRAINED ON: '+str(len(data))}
except Exception as e:
print(f"Excpetion:-- {str(e)} --:in ID {data}")
# print(data[data['ID']==ppg_unique[combined_id]])
row_to_add = {
'Experiment':models_name,
'feat_cols':('-').join(feat_cols),
'ID': ID,
'elasticity':0,
'mape':0,
'wmape':0,
'rmse': 0,
'mse': 0,
'Train':f"Model not trained error with exception: {str(e)},,, {data}"}
final_df = pd.DataFrame(columns=['Experiment','feat_cols', 'ID','elasticity','mape','wmape','rmse', 'mse','Train'])
new_df = pd.DataFrame([row_to_add], columns=final_df.columns)
filePath = f'{model_directory}/Elasticities_{ID}.csv'
new_df.to_csv(filePath, index = False)
new_df = new_df.astype(str)
result_dicts = new_df.to_dict(orient="records")
return pd.Series(str(result_dicts))
return broadcast_features, train_model_udf
My code runs fine for some IDs, but when I scale it to 1600 IDs it fails. The issue isn't the number 1600 itself: even with only 20 IDs I get the same error. I narrowed it down to a single ID, and repartitioning the data correctly resolved it for that ID, but when I run over all IDs again the error comes back. Data skew no longer seems to be the issue, as I made sure the data is evenly distributed. The list lengths do vary: for some IDs it is 41, for others 65. I also tried keeping only the IDs whose lists have fewer than 41 elements, but I still hit the same error.
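That length filter was along these lines (a sketch; F.size returns the length of an array column):
from pyspark.sql import functions as F

# keep only the rows whose collected series have fewer than 41 points
exploded_df = exploded_df.filter(F.size("prices") < 41)
Regardless of which subset I run, every attempt fails with: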
pyspark.errors.exceptions.base.PySparkRuntimeError: [SCHEMA_MISMATCH_FOR_PANDAS_UDF] Result vector from pandas_udf was not the required length: expected 21, got 1.