
This may seem strange, but is it possible for a block of code to be executed after an INSERT statement in a Postgres database?

Specifically, I'm interested in executing Python code after an INSERT statement has occurred in a pg database.

  • You want to write an AFTER INSERT trigger in PL/Python? Commented Aug 9, 2014 at 6:10
  • @Sathish After an INSERT statement is done on Database A which is a shared database, I want the application to do a save() command that does an INSERT statement onto Database B. Commented Aug 9, 2014 at 6:13
  • @muistooshort Yes. In Django, actually. Commented Aug 9, 2014 at 6:46
  • Isn't there some sort of callback or after-save hook in Django-land that would take care of this? I don't think you're going to be running a bunch of Django stuff inside the database in a trigger. Commented Aug 9, 2014 at 6:52
  • @muistooshort there are post_save signals in Django, but the problem is that Database A is inserted into by an external application, so post_save will not fire since the save operation happens elsewhere. Commented Aug 9, 2014 at 7:04

1 Answer


There are several ways to approach this.

LISTEN/NOTIFY

The simplest way to tackle this is to use PostgreSQL's LISTEN/NOTIFY mechanism.

You can add an AFTER INSERT/UPDATE trigger that sends a notification:

CREATE OR REPLACE FUNCTION on_insert() RETURNS trigger AS
$$
BEGIN
    -- pg_notify is safer than assembling a NOTIFY statement dynamically
    PERFORM pg_notify('entity_change', NEW.id::text);
    RETURN NEW;
END
$$
LANGUAGE plpgsql VOLATILE;

CREATE TRIGGER trig_on_insert
AFTER INSERT ON entity
FOR EACH ROW
EXECUTE PROCEDURE on_insert();

entity_change is the channel identifier; you can pick any name you like.

Your application should listen on that channel in a separate thread (or process) and do whatever is needed:

import select

from django.db import connection

# Open a cursor so the underlying psycopg2 connection exists,
# then work with that raw connection directly (Django's wrapper
# does not expose poll()/notifies).
connection.cursor().execute("LISTEN entity_change;")
pg_conn = connection.connection

not_finish = True  # flip this from elsewhere to stop the loop
while not_finish:
    if select.select([pg_conn], [], [], 5) == ([], [], []):
        print("Timeout")
    else:
        pg_conn.poll()
        while pg_conn.notifies:
            notify = pg_conn.notifies.pop()
            entity_id = notify.payload
            do_post_save(entity_id)  # your processing for the changed row

The only caveat is that notifications are not transactional and can be lost in a catastrophic failure: if your application receives a notification but crashes (or is killed) before it finishes processing it, that notification is lost forever.

If you need a guarantee that post-save processing always happens, you need to maintain a table of tasks. The AFTER INSERT/UPDATE trigger adds a task to this table, and some Python process polls the table and does the required processing. The downside is the polling itself: it issues unnecessary queries when the system is not saving entities.
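A minimal sketch of such a poller, assuming a hypothetical entity_tasks(id, entity_id) table filled by the trigger, any DB-API connection to Postgres, and a handle callback you supply; the FOR UPDATE SKIP LOCKED pattern lets several workers poll concurrently without claiming the same task twice:

```python
import time

# Claim a batch of tasks atomically; rows are deleted only when the
# surrounding transaction commits, so a crash leaves them queued.
CLAIM_SQL = """
DELETE FROM entity_tasks
WHERE id IN (
    SELECT id FROM entity_tasks
    ORDER BY id
    LIMIT %s
    FOR UPDATE SKIP LOCKED
)
RETURNING entity_id;
"""

def dedupe_ids(rows):
    """Collapse claimed task rows to distinct entity ids, oldest first."""
    seen, ids = set(), []
    for (entity_id,) in rows:
        if entity_id not in seen:
            seen.add(entity_id)
            ids.append(entity_id)
    return ids

def poll_tasks(conn, handle, batch_size=100, interval=5.0):
    """Repeatedly claim tasks and call handle(entity_id) for each one."""
    while True:
        cur = conn.cursor()
        cur.execute(CLAIM_SQL, (batch_size,))
        rows = cur.fetchall()
        try:
            for entity_id in dedupe_ids(rows):
                handle(entity_id)
            conn.commit()    # tasks disappear only after successful processing
        except Exception:
            conn.rollback()  # tasks stay queued and will be retried
            raise
        if not rows:
            time.sleep(interval)
```

Committing only after handle succeeds is what gives the at-least-once guarantee that plain LISTEN/NOTIFY lacks; your processing should therefore be idempotent.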

You can combine both approaches to get the best of both worlds: use NOTIFY to kick off processing, but have the processor take its tasks from the task table filled by the trigger. On application startup, run the processor once to handle any unfinished work.

The django-pgpubsub library implements exactly this approach and provides a rather simple declarative API that lets you execute callbacks on Django model changes:

from dataclasses import dataclass

import pgpubsub
from pgpubsub.channel import TriggerChannel

# This defines a postgres channel that is used to send notifications,
# and a trigger that does NOTIFY on MyModel changes.
@dataclass
class MyModelTriggerChannel(TriggerChannel):
    model = MyModel

# This defines a callback to be invoked on MyModel changes.
@pgpubsub.post_update_listener(MyModelTriggerChannel)
def on_my_model_update(old: MyModel, new: MyModel):
    # use the `new` argument to access the updated model data
    ...

Logical Replication

The better and more reliable approach is to use logical replication.

This option reads the transaction log directly, and the consumer acknowledges the change notifications it receives, so no notifications are missed and delivery is reliable.

To demonstrate this, I'm using an image preconfigured for logical replication, with the wal2json plugin installed for WAL decoding:

docker run -d --name "logical" -e POSTGRES_PASSWORD=123 -p 10000:5432 debezium/postgres:14

Here is an example of the consumer:

import psycopg2
from psycopg2.errors import UndefinedObject
from psycopg2.extras import LogicalReplicationConnection

my_connection = psycopg2.connect(
    "dbname='postgres' host='localhost' port='10000' user='postgres' password='123'",
    connection_factory=LogicalReplicationConnection,
)
cur = my_connection.cursor()
try:
    cur.drop_replication_slot("wal2json_test_slot")
except UndefinedObject:
    pass
cur.create_replication_slot("wal2json_test_slot", output_plugin="wal2json")
cur.start_replication(
    slot_name="wal2json_test_slot", options={"pretty-print": 1}, decode=True
)

def consume(msg):
    print(msg.payload)
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

cur.consume_stream(consume)

Now executing an insert like insert into table1 values (1, 'hello') produces this:

{
    "change": [
        {
            "kind": "insert",
            "schema": "public",
            "table": "table1",
            "columnnames": ["i", "t"],
            "columntypes": ["integer", "text"],
            "columnvalues": [1, "hello"]
        }
    ]
}

The downside is that you receive every change in the database and have to filter and decode the data yourself (I'm not aware of libraries that make this simple for you as a user).
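The filtering itself is straightforward, since wal2json payloads are plain JSON. A small sketch of a helper that keeps only the changes for one table and turns the parallel columnnames/columnvalues lists into row dicts (the function name and shape are my own, not part of wal2json):

```python
import json

def changes_for_table(payload, table, schema="public"):
    """Extract (kind, row-dict) pairs for one table from a wal2json payload."""
    out = []
    for change in json.loads(payload).get("change", []):
        if change["schema"] == schema and change["table"] == table:
            # wal2json emits column names and values as parallel lists
            row = dict(zip(change.get("columnnames", []),
                           change.get("columnvalues", [])))
            out.append((change["kind"], row))
    return out
```

Applied to the payload shown above, changes_for_table(msg.payload, "table1") would yield [("insert", {"i": 1, "t": "hello"})], so your consume callback can dispatch on table name and row contents.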


3 Comments

This looks very promising, but is it possible to get data extracted from the INSERT statement? For example, if INSERT INTO users (name, age) VALUES ('Joe', 30) occurs, could it NOTIFY with the data in the payload string, to be processed by the application?
Great! Is there space limitations on a notification's payload? Would it be okay if the data contained some text field (which may be substantial)?
AFAIR the limit is around 8000 bytes; please see the pg docs, they have all the details. It's better to pass the id and then read all the values from the table itself, or consider the tasks table I described in the post, as that avoids the transactional issues with LISTEN/NOTIFY.
