
In PySpark, I have a DataFrame with a column that contains an ordered list of nodes to go through:

osmDF.schema
Out[1]:
StructType(List(StructField(id,LongType,true),
                StructField(nodes,ArrayType(LongType,true),true),
                StructField(tags,MapType(StringType,StringType,true),true)))
osmDF.head(3)
Out[2]:
| id        | nodes                                               | tags          |
|-----------|-----------------------------------------------------|---------------|
| 62960871  | [783186590,783198852]                               | {"foo":"bar"} |
| 211528816 | [2215187080,2215187140,2215187205,2215187256]       | {"foo":"boo"} |
| 62960872  | [783198772,783183397,783167527,783169067,783198772] | {"foo":"buh"} |

I need to create a DataFrame with a row for each consecutive pair of nodes in the list, then save it as Parquet.

For each input row, the expected result will have n - 1 rows, where n = len(nodes). It would look like this (with other columns that I'll add):

| id                    | from       | to         | tags          |
|-----------------------|------------|------------|---------------|
| 783186590_783198852   | 783186590  | 783198852  | {"foo":"bar"} |
| 2215187080_2215187140 | 2215187080 | 2215187140 | {"foo":"boo"} |
| 2215187140_2215187205 | 2215187140 | 2215187205 | {"foo":"boo"} |
| 2215187205_2215187256 | 2215187205 | 2215187256 | {"foo":"boo"} |
| 783198772_783183397   | 783198772  | 783183397  | {"foo":"buh"} |
| 783183397_783167527   | 783183397  | 783167527  | {"foo":"buh"} |
| 783167527_783169067   | 783167527  | 783169067  | {"foo":"buh"} |
| 783169067_783198772   | 783169067  | 783198772  | {"foo":"buh"} |

I tried to start with the following:

from pyspark.sql.functions import udf

def split_ways_into_arcs(row):
    arcs = []
    for node in range(len(row['nodes']) - 1):
        arc = dict()
        arc['id'] = str(row['nodes'][node]) + "_" + str(row['nodes'][node + 1])
        arc['from'] = row['nodes'][node]
        arc['to'] = row['nodes'][node + 1]
        arc['tags'] = row['tags']
        arcs.append(arc)
    return arcs

# Declare function as udf
split = udf(lambda row: split_ways_into_arcs(row.asDict()))

The issue I'm having is that I don't know how many nodes there are in each row of the original DataFrame. I know how to apply a UDF to add a column to an existing DataFrame, but not how to create a new DataFrame from lists of dicts.
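A minimal sketch of what that could look like: give the UDF an explicit return schema (an array of structs) and explode the result. It assumes the DataFrame is osmDF as above; names like arc_schema and arcs_df are illustrative, not from the original post.

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import (ArrayType, LongType, MapType,
                               StringType, StructField, StructType)

# Illustrative schema for the UDF's return value: an array of arc structs
arc_schema = ArrayType(StructType([
    StructField("id", StringType()),
    StructField("from", LongType()),
    StructField("to", LongType()),
    StructField("tags", MapType(StringType(), StringType())),
]))

# Build one dict per consecutive pair of nodes
split = udf(lambda nodes, tags: [
    {"id": f"{nodes[i]}_{nodes[i + 1]}",
     "from": nodes[i],
     "to": nodes[i + 1],
     "tags": tags}
    for i in range(len(nodes) - 1)
], arc_schema)

# Explode the array so each arc becomes its own row
arcs_df = osmDF.withColumn("arc", explode(split("nodes", "tags"))).select("arc.*")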

1 Answer


Iterate over the nodes array using transform and explode the array afterwards:

from pyspark.sql import functions as F

df = ...

result = (df
    # Pair each node with its successor; the last element gets to = null
    .withColumn("nodes", F.expr("transform(nodes, (n, i) -> named_struct('from', n, 'to', nodes[i + 1]))"))
    # One row per consecutive pair
    .withColumn("nodes", F.explode("nodes"))
    # Drop the trailing struct whose 'to' is null
    .filter("nodes.to is not null")
    .selectExpr("concat_ws('_', nodes.from, nodes.to) as id", "nodes.*", "tags"))

result.show(truncate=False)

Output:

+---------------------+----------+----------+-------------+
|id                   |from      |to        |tags         |
+---------------------+----------+----------+-------------+
|783186590_783198852  |783186590 |783198852 |{"foo":"bar"}|
|2215187080_2215187140|2215187080|2215187140|{"foo":"boo"}|
|2215187140_2215187205|2215187140|2215187205|{"foo":"boo"}|
|2215187205_2215187256|2215187205|2215187256|{"foo":"boo"}|
|783198772_783183397  |783198772 |783183397 |{"foo":"buh"}|
|783183397_783167527  |783183397 |783167527 |{"foo":"buh"}|
|783167527_783169067  |783167527 |783169067 |{"foo":"buh"}|
|783169067_783198772  |783169067 |783198772 |{"foo":"buh"}|
+---------------------+----------+----------+-------------+
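The question also asks to save the result as Parquet; reusing the result variable from the snippet above, the write could look like this (the output path is a placeholder):

# Persist the arcs as Parquet; the path is a placeholder
result.write.mode("overwrite").parquet("/path/to/arcs.parquet")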

6 Comments

Thanks for your reply. The idea here is that nodes is a list of ordered points to go through. So what I'm looking for is the first row to show from the first node to the second, the second row from the second node to the third, and so on. So the expected result is n - 1 rows, with n = len(nodes) for each row.
Ok, I see. Your question states "for each combination of 2 nodes". You might want to rephrase this sentence so that it is clearer for other answerers.
Right, thanks, I'm editing.
That's why I was looping with for node in range(len(row['nodes']) - 1).
I see your changes and I have updated my code.
