
I have a structure like the following in ORC/Parquet format.

{
  "Register": {
    "Persons": [
      {
        "Name": "Name1",
        "Age": 12,
        "Address": [
          {
            "Apt": "Apt1"
          }
        ],
        "Phone": [
          {
            "PhoneNum": 1234
          }
        ]
      },
      {
        "Name": "Name2",
        "Age": 14,
        "Address": [
          {
            "Apt": "Apt2"
          }
        ],
        "Phone": [
          {
            "PhoneNum": 55555
          }
        ]
      }
    ]
  }
}

I need to create a new DF based on the condition Apt = Apt1 and change the phone number of that entry to 7777. NB: I need to keep the same structure. I have tried a couple of methods in Scala Spark, but am not able to update the nested array-of-struct type. Any expert advice would be helpful.

Update: Following this link, I am able to get at the named_struct variables, but when it comes to arrays I am not able to get the answer. https://kb.databricks.com/data/update-nested-column.html#how-to-update-nested-columns
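For reference, Spark 2.4+ offers higher-order SQL functions (transform, exists) that can rewrite a nested array in place, without exploding it. A minimal sketch along those lines, assuming Spark >= 2.4 and the schema shown above (untested against the real data):

// Sketch only: rewrite Register.Persons with transform(), conditionally
// replacing the Phone array when any address entry has Apt = 'Apt1'
val updated = df.selectExpr(
  """named_struct(
       'Persons',
       transform(Register.Persons, p -> named_struct(
         'Name', p.Name,
         'Age', p.Age,
         'Address', p.Address,
         'Phone', CASE WHEN exists(p.Address, a -> a.Apt = 'Apt1')
                       THEN transform(p.Phone, ph -> named_struct('PhoneNum', 7777L))
                       ELSE p.Phone END
       ))
     ) AS Register""")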

3 Comments
  • Would each top-level-entry of your dataframe be the "Register" or a "Person"? Could you please post the output of df.printSchema()? Commented Sep 21, 2019 at 16:51
  • |-- Register: struct (nullable = true)
    |    |-- Persons: array (nullable = true)
    |    |    |-- element: struct (containsNull = true)
    |    |    |    |-- Address: array (nullable = true)
    |    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |    |-- Apt: string (nullable = true)
    |    |    |    |-- Age: long (nullable = true)
    |    |    |    |-- Name: string (nullable = true)
    |    |    |    |-- Phone: array (nullable = true)
    |    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |    |-- PhoneNum: long (nullable = true)
    Commented Sep 22, 2019 at 4:14
  • @werner This is just a sample; I have given the schema. I tried to recreate a DF similar to the following from the existing DF, but was not able to replace the existing values in the nested array: selectExpr("named_struct('Register', named_struct('Persons', array(named_struct('Name', Register.Persons.Name, 'Phone', Register.Persons.Phone)))) as named_struct").select($"named_struct.Register") Commented Sep 22, 2019 at 4:16

2 Answers


The idea is to use case classes to convert the nested structure into a set of simple Scala classes that can be handled more easily; in Spark terms, use a (typed) Dataset instead of an untyped DataFrame.

// PhoneNum can be a String here: Dataset.as[] safely up-casts long to string
case class Phone(var PhoneNum: String)
case class Apt(Apt: String)
case class Person(Name: String, Age: Long, Address: Array[Apt], Phone: Array[Phone])
case class Register(Persons: Array[Person])
case class TopLevel(Register: Register)

Convert the DataFrame into a Dataset, then apply a map over each entry of the Dataset:

val df = ...
val ds = df.as[TopLevel]
val transformed = ds.map(tl => {
  // For every person whose address list contains Apt1,
  // replace each phone entry with the new number
  for (p <- tl.Register.Persons) {
    if (p.Address.contains(Apt("Apt1"))) p.Phone.transform(_ => Phone("7777"))
  }
  tl
})
transformed.toJSON.show(false)

prints:

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                            |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"Register":{"Persons":[{"Name":"Name1","Age":12,"Address":[{"Apt":"Apt1"}],"Phone":[{"PhoneNum":"7777"}]},{"Name":"Name2","Age":14,"Address":[{"Apt":"Apt2"}],"Phone":[{"PhoneNum":"55555"}]}]}}|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

A remark on the data structure/schema in the question:

As the question is asked, a DataFrame of registers is used, meaning that each entry of the DataFrame contains a single register. It would be more intuitive if the DataFrame contained a list of persons and this list was called "Register". That would lead to a much simpler data structure; in that case, the classes TopLevel and Register could be omitted.
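For illustration, a sketch of that simpler shape, reusing the case classes above (assumes spark.implicits._ is in scope; persons and updatedPersons are hypothetical names):

import org.apache.spark.sql.Dataset

// Unnest to one Person per row, dropping the TopLevel/Register wrappers
val persons: Dataset[Person] = ds.flatMap(_.Register.Persons)
// Rebuild each matching person immutably with copy instead of mutating in place
val updatedPersons = persons.map { p =>
  if (p.Address.contains(Apt("Apt1"))) p.copy(Phone = p.Phone.map(_ => Phone("7777")))
  else p
}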


4 Comments

Thanks. Regarding the second part: as you mentioned, it may not be the first element of the array.
What exactly would be the expected behaviour if one person had more than one address and/or more than one phone number?
Thanks a lot, Werner. The logic works well, but I have a problem: I can't have a fixed schema in the case classes, as the schema will change in the actual scenario.
You might want to rewrite your question or ask a new one that describes more in detail what is fixed and what the conditions are when the data should be updated.

The first step is mapping your JSON into a DataFrame. Then we create a custom UDF that takes as input the Apt column, the PhoneNum column, and the new phone number, and changes the phone number if Apt = Apt1:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, lit, udf}

object UpdatePhoneNum {

  val sparkSession: SparkSession = SparkSession.builder()
    .appName("UpdatePhoneNum")
    .master("local[*]")
    .getOrCreate()

  def main(args: Array[String]): Unit = {

    val inputJson = "{\"Register\":{\"Persons\":[{\"Name\":\"Name1\",\"Age\":12,\"Address\":[{\"Apt\":\"Apt1\"}],\"Phone\":[{\"PhoneNum\":1234}]},{\"Name\":\"Name2\",\"Age\":14,\"Address\":[{\"Apt\":\"Apt2\"}],\"Phone\":[{\"PhoneNum\":55555}]}]}}"

    import sparkSession.implicits._

    val outputDataFrame = sparkSession.read.option("multiline", true).option("mode", "PERMISSIVE")
      .json(Seq(inputJson).toDS)
      // First layer mapping: pull the Persons array out of the Register struct
      .select(col("Register").getItem("Persons").as("Persons"))
      // One row per person
      .withColumn("Persons", explode(col("Persons")))
      // Second layer mapping: flatten the person struct into columns
      .select(
        col("Persons").getItem("Name").as("Name"),
        col("Persons").getItem("Age").as("Age"),
        col("Persons").getItem("Address").as("Address"),
        col("Persons").getItem("Phone").as("Phone")
      )
      // Last layer mapping: project the nested fields out of Address and Phone
      .select(
        col("Name"), col("Age"),
        col("Address").getItem("Apt").as("Apt"),
        col("Phone").getItem("PhoneNum").as("PhoneNum")
      )
      .withColumn("Apt", explode(col("Apt")))
      .withColumn("PhoneNum", explode(col("PhoneNum")))
      // Apply the user-defined function to change PhoneNum according to Apt
      .withColumn("PhoneNum", changePhoneNumUDF(col("Apt"), col("PhoneNum"), lit(987654L)))

    outputDataFrame.show()
  }

  def changePhoneNum(Apt: String, oldPhoneNum: Long, NewPhoneNum: Long): Long = Apt match {
    case "Apt1" => NewPhoneNum
    case _ => oldPhoneNum
  }

  val changePhoneNumUDF = udf(changePhoneNum _)
}

Output:

+-----+---+----+--------+
| Name|Age| Apt|PhoneNum|
+-----+---+----+--------+
|Name1| 12|Apt1|  987654|
|Name2| 14|Apt2|   55555|
+-----+---+----+--------+
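As a side note, the same conditional replacement could be done without a UDF, using the built-in when/otherwise functions. A minimal sketch against the flattened columns above (withBuiltins is a hypothetical name):

import org.apache.spark.sql.functions.{col, lit, when}

// UDF-free alternative: conditional replacement with built-in when/otherwise
val withBuiltins = outputDataFrame.withColumn(
  "PhoneNum",
  when(col("Apt") === "Apt1", lit(987654L)).otherwise(col("PhoneNum"))
)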

2 Comments

SRV, did it help you?
Thanks Simba for the input; for me explode doesn't fit, as I need to keep the same JSON-like nested structure.
