3

I have read a JSON file into Spark. This file has the following structure:

 root
      |-- a: struct (nullable = true)
      |    |-- rt: array (nullable = true)
      |    |    |-- element: struct (containsNull = true)
      |    |    |    |-- rb: struct (nullable = true)
      |    |    |    |         |-- a: struct (nullable = true)
      |    |    |    |    |    |-- b: string (nullable = true)
      |    |    |    |    |    |-- c: boolean (nullable = true)
      |    |    |    |    |    |-- d: long (nullable = true)
      |    |    |    |    |    |-- e: string (nullable = true)

  

I created a recursive function to flatten the schema with columns that are of nested StructType

def flattenSchema(schema: StructType, prefix: String = null):Array[Column]= 
        {
        schema.fields.flatMap(f => {
          val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    
          f.dataType match {
            case st: StructType => flattenSchema(st, colName)
            case _ => Array(col(colName).alias(colName))
          }
        })
        }
 
val newDF=df.select(flattenSchema(df.schema):_*)

val secondDF=newDF.toDF(newDF.columns.map(_.replace(".", "_")): _*)

How can i flatten the ArrayType that contain nested StructType for example engagementItems: array (nullable = true)

Any help is appreciated.

2
  • 1
    Does the array have a fixed length? If not, what you are trying to do will be complicated... To help us help you, could you provide some sample input and expected output? You could also simplify your question to a minimal schema for which your problem occur. Commented Mar 28, 2019 at 13:14
  • 1
    if it is ArrayType then explode action should be perform on dataframe. Commented Mar 28, 2019 at 13:55

1 Answer 1

1

The problem here is that you need to manage the case for the ArrayType and after convert it into StructType. Therefore you can use the the Scala runtime conversion for that.

First I generated the scenario as next (btw it would be very helpful to include this in your question since makes the reproduction of the problem much easier):

  case class DimapraUnit(code: String, constrained: Boolean, id: Long, label: String, ranking: Long, _type: String, version: Long, visible: Boolean)
  case class AvailabilityEngagement(dimapraUnit: DimapraUnit)
  case class Element(availabilityEngagement: AvailabilityEngagement)
  case class Engagement(engagementItems: Array[Element])
  case class root(engagement: Engagement)
  def getSchema(): StructType ={
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.catalyst.ScalaReflection
    val schema = ScalaReflection.schemaFor[root].dataType.asInstanceOf[StructType]

    schema.printTreeString()
    schema
  }

This will print out:

root
 |-- engagement: struct (nullable = true)
 |    |-- engagementItems: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- availabilityEngagement: struct (nullable = true)
 |    |    |    |    |-- dimapraUnit: struct (nullable = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |-- constrained: boolean (nullable = false)
 |    |    |    |    |    |-- id: long (nullable = false)
 |    |    |    |    |    |-- label: string (nullable = true)
 |    |    |    |    |    |-- ranking: long (nullable = false)
 |    |    |    |    |    |-- _type: string (nullable = true)
 |    |    |    |    |    |-- version: long (nullable = false)
 |    |    |    |    |    |-- visible: boolean (nullable = false)

Then I modified your function by adding an extra check for the ArrayType and converting it to StructType using asInstanceOf:

  import org.apache.spark.sql.types._  
  def flattenSchema(schema: StructType, prefix: String = null):Array[Column]=
  {
    schema.fields.flatMap(f => {
      val colName = if (prefix == null) f.name else (prefix + "." + f.name)

      f.dataType match {
        case st: StructType => flattenSchema(st, colName)
        case at: ArrayType =>
          val st = at.elementType.asInstanceOf[StructType]
          flattenSchema(st, colName)
        case _ => Array(new Column(colName).alias(colName))
      }
    })
  }

And finally the results:

val s = getSchema()
val res = flattenSchema(s)

res.foreach(println(_))

Output:

engagement.engagementItems.availabilityEngagement.dimapraUnit.code AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.code`
engagement.engagementItems.availabilityEngagement.dimapraUnit.constrained AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.constrained`
engagement.engagementItems.availabilityEngagement.dimapraUnit.id AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.id`
engagement.engagementItems.availabilityEngagement.dimapraUnit.label AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.label`
engagement.engagementItems.availabilityEngagement.dimapraUnit.ranking AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.ranking`
engagement.engagementItems.availabilityEngagement.dimapraUnit._type AS `engagement.engagementItems.availabilityEngagement.dimapraUnit._type`
engagement.engagementItems.availabilityEngagement.dimapraUnit.version AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.version`
engagement.engagementItems.availabilityEngagement.dimapraUnit.visible AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.visible`
Sign up to request clarification or add additional context in comments.

8 Comments

Hi @J-kram the answer is based on the question: How can i flatten the ArrayType that contain nested StructType for example engagementItems: array (nullable = true) Your version was stopping the process of the inner elements on engagementItems thus returning the data for the availabilityEngagement. So I changed to be able to handle this case as well in order to reach the last level of dimapraUnit. So that if you try to do val newDF=SIWINSDF.select(flattenSchema(SIWINSDF.schema):_*) you will get flattened the dimapraUnit struct. Was not this your question about?
Exactly @J-kram, so did my post answers your question or not :)? I am not sure I understood
You mean to get each element inside the array?
This can't exist on the same dataframe, StructType(engagement) can accept only one DataType(engagementItems) at least this was my assumption for the schema that I created! In other words for one dataframe with the above specific schema, engagementItems should always have the same name and not array_column_X. If this is your case this schema is not valid and you need to change it. If would really helpful to provide some data example though because we do a lot of assumptions :) also please let me know if my answer is not relevant to your actual question so to remove it.
If you have a schema like this: val schema = new StructType().add("a", new StructType().add("b", IntegerType)) with this data: { "a": { "b": 1 } } this is not the same with this: { "a": { "c": 1 } } because then you dont know which of select("a.b") or select("a.c") is correct. This is your case?
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.