
I'm trying to replace strings in a DataFrame column using regexp_replace. I need to apply the regex patterns to every record in the column, but the strings are not being replaced as expected.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark import sql
from pyspark.sql.functions import regexp_replace, col
import re

conf = SparkConf().setAppName("myFirstApp").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = sql.SQLContext(sc)


df=sc.parallelize([('2345','ADVANCED by John'),
('2398','ADVANCED by ADVANCE'),
('2328','Verified by somerandomtext'),
('3983','Double Checked by Marsha')]).toDF(['ID', "Notes"])

reg_patterns=["ADVANCED|ADVANCE/ADV/","ASSOCS|AS|ASSOCIATES/ASSOC/"]

for i in range(len(reg_patterns)):
        res_split=re.findall(r"[^/]+",reg_patterns[i])
        res_split[0]
        df=df.withColumn('NotesUPD',regexp_replace(col('Notes'),res_split[0],res_split[1]))

df.show()

Output :

+----+--------------------+--------------------+
|  ID|               Notes|            NotesUPD|
+----+--------------------+--------------------+
|2345|    ADVANCED by John|    ADVANCED by John|
|2398| ADVANCED by ADVANCE| ADVANCED by ADVANCE|
|2328|Verified by somer...|Verified by somer...|
|3983|Double Checked by...|Double Checked by...|
+----+--------------------+--------------------+

Expected Output:

+----+--------------------+--------------------+
|  ID|               Notes|            NotesUPD|
+----+--------------------+--------------------+
|2345|    ADVANCED by John|         ADV by John|
|2398| ADVANCED by ADVANCE|          ADV by ADV|
|2328|Verified by somer...|Verified by somer...|
|3983|Double Checked by...|Double Checked by...|
+----+--------------------+--------------------+

3 Answers


You can write a udf function that loops over reg_patterns as below:

reg_patterns=["ADVANCED|ADVANCE/ADV/","ASSOCS|AS|ASSOCIATES/ASSOC/"]

import re
from pyspark.sql import functions as f
from pyspark.sql import types as t
def replaceUdf(column):
    for i in range(len(reg_patterns)):
        # split "PAT1|PAT2/REPL/" into ["PAT1|PAT2", "REPL"]
        res_split = re.findall(r"[^/]+", reg_patterns[i])
        # str.replace does literal (non-regex) substitution,
        # so each alternative is replaced one at a time
        for x in res_split[0].split("|"):
            column = column.replace(x, res_split[1])
    return column

reg_replaceUdf = f.udf(replaceUdf, t.StringType())

df = df.withColumn('NotesUPD', reg_replaceUdf(f.col('Notes')))
df.show()

and you should have

+----+--------------------+--------------------+
|  ID|               Notes|            NotesUPD|
+----+--------------------+--------------------+
|2345|    ADVANCED by John|         ADV by John|
|2398| ADVANCED by ADVANCE|          ADV by ADV|
|2328|Verified by somer...|Verified by somer...|
|3983|Double Checked by...|Double Checked by...|
+----+--------------------+--------------------+
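Because the udf body uses Python's literal str.replace rather than regex matching, the replacement logic can be sanity-checked in plain Python without a Spark session. The replace_plain helper below is a hypothetical mirror of the udf body:

```python
import re

reg_patterns = ["ADVANCED|ADVANCE/ADV/", "ASSOCS|AS|ASSOCIATES/ASSOC/"]

def replace_plain(column):
    # Same logic as the udf body: literal substring replacement.
    for pattern in reg_patterns:
        res_split = re.findall(r"[^/]+", pattern)  # ["PAT1|PAT2", "REPL"]
        for x in res_split[0].split("|"):
            column = column.replace(x, res_split[1])
    return column

print(replace_plain("ADVANCED by ADVANCE"))  # ADV by ADV
```

Note that ordering matters here: because the replacement is literal, "ADVANCED" must come before "ADVANCE" in the alternatives, otherwise replacing "ADVANCE" first would leave a stray trailing "D".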

The problem is that your code repeatedly overwrites the previous result, starting again from the original Notes column on every iteration. Instead, build each replacement on top of the previous one:

notes_upd = col('Notes')

for i in range(len(reg_patterns)):
    res_split = re.findall(r"[^/]+", reg_patterns[i])
    notes_upd = regexp_replace(notes_upd, res_split[0], res_split[1])

and you'll get the desired result:

df.withColumn('NotesUPD', notes_upd).show()

# +----+--------------------+--------------------+
# |  ID|               Notes|            NotesUPD|
# +----+--------------------+--------------------+
# |2345|    ADVANCED by John|         ADV by John|
# |2398| ADVANCED by ADVANCE|          ADV by ADV|
# |2328|Verified by somer...|Verified by somer...|
# |3983|Double Checked by...|Double Checked by...|
# +----+--------------------+--------------------+
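The same chaining, where each pattern is applied to the result of the previous one, can also be expressed as a fold with functools.reduce. The sketch below uses plain re.sub so it runs without Spark; apply_pattern and normalize are hypothetical names:

```python
import re
from functools import reduce

reg_patterns = ["ADVANCED|ADVANCE/ADV/", "ASSOCS|AS|ASSOCIATES/ASSOC/"]

def apply_pattern(text, pattern):
    # "PAT1|PAT2/REPL/" -> regex "PAT1|PAT2", replacement "REPL"
    regex, repl = re.findall(r"[^/]+", pattern)
    return re.sub(regex, repl, text)

def normalize(text):
    # Fold every pattern over the running result, mirroring how the
    # loop above threads notes_upd through each regexp_replace call.
    return reduce(apply_pattern, reg_patterns, text)

print(normalize("ADVANCED by ADVANCE"))  # ADV by ADV
```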


@marjun, this method is preferred over using a udf. You had a simple mistake in your original code, which this answer fixes.

The previous solutions work, but they are limited to short reg_patterns lists. The implementation below scales well when the normalization pattern has many entries (e.g. spell correction using a custom dictionary).

Start by mapping the reg_patterns list to a dictionary:

import re
from pyspark.sql.functions import col, udf

def parse_string(s, or_delim, target_delim):
    keys,value = s.rstrip(target_delim).rsplit(target_delim)
    return {key:value for key in keys.split(or_delim)}

reg_patterns=["ADVANCED|ADVANCE/ADV/","ASSOCS|AS|ASSOCIATES/ASSOC/"]

normalization_dict = {}
for item in reg_patterns:
    normalization_dict.update(parse_string(item, "|", "/"))
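As a quick sanity check (plain Python, no Spark needed), the dictionary built from these two patterns maps every alternative to its replacement:

```python
def parse_string(s, or_delim, target_delim):
    # "K1|K2/V/" -> {"K1": "V", "K2": "V"}
    keys, value = s.rstrip(target_delim).rsplit(target_delim)
    return {key: value for key in keys.split(or_delim)}

reg_patterns = ["ADVANCED|ADVANCE/ADV/", "ASSOCS|AS|ASSOCIATES/ASSOC/"]

normalization_dict = {}
for item in reg_patterns:
    normalization_dict.update(parse_string(item, "|", "/"))

print(normalization_dict)
# {'ADVANCED': 'ADV', 'ADVANCE': 'ADV', 'ASSOCS': 'ASSOC', 'AS': 'ASSOC', 'ASSOCIATES': 'ASSOC'}
```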

Then complete the normalization of the DataFrame "Notes" column with a udf that closes over the dictionary and a compiled pattern:

def my_norm_func(s, ngram_dict, pattern):
    return pattern.sub(lambda x: ngram_dict[x.group()], s)

norm_pattern = re.compile(r'\b(' + '|'.join([re.escape(item)\
                      for item in normalization_dict.keys()]) + r')\b')
my_norm_udf = udf(lambda s: my_norm_func(s, normalization_dict, norm_pattern))
df = df.withColumn("NotesUPD", my_norm_udf(col("Notes")))

yields the following desired result:

+----+--------------------+--------------------+
|  ID|               Notes|            NotesUPD|
+----+--------------------+--------------------+
|2345|    ADVANCED by John|         ADV by John|
|2398| ADVANCED by ADVANCE|          ADV by ADV|
|2328|Verified by somer...|Verified by somer...|
|3983|Double Checked by...|Double Checked by...|
+----+--------------------+--------------------+
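The \b word boundaries in norm_pattern are what keep a short key like "AS" from matching inside longer words such as "somerandomtext". Stripped of the Spark wrapper, the same substitution can be exercised directly; this is a plain-Python sketch, and normalize is a hypothetical name:

```python
import re

# Same dictionary that parse_string builds from reg_patterns
normalization_dict = {"ADVANCED": "ADV", "ADVANCE": "ADV",
                      "ASSOCS": "ASSOC", "AS": "ASSOC", "ASSOCIATES": "ASSOC"}

norm_pattern = re.compile(r'\b(' + '|'.join(re.escape(k) for k in normalization_dict) + r')\b')

def normalize(s):
    # \b word boundaries stop a short key like "AS" from matching
    # inside longer words such as "CLASS" or "somerandomtext"
    return norm_pattern.sub(lambda m: normalization_dict[m.group()], s)

print(normalize("ASSOCS AS CLASS"))  # ASSOC ASSOC CLASS
```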
