i have written the below spark scala code, where in i am trying to implement spark cassandra api. when i try to run it ,i am getting the exception like input mismatch on the date field. and automatically its populating with the data values. i am not able to understand to solve this. please help me on the same.
Below is the method which is converting long to Date format:
def getTimeInMillis2Date( timeInMillis :Long):Date = {
if (timeInMillis == 0l) {
return null;
}
val calendar = Calendar.getInstance()
calendar.setTimeInMillis(timeInMillis)
val date = calendar.getTime()
return date;
}
Below is the method, which is using the Date: [edit-2]
def getCurrentTrip(s_id1: Long, a_id1: String, summ_typ1: String, summ_dt1:Date, trp_summ_Id1: String): Boolean = {
var foundtrip = false
val df_read2 = sparkSession.read
.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host","host")
.option("spark.cassandra.connection.port","9042")
.option( "spark.cassandra.auth.username","username")
.option("spark.cassandra.auth.password","pass")
.option("keyspace","ap")
.option("table","t_s_data")
.load()
df_read2.createOrReplaceTempView("query_data2")
var sqlDate: java.sql.Date = new java.sql.Date(summ_dt1.getTime());
var res = sparkSession.sql(s"select * from ap.t_s_data where s_id =$s_id1 and a_id =$a_id1 and summ_typ =$summ_typ1 and summ_dt =$sqlDate and trp_summ_id =$trp_summ_Id1")
val row = res.first()
if (row != null) {
println ("Found Trip")
foundtrip = true
} else {
println ("Not Found")
foundtrip = false
}
foundtrip
}
-------------------------------------------------------------------------------
ERROR Stacktrace:
18/09/20 17:29:14 ERROR app.ProcessMPacket$: error for processing this event For M-packet
org.apache.spark.sql.AnalysisException: cannot resolve '(query_data2.`summ_dt` = ((1974 - 11) - 12))' due to data type mismatch: differing types in '(query_data2.`summ_dt` = ((1974 - 11) - 12))' (timestamp and int).; line 1 pos 130;
'Project [*]
+- 'Filter ((((service_id#120L = cast(1000001 as bigint)) && (cast(asset_id#121 as int) = 50000000)) && (summ_typ#122 = T)) && ((summ_dt#123 = ((1974 - 11) - 12)) && (trp_summ_id#124 = (((('8e85b4a3 - 'fbe5) - '322b) - 'aaf2) - '23f335200848))))
+- SubqueryAlias query_data2
+- Relation[service_id#120L,asset_id#121,summ_typ#122,summ_dt#123,trp_summ_id#124,asset_serial_no#125,avg_sp#126,c_dist#127,c_epa#128,c_gal#129,c_mil#130,device_id#131,device_serial_no#132,dist#133,en_addr#134,en_dt#135,en_lat#136,en_long#137,epa#138,gal#139,h_dist#140,h_epa#141,h_gal#142,h_mil#143,... 11 more fields] org.apache.spark.sql.cassandra.CassandraSourceRelation@38f4a1ee
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:125)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:125)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:104)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
at com.vzt.afm.hum.dh.util.CassandraUtils$.getCurrentTrip(CassandraUtils.scala:253)
at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1$$anonfun$apply$1.apply(ProcessMPacket.scala:169)
at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1$$anonfun$apply$1.apply(ProcessMPacket.scala:129)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:129)
at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
... and summ_dt =Tue Nov 12 22:49:38 IST 1974 and trp_summ_id ...- I am not familiar with cassandra, but it looks like in your sql you need to put your string parameter values into single quotes and convert string representation of the date into date value