My case is that I have an array column that I'd like to filter. Consider the following:

+------------------------------------------------------+
|                                                column|
+------------------------------------------------------+
|[prefix1-whatever, prefix2-whatever, prefix4-whatever]|
|[prefix1-whatever, prefix2-whatever, prefix3-whatever]|
|[prefix1-whatever, prefix2-whatever, prefix5-whatever]|
|[prefix1-whatever, prefix2-whatever, prefix3-whatever]|
+------------------------------------------------------+

I'd like to keep only the rows whose array contains prefix-4, prefix-5, prefix-6, prefix-7, [...]. So, using an "or" statement is not scalable here.

Of course, I can just:

val prefixesList = List("prefix-4", "prefix-5", "prefix-6", "prefix-7")

df
  .withColumn("prefix", explode($"column"))
  .withColumn("prefix", split($"prefix", "\\-").getItem(0))
  .withColumn("filterColumn", $"prefix".isInCollection(prefixesList))

But that involves exploding, which I want to avoid. My plan right now is to define an array column from prefixesList and then use array_intersect to filter it. For this to work, though, I have to get rid of the -whatever part (which is, obviously, different for each entry). Were this a Scala array, I could easily map over it. But since it is a Spark array column, I don't know whether that is possible.


TL;DR I have a dataframe containing an array column. I'm trying to manipulate it and filter it without exploding (because, if I do explode, I'll have to manipulate it later to reverse the explode, and I'd like to avoid it).

Can I achieve that without exploding? If so, how?


3 Answers


Not sure if I understood your question correctly: you want to keep all lines that do not contain any of the prefixes in prefixesList?

If so, you can write your own filter function:

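import org.apache.spark.sql.Row

// Returns true when no element of the array (first column of the row)
// starts with one of the listed prefixes.
def filterPrefixes(row: Row): Boolean = {
  val prefixes = Seq("prefix4", "prefix5", "prefix6", "prefix7")
  !row.getSeq[String](0).exists(s => prefixes.exists(p => s.startsWith(p)))
}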

and then use it as argument for the filter call:

df.filter(filterPrefixes _)
  .show(false)

prints

+------------------------------------------------------+
|column                                                |
+------------------------------------------------------+
|[prefix1-whatever, prefix2-whatever, prefix3-whatever]|
|[prefix1-whatever, prefix2-whatever, prefix3-whatever]|
+------------------------------------------------------+
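
As a follow-up sketch, the same predicate can be flipped to keep the rows that do contain one of the prefixes. The object name KeepMatchingRows and the helper containsAnyPrefix are made up for illustration, and a local SparkSession is assumed:

```scala
import org.apache.spark.sql.{Row, SparkSession}

object KeepMatchingRows {
  // Prefixes to look for (same values as in the answer above).
  val prefixes: Seq[String] = Seq("prefix4", "prefix5", "prefix6", "prefix7")

  // Hypothetical helper: true when at least one element starts with one of the prefixes.
  def containsAnyPrefix(values: Seq[String]): Boolean =
    values.exists(s => prefixes.exists(p => s.startsWith(p)))

  // Row-level predicate; assumes the array column is the first column of the row.
  val keepRow: Row => Boolean = row => containsAnyPrefix(row.getSeq[String](0))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("keep-matching-rows")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      Seq("prefix1-whatever", "prefix2-whatever", "prefix4-whatever"),
      Seq("prefix1-whatever", "prefix2-whatever", "prefix3-whatever")
    ).toDF("column")

    // Only the first sample row has an element starting with one of the prefixes.
    df.filter(keepRow).show(false)

    spark.stop()
  }
}
```

Keeping the string logic in a plain Scala helper means it can be checked without a cluster; the Spark part only wires it into filter.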

2 Comments

I wanted to keep lines that do contain any of the prefixes, but solution-wise that is just a matter of flipping a boolean; it doesn't really change the idea. Simple implementation, I like it. I know I didn't mention it, so you shouldn't really care, but do you have any idea of the performance of such an operation? Is it just like a UDF, performance-wise? If you don't know, no problem.
The performance of the filter function should be approximately the same as that of any operation based on Scala code (udf, map, flatMap, etc.). This means it defeats Catalyst optimizations (filter pushdown, for example), as Catalyst cannot look into the function and check which columns are really necessary. If there is a way to solve the problem using only Spark SQL functions and your code is performance-critical, I would go for the SQL solution.

It's relatively trivial to convert the DataFrame to a Dataset[Array[String]] and map over those arrays as whole elements. The basic idea is that you can iterate over your list of arrays easily, without having to flatten the entire dataset.

val df = Seq(Seq("prefix1-whatever", "prefix2-whatever", "prefix4-whatever"),
             Seq("prefix1-whatever", "prefix2-whatever", "prefix3-whatever"),
             Seq("prefix1-whatever", "prefix2-whatever", "prefix5-whatever"),
             Seq("prefix1-whatever", "prefix2-whatever", "prefix3-whatever")
).toDF("column")

val pl = List("prefix4", "prefix5", "prefix6", "prefix7")

val df2 = df.as[Array[String]].map(a => {
    a.flatMap(s => {
        val start = s.split("-")(0)
        if(pl.contains(start)) {
            Some(s)
        } else {
            None
        }
    })
}).toDF("column")

df2.show(false)

The above code results in:

+------------------+
|column            |
+------------------+
|[prefix4-whatever]|
|[]                |
|[prefix5-whatever]|
|[]                |
+------------------+

I'm not entirely sure how this would compare performance-wise to actually flattening and recombining the data set. Doing this misses any Catalyst optimizations, but avoids a lot of unnecessary shuffling of data.
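
For comparison, here is a sketch of the same element-level filtering written with Spark SQL's built-in higher-order function filter, so that the logic stays visible to Catalyst. It assumes Spark 2.4 or later, an array column named arr (renamed from column to sidestep any keyword clash in the SQL expression), and a non-empty prefix list without single quotes. HofFilter and keepMatchingElements are hypothetical names:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

object HofFilter {
  // Hypothetical helper: keeps, inside array column `arr`, only the elements
  // whose text before the first '-' is one of `prefixes`.
  // Assumes `prefixes` is non-empty and contains no single quotes.
  def keepMatchingElements(df: DataFrame, prefixes: Seq[String]): DataFrame = {
    // Renders the prefixes as a SQL array literal: array('prefix4', 'prefix5', ...)
    val wanted = prefixes.map(p => s"'$p'").mkString("array(", ", ", ")")
    df.withColumn(
      "arr",
      expr(s"filter(arr, s -> array_contains($wanted, split(s, '-')[0]))")
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("hof-filter")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      Seq("prefix1-whatever", "prefix2-whatever", "prefix4-whatever"),
      Seq("prefix1-whatever", "prefix2-whatever", "prefix3-whatever")
    ).toDF("arr")

    keepMatchingElements(df, List("prefix4", "prefix5", "prefix6", "prefix7")).show(false)

    spark.stop()
  }
}
```

Whether this is actually faster than the typed map would still need measuring on real data.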

P.S. I corrected for a minor issue in your prefix list, since "prefix-N" didn't match the data's pattern.

1 Comment

Thanks for the correction. It was probably a typo. Also, thanks for the suggestion of casting it, I hadn't thought of that. When I first started reading your answer, I also thought "hey, well thought! But, performance... oh, I see." I guess I'd have to run some performance tests in order to evaluate that. From what I see, it seems to me that this would equate to a UDF, which is potentially performance-harming. But, hey, a right answer is a right answer, especially when performance hasn't been mentioned in the OP.

You can achieve this using the SQL API. If you want to keep only rows that contain any of the values prefix-4, prefix-5, prefix-6, prefix-7, you could use the arrays_overlap function. Otherwise, if you want to keep rows that contain all of your values, you could try array_intersect and then check that its size equals the number of your values.

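// Imports needed by this snippet (spark.implicits._ is also required for toDF and the 'col syntax)
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val df = Seq(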
  Seq("prefix1-a", "prefix2-b", "prefix3-c", "prefix4-d"),
  Seq("prefix4-e", "prefix5-f", "prefix6-g", "prefix7-h", "prefix8-i"),
  Seq("prefix6-a", "prefix7-b", "prefix8-c", "prefix9-d"),
  Seq("prefix8-d", "prefix9-e", "prefix10-c", "prefix12-a")
).toDF("arr")


val schema = StructType(Seq(
  StructField("arr", ArrayType.apply(StringType)),
  StructField("arr2", ArrayType.apply(StringType))
))
val encoder = RowEncoder(schema)

val df2 = df.map(s =>
  (s.getSeq[String](0).toArray, s.getSeq[String](0).map(s => s.substring(0, s.indexOf("-"))).toArray)
).map(s => RowFactory.create(s._1, s._2))(encoder)


val prefixesList = Array("prefix4", "prefix5", "prefix6", "prefix7")
val prefixesListSize = prefixesList.size
val prefixesListCol = lit(prefixesList)

df2.select('arr,'arr2,
  arrays_overlap('arr2,prefixesListCol).as("OR"),
  (size(array_intersect('arr2,prefixesListCol)) === prefixesListSize).as("AND")
).show(false)

This will give you:

+-------------------------------------------------------+---------------------------------------------+-----+-----+
|arr                                                    |arr2                                         |OR   |AND  |
+-------------------------------------------------------+---------------------------------------------+-----+-----+
|[prefix1-a, prefix2-b, prefix3-c, prefix4-d]           |[prefix1, prefix2, prefix3, prefix4]         |true |false|
|[prefix4-e, prefix5-f, prefix6-g, prefix7-h, prefix8-i]|[prefix4, prefix5, prefix6, prefix7, prefix8]|true |true |
|[prefix6-a, prefix7-b, prefix8-c, prefix9-d]           |[prefix6, prefix7, prefix8, prefix9]         |true |false|
|[prefix8-d, prefix9-e, prefix10-c, prefix12-a]         |[prefix8, prefix9, prefix10, prefix12]       |false|false|
+-------------------------------------------------------+---------------------------------------------+-----+-----+

so finally you can use:

df2.filter(size(array_intersect('arr2,prefixesListCol)) === prefixesListSize).show(false)

and you will get below result:

+-------------------------------------------------------+---------------------------------------------+
|arr                                                    |arr2                                         |
+-------------------------------------------------------+---------------------------------------------+
|[prefix4-e, prefix5-f, prefix6-g, prefix7-h, prefix8-i]|[prefix4, prefix5, prefix6, prefix7, prefix8]|
+-------------------------------------------------------+---------------------------------------------+
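
The prefix extraction done above with df.map and a RowEncoder could, as an alternative sketch, be expressed with the built-in transform higher-order function, so that no separate arr2 column or custom encoder is needed. This assumes Spark 2.4 or later and a non-empty prefix list; OverlapByPrefix and keepRowsWithAnyPrefix are hypothetical names:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, arrays_overlap, expr, lit}

object OverlapByPrefix {
  // Hypothetical helper: keeps the rows whose array column `arr` has at least
  // one element whose text before the first '-' is in `prefixes`.
  // Assumes `prefixes` is non-empty.
  def keepRowsWithAnyPrefix(df: DataFrame, prefixes: Seq[String]): DataFrame = {
    // transform applies the lambda to every element, dropping the "-whatever" part.
    val stripped = expr("transform(arr, s -> split(s, '-')[0])")
    // Array column built from the wanted prefixes.
    val wanted = array(prefixes.map(p => lit(p)): _*)
    df.filter(arrays_overlap(stripped, wanted))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("overlap-by-prefix")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      Seq("prefix1-a", "prefix2-b", "prefix3-c", "prefix4-d"),
      Seq("prefix4-e", "prefix5-f", "prefix6-g", "prefix7-h", "prefix8-i"),
      Seq("prefix6-a", "prefix7-b", "prefix8-c", "prefix9-d"),
      Seq("prefix8-d", "prefix9-e", "prefix10-c", "prefix12-a")
    ).toDF("arr")

    // Corresponds to the "OR" column above: the last row should be dropped.
    keepRowsWithAnyPrefix(df, Seq("prefix4", "prefix5", "prefix6", "prefix7")).show(false)

    spark.stop()
  }
}
```

For the "AND" variant, the same stripped expression could be passed to array_intersect and compared against the prefix count, as in the filter shown above.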

1 Comment

Thanks for your answer. I just didn't understand what you proposed in order to clean the entries before comparing them with overlap or intersect. I see that, in your sample, you just inserted the prefixes, while in my sample I have the prefixes and some other text right after them. If I only had the prefixes that would not be a problem, but I need help with getting rid of the -whatever part. How did you deal with that? Could you include that part in your answer?
