import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import sqlContext.implicits._  // required for .toDF()

case class Varnish(ID: String, varnish_latency: Float)

val seq = sc.sequenceFile[LongWritable, BytesWritable](logfile_path)
val usableRDD = seq.map { case (_, v: BytesWritable) => Text.decode(v.getBytes) }
                   .map(_.split(" "))
                   .map(p => Varnish(p(11), p(8).toFloat))
                   .toDF()
usableRDD.registerTempTable("Varnish")
sqlContext.sql("SELECT * FROM Varnish LIMIT 5").collect().foreach(println) // works fine
val countResult = sqlContext.sql("SELECT COUNT(*) FROM Varnish").collect() // throws Err
val cnt2 = countResult.head.getLong(0)

16/01/23 02:56:18 sparkDriver-akka.actor.default-dispatcher-20 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/01/23 02:56:18 Thread-3 INFO ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 57 in stage 1.0 failed 4 times, most recent failure:
Lost task 57.3 in stage 1.0 (TID 89, 10.1.201.14): java.lang.NumberFormatException: For input string: "nan"
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1250)

1 Answer
The exception seems rather self-explanatory: some of the values you pass contain the string "nan", which is not a valid Float representation. (This is also why the LIMIT 5 query succeeds while COUNT(*) fails: the former only materializes the first few rows, while the count has to scan every row and eventually hits the malformed value.)

scala> "nan".toFloat
java.lang.NumberFormatException: For input string: "nan"
...

Unless the data comes from a source that has already been validated (like an RDBMS or Parquet files), you should never blindly trust that it has the correct format. You can modify your code to handle this case and other malformed entries properly by using Options:

import scala.util.Try

case class Varnish(ID: String, varnish_latency: Option[Float])

...
  .map(p => Varnish(p(11), Try(p(8).toFloat).toOption))
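On plain strings, the Try-based parse behaves as follows (standard library behavior, independent of Spark):

```scala
import scala.util.Try

// Try(...).toOption turns the NumberFormatException into None
// instead of propagating it, so malformed values become missing values.
val bad  = Try("nan".toFloat).toOption   // None
val good = Try("0.5".toFloat).toOption   // Some(0.5)
```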

Alternatively, drop the case class and handle this using SQL (note that without the case class you map to a plain tuple, and an unparseable string casts to null):

...
  .map(p => (p(11), p(8)))
  .toDF("ID", "varnish_latency")
  .withColumn("varnish_latency", $"varnish_latency".cast("double"))

or pre-validate the value before you call .toFloat and drop malformed entries.
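A minimal pre-validation sketch, using plain Scala collections rather than an RDD just to illustrate the idea (the hypothetical rows value stands in for the split log lines):

```scala
import scala.util.Try

// Keep only rows whose latency field actually parses as a Float,
// then convert; everything else is silently dropped.
val rows  = Seq(Array("id1", "0.42"), Array("id2", "nan"))
val valid = rows.filter(p => Try(p(1).toFloat).isSuccess)
                .map(p => (p(0), p(1).toFloat))
// valid == Seq(("id1", 0.42f))
```

The same filter/map pair applies unchanged to the RDD in the question.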

The first two options will convert Nones to nulls. Since this is not semantically precise (original not-a-number vs. missing value) and results in a loss of information, you may prefer handling the "nan" case explicitly. This can be done, for example, by replacing "nan" with "NaN" (the representation Float.parseFloat actually accepts) before calling toFloat, or by pattern matching:

p(8) match {
  case "nan" => Float.NaN
  case s => s.toFloat
}
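With this pattern match, "nan" becomes a genuine Float.NaN instead of throwing, which preserves the not-a-number semantics. Wrapped in a small helper (the function name is just for illustration):

```scala
// Hypothetical helper wrapping the pattern match from the answer.
def toLatency(s: String): Float = s match {
  case "nan" => Float.NaN   // map the malformed token to a real NaN
  case other => other.toFloat
}

val a = toLatency("nan")    // Float.NaN
val b = toLatency("3.5")    // 3.5f
```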

2 Comments

That's the solution I was looking for, thanks. Sorry, I am new to Scala: how do I continue in a Scala case match? I would like to ignore/drop rows where p(8) == "nan": p(8) match { case "nan" => ? case s => s.toFloat }
There are different ways you can approach this, but flatMap + an empty Seq would be my personal choice.
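One sketch of the flatMap + empty Seq idea suggested in this comment, shown on a plain collection (the same pattern applies unchanged to an RDD's flatMap; the helper name is an assumption):

```scala
// Malformed rows map to Seq.empty and disappear from the result;
// valid rows map to a one-element Seq.
def parseLatency(s: String): Seq[Float] = s match {
  case "nan" => Seq.empty
  case other => Seq(other.toFloat)
}

val parsed = Seq("1.5", "nan", "2.25").flatMap(parseLatency)
// parsed == Seq(1.5f, 2.25f)
```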
