@@ -369,13 +369,13 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
369369
370370static void MtmOpenConnections ()
371371{
372- int nNodes = MtmNodes ;
372+ int nNodes = MtmMaxNodes ;
373373 int i ;
374374
375375 sockets = (int * )palloc (sizeof (int )* nNodes );
376376
377377 for (i = 0 ; i < nNodes ; i ++ ) {
378- if (i + 1 != MtmNodeId ) {
378+ if (i + 1 != MtmNodeId && i < Mtm -> nAllNodes ) {
379379 sockets [i ] = MtmConnectSocket (Mtm -> nodes [i ].con .hostName , MtmArbiterPort + i + 1 , MtmConnectAttempts );
380380 if (sockets [i ] < 0 ) {
381381 MtmOnNodeDisconnect (i + 1 );
@@ -384,8 +384,8 @@ static void MtmOpenConnections()
384384 sockets [i ] = -1 ;
385385 }
386386 }
387- if (Mtm -> nNodes < MtmNodes /2 + 1 ) { /* no quorum */
388- elog (WARNING , "Node is out of quorum: only %d nodes from %d are accssible " , Mtm -> nNodes , MtmNodes );
387+ if (Mtm -> nLiveNodes < Mtm -> nAllNodes /2 + 1 ) { /* no quorum */
388+ elog (WARNING , "Node is out of quorum: only %d nodes of %d are accessible " , Mtm -> nLiveNodes , Mtm -> nAllNodes );
389389 MtmSwitchClusterMode (MTM_IN_MINORITY );
390390 } else if (Mtm -> status == MTM_INITIALIZATION ) {
391391 MtmSwitchClusterMode (MTM_CONNECTED );
@@ -444,7 +444,7 @@ static void MtmAcceptOneConnection()
444444 elog (WARNING , "Arbiter get unexpected handshake message %d" , req .hdr .code );
445445 close (fd );
446446 } else {
447- Assert (req .hdr .node > 0 && req .hdr .node <= MtmNodes && req .hdr .node != MtmNodeId );
447+ Assert (req .hdr .node > 0 && req .hdr .node <= Mtm -> nAllNodes && req .hdr .node != MtmNodeId );
448448 resp .code = MSG_STATUS ;
449449 resp .disabledNodeMask = Mtm -> disabledNodeMask ;
450450 resp .dxid = HANDSHAKE_MAGIC ;
@@ -472,9 +472,10 @@ static void MtmAcceptIncomingConnections()
472472 struct sockaddr_in sock_inet ;
473473 int on = 1 ;
474474 int i ;
475+ int nNodes = MtmMaxNodes ;
475476
476- sockets = (int * )palloc (sizeof (int )* MtmNodes );
477- for (i = 0 ; i < MtmNodes ; i ++ ) {
477+ sockets = (int * )palloc (sizeof (int )* nNodes );
478+ for (i = 0 ; i < nNodes ; i ++ ) {
478479 sockets [i ] = -1 ;
479480 }
480481 sock_inet .sin_family = AF_INET ;
@@ -490,7 +491,7 @@ static void MtmAcceptIncomingConnections()
490491 if (bind (gateway , (struct sockaddr * )& sock_inet , sizeof (sock_inet )) < 0 ) {
491492 elog (ERROR , "Arbiter failed to bind socket: %d" , errno );
492493 }
493- if (listen (gateway , MtmNodes ) < 0 ) {
494+ if (listen (gateway , nNodes ) < 0 ) {
494495 elog (ERROR , "Arbiter failed to listen socket: %d" , errno );
495496 }
496497
@@ -527,22 +528,22 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
527528{
528529 int i ;
529530 int n = 1 ;
530- for (i = 0 ; i < MtmNodes ; i ++ )
531+ for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
531532 {
532533 if (!BIT_CHECK (Mtm -> disabledNodeMask , i ) && TransactionIdIsValid (ts -> xids [i ])) {
533534 Assert (i + 1 != MtmNodeId );
534535 MtmAppendBuffer (txBuffer , ts -> xids [i ], i , ts );
535536 n += 1 ;
536537 }
537538 }
538- Assert (n == Mtm -> nNodes );
539+ Assert (n == Mtm -> nLiveNodes );
539540}
540541
541542
542543static void MtmTransSender (Datum arg )
543544{
544545 sigset_t sset ;
545- int nNodes = MtmNodes ;
546+ int nNodes = MtmMaxNodes ;
546547 int i ;
547548 MtmBuffer * txBuffer = (MtmBuffer * )palloc (sizeof (MtmBuffer )* nNodes );
548549
@@ -580,7 +581,7 @@ static void MtmTransSender(Datum arg)
580581
581582 MtmUnlock ();
582583
583- for (i = 0 ; i < nNodes ; i ++ ) {
584+ for (i = 0 ; i < Mtm -> nAllNodes ; i ++ ) {
584585 if (txBuffer [i ].used != 0 ) {
585586 MtmSendToNode (i , txBuffer [i ].data , txBuffer [i ].used * sizeof (MtmArbiterMessage ));
586587 txBuffer [i ].used = 0 ;
@@ -593,7 +594,7 @@ static void MtmTransSender(Datum arg)
593594#if !USE_EPOLL
594595static bool MtmRecovery ()
595596{
596- int nNodes = MtmNodes ;
597+ int nNodes = Mtm -> nAllNodes ;
597598 bool recovered = false;
598599 int i ;
599600
@@ -618,7 +619,7 @@ static bool MtmRecovery()
618619static void MtmTransReceiver (Datum arg )
619620{
620621 sigset_t sset ;
621- int nNodes = MtmNodes ;
622+ int nNodes = MtmMaxNodes ;
622623 int nResponses ;
623624 int i , j , n , rc ;
624625 MtmBuffer * rxBuffer = (MtmBuffer * )palloc (sizeof (MtmBuffer )* nNodes );
@@ -708,7 +709,7 @@ static void MtmTransReceiver(Datum arg)
708709 if (MtmIsCoordinator (ts )) {
709710 switch (msg -> code ) {
710711 case MSG_READY :
711- Assert (ts -> nVotes < Mtm -> nNodes );
712+ Assert (ts -> nVotes < Mtm -> nLiveNodes );
712713 Mtm -> nodes [msg -> node - 1 ].transDelay += MtmGetCurrentTime () - ts -> csn ;
713714 ts -> xids [msg -> node - 1 ] = msg -> sxid ;
714715
@@ -720,7 +721,7 @@ static void MtmTransReceiver(Datum arg)
720721 MtmAbortTransaction (ts );
721722 }
722723
723- if (++ ts -> nVotes == Mtm -> nNodes ) {
724+ if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
724725 /* All nodes are finished their transactions */
725726 if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
726727 MtmWakeUpBackend (ts );
@@ -736,23 +737,23 @@ static void MtmTransReceiver(Datum arg)
736737 }
737738 break ;
738739 case MSG_ABORTED :
739- Assert (ts -> nVotes < Mtm -> nNodes );
740+ Assert (ts -> nVotes < Mtm -> nLiveNodes );
740741 if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
741742 Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
742743 MtmAbortTransaction (ts );
743744 }
744- if (++ ts -> nVotes == Mtm -> nNodes ) {
745+ if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
745746 MtmWakeUpBackend (ts );
746747 }
747748 break ;
748749 case MSG_PREPARED :
749750 Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
750- Assert (ts -> nVotes < Mtm -> nNodes );
751+ Assert (ts -> nVotes < Mtm -> nLiveNodes );
751752 if (msg -> csn > ts -> csn ) {
752753 ts -> csn = msg -> csn ;
753754 MtmSyncClock (ts -> csn );
754755 }
755- if (++ ts -> nVotes == Mtm -> nNodes ) {
756+ if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
756757 ts -> csn = MtmAssignCSN ();
757758 ts -> status = TRANSACTION_STATUS_UNKNOWN ;
758759 MtmWakeUpBackend (ts );
0 commit comments