
I was looking at this excellent question, and its answer, in order to improve my Scala skills: Extract a column value and assign it to another column as an array in spark dataframe

I created my modified version of the code as follows, which works, but I am left with a few questions:

import spark.implicits._   
import org.apache.spark.sql.functions._

val df = sc.parallelize(Seq(
    ("r1", 1, 1),
    ("r2", 6, 4),
    ("r3", 4, 1),
    ("r4", 1, 2)
  )).toDF("ID", "a", "b")

val uniqueVal = df.select("b").distinct().map(x => x.getAs[Int](0)).collect.toList    
def myfun: Int => List[Int] = _ => uniqueVal 
def myfun_udf = udf(myfun)

df.withColumn("X", myfun_udf( col("b") )).show

+---+---+---+---------+
| ID|  a|  b|        X|
+---+---+---+---------+
| r1|  1|  1|[1, 4, 2]|
| r2|  6|  4|[1, 4, 2]|
| r3|  4|  1|[1, 4, 2]|
| r4|  1|  2|[1, 4, 2]|
+---+---+---+---------+

It works, but:

  • I note that column b is passed in twice.
  • I can also pass in column a in the second statement, as below, and get the same result; so what is the point of that argument?

df.withColumn("X", myfun_udf( col("a") )).show

  • If I pass in col ID then I get null.
  • So, I am wondering why the second column is an input at all?
  • And how could this be made to work generically for all columns?

So, this was code that I looked at elsewhere, but I am missing something.

1 Answer

The code you've shown doesn't make much sense:

  • It is not scalable - in the worst case scenario the size of each row is proportional to the size of the whole dataset.
  • As you've already figured out, it doesn't need an argument at all (see the sketch just after this list).
  • It doesn't need (and, more importantly, didn't need) a udf at the time it was written (on 2016-12-23, Spark 1.6 and 2.0 were already released).
  • If you still wanted to use a udf, the nullary variant would suffice.
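
As a side note on the question's observation that passing ID yields null: myfun declares an Int argument, so Spark first casts the String column to Int (which produces null for values like "r1"), and a udf whose Scala parameter is a primitive type returns null for null inputs without invoking the function at all. A minimal sketch of that, reusing the question's df and myfun_udf:

    // Hypothetical re-run of the question's code: "ID" is a String column,
    // so it is cast to Int (null for "r1", "r2", ...), and a udf with a
    // primitive Int parameter yields null for null inputs without calling
    // the function - hence X is null on every row, even though the body
    // ignores its argument.
    df.withColumn("X", myfun_udf(col("ID"))).show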

Overall it is just another convoluted and misleading answer that happened to serve the OP at the time. I'd ignore it (or vote accordingly) and move on.

So how could this be done:

  • If you have a local list and you really want to use a udf: for a single sequence, use udf with a nullary function:

    val uniqueBVal: Seq[Int] = ???
    val addUniqueBValCol = udf(() => uniqueBVal)
    
    df.withColumn("X", addUniqueBValCol())
    

    Generalize to:

    import scala.reflect.runtime.universe.TypeTag
    
    def addLiteral[T : TypeTag](xs: Seq[T]) = udf(() => xs)
    
    val x = addLiteral[Int](uniqueBVal)
    df.withColumn("X", x())
    
  • Better: don't use a udf at all:

    import org.apache.spark.sql.functions._
    
    df.withColumn("x", array(uniquBVal map lit: _*))
    
  • As for:

    And how could this be made to work generically for all columns?

    as mentioned at the beginning, the whole concept is hard to defend. You could use either window functions (completely not scalable):

    import org.apache.spark.sql.expressions.Window
    
    // one window frame spanning the whole dataset (no partitioning, no ordering)
    val w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    // for every column, collect its set of distinct values over that window
    df.select($"*" +: df.columns.map(c => collect_set(c).over(w).alias(s"${c}_unique")): _*)
    

    or a cross join with an aggregate (most of the time not scalable):

    // a single-row DataFrame holding, for each column, the set of its distinct values
    val uniqueValues = df.select(
      df.columns map (c => collect_set(col(c)).alias(s"${c}_unique")): _*
    )
    // attach that single row to every row of the original DataFrame
    df.crossJoin(uniqueValues)
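
    With the question's df, uniqueValues is a single row holding one array of distinct values per column, and the cross join attaches that row to every original row (a sketch of the expected shape; the ordering inside each collected set is not guaranteed):

    uniqueValues.show(truncate = false)
    // +----------------+---------+---------+
    // |ID_unique       |a_unique |b_unique |
    // +----------------+---------+---------+
    // |[r1, r2, r3, r4]|[1, 6, 4]|[1, 4, 2]|
    // +----------------+---------+---------+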
    

    In general though, you'll have to rethink your approach if this comes anywhere near actual applications, unless you know for sure that the cardinalities of the columns are small and have strict upper bounds.

The take-away message is: don't trust random code that random people post on the Internet. This one included.
