
I am new to PySpark and I have created a pandas UDF. The purpose of this UDF is to accept a series and apply an ML model to it. My data has these columns: ID, models_name, prices, units_sold, date..... ID is basically a product id and is a string. models_name is LR, XGBoost, SARIMAX, etc. prices and units_sold are time series. In order to pass these as series, I collect them as lists:

grouped_spark_df = spark_df.groupBy("ID").agg(
    F.collect_list("prices").alias("prices"),
    F.collect_list("units_sold").alias("units_sold"),
    F.collect_list("date").alias("date"))

output:

ID prices units_sold date
P_1 [10.5, 11.0, 9.8] [2, 3, 1] ["2024-03-01", "2024-03-02", "2024-03-03"]
P_2 [15.0, 16.2] [5, 7] ["2024-03-01", "2024-03-02"]
P_3 [7.5] [10] ["2024-03-01"]

Then I add models_name based on a list of models

exploded_df = grouped_spark_df.withColumn("models_name", explode(array([lit(m) for m in models_list])))

output:

ID prices units_sold date models_name
P_1 [10.5, 11.0, 9.8] [2, 3, 1] ["2024-03-01", "2024-03-02", "2024-03-03"] A
P_1 [10.5, 11.0, 9.8] [2, 3, 1] ["2024-03-01", "2024-03-02", "2024-03-03"] B
P_1 [10.5, 11.0, 9.8] [2, 3, 1] ["2024-03-01", "2024-03-02", "2024-03-03"] C
P_2 [15.0, 16.2] [5, 7] ["2024-03-01", "2024-03-02"] A
P_2 [15.0, 16.2] [5, 7] ["2024-03-01", "2024-03-02"] B
P_2 [15.0, 16.2] [5, 7] ["2024-03-01", "2024-03-02"] C
P_3 [7.5] [10] ["2024-03-01"] A
P_3 [7.5] [10] ["2024-03-01"] B
P_3 [7.5] [10] ["2024-03-01"] C

Then I call my pandas udf like this:

partitioned_df = exploded_df.repartition(50, "Bucket", "ID", "models_name").coalesce(10)
results_spark = partitioned_df.select(
        col("ID"),
        col("models_name"),
        train_model_udf(
            col("ID"), col("prices"), col("units_sold"), col("date"), col("models_name")
        ).alias("model_metrics")
    )

Here is my pandas udf:

import os

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import MapType, StringType, FloatType, ArrayType
from sklearn.model_selection import train_test_split

def create_broadcast_and_udf(feat_cols, model_results_data, feature_set_name, target, dt_string):
    # broadcast variables....
    @pandas_udf(StringType())
    def train_model_udf(ID, prices, units_sold, date, models_name):

            # Convert to Pandas DataFrame
            data = pd.DataFrame({
                "ID": ID, "prices": prices, "units_sold": units_sold,
                "date": date, "models_name": models_name
            })
            # Arrow may deliver the array columns as numpy arrays; convert them to plain lists
            for col in data.columns:
                data[col] = data[col].apply(lambda x: x.tolist() if isinstance(x, np.ndarray) else x)

            explode_cols = [col for col in data.columns if col != "ID" and isinstance(data[col][0], list)]

            # Explode all selected columns
            data = data.explode(explode_cols, ignore_index=True)
     
            try:
                
                new_data = []
                ID = str(data["ID"].iloc[0])
                # return  pd.Series([models_name] * len(ID))
                model_directory = f'{model_results_data}/{dt_string}/{models_name}/{feature_set_name}_{target}'
                if not os.path.exists(model_directory):  # checking if model directory exists or not
                    os.makedirs(model_directory)
                
   
                train, test = train_test_split(data, test_size=0.2, shuffle=False) 
                combined_data = pd.concat([train, test]).sort_values(by=['ID', 'WEEK'])

                data.to_csv(f"{model_directory}/{ID}_ID_CombinedData.csv", index=False)
                model_write_path = f"{model_directory}/{ID}.pkl" 
                                        
                # training models
                if models_name.lower() == 'sarimax':
                    elasticity, y_train_pred, forecast, mape_hj, wmape_hj, rmse, mse, mae = train_sarimax(train, test, model_write_path, price_feature, feat_cols, target, elasticity_method)
                elif models_name.lower() == 'lr':
                    elasticity, y_train_pred, forecast, mape_hj, wmape_hj, rmse, mse, mae = train_lr(train, test, model_write_path, price_feature, broadcast_features.value, target)
                elif models_name.lower() == 'xgboost':
                    elasticity, y_train_pred, forecast, mape_hj, wmape_hj, rmse, mse, mae = train_xgb(train, test, model_write_path, price_feature, feat_cols, target)
                elif models_name.lower() == 'gam':
                    spline_features = feat_cols.copy()
                    elasticity, y_train_pred, forecast, mape_hj, wmape_hj, rmse, mse, mae = train_gam(data, test, ID, spline_features, price_feature, target, model_write_path)
                    print(price_feature)
                    print(target)
                temp_df = data.copy()
                predicted_volumes =   np.concatenate((y_train_pred, forecast)) 
                # Add new columns to the copy
                temp_df['Experiment_Model'] = models_name
                temp_df['feat_cols'] =  "-".join(feat_cols)
                temp_df['MAPE'] = mape_hj
                temp_df['WMAPE'] = wmape_hj
                temp_df['final_elasticity'] = elasticity
                temp_df['predicted_volume'] = predicted_volumes
                temp_df = temp_df.reset_index()
                # Append the rows of the modified DataFrame to the list
                new_data.extend(temp_df.to_dict('records'))

                # calculation percent price for calculating simulated volume 
                test['pct_change_pr'] = test['updated_WAP'].pct_change()

                # Initializing list to store simulated volumes
                simulated_volumes = []

                # Start with the first available volume in the test set as the initial simulated volume
                initial_volume = test[target].iloc[0]
                simulated_volumes.append(initial_volume)

                # Iterate over the test set, starting from the second row
                for i in range(1, len(test)):
                    previous_simulated_volume = simulated_volumes[-1]  # Get the last simulated volume
                    pct_change_pr = test['pct_change_pr'].iloc[i]  # Current row's price change

                    # Calculate the new simulated volume based on the previous simulated volume
                    simulated_volume = previous_simulated_volume + previous_simulated_volume * (elasticity * pct_change_pr)
                    
                    # Append the calculated volume to the list
                    simulated_volumes.append(simulated_volume)

                # Add the simulated volumes to the test set as a new column
                test['simulated_volume'] = simulated_volumes
                test['elasticity'] = elasticity
                                    
                # saving file to check if simulated volume is correctly calculated
                test[['ID','updated_WAP','pct_change_pr',target,'elasticity','simulated_volume']].to_csv(f"{model_directory}/{ID}_simulated.csv")

                # for plotting the graphs 
                temp_test_df = pd.DataFrame()

                temp_test_df['date']= test[target].index

                temp_test_df['actual']= test[target].values
                temp_test_df['predicted']= forecast
                temp_test_df['ppg_name']= ID
                temp_test_df['features used']= [feat_cols]* len(temp_test_df)
                temp_test_df['rmse']= rmse
                temp_test_df['mse']= mse
                # temp_test_df['mae']= mae
                temp_test_df['mape']= mape_hj
                temp_test_df['wmape']= wmape_hj
                temp_test_df['elasticity']= elasticity
                temp_test_df['point_wise_error'] = [abs(g - p) / g * 100 if g != 0 else 0 for g, p in zip(test[target], forecast)]
                test = test.reset_index(drop=True)

                temp_test_df['simulated_volume'] = test['simulated_volume']

                train_temp_df = pd.DataFrame()
                train_temp_df['date'] = train.index
                train_temp_df['actual'] = train[target].values
                train_temp_df['predicted'] = y_train_pred
                train_temp_df['point_wise_error'] = [abs(g - p) / g * 100 if g != 0 else 0 for g, p in zip(train[target].values, y_train_pred)]
                temp_test_df['set'] = 'Test'
                train_temp_df['set'] = 'Train'

                combined_df = pd.concat([train_temp_df, temp_test_df])

                combined_df = combined_df.reset_index()
                data = data.reset_index()
                combined_df[price_feature] = data[price_feature]
                combined_df.to_csv(f'{model_directory}/{ID}_ttPred.csv', index=False)
                row_to_add = {
                    'Experiment': models_name,
                    'feat_cols': '-'.join(feat_cols),
                    'ID': ID,
                    'elasticity': elasticity,
                    'mape': mape_hj,
                    'wmape': wmape_hj,
                    'rmse': rmse,
                    'mse': mse,
                    'Train': 'MODEL TRAINED ON: ' + str(len(data))}
            except Exception as e:
                print(f"Excpetion:-- {str(e)} --:in ID {data}")

                # print(data[data['ID']==ppg_unique[combined_id]])
                row_to_add = {
                        'Experiment':models_name,
                        'feat_cols':('-').join(feat_cols),
                        'ID': ID,
                        'elasticity':0,
                        'mape':0,
                        'wmape':0,
                        'rmse': 0,
                        'mse': 0,
                        'Train':f"Model not trained error with exception: {str(e)},,, {data}"}

            final_df = pd.DataFrame(columns=['Experiment','feat_cols', 'ID','elasticity','mape','wmape','rmse', 'mse','Train'])
            new_df = pd.DataFrame([row_to_add], columns=final_df.columns)
            filePath = f'{model_directory}/Elasticities_{ID}.csv'
            new_df.to_csv(filePath, index = False)
            new_df = new_df.astype(str)
            result_dicts = new_df.to_dict(orient="records")

            return pd.Series(str(result_dicts))
            
    return broadcast_features, train_model_udf

My code runs fine for some IDs, but when I scale it to 1600 IDs it gives me the error below. The issue isn't the number 1600 itself; even when I run it on 20 IDs I get the error. I narrowed it down to one ID, and repartitioning the data correctly resolved the issue for that ID, but when I run it over all IDs again I still get the error. Data skew doesn't seem to be the issue anymore, as I made sure my data is evenly distributed. The lengths of the lists do vary: for some IDs it is 41, for others 65. I tried keeping only the IDs whose lists are shorter than 41, but I still get the same error.

pyspark.errors.exceptions.base.PySparkRuntimeError: [SCHEMA_MISMATCH_FOR_PANDAS_UDF] Result vector from pandas_udf was not the required length: expected 21, got 1.
Comment: Can you provide a minimal reproducible example to help debug this? Most of the logic in your UDF is unlikely to be related to this error, so please try to create a tiny version of it where you still get this error.

1 Answer


The error most likely occurs because, when using a scalar Pandas UDF, Spark expects the function to return exactly the same number of rows as it receives. In other words, the transformation must be 1:1 for each batch of rows handed to the UDF. If the UDF returns a different number of rows than it received (your traceback shows it returned 1 where Spark expected 21), Spark cannot map the input to the output, which leads to this error.
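
As a minimal sketch of that contract (the function and column names here are illustrative, not taken from your code): a scalar pandas UDF receives one or more pd.Series of equal length and must hand back a pd.Series of exactly that length, one value per input row.

    import pandas as pd
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import StringType

    @pandas_udf(StringType())
    def describe_prices(prices: pd.Series) -> pd.Series:
        # one output string per input row, so len(result) == len(prices)
        # and Spark can line each result up with its original row
        return prices.apply(lambda p: f"list of length {len(p)}")

In your UDF the final line, return pd.Series(str(result_dicts)), appears to build a Series with a single element regardless of how many rows arrived in the batch, which matches the "expected 21, got 1" message. Returning something like pd.Series([str(result_dicts)] * len(ID)) would satisfy the length requirement, but it only makes sense if every row of the batch belongs to the same ID and model, which a scalar UDF does not guarantee; that is why the grouped-map route in option 2 below is usually the cleaner fix.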

Possible Solutions:

  1. Ensure a 1:1 Mapping:

    • Review your UDF logic to guarantee that exactly one output value is returned for each input row (as in the sketch above), so the returned Series always has the same length as the batch it received.
    • If you need to add additional information, consider doing so by creating new columns, thereby preserving the number of rows.
  2. Use a Different Type of UDF if Required:

    • If you actually need the function to produce a different number of rows than the input (for example, to expand the dataset, or to collapse each group into a single metrics row, as in your case), you should consider a different approach. A grouped-map Pandas UDF (groupBy().applyInPandas()) allows you to transform each group into an output DataFrame with a different row count than the input; see the sketch after this list.
    • Alternatively, you could break down the process into multiple steps: first applying a Pandas UDF that preserves the row count, and then performing an additional transformation to add the extra rows.
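
For the grouped-map option, here is a sketch of what it could look like with applyInPandas, assuming the exploded_df from the question; train_one_group, the placeholder metrics value, and the output schema are hypothetical stand-ins for your actual training logic:

    import pandas as pd

    def train_one_group(pdf: pd.DataFrame) -> pd.DataFrame:
        # pdf holds every row of one (ID, models_name) group as a plain pandas
        # DataFrame; the returned DataFrame may have any number of rows, as
        # long as it matches the schema declared in applyInPandas below.
        ID = pdf["ID"].iloc[0]
        model = pdf["models_name"].iloc[0]
        # ... train the model for this ID/model combination and serialize the metrics ...
        metrics = "{}"  # placeholder for your serialized result dictionary
        return pd.DataFrame([{"ID": ID, "models_name": model, "model_metrics": metrics}])

    results_spark = exploded_df.groupBy("ID", "models_name").applyInPandas(
        train_one_group,
        schema="ID string, models_name string, model_metrics string",
    )

With this approach you could probably drop the collect_list/explode workaround entirely and group the original long-format rows instead (after adding models_name), since applyInPandas already hands the whole group to your function as one pandas DataFrame.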