7373
7474#include "multimaster.h"
7575
76- #define MAX_ROUTES 16
77- #define BUFFER_SIZE 1024
78- #define HANDSHAKE_MAGIC 0xCAFEDEED
76+ #define MAX_ROUTES 16
77+ #define INIT_BUFFER_SIZE 1024
78+ #define HANDSHAKE_MAGIC 0xCAFEDEED
7979
8080typedef struct
8181{
@@ -97,7 +97,8 @@ typedef struct
9797typedef struct
9898{
9999 int used ;
100- MtmArbiterMessage data [BUFFER_SIZE ];
100+ int size ;
101+ MtmArbiterMessage * data ;
101102} MtmBuffer ;
102103
103104static int * sockets ;
@@ -448,11 +449,14 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
448449 }
449450
450451 /* Some node considered that I am dead, so switch to recovery mode */
452+ MtmLock (LW_EXCLUSIVE );
451453 if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
452454 elog (WARNING , "Node %d thinks that I was dead" , resp .node );
453455 BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
454456 MtmSwitchClusterMode (MTM_RECOVERY );
455457 }
458+ MtmUnlock ();
459+
456460 return sd ;
457461}
458462
@@ -489,7 +493,9 @@ static bool MtmSendToNode(int node, void const* buf, int size)
489493 while (true) {
490494 if (sockets [node ] >= 0 && BIT_CHECK (Mtm -> reconnectMask , node )) {
491495 elog (WARNING , "Arbiter is forced to reconnect to node %d" , node + 1 );
496+ MtmLock (LW_EXCLUSIVE );
492497 BIT_CLEAR (Mtm -> reconnectMask , node );
498+ MtmUnlock ();
493499 close (sockets [node ]);
494500 sockets [node ] = -1 ;
495501 }
@@ -504,7 +510,9 @@ static bool MtmSendToNode(int node, void const* buf, int size)
504510 MtmOnNodeDisconnect (node + 1 );
505511 return false;
506512 }
513+ MtmLock (LW_EXCLUSIVE );
507514 BIT_CLEAR (Mtm -> reconnectMask , node );
515+ MtmUnlock ();
508516 MTM_LOG3 ("Arbiter restablished connection with node %d" , node + 1 );
509517 } else {
510518 return true;
@@ -550,7 +558,6 @@ static void MtmAcceptOneConnection()
550558 close (fd );
551559 } else {
552560 MTM_LOG1 ("Arbiter established connection with node %d" , req .hdr .node );
553- BIT_CLEAR (Mtm -> connectivityMask , req .hdr .node - 1 );
554561 MtmRegisterSocket (fd , req .hdr .node - 1 );
555562 sockets [req .hdr .node - 1 ] = fd ;
556563 MtmOnNodeConnect (req .hdr .node );
@@ -596,12 +603,9 @@ static void MtmAcceptIncomingConnections()
596603static void MtmAppendBuffer (MtmBuffer * txBuffer , TransactionId xid , int node , MtmTransState * ts )
597604{
598605 MtmBuffer * buf = & txBuffer [node ];
599- if (buf -> used == BUFFER_SIZE ) {
600- if (!MtmSendToNode (node , buf -> data , buf -> used * sizeof (MtmArbiterMessage ))) {
601- buf -> used = 0 ;
602- return ;
603- }
604- buf -> used = 0 ;
606+ if (buf -> used == buf -> size ) {
607+ buf -> size = buf -> size ? buf -> size * 2 : INIT_BUFFER_SIZE ;
608+ buf -> data = repalloc (buf -> data , buf -> size * sizeof (MtmArbiterMessage ));
605609 }
606610 buf -> data [buf -> used ].dxid = xid ;
607611
@@ -638,7 +642,7 @@ static void MtmTransSender(Datum arg)
638642 int nNodes = MtmMaxNodes ;
639643 int i ;
640644
641- MtmBuffer * txBuffer = (MtmBuffer * )palloc (sizeof (MtmBuffer )* nNodes );
645+ MtmBuffer * txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
642646
643647 InitializeTimeouts ();
644648
@@ -653,10 +657,6 @@ static void MtmTransSender(Datum arg)
653657
654658 MtmOpenConnections ();
655659
656- for (i = 0 ; i < nNodes ; i ++ ) {
657- txBuffer [i ].used = 0 ;
658- }
659-
660660 while (!stop ) {
661661 MtmTransState * ts ;
662662 PGSemaphoreLock (& Mtm -> votingSemaphore );
@@ -721,7 +721,7 @@ static void MtmTransReceiver(Datum arg)
721721 int nNodes = MtmMaxNodes ;
722722 int nResponses ;
723723 int i , j , n , rc ;
724- MtmBuffer * rxBuffer = (MtmBuffer * )palloc (sizeof (MtmBuffer )* nNodes );
724+ MtmBuffer * rxBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
725725 timestamp_t lastHeartbeatCheck = MtmGetSystemTime ();
726726 timestamp_t now ;
727727
@@ -742,7 +742,8 @@ static void MtmTransReceiver(Datum arg)
742742 MtmAcceptIncomingConnections ();
743743
744744 for (i = 0 ; i < nNodes ; i ++ ) {
745- rxBuffer [i ].used = 0 ;
745+ rxBuffer [i ].size = INIT_BUFFER_SIZE ;
746+ rxBuffer [i ].data = palloc (INIT_BUFFER_SIZE * sizeof (MtmArbiterMessage ));
746747 }
747748
748749 while (!stop ) {
@@ -786,7 +787,7 @@ static void MtmTransReceiver(Datum arg)
786787 continue ;
787788 }
788789
789- rc = MtmReadFromNode (i , (char * )rxBuffer [i ].data + rxBuffer [i ].used , BUFFER_SIZE - rxBuffer [i ].used );
790+ rc = MtmReadFromNode (i , (char * )rxBuffer [i ].data + rxBuffer [i ].used , rxBuffer [ i ]. size - rxBuffer [i ].used );
790791 if (rc <= 0 ) {
791792 continue ;
792793 }
0 commit comments