
Spark version 3.0

I have a dataframe like this

+-------------------------------------------------+
|named_val                                        |
+-------------------------------------------------+
|[[Alex, 1], [is, 1], [a, 1], [good, 1], [boy, 1]]|
|[[Bob, 1], [Bob, 1], [bad, 1], [Bob, 1]]         |
+-------------------------------------------------+

I need to create a map with the count of each unique value, as shown below

Expected output

+-------------------------------------------------+
|named_val                                        |
+-------------------------------------------------+
|{Alex->1, is->1, a->1, good->1, boy->1}          |
|{Bob->3, bad->1}                                 |
+-------------------------------------------------+

To reproduce the dataframe, use:

df = spark.createDataFrame(
    [([['Alex', 1], ['is', 1], ['a', 1], ['good', 1], ['boy', 1]],),
     ([['Bob', 1], ['Bob', 1], ['bad', 1], ['Bob', 1]],)],
    ['named_val'])

2 Answers


What about our old friend, the UDF? The serialization/deserialization cost should be low in comparison to shuffling:

from pyspark.sql.functions import udf

def sum_merge(ar):
    # Sum the counts per key across the array of [word, count] pairs.
    d = dict()
    for i in ar:
        k, v = i[0], int(i[1])
        d[k] = d[k] + v if k in d else v
    return d

# udf() defaults to a string return type, hence the stringified map in the output below.
sum_merge_udf = udf(sum_merge)

df.select(sum_merge_udf("named_val").alias("named_val")).show(truncate=False)

# +----------------------------------+
# |named_val                         |
# +----------------------------------+
# |{a=1, Alex=1, is=1, boy=1, good=1}|
# |{bad=1, Bob=3}                    |
# +----------------------------------+
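Note that udf() defaults to a string return type, which is why the column above prints as a flat string. If a real map<string,int> column is needed downstream, a minimal variant (a sketch reusing the sum_merge function above) would declare the return type explicitly:

from pyspark.sql.types import MapType, StringType, IntegerType

# Same merge logic, but with an explicit return type so Spark keeps a genuine map column.
sum_merge_map_udf = udf(sum_merge, MapType(StringType(), IntegerType()))

df.select(sum_merge_map_udf("named_val").alias("named_val")).show(truncate=False)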

Comments

Yes, it comes with the cost of serialization/deserialization, although the last versions of PySpark have made significant improvements in that direction. Still, that cost will be much lower than any shuffle, since we keep the transformations narrow. The optimal case would be this: df.withColumn("new_map", expr("aggregate(named_val, map(), (acc, i) -> map_concat(acc, map(i[0], element_at(acc, i[0]) + i[0])))")), which I didn't manage to make work. Maybe it's a challenge for someone else ;)
I was also struggling with higher-order functions to make it work, but no luck with a solution... will update the performance metrics using this UDF
I have been trying with no success too... I will resort to the UDF as well :)
@anky in fact I have realized that map_concat allows identical keys, i.e. it will not replace them. Here I faced the same issue
It does for separate arrays, though I am not sure how it would work here. Maybe when I get some free time I will play more; you can too and let me know (I would be interested)
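Building on the aggregate/map_concat expression from the comments above, here is an untested sketch of a pure higher-order-function version for Spark 3.0. It drops the current key from the accumulator with map_filter before concatenating, so map_concat never sees a duplicate key, and coalesces the missing-key lookup to 0; treat the whole expression as an assumption rather than a verified solution.

from pyspark.sql.functions import expr

# Untested sketch: fold the array into a map<string,int> without a UDF.
# map_filter removes the current key from the accumulator so map_concat
# never receives duplicate keys; element_at returns null for a missing key,
# hence the coalesce(..., 0).
df.withColumn(
    "named_val_counts",
    expr("""
        aggregate(
            named_val,
            cast(map() as map<string,int>),
            (acc, x) -> map_concat(
                map_filter(acc, (k, v) -> k != x[0]),
                map(x[0], coalesce(element_at(acc, x[0]), 0) + cast(x[1] as int))
            )
        )
    """),
).show(truncate=False)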

In Scala, but the Python version will be very similar:

val df =  Seq(Seq(("Alex",1),("is",1),("a",1),("good",1),("boy",1)),Seq(("Bob",1),("Bob",1),("bad",1),("Bob",1))).toDF()
df.show(false)
+-------------------------------------------------+
|value                                            |
+-------------------------------------------------+
|[[Alex, 1], [is, 1], [a, 1], [good, 1], [boy, 1]]|
|[[Bob, 1], [Bob, 1], [bad, 1], [Bob, 1]]         |
+-------------------------------------------------+


df.withColumn("id",monotonicallyIncreasingId)
.select('id,explode('value))
.select('id,'col.getField("_1").as("val"))
.groupBy('id,'val).agg(count('val).as("ct"))
.select('id,map('val,'ct).as("map"))
.groupBy('id).agg(collect_list('map))
.show(false)

+---+-----------------------------------------------------------+
|id |collect_list(map)                                          |
+---+-----------------------------------------------------------+
|0  |[[is -> 1], [Alex -> 1], [boy -> 1], [a -> 1], [good -> 1]]|
|1  |[[bad -> 1], [Bob -> 3]]                                   |
+---+-----------------------------------------------------------+
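For reference, a rough PySpark translation of the same approach is sketched below. Since the question's dataframe stores [word, count] arrays rather than Scala tuples, the word is taken with [0] instead of getField("_1"); this is an untested adaptation.

from pyspark.sql.functions import (
    monotonically_increasing_id, explode, col, count, create_map, collect_list
)

(df.withColumn("id", monotonically_increasing_id())
   .select("id", explode("named_val"))                  # one row per [word, count] pair
   .select("id", col("col")[0].alias("val"))            # arrays here, so index with [0]
   .groupBy("id", "val").agg(count("val").alias("ct"))  # count occurrences per original row
   .select("id", create_map("val", "ct").alias("map"))
   .groupBy("id").agg(collect_list("map"))
   .show(truncate=False))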
