@@ -76,6 +76,20 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
7676 ReorderBufferTXN * txn , XLogRecPtr message_lsn ,
7777 bool transactional , const char * prefix ,
7878 Size sz , const char * message );
79+ static bool pg_decode_filter_prepare (LogicalDecodingContext * ctx ,
80+ const char * gid );
81+ static void pg_decode_begin_prepare_txn (LogicalDecodingContext * ctx ,
82+ ReorderBufferTXN * txn );
83+ static void pg_decode_prepare_txn (LogicalDecodingContext * ctx ,
84+ ReorderBufferTXN * txn ,
85+ XLogRecPtr prepare_lsn );
86+ static void pg_decode_commit_prepared_txn (LogicalDecodingContext * ctx ,
87+ ReorderBufferTXN * txn ,
88+ XLogRecPtr commit_lsn );
89+ static void pg_decode_rollback_prepared_txn (LogicalDecodingContext * ctx ,
90+ ReorderBufferTXN * txn ,
91+ XLogRecPtr prepare_end_lsn ,
92+ TimestampTz prepare_time );
7993static void pg_decode_stream_start (LogicalDecodingContext * ctx ,
8094 ReorderBufferTXN * txn );
8195static void pg_output_stream_start (LogicalDecodingContext * ctx ,
@@ -87,6 +101,9 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
87101static void pg_decode_stream_abort (LogicalDecodingContext * ctx ,
88102 ReorderBufferTXN * txn ,
89103 XLogRecPtr abort_lsn );
104+ static void pg_decode_stream_prepare (LogicalDecodingContext * ctx ,
105+ ReorderBufferTXN * txn ,
106+ XLogRecPtr prepare_lsn );
90107static void pg_decode_stream_commit (LogicalDecodingContext * ctx ,
91108 ReorderBufferTXN * txn ,
92109 XLogRecPtr commit_lsn );
@@ -123,9 +140,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
123140 cb -> filter_by_origin_cb = pg_decode_filter ;
124141 cb -> shutdown_cb = pg_decode_shutdown ;
125142 cb -> message_cb = pg_decode_message ;
143+ cb -> filter_prepare_cb = pg_decode_filter_prepare ;
144+ cb -> begin_prepare_cb = pg_decode_begin_prepare_txn ;
145+ cb -> prepare_cb = pg_decode_prepare_txn ;
146+ cb -> commit_prepared_cb = pg_decode_commit_prepared_txn ;
147+ cb -> rollback_prepared_cb = pg_decode_rollback_prepared_txn ;
126148 cb -> stream_start_cb = pg_decode_stream_start ;
127149 cb -> stream_stop_cb = pg_decode_stream_stop ;
128150 cb -> stream_abort_cb = pg_decode_stream_abort ;
151+ cb -> stream_prepare_cb = pg_decode_stream_prepare ;
129152 cb -> stream_commit_cb = pg_decode_stream_commit ;
130153 cb -> stream_change_cb = pg_decode_stream_change ;
131154 cb -> stream_message_cb = pg_decode_stream_message ;
@@ -141,6 +164,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
141164 ListCell * option ;
142165 TestDecodingData * data ;
143166 bool enable_streaming = false;
167+ bool enable_twophase = false;
144168
145169 data = palloc0 (sizeof (TestDecodingData ));
146170 data -> context = AllocSetContextCreate (ctx -> context ,
@@ -241,6 +265,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
241265 errmsg ("could not parse value \"%s\" for parameter \"%s\"" ,
242266 strVal (elem -> arg ), elem -> defname )));
243267 }
268+ else if (strcmp (elem -> defname , "two-phase-commit" ) == 0 )
269+ {
270+ if (elem -> arg == NULL )
271+ continue ;
272+ else if (!parse_bool (strVal (elem -> arg ), & enable_twophase ))
273+ ereport (ERROR ,
274+ (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
275+ errmsg ("could not parse value \"%s\" for parameter \"%s\"" ,
276+ strVal (elem -> arg ), elem -> defname )));
277+ }
244278 else
245279 {
246280 ereport (ERROR ,
@@ -252,6 +286,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
252286 }
253287
254288 ctx -> streaming &= enable_streaming ;
289+ ctx -> twophase &= enable_twophase ;
255290}
256291
257292/* cleanup this plugin's resources */
@@ -320,6 +355,111 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
320355 OutputPluginWrite (ctx , true);
321356}
322357
358+ /* BEGIN PREPARE callback */
359+ static void
360+ pg_decode_begin_prepare_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn )
361+ {
362+ TestDecodingData * data = ctx -> output_plugin_private ;
363+ TestDecodingTxnData * txndata =
364+ MemoryContextAllocZero (ctx -> context , sizeof (TestDecodingTxnData ));
365+
366+ txndata -> xact_wrote_changes = false;
367+ txn -> output_plugin_private = txndata ;
368+
369+ if (data -> skip_empty_xacts )
370+ return ;
371+
372+ pg_output_begin (ctx , data , txn , true);
373+ }
374+
375+ /* PREPARE callback */
376+ static void
377+ pg_decode_prepare_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
378+ XLogRecPtr prepare_lsn )
379+ {
380+ TestDecodingData * data = ctx -> output_plugin_private ;
381+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
382+
383+ if (data -> skip_empty_xacts && !txndata -> xact_wrote_changes )
384+ return ;
385+
386+ OutputPluginPrepareWrite (ctx , true);
387+
388+ appendStringInfo (ctx -> out , "PREPARE TRANSACTION %s" ,
389+ quote_literal_cstr (txn -> gid ));
390+
391+ if (data -> include_xids )
392+ appendStringInfo (ctx -> out , ", txid %u" , txn -> xid );
393+
394+ if (data -> include_timestamp )
395+ appendStringInfo (ctx -> out , " (at %s)" ,
396+ timestamptz_to_str (txn -> commit_time ));
397+
398+ OutputPluginWrite (ctx , true);
399+ }
400+
401+ /* COMMIT PREPARED callback */
402+ static void
403+ pg_decode_commit_prepared_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
404+ XLogRecPtr commit_lsn )
405+ {
406+ TestDecodingData * data = ctx -> output_plugin_private ;
407+
408+ OutputPluginPrepareWrite (ctx , true);
409+
410+ appendStringInfo (ctx -> out , "COMMIT PREPARED %s" ,
411+ quote_literal_cstr (txn -> gid ));
412+
413+ if (data -> include_xids )
414+ appendStringInfo (ctx -> out , ", txid %u" , txn -> xid );
415+
416+ if (data -> include_timestamp )
417+ appendStringInfo (ctx -> out , " (at %s)" ,
418+ timestamptz_to_str (txn -> commit_time ));
419+
420+ OutputPluginWrite (ctx , true);
421+ }
422+
423+ /* ROLLBACK PREPARED callback */
424+ static void
425+ pg_decode_rollback_prepared_txn (LogicalDecodingContext * ctx ,
426+ ReorderBufferTXN * txn ,
427+ XLogRecPtr prepare_end_lsn ,
428+ TimestampTz prepare_time )
429+ {
430+ TestDecodingData * data = ctx -> output_plugin_private ;
431+
432+ OutputPluginPrepareWrite (ctx , true);
433+
434+ appendStringInfo (ctx -> out , "ROLLBACK PREPARED %s" ,
435+ quote_literal_cstr (txn -> gid ));
436+
437+ if (data -> include_xids )
438+ appendStringInfo (ctx -> out , ", txid %u" , txn -> xid );
439+
440+ if (data -> include_timestamp )
441+ appendStringInfo (ctx -> out , " (at %s)" ,
442+ timestamptz_to_str (txn -> commit_time ));
443+
444+ OutputPluginWrite (ctx , true);
445+ }
446+
447+ /*
448+ * Filter out two-phase transactions.
449+ *
450+ * Each plugin can implement its own filtering logic. Here we demonstrate a
451+ * simple logic by checking the GID. If the GID contains the "_nodecode"
452+ * substring, then we filter it out.
453+ */
454+ static bool
455+ pg_decode_filter_prepare (LogicalDecodingContext * ctx , const char * gid )
456+ {
457+ if (strstr (gid , "_nodecode" ) != NULL )
458+ return true;
459+
460+ return false;
461+ }
462+
323463static bool
324464pg_decode_filter (LogicalDecodingContext * ctx ,
325465 RepOriginId origin_id )
@@ -701,6 +841,33 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
701841 OutputPluginWrite (ctx , true);
702842}
703843
844+ static void
845+ pg_decode_stream_prepare (LogicalDecodingContext * ctx ,
846+ ReorderBufferTXN * txn ,
847+ XLogRecPtr prepare_lsn )
848+ {
849+ TestDecodingData * data = ctx -> output_plugin_private ;
850+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
851+
852+ if (data -> skip_empty_xacts && !txndata -> xact_wrote_changes )
853+ return ;
854+
855+ OutputPluginPrepareWrite (ctx , true);
856+
857+ if (data -> include_xids )
858+ appendStringInfo (ctx -> out , "preparing streamed transaction TXN %s, txid %u" ,
859+ quote_literal_cstr (txn -> gid ), txn -> xid );
860+ else
861+ appendStringInfo (ctx -> out , "preparing streamed transaction %s" ,
862+ quote_literal_cstr (txn -> gid ));
863+
864+ if (data -> include_timestamp )
865+ appendStringInfo (ctx -> out , " (at %s)" ,
866+ timestamptz_to_str (txn -> commit_time ));
867+
868+ OutputPluginWrite (ctx , true);
869+ }
870+
704871static void
705872pg_decode_stream_commit (LogicalDecodingContext * ctx ,
706873 ReorderBufferTXN * txn ,
0 commit comments