My objective is to add columns to an existing DataFrame and populate the columns using transformations from existing columns in the DF.

All of the examples I find use withColumn to add the column and when().otherwise() for the transformations.

I would like to use a function I define myself, taking (x: String) and using match/case, so that I can use String methods and apply more complex transformations.

Sample DataFrame

val etldf = Seq(   
            ("Total, 20 to 24 years            "),
            ("Men, 20 to 24 years              "),
            ("Women, 20 to 24 years            ")).toDF("A")

Applying a simple transformation using when().otherwise() works. I can nest a bunch of these together, but it soon gets messy.

val newcol = when($"A".contains("Men"), "Male").
  otherwise(when($"A".contains("Women"), "Female").
  otherwise("Both"))
val newdf = etldf.withColumn("NewCol", newcol)      
newdf.select("A","NewCol").show(100, false)

The output is as follows:

+---------------------------------+------+
|A                                |NewCol|
+---------------------------------+------+
|Total, 20 to 24 years            |Both  |
|Men, 20 to 24 years              |Male  |
|Women, 20 to 24 years            |Female|
+---------------------------------+------+

But let's say I wanted a slightly more complex transformation:

val newcol = when($"A".contains("Total") && $"A".contains("years"), $"A".indexOf("to").toString())

It doesn't like this because indexOf is a String function and not a member of ColumnName.

What I really want to do is define a function that can implement very complex transformations and pass that to withColumn():

 def AtoNewCol( A : String): String = A match {
   case a if a.contains("Men") => "Male"
   case a if a.contains("Women") => "Female"
   case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString()
   case other => "Both"
 }
 AtoNewCol("Total, 20 to 24 years            ")  

The output results in a value of 10 (the position of "to")

But I am faced with the same kind of type mismatch: AtoNewCol() expects a String, while $"A" is a ColumnName object:

scala> val newdf = etldf.withColumn("NewCol", AtoNewCol($"A"))
<console>:33: error: type mismatch;
found   : org.apache.spark.sql.ColumnName
required: String
val newdf = etldf.withColumn("NewCol", AtoNewCol($"A"))
                                                    ^

If I change the signature of AtoNewCol(A: org.apache.spark.sql.ColumnName) I get the same problem in the implementation:

scala>  def AtoNewCol( A : org.apache.spark.sql.ColumnName): String = A match {
 |     case a if a.contains("Men") => "Male"
 |     case a if a.contains("Women") => "Female"
 |     case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString()
 |     case other => "Both"
 |   }
<console>:30: error: type mismatch;
found   : org.apache.spark.sql.Column
required: Boolean
       case a if a.contains("Men") => "Male"
                           ^
.
.
.
etc.  

I am hoping that there is a syntax that allows binding the value of the column to the function.

Or maybe there is a function other than withColumn() that enables defining more complex functions for the transformations.

Open to all suggestions.

1
  • You need a udf function for that Commented Apr 3, 2018 at 4:33

2 Answers

All you need is a udf function:

import org.apache.spark.sql.functions._
val AtoNewCol = udf((A: String) => A match {
  case a if a.contains("Men") => "Male"
  case a if a.contains("Women") => "Female"
  case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString()
  case other => "Both"
})

etldf.withColumn("NewCol", AtoNewCol($"A")).show(false)

And you should get:

+---------------------------------+------+
|A                                |NewCol|
+---------------------------------+------+
|Total, 20 to 24 years            |10    |
|Men, 20 to 24 years              |Male  |
|Women, 20 to 24 years            |Female|
+---------------------------------+------+

A udf function works row by row: it receives each column value as a plain Scala type (here a String), so ordinary String methods are available inside it. The other inbuilt functions, by contrast, operate on Column expressions.
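If the plain String => String function from the question already exists, it can be lifted into a UDF instead of being rewritten. The following is a minimal sketch, not a definitive implementation: the object name UdfFromPlainFunction, the name AtoNewColUdf, the local SparkSession settings and the null guard are assumptions added for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object UdfFromPlainFunction {
  // The question's original function, operating on a plain String.
  def AtoNewCol(A: String): String = A match {
    case a if a.contains("Men") => "Male"
    case a if a.contains("Women") => "Female"
    case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString
    case _ => "Both"
  }

  // Hypothetical name: wraps the plain function so it can be applied to a Column.
  // Nulls are passed through so that a.contains is never called on a null String.
  val AtoNewColUdf = udf((a: String) => if (a == null) null else AtoNewCol(a))

  def main(args: Array[String]): Unit = {
    // Assumed local session; in spark-shell, `spark` already exists.
    val spark = SparkSession.builder().master("local[*]").appName("udf-sketch").getOrCreate()
    import spark.implicits._

    val etldf = Seq(
      "Total, 20 to 24 years",
      "Men, 20 to 24 years",
      "Women, 20 to 24 years").toDF("A")

    etldf.withColumn("NewCol", AtoNewColUdf($"A")).show(false)
    spark.stop()
  }
}
```

Keeping the logic in a plain function also means it can be checked without Spark, for example by calling AtoNewCol("Men, 20 to 24 years") directly.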

You need to create a UDF for that. You can try something like the following; I'm using your defined function as it is.

val AtoNewCol = udf((A: String) => {
  A match {
    case a if a.contains("Men") => "Male"
    case a if a.contains("Women") => "Female"
    case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString
    case other => "Both"
  }
})

etldf.withColumn("NewCol", AtoNewCol($"A")).show(false)

//    output
//    +---------------------------------+------+
//    |A                                |NewCol|
//    +---------------------------------+------+
//    |Total, 20 to 24 years            |10    | 
//    |Men, 20 to 24 years              |Male  |
//    |Women, 20 to 24 years            |Female|
//    +---------------------------------+------+
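For this particular transformation, a UDF may not be strictly necessary. The sketch below expresses the indexOf case from the question with the built-in instr function from org.apache.spark.sql.functions, which returns a 1-based position (0 when the substring is absent), so 1 is subtracted to match String.indexOf. The object name BuiltinInsteadOfUdf, the helper withNewCol and the local session settings are assumptions for illustration only.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{instr, when}

object BuiltinInsteadOfUdf {
  // Hypothetical helper: adds NewCol using only built-in column functions.
  def withNewCol(df: DataFrame): DataFrame = {
    val a = df("A")
    val newcol =
      when(a.contains("Men"), "Male")
        .when(a.contains("Women"), "Female")
        // instr is 1-based, String.indexOf is 0-based, hence the "- 1".
        .when(a.contains("Total") && a.contains("years"),
          (instr(a, "to") - 1).cast("string"))
        .otherwise("Both")
    df.withColumn("NewCol", newcol)
  }

  def main(args: Array[String]): Unit = {
    // Assumed local session; in spark-shell, `spark` already exists.
    val spark = SparkSession.builder().master("local[*]").appName("builtin-sketch").getOrCreate()
    import spark.implicits._

    val etldf = Seq(
      "Total, 20 to 24 years",
      "Men, 20 to 24 years",
      "Women, 20 to 24 years").toDF("A")

    withNewCol(etldf).show(false)
    spark.stop()
  }
}
```

Chaining .when(...) calls keeps the branches flat instead of nesting otherwise(when(...)). Once the logic needs arbitrary String manipulation, a UDF as shown in the answers is likely the simpler route.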
