4444#include "sockhub/sockhub.h"
4545
4646#include "libdtm.h"
47+ #include "multimaster.h"
4748
4849typedef struct
4950{
@@ -52,6 +53,7 @@ typedef struct
5253 TransactionId minXid ; /* XID of oldest transaction visible by any active transaction (local or global) */
5354 TransactionId nextXid ; /* next XID for local transaction */
5455 size_t nReservedXids ; /* number of XIDs reserved for local transactions */
56+ int nNodes ;
5557} DtmState ;
5658
5759typedef struct
@@ -61,19 +63,18 @@ typedef struct
6163 int used ;
6264} ByteBuffer ;
6365
66+ typedef struct
67+ {
68+ TransactionId xid ;
69+ int count ;
70+ } ExternalTransaction ;
6471
6572#define DTM_SHMEM_SIZE (1024*1024)
6673#define DTM_HASH_SIZE 1003
6774
6875void _PG_init (void );
6976void _PG_fini (void );
7077
71- extern void LogicalReplicationStartReceivers (char * nodes , int node_id );
72- extern void LogicalReplicationBroadcastXid (TransactonId Xid );
73-
74- void MultimasterBeginTransaction (void );
75- void MultimasterJoinTransaction (TransactionId xid );
76-
7778static Snapshot DtmGetSnapshot (Snapshot snapshot );
7879static void DtmMergeWithGlobalSnapshot (Snapshot snapshot );
7980static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
@@ -103,6 +104,7 @@ static void ByteBufferFree(ByteBuffer* buf);
103104
104105static shmem_startup_hook_type prev_shmem_startup_hook ;
105106static HTAB * xid_in_doubt ;
107+ static HTAB * external_trans ;
106108static DtmState * dtm ;
107109static Snapshot CurrentTransactionSnapshot ;
108110
@@ -126,11 +128,14 @@ static TransactionManager DtmTM = {
126128
127129static char * MultimasterConnStrs ;
128130static int MultimasterNodeId ;
131+ static int MultimasterNodes ;
129132
130133static char * DtmHost ;
131134static int DtmPort ;
132135static int DtmBufferSize ;
133136
137+ bool isBackgroundWorker ;
138+
134139static BackgroundWorker DtmWorker = {
135140 "DtmWorker" ,
136141 0 , /* do not need connection to the database */
@@ -694,6 +699,7 @@ static void DtmInitialize()
694699 dtm -> xidLock = LWLockAssign ();
695700 dtm -> nReservedXids = 0 ;
696701 dtm -> minXid = InvalidTransactionId ;
702+ dtm -> nNodes = MultimasterNodes ;
697703 RegisterXactCallback (DtmXactCallback , NULL );
698704 }
699705 LWLockRelease (AddinShmemInitLock );
@@ -709,6 +715,17 @@ static void DtmInitialize()
709715 HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
710716 );
711717
718+ info .keysize = sizeof (TransactionId );
719+ info .entrysize = sizeof (ExternalTransaction );
720+ info .hash = dtm_xid_hash_fn ;
721+ info .match = dtm_xid_match_fn ;
722+ external_trans = ShmemInitHash (
723+ "external_trans" ,
724+ DTM_HASH_SIZE , DTM_HASH_SIZE ,
725+ & info ,
726+ HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
727+ );
728+
712729
713730 TM = & DtmTM ;
714731}
@@ -720,7 +737,9 @@ DtmXactCallback(XactEvent event, void *arg)
720737 switch (event )
721738 {
722739 case XACT_EVENT_BEGIN :
723- MultimasterBeginTransaction ();
740+ if (!isBackgroundWorker ) {
741+ MultimasterBeginTransaction ();
742+ }
724743 break ;
725744 case XACT_EVENT_COMMIT :
726745 case XACT_EVENT_ABORT :
@@ -865,7 +884,7 @@ _PG_init(void)
865884 NULL
866885 );
867886
868- LogicalReplicationStartReceivers (MultimasterConnStrs , MultimasterNodeId );
887+ MultimasterNodes = LogicalReplicationStartReceivers (MultimasterConnStrs , MultimasterNodeId );
869888
870889 if (DtmBufferSize != 0 )
871890 {
@@ -931,10 +950,8 @@ dtm_get_current_snapshot_xcnt(PG_FUNCTION_ARGS)
931950
932951void MultimasterBeginTransaction (void )
933952{
934- if (TransactionIdIsValid (DtmNextXid )) {
935- /* slave transaction */
936- return ;
937- }
953+ if (TransactionIdIsValid (DtmNextXid ))
954+ elog (ERROR , "MultimasterBeginTransaction should be called only once for global transaction" );
938955 if (dtm == NULL )
939956 elog (ERROR , "DTM is not properly initialized, please check that pg_dtm plugin was added to shared_preload_libraries list in postgresql.conf" );
940957 DtmNextXid = DtmGlobalStartTransaction (& DtmSnapshot , & dtm -> minXid );
@@ -944,12 +961,12 @@ void MultimasterBeginTransaction(void)
944961
945962 DtmHasGlobalSnapshot = true;
946963 DtmLastSnapshot = NULL ;
947-
948- LogicalReplicationBroadcastXid (DtmNextXid );
949964}
950965
951966void MultimasterJoinTransaction (TransactionId xid )
952967{
968+ ExternalTrans * et ;
969+
953970 if (TransactionIdIsValid (DtmNextXid ))
954971 elog (ERROR , "dtm_begin/join_transaction should be called only once for global transaction" );
955972 DtmNextXid = xid ;
@@ -961,14 +978,39 @@ void MultimasterJoinTransaction(TransactionId xid)
961978
962979 DtmHasGlobalSnapshot = true;
963980 DtmLastSnapshot = NULL ;
981+
982+ LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
983+ et = hash_search (external_trans , & DtmNextXid , HASH_ENTER , NULL );
984+ et -> count = dtm -> nNodes - 1 ;
985+ LWLockRelease (dtm -> hashLock );
964986}
965987
988+ bool MultimasterIsExternalTransaction (TransactionId xid )
989+ {
990+ ExternalTrans * et ;
991+ bool result = false;
992+ LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
993+ et = hash_search (external_trans , & DtmNextXid , HASH_FIND , NULL );
994+ if (et != NULL ) {
995+ result = true;
996+ if (-- et -> count == 0 ) {
997+ hash_search (external_trans , & DtmNextXid , HASH_REMOVE , NULL );
998+ }
999+ }
1000+ LWLockRelease (dtm -> hashLock );
1001+ return result ;
1002+ }
1003+
1004+
1005+
9661006void DtmBackgroundWorker (Datum arg )
9671007{
9681008 Shub shub ;
9691009 ShubParams params ;
9701010 char unix_sock_path [MAXPGPATH ];
9711011
1012+ isBackgroundWorker = true;
1013+
9721014 snprintf (unix_sock_path , sizeof (unix_sock_path ), "%s/p%d" , Unix_socket_directories , DtmPort );
9731015
9741016 ShubInitParams (& params );
0 commit comments