
I have a Spark DataFrame DF1 with millions of rows. Each row has up to 100 columns.

col1 | col2 | col3 | ... | colN
--------------------------------
v11  | v12  | v13  | ... | v1N
v21  | v22  | v23  | ... | v2N
...  | ...  | ...  | ... | ...

Also, I have another DataFrame DF2 with hundreds of rows and two columns: name and body. The name column contains a function name, and the body column contains plain Python code - a boolean function that returns true or false. Inside their logic, these functions can refer to any column of a single row from DF1.

func_name | func_body
-----------------------------------------------
func1     |   col2 < col45
func2     |   col11.contains("London") and col32*col15 < col21
funcN     |   .... 

I need to join both of these DataFrames - DF1 with DF2 - and apply each function from DF2 to each row in DF1. Each function must be able to accept the parameters from DF1, say a dictionary with key/value pairs representing the name/value of every column of the corresponding row from DF1.

I know how to join DF1 and DF2, and I also understand that execution of these Python functions will not work in a distributed fashion. That's fine for now; this is a temporary solution. I just need to distribute all of the rows from DF1 over the worker nodes and apply each Python function to each row of DF1 in different tasks of the Apache Spark application - evaluate them with eval() and pass in the dictionary with key/value pairs, as I mentioned above.

In general, each Python function is a tag that I'd like to assign to a row in DF1 whenever that function returns true. For example, this is the resulting DataFrame DF3:

col1 | col2 | col3 | ... | colN | tags
--------------------------------------
v11  | v12  | v13  | ... | v1N  | [func1, func76, funcN]
v21  | v22  | v23  | ... | v2N  | [func32]
...  | ...  | ...  | ... | ...  | [..., ..., ..., ..., ...]

Is this possible with PySpark and, if so, could you please show an example of how it can be achieved? Is a UDF that takes a Map built from DF.columns as an input parameter the right way to go, or can it be done in some simpler fashion? Does Spark have any limitation on the number of UDFs that can be registered at one point in time?

1 Answer


You can achieve that using SQL expressions, which can be evaluated with expr. However, you won't be able to join the two DataFrames, as SQL expressions can't be evaluated from column values (see this post), so you have to collect the functions into a list (since you have only hundreds of them, the list fits in memory).

Here is a working example you can adapt for your requirement:

data1 = [(1, "val1", 4, 5, "A", 10), (0, "val2", 7, 8, "B", 20),
         (9, "val3", 8, 1, "C", 30), (10, "val4", 2, 9, "D", 30),
         (20, "val5", 6, 5, "E", 50), (3, "val6", 100, 2, "X", 45)]

df1 = spark.createDataFrame(data1, ["col1", "col2", "col3", "col4", "col5", "col6"])

data2 = [("func1", "col1 + col3 = 5 and col2 like '%al1'"),
         ("func2", "col6 = 30 or col1 * col4 > 20"),
         ("func3", "col5 in ('A', 'B', 'C') and col6 - col1 < 30"),
         ("func4", "col2 like 'val%' and col1 > 0")]

df2 = spark.createDataFrame(data2, ["func_name", "func_body"])

# get functions into a list
functions = df2.collect()

# case/when expression to evaluate the functions
satisfied_expr = [when(expr(f.func_body), lit(f.func_name)) for f in functions]

# add new column tags
df1.withColumn("tags", array(*satisfied_expr)) \
    .withColumn("tags", expr("filter(tags, x -> x is not null)")) \
    .show(truncate=False)

After adding the array column tags, the filter function is used to remove the null values that correspond to unsatisfied expressions. This function is only available starting from Spark 2.4; for older versions you'll have to use a UDF.
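For older versions, a minimal sketch of such a UDF could look like this (the name remove_nulls is just an illustration, not part of the Spark API):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# hypothetical replacement for filter() on Spark < 2.4:
# drop the nulls produced by unsatisfied when() expressions
remove_nulls = udf(lambda tags: [t for t in tags if t is not None],
                   ArrayType(StringType()))

df1.withColumn("tags", array(*satisfied_expr)) \
    .withColumn("tags", remove_nulls("tags")) \
    .show(truncate=False)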

Gives:

+----+----+----+----+----+----+---------------------+
|col1|col2|col3|col4|col5|col6|tags                 |
+----+----+----+----+----+----+---------------------+
|1   |val1|4   |5   |A   |10  |[func1, func3, func4]|
|0   |val2|7   |8   |B   |20  |[func3]              |
|9   |val3|8   |1   |C   |30  |[func2, func3, func4]|
|10  |val4|2   |9   |D   |30  |[func2, func4]       |
|20  |val5|6   |5   |E   |50  |[func2, func4]       |
|3   |val6|100 |2   |X   |45  |[func4]              |
+----+----+----+----+----+----+---------------------+

6 Comments

Thanks for your answer. Unfortunately, I can't rewrite the mentioned Python functions as SQL expressions. I need to evaluate the Python functions as Python code.
@alexanoid OK, I see. Unfortunately, if you want to use them as they are, you'll have to register a UDF for each of them. You can pass all the columns of df1 as a list to each UDF and, inside each one, pick the columns you want and apply your logic (a sketch of this approach follows these comments). But using this method you don't let Spark optimize the calculation, so you may run into performance problems.
Thanks! Old-fashioned UDFs are a temporary solution on the way to porting legacy logic to the Spark API. So temporary performance issues are not a big problem, because at some point in time all of these functions will be replaced with new logic written against the Spark API. One additional question - is it possible to invoke one UDF function from another?
As far as I know, you can't call a UDF from another UDF...What do you want to do exactly?
Hi @alexanoid, yes, that's it! It's still interpreted as a Python function when you call it. If you register the created UDFs, you can just keep their names in DF2 and then use the same logic as in the code above.
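To make the comment thread concrete, here is a minimal sketch of that UDF approach. It assumes df1 from the answer above and a hypothetical list py_funcs whose bodies are Python expressions rather than SQL: each row is packed into a struct, converted to a dict inside the UDF, and every function body is evaluated with eval() against that dict.

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import ArrayType, StringType

# hypothetical stand-ins for the rows of DF2, with plain Python bodies
py_funcs = [("func1", "col3 + col4 == 9 and col2.startswith('val')"),
            ("func2", "col5 in ('A', 'B') or col6 > 40")]

def tag_row(row, funcs=py_funcs):
    # the struct arrives as a Row; expose its columns as variables to eval()
    row_dict = row.asDict()
    return [name for name, body in funcs if eval(body, {}, row_dict)]

tag_udf = udf(tag_row, ArrayType(StringType()))

# pack the whole row into a single struct column and tag it
df3 = df1.withColumn("tags", tag_udf(struct(*df1.columns)))
df3.show(truncate=False)

Alternatively, as the last comment suggests, each body could be registered as a named UDF with spark.udf.register, so DF2 keeps only the names, which can then be called from expr() just like the SQL expressions in the answer.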