
I have a dataframe yearDF, created by reading an RDBMS table as below:

val yearDF = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", s"(${query}) as year2017")
  .option("user", devUserName)
  .option("password", devPassword)
  .option("numPartitions", 15)
  .load()

I have to apply a regex pattern to the above dataframe before ingesting it into a Hive table on HDFS. Below is the regex pattern:

regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(%s, E'[\\\\n]+', ' ', 'g' ), E'[\\\\r]+', ' ', 'g' ), E'[\\\\t]+', ' ', 'g' ), E'[\\\\cA]+', ' ', 'g' ), E'[\\\\ca]+', ' ', 'g' )

I should apply this regex only to the columns of the dataframe yearDF that are of String datatype. I tried the following:

val regExpr = yearDF.schema.fields
    .map(x => 
        if(x.dataType == String)
             "regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(%s, E'[\\\\n]+', ' ', 'g' ), E'[\\\\r]+', ' ', 'g' ), E'[\\\\t]+', ' ', 'g' ), E'[\\\\cA]+', ' ', 'g' ), E'[\\\\ca]+', ' ', 'g' ) as %s".format(x,x)
     )
yearDF.selectExpr(regExpr:_*)

But it gives me a compilation error: Type mismatch, expected: Seq[String], actual: Array[Any]

I cannot use yearDF.columns.map, as that would act on all the columns, and I am unable to form the logic properly here. Could anyone let me know how I can apply the regex mentioned above to the dataframe yearDF, only on the columns that are of String type?

2 Answers


It's because yearDF.selectExpr(regExpr:_*) expects regExpr to be a Seq of String, while your regExpr is an Array[Any]. You can see that much in the error message, but why is it Array[Any]?

Look at your map function: for each field in the schema you map

- each column of StringType to an expression string containing the regular expression,
- every other column to nothing, because the if has no else branch, so those cases evaluate to Unit.

The closest common type the compiler can find for String and Unit is Any, hence Array[Any].
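
A minimal illustration of that behaviour outside of Spark (the array and condition here are made up for the example):

val words = Array("a", "bb", "ccc")

// `if` without `else` is sugar for `if ... else ()`, so the two branch types are
// String and Unit; their least upper bound is Any and the result is Array[Any].
val broken = words.map(w => if (w.length > 1) w.toUpperCase)        // Array[Any]

// Giving the `else` branch a String as well keeps the element type at String.
val fixed = words.map(w => if (w.length > 1) w.toUpperCase else w)  // Array[String]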

Btw., use org.apache.spark.sql.types.StringType, not String, when comparing against x.dataType.

So, instead, write:

import org.apache.spark.sql.types.StringType

val regExpr = yearDF.schema.fields.map { x =>
  if (x.dataType == StringType)
    "regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(%s, E'[\\\\n]+', ' ', 'g' ), E'[\\\\r]+', ' ', 'g' ), E'[\\\\t]+', ' ', 'g' ), E'[\\\\cA]+', ' ', 'g' ), E'[\\\\ca]+', ' ', 'g' ) as %s".format(x.name, x.name)
  else
    x.name
}

yearDF.selectExpr(regExpr: _*)
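
As the comments below point out, the E'...' literals (and the trailing 'g' flag) are PostgreSQL syntax that Spark SQL does not accept, so the expression itself still has to be rewritten for Spark. One way to sidestep selectExpr and SQL string escaping altogether is the Column API; the following is only a sketch under that assumption: cleanStringColumns and cleanedYearDF are made-up names, and the pattern (collapsing runs of newlines, carriage returns, tabs and Ctrl-A into one space) approximates the original expression rather than reproducing it exactly.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.regexp_replace
import org.apache.spark.sql.types.StringType

// Rewrite every String column in place. Spark's regexp_replace replaces all
// matches by default, so no 'g' flag is needed.
def cleanStringColumns(df: DataFrame): DataFrame =
  df.schema.fields
    .filter(_.dataType == StringType)
    .foldLeft(df) { (acc, field) =>
      acc.withColumn(field.name, regexp_replace(acc(field.name), "[\\n\\r\\t\\x01]+", " "))
    }

val cleanedYearDF = cleanStringColumns(yearDF)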

5 Comments

I am getting an exception: extraneous input '(' expecting {')', ',', '.', '[', 'OR', 'AND', 'IN', NOT, 'BETWEEN', 'LIKE', RLIKE, 'IS', EQ, '<=>', '<>', '!=', '<', LTE, '>', GTE, '+', '-', '*', '/', '%', 'DIV', '&', '|', '^'}(line 1, pos 74) regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(%s, E'[\\n]+', ' ', 'g' ), E'[\\r]+', ' ', 'g' ), E'[\\t]+', ' ', 'g' ), E'[\\cA]+', ' ', 'g' ), E'[\\ca]+', ' ', 'g' ) as at com.partition.source.YearPartition$.main(YearPartition.scala:101) I checked all the brackets and they are all balanced. Don't understand what is wrong.
@NewComer Ok, try your string with an s before the opening quote together with format() ;) I may have made some mistake in the string, but it's quite long to debug easily :P
@NewComer Changed also in the answer
It says "Literals of type 'E' are currently not supported." when it is first encountered. I tried to look out for 'E' in regex_replace but couldn't find anything.
@NewComer you have it here: regexp_replace(%s, E'[\\\\n]+', ' ', 'g' ) and in the other occurrences ;) Probably a typo, but not related to the core of the question. Changing it to e.g. upper(%s) as %s shows that the code works; the regexp may be wrong, but that's a candidate for another question.
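
Spelling out that last suggestion as a quick sanity check (the show(5) call is only illustrative): swap the regex for a trivial expression to confirm that the String-column selection itself is sound, then worry about the pattern separately.

// assumes `import org.apache.spark.sql.types.StringType` as in the answer above
val testExpr = yearDF.schema.fields.map { x =>
  if (x.dataType == StringType) s"upper(${x.name}) as ${x.name}" else x.name
}
yearDF.selectExpr(testExpr: _*).show(5)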

I also faced a similar issue when applying regexp_replace() to only the string columns of a dataframe. The trick is to build the regex pattern (in my case, pattern) so that it resolves correctly inside the double quotes, with the escape characters applied.

val pattern="\"\\\\\\\\[\\\\\\\\x{FFFD}\\\\\\\\x{0}\\\\\\\\x{1F}\\\\\\\\x{81}\\\\\\\\x{8D}\\\\\\\\x{8F}\\\\\\\\x{90}\\\\\\\\x{9D}\\\\\\\\x{A0}\\\\\\\\x{0380}\\\\\\\\]\""

val regExpr = parq_out_df_temp.schema.fields.map { x =>
  if (x.dataType == StringType) s"regexp_replace(%s, $pattern, '') as %s".format(x.name, x.name)
  else x.name
}

val parq_out=parq_out_df_temp.selectExpr(regExpr:_*)

This worked fine for me!!
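
A quick, illustrative way to double-check the result: the schema should be unchanged and only the String columns rewritten.

parq_out.printSchema()             // same columns and types as parq_out_df_temp
parq_out.show(5, truncate = false) // eyeball a few rows of the cleaned values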

Comments
