
I am implementing code to dynamically add multiple columns to a DataFrame, with null values in each row.

I found the following code snippet in Scala, where the map function of the DataFrame object is used.

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, NullType, StructType}
import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
import org.apache.spark.sql.functions.lit

def addColumnsViaMap(df: DataFrame, words: List[String]): DataFrame = {
   val encoder = RowEncoder.apply(getSchema(df, words))
   df.map(mappingRows(df.schema)(words))(encoder)
}

private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
    val addedCols: List[Any] = words.map(_ => null)
    Row.merge(row, Row.fromSeq(addedCols))
  }

private def getSchema(df: DataFrame, words: List[String]): StructType = {
  var schema: StructType = df.schema
  words.foreach(word => schema = schema.add(word, "string", false))
  schema
}

I have implemented the following two functions in Java:

private StructType getSchema(Dataset<Row> df, List<String> cols) {
    StructType schema = df.schema();
    // StructType.add returns a new StructType, so the result must be reassigned
    for (String col : cols) {
        schema = schema.add(col, "int", true);
    }
    return schema;
}

private Dataset<Row> addColumnsViaMap(Dataset<Row> df, List<String> cols) {
    Encoder<Row> encoder1 = RowEncoder.apply(dataConsolidationEngine.getSchema(df, cols));
    return df.map(new MapFunction<Set<String>, Row>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Row call(Set<String> cols) throws Exception {
            // TODO Auto-generated method stub
        }
    }, encoder1);
}

The addColumnsViaMap method has a compilation error: it cannot resolve the anonymous map function method, due to a parameter mismatch.

I also don't understand the Scala code of mappingRows, especially the following: StructType => List[String] => Row => Row = (schema) => (words) => (row). What does this mean?

And how do I implement the above Scala code in Java?

2 Answers

Well, this declaration is a bit complex (and IMO a bit unreadable too), so let's step back.

In Scala, String, List and so on are types everyone knows of. You can make a variable of type String.

What you can also do is assign a function to a variable (this is the functional orientation of Scala), so functions also have types. Say, for example, you have a function that takes a List and outputs a String: it is of type List => String.

And what does that look like in code?

// A list of strings
val names = List("alice", "bob")
// A function that takes a list and returns a string
def listToString(list: List[String]): String = list.mkString(",")
// We can assign the function to a variable
val myListToString: List[String] => String = listToString

But we have a shorter notation for declaring functions: we may declare them "inline", without using a def statement, so that the above code can be equivalently written:

val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")

So, generically speaking:

  • A => B is a type, of a function that takes an A and returns a B
  • (arg: A) => { new B() } is an actual function that takes an instance of A as input (the instance being bound to the variable name arg) and whose body returns an instance of B
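
As an aside (not part of the original answer), Java has a direct counterpart for such a function type: java.util.function.Function. A minimal sketch of the earlier List-to-String example:

```java
import java.util.List;
import java.util.function.Function;

public class FunctionTypeExample {
    public static void main(String[] args) {
        // Scala's List[String] => String corresponds to Function<List<String>, String>
        Function<List<String>, String> myListToString = list -> String.join(",", list);
        System.out.println(myListToString.apply(List.of("alice", "bob"))); // alice,bob
    }
}
```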

Now let's do something crazy, let's... start over. Say that F is a function that takes a List and returns a String. What would a function that takes an Int and returns an F look like?

Well, it would be:

  • Int => F.
  • That is to say: Int => (List => String)
  • Which can be written Int => List => String

And how do you declare it?

// Borrowing from above
val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")
// now we're doing it
val intToListToString: Int => (List[String] => String) = (integerValue) => myListToString
// now we're doing it in one go
val intToListToString2: Int => List[String] => String = (integerValue) => (list) => list.mkString(",")

Here, intToListToString is a function that takes an int and returns "a function that takes a List and returns a String".
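
Since the question is ultimately about Java, the same nesting can be sketched with java.util.function.Function (my aside, not the original answer's code):

```java
import java.util.List;
import java.util.function.Function;

public class NestedFunctionExample {
    public static void main(String[] args) {
        // Int => List[String] => String becomes a Function that returns a Function
        Function<Integer, Function<List<String>, String>> intToListToString =
            integerValue -> list -> String.join(",", list); // ignores the int, like the Scala version
        System.out.println(intToListToString.apply(42).apply(List.of("alice", "bob"))); // alice,bob
    }
}
```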

And you can nest again, and again.

Until you get StructType => List[String] => Row => Row, which is a type that means "a function that takes a StructType as input and returns (a function that takes a List[String] as input and returns (a function that takes a Row as input and returns a Row))".

And you could implement it as:

(schema) => // a function that takes schema, and returns
    (words) => // a function that takes a list of words and returns
        (row) => // a function that takes a row and returns
            Row.fromSeq(...) // another row

Now what would that look like in Java?

If you want to convert it strictly as it is, you may think about it this way: the natural equivalent of Scala's A => B is java.util.function.Function<A, B>. On top of that, if you want to use a function to do a Spark map operation on a DataFrame, you have to use a MapFunction<>.

So we are looking to implement a Function<StructType, Function<List<String>, MapFunction<Row, Row>>> or something of the sort.

Using Java lambda notation, you can do it this way:

schema -> words -> row -> Row.merge(row, Row.fromSeq(Array.newInstance(String.class, words.size())))

Which is a function that takes a schema,

  • that returns a function that takes a list of words

    • that returns a function that takes a Row

      • that returns a row augmented with columns containing null

Maybe my Java syntax is correct, maybe not; I do not know.
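
Not having a Spark environment either, here is a Spark-free sketch of that same curried shape, with Row stood in by a plain Object[] (a hypothetical stand-in chosen so the snippet compiles and runs on its own):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class CurriedRowMapper {
    public static void main(String[] args) {
        // Curried shape: schema -> words -> row -> row
        Function<List<String>, Function<List<String>, Function<Object[], Object[]>>> mappingRows =
            schema -> words ->
                // copyOf pads the new trailing slots with null, one per added column,
                // mirroring words.map(_ => null) in the Scala version
                row -> Arrays.copyOf(row, row.length + words.size());

        Object[] out = mappingRows
            .apply(List.of("columnA", "columnB"))   // schema (unused here, as in the Scala original)
            .apply(List.of("columnC", "columnD"))   // names of the columns to add
            .apply(new Object[]{"a", "b"});
        System.out.println(Arrays.toString(out)); // [a, b, null, null]
    }
}
```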

What I do know is that it is a vastly over-complicated way of achieving your requirement.

What is this requirement? You have a dataframe, you have a list of words, and you want to create new columns with these names, containing null.

So what I would have done in Scala is this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

def addColumnsViaMap(dataframe: DataFrame, words: List[String]): DataFrame =
  words.foldLeft(dataframe)((df, word) => df.withColumn(word, lit(null: String)))

// assumes spark-shell, where spark.implicits._ is in scope for toDF
val dataframe = Seq(("a", "b"), ("c", "d")).toDF("columnA", "columnB")
val words = List("columnC", "columnD")
addColumnsViaMap(dataframe, words).show

+-------+-------+-------+-------+
|columnA|columnB|columnC|columnD|
+-------+-------+-------+-------+
|      a|      b|   null|   null|
|      c|      d|   null|   null|
+-------+-------+-------+-------+

Which you can probably write in Java as such:

Dataset<Row> addColumnsViaMap(Dataset<Row> dataframe, List<String> words) {
    for (String word : words) {
        dataframe = dataframe.withColumn(word, lit((String) null));
    }
    return dataframe;
}

Once again, I do not have a Java-based Spark environment at hand, but my point is: if you get the principle, rewriting it is simple.


4 Comments

Thanks, that helps; now I understand the Scala code clearly. But if I have to implement 3 MapFunction methods in one single statement, it's a lot more confusing and difficult to implement. Could you please help me break the above Scala code into 3 separate map functions, so that I can try to implement it in Java as 3 separate map functions?
@vkumar22 you do not have to write it as 3 separate functions, though you probably can. First: I've rewritten it all in one line of Scala that does not involve any nesting of functions. Second: having functions that return functions 3 levels deep is not natural in Java (possible? yes; natural? not so much). Scala, being functionally oriented, is more natural for this. But it could just as easily be written as a single function that takes 3 arguments in both languages, instead of a curried version (3 functions taking one argument each). If you understand it, you can convert it.
Thanks!! Yes, I wanted to follow that approach of adding the new columns initially, but I found in the link lansalo.com/2018/05/13/… that they explain this solution is not so performance/memory efficient, so I wanted to follow the other, more efficient approach.
But what are they measuring? They are measuring the time spent declaring the dataframe. Surely the time spent creating a dataframe is ridiculously dwarfed by the computation that it involves. Who cares if declaring the computation takes 400 milliseconds instead of 70? Surely you are not running a Spark cluster for a computation whose order of magnitude is in the milliseconds ballpark? Have you actually measured?
private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
    val addedCols: List[Any] = words.map(_ => null)
    Row.merge(row, Row.fromSeq(addedCols))
  }

Simply put, that could be read as:

mappingRows is a 'function' that takes 3 parameters (of types StructType, List and Row; say schema, words and row) and that returns a Row. But instead of calling it like this:

mappingRows(schema, words, row)

you will go

mappingRows(schema)(words)(row)

This means that calling just

mappingRows(schema)(words)

will return a function that takes a Row and returns a Row: a mapping function that you can pass to the typical .map() function.
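
That partial-application idea can be sketched in Java with a simpler curried function (an illustrative aside, not code from the answer):

```java
import java.util.function.Function;

public class PartialApplication {
    public static void main(String[] args) {
        // A curried three-argument function, analogous to mappingRows(schema)(words)(row)
        Function<Integer, Function<Integer, Function<Integer, Integer>>> add3 =
            a -> b -> c -> a + b + c;

        // Supplying only the first two arguments yields a one-argument function,
        // just as mappingRows(schema)(words) yields the Row-to-Row function passed to map()
        Function<Integer, Integer> remainder = add3.apply(1).apply(2);
        System.out.println(remainder.apply(4)); // 7
    }
}
```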

Basically, given a schema and a list of col names, the closure takes a row as input and simply adds one null column to that row for each given col name.

Does that help answer your question?

3 Comments

Thanks for your quick response. If you look at the following code: ``` def addColumnsViaMap(df: DataFrame, words: List[String]): DataFrame = { val encoder = RowEncoder.apply(getSchema(df, words)); df.map(mappingRows(df.schema)(words))(encoder) } ``` mappingRows is called with only two parameters, schema and words, but row is not passed to it. How is row accessed inside it?
@vkumar22 yes, it is called with only two params, schema and words: that will return a function that takes a Row and returns a Row, a mapping function that you can pass to the map() function... Got it? I edited my answer accordingly.
Thanks for providing more details, I got it. Let me see if I can try that in Java.
