I want to parse the date columns in a DataFrame, and for each date column the resolution may differ (e.g. 2011/01/10 => 2011/01 if the resolution is set to "Month").

I wrote the following code:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
  import org.apache.spark.sql.functions._
  val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
  val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}

  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))

  val mappedCols =
  {
    for(i <- allCols.indices) yield
    {
      schema(i) match
      {
        case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i))
        case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
        case _ => allCols(i)
      }
    }
  }

  dataframe.select(mappedCols:_*)

}

However, it doesn't work: it seems that I can only pass Columns to UDFs. I also wonder whether it would be very slow if I converted the DataFrame to an RDD and applied the function to each row.

Does anyone know the correct solution? Thank you!

2 Answers

Just use a little bit of currying:

def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => 
  SparkDateTimeConverter.convertDate(x, resolution))

and use it as follows:

case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))
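
This works because resolution(i) is captured in the closure when the curried UDF is created; at invocation time Spark only sees Column arguments, and each distinct resolution simply gets its own UDF instance.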

On a side note, you should take a look at sql.functions.trunc and sql.functions.date_format. These should handle at least part of the job without using UDFs at all.
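
For example, a minimal sketch using only built-ins, assuming a hypothetical DataFrame df with a date column eventDate (the names are illustrative, not from the question) and spark.implicits._ in scope for the $-syntax:

import org.apache.spark.sql.functions.{trunc, date_format}

// trunc keeps the date type but resets everything below the requested
// unit, e.g. 2011-01-10 becomes 2011-01-01 with "month":
df.select(trunc($"eventDate", "month"))

// date_format renders the value as a string at the desired resolution,
// e.g. 2011-01-10 becomes "2011/01":
df.select(date_format($"eventDate", "yyyy/MM"))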

Note:

In Spark 2.2 or later you can use the typedLit function:

import org.apache.spark.sql.functions.typedLit

which supports a wider range of literals, such as Seq or Map.
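
A minimal sketch of what that enables (the values and the lookup UDF are purely illustrative):

import org.apache.spark.sql.functions.{typedLit, udf}

// lit cannot build a Column from a Scala collection, but typedLit
// carries the type information needed to construct the literal:
val mapCol = typedLit(Map("birthday" -> "Day", "signup" -> "Month"))
val seqCol = typedLit(Seq(1, 2, 3))

// e.g. passing the whole map into a UDF in a single call:
val resolve = udf((field: String, res: Map[String, String]) => res.getOrElse(field, "Day"))
df.select(resolve($"fieldName", mapCol))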

5 Comments

Thank you for your answer and the intuition of currying!
I wrote a tutorial on how to use currying to create a Spark UDF that accepts extra parameters at invocation time: gist.github.com/andrearota/5910b5c5ac65845f23856b2415474c38
Bravo, quite an insight into Spark.
Is it possible to register a curried UDF with spark.udf.register in order to make it available in SQL?
Someone should put this in the documentation!

You can create a literal Column to pass to a UDF using the lit(...) function defined in org.apache.spark.sql.functions.

For example:

import org.apache.spark.sql.functions.{lit, udf}

val takeRight = udf((s: String, i: Int) => s.takeRight(i))
df.select(takeRight($"stringCol", lit(1)))
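
Here lit(1) wraps the Scala value in a constant Column, so it is passed into the UDF on every row; that per-row argument is one plausible reason the curried approach in the other answer can perform better, as the comment below suggests.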

1 Comment

Thank you, I initially used lit as well, but it turns out that its performance is not as good as that of the approach in the other answer...
