I am trying to take my input data:

A    B       C
--------------
4    blah    2
2            3
56   foo     3

And add a column to the end based on whether B is empty or not:

A    B       C     D
--------------------
4    blah    2     1
2            3     0
56   foo     3     1

I can do this easily by registering the input dataframe as a temp table, then typing up a SQL query.

But I'd really like to know how to do this with just Scala methods, without having to type out a SQL query within Scala.

I've tried .withColumn, but I can't get that to do what I want.

3 Answers

102

Try withColumn with the function when as follows:

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // for `toDF` and $""
import org.apache.spark.sql.functions._ // for `when`

val df = sc.parallelize(Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5)))
    .toDF("A", "B", "C")

val newDf = df.withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1))

newDf.show() prints:

+---+----+---+---+
|  A|   B|  C|  D|
+---+----+---+---+
|  4|blah|  2|  1|
|  2|    |  3|  0|
| 56| foo|  3|  1|
|100|null|  5|  0|
+---+----+---+---+

I added the (100, null, 5) row for testing the isNull case.

I tested this code with Spark 1.6.0, but according to the comment in the source of when, it is available from version 1.4.0 onward.
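For readers on Spark 2.x or later, where SparkSession replaces SQLContext as the entry point, the same when/otherwise approach can be sketched as below. This is a minimal sketch and was not part of what I tested above: the object name AddFlagColumn, the helper withFlag, and the local[*] master are assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, when}

object AddFlagColumn {
  // Hypothetical helper: adds D = 0 when B is null or empty, 1 otherwise.
  def withFlag(df: DataFrame): DataFrame =
    df.withColumn("D", when(col("B").isNull || col("B") === "", 0).otherwise(1))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for trying this out.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("add-flag-column")
      .getOrCreate()
    import spark.implicits._ // for toDF

    // Same rows as above, including the null case.
    val df = Seq[(Int, String, Int)](
      (4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5)
    ).toDF("A", "B", "C")

    withFlag(df).show()
    spark.stop()
  }
}
```

Keeping the column logic in a small function that takes and returns a DataFrame makes it easy to reuse the same rule on other inputs.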

7 Comments

This is exactly what I was looking for. I tried a couple different things with when and otherwise but I guess I was getting the exact format wrong. Slightly off topic, but do you know how Spark handles withColumn? Like, if I'm adding ~20 columns, would it be faster to do 20 .withColumn and keep it a dataframe or to map it to an RDD and just add them all in the map then convert back to a dataframe to save to parquet?
Just found this. I think UDFs are what I am looking for.
Why does this not work with if? df.withColumn("D", if(df("B") == "") lit(0) else lit(1))
@SumitKumarGhosh df("B") is a Column. The condition df("B") == "" is never true, because a Column is not the same kind of object as a String. Furthermore, it is an all-or-nothing condition: Scala evaluates it once, not row by row as I suspect you want. The when/otherwise syntax, by contrast, does the right thing.
Can we do an 'in' query while applying when?
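
A rough sketch touching the last two comments, assuming Spark 2.x or later with a SparkSession. Column.isin can serve as the 'in' test inside when, and === (rather than Scala's ==) builds a comparison that Spark evaluates per row. The object name WhenWithIsin, the helper flagKnownValues, and the list of values are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, when}

object WhenWithIsin {
  // Hypothetical helper: D is 1 when B is one of the listed values, else 0.
  // isin takes varargs, so the Seq is expanded with `: _*`.
  def flagKnownValues(df: DataFrame, known: Seq[String]): DataFrame =
    df.withColumn("D", when(col("B").isin(known: _*), 1).otherwise(0))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("when-isin")
      .getOrCreate()
    import spark.implicits._ // for toDF

    val df = Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3)).toDF("A", "B", "C")

    // === returns a Column expression, which is what when expects;
    // Scala's == would return a single Boolean computed on the driver.
    val isEmptyB = col("B") === ""
    df.withColumn("D", when(isEmptyB, 0).otherwise(1)).show()

    // The 'in' variant: flag rows whose B is "blah" or "foo".
    flagKnownValues(df, Seq("blah", "foo")).show()
    spark.stop()
  }
}
```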
4

My bad, I had missed one part of the question.

The best, cleanest way is to use a UDF. The explanation is in the code comments.

// create some example data as a DataFrame
// note: the third record has an empty string
case class Stuff(a: String, b: Int)
val d = sc.parallelize(Seq(("a", 1), ("b", 2),
     ("", 3), ("d", 4)).map { x => Stuff(x._1, x._2) }).toDF

// now the good stuff.
import org.apache.spark.sql.functions.udf
// function that returns 0 if the string is empty, 1 otherwise
val func = udf( (s: String) => if (s.isEmpty) 0 else 1 )
// create a new DataFrame with an added column named "notempty"
val r = d.select( $"a", $"b", func($"a").as("notempty") )

scala> r.show
+---+---+--------+
|  a|  b|notempty|
+---+---+--------+
|  a|  1|       1|
|  b|  2|       1|
|   |  3|       0|
|  d|  4|       1|
+---+---+--------+
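
One caveat with the UDF above: s.isEmpty would fail with a NullPointerException if the column contained a null, as in the (100, null, 5) row from the other answer. Below is a minimal null-tolerant sketch, assuming Spark 2.x or later with a SparkSession; the names NullSafeNotEmpty and notEmptyFlag are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object NullSafeNotEmpty {
  // Plain Scala function, kept separate so it can be checked without Spark.
  // Returns 0 for null or empty strings, 1 otherwise.
  def notEmptyFlag(s: String): Int = if (s == null || s.isEmpty) 0 else 1

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("null-safe-udf")
      .getOrCreate()
    import spark.implicits._ // for toDF

    // Wrap the plain function as a UDF.
    val notEmpty = udf(notEmptyFlag _)

    // Example data with both an empty string and a null.
    val d = Seq[(String, Int)](("a", 1), ("", 3), (null, 4)).toDF("a", "b")

    d.select(col("a"), col("b"), notEmpty(col("a")).as("notempty")).show()
    spark.stop()
  }
}
```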

1 Comment

There is only one dataframe at play here. You might want to re-read the question.

2

How about something like this?

val newDF = df.filter($"B" === "").take(1) match {
  case Array() => df
  case _ => df.withColumn("D", $"B" === "")
}

Using take(1) should have a minimal performance hit.
