
I am getting this error after running the function below.

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

The goal of this code is to create a flag for every row based on date differences. All rows belonging to one user are passed to the function, which flags a date when it falls at least two months after the last flagged date.

Does anyone know what this means and how I can fix it?

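def selection(df):
  # Driver-side logic: collect() and spark.createDataFrame() only work on the driver
  _i = 0
  l = []
  # Sorted dates collected to the driver as a Python list
  x = df.select("Date").orderBy("Date").rdd.flatMap(lambda x: x).collect()
  l.append((x[_i], 1))  # the first date is always flagged

  for _j in range(_i + 1, len(x)):
    d1, d2 = x[_i], x[_j]  # last flagged date vs. current date
    # Flag the date if it is at least 2 calendar months after the last flagged one
    if (d2.year - d1.year) * 12 + (d2.month - d1.month) >= 2:
      l.append((x[_j], 1))
      _i = _j
    else:
      l.append((x[_j], 0))

  columns = ['Date', 'flag']
  new_df = spark.createDataFrame(l, columns)
  df_new = df.join(new_df, ['Date'], "inner")

  return df_new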

ToKeep = udf(lambda z: selection(z))
sample_new = sample.withColumn("toKeep",ToKeep(sample).over(Window.partitionBy(col("id")).orderBy(col("id"),col("Date"))))
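The condition in the loop, `(d2.year - d1.year) * 12 + (d2.month - d1.month) >= 2`, counts calendar-month boundaries and ignores the day of the month, so it is not the same as "at least ~60 days apart". A small check of that formula in plain Python (`month_diff` is a hypothetical helper name that just wraps the question's expression):

```python
from datetime import date


def month_diff(d1: date, d2: date) -> int:
    """Hypothetical helper: the question's formula, counting calendar-month steps only."""
    return (d2.year - d1.year) * 12 + (d2.month - d1.month)


# Only 29 days apart, but Jan -> Mar counts as 2 months, so this date would be flagged.
print(month_diff(date(2021, 1, 31), date(2021, 3, 1)))  # 2
# 58 days apart, but Jan -> Feb counts as 1 month, so this date would not be flagged.
print(month_diff(date(2021, 1, 1), date(2021, 2, 28)))  # 1
```

If elapsed time is what matters, comparing `(d2 - d1).days` against a threshold would be the alternative; which one is intended is not stated in the question.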

Answer

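from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def function(x):
    # x is a plain Python value from one row, and this body runs on the executors.
    # It cannot contain Spark operations (spark.sql, DataFrame methods, spark.sparkContext):
    # SparkContext exists only on the driver and cannot be created on a worker (SPARK-5063).
    pass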
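One way to get per-user flags without touching `spark` inside worker code is to swap the UDF-plus-window approach for a grouped-map function. This is a sketch under stated assumptions, not the only fix: it assumes Spark 3.0+ with pandas and PyArrow installed, that `sample` has an `id` (long) and a `Date` (date) column, and that the rule is "flag the first date, then every date at least 2 calendar months after the last flagged date", as the question's loop reads. The names `flags_for`, `flag_group`, `add_flags` and `OUTPUT_SCHEMA` are hypothetical. `GroupedData.applyInPandas` hands each id's rows to `flag_group` as a pandas DataFrame on an executor, so that function only sees plain values, while the Spark calls stay in `add_flags` on the driver.

```python
from datetime import date
from typing import Iterable, List

# Hypothetical output schema: assumes `id` is a long and `Date` is a DateType column.
OUTPUT_SCHEMA = "id long, Date date, flag long"


def flags_for(dates: Iterable[date]) -> List[int]:
    """Plain-Python rule over dates already sorted ascending: 1 for the first date
    and for every date at least 2 calendar months after the last flagged one, else 0."""
    flags = []
    last_flagged = None
    for d in dates:
        months = None if last_flagged is None else (
            (d.year - last_flagged.year) * 12 + (d.month - last_flagged.month)
        )
        if months is None or months >= 2:
            flags.append(1)
            last_flagged = d
        else:
            flags.append(0)
    return flags


def flag_group(pdf):
    """Called by applyInPandas on an executor with one id's rows (a pandas DataFrame).
    It only uses pandas/Python values, never `spark` or a Spark DataFrame."""
    pdf = pdf.sort_values("Date").reset_index(drop=True)
    return pdf.assign(flag=flags_for(pdf["Date"]))


def add_flags(sample):
    """Driver-side call: keep only the columns in OUTPUT_SCHEMA, then run flag_group per id."""
    return (
        sample.select("id", "Date")
        .groupBy("id")
        .applyInPandas(flag_group, schema=OUTPUT_SCHEMA)
    )
```

Usage: `sample_new = add_flags(sample)`. Because the sketch keeps only `id` and `Date` (the returned columns must match the schema), join the result back, for example `sample.join(sample_new, ["id", "Date"])`, if the other columns are needed.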