@@ -34,10 +34,24 @@ typedef struct
3434 bool include_xids ;
3535 bool include_timestamp ;
3636 bool skip_empty_xacts ;
37- bool xact_wrote_changes ;
3837 bool only_local ;
3938} TestDecodingData ;
4039
40+ /*
41+ * Maintain the per-transaction level variables to track whether the
42+ * transaction and or streams have written any changes. In streaming mode the
43+ * transaction can be decoded in streams so along with maintaining whether the
44+ * transaction has written any changes, we also need to track whether the
45+ * current stream has written any changes. This is required so that if user
46+ * has requested to skip the empty transactions we can skip the empty streams
47+ * even though the transaction has written some changes.
48+ */
49+ typedef struct
50+ {
51+ bool xact_wrote_changes ;
52+ bool stream_wrote_changes ;
53+ } TestDecodingTxnData ;
54+
4155static void pg_decode_startup (LogicalDecodingContext * ctx , OutputPluginOptions * opt ,
4256 bool is_init );
4357static void pg_decode_shutdown (LogicalDecodingContext * ctx );
@@ -255,8 +269,12 @@ static void
255269pg_decode_begin_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn )
256270{
257271 TestDecodingData * data = ctx -> output_plugin_private ;
272+ TestDecodingTxnData * txndata =
273+ MemoryContextAllocZero (ctx -> context , sizeof (TestDecodingTxnData ));
274+
275+ txndata -> xact_wrote_changes = false;
276+ txn -> output_plugin_private = txndata ;
258277
259- data -> xact_wrote_changes = false;
260278 if (data -> skip_empty_xacts )
261279 return ;
262280
@@ -280,8 +298,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
280298 XLogRecPtr commit_lsn )
281299{
282300 TestDecodingData * data = ctx -> output_plugin_private ;
301+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
302+ bool xact_wrote_changes = txndata -> xact_wrote_changes ;
303+
304+ pfree (txndata );
305+ txn -> output_plugin_private = NULL ;
283306
284- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
307+ if (data -> skip_empty_xacts && !xact_wrote_changes )
285308 return ;
286309
287310 OutputPluginPrepareWrite (ctx , true);
@@ -442,18 +465,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
442465 Relation relation , ReorderBufferChange * change )
443466{
444467 TestDecodingData * data ;
468+ TestDecodingTxnData * txndata ;
445469 Form_pg_class class_form ;
446470 TupleDesc tupdesc ;
447471 MemoryContext old ;
448472
449473 data = ctx -> output_plugin_private ;
474+ txndata = txn -> output_plugin_private ;
450475
451476 /* output BEGIN if we haven't yet */
452- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
477+ if (data -> skip_empty_xacts && !txndata -> xact_wrote_changes )
453478 {
454479 pg_output_begin (ctx , data , txn , false);
455480 }
456- data -> xact_wrote_changes = true;
481+ txndata -> xact_wrote_changes = true;
457482
458483 class_form = RelationGetForm (relation );
459484 tupdesc = RelationGetDescr (relation );
@@ -527,17 +552,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
527552 int nrelations , Relation relations [], ReorderBufferChange * change )
528553{
529554 TestDecodingData * data ;
555+ TestDecodingTxnData * txndata ;
530556 MemoryContext old ;
531557 int i ;
532558
533559 data = ctx -> output_plugin_private ;
560+ txndata = txn -> output_plugin_private ;
534561
535562 /* output BEGIN if we haven't yet */
536- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
563+ if (data -> skip_empty_xacts && !txndata -> xact_wrote_changes )
537564 {
538565 pg_output_begin (ctx , data , txn , false);
539566 }
540- data -> xact_wrote_changes = true;
567+ txndata -> xact_wrote_changes = true;
541568
542569 /* Avoid leaking memory by using and resetting our own context */
543570 old = MemoryContextSwitchTo (data -> context );
@@ -592,8 +619,20 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
592619 ReorderBufferTXN * txn )
593620{
594621 TestDecodingData * data = ctx -> output_plugin_private ;
622+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
595623
596- data -> xact_wrote_changes = false;
624+ /*
625+ * Allocate the txn plugin data for the first stream in the transaction.
626+ */
627+ if (txndata == NULL )
628+ {
629+ txndata =
630+ MemoryContextAllocZero (ctx -> context , sizeof (TestDecodingTxnData ));
631+ txndata -> xact_wrote_changes = false;
632+ txn -> output_plugin_private = txndata ;
633+ }
634+
635+ txndata -> stream_wrote_changes = false;
597636 if (data -> skip_empty_xacts )
598637 return ;
599638 pg_output_stream_start (ctx , data , txn , true);
@@ -615,8 +654,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
615654 ReorderBufferTXN * txn )
616655{
617656 TestDecodingData * data = ctx -> output_plugin_private ;
657+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
618658
619- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
659+ if (data -> skip_empty_xacts && !txndata -> stream_wrote_changes )
620660 return ;
621661
622662 OutputPluginPrepareWrite (ctx , true);
@@ -634,7 +674,23 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
634674{
635675 TestDecodingData * data = ctx -> output_plugin_private ;
636676
637- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
677+ /*
678+ * stream abort can be sent for an individual subtransaction but we
679+ * maintain the output_plugin_private only under the toptxn so if this is
680+ * not the toptxn then fetch the toptxn.
681+ */
682+ ReorderBufferTXN * toptxn = txn -> toptxn ? txn -> toptxn : txn ;
683+ TestDecodingTxnData * txndata = toptxn -> output_plugin_private ;
684+ bool xact_wrote_changes = txndata -> xact_wrote_changes ;
685+
686+ if (txn -> toptxn == NULL )
687+ {
688+ Assert (txn -> output_plugin_private != NULL );
689+ pfree (txndata );
690+ txn -> output_plugin_private = NULL ;
691+ }
692+
693+ if (data -> skip_empty_xacts && !xact_wrote_changes )
638694 return ;
639695
640696 OutputPluginPrepareWrite (ctx , true);
@@ -651,8 +707,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
651707 XLogRecPtr commit_lsn )
652708{
653709 TestDecodingData * data = ctx -> output_plugin_private ;
710+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
711+ bool xact_wrote_changes = txndata -> xact_wrote_changes ;
712+
713+ pfree (txndata );
714+ txn -> output_plugin_private = NULL ;
654715
655- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
716+ if (data -> skip_empty_xacts && !xact_wrote_changes )
656717 return ;
657718
658719 OutputPluginPrepareWrite (ctx , true);
@@ -681,13 +742,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
681742 ReorderBufferChange * change )
682743{
683744 TestDecodingData * data = ctx -> output_plugin_private ;
745+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
684746
685747 /* output stream start if we haven't yet */
686- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
748+ if (data -> skip_empty_xacts && !txndata -> stream_wrote_changes )
687749 {
688750 pg_output_stream_start (ctx , data , txn , false);
689751 }
690- data -> xact_wrote_changes = true;
752+ txndata -> xact_wrote_changes = txndata -> stream_wrote_changes = true;
691753
692754 OutputPluginPrepareWrite (ctx , true);
693755 if (data -> include_xids )
@@ -734,12 +796,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
734796 ReorderBufferChange * change )
735797{
736798 TestDecodingData * data = ctx -> output_plugin_private ;
799+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
737800
738- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
801+ if (data -> skip_empty_xacts && !txndata -> stream_wrote_changes )
739802 {
740803 pg_output_stream_start (ctx , data , txn , false);
741804 }
742- data -> xact_wrote_changes = true;
805+ txndata -> xact_wrote_changes = txndata -> stream_wrote_changes = true;
743806
744807 OutputPluginPrepareWrite (ctx , true);
745808 if (data -> include_xids )
0 commit comments