@@ -64,6 +64,7 @@ typedef struct {
6464 bool isReplicated ; /* transaction on replica */
6565 bool isDistributed ; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6666 bool isPrepared ; /* transaction is perpared at first stage of 2PC */
67+ bool isTransactionBlock ; /* is transaction block */
6768 bool containsDML ; /* transaction contains DML statements */
6869 XidStatus status ; /* transaction status */
6970 csn_t snapshot ; /* transaction snaphsot */
@@ -590,7 +591,7 @@ MtmXactCallback(XactEvent event, void *arg)
590591 MtmEndTransaction (& MtmTx , false);
591592 break ;
592593 case XACT_EVENT_COMMIT_COMMAND :
593- if (!IsTransactionBlock () ) {
594+ if (!MtmTx . isTransactionBlock ) {
594595 MtmTwoPhaseCommit (& MtmTx );
595596 }
596597 break ;
@@ -629,6 +630,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
629630 x -> isReplicated = false;
630631 x -> isDistributed = MtmIsUserTransaction ();
631632 x -> isPrepared = false;
633+ x -> isTransactionBlock = IsTransactionBlock ();
632634 if (x -> isDistributed && Mtm -> status != MTM_ONLINE ) {
633635 /* reject all user's transactions at offline cluster */
634636 MtmUnlock ();
@@ -1930,19 +1932,20 @@ MtmGenerateGid(char* gid)
19301932
19311933static bool MtmTwoPhaseCommit (MtmCurrentTrans * x )
19321934{
1933- if (x -> isDistributed && x -> containsDML ) {
1935+ if (! x -> isReplicated && ( x -> isDistributed && x -> containsDML ) ) {
19341936 MtmGenerateGid (x -> gid );
1935- if (!IsTransactionBlock () ) {
1936- elog (WARNING , "Start transaction block for %d " , x -> xid );
1937+ if (!x -> isTransactionBlock ) {
1938+ elog (WARNING , "Start transaction block for %s " , x -> gid );
19371939 BeginTransactionBlock ();
1940+ x -> isTransactionBlock = true;
19381941 CommitTransactionCommand ();
19391942 StartTransactionCommand ();
19401943 }
19411944 if (!PrepareTransactionBlock (x -> gid ))
19421945 {
19431946 elog (WARNING , "Failed to prepare transaction %s" , x -> gid );
19441947 /* ??? Should we do explicit rollback */
1945- } else {
1948+ } else {
19461949 CommitTransactionCommand ();
19471950 StartTransactionCommand ();
19481951 if (MtmGetCurrentTransactionStatus () == TRANSACTION_STATUS_ABORTED ) {
@@ -1970,8 +1973,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
19701973 TransactionStmt * stmt = (TransactionStmt * ) parsetree ;
19711974 switch (stmt -> kind )
19721975 {
1976+ case TRANS_STMT_BEGIN :
1977+ MtmTx .isTransactionBlock = true;
1978+ break ;
19731979 case TRANS_STMT_COMMIT :
1974- if (MtmTwoPhaseCommit (& MtmTx )) {
1980+ if (MtmTwoPhaseCommit (& MtmTx )) {
19751981 return ;
19761982 }
19771983 break ;
@@ -2036,9 +2042,6 @@ MtmExecutorFinish(QueryDesc *queryDesc)
20362042 }
20372043 }
20382044 }
2039- if (MtmTx .isDistributed && MtmTx .containsDML && !IsTransactionBlock ()) {
2040- MtmTwoPhaseCommit (& MtmTx );
2041- }
20422045 }
20432046 if (PreviousExecutorFinishHook != NULL )
20442047 {
0 commit comments