
I have a dataframe and I want to use values from its rows to execute a query (on Delta Lake) and put the results in a new column. However, in a Synapse notebook I always get this error:

It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063

I have below function:

def execute_sql(database, table):
  query = f"SELECT COUNT(*) as count FROM {database}.{table}"
  result = spark.sql(query).collect()[0][0]
  return result

and I am applying it like this (the dataframe has many more columns; I am only using two here, but I want to keep the others):

execute_sql = udf(execute_sql, StringType())
new_df = input_df.withColumn('TotalCount', execute_sql(col("Database"), col("Table")))
display(new_df)

I am trying to avoid complicating things by iterating through the dataframe with rdd.collect.

  • You cannot reference the spark object in a UDF. To execute the SQL queries you either need to collect and loop, or use multiprocessing, on the driver node itself. Commented Feb 16, 2024 at 8:07
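The driver-side approach the comment suggests can be sketched as follows: collect only the distinct (Database, Table) pairs to the driver, run one COUNT(*) per pair there, and join the counts back so all other columns are kept. This is a sketch, not tested against your workspace; it assumes the notebook's spark session and the question's column names, and add_counts / count_query are hypothetical helper names.

```python
def count_query(database: str, table: str) -> str:
    # Build the COUNT(*) statement for one table (same query as the question).
    return f"SELECT COUNT(*) AS count FROM {database}.{table}"

def add_counts(spark, input_df):
    # Collect only the distinct (Database, Table) pairs, not the whole frame.
    pairs = input_df.select("Database", "Table").distinct().collect()
    # Run each count on the driver, where spark.sql is allowed.
    counts = [
        (row["Database"], row["Table"],
         spark.sql(count_query(row["Database"], row["Table"])).collect()[0][0])
        for row in pairs
    ]
    counts_df = spark.createDataFrame(counts, ["Database", "Table", "TotalCount"])
    # Left join back so every other column of input_df is preserved.
    return input_df.join(counts_df, on=["Database", "Table"], how="left")
```

This stays entirely on the driver, so it avoids the SPARK-5063 error, at the cost of one sequential query per distinct table.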

1 Answer


You cannot reference the spark object inside a UDF, and you want to avoid collect. The alternative is to connect to the SQL endpoint from inside the UDF and run the query over that connection. Install the package below and restart the cluster:

%pip install databricks-sql-connector


Next, obtain the HTTP path and server hostname from the compute configuration's Advanced options.


Next, generate an access token in the User settings Developer tab.

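Rather than hard-coding the token and endpoint in the notebook, they can be read from environment variables (or a secret scope). A minimal sketch; the variable names below are assumptions, set them however your workspace manages secrets:

```python
import os

# Assumed environment variable names; configure them via cluster
# environment variables or load them from a secret scope instead.
DATABRICKS_SERVER_HOSTNAME = os.environ.get("DATABRICKS_SERVER_HOSTNAME", "")
DATABRICKS_HTTP_PATH = os.environ.get("DATABRICKS_HTTP_PATH", "")
DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN", "")
```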

Modify your function as follows:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks import sql

DATABRICKS_SERVER_HOSTNAME = "https://adb...........azuredatabricks.net/"
DATABRICKS_HTTP_PATH = "sql/protocolv1/o/34567890/7890789789"
DATABRICKS_TOKEN = "dapi.............."

def get_res(db, tb):
    # Runs on the workers: opens its own SQL connection instead of
    # touching the driver-only spark object.
    with sql.connect(server_hostname=DATABRICKS_SERVER_HOSTNAME,
                     http_path=DATABRICKS_HTTP_PATH,
                     access_token=DATABRICKS_TOKEN) as connection:
        with connection.cursor() as cursor:
            cursor.execute(f"SELECT count(*) FROM {db}.{tb}")
            result = cursor.fetchall()
        # The with blocks close the cursor and connection automatically.
        return result[0][0]

execute_sql = udf(get_res, IntegerType())

Next, execute your dataframe statement.

df.withColumn("ts", execute_sql(col("database"), col("tableName"))).display()

Output:

database  tableName      ts
--------  -------------  --
default   control_table   6
default   maxtimestamp    1
default   student         6
default   table           1
default   table_name      8
default   week_of_5       0


Refer to the documentation below on how to connect to Databricks from Python: Connect Python and pyodbc to Databricks
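One caveat with the UDF above: it opens a new SQL connection for every row, which gets expensive on large dataframes. A hedged sketch of reusing one connection per worker process is below; make_cached_counter is a hypothetical helper, and it works with any DB-API style connection factory (e.g. a lambda wrapping sql.connect):

```python
def make_cached_counter(connect):
    # connect: zero-argument callable returning a DB-API connection,
    # e.g. lambda: sql.connect(server_hostname=..., http_path=..., access_token=...)
    state = {}

    def count_rows(db, tb):
        # The connection is opened lazily on the first row a worker
        # processes, then reused for every subsequent row.
        conn = state.setdefault("conn", connect())
        cur = conn.cursor()
        try:
            cur.execute(f"SELECT count(*) FROM {db}.{tb}")
            return cur.fetchone()[0]
        finally:
            cur.close()

    return count_rows
```

Usage would then be, for example, execute_sql = udf(make_cached_counter(lambda: sql.connect(...)), IntegerType()); because the connection is created lazily, the closure serializes cleanly to the workers.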
