6

I am wondering if it's possible to query transactions using standard query select.

For example:

SELECT * FROM information_schema.transaction_logs 
WHERE table_name = 'product' AND time_stamp > '2016-01-01';

the result would be something like

===> table_name | operation | old_val_json   | new_val_json...
      product   |    update | {....desc:...} | {...desc...}| 

The query does not work because there is no such table as transaction_logs, but does something similar to this exist?

2
  • 1
    No, there is nothing like that built-in. You are probably looking for an audit trigger: okbob.blogspot.de/2015/01/… or 8kb.co.uk/blog/2015/01/19/… Commented Mar 29, 2016 at 18:46
  • This is not exactly what I wanted, but it's simple enough to implement. Thanks! Commented Mar 29, 2016 at 19:24

1 Answer 1

15

You can query write-ahead log stream through a logical replication slot.

First, you need to change a couple of parameters and then restart the server for the changes to take place:

postgres=# alter system set wal_level = logical;
postgres=# alter system set max_replication_slots = 1;

Then (after restart) you need to create a slot:

postgres=# SELECT * FROM pg_create_logical_replication_slot('slot', 'test_decoding');
 slot_name | xlog_position 
-----------+---------------
 slot      | 2E/839F3300
(1 row)

Here, test_decoding is an output plugin name which is intended to convert log records (which are binary) to some text representation.

Then let's create a table...

postgres=# create table product(id serial, val json);
CREATE TABLE

Now you can query WAL stream:

postgres=# SELECT * FROM pg_logical_slot_get_changes('slot', NULL, NULL);
  location   |  xid  |     data     
-------------+-------+--------------
 2E/83A0BA48 | 80243 | BEGIN 80243
 2E/83A1D2B8 | 80243 | COMMIT 80243
(2 rows)

Unfortunately, now you can't decode DDL, so you get just BEGIN and END. Xid field represents transaction number.

But let's insert something...

postgres=# insert into product(val) values ('{"desc":"aaa"}');
INSERT 0 1

Now query the stream again:

postgres=# SELECT * FROM pg_logical_slot_get_changes('slot', NULL, NULL);
  location   |  xid  |                                  data                                  
-------------+-------+------------------------------------------------------------------------
 2E/83A1D3C0 | 80244 | BEGIN 80244
 2E/83A1D3C0 | 80244 | table public.product: INSERT: id[integer]:1 val[json]:'{"desc":"aaa"}'
 2E/83A1D440 | 80244 | COMMIT 80244
(3 rows)

Here you can see the table name and inserted values.

The same for an update statement:

postgres=# update product set val = '{"desc":"bbb"}';
UPDATE 1
postgres=# SELECT * FROM pg_logical_slot_get_changes('slot', NULL, NULL);
  location   |  xid  |                                  data                                  
-------------+-------+------------------------------------------------------------------------
 2E/83A1D560 | 80245 | BEGIN 80245
 2E/83A1D560 | 80245 | table public.product: UPDATE: id[integer]:1 val[json]:'{"desc":"bbb"}'
 2E/83A1D5E8 | 80245 | COMMIT 80245
(3 rows)

Note that after you "consume" some changes from the stream by using pg_logical_slot_get_changes function, you cannot query the same changes again.

Drop the slot if you need it no more:

postgres=# SELECT pg_drop_replication_slot('slot');
 pg_drop_replication_slot 
--------------------------

(1 row)

You can read more about logical decoding in the documentation.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.