
I have a PySpark DataFrame with 2 ArrayType fields:

>>>df
DataFrame[id: string, tokens: array<string>, bigrams: array<string>]
>>>df.take(1)
[Row(id='ID1', tokens=['one', 'two', 'two'], bigrams=['one two', 'two two'])]

I would like to combine them into a single ArrayType field:

>>>df2
DataFrame[id: string, tokens_bigrams: array<string>]
>>>df2.take(1)
[Row(id='ID1', tokens_bigrams=['one', 'two', 'two', 'one two', 'two two'])]

The syntax that works with strings does not seem to work here:

df2 = df.withColumn('tokens_bigrams', df.tokens + df.bigrams)

Thanks!

3 Answers


Spark >= 2.4

You can use the concat function (SPARK-23736):

from pyspark.sql.functions import col, concat 

df.select(concat(col("tokens"), col("tokens_bigrams"))).show(truncate=False)

# +---------------------------------+                                             
# |concat(tokens, tokens_bigrams)   |
# +---------------------------------+
# |[one, two, two, one two, two two]|
# |null                             |
# +---------------------------------+

To keep the row when one of the values is NULL, you can coalesce each column with an empty array:

from pyspark.sql.functions import array, coalesce      

df.select(concat(
    coalesce(col("tokens"), array()),
    coalesce(col("tokens_bigrams"), array())
)).show(truncate=False)

# +--------------------------------------------------------------------+
# |concat(coalesce(tokens, array()), coalesce(tokens_bigrams, array()))|
# +--------------------------------------------------------------------+
# |[one, two, two, one two, two two]                                   |
# |[three]                                                             |
# +--------------------------------------------------------------------+
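For reference, here is a minimal, self-contained sketch of the Spark >= 2.4 approach applied to the question's original schema (id, tokens, bigrams). It assumes an existing SparkSession named spark:

from pyspark.sql.functions import array, coalesce, col, concat

# Hypothetical reconstruction of the asker's DataFrame.
df = spark.createDataFrame(
    [("ID1", ["one", "two", "two"], ["one two", "two two"])],
    ("id", "tokens", "bigrams")
)

df2 = df.select(
    "id",
    concat(
        coalesce(col("tokens"), array()),   # NULL-safe: fall back to []
        coalesce(col("bigrams"), array())
    ).alias("tokens_bigrams")
)

df2.take(1)
# [Row(id='ID1', tokens_bigrams=['one', 'two', 'two', 'one two', 'two two'])]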

Spark < 2.4

Unfortunately, to concatenate array columns in the general case you'll need a UDF, for example like this:

from itertools import chain
from pyspark.sql.functions import col, udf
from pyspark.sql.types import *


def concat(type):
    def concat_(*args):
        # Treat NULL array columns as empty lists, then flatten.
        return list(chain.from_iterable((arg if arg else [] for arg in args)))
    return udf(concat_, ArrayType(type))

which can be used as:

df = spark.createDataFrame(
    [(["one", "two", "two"], ["one two", "two two"]), (["three"], None)], 
    ("tokens", "tokens_bigrams")
)

concat_string_arrays = concat(StringType())
df.select(concat_string_arrays("tokens", "tokens_bigrams")).show(truncate=False)

# +---------------------------------+
# |concat_(tokens, tokens_bigrams)  |
# +---------------------------------+
# |[one, two, two, one two, two two]|
# |[three]                          |
# +---------------------------------+

2 Comments

How can I remove duplicates after merging both arrays?
@j' df.withColumn('concat_no_duplicates', array_distinct(col('concat_(tokens, tokens_bigrams)')))
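Expanding on that comment, a sketch (Spark >= 2.4, using the question's column names, which are assumptions here) that merges and deduplicates in one step:

from pyspark.sql.functions import array_distinct, col, concat

# array_distinct (Spark >= 2.4) removes duplicate elements from the
# merged array.
df2 = df.withColumn(
    'tokens_bigrams',
    array_distinct(concat(col('tokens'), col('bigrams')))
)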

In Spark 2.4.0 (2.3 on the Databricks platform) you can do this natively in the DataFrame API using the concat function. In your example:

from pyspark.sql.functions import col, concat

df.withColumn('tokens_bigrams', concat(col('tokens'), col('bigrams')))

Here is the related JIRA: SPARK-23736.



I was using Spark < 2.4 and the above solutions didn't work for me; I was getting the error 'input to function concat should have StringType or BinaryType'. This worked for me:

from pyspark.sql import functions as F

df.select(
    "*",
    F.array(F.concat_ws(',', F.col('tokens'), F.col('bigrams'))).alias('concat_cols')
)
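Note that concat_ws joins everything into one comma-separated string, so the array() wrapper yields a single-element array rather than a true element-wise concatenation. A variant that recovers an array<string>, assuming no element itself contains a comma (and benefiting from concat_ws skipping NULL inputs):

from pyspark.sql import functions as F

# Join both arrays into one comma-separated string, then split it back
# into an array<string>. Assumes no element contains a comma.
df2 = df.select(
    "*",
    F.split(F.concat_ws(',', F.col('tokens'), F.col('bigrams')), ',')
        .alias('tokens_bigrams')
)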

