
This is my current schema:

 |-- _id: string (nullable = true)
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- adr1: struct (nullable = true)
 |    |    |    |-- resid: string (nullable = true)

And this is what I want to obtain:

 |-- _id: string (nullable = true)
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- resid: string (nullable = true)

I am using the Java API.

  • flatMap on RDD or Dataset :) (see the sketch below)
  • @T.Gawęda When I saw it I thought the same, but it took me a few hours to get to a solution (first the test data, then the solution itself). It's a nice exercise; I'd suggest you give it a try.
  • @JacekLaskowski It's a nice exercise, but I can't spend too much time at work writing answers ;) So I just wanted to give a hint that may help the author :)

2 Answers


You can use a map transformation:

import java.util.stream.Collectors;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

Encoder<PeopleFlatten> peopleFlattenEncoder = Encoders.bean(PeopleFlatten.class);

// `people` is assumed to be a Dataset<People> matching the original schema;
// the cast to MapFunction disambiguates the Java and Scala map overloads.
Dataset<PeopleFlatten> flattened = people
  .map((MapFunction<People, PeopleFlatten>) person -> new PeopleFlatten(
      person.get_id(),
      person.getPerson().stream().map(p ->
        new PersonFlatten(
          p.getName(),
          p.getAdr1().getResid()
        )
      ).collect(Collectors.toList())
    ),
    peopleFlattenEncoder
  );

where PeopleFlatten and PersonFlatten are POJOs corresponding to the expected schema from the question.

import java.io.Serializable;
import java.util.List;

public class PeopleFlatten implements Serializable {
   private String _id;
   private List<PersonFlatten> person;
   // getters and setters
}

public class PersonFlatten implements Serializable {
   private String name;
   private String resid;
   // getters and setters
}
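
The input side is not shown in the answer. A minimal sketch of what the source beans could look like, assuming people was read as a Dataset<People> via Encoders.bean(People.class) and mirrors the original schema (the class names People, Person and Adr1 are assumptions, not part of the answer):

import java.io.Serializable;
import java.util.List;

// Hypothetical beans mirroring the original (nested) schema.
public class People implements Serializable {
   private String _id;
   private List<Person> person;
   // getters and setters
}

public class Person implements Serializable {
   private String name;
   private Adr1 adr1;
   // getters and setters
}

public class Adr1 implements Serializable {
   private String resid;
   // getters and setters
}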



If it were Scala, I'd do the following, but since the OP asked about Java, I'm offering it as guidance only.

Solution 1 - Memory-Heavy

case class Address(resid: String)
case class Person(name: String, adr1: Address)

val people = Seq(
  ("one", Array(Person("hello", Address("1")), Person("world", Address("2"))))
).toDF("_id", "persons")

people.as[(String, Array[Person])].map { case (_id, arr) =>
  (_id, arr.map { case Person(name, Address(resid)) => (name, resid) })
}

This approach, however, is quite memory-expensive, because the internal binary rows are copied into JVM objects, which can drive the environment into OutOfMemoryErrors.

Solution 2 - Expensive but Language-Independent

The other query has worse performance (but a smaller memory footprint): it uses the explode operator to destructure the array first, which gives easy access to the inner structs.

import org.apache.spark.sql.functions._

val solution = people.
  select($"_id", explode($"persons") as "exploded"). // <-- that's expensive
  select("_id", "exploded.*"). // <-- this is the trick to access struct's fields
  select($"_id", $"name", $"adr1.resid").
  select($"_id", struct("name", "resid") as "person").
  groupBy("_id"). // <-- that's expensive
  agg(collect_list("person") as "persons")

scala> solution.printSchema
root
 |-- _id: string (nullable = true)
 |-- persons: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- resid: string (nullable = true)

The nice thing about this solution is that it uses only the DataFrame API, with almost nothing specific to Scala or Java, so you could use it right away regardless of your language of choice.
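
To illustrate that point, here is a sketch of the same query translated to the Java API, assuming people is a Dataset<Row> with the schema from the question (where the array column is named person rather than persons):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Same pipeline as the Scala query above, written against the question's column names.
Dataset<Row> solution = people
    .select(col("_id"), explode(col("person")).as("exploded"))  // one row per array element
    .select("_id", "exploded.*")                                 // lift the struct's fields
    .select(col("_id"), col("name"), col("adr1.resid"))          // drop the adr1 wrapper
    .select(col("_id"), struct("name", "resid").as("person"))    // rebuild the flat struct
    .groupBy("_id")                                              // regroup by the original id
    .agg(collect_list("person").as("person"));

solution.printSchema();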

