I am trying to execute a UDF and it fails with a serialization error:
from pyspark.sql.functions import col, udf

mytab = spark.read.jdbc(url=jdbcUrl, table="mytab", properties=connectionProperties)

def buscarx(Alm_r, Pro, Data_mat):
    # Look for one row in mytab with the same doc but a different alm
    data_s = mytab.where(col("doc") == Data_mat).where(col("alm") != Alm_r).limit(1)
    if data_s.count() == 0:
        return Pro
    else:
        temp = "0"
        for item in data_s.collect():
            temp = item.Alm
        return temp

buscarx_udf = udf(buscarx)
df_temp = mytab.withColumn("alm_origen", buscarx_udf(mytab.Alm, mytab.Proveedor, mytab.Doc_mat))
Error:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 473, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.RLock' object
PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
I ran some tests and found that the problem is caused by this line:

data_s = mytab.where(col("doc") == Data_mat).where(col("alm") != Alm_r).limit(1)

My understanding is that the UDF's closure captures mytab, and a DataFrame (which holds a reference to the SparkSession) cannot be pickled and shipped to the executors, which is what triggers the _thread.RLock error. Any suggestions to fix this? I need to perform a lookup against mytab inside the function.
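
In case it helps, I sketched a join-based rewrite that avoids referencing the DataFrame inside the UDF. I have not verified it, and the column names (doc, alm/Alm, Doc_mat, Proveedor) are just taken from my snippet above:

from pyspark.sql import functions as F

# Tag each row so we can mimic limit(1) (one arbitrary match per row) later
a = mytab.withColumn("_rid", F.monotonically_increasing_id())

# Lookup side: only the columns needed for the match, renamed to avoid ambiguity
b = mytab.select(F.col("doc").alias("_b_doc"), F.col("alm").alias("_b_alm"))

# Left join: same doc, different alm; unmatched rows keep a null _b_alm
joined = a.join(b, (a["Doc_mat"] == b["_b_doc"]) & (a["Alm"] != b["_b_alm"]), "left")

df_temp = (joined
           .withColumn("alm_origen", F.coalesce(F.col("_b_alm"), F.col("Proveedor")))
           .dropDuplicates(["_rid"])  # keep one arbitrary match per original row
           .drop("_rid", "_b_doc", "_b_alm"))

Is this the right direction, or is there a way to keep the lookup inside the UDF (e.g. collecting the lookup columns into a plain Python dict and broadcasting it)?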