@@ -87,7 +87,6 @@ typedef struct
8787 csn_t csn ; /* Local CSN in case of sending data from replica to master, global CSN master->replica */
8888 nodemask_t disabledNodeMask ; /* Bitmask of disabled nodes at the sender of message */
8989 csn_t oldestSnapshot ; /* Oldest snapshot used by active transactions at this node */
90- uint64 seqno ;/* Message sequence number (used to eliminate duplicated messages) */
9190} MtmArbiterMessage ;
9291
9392typedef struct
@@ -446,7 +445,6 @@ static int MtmConnectSocket(int node, int port, int timeout)
446445 req .hdr .sxid = ShmemVariableCache -> nextXid ;
447446 req .hdr .csn = MtmGetCurrentTime ();
448447 req .hdr .disabledNodeMask = Mtm -> disabledNodeMask ;
449- req .hdr .seqno = Mtm -> nodes [node ].recvSeqNo ;
450448 strcpy (req .connStr , Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
451449 if (!MtmWriteSocket (sd , & req , sizeof req )) {
452450 elog (WARNING , "Arbiter failed to send handshake message to %s:%d: %d" , host , port , errno );
@@ -465,9 +463,6 @@ static int MtmConnectSocket(int node, int port, int timeout)
465463 }
466464
467465 MtmLock (LW_EXCLUSIVE );
468- if (Mtm -> nodes [resp .node - 1 ].sendSeqNo < resp .seqno ) {
469- Mtm -> nodes [resp .node - 1 ].sendSeqNo = resp .seqno ;
470- }
471466
472467 /* Some node considered that I am dead, so switch to recovery mode */
473468 if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
@@ -582,10 +577,6 @@ static void MtmAcceptOneConnection()
582577 resp .sxid = ShmemVariableCache -> nextXid ;
583578 resp .csn = MtmGetCurrentTime ();
584579 resp .node = MtmNodeId ;
585- resp .seqno = Mtm -> nodes [req .hdr .node - 1 ].recvSeqNo ;
586- if (Mtm -> nodes [req .hdr .node - 1 ].sendSeqNo < req .hdr .seqno ) {
587- Mtm -> nodes [req .hdr .node - 1 ].sendSeqNo = req .hdr .seqno ;
588- }
589580 MtmUpdateNodeConnectionInfo (& Mtm -> nodes [req .hdr .node - 1 ].con , req .connStr );
590581 if (!MtmWriteSocket (fd , & resp , sizeof resp )) {
591582 elog (WARNING , "Arbiter failed to write response for handshake message to node %d" , resp .node );
@@ -651,7 +642,6 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
651642 MTM_LOG3 ("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d" ,
652643 messageText [ts -> cmd ], ts -> csn , node + 1 , MtmNodeId , ts -> gtid .xid , ts -> xid );
653644 Assert (ts -> cmd != MSG_INVALID );
654- buf -> data [buf -> used ].seqno = ++ Mtm -> nodes [node ].sendSeqNo ;
655645 buf -> data [buf -> used ].code = ts -> cmd ;
656646 buf -> data [buf -> used ].sxid = ts -> xid ;
657647 buf -> data [buf -> used ].csn = ts -> csn ;
@@ -868,12 +858,6 @@ static void MtmTransReceiver(Datum arg)
868858 elog (WARNING , "Ignore message from dead node %d\n" , msg -> node );
869859 continue ;
870860 }
871- if (msg -> seqno <= Mtm -> nodes [msg -> node - 1 ].recvSeqNo ) {
872- elog (WARNING , "Ignore duplicated message %ld (<=%ld) from node %d" , msg -> seqno , Mtm -> nodes [msg -> node - 1 ].recvSeqNo , msg -> node );
873- continue ;
874- }
875- Mtm -> nodes [msg -> node - 1 ].recvSeqNo = msg -> seqno ;
876-
877861 ts = (MtmTransState * )hash_search (MtmXid2State , & msg -> dxid , HASH_FIND , NULL );
878862 if (ts == NULL ) {
879863 elog (WARNING , "Ignore response for unexisted transaction %d from node %d" , msg -> dxid , msg -> node );
0 commit comments