
I have the Spark dataframe/dataset below. Column_2 has dates in string format.

Column_1 Column_2
A        2020-08-05
B        2020-08-01
B        2020-09-20
B        2020-12-31
C        2020-05-10

My expected output dataframe should have only one row per value in Column_1, and if there are multiple dates in Column_2 for the same key in Column_1, then the next available date should be picked. If only one row is there, then its date should be retained.

Expected Output:

Column_1 Column_2
A        2020-08-05
B        2020-09-20
C        2020-05-10

Is there a way to achieve this in Java Spark, possibly without using a UDF?

  • "the next available date should be picked": what is the logic for this? Why do you choose the second, not the last? Commented Aug 4, 2020 at 7:27
  • The next immediate date that is greater than the current date. For B in Column_1, today is 2020-08-04 and the next date after the current date is 2020-09-20. Commented Aug 4, 2020 at 7:30
  • The logic is: it should be greater than the current date, and when there are more than 2 dates greater than the current date, we have to take the next immediate one available. Commented Aug 4, 2020 at 7:31
  • Ah, now I understand. Commented Aug 4, 2020 at 7:31
  • Was trying to find out if we can do this with an aggregate and collect_list? Commented Aug 4, 2020 at 7:32
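The rule described in these comments can be sketched in plain Python (a hypothetical `pick_date` helper, not Spark code). The all-past, multi-date case is not specified in the question itself, so this sketch falls back to the latest available date, matching the behaviour later requested in the comments under the first answer:

```python
from datetime import date

def pick_date(dates, today):
    """Hypothetical helper mirroring the rule from the comments:
    a single date is retained as-is; otherwise the earliest date
    strictly after `today` is picked. If no date is in the future,
    fall back to the latest available date (an assumption here)."""
    if len(dates) == 1:
        return dates[0]
    future = [d for d in dates if d > today]
    return min(future) if future else max(dates)

today = date(2020, 8, 4)
pick_date([date(2020, 8, 1), date(2020, 9, 20), date(2020, 12, 31)], today)
# → date(2020, 9, 20)
```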

3 Answers


Perhaps this is helpful:

        // Requires:
        // import static org.apache.spark.sql.functions.*;
        // import org.apache.spark.sql.expressions.Window;
        dataset.show(false);
        dataset.printSchema();
        /**
         *+--------+----------+
         * |Column_1|Column_2  |
         * +--------+----------+
         * |A       |2020-08-05|
         * |D       |2020-08-01|
         * |D       |2020-08-02|
         * |B       |2020-08-01|
         * |B       |2020-09-20|
         * |B       |2020-12-31|
         * |C       |2020-05-10|
         * +--------+----------+
         *
         * root
         *  |-- Column_1: string (nullable = true)
         *  |-- Column_2: string (nullable = true)
         */

        dataset.withColumn("Column_2", to_date(col("Column_2")))
                .withColumn("count", count("Column_2").over(Window.partitionBy("Column_1")))
                .withColumn("positive", when(col("count").gt(1),
                        when(col("Column_2").gt(current_date()), col("Column_2"))
                ).otherwise(col("Column_2")))
                .withColumn("negative", when(col("count").gt(1),
                        when(col("Column_2").lt(current_date()), col("Column_2"))
                ).otherwise(col("Column_2")))
                .groupBy("Column_1")
                .agg(min("positive").as("positive"), max("negative").as("negative"))
                .selectExpr("Column_1", "coalesce(positive, negative) as Column_2")
                .show(false);
        /**
         * +--------+----------+
         * |Column_1|Column_2  |
         * +--------+----------+
         * |A       |2020-08-05|
         * |D       |2020-08-02|
         * |B       |2020-09-20|
         * |C       |2020-05-10|
         * +--------+----------+
         */
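Stripped of the window and aggregation machinery, the selection this answer performs per group can be restated in plain Python (names here are illustrative, not Spark API): for multi-date groups, "positive" holds dates after today and "negative" dates before it, and the result is the minimum positive date if one exists, else the maximum negative date.

```python
from datetime import date

def positive_negative_pick(dates, today):
    # Mirrors the answer's min("positive") / max("negative") / coalesce:
    # a single date is kept as-is; otherwise prefer the earliest future
    # date, falling back to the latest past date.
    if len(dates) == 1:
        return dates[0]
    positive = [d for d in dates if d > today]
    negative = [d for d in dates if d < today]
    if positive:
        return min(positive)
    return max(negative) if negative else None

positive_negative_pick([date(2020, 8, 1), date(2020, 8, 2)], date(2020, 8, 4))
# → date(2020, 8, 2)  (no future date, so the latest past date wins, as for D above)
```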

3 Comments

Will try this out, thanks. Can you explain the expr("struct(diff, Column_2)")? I can understand that we are creating a count column and checking the date difference when the count is greater than 1, but I didn't understand the rest of it.
Also, how would this work when all the dates given for a key column are less than the current date? In that case, would it be possible to pick the closest date that is less than the current date?
@Padfoot13288, updated the answer; now it will pick the nearest date greater than the current date, else the nearest date less than the current date if there is no future one. A single date is kept as is. I think the new solution is quite self-explanatory.

Create the DataFrame First

from pyspark.sql import SparkSession, Window as W, functions as F

spark = SparkSession.builder.getOrCreate()

df_b = spark.createDataFrame([("A","2020-08-05"),("B","2020-08-01"),("B","2020-09-20"),("B","2020-12-31"),("C","2020-05-10")], ["col1","col2"])
# Order within each group by the date column so row numbers follow date order
_w = W.partitionBy("col1").orderBy("col2")
df_b = df_b.withColumn("rn", F.row_number().over(_w))

The logic here is to pick the second element of a group whenever the group has more than one row. To do that, we first assign a row number within every group, then keep the first row of every one-row group and the first two rows of every group with more than one row.

case = F.expr("""
    CASE WHEN rn = 1 THEN 1
         WHEN rn = 2 THEN 1
    END""")

df_b = df_b.withColumn('case_condition', case)
df_b = df_b.filter(F.col("case_condition") == 1)

Intermediate Output

+----+----------+---+--------------+
|col1|      col2| rn|case_condition|
+----+----------+---+--------------+
|   B|2020-08-01|  1|             1|
|   B|2020-09-20|  2|             1|
|   C|2020-05-10|  1|             1|
|   A|2020-08-05|  1|             1|
+----+----------+---+--------------+

Now, finally, just take the last element of every group:

df = df_b.groupBy("col1").agg(F.last("col2").alias("col2")).orderBy("col1")
df.show()
+----+----------+
|col1|      col2|
+----+----------+
|   A|2020-08-05|
|   B|2020-09-20|
|   C|2020-05-10|
+----+----------+

1 Comment

This works when the second row's date is greater than the current date. Assume a scenario where the second row's date is less than the current date and the third row has the date that is greater than the current date.

SCALA: This will give the result.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("Column_1")

df.withColumn("count", count("Column_2").over(w))
  .withColumn("later", expr("IF(Column_2 > date(current_timestamp), True, False)"))
  .filter("count = 1 or (count != 1 and later = True)")
  .groupBy("Column_1")
  .agg(min("Column_2").alias("Column_2"))
  .orderBy("Column_1")
  .show(false)

+--------+----------+
|Column_1|Column_2  |
+--------+----------+
|A       |2020-08-05|
|B       |2020-09-20|
|C       |2020-05-10|
+--------+----------+

One caveat: if there is more than one date for a Column_1 value and none of them is after current_timestamp, no row will be returned for that value.
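That caveat can be seen in a plain-Python restatement of this answer's filter (illustrative only, not Spark code): a multi-date group in which no date survives the `later = True` filter contributes no rows to the groupBy, so it disappears from the output.

```python
from datetime import date

def filter_then_min(dates, today):
    # Mirrors the filter: one-row groups pass unchanged; otherwise
    # only dates after `today` pass, and the minimum survivor is kept.
    if len(dates) == 1:
        return dates[0]
    later = [d for d in dates if d > today]
    # When no date is after `today`, the group yields no rows at all.
    return min(later) if later else None

filter_then_min([date(2020, 1, 1), date(2020, 2, 1)], date(2020, 8, 4))
# → None (the group is dropped, which is the exception noted above)
```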

Comments
