This looks like a typical case for the dense_rank() window function: create a generic sequence (dr in the code below) of the distinct IDs within each group of Customer, then pivot on that sequence. We can do something similar for the order column using row_number(), so that it can be used in the groupby:
from pyspark.sql import Window, functions as F
# below I added an extra row as a reference for when the number of rows varies across IDs
df = spark.createDataFrame([
('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])
Set up two window specs: w1 to compute dense_rank() over ID within each Customer, and w2 to compute row_number() over order within the same Customer and ID.
w1 = Window.partitionBy('Customer').orderBy('ID')
w2 = Window.partitionBy('Customer','ID').orderBy('order')
Add two new columns based on the above window specs: dr (dense_rank) and sid (row_number):
df1 = df.select(
"*",
F.dense_rank().over(w1).alias('dr'),
F.row_number().over(w2).alias('sid')
)
df1.show()
+--------+---+-----+-----+---+---+
|Customer| ID| unit|order| dr|sid|
+--------+---+-----+-----+---+---+
| John|123|00015| 1| 1| 1|
| John|123|00016| 2| 1| 2|
| John|345|00205| 3| 2| 1|
| John|345|00206| 4| 2| 2|
| John|789|00283| 5| 3| 1|
| John|789|00284| 6| 3| 2|
| John|789|00285| 7| 3| 3|
+--------+---+-----+-----+---+---+
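For reference, the same dr and sid columns can also be computed with Spark SQL; below is a sketch that assumes the DataFrame has been registered under a hypothetical temp view name tbl:
df.createOrReplaceTempView("tbl")
# note: `order` is a reserved word in SQL, hence the backticks below
df1_sql = spark.sql("""
    SELECT *,
           dense_rank() OVER (PARTITION BY Customer ORDER BY ID) AS dr,
           row_number() OVER (PARTITION BY Customer, ID ORDER BY `order`) AS sid
    FROM tbl
""")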
Find max(dr) so that we can pre-define the list of values to pivot on, range(1, N+1) (supplying the values up front improves the efficiency of the pivot method).
N = df1.agg(F.max('dr')).first()[0]
Group by Customer and sid, pivot on dr, and then aggregate:
df_new = df1.groupby('Customer','sid') \
.pivot('dr', list(range(1, N+1))) \
.agg(
F.first('ID').alias('ID'),
F.first('unit').alias('unit'),
F.first('order').alias('order')
)
df_new.show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|1_ID|1_unit|1_order|2_ID|2_unit|2_order|3_ID|3_unit|3_order|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
| John| 1| 123| 00015| 1| 345| 00205| 3| 789| 00283| 5|
| John| 2| 123| 00016| 2| 345| 00206| 4| 789| 00284| 6|
| John| 3|null| null| null|null| null| null| 789| 00285| 7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
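Since all pivoted values are strings in this example, the null cells left by the uneven group sizes can optionally be replaced with empty strings (a sketch; na.fill('') only touches string columns, so the integer sid column is unaffected):
df_new.na.fill('').show()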
Rename the columns if needed:
import re
df_new.toDF(*['_'.join(reversed(re.split('_',c,1))) for c in df_new.columns]).show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|ID_1|unit_1|order_1|ID_2|unit_2|order_2|ID_3|unit_3|order_3|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
| John| 1| 123| 00015| 1| 345| 00205| 3| 789| 00283| 5|
| John| 2| 123| 00016| 2| 345| 00206| 4| 789| 00284| 6|
| John| 3|null| null| null|null| null| null| 789| 00285| 7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
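The same rename can also be done with select and alias instead of toDF (a sketch; columns without an underscore, such as Customer and sid, pass through unchanged):
df_renamed = df_new.select(
    [F.col(c).alias('_'.join(reversed(c.split('_', 1)))) for c in df_new.columns]
)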
Note: if the number of distinct values to pivot on exceeds the default limit (controlled by spark.sql.pivotMaxValues), you can raise it with spark.conf.set("spark.sql.pivotMaxValues", newMaxValue), but pivot is a resource-intensive operation and you may face performance issues.
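A minimal sketch of raising that limit before the groupby/pivot step (10000 is Spark's default for spark.sql.pivotMaxValues; the value below is only an example):
# this limit only matters when pivot() has to discover the distinct values itself,
# i.e. when the values list is not supplied as we did above
spark.conf.set("spark.sql.pivotMaxValues", 50000)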