
How can I select a case class based on a String value?

My code is:

val spark = SparkSession.builder()...
val rddOfJsonStrings: RDD[String] = // some json strings as RDD
val classSelector: String = ??? // could be "Foo" or "Bar", or any other String value
case class Foo(foo: String)
case class Bar(bar: String)


if (classSelector == "Foo") {
  val df: DataFrame = spark.read.json(rddOfJsonStrings)
  df.as[Foo]
} else if (classSelector == "Bar") {
  val df: DataFrame = spark.read.json(rddOfJsonStrings)
  df.as[Bar]
} else {
  throw ClassUnknownException //custom Exception
}

The variable classSelector is a simple String that should point to the case class of the same name.

Imagine I don't only have Foo and Bar as case classes but more than those two. How is it possible to call the df.as[] statement based on the String (if it is possible at all)?

Or is there a completely different approach available in Scala?

5 Comments
  • Can you explain classSelector? Is it a function? Commented Jun 9, 2020 at 13:27
  • classSelector is a simple String that is given e.g. through the main(args) function. Is it possible to use the value of this string to point to the case class with the identical name? Commented Jun 9, 2020 at 13:31
  • You mean something like classSelector = "Foo"? Commented Jun 9, 2020 at 13:33
  • The "completely different approach" would be to use a typeclass that encapsulates the specific behaviour for Foo and Bar. There are various tutorials about this online, so pick one that looks right for you. Commented Jun 9, 2020 at 13:42
  • @mike Note: I've edited the answer because it turns out it's much easier to do partially than I thought, writing so you'll get a notification. Commented Jun 10, 2020 at 9:21
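The typeclass approach suggested in the comments could be sketched roughly like this (a Spark-free sketch; `FromRaw` and its instances are hypothetical names, not from any library):

```scala
case class Foo(foo: String)
case class Bar(bar: String)

// Hypothetical typeclass: the per-class behaviour lives in an instance
// rather than in an if/else chain at each call site.
trait FromRaw[T] { def fromRaw(raw: String): T }

object FromRaw {
  implicit val fooFromRaw: FromRaw[Foo] = new FromRaw[Foo] {
    def fromRaw(raw: String): Foo = Foo(raw)
  }
  implicit val barFromRaw: FromRaw[Bar] = new FromRaw[Bar] {
    def fromRaw(raw: String): Bar = Bar(raw)
  }

  // Selecting by a runtime String still needs an explicit name -> behaviour
  // registry, because a type parameter cannot come from a runtime value.
  val registry: Map[String, String => Any] = Map(
    "Foo" -> (raw => implicitly[FromRaw[Foo]].fromRaw(raw)),
    "Bar" -> (raw => implicitly[FromRaw[Bar]].fromRaw(raw))
  )
}
```

Note that even with a typeclass, the String-to-type step still requires one registry entry (or branch) per class.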

5 Answers


Check the code below:

classSelector match {
  case c if Foo.getClass.getSimpleName.replace("$", "").equalsIgnoreCase(c) =>
    spark.read.json(rddOfJsonStrings).as[Foo]
  case c if Bar.getClass.getSimpleName.replace("$", "").equalsIgnoreCase(c) =>
    spark.read.json(rddOfJsonStrings).as[Bar]
  case _ => throw ClassUnknownException // custom exception
}
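As a Spark-free aside, the `.replace("$", "")` is needed because the companion object of a case class compiles to a JVM class whose name ends in `$`. A small sketch (`CompanionNames` is an illustrative name, not part of any library):

```scala
case class Foo(foo: String)
case class Bar(bar: String)

object CompanionNames {
  // A case class's companion object compiles to a JVM class named "Foo$";
  // stripping the trailing "$" recovers the case-class name itself.
  def nameOf(companion: AnyRef): String =
    companion.getClass.getSimpleName.replace("$", "")
}
```

So `CompanionNames.nameOf(Foo)` yields `"Foo"`, which can then be compared (case-insensitively, as above) with the selector string.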




How is it possible to call the df.as[] statement based on the String (if possible at all)?

It isn't (or based on any runtime value). You may note that all answers still need to:

  1. have a separate branch for Foo and Bar (and one more branch for each class you'll want to add);

  2. repeat the class name twice in the branch.

You can avoid the second:

import scala.reflect.{classTag, ClassTag}

val df: DataFrame = spark.read.json(rddOfJsonStrings)
// local function defined where df and classSelector are visible
def dfAsOption[T : Encoder : ClassTag]: Option[Dataset[T]] =
  Option.when(classSelector == classTag[T].runtimeClass.getSimpleName)(df.as[T])

dfAsOption[Foo].orElse(dfAsOption[Bar]).getOrElse(throw ClassUnknownException)

But for the first you'd need a macro if it's possible at all. I would guess it isn't.
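The same shape can be exercised without Spark; a sketch where `pick` plays the role of `dfAsOption` and plain case-class values stand in for `Dataset[T]` (all names here are illustrative):

```scala
import scala.reflect.{classTag, ClassTag}

case class Foo(foo: String)
case class Bar(bar: String)

object Pick {
  // Some(value) when the selector equals T's simple runtime class name.
  def pick[T: ClassTag](selector: String)(value: => T): Option[T] =
    if (selector == classTag[T].runtimeClass.getSimpleName) Some(value) else None

  // One orElse per supported class: the branch-per-class cost remains.
  def select(selector: String): Any =
    pick[Foo](selector)(Foo("x"))
      .orElse(pick[Bar](selector)(Bar("y")))
      .getOrElse(throw new IllegalArgumentException(s"unknown class: $selector"))
}
```

The class name is written only once per branch, but each supported class still needs its own `orElse` line.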



Define a generic method and invoke it,


import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import scala.reflect.ClassTag

// Note: Encoders.bean expects a JavaBean-style class; for Scala case classes
// the usual choice is an implicit product Encoder via import spark.implicits._
def getDs[T: ClassTag](spark: SparkSession, rddOfJsonStrings: RDD[String]): Dataset[T] = {
  val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
  spark.read.json(rddOfJsonStrings).as[T](Encoders.bean(clazz))
}

getDs[Foo](spark, rddOfJsonStrings)
getDs[Bar](spark, rddOfJsonStrings)

1 Comment

Thank you for your reply @QuickSilver. If I understand correctly, this will reduce a bit of code, but how could I use the content of classSelector to avoid calling getDs[Foo] and getDs[Bar] individually. I was hoping to do something like getDs[classSelector.asClassName].

Alternative:

Highlights:

  1. Use the simpleName of the case class, not of the companion object
  2. If classSelector is null, the solution won't fail

case class Foo(foo: String)
case class Bar(bar: String)

Test case:

val rddOfJsonStrings: RDD[String] = spark.sparkContext.parallelize(Seq("""{"foo":1}"""))
val classSelector: String = "Foo" // could be "Foo" or "Bar", or any other String value

val ds = classSelector match {
  case foo if classOf[Foo].getSimpleName == foo =>
    val df: DataFrame = spark.read.json(rddOfJsonStrings)
    df.as[Foo]
  case bar if classOf[Bar].getSimpleName == bar =>
    val df: DataFrame = spark.read.json(rddOfJsonStrings)
    df.as[Bar]
  case _ => throw new UnsupportedOperationException
}

ds.show(false)

/**
  * +---+
  * |foo|
  * +---+
  * |1  |
  * +---+
  */



You can use a reflective toolbox:

import org.apache.spark.sql.{Dataset, SparkSession}
import scala.reflect.runtime
import scala.tools.reflect.ToolBox

object Main extends App {
  val spark = SparkSession.builder
    .master("local")
    .appName("Spark SQL basic example")
    .getOrCreate()
  import spark.implicits._

  val rddOfJsonStrings: Dataset[String] = spark.createDataset(Seq("""{"foo":"aaa"}"""))
  // val rddOfJsonStrings: Dataset[String] = spark.createDataset(Seq("""{"bar":"bbb"}"""))

  val classSelector: String = "Foo"
  // val classSelector: String = "Bar"

  case class Foo(foo: String)
  case class Bar(bar: String)

  val runtimeMirror = runtime.currentMirror
  val toolbox = runtimeMirror.mkToolBox()

  val res = toolbox.eval(toolbox.parse(s"""
    import org.apache.spark.sql.DataFrame
    import Main._
    import spark.implicits._
    val df: DataFrame = spark.read.json(rddOfJsonStrings)
    df.as[$classSelector]
  """)).asInstanceOf[Dataset[_]]

  println(res) // [foo: string]
}

Notice that statically you will have a Dataset[_], not Dataset[Foo] or Dataset[Bar].
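Also note that `ToolBox` lives in the scala-compiler artifact, so a build dependency along these lines is needed (sbt syntax; the exact form depends on your build):

```scala
// build.sbt (illustrative)
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value
```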

