
Suppose I have a Spark DataFrame generated as:

val df = Seq(
    (Array(1, 2, 3), Array("a", "b", "c")),
    (Array(1, 2, 3), Array("a", "b", "c"))
).toDF("Col1", "Col2")

It's possible to extract elements at the first index in "Col1" with something like:

val extractFirstInt = udf { (x: Seq[Int], i: Int) => x(i) }
df.withColumn("Col1_1", extractFirstInt($"Col1", lit(1)))

And similarly for the second column "Col2" with e.g.

val extractFirstString = udf { (x: Seq[String], i: Int) => x(i) }
df.withColumn("Col2_1", extractFirstString($"Col2", lit(1)))

But the code duplication is a little ugly -- I need a separate UDF for each underlying element type.

Is there a way to write a generic UDF, that automatically infers the type of the underlying Array in the column of the Spark Dataset? E.g. I'd like to be able to write something like (pseudocode; with generic T)

val extractFirst = udf { (x: Seq[T], i: Int) => x(i) }
df.withColumn("Col1_1", extractFirst($"Col1", lit(1)))

Where somehow the type T would just be automagically inferred by Spark / the Scala compiler (perhaps using reflection if appropriate).

Bonus points if you're aware of a solution that works both with array-columns and Spark's own DenseVector / SparseVector types. The main thing I'd like to avoid (if at all possible) is the requirement of defining a separate UDF for each underlying array-element type I want to handle.

3 Comments
  • Why a udf at all? For vectors you don't have any reasonable option, but arrays support getItem and apply (see the sketch after these comments)... Commented May 7, 2017 at 3:24
  • Good point! Perhaps the best solution here would be to detect whether the column is of ArrayType and, if so, use .getItem(); otherwise (for vectors) fall back to a UDF. Commented May 7, 2017 at 6:31
  • Yeah, and it is pretty much for free. Since Vectors are not native types they are trickier, but on the bright side you have only one type to worry about. Commented May 7, 2017 at 9:34
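
For array columns that looks like the following (a minimal sketch using the question's df; getItem and apply are built-in Column methods, so no UDF is involved):

// Array columns support element access directly.
df.withColumn("Col1_1", $"Col1".getItem(1))
  .withColumn("Col2_1", $"Col2"(1))   // apply() is shorthand for getItem()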

2 Answers


Perhaps frameless could be a solution?

Since manipulating a Dataset requires an Encoder for the type in question, you have to define the type upfront so Spark SQL can create one for you. A Scala macro that generates UDFs for all Encoder-supported types might make sense here.

As of now, I'd define a generic method and one UDF per type (which goes against your wish for "a generic UDF, that automatically infers the type of the underlying Array in the column of the Spark Dataset").

def myExtract[T](x: Seq[T], i: Int) = x(i)
// define UDF for extracting strings
val extractString = udf(myExtract[String] _)

Use as follows:

val df = Seq(
    (Array(1, 2, 3), Array("a", "b", "c")),
    (Array(1, 2, 3), Array("a", "b", "c"))
).toDF("Col1", "Col2")

scala> df.withColumn("Col2_1", extractString($"Col2", lit(1))).show
+---------+---------+------+
|     Col1|     Col2|Col2_1|
+---------+---------+------+
|[1, 2, 3]|[a, b, c]|     b|
|[1, 2, 3]|[a, b, c]|     b|
+---------+---------+------+
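
Each additional element type then costs just one more line plus its call; for the question's integer column that could look like this (extractInt is an illustrative name, not part of the original answer):

// same generic method, specialized to Int elements
val extractInt = udf(myExtract[Int] _)

df.withColumn("Col1_1", extractInt($"Col1", lit(1)))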

You could also explore the typed Dataset API instead (rather than DataFrame, which is just Dataset[Row]). That would give you the full type machinery (and perhaps let you avoid any macro development).
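
A minimal sketch of that idea, assuming the question's df and a hand-written case class (Record is just an illustrative name; spark.implicits._ is assumed in scope, as in the question):

// Illustrative case class matching the question's schema by column name.
case class Record(Col1: Seq[Int], Col2: Seq[String])

val ds = df.as[Record]

// Plain Scala indexing -- the compiler knows the element types, so no UDF is needed.
val extracted = ds
  .map(r => (r.Col1, r.Col2, r.Col1(1), r.Col2(1)))
  .toDF("Col1", "Col2", "Col1_1", "Col2_1")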


1 Comment

Is it possible with an unknown number of elements in the array? For example, if Col2 contains arrays whose size is not known in advance.

Following the advice from @zero323, I settled on an implementation of the following form:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

def extractFirst(df: DataFrame, column: String, into: String) = {

  // extract column of interest
  val col = df.apply(column)

  // figure out the type name for this column
  val schema = df.schema
  val typeName = schema.apply(schema.fieldIndex(column)).dataType.typeName

  // delegate based on column type
  typeName match {

    case "array"  => df.withColumn(into, col.getItem(0))
    case "vector" => {
      // construct a udf to extract first element
      // (could almost certainly do better here,
      // but this demonstrates the strategy regardless)
      val extractor = udf {
        (x: Any) => {
          val el = x.getClass.getDeclaredMethod("toArray").invoke(x)
          val array = el.asInstanceOf[Array[Double]]
          array(0)
        }
      }

      df.withColumn(into, extractor(col))
    }

    case _ => throw new IllegalArgumentException("unexpected type '" + typeName + "'")
  }
}
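
For reference, a usage sketch on made-up data (the vector column name, its values, and the output column names are purely illustrative; Vectors is org.apache.spark.ml.linalg.Vectors):

import org.apache.spark.ml.linalg.Vectors

// Illustrative data: one array column and one ml vector column.
val vdf = Seq(
  (Array(1, 2, 3), Vectors.dense(1.0, 2.0, 3.0)),
  (Array(4, 5, 6), Vectors.sparse(3, Array(0), Array(9.0)))
).toDF("Col1", "Vec1")

// The same call handles both kinds of column; dispatch happens on the column's typeName.
val step1 = extractFirst(vdf, "Col1", "Col1_first")     // "array"  -> getItem(0)
val step2 = extractFirst(step1, "Vec1", "Vec1_first")   // "vector" -> reflective UDF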
