@@ -696,6 +696,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
696696 ts -> votingCompleted = false;
697697 ts -> cmd = MSG_INVALID ;
698698 ts -> nSubxids = xactGetCommittedChildren (& subxids );
699+ Mtm -> nActiveTransactions += 1 ;
699700
700701 x -> isPrepared = true;
701702 x -> csn = ts -> csn ;
@@ -795,6 +796,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
795796 ts -> status = TRANSACTION_STATUS_ABORTED ;
796797 }
797798 MtmAdjustSubtransactions (ts );
799+ Assert (Mtm -> nActiveTransactions != 0 );
800+ Mtm -> nActiveTransactions -= 1 ;
798801 }
799802 if (!commit && x -> isReplicated && TransactionIdIsValid (x -> gtid .xid )) {
800803 /*
@@ -836,6 +839,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
836839 }
837840}
838841
842+ void MtmRecoveryCompleted (void )
843+ {
844+ elog (WARNING , "Recevoery of node %d is completed" , MtmNodeId );
845+ Mtm -> recoverySlot = 0 ;
846+ MtmSwitchClusterMode (MTM_ONLINE );
847+ }
848+
839849void MtmJoinTransaction (GlobalTransactionId * gtid , csn_t globalSnapshot )
840850{
841851 MtmLock (LW_EXCLUSIVE );
@@ -847,8 +857,7 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
847857 Assert (Mtm -> status == MTM_RECOVERY );
848858 } else if (Mtm -> status == MTM_RECOVERY ) {
849859 /* When recovery is completed we get normal transaction ID and switch to normal mode */
850- Mtm -> recoverySlot = 0 ;
851- MtmSwitchClusterMode (MTM_ONLINE );
860+ MtmRecoveryCompleted ();
852861 }
853862 MtmTx .gtid = * gtid ;
854863 MtmTx .xid = GetCurrentTransactionId ();
@@ -973,35 +982,52 @@ static int64 MtmGetSlotLag(int nodeId)
973982 */
974983bool MtmIsRecoveredNode (int nodeId )
975984{
976- return BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) ;
985+ return BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 );
977986}
978987
979988
980- void MtmRecoveryPorgress ( XLogRecPtr lsn )
989+ bool MtmRecoveryCaughtUp ( int nodeId , XLogRecPtr slotLSN )
981990{
982-
983- Assert (MyWalSnd != NULL ); /* This function is called by WAL-sender, so it should not be NULL */
984- if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
985- && MyWalSnd -> sentPtr + MtmMinRecoveryLag > GetXLogInsertRecPtr ())
991+ bool caughtUp = false;
992+ if (MtmIsRecoveredNode (nodeId )) {
993+ XLogRecPtr walLSN = GetXLogInsertRecPtr ();
994+ MtmLock (LW_EXCLUSIVE );
995+ if (slotLSN == walLSN ) {
996+ if (BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )) {
997+ elog (WARNING ,"Node %d is caught-up" , nodeId );
998+ BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
999+ BIT_CLEAR (Mtm -> walSenderLockerMask , MyWalSnd - WalSndCtl -> walsnds );
1000+ BIT_CLEAR (Mtm -> nodeLockerMask , nodeId - 1 );
1001+ Mtm -> nLockers -= 1 ;
1002+ } else {
1003+ elog (WARNING ,"Node %d is caugth-up without locking cluster" , nodeId );
1004+ /* We are lucky: caught up without locking the cluster! */
1005+ Mtm -> nNodes += 1 ;
1006+ BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
1007+ }
1008+ caughtUp = true;
1009+ } else if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
1010+ && slotLSN + MtmMinRecoveryLag > walLSN )
9861011 {
9871012 /*
9881013 * WAL sender has almost caught up.
9891014 * Lock the cluster, preventing new transactions from starting until the WAL is completely replayed.
9901015 * We have to maintain two bitmasks: one marks WAL senders, the other marks the corresponding nodes.
9911016 * Is there a better way to establish the mapping between nodes and WAL senders?
9921017 */
993- elog (WARNING ,"Node %d is catching up " , nodeId );
994- MtmLock ( LW_EXCLUSIVE );
1018+ elog (WARNING ,"Node %d is almost caught-up: lock cluster " , nodeId );
1019+ Assert ( MyWalSnd != NULL ); /* This function is called by WAL-sender, so it should not be NULL */
9951020 BIT_SET (Mtm -> nodeLockerMask , nodeId - 1 );
9961021 BIT_SET (Mtm -> walSenderLockerMask , MyWalSnd - WalSndCtl -> walsnds );
9971022 Mtm -> nLockers += 1 ;
998- MtmUnlock ();
9991023 } else {
1000- MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n" , nodeId , MyWalSnd -> sentPtr , GetXLogInsertRecPtr () , Mtm -> nLockers );
1024+ MTM_INFO ("Continue recovery of node %d, slot position %lx, WAL position %lx, WAL sender position %lx, lockers %d, active transactions %d \n" , nodeId , slotLSN , walLSN , MyWalSnd -> sentPtr , Mtm -> nLockers , Mtm -> nActiveTransactions );
10011025 }
1002- return true;
1026+ MtmUnlock ();
1027+ } else {
1028+ MTM_INFO ("Node %d is not in recovery mode\n" , nodeId );
10031029 }
1004- return false ;
1030+ return caughtUp ;
10051031}
10061032
10071033void MtmSwitchClusterMode (MtmNodeStatus mode )
@@ -1020,22 +1046,24 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
10201046static void
10211047MtmCheckClusterLock ()
10221048{
1049+ timestamp_t delay = MIN_WAIT_TIMEOUT ;
10231050 while (true)
10241051 {
10251052 nodemask_t mask = Mtm -> walSenderLockerMask ;
10261053 if (mask != 0 ) {
1027- XLogRecPtr currLogPos = GetXLogInsertRecPtr ();
1028- int i ;
1029- timestamp_t delay = MIN_WAIT_TIMEOUT ;
1030- for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
1031- if (mask & 1 ) {
1032- if (WalSndCtl -> walsnds [i ].sentPtr != currLogPos ) {
1033- /* recovery is in progress */
1034- break ;
1035- } else {
1036- /* recovered replica catched up with master */
1037- elog (WARNING , "WAL-sender %d complete recovery" , i );
1038- BIT_CLEAR (Mtm -> walSenderLockerMask , i );
1054+ if (Mtm -> nActiveTransactions == 0 ) {
1055+ XLogRecPtr currLogPos = GetXLogInsertRecPtr ();
1056+ int i ;
1057+ for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
1058+ if (mask & 1 ) {
1059+ if (WalSndCtl -> walsnds [i ].sentPtr != currLogPos ) {
1060+ /* recovery is in progress */
1061+ break ;
1062+ } else {
1063+ /* recovered replica has caught up with master */
1064+ elog (WARNING , "WAL-sender %d complete recovery" , i );
1065+ BIT_CLEAR (Mtm -> walSenderLockerMask , i );
1066+ }
10391067 }
10401068 }
10411069 }
@@ -1265,6 +1293,7 @@ static void MtmInitialize()
12651293 Mtm -> walSenderLockerMask = 0 ;
12661294 Mtm -> nodeLockerMask = 0 ;
12671295 Mtm -> nLockers = 0 ;
1296+ Mtm -> nActiveTransactions = 0 ;
12681297 Mtm -> votingTransactions = NULL ;
12691298 Mtm -> transListHead = NULL ;
12701299 Mtm -> transListTail = & Mtm -> transListHead ;
@@ -1705,12 +1734,31 @@ void MtmDropNode(int nodeId, bool dropSlot)
17051734static void
17061735MtmReplicationStartupHook (struct PGLogicalStartupHookArgs * args )
17071736{
1737+ ListCell * param ;
1738+ bool isRecoverySession = false;
1739+ foreach (param , args -> in_params )
1740+ {
1741+ DefElem * elem = lfirst (param );
1742+ if (strcmp ("mtm_replication_mode" , elem -> defname ) == 0 ) {
1743+ isRecoverySession = elem -> arg != NULL && strVal (elem -> arg ) != NULL && strcmp (strVal (elem -> arg ), "recovery" ) == 0 ;
1744+ break ;
1745+ }
1746+ }
17081747 MtmLock (LW_EXCLUSIVE );
1709- if (BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1710- elog (WARNING , "Recovery of node %d is completed: start normal replication" , MtmReplicationNodeId );
1748+ if (isRecoverySession ) {
1749+ elog (WARNING , "Node %d start recovery of node %d" , MtmNodeId , MtmReplicationNodeId );
1750+ if (!BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1751+ BIT_SET (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
1752+ Mtm -> nNodes -= 1 ;
1753+ MtmCheckQuorum ();
1754+ }
1755+ } else if (BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
1756+ elog (WARNING , "Node %d consider that recovery of node %d is completed: start normal replication" , MtmNodeId , MtmReplicationNodeId );
17111757 BIT_CLEAR (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
17121758 Mtm -> nNodes += 1 ;
17131759 MtmCheckQuorum ();
1760+ } else {
1761+ elog (NOTICE , "Node %d start logical replication to node %d in normal mode" , MtmNodeId , MtmReplicationNodeId );
17141762 }
17151763 MtmUnlock ();
17161764}
@@ -1728,7 +1776,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
17281776 bool res = Mtm -> status != MTM_RECOVERY
17291777 && (args -> origin_id == InvalidRepOriginId
17301778 || MtmIsRecoveredNode (MtmReplicationNodeId ));
1731- MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
1779+ MTM_INFO ("%d: MtmReplicationTxnFilterHook->%d\n" , MyProcPid , res );
17321780 return res ;
17331781}
17341782
0 commit comments