
The following code snippet is causing a NullPointerException. I am not sure whether this exception happens on some rows or on all of them, because the DataFrame is huge and I am not able to pinpoint the row.

def removeUnwantedLetters(str: String): String = {
    str.split("\\W+").filter(word => (word.matches("[a-z]+") && (word.length > 1))).mkString(" ")
}

val myudf = spark.udf.register("learningUDF", (f1: String, f2: String) => {
    if(f1 != null && f2 != null) {
        val preproList = List(removeUnwantedLetters(f2.toLowerCase))

        if(preproList.size > 0) {
            val key_items = preproList.toDF("Description")
        }
    }

    (1, 1)
})



mydataframe.withColumn("pv", myudf($"f1", $"f2")).show

The entire code is huge, so I'm sorry for not pasting all of it here; I tried my best to reduce it to the failing part. This is the exception I get when running the actual code:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 274.0 failed 4 times, most recent failure: Lost task 0.3 in stage 274.0 (TID 23387, 10.62.145.186, executor 2): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string, string, string, string, string, string, string, string, string, string, string, string) => struct<_1:int,_2:double>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ScalaUDF$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_26$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:254)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at $anonfun$1.apply(<console>:100)
    at $anonfun$1.apply(<console>:82)
    ... 22 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
  ... 66 elided
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string, string, string, string, string, string, string, string, string, string, string, string) => struct<_1:int,_2:double>)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ScalaUDF$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_26$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:254)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  ... 3 more
Caused by: java.lang.NullPointerException
  at $anonfun$1.apply(<console>:100)
  at $anonfun$1.apply(<console>:82)
  ... 22 more

By trial and error, I found that the line `val key_items = preproList.toDF("Description")` is causing the NPE: if I simply change it to `val key_items = preproList`, it works fine.

Can anyone please tell me when `val key_items = preproList.toDF("Description")` would throw a `NullPointerException`?

Update

It seems we cannot create a DataFrame inside a UDF: I tried replacing `val key_items = preproList.toDF("Description")` with `val key_items = List(1,2,3,4).toDF("VL")`, and to my surprise it failed with the same exception.

Is it not possible to create a temporary dataframe inside a UDF?

Update 2

I am trying to create a temporary DataFrame so that I can use the JohnSnowLabs Norvig spell correction model through its pipeline, as follows:

val nlpPipeline = new Pipeline().setStages(Array(
  new DocumentAssembler().setInputCol("Description").setOutputCol("document"),
  new Tokenizer().setInputCols("document").setOutputCol("tokens"),
  norvigspell.setInputCols("tokens").setOutputCol("Description_corrected"),
  new Finisher().setInputCols("Description_corrected")
))

val dbDF = preproList.toDF("Description")

val spellcorrectedDF = dbDF.transform(dbDF=> nlpPipeline.fit(dbDF).transform(dbDF))

Answer


The short answer is: no, you can't create a DataFrame (or Dataset) inside a UDF. UDFs operate on individual row values, so they must return simple values that can be stored in a new column; think of them as calculated columns. A UDF runs on the executors, while `toDF` relies on the SparkSession, which is only usable on the driver and is not shipped to the executors; that is what triggers the NullPointerException. Even if you could create a DataFrame inside a UDF, it would have only one row, and you would be creating one of them for every row of the parent DataFrame.
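A minimal sketch of the calculated-column idea, assuming Spark 2.x or later running locally; the `PerRowUdfSketch` object, the `cleanWords` helper and the sample rows are hypothetical. If what you need per row is a list of words (like `preproList` in the question), the UDF can return a `Seq[String]`, which Spark stores as an `array<string>` column. `toDF` is only called on the driver, never inside the UDF.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object PerRowUdfSketch {
  // Hypothetical helper: the cleaned words of one value, or an empty list for null.
  def cleanWords(s: String): Seq[String] =
    if (s == null) Seq.empty
    else s.toLowerCase.split("\\W+").toSeq.filter(w => w.matches("[a-z]+") && w.length > 1)

  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._ // toDF is used here, on the driver, not inside the UDF

    // Hypothetical input rows, including a null value.
    val input = Seq((1, "Hello, World!"), (2, null), (3, "a 42 x-ray")).toDF("id", "f2")

    // The UDF only computes a value for a single row; Seq[String] maps to array<string>.
    val cleanWordsUdf = udf((s: String) => cleanWords(s))

    input.withColumn("words", cleanWordsUdf(col("f2")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("per-row-udf").getOrCreate()
    try run(spark).show(truncate = false)
    finally spark.stop()
  }
}
```

Any step that genuinely needs a DataFrame can then work on the `words` column (or on an `explode`d version of it) for all rows at once.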

Now, from your code it is difficult to tell what you want to do. I can see you are attempting some sort of character cleanup and storing the result in a key_items value (as a DataFrame) that is never used, only to return a constant (1, 1) pair regardless of the previous computation. The fact that your UDF takes two parameters but uses only one is puzzling to me too.
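For reference, here is what the cleanup helper from the question keeps, as a small plain-Scala check (no Spark needed; the `CleanupDemo` object and the sample string are made up for illustration). It also shows why the UDF lowercases its input first: the `[a-z]+` filter silently drops any word that contains an uppercase letter.

```scala
object CleanupDemo {
  // Same helper as in the question: keeps lowercase ASCII words of two or more letters.
  def removeUnwantedLetters(str: String): String =
    str.split("\\W+").filter(word => word.matches("[a-z]+") && word.length > 1).mkString(" ")

  def main(args: Array[String]): Unit = {
    val sample = "Hello, world! a 42 x-ray"
    println(removeUnwantedLetters(sample))             // prints "world ray": "Hello" is not all lowercase
    println(removeUnwantedLetters(sample.toLowerCase)) // prints "hello world ray"
  }
}
```

Note that the helper throws a NullPointerException of its own when given null, so a null check in the UDF is still needed.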

My guess is that you want to compute the description from the value of a single column (you are only using one), so something like the following should get you close:

def removeUnwantedLetters(str: String): String = {
    str.split("\\W+").filter(word => (word.matches("[a-z]+") && (word.length > 1))).mkString(" ")
}

val myudf = spark.udf.register("learningUDF", (f1: String) => {
    if(f1 != null) {
        removeUnwantedLetters(f1.toLowerCase)
    } else ""
})

// This seems to be the DataFrame you are looking for
val descriptionDF = mydataframe
  .withColumn("Description", myudf($"f2"))
  .select("Description")

With this code, Spark creates the Description column by invoking your UDF on every value of the f2 column. Then `.select("Description")` creates a new DataFrame that contains only the Description column.
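A self-contained sketch of the same idea that also feeds the resulting descriptionDF to a pipeline, assuming Spark 2.x or later. The sample data is made up, and Spark ML's built-in `Tokenizer` is only a placeholder for the JohnSnowLabs stages from Update 2, whose configuration I have not checked. The point is that the pipeline is fit and applied once, on the driver, to a DataFrame holding the whole column, rather than being called from inside a UDF.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object DescriptionPipelineSketch {
  // Same cleanup helper as in the answer.
  def removeUnwantedLetters(str: String): String =
    str.split("\\W+").filter(word => word.matches("[a-z]+") && word.length > 1).mkString(" ")

  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._ // toDF is only used here, on the driver

    // Hypothetical stand-in for `mydataframe`.
    val mydataframe = Seq(("a", "Hello, World!"), ("b", null), ("c", "Spark x-ray 42")).toDF("f1", "f2")

    // Per-row work only: one String in, one String out.
    val cleanUdf = udf((s: String) => if (s != null) removeUnwantedLetters(s.toLowerCase) else "")

    // A single DataFrame covering every row, with just the Description column.
    val descriptionDF = mydataframe
      .withColumn("Description", cleanUdf(col("f2")))
      .select("Description")

    // Placeholder stage: the JohnSnowLabs stages from Update 2 would replace this.
    val stages: Array[PipelineStage] = Array(
      new Tokenizer().setInputCol("Description").setOutputCol("tokens")
    )
    val nlpPipeline = new Pipeline().setStages(stages)

    // Fit and transform once, over the whole DataFrame.
    nlpPipeline.fit(descriptionDF).transform(descriptionDF)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("description-pipeline").getOrCreate()
    try run(spark).show(truncate = false)
    finally spark.stop()
  }
}
```

To use the spell checker instead, swap the placeholder for the stages from Update 2; they also read from a column named Description.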


Comments

I am trying to create the DataFrame only so that I can use the JohnSnowLabs NorvigSpell correction pipeline, because it expects a DataFrame. I am updating my question with this extra information, which explains why I need to create a DataFrame. Please suggest any alternative approach. Thank you for taking the time to answer my query.
I have updated the description under the "Update 2" section. Please suggest anything I can do to use the JohnSnowLabs spell correction.
@user3243499 I can't really tell how to use the correction model you are talking about; that may need a separate question. What I can say is that you are misunderstanding how a UDF is used, and I have edited my answer to try to clarify it.
But this sample code works, and it does create a DataFrame inside a UDF: `val upperUDF = spark.udf.register("upperUDF", (input: String) => { val xy = List(1,2,3,4,5).toDF("nums"); input.toUpperCase }); val dst = Seq((0, "hello"), (1, "world")).toDF("id", "text"); dst.withColumn("upper", upperUDF('text)).show;`
