
I have written the Spark Scala code below, where I am trying to use the Spark Cassandra API. When I run it, I get an exception about a type mismatch on the date field: the date is interpolated into the query as raw values, which Spark then misparses. I am not able to figure out how to solve this. Please help.

Below is the method that converts a long (epoch milliseconds) to a Date:

import java.util.{Calendar, Date}

def getTimeInMillis2Date(timeInMillis: Long): Date = {
    if (timeInMillis == 0L) {
        return null
    }
    val calendar = Calendar.getInstance()
    calendar.setTimeInMillis(timeInMillis)
    calendar.getTime()
}
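
For reference, java.util.Date has a constructor that takes epoch milliseconds directly, so (assuming the same null-on-zero contract) the Calendar detour can be collapsed to a one-liner:

// One-line equivalent: Date(long) interprets its argument as epoch millis.
def getTimeInMillis2Date(timeInMillis: Long): Date =
    if (timeInMillis == 0L) null else new Date(timeInMillis)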

Below is the method that uses the Date: [edit-2]

def getCurrentTrip(s_id1: Long, a_id1: String, summ_typ1: String, summ_dt1: Date, trp_summ_Id1: String): Boolean = {

    var foundtrip = false

    val df_read2 = sparkSession.read
        .format("org.apache.spark.sql.cassandra")
        .option("spark.cassandra.connection.host", "host")
        .option("spark.cassandra.connection.port", "9042")
        .option("spark.cassandra.auth.username", "username")
        .option("spark.cassandra.auth.password", "pass")
        .option("keyspace", "ap")
        .option("table", "t_s_data")
        .load()

    df_read2.createOrReplaceTempView("query_data2")

    val sqlDate: java.sql.Date = new java.sql.Date(summ_dt1.getTime())

    val res = sparkSession.sql(s"select * from ap.t_s_data where s_id =$s_id1 and a_id =$a_id1 and summ_typ =$summ_typ1 and summ_dt =$sqlDate and trp_summ_id =$trp_summ_Id1")

    val row = res.first()
    if (row != null) {
        println("Found Trip")
        foundtrip = true
    } else {
        println("Not Found")
        foundtrip = false
    }
    foundtrip
}

-------------------------------------------------------------------------------
ERROR Stacktrace:

    18/09/20 17:29:14 ERROR app.ProcessMPacket$: error for processing this event For M-packet
org.apache.spark.sql.AnalysisException: cannot resolve '(query_data2.`summ_dt` = ((1974 - 11) - 12))' due to data type mismatch: differing types in '(query_data2.`summ_dt` = ((1974 - 11) - 12))' (timestamp and int).; line 1 pos 130;
'Project [*]
+- 'Filter ((((service_id#120L = cast(1000001 as bigint)) && (cast(asset_id#121 as int) = 50000000)) && (summ_typ#122 = T)) && ((summ_dt#123 = ((1974 - 11) - 12)) && (trp_summ_id#124 = (((('8e85b4a3 - 'fbe5) - '322b) - 'aaf2) - '23f335200848))))
   +- SubqueryAlias query_data2
      +- Relation[service_id#120L,asset_id#121,summ_typ#122,summ_dt#123,trp_summ_id#124,asset_serial_no#125,avg_sp#126,c_dist#127,c_epa#128,c_gal#129,c_mil#130,device_id#131,device_serial_no#132,dist#133,en_addr#134,en_dt#135,en_lat#136,en_long#137,epa#138,gal#139,h_dist#140,h_epa#141,h_gal#142,h_mil#143,... 11 more fields] org.apache.spark.sql.cassandra.CassandraSourceRelation@38f4a1ee

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:125)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:125)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:104)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
    at com.vzt.afm.hum.dh.util.CassandraUtils$.getCurrentTrip(CassandraUtils.scala:253)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1$$anonfun$apply$1.apply(ProcessMPacket.scala:169)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1$$anonfun$apply$1.apply(ProcessMPacket.scala:129)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:129)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:75)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
  • ... and summ_dt =Tue Nov 12 22:49:38 IST 1974 and trp_summ_id ... - I am not familiar with Cassandra, but it looks like in your SQL you need to put your string parameter values into single quotes and convert the string representation of the date into a date value (see the sketch after these comments). Commented Sep 19, 2018 at 5:59
  • I tried var res = sparkSession.sql("select * from ap.t_s_data where s_id =$s_id1 and a_id ='$a_id1' and summ_typ =$summ_typ1 and summ_dt =to_date($summ_dt1) and trp_summ_id ='$trp_summ_Id1'"), but it gives an error like org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'from' expecting <EOF>. Commented Sep 19, 2018 at 10:38
  • I have edited the code and tried converting to java.sql.Date, but I am still getting an error. Please help. Commented Sep 20, 2018 at 12:19
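
A minimal sketch of the quoting the first comment describes, using the question's parameter names and the registered temp view query_data2; the date format here is an assumption and must match the column's timestamp values:

// Hypothetical rewrite of the query: string values in single quotes,
// the date formatted as a string literal and cast to a timestamp.
val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val dateLit = fmt.format(summ_dt1)  // e.g. "1974-11-12 22:49:38"
val res = sparkSession.sql(
  s"""select * from query_data2
     |where s_id = $s_id1
     |  and a_id = '$a_id1'
     |  and summ_typ = '$summ_typ1'
     |  and summ_dt = cast('$dateLit' as timestamp)
     |  and trp_summ_id = '$trp_summ_Id1'""".stripMargin)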

1 Answer


The error message shows that the summ_dt column is of timestamp type, so try to create a timestamp value from your date. Note that getTime() returns epoch milliseconds while Spark's timestamp() cast interprets a numeric argument as seconds, hence the division by 1000; the string-typed parameters also need single quotes:

val sqlDate = summ_dt1.getTime()  // epoch milliseconds
val res = sparkSession.sql(
  s"""select *
     |from ap.t_s_data
     |where s_id = $s_id1
     |  and a_id = '$a_id1'
     |  and summ_typ = '$summ_typ1'
     |  and summ_dt = timestamp($sqlDate / 1000)
     |  and trp_summ_id = '$trp_summ_Id1'""".stripMargin)
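
As an alternative (my own sketch, not part of the original answer), the string query can be avoided entirely by filtering the df_read2 DataFrame the question already loads, which sidesteps the quoting problem because typed values are compared directly:

import org.apache.spark.sql.functions.{col, lit}

// Sketch using the df_read2 DataFrame from the question; typed values
// need no quoting or formatting, and the Cassandra connector can push
// these predicates down to the database.
val res = df_read2
  .filter(col("s_id") === s_id1)
  .filter(col("a_id") === a_id1)
  .filter(col("summ_typ") === summ_typ1)
  .filter(col("summ_dt") === lit(new java.sql.Timestamp(summ_dt1.getTime())))
  .filter(col("trp_summ_id") === trp_summ_Id1)
val foundtrip = res.head(1).nonEmpty  // first() would throw on an empty result

Note also that in the question's code, res.first() throws NoSuchElementException on an empty result rather than returning null, so the row != null check never sees the empty case.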