@@ -106,10 +106,7 @@ static char const* const messageText[] =
106106 "HANDSHAKE" ,
107107 "READY" ,
108108 "PREPARE" ,
109- "COMMIT" ,
110- "ABORT" ,
111109 "PREPARED" ,
112- "COMMITTED" ,
113110 "ABORTED" ,
114111 "STATUS"
115112};
@@ -456,8 +453,10 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
456453 buf -> used = 0 ;
457454 }
458455 MTM_TRACE ("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n" ,
459- ts -> status == TRANSACTION_STATUS_ABORTED ? "abort" : "commit" , ts -> csn , node + 1 , MtmNodeId , ts -> gtid .xid , ts -> xid );
460- buf -> data [buf -> used ].code = ts -> status == TRANSACTION_STATUS_ABORTED ? MSG_ABORTED : MSG_PREPARED ;
456+ messageText [ts -> cmd ], ts -> csn , node + 1 , MtmNodeId , ts -> gtid .xid , ts -> xid );
457+
458+ Assert (ts -> cmd != MSG_INVALID );
459+ buf -> data [buf -> used ].code = ts -> cmd ;
461460 buf -> data [buf -> used ].dxid = xid ;
462461 buf -> data [buf -> used ].sxid = ts -> xid ;
463462 buf -> data [buf -> used ].csn = ts -> csn ;
@@ -466,6 +465,22 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
466465 buf -> used += 1 ;
467466}
468467
468+ static void MtmBroadcastMessage (MtmBuffer * txBuffer , MtmTransState * ts )
469+ {
470+ int i ;
471+ int n = 1 ;
472+ for (i = 0 ; i < MtmNodes ; i ++ )
473+ {
474+ if (TransactionIdIsValid (ts -> xids [i ])) {
475+ Assert (i + 1 != MtmNodeId );
476+ MtmAppendBuffer (txBuffer , ts -> xids [i ], i , ts );
477+ n += 1 ;
478+ }
479+ }
480+ Assert (n == ds -> nNodes );
481+ }
482+
483+
469484static void MtmTransSender (Datum arg )
470485{
471486 int nNodes = MtmNodes ;
@@ -492,7 +507,11 @@ static void MtmTransSender(Datum arg)
492507 MtmLock (LW_SHARED );
493508
494509 for (ts = ds -> votingTransactions ; ts != NULL ; ts = ts -> nextVoting ) {
495- MtmAppendBuffer (txBuffer , ts -> gtid .xid , ts -> gtid .node - 1 , ts );
510+ if (MtmIsCoordinator (ts )) {
511+ MtmBroadcastMessage (txBuffer , ts );
512+ } else {
513+ MtmAppendBuffer (txBuffer , ts -> gtid .xid , ts -> gtid .node - 1 , ts );
514+ }
496515 }
497516 ds -> votingTransactions = NULL ;
498517
@@ -510,6 +529,7 @@ static void MtmTransSender(Datum arg)
510529static void MtmWakeUpBackend (MtmTransState * ts )
511530{
512531 MTM_TRACE ("Wakeup backed procno=%d, pid=%d\n" , ts -> procno , ProcGlobal -> allProcs [ts -> procno ].pid );
532+ ts -> votingCompleted = true;
513533 SetLatch (& ProcGlobal -> allProcs [ts -> procno ].procLatch );
514534}
515535
@@ -565,6 +585,9 @@ static void MtmTransReceiver(Datum arg)
565585#if USE_EPOLL
566586 n = epoll_wait (epollfd , events , nNodes , MtmKeepaliveTimeout /1000 );
567587 if (n < 0 ) {
588+ if (errno == EINTR ) {
589+ continue ;
590+ }
568591 elog (ERROR , "Arbiter failed to poll sockets: %d" , errno );
569592 }
570593 for (j = 0 ; j < n ; j ++ ) {
@@ -581,7 +604,9 @@ static void MtmTransReceiver(Datum arg)
581604 events = inset ;
582605 tv .tv_sec = MtmKeepaliveTimeout /USEC ;
583606 tv .tv_usec = MtmKeepaliveTimeout %USEC ;
584- n = select (max_fd + 1 , & events , NULL , NULL , & tv );
607+ do {
608+ n = select (max_fd + 1 , & events , NULL , NULL , & tv );
609+ } while (n < 0 && errno == ENINTR );
585610 } while (n < 0 && MtmRecovery ());
586611
587612 if (rc < 0 ) {
@@ -612,31 +637,62 @@ static void MtmTransReceiver(Datum arg)
612637 MtmTransState * ts = (MtmTransState * )hash_search (MtmXid2State , & msg -> dxid , HASH_FIND , NULL );
613638 Assert (ts != NULL );
614639 Assert (msg -> node > 0 && msg -> node <= nNodes && msg -> node != MtmNodeId );
615- Assert (MtmIsCoordinator (ts ));
616- switch (msg -> code ) {
617- case MSG_PREPARED :
618- if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
619- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS || ts -> status == TRANSACTION_STATUS_UNKNOWN );
620- if (msg -> csn > ts -> csn ) {
621- ts -> csn = msg -> csn ;
622- MtmSyncClock (ts -> csn );
623- }
624- if (++ ts -> nVotes == ds -> nNodes ) {
625- MtmWakeUpBackend (ts );
640+ if (MtmIsCoordinator (ts )) {
641+ switch (msg -> code ) {
642+ case MSG_READY :
643+ Assert (ts -> nVotes < ds -> nNodes );
644+ ds -> nodeTransDelay [msg -> node - 1 ] += MtmGetCurrentTime () - ts -> csn ;
645+ ts -> xids [msg -> node - 1 ] = msg -> sxid ;
646+ if (++ ts -> nVotes == ds -> nNodes ) {
647+ /* All nodes have finished their transactions */
648+ if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
649+ ts -> nVotes = 1 ; /* I voted myself */
650+ MtmSendNotificationMessage (ts , MSG_PREPARE );
651+ } else {
652+ Assert (ts -> status == TRANSACTION_STATUS_ABORTED );
653+ MtmWakeUpBackend (ts );
626654 }
627655 }
628- break ;
629- case MSG_ABORTED :
656+ break ;
657+ case MSG_ABORTED :
658+ Assert (ts -> nVotes < ds -> nNodes );
630659 if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
631- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS || ts -> status == TRANSACTION_STATUS_UNKNOWN );
660+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
632661 ts -> status = TRANSACTION_STATUS_ABORTED ;
633662 MtmAdjustSubtransactions (ts );
663+ }
664+ if (++ ts -> nVotes == ds -> nNodes ) {
634665 MtmWakeUpBackend (ts );
635666 }
636667 break ;
637- default :
668+ case MSG_PREPARED :
669+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
670+ Assert (ts -> nVotes < ds -> nNodes );
671+ if (msg -> csn > ts -> csn ) {
672+ ts -> csn = msg -> csn ;
673+ MtmSyncClock (ts -> csn );
674+ }
675+ if (++ ts -> nVotes == ds -> nNodes ) {
676+ ts -> csn = MtmAssignCSN ();
677+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
678+ MtmWakeUpBackend (ts );
679+ }
680+ default :
681+ Assert (false);
682+ }
683+ } else {
684+ switch (msg -> code ) {
685+ case MSG_PREPARE :
686+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
687+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
688+ ts -> csn = MtmAssignCSN ();
689+ MtmAdjustSubtransactions (ts );
690+ MtmSendNotificationMessage (ts , MSG_PREPARED );
691+ break ;
692+ default :
638693 Assert (false);
639- }
694+ }
695+ }
640696 }
641697 MtmUnlock ();
642698
0 commit comments