@@ -183,6 +183,36 @@ typedef struct RelationSyncEntry
183183 MemoryContext entry_cxt ;
184184} RelationSyncEntry ;
185185
186+ /*
187+ * Maintain a per-transaction level variable to track whether the transaction
188+ * has sent BEGIN. BEGIN is only sent when the first change in a transaction
189+ * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
190+ * messages for empty transactions which saves network bandwidth.
191+ *
192+ * This optimization is not used for prepared transactions because if the
193+ * WALSender restarts after prepare of a transaction and before commit prepared
194+ * of the same transaction then we won't be able to figure out if we have
195+ * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
196+ * because we would have lost the in-memory txndata information that was
197+ * present prior to the restart. This will result in sending a spurious
198+ * COMMIT PREPARED without a corresponding prepared transaction at the
199+ * downstream which would lead to an error when it tries to process it.
200+ *
201+ * XXX We could achieve this optimization by changing protocol to send
202+ * additional information so that downstream can detect that the corresponding
203+ * prepare has not been sent. However, adding such a check for every
204+ * transaction in the downstream could be costly so we might want to do it
205+ * optionally.
206+ *
207+ * We also don't have this optimization for streamed transactions because
208+ * they can contain prepared transactions.
209+ */
210+ typedef struct PGOutputTxnData
211+ {
212+ bool sent_begin_txn ; /* flag indicating whether BEGIN has
213+ * been sent */
214+ } PGOutputTxnData ;
215+
186216/* Map used to remember which relation schemas we sent. */
187217static HTAB * RelationSyncCache = NULL ;
188218
@@ -488,15 +518,41 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
488518}
489519
490520/*
491- * BEGIN callback
521+ * BEGIN callback.
522+ *
523+ * Don't send the BEGIN message here instead postpone it until the first
524+ * change. In logical replication, a common scenario is to replicate a set of
525+ * tables (instead of all tables) and transactions whose changes were on
526+ * the table(s) that are not published will produce empty transactions. These
527+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
528+ * using bandwidth on something with little/no use for logical replication.
492529 */
493530static void
494- pgoutput_begin_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn )
531+ pgoutput_begin_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn )
532+ {
533+ PGOutputTxnData * txndata = MemoryContextAllocZero (ctx -> context ,
534+ sizeof (PGOutputTxnData ));
535+
536+ txn -> output_plugin_private = txndata ;
537+ }
538+
539+ /*
540+ * Send BEGIN.
541+ *
542+ * This is called while processing the first change of the transaction.
543+ */
544+ static void
545+ pgoutput_send_begin (LogicalDecodingContext * ctx , ReorderBufferTXN * txn )
495546{
496547 bool send_replication_origin = txn -> origin_id != InvalidRepOriginId ;
548+ PGOutputTxnData * txndata = (PGOutputTxnData * ) txn -> output_plugin_private ;
549+
550+ Assert (txndata );
551+ Assert (!txndata -> sent_begin_txn );
497552
498553 OutputPluginPrepareWrite (ctx , !send_replication_origin );
499554 logicalrep_write_begin (ctx -> out , txn );
555+ txndata -> sent_begin_txn = true;
500556
501557 send_repl_origin (ctx , txn -> origin_id , txn -> origin_lsn ,
502558 send_replication_origin );
@@ -511,7 +567,25 @@ static void
511567pgoutput_commit_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
512568 XLogRecPtr commit_lsn )
513569{
514- OutputPluginUpdateProgress (ctx );
570+ PGOutputTxnData * txndata = (PGOutputTxnData * ) txn -> output_plugin_private ;
571+ bool sent_begin_txn ;
572+
573+ Assert (txndata );
574+
575+ /*
576+ * We don't need to send the commit message unless some relevant change
577+ * from this transaction has been sent to the downstream.
578+ */
579+ sent_begin_txn = txndata -> sent_begin_txn ;
580+ OutputPluginUpdateProgress (ctx , !sent_begin_txn );
581+ pfree (txndata );
582+ txn -> output_plugin_private = NULL ;
583+
584+ if (!sent_begin_txn )
585+ {
586+ elog (DEBUG1 , "skipped replication of an empty transaction with XID: %u" , txn -> xid );
587+ return ;
588+ }
515589
516590 OutputPluginPrepareWrite (ctx , true);
517591 logicalrep_write_commit (ctx -> out , txn , commit_lsn );
@@ -542,7 +616,7 @@ static void
542616pgoutput_prepare_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
543617 XLogRecPtr prepare_lsn )
544618{
545- OutputPluginUpdateProgress (ctx );
619+ OutputPluginUpdateProgress (ctx , false );
546620
547621 OutputPluginPrepareWrite (ctx , true);
548622 logicalrep_write_prepare (ctx -> out , txn , prepare_lsn );
@@ -556,7 +630,7 @@ static void
556630pgoutput_commit_prepared_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
557631 XLogRecPtr commit_lsn )
558632{
559- OutputPluginUpdateProgress (ctx );
633+ OutputPluginUpdateProgress (ctx , false );
560634
561635 OutputPluginPrepareWrite (ctx , true);
562636 logicalrep_write_commit_prepared (ctx -> out , txn , commit_lsn );
@@ -572,7 +646,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
572646 XLogRecPtr prepare_end_lsn ,
573647 TimestampTz prepare_time )
574648{
575- OutputPluginUpdateProgress (ctx );
649+ OutputPluginUpdateProgress (ctx , false );
576650
577651 OutputPluginPrepareWrite (ctx , true);
578652 logicalrep_write_rollback_prepared (ctx -> out , txn , prepare_end_lsn ,
@@ -1295,6 +1369,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
12951369 Relation relation , ReorderBufferChange * change )
12961370{
12971371 PGOutputData * data = (PGOutputData * ) ctx -> output_plugin_private ;
1372+ PGOutputTxnData * txndata = (PGOutputTxnData * ) txn -> output_plugin_private ;
12981373 MemoryContext old ;
12991374 RelationSyncEntry * relentry ;
13001375 TransactionId xid = InvalidTransactionId ;
@@ -1370,6 +1445,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
13701445 & action ))
13711446 break ;
13721447
1448+ /*
1449+ * Send BEGIN if we haven't yet.
1450+ *
1451+ * We send the BEGIN message after ensuring that we will actually
1452+ * send the change. This avoids sending a pair of BEGIN/COMMIT
1453+ * messages for empty transactions.
1454+ */
1455+ if (txndata && !txndata -> sent_begin_txn )
1456+ pgoutput_send_begin (ctx , txn );
1457+
13731458 /*
13741459 * Schema should be sent using the original relation because it
13751460 * also sends the ancestor's relation.
@@ -1420,6 +1505,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
14201505 relentry , & action ))
14211506 break ;
14221507
1508+ /* Send BEGIN if we haven't yet */
1509+ if (txndata && !txndata -> sent_begin_txn )
1510+ pgoutput_send_begin (ctx , txn );
1511+
14231512 maybe_send_schema (ctx , change , relation , relentry );
14241513
14251514 OutputPluginPrepareWrite (ctx , true);
@@ -1480,6 +1569,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
14801569 relentry , & action ))
14811570 break ;
14821571
1572+ /* Send BEGIN if we haven't yet */
1573+ if (txndata && !txndata -> sent_begin_txn )
1574+ pgoutput_send_begin (ctx , txn );
1575+
14831576 maybe_send_schema (ctx , change , relation , relentry );
14841577
14851578 OutputPluginPrepareWrite (ctx , true);
@@ -1510,6 +1603,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
15101603 int nrelations , Relation relations [], ReorderBufferChange * change )
15111604{
15121605 PGOutputData * data = (PGOutputData * ) ctx -> output_plugin_private ;
1606+ PGOutputTxnData * txndata = (PGOutputTxnData * ) txn -> output_plugin_private ;
15131607 MemoryContext old ;
15141608 RelationSyncEntry * relentry ;
15151609 int i ;
@@ -1548,6 +1642,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
15481642 continue ;
15491643
15501644 relids [nrelids ++ ] = relid ;
1645+
1646+ /* Send BEGIN if we haven't yet */
1647+ if (txndata && !txndata -> sent_begin_txn )
1648+ pgoutput_send_begin (ctx , txn );
1649+
15511650 maybe_send_schema (ctx , change , relation , relentry );
15521651 }
15531652
@@ -1585,6 +1684,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
15851684 if (in_streaming )
15861685 xid = txn -> xid ;
15871686
1687+ /*
1688+ * Output BEGIN if we haven't yet. Avoid for non-transactional
1689+ * messages.
1690+ */
1691+ if (transactional )
1692+ {
1693+ PGOutputTxnData * txndata = (PGOutputTxnData * ) txn -> output_plugin_private ;
1694+
1695+ /* Send BEGIN if we haven't yet */
1696+ if (txndata && !txndata -> sent_begin_txn )
1697+ pgoutput_send_begin (ctx , txn );
1698+ }
1699+
15881700 OutputPluginPrepareWrite (ctx , true);
15891701 logicalrep_write_message (ctx -> out ,
15901702 xid ,
@@ -1629,6 +1741,19 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
16291741 if (!relentry -> pubactions .pubsequence )
16301742 return ;
16311743
1744+ /*
1745+ * Output BEGIN if we haven't yet. Avoid for non-transactional
1746+ * sequence changes.
1747+ */
1748+ if (transactional )
1749+ {
1750+ PGOutputTxnData * txndata = (PGOutputTxnData * ) txn -> output_plugin_private ;
1751+
1752+ /* Send BEGIN if we haven't yet */
1753+ if (txndata && !txndata -> sent_begin_txn )
1754+ pgoutput_send_begin (ctx , txn );
1755+ }
1756+
16321757 OutputPluginPrepareWrite (ctx , true);
16331758 logicalrep_write_sequence (ctx -> out ,
16341759 relation ,
@@ -1799,7 +1924,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
17991924 Assert (!in_streaming );
18001925 Assert (rbtxn_is_streamed (txn ));
18011926
1802- OutputPluginUpdateProgress (ctx );
1927+ OutputPluginUpdateProgress (ctx , false );
18031928
18041929 OutputPluginPrepareWrite (ctx , true);
18051930 logicalrep_write_stream_commit (ctx -> out , txn , commit_lsn );
@@ -1820,7 +1945,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
18201945{
18211946 Assert (rbtxn_is_streamed (txn ));
18221947
1823- OutputPluginUpdateProgress (ctx );
1948+ OutputPluginUpdateProgress (ctx , false );
18241949 OutputPluginPrepareWrite (ctx , true);
18251950 logicalrep_write_stream_prepare (ctx -> out , txn , prepare_lsn );
18261951 OutputPluginWrite (ctx , true);
0 commit comments