
I am familiar with this approach - a case in point is this example from "How to obtain the average of an array-type column in scala-spark over all row entries per entry?":

val array_size = 3
val avgAgg = for (i <- 0 to array_size - 1) yield avg($"value".getItem(i))
df.select(array(avgAgg: _*).alias("avg_value")).show(false)

However, that 3 is hard-coded; in my real case the size is not known up front.

No matter how hard I try to avoid a UDF, I cannot do this sort of thing dynamically, based on the size of an array column already present in the DataFrame. E.g.:

...
val z = for (i <- 1 to size($"sortedCol")) yield array(element_at($"sortedCol._2", i), element_at($"sortedCol._3", i))
...
...
.withColumn("Z", array(z: _*))

I am looking for a way to do this against an existing array column of variable length. transform, expr? Not sure.

Full code as per request:

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

case class abc(year: Int, month: Int, item: String, quantity: Int)

val df0 = Seq(abc(2019, 1, "TV", 8), 
              abc(2019, 7, "AC", 10),  
              abc(2018, 1, "TV", 2),  
              abc(2018, 2, "AC", 3), 
              abc(2019, 2, "CO", 10)).toDS()

val df1 = df0.toDF()
// Generate some data; could be done more simply, but that is not the point.

val itemsList= collect_list(struct("month", "item", "quantity"))

// This works because nn is hard-coded.
val nn = 3
val z = for (i <- 1 to nn) yield array(element_at($"sortedCol.item", i), element_at($"sortedCol.quantity", i))
// But this is what I want:
//val z = for (i <- 1 to size($"sortedCol")) yield array(element_at($"sortedCol.item", i), element_at($"sortedCol.quantity", i))


val df2 = df1.groupBy($"year")
   .agg(itemsList as "items")
   .withColumn("sortedCol", sort_array($"items", asc = true))  
   .withColumn("S", size($"sortedCol")) // cannot use this either
   .withColumn("Z", array(z: _*)  )
   .drop("items")
   .orderBy($"year".desc)
df2.show(false)
// Col Z is the output I want, but without the null entries produced for shorter arrays
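For reference, a minimal sketch of the "high default" workaround mentioned in the comments below: pull the largest list size back to the driver, loop up to that, and accept (and later drop) the null padding for shorter arrays. maxSize and zDyn are my own illustrative names, not part of the original code.

// Sketch only: derive the loop bound from the data instead of hard-coding nn.
// Requires an extra action (first) to pull the max array size back to the driver.
val maxSize = df1.groupBy($"year")
                 .agg(itemsList as "items")
                 .select(max(size($"items")))
                 .first
                 .getInt(0)

val zDyn = for (i <- 1 to maxSize)
  yield array(element_at($"sortedCol.item", i), element_at($"sortedCol.quantity", i))
// Use as .withColumn("Z", array(zDyn: _*)); years with fewer items get null entries
// in Z, which then have to be filtered out afterwards.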

UPD

In "In apache spark SQL, how to remove the duplicate rows when using collect_list in window function?" I solved this with a very simple UDF, but here I was looking for a way without a UDF, and in particular for dynamically setting the "to" value in the for loop. The answer proves that certain constructs are not possible, which was the verification being sought.

  • @blackbishop Any ideas here? What am I missing? Commented Apr 21, 2020 at 7:52
  • @LeoC interested in your insights Commented Apr 21, 2020 at 7:56
  • If all the arrays have the same size then you can first get it like this : val array_size = df.select(size($"sortedCol")).first.getInt(0). The rest of the code remains the same. Commented Apr 21, 2020 at 8:11
  • @blackbishop But that is exactly the point, they do not. I noted your solution; I have applied that approach in the past, or taken a high default and dropped the null values generated. But I am wondering why the issue exists. Seems a common enough use case. Commented Apr 21, 2020 at 8:15
  • OK, I see now. Could you add a reproducible example to the question please? Commented Apr 21, 2020 at 8:33

1 Answer


If I correctly understand your need, you can simply use the transform function like this:

val df2 = df1.groupBy($"year")
             .agg(itemsList as "items")
             .withColumn("sortedCol", sort_array($"items", asc = true))


val transform_expr = "transform(sortedCol, x -> array(x.item, x.quantity))"

df2.withColumn("Z", expr(transform_expr)).show(false)

//+----+--------------------------------------+--------------------------------------+-----------------------------+
//|year|items                                 |sortedCol                             |Z                            |
//+----+--------------------------------------+--------------------------------------+-----------------------------+
//|2018|[[1, TV, 2], [2, AC, 3]]              |[[1, TV, 2], [2, AC, 3]]              |[[TV, 2], [AC, 3]]           |
//|2019|[[1, TV, 8], [7, AC, 10], [2, CO, 10]]|[[1, TV, 8], [2, CO, 10], [7, AC, 10]]|[[TV, 8], [CO, 10], [AC, 10]]|
//+----+--------------------------------------+--------------------------------------+-----------------------------+
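As a side note, on Spark 3.0+ the same transform can be written directly in the Scala DataFrame API instead of an expr string; a minimal sketch, assuming Spark 3.x:

// Spark 3.0+: transform is also available in org.apache.spark.sql.functions,
// taking a Column and a Column => Column lambda.
import org.apache.spark.sql.functions.{transform, array}

df2.withColumn("Z", transform($"sortedCol", x => array(x("item"), x("quantity"))))
   .show(false)

This avoids the string parsing but is otherwise equivalent to the expr version above.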

3 Comments

OK, I did that with a UDF as opposed to transform, which amounts to essentially the same thing. But the crux of my question was: can we ever have something like val z = for (i <- 1 to size($"sortedCol")) ..., or is that just not allowed?
The answer is accepted, but I am specifically interested in the point above. Maybe it is a red herring given the alternatives.
@thebluephantom When you write z = for (i <- 1 to size($"sortedCol")), you're trying to use a Scala for-comprehension, but you're giving it a Spark Column as the bound. That's just not possible, because the Column is evaluated by Spark against the DataFrame, not by Scala.
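To make the last comment concrete, here is a minimal sketch of the type mismatch (illustrative only):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.size

val s: Column = size($"sortedCol")   // a Spark expression, evaluated per row on the executors
// for (i <- 1 to s) yield ...       // does not compile: Range needs a Scala Int, not a Column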
