
I have a Spark DataFrame with a timestamp field, and I want to convert it to the long datatype. I used a UDF, and the standalone code works fine, but when I plug it into generic logic where any timestamp column needs to be converted, I am not able to get it working. The issue is: how can I assign the return value from the UDF back to the DataFrame column?

Below is the code snippet:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test3").getOrCreate()
    import spark.implicits._

    // sqlContext.jsonRDD is deprecated; read the JSON strings through spark.read.json instead
    val df2 = spark.read.json(Seq(
      """{"year":2012, "make": "Tesla", "model": "S", "comment": "No Comment", "blank": "","manufacture_ts":"2017-10-16 00:00:00"}""",
      """{"year":1997, "make": "Ford", "model": "E350", "comment": "Get one", "blank": "","manufacture_ts":"2017-10-16 00:00:00"}"""
    ).toDS())

    // Converts a timestamp to epoch milliseconds
    val convertTimeStamp = udf { (manTs: java.sql.Timestamp) =>
      manTs.getTime
    }

    // manufacture_ts is inferred as a string from JSON, so cast it before calling the UDF
    df2.withColumn("manufacture_ts", convertTimeStamp(df2("manufacture_ts").cast("timestamp"))).show()

    +-----+----------+-----+--------------+-----+----+
    |blank|   comment| make|manufacture_ts|model|year|
    +-----+----------+-----+--------------+-----+----+
    |     |No Comment|Tesla| 1508126400000|    S|2012|
    |     |   Get one| Ford| 1508126400000| E350|1997|
    |     |          |Chevy| 1508126400000| Volt|2015|
    +-----+----------+-----+--------------+-----+----+

Now I want to invoke this generically on a DataFrame, so that it is called on every column whose type needs to be converted to long:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._

    object Test4 extends App {

      val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test").getOrCreate()
      import spark.implicits._

      val long: Long = "1508299200000".toLong

      val data = Seq(Row("10000020_LUX_OTC", long, "2020-02-14"))

      val schema = List(
        StructField("rowkey", StringType, true),
        StructField("order_receipt_dt", LongType, true),
        StructField("maturity_dt", StringType, true))

      val dataDF = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))

      // Fold over the schema, replacing each column with the (possibly converted) result
      val modifedDf2 = schema.foldLeft(dataDF) { case (newDF, StructField(name, dataType, _, _)) =>
        newDF.withColumn(name, DataTypeUtil.transformLong(newDF, name, dataType.typeName))
      }

      modifedDf2.show()
    }


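    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.functions.udf

    object DataTypeUtil {

      // Converts a timestamp to epoch milliseconds; throws a NullPointerException on a null input
      val convertTimeStamp = udf { (manTs: java.sql.Timestamp) =>
        manTs.getTime
      }

      // Returns the converted column for timestamp fields and the unchanged column otherwise
      def transformLong(dataFrame: DataFrame, name: String, fieldType: String): Column = {
        fieldType.toLowerCase match {
          case "timestamp" => convertTimeStamp(dataFrame(name))
          case _           => dataFrame.col(name)
        }
      }
    }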
  • Can you explain what you are trying to do and what issue you are getting? Commented May 10, 2020 at 6:50
  • Why don't you use unix_timestamp()? Commented May 10, 2020 at 11:06

1 Answer

Your UDF may crash when the timestamp is null. You can:

  • use unix_timestamp instead of a UDF, or make your UDF null-safe;
  • only apply the conversion to fields that need to be converted.
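For the null-safe option, a minimal sketch, assuming a hypothetical helper named NullSafeConversion (not part of the original code). Wrapping the input in Option means a null timestamp produces None instead of a NullPointerException from getTime:

```scala
import java.sql.Timestamp

// Hypothetical helper; the object and method names are illustrative only.
object NullSafeConversion {
  // Option(null) is None, so a null timestamp yields None
  // instead of throwing a NullPointerException in getTime.
  def toEpochMillis(ts: Timestamp): Option[Long] =
    Option(ts).map(_.getTime)
}
```

Wrapped as udf(NullSafeConversion.toEpochMillis _), it could take the place of convertTimeStamp in transformLong; Spark stores a None returned from a Scala UDF as null.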

Given the data:

    import java.sql.Timestamp
    import java.time.LocalDateTime

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.TimestampType
    import spark.implicits._

    val df = Seq(
      (1L, Timestamp.valueOf(LocalDateTime.now()), Timestamp.valueOf(LocalDateTime.now()))
    ).toDF("id", "ts1", "ts2")

you can do:

    // Keep only the timestamp columns and convert each one in turn
    val newDF = df.schema.fields
      .filter(_.dataType == TimestampType)
      .map(_.name)
      .foldLeft(df)((acc, field) => acc.withColumn(field, unix_timestamp(col(field))))

    newDF.show()

which gives:

    +---+----------+----------+
    | id|       ts1|       ts2|
    +---+----------+----------+
    |  1|1589109282|1589109282|
    +---+----------+----------+
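Note that unix_timestamp returns whole seconds since the epoch (1589109282 above), whereas getTime in the question's UDF returns milliseconds (1508126400000). If milliseconds are required, one possible sketch, assuming a hypothetical TimestampColumnsToMillis helper, casts each timestamp column to double (seconds with a fractional part) and scales it by 1000. Like unix_timestamp, the built-in casts pass nulls through instead of failing:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, round}
import org.apache.spark.sql.types.TimestampType

// Hypothetical helper name; not part of Spark or the original answer.
object TimestampColumnsToMillis {
  // Replaces every TimestampType column with epoch milliseconds (LongType).
  // Casting a timestamp to double gives seconds since the epoch with a
  // fractional part; round() guards against floating-point error before the
  // cast to long. Null timestamps stay null.
  def convertAll(df: DataFrame): DataFrame =
    df.schema.fields
      .filter(_.dataType == TimestampType)
      .map(_.name)
      .foldLeft(df)((acc, field) =>
        acc.withColumn(field, round(col(field).cast("double") * 1000).cast("long")))
}
```

The fold is the same one as in the answer; only the per-column expression changes.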

1 Comment

Can you help and suggest how to handle this? stackoverflow.com/questions/62036791/…
