I'm having some trouble figuring out how to do row-level error handling in a Scala Spark program. In the code below, I'm reading in a CSV text file, parsing it, and creating a Row using a mapSchema method (a simplified sketch of it is included below; basically, it takes the Array of strings produced by the CSV parser and uses a schema to convert the strings into ints, doubles, dates, etc.). It works great when the data is all formatted appropriately. However, if I have a bad row -- for example, one with fewer fields than expected -- I want to perform some error handling.
import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader  // opencsv; delimiter, quote, escape, headerLines are defined elsewhere
import org.apache.spark.sql.Row

val rddFull = sqlContext.sparkContext.textFile(csvPath).map { txt =>
  try {
    // Parse the raw line, then convert the resulting strings to typed values.
    val reader = new CSVReader(new StringReader(txt), delimiter, quote, escape, headerLines)
    val parsedRow = reader.readNext()
    Row(mapSchema(parsedRow, schema) : _*)
  } catch {
    case err: Throwable =>
      println("a record had an error: " + txt)
      throw new RuntimeException("SomeError")
  }
}
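
For reference, mapSchema is along these lines (a simplified sketch; the real method handles more types, including dates):

import org.apache.spark.sql.types.{StructType, IntegerType, DoubleType}

// Simplified sketch of mapSchema: pair each parsed string field with its
// schema field and convert the string to the declared type.
def mapSchema(fields: Array[String], schema: StructType): Seq[Any] =
  fields.zip(schema.fields).map { case (value, field) =>
    field.dataType match {
      case IntegerType => value.toInt
      case DoubleType  => value.toDouble
      case _           => value
    }
  }.toSeq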
The problem is that the try/catch expressions don't seem to be working. When I give it a bad row, I never get the "SomeError" RuntimeException. Instead, I get the same error that I get when I don't use try/catch at all.
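
In case it's relevant, downstream I build a DataFrame from the RDD and run an action on it, roughly like this (a sketch; the actual code does more):

// Roughly how rddFull is consumed afterwards. Because the map above is
// lazy, nothing inside it runs until an action like show() is called.
val df = sqlContext.createDataFrame(rddFull, schema)
df.show()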
Any ideas about what could be going wrong here?