
This issue came up while I was running some tests on Spark SQL's external datasource API.

I build the DataFrame in two ways and compare the speed of the collect action. I find that if the column count is too large, the DataFrame built from the external datasource lags behind. I want to know whether this is a limitation of Spark SQL's external datasource. :-)

To present the question more clearly, I wrote a piece of code:

https://github.com/sunheehnus/spark-sql-test/

My benchmark code for the External Datasource API implements a fake external datasource (actually an RDD[(String, Array[Int])]) and gets the DataFrame by

val cmpdf = sqlContext.load("com.redislabs.test.dataframeRP", Map[String, String]())
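
For context, a rough sketch of what such a provider might look like, assuming a plain TableScan and guessing that the string passed to load is the provider class's fully qualified name; the class and column names below are illustrative and the real implementation is in the repository linked above.

package com.redislabs.test

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Resolved by name through sqlContext.load("com.redislabs.test.dataframeRP", ...).
class dataframeRP extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    FakeScan(colnum = 2048)(sqlContext) // column count hard-coded for illustration
}

// A fake relation exposing the same data as the hand-built RDD[(String, Array[Int])].
case class FakeScan(colnum: Int)(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  // One string column "instant" plus the flattened integer columns "0" .. colnum.
  override def schema: StructType = StructType(
    StructField("instant", StringType) +:
      (0 to colnum).map(i => StructField(i.toString, IntegerType)))

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(1 to 2048, 3).map { x =>
      Row.fromSeq(x.toString +: (x to x + colnum))
    }
}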

Then I build the same RDD and get a DataFrame by

val rdd = sqlContext.sparkContext.parallelize(1 to 2048, 3)
val mappedrdd = rdd.map(x => (x.toString, (x to x + colnum).toSeq.toArray))
val df = mappedrdd.toDF()
val dataColExpr = (0 to colnum).map(_.toString).zipWithIndex.map { case (key, i) => s"_2[$i] AS `$key`" }
val allColsExpr = "_1 AS instant" +: dataColExpr
val df1 = df.selectExpr(allColsExpr: _*)

When I run the test code, I see this result (on my laptop):

9905
21427

But when I make the column count smaller (512), I see this result:

4323
2221

It looks like when the column count in the schema is small, the External Datasource API benefits, but as the column count grows, the External Datasource API eventually lags behind... I am wondering whether this is a limitation of Spark SQL's External Datasource API, or whether I am using the API in a wrong way? Thanks very much. :-)

1 Answer


You are not benchmarking what you wanted to benchmark here. This benchmark simply shows that the more expensive version of the code (i.e. the code you wrote) is more expensive than the built-in code.

There are two reasons for your result:

  1. When you declare a field (constant or normal column), Spark SQL actually generates fairly optimized code that unrolls the loop. When you implemented the data source yourself, you didn't unroll the loop. As a matter of fact, not only did you not unroll the loop, you also performed a few expensive operations, such as (x to x + colnum).toSeq.toArray, for each row. All of these are eliminated in the code generated by Spark SQL.

  2. If "needConversion" is false, Spark SQL introduces an inbound conversion when taking data from the data sources. This inbound conversion converts external Row format into an internal row format (e.g. strings are no longer Java Strings, but UTF8-encoded strings).


2 Comments

Hello rxin, thanks very much for your help. After adding override val needConversion: Boolean = false to the case class SCAN, the result with the external datasource API is even more efficient. :-)
Hello rxin, after switching to Spark 1.6, if I add override val needConversion: Boolean = false, I hit the exception Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.InternalRow. Adding needConversion on Spark 1.4 works well and the result using the external datasource is even much better, but after switching to Spark 1.6 we can't use it any more... Is the needConversion-related conversion what causes the lag when the column count is very large? And is there a good way to avoid this? Thanks very much.
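
For what it's worth, a rough sketch (not from this thread, names hypothetical) of why Spark 1.6 throws that cast error: when needConversion is false, the planner treats the scan output as catalyst InternalRows, so the relation would have to build internal rows itself (UTF8String instead of java.lang.String) rather than plain GenericRows.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical Spark 1.6 variant of the fake relation sketched earlier.
case class FakeScan16(colnum: Int)(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  override val needConversion: Boolean = false

  override def schema: StructType = StructType(
    StructField("instant", StringType) +:
      (0 to colnum).map(i => StructField(i.toString, IntegerType)))

  // With needConversion = false, Spark 1.6 casts this RDD to RDD[InternalRow],
  // so the rows must already be internal rows (cast back through type erasure).
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(1 to 2048, 3).map { x =>
      InternalRow.fromSeq(UTF8String.fromString(x.toString) +: (x to x + colnum))
    }.asInstanceOf[RDD[Row]]
}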
