3939#include "pglogical_relid_map.h"
4040
4141static bool MtmIsFilteredTxn ;
42+ static int MtmTransactionRecords ;
4243
4344static void pglogical_write_rel (StringInfo out , PGLogicalOutputData * data , Relation rel );
4445
@@ -106,7 +107,8 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
106107{
107108 bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
108109 csn_t csn = MtmTransactionSnapshot (txn -> xid );
109- MTM_LOG2 ("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d" , MyProcPid , txn -> xid , MtmReplicationNodeId , csn , isRecovery );
110+ MTM_LOG1 ("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx" ,
111+ MyProcPid , txn -> xid , MtmReplicationNodeId , csn , isRecovery , txn -> restart_decoding_lsn , txn -> first_lsn , txn -> end_lsn , MyReplicationSlot -> data .confirmed_flush );
110112
111113 if (csn == INVALID_CSN && !isRecovery ) {
112114 MtmIsFilteredTxn = true;
@@ -116,6 +118,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
116118 pq_sendint (out , isRecovery ? InvalidTransactionId : txn -> xid , 4 );
117119 pq_sendint64 (out , csn );
118120 MtmIsFilteredTxn = false;
121+ MtmTransactionRecords = 0 ;
119122 }
120123}
121124
@@ -137,6 +140,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
137140{
138141 uint8 flags = 0 ;
139142
143+ MTM_LOG1 ("%d: pglogical_write_commit XID=%d node=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx" ,
144+ MyProcPid , txn -> xid , MtmReplicationNodeId , txn -> restart_decoding_lsn , txn -> first_lsn , txn -> end_lsn , MyReplicationSlot -> data .confirmed_flush );
145+
146+
140147 if (txn -> xact_action == XLOG_XACT_COMMIT )
141148 flags = PGLOGICAL_COMMIT ;
142149 else if (txn -> xact_action == XLOG_XACT_PREPARE )
@@ -150,6 +157,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
150157
151158 if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE ) {
152159 if (MtmIsFilteredTxn ) {
160+ Assert (MtmTransactionRecords == 0 );
153161 return ;
154162 }
155163 } else {
@@ -161,6 +169,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
161169 */
162170 if (csn == INVALID_CSN && !isRecovery )
163171 {
172+ Assert (MtmTransactionRecords == 0 );
164173 return ;
165174 }
166175 if (MtmRecoveryCaughtUp (MtmReplicationNodeId , txn -> end_lsn )) {
@@ -176,18 +185,23 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
176185 pq_sendbyte (out , flags );
177186 pq_sendbyte (out , MtmNodeId );
178187
188+ Assert (txn -> xact_action != XLOG_XACT_PREPARE || txn -> xid < 1000 || MtmTransactionRecords >= 2 );
189+ pq_sendint (out , MtmTransactionRecords , 4 );
190+
179191 /* send fixed fields */
180192 pq_sendint64 (out , commit_lsn );
181193 pq_sendint64 (out , txn -> end_lsn );
182194 pq_sendint64 (out , txn -> commit_time );
183195
184196 if (txn -> xact_action == XLOG_XACT_COMMIT_PREPARED ) {
197+ Assert (MtmTransactionRecords == 0 );
185198 pq_sendint64 (out , MtmGetTransactionCSN (txn -> xid ));
186199 }
187200 if (txn -> xact_action != XLOG_XACT_COMMIT ) {
188201 pq_sendstring (out , txn -> gid );
189202 }
190203
204+ MtmTransactionRecords = 0 ;
191205 MTM_TXTRACE (txn , "pglogical_write_commit Finish" );
192206}
193207
@@ -199,6 +213,7 @@ pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
199213 Relation rel , HeapTuple newtuple )
200214{
201215 if (!MtmIsFilteredTxn ) {
216+ MtmTransactionRecords += 1 ;
202217 pq_sendbyte (out , 'I' ); /* action INSERT */
203218 pglogical_write_tuple (out , data , rel , newtuple );
204219 }
@@ -212,6 +227,11 @@ pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
212227 Relation rel , HeapTuple oldtuple , HeapTuple newtuple )
213228{
214229 if (!MtmIsFilteredTxn ) {
230+ MtmTransactionRecords += 1 ;
231+
232+ MTM_LOG1 ("%d: pglogical_write_update confirmed_flush=%lx" , MyProcPid , MyReplicationSlot -> data .confirmed_flush );
233+
234+
215235 pq_sendbyte (out , 'U' ); /* action UPDATE */
216236 /* FIXME support whole tuple (O tuple type) */
217237 if (oldtuple != NULL )
@@ -233,6 +253,7 @@ pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
233253 Relation rel , HeapTuple oldtuple )
234254{
235255 if (!MtmIsFilteredTxn ) {
256+ MtmTransactionRecords += 1 ;
236257 pq_sendbyte (out , 'D' ); /* action DELETE */
237258 pglogical_write_tuple (out , data , rel , oldtuple );
238259 }
0 commit comments