
I am new to Spark and Scala. I have JSON input with an array-of-structs field, similar to the schema below.

root
|-- entity: struct (nullable = true)
|    |-- email: string (nullable = true)
|    |-- primaryAddresses: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- postalCode: string (nullable = true)
|    |    |    |-- streetAddress: struct (nullable = true)
|    |    |    |    |-- line1: string (nullable = true)

I flattened the array struct into the sample DataFrame below.

+-------------+--------------------------------------+--------------------------------------+
|entity.email |entity.primaryAddresses[0].postalCode |entity.primaryAddresses[1].postalCode |....
+-------------+--------------------------------------+--------------------------------------+
|[email protected]      |                                      |                                      |
|[email protected]      |                                      |12345                                 |
|[email protected]      |12345                                 |                                      |
|[email protected]      |0                                     |0                                     |
+-------------+--------------------------------------+--------------------------------------+

My end goal is to calculate presence/absence/zero counts for each of the columns as data quality metrics. But before I calculate those metrics, I am looking for an approach to derive one new column for each of the array column elements, such that

  • if all values of a particular array element are empty, then the derived column is empty for that element
  • if at least one value is present for an array element, the element's presence is marked as 1
  • if all values of an array element are zero, then I mark the element as zero (I calibrate this as presence = 1 and zero = 1 when I calculate data quality later)

Below is a sample intermediate DataFrame that I am trying to achieve, with one column derived for each array element. The original array element columns are dropped.

 
+-------------+--------------------------------------+
|entity.email |entity.primaryAddresses.postalCode    |.....
+-------------+--------------------------------------+
|[email protected]      |                                      |
|[email protected]      |1                                     |
|[email protected]      |1                                     |
|[email protected]      |0                                     |
+-------------+--------------------------------------+

The input JSON record elements are dynamic and can change. To derive a column per array element, I build a Scala map whose key is the column name without the array index (for example, entity.primaryAddresses.postalCode) and whose value is the list of indexed array columns to run the rules on for that key. I am looking for an approach to achieve the above intermediate DataFrame.
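To make the intent concrete, here is a minimal sketch of that map-driven approach (the ruleMap contents, the helper name withDerivedColumns, and the backtick quoting of the dotted/indexed column names are illustrative assumptions, not something fixed above):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Illustrative map: derived column name -> flattened, indexed source columns
val ruleMap: Map[String, Seq[String]] = Map(
  "entity.primaryAddresses.postalCode" -> Seq(
    "entity.primaryAddresses[0].postalCode",
    "entity.primaryAddresses[1].postalCode"))

def withDerivedColumns(df: DataFrame, rules: Map[String, Seq[String]]): DataFrame =
  rules.foldLeft(df) { case (acc, (derived, sources)) =>
    // Backticks keep Spark from parsing the dots/brackets as nested-field access
    val cols = sources.map(c => col(s"`$c`"))
    val anyPresent = cols.map(c => c.isNotNull && trim(c) =!= "").reduce(_ || _)
    val allZero    = cols.map(c => trim(c) === "0").reduce(_ && _)
    val withFlag = acc.withColumn(derived,
      when(allZero, lit("0"))        // every element is "0"         -> "0"
        .when(anyPresent, lit("1"))  // at least one non-empty value -> "1"
        .otherwise(lit("")))         // all elements empty/null      -> empty
    // Drop the original indexed columns, keeping only the derived one
    cols.foldLeft(withFlag)((d, c) => d.drop(c))
  }

// flattenedDf stands for the flattened DataFrame shown above
val derivedDf = withDerivedColumns(flattenedDf, ruleMap)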

One concern is that, for certain input files, the column count after flattening the DataFrame exceeds 70k. Since the record count is expected to be in the millions, I am wondering whether I should explode each of the array elements instead of flattening the JSON, for better performance.

Appreciate any ideas. Thank you.

2 Answers


I created a helper function so you can call df.explodeColumns directly on a DataFrame. The code below flattens multi-level array and struct type columns.

Use the function below to extract the columns, then apply your transformations on the result.

scala> df.printSchema
root
 |-- entity: struct (nullable = false)
 |    |-- email: string (nullable = false)
 |    |-- primaryAddresses: array (nullable = false)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- postalCode: string (nullable = false)
 |    |    |    |-- streetAddress: struct (nullable = false)
 |    |    |    |    |-- line1: string (nullable = false)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec

implicit class DFHelpers(df: DataFrame) {
    // Expands each top-level struct column into one column per field,
    // renaming "struct.field" to "struct_field"; other columns pass through unchanged.
    def columns = {
      df.schema.fields.flatMap {
        case column if column.dataType.isInstanceOf[StructType] =>
          column.dataType.asInstanceOf[StructType].fields.map { field =>
            col(s"${column.name}.${field.name}").as(s"${column.name}_${field.name}")
          }.toList
        case column => List(col(column.name))
      }
    }

    // Recursively flattens struct columns until none remain.
    def flatten: DataFrame = {
      val hasStructs = df.schema.exists(_.dataType.isInstanceOf[StructType])
      if (hasStructs) df.select(columns: _*).flatten else df
    }

    // Repeatedly explodes array columns (explode_outer keeps rows whose arrays are
    // null or empty) and flattens any structs they expose, until no arrays remain.
    def explodeColumns: DataFrame = {
      @tailrec
      def go(cdf: DataFrame): DataFrame =
        cdf.schema.fields.filter(_.dataType.typeName == "array") match {
          case arrays if arrays.nonEmpty =>
            go(arrays.foldLeft(cdf) { (dfa, field) =>
              dfa.withColumn(field.name, explode_outer(col(field.name))).flatten
            })
          case _ => cdf
        }
      go(df.flatten)
    }
}
scala> df.explodeColumns.printSchema
root
 |-- entity_email: string (nullable = false)
 |-- entity_primaryAddresses_postalCode: string (nullable = true)
 |-- entity_primaryAddresses_streetAddress_line1: string (nullable = true)
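As a possible follow-up (my own sketch, not part of the answer above): once the arrays are exploded into rows, the per-entity derived column from the question can be recovered by aggregating per email. The column names follow the explodeColumns output above, and the presence/zero rules are the ones stated in the question.

import org.apache.spark.sql.functions._

val postal = trim(col("entity_primaryAddresses_postalCode"))

val derived = df.explodeColumns
  .groupBy(col("entity_email"))
  .agg(
    // 1 if any exploded row carries a non-empty postal code
    max(when(postal.isNotNull && postal =!= "", 1).otherwise(0)).as("anyPresent"),
    // 1 only if every exploded row's postal code is exactly "0"
    min(when(postal === "0", 1).otherwise(0)).as("allZero"))
  .withColumn("entity.primaryAddresses.postalCode",
    when(col("allZero") === 1, "0")
      .when(col("anyPresent") === 1, "1")
      .otherwise(""))
  .drop("anyPresent", "allZero")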




You can leverage a custom user-defined function (UDF) to help you compute the data quality metrics.

val postalUdf = udf((postalCode0: String, postalCode1: String) => {
        // TODO implement your logic here (the postal codes are strings in the schema;
        // one possible implementation is sketched below)
        ""
    })

Then use it to create a new DataFrame column:

df
  .withColumn("postalCode", postalUdf(col("postalCode_0"), col("postalCode_1")))
  .show()
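For reference, one possible body for that UDF, based on the rules stated in the question (a sketch only, not part of this answer):

val postalUdf = udf((postalCode0: String, postalCode1: String) => {
  // Keep only the slots that actually carry a value
  val values = Seq(postalCode0, postalCode1).filter(v => v != null && v.trim.nonEmpty)
  if (values.isEmpty) ""                      // all elements empty     -> empty
  else if (values.forall(_.trim == "0")) "0"  // all non-empty are "0"  -> zero marker
  else "1"                                    // at least one present   -> present
})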

