I have a Scala class intended to generalize some functionality of linear models. Specifically, the user should be able to create an instance with an array of coefficients and an array of predictors; the class then pulls the data from a DataFrame and uses a simple linear model to create predictions for the entire DataFrame, as shown below.

I am stuck on the last line, which I expect to generate a column of predicted values. I have attempted a number of approaches (all but one of which are commented out). The code as it stands won't compile because of a type mismatch:

[error]  found   : Array[org.apache.spark.sql.Column]
[error]  required: org.apache.spark.sql.Column
[error]       .withColumn("prediction", colMod(preds.map(p => data(p))))
[error]                                               ^

I get the same error with the pred <- preds version. The foreach version fails with a different mismatch:

[error]  found   : Unit
[error]  required: org.apache.spark.sql.Column
[error]       .withColumn("prediction", colMod(preds.foreach(data(_))))
[error]                                                   ^

I have been trying in vain to resolve this and would be grateful for any suggestions.

  class LinearModel(coefficients: Array[Double],
                    predictors: Array[String],
                    data: DataFrame) {

    val coefs = coefficients
    val preds = Array.concat(Array("bias"), predictors)
    require(coefs.length == preds.length)

    /**
      * predict: computes linear model predictions as the dot product of the coefficients and the
      * values (X[i] in the model matrix)
      * @param values: the values from a single row of the given variables from model matrix X
      * @param coefs: array of coefficients to be applied to each of the variables in values
      *             (the first coef is assumed to be 1 for the bias/intercept term)
      * @return: the predicted value 
      */
    private def predict(values: Array[Double], coefs: Array[Double]): Unit = {
      (for ((c, v) <- coefs.zip(values)) yield c * v).sum
    }

    /**
      * colMod (udf): passes the values for each relevant value to predict()
      * @param values: an Array of the numerical values of each of the specified predictors for a
      *              given record
      */
    private val colMod = udf((values: Array[Double]) => predict(values, coefs))

    val dfPred = data
      // create the column with the prediction
      .withColumn("prediction", colMod(preds.map(p => data(p))))
      //.withColumn("prediction", colMod(for (pred <- preds) yield data(pred)))
      //.withColumn("prediction", colMod(preds.foreach(data(_))))
      // prev line should = colMod(data(pred1), data(pred2), ..., data(predn))
  }

1 Answer

Here is how it can be done properly:

import org.apache.spark.sql.functions.{lit, col}
import org.apache.spark.sql.{Column, DataFrame}

def predict(coefficients: Seq[Double], predictors: Seq[String], df: DataFrame): DataFrame = {

  // I assume there is no predictor for bias
  // but you can easily correct for that
  val prediction: Column = predictors.zip(coefficients).map {
    case (p, c) => col(p) * lit(c)
  }.foldLeft(col("bias"))(_ + _)

  df.withColumn("prediction", prediction)
}

Example usage:

import spark.implicits._  // needed for toDF; assumes a SparkSession named spark

val df = Seq((1.0, -1.0, 3.0, 5.0)).toDF("bias", "x1", "x2", "x3")

predict(Seq(2.0, 3.0), Seq("x1", "x3"), df).show()

with result being:

+----+----+---+---+----------+
|bias|  x1| x2| x3|prediction|
+----+----+---+---+----------+
| 1.0|-1.0|3.0|5.0|      14.0|
+----+----+---+---+----------+
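
As a quick sanity check on the 14.0, the same fold can be mirrored on plain Scala values. This is only a sketch of the arithmetic: the row map is a hypothetical stand-in for the single DataFrame row and is not part of the Spark API.

```scala
// Hypothetical stand-in for the single row of df (column name -> value).
val row = Map("bias" -> 1.0, "x1" -> -1.0, "x2" -> 3.0, "x3" -> 5.0)

val coefficients = Seq(2.0, 3.0)
val predictors = Seq("x1", "x3")

// Same shape as the Column expression: start from bias,
// then add coefficient * value for each predictor.
val prediction = predictors.zip(coefficients).map {
  case (p, c) => row(p) * c
}.foldLeft(row("bias"))(_ + _)
// prediction == 1.0 + (-1.0 * 2.0) + (5.0 * 3.0) == 14.0
```

Note that x2 is never read, because only the columns named in predictors enter the sum.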

Regarding your code, there are a number of mistakes:

  • Array[_] is not a valid external type for an ArrayType column. The valid external representation is Seq[_], so the argument of the function you pass to udf should be Seq[Double].
  • The function passed to udf cannot return Unit. In your case it should return Double. Combined with the previous point, a valid signature for predict would be (Seq[Double], Seq[Double]) => Double.
  • colMod expects a single argument of type Column, so the predictor columns have to be combined into one array column:

    import org.apache.spark.sql.functions.array
    
    colMod(array(preds.map(col): _*))
    
  • Your code is not NULL / null safe.
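
Putting the first three points together, a UDF-based version might look roughly like the sketch below. The helper name predictWithUdf, the local SparkSession, and the choice to pass "bias" as an ordinary predictor with coefficient 1.0 are assumptions made for illustration. Null handling is deliberately left out, so the last point still applies.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, col, udf}

// Assumption: a local session, created here only to make the sketch runnable.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("linear-model-udf")
  .getOrCreate()
import spark.implicits._

// Hypothetical helper: coefs and preds are paired by position,
// and both include the bias term, as in the question's class.
def predictWithUdf(coefs: Seq[Double], preds: Seq[String], df: DataFrame): DataFrame = {
  require(coefs.length == preds.length)
  // The UDF takes Seq[Double] (not Array[Double]) and returns Double (not Unit).
  val colMod = udf((values: Seq[Double]) =>
    coefs.zip(values).map { case (c, v) => c * v }.sum)
  // array(...) packs the predictor columns into the single Column colMod expects.
  df.withColumn("prediction", colMod(array(preds.map(col): _*)))
}

val df = Seq((1.0, -1.0, 3.0, 5.0)).toDF("bias", "x1", "x2", "x3")
val result = predictWithUdf(Seq(1.0, 2.0, 3.0), Seq("bias", "x1", "x3"), df)
```

Here coefs is a local parameter rather than a class field, so the UDF closure does not need to capture an enclosing class instance. The Column-expression version shown earlier avoids the UDF entirely and is usually the simpler choice.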
