
I have two DataFrames:

df1 =
+---+----------+
| id|filter    |
+---+----------+
|  1|       YES|
|  2|        NO|
|  3|        NO|
+---+----------+

df2 = 
+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|                   1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15|
+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|XXXXXX              |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
|YYYYYY              |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+

What I want to do is create a new column in df1 by filtering a column of df2 based on a row value of df1. My output would look like this:

df3 =
+---+----------+----------------+
| id|filter    | value          |
+---+----------+----------------+
|  1|       YES|[XXXXXX, YYYYYY]|
|  2|        NO|        []      |
|  3|        NO|        []      |
+---+----------+----------------+

I know how to do it with Pandas, but I don't know how to do it with PySpark.

I've tried the following, but it doesn't seem to work:

df3 = df1.withColumn('value', f.when(df1['filter'] == 'YES', df2.select(f.col('id')).collect()).otherwise(f.lit([])))

Thank you very much

1 Answer

Load the test data provided

    import org.apache.spark.sql.functions.collect_list
    import spark.implicits._ // needed for .toDS()

    val data1 =
      """
        | id|filter
        |  1|       YES
        |  2|        NO
        |  3|        NO
      """.stripMargin
    val stringDS1 = data1.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df1.printSchema()
    df1.show(false)
    /**
      * root
      * |-- id: integer (nullable = true)
      * |-- filter: string (nullable = true)
      *
      * +---+------+
      * |id |filter|
      * +---+------+
      * |1  |YES   |
      * |2  |NO    |
      * |3  |NO    |
      * +---+------+
      */

    val data2 =
      """
        |                   1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15
        |XXXXXX              |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN
        |YYYYYY              |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN
      """.stripMargin
    val stringDS2 = data2.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
      .toSeq.toDS()
    val df2 = spark.read
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS2)
    df2.printSchema()
    df2.show(false)
    /**
      * root
      * |-- 1: string (nullable = true)
      * |-- 2: double (nullable = true)
      * |-- 3: double (nullable = true)
      * |-- 4: double (nullable = true)
      * |-- 5: double (nullable = true)
      * |-- 6: double (nullable = true)
      * |-- 7: double (nullable = true)
      * |-- 8: double (nullable = true)
      * |-- 9: double (nullable = true)
      * |-- 10: double (nullable = true)
      * |-- 11: double (nullable = true)
      * |-- 12: double (nullable = true)
      * |-- 13: double (nullable = true)
      * |-- 14: double (nullable = true)
      * |-- 15: double (nullable = true)
      *
      * +------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
      * |1     |2  |3  |4  |5  |6  |7  |8  |9  |10 |11 |12 |13 |14 |15 |
      * +------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
      * |XXXXXX|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
      * |YYYYYY|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
      * +------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
      */

Melt/unpivot df2 into (id, value) rows, then join it with df1


    val stringCol = df2.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")

    val processedDF = df2.selectExpr(s"stack(${df2.columns.length}, $stringCol) as (id, value)")
    processedDF.show(false)

    /**
      * +---+------+
      * |id |value |
      * +---+------+
      * |1  |XXXXXX|
      * |2  |NaN   |
      * |3  |NaN   |
      * |4  |NaN   |
      * |5  |NaN   |
      * |6  |NaN   |
      * |7  |NaN   |
      * |8  |NaN   |
      * |9  |NaN   |
      * |10 |NaN   |
      * |11 |NaN   |
      * |12 |NaN   |
      * |13 |NaN   |
      * |14 |NaN   |
      * |15 |NaN   |
      * |1  |YYYYYY|
      * |2  |NaN   |
      * |3  |NaN   |
      * |4  |NaN   |
      * |5  |NaN   |
      * +---+------+
      * only showing top 20 rows
      */
    df1.join(processedDF, "id")
      .groupBy("id", "filter")
      .agg(collect_list("value").as("value"))
      .selectExpr("id", "filter", "FILTER(value, x -> x != 'NaN') as value")
      .show(false)

    /**
      * +---+------+----------------+
      * |id |filter|value           |
      * +---+------+----------------+
      * |2  |NO    |[]              |
      * |1  |YES   |[XXXXXX, YYYYYY]|
      * |3  |NO    |[]              |
      * +---+------+----------------+
      */
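
Since the question is tagged PySpark, here is a hedged sketch of the same approach in Python. Building the stack() expression is plain string manipulation, so that part runs without Spark; the DataFrame calls below it assume a SparkSession named spark and the df1/df2 shown above, and are given for context only.

```python
# Sketch of the Scala answer translated to PySpark. Only the expression
# builder is executable as-is; the DataFrame steps assume a live session.

def build_stack_expr(columns):
    """Build the SQL stack() expression that unpivots the given columns
    into (id, value) rows, casting every value to string."""
    pairs = ", ".join("'{c}', cast(`{c}` as string)".format(c=c) for c in columns)
    return "stack({n}, {pairs}) as (id, value)".format(n=len(columns), pairs=pairs)

# For df2's columns the expression looks like:
#   stack(15, '1', cast(`1` as string), '2', cast(`2` as string), ...)
#
# The remaining steps mirror the Scala code (requires Spark 2.4+ for FILTER):
#
# import pyspark.sql.functions as f
# processed = df2.selectExpr(build_stack_expr(df2.columns))
# df3 = (df1.join(processed, "id")
#           .groupBy("id", "filter")
#           .agg(f.collect_list("value").alias("value"))
#           .selectExpr("id", "filter",
#                       "FILTER(value, x -> x != 'NaN') as value"))
```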
2 Comments

Thank you very much! Do you know how to apply it in Python? (the melt/unpivot part)
Take help from this answer to convert it to Python - stackoverflow.com/a/62574110/4758823
