@@ -171,6 +171,7 @@ MtmConnectionInfo* MtmConnections;
171171static char * MtmConnStrs ;
172172static int MtmQueueSize ;
173173static int MtmWorkers ;
174+ static int MtmVacuumDelay ;
174175static int MtmMinRecoveryLag ;
175176static int MtmMaxRecoveryLag ;
176177
@@ -408,36 +409,48 @@ MtmAdjustOldestXid(TransactionId xid)
408409{
409410 if (TransactionIdIsValid (xid )) {
410411 MtmTransState * ts , * prev = NULL ;
411-
412+ csn_t oldestSnapshot = 0 ;
413+ int i ;
414+
412415 MtmLock (LW_EXCLUSIVE );
413- ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
414- if (ts != NULL && ts -> status == TRANSACTION_STATUS_COMMITTED ) { /* committed transactions have same CSNs at all nodes */
415- csn_t oldestSnapshot ;
416- int i ;
417-
418- Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot = ts -> csn ;
419- for (i = 0 ; i < MtmNodes ; i ++ ) {
420- if (Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot ) {
421- oldestSnapshot = Mtm -> nodes [i ].oldestSnapshot ;
422- }
416+ for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
417+ if (TransactionIdPrecedes (ts -> xid , xid )
418+ && ts -> status == TRANSACTION_STATUS_COMMITTED
419+ && ts -> csn > oldestSnapshot )
420+ {
421+ oldestSnapshot = ts -> csn ;
423422 }
424- for (ts = Mtm -> transListHead ;
425- ts != NULL
426- && ts -> csn < oldestSnapshot
427- && (ts -> status == TRANSACTION_STATUS_COMMITTED || ts -> status == TRANSACTION_STATUS_ABORTED )
428- && TransactionIdPrecedes (ts -> xid , xid );
429- prev = ts , ts = ts -> next )
423+ }
424+ Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot ;
425+ for (i = 0 ; i < MtmNodes ; i ++ ) {
426+ if (!BIT_CHECK (Mtm -> disabledNodeMask , i )
427+ && Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot )
430428 {
431- if (prev != NULL ) {
432- /* Remove information about too old transactions */
433- hash_search (MtmXid2State , & prev -> xid , HASH_REMOVE , NULL );
434- }
429+ oldestSnapshot = Mtm -> nodes [i ].oldestSnapshot ;
435430 }
436- }
437- if (prev != NULL ) {
438- Mtm -> transListHead = prev ;
439- Mtm -> oldestXid = xid = prev -> xid ;
440- } else {
431+ }
432+ oldestSnapshot -= MtmVacuumDelay * USEC ;
433+ for (ts = Mtm -> transListHead ;
434+ ts != NULL
435+ && ts -> csn < oldestSnapshot
436+ && TransactionIdPrecedes (ts -> xid , xid )
437+ && (ts -> status == TRANSACTION_STATUS_COMMITTED ||
438+ ts -> status == TRANSACTION_STATUS_ABORTED );
439+ ts = ts -> next )
440+ {
441+ if (ts -> status == TRANSACTION_STATUS_COMMITTED ) {
442+ prev = ts ;
443+ }
444+ }
445+ if (prev != NULL ) {
446+ for (ts = Mtm -> transListHead ; ts != prev ; ts = ts -> next ) {
447+ /* Remove information about too old transactions */
448+ Assert (ts -> status != TRANSACTION_STATUS_UNKNOWN );
449+ hash_search (MtmXid2State , & ts -> xid , HASH_REMOVE , NULL );
450+ }
451+ Mtm -> transListHead = prev ;
452+ Mtm -> oldestXid = xid = prev -> xid ;
453+ } else if (TransactionIdPrecedes (Mtm -> oldestXid , xid )) {
441454 xid = Mtm -> oldestXid ;
442455 }
443456 MtmUnlock ();
@@ -1325,6 +1338,21 @@ _PG_init(void)
13251338 NULL
13261339 );
13271340
1341+ DefineCustomIntVariable (
1342+ "multimaster.vacuum_delay" ,
1343+ "Minimal age of records which can be vacuumed (seconds)" ,
1344+ NULL ,
1345+ & MtmVacuumDelay ,
1346+ 1 ,
1347+ 1 ,
1348+ INT_MAX ,
1349+ PGC_BACKEND ,
1350+ 0 ,
1351+ NULL ,
1352+ NULL ,
1353+ NULL
1354+ );
1355+
13281356 DefineCustomIntVariable (
13291357 "multimaster.queue_size" ,
13301358 "Multimaster queue size" ,
0 commit comments