
Using Spark 2.3.2.

I am trying to use the values of some columns of a DataFrame and put them into an existing JSON structure. Assuming I have this DataFrame:

val testDF = Seq(("""{"foo": "bar", "meta":{"app1":{"p":"2", "o":"100"}, "app2":{"p":"5", "o":"200"}}}""", "10", "1337")).toDF("key", "p", "o")

// used as key for nested json structure
val app = "appX"

Basically, I would like to transform the JSON in the key column from this

{
  "foo": "bar",
  "meta": {
    "app1": {
      "p": "2",
      "o": "100"
    },
    "app2": {
      "p": "5",
      "o": "200"
    }
  }
}

to this:

{
  "meta": {
    "app1": {
      "p": "2",
      "o": "100"
    },
    "app2": {
      "p": "5",
      "o": "200"
    },
    "appX": {
      "p": "10",
      "o": "1337"
    }
  }
}

based on the columns p and o of the DataFrame.

I have tried:

def process(inputDF: DataFrame, appName: String): DataFrame = {
  val res = inputDF
    .withColumn(appName, to_json(expr("(p, o)")))
    .withColumn("meta", struct(get_json_object('key, "$.meta")))
    .selectExpr(s"""struct(meta.*, ${appName} as ${appName}) as myStruct""")
    .select(to_json('myStruct).as("newMeta"))

  res.show(false)
  res
}

val resultDF = process(testDF, app)

val resultString = resultDF.select("newMeta").collectAsList().get(0).getString(0)

StringContext.treatEscapes(resultString) must be ("""{"meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}""")

But the assertion fails because I:

  • can't get the content of appX onto the same level as the other two apps,
  • don't know how to properly handle the quotation marks, and
  • don't know how to rename "col1" to "meta".

The test fails with:

Expected :"{"[meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}]}"
Actual   :"{"[col1":"{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"}}","appX":"{"p":"10","o":"1337"}"]}"

1 Answer

  1. Extract the meta content by parsing the key column with from_json.
  2. Convert the p and o columns into a map: map(lit(appX), struct($"p", $"o")).
  3. Use the map_concat function to merge the two maps.
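The same three steps can be mirrored with plain Scala collections, which may help when reasoning about the expected output. This is only an illustration, not Spark code: step 1 is assumed to be done already (meta is given as a map), PO mirrors the case class used later in this answer, MetaSketch, addApp and render are hypothetical names, ListMap is used only to keep insertion order, and render is a hand-rolled printer for this one fixed shape (no escaping of special characters).

```scala
import scala.collection.immutable.ListMap

// Hypothetical helper object: a collections-only model of the transformation.
object MetaSketch {
  // Mirrors the answer's PO case class: one app entry with p and o values.
  final case class PO(p: String, o: String)

  // Steps 2 and 3: wrap the new values in a one-entry map and concatenate it
  // after the existing entries (ListMap keeps insertion order).
  def addApp(meta: ListMap[String, PO], app: String, p: String, o: String): ListMap[String, PO] =
    meta ++ ListMap(app -> PO(p, o))

  // Minimal JSON rendering for this fixed shape only; values are not escaped.
  def render(meta: ListMap[String, PO]): String =
    meta
      .map { case (k, v) => s""""$k":{"p":"${v.p}","o":"${v.o}"}""" }
      .mkString("""{"meta":{""", ",", "}}")
}
```

For example, starting from app1 -> PO("2", "100") and app2 -> PO("5", "200") and adding appX with p = "10" and o = "1337" renders the same string the question's assertion expects.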

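Check the code below. Note that map_concat is a built-in function only in Spark 2.4.0 and later; for 2.3.2, see the UDF-based variant at the end of this answer.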

scala> testDF.show(false)
+---------------------------------------------------------------------------------+---+----+
|key                                                                              |p  |o   |
+---------------------------------------------------------------------------------+---+----+
|{"foo": "bar", "meta":{"app1":{"p":"2", "o":"100"}, "app2":{"p":"5", "o":"200"}}}|10 |1337|
+---------------------------------------------------------------------------------+---+----+

Create a schema to parse the JSON string, with meta as a map from app name to a (p, o) struct.

scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.functions._
scala> val schema = new StructType().add("foo",StringType).add("meta",MapType(StringType,new StructType().add("p",StringType).add("o",StringType)))

Print Schema

scala> schema.printTreeString
root
 |-- foo: string (nullable = true)
 |-- meta: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- p: string (nullable = true)
 |    |    |-- o: string (nullable = true)

scala> val appX = "appX"

testDF
.withColumn("key",from_json($"key",schema)) // parse the JSON string into a struct using the predefined schema.
.withColumn(
    "key",
    struct(
        $"key.foo", // foo value from key column.
        map_concat(
            $"key.meta", // extracting meta from key column.
            map(
                lit(appX), // Constant appX value
                struct($"p",$"o") // wrapping p, o values into struct.
            ) // converting appX,p,o into map(appX -> (p,o))
        )
        .as("meta") // giving alias to match existing meta in key.
    ) // using struct to combine foo, meta columns.
)
.select(to_json(struct($"key")).as("json_data")) // converting key value into json format.
.show(false)

Final Output

+-----------------------------------------------------------------------------------------------------------------+
|json_data                                                                                                        |
+-----------------------------------------------------------------------------------------------------------------+
|{"key":{"foo":"bar","meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}}|
+-----------------------------------------------------------------------------------------------------------------+
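The output above wraps everything in a "key" field and keeps "foo", so it is not exactly the string asserted in the question. A sketch that keeps only meta, assuming Spark 2.4+ (for map_concat) and the same schema as above; the object name MetaOnly is hypothetical, and process only reuses the name from the question for illustration:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Hypothetical helper object; requires Spark 2.4+ because of map_concat.
object MetaOnly {
  // Same schema as in the answer: meta is a map of app name -> struct<p, o>.
  val schema: StructType = new StructType()
    .add("foo", StringType)
    .add("meta", MapType(StringType, new StructType().add("p", StringType).add("o", StringType)))

  // Returns a single column, newMeta, holding {"meta": {...existing apps..., appName: {p, o}}}.
  // "foo" is dropped on purpose to match the string asserted in the question.
  def process(inputDF: DataFrame, appName: String): DataFrame =
    inputDF
      .withColumn("parsed", from_json(col("key"), schema)) // JSON string -> struct
      .select(
        to_json(
          struct(
            map_concat(
              col("parsed.meta"),                           // existing apps
              map(lit(appName), struct(col("p"), col("o"))) // new entry appName -> {p, o}
            ).as("meta")
          )
        ).as("newMeta")
      )
}
```

On the question's testDF with "appX", newMeta is expected to equal the string in the question's assertion, without needing StringContext.treatEscapes.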

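The code above relies on the built-in map_concat, which exists only in Spark 2.4.0 and later. For older versions such as 2.3.2, use a UDF and a case class instead.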

Define a case class to hold the p and o values:

scala> case class PO(p:String,o:String)

Define a UDF that concatenates two maps (it takes the place of the built-in map_concat):

scala> val map_concat = udf((mp:Map[String,PO],mpa:Map[String,PO]) => mp ++ mpa)
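scala> testDF
.withColumn("key",from_json($"key",schema)) // parse the JSON string with the schema defined above
.withColumn(
    "newMap", // keep the parsed key column and write the JSON result to a new column
    to_json(
        struct(
            $"key.foo",
            map_concat( // the UDF defined above
                $"key.meta",
                map(
                    lit(appX),
                    struct($"p",$"o")
                )
            ).as("meta")
        )
    )
)
.show(false)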

Final Output

+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
|key                                        |p  |o   |newMap                                                                                                   |
+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
|[bar,Map(app1 -> [2,100], app2 -> [5,200])]|10 |1337|{"foo":"bar","meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}|
+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
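A design note on the UDF: its body is plain Scala Map concatenation, so if meta already contains an entry for appX, the right-hand map wins and the old p/o values are replaced rather than duplicated. The snippet below only exercises that Scala collection behaviour outside Spark (UdfMergeSemantics and merge are hypothetical names); the built-in map_concat is not guaranteed to treat duplicate keys the same way.

```scala
// Hypothetical object isolating the UDF's merge logic from Spark.
object UdfMergeSemantics {
  // Redefined here so the sketch compiles on its own; mirrors the answer's PO.
  final case class PO(p: String, o: String)

  // Same body as the UDF: on duplicate keys, entries from mpa replace those from mp.
  def merge(mp: Map[String, PO], mpa: Map[String, PO]): Map[String, PO] = mp ++ mpa
}
```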