@@ -274,8 +274,8 @@ void MtmSleep(timestamp_t interval)
274274{
275275 struct timespec ts ;
276276 struct timespec rem ;
277- ts .tv_sec = interval /1000000 ;
278- ts .tv_nsec = interval %1000000 * 1000 ;
277+ ts .tv_sec = interval /USECS_PER_SEC ;
278+ ts .tv_nsec = interval %USECS_PER_SEC * 1000 ;
279279
280280 while (nanosleep (& ts , & rem ) < 0 ) {
281281 Assert (errno == EINTR );
@@ -330,7 +330,7 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
330330
331331 MtmLock (LW_SHARED );
332332 ts = hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
333- if (ts != NULL ) {
333+ if (ts != NULL && ! ts -> isLocal ) {
334334 snapshot = ts -> snapshot ;
335335 }
336336 MtmUnlock ();
@@ -452,8 +452,8 @@ MtmAdjustOldestXid(TransactionId xid)
452452
453453 MtmLock (LW_EXCLUSIVE );
454454 ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
455- if (ts != NULL && ts -> status == TRANSACTION_STATUS_COMMITTED ) {
456- csn_t oldestSnapshot = ts -> csn ;
455+ if (ts != NULL ) {
456+ csn_t oldestSnapshot = ts -> snapshot ;
457457 Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot ;
458458 for (i = 0 ; i < Mtm -> nAllNodes ; i ++ ) {
459459 if (!BIT_CHECK (Mtm -> disabledNodeMask , i )
@@ -483,8 +483,7 @@ MtmAdjustOldestXid(TransactionId xid)
483483 if (prev != NULL ) {
484484 Mtm -> transListHead = prev ;
485485 Mtm -> oldestXid = xid = prev -> xid ;
486- } else {
487- Assert (TransactionIdPrecedesOrEquals (Mtm -> oldestXid , xid ));
486+ } else if (TransactionIdPrecedes (Mtm -> oldestXid , xid )) {
488487 xid = Mtm -> oldestXid ;
489488 }
490489 } else {
@@ -650,6 +649,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
650649 if (!found ) {
651650 ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
652651 ts -> snapshot = x -> snapshot ;
652+ ts -> isLocal = true;
653653 if (TransactionIdIsValid (x -> gtid .xid )) {
654654 Assert (x -> gtid .node != MtmNodeId );
655655 ts -> gtid = x -> gtid ;
@@ -704,7 +704,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
704704 /*
705705 * Invalid CSN prevent replication of transaction by logical replication
706706 */
707- ts -> snapshot = x -> isReplicated || !x -> containsDML ? INVALID_CSN : x -> snapshot ;
707+ ts -> isLocal = x -> isReplicated || !x -> containsDML ;
708+ ts -> snapshot = x -> snapshot ;
708709 ts -> csn = MtmAssignCSN ();
709710 ts -> procno = MyProc -> pgprocno ;
710711 ts -> nVotes = 1 ; /* I am voted myself */
@@ -752,16 +753,20 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
752753 } else {
753754 time_t timeout = Max (Mtm2PCMinTimeout , (ts -> csn - ts -> snapshot )* Mtm2PCPrepareRatio /100000 ); /* usec->msec and percents */
754755 int result = 0 ;
756+ int nConfigChanges = Mtm -> nConfigChanges ;
755757 /* wait votes from all nodes */
756758 while (!ts -> votingCompleted && !(result & WL_TIMEOUT )) {
757759 MtmUnlock ();
758760 result = WaitLatch (& MyProc -> procLatch , WL_LATCH_SET |WL_TIMEOUT , timeout );
759761 ResetLatch (& MyProc -> procLatch );
760762 MtmLock (LW_SHARED );
761763 }
762- if (!ts -> votingCompleted ) {
764+ if (!ts -> votingCompleted ) {
763765 ts -> status = TRANSACTION_STATUS_ABORTED ;
764- elog (WARNING , "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec" , (int )timeout , (int )((ts -> csn - x -> snapshot )/1000 ));
766+ elog (WARNING , "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec" , (int )timeout , (int )USEC_TO_MSEC (ts -> csn - x -> snapshot ));
767+ } else if (nConfigChanges != Mtm -> nConfigChanges ) {
768+ ts -> status = TRANSACTION_STATUS_ABORTED ;
769+ elog (WARNING , "Transaction is aborted because cluster configuration is changed during commit" );
765770 }
766771 x -> status = ts -> status ;
767772 MTM_LOG3 ("%d: Result of vote: %d" , MyProcPid , ts -> status );
@@ -830,7 +835,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
830835 Assert (TransactionIdIsValid (x -> xid ));
831836 ts = hash_search (MtmXid2State , & x -> xid , HASH_ENTER , NULL );
832837 ts -> status = TRANSACTION_STATUS_ABORTED ;
833- ts -> snapshot = INVALID_CSN ;
838+ ts -> isLocal = true;
839+ ts -> snapshot = x -> snapshot ;
834840 ts -> csn = MtmAssignCSN ();
835841 ts -> gtid = x -> gtid ;
836842 ts -> nSubxids = 0 ;
@@ -1089,6 +1095,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
10891095 BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
10901096 Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ();
10911097 Mtm -> nLiveNodes += 1 ;
1098+ Mtm -> nConfigChanges += 1 ;
10921099 caughtUp = true;
10931100 } else if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
10941101 && slotLSN + MtmMinRecoveryLag > walLSN )
@@ -1263,6 +1270,7 @@ bool MtmRefreshClusterStatus(bool nowait)
12631270
12641271void MtmCheckQuorum (void )
12651272{
1273+ Mtm -> nConfigChanges += 1 ;
12661274 if (Mtm -> nLiveNodes < Mtm -> nAllNodes /2 + 1 ) {
12671275 if (Mtm -> status == MTM_ONLINE ) { /* out of quorum */
12681276 elog (WARNING , "Node is in minority: disabled mask %lx" , (long ) Mtm -> disabledNodeMask );
@@ -1460,6 +1468,7 @@ static void MtmInitialize()
14601468 Mtm -> nReceivers = 0 ;
14611469 Mtm -> timeShift = 0 ;
14621470 Mtm -> transCount = 0 ;
1471+ Mtm -> nConfigChanges = 0 ;
14631472 Mtm -> localTablesHashLoaded = false;
14641473 for (i = 0 ; i < MtmNodes ; i ++ ) {
14651474 Mtm -> nodes [i ].oldestSnapshot = 0 ;
0 commit comments