0

I have a pyspark.sql.dataframe.DataFrame, where one of the columns has an array of Row objects:

    +------------------------------------------------------------------------------------------------+
    |column                                                                          |
    +------------------------------------------------------------------------------------------------+
    |[Row(arrival='2019-12-25 19:55', departure='2019-12-25 18:22'),                                 |
    |  Row(arrival='2019-12-26 14:56', departure='2019-12-26 08:52')]                                |
    +------------------------------------------------------------------------------------------------+

Not all the rows in the column have the same quantity of elements (in this case, we have 2 but we could have more).

What I am trying to do is to generate a concatenation of the hours of each date, to have something like this:

18:22_19:55_08:52_14:56

This means, the departure time of the first element, concatenated with the arrival time of the first element, concatenated again with the departure time of the second element and once again with the arrival time of the second element.

Is there a simple way to do so using pyspark?

1 Answer 1

2

Assume the column name is col1 which is an array of structs:

df.printSchema()                                                                                                    
root
 |-- col1: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- arrival: string (nullable = true)
 |    |    |-- departure: string (nullable = true)

Method-1: for spark 2.4+, use array_join + transform

from pyspark.sql.functions import expr

df.withColumn('new_list', expr("""
    array_join(
        transform(col1, x -> concat(right(x.departure,5), '_', right(x.arrival,5)))
      , '_'
    )
  """) 
).show(truncate=False)

+----------------------------------------------------------------------------+-----------------------+
|col1                                                                        |new_list               |
+----------------------------------------------------------------------------+-----------------------+
|[[2019-12-25 19:55, 2019-12-25 18:22], [2019-12-26 14:56, 2019-12-26 08:52]]|18:22_19:55_08:52_14:56|
+----------------------------------------------------------------------------+-----------------------+

Method-2: Use udf:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def arrays_join(arr):
    return '_'.join('{}_{}'.format(x.departure[-5:], x.arrival[-5:]) for x in arr) if isinstance(arr, list) else arr    

udf_array_join = udf(arrays_join, StringType())

df.select(udf_array_join('col1')).show(truncate=False)

Method-3: use posexplode + groupby + collect_list:

from pyspark.sql.functions import monotonically_increasing_id, posexplode, regexp_replace, expr

(df.withColumn('id', monotonically_increasing_id()) 
    .select('*', posexplode('col1').alias('pos', 'col2')) 
    .select('id', 'pos', 'col2.*') 
    .selectExpr('id', "concat(pos, '+', right(departure,5), '_', right(arrival,5)) as dt") 
    .groupby('id') 
    .agg(expr("concat_ws('_', sort_array(collect_list(dt))) as new_list")) 
    .select(regexp_replace('new_list', r'(?:^|(?<=_))\d+\+', '').alias('new_list')) 
    .show(truncate=False))

Method-4: use string operations:

for this particular problem only, convert the array into string and then do a bunch of string operations (split + concat_ws + regexp_replace + trim) to get desired sub-strings:

from pyspark.sql.functions import regexp_replace, concat_ws, split, col

(df.select(
    regexp_replace(
        concat_ws('_', split(col('col1').astype('string'), r'[^0-9 :-]+'))
      , r'[_ ]+\d\d\d\d-\d\d-\d\d '
      , '_'
    ).alias('new_list')
).selectExpr('trim(both "_" from new_list) as new_list') 
.show(truncate=False))
Sign up to request clarification or add additional context in comments.

4 Comments

Thanks for taking the time to answer! Unfortunately I have Spark 2.3.1.Isn't there other way?
for older version, you might have to use udf.
BTW, you can also use explode + concat + groupby + collect_list, but also need to add a unique id. if the order of the list is important, then will be a little more extra work. it will not be an very efficient way anyway.
for this particular problem, the task can also be handled by some string manipulations, see the updated method-4.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.