
I am trying to pass a word2vec model object to my Spark udf. Basically, I have a test set with movie IDs, and I want to pass the IDs along with the model object to get an array of recommended movies for each row.

def udfGetSynonyms(model: org.apache.spark.ml.feature.Word2VecModel) =
  udf((col: String) => {
    model.findSynonymsArray("20", 1)
  })

However, this gives me a null pointer exception. When I run model.findSynonymsArray("20", 1) outside the udf I get the expected answer. For some reason the call fails inside the udf but runs fine outside it.

Note: I added "20" here just to get a fixed answer, to see if that would work. It behaves the same when I replace "20" with col.

Thanks for the help!

StackTrace:

SparkException: Job aborted due to stage failure: Task 0 in stage 23127.0 failed 4 times, most recent failure: Lost task 0.3 in stage 23127.0 (TID 4646648, 10.56.243.178, executor 149): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$udfGetSynonyms1$1: (string) => array<struct<_1:string,_2:double>>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:49)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:126)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:111)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:350)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.NullPointerException
at org.apache.spark.ml.feature.Word2VecModel.findSynonymsArray(Word2Vec.scala:273)
at linebb57ebe901e04c40a4fba9fb7416f724554.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$udfGetSynonyms1$1.apply(command-232354:7)
at linebb57ebe901e04c40a4fba9fb7416f724554.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$udfGetSynonyms1$1.apply(command-232354:4)
... 12 more
  • After the edit the original post become mostly irrelevant. Could you please edit the question and clean this up? After edit - this code won't work, as findSynonyms uses distributed ops inside. You'll have to find another way to approach the problem. Commented Apr 26, 2018 at 22:57
  • Are you sure about it using distributed ops? I don't see any here: github.com/apache/spark/blob/master/mllib/src/main/scala/org/… Commented Apr 27, 2018 at 1:12
  • @JoeK Actually you're right, it doesn't. I checked this and I cannot reproduce the NPE either. Can you? Commented Apr 27, 2018 at 9:34
  • Sure, I'll shape this up a bit better @user6910411 Commented Apr 27, 2018 at 16:37
  • Aah, I think the problem is because I have a cluster. I can't reproduce the NPE issue on a single machine. It seems that the wordVectors might not be available when running on more than one node. Commented Apr 27, 2018 at 16:43

2 Answers


The SQL udf API is a bit limited, and I am not sure there is a way to use custom types as columns or as inputs to udfs. A bit of googling didn't turn up anything too useful.

Instead, you can use the Dataset or RDD API and just use a regular Scala function instead of a udf, something like:

val model: Word2VecModel = ...
val inputs: Dataset[String] = ...
inputs.map(movieId => model.findSynonymsArray(movieId, 10))

Alternatively, I guess you could serialize the model to and from a string, but that seems much uglier.


1 Comment

It shouldn't make any difference here.

I think this issue happens because wordVectors is a transient variable:

class Word2VecModel private[ml] (
    @Since("1.4.0") override val uid: String,
    @transient private val wordVectors: feature.Word2VecModel)
  extends Model[Word2VecModel] with Word2VecBase with MLWritable {

I solved this by broadcasting w2vModel.getVectors and re-creating the synonym lookup inside each partition.
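Roughly, the idea looks like this. This is a sketch with illustrative names (cosine, findSynonymsLocal), not the exact code I ran, and it assumes findSynonyms-style ranking by cosine similarity:

```scala
// Sketch of the broadcast workaround. Instead of shipping the
// Word2VecModel itself (whose wordVectors field is @transient and
// therefore null on executors), broadcast the raw word -> vector map
// and rank synonyms by cosine similarity in plain Scala.

def cosine(a: Array[Double], b: Array[Double]): Double = {
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val na  = math.sqrt(a.map(x => x * x).sum)
  val nb  = math.sqrt(b.map(x => x * x).sum)
  if (na == 0.0 || nb == 0.0) 0.0 else dot / (na * nb)
}

// Local stand-in for findSynonymsArray, working on a plain Map.
def findSynonymsLocal(vectors: Map[String, Array[Double]],
                      word: String,
                      num: Int): Array[(String, Double)] =
  vectors.get(word) match {
    case None => Array.empty
    case Some(v) =>
      vectors.toArray
        .collect { case (w, u) if w != word => (w, cosine(v, u)) }
        .sortBy(-_._2)
        .take(num)
  }

// Driver side (sketch): collect the vectors once and broadcast them.
// val vectors = w2vModel.getVectors.collect()
//   .map(r => r.getString(0) ->
//     r.getAs[org.apache.spark.ml.linalg.Vector](1).toArray).toMap
// val bc = spark.sparkContext.broadcast(vectors)
// testSet.map(movieId => findSynonymsLocal(bc.value, movieId, 10))
```

Since the broadcast map is plain Scala data, nothing transient is lost when the task is deserialized on an executor.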

