In our application, most of our code just applies filter, group by and aggregate operations to DataFrames and saves the results to a Cassandra database.
Like the code below, we have several methods that perform the same kinds of operations (filter, group by, join, agg) on different numbers of fields, each returning a DataFrame that is then saved to a Cassandra table.
Sample code:
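```scala
import java.time.LocalDateTime
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, sum}
import scala.util.control.NonFatal

// `df`, `keyspace` and `log` are defined elsewhere in our application.
val filteredDF = df.filter(col("hour") <= LocalDateTime.now().getHour())
  .groupBy("country")
  .agg(sum(col("volume")) as "pmtVolume")
saveToCassandra(filteredDF)

def saveToCassandra(df: DataFrame): Unit = {
  try {
    df.write.format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "tableName", "keyspace" -> keyspace))
      .mode("append")
      .save() // save() is the action that actually executes the plan
  } catch {
    case NonFatal(e) => log.error("Failed to save DataFrame to Cassandra", e)
  }
}
```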
Since the action is triggered when the DF is saved to Cassandra, I assume I only need to handle exceptions on that line, as per this thread.
If an exception occurs, I can already see it in Spark's detailed log by default.
Do I really have to surround the filter and group by code with Try or try/catch?
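A minimal sketch of where errors surface, assuming classic (non-Connect) Spark with a local session and made-up sample data. Analysis errors, such as a misspelled column name, are thrown eagerly when the transformation is defined, while failures inside the data processing only appear when an action runs. The column name `hourr` and the UDF `failingUdf` are hypothetical, chosen only to trigger each kind of error.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.functions.{col, sum, udf}
import scala.util.Try

// Local session for illustration only.
val spark = SparkSession.builder().master("local[1]").appName("where-errors-surface").getOrCreate()
import spark.implicits._

// Hypothetical sample data with the columns used in the question.
val df = Seq(("US", 1, 10L), ("IN", 2, 20L)).toDF("country", "hour", "volume")

// 1. Analysis errors are eager: the misspelled column "hourr" fails as soon as
//    the filter is defined, before any action runs.
val analysisError = Try(df.filter(col("hourr") <= 5))

// 2. Runtime errors are lazy: this hypothetical UDF throws for every row, yet
//    building the filter/groupBy/agg plan succeeds.
val failingUdf = udf((h: Int) => { if (h >= 0) throw new RuntimeException("bad row"); true })
val plan = Try(
  df.filter(failingUdf(col("hour")))
    .groupBy("country")
    .agg(sum(col("volume")) as "pmtVolume")
)

// The exception only surfaces when an action (here count()) executes the plan.
val actionError = plan.flatMap(p => Try(p.count()))

spark.stop()
```

Under these assumptions, a try/catch placed only around the action does not see analysis errors raised by transformation lines that run outside it, as the filter and groupBy lines do in the sample code.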
I don't see any exception handling in the Spark SQL DataFrame API examples.
How do I use Try with the saveToCassandra method, given that it returns Unit?
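A minimal sketch of wrapping a Unit-returning call in `Try`, using a hypothetical stand-in `writeRows` instead of a real Cassandra write so it runs without a cluster. `Try(...)` around such a call yields a `Try[Unit]`. If `saveToCassandra` keeps its internal catch that only logs, wrapping it in `Try` always yields `Success`, so the sketch assumes that catch is removed and the exception is allowed to propagate.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the Cassandra write: a Unit-returning side effect
// that throws on failure. In the Spark code this body would be the
// df.write...save() call, with its internal try/catch removed.
def writeRows(rows: Seq[String]): Unit =
  if (rows.isEmpty) throw new IllegalArgumentException("no rows to write")

// Try around a Unit-returning call gives Try[Unit]: Success(()) when the call
// returns normally, Failure(e) when it throws a non-fatal exception.
def saveWithTry(rows: Seq[String]): Try[Unit] = Try(writeRows(rows))

// The caller decides what to do with the outcome.
saveWithTry(Seq("US", "IN")) match {
  case Success(_) => println("saved")
  case Failure(e) => println(s"save failed: ${e.getMessage}")
}
```

Returning `Try[Unit]` instead of logging and swallowing the exception inside the method lets each caller choose whether to retry, skip, or fail the job.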