I need to write some custum code using multiple columns within a group of my data.
My custom code is to set a flag if a value is over a threshold, but suppress the flag if it is within a certain time of a previous flag.
Here is some sample code:
df = spark.createDataFrame(
[
("a", 1, 0),
("a", 2, 1),
("a", 3, 1),
("a", 4, 1),
("a", 5, 1),
("a", 6, 0),
("a", 7, 1),
("a", 8, 1),
("b", 1, 0),
("b", 2, 1)
],
["group_col","order_col", "flag_col"]
)
df.show()
+---------+---------+--------+
|group_col|order_col|flag_col|
+---------+---------+--------+
| a| 1| 0|
| a| 2| 1|
| a| 3| 1|
| a| 4| 1|
| a| 5| 1|
| a| 6| 0|
| a| 7| 1|
| a| 8| 1|
| b| 1| 0|
| b| 2| 1|
+---------+---------+--------+
from pyspark.sql.functions import udf, col, asc
from pyspark.sql.window import Window
def _suppress(dates=None, alert_flags=None, window=2):
sup_alert_flag = alert_flag
last_alert_date = None
for i, alert_flag in enumerate(alert_flag):
current_date = dates[i]
if alert_flag == 1:
if not last_alert_date:
sup_alert_flag[i] = 1
last_alert_date = current_date
elif (current_date - last_alert_date) > window:
sup_alert_flag[i] = 1
last_alert_date = current_date
else:
sup_alert_flag[i] = 0
else:
alert_flag = 0
return sup_alert_flag
suppress_udf = udf(_suppress, DoubleType())
df_out = df.withColumn("supressed_flag_col", suppress_udf(dates=col("order_col"), alert_flags=col("flag_col"), window=4).Window.partitionBy(col("group_col")).orderBy(asc("order_col")))
df_out.show()
The above fails, but my expected output is the following:
+---------+---------+--------+------------------+
|group_col|order_col|flag_col|supressed_flag_col|
+---------+---------+--------+------------------+
| a| 1| 0| 0|
| a| 2| 1| 1|
| a| 3| 1| 0|
| a| 4| 1| 0|
| a| 5| 1| 0|
| a| 6| 0| 0|
| a| 7| 1| 1|
| a| 8| 1| 0|
| b| 1| 0| 0|
| b| 2| 1| 1|
+---------+---------+--------+------------------+