0

I have a json file with the below data

{
    "@odata.context": "XXXX",
    "value": [
        {
            "@odata.etag": "W/\"JzQ0OzlxaDNzLys1WXBPbWFXaE5MbFdKbVpNYjMrWDQ1MmJSeGdxVVhrTVRZUXc9MTswMDsn\"",
            "E_No": 345345,
            "G_Code": "007",
            "G_2_Code": ""
        },
        {
            "@odata.etag": "W/\"JzQ0O0ZNWkF2OGd1dVE2L21OQTdKR2g4YU05TldKMERpMUpMWTRSazFKQzZuTDQ9MTswMDsn\"",
            "E_No": 234543,
            "G_Code": "008",
            "G_2_Code": ""
        }
    ],
    "@odata.nextLink": "XXXX"
}

I am trying to flatten this in Databricks using Scala. I created a dataframe DF

val DF= spark.read.json(path)

I want to feed this as json and I need a dataframe created with just E_No, G_Code and G_2_Code. The rest of the columns can be deleted from the dataframe

I tried to feed this json into flattening code that I found in one of the blogs

// Recursively flattens nested columns: array columns are exploded into rows
// and struct columns are promoted to top-level columns, one nested column per
// recursion step, until no ArrayType/StructType columns remain.
// NOTE(review): column names are interpolated into SQL expressions without
// backtick quoting, so names containing "@" or "." (e.g. "@odata.context")
// fail to parse in selectExpr/col — this is the source of the reported
// "extraneous input '@'" error.
def flattenDataframe(df: DataFrame): DataFrame = {

    val fields = df.schema.fields
    val fieldNames = fields.map(x => x.name)
    val length = fields.length
    
    for(i <- 0 to fields.length-1){
      val field = fields(i)
      val fieldtype = field.dataType
      val fieldName = field.name
      fieldtype match {
        case arrayType: ArrayType =>
          // Explode the array column into one row per element, keeping every
          // other column unchanged, then recurse on the widened frame.
          val fieldNamesExcludingArray = fieldNames.filter(_!=fieldName)
          val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
          val explodedDf = df.selectExpr(fieldNamesAndExplode:_*)
          return flattenDataframe(explodedDf)
        case structType: StructType =>
          // Replace the struct column with one column per child field,
          // renaming "parent.child" to "parent_child", then recurse.
          val childFieldnames = structType.fieldNames.map(childname => fieldName +"."+childname)
          val newfieldNames = fieldNames.filter(_!= fieldName) ++ childFieldnames
          val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".", "_"))))
         val explodedf = df.select(renamedcols:_*)
          return flattenDataframe(explodedf)
        case _ => // leaf column: nothing to flatten
      }
    }
    df
  }

And when I ran the below command, I get error

val flattendedJSON = flattenDataframe(DF)

extraneous input '@' expecting {'(', 'COLLECT', 'CONVERT', 'DELTA', 'HISTORY', 'MATCHED', 'MERGE', 'OPTIMIZE', 'SAMPLE', 'TIMESTAMP', 'UPDATE', 'VERSION', 'ZORDER', 'ADD', 'AFTER', 'ALL', 'ALTER', 'ANALYZE', 'AND', 'ANTI', 'ANY', 'ARCHIVE', 'ARRAY', 'AS', 'ASC', 'AT', 'AUTHORIZATION', 'BETWEEN', 'BOTH', 'BUCKET', 'BUCKETS', 'BY', 'CACHE', 'CASCADE', 'CASE', 'CAST', 'CHANGE', 'CHECK', 'CLEAR', 'CLONE', 'CLUSTER', 'CLUSTERED', 'CODEGEN', 'COLLATE', 'COLLECTION', 'COLUMN', 'COLUMNS', 'COMMENT', 'COMMIT', 'COMPACT', 'COMPACTIONS', 'COMPUTE', 'CONCATENATE', 'CONSTRAINT', 'COPY', 'COPY_OPTIONS', 'COST', 'CREATE', 'CREDENTIALS', 'CROSS', 'CUBE', 'CURRENT', 'CURRENT_DATE', 'CURRENT_TIME', 'CURRENT_TIMESTAMP', 'CURRENT_USER', 'DATA', 'DATABASE', DATABASES, 'DAY', 'DBPROPERTIES', 'DEEP', 'DEFINED', 'DELETE', 'DELIMITED', 'DESC', 'DESCRIBE', 'DFS', 'DIRECTORIES', 'DIRECTORY', 'DISTINCT', 'DISTRIBUTE', 'DROP', 'ELSE', 'ENCRYPTION', 'END', 'ESCAPE', 'ESCAPED', 'EXCEPT', 'EXCHANGE', 'EXISTS', 'EXPLAIN', 'EXPORT', 'EXTENDED', 'EXTERNAL', 'EXTRACT', 'FALSE', 'FETCH', 'FIELDS', 'FILTER', 'FILEFORMAT', 'FILES', 'FIRST', 'FOLLOWING', 'FOR', 'FOREIGN', 'FORMAT', 'FORMAT_OPTIONS', 'FORMATTED', 'FROM', 'FULL', 'FUNCTION', 'FUNCTIONS', 'GLOBAL', 'GRANT', 'GROUP', 'GROUPING', 'HAVING', 'HOUR', 'IF', 'IGNORE', 'IMPORT', 'IN', 'INDEX', 'INDEXES', 'INNER', 'INPATH', 'INPUTFORMAT', 'INSERT', 'INTERSECT', 'INTERVAL', 'INTO', 'IS', 'ITEMS', 'JOIN', 'KEYS', 'LAST', 'LATERAL', 'LAZY', 'LEADING', 'LEFT', 'LIKE', 'LIMIT', 'LINES', 'LIST', 'LOAD', 'LOCAL', 'LOCATION', 'LOCK', 'LOCKS', 'LOGICAL', 'MACRO', 'MAP', 'MINUTE', 'MONTH', 'MSCK', 'NAMESPACE', 'NAMESPACES', 'NATURAL', 'NO', NOT, 'NULL', 'NULLS', 'OF', 'ON', 'ONLY', 'OPTION', 'OPTIONS', 'OR', 'ORDER', 'OUT', 'OUTER', 'OUTPUTFORMAT', 'OVER', 'OVERLAPS', 'OVERLAY', 'OVERWRITE', 'PARTITION', 'PARTITIONED', 'PARTITIONS', 'PATTERN', 'PERCENT', 'PIVOT', 'PLACING', 'POSITION', 'PRECEDING', 'PRIMARY', 'PRINCIPALS', 'PROPERTIES', 'PURGE', 'QUERY', 
'RANGE', 'RECORDREADER', 'RECORDWRITER', 'RECOVER', 'REDUCE', 'REFERENCES', 'REFRESH', 'RENAME', 'REPAIR', 'REPLACE', 'RESET', 'RESTRICT', 'REVOKE', 'RIGHT', RLIKE, 'ROLE', 'ROLES', 'ROLLBACK', 'ROLLUP', 'ROW', 'ROWS', 'SCHEMA', 'SECOND', 'SELECT', 'SEMI', 'SEPARATED', 'SERDE', 'SERDEPROPERTIES', 'SESSION_USER', 'SET', 'MINUS', 'SETS', 'SHALLOW', 'SHOW', 'SKEWED', 'SOME', 'SORT', 'SORTED', 'START', 'STATISTICS', 'STORED', 'STRATIFY', 'STRUCT', 'SUBSTR', 'SUBSTRING', 'TABLE', 'TABLES', 'TABLESAMPLE', 'TBLPROPERTIES', TEMPORARY, 'TERMINATED', 'THEN', 'TO', 'TOUCH', 'TRAILING', 'TRANSACTION', 'TRANSACTIONS', 'TRANSFORM', 'TRIM', 'TRUE', 'TRUNCATE', 'TYPE', 'UNARCHIVE', 'UNBOUNDED', 'UNCACHE', 'UNION', 'UNIQUE', 'UNKNOWN', 'UNLOCK', 'UNSET', 'USE', 'USER', 'USING', 'VALUES', 'VIEW', 'VIEWS', 'WHEN', 'WHERE', 'WINDOW', 'WITH', 'YEAR', '+', '-', '*', 'DIV', '~', STRING, BIGINT_LITERAL, SMALLINT_LITERAL, TINYINT_LITERAL, INTEGER_VALUE, EXPONENT_VALUE, DECIMAL_VALUE, DOUBLE_LITERAL, BIGDECIMAL_LITERAL, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 0)

== SQL ==
@odata.context
^^^

I am guessing it somehow doesn't like the '@odata' columns which I don't need either. I need to get rid of that column and then see if this flattening works.

If there is any better way of flattening this other than the flattening code I am using, please let me know.

Thanks

2
  • quick question, do you also don't need @odata.context? can you paste your expected output? Commented Aug 19, 2020 at 0:17
  • I do not need any @odata fields. All I need is a dataframe with E_No, G_Code and G_2_Code Commented Aug 19, 2020 at 0:21

2 Answers 2

1

Explode the nested array JSON and select the fields you want then write to file in JSON format.

val jsonDF = spark.read.json(path)

val explodeColName = "value"                  // column holding the array to explode
val flattenColName = explodeColName + "_flat" // temporary alias for the exploded struct

// Derive the child column names from the array-of-struct schema so the
// select list does not have to be hard coded per table.
val listOfColsFromArrayType =
  jsonDF.schema
    .find(
      s => s.name == explodeColName && s.dataType.isInstanceOf[ArrayType])
    .map(
      _.dataType
        .asInstanceOf[ArrayType]
        .elementType
        .asInstanceOf[StructType]
        .names
    )

val filterColList =
  listOfColsFromArrayType.getOrElse(throw new Exception("explode Col Name not found")) // or handle the error as needed

// Backtick-quote names containing "." (e.g. "@odata.etag") so Spark does not
// misread them as struct field access.
val flattenFilterCols = filterColList.map { c =>
  if (c.contains(".")) { col(s"$flattenColName.`$c`") } else {
    col(s"$flattenColName.$c")
  }
}

// Explode one row per array element, then project the child columns.
// (Was `val flatten = ...` while the write below referenced the undefined
// name `flattenDF` — bound consistently here.)
val flattenDF = jsonDF
  .select(explode(col(explodeColName)).as(flattenColName))
  .select(flattenFilterCols: _*)

flattenDF
  .write
  .json(outputPath)

The result will be

{"@odata.etag":"W/\"JzQ0OzlxaDNzLys1WXBPbWFXaE5MbFdKbVpNYjMrWDQ1MmJSeGdxVVhrTVRZUXc9MTswMDsn\"","E_No":345345,"G_2_Code":"","G_Code":"007"}
{"@odata.etag":"W/\"JzQ0O0ZNWkF2OGd1dVE2L21OQTdKR2g4YU05TldKMERpMUpMWTRSazFKQzZuTDQ9MTswMDsn\"","E_No":234543,"G_2_Code":"","G_Code":"008"}
Sign up to request clarification or add additional context in comments.

9 Comments

Thank you @vkt for responding. Actually there are more than 50 columns, I mentioned 3 columns for the context. Is there a way you can make it work without mentioning the column, making it generic. As I need to apply this logic to multiple tables.
Yes — you would pass in the name of the array column ("value" here) along with the list of columns you want to select.
@kranny check the updated answer. I assume you have those 50 cols name list based on some filter or as an input arguments.
those 50 columns need to be extracted from the jsonDF as that is the key thing. I need the filterColList from your code to be extracted from jsonDF automatically without naming them. I am new to scala, so trying to get that exact syntax/function
@kranny what would be the filter then? in your question, you said you only want three columns? if you don't want to hard code then we get all 4 columns like (@odata.etag,E_No,G_Code and G_2_Code), update your question or clarify your requirements.
|
0

I made few changes to your method and now it is working.

Please note that, I haven't renamed any of the underlying column. If you wanted to fetch that in further processing use backtique (`)

test data

 DF.show(false)
    DF.printSchema()

    /**
      * +--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |@odata.context|@odata.nextLink|value                                                                                                                                                                                         |
      * +--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |XXXX          |XXXX           |[[W/"JzQ0OzlxaDNzLys1WXBPbWFXaE5MbFdKbVpNYjMrWDQ1MmJSeGdxVVhrTVRZUXc9MTswMDsn", 345345, , 007], [W/"JzQ0O0ZNWkF2OGd1dVE2L21OQTdKR2g4YU05TldKMERpMUpMWTRSazFKQzZuTDQ9MTswMDsn", 234543, , 008]]|
      * +--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      *
      * root
      * |-- @odata.context: string (nullable = true)
      * |-- @odata.nextLink: string (nullable = true)
      * |-- value: array (nullable = true)
      * |    |-- element: struct (containsNull = true)
      * |    |    |-- @odata.etag: string (nullable = true)
      * |    |    |-- E_No: long (nullable = true)
      * |    |    |-- G_2_Code: string (nullable = true)
      * |    |    |-- G_Code: string (nullable = true)
      *
      */

flatten the nested columns of type array and struct


    /**
      * Recursively flattens nested columns: the first array column found is
      * exploded into rows and the first struct column found is promoted to
      * top-level columns, recursing until no nested types remain.
      *
      * Every interpolated column name is backtick-quoted so names containing
      * special characters (e.g. "@odata.etag") survive the SQL parser —
      * including the explode target, which the earlier version left unquoted.
      * Child columns keep their original names; use backticks to reference
      * them downstream.
      */
    def flattenDataframe(df: DataFrame): DataFrame = {
      val fieldNames = df.schema.fields.map(_.name)
      // Quote a name so "@" / "." do not break selectExpr/col parsing.
      def quoted(name: String): String = s"`$name`"

      // Locate the first nested column; if none is left, the frame is flat.
      df.schema.fields.find(f =>
        f.dataType.isInstanceOf[ArrayType] || f.dataType.isInstanceOf[StructType]
      ) match {
        case Some(field) =>
          val fieldName = field.name
          val otherCols = fieldNames.filter(_ != fieldName).map(quoted)
          field.dataType match {
            case _: ArrayType =>
              // Explode the array, keeping all other columns unchanged.
              val exprs = otherCols ++
                Array(s"explode_outer(${quoted(fieldName)}) as ${quoted(fieldName)}")
              flattenDataframe(df.selectExpr(exprs: _*))
            case structType: StructType =>
              // Promote each struct child to a top-level column.
              val childCols =
                structType.fieldNames.map(child => s"${quoted(fieldName)}.${quoted(child)}")
              flattenDataframe(df.select((otherCols ++ childCols).map(col(_)): _*))
            case _ => df // unreachable: find() only matched array/struct
          }
        case None => df
      }
    }

    val flattendedJSON = flattenDataframe(DF)
    flattendedJSON.show(false)
    flattendedJSON.printSchema()

    /**
      * +--------------+---------------+----------------------------------------------------------------------------+------+--------+------+
      * |@odata.context|@odata.nextLink|@odata.etag                                                                 |E_No  |G_2_Code|G_Code|
      * +--------------+---------------+----------------------------------------------------------------------------+------+--------+------+
      * |XXXX          |XXXX           |W/"JzQ0OzlxaDNzLys1WXBPbWFXaE5MbFdKbVpNYjMrWDQ1MmJSeGdxVVhrTVRZUXc9MTswMDsn"|345345|        |007   |
      * |XXXX          |XXXX           |W/"JzQ0O0ZNWkF2OGd1dVE2L21OQTdKR2g4YU05TldKMERpMUpMWTRSazFKQzZuTDQ9MTswMDsn"|234543|        |008   |
      * +--------------+---------------+----------------------------------------------------------------------------+------+--------+------+
      *
      * root
      * |-- @odata.context: string (nullable = true)
      * |-- @odata.nextLink: string (nullable = true)
      * |-- @odata.etag: string (nullable = true)
      * |-- E_No: long (nullable = true)
      * |-- G_2_Code: string (nullable = true)
      * |-- G_Code: string (nullable = true)
      */

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.