I am working with Spark 2.3.2.

On one column of my DataFrame I am applying many functions from org.apache.spark.sql.functions sequentially. How can I wrap this sequence of functions in a user-defined function (UDF) to make it reusable?

Here is my example, focusing on the single column "columnName". First I create my test data:

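import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructType}

// A single struct column with two long fields
val testSchema = new StructType()
  .add("columnName", new StructType()
    .add("2020-11", LongType)
    .add("2020-12", LongType)
  )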

val testRow = Seq(Row(Row(1L, 2L)))
val testRDD = spark.sparkContext.parallelize(testRow)
val testDF = spark.createDataFrame(testRDD, testSchema)
testDF.printSchema()

/*
root
 |-- columnName: struct (nullable = true)
 |    |-- 2020-11: long (nullable = true)
 |    |-- 2020-12: long (nullable = true)
*/

testDF.show(false)

/*
+----------+
|columnName|
+----------+
|[1, 2]    |
+----------+
*/

And here is the sequence of applied Spark SQL functions (just as an example):

val testResult = testDF
  .select(explode(split(regexp_replace(to_json(col("columnName")), "[\"{}]", ""), ",")).as("result"))

I have not managed to create a UDF "myUDF" that gives the same result when called like this:

val testResultWithUDF = testDF.select(myUDF(col("columnName")))

This is what I "would like" to do:

def parseAndExplode(spalte: Column): Column = {
  explode(split(regexp_replace(to_json(spalte), "[\"{}]", ""), ","))
}
val myUDF = udf(parseAndExplode _)

testDF.withColumn("udf_result", myUDF(col("columnName"))).show(false)

but it throws an exception:

Schema for type org.apache.spark.sql.Column is not supported
java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported

I also tried using a Row as the input parameter, but again failed when trying to apply the built-in SQL functions.

1 Answer

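There is no need for a UDF here. explode, split and most other functions in org.apache.spark.sql.functions already return a Column, so a plain Scala function from Column to Column is reusable as it is. The exception occurs because udf(...) wraps functions that operate on the values inside a column (such as Long or String), not on Column expressions, so Spark cannot derive a schema for the type Column.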

def parseAndExplode(spalte: Column): Column = {
  explode(split(regexp_replace(to_json(spalte), "[\"{}]", ""), ","))
}

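// 'columnName is shorthand for col("columnName"); outside spark-shell it needs import spark.implicits._
testDF.withColumn("udf_result", parseAndExplode('columnName)).show(false)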

prints

+----------+----------+
|columnName|udf_result|
+----------+----------+
|[1, 2]    |2020-11:1 |
|[1, 2]    |2020-12:2 |
+----------+----------+
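
If a real UDF is wanted anyway, its body has to work on plain values rather than on Column objects. The following is only a minimal sketch of that idea: the object JsonPairs and the function splitJsonPairs are hypothetical names that do not appear in the question, and the Spark wiring in the trailing comments is an untested assumption about how it could be plugged in.

```scala
object JsonPairs {
  // Hypothetical helper: works on a plain String, which is the kind of
  // value a UDF body receives (never a Column).
  // Strips quotes and braces from a flat JSON object string and splits
  // the remainder into "key:value" entries.
  def splitJsonPairs(json: String): Seq[String] =
    if (json == null) Seq.empty
    else json.replaceAll("[\"{}]", "").split(",").toSeq
}

// Assumed wiring in Spark (not verified here): the built-in functions
// stay outside the UDF and only the string handling moves inside it.
//
//   val splitPairs = udf(JsonPairs.splitJsonPairs _)
//   testDF.select(explode(splitPairs(to_json(col("columnName")))).as("result"))
```

Because built-in functions are visible to Spark's optimizer while a UDF body is opaque to it, the Column-to-Column function above is usually the better choice when the built-ins suffice.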