
I have two dataframes called left and right.

scala> left.printSchema
root
 |-- user_uid: double (nullable = true)
 |-- labelVal: double (nullable = true)
 |-- probability_score: double (nullable = true)

scala> right.printSchema
root
 |-- user_uid: double (nullable = false)
 |-- real_labelVal: double (nullable = false)

Then I join them, with a left outer join, to get the joined DataFrame. Anyone interested in the natjoin function can find it here:

https://gist.github.com/anonymous/f02bd79528ac75f57ae8

scala> val joinedData = natjoin(predictionDataFrame, labeledObservedDataFrame, "left_outer")

scala> joinedData.printSchema
root
 |-- user_uid: double (nullable = true)
 |-- labelVal: double (nullable = true)
 |-- probability_score: double (nullable = true)
 |-- real_labelVal: double (nullable = false)

Since it is a left outer join, the real_labelVal column contains nulls for rows whose user_uid is not present in right.

scala> val realLabelVal = joinedData.select("real_labelval").distinct.collect
realLabelVal: Array[org.apache.spark.sql.Row] = Array([0.0], [null])

I want to replace the null values in the realLabelVal column with 1.0.

Currently I do the following:

  1. I find the index of the real_labelVal column and use the org.apache.spark.sql.Row API to set the nulls to 1.0. (This gives me an RDD[Row].)
  2. Then I apply the schema of the joined dataframe to get the cleaned dataframe.

The code is as follows:

 val real_labelval_index = 3

 // Rebuild the row with 1.0 in place of the null real_labelVal
 def replaceNull(row: Row) = {
   val rowArray = row.toSeq.toArray
   rowArray(real_labelval_index) = 1.0
   Row.fromSeq(rowArray)
 }

 // In Spark 1.x, mapping over a DataFrame yields an RDD[Row]
 val cleanRowRDD = joinedData.map(row => if (row.isNullAt(real_labelval_index)) replaceNull(row) else row)
 val cleanJoined = sqlContext.createDataFrame(cleanRowRDD, joinedData.schema)
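Stripped of Spark, the per-row replacement above is just "swap in a default at one index when the value is null". A minimal pure-Scala sketch of that semantics (illustrative names and data, no Spark required):

```scala
// Pure-Scala illustration of the per-row logic: a "row" is a Seq of values,
// with null standing in for SQL NULL; replace the value at a given index
// with a default when it is null, otherwise leave the row untouched.
val realLabelValIndex = 3

def fillNullAt(row: Seq[Any], index: Int, default: Any): Seq[Any] =
  if (row(index) == null) row.updated(index, default) else row

val unmatched = Seq(42.0, 1.0, 0.9, null) // user_uid absent from right: real_labelVal is null
val matched   = Seq(43.0, 0.0, 0.1, 0.0)  // user_uid present in right

fillNullAt(unmatched, realLabelValIndex, 1.0) // Seq(42.0, 1.0, 0.9, 1.0)
fillNullAt(matched, realLabelValIndex, 1.0)   // unchanged: Seq(43.0, 0.0, 0.1, 0.0)
```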

Is there an elegant or efficient way to do this?

Googling hasn't helped much. Thanks in advance.

Comments:
  • What does nat stand for in natjoin? (Commented Aug 12, 2016 at 16:20)
  • @JosiahYoder nat stands for Natural Join. (Commented Sep 12, 2016 at 19:11)

1 Answer


Have you tried using na?

joinedData.na.fill(1.0, Seq("real_labelval"))

Comments:

  • Thanks for the quick response. The problem is we use the Cloudera distribution and the cluster has Spark 1.3.0. The fill functions were introduced in Spark 1.4, I think. I am accepting this as the answer.
  • Do I need to import anything to use na? Thanks
  • @GavinNiu No, na is a method directly on DataFrame.
  • What is Seq() doing?
  • fill takes an array (Seq), thus the wrapper.
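As the first comment notes, na.fill only appeared in Spark 1.4. On a 1.3 cluster, one possible workaround is to rewrite the column with a SQL CASE WHEN expression via selectExpr. This is only a sketch against the question's joinedData, and assumes the Spark SQL parser in use accepts CASE WHEN:

```scala
// Hypothetical Spark 1.3 workaround: no na.fill, so rebuild the column
// with a SQL CASE WHEN expression instead of mapping over an RDD[Row].
val cleanJoined = joinedData.selectExpr(
  "user_uid",
  "labelVal",
  "probability_score",
  "CASE WHEN real_labelVal IS NULL THEN 1.0 ELSE real_labelVal END AS real_labelVal")
```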
