
Initially I have a DataFrame like the following:

Key     Emails                      PassportNum     Age
0001    [Alan@gmail,Alan@hotmail]   passport1       23
0002    [Ben@gmail,Ben@hotmail]     passport2       28

I need to apply a function over each Email, something dummy like adding "_2" at the end for example; the exact operation is not relevant. So I will explode this column like this:

val dfExploded = df.withColumn("Email", explode($"Emails")).drop("Emails")

Now I will have a dataframe like this:

Key     Email           PassportNum     Age
0001    Alan@gmail      passport1       23
0001    Alan@hotmail    passport1       23
0002    Ben@gmail       passport2       28
0002    Ben@hotmail     passport2       28

I apply the change to each email, and then what I want to have again is this:

Key     Emails                          PassportNum     Age
0001    [Alan_2@gmail,Alan_2@hotmail]   passport1       23
0002    [Ben_2@gmail,Ben_2@hotmail]     passport2       28

The option I was considering was this:

dfOriginal = dfExploded.groupBy("Key","PassportNum","Age").agg(collect_set("Email").alias("Emails"))

In this case it may not be such a bad approach. But in my real case I perform the explode over a single column and have another 20 columns like PassportNum, Age, etc. which are going to be duplicated.

This means that I will need to add around 20 columns to the groupBy, when I could really perform the groupBy over a single one, for example Key, which is unique.

I was thinking of adding these columns to the agg as well, like this:

dfOriginal = dfExploded.groupBy("Key").agg(collect_set("Email").alias("Emails"),collect_set("PassportNum"),collect_set("Age"))

But I don't want them to end up in single-element arrays.

Is there any way to do the aggregation without any collect_*? Is there any simpler approach to undo the explode?

  • Shouldn't you be using first for PassportNum and Age, since they will have the same values anyway after the explode? (A sketch of this idea follows after these comments.) Commented Apr 2, 2018 at 14:00
  • You mean after the collect use first? Commented Apr 2, 2018 at 14:03
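
A minimal sketch of that suggestion, assuming Key is unique and the other columns are therefore constant within each group after the explode (column and variable names follow the question's example):

import org.apache.spark.sql.functions.{collect_set, first}

// Group only by Key; the other columns are recovered with first,
// since every exploded row for a given Key carries the same values.
val dfOriginal = dfExploded
  .groupBy("Key")
  .agg(
    collect_set("Email").alias("Emails"),
    first("PassportNum").alias("PassportNum"),
    first("Age").alias("Age")
  )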

3 Answers


Assuming that you want to stay in the DataFrame world, it might be worth defining a UDF that manipulates your input array: something that takes a Seq as input and returns a modified one, e.g.

import org.apache.spark.sql.functions.udf

def myUdf = udf[Seq[String], Seq[String]] {
    inputSeq => inputSeq.map(elem => elem + "_2")
}

df.withColumn("Emails", myUdf($"Emails"))

Even better, you could pass the exact logic as a parameter:

def myUdf(myFunc: String => String) = udf[Seq[String], Seq[String]] {
    inputSeq => inputSeq.map(myFunc)
}

df.withColumn("Emails", myUdf((email: String) => email + "_XYZ")($"Emails"))

7 Comments

Thanks for the feedback. I prefer to avoid that since the content of the array is a Map<string,string> which I will need to convert each field of the map into a column
Not sure if you understand the issue exactly. Can't you pass as an input a Seq[Map[String, String]]? After that you can do anything in just Scala
I will try that
I think I will follow your solution. Could you please give me an example if I pass to the udf something like Seq[Map[String, String]] how could I access a specific field in the map to apply another udf over it? So something like if the map has field1,field2,field3 => field1,field2,udf2(field3) I am not sure if this is even possible
At that point your inputSeq will be a Seq[Map[String, String]]. So something like this: inputSeq.map(_.get("myKey")) should help get you started. It will return the Map values for your "myKey" (see the sketch below)
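
A minimal, hypothetical sketch of that idea, assuming the array column holds maps with keys field1, field2, field3 (the names are only illustrative) and that only field3 needs transforming:

import org.apache.spark.sql.functions.udf

val mapUdf = udf { (maps: Seq[Map[String, String]]) =>
  maps.map { m =>
    // rewrite only "field3" and leave the other entries untouched
    m.get("field3") match {
      case Some(v) => m.updated("field3", v + "_2")
      case None    => m
    }
  }
}

df.withColumn("Emails", mapUdf($"Emails"))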

Another option, besides the groupBy on all the common fields, is to do the explode on a separate temporary DataFrame, then drop the exploded column from the original and join the re-grouped result back (sketched below).
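
A rough sketch of that join-based approach, assuming Key uniquely identifies each original row (names follow the question's example, and the "_2" suffix stands in for whatever transformation is actually needed):

import org.apache.spark.sql.functions.{col, collect_set, concat, explode, lit}

// Explode only Key + Emails into a temporary DataFrame, transform, and re-group
val tmp = df
  .select(col("Key"), explode(col("Emails")).as("Email"))
  .withColumn("Email", concat(col("Email"), lit("_2")))
  .groupBy("Key")
  .agg(collect_set("Email").alias("Emails"))

// Drop the original array column and join the rebuilt one back on Key
val dfOriginal = df.drop("Emails").join(tmp, Seq("Key"))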

However, it might be simpler to write a UDF that manipulates the array directly, without going through the explode-and-regroup round trip:

import scala.collection.mutable

def handleEmail(emails: mutable.WrappedArray[String]) = {
  emails.map(doSomething)   // doSomething stands for whatever per-email transformation you need
}

context.udf.register("handleEmails", (em: mutable.WrappedArray[String]) => handleEmail(em))

1 Comment

Thanks Arnon, but I prefer the UDF approach you mention, since the content of the array is a Map<string,string> and I will need to convert each field of the map into a column

This means that I will need to add around 20 columns in the groupBy, when I really can perform the group by over a single one, for example Key which is unique.

You can skip writing out each column name with a simple trick, as below, where you use all of the column names (or selected ones) except the exploded column's name

import org.apache.spark.sql.functions._
val dfExploded = df.withColumn("Emails", explode($"Emails"))

val groupColumns = dfExploded.columns.filterNot(_.equalsIgnoreCase("Emails"))

val dfOriginal = dfExploded.groupBy(groupColumns.map(col): _*).agg(collect_set("Emails").alias("Emails"))

Creating a struct column

You can create a single column by using the struct inbuilt function and use that single column in the groupBy, as

val groupColumns = df.columns.filterNot(_.equalsIgnoreCase("Emails"))

import org.apache.spark.sql.functions._
val dfExploded = df.select(struct(groupColumns.map(col): _*).as("groupedKey"), col("Emails"))
  .withColumn("Emails", explode($"Emails"))

which would give you

+-------------------+------------+
|groupedKey         |Emails      |
+-------------------+------------+
|[0001,passport1,23]|Alan@gmail  |
|[0001,passport1,23]|Alan@hotmail|
|[0002,passport2,28]|Ben@gmail   |
|[0002,passport2,28]|Ben@hotmail |
+-------------------+------------+

and then use groupedKey in the groupBy and separate the fields out again in the select

val dfOriginal = dfExploded.groupBy("groupedKey").agg(collect_set("Emails").alias("Emails"))
  .select($"groupedKey.*", $"Emails")

3 Comments

Thanks for the trick Ramesh. But actually I don't want to avoid that solution because of the length of the query, but because of how expensive a groupBy over 20 columns could be. As I understand it, the fewer columns in the groupBy, the faster, isn't it?
Thanks I will try this
For a use case I needed to do similar work, but with this method I faced an error due to a map-structured column in my data. Any thoughts on how this can be done when the data has a column with a map-of-strings structure?
