
I have the following Apache Spark UDF in Scala:

val myFunc = udf {
  (userBias: Float, otherBiases: Map[Long, Float],
    userFactors: Seq[Float], context: Seq[String]) =>
    var result = Float.NaN

    if (userFactors != null) {
      var contextBias = 0f

      for (cc <- context) {
        // contextMapping maps a context string to a key of otherBiases
        contextBias += otherBiases(contextMapping(cc))
      }

      // definition of result
      // ...
    }
    result
}

Now I want to pass parameters to this function, but I always get a "not applicable" error caused by the context parameter. I know that user-defined functions read their inputs row by row, and indeed the function runs if I delete context. How can I solve this issue? Why doesn't it read rows from Array[Seq[String]], i.e. from context? Alternatively, it would be acceptable to pass context as a DataFrame or something similar.

// context is Array[Seq[String]]
val a = sc.parallelize(Seq((1,2),(3,4))).toDF("a", "b")
val context = a.collect.map(_.toSeq.map(_.toString))

// userBias("bias"), otherBias("biases") and userFactors("features")
// have a type Column, while userBias... are DataFrames
myDataframe.select(dataset("*"),
                   myFunc(userBias("bias"),
                          otherBias("biases"),
                          userFactors("features"),
                          context)
                   .as($(newCol)))

UPDATE:

I tried the solution indicated in zero323's answer, but there is still a small issue with context: Array[Seq[String]]. In particular, the problem is looping over this array with for (cc <- context) { contextBias += otherBiases(contextMapping(cc)) }: I should pass a String to contextMapping, not a Seq[String]:

def myFunc(context: Array[Seq[String]]) = udf {
  (userBias: Float, otherBiases: Map[Long, Float],
   userFactors: Seq[Float]) =>
    var result = Float.NaN

    if (userFactors != null) {
      var contextBias = 0f
      for (cc <- context) {
        // here cc is a Seq[String], but contextMapping expects a String
        contextBias += otherBiases(contextMapping(cc))
      }

      // estimation of result

    }
    result
}

Now I call it as follows:

myDataframe.select(dataset("*"),
                   myFunc(context)(userBias("bias"),
                                   otherBias("biases"),
                                   userFactors("features"))
                     .as($(newCol)))
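
A minimal sketch of the fix to that inner loop, assuming (as the question's code implies) that contextMapping takes a single String and returns a key of otherBiases: flatten the nested array so that each cc is one String.

// inside the UDF body
var contextBias = 0f
for (seq <- context; cc <- seq) {
  contextBias += otherBiases(contextMapping(cc))
}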

2 Answers


Spark 2.2+

You can use the typedLit function:

import org.apache.spark.sql.functions.typedLit

myFunc(..., typedLit(context))
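
Spelled out with the question's columns (a sketch; note that the UDF's context parameter would then need the type Seq[Seq[String]], since typedLit preserves the full Array[Seq[String]] type and array columns arrive in a Scala UDF as Seq):

import org.apache.spark.sql.functions.typedLit

myDataframe.select(dataset("*"),
                   myFunc(userBias("bias"),
                          otherBias("biases"),
                          userFactors("features"),
                          typedLit(context))
                     .as($(newCol)))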

Spark < 2.2

Any argument that is passed directly to the UDF has to be a Column, so if you want to pass a constant array you'll have to convert it to a column literal:

import org.apache.spark.sql.functions.{array, lit}

val myFunc: org.apache.spark.sql.UserDefinedFunction = ???

myFunc(
  userBias("bias"),
  otherBias("biases"),
  userFactors("features"),
  // org.apache.spark.sql.Column
  array(context.map(xs => array(xs.map(lit _): _*)): _*)  
)
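
For this to work, the UDF itself must declare a matching fourth parameter; here is a sketch of the adjusted definition, reusing the question's body (contextMapping is assumed to map a String to a key of otherBiases, and the nested array column arrives as Seq[Seq[String]]):

val myFunc = udf {
  (userBias: Float, otherBiases: Map[Long, Float],
   userFactors: Seq[Float], context: Seq[Seq[String]]) =>
    var result = Float.NaN
    if (userFactors != null) {
      var contextBias = 0f
      // flatten the nested sequences so each cc is a single String
      for (seq <- context; cc <- seq) {
        contextBias += otherBiases(contextMapping(cc))
      }
      // definition of result
      // ...
    }
    result
}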

Non-Column objects can be passed only indirectly, using a closure, for example like this:

def myFunc(context: Array[Seq[String]]) = udf {
  (userBias: Float, otherBiases: Map[Long, Float],  userFactors: Seq[Float]) => 
    ???
}

myFunc(context)(userBias("bias"), otherBias("biases"), userFactors("features"))

1 Comment

Note that Spark 2.2 has not been released yet.

An alternative way is to use struct:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{lit, struct, udf}

val seq: Seq[String] = ...
val func: UserDefinedFunction = udf((row: Row) => ...)
val seqStruct = struct(seq.map(v => lit(v)): _*)
func(seqStruct)
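
A minimal runnable sketch of the same idea (hypothetical names; assumes a DataFrame df). Inside the UDF the struct arrives as a Row, whose fields can be read back with row.toSeq:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{lit, struct, udf}

// Hypothetical example: pass a constant Seq[String] as a struct column
// and count its entries inside the UDF.
val seq = Seq("ctx1", "ctx2", "ctx3")
val countCtx = udf { (row: Row) =>
  row.toSeq.map(_.toString).size
}
val seqStruct = struct(seq.map(v => lit(v)): _*)
df.select(countCtx(seqStruct).as("nContexts"))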

