I have a custom foreach writer for Spark streaming. For each row, I write to a JDBC data source. I also want to do some kind of fast lookup before the JDBC operation and update the value afterwards, as in "Step 1" and "Step 3" in the sample code below.

I don't want to use an external database such as Redis or MongoDB. I want something with a low footprint, such as RocksDB or Derby.

I'm okay with storing one file per application, just like checkpointing; I'll create an internal-db folder.

I could not find any in-memory DB for Spark.

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

def main(args: Array[String]): Unit = {

  val brokers = "quickstart:9092"
  val topic = "safe_message_landing_app_4"

  val sparkSession = SparkSession.builder().master("local[*]").appName("Ganesh-Kafka-JDBC-Streaming").getOrCreate()

  val sparkContext = sparkSession.sparkContext
  sparkContext.setLogLevel("ERROR")
  val sqlContext = sparkSession.sqlContext

  // group.id is not set here: the Kafka source generates a unique consumer group for each query.
  val kafkaDataframe = sparkSession.readStream.format("kafka")
    .options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> topic,
      "startingOffsets" -> "latest"))
    .load()

  kafkaDataframe.printSchema()
  kafkaDataframe.createOrReplaceTempView("kafka_view")
  val sqlDataframe = sqlContext.sql("select concat(topic, '-', partition, '-', offset) as KEY, string(value) as VALUE from kafka_view")

  val customForEachWriter = new ForeachWriter[Row] {
    // Called once per partition and epoch; returning true means the rows should be processed.
    override def open(partitionId: Long, version: Long): Boolean = {
      println("Open Started ==> partitionId ==> " + partitionId + " ==> version ==> " + version)
      true
    }

    override def process(value: Row): Unit = {
      // Step 1 ==> Look up a key in the persistent KEY-VALUE store

      // Step 2 ==> JDBC operations

      // Step 3 ==> Update the value in the persistent KEY-VALUE store
    }

    override def close(errorOrNull: Throwable): Unit = {
      println(" ************** Closed ****************** ")
    }
  }

  // checkpointLocation is set on the writer; it is not a Kafka reader option.
  val yy = sqlDataframe
    .writeStream
    .queryName("foreachquery")
    .option("checkpointLocation", "cp/kafka_reader")
    .foreach(customForEachWriter)
    .start()

  yy.awaitTermination()

  sparkSession.close()

}
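
To make Step 1 and Step 3 concrete, here is a minimal sketch of an in-process key-value store that keeps its entries in one file. It is only a stand-in for an embedded engine such as RocksDB, Derby, or H2: the class name `LocalKeyValueStore`, its methods, and the file layout are hypothetical and are not part of Spark. It uses `java.util.Properties` from the JDK so that it needs no extra dependency.

```scala
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.Properties

// Hypothetical stand-in for an embedded key-value store (RocksDB, Derby, H2).
// This is NOT a Spark API; the name and file layout are assumptions for illustration.
final class LocalKeyValueStore(file: File) {
  private val props = new Properties()

  // Load previously persisted entries if the file already exists.
  if (file.exists()) {
    val in = new FileInputStream(file)
    try props.load(in) finally in.close()
  }

  // Step 1: in-process lookup; None when the key has never been stored.
  def get(key: String): Option[String] = Option(props.getProperty(key))

  // Step 3: update the value in memory; call flush() to persist it.
  def put(key: String, value: String): Unit = {
    props.setProperty(key, value)
    ()
  }

  // Write all entries back to the file, creating the parent folder if needed.
  def flush(): Unit = {
    Option(file.getParentFile).foreach(_.mkdirs())
    val out = new FileOutputStream(file)
    try props.store(out, "internal-db") finally out.close()
  }
}
```

In the foreach writer, such a store could be created in `open`, queried with `get` at the start of `process`, updated with `put` after the JDBC call, and persisted with `flush` in `close`. Note that any store of this kind lives inside one JVM, so on a multi-machine cluster each executor would see only its own local copy.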

  • Are you asking about db.apache.org/derby/docs/10.13/devguide/cdevdvlpinmemdb.html ? I don't really know what a "persistent in-memory database" is, unless you're talking about using hardware like NVM? Without special hardware a Derby in-memory database is NOT durable. Commented Sep 15, 2017 at 19:50
  • By in-memory I meant in-process: MySQL and Redis run as separate processes, which I don't want. Derby loads into the Spark driver program, and I want to connect to it from the executors, because my Spark job, which is run by YARN, will be spread over 5 machines. So can I use Derby in Spark, and will it work for my Step 1 and Step 3? However, Derby does not support MVCC, so I'm thinking of the H2 database. I'd like to hear about experience using Derby and H2 in Spark. Commented Sep 15, 2017 at 23:53
  • OK. Derby's term for that "in-process" database engine is "embedded", and yes, it works well for embedding Derby into another (Java) application. You are correct that Derby is not an MVCC database engine. To get started with Derby I recommend the tutorials at: db.apache.org/derby/docs/10.13/getstart Commented Sep 16, 2017 at 14:43

1 Answer

Manjesh,

What you are looking for, "Spark and your in-memory DB as one seamless cluster, sharing a single process space", with support for MVCC is exactly what SnappyData provides. With SnappyData, the tables that you want to do a fast lookup on are in the same process that is running your Spark streaming job. Check it out here

SnappyData has an Apache V2 license for the core product, and the specific use that you are referring to is available in the OSS download.

(Disclosure: I am a SnappyData employee, and it makes sense to provide a product-specific answer to this question because the product is the answer to the question.)
