4

I have a streaming dataframe that I am trying to write into a database. There is documentation for writing an rdd or df into Postgres. But, I am unable to find examples or documentation on how it is done in Structured streaming.

I have read the documentation https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch , but I couldn't understand where I would create a jdbc connection and how I would write it to the database.

def foreach_batch_function(df, epoch_id):
    # what goes in here?
    pass

view_counts_query = windowed_view_counts.writeStream \
    .outputMode("append") \
    .foreachBatch(foreach_batch_function)
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .start() \
    .awaitTermination()

This function takes in a regular dataframe and writes into a postgres table

def postgres_sink(config, data_frame):
    config.read('/src/config/config.ini')
    dbname = config.get('dbauth', 'dbname')
    dbuser = config.get('dbauth', 'user')
    dbpass = config.get('dbauth', 'password')
    dbhost = config.get('dbauth', 'host')
    dbport = config.get('dbauth', 'port')

    url = "jdbc:postgresql://"+dbhost+":"+dbport+"/"+dbname
    properties = {
        "driver": "org.postgresql.Driver",
        "user": dbuser,
        "password": dbpass
    }

    data_frame.write.jdbc(url=url, table="metrics", mode="append",
                          properties=properties)

2 Answers 2

5

Examples of how Postgres ingestion is done using Structured streaming

class PostgreSqlSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "org.postgresql.Driver"
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _
  val v_sql = "insert INTO Table (A,B,C) values ( ?, ?, ?)"

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    connection.setAutoCommit(false)
    statement = connection.prepareStatement(v_sql)
    true
  }
  def process(value: org.apache.spark.sql.Row): Unit = {
    // ignoring value(0) as this is address
    statement.setString(1, value(1).toString)
    statement.setString(2, value(2).toString)
    statement.setString(3, value(3).toString)
    statement.executeUpdate()        
  }
  def close(errorOrNull: Throwable): Unit = {
    connection.commit()
    connection.close
  }
}

val url = "jdbc:postgresql://XX.XX.XX.XX:5432/postgres"
val user = "abc"
val pw = "abc@123"
val jdbcWriter = new PostgreSqlSink(url,user,pw)
val writeData = pg_df.writeStream 
    .foreach(jdbcWriter)
    .outputMode("Append")
    .trigger(ProcessingTime("30 seconds"))
    .option("checkpointLocation", "s3a://bucket/check")
    .start()

writeData.awaitTermination
Sign up to request clarification or add additional context in comments.

2 Comments

OP is using Pyspark.
how to handle a scenario where postgres db is not accessible? Eg: DB fail-over
4

There is really little be done here, beyond what you already have. foreachBatch takes a function (DataFrame, Int) => None, so all you need is a small adapter, and everything else should work just fine:

def foreach_batch_for_config(config)
    def _(df, epoch_id):
        postgres_sink(config, df)
   return _

view_counts_query = (windowed_view_counts
    .writeStream
    .outputMode("append") 
    .foreachBatch(foreach_batch_for_config(some_config))
    ...,
    .start()
    .awaitTermination())

though to be honest passing ConfigParser around is a strange idea from the beginning. You could adjust the signature adn initialize it in place

def postgres_sink(data_frame, batch_id):
    config = configparser.ConfigParser()
    ...
    data_frame.write.jdbc(...)

and keep the rest as-is. This way you could use your function directly:

...
.foreachBatch(postgres_sink)
...

1 Comment

how to handle Database fail-over here ?

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.