@@ -200,6 +200,7 @@ static int MtmMinRecoveryLag;
200200static int MtmMaxRecoveryLag ;
201201static int Mtm2PCPrepareRatio ;
202202static int Mtm2PCMinTimeout ;
203+ static int MtmGcPeriod ;
203204static bool MtmIgnoreTablesWithoutPk ;
204205
205206static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
@@ -342,16 +343,20 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
342343Snapshot MtmGetSnapshot (Snapshot snapshot )
343344{
344345 snapshot = PgGetSnapshotData (snapshot );
345- RecentGlobalDataXmin = RecentGlobalXmin = Mtm -> oldestXid ;//MtmAdjustOldestXid(RecentGlobalDataXmin);
346+ RecentGlobalDataXmin = RecentGlobalXmin = Mtm -> oldestXid ;
346347 return snapshot ;
347348}
348349
349350
350351TransactionId MtmGetOldestXmin (Relation rel , bool ignoreVacuum )
351352{
352353 TransactionId xmin = PgGetOldestXmin (NULL , false); /* consider all backends */
353- xmin = MtmAdjustOldestXid (xmin );
354- return xmin ;
354+ if (TransactionIdIsValid (xmin )) {
355+ MtmLock (LW_EXCLUSIVE );
356+ xmin = MtmAdjustOldestXid (xmin );
357+ MtmUnlock ();
358+ }
359+ return xmin ;
355360}
356361
357362bool MtmXidInMVCCSnapshot (TransactionId xid , Snapshot snapshot )
@@ -446,53 +451,50 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
446451static TransactionId
447452MtmAdjustOldestXid (TransactionId xid )
448453{
449- if (TransactionIdIsValid (xid )) {
450- MtmTransState * ts , * prev = NULL ;
451- int i ;
452-
453- MtmLock (LW_EXCLUSIVE );
454- ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
455- if (ts != NULL ) {
456- csn_t oldestSnapshot = ts -> snapshot ;
457- Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot ;
458- for (i = 0 ; i < Mtm -> nAllNodes ; i ++ ) {
459- if (!BIT_CHECK (Mtm -> disabledNodeMask , i )
460- && Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot )
461- {
462- oldestSnapshot = Mtm -> nodes [i ].oldestSnapshot ;
463- }
464- }
465- oldestSnapshot -= MtmVacuumDelay * USECS_PER_SEC ;
466-
467- for (ts = Mtm -> transListHead ;
468- ts != NULL
469- && ts -> csn < oldestSnapshot
470- && TransactionIdPrecedes (ts -> xid , xid )
471- && (ts -> status == TRANSACTION_STATUS_COMMITTED ||
472- ts -> status == TRANSACTION_STATUS_ABORTED );
473- prev = ts , ts = ts -> next )
454+ int i ;
455+ MtmTransState * prev = NULL ;
456+ MtmTransState * ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
457+ MTM_LOG1 ("%d: MtmAdjustOldestXid(%d): snapshot=%ld, csn=%ld, status=%d" , MyProcPid , xid , ts != NULL ? ts -> snapshot : 0 , ts != NULL ? ts -> csn : 0 , ts != NULL ? ts -> status : -1 );
458+ Mtm -> gcCount = 0 ;
459+ if (ts != NULL ) {
460+ csn_t oldestSnapshot = ts -> snapshot ;
461+ Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot ;
462+ for (i = 0 ; i < Mtm -> nAllNodes ; i ++ ) {
463+ if (!BIT_CHECK (Mtm -> disabledNodeMask , i )
464+ && Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot )
474465 {
475- if (prev != NULL ) {
476- /* Remove information about too old transactions */
477- hash_search (MtmXid2State , & prev -> xid , HASH_REMOVE , NULL );
478- }
466+ oldestSnapshot = Mtm -> nodes [i ].oldestSnapshot ;
479467 }
480- }
481- if (MtmUseDtm )
468+ }
469+ oldestSnapshot -= MtmVacuumDelay * USECS_PER_SEC ;
470+
471+ for (ts = Mtm -> transListHead ;
472+ ts != NULL
473+ && ts -> csn < oldestSnapshot
474+ && TransactionIdPrecedes (ts -> xid , xid )
475+ && (ts -> status == TRANSACTION_STATUS_COMMITTED ||
476+ ts -> status == TRANSACTION_STATUS_ABORTED );
477+ prev = ts , ts = ts -> next )
482478 {
483479 if (prev != NULL ) {
484- Mtm -> transListHead = prev ;
485- Mtm -> oldestXid = xid = prev -> xid ;
486- } else if (TransactionIdPrecedes (Mtm -> oldestXid , xid )) {
487- xid = Mtm -> oldestXid ;
488- }
489- } else {
490- if (prev != NULL ) {
491- Mtm -> transListHead = prev ;
480+ /* Remove information about too old transactions */
481+ hash_search (MtmXid2State , & prev -> xid , HASH_REMOVE , NULL );
492482 }
493483 }
494- MtmUnlock ();
495- }
484+ }
485+ if (MtmUseDtm )
486+ {
487+ if (prev != NULL ) {
488+ Mtm -> transListHead = prev ;
489+ Mtm -> oldestXid = xid = prev -> xid ;
490+ } else if (TransactionIdPrecedes (Mtm -> oldestXid , xid )) {
491+ xid = Mtm -> oldestXid ;
492+ }
493+ } else {
494+ if (prev != NULL ) {
495+ Mtm -> transListHead = prev ;
496+ }
497+ }
496498 return xid ;
497499}
498500/*
@@ -614,7 +616,12 @@ static void
614616MtmBeginTransaction (MtmCurrentTrans * x )
615617{
616618 if (x -> snapshot == INVALID_CSN ) {
617- MtmLock (LW_EXCLUSIVE );
619+ TransactionId xmin = (Mtm -> gcCount >= MtmGcPeriod ) ? PgGetOldestXmin (NULL , false) : InvalidTransactionId ; /* Get oldest xmin outside critical section */
620+
621+ MtmLock (LW_EXCLUSIVE );
622+ if (TransactionIdIsValid (xmin ) && Mtm -> gcCount >= MtmGcPeriod ) {
623+ MtmAdjustOldestXid (xmin );
624+ }
618625 x -> xid = GetCurrentTransactionIdIfAny ();
619626 x -> isReplicated = false;
620627 x -> isDistributed = MtmIsUserTransaction ();
@@ -690,7 +697,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
690697 }
691698
692699 MtmLock (LW_EXCLUSIVE );
693-
694700 /*
695701 * Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up.
696702 * Only "own" transactions are blacked. Transactions replicated from other nodes (including recovered transaction) should be proceeded
@@ -716,8 +722,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
716722
717723 x -> isPrepared = true;
718724 x -> csn = ts -> csn ;
719-
725+
720726 Mtm -> transCount += 1 ;
727+ Mtm -> gcCount += 1 ;
728+
721729 MtmTransactionListAppend (ts );
722730 MtmAddSubtransactions (ts , subxids , ts -> nSubxids );
723731 MTM_LOG3 ("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)" ,
@@ -1466,8 +1474,9 @@ static void MtmInitialize()
14661474 Mtm -> transListHead = NULL ;
14671475 Mtm -> transListTail = & Mtm -> transListHead ;
14681476 Mtm -> nReceivers = 0 ;
1469- Mtm -> timeShift = 0 ;
1477+ Mtm -> timeShift = 0 ;
14701478 Mtm -> transCount = 0 ;
1479+ Mtm -> gcCount = 0 ;
14711480 Mtm -> nConfigChanges = 0 ;
14721481 Mtm -> localTablesHashLoaded = false;
14731482 for (i = 0 ; i < MtmNodes ; i ++ ) {
@@ -1600,6 +1609,21 @@ _PG_init(void)
16001609 if (!process_shared_preload_libraries_in_progress )
16011610 return ;
16021611
1612+ DefineCustomIntVariable (
1613+ "multimaster.gc_period" ,
1614+ "Number of distributed transactions after which garbage collection is started" ,
1615+ "Multimaster is building xid->csn hash map which has to be cleaned to avoid hash overflow. This parameter specifies interval of invoking garbage collector for this map" ,
1616+ & MtmGcPeriod ,
1617+ MTM_HASH_SIZE /10 ,
1618+ 1 ,
1619+ INT_MAX ,
1620+ PGC_BACKEND ,
1621+ 0 ,
1622+ NULL ,
1623+ NULL ,
1624+ NULL
1625+ );
1626+
16031627 DefineCustomIntVariable (
16041628 "multimaster.max_nodes" ,
16051629 "Maximal number of cluster nodes" ,
@@ -2339,7 +2363,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
23392363 values [11 ] = Int32GetDatum (Mtm -> recoverySlot );
23402364 values [12 ] = Int64GetDatum (hash_get_num_entries (MtmXid2State ));
23412365 values [13 ] = Int64GetDatum (hash_get_num_entries (MtmGid2State ));
2342- values [14 ] = Int64GetDatum (Mtm -> oldestSnapshot );
2366+ values [14 ] = Int32GetDatum (Mtm -> oldestXid );
23432367 values [15 ] = Int32GetDatum (Mtm -> nConfigChanges );
23442368
23452369 PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc , values , nulls )));
0 commit comments