1

I am using spark 2.4.1 version and java 8.

I have scenario like:

  • Will be provided a list of classifiers from a property file to process.
  • These classifiers determines the data what to pull and process.

Something like the below:

val classifiers = Seq("classifierOne","classifierTwo","classifierThree");

for( classifier : classifiers ){
  // read from CassandraDB table   
  val acutalData = spark.read(.....).where(<classifier conditition>)

  // the data varies depend on the classifier passed in 
  // this data has many fields along with fieldOne, fieldTwo and fieldThree

Depend on the classifier I need to filter the data. Currently I am doing it as below:

if(classifier.===("classifierOne")) {
  val classifierOneDs =  acutalData.filter(col("classifierOne").notEqual(lit("")).or(col("classifierOne").isNotNull()));
  writeToParquet(classifierOneDs);                           
} else if(classifier.===("classifierTwo")) {
  val classifierTwoDs =  acutalData.filter(col("classifierTwo").notEqual(lit("")).or(col("classifierTwo").isNotNull()));
  writeToParquet(classifierOneDs);
} else (classifier.===("classifierThree")) {
  val classifierThreeDs =  acutalData.filter(col("classifierThree").notEqual(lit("")).or(col("classifierThree").isNotNull()));
  writeToParquet(classifierOneDs);
}

Is there any way to avoid the if-else block here? Any other way to do/achieve the same in spark distrubated way?

1
  • your code is bad, 1) you're repeating the write method in each if condition and 2) it'salways the same dataframe writeToParquet(classifierOneDs); - you should read about how to use Scala properly ... val df = if() else ... and afterwards to the write once. Also Scala pattern matching is more Scala style ... Commented May 13, 2020 at 14:18

2 Answers 2

1

Your question seems more about how to structure your application than Spark itself. There are two parts really.

Is there any way to avoid the if-else block here?

"Avoid"? In what sense? Spark can't magically "discover" your way of doing distributed processing. You should help Spark a bit.

For this case I'd propose a lookup table with all possible filter conditions and their names to look up by, e.g.

val classifiers = Map(
  "classifierOne" -> col("classifierOne").notEqual(lit("")).or(col("classifierOne").isNotNull()),
  "classifierTwo" -> ...,
  "classifierThree" -> ...)

In order to use it you simply iterate over all the classifiers (or look up as many as needed), e.g.

val queries = classifiers.map { case (name, cond) =>
  spark
    .read(.....)
    .where(cond)
    .filter(col(name).notEqual(lit("")).or(col(name).isNotNull()))
}

queries is a collection of DataFrames to be writeToParquet and it's up to you how to make the queries executed in parallel (Spark will take care of doing it in distributed way). Use Scala Futures or another parallel API.

I think the following could make it just fine:

queries.par.foreach(writeToParquet)

With queries.par.foreach you essentially execute all writeToParquet in parallel. Since writeToParquet executes a DataFrame action to writing in parquet format that follows all the rules of Spark for any other action. It will run a Spark job (perhaps even more than one) and the job is executed in distributed fashion using Spark machinery.

Think of queries.par as a way to execute the queries one by one without waiting for earlier queries to finish to start a new one. You are strongly recommended to configure FAIR scheduling mode:

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads.

Under fair sharing, Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources.

Sign up to request clarification or add additional context in comments.

4 Comments

foreach runs on the driver and submit Spark jobs as if you used a series of Spark jobs one by one. It's more a coding trick not a Spark feature. The best is always to use web UI and see yourself.
Just to be clear @Jacek Laskowski this is only true for spark and not spark strcutured streaming
Well, it depends where foreach comes from. The types (classes) matter.
Fair enough. The thing is true 100%, but what is also true is that in structured streaming this is less of a concern, because you start the application with .start() and this is asynchronous. Does not matter if one use foreach or foreachbatch, the query start asynchronously always.
1

So, you need to select, what column to check, based on classifier name, that will be passed as a list?

val classifiers = Seq("classifierOne","classifierTwo","classifierThree");

for(classifier : classifiers) {

    val acutalData = spark.read(.....).where(<classifier conditition>)
    val classifierDs =  acutalData.filter(col(classifier).notEqual(lit("")).or(col(classifier).isNotNull()));
    writeToParquet(classifierDs);

}

As you're iterating through list, you would be going through all the classifiers anyway. If column name can be different from actual classifier name, you can make it List[Classifier], where Classifier is something like case class Classifier(colName: String, classifierName: String)

2 Comments

If you want outputs to be written separately, to different paths - then you should look into Scala Futures, so that you'll be able to run all the processing at once, and then wait for result. Take a look at docs.scala-lang.org/overviews/core/futures.html, you'll need something like val classificationFutures = classifiers.map {cl => Future { // same code as before } } val combinedFuture = Future.sequence(classificationFutures) Await.result(combinedFuture, Duration.Inf) (dirty code, just as example)
Other option - if you'll be combining results later in one dataframe, try classifiers.map {classifier => //all the code without parquet saving, returning DF}.reduce(_ union _) - this will give you single Dataframe with results of each classifier run (though I'm not sure if that is what you really need)

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.