I had a similar problem, and I solved it by using Spark-JobServer.
The usual approach with Spark-JobServer (SJS) is to create a special job that extends their SparkSQLJob, as in the following example:
import com.typesafe.config.Config
import org.apache.spark.sql.SQLContext
import spark.jobserver._

object ExecuteQuery extends SparkSQLJob {
  override def validate(sqlContext: SQLContext, config: Config): SparkJobValidation = {
    // Code to validate the parameters received in the request body
    SparkJobValid
  }

  override def runJob(sqlContext: SQLContext, jobConfig: Config): Any = {
    // Assuming your request sent a { "query": "..." } in the body:
    val df = sqlContext.sql(jobConfig.getString("query"))
    createResponseFromDataFrame(df) // You should implement this
  }
}
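Since createResponseFromDataFrame is something you have to write yourself, here is a minimal sketch of one possible version (the shape of the response is entirely my choice): it collects the result on the driver and returns plain Scala collections, which SJS can serialize to JSON in the job result:

import org.apache.spark.sql.DataFrame

// Minimal sketch of a possible implementation: collect the DataFrame and
// return it as column names plus rows of stringified values.
// Note: collect() brings the whole result to the driver, so keep the
// queries small or add a LIMIT before using something like this.
def createResponseFromDataFrame(df: DataFrame): Map[String, Any] = {
  val columns = df.columns
  val rows = df.collect().map { row =>
    columns.zip(row.toSeq.map(v => if (v == null) null else v.toString)).toMap
  }
  Map("columns" -> columns, "rows" -> rows)
}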
However, for this approach to work well with Cassandra, you have to use the spark-cassandra-connector, and then you have two options for loading the data:
1) Before calling ExecuteQuery via REST, you have to transfer the full data you want to query from Cassandra to Spark. For that, you would do something like the following (code adapted from the spark-cassandra-connector documentation):
val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "words", "keyspace" -> "test"))
  .load()
And then register it as a table so that Spark SQL is able to access it:
df.registerTempTable("myTable")   // as a temporary table
df.write.saveAsTable("myTable")   // as a persistent Hive table
Only after that will you be able to use ExecuteQuery to query myTable (one way to wrap these steps in a loading job is sketched below).
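Since that load-and-register code also has to run inside the job server, one way to do it is with a separate loader job. This is only a sketch under the same SparkSQLJob approach; the name LoadCassandraTable and the keyspace/table parameters are placeholders I made up:

object LoadCassandraTable extends SparkSQLJob {
  override def validate(sqlContext: SQLContext, config: Config): SparkJobValidation =
    SparkJobValid // add real checks (keyspace/table present in the request) here

  override def runJob(sqlContext: SQLContext, jobConfig: Config): Any = {
    // Read the Cassandra table through the spark-cassandra-connector...
    val df = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map(
        "keyspace" -> jobConfig.getString("keyspace"),
        "table"    -> jobConfig.getString("table")))
      .load()

    // ...and register it so that a later ExecuteQuery call can see it.
    df.registerTempTable(jobConfig.getString("table"))
    Map("status" -> "loaded", "rows" -> df.count())
  }
}

Keep in mind that a temporary table only survives inside the SQLContext that registered it, so both jobs must run in the same long-running SJS context (created beforehand with a POST to /contexts).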
2) Since the first option can be inefficient in some use cases, there is an alternative.
The spark-cassandra-connector has a special CassandraSQLContext that can be used to query C* tables directly from Spark. It can be used like:
import org.apache.spark.sql.cassandra.CassandraSQLContext

val cc = new CassandraSQLContext(sc)
val df = cc.sql("SELECT * FROM keyspace.table ...")
However, to use a different type of context with Spark-JobServer, you need to extend SparkContextFactory and use it at the moment of context creation (which can be done through a POST request to /contexts). An example of such a context factory can be seen on the SJS GitHub. You also have to create a SparkCassandraJob trait for jobs that use this context, analogous to SparkSQLJob (but this part is very easy). A rough sketch of both is shown below.
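For reference, here is what that factory and job trait could look like, loosely modeled on the SQLContextFactory shipped with the SJS extras. The exact trait names, packages and method signatures vary between SJS versions, so treat everything below as an assumption to verify against the SJS sources:

import com.typesafe.config.Config
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.cassandra.CassandraSQLContext
import spark.jobserver._
import spark.jobserver.context._

// Jobs that need a CassandraSQLContext extend this instead of SparkSQLJob.
trait SparkCassandraJob extends SparkJobBase {
  type C = CassandraSQLContext
}

// Factory used when creating the context, e.g. with
// POST /contexts/cassandra-context?context-factory=<fully qualified name of this class>
class CassandraSQLContextFactory extends SparkContextFactory {
  type C = CassandraSQLContext with ContextLike

  def makeContext(sparkConf: SparkConf, config: Config, contextName: String): C = {
    val sc = new SparkContext(sparkConf)
    new CassandraSQLContext(sc) with ContextLike {
      def sparkContext: SparkContext = sc
      def isValidJob(job: SparkJobBase): Boolean = job.isInstanceOf[SparkCassandraJob]
      def stop(): Unit = sc.stop()
    }
  }
}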
Finally, the ExecuteQuery job has to be adapted to use the new classes. It would look something like this:
object ExecuteQuery extends SparkCassandraJob {
  override def validate(cc: CassandraSQLContext, config: Config): SparkJobValidation = {
    // Code to validate the parameters received in the request body
    SparkJobValid
  }

  override def runJob(cc: CassandraSQLContext, jobConfig: Config): Any = {
    // Assuming your request sent a { "query": "..." } in the body:
    val df = cc.sql(jobConfig.getString("query"))
    createResponseFromDataFrame(df) // You should implement this
  }
}
After that, the ExecuteQuery job can be executed via REST with a POST request.
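For illustration, a minimal sketch of such a POST from plain Scala (host, port, appName and context name are placeholders; SJS expects the job parameters in the POST body as a Typesafe Config string, which becomes the jobConfig passed to runJob):

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import scala.io.Source

object SubmitQuery {
  def main(args: Array[String]): Unit = {
    // Placeholder values: adjust host/port, appName (the uploaded jar),
    // classPath (the job class) and context name to match your deployment.
    val endpoint = "http://localhost:8090/jobs" +
      "?appName=my-app&classPath=ExecuteQuery&context=cassandra-context&sync=true"

    // The body is parsed as a Typesafe Config and handed to runJob as jobConfig.
    val body = "query = \"SELECT * FROM myTable LIMIT 10\""

    val conn = new URL(endpoint).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    val out = conn.getOutputStream
    out.write(body.getBytes(StandardCharsets.UTF_8))
    out.close()

    println(Source.fromInputStream(conn.getInputStream).mkString) // job result as JSON
    conn.disconnect()
  }
}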
Conclusion
Here I use the first option because I need the advanced queries available in the HiveContext (window functions, for example), which are not available in the CassandraSQLContext. However, if you don't need those kinds of operations, I recommend the second approach, even though it requires some extra coding to create a new ContextFactory for SJS.