@@ -289,8 +289,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
289289 elog (ERROR , "Arbiter failed to resolve host '%s' by name" , host );
290290 }
291291
292- Retry :
293-
292+ Retry :
294293 while (1 ) {
295294 int rc = -1 ;
296295
@@ -384,20 +383,29 @@ static void MtmOpenConnections()
384383
385384
386385static bool MtmSendToNode (int node , void const * buf , int size )
387- {
388- while (sockets [node ] < 0 || !MtmWriteSocket (sockets [node ], buf , size )) {
389- elog (WARNING , "Arbiter failed to write to node %d: %d" , node + 1 , errno );
390- if (sockets [node ] >= 0 ) {
386+ {
387+ while (true) {
388+ if (sockets [node ] >= 0 && BIT_CHECK (Mtm -> reconnectMask , node )) {
389+ elog (WARNING , "Arbiter is forced to reconnect to node %d" , node + 1 );
390+ BIT_CLEAR (Mtm -> reconnectMask , node );
391391 close (sockets [node ]);
392+ sockets [node ] = -1 ;
392393 }
393- sockets [node ] = MtmConnectSocket (Mtm -> nodes [node ].con .hostName , MtmArbiterPort + node + 1 , MtmReconnectAttempts );
394- if (sockets [node ] < 0 ) {
395- MtmOnNodeDisconnect (node + 1 );
396- return false;
394+ if (sockets [node ] < 0 || !MtmWriteSocket (sockets [node ], buf , size )) {
395+ if (sockets [node ] >= 0 ) {
396+ elog (WARNING , "Arbiter failed to write to node %d: %d" , node + 1 , errno );
397+ close (sockets [node ]);
398+ }
399+ sockets [node ] = MtmConnectSocket (Mtm -> nodes [node ].con .hostName , MtmArbiterPort + node + 1 , MtmReconnectAttempts );
400+ if (sockets [node ] < 0 ) {
401+ MtmOnNodeDisconnect (node + 1 );
402+ return false;
403+ }
404+ MTM_TRACE ("Arbiter restablished connection with node %d\n" , node + 1 );
405+ } else {
406+ return true;
397407 }
398- elog (NOTICE , "Arbiter restablish connection with node %d" , node + 1 );
399408 }
400- return true;
401409}
402410
403411static int MtmReadFromNode (int node , void * buf , int buf_size )
@@ -477,10 +485,6 @@ static void MtmAcceptIncomingConnections()
477485
478486 sockets [MtmNodeId - 1 ] = gateway ;
479487 MtmRegisterSocket (gateway , MtmNodeId - 1 );
480-
481- for (i = 0 ; i < MtmNodes - 1 ; i ++ ) {
482- MtmAcceptOneConnection ();
483- }
484488}
485489
486490
@@ -693,6 +697,7 @@ static void MtmTransReceiver(Datum arg)
693697 msg -> node , Mtm -> disabledNodeMask , msg -> disabledNodeMask );
694698 ts -> status = TRANSACTION_STATUS_ABORTED ;
695699 MtmAdjustSubtransactions (ts );
700+ Mtm -> nActiveTransactions -= 1 ;
696701 }
697702
698703 if (++ ts -> nVotes == Mtm -> nNodes ) {
@@ -712,6 +717,7 @@ static void MtmTransReceiver(Datum arg)
712717 Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
713718 ts -> status = TRANSACTION_STATUS_ABORTED ;
714719 MtmAdjustSubtransactions (ts );
720+ Mtm -> nActiveTransactions -= 1 ;
715721 }
716722 if (++ ts -> nVotes == Mtm -> nNodes ) {
717723 MtmWakeUpBackend (ts );
0 commit comments