@@ -64,6 +64,7 @@ typedef struct {
6464 bool isReplicated ; /* transaction on replica */
6565 bool isDistributed ; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6666 bool isPrepared ; /* transaction is perpared at first stage of 2PC */
67+ bool isTransactionBlock ; /* is transaction block */
6768 bool containsDML ; /* transaction contains DML statements */
6869 XidStatus status ; /* transaction status */
6970 csn_t snapshot ; /* transaction snaphsot */
@@ -111,6 +112,7 @@ static void MtmPrePrepareTransaction(MtmCurrentTrans* x);
111112static void MtmPostPrepareTransaction (MtmCurrentTrans * x );
112113static void MtmAbortPreparedTransaction (MtmCurrentTrans * x );
113114static void MtmEndTransaction (MtmCurrentTrans * x , bool commit );
115+ static bool MtmTwoPhaseCommit (MtmCurrentTrans * x );
114116static TransactionId MtmGetOldestXmin (Relation rel , bool ignoreVacuum );
115117static bool MtmXidInMVCCSnapshot (TransactionId xid , Snapshot snapshot );
116118static TransactionId MtmAdjustOldestXid (TransactionId xid );
@@ -588,6 +590,11 @@ MtmXactCallback(XactEvent event, void *arg)
588590 case XACT_EVENT_ABORT :
589591 MtmEndTransaction (& MtmTx , false);
590592 break ;
593+ case XACT_EVENT_COMMIT_COMMAND :
594+ if (!MtmTx .isTransactionBlock ) {
595+ MtmTwoPhaseCommit (& MtmTx );
596+ }
597+ break ;
591598 default :
592599 break ;
593600 }
@@ -623,6 +630,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
623630 x -> isReplicated = false;
624631 x -> isDistributed = MtmIsUserTransaction ();
625632 x -> isPrepared = false;
633+ x -> isTransactionBlock = IsTransactionBlock ();
626634 if (x -> isDistributed && Mtm -> status != MTM_ONLINE ) {
627635 /* reject all user's transactions at offline cluster */
628636 MtmUnlock ();
@@ -1922,33 +1930,34 @@ MtmGenerateGid(char* gid)
19221930 sprintf (gid , "MTM-%d-%d-%d" , MtmNodeId , MyProcPid , ++ localCount );
19231931}
19241932
1925- static void MtmTwoPhaseCommit (char * completionTag )
1933+ static bool MtmTwoPhaseCommit (MtmCurrentTrans * x )
19261934{
1927- MtmGenerateGid (MtmTx .gid );
1928- if (!IsTransactionBlock ()) {
1929- elog (WARNING , "Start transaction block for %d" , MtmTx .xid );
1930- BeginTransactionBlock ();
1931- CommitTransactionCommand ();
1932- StartTransactionCommand ();
1933- }
1934- if (!PrepareTransactionBlock (MtmTx .gid ))
1935- {
1936- elog (WARNING , "Failed to prepare transaction %s" , MtmTx .gid );
1937- /* report unsuccessful commit in completionTag */
1938- if (completionTag ) {
1939- strcpy (completionTag , "ROLLBACK" );
1935+ if (!x -> isReplicated && (x -> isDistributed && x -> containsDML )) {
1936+ MtmGenerateGid (x -> gid );
1937+ if (!x -> isTransactionBlock ) {
1938+ elog (WARNING , "Start transaction block for %s" , x -> gid );
1939+ BeginTransactionBlock ();
1940+ x -> isTransactionBlock = true;
1941+ CommitTransactionCommand ();
1942+ StartTransactionCommand ();
19401943 }
1941- /* ??? Should we do explicit rollback */
1942- } else {
1943- CommitTransactionCommand ();
1944- StartTransactionCommand ();
1945- if (MtmGetCurrentTransactionStatus () == TRANSACTION_STATUS_ABORTED ) {
1946- FinishPreparedTransaction (MtmTx .gid , false);
1947- elog (ERROR , "Transaction %s is aborted by DTM" , MtmTx .gid );
1948- } else {
1949- FinishPreparedTransaction (MtmTx .gid , true);
1944+ if (!PrepareTransactionBlock (x -> gid ))
1945+ {
1946+ elog (WARNING , "Failed to prepare transaction %s" , x -> gid );
1947+ /* ??? Should we do explicit rollback */
1948+ } else {
1949+ CommitTransactionCommand ();
1950+ StartTransactionCommand ();
1951+ if (MtmGetCurrentTransactionStatus () == TRANSACTION_STATUS_ABORTED ) {
1952+ FinishPreparedTransaction (x -> gid , false);
1953+ elog (ERROR , "Transaction %s is aborted by DTM" , x -> gid );
1954+ } else {
1955+ FinishPreparedTransaction (x -> gid , true);
1956+ }
19501957 }
1958+ return true;
19511959 }
1960+ return false;
19521961}
19531962
19541963static void MtmProcessUtility (Node * parsetree , const char * queryString ,
@@ -1964,9 +1973,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
19641973 TransactionStmt * stmt = (TransactionStmt * ) parsetree ;
19651974 switch (stmt -> kind )
19661975 {
1976+ case TRANS_STMT_BEGIN :
1977+ MtmTx .isTransactionBlock = true;
1978+ break ;
19671979 case TRANS_STMT_COMMIT :
1968- if (MtmTx .isDistributed && MtmTx .containsDML ) {
1969- MtmTwoPhaseCommit (completionTag );
1980+ if (MtmTwoPhaseCommit (& MtmTx )) {
19701981 return ;
19711982 }
19721983 break ;
@@ -2002,9 +2013,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
20022013 if (MtmProcessDDLCommand (queryString )) {
20032014 return ;
20042015 }
2005- if (MtmTx .isDistributed && MtmTx .containsDML && !IsTransactionBlock ()) {
2006- MtmTwoPhaseCommit (completionTag );
2007- }
20082016 }
20092017 if (PreviousProcessUtilityHook != NULL )
20102018 {
@@ -2034,9 +2042,6 @@ MtmExecutorFinish(QueryDesc *queryDesc)
20342042 }
20352043 }
20362044 }
2037- if (MtmTx .isDistributed && MtmTx .containsDML && !IsTransactionBlock ()) {
2038- MtmTwoPhaseCommit (NULL );
2039- }
20402045 }
20412046 if (PreviousExecutorFinishHook != NULL )
20422047 {
0 commit comments