
I have a PySpark DataFrame df like this:

+-----+----+------------+------------+-------------+------------+
| Name| Age| P_Attribute|S_Attributes|P_Values     |S_values    | 
+-----+----+------------+------------+-------------+------------+
| Bob1| 16 |  [x1,x2]   |     [x1,x3]|["ab",1]     | [1,2]      |
| Bob2| 16 |[x1,x2,x3]  |     []     |["a","b","c"]| []         |
+-----+----+------------+------------+-------------+------------+

I would like to create a final df as below:

+-----+----+------------+------------+
| Name| Age| Attribute  |      Values|
+-----+----+------------+------------+
| Bob1| 16 |  x1        |     ab     |
| Bob1| 16 |  x2        |     1      |
| Bob1| 16 |  x1        |     1      |
| Bob1| 16 |  x3        |     2      |
| Bob2| 16 |  x1        |     a      |
| Bob2| 16 |  x2        |     b      |
| Bob2| 16 |  x3        |     c      |
+-----+----+------------+------------+

Basically, I want to merge the P and S columns and explode them into rows. With the help of the PySpark array functions I was able to concat the arrays and explode them. However, professional attributes and sport attributes can have the same names, so to tell them apart later I need a type column as well:

+-----+----+------------+------------+------------+
| Name| Age|   Attribute|       type |Value       |
+-----+----+------------+------------+------------+
| Bob1| 16 |  x1        |     1      | ab         |
| Bob1| 16 |  x2        |     1      | 1          |
| Bob1| 16 |  x1        |     2      | 1          |
| Bob1| 16 |  x3        |     2      | 2          |
| Bob2| 16 |  x1        |     1      | a          |
| Bob2| 16 |  x2        |     1      | b          |
| Bob2| 16 |  x3        |     1      | c          |
+-----+----+------------+------------+------------+  

So I thought of first creating separate array columns, like this:

+-----+----+------------+------------+------------+------------+
| Name| Age| P_Attribute|S_Attributes|P_type      |S_type      |
+-----+----+------------+------------+------------+------------+
| Bob1| 16 |  [x1,x2]   |     [x1,x3]|   [1,1]    | [2,2]      |
| Bob2| 16 |[x1,x2,x3]  |     []     |  [1,1,1]   |  []        |
+-----+----+------------+------------+------------+------------+

That way I can merge the columns and explode them with the required type column, as shown in the df above. The problem is that I am not able to create the P_type and S_type columns dynamically. I tried the code below:

new_df = df.withColumn("temp_P_type", F.lit(1))\
                .withColumn("P_type", F.array_repeat("temp_P_type",F.size("P_Attribute")))

This throws "TypeError: Column is not iterable". It also doesn't work if the length of the array is first extracted into another column. Can anybody help me with this, or is there a better way to do it? Is it possible to do this at the DataFrame level, without going to RDDs and Python functions (that is, without a UDF)?

P.S. I am using Spark 2.4.

    "I would like to create df as below": this intended result (2nd table) is very confusing. Do you instead only need one attribute column, as in the 3rd table? Commented May 3, 2020 at 17:55

2 Answers


I would suggest using the higher-order function transform together with struct and array_union: build one array of (Attribute, Type) structs, explode it once, and then select the struct fields with the .* expansion.

df.show()
#+----+---+------------+------------+
#|Name|Age| P_Attribute|S_Attributes|
#+----+---+------------+------------+
#|Bob1| 16|    [x1, x2]|    [x1, x3]|
#|Bob2| 16|[x1, x2, x3]|          []|
#+----+---+------------+------------+

from pyspark.sql import functions as F
df.withColumn("Attributes", F.explode(F.array_union(F.expr("""transform(P_Attribute,x-> struct(x as Attribute,1 as Type))"""),\
              F.expr("""transform(S_Attributes,x-> struct(x as Attribute,2 as Type))"""))))\
   .select("Name", "Age", "Attributes.*").show()

#+----+---+---------+----+
#|Name|Age|Attribute|Type|
#+----+---+---------+----+
#|Bob1| 16|       x1|   1|
#|Bob1| 16|       x2|   1|
#|Bob1| 16|       x1|   2|
#|Bob1| 16|       x3|   2|
#|Bob2| 16|       x1|   1|
#|Bob2| 16|       x2|   1|
#|Bob2| 16|       x3|   1|
#+----+---+---------+----+

UPDATE:

df.show()

#+----+---+------------+------------+---------+--------+
#|Name|Age| P_Attribute|S_Attributes| P_Values|S_values|
#+----+---+------------+------------+---------+--------+
#|Bob1| 16|    [x1, x2]|    [x1, x3]|  [ab, 1]|  [1, 2]|
#|Bob2| 16|[x1, x2, x3]|          []|[a, b, c]|      []|
#+----+---+------------+------------+---------+--------+

from pyspark.sql import functions as F
df.withColumn("Attributes", F.explode(F.array_union\
               (F.expr("""transform(arrays_zip(P_Attribute,P_Values),x->\
                          struct(x.P_Attribute as Attribute,1 as Type,string(x.P_Values) as Value))"""),\
                F.expr("""transform(arrays_zip(S_Attributes,S_Values),x->\
                          struct(x.S_Attributes as Attribute,2 as Type,string(x.S_Values) as Value))"""))))\
   .select("Name", "Age", "Attributes.*").show()

#+----+---+---------+----+-----+
#|Name|Age|Attribute|Type|Value|
#+----+---+---------+----+-----+
#|Bob1| 16|       x1|   1|   ab|
#|Bob1| 16|       x2|   1|    1|
#|Bob1| 16|       x1|   2|    1|
#|Bob1| 16|       x3|   2|    2|
#|Bob2| 16|       x1|   1|    a|
#|Bob2| 16|       x2|   1|    b|
#|Bob2| 16|       x3|   1|    c|
#+----+---+---------+----+-----+
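
One caveat worth checking before relying on the snippet above: as far as I know, array_union returns the union without duplicates, so two identical (Attribute, Type, Value) structs would collapse into a single row, whereas concat would keep both. The block below is a plain-Python model of what the expression computes for one row. It is not Spark code, and the helper names (zip_pad, tag, union_distinct, explode_row) are made up for illustration.

```python
from itertools import zip_longest

def zip_pad(attrs, values):
    """Model of arrays_zip: pair elements by position, padding the shorter list with None."""
    return list(zip_longest(attrs, values))

def tag(pairs, type_id):
    """Model of transform(..., x -> struct(Attribute, Type, string(Value)))."""
    return [(a, type_id, None if v is None else str(v)) for a, v in pairs]

def union_distinct(left, right):
    """Model of array_union: both arrays combined, duplicates removed (first occurrence kept here)."""
    seen, out = set(), []
    for item in left + right:
        if item not in seen:
            seen.add(item)
            out.append(item)
    return out

def explode_row(row, dedupe=True):
    """Model of explode + select(Name, Age, Attributes.*) for a single input row."""
    p = tag(zip_pad(row["P_Attribute"], row["P_Values"]), 1)
    s = tag(zip_pad(row["S_Attributes"], row["S_values"]), 2)
    structs = union_distinct(p, s) if dedupe else p + s  # dedupe=False models concat
    return [(row["Name"], row["Age"], *t) for t in structs]

bob1 = {"Name": "Bob1", "Age": 16,
        "P_Attribute": ["x1", "x2"], "P_Values": ["ab", 1],
        "S_Attributes": ["x1", "x3"], "S_values": [1, 2]}

for r in explode_row(bob1):
    print(r)
```

If duplicate attribute/value pairs within one type are meaningful in your data, swapping array_union for concat in the Spark expression is probably the safer choice.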

3 Comments

Thanks for the answer, @Mohammand Murtaza Hashmi. It solves the problem. Is it possible to pass multiple columns to transform inside F.expr, like F.expr("""transform(S_Attributes,S_value, x, y -> struct(x as Attribute, y as Value, 2 as Type))""")?
The lambda in transform accepts only one argument, x, or two, (x, i), where i is the index of x. So for that to work you would have to combine S_Attributes and S_value first and then traverse the result with transform. If S_value is also an array, you could do this: F.expr("""transform(arrays_zip(S_Attributes,S_value), x -> struct(x.S_Attributes as Attribute, x.S_value as Value, 2 as Type))""")
If you could provide the S_value and P_value columns, I'd be happy to update the solution with them, @NachiketKate.

You can do something like the following. First collect P_Attribute and S_Attributes into a single Attributes column (an array of arrays), then apply posexplode to it; the position it returns gives the type column that identifies the source of each attribute (0 for P, 1 for S). Finally, explode the resulting Attribute column to flatten all the attributes.

import pyspark.sql.functions as f

df = spark.createDataFrame([
    ['Bob1', 16, ['x1', 'x2'], ['x1', 'x3']],
    ['Bob2', 16, ['x1', 'x2', 'x3'], []]],
    ['Name', 'Age', 'P_Attribute', 'S_Attributes'])

df.withColumn('Attributes', f.array('P_Attribute', 'S_Attributes'))\
  .select('Name', 'Age', f.posexplode('Attributes').alias('type', 'Attribute'))\
  .withColumn('Attribute', f.explode('Attribute'))\
  .show()

+----+---+----+---------+
|Name|Age|type|Attribute|
+----+---+----+---------+
|Bob1| 16|   0|       x1|
|Bob1| 16|   0|       x2|
|Bob1| 16|   1|       x1|
|Bob1| 16|   1|       x3|
|Bob2| 16|   0|       x1|
|Bob2| 16|   0|       x2|
|Bob2| 16|   0|       x3|
+----+---+----+---------+
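
To make the two steps concrete, here is a plain-Python model of what posexplode followed by explode does to a single row. It is only a sketch of the semantics, not Spark code, and the function name and the start parameter are hypothetical. It also shows why Bob2 gets no rows of type 1 (an empty array explodes to nothing) and how the 0/1 positions could be shifted to the 1/2 types asked for in the question; in Spark that shift would be something like f.col('type') + 1.

```python
def posexplode_then_explode(row, start=0):
    """Model of f.array(P, S) -> posexplode -> explode for one input row."""
    arrays = [row["P_Attribute"], row["S_Attributes"]]  # f.array('P_Attribute', 'S_Attributes')
    out = []
    # posexplode yields (position, element); here each element is itself an array
    for pos, attrs in enumerate(arrays, start=start):
        # explode yields one row per attribute; an empty array yields no rows
        for attr in attrs:
            out.append((row["Name"], row["Age"], pos, attr))
    return out

bob1 = {"Name": "Bob1", "Age": 16, "P_Attribute": ["x1", "x2"], "S_Attributes": ["x1", "x3"]}
bob2 = {"Name": "Bob2", "Age": 16, "P_Attribute": ["x1", "x2", "x3"], "S_Attributes": []}

for r in posexplode_then_explode(bob1) + posexplode_then_explode(bob2):
    print(r)
```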

2 Comments

Thanks for the answer, @Psidom. Is it possible to use f.array() when there are also P_values and S_values columns holding the values of these attributes?
I'm not entirely sure what you need, but probably f.posexplode(f.explode(f.array(f.create_map('P_Attribute', 'P_values'), f.create_map('S_Attributes', 'S_values')))).alias('type', 'Attribute'), provided the attribute columns don't contain duplicated elements.
