@@ -571,7 +571,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
571571 MtmCheckClusterLock ();
572572
573573 ts = hash_search (xid2state , & x -> xid , HASH_ENTER , NULL );
574- ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
574+ ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
575+ /*
576+ * Invalid CSN prevent replication of transaction by logical replication
577+ */
575578 ts -> snapshot = x -> isReplicated || !x -> containsDML ? INVALID_CSN : x -> snapshot ;
576579 ts -> csn = MtmAssignCSN ();
577580 ts -> gtid = x -> gtid ;
@@ -603,7 +606,7 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
603606 MtmTransState * ts ;
604607
605608 MtmLock (LW_EXCLUSIVE );
606- ts = hash_search (xid2state , & x -> xid , HASH_ENTER , NULL );
609+ ts = hash_search (xid2state , & x -> xid , HASH_FIND , NULL );
607610 Assert (ts != NULL );
608611 if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
609612 ts -> status = TRANSACTION_STATUS_UNKNOWN ;
@@ -619,7 +622,7 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
619622 } else {
620623 /* wait N commits or just one ABORT */
621624 ts -> nVotes += 1 ; /* I vote myself */
622- while (ts -> nVotes != dtm -> nNodes && ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
625+ while (ts -> nVotes != dtm -> nNodes && ts -> status != TRANSACTION_STATUS_ABORTED ) {
623626 MtmUnlock ();
624627 WaitLatch (& MyProc -> procLatch , WL_LATCH_SET , -1 );
625628 ResetLatch (& MyProc -> procLatch );
@@ -628,6 +631,8 @@ MtmPrepareTransaction(MtmCurrentTrans* x)
628631 MtmUnlock ();
629632 if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
630633 elog (ERROR , "Distributed transaction %d is rejected by DTM" , x -> xid );
634+ } else {
635+ Assert (ts -> status == TRANSACTION_STATUS_UNKNOWN );
631636 }
632637 }
633638}
@@ -637,7 +642,7 @@ static void
637642MtmEndTransaction (MtmCurrentTrans * x , bool commit )
638643{
639644 MTM_TRACE ("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n" , MyProcPid , x -> xid , x -> isPrepared , x -> isDistributed , commit ? "commit" : "abort" );
640- if (x -> isDistributed && (TransactionIdIsValid ( x -> xid ) || x -> isReplicated )) {
645+ if (x -> isDistributed && (x -> isPrepared || x -> isReplicated )) {
641646 MtmTransState * ts ;
642647 MtmLock (LW_EXCLUSIVE );
643648 if (x -> isPrepared ) {
@@ -656,7 +661,11 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
656661 }
657662 } else {
658663 ts -> status = TRANSACTION_STATUS_ABORTED ;
659- if (x -> isReplicated ) {
664+ if (x -> isReplicated && TransactionIdIsValid (x -> gtid .xid )) {
665+ /*
666+ * Send notification only of ABORT happens during transaction processing at replicas,
667+ * do not send notification if ABORT is receiver from master
668+ */
660669 MtmSendNotificationMessage (ts ); /* send notification to coordinator */
661670 }
662671 }
@@ -683,8 +692,6 @@ void MtmSendNotificationMessage(MtmTransState* ts)
683692 }
684693}
685694
686-
687-
688695void MtmJoinTransaction (GlobalTransactionId * gtid , csn_t globalSnapshot )
689696{
690697 csn_t localSnapshot ;
0 commit comments