3737
3838#include "multimaster.h"
3939
40- typedef struct PGLogicalProtoMM
41- {
42- PGLogicalProtoAPI api ;
43- int nodeId ;
44- bool isLocal ;
45- } PGLogicalProtoMM ;
40+ static bool MtmIsFilteredTxn ;
4641
4742static void pglogical_write_rel (StringInfo out , PGLogicalOutputData * data , Relation rel );
4843
@@ -72,30 +67,31 @@ static char decide_datum_transfer(Form_pg_attribute att,
7267static void
7368pglogical_write_rel (StringInfo out , PGLogicalOutputData * data , Relation rel )
7469{
75- PGLogicalProtoMM * mm = (PGLogicalProtoMM * )data -> api ;
76- if (!mm -> isLocal ) {
77- const char * nspname ;
78- uint8 nspnamelen ;
79- const char * relname ;
80- uint8 relnamelen ;
81-
82- pq_sendbyte (out , 'R' ); /* sending RELATION */
83-
84- nspname = get_namespace_name (rel -> rd_rel -> relnamespace );
85- if (nspname == NULL )
86- elog (ERROR , "cache lookup failed for namespace %u" ,
87- rel -> rd_rel -> relnamespace );
88- nspnamelen = strlen (nspname ) + 1 ;
89-
90- relname = NameStr (rel -> rd_rel -> relname );
91- relnamelen = strlen (relname ) + 1 ;
92-
93- pq_sendbyte (out , nspnamelen ); /* schema name length */
94- pq_sendbytes (out , nspname , nspnamelen );
95-
96- pq_sendbyte (out , relnamelen ); /* table name length */
97- pq_sendbytes (out , relname , relnamelen );
98- }
70+ const char * nspname ;
71+ uint8 nspnamelen ;
72+ const char * relname ;
73+ uint8 relnamelen ;
74+
75+ if (MtmIsFilteredTxn ) {
76+ return ;
77+ }
78+
79+ pq_sendbyte (out , 'R' ); /* sending RELATION */
80+
81+ nspname = get_namespace_name (rel -> rd_rel -> relnamespace );
82+ if (nspname == NULL )
83+ elog (ERROR , "cache lookup failed for namespace %u" ,
84+ rel -> rd_rel -> relnamespace );
85+ nspnamelen = strlen (nspname ) + 1 ;
86+
87+ relname = NameStr (rel -> rd_rel -> relname );
88+ relnamelen = strlen (relname ) + 1 ;
89+
90+ pq_sendbyte (out , nspnamelen ); /* schema name length */
91+ pq_sendbytes (out , nspname , nspnamelen );
92+
93+ pq_sendbyte (out , relnamelen ); /* table name length */
94+ pq_sendbytes (out , relname , relnamelen );
9995}
10096
10197/*
@@ -105,21 +101,19 @@ static void
105101pglogical_write_begin (StringInfo out , PGLogicalOutputData * data ,
106102 ReorderBufferTXN * txn )
107103{
108- PGLogicalProtoMM * mm = ( PGLogicalProtoMM * ) data -> api ;
104+ bool isRecovery = MtmIsRecoveredNode ( MtmReplicationNodeId ) ;
109105 csn_t csn = MtmTransactionSnapshot (txn -> xid );
110- bool isRecovery = MtmIsRecoveredNode (mm -> nodeId );
111106 MTM_TRACE ("pglogical_write_begin %d CSN=%ld\n" , txn -> xid , csn );
112- if (csn == INVALID_CSN && !isRecovery ) {
113- //Assert(txn->origin_id != InvalidRepOriginId);
114- mm -> isLocal = true;
115- } else {
116- mm -> isLocal = false;
117- //Assert(txn->origin_id == InvalidRepOriginId || isRecovery);
118- pq_sendbyte (out , 'B' ); /* BEGIN */
107+
108+ if (csn == INVALID_CSN && !isRecovery ) {
109+ MtmIsFilteredTxn = true;
110+ } else {
111+ pq_sendbyte (out , 'B' ); /* BEGIN */
119112 pq_sendint (out , MtmNodeId , 4 );
120113 pq_sendint (out , isRecovery ? InvalidTransactionId : txn -> xid , 4 );
121- pq_sendint64 (out , csn );
122- }
114+ pq_sendint64 (out , csn );
115+ MtmIsFilteredTxn = false;
116+ }
123117}
124118
125119/*
@@ -129,9 +123,11 @@ static void
129123pglogical_write_commit (StringInfo out , PGLogicalOutputData * data ,
130124 ReorderBufferTXN * txn , XLogRecPtr commit_lsn )
131125{
132- PGLogicalProtoMM * mm = (PGLogicalProtoMM * )data -> api ;
133126 uint8 flags = 0 ;
134127
128+ if (MtmIsFilteredTxn ) {
129+ return ;
130+ }
135131 if (txn -> xact_action == XLOG_XACT_COMMIT )
136132 flags = PGLOGICAL_COMMIT ;
137133 else if (txn -> xact_action == XLOG_XACT_PREPARE )
@@ -143,18 +139,19 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
143139 else
144140 Assert (false);
145141
146-
142+ #if 0
147143 if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE ) {
148144 if (mm -> isLocal ) {
149145 return ;
150146 }
151147 } else {
152148 csn_t csn = MtmTransactionSnapshot (txn -> xid );
153- bool isRecovery = MtmIsRecoveredNode (mm -> nodeId );
149+ bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
154150 if (csn == INVALID_CSN && !isRecovery ) {
155151 return ;
156152 }
157153 }
154+ #endif
158155
159156 pq_sendbyte (out , 'C' ); /* sending COMMIT */
160157
@@ -185,11 +182,10 @@ static void
185182pglogical_write_insert (StringInfo out , PGLogicalOutputData * data ,
186183 Relation rel , HeapTuple newtuple )
187184{
188- PGLogicalProtoMM * mm = (PGLogicalProtoMM * )data -> api ;
189- if (!mm -> isLocal ) {
190- pq_sendbyte (out , 'I' ); /* action INSERT */
191- pglogical_write_tuple (out , data , rel , newtuple );
192- }
185+ if (!MtmIsFilteredTxn ) {
186+ pq_sendbyte (out , 'I' ); /* action INSERT */
187+ pglogical_write_tuple (out , data , rel , newtuple );
188+ }
193189}
194190
195191/*
@@ -199,32 +195,31 @@ static void
199195pglogical_write_update (StringInfo out , PGLogicalOutputData * data ,
200196 Relation rel , HeapTuple oldtuple , HeapTuple newtuple )
201197{
202- PGLogicalProtoMM * mm = (PGLogicalProtoMM * )data -> api ;
203- if (!mm -> isLocal ) {
204- pq_sendbyte (out , 'U' ); /* action UPDATE */
205- /* FIXME support whole tuple (O tuple type) */
206- if (oldtuple != NULL )
207- {
208- pq_sendbyte (out , 'K' ); /* old key follows */
209- pglogical_write_tuple (out , data , rel , oldtuple );
210- }
211-
212- pq_sendbyte (out , 'N' ); /* new tuple follows */
213- pglogical_write_tuple (out , data , rel , newtuple );
214- }
198+ if (!MtmIsFilteredTxn ) {
199+ pq_sendbyte (out , 'U' ); /* action UPDATE */
200+ /* FIXME support whole tuple (O tuple type) */
201+ if (oldtuple != NULL )
202+ {
203+ pq_sendbyte (out , 'K' ); /* old key follows */
204+ pglogical_write_tuple (out , data , rel , oldtuple );
205+ }
206+
207+ pq_sendbyte (out , 'N' ); /* new tuple follows */
208+ pglogical_write_tuple (out , data , rel , newtuple );
209+ }
215210}
211+
216212/*
217213 * Write DELETE to the output stream.
218214 */
219215static void
220216pglogical_write_delete (StringInfo out , PGLogicalOutputData * data ,
221217 Relation rel , HeapTuple oldtuple )
222218{
223- PGLogicalProtoMM * mm = (PGLogicalProtoMM * )data -> api ;
224- if (!mm -> isLocal ) {
225- pq_sendbyte (out , 'D' ); /* action DELETE */
226- pglogical_write_tuple (out , data , rel , oldtuple );
227- }
219+ if (!MtmIsFilteredTxn ) {
220+ pq_sendbyte (out , 'D' ); /* action DELETE */
221+ pglogical_write_tuple (out , data , rel , oldtuple );
222+ }
228223}
229224
230225/*
@@ -422,16 +417,16 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
422417PGLogicalProtoAPI *
423418pglogical_init_api (PGLogicalProtoType typ )
424419{
425- PGLogicalProtoMM * pmm = palloc0 (sizeof (PGLogicalProtoMM ));
426- PGLogicalProtoAPI * res = & pmm -> api ;
427- pmm -> isLocal = false;
428- sscanf (MyReplicationSlot -> data .name .data , MULTIMASTER_SLOT_PATTERN , & pmm -> nodeId );
420+ PGLogicalProtoAPI * res = palloc0 (sizeof (PGLogicalProtoAPI ));
421+ sscanf (MyReplicationSlot -> data .name .data , MULTIMASTER_SLOT_PATTERN , & MtmReplicationNodeId );
422+ elog (WARNING , "%d: PRGLOGICAL init API for slot %s node %d" , MyProcPid , MyReplicationSlot -> data .name .data , MtmReplicationNodeId );
429423 res -> write_rel = pglogical_write_rel ;
430424 res -> write_begin = pglogical_write_begin ;
431425 res -> write_commit = pglogical_write_commit ;
432426 res -> write_insert = pglogical_write_insert ;
433427 res -> write_update = pglogical_write_update ;
434428 res -> write_delete = pglogical_write_delete ;
429+ res -> setup_hooks = MtmSetupReplicationHooks ;
435430 res -> write_startup_message = write_startup_message ;
436431 return res ;
437432}
0 commit comments