
I am new to UDFs in Spark. I have also read the answer here.

Problem statement: I'm trying to count pattern matches in a DataFrame column.

Ex: Dataframe

val df = Seq((1, Some("z")), (2, Some("abs,abc,dfg")),
             (3, Some("a,b,c,d,e,f,abs,abc,dfg"))).toDF("id", "text")

df.show()

+---+--------------------+
| id|                text|
+---+--------------------+
|  1|                   z|
|  2|         abs,abc,dfg|
|  3|a,b,c,d,e,f,abs,a...|
+---+--------------------+


df.filter($"text".contains("abs,abc,dfg")).count()
// returns 2, since "abs,abc,dfg" exists in row 2 and (as a substring) in row 3
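To see why this returns 2, the same substring check can be sketched in plain Scala, without Spark (the row values are taken from the example DataFrame above):

```scala
// Plain Scala sketch of the substring check that
// df.filter($"text".contains("abs,abc,dfg")) performs row by row.
val rows = Seq("z", "abs,abc,dfg", "a,b,c,d,e,f,abs,abc,dfg")
val matches = rows.count(_.contains("abs,abc,dfg"))
println(matches) // 2: rows 2 and 3 both contain the full pattern as a substring
```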

Now I want to do this pattern matching for every row in the text column and add a new column called count.

Result:

+---+--------------------+-----+
| id|                text|count|
+---+--------------------+-----+
|  1|                   z|    1|
|  2|         abs,abc,dfg|    2|
|  3|a,b,c,d,e,f,abs,a...|    1|
+---+--------------------+-----+

I tried to define a UDF, passing the text column as Array[Seq[String]]. But I am not able to get what I intended.

What I tried so far:

val txt = df.select("text").collect.map(_.toSeq.map(_.toString)) // convert column to Array[Seq[String]]
val valsum = udf((txt: Array[Seq[String]], pattern: String) => txt.count(_ == pattern))
df.withColumn("newCol", valsum(lit(txt), df("text"))).show()

Any help would be appreciated.

  • how are you deciding the value of count? why is it equal to 2 for the second row? Commented Jul 14, 2017 at 9:11
  • @philantrovert If the pattern z exists in any other row, it is counted as 1. In the above example, abs,abc,dfg is one whole string, which is also partially present in row 3 (a,b,c,d,e,f,abs,abc,dfg); that's why its count is 2. Commented Jul 14, 2017 at 13:21

1 Answer


You will need all the elements of the text column, which you can gather into a single array with collect_list over a window that groups all rows together. Then, for each row, count how many elements of the collected array contain that row's text, as in the following code.

import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import scala.collection.mutable

val df = Seq((1, Some("z")), (2, Some("abs,abc,dfg")),
             (3, Some("a,b,c,d,e,f,abs,abc,dfg"))).toDF("id", "text")

// For each row's text, count how many of the collected texts contain it
val valsum = udf((txt: String, array: mutable.WrappedArray[String]) =>
  array.count(_.contains(txt)))
df.withColumn("grouping", lit("g"))
  .withColumn("array", collect_list("text").over(Window.partitionBy("grouping")))
  .withColumn("count", valsum($"text", $"array"))
  .drop("grouping", "array")
  .show(false)

You should get the following output:

+---+-----------------------+-----+
|id |text                   |count|
+---+-----------------------+-----+
|1  |z                      |1    |
|2  |abs,abc,dfg            |2    |
|3  |a,b,c,d,e,f,abs,abc,dfg|1    |
+---+-----------------------+-----+
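What the UDF computes can be sketched in plain Scala, using the three text values from the example DataFrame: for each row's text, count how many of the collected texts contain it as a substring.

```scala
// Plain Scala sketch of the per-row logic: each text is matched
// against the full collected list, mirroring the UDF above.
val texts = Seq("z", "abs,abc,dfg", "a,b,c,d,e,f,abs,abc,dfg")
val counts = texts.map(t => texts.count(_.contains(t)))
println(counts) // List(1, 2, 1), matching the count column in the output
```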

I hope this is helpful.


8 Comments

thanks for your answer. What I am trying to do is find z in the $text column, and it appears once; similarly, I am looking for the pattern abs,abc,dfg in the whole $text column, and it appears twice (rows 2 and 3)
is your final result in dataframe or just counts?
the final result should be a DataFrame with a newCol of counts (patterns matched)
can you explain how you get your final dataframe? how did you get 1 for the first row, 2 for the second row and 1 for the third row?
if I call the UDF like df.withColumn("newCol", valsum(lit(txt), df("text"))).show(), the first argument is the Array[Seq[String]] and the second argument is a DataFrame column. For every row of this column, it does the pattern matching. Ex: z in the Array[Seq[String]] should return 1 because the array has just one z, but the second pattern, abs,abc,dfg, is present in rows 2 and 3, hence count 2. This is what I want to achieve
