
I need to implement a dynamic "bring-your-own-code" mechanism for registering UDFs that are defined outside my own code. The application is containerized and the entrypoint is a standard Python interpreter (not pyspark). Based on config settings at startup, the Spark container would initialize itself with something like the code below. We don't know the function definitions ahead of time, but we can pre-install any dependencies on the container if needed.

def register_udf_module(udf_name, zip_or_py_path, file_name, function_name, return_type="int"):
    # Pseudocode:
    global sc, spark

    sc.addPyFile(zip_or_py_path)
    module_ref = some_inspect_function_1(zip_or_py_path)
    file_ref = module_ref[file_name]
    function_ref = module_ref[function_name]
    spark.udf.register(udf_name, function_ref, return_type)

I can't find any reference for how to accomplish this. Specifically, the use case is that after the Spark cluster has been initialized by running this, users need the UDF available to SQL queries (via a Thrift JDBC connection). As far as I know there is no interface between the JDBC/SQL connection and UDF registration, so the UDF has to be registered before SQL queries arrive; I can't expect the user to call spark.udf.register on their side later.

1 Answer


The solution I've found is to read an environment variable at launch that points to a directory of UDFs, then load and inspect each .py file in that path, registering any functions found as UDFs in Spark.

Sample working code below:

import importlib.util
import logging
import os
from inspect import getmembers, isfunction

from pyspark import SparkConf
from pyspark.sql import SparkSession


def init_spark():
    global spark

    # Init Spark (nothing special here)
    conf = SparkConf()
    spark = (
        SparkSession.builder.config(conf=conf)
        .master("local")
        .appName("Python Spark")
        .enableHiveSupport()
        .getOrCreate()
    )

    # Register any user-provided UDFs found in the configured directory
    if "SPARK_UDFS_PATH" in os.environ:
        add_udf_module(os.environ["SPARK_UDFS_PATH"])


def add_udf_module(module_dir):
    global spark

    module_dir = os.path.realpath(module_dir)
    if not os.path.isdir(module_dir):
        raise ValueError(f"Folder '{module_dir}' does not exist.")
    for file in os.listdir(module_dir):
        if file.endswith(".py"):
            module = path_import(os.path.join(module_dir, file))
            for func_name, func in getmembers(module, isfunction):
                logging.info(f"Found module function: {func_name}")
                # Skip private helpers and the imported 'udf' decorator itself
                if not func_name.startswith("_") and func_name != "udf":
                    logging.info(f"Registering UDF '{func_name}':\n{func.__dict__}")
                    spark.udf.register(func_name, func)


def path_import(absolute_file_path):
    # Import a module directly from an absolute file path and return it
    module_name = os.path.basename(absolute_file_path)
    module_name = ".".join(module_name.split(".")[:-1])  # strip '.py'
    spec = importlib.util.spec_from_file_location(module_name, absolute_file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
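
For reference, path_import can be exercised on its own; a quick sketch, where the path /opt/udfs/math_udfs.py is hypothetical and assumed to contain the sample UDFs shown below:

# Hypothetical module file containing the sample UDFs shown below
module = path_import("/opt/udfs/math_udfs.py")

# List the functions the registration loop would see
print([name for name, _ in getmembers(module, isfunction)])

# Register one of them manually, equivalent to what add_udf_module does per function
spark.udf.register("times_five", module.times_five)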


Sample UDF Python file:

from pyspark.sql.functions import udf
from pyspark.sql import types


@udf(types.LongType())
def times_five(value):
    return value * 5

@udf("long")
def times_six(value):
    return value * 6

Sample SQL:

SELECT times_six(7) AS the_answer
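
Before wiring up the Thrift JDBC endpoint, the registration can be sanity-checked directly on the driver; a minimal sketch, assuming init_spark() has already run in the same process:

# The registered UDF is visible to Spark SQL on the same session
spark.sql("SELECT times_six(7) AS the_answer").show()
# Expected: a single row with the_answer = 42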