0

I have a utility which is working fine for parsing simple JSONs, but cross joining in case multiple array[structs] is present in the JSON

I have tried distinct() or dropDuplicates() as well to remove duplicates which is happening due to the cross join that I have included in the code, but thats returning empty DF..

def flattenDataFrame(df: DataFrame): DataFrame = {

var flattenedDf: DataFrame = df
if (isNested(df)) {
  val flattenedSchema: Array[(Column, Boolean)] = flattenSchema(df.schema)
  var simpleColumns: List[Column] = List.empty[Column]
  var complexColumns: List[Column] = List.empty[Column]

  flattenedSchema.foreach {
    case (col, isComplex) => {
      if (isComplex) {
        complexColumns = complexColumns :+ col
      } else {
        simpleColumns = simpleColumns :+ col
      }
    }
  }

  var crossJoinedDataFrame = df.select(simpleColumns: _*)
  complexColumns.foreach(col => {
    crossJoinedDataFrame = crossJoinedDataFrame.crossJoin(df.select(col))
    crossJoinedDataFrame = flattenDataFrame(crossJoinedDataFrame)
  })
  crossJoinedDataFrame
} else {
  flattenedDf
}
  }

private def flattenSchema(schema: StructType, prefix: String = null): Array[(Column, Boolean)] = {

schema.fields.flatMap(field => {

  val columnName = if (prefix == null) field.name else prefix + "." + field.name
  field.dataType match {
    case arrayType: ArrayType => {
      val cols: Array[(Column, Boolean)] = Array[(Column, Boolean)](((explode_outer(col(columnName)).as(columnName.replace(".", "_"))), true))
      cols
      }
    case structType: StructType => {
      flattenSchema(structType, columnName)
    }
    case _ => {
      val columnNameWithUnderscores = columnName.replace(".", "_")
      val metadata = new MetadataBuilder().putString("encoding", "ZSTD").build()
      Array(((col(columnName).as(columnNameWithUnderscores, metadata)), false))
    }
  }
}).filter(field => field != None)
}

def isNested(df: DataFrame): Boolean = {
df.schema.fields.flatMap(field => {
  field.dataType match {
    case arrayType: ArrayType => {
      Array(true)
    }
    case mapType: MapType => {
      Array(true)
    }
    case structType: StructType => {
      Array(true)
    }
    case _ => {
      Array(false)
    }
  }
}).exists(b => b)
}

A sample JSON in which I am facing the issue:

[
    {
        "id": "0001",
        "type": "donut",
        "name": "Cake",
        "ppu": 0.55,
        "batters":
            {
                "batter":
                    [
                        { "id": "1001", "type": "Regular" },
                        { "id": "1002", "type": "Chocolate" },
                        { "id": "1003", "type": "Blueberry" },
                        { "id": "1004", "type": "Devil's Food" }
                    ]
            },
        "topping":
            [
                { "id": "5001", "type": "None" },
                { "id": "5002", "type": "Glazed" },
                { "id": "5005", "type": "Sugar" },
                { "id": "5007", "type": "Powdered Sugar" },
                { "id": "5006", "type": "Chocolate with Sprinkles" },
                { "id": "5003", "type": "Chocolate" },
                { "id": "5004", "type": "Maple" }
            ]
    },
    {
        "id": "0002",
        "type": "donut",
        "name": "Raised",
        "ppu": 0.55,
        "batters":
            {
                "batter":
                    [
                        { "id": "1001", "type": "Regular" }
                    ]
            },
        "topping":
            [
                { "id": "5001", "type": "None" },
                { "id": "5002", "type": "Glazed" },
                { "id": "5005", "type": "Sugar" },
                { "id": "5003", "type": "Chocolate" },
                { "id": "5004", "type": "Maple" }
            ]
    }
]

1 Answer 1

1

Solution without join and more than that, no cross-join which is your problem:

Sorry for the formatting, can't really get it to format well for stack-overflow

def flattenDataFrame(df: DataFrame): DataFrame = {

val flattenedDf: DataFrame = df

if (isNested(df)) {
  val flattenedSchema: Array[(Column, Boolean)] = flattenSchema(flattenedDf.schema)

  var simpleColumns: List[Column] = List.empty[Column]
  var complexColumns: List[Column] = List.empty[Column]

  flattenedSchema.foreach {
    case (col, isComplex) =>
      if (isComplex) {
        complexColumns = complexColumns :+ col
      } else {
        simpleColumns = simpleColumns :+ col
      }
  }

  val complexUnderlyingCols = complexColumns.map { column =>
    val name = column.expr.asInstanceOf[UnresolvedAttribute].name
    val unquotedColName = s"${name.replaceAll("`","")}"
    val explodeSelectColName = s"`${name.replaceAll("`","")}`"
    (unquotedColName, col(name).as(unquotedColName), explode_outer(col(explodeSelectColName)).as(unquotedColName))
  }

  var joinDataFrame = flattenedDf.select(simpleColumns ++ complexUnderlyingCols.map(_._2): _*)

  complexUnderlyingCols.foreach { case (name, tempCol, column) =>
    val nonTransformedColumns = joinDataFrame.schema.fieldNames.diff(List(name)).map(fieldName => s"`${fieldName.replaceAll("`", "")}`").map(col)
    joinDataFrame = joinDataFrame.select(nonTransformedColumns :+ column :_*)
  }
  flattenDataFrame(joinDataFrame)
} else {
  flattenedDf
}

}

private def flattenSchema(schema: StructType, prefix: String = null, level: Int = 0): Array[(Column, Boolean)] = { val unquotedPrefix = if (prefix != null) prefix.replace("", "") else null println(level) schema.fields.flatMap(field => { val fieldName = field.name val columnName = if (level == 0) { s"$fieldName" } else { val fullName = s"$unquotedPrefix.$fieldName" val x = fullName.split('.').reverse.zipWithIndex.reverse.foldLeft(new StringBuilder("")){ case (builder, (fieldPart, index)) => if(index > level) { builder.append(s".$fieldPart") } else if (index == level) { builder.append(s".$fieldPart") } else { builder.append(s".$fieldPart") } } x.replace(1,2,"").toString() } val unquotedColumnName = columnName.replace("", "") field.dataType match { case _: ArrayType => val cols: Array[(Column, Boolean)] = Array[(Column, Boolean)]((col(columnName), true)) // We pass only the column as we'll generate explode function while expanding the DF cols case structType: StructType => flattenSchema(structType, columnName, level + 1) case _ => val metadata = new MetadataBuilder().putString("encoding", "ZSTD").build() Array((col(columnName).as(unquotedColumnName, metadata), false)) } }) }

def isNested(df: DataFrame): Boolean = { df.schema.fields.flatMap(field => {

field.dataType match { case _: ArrayType => Array(x = true) case _: MapType => Array(x = true) case _: StructType => Array(x = true) case _ => Array(x = false) } }).exists(b => b) }

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.