@@ -458,7 +458,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
458458 MTM_TRACE ("Send message %s CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n" ,
459459 messageText [ts -> cmd ], ts -> csn , node + 1 , MtmNodeId , ts -> gtid .xid , ts -> xid );
460460 Assert (ts -> cmd != MSG_INVALID );
461- buf -> data [buf -> used ].code = ts -> cmd ;
461+ buf -> data [buf -> used ].code = ts -> status == TRANSACTION_STATUS_ABORTED ? MSG_ABORTED : MSG_PREPARED ;
462462 buf -> data [buf -> used ].dxid = xid ;
463463 buf -> data [buf -> used ].sxid = ts -> xid ;
464464 buf -> data [buf -> used ].csn = ts -> csn ;
@@ -467,21 +467,6 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
467467 buf -> used += 1 ;
468468}
469469
470- static void MtmBroadcastMessage (MtmBuffer * txBuffer , MtmTransState * ts )
471- {
472- int i ;
473- int n = 1 ;
474- for (i = 0 ; i < MtmNodes ; i ++ )
475- {
476- if (TransactionIdIsValid (ts -> xids [i ])) {
477- Assert (i + 1 != MtmNodeId );
478- MtmAppendBuffer (txBuffer , ts -> xids [i ], i , ts );
479- n += 1 ;
480- }
481- }
482- Assert (n == ds -> nNodes );
483- }
484-
485470static void MtmTransSender (Datum arg )
486471{
487472 int nNodes = MtmNodes ;
@@ -508,12 +493,7 @@ static void MtmTransSender(Datum arg)
508493 MtmLock (LW_SHARED );
509494
510495 for (ts = ds -> votingTransactions ; ts != NULL ; ts = ts -> nextVoting ) {
511- if (MtmIsCoordinator (ts )) {
512- MtmBroadcastMessage (txBuffer , ts );
513- } else {
514- MtmAppendBuffer (txBuffer , ts -> gtid .xid , ts -> gtid .node - 1 , ts );
515- }
516- ts -> cmd = MSG_INVALID ;
496+ MtmAppendBuffer (txBuffer , ts -> gtid .xid , ts -> gtid .node - 1 , ts );
517497 }
518498 ds -> votingTransactions = NULL ;
519499 MtmUnlock ();
@@ -634,109 +614,31 @@ static void MtmTransReceiver(Datum arg)
634614 MtmArbiterMessage * msg = & rxBuffer [i ].data [j ];
635615 MtmTransState * ts = (MtmTransState * )hash_search (xid2state , & msg -> dxid , HASH_FIND , NULL );
636616 Assert (ts != NULL );
637- Assert (ts -> cmd == MSG_INVALID );
638617 Assert (msg -> node > 0 && msg -> node <= nNodes && msg -> node != MtmNodeId );
639- ts -> xids [msg -> node - 1 ] = msg -> sxid ;
640-
641- if (MtmIsCoordinator (ts )) {
642- switch (msg -> code ) {
643- case MSG_READY :
644- Assert (ts -> status == TRANSACTION_STATUS_ABORTED || ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
645- Assert (ts -> nVotes < ds -> nNodes );
646- ds -> nodeTransDelay [msg -> node - 1 ] += MtmGetCurrentTime () - ts -> csn ;
647- if (++ ts -> nVotes == ds -> nNodes ) {
648- /* All nodes are finished their transactions */
649- if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
650- ts -> nVotes = 1 ; /* I voted myself */
651- ts -> cmd = MSG_PREPARE ;
652- } else {
653- ts -> status = TRANSACTION_STATUS_ABORTED ;
654- ts -> cmd = MSG_ABORT ;
655- MtmAdjustSubtransactions (ts );
656- MtmWakeUpBackend (ts );
657- }
658- MtmSendNotificationMessage (ts );
659- }
660- break ;
618+ Assert (MtmIsCoordinator (ts ));
619+ switch (msg -> code ) {
661620 case MSG_PREPARED :
662- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
663- Assert (ts -> nVotes < ds -> nNodes );
664- if (msg -> csn > ts -> csn ) {
665- ts -> csn = msg -> csn ;
666- MtmSyncClock (ts -> csn );
667- }
668- if (++ ts -> nVotes == ds -> nNodes ) {
669- /* ts->csn is maximum of CSNs at all nodes */
670- ts -> nVotes = 1 ; /* I voted myself */
671- ts -> cmd = MSG_COMMIT ;
672- ts -> csn = MtmAssignCSN ();
673- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
674- MtmAdjustSubtransactions (ts );
675- MtmSendNotificationMessage (ts );
676- }
677- break ;
678- case MSG_COMMITTED :
679- Assert (ts -> status == TRANSACTION_STATUS_UNKNOWN );
680- Assert (ts -> nVotes < ds -> nNodes );
681- if (++ ts -> nVotes == ds -> nNodes ) {
682- /* All nodes have the same CSN */
683- MtmWakeUpBackend (ts );
621+ if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
622+ if (msg -> csn > ts -> csn ) {
623+ ts -> csn = msg -> csn ;
624+ MtmSyncClock (ts -> csn );
625+ }
626+ if (++ ts -> nVotes == ds -> nNodes ) {
627+ MtmWakeUpBackend (ts );
628+ }
684629 }
685630 break ;
686631 case MSG_ABORTED :
687- Assert (ts -> status == TRANSACTION_STATUS_ABORTED || ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
688- Assert (ts -> nVotes < ds -> nNodes );
689- ts -> status = TRANSACTION_STATUS_ABORTED ;
690- if (++ ts -> nVotes == ds -> nNodes ) {
691- ts -> cmd = MSG_ABORT ;
692- MtmAdjustSubtransactions (ts );
693- MtmSendNotificationMessage (ts );
694- MtmWakeUpBackend (ts );
695- }
696- break ;
697- default :
698- Assert (false);
699- }
700- } else { /* replica */
701- switch (msg -> code ) {
702- case MSG_PREPARE :
703- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
704- if ((msg -> disabledNodeMask & ~ds -> disabledNodeMask ) != 0 ) {
705- /* Coordinator's disabled mask is wider than my: so reject such transaction to avoid
706- commit on smaller subset of nodes */
632+ if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
707633 ts -> status = TRANSACTION_STATUS_ABORTED ;
708- ts -> cmd = MSG_ABORT ;
709- MtmAdjustSubtransactions (ts );
710- MtmWakeUpBackend (ts );
711- } else {
712- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
713- ts -> csn = MtmAssignCSN ();
714- ts -> cmd = MSG_PREPARED ;
715- }
716- MtmSendNotificationMessage (ts );
717- break ;
718- case MSG_COMMIT :
719- Assert (ts -> status == TRANSACTION_STATUS_UNKNOWN );
720- Assert (ts -> csn < msg -> csn );
721- ts -> csn = msg -> csn ;
722- MtmSyncClock (ts -> csn );
723- ts -> cmd = MSG_COMMITTED ;
724- MtmAdjustSubtransactions (ts );
725- MtmSendNotificationMessage (ts );
726- MtmWakeUpBackend (ts );
727- break ;
728- case MSG_ABORT :
729- if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
730- Assert (ts -> status == TRANSACTION_STATUS_UNKNOWN || ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
731- ts -> status = TRANSACTION_STATUS_ABORTED ;
732634 MtmAdjustSubtransactions (ts );
733635 MtmWakeUpBackend (ts );
734636 }
735637 break ;
736638 default :
737639 Assert (false);
738640 }
739- }
641+ }
740642 }
741643 MtmUnlock ();
742644
0 commit comments