@@ -195,7 +195,7 @@ static void MtmRegisterSocket(int fd, int node)
195195 ev .events = EPOLLIN ;
196196 ev .data .u32 = node ;
197197 if (epoll_ctl (epollfd , EPOLL_CTL_ADD , fd , & ev ) < 0 ) {
198- elog (ERROR , "Arbiter failed to add socket to epoll set: %d" , errno );
198+ elog (LOG , "Arbiter failed to add socket to epoll set: %d" , errno );
199199 }
200200#else
201201 FD_SET (fd , & inset );
@@ -209,7 +209,7 @@ static void MtmUnregisterSocket(int fd)
209209{
210210#if USE_EPOLL
211211 if (epoll_ctl (epollfd , EPOLL_CTL_DEL , fd , NULL ) < 0 ) {
212- elog (ERROR , "Arbiter failed to unregister socket from epoll set: %d" , errno );
212+ elog (LOG , "Arbiter failed to unregister socket from epoll set: %d" , errno );
213213 }
214214#else
215215 FD_CLR (fd , & inset );
@@ -266,12 +266,17 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
266266
267267static int MtmReadSocket (int sd , void * buf , int buf_size )
268268{
269- int rc = recv (sd , buf , buf_size , 0 );
270- if (rc <= 0 ) {
271- Assert (errno != EINTR ); /* should not happen in non-blocking call */
272- return -1 ;
269+ int rc = MtmWaitSocket (sd , false, MtmHeartbeatSendTimeout );
270+ if (rc == 1 ) {
271+ int rc = recv (sd , buf , buf_size , 0 );
272+ if (rc <= 0 ) {
273+ Assert (errno != EINTR ); /* should not happen in non-blocking call */
274+ return -1 ;
275+ }
276+ return rc ;
277+ } else {
278+ return 0 ;
273279 }
274- return rc ;
275280}
276281
277282
@@ -325,7 +330,16 @@ static void MtmSetSocketOptions(int sd)
325330#endif
326331}
327332
328-
333+ static void MtmCheckResponse (MtmArbiterMessage * resp )
334+ {
335+ if (BIT_CHECK (resp -> disabledNodeMask , MtmNodeId - 1 ) && !BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 )) {
336+ elog (WARNING , "Node %d thinks that I was dead, while I am %s" , resp -> node , MtmNodeStatusMnem [Mtm -> status ]);
337+ if (Mtm -> status != MTM_RECOVERY ) {
338+ BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
339+ MtmSwitchClusterMode (MTM_RECOVERY );
340+ }
341+ }
342+ }
329343
330344static void MtmScheduleHeartbeat ()
331345{
@@ -347,7 +361,8 @@ static void MtmSendHeartbeat()
347361
348362 for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
349363 {
350- if (sockets [i ] >= 0 && sockets [i ] != busy_socket && !BIT_CHECK (Mtm -> disabledNodeMask |Mtm -> reconnectMask , i ))
364+ if (i + 1 != MtmNodeId && sockets [i ] != busy_socket
365+ && ((sockets [i ] >= 0 && !BIT_CHECK (Mtm -> disabledNodeMask , i )) || BIT_CHECK (Mtm -> reconnectMask , i )))
351366 {
352367 if (!MtmSendToNode (i , & msg , sizeof (msg ))) {
353368 elog (LOG , "Arbiter failed to send heartbeat to node %d" , i + 1 );
@@ -382,7 +397,8 @@ static int MtmConnectSocket(int node, int port, int timeout)
382397 sock_inet .sin_port = htons (port );
383398
384399 if (!MtmResolveHostByName (host , addrs , & n_addrs )) {
385- elog (ERROR , "Arbiter failed to resolve host '%s' by name" , host );
400+ elog (LOG , "Arbiter failed to resolve host '%s' by name" , host );
401+ return -1 ;
386402 }
387403
388404 Retry :
@@ -391,11 +407,13 @@ static int MtmConnectSocket(int node, int port, int timeout)
391407
392408 sd = socket (AF_INET , SOCK_STREAM , 0 );
393409 if (sd < 0 ) {
394- elog (ERROR , "Arbiter failed to create socket: %d" , errno );
410+ elog (LOG , "Arbiter failed to create socket: %d" , errno );
411+ return -1 ;
395412 }
396413 rc = fcntl (sd , F_SETFL , O_NONBLOCK );
397414 if (rc < 0 ) {
398- elog (ERROR , "Arbiter failed to switch socket to non-blocking mode: %d" , errno );
415+ elog (LOG , "Arbiter failed to switch socket to non-blocking mode: %d" , errno );
416+ return -1 ;
399417 }
400418 busy_socket = sd ;
401419 for (i = 0 ; i < n_addrs ; ++ i ) {
@@ -463,14 +481,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
463481 }
464482
465483 MtmLock (LW_EXCLUSIVE );
466-
467- /* Some node considered that I am dead, so switch to recovery mode */
468- if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
469- elog (WARNING , "Node %d thinks that I was dead" , resp .node );
470- BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
471- MtmRollbackAllPreparedTransactions ();
472- MtmSwitchClusterMode (MTM_RECOVERY );
473- }
484+ MtmCheckResponse (& resp );
474485 MtmUnlock ();
475486
476487 return sd ;
@@ -493,7 +504,7 @@ static void MtmOpenConnections()
493504 char const * arbiterPortStr = strstr (Mtm -> nodes [i ].con .connStr , "arbiterport=" );
494505 if (arbiterPortStr != NULL ) {
495506 if (sscanf (arbiterPortStr + 12 , "%d" , & arbiterPort ) != 1 ) {
496- elog (ERROR , "Invalid arbiter port: %s" , arbiterPortStr + 12 );
507+ elog (ERROR , "Invalid arbiter port: %s" , arbiterPortStr + 12 );
497508 }
498509 } else {
499510 arbiterPort = MtmArbiterPort + i + 1 ;
@@ -518,11 +529,13 @@ static bool MtmSendToNode(int node, void const* buf, int size)
518529 while (true) {
519530 if (sockets [node ] >= 0 && BIT_CHECK (Mtm -> reconnectMask , node )) {
520531 elog (WARNING , "Arbiter is forced to reconnect to node %d" , node + 1 );
532+ close (sockets [node ]);
533+ sockets [node ] = -1 ;
534+ }
535+ if (BIT_CHECK (Mtm -> reconnectMask , node )) {
521536 MtmLock (LW_EXCLUSIVE );
522537 BIT_CLEAR (Mtm -> reconnectMask , node );
523538 MtmUnlock ();
524- close (sockets [node ]);
525- sockets [node ] = -1 ;
526539 }
527540 if (sockets [node ] < 0 || !MtmWriteSocket (sockets [node ], buf , size )) {
528541 if (sockets [node ] >= 0 ) {
@@ -535,9 +548,6 @@ static bool MtmSendToNode(int node, void const* buf, int size)
535548 MtmOnNodeDisconnect (node + 1 );
536549 return false;
537550 }
538- MtmLock (LW_EXCLUSIVE );
539- BIT_CLEAR (Mtm -> reconnectMask , node );
540- MtmUnlock ();
541551 MTM_LOG3 ("Arbiter restablished connection with node %d" , node + 1 );
542552 } else {
543553 return true;
@@ -563,14 +573,23 @@ static void MtmAcceptOneConnection()
563573 } else {
564574 MtmHandshakeMessage req ;
565575 MtmArbiterMessage resp ;
566- int rc = MtmReadSocket (fd , & req , sizeof req );
576+ int rc = fcntl (fd , F_SETFL , O_NONBLOCK );
577+ if (rc < 0 ) {
578+ elog (ERROR , "Arbiter failed to switch socket to non-blocking mode: %d" , errno );
579+ }
580+ rc = MtmReadSocket (fd , & req , sizeof req );
567581 if (rc < sizeof (req )) {
568582 elog (WARNING , "Arbiter failed to handshake socket: %d, errno=%d" , rc , errno );
569583 } else if (req .hdr .code != MSG_HANDSHAKE && req .hdr .dxid != HANDSHAKE_MAGIC ) {
570584 elog (WARNING , "Arbiter get unexpected handshake message %d" , req .hdr .code );
571585 close (fd );
572586 } else {
573587 Assert (req .hdr .node > 0 && req .hdr .node <= Mtm -> nAllNodes && req .hdr .node != MtmNodeId );
588+
589+ MtmLock (LW_EXCLUSIVE );
590+ MtmCheckResponse (& req .hdr );
591+ MtmUnlock ();
592+
574593 resp .code = MSG_STATUS ;
575594 resp .disabledNodeMask = Mtm -> disabledNodeMask ;
576595 resp .dxid = HANDSHAKE_MAGIC ;
@@ -726,6 +745,7 @@ static void MtmTransSender(Datum arg)
726745 CHECK_FOR_INTERRUPTS ();
727746 }
728747 elog (LOG , "Stop arbiter sender %d" , MyProcPid );
748+ proc_exit (1 ); /* force restart of this bgwroker */
729749}
730750
731751
@@ -863,9 +883,7 @@ static void MtmTransReceiver(Datum arg)
863883 elog (WARNING , "Ignore response for unexisted transaction %d from node %d" , msg -> dxid , msg -> node );
864884 continue ;
865885 }
866- if (BIT_CHECK (msg -> disabledNodeMask , MtmNodeId - 1 ) && Mtm -> status != MTM_RECOVERY ) {
867- elog (PANIC , "Node %d thinks that I was dead: perform hara-kiri not to be a zombie" , msg -> node );
868- }
886+ MtmCheckResponse (msg );
869887
870888 if (MtmIsCoordinator (ts )) {
871889 switch (msg -> code ) {
@@ -975,5 +993,6 @@ static void MtmTransReceiver(Datum arg)
975993 MtmRefreshClusterStatus (false);
976994 }
977995 }
996+ proc_exit (1 ); /* force restart of this bgwroker */
978997}
979998
0 commit comments