
I have a DataFrame with many columns of str type, and I want to apply a function to all of those columns without renaming them or adding new columns. I tried a for-in loop that calls withColumn (see the example below), but when I run the code it usually fails with a StackOverflowError (it rarely works). The DataFrame is not big at all; it has only ~15,000 records.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# df is a DataFrame
def lowerCase(string):
    return string.strip().lower()

lowerCaseUDF = udf(lowerCase, StringType())

# Replace each string column with its trimmed, lower-cased version.
for (columnName, kind) in df.dtypes:
    if kind == "string":
        df = df.withColumn(columnName, lowerCaseUDF(df[columnName]))

df.select("Tipo_unidad").distinct().show()

The complete error is very long, so I am pasting only a few lines here. You can find the full trace here: Complete Trace

Py4JJavaError: An error occurred while calling o516.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 4 times, most recent failure: Lost task 1.3 in stage 2.0 (TID 38, worker2.mcbo.mood.com.ve): java.lang.StackOverflowError at java.io.ObjectInputStream$BlockDataInputStream.readByte(ObjectInputStream.java:2774)

I suspect this problem occurs because the code launches many jobs (one for each string column). Could you show me an alternative, or point out what I am doing wrong?

  • How many columns do you have? Commented Jan 28, 2016 at 16:22
  • @eliasah Around 136; I don't think that's too many. Commented Jan 28, 2016 at 16:23
  • I think the loop keeps the DataFrame in memory each time you compute on it, and the GC doesn't have time to clean it up, so memory runs out => StackOverflowError. Commented Jan 28, 2016 at 16:23
  • @eliasah That's very probable, but I don't have any other user-friendly alternative (the other option would be to do this manually, column by column). Commented Jan 28, 2016 at 16:25
  • Could you try a single select instead? This StackOverflowError smells like some kind of issue with a growing lineage. Also, I wouldn't use a UDF here; it is kind of wasteful and can be handled directly on the internal representation. Commented Jan 28, 2016 at 16:30

1 Answer


Try something like this:

from pyspark.sql.functions import col, lower, trim

# One projection: lower-case and trim every string column,
# pass all other columns through unchanged.
exprs = [
    lower(trim(col(c))).alias(c) if t == "string" else col(c)
    for (c, t) in df.dtypes
]

df = df.select(*exprs)

This approach has two main advantages over your current solution:

  • it requires only a single projection (no growing lineage, which is most likely responsible for the StackOverflowError) instead of one projection per string column; see the plan comparison sketched below.
  • it operates directly on the internal representation, without passing data to Python (BatchPythonProcessing).
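
To see the lineage difference concretely, you can compare the query plans of the two approaches. This is just a sketch, reusing the lowerCaseUDF and exprs defined above; the exact plan output varies by Spark version:

# Loop version: the analyzed logical plan stacks one Project node
# per withColumn call, so the plan grows with the number of string columns.
looped = df
for c, t in df.dtypes:
    if t == "string":
        looped = looped.withColumn(c, lowerCaseUDF(looped[c]))
looped.explain(True)          # deep, nested plan

# Single-select version: one Project over the source,
# no matter how many string columns there are.
df.select(*exprs).explain(True)   # shallow plan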

2 Comments

Worked perfectly! But what would I do if I had to apply a really complex function to every string column?
Well, pretty much the same way :) If you cannot express it with the built-in functions (in Spark 1.6 that shouldn't be a problem; there are enough to choose from to build arbitrarily complex transformations), just replace lower ∘ trim with a UDF.
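
For illustration, a minimal sketch of that UDF variant, keeping the single-select structure. Here normalize is a hypothetical stand-in for whatever complex logic you need:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Hypothetical placeholder for a transformation too complex for built-ins.
def normalize(s):
    return s.strip().lower() if s is not None else None

normalizeUDF = udf(normalize, StringType())

# Still a single projection; only the per-column expression changed.
exprs = [
    normalizeUDF(col(c)).alias(c) if t == "string" else col(c)
    for (c, t) in df.dtypes
]
df = df.select(*exprs)

Note that this reintroduces the Python round-trip the answer avoids, so prefer built-in expressions when they exist.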
