Say I have a dataframe df1 with the column "color" that contains a bunch of colors, and another dataframe df2 with column "phrase" that contains various phrases.

I'd like to join the two dataframes where the color in df1 appears in the phrases in df2. I cannot use df1.join(df2, df2("phrase").contains(df1("color"))), since that would match anywhere the word appears within the phrase. I don't want to match on words like scaRED, for example, where RED is part of another word. I only want to join when the color appears as a separate word in the phrase.

Can I use a regular expression to solve this? Which function can I use, and what is the syntax when I need to reference a column in the expression?

3 Answers

You could create a regex pattern that encloses the color in word boundaries (\b) and use a regexp_replace check as the join condition:

import org.apache.spark.sql.functions.{concat, lit, regexp_replace}
// assumes a spark-shell session, where spark.implicits._ is already in scope

val df1 = Seq(
  (1, "red"), (2, "green"), (3, "blue")
).toDF("id", "color")

val df2 = Seq(
  "red apple", "scared cat", "blue sky", "green hornet"
).toDF("phrase")

val patternCol = concat(lit("\\b"), df1("color"), lit("\\b"))

df1.join(df2, regexp_replace(df2("phrase"), patternCol, lit("")) =!= df2("phrase")).
  show
// +---+-----+------------+
// | id|color|      phrase|
// +---+-----+------------+
// |  1|  red|   red apple|
// |  3| blue|    blue sky|
// |  2|green|green hornet|
// +---+-----+------------+

Note that "scared cat" would have matched in the absence of the enclosing word boundaries.
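The word-boundary behavior can be sanity-checked in plain Scala, without a Spark session, since Spark delegates to the same Java regex engine:

```scala
// \b matches at a word/non-word transition, so "red" matches as a
// standalone word but not inside "scared".
val pattern = "\\bred\\b".r

println(pattern.findFirstIn("red apple"))  // Some(red)
println(pattern.findFirstIn("scared cat")) // None
```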


Building up on your own solution, you can also try this:

df1.join(df2, array_contains(split(df2("phrase"), " "), df1("color")))
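One caveat with splitting on a literal space: punctuation stays attached to the words, so a phrase like "I like red." would not match. A small plain-Scala sketch of the issue, and of splitting on runs of non-word characters ("\\W+") instead, which behaves like the \b word boundaries:

```scala
// Splitting on a single space leaves punctuation glued to words:
println("I like red.".split(" ").contains("red"))    // false ("red." != "red")

// Splitting on non-word characters strips the punctuation:
println("I like red.".split("\\W+").contains("red")) // true
```

In Spark, passing "\\W+" as the pattern to split would have the same effect, since split takes a Java regex.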


I did not see your actual data, but here is a starter with a little variation. No need for a regex as far as I can see, but who knows:

// You may need to do some parsing, like stripping "." and "?" and maybe lower- or uppercasing
// The question did not provide an example of the desired JOIN output

import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

val checkValue = udf { (array: WrappedArray[String], value: String) =>
  array.iterator.map(_.toLowerCase).contains(value.toLowerCase)
}

// Generate some data
val dfCompare = spark.sparkContext.parallelize(Seq("red", "blue", "gold", "cherry")).toDF("color")
val rdd = spark.sparkContext.parallelize(Seq(
  ("red", "hello how are you red", 10),
  ("blue", "I am fine but blue", 20),
  ("cherry", "you need to do some parsing and I like cherry", 30),
  ("thebluephantom", "you need to do some parsing and I like fanta", 30)
))
val df = rdd.toDF()
val df2 = df.withColumn("_4", split($"_2", " "))
df2.show(false)
dfCompare.show(false)
val res = df2.join(dfCompare, checkValue(df2("_4"), dfCompare("color")), "inner")
res.show(false)

returns:

+------+---------------------------------------------+---+--------------------------------------------------------+------+
|_1    |_2                                           |_3 |_4                                                      |color |
+------+---------------------------------------------+---+--------------------------------------------------------+------+
|red   |hello how are you red                        |10 |[hello, how, are, you, red]                             |red   |
|blue  |I am fine but blue                           |20 |[I, am, fine, but, blue]                                |blue  |
|cherry|you need to do some parsing and I like cherry|30 |[you, need, to, do, some, parsing, and, I, like, cherry]|cherry|
+------+---------------------------------------------+---+--------------------------------------------------------+------+
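The core of the UDF (a case-insensitive membership test over the split words) can be checked in plain Scala, outside Spark:

```scala
// Mirrors the checkValue UDF: is `value` one of the words, ignoring case?
def checkValue(words: Seq[String], value: String): Boolean =
  words.iterator.map(_.toLowerCase).contains(value.toLowerCase)

println(checkValue(Seq("I", "am", "fine", "but", "Blue"), "blue")) // true
println(checkValue(Seq("thebluephantom"), "blue"))                 // false
```

Note that "thebluephantom" does not match, because contains compares whole elements rather than substrings.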
