@@ -366,14 +366,16 @@ static void MtmCheckHeartbeat()
366366}
367367
368368
369- static int MtmConnectSocket (char const * host , int port , int max_attempts )
369+ static int MtmConnectSocket (char const * host , int port , int timeout )
370370{
371371 struct sockaddr_in sock_inet ;
372372 unsigned addrs [MAX_ROUTES ];
373373 unsigned i , n_addrs = sizeof (addrs ) / sizeof (addrs [0 ]);
374374 MtmHandshakeMessage req ;
375375 MtmArbiterMessage resp ;
376376 int sd ;
377+ timestamp_t start = MtmGetSystemTime ();
378+
377379
378380 sock_inet .sin_family = AF_INET ;
379381 sock_inet .sin_port = htons (port );
@@ -390,7 +392,10 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
390392 if (sd < 0 ) {
391393 elog (ERROR , "Arbiter failed to create socket: %d" , errno );
392394 }
393- fcntl (sd , F_SETFL , O_NONBLOCK );
395+ rc = fcntl (sd , F_SETFL , O_NONBLOCK );
396+ if (rc < 0 ) {
397+ elog (ERROR , "Arbiter failed to switch socket to non-blocking mode: %d" , errno );
398+ }
394399 busy_socket = sd ;
395400 for (i = 0 ; i < n_addrs ; ++ i ) {
396401 memcpy (& sock_inet .sin_addr , & addrs [i ], sizeof sock_inet .sin_addr );
@@ -405,17 +410,19 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
405410 if (rc == 0 ) {
406411 break ;
407412 }
408- if (errno != EINPROGRESS || max_attempts == 0 ) {
413+ if (errno != EINPROGRESS || start + MSEC_TO_USEC ( timeout ) < MtmGetSystemTime () ) {
409414 elog (WARNING , "Arbiter failed to connect to %s:%d: error=%d" , host , port , errno );
410415 busy_socket = -1 ;
416+ close (sd );
411417 return -1 ;
412418 } else {
413- rc = MtmWaitSocket (sd , true, MtmConnectTimeout );
419+ rc = MtmWaitSocket (sd , true, MtmHeartbeatSendTimeout );
414420 if (rc == 1 ) {
415421 socklen_t optlen = sizeof (int );
416422 if (getsockopt (sd , SOL_SOCKET , SO_ERROR , (void * )& rc , & optlen ) < 0 ) {
417423 elog (WARNING , "Arbiter failed to getsockopt for %s:%d: error=%d" , host , port , errno );
418424 busy_socket = -1 ;
425+ close (sd );
419426 return -1 ;
420427 }
421428 if (rc == 0 ) {
@@ -426,8 +433,8 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
426433 } else {
427434 elog (WARNING , "Arbiter waiting socket to %s:%d: rc=%d, error=%d" , host , port , rc , errno );
428435 }
429- max_attempts -= 1 ;
430- MtmSleep (MSEC_TO_USEC (MtmConnectTimeout ));
436+ close ( sd ) ;
437+ MtmSleep (MSEC_TO_USEC (MtmHeartbeatSendTimeout ));
431438 }
432439 }
433440 MtmSetSocketOptions (sd );
@@ -479,7 +486,7 @@ static void MtmOpenConnections()
479486 }
480487 for (i = 0 ; i < nNodes ; i ++ ) {
481488 if (i + 1 != MtmNodeId && i < Mtm -> nAllNodes ) {
482- sockets [i ] = MtmConnectSocket (Mtm -> nodes [i ].con .hostName , MtmArbiterPort + i + 1 , MtmConnectAttempts );
489+ sockets [i ] = MtmConnectSocket (Mtm -> nodes [i ].con .hostName , MtmArbiterPort + i + 1 , MtmConnectTimeout );
483490 if (sockets [i ] < 0 ) {
484491 MtmOnNodeDisconnect (i + 1 );
485492 }
@@ -511,7 +518,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
511518 close (sockets [node ]);
512519 sockets [node ] = -1 ;
513520 }
514- sockets [node ] = MtmConnectSocket (Mtm -> nodes [node ].con .hostName , MtmArbiterPort + node + 1 , MtmReconnectAttempts );
521+ sockets [node ] = MtmConnectSocket (Mtm -> nodes [node ].con .hostName , MtmArbiterPort + node + 1 , MtmReconnectTimeout );
515522 if (sockets [node ] < 0 ) {
516523 MtmOnNodeDisconnect (node + 1 );
517524 return false;
0 commit comments