
This is the code I have:

// define a case class describing one row
case class Zone(id: Int, team: String, members: Int, name: String, lastname: String)

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, "team1", 3, "Jonh", "Doe"),
  (1, "team2", 4, "Jonh", "Doe"),
  (1, "team3", 5, "David", "Luis"),
  (2, "team4", 6, "Michael", "Larson"))
  .toDF("id", "team", "members", "name", "lastname")
  .as[Zone]

val df_grouped = df
   .withColumn("team_info", to_json(struct(col("team"), col("members"))))
   .withColumn("users", to_json(struct(col("name"), col("lastname"))))
   .groupBy("id")
   .agg(collect_list($"team_info").alias("team_info"), collect_list($"users").alias("users"))

df_grouped.show    
    +---+--------------------+--------------------+
    | id|           team_info|               users|
    +---+--------------------+--------------------+
    |  1|[{"team":"team1",...|[{"name":"Jonh","...|
    |  2|[{"team":"team4",...|[{"name":"Michael...|
    +---+--------------------+--------------------+

I need to remove duplicates inside the column "users": in my case, if the JSON objects inside the array are exactly the same, they are duplicates. Is there any way to do this by changing the value of that column with df.withColumn, or with any other approach?

  • Please provide the piece of code you tried. Commented Aug 29, 2018 at 18:54
  • Sorry, I will add the code. Commented Sep 1, 2018 at 13:01
  • I already figured out how to solve it: using collect_set instead of collect_list (see the sketch below). Commented Sep 1, 2018 at 14:21
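
A minimal sketch of that collect_set fix, reusing the df from the question (df_deduped is just an illustrative name): collect_set keeps only the distinct JSON strings per id.

import org.apache.spark.sql.functions._

val df_deduped = df
   .withColumn("team_info", to_json(struct(col("team"), col("members"))))
   .withColumn("users", to_json(struct(col("name"), col("lastname"))))
   .groupBy("id")
   .agg(collect_list($"team_info").alias("team_info"),
        collect_set($"users").alias("users"))  // collect_set drops identical JSON strings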

2 Answers


This is likely not the most elegant solution, but it should work:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoders
import spark.implicits._

// sample data: a single column whose value is a JSON array containing a duplicate entry
val df = sc.parallelize(
  Array("[{\"name\":\"John\",\"lastName\":\"Doe\"},{\"name\":\"John\",\"lastName\":\"Doe\"},{\"name\":\"David\",\"lastName\":\"Luis\"}]")
).toDF("users")

case class Users(name: String, lastName: String)
val schema = ArrayType(Encoders.product[Users].schema)

df.withColumn("u", from_json($"users", schema)) // parse the JSON array into an array of structs
  .select("u")
  .as[Array[Users]]
  .map(_.distinct)                              // remove duplicate users within each array
  .toDF("u")
  .withColumn("users", to_json($"u"))           // serialize the deduplicated array back to JSON
  .select("users")

Assuming your users will have more attributes than in your example, just add those attributes to the case class. As long as the types are simple, the Encoder should infer the schema automatically.
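
For example, a hypothetical extension (the age field is made up here, not part of the question): the Encoder-derived schema picks the extra attribute up without any manual schema work.

case class Users(name: String, lastName: String, age: Option[Int]) // age is a hypothetical extra field
val schema = ArrayType(Encoders.product[Users].schema)             // schema now includes age automatically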




You can use the explode and dropDuplicates built-in functions.

spark dropDuplicates based on json array field
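
A minimal sketch of that explode + dropDuplicates idea, assuming the df_grouped from the question (users is an array of JSON strings; deduped is just an illustrative name):

import org.apache.spark.sql.functions._

val deduped = df_grouped
   .withColumn("user", explode($"users"))        // one row per JSON string in the array
   .dropDuplicates("id", "user")                 // identical JSON strings collapse into one row
   .groupBy("id")
   .agg(first($"team_info").alias("team_info"),  // team_info is left as it was per id
        collect_list($"user").alias("users"))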

