@@ -553,6 +553,7 @@ static void MtmAddSubtransactions(MtmTransState* ts, TransactionId* subxids, int
553553 Assert (!found );
554554 sts -> status = ts -> status ;
555555 sts -> csn = ts -> csn ;
556+ sts -> votingCompleted = true;
556557 MtmTransactionListInsertAfter (ts , sts );
557558 }
558559}
@@ -745,7 +746,8 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
745746 if (!MtmIsCoordinator (ts ) || Mtm -> status == MTM_RECOVERY ) {
746747 MtmTransMap * tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_ENTER , NULL );
747748 Assert (x -> gid [0 ]);
748- tm -> state = ts ;
749+ tm -> state = ts ;
750+ ts -> votingCompleted = true;
749751 if (Mtm -> status != MTM_RECOVERY ) {
750752 MtmSendNotificationMessage (ts , MSG_READY ); /* send notification to coordinator */
751753 } else {
@@ -777,9 +779,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
777779 MtmLock (LW_EXCLUSIVE );
778780 tm = (MtmTransMap * )hash_search (MtmGid2State , x -> gid , HASH_REMOVE , NULL );
779781 Assert (tm != NULL );
780- tm -> state -> status = TRANSACTION_STATUS_ABORTED ;
781- MtmAdjustSubtransactions (tm -> state );
782- Mtm -> nActiveTransactions -= 1 ;
782+ MtmAbortTransaction (tm -> state );
783783 MtmUnlock ();
784784 x -> status = TRANSACTION_STATUS_ABORTED ;
785785 }
@@ -835,6 +835,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
835835 ts -> gtid = x -> gtid ;
836836 ts -> nSubxids = 0 ;
837837 ts -> cmd = MSG_INVALID ;
838+ ts -> votingCompleted = true;
838839 MtmTransactionListAppend (ts );
839840 }
840841 MtmSendNotificationMessage (ts , MSG_ABORTED ); /* send notification to coordinator */
@@ -937,6 +938,20 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
937938 return csn ;
938939}
939940
941+ void MtmWakeUpBackend (MtmTransState * ts )
942+ {
943+ MTM_TRACE ("Wakeup backed procno=%d, pid=%d\n" , ts -> procno , ProcGlobal -> allProcs [ts -> procno ].pid );
944+ ts -> votingCompleted = true;
945+ SetLatch (& ProcGlobal -> allProcs [ts -> procno ].procLatch );
946+ }
947+
948+ void MtmAbortTransaction (MtmTransState * ts )
949+ {
950+ ts -> status = TRANSACTION_STATUS_ABORTED ;
951+ MtmAdjustSubtransactions (ts );
952+ Mtm -> nActiveTransactions -= 1 ;
953+ }
954+
940955/*
941956 * -------------------------------------------
942957 * HA functions
@@ -1213,9 +1228,10 @@ void MtmCheckQuorum(void)
12131228 }
12141229}
12151230
1216-
12171231void MtmOnNodeDisconnect (int nodeId )
1218- {
1232+ {
1233+ MtmTransState * ts ;
1234+
12191235 BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
12201236 BIT_SET (Mtm -> reconnectMask , nodeId - 1 );
12211237 RaftableSet (psprintf ("node-mask-%d" , MtmNodeId ), & Mtm -> connectivityMask , sizeof Mtm -> connectivityMask , false);
@@ -1229,6 +1245,16 @@ void MtmOnNodeDisconnect(int nodeId)
12291245 BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
12301246 Mtm -> nNodes -= 1 ;
12311247 MtmCheckQuorum ();
1248+ /* Interrupt voting for active transaction and abort them */
1249+ for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
1250+ if (!ts -> votingCompleted ) {
1251+ if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1252+ elog (WARNING , "Rollback active transaction %d:%d" , ts -> gtid .node , ts -> gtid .xid );
1253+ MtmAbortTransaction (ts );
1254+ }
1255+ MtmWakeUpBackend (ts );
1256+ }
1257+ }
12321258 }
12331259 MtmUnlock ();
12341260 }
0 commit comments