
Spark SQL is pretty clear to me, but I am just getting started with Spark's RDD API. As spark apply function to columns in parallel points out, this should allow me to get rid of the slow shuffles in:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// `this.target` and the `cnt_foo_eq_1` column come from the enclosing class /
// earlier preprocessing and are not shown in this snippet.
def handleBias(df: DataFrame, colName: String, target: String = this.target): DataFrame = {
  val w1 = Window.partitionBy(colName)
  val w2 = Window.partitionBy(colName, target)

  df.withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
}

In pseudo code: df foreach column (handleBias(column)). So a minimal data frame is loaded up:

val input = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  )
  val inputDf = input.toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
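
The shuffle-heavy version I want to replace simply applies handleBias to each column in turn, roughly like the following sketch (assuming cnt_foo_eq_1 was already computed upstream, as handleBias expects; it is not defined in the snippet above):

// Sketch: apply handleBias to every feature column sequentially.
// Assumes a "cnt_foo_eq_1" column already exists on inputDf.
val featureCols = Seq("col1", "col2", "col3TooMany", "col4")
val biased = featureCols.foldLeft(inputDf)((df, c) => handleBias(df, c, "TARGET"))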

However, my attempt to map over the columns via the RDD API fails:

val rdd1_inputDf = inputDf.rdd.flatMap { x => (0 until x.size).map(idx => (idx, x(idx))) }
rdd1_inputDf.toDF.show

It fails with

java.lang.ClassNotFoundException: scala.Any

A full example of the problem outlined in this question can be found at https://github.com/geoHeil/sparkContrastCoding, specifically in https://github.com/geoHeil/sparkContrastCoding/blob/master/src/main/scala/ColumnParallel.scala.

1 Answer


When you call .rdd on a DataFrame you get an RDD[Row], which is not strongly typed: indexing into a Row returns Any, so your flatMap produces (Int, Any) tuples, and Spark cannot build an encoder for scala.Any, which is why .toDF fails with that ClassNotFoundException. If you want to be able to map over the elements, you will need to pattern match over Row:

scala> val input = Seq(
     |     (0, "A", "B", "C", "D"),
     |     (1, "A", "B", "C", "D"),
     |     (0, "d", "a", "jkl", "d"),
     |     (0, "d", "g", "C", "D"),
     |     (1, "A", "d", "t", "k"),
     |     (1, "d", "c", "C", "D"),
     |     (1, "c", "B", "C", "D")
     |   )
input: Seq[(Int, String, String, String, String)] = List((0,A,B,C,D), (1,A,B,C,D), (0,d,a,jkl,d), (0,d,g,C,D), (1,A,d,t,k), (1,d,c,C,D), (1,c,B,C,D))

scala> val inputDf = input.toDF("TARGET", "col1", "col2", "col3TooMany", "col4") 
inputDf: org.apache.spark.sql.DataFrame = [TARGET: int, col1: string ... 3 more fields]

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val rowRDD = inputDf.rdd
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at rdd at <console>:27

scala> val typedRDD = rowRDD.map{case Row(a: Int, b: String, c: String, d: String, e: String) => (a,b,c,d,e)}
typedRDD: org.apache.spark.rdd.RDD[(Int, String, String, String, String)] = MapPartitionsRDD[20] at map at <console>:29

scala> typedRDD.keyBy(_._1).groupByKey.foreach{println}
(0,CompactBuffer((A,B,C,D), (d,a,jkl,d), (d,g,C,D)))
(1,CompactBuffer((A,B,C,D), (A,d,t,k), (d,c,C,D), (c,B,C,D)))
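
As an aside (a sketch going beyond the session above): with many columns you can pull fields out of the Row by name via getAs instead of writing out a long positional pattern:

// Same tuple as the pattern match above, but fields are extracted by name.
val typedByName = rowRDD.map { row =>
  (row.getAs[Int]("TARGET"),
   row.getAs[String]("col1"),
   row.getAs[String]("col2"),
   row.getAs[String]("col3TooMany"),
   row.getAs[String]("col4"))
}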

Otherwise you can use a typed Dataset:

scala> val ds = input.toDS
ds: org.apache.spark.sql.Dataset[(Int, String, String, String, String)] = [_1: int, _2: string ... 3 more fields]

scala> ds.rdd
res2: org.apache.spark.rdd.RDD[(Int, String, String, String, String)] = MapPartitionsRDD[8] at rdd at <console>:30

scala> ds.rdd.keyBy(_._1).groupByKey.foreach{println}
(0,CompactBuffer((0,A,B,C,D), (0,d,a,jkl,d), (0,d,g,C,D)))
(1,CompactBuffer((1,A,B,C,D), (1,A,d,t,k), (1,d,c,C,D), (1,c,B,C,D)))
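
And if the goal really is the (columnIndex, value) pairs from the question, one workaround (just a sketch, not the only option) is to render every value as a String so the tuple type stays concrete:

// Sketch: the question's flatMap, but each value is rendered as a String, so the
// element type is (Int, String) rather than (Int, Any) and Spark has an encoder
// for it (spark.implicits._ is in scope in the shell, so .toDF works).
val indexed = inputDf.rdd.flatMap { row =>
  (0 until row.size).map(idx => (idx, s"${row(idx)}"))
}
indexed.toDF("columnIndex", "value").show()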

5 Comments

As I want to use this in an ml.Pipeline and the output step is a DataFrame, the "schema is lost", e.g. I will need to use pattern matching? Is this correct? But there are quite a lot of columns; is there a way to "infer" them somewhat (a partial schema)?
Yeah, the DF => RDD conversion doesn't use the schema at all, unfortunately (and I don't think there is a good way to force its use). However, take a look at my new Dataset example: there is no need for an intermediary DataFrame, and the Dataset infers the types nicely (in Spark 2.0 I think anything you could do with a DF can also be done with a DS).
@GeorgHeiler (not sure if you were notified of ^^^^)
Thanks, indeed you are right. However, a Spark transformer in an ML pipeline will output only DataFrames ;) even with a Dataset as the input. So I think the schema will be lost for subsequent transformer steps.
I posted a follow-up question here (stackoverflow.com/questions/41445571/…); maybe you have a suggestion as well.
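
A note on the schema-loss concern in the comments: since .as[...] only needs a case class matching the DataFrame's schema, a typed view can be recovered after a pipeline stage. A minimal sketch, assuming the columns of inputDf above (the case class name is made up):

// Sketch: turn a DataFrame with a known schema back into a typed Dataset.
// Requires spark.implicits._ in scope for the encoder.
case class Record(TARGET: Int, col1: String, col2: String, col3TooMany: String, col4: String)

val typedAgain = inputDf.as[Record]
typedAgain.rdd.keyBy(_.TARGET).groupByKey().foreach(println)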
