
I use Spark 2.4 with Scala. I have a DataFrame and I am trying to replace null values in my array columns with default values (empty arrays).

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, udf, when}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull

val emptyStringArray = udf(() => Array.empty[String],
  DataTypes.createArrayType(DataTypes.StringType, false))

def ensureNonNullCol: DataFrame => DataFrame = inputDf => {

  inputDf.select(inputDf.schema.fields.map { f: StructField =>
    f.dataType match {
      case array: ArrayType => new Column(
        AssertNotNull(when(col(f.name).isNull,
          array.elementType match {
            case DataTypes.StringType => emptyStringArray()
          }).otherwise(col(f.name)).expr)
      ).as(f.name)
    }
  }: _*)
}

At the end, I get:

 |-- StrAarrayColumn: array (nullable = false)
 |    |-- element: string (containsNull = true)

How can I have:

 |-- StrAarrayColumn: array (nullable = false)
 |    |-- element: string (containsNull = false)

?

1 Answer


The problem is that your input DataFrame's schema declares an array of strings whose elements may contain nulls (containsNull = true). Your ensureNonNullCol function receives that DataFrame, selects some expressions, and returns the result without ever touching the schema metadata, so the containsNull flag is carried over unchanged. Before I get to the solution, there are three important points about your code.

    1. Matching only one of many possible cases is dangerous and discouraged: any column whose type falls outside the cases you handle will throw a MatchError at runtime (notice how your code matches only ArrayType and, inside it, only StringType).
    2. A udf with an empty argument list is discouraged and produces warnings; in my Spark version it even results in a runtime exception.
    3. You can return an empty array of strings in place instead of calling that udf (see the sketch after this list).
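
To illustrate points 2 and 3, here is a minimal sketch of replacing the zero-argument udf with a plain literal. It assumes a DataFrame df with a nullable string-array column named tags (both names are hypothetical):

    import org.apache.spark.sql.functions.{coalesce, col, typedLit}

    // typedLit builds an ArrayType(StringType) literal directly, no UDF involved;
    // coalesce keeps the original value and falls back to the empty array on null.
    val patched = df.withColumn(
      "tags",
      coalesce(col("tags"), typedLit(Array.empty[String]))
    )

Note that this alone removes the null values but does not change the schema flags, which is exactly what the rest of this answer addresses.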

Anyway, the solution is to also update the struct type after you select the fields you want in your function:

def ensureNonNullCol: DataFrame => DataFrame = inputDf => {
  // Rebuild the schema, marking string-array columns as containsNull = false.
  val newStruct = StructType(inputDf.schema.map { field =>
    val newDataType = field.dataType match {
      case arr: ArrayType if arr.elementType == StringType => arr.copy(containsNull = false)
      case other => other
    }
    field.copy(dataType = newDataType)
  })

  // Replace null arrays with an empty-array literal; pass all other columns through.
  val newDF = inputDf.select(inputDf.schema.fields.map { f: StructField =>
    f.dataType match {
      case array: ArrayType => new Column(
        AssertNotNull(when(col(f.name).isNull,
          array.elementType match {
            case DataTypes.StringType => Array.empty[String]
            case _ => col(f.name)
          }).otherwise(col(f.name)).expr)
      ).as(f.name)

      case _ => col(f.name)
    }
  }: _*)

  // Re-create the DataFrame under the corrected schema (spark is your SparkSession).
  spark.createDataFrame(newDF.rdd, newStruct)
}
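
A hypothetical usage, assuming a SparkSession named spark is in scope and df is shaped like the DataFrame in the question:

    val cleaned = ensureNonNullCol(df)
    cleaned.printSchema()
    //  |    |-- element: string (containsNull = false)   <- the flag the question asked for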

1 Comment

Thanks for your answer! Unfortunately, it only half worked. I kept my solution and updated it with what you did with the schema, and then it worked:

    val newStruct = StructType(inputDf.schema.map { field =>
      val newDataType = field.dataType match {
        case arr: ArrayType if arr.elementType == StringType => arr.copy(containsNull = false)
        case other => other
      }
      field.copy(dataType = newDataType)
    })
    spark.createDataFrame(inputDf.rdd, newStruct)
