I'm trying to learn PySpark better. I'm streaming tweets and trying to extract the hashtags from each tweet's text (I know the Twitter API's JSON already provides the hashtags; I'm doing this as an exercise).

So, with a PySpark DataFrame named Hashtags:

-------------------------------------------
Batch: 18
-------------------------------------------
+--------------------+--------------------+
|               value|            Hashtags|
+--------------------+--------------------+
|Instead, it has c...|[instead,, it, ha...|
|  #iran #abd #Biden |[#iran, #abd, #bi...|
+--------------------+--------------------+

I take the column "value", convert it to lower case, split it on whitespace (space/tab/newline) to create an array column named "Hashtags", and then attempt to remove any elements that are just whitespace and any elements that don't begin with "#".

from pyspark.sql import functions as F
from pyspark.sql.functions import lower, split

Hashtags = Hashtags.withColumn("Hashtags", lower(Hashtags["value"]))
Hashtags = Hashtags.withColumn("Hashtags", split(Hashtags["Hashtags"], r'\s'))
Hashtags = Hashtags.withColumn("Hashtags", F.array_remove(Hashtags["Hashtags"], r'\s'))
Hashtags = Hashtags.withColumn("Hashtags", F.array_remove(Hashtags["Hashtags"], r'^(?!#).+'))

As far as I can tell, the array_remove() does remove elements with the regex r'\s' but it doesn't remove elements that don't begin with a "#".
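
For comparison, here is a plain-Python sketch of the steps above. It is only an emulation, not Spark code: it assumes that Spark's array_remove drops elements equal to the given value (as its documentation describes) rather than treating the value as a pattern, and it uses re.split in place of Spark's split. The helper name array_remove_like and the sample text are hypothetical.

```python
import re


def array_remove_like(values, element):
    """Hypothetical stand-in for array_remove: drop items EQUAL to element."""
    return [v for v in values if v != element]


# Hypothetical sample text with a double space before the first hashtag.
text = "Instead, it has  #Iran #abd"

# Lower-case, then split on single whitespace characters.
tokens = re.split(r"\s", text.lower())
print(tokens)  # ['instead,', 'it', 'has', '', '#iran', '#abd']

# Neither call changes the list: no token is literally the string
# r'\s' or the string r'^(?!#).+'.
after_whitespace = array_remove_like(tokens, r"\s")
after_hashtag_pattern = array_remove_like(after_whitespace, r"^(?!#).+")
print(after_hashtag_pattern == tokens)  # True
```

In this emulation the double space yields an empty string, not a whitespace token, so a literal comparison against r'\s' has nothing to remove either.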

I know the regex itself works outside of array_remove() because I tested it like this:

import re

RegText = r'^(?!#).+'
print(re.findall(RegText, "#AnandWrites"), re.match(RegText, "#AnandWrites"))
print(re.findall(RegText, "AnandWrites"), re.match(RegText, "AnandWrites"))
print(re.findall(RegText, "with\xe2\x80\xa6"), re.match(RegText, "with\xe2\x80\xa6"))
print(re.findall(RegText, "An#andWrites"), re.match(RegText, "An#andWrites"))

which gives me the following result, indicating that it successfully matches strings that don't begin with "#":

[] None
['AnandWrites'] <re.Match object; span=(0, 11), match='AnandWrites'>
['withâ\x80¦'] <re.Match object; span=(0, 7), match='withâ\x80¦'>
['An#andWrites'] <re.Match object; span=(0, 12), match='An#andWrites'>

2 Answers

array_remove does not accept a regex: it removes only the elements that are equal to the given value. Consider using filter with rlike instead:

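from pyspark.sql import functions as F

# df is the DataFrame from the question (named Hashtags there).
# Split the lower-cased text, then keep only the elements that pass the predicate.
df2 = df.withColumn(
    'Hashtags',
    F.expr(r"""
        filter(
            split(lower(value), '\\s'),
            x -> x not rlike '\\s' and x not rlike '^(?!#).+'
        )
    """)
)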

df2.show(truncate=False)
+-----------------+---------------------+
|value            |Hashtags             |
+-----------------+---------------------+
|Instead, it has  |[]                   |
|#iran #abd #biden|[#iran, #abd, #biden]|
+-----------------+---------------------+
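
The predicate in the filter can be checked outside Spark with a small plain-Python sketch. It assumes that rlike behaves like re.search for these particular patterns (a match anywhere in the string); the function names and the token list are hypothetical. It also shows a possible simplification: testing for '^#' directly, which additionally drops the empty strings that consecutive whitespace can produce.

```python
import re


def keep_original(x):
    """Mirrors: x not rlike '\\s' and x not rlike '^(?!#).+'"""
    return re.search(r"\s", x) is None and re.search(r"^(?!#).+", x) is None


def keep_simple(x):
    """Alternative predicate: keep only tokens that start with '#'."""
    return re.search(r"^#", x) is not None


# Hypothetical tokens, including an empty string from a double space.
tokens = ["instead,", "it", "has", "", "#iran", "#abd"]

original_result = [t for t in tokens if keep_original(t)]
simple_result = [t for t in tokens if keep_simple(t)]

print(original_result)  # ['', '#iran', '#abd']
print(simple_result)    # ['#iran', '#abd']
```

If empty strings can occur in the data, a predicate such as x rlike '^#' inside the same filter expression may be worth trying.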

You can use the following udf (user defined function):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def keep_hashtags(array):
    return [x for x in array if x.startswith("#")]

keep_hashtags_udf = udf(keep_hashtags, ArrayType(StringType()))

Hashtags = Hashtags.withColumn("Hashtags", keep_hashtags_udf(Hashtags["Hashtags"]))

This should give you the desired result. Also, check my blog: https://byambaa1982.github.io/2023/02/11/pysparkv4.html
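
One caveat: keep_hashtags as written raises a TypeError if it receives None, which is how a null array or null element reaches a Python UDF. A minimal null-safe sketch follows; the name keep_hashtags_safe is hypothetical, and the block contains only the plain Python function so that it can be tried without a Spark session.

```python
def keep_hashtags_safe(array):
    """Return the elements starting with '#', tolerating None inputs."""
    if array is None:
        # A null array column value arrives as None; pass it through.
        return None
    # Skip null elements before calling startswith on them.
    return [x for x in array if x is not None and x.startswith("#")]


print(keep_hashtags_safe(["instead,", "", "#iran", None, "#abd"]))  # ['#iran', '#abd']
print(keep_hashtags_safe(None))  # None
```

It can be wrapped the same way as above, with udf(keep_hashtags_safe, ArrayType(StringType())).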
