@@ -107,7 +107,7 @@ static int gateway;
107107static bool send_heartbeat ;
108108static timestamp_t last_sent_heartbeat ;
109109static TimeoutId heartbeat_timer ;
110- static int busy_socket ;
110+ static nodemask_t busy_mask ;
111111
112112static void MtmTransSender (Datum arg );
113113static void MtmTransReceiver (Datum arg );
@@ -243,23 +243,19 @@ static int MtmWaitSocket(int sd, bool forWrite, time_t timeoutMsec)
243243static bool MtmWriteSocket (int sd , void const * buf , int size )
244244{
245245 char * src = (char * )buf ;
246- busy_socket = sd ;
247246 while (size != 0 ) {
248247 int rc = MtmWaitSocket (sd , true, MtmHeartbeatSendTimeout );
249248 if (rc == 1 ) {
250249 while ((rc = send (sd , src , size , 0 )) < 0 && errno == EINTR );
251250 if (rc < 0 ) {
252- busy_socket = -1 ;
253251 return false;
254252 }
255253 size -= rc ;
256254 src += rc ;
257255 } else if (rc < 0 ) {
258- busy_socket = -1 ;
259256 return false;
260257 }
261258 }
262- busy_socket = -1 ;
263259 return true;
264260}
265261
@@ -271,8 +267,6 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
271267 rc = MtmWaitSocket (sd , false, MtmHeartbeatSendTimeout );
272268 if (rc == 1 ) {
273269 while ((rc = recv (sd , buf , buf_size , 0 )) < 0 && errno == EINTR );
274- } else {
275- return 0 ;
276270 }
277271 }
278272 return rc ;
@@ -365,7 +359,7 @@ static void MtmSendHeartbeat()
365359
366360 for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
367361 {
368- if (i + 1 != MtmNodeId && sockets [ i ] != busy_socket
362+ if (i + 1 != MtmNodeId && ! BIT_CHECK ( busy_mask , i )
369363 && (Mtm -> status != MTM_ONLINE
370364 || (sockets [i ] >= 0 && !BIT_CHECK (Mtm -> disabledNodeMask , i ) && !BIT_CHECK (Mtm -> reconnectMask , i ))))
371365 {
@@ -399,6 +393,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
399393 int sd ;
400394 timestamp_t start = MtmGetSystemTime ();
401395 char const * host = Mtm -> nodes [node ].con .hostName ;
396+ nodemask_t save_mask = busy_mask ;
402397
403398 sock_inet .sin_family = AF_INET ;
404399 sock_inet .sin_port = htons (port );
@@ -407,22 +402,24 @@ static int MtmConnectSocket(int node, int port, int timeout)
407402 elog (LOG , "Arbiter failed to resolve host '%s' by name" , host );
408403 return -1 ;
409404 }
410-
405+ BIT_SET (busy_mask , node );
406+
411407 Retry :
412408 while (1 ) {
413409 int rc = -1 ;
414410
415411 sd = socket (AF_INET , SOCK_STREAM , 0 );
416412 if (sd < 0 ) {
417413 elog (LOG , "Arbiter failed to create socket: %d" , errno );
414+ busy_mask = save_mask ;
418415 return -1 ;
419416 }
420417 rc = fcntl (sd , F_SETFL , O_NONBLOCK );
421418 if (rc < 0 ) {
422419 elog (LOG , "Arbiter failed to switch socket to non-blocking mode: %d" , errno );
420+ busy_mask = save_mask ;
423421 return -1 ;
424422 }
425- busy_socket = sd ;
426423 for (i = 0 ; i < n_addrs ; ++ i ) {
427424 memcpy (& sock_inet .sin_addr , & addrs [i ], sizeof sock_inet .sin_addr );
428425 do {
@@ -438,17 +435,17 @@ static int MtmConnectSocket(int node, int port, int timeout)
438435 }
439436 if (errno != EINPROGRESS || start + MSEC_TO_USEC (timeout ) < MtmGetSystemTime ()) {
440437 elog (WARNING , "Arbiter failed to connect to %s:%d: error=%d" , host , port , errno );
441- busy_socket = -1 ;
442438 close (sd );
439+ busy_mask = save_mask ;
443440 return -1 ;
444441 } else {
445442 rc = MtmWaitSocket (sd , true, MtmHeartbeatSendTimeout );
446443 if (rc == 1 ) {
447444 socklen_t optlen = sizeof (int );
448445 if (getsockopt (sd , SOL_SOCKET , SO_ERROR , (void * )& rc , & optlen ) < 0 ) {
449446 elog (WARNING , "Arbiter failed to getsockopt for %s:%d: error=%d" , host , port , errno );
450- busy_socket = -1 ;
451447 close (sd );
448+ busy_mask = save_mask ;
452449 return -1 ;
453450 }
454451 if (rc == 0 ) {
@@ -491,6 +488,8 @@ static int MtmConnectSocket(int node, int port, int timeout)
491488 MtmCheckResponse (& resp );
492489 MtmUnlock ();
493490
491+ busy_mask = save_mask ;
492+
494493 return sd ;
495494}
496495
@@ -533,6 +532,9 @@ static void MtmOpenConnections()
533532
534533static bool MtmSendToNode (int node , void const * buf , int size )
535534{
535+ bool result = true;
536+ nodemask_t save_mask = busy_mask ;
537+ BIT_SET (busy_mask , node );
536538 while (true) {
537539 if (sockets [node ] >= 0 && BIT_CHECK (Mtm -> reconnectMask , node )) {
538540 elog (WARNING , "Arbiter is forced to reconnect to node %d" , node + 1 );
@@ -553,13 +555,17 @@ static bool MtmSendToNode(int node, void const* buf, int size)
553555 sockets [node ] = MtmConnectSocket (node , MtmArbiterPort + node + 1 , MtmReconnectTimeout );
554556 if (sockets [node ] < 0 ) {
555557 MtmOnNodeDisconnect (node + 1 );
556- return false;
558+ result = false;
559+ break ;
557560 }
558561 MTM_LOG3 ("Arbiter restablished connection with node %d" , node + 1 );
559562 } else {
560- return true;
563+ result = true;
564+ break ;
561565 }
562566 }
567+ busy_mask = save_mask ;
568+ return result ;
563569}
564570
565571static int MtmReadFromNode (int node , void * buf , int buf_size )
0 commit comments