
I have a dataframe where I need to convert rows of the same group to columns, basically pivot them. Below is my df.

+------------+-------+-----+-------+
|Customer    |ID     |unit |order  |
+------------+-------+-----+-------+
|John        |123    |00015|1      |
|John        |123    |00016|2      |
|John        |345    |00205|3      |
|John        |345    |00206|4      |
|John        |789    |00283|5      |
|John        |789    |00284|6      |
+------------+-------+-----+-------+

I need the resulting data for the above to look like this:

+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|state   | ID_1  | unit_1 |seq_num_1 | ID_2   | unit_2 | seq_num_2 | ID_3   |unit_3 |seq_num_3 |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|John    | 123   | 00015  | 1        |  345   | 00205  | 3         |  789   |00283  | 5        |
|John    | 123   | 00016  | 2        |  345   | 00206  | 4         |  789   |00284  | 6        |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+

I tried groupBy with the pivot() function, but it's throwing an error saying too many pivot values were found. Is there any way to get the result without using the pivot() function? Any help is greatly appreciated. Thanks.

Comments

  • You probably have more than 10000 distinct values in the pivot column, which is the default maximum (spark.sql.pivotMaxValues). Commented Mar 8, 2021 at 10:58
  • @blackbishop Yes, that's right. Is there any way to achieve the result without using the pivot() function? Commented Mar 8, 2021 at 10:59
  • You can always increase this value with spark.conf.set("spark.sql.pivotMaxValues", newMaxValue) (see the sketch after these comments), but pivot is a resource-intensive operation and you may face performance issues. Commented Mar 8, 2021 at 11:10
  • Yes, that's the reason I'm looking for an alternate solution. Appreciated. Commented Mar 8, 2021 at 11:18
  • Potentially you could do a grouped map with a pandas UDF. It seems like you would group on Customer and ID and then produce the needed dataframe as output; Spark "stacks" them and outputs the final Spark dataframe. You may need to close over some external data, but it seems doable. Commented Mar 8, 2021 at 11:30
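For completeness, a minimal sketch of the config change suggested in the comments, assuming one wants to keep pivot() despite its cost (the value 50000 below is only an illustrative placeholder):

# Sketch only: raise the limit on distinct pivot values (spark.sql.pivotMaxValues,
# default 10000). 50000 is an arbitrary example, not a recommendation; pivot() over
# that many values remains resource-intensive.
spark.conf.set("spark.sql.pivotMaxValues", 50000)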

3 Answers


This looks like a typical case for the dense_rank() window function: create a generic sequence (dr in the code below) of distinct IDs within each group of Customer, then pivot on this sequence. We can do something similar for the order column using row_number(), so that it can be used in the groupBy:

from pyspark.sql import Window, functions as F

# an extra row ('John', '789', '00285', '7') is added for reference, to show what happens
# when the number of rows varies across IDs
df = spark.createDataFrame([
    ('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
    ('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
    ('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])

Define two window specs: w1 to get the dense_rank() of ID over each Customer, and w2 to get the row_number() of order within the same Customer and ID.

w1 = Window.partitionBy('Customer').orderBy('ID')
w2 = Window.partitionBy('Customer','ID').orderBy('order')

Add two new columns based on the above two window specs: dr (dense_rank) and sid (row_number):

df1 = df.select(
    "*", 
    F.dense_rank().over(w1).alias('dr'), 
    F.row_number().over(w2).alias('sid')
)
df1.show()
+--------+---+-----+-----+---+---+
|Customer| ID| unit|order| dr|sid|
+--------+---+-----+-----+---+---+
|    John|123|00015|    1|  1|  1|
|    John|123|00016|    2|  1|  2|
|    John|345|00205|    3|  2|  1|
|    John|345|00206|    4|  2|  2|
|    John|789|00283|    5|  3|  1|
|    John|789|00284|    6|  3|  2|
|    John|789|00285|    7|  3|  3|
+--------+---+-----+-----+---+---+

Find the max of dr so that we can pre-define the list of values to pivot on, range(1, N+1) (supplying the values up front improves the efficiency of the pivot method):

N = df1.agg(F.max('dr')).first()[0]

Group by Customer and sid, pivot on dr, and then aggregate:

df_new = df1.groupby('Customer','sid') \
    .pivot('dr', range(1,N+1)) \
    .agg(
        F.first('ID').alias('ID'),
        F.first('unit').alias('unit'),
        F.first('order').alias('order')
)

df_new.show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|1_ID|1_unit|1_order|2_ID|2_unit|2_order|3_ID|3_unit|3_order|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|    John|  1| 123| 00015|      1| 345| 00205|      3| 789| 00283|      5|
|    John|  2| 123| 00016|      2| 345| 00206|      4| 789| 00284|      6|
|    John|  3|null|  null|   null|null|  null|   null| 789| 00285|      7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+

Rename the columns if needed:

import re
df_new.toDF(*['_'.join(reversed(re.split('_',c,1))) for c in df_new.columns]).show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|ID_1|unit_1|order_1|ID_2|unit_2|order_2|ID_3|unit_3|order_3|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|    John|  1| 123| 00015|      1| 345| 00205|      3| 789| 00283|      5|
|    John|  2| 123| 00016|      2| 345| 00206|      4| 789| 00284|      6|
|    John|  3|null|  null|   null|null|  null|   null| 789| 00285|      7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
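If the final frame should mirror the exact headers in the question (state instead of Customer, seq_num_N instead of order_N, no helper sid column), a hedged sketch of one more renaming pass on top of df_new could look like this; the mapping of Customer to state and order_N to seq_num_N is my assumption from the desired output, not part of the original answer:

import re

# Rebuild the renamed frame, then align it with the layout requested in the question.
renamed = df_new.toDF(*['_'.join(reversed(re.split('_', c, 1))) for c in df_new.columns])
result = renamed.withColumnRenamed('Customer', 'state').drop('sid')
result = result.toDF(*[c.replace('order_', 'seq_num_') for c in result.columns])
result.show()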



Below is my solution: do the ranking and then flatten the results.

from pyspark.sql import Window
from pyspark.sql.functions import col, collect_list, max, row_number

df = spark.createDataFrame([
    ('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
    ('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
    ('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])

# rank rows within each customer by order, then collect running lists of ID/unit/order
w1 = Window.partitionBy("customer").orderBy("order")
rankedDF = df.withColumn("rank", row_number().over(w1))
groupedDF = rankedDF.select(
    "customer", "rank",
    collect_list("ID").over(w1).alias("ID"),
    collect_list("unit").over(w1).alias("unit"),
    collect_list("order").over(w1).alias("seq_num")
).groupBy("customer", "rank").agg(max("ID").alias("ID"), max("unit").alias("unit"), max("seq_num").alias("seq_num"))

# flatten the first three list elements into ID_i / unit_i / seq_num_i columns
groupedColumns = [col("customer")]
pivotColumns = [[col(a)[i - 1].alias(a + "_" + str(i)) for a in ["ID", "unit", "seq_num"]] for i in [1, 2, 3]]
flattenedCols = [item for sublist in pivotColumns for item in sublist]
finalDf = groupedDF.select(groupedColumns + flattenedCols)
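One hedged tweak to the flattening step above: the [1, 2, 3] hardcodes three column groups per customer. If that count varies, it could be derived from the data first, for example as below (reusing df and groupedDF from the snippet above; the assumption that the width equals the largest number of distinct IDs per customer is mine, not the original author's):

from pyspark.sql import functions as F

# Sketch: derive the column-group count from the data instead of hardcoding [1, 2, 3].
n = df.groupBy("Customer").agg(F.countDistinct("ID").alias("n")).agg(F.max("n")).first()[0]

pivotColumns = [[F.col(a)[i - 1].alias(a + "_" + str(i)) for a in ["ID", "unit", "seq_num"]]
                for i in range(1, n + 1)]
flattenedCols = [item for sublist in pivotColumns for item in sublist]
finalDf = groupedDF.select([F.col("customer")] + flattenedCols)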



There may be multiple ways to do this, but a pandas UDF is one of them. Here is a toy example based on your data:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = pd.DataFrame({'Customer': ['John']*6, 
                   'ID': [123]*2 + [345]*2 + [789]*2, 
                   'unit': ['00015', '00016', '00205', '00206', '00283', '00284'], 
                   'order': range(1, 7)})
sdf = spark.createDataFrame(df)

# Spark 2.4 syntax. Spark 3.0 is less verbose
return_types = 'state string, ID_1 int, unit_1 string, seq_num_1 int, ID_2 int, unit_2 string, seq_num_2 int, ID_3 int, unit_3 string, seq_num_3 int'
@pandas_udf(returnType=return_types, functionType=PandasUDFType.GROUPED_MAP)
def convert_to_wide(pdf):
    groups = pdf.groupby('ID')
    out = pd.concat([group.set_index('Customer') for _, group in groups], axis=1).reset_index()
    out.columns = ['state', 'ID_1', 'unit_1', 'seq_num_1', 'ID_2', 'unit_2', 'seq_num_2', 'ID_3', 'unit_3', 'seq_num_3']
    return out

sdf.groupby('Customer').apply(convert_to_wide).show()

+-----+----+------+---------+----+------+---------+----+------+---------+
|state|ID_1|unit_1|seq_num_1|ID_2|unit_2|seq_num_2|ID_3|unit_3|seq_num_3|
+-----+----+------+---------+----+------+---------+----+------+---------+
| John| 123| 00015|        1| 345| 00205|        3| 789| 00283|        5|
| John| 123| 00016|        2| 345| 00206|        4| 789| 00284|        6|
+-----+----+------+---------+----+------+---------+----+------+---------+
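For reference, a hedged sketch of the "less verbose" Spark 3.0+ form mentioned in the code comment above: groupBy(...).applyInPandas replaces the GROUPED_MAP pandas_udf decorator, reusing the same function body and the return_types schema string defined earlier. This is only an illustration of the newer API, not part of the original answer.

# Sketch only (assumes Spark 3.0+), with the same grouped-map logic as a plain function.
def convert_to_wide_plain(pdf):
    groups = pdf.groupby('ID')
    out = pd.concat([group.set_index('Customer') for _, group in groups], axis=1).reset_index()
    out.columns = ['state', 'ID_1', 'unit_1', 'seq_num_1', 'ID_2', 'unit_2', 'seq_num_2',
                   'ID_3', 'unit_3', 'seq_num_3']
    return out

sdf.groupby('Customer').applyInPandas(convert_to_wide_plain, schema=return_types).show()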

2 Comments

Thanks for helping out. Unfortunately, our cluster is running Spark 2.3 and it's failing with this error: "PyArrow >= 0.8.0 must be installed; however, it was not found." I'm not able to install it. Is there any alternative? Thanks. @mathfish
Not sure how you are connecting to the cluster (e.g. client or cluster mode), but I believe in either case you can pass a zipped Python environment to be used by both the driver and workers using the --archives option. But I don't want to lead you down the rabbit hole. Worst comes to worst, you can just transform the table directly, and as a bonus it's a good way to learn core PySpark.
