
I have a DataFrame like this (in PySpark 2.3.1):

from pyspark.sql import Row

my_data = spark.createDataFrame([
  Row(a=[9, 3, 4], b=['a', 'b', 'c'], mask=[True, False, False]),
  Row(a=[7, 2, 6, 4], b=['w', 'x', 'y', 'z'], mask=[True, False, True, False])
])
my_data.show(truncate=False)
#+------------+------------+--------------------------+
#|a           |b           |mask                      |
#+------------+------------+--------------------------+
#|[9, 3, 4]   |[a, b, c]   |[true, false, false]      |
#|[7, 2, 6, 4]|[w, x, y, z]|[true, false, true, false]|
#+------------+------------+--------------------------+

Now I'd like to use the mask column to subset the a and b columns:

my_desired_output = spark.createDataFrame([
  Row(a=[9], b=['a']),
  Row(a=[7, 6], b=['w', 'y'])
])
my_desired_output.show(truncate=False)
#+------+------+
#|a     |b     |
#+------+------+
#|[9]   |[a]   |
#|[7, 6]|[w, y]|
#+------+------+

What's the "idiomatic" way to achieve this? The current solution I have involves mapping over the underlying RDD and subsetting with NumPy, which seems inelegant:

import numpy as np

def subset_with_mask(row):
    mask = np.asarray(row.mask)
    a_masked = np.asarray(row.a)[mask].tolist()
    b_masked = np.asarray(row.b)[mask].tolist()
    return Row(a=a_masked, b=b_masked)

my_desired_output = spark.createDataFrame(my_data.rdd.map(subset_with_mask))

Is this the best way to go, or is there something better (less verbose and/or more efficient) I can do using Spark SQL tools?

  • Depending on your pyspark version, you may be able to achieve this using arrays_zip and filter or with a pandas udf. Commented Apr 22, 2019 at 16:45
  • @pault it looks like filter isn't available in my version of PySpark (2.3.1) but an answer using/demonstrating those functions might be informative all the same. Commented Apr 22, 2019 at 16:58
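For completeness, here is a minimal sketch of the pandas UDF idea from the first comment. The function name mask_int_arr is hypothetical, and this assumes pyarrow is installed and that your PySpark/Arrow combination supports array columns in scalar pandas UDFs:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, IntegerType

# Hypothetical sketch: each entry of the input Series is a whole array from one row
@pandas_udf(ArrayType(IntegerType()), PandasUDFType.SCALAR)
def mask_int_arr(arr, mask):
    return pd.Series(
        [[x for x, keep in zip(a, m) if keep] for a, m in zip(arr, mask)]
    )

my_data.withColumn('a', mask_int_arr('a', 'mask')).show()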

3 Answers


One option is to use a UDF, which you can specialize by the element type of the array:

import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T

def _mask_list(lst, mask):
    # Boolean-index the values with NumPy, then convert back to a Python list
    return np.asarray(lst)[mask].tolist()

mask_array_int = F.udf(_mask_list, T.ArrayType(T.IntegerType()))
mask_array_str = F.udf(_mask_list, T.ArrayType(T.StringType()))

my_desired_output = (
    my_data
    .withColumn('a', mask_array_int(F.col('a'), F.col('mask')))
    .withColumn('b', mask_array_str(F.col('b'), F.col('mask')))
    .drop('mask')  # drop the mask column to match the desired output
)
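
Applying this to the example data reproduces the desired output from the question:

my_desired_output.show(truncate=False)
#+------+------+
#|a     |b     |
#+------+------+
#|[9]   |[a]   |
#|[7, 6]|[w, y]|
#+------+------+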


A UDF like the one in the previous answer is probably the way to go prior to the array functions added in Spark 2.4. For the sake of completeness, here is a "pure SQL" implementation that works before 2.4.

from pyspark.sql.functions import collect_list, monotonically_increasing_id, posexplode

# Tag each row with a unique id so exploded elements can be regrouped later
df = my_data.withColumn("row", monotonically_increasing_id())

# Explode each array together with the position of every element
df1 = df.select("row", posexplode("a").alias("pos", "a"))
df2 = df.select("row", posexplode("b").alias("pos", "b"))
df3 = df.select("row", posexplode("mask").alias("pos", "mask"))

# Realign the elements on (row, pos), keep only the masked ones,
# and collect them back into arrays
df1\
    .join(df2, ["row", "pos"])\
    .join(df3, ["row", "pos"])\
    .filter("mask")\
    .groupBy("row")\
    .agg(collect_list("a").alias("a"), collect_list("b").alias("b"))\
    .select("a", "b")\
    .show()

Output:

+------+------+
|     a|     b|
+------+------+
|[7, 6]|[w, y]|
|   [9]|   [a]|
+------+------+
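
One caveat, visible in the output above (the rows come back swapped): groupBy does not guarantee the original row order. If the order matters, add .orderBy("row") before the final .select("a", "b") to sort on the synthetic id while it is still in scope.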



A better way to do this is with the filter and transform higher-order functions, via pyspark.sql.functions.expr:

import pandas as pd
from pyspark.sql import (
    functions as F,
    SparkSession
)

spark = SparkSession.builder.master('local[4]').getOrCreate()

bool_df = pd.DataFrame([
    ['a', [0, 1, 2, 3, 4], [True]*4 + [False]],
    ['b', [5, 6, 7, 8, 9], [False, True, False, True, False]]
], columns=['id', 'int_arr', 'bool_arr'])
bool_sdf = spark.createDataFrame(bool_df)

def filter_with_mask(in_col, mask_col, out_name="masked_arr"):
    # Zip the value and mask arrays into one array of structs, keep only the
    # structs whose mask field is true, then project back to the values
    filt_input = f'arrays_zip({in_col}, {mask_col})'
    filt_func = f'x -> x.{mask_col}'
    trans_func = f'x -> x.{in_col}'

    return F.expr(f'''transform(
        filter({filt_input}, {filt_func}), {trans_func}
    )''').alias(out_name)

Using the function:

bool_sdf.select(
    '*', filter_with_mask('int_arr', 'bool_arr')
).toPandas()

Results in:

id         int_arr                          bool_arr   masked_arr
 a [0, 1, 2, 3, 4]   [True, True, True, True, False] [0, 1, 2, 3]
 b [5, 6, 7, 8, 9] [False, True, False, True, False]       [6, 8]

This should be possible with pyspark >= 2.4.0 and python >= 3.6.
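
As a follow-up sketch (not part of the original answer): on Spark >= 3.1 the same pattern can be written without expr(), using the Python-level higher-order functions. Column names below are those of bool_sdf above:

import pyspark.sql.functions as F

# Sketch assuming Spark >= 3.1, where filter/transform exist as Python functions
masked = bool_sdf.select(
    '*',
    F.transform(
        F.filter(F.arrays_zip('int_arr', 'bool_arr'), lambda x: x['bool_arr']),
        lambda x: x['int_arr']
    ).alias('masked_arr')
)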

