41

I have a Spark 1.5.0 DataFrame with a mix of null and empty strings in the same column. I want to convert all empty strings in all columns to null (None, in Python). The DataFrame may have hundreds of columns, so I'm trying to avoid hard-coded manipulations of each column.

See my attempt below, which results in an error.

from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

## Create a test DataFrame
testDF = sqlContext.createDataFrame([Row(col1='foo', col2=1), Row(col1='', col2=2), Row(col1=None, col2='')])
testDF.show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## |    |   2|
## |null|null|
## +----+----+

## Try to replace an empty string with None/null
testDF.replace('', None).show()
## ValueError: value should be a float, int, long, string, list, or tuple

## A string value of null (obviously) doesn't work...
testDF.replace('', 'null').na.drop(subset='col1').show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## |null|   2|
## +----+----+
  • 5
    @palsch, No, it doesn't return a list. It returns a DataFrame. I updated the question with a link to the Spark documentation. Commented Oct 22, 2015 at 20:16
  • 2
    @palsch it's not a general Python question! Spark DataFrames are a distributed data structure generally used for heavy data analysis on big data, so your solution isn't a fit. Commented Oct 22, 2015 at 20:41
  • 1
    @eliasah Truth be told Pythonic lambda x: None if not x else x wrapped with udf would work just fine :) Commented Oct 23, 2015 at 18:41
  • 1
    @zero323 but he asked the OP to return a list... Commented Oct 23, 2015 at 18:43
  • Which of the answers is most efficient? Commented Mar 8, 2019 at 13:47

9 Answers

52

It is as simple as this:

from pyspark.sql.functions import col, when

def blank_as_null(x):
    return when(col(x) != "", col(x)).otherwise(None)

dfWithEmptyReplaced = testDF.withColumn("col1", blank_as_null("col1"))

dfWithEmptyReplaced.show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## |null|   2|
## |null|null|
## +----+----+

dfWithEmptyReplaced.na.drop().show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## +----+----+

If you want to fill multiple columns you can, for example, use reduce:

from functools import reduce  # a builtin in Python 2; needs this import on Python 3

to_convert = set([...]) # Some set of columns

reduce(lambda df, x: df.withColumn(x, blank_as_null(x)), to_convert, testDF)

or use comprehension:

exprs = [
    blank_as_null(x).alias(x) if x in to_convert else x for x in testDF.columns]

testDF.select(*exprs)

If you want to operate specifically on string fields, please check the answer by robin-loxley.


7 Comments

Thanks @zero323. Can your answer be extended to handle many columns automatically and efficiently? Perhaps list all the column names, generate similar code as your answer for each column, and then evaluate the code?
I don't see any reason why you couldn't. DataFrames are lazily evaluated and the rest is just standard Python. You'll find some options in the edit.
I'll accept this answer, but could you please add the bit from @RobinLoxley first? Or, if you don't mind I can edit your answer.
@dnlbrky It wouldn't be fair.
The statement .otherwise(None) is not necessary. None is always returned for unmatched conditions (see spark.apache.org/docs/latest/api/python/…)
29

UDFs are not terribly efficient. The correct way to do this using a built-in method is:

from pyspark.sql.functions import col, when

df = df.withColumn('myCol', when(col('myCol') == '', None).otherwise(col('myCol')))
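For contrast, the UDF route the comments below describe would look roughly like this (a sketch only, not a recommendation; myCol is just the example column name from above):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# UDF version for comparison: every value round-trips through a Python worker
blank_to_null = udf(lambda s: None if s == '' else s, StringType())
df = df.withColumn('myCol', blank_to_null(col('myCol')))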

5 Comments

I'm getting a 'str' not callable error on this. Any ideas why?
check your parentheses
hmm I copied directly from here.
I just tested the code and it is valid. The error is likely introduced somewhere else in the manipulation of the dataframe and the error is raised only after an "Action" like collect() or show(). Do you get the same error if you do not include my code and run df.show()?
This is definitely the right solution; using the built-in functions allows a lot of optimization on the Spark side. Python UDFs are very expensive, as the Spark executor (which always runs on the JVM, whether you use pyspark or not) needs to serialize each row (batches of rows, to be exact), send it to a child Python process via a socket, evaluate your Python function, serialize the result and read it back off the socket.
8

Simply adding on top of zero323's and soulmachine's answers, to convert all StringType fields:

from pyspark.sql.types import StringType

# Collect the names of all StringType columns
string_fields = []
for i, f in enumerate(test_df.schema.fields):
    if isinstance(f.dataType, StringType):
        string_fields.append(f.name)
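
From there, one way to apply the conversion only to those fields (a sketch that reuses blank_as_null from zero323's answer above; test_df is assumed to be the same DataFrame):

exprs = [
    blank_as_null(x).alias(x) if x in string_fields else x
    for x in test_df.columns
]
test_df_converted = test_df.select(*exprs)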

1 Comment

What is the purpose of the enumerate? I mean, I know what it does, but is there a reason it was used instead of for field in test_df.schema.fields: ?
6

My solution is much better than all the solutions I've seen so far; it can deal with as many fields as you want. See the little function below:

  // Replace empty Strings with null values
  private def setEmptyToNull(df: DataFrame): DataFrame = {
    val exprs = df.schema.map { f =>
      f.dataType match {
        case StringType => when(length(col(f.name)) === 0, lit(null: String).cast(StringType)).otherwise(col(f.name)).as(f.name)
        case _ => col(f.name)
      }
    }

    df.select(exprs: _*)
  }

You can easily rewrite the function above in Python.

I learned this trick from @liancheng
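
A rough Python translation of that Scala helper might look like this (a sketch, assuming the usual pyspark imports; the snake_case name is mine):

from pyspark.sql.functions import col, length, lit, when
from pyspark.sql.types import StringType

def set_empty_to_null(df):
    # Rewrite only the StringType columns; pass the rest through untouched
    exprs = [
        when(length(col(f.name)) == 0, lit(None).cast(StringType())).otherwise(col(f.name)).alias(f.name)
        if isinstance(f.dataType, StringType)
        else col(f.name)
        for f in df.schema.fields
    ]
    return df.select(*exprs)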


2

If you are using Python you can check the following.


+----+-----+----+
|  id| name| age|
+----+-----+----+
|null|name1|  50|
|   2|     |    |
|    |name3|null|
+----+-----+----+

from pyspark.sql.functions import col, when

def convertToNull(dfa):
    for i in dfa.columns:
        dfa = dfa.withColumn(i, when(col(i) == '', None).otherwise(col(i)))
    return dfa

convertToNull(dfa).show()

+----+-----+----+
|  id| name| age|
+----+-----+----+
|null|name1|  50|
|   2| null|null|
|null|name3|null|
+----+-----+----+


0

I would add a trim to @zero323's solution to also handle values that contain only whitespace:

from pyspark.sql.functions import col, trim, when

def blank_as_null(x):
    return when(trim(col(x)) != "", col(x))
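
A quick way to see the effect (a hypothetical example; the whitespace-only row is made up for illustration and assumes the question's sqlContext):

from pyspark.sql import Row

ws_df = sqlContext.createDataFrame([Row(col1='foo'), Row(col1='   '), Row(col1='')])
ws_df.withColumn("col1", blank_as_null("col1")).show()
## 'foo' is kept; the empty and whitespace-only values should both come back as null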


0

Thanks to @zero323, @Tomerikoo and @Robin Loxley. A ready-to-use function:

def convert_blank_to_null(df, cols=None):
    from pyspark.sql.functions import col, when, trim
    from pyspark.sql.types import StringType

    def blank_as_null(x):
        return when(trim(col(x)) == "", None).otherwise(col(x))
    # Don't know how to parallel
    for f in (df.select(cols) if cols else df).schema.fields:
        if isinstance(f.dataType, StringType):
            df = df.withColumn(f.name, blank_as_null(f.name))
    return df


0

This helps me to sanitize the values.

For all the columns:

from pyspark.sql.functions import col, when

address_sanitize_df = address_df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in address_df.columns]).distinct()
address_sanitize_df.show()

For specific columns:

sanitize_cols=["address_line2","zip4"]
address_sanitize_df = address_df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in sanitize_cols])
address_sanitize_df.show()


-2

This is a different version of soulmachine's solution, but I don't think you can translate this to Python as easily:

def emptyStringsToNone(df: DataFrame): DataFrame = {
  df.schema.foldLeft(df)(
    (current, field) =>
      field.dataType match {
        case DataTypes.StringType =>
          current.withColumn(
            field.name,
            when(length(col(field.name)) === 0, lit(null: String)).otherwise(col(field.name))
          )
        case _ => current
      }
  )
}
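
For what it's worth, a rough Python analogue of the fold is possible with functools.reduce, though it chains one withColumn per string column instead of building a single select (a sketch, with a function name of my own choosing):

from functools import reduce
from pyspark.sql.functions import col, length, lit, when
from pyspark.sql.types import StringType

def empty_strings_to_none(df):
    # Fold over the string columns, replacing zero-length values with null
    string_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
    return reduce(
        lambda acc, c: acc.withColumn(
            c, when(length(col(c)) == 0, lit(None).cast(StringType())).otherwise(col(c))
        ),
        string_cols,
        df,
    )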

