
I have two different PySpark dataframes, both of string type. The first dataframe holds single words, while the second holds strings of words, i.e. sentences. I have to check whether each word from the first dataframe's column appears in the second dataframe's Sentences column. For example, df2:

    +---+------+-------+-----------------+
    |age|height|   name|        Sentences|
    +---+------+-------+-----------------+
    | 10|    80|  Alice|   'Grace, Sarah'|
    | 15|  null|    Bob|          'Sarah'|
    | 12|  null|    Tom|'Amy, Sarah, Bob'|
    | 13|  null| Rachel|       'Tom, Bob'|
    +---+------+-------+-----------------+

And the single-word dataframe, df1:

    +-------+
    |  token|
    +-------+
    |  'Ali'|
    |'Sarah'|
    |  'Bob'|
    |  'Bob'|
    +-------+

So, how can I search for each token of df1 in the Sentences column of df2? I need the count for each word, added as a new column in df1.

I have tried this solution, but it works only for a single word, i.e. not for a complete dataframe column.
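
For clarity, the desired result would presumably look like this (counting, for each token, how many sentences contain it; the count column name is illustrative):

    +-------+-----+
    |  token|count|
    +-------+-----+
    |  'Ali'|    0|
    |'Sarah'|    3|
    |  'Bob'|    2|
    |  'Bob'|    2|
    +-------+-----+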

  • Please provide code to create the dataframes, and your sample code as well if you have tried anything. Commented Apr 22, 2022 at 6:52
  • I have given a link in the question. So far, I am checking for an option to apply this filter. You can check the dataframe code, very similar to stackoverflow.com/questions/57050728/… Commented Apr 22, 2022 at 7:16

2 Answers


Considering the dataframes created in the other answer:

from pyspark.sql.functions import explode, split, length, trim

# Split each sentence on commas and explode into one row per name
df3 = df2.select('Sentences', explode(split('Sentences', ',')).alias('friends'))
df3 = df3.withColumn('friends', trim('friends')) \
         .withColumn('length_of_friends', length('friends'))
df3.show()  # display(df3) if on Databricks

# Keep only the names that appear in df1 and count the occurrences of each
df3 = df3.join(df1, df1.token == df3.friends, how='inner').groupby('friends').count()
df3.show()
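
Two caveats with the join above: the inner join drops tokens that never occur in any sentence (so 'Ali' vanishes from the result), and duplicate tokens in df1 (the two 'Bob' rows) multiply the matched rows, inflating the counts. A possible sketch that attaches a per-token count directly to df1, assuming the df1/df2 from the question:

from pyspark.sql.functions import explode, split, trim, coalesce, lit

# Explode the sentences into individual trimmed names and count each name
names = df2.select(explode(split('Sentences', ',')).alias('friends')) \
           .withColumn('friends', trim('friends'))
counts = names.groupby('friends').count()

# Left-join the counts back onto df1 so every token keeps a row, 0 if unmatched
result = (df1.join(counts, df1.token == counts.friends, how='left')
             .select('token', coalesce('count', lit(0)).alias('count')))
result.show()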


You could use a PySpark UDF to create the new column in df1. The problem is that you cannot access a second dataframe inside a UDF.

As advised in the referenced question, you can make the sentences available as a broadcast variable.

Here is a working example:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# Instantiate df2
cols = ["age", "height", "name", "Sentences"]
data = [
        (10, 80, "Alice", "Grace, Sarah"),
        (15, None, "Bob", "Sarah"),
        (12, None, "Tom", "Amy, Sarah, Bob"),
        (13, None, "Rachel", "Tom, Bob")
        ]

df2 = spark.createDataFrame(data).toDF(*cols)

# Instantiate df1
cols = ["token"]
data = [
        ("Ali",),
        ("Sarah",),
        ("Bob",),
        ("Bob",)
        ]

df1 = spark.createDataFrame(data).toDF(*cols)

# Create a broadcast variable from the Sentences column of df2
lstSentences = [row[0] for row in df2.select('Sentences').collect()]
sentences = spark.sparkContext.broadcast(lstSentences)

def countWordInSentence(word):
    # Count the sentences that contain the word, reading the broadcast value
    return sum(1 for item in sentences.value if word in item)

func_udf = udf(countWordInSentence, IntegerType())
df1 = df1.withColumn("COUNT", func_udf(df1["token"]))
df1.show()
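
With the sample data above, this should print 0 for Ali, 3 for Sarah and 2 for each Bob row. One caveat: word in item is a substring test, so a token such as 'Sam' would also match a sentence containing 'Samantha'. If exact names are needed, a possible variant splits each broadcast sentence on commas first:

def countWordInSentence(word):
    # Exact match: compare against the individual comma-separated names
    return sum(1 for item in sentences.value
               if word in [name.strip() for name in item.split(",")])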

