I have two dataframes.
The first one looks like this (the number of channels will vary depending on the Type). This dataframe stores the type of device and the value for each channel.
+-----+----------+----------+
| Type|X_ChannelA|Y_ChannelB|
+-----+----------+----------+
|TypeA| 11| 20|
+-----+----------+----------+
The second dataframe is imported from a CSV file and is generated by me.
Right now it has this format (it can be changed to anything needed):
+-----+--------------+--------------+--------------+--------------+
| Type|X_ChannelA_min|X_ChannelA_max|Y_ChannelB_min|Y_ChannelB_max|
+-----+--------------+--------------+--------------+--------------+
|TypeA| 8| 12| 9| 13|
+-----+--------------+--------------+--------------+--------------+
Now I want to compare the actual channel values to the min and max ones and, for each channel, create a new column suffixed with _status which contains a 1 if the value is between min and max and a 0 if it exceeds either bound.
Wanted result for this example:
+-----+----------+----------+-----------------+-----------------+
| Type|X_ChannelA|Y_ChannelB|X_ChannelA_status|Y_ChannelB_status|
+-----+----------+----------+-----------------+-----------------+
|TypeA| 11| 20| 1| 0|
+-----+----------+----------+-----------------+-----------------+
Code is here:
val df_orig = spark.sparkContext.parallelize(Seq(
("TypeA", 11, 20)
)).toDF("Type", "X_ChannelA", "Y_ChannelB")
val df_def = spark.sparkContext.parallelize(Seq(
("TypeA", 8, 12, 9, 13)
)).toDF("Type", "X_ChannelA_min", "X_ChannelA_max", "Y_ChannelB_min", "Y_ChannelB_max")
I have tried a few different things already without success.
For example, collecting the channel names into a string array and then trying to create the columns with:
val pattern = """[XYZP]_Channel.*"""
val fieldNames = df_orig.schema.fieldNames.filter(_.matches(pattern))
fieldNames.foreach(x => df_orig.withColumn(s"${x}_status", <compare logic comes here>))
(This cannot work as written: withColumn returns a new DataFrame, which foreach discards.)
My next idea was to join df_orig with df_def, collect channel_value, channel_min and channel_max into a single array column per channel, apply the compare logic to the array, and write the result into a status column:
+-----+----------+----------+----------------+----------------+-------------+...
| Type|X_ChannelA|Y_ChannelB|X_ChannelA_array|Y_ChannelB_array|X_ChannelA_st|
+-----+----------+----------+----------------+----------------+-------------+...
|TypeA| 11| 20| [11, 8, 12]| [20, 9, 13]| 1|
+-----+----------+----------+----------------+----------------+-------------+...
If there is a simpler solution it would be nice to get a push into the right direction.
Edit: In case my description was unclear, what I am looking for is basically:
foreach channel in channellist (
ds.withColumn("<channel>_status", when($"<channel>" >= $"<channel>_min" && $"<channel>" <= $"<channel>_max", 1).otherwise(0))
)
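The pseudocode loop above can be expressed as a foldLeft that threads the DataFrame through successive withColumn calls. A minimal sketch, assuming the two frames have already been joined on Type and the channel list comes from the [XYZP]_Channel.* pattern (the names addStatusColumns and status are mine, not from the original post):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, when}

// In-range check as a pure function, mirroring the when/otherwise condition:
// returns 1 if min <= v <= max, else 0.
def status(v: Int, min: Int, max: Int): Int =
  if (v >= min && v <= max) 1 else 0

// Fold over the channel names, adding one <channel>_status column per step.
// Assumes df already carries <channel>, <channel>_min and <channel>_max,
// i.e. the result of df_orig.join(df_def, Seq("Type")).
def addStatusColumns(df: DataFrame, channels: Seq[String]): DataFrame =
  channels.foldLeft(df) { (acc, c) =>
    acc.withColumn(s"${c}_status",
      when(col(c) >= col(s"${c}_min") && col(c) <= col(s"${c}_max"), 1)
        .otherwise(0))
  }
```

With the example data, addStatusColumns(df_orig.join(df_def, Seq("Type")), fieldNames) should yield the wanted table: 11 lies in [8, 12] so X_ChannelA_status is 1, while 20 lies outside [9, 13] so Y_ChannelB_status is 0. The status helper shows the same condition on plain values.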
Edit: I found a solution:
val df_joined = df_orig.join(df_def, Seq("Type"))
val pattern = """[XYZP]_Channel.*"""
val fieldNames = df_orig.schema.fieldNames.filter(_.matches(pattern))
val df_newnew = df_joined.select((col("*") +: fieldNames.map(c => when(col(c) >= col(c + "_min") && col(c) <= col(c + "_max"), 1).otherwise(0).as(c + "_status"))): _*)