@@ -82,20 +82,25 @@ typedef struct
8282 int node ; /* Sender node ID */
8383 TransactionId dxid ; /* Transaction ID at destination node */
8484 TransactionId sxid ; /* Transaction ID at sender node */
85- csn_t csn ; /* local CSN in case of sending data from replica to master, global CSN master->replica */
86- nodemask_t disabledNodeMask ; /* bitmask of disabled nodes at the sender of message */
85+ csn_t csn ; /* Local CSN in case of sending data from replica to master, global CSN master->replica */
86+ nodemask_t disabledNodeMask ; /* Bitmask of disabled nodes at the sender of message */
87+ csn_t oldestSnapshot ; /* Oldest snapshot used by active transactions at this node */
8788} MtmArbiterMessage ;
8889
90+ typedef struct
91+ {
92+ MtmArbiterMessage hdr ;
93+ char connStr [MULTIMASTER_MAX_CONN_STR_SIZE ];
94+ } MtmHandshakeMessage ;
95+
8996typedef struct
9097{
9198 int used ;
9299 MtmArbiterMessage data [BUFFER_SIZE ];
93100} MtmBuffer ;
94101
95102static int * sockets ;
96- static char * * hosts ;
97103static int gateway ;
98- static MtmState * ds ;
99104
100105static void MtmTransSender (Datum arg );
101106static void MtmTransReceiver (Datum arg );
@@ -266,39 +271,41 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
266271 continue ;
267272 } else {
268273 int optval = 1 ;
269- MtmArbiterMessage msg ;
274+ MtmHandshakeMessage req ;
275+ MtmArbiterMessage resp ;
270276 setsockopt (sd , IPPROTO_TCP , TCP_NODELAY , (char const * )& optval , sizeof (optval ));
271277 setsockopt (sd , SOL_SOCKET , SO_KEEPALIVE , (char const * )& optval , sizeof (optval ));
272278
273- msg .code = MSG_HANDSHAKE ;
274- msg .node = MtmNodeId ;
275- msg .dxid = HANDSHAKE_MAGIC ;
276- msg .sxid = ShmemVariableCache -> nextXid ;
277- msg .csn = MtmGetCurrentTime ();
278- msg .disabledNodeMask = ds -> disabledNodeMask ;
279- if (!MtmWriteSocket (sd , & msg , sizeof msg )) {
279+ req .hdr .code = MSG_HANDSHAKE ;
280+ req .hdr .node = MtmNodeId ;
281+ req .hdr .dxid = HANDSHAKE_MAGIC ;
282+ req .hdr .sxid = ShmemVariableCache -> nextXid ;
283+ req .hdr .csn = MtmGetCurrentTime ();
284+ req .hdr .disabledNodeMask = Mtm -> disabledNodeMask ;
285+ strcpy (req .connStr , Mtm -> nodes [MtmNodeId - 1 ].connStr );
286+ if (!MtmWriteSocket (sd , & req , sizeof req )) {
280287 elog (WARNING , "Arbiter failed to send handshake message to %s:%d: %d" , host , port , errno );
281288 close (sd );
282289 goto Retry ;
283290 }
284- if (MtmReadSocket (sd , & msg , sizeof msg ) != sizeof (msg )) {
291+ if (MtmReadSocket (sd , & resp , sizeof resp ) != sizeof (resp )) {
285292 elog (WARNING , "Arbiter failed to receive response for handshake message from %s:%d: errno=%d" , host , port , errno );
286293 close (sd );
287294 goto Retry ;
288295 }
289- if (msg .code != MSG_STATUS || msg .dxid != HANDSHAKE_MAGIC ) {
290- elog (WARNING , "Arbiter get unexpected response %d for handshake message from %s:%d" , msg .code , host , port );
296+ if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
297+ elog (WARNING , "Arbiter get unexpected response %d for handshake message from %s:%d" , resp .code , host , port );
291298 close (sd );
292299 goto Retry ;
293300 }
294301
295302 /* Some node considered that I am dead, so switch to recovery mode */
296- if (BIT_CHECK (msg .disabledNodeMask , MtmNodeId - 1 )) {
297- elog (WARNING , "Node %d think that I am dead" , msg .node );
303+ if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
304+ elog (WARNING , "Node %d think that I am dead" , resp .node );
298305 MtmSwitchClusterMode (MTM_RECOVERY );
299306 }
300307 /* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
301- ds -> disabledNodeMask |= msg .disabledNodeMask ;
308+ Mtm -> disabledNodeMask |= resp .disabledNodeMask ;
302309 return sd ;
303310 }
304311 }
@@ -309,39 +316,23 @@ static void MtmOpenConnections()
309316{
310317 int nNodes = MtmNodes ;
311318 int i ;
312- char * connStr = pstrdup (MtmConnStrs );
313319
314320 sockets = (int * )palloc (sizeof (int )* nNodes );
315- hosts = (char * * )palloc (sizeof (char * )* nNodes );
316321
317322 for (i = 0 ; i < nNodes ; i ++ ) {
318- char * host = strstr (connStr , "host=" );
319- char * end ;
320- if (host == NULL ) {
321- elog (ERROR , "Invalid connection string: '%s'" , MtmConnStrs );
322- }
323- host += 5 ;
324- for (end = host ; * end != ' ' && * end != ',' && * end != '\0' ; end ++ );
325- if (* end != '\0' ) {
326- * end = '\0' ;
327- connStr = end + 1 ;
328- } else {
329- connStr = end ;
330- }
331- hosts [i ] = host ;
332323 if (i + 1 != MtmNodeId ) {
333- sockets [i ] = MtmConnectSocket (host , MtmArbiterPort + i + 1 , MtmConnectAttempts );
324+ sockets [i ] = MtmConnectSocket (Mtm -> nodes [ i ]. hostName , MtmArbiterPort + i + 1 , MtmConnectAttempts );
334325 if (sockets [i ] < 0 ) {
335326 MtmOnNodeDisconnect (i + 1 );
336327 }
337328 } else {
338329 sockets [i ] = -1 ;
339330 }
340331 }
341- if (ds -> nNodes < MtmNodes /2 + 1 ) { /* no quorum */
342- elog (WARNING , "Node is out of quorum: only %d nodes from %d are accssible" , ds -> nNodes , MtmNodes );
343- ds -> status = MTM_OFFLINE ;
344- } else if (ds -> status == MTM_INITIALIZATION ) {
332+ if (Mtm -> nNodes < MtmNodes /2 + 1 ) { /* no quorum */
333+ elog (WARNING , "Node is out of quorum: only %d nodes from %d are accssible" , Mtm -> nNodes , MtmNodes );
334+ Mtm -> status = MTM_OFFLINE ;
335+ } else if (Mtm -> status == MTM_INITIALIZATION ) {
345336 MtmSwitchClusterMode (MTM_CONNECTED );
346337 }
347338}
@@ -354,7 +345,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
354345 if (sockets [node ] >= 0 ) {
355346 close (sockets [node ]);
356347 }
357- sockets [node ] = MtmConnectSocket (hosts [node ], MtmArbiterPort + node + 1 , MtmReconnectAttempts );
348+ sockets [node ] = MtmConnectSocket (Mtm -> nodes [node ]. hostName , MtmArbiterPort + node + 1 , MtmReconnectAttempts );
358349 if (sockets [node ] < 0 ) {
359350 MtmOnNodeDisconnect (node + 1 );
360351 return false;
@@ -379,29 +370,31 @@ static void MtmAcceptOneConnection()
379370 if (fd < 0 ) {
380371 elog (WARNING , "Arbiter failed to accept socket: %d" , errno );
381372 } else {
382- MtmArbiterMessage msg ;
383- int rc = MtmReadSocket (fd , & msg , sizeof msg );
384- if (rc < sizeof (msg )) {
373+ MtmHandshakeMessage req ;
374+ MtmArbiterMessage resp ;
375+ int rc = MtmReadSocket (fd , & req , sizeof req );
376+ if (rc < sizeof (req )) {
385377 elog (WARNING , "Arbiter failed to handshake socket: %d, errno=%d" , rc , errno );
386- } else if (msg . code != MSG_HANDSHAKE && msg .dxid != HANDSHAKE_MAGIC ) {
387- elog (WARNING , "Arbiter get unexpected handshake message %d" , msg .code );
378+ } else if (req . hdr . code != MSG_HANDSHAKE && req . hdr .dxid != HANDSHAKE_MAGIC ) {
379+ elog (WARNING , "Arbiter get unexpected handshake message %d" , req . hdr .code );
388380 close (fd );
389381 } else {
390- Assert (msg .node > 0 && msg .node <= MtmNodes && msg .node != MtmNodeId );
391- msg .code = MSG_STATUS ;
392- msg .disabledNodeMask = ds -> disabledNodeMask ;
393- msg .dxid = HANDSHAKE_MAGIC ;
394- msg .sxid = ShmemVariableCache -> nextXid ;
395- msg .csn = MtmGetCurrentTime ();
396- if (!MtmWriteSocket (fd , & msg , sizeof msg )) {
397- elog (WARNING , "Arbiter failed to write response for handshake message to node %d" , msg .node );
382+ Assert (req .hdr .node > 0 && req .hdr .node <= MtmNodes && req .hdr .node != MtmNodeId );
383+ resp .code = MSG_STATUS ;
384+ resp .disabledNodeMask = Mtm -> disabledNodeMask ;
385+ resp .dxid = HANDSHAKE_MAGIC ;
386+ resp .sxid = ShmemVariableCache -> nextXid ;
387+ resp .csn = MtmGetCurrentTime ();
388+ MtmUpdateNodeConnStr (req .hdr .node , req .connStr );
389+ if (!MtmWriteSocket (fd , & resp , sizeof resp )) {
390+ elog (WARNING , "Arbiter failed to write response for handshake message to node %d" , resp .node );
398391 close (fd );
399392 } else {
400- elog (NOTICE , "Arbiter established connection with node %d" , msg .node );
401- BIT_CLEAR (ds -> connectivityMask , msg .node - 1 );
402- MtmRegisterSocket (fd , msg .node - 1 );
403- sockets [msg .node - 1 ] = fd ;
404- MtmOnNodeConnect (msg .node );
393+ elog (NOTICE , "Arbiter established connection with node %d" , req . hdr .node );
394+ BIT_CLEAR (Mtm -> connectivityMask , req . hdr .node - 1 );
395+ MtmRegisterSocket (fd , req . hdr .node - 1 );
396+ sockets [req . hdr .node - 1 ] = fd ;
397+ MtmOnNodeConnect (req . hdr .node );
405398 }
406399 }
407400 }
@@ -415,7 +408,9 @@ static void MtmAcceptIncomingConnections()
415408 int i ;
416409
417410 sockets = (int * )palloc (sizeof (int )* MtmNodes );
418-
411+ for (i = 0 ; i < MtmNodes ; i ++ ) {
412+ sockets [i ] = -1 ;
413+ }
419414 sock_inet .sin_family = AF_INET ;
420415 sock_inet .sin_addr .s_addr = htonl (INADDR_ANY );
421416 sock_inet .sin_port = htons (MtmArbiterPort + MtmNodeId );
@@ -461,7 +456,8 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
461456 buf -> data [buf -> used ].sxid = ts -> xid ;
462457 buf -> data [buf -> used ].csn = ts -> csn ;
463458 buf -> data [buf -> used ].node = MtmNodeId ;
464- buf -> data [buf -> used ].disabledNodeMask = ds -> disabledNodeMask ;
459+ buf -> data [buf -> used ].disabledNodeMask = Mtm -> disabledNodeMask ;
460+ buf -> data [buf -> used ].oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
465461 buf -> used += 1 ;
466462}
467463
@@ -477,7 +473,7 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
477473 n += 1 ;
478474 }
479475 }
480- Assert (n == ds -> nNodes );
476+ Assert (n == Mtm -> nNodes );
481477}
482478
483479
@@ -487,8 +483,6 @@ static void MtmTransSender(Datum arg)
487483 int i ;
488484 MtmBuffer * txBuffer = (MtmBuffer * )palloc (sizeof (MtmBuffer )* nNodes );
489485
490- ds = MtmGetState ();
491-
492486 MtmOpenConnections ();
493487
494488 for (i = 0 ; i < nNodes ; i ++ ) {
@@ -497,7 +491,7 @@ static void MtmTransSender(Datum arg)
497491
498492 while (true) {
499493 MtmTransState * ts ;
500- PGSemaphoreLock (& ds -> votingSemaphore );
494+ PGSemaphoreLock (& Mtm -> votingSemaphore );
501495 CHECK_FOR_INTERRUPTS ();
502496
503497 /*
@@ -506,14 +500,14 @@ static void MtmTransSender(Datum arg)
506500 */
507501 MtmLock (LW_SHARED );
508502
509- for (ts = ds -> votingTransactions ; ts != NULL ; ts = ts -> nextVoting ) {
503+ for (ts = Mtm -> votingTransactions ; ts != NULL ; ts = ts -> nextVoting ) {
510504 if (MtmIsCoordinator (ts )) {
511505 MtmBroadcastMessage (txBuffer , ts );
512506 } else {
513507 MtmAppendBuffer (txBuffer , ts -> gtid .xid , ts -> gtid .node - 1 , ts );
514508 }
515509 }
516- ds -> votingTransactions = NULL ;
510+ Mtm -> votingTransactions = NULL ;
517511
518512 MtmUnlock ();
519513
@@ -573,8 +567,6 @@ static void MtmTransReceiver(Datum arg)
573567 max_fd = 0 ;
574568#endif
575569
576- ds = MtmGetState ();
577-
578570 MtmAcceptIncomingConnections ();
579571
580572 for (i = 0 ; i < nNodes ; i ++ ) {
@@ -613,7 +605,7 @@ static void MtmTransReceiver(Datum arg)
613605 elog (ERROR , "Arbiter failed to select sockets: %d" , errno );
614606 }
615607 for (i = 0 ; i < nNodes ; i ++ ) {
616- if (FD_ISSET (sockets [i ], & events ))
608+ if (sockets [ i ] >= 0 && FD_ISSET (sockets [i ], & events ))
617609#endif
618610 {
619611 if (i + 1 == MtmNodeId ) {
@@ -637,13 +629,24 @@ static void MtmTransReceiver(Datum arg)
637629 MtmTransState * ts = (MtmTransState * )hash_search (MtmXid2State , & msg -> dxid , HASH_FIND , NULL );
638630 Assert (ts != NULL );
639631 Assert (msg -> node > 0 && msg -> node <= nNodes && msg -> node != MtmNodeId );
632+
633+ Mtm -> nodes [msg -> node - 1 ].oldestSnapshot = msg -> oldestSnapshot ;
634+
640635 if (MtmIsCoordinator (ts )) {
641636 switch (msg -> code ) {
642637 case MSG_READY :
643- Assert (ts -> nVotes < ds -> nNodes );
644- ds -> nodeTransDelay [msg -> node - 1 ] += MtmGetCurrentTime () - ts -> csn ;
638+ Assert (ts -> nVotes < Mtm -> nNodes );
639+ Mtm -> nodes [msg -> node - 1 ]. transDelay += MtmGetCurrentTime () - ts -> csn ;
645640 ts -> xids [msg -> node - 1 ] = msg -> sxid ;
646- if (++ ts -> nVotes == ds -> nNodes ) {
641+
642+ if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask ) != 0 ) {
643+ /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
644+ commit on smaller subset of nodes */
645+ ts -> status = TRANSACTION_STATUS_ABORTED ;
646+ MtmAdjustSubtransactions (ts );
647+ }
648+
649+ if (++ ts -> nVotes == Mtm -> nNodes ) {
647650 /* All nodes are finished their transactions */
648651 if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
649652 ts -> nVotes = 1 ; /* I voted myself */
@@ -655,24 +658,24 @@ static void MtmTransReceiver(Datum arg)
655658 }
656659 break ;
657660 case MSG_ABORTED :
658- Assert (ts -> nVotes < ds -> nNodes );
661+ Assert (ts -> nVotes < Mtm -> nNodes );
659662 if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
660663 Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
661664 ts -> status = TRANSACTION_STATUS_ABORTED ;
662665 MtmAdjustSubtransactions (ts );
663666 }
664- if (++ ts -> nVotes == ds -> nNodes ) {
667+ if (++ ts -> nVotes == Mtm -> nNodes ) {
665668 MtmWakeUpBackend (ts );
666669 }
667670 break ;
668671 case MSG_PREPARED :
669672 Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
670- Assert (ts -> nVotes < ds -> nNodes );
673+ Assert (ts -> nVotes < Mtm -> nNodes );
671674 if (msg -> csn > ts -> csn ) {
672675 ts -> csn = msg -> csn ;
673676 MtmSyncClock (ts -> csn );
674677 }
675- if (++ ts -> nVotes == ds -> nNodes ) {
678+ if (++ ts -> nVotes == Mtm -> nNodes ) {
676679 ts -> csn = MtmAssignCSN ();
677680 ts -> status = TRANSACTION_STATUS_UNKNOWN ;
678681 MtmWakeUpBackend (ts );
@@ -703,7 +706,7 @@ static void MtmTransReceiver(Datum arg)
703706 }
704707 }
705708 }
706- if (n == 0 && ds -> disabledNodeMask != 0 ) {
709+ if (n == 0 && Mtm -> disabledNodeMask != 0 ) {
707710 /* If timeout is expired and there are didabled nodes, then recheck cluster's state */
708711 MtmRefreshClusterStatus (false);
709712 }
0 commit comments