@@ -368,7 +368,7 @@ void MtmCheckHeartbeat()
368368}
369369
370370
371- static int MtmConnectSocket (char const * host , int port , int timeout )
371+ static int MtmConnectSocket (int node , int port , int timeout )
372372{
373373 struct sockaddr_in sock_inet ;
374374 unsigned addrs [MAX_ROUTES ];
@@ -377,7 +377,7 @@ static int MtmConnectSocket(char const* host, int port, int timeout)
377377 MtmArbiterMessage resp ;
378378 int sd ;
379379 timestamp_t start = MtmGetSystemTime ();
380-
380+ char const * host = Mtm -> nodes [ node ]. con . hostName ;
381381
382382 sock_inet .sin_family = AF_INET ;
383383 sock_inet .sin_port = htons (port );
@@ -446,6 +446,7 @@ static int MtmConnectSocket(char const* host, int port, int timeout)
446446 req .hdr .sxid = ShmemVariableCache -> nextXid ;
447447 req .hdr .csn = MtmGetCurrentTime ();
448448 req .hdr .disabledNodeMask = Mtm -> disabledNodeMask ;
449+ req .hdr .seqno = Mtm -> nodes [node ].recvSeqNo ;
449450 strcpy (req .connStr , Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
450451 if (!MtmWriteSocket (sd , & req , sizeof req )) {
451452 elog (WARNING , "Arbiter failed to send handshake message to %s:%d: %d" , host , port , errno );
@@ -464,7 +465,9 @@ static int MtmConnectSocket(char const* host, int port, int timeout)
464465 }
465466
466467 MtmLock (LW_EXCLUSIVE );
467- Mtm -> nodes [resp .node - 1 ].sendSeqNo = resp .seqno ;
468+ if (Mtm -> nodes [resp .node - 1 ].sendSeqNo < resp .seqno ) {
469+ Mtm -> nodes [resp .node - 1 ].sendSeqNo = resp .seqno ;
470+ }
468471
469472 /* Some node considered that I am dead, so switch to recovery mode */
470473 if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
@@ -499,7 +502,7 @@ static void MtmOpenConnections()
499502 } else {
500503 arbiterPort = MtmArbiterPort + i + 1 ;
501504 }
502- sockets [i ] = MtmConnectSocket (Mtm -> nodes [ i ]. con . hostName , arbiterPort , MtmConnectTimeout );
505+ sockets [i ] = MtmConnectSocket (i , arbiterPort , MtmConnectTimeout );
503506 if (sockets [i ] < 0 ) {
504507 MtmOnNodeDisconnect (i + 1 );
505508 }
@@ -531,7 +534,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
531534 close (sockets [node ]);
532535 sockets [node ] = -1 ;
533536 }
534- sockets [node ] = MtmConnectSocket (Mtm -> nodes [ node ]. con . hostName , MtmArbiterPort + node + 1 , MtmReconnectTimeout );
537+ sockets [node ] = MtmConnectSocket (node , MtmArbiterPort + node + 1 , MtmReconnectTimeout );
535538 if (sockets [node ] < 0 ) {
536539 MtmOnNodeDisconnect (node + 1 );
537540 return false;
@@ -579,6 +582,9 @@ static void MtmAcceptOneConnection()
579582 resp .csn = MtmGetCurrentTime ();
580583 resp .node = MtmNodeId ;
581584 resp .seqno = Mtm -> nodes [req .hdr .node - 1 ].recvSeqNo ;
585+ if (Mtm -> nodes [req .hdr .node - 1 ].sendSeqNo < req .hdr .seqno ) {
586+ Mtm -> nodes [req .hdr .node - 1 ].sendSeqNo = req .hdr .seqno ;
587+ }
582588 MtmUpdateNodeConnectionInfo (& Mtm -> nodes [req .hdr .node - 1 ].con , req .connStr );
583589 if (!MtmWriteSocket (fd , & resp , sizeof resp )) {
584590 elog (WARNING , "Arbiter failed to write response for handshake message to node %d" , resp .node );
0 commit comments