Say I have a column filled with URLs like in the following:

+------------------------------------------+
|url                                       |
+------------------------------------------+
|https://www.example1.com?param1=1&param2=a|
|https://www.example2.com?param1=2&param2=b|
|https://www.example3.com?param1=3&param2=c|
+------------------------------------------+

What would be the best way of extracting the URL parameters from this column and adding them as columns to the dataframe to produce the below?

+-------------------------------------------+-------+-------+
|                                        url| param1| param2|
+-------------------------------------------+-------+-------+
|https://www.example1.com?param1=1&param2=a |      1|      a|
|https://www.example2.com?param1=2&param2=b |      2|      b|
|https://www.example3.com?param1=3&param2=c |      3|      c|
|etc...                                     | etc...| etc...|
+-------------------------------------------+-------+-------+

My Attempts

I can think of two possible methods of doing this: using functions.regexp_extract from the pyspark library, or using urllib.parse.parse_qs and urllib.parse.urlparse from the standard library. The former relies on regex, which is a finicky way of extracting parameters from strings, while the latter would need to be wrapped in a UDF to be used.

from pyspark.sql import *
from pyspark.sql import functions as fn

df = spark.createDataFrame(
  [
    ("https://www.example.com?param1=1&param2=a",),
    ("https://www.example2.com?param1=2&param2=b",),
    ("https://www.example3.com?param1=3&param2=c",)
  ],
  ["url"]
)

Regex solution:

df2 = df.withColumn("param1", fn.regexp_extract('url', r'param1=(\d)', 1))
df2 = df2.withColumn("param2", fn.regexp_extract('url', r'param2=([a-z])', 1))
df2.show()

>> +------------------------------------------+------+------+
>> |url                                       |param1|param2|
>> +------------------------------------------+------+------+
>> |https://www.example1.com?param1=1&param2=a|1     |a     |
>> |https://www.example2.com?param1=2&param2=b|2     |b     |
>> |https://www.example3.com?param1=3&param2=c|3     |c     |
>> +------------------------------------------+------+------+

UDF solution:

from urllib.parse import urlparse, parse_qs
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

# parse the query string into a {param: first value} map
extract_params = udf(lambda x: {k: v[0] for k, v in parse_qs(urlparse(x).query).items()}, MapType(StringType(), StringType()))

df3 = df.withColumn(
  "params", extract_params(df.url)
)

df3.withColumn(
  "param1", df3.params['param1']
).withColumn(
  "param2", df3.params['param2']
).drop("params").show()

>> +------------------------------------------+------+------+
>> |url                                       |param1|param2|
>> +------------------------------------------+------+------+
>> |https://www.example1.com?param1=1&param2=a|1     |a     |
>> |https://www.example2.com?param1=2&param2=b|2     |b     |
>> |https://www.example3.com?param1=3&param2=c|3     |c     |
>> +------------------------------------------+------+------+

I'd like the versatility of a library like urllib, but also the optimisability of native pyspark functions. Is there a better method than the two I've tried so far?

4 Comments

  • As there is no built-in pyspark function that does this (as of version 2.4), going with urllib in a udf might be a better approach. Also, if the url format is consistent, you can use multiple splits to get the desired result. Commented Jan 7, 2020 at 14:49
  • Are the params identical for all rows? For instance, might one row have 2 params and another 3? Commented Jan 7, 2020 at 15:32
  • @VamsiPrabhala There is parse_url. But it can only be used with SQL and expr. Commented Jan 7, 2020 at 19:02
  • @blackbishop .. thanks.. didn't know that. Commented Jan 7, 2020 at 19:13

4 Answers


You can use parse_url within a SQL expression via expr.

Extract specific query parameter

parse_url can take a third parameter to specify the key (param) we want to extract from the URL:

df.selectExpr("*", "parse_url(url,'QUERY', 'param1')").show()

+------------------------------------------+------+
|url                                       |param1|
+------------------------------------------+------+
|https://www.example2.com?param1=2&param2=b|2     |
|https://www.example.com?param1=1&param2=a |1     |
|https://www.example3.com?param1=3&param2=c|3     |
+------------------------------------------+------+
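
If the parameter names are known up front, here is a minimal sketch (assuming the same df as above) of pulling several of them in one pass, aliased to clean column names, since parse_url is only available through SQL / expr:

import pyspark.sql.functions as F

# one parse_url call per known parameter name (names assumed to be known beforehand)
known_params = ["param1", "param2"]
df.select(
    "url",
    *[F.expr(f"parse_url(url, 'QUERY', '{p}')").alias(p) for p in known_params]
).show(truncate=False)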

Extract all query parameters to columns

If you want to extract all query parameters as new columns without having to specify their names, you can first parse the URL, then split and explode to get the parameters and their values, and finally pivot to get each parameter as a column.

import pyspark.sql.functions as F

df.withColumn("parsed_url", F.explode(F.split(F.expr("parse_url(url, 'QUERY')"), "&"))) \
    .withColumn("parsed_url", F.split("parsed_url", "=")) \
    .select("url",
            F.col("parsed_url").getItem(0).alias("param_name"),
            F.col("parsed_url").getItem(1).alias("value")
            ) \
    .groupBy("url") \
    .pivot("param_name") \
    .agg(F.first("value")) \
    .show()

Gives:

+------------------------------------------+------+------+
|url                                       |param1|param2|
+------------------------------------------+------+------+
|https://www.example2.com?param1=2&param2=b|2     |b     |
|https://www.example.com?param1=1&param2=a |1     |a     |
|https://www.example3.com?param1=3&param2=c|3     |c     |
+------------------------------------------+------+------+

Another solution, as suggested by @jxc in the comments, is to use the str_to_map function:

df.selectExpr("*", "explode(str_to_map(split(url,'[?]')[1],'&','='))") \
    .groupBy('url') \
    .pivot('key') \
    .agg(F.first('value'))

8 Comments

  • Accepting this answer as it's currently the closest method to using both query-parsing utilities and pyspark optimisations.
  • I'm really curious to know why this is actually getting downvoted... if someone can please enlighten me :)
  • It seems as if someone is downvoting everything, including the question.
  • They might or might not have their own reasons... but it doesn't really matter :D
  • If that is the case, I'd just use split to get the query string and then convert it into a MapType column, then do the regular things: df.selectExpr('*', 'explode(str_to_map(split(url,"[?]")[1],"&","="))').groupby('url').pivot('key').agg(first('value'))

I'll go with a UDF and a more generic output format, using a map type.

from urllib.parse import urlparse, parse_qs

from pyspark.sql import functions as F, types as T

@F.udf(T.MapType(T.StringType(), T.ArrayType(T.StringType())))
def url_param_pars(url):
    parsed = urlparse(url) 
    return parse_qs(parsed.query)

df_params = df.withColumn("params", url_param_pars(F.col('url')))

df_params.show(truncate=False)
+------------------------------------------+------------------------------+
|url                                       |params                        |
+------------------------------------------+------------------------------+
|https://www.example.com?param1=1&param2=a |[param1 -> [1], param2 -> [a]]|
|https://www.example2.com?param1=2&param2=b|[param1 -> [2], param2 -> [b]]|
|https://www.example3.com?param1=3&param2=c|[param1 -> [3], param2 -> [c]]|
+------------------------------------------+------------------------------+

df_params.printSchema()                                                                                                         
root
 |-- url: string (nullable = true)
 |-- params: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = true)

With this method, you can have any number of params.
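
If you then want the columnar layout from the question, here is a rough sketch of flattening the map for known parameter names (parse_qs returns a list per key, hence the extra getItem(0)):

from pyspark.sql import functions as F

# pull known keys out of the map column; each value is a list, so take its first element
df_params.select(
    "url",
    F.col("params").getItem("param1").getItem(0).alias("param1"),
    F.col("params").getItem("param2").getItem(0).alias("param2"),
).show(truncate=False)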

3 Comments

  • I address this mapped format in my second attempt. I do like it, but I am interested in seeing if there is a better way of exporting to a columnar format.
  • @philthy using the map format allows a single execution per row to recover all the data you need at once; then, as in your example, you can turn the map into columns (or, with an explode function, into several rows).
  • I like it, as mentioned previously; however, I'm interested in seeing whether methods exist that don't use UDFs, as they can't be optimised.

You can use the split function as follows:

from pyspark.sql import functions as f
df3 = df3.withColumn("param1", f.split(f.split(df3.url, "param1=")[1], "&")[0])
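
A sketch extending the same idea to param2, assuming (as this answer does) that every url contains both parameters in this exact format and that df3 already holds the url column:

from pyspark.sql import functions as f

# split once on 'param2=' and keep everything up to the next '&'
df3 = df3.withColumn("param2", f.split(f.split(df3.url, "param2=")[1], "&")[0])
df3.select("url", "param1", "param2").show(truncate=False)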



Here is one more solution, which works for Spark >= 2.4 since it uses the higher-order function filter.

The solution below is based on the assumption that all the records have an identical number of query parameters:

from pyspark.sql.functions import expr, col

# get the query string for the first non null url
query = df.filter(df["url"].isNotNull()).first()["url"].split("?")[1]

# extract parameters (this should remain the same for all the records)
params = list(map(lambda p: p.split("=")[0], query.split("&")))

# you can also omit the two previous lines (query parameters autodiscovery)
# and replace them with: params = ['param1', 'param2']
# when you know beforehand the query parameters

cols = [col('url')] + [expr(f"split( \
                                    filter( \
                                          split(split(url,'\\\?')[1], '&'), \
                                          p -> p like '{qp}=%' \
                                    )[0], \
                            '=')[1]").alias(qp) 
                       for qp in params]

df.select(*cols).show(10, False)

# +------------------------------------------+------+------+
# |url                                       |param1|param2|
# +------------------------------------------+------+------+
# |https://www.example.com?param1=1&param2=a |1     |a     |
# |https://www.example2.com?param1=2&param2=b|2     |b     |
# |https://www.example3.com?param1=3&param2=c|3     |c     |
# +------------------------------------------+------+------+

Explanation:

  1. split(split(url,'\\\?')[1], '&') -> [param1=1, param2=a]: first split by ? to retrieve the query string, then split by &. As a result we get the array [param1=1, param2=a].

  2. filter(..., p -> p like '{qp}=%')[0] -> param1=1, param2=a, ...: apply the filter function to the items of the array from the previous step, using the predicate p -> p like '{qp}=%', where {qp} is the param name, i.e. param1. qp stands for the items of the params array. filter returns an array, hence we access the first item, since we know that the particular param should always exist. For the first parameter this returns param1=1, for the second param2=a, etc.

  3. split(..., '=')[1] -> 1, a, ...: finally, split by = to retrieve the value of the query parameter. Here we take the second element, since the first one is the query parameter name.

The basic idea here is that we divide the problem into two sub-problems: first get all the possible query parameters, and then extract their values for all the urls.

Why is that? You could indeed use pivot, as @blackbishop already brilliantly implemented, although I believe that wouldn't work well when the cardinality of the query parameters is very high, i.e. 500 or more unique params. That would require a big shuffle, which could consequently cause an OOM exception. On the other hand, if you already know that the cardinality of the data is low, then @blackbishop's solution should be considered the ideal one.

To avoid that problem, it is better to first find all the query params (here I just made the assumption that all the queries have identical params, but the implementation of a more general discovery step would be similar; see the sketch below) and then apply the above expression to each param to extract its value. This generates a select expression containing multiple expr expressions, although that shouldn't cause any performance issues, since select is a narrow transformation and does not cause a shuffle.
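
A minimal sketch of that discovery step, assuming it is acceptable to collect the distinct parameter names to the driver (the resulting list can then be fed into the cols expression above):

from pyspark.sql import functions as F

# discover all distinct query parameter names across the dataset
params = [row["param_name"] for row in (
    df.select(F.explode(F.split(F.split("url", "[?]").getItem(1), "&")).alias("kv"))
      .select(F.split("kv", "=").getItem(0).alias("param_name"))
      .distinct()
      .collect()
)]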

2 Comments

  • I like the parse_url method, but it's still quite finicky with all the splitting going on.
  • hahhaha ok, I thought so :) It's only about the logical steps of splitting and extracting the query string values, quite similar to what you would do in any modern programming language, but here I use expr and the SQL syntax provided by Spark. I also think this approach will execute faster, because it doesn't cause any shuffling and doesn't use UDFs. I will probably add some details about the execution steps later.
