@@ -67,7 +67,7 @@ typedef struct
6767{
6868 TransactionId xid ;
6969 int count ;
70- } ExternalTransaction ;
70+ } LocalTransaction ;
7171
7272#define DTM_SHMEM_SIZE (1024*1024)
7373#define DTM_HASH_SIZE 1003
@@ -77,6 +77,9 @@ void _PG_fini(void);
7777
7878PG_MODULE_MAGIC ;
7979
80+ PG_FUNCTION_INFO_V1 (mm_start_replication );
81+ PG_FUNCTION_INFO_V1 (mm_stop_replication );
82+
8083static Snapshot DtmGetSnapshot (Snapshot snapshot );
8184static void DtmMergeWithGlobalSnapshot (Snapshot snapshot );
8285static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
@@ -103,10 +106,11 @@ static void ByteBufferAppend(ByteBuffer* buf, void* data, int len);
103106static void ByteBufferAppendInt32 (ByteBuffer * buf , int data );
104107static void ByteBufferFree (ByteBuffer * buf );
105108
109+ static void MMMarkTransAsLocal (TransactionId xid );
106110
107111static shmem_startup_hook_type prev_shmem_startup_hook ;
108112static HTAB * xid_in_doubt ;
109- static HTAB * external_trans ;
113+ static HTAB * local_trans ;
110114static DtmState * dtm ;
111115static Snapshot CurrentTransactionSnapshot ;
112116
@@ -128,9 +132,10 @@ static TransactionManager DtmTM = {
128132 DtmDetectGlobalDeadLock
129133};
130134
131- static char * MultimasterConnStrs ;
132- static int MultimasterNodeId ;
133- static int MultimasterNodes ;
135+ static char * MMConnStrs ;
136+ static int MMNodeId ;
137+ static int MMNodes ;
138+ static bool MMDoReplication = true;
134139
135140static char * DtmHost ;
136141static int DtmPort ;
@@ -145,8 +150,8 @@ static BackgroundWorker DtmWorker = {
145150};
146151
147152#define XTM_TRACE (fmt , ...)
148- // #define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
149- #define XTM_INFO (fmt , ...)
153+ #define XTM_INFO (fmt , ...) fprintf(stderr, fmt, ## __VA_ARGS__)
154+ // #define XTM_INFO(fmt, ...)
150155
151156static void DumpSnapshot (Snapshot s , char * name )
152157{
@@ -697,7 +702,7 @@ static void DtmInitialize()
697702 dtm -> xidLock = LWLockAssign ();
698703 dtm -> nReservedXids = 0 ;
699704 dtm -> minXid = InvalidTransactionId ;
700- dtm -> nNodes = MultimasterNodes ;
705+ dtm -> nNodes = MMNodes ;
701706 RegisterXactCallback (DtmXactCallback , NULL );
702707 }
703708 LWLockRelease (AddinShmemInitLock );
@@ -714,11 +719,11 @@ static void DtmInitialize()
714719 );
715720
716721 info .keysize = sizeof (TransactionId );
717- info .entrysize = sizeof (ExternalTransaction );
722+ info .entrysize = sizeof (LocalTransaction );
718723 info .hash = dtm_xid_hash_fn ;
719724 info .match = dtm_xid_match_fn ;
720- external_trans = ShmemInitHash (
721- "external_trans " ,
725+ local_trans = ShmemInitHash (
726+ "local_trans " ,
722727 DTM_HASH_SIZE , DTM_HASH_SIZE ,
723728 & info ,
724729 HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
@@ -734,14 +739,19 @@ DtmXactCallback(XactEvent event, void *arg)
734739 XTM_INFO ("%d: DtmXactCallbackevent=%d nextxid=%d\n" , getpid (), event , DtmNextXid );
735740 switch (event )
736741 {
737- case XACT_EVENT_START :
738- if (MyProc && MyProc -> backendId != InvalidBackendId ) {
739- printf ("getpid=%d, MyProc=%d, MyProc->backendId=%d\n" , getpid (), MyProc -> pid , MyProc -> backendId );
740- MultimasterBeginTransaction ();
742+ case XACT_EVENT_START :
743+ if (MyBackendId != InvalidBackendId && MMDoReplication ) {
744+ printf ("getpid=%d, backendId=%d\n" , getpid (), MyBackendId );
745+ MMBeginTransaction ();
746+ }
747+ break ;
748+ case XACT_EVENT_PRE_COMMIT :
749+ case XACT_EVENT_PARALLEL_PRE_COMMIT :
750+ if (!MMDoReplication && TransactionIdIsValid (GetCurrentTransactionIdIfAny ())) {
751+ MMMarkTransAsLocal (GetCurrentTransactionIdIfAny ());
741752 }
742753 break ;
743- case XACT_EVENT_COMMIT :
744- case XACT_EVENT_ABORT :
754+ case XACT_EVENT_ABORT :
745755 if (TransactionIdIsValid (DtmNextXid ))
746756 {
747757 if (event == XACT_EVENT_COMMIT )
@@ -862,7 +872,7 @@ _PG_init(void)
862872 "multimaster.conn_strings" ,
863873 "Multimaster node connection strings separated by commas, i.e. 'replication=database dbname=postgres host=localhost port=5001,replication=database dbname=postgres host=localhost port=5002'" ,
864874 NULL ,
865- & MultimasterConnStrs ,
875+ & MMConnStrs ,
866876 "" ,
867877 PGC_POSTMASTER , // context
868878 0 , // flags,
@@ -875,7 +885,7 @@ _PG_init(void)
875885 "multimaster.node_id" ,
876886 "Multimaster node ID" ,
877887 NULL ,
878- & MultimasterNodeId ,
888+ & MMNodeId ,
879889 1 ,
880890 1 ,
881891 INT_MAX ,
@@ -886,7 +896,7 @@ _PG_init(void)
886896 NULL
887897 );
888898
889- MultimasterNodes = LogicalReplicationStartReceivers ( MultimasterConnStrs , MultimasterNodeId );
899+ MMNodes = MMStartReceivers ( MMConnStrs , MMNodeId );
890900
891901 if (DtmBufferSize != 0 )
892902 {
@@ -924,10 +934,10 @@ static void DtmShmemStartup(void)
924934 * ***************************************************************************
925935 */
926936
927- void MultimasterBeginTransaction (void )
937+ void MMBeginTransaction (void )
928938{
929939 if (TransactionIdIsValid (DtmNextXid ))
930- elog (ERROR , "MultimasterBeginTransaction should be called only once for global transaction" );
940+ elog (ERROR , "MMBeginTransaction should be called only once for global transaction" );
931941 if (dtm == NULL )
932942 elog (ERROR , "DTM is not properly initialized, please check that pg_dtm plugin was added to shared_preload_libraries list in postgresql.conf" );
933943 DtmNextXid = DtmGlobalStartTransaction (& DtmSnapshot , & dtm -> minXid );
@@ -939,10 +949,8 @@ void MultimasterBeginTransaction(void)
939949 DtmLastSnapshot = NULL ;
940950}
941951
942- void MultimasterJoinTransaction (TransactionId xid )
952+ void MMJoinTransaction (TransactionId xid )
943953{
944- ExternalTransaction * et ;
945-
946954 if (TransactionIdIsValid (DtmNextXid ))
947955 elog (ERROR , "dtm_begin/join_transaction should be called only once for global transaction" );
948956 DtmNextXid = xid ;
@@ -955,28 +963,52 @@ void MultimasterJoinTransaction(TransactionId xid)
955963 DtmHasGlobalSnapshot = true;
956964 DtmLastSnapshot = NULL ;
957965
966+ MMMarkTransAsLocal (DtmNextXid );
967+ }
968+
969+
970+ void MMMarkTransAsLocal (TransactionId xid )
971+ {
972+ LocalTransaction * lt ;
973+
974+ Assert (TransactionIdIsValid (xid ));
958975 LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
959- et = hash_search (external_trans , & DtmNextXid , HASH_ENTER , NULL );
960- et -> count = dtm -> nNodes - 1 ;
976+ lt = hash_search (local_trans , & xid , HASH_ENTER , NULL );
977+ lt -> count = dtm -> nNodes - 1 ;
961978 LWLockRelease (dtm -> hashLock );
962979}
963980
964- bool MultimasterIsExternalTransaction (TransactionId xid )
981+ bool MMIsLocalTransaction (TransactionId xid )
965982{
966- ExternalTransaction * et ;
983+ LocalTransaction * lt ;
967984 bool result = false;
968985 LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
969- et = hash_search (external_trans , & xid , HASH_FIND , NULL );
970- if (et != NULL ) {
986+ lt = hash_search (local_trans , & xid , HASH_FIND , NULL );
987+ if (lt != NULL ) {
971988 result = true;
972- if (-- et -> count == 0 ) {
973- hash_search (external_trans , & xid , HASH_REMOVE , NULL );
989+ if (-- lt -> count == 0 ) {
990+ hash_search (local_trans , & xid , HASH_REMOVE , NULL );
974991 }
975992 }
976993 LWLockRelease (dtm -> hashLock );
977994 return result ;
978995}
979996
997+ Datum
998+ mm_start_replication (PG_FUNCTION_ARGS )
999+ {
1000+ MMDoReplication = true;
1001+ PG_RETURN_VOID ();
1002+ }
1003+
1004+ Datum
1005+ mm_stop_replication (PG_FUNCTION_ARGS )
1006+ {
1007+ MMDoReplication = false;
1008+ PG_RETURN_VOID ();
1009+ }
1010+
1011+
9801012
9811013
9821014void DtmBackgroundWorker (Datum arg )
0 commit comments