
When I try to do the same thing in my code, as shown below:

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0), make, row(2))
})

I have taken the above reference from here: Scala: How can I replace value in Dataframs using scala, but I am getting an encoder error:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

Note: I am using Spark 2.0!

  • You need to import spark.implicits._. Commented Sep 11, 2016 at 6:22
  • Thanks @Yuval, but it did not work. Commented Sep 11, 2016 at 6:29

4 Answers


There is nothing unexpected here. You're trying to use code that was written for Spark 1.x and is no longer supported in Spark 2.0:

  • in 1.x DataFrame.map is ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • in 2.x Dataset[Row].map is ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
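To make the difference concrete, here is a minimal sketch (assuming Spark 2.x, a SparkSession named spark, and the df defined just below):

import spark.implicits._  // provides encoders for String, Int, tuples, case classes

// compiles: Encoder[String] is in scope, so the result is Dataset[String]
val makes = df.map(_.getAs[String]("make"))

// does not compile: there is no implicit Encoder[Row]
// val rows = df.map(identity)
// error: Unable to find encoder for type stored in a Dataset ...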

To be honest, it didn't make much sense in 1.x either. Independent of the version, you can simply use the DataFrame API:

import spark.implicits._ // needed for toDF and the $ column syntax
import org.apache.spark.sql.functions.{when, lower}

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
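For reference, calling .show() on the result should print something like this:

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make")).show()
// +----+-----+-----+
// |year| make|model|
// +----+-----+-----+
// |2012|    S|    S|
// |1997| Ford| E350|
// |2015|Chevy| Volt|
// +----+-----+-----+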

If you really want to use map you should use statically typed Dataset:

import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}

or at least return an object which will have an implicit encoder:

df.map {
  case Row(year: Int, make: String, model: String) => 
    (year, if(make.toLowerCase == "tesla") "S" else make, model)
}
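Note that this returns a Dataset[(Int, String, String)], so the columns come out named _1, _2, _3; if you want the original names back, a toDF call restores them:

df.map {
  case Row(year: Int, make: String, model: String) =>
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
}.toDF("year", "make", "model")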

Finally, if for some completely crazy reason you really want to map over Dataset[Row], you have to provide the required encoder:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
    Row(year, "S", model)
  case row => row
} (encoder)
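And as the inline comment above hints, you don't have to spell the schema out by hand; reusing the existing one is a one-liner:

val encoder = RowEncoder(df.schema)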

5 Comments

@Advika This is probably the worst one you could choose (really, there is no good reason to deal with Row / Any here) but I am glad it was helpful.
Thanks @zero323. Yeah, after doing a self-review I realized the approach I had taken defeats the whole purpose of Spark. I need to come up with better logic. :)
@zero323 Why is it "completely crazy" to map over Dataset[Row]? In fact, I have a use case where I want to flatMap over Dataset[Row].
@JaneWayne Because a) you don't get the performance gains offered by DataFrame and binary encoders, b) you don't get type safety, c) you explicitly match each type, which makes it verbose and error prone, and d) you have to specify the schema for the Encoder, which is once again verbose and error prone. For flatMap-like behavior on a DataFrame, explode is usually more than enough.
@zero323, speaking about the code in the map itself: with the implicit encoders you explicitly match each type, while in the last example you specify a type only for the attribute you use in the logic.

For the scenario where the dataframe schema is known in advance, the answer given by @zero323 is the solution,

but for scenarios with a dynamic schema, or when passing multiple dataframes to a generic function, the following code worked for us while migrating from 1.6.1 to 2.2.0:

import org.apache.spark.sql.Row

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

val data = df.rdd.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0), make, row(2))
})

This code executes on both versions of Spark.
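If you need a DataFrame back after the round trip through the RDD, you can rebuild it from the mapped rows and the original schema. A sketch, using the Spark 2.x SparkSession (on 1.6 the equivalent call lives on sqlContext):

val result = spark.createDataFrame(data, df.schema)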

Disadvantage: the optimizations Spark provides through the DataFrame/Dataset API won't be applied.

Comments


Just to add a few other important points to help you understand the other answers well (especially the final point of @zero323's answer about map over Dataset[Row]):

  • First of all, DataFrame.map gives you a Dataset (more specifically, Dataset[T], rather than Dataset[Row])!
  • And Dataset[T] always requires an encoder; that's what the sentence "Dataset[Row].map is ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]" means.
  • There are indeed lots of encoders predefined already by Spark (which can be imported by doing import spark.implicits._), but the list still cannot cover many domain-specific types that developers may create, in which case you need to create encoders yourself.
  • In the specific example on this page, df.map returns a Dataset of Row, and, hang on a minute, the Row type is not within the list of types that have encoders predefined by Spark, hence you have to create one on your own.
  • And I admit that creating an encoder for the Row type is a bit different from the approach described above: you have to use RowEncoder, which takes a StructType as a parameter describing the type of a row, like what @zero323 provides above:
// this describes the internal type of a row
val schema = StructType(Seq(StructField("year", IntegerType), StructField("make", StringType), StructField("model", StringType)))

// and this completes the creation of encoder
// for the type `Row` with internal schema described above
val encoder = RowEncoder(schema)
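For the domain-specific types mentioned above, the encoder usually comes for free once the type is a case class. A minimal sketch (the Car class here is just an illustration):

import org.apache.spark.sql.{Encoder, Encoders}

// Spark can derive an encoder for any Product type (case class)
case class Car(year: Int, make: String, model: String)

implicit val carEncoder: Encoder[Car] = Encoders.product[Car]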

Comments


In my case, with Spark 2.4.4, I had to import the implicits. This is a general answer:

val spark2 = spark
import spark2.implicits._

val data = df.rdd.map(row => my_func(row))

where my_func did some operation.
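For illustration only, my_func could be any row-level transformation; a hypothetical sketch mirroring the question's logic:

import org.apache.spark.sql.Row

// hypothetical placeholder: the name and logic are illustrative only
def my_func(row: Row): Row = {
  val make = row.getAs[String]("make")
  Row(row(0), if (make.toLowerCase == "tesla") "S" else make, row(2))
}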

Comments
