10

In order to handle Spark exception on RDD operations I can use the following approach with additional exceptions column:

val df: DataFrame = ...

val rddWithExcep = df.rdd.map { row: Row =>
  val memberIdStr = row.getAs[String]("member_id")
  val memberIdInt = Try(memberIdStr.toInt) match {
    case Success(integer) => List(integer, null)
    case Failure(ex) => List(null, ex.toString)
  }
  Row.fromSeq(row.toSeq.toList ++ memberIdInt)
}

val castWithExceptionSchema = StructType(df.schema.fields ++ Array(StructField("member_id_int", IntegerType, true)
  , StructField("exceptions", StringType, true)))

val castExcepDf = sparkSession.sqlContext.createDataFrame(rddWithExcep, castWithExceptionSchema)

castExcepDf.printSchema()
castExcepDf.show()

Is it possible to handle such exception on Spark SQL? For example, currently in case of any errors, Spark SQL simply returns null value and hides the error.

For example division by 0 will be resulted into null value and not into an error.. In my opinion - this is a very serious issue in Spark SQL because it can simple produces unexpected/wrong data that you won't even notice.

Is it possible to override this behavior and let Spark fail with an appropriate detailed exception?

1 Answer 1

2

Since Spark 3.0, you can set property spark.sql.ansi.enabled to true in your spark session to throw exception and stop spark execution instead of saving null value in column. However, failure will be global and not on row per row basis. See ANSI Compliance page on Spark's documentation website for more details.

So the following code snippet:

sparkSession.conf.set("spark.sql.ansi.enabled", "true")

Seq(1, 2, 3).toDF("MyCol")
  .withColumn("divideByZero", col("MyCol") / 0)
  .show(false)

throws the following exception:

Exception in thread "main" org.apache.spark.SparkArithmeticException: divide by zero
    at org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:140)
    at org.apache.spark.sql.catalyst.expressions.DivModLike.eval(arithmetic.scala:437)
    at org.apache.spark.sql.catalyst.expressions.DivModLike.eval$(arithmetic.scala:425)
    at org.apache.spark.sql.catalyst.expressions.Divide.eval(arithmetic.scala:534)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:168)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.$anonfun$applyOrElse$80(Optimizer.scala:1840)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1840)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1835)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
    at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
    at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
    at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:447)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1835)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1833)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
    at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:138)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:148)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
    at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:214)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:259)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
    ...
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.