
I am having trouble solving the following problem. Basically, I want to find on which date a particular item (item_code) was sold at maximum and minimum volume.

Input DataFrame

item_code, sold_date, price, volume
101,      10-12-2017, 20,    500
101,      11-12-2017, 20,    400
201,      10-12-2017, 50,    200
201,      13-12-2017, 51,    300

Expected output

Find the max and min volume along with the sold date. I want this solution without using any lambda operations.


df.groupBy("item_code")agg(min("volume"),max("volume"))

The above gets me the max and min of volume, but I want them along with their respective dates.

I tried my best with a UDF but could not crack it. Any help is highly appreciated.

  • Please try to post text samples instead of images. Thanks. Commented Sep 4, 2017 at 7:08
  • Thanks. Updated my post @philantrovert Commented Sep 4, 2017 at 7:13
  • It didn't help me. I want to know on which sold_date the volume was max/min for a given item_code. first() returns the same date for all my results. Commented Sep 4, 2017 at 7:24
  • In the groupBy clause, after grouping there will be a list of dates, so you must choose between them with an aggregate function. Which aggregate function do you want to use? Take for example another row for id 101: what date should be chosen? Commented Sep 4, 2017 at 7:26
  • What does "along with respective date" mean? What should the output be if you add the following rows: 101, 9-12-2017, 20, 500 and 101, 6-12-2017, 20, 500? Commented Sep 4, 2017 at 7:54

2 Answers


The final output you desire needs a fairly involved process. You can use the following approach.

Given the input dataframe as

+---------+----------+-----+------+
|item_code|sold_date |price|volume|
+---------+----------+-----+------+
|101      |10-12-2017|20   |500   |
|101      |11-12-2017|20   |400   |
|201      |10-12-2017|50   |200   |
|201      |13-12-2017|51   |300   |
+---------+----------+-----+------+

You can use the following code

import org.apache.spark.sql.functions._
import spark.implicits._   // needed for the $"colName" syntax outside the shell

// Per-item min and max volume, without the dates yet
val tempDF = df.groupBy("item_code").agg(min("volume").as("min"), max("volume").as("max"))

// Join back to the original rows twice to recover the dates of the min and max volumes
tempDF.as("t2").join(df.as("t1"), col("t1.item_code") === col("t2.item_code") && col("t1.volume") === col("t2.min"), "left")
  .select($"t2.item_code", $"t2.max", concat_ws(",", $"t2.item_code", $"t2.min", $"t1.sold_date").as("min"))
  .join(df.as("t3"), col("t3.item_code") === col("t2.item_code") && col("t3.volume") === col("t2.max"), "left")
  .select($"min", concat_ws(",", $"t3.item_code", $"t2.max", $"t3.sold_date").as("max"))
  .show(false)

which gives you the dataframe you want:

+------------------+------------------+
|min               |max               |
+------------------+------------------+
|101,400,11-12-2017|101,500,10-12-2017|
|201,200,10-12-2017|201,300,13-12-2017|
+------------------+------------------+

2 Comments

Seems to be quite an expensive process... it takes a lot of time even on a 200 MB dataset.
Joins are always expensive :)

The best approach here is to create a new index (i.e. a column) on the DataFrame by concatenating the columns required for sorting. Implement a smart sorting on the String-based index so that the results still sort numerically while you carry along the date, and whatever else you need to retrieve, as part of the query (see the sketch below).

That way there is no need for JOINs.
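
A minimal sketch of that idea, assuming volumes are non-negative integers that fit in ten digits; the vol_key column name and the padding width are illustrative, not from the original answer:

import org.apache.spark.sql.functions._

// Sortable string key: zero-padded volume plus the date we want to carry along.
// Padding to 10 digits (an assumption) makes lexicographic order match numeric order.
val keyed = df.withColumn(
  "vol_key",
  concat_ws(",", lpad(col("volume").cast("string"), 10, "0"), col("sold_date"))
)

// min/max on the key pick the whole "volume,date" pair per item -- no join needed.
keyed.groupBy("item_code")
  .agg(min("vol_key").as("min"), max("vol_key").as("max"))
  .show(false)

Because the volume is zero-padded, min and max on the string key select the same rows a numeric min/max on volume would, while the date rides along inside the key; split can recover the individual values afterwards if needed.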
