
I am trying to compare two sets of data: one is a DataFrame built from static data, which is written out in Avro format. The comparison then reads the data back from Avro and checks a timestamp column, and it fails because Avro stores the timestamp as a long, and the SQL type conversion yields a different value.

**--CREATE THE DATAFRAME**
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.databricks.spark.avro._   // provides the .avro(...) reader/writer methods

val data = Seq(Row("1", java.sql.Timestamp.valueOf("2019-03-15 18:20:06.456")))
val schemaOrig = List(
  StructField("rowkey", StringType, true),
  StructField("txn_ts", TimestampType, true))

val sourceDf = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schemaOrig))
sourceDf.write.avro("test")
sourceDf.printSchema
root
 |-- rowkey: string (nullable = true)
 |-- txn_ts: timestamp (nullable = true)
sourceDf.show(false)


+----------------+-----------------------+
|rowkey          |txn_ts                 |
+----------------+-----------------------+
|1               |2019-03-15 18:20:06.456|
+----------------+-----------------------+

-- As shown above, the source DataFrame has the expected schema, i.e. string and timestamp
-- Now read the data back from Avro
val avroDf = spark.read.avro("test")
avroDf.printSchema

root
 |-- rowkey: string (nullable = true)
 |-- txn_ts: long (nullable = true)

avroDf.show(false)
-- The Avro DataFrame's schema reports the timestamp field as long, and the data shows the epoch time in milliseconds

+----------------+-------------+
|rowkey          |txn_ts       |
+----------------+-------------+
|1               |1552688406456|
+----------------+-------------+
-- Compare the two DataFrames
sourceDf.except(avroDf).show(false)
-- Fails with an error due to the datatype mismatch
org.apache.spark.sql.AnalysisException: Except can only be performed on tables with the compatible column types. bigint <> timestamp at the second column of the second table;;
'Except
:- AnalysisBarrier

-- Cast the Avro long field back to timestamp
import org.apache.spark.sql.functions.col

val modifiedAvroDf = avroDf.withColumn("txn_ts", col("txn_ts").cast(TimestampType))
modifiedAvroDf.printSchema

root
 |-- rowkey: string (nullable = true)
 |-- txn_ts: timestamp (nullable = true)

modifiedAvroDf.show(false)
-- Shows a wrong timestamp value
+----------------+-----------------------+
|rowkey          |txn_ts                 |
+----------------+-----------------------+
|1               |51172-09-26 11:07:366.0|
+----------------+-----------------------+
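
-- The cast above interprets the long as seconds since the epoch, which is why the value lands in the year 51172. The Avro long actually holds epoch milliseconds, so dividing by 1000 before casting should recover the original timestamp, millisecond fraction included (a sketch; the division goes through double arithmetic, so verify exactness on your own data):
val fixedAvroDf = avroDf.withColumn("txn_ts", (col("txn_ts") / 1000).cast(TimestampType))
fixedAvroDf.printSchema                    // txn_ts comes back as timestamp
sourceDf.except(fixedAvroDf).show(false)   // expected to be empty if the round trip is exact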

-- Now try casting the source column to long
val sourceModDf = sourceDf.withColumn("txn_ts", col("txn_ts").cast(LongType))
sourceModDf.printSchema

root
 |-- rowkey: string (nullable = true)
 |-- txn_ts: long (nullable = true)
sourceModDf.show(false)
sourceModDf.except(modifiedAvroDf).show(false)
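
-- Note: casting a timestamp to long truncates it to whole seconds, so the milliseconds are lost and the values still do not line up with the Avro long (which is in milliseconds); besides, modifiedAvroDf is now a timestamp, so the types differ again. A sketch of comparing on the long side instead, scaling the source column to epoch milliseconds (round() guards against floating-point truncation; names are illustrative):
import org.apache.spark.sql.functions.round

val sourceMillisDf = sourceDf.withColumn("txn_ts",
  round(col("txn_ts").cast(DoubleType) * 1000).cast(LongType))
sourceMillisDf.except(avroDf).show(false)   // should come back empty if both sides now hold epoch milliseconds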

1 Answer

I created a UDF to convert the long to a timestamp string. Please check the code below.

scala> val df = Seq(1552688406456L).toDF
df: org.apache.spark.sql.DataFrame = [value: bigint]

scala> import org.joda.time.DateTime
import org.joda.time.DateTime

scala> import org.joda.time.DateTimeZone
import org.joda.time.DateTimeZone

scala> val datetime = udf((date: Long) => new DateTime(date, DateTimeZone.UTC).toString.replace("Z","").replace("T"," "))
datetime: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(LongType)))

scala> df.select(datetime($"value").as("dt")).show(false)
+------------------------+
|dt                      |
+------------------------+
|2019-03-15 22:20:06.456 |
+------------------------+

scala> df.select(datetime($"value").as("dt").cast("timestamp")).show(false)
+-----------------------+
|dt                     |
+-----------------------+
|2019-03-15 22:20:06.456|
+-----------------------+


scala> df.select(datetime($"value").as("dt").cast("timestamp")).printSchema
root
 |-- dt: timestamp (nullable = true)
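
If the long is known to be epoch milliseconds, the same result can also be had without a UDF by scaling and casting directly, which skips the string round trip and the session-time-zone ambiguity raised in the comments below (a sketch):

import org.apache.spark.sql.functions.col

// Milliseconds -> fractional seconds, then cast; the underlying instant is exact,
// and show() renders it in whatever spark.sql.session.timeZone is set to.
val dtDf = df.select((col("value") / 1000).cast("timestamp").as("dt"))
dtDf.printSchema   // dt: timestamp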


11 Comments

Thanks a lot. How can I make it generic? In my case different tables have different sets of columns, and I am invoking it like: val modifiedDf = filter._2.fields.foldLeft(sourceDF) { case (newDf, SchemaField(name, fieldType)) => newDf.withColumn(name, newDf.col(name).cast(DataTypeUtil.mapDataType(fieldType))) }
datetime is a UDF; declare it somewhere at the top level and use it wherever required, like this: newDf.withColumn("ts", datetime($"ts")).withColumn(name, newDf.col(name).cast(DataTypeUtil.mapDataType(fieldType)))
Not really. When I try the generic approach using the UDF, the value is not getting changed. Please see the post stackoverflow.com/questions/61708010/…
Hi Srinivas, there is an issue with the time zone: the returned value is 2019-03-16T03:50:06.456+05:30, whereas the original timestamp was 2019-03-15 18:20:06.456. I guess the issue is due to the time zone. Is there a way to force the same time zone so that the stored long value is read back as exactly the same timestamp?
I updated the answer to convert to the UTC time zone; if you want, you can change it to a different time zone. Please accept or upvote if it helps solve the issue.
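
Following up on the generic-conversion question in the comments: a sketch of one way to drive the conversion from an expected schema, applying the milliseconds-to-timestamp conversion only where a TimestampType is expected and a plain cast everywhere else (the helper name alignToSchema and the use of the source schema as the target are illustrative assumptions, not code from the discussion above):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StructType, TimestampType}

// Illustrative helper: align a DataFrame read back from Avro with a target schema.
// Assumes the Avro longs backing timestamp columns are epoch milliseconds.
def alignToSchema(df: DataFrame, targetSchema: StructType): DataFrame =
  targetSchema.fields.foldLeft(df) { (acc, field) =>
    field.dataType match {
      case TimestampType => acc.withColumn(field.name, (col(field.name) / 1000).cast(TimestampType))
      case otherType     => acc.withColumn(field.name, col(field.name).cast(otherType))
    }
  }

// Usage sketch: sourceDf.except(alignToSchema(avroDf, sourceDf.schema)).show(false)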