@@ -166,10 +166,12 @@ int MtmConnectAttempts;
166166int MtmConnectTimeout ;
167167int MtmKeepaliveTimeout ;
168168int MtmReconnectAttempts ;
169+ MtmConnectionInfo * MtmConnections ;
169170
170171static char * MtmConnStrs ;
171172static int MtmQueueSize ;
172173static int MtmWorkers ;
174+ static int MtmVacuumDelay ;
173175static int MtmMinRecoveryLag ;
174176static int MtmMaxRecoveryLag ;
175177
@@ -402,26 +404,90 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
402404 * We collest oldest CSNs from all nodes and choose minimum from them.
403405 * If no such XID can be located, then return previously observed oldest XID
404406 */
407+ #if 0
405408static TransactionId
406409MtmAdjustOldestXid (TransactionId xid )
407410{
408411 if (TransactionIdIsValid (xid )) {
409412 MtmTransState * ts , * prev = NULL ;
410-
413+ csn_t oldestSnapshot = 0 ;
414+ int i ;
415+
411416 MtmLock (LW_EXCLUSIVE );
412- ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
413- if (ts != NULL && ts -> status == TRANSACTION_STATUS_COMMITTED ) { /* committed transactions have same CSNs at all nodes */
414- csn_t oldestSnapshot ;
415- int i ;
417+ for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
418+ if (TransactionIdPrecedes (ts -> xid , xid )
419+ && ts -> status == TRANSACTION_STATUS_COMMITTED
420+ && ts -> csn > oldestSnapshot )
421+ {
422+ oldestSnapshot = ts -> csn ;
423+ }
424+ }
425+ Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot ;
426+ for (i = 0 ; i < MtmNodes ; i ++ ) {
427+ if (!BIT_CHECK (Mtm -> disabledNodeMask , i )
428+ && Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot )
429+ {
430+ oldestSnapshot = Mtm -> nodes [i ].oldestSnapshot ;
431+ }
432+ }
433+ oldestSnapshot -= MtmVacuumDelay * USEC ;
434+ for (ts = Mtm -> transListHead ;
435+ ts != NULL
436+ && ts -> csn < oldestSnapshot
437+ && TransactionIdPrecedes (ts -> xid , xid )
438+ && (ts -> status == TRANSACTION_STATUS_COMMITTED ||
439+ ts -> status == TRANSACTION_STATUS_ABORTED );
440+ ts = ts -> next )
441+ {
442+ if (ts -> status == TRANSACTION_STATUS_COMMITTED ) {
443+ prev = ts ;
444+ }
445+ }
446+ if (prev != NULL ) {
447+ for (ts = Mtm -> transListHead ; ts != prev ; ts = ts -> next ) {
448+ /* Remove information about too old transactions */
449+ Assert (ts -> status != TRANSACTION_STATUS_UNKNOWN );
450+ hash_search (MtmXid2State , & ts -> xid , HASH_REMOVE , NULL );
451+ }
452+ Mtm -> transListHead = prev ;
453+ Mtm -> oldestXid = xid = prev -> xid ;
454+ } else if (TransactionIdPrecedes (Mtm -> oldestXid , xid )) {
455+ xid = Mtm -> oldestXid ;
456+ }
457+ MtmUnlock ();
458+ }
459+ return xid ;
460+ }
461+ #else
462+ static TransactionId
463+ MtmAdjustOldestXid (TransactionId xid )
464+ {
465+ if (TransactionIdIsValid (xid )) {
466+ MtmTransState * ts , * prev = NULL ;
467+ int i ;
416468
417- Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot = ts -> csn ;
469+ MtmLock (LW_EXCLUSIVE );
470+ ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
471+ if (ts != NULL && ts -> status == TRANSACTION_STATUS_COMMITTED ) {
472+ csn_t oldestSnapshot = ts -> csn ;
473+ Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot ;
418474 for (i = 0 ; i < MtmNodes ; i ++ ) {
419- if (Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot ) {
475+ if (!BIT_CHECK (Mtm -> disabledNodeMask , i )
476+ && Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot )
477+ {
420478 oldestSnapshot = Mtm -> nodes [i ].oldestSnapshot ;
421479 }
422480 }
423- for (ts = Mtm -> transListHead ; ts != NULL && ts -> csn < oldestSnapshot ; prev = ts , ts = ts -> next ) {
424- Assert (ts -> status == TRANSACTION_STATUS_COMMITTED || ts -> status == TRANSACTION_STATUS_ABORTED || ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
481+ oldestSnapshot -= MtmVacuumDelay * USEC ;
482+
483+ for (ts = Mtm -> transListHead ;
484+ ts != NULL
485+ && ts -> csn < oldestSnapshot
486+ && TransactionIdPrecedes (ts -> xid , xid )
487+ && (ts -> status == TRANSACTION_STATUS_COMMITTED ||
488+ ts -> status == TRANSACTION_STATUS_ABORTED );
489+ prev = ts , ts = ts -> next )
490+ {
425491 if (prev != NULL ) {
426492 /* Remove information about too old transactions */
427493 hash_search (MtmXid2State , & prev -> xid , HASH_REMOVE , NULL );
@@ -431,14 +497,14 @@ MtmAdjustOldestXid(TransactionId xid)
431497 if (prev != NULL ) {
432498 Mtm -> transListHead = prev ;
433499 Mtm -> oldestXid = xid = prev -> xid ;
434- } else {
500+ } else if ( TransactionIdPrecedes ( Mtm -> oldestXid , xid )) {
435501 xid = Mtm -> oldestXid ;
436502 }
437503 MtmUnlock ();
438504 }
439505 return xid ;
440506}
441-
507+ #endif
442508/*
443509 * -------------------------------------------
444510 * Transaction list manipulation
@@ -989,7 +1055,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
9891055 for (i = 0 ; i < n ; i ++ ) {
9901056 if (i + 1 != MtmNodeId ) {
9911057 void * data = PaxosGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
992- matrix [i ] = * (nodemask_t * )data ;
1058+ matrix [i ] = data ? * (nodemask_t * )data : 0 ;
9931059 } else {
9941060 matrix [i ] = Mtm -> connectivityMask ;
9951061 }
@@ -1153,6 +1219,7 @@ static void MtmInitialize()
11531219 for (i = 0 ; i < MtmNodes ; i ++ ) {
11541220 Mtm -> nodes [i ].oldestSnapshot = 0 ;
11551221 Mtm -> nodes [i ].transDelay = 0 ;
1222+ Mtm -> nodes [i ].con = MtmConnections [i ];
11561223 }
11571224 PGSemaphoreCreate (& Mtm -> votingSemaphore );
11581225 PGSemaphoreReset (& Mtm -> votingSemaphore );
@@ -1178,17 +1245,17 @@ MtmShmemStartup(void)
11781245 MtmInitialize ();
11791246}
11801247
1181- void MtmUpdateNodeConnStr ( int nodeId , char const * connStr )
1248+ void MtmUpdateNodeConnectionInfo ( MtmConnectionInfo * conn , char const * connStr )
11821249{
11831250 char const * host ;
11841251 char const * end ;
11851252 int hostLen ;
11861253
11871254 if (strlen (connStr ) >= MULTIMASTER_MAX_CONN_STR_SIZE ) {
1188- elog (ERROR , "Too long (%d) connection string '%s' for node %d, limit is %d" ,
1189- (int )strlen (connStr ), connStr , nodeId , MULTIMASTER_MAX_CONN_STR_SIZE - 1 );
1255+ elog (ERROR , "Too long (%d) connection string '%s': limit is %d" ,
1256+ (int )strlen (connStr ), connStr , MULTIMASTER_MAX_CONN_STR_SIZE - 1 );
11901257 }
1191- strcpy (Mtm -> nodes [ nodeId - 1 ]. connStr , connStr );
1258+ strcpy (conn -> connStr , connStr );
11921259
11931260 host = strstr (connStr , "host=" );
11941261 if (host == NULL ) {
@@ -1198,30 +1265,46 @@ void MtmUpdateNodeConnStr(int nodeId, char const* connStr)
11981265 for (end = host ; * end != ' ' && * end != '\0' ; end ++ );
11991266 hostLen = end - host ;
12001267 if (hostLen >= MULTIMASTER_MAX_HOST_NAME_SIZE ) {
1201- elog (ERROR , "Too long (%d) host name '%.*s' for node %d, limit is %d" ,
1202- hostLen , hostLen , host , nodeId , MULTIMASTER_MAX_HOST_NAME_SIZE - 1 );
1268+ elog (ERROR , "Too long (%d) host name '%.*s': limit is %d" ,
1269+ hostLen , hostLen , host , MULTIMASTER_MAX_HOST_NAME_SIZE - 1 );
12031270 }
1204- memcpy (Mtm -> nodes [ nodeId - 1 ]. hostName , host , hostLen );
1205- Mtm -> nodes [ nodeId - 1 ]. hostName [hostLen ] = '\0' ;
1271+ memcpy (conn -> hostName , host , hostLen );
1272+ conn -> hostName [hostLen ] = '\0' ;
12061273}
12071274
12081275static void MtmSplitConnStrs (void )
12091276{
12101277 int i ;
1211- char * copy = strdup (MtmConnStrs );
1278+ char * copy = pstrdup (MtmConnStrs );
12121279 char * connStr = copy ;
12131280 char * connStrEnd = connStr + strlen (connStr );
12141281
1282+ for (i = 0 ; connStr < connStrEnd ; i ++ ) {
1283+ char * p = strchr (connStr , ',' );
1284+ if (p == NULL ) {
1285+ p = connStrEnd ;
1286+ }
1287+ connStr = p + 1 ;
1288+ }
1289+ if (i > MAX_NODES ) {
1290+ elog (ERROR , "Multimaster with more than %d nodes is not currently supported" , MAX_NODES );
1291+ }
1292+ if (i < 2 ) {
1293+ elog (ERROR , "Multimaster should have at least two nodes" );
1294+ }
1295+ MtmNodes = i ;
1296+ MtmConnections = (MtmConnectionInfo * )palloc (i * sizeof (MtmConnectionInfo ));
1297+ connStr = copy ;
1298+
12151299 for (i = 0 ; connStr < connStrEnd ; i ++ ) {
12161300 char * p = strchr (connStr , ',' );
12171301 if (p == NULL ) {
12181302 p = connStrEnd ;
12191303 }
1220- if (i == MAX_NODES ) {
1221- elog (ERROR , "Multimaster with more than %d nodes is not currently supported" , MAX_NODES );
1222- }
12231304 * p = '\0' ;
1224- MtmUpdateNodeConnStr (i + 1 , connStr );
1305+
1306+ MtmUpdateNodeConnectionInfo (& MtmConnections [i ], connStr );
1307+
12251308 if (i + 1 == MtmNodeId ) {
12261309 char * dbName = strstr (connStr , "dbname=" );
12271310 char * end ;
@@ -1232,20 +1315,13 @@ static void MtmSplitConnStrs(void)
12321315 dbName += 7 ;
12331316 for (end = dbName ; * end != ' ' && * end != '\0' ; end ++ );
12341317 len = end - dbName ;
1235- MtmDatabaseName = (char * )malloc (len + 1 );
1318+ MtmDatabaseName = (char * )palloc (len + 1 );
12361319 memcpy (MtmDatabaseName , dbName , len );
12371320 MtmDatabaseName [len ] = '\0' ;
12381321 }
12391322 connStr = p + 1 ;
12401323 }
1241- free (copy );
1242- if (i < 2 ) {
1243- elog (ERROR , "Multimaster should have at least two nodes" );
1244- }
1245- MtmNodes = i ;
1246- if (MtmNodeId > MtmNodes ) {
1247- elog (ERROR , "Invalid node id %d for specified nubmer of nodes %d" , MtmNodeId , MtmNodes );
1248- }
1324+ pfree (copy );
12491325}
12501326
12511327void
@@ -1309,6 +1385,21 @@ _PG_init(void)
13091385 NULL
13101386 );
13111387
1388+ DefineCustomIntVariable (
1389+ "multimaster.vacuum_delay" ,
1390+ "Minimal age of records which can be vacuumed (seconds)" ,
1391+ NULL ,
1392+ & MtmVacuumDelay ,
1393+ 1 ,
1394+ 1 ,
1395+ INT_MAX ,
1396+ PGC_BACKEND ,
1397+ 0 ,
1398+ NULL ,
1399+ NULL ,
1400+ NULL
1401+ );
1402+
13121403 DefineCustomIntVariable (
13131404 "multimaster.queue_size" ,
13141405 "Multimaster queue size" ,
0 commit comments