
I'm having some trouble figuring out how to do some row-level error handling with a Scala Spark program. In the code below, I'm reading in a CSV text file, parsing it, and creating a Row using a mapSchema method (not shown; basically, it takes the Array of strings that result from the CSV and uses a schema to convert the strings into ints, doubles, dates, etc.). It works great when the data is all formatted appropriately. However, if I have a bad row -- for example, one with fewer fields than expected -- I want to perform some error handling.

val rddFull = sqlContext.sparkContext.textFile(csvPath).map {
  case (txt) =>
    try {
      val reader = new CSVReader(new StringReader(txt), delimiter, quote, escape, headerLines)
      val parsedRow = reader.readNext()
      Row(mapSchema(parsedRow, schema) : _*)
    } catch {
      case err: Throwable =>
        println("a record had an error: " + txt)
        throw new RuntimeException("SomeError")
    }
}

The problem is that the try/catch expressions don't seem to be working. When I give it a bad row, I never get the "SomeError" RuntimeException. Instead, I get the same error that I get when I don't use try/catch at all.

Any ideas about what could be going wrong here?

1 Answer
You need to look in the correct place for the logs. To start with, the catch does work. Here is an example from the spark-shell:

val d = sc.parallelize(0 until 10)
val e = d.map { n =>
  try {
    if (n % 3 == 0) throw new IllegalArgumentException("That was a bad call")
    println(n)
  } catch {
    case e: IllegalArgumentException =>
      throw new UnsupportedOperationException("converted from Arg to Op except")
  }
}
e.collect

Here is the result: notice the exception was properly caught and converted:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in
stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in   
stage 0.0 (TID 5, localhost): 
java.lang.UnsupportedOperationException: converted from Arg to Op except
    at $anonfun$1.apply$mcVI$sp(<console>:29)
    at $anonfun$1.apply(<console>:24)
    at $anonfun$1.apply(<console>:24)

Try looking in the stderr logs of one or more of the workers.
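If the goal is row-level handling rather than failing the whole job, a common alternative is to wrap each row's parsing in `scala.util.Try` and split successes from failures instead of rethrowing. Here is a minimal sketch using plain collections; the same `map` body works unchanged inside an RDD's `.map`. `parseRow` and the three-field requirement are hypothetical stand-ins for your `mapSchema` logic:

```scala
import scala.util.{Try, Success, Failure}

object RowLevelHandling extends App {
  // Hypothetical per-row parser: fails on rows with too few fields,
  // standing in for CSVReader + mapSchema.
  def parseRow(line: String): Array[String] = {
    val fields = line.split(",")
    require(fields.length == 3, s"expected 3 fields, got ${fields.length}")
    fields
  }

  val lines = Seq("a,b,c", "d,e", "f,g,h")

  // Wrap each row in Try so a bad row yields a Failure instead of
  // killing the stage.
  val parsed = lines.map(line => line -> Try(parseRow(line)))

  // Keep the good rows...
  val goodRows = parsed.collect { case (_, Success(fields)) => fields }

  // ...and log the bad ones. On a cluster, this println lands in the
  // executor's stderr, not the driver console.
  val badRows = parsed.collect { case (line, Failure(err)) =>
    println(s"a record had an error: $line (${err.getMessage})")
    line
  }
}
```

This way no exception ever escapes the `map`, so there is nothing for a later stage (such as registering the RDD as a DataFrame) to trip over.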


1 Comment

Ah, thanks for your simple example! It helped me track down that the error wasn't happening in the map step like I assumed, but in a subsequent step where it registered the RDD as a dataFrame.
