@@ -116,14 +116,15 @@ static TransactionId MtmAdjustOldestXid(TransactionId xid);
116116static bool MtmDetectGlobalDeadLock (PGPROC * proc );
117117static void MtmAddSubtransactions (MtmTransState * ts , TransactionId * subxids , int nSubxids );
118118static char const * MtmGetName (void );
119- static void MtmCheckClusterLock ()
119+ static void MtmCheckClusterLock (void );
120+ static void MtmCheckSlots (void );
121+ static void MtmAddSubtransactions (MtmTransState * ts , TransactionId * subxids , int nSubxids );
120122
121123static void MtmShmemStartup (void );
122124
123125static BgwPool * MtmPoolConstructor (void );
124126static bool MtmRunUtilityStmt (PGconn * conn , char const * sql );
125127static void MtmBroadcastUtilityStmt (char const * sql , bool ignoreError );
126- static void MtmVoteForTransaction (MtmTransState * ts );
127128
128129static HTAB * xid2state ;
129130static HTAB * gid2xid ;
@@ -543,10 +544,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
543544 * Prepare transaction for two-phase commit.
544545 * This code is executed by PRE_PREPARE hook before PREPARE message is sent to replicas by logical replication
545546 */
547+ static void
546548MtmPrePrepareTransaction (MtmCurrentTrans * x )
547549{
548550 MtmTransState * ts ;
549- int i ;
551+ TransactionId * subxids ;
550552
551553 if (!x -> isDistributed ) {
552554 return ;
@@ -575,9 +577,9 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
575577 ts -> gtid = x -> gtid ;
576578 ts -> procno = MyProc -> pgprocno ;
577579 ts -> nVotes = 0 ;
578-
580+ ts -> nSubxids = xactGetCommittedChildren ( & subxids );
579581 x -> isPrepared = true;
580- x -> csn = csn ;
582+ x -> csn = ts -> csn ;
581583
582584 dtm -> transCount += 1 ;
583585
@@ -588,34 +590,36 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
588590 ts -> gtid .node = MtmNodeId ;
589591 }
590592 MtmTransactionListAppend (ts );
593+ MtmAddSubtransactions (ts , subxids , ts -> nSubxids );
591594
592595 MtmUnlock ();
593596
594597 MTM_TRACE ("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n" , MyProcPid , x -> xid , ts -> csn );
595598}
596599
600+ static void
597601MtmPrepareTransaction (MtmCurrentTrans * x )
598602{
599603 MtmTransState * ts ;
600604
601605 MtmLock (LW_EXCLUSIVE );
602606 ts = hash_search (xid2state , & x -> xid , HASH_ENTER , NULL );
603607 Assert (ts != NULL );
604- if (ts -> status = TRANSACTION_STATUS_IN_PROGRESS ) {
608+ if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
605609 ts -> status = TRANSACTION_STATUS_UNKNOWN ;
606610 MtmAdjustSubtransactions (ts );
607611 }
608612
609613 if (!MtmIsCoordinator (ts )) {
610- MtmHashMap * hm = (MtmHashMap * )hash_search (gid2xid , x -> gid , HASH_ENTER , NULL );
614+ MtmTransMap * hm = (MtmTransMap * )hash_search (gid2xid , x -> gid , HASH_ENTER , NULL );
611615 Assert (x -> gid [0 ]);
612616 hm -> state = ts ;
613617 MtmSendNotificationMessage (ts ); /* send notification to coordinator */
614618 MtmUnlock ();
615619 } else {
616620 /* wait N commits or just one ABORT */
617- ts -> nVotes += 1 ;
618- while (ts -> nVotes != dtm -> nNodes && ts -> status == TRANSACTION_STATUS_PROGRESS ) {
621+ ts -> nVotes += 1 ; /* I vote myself */
622+ while (ts -> nVotes != dtm -> nNodes && ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
619623 MtmUnlock ();
620624 WaitLatch (& MyProc -> procLatch , WL_LATCH_SET , -1 );
621625 ResetLatch (& MyProc -> procLatch );
@@ -633,14 +637,14 @@ static void
633637MtmEndTransaction (MtmCurrentTrans * x , bool commit )
634638{
635639 MTM_TRACE ("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n" , MyProcPid , x -> xid , x -> isPrepared , x -> isDistributed , commit ? "commit" : "abort" );
636- if (x -> isDistributed ) {
640+ if (x -> isDistributed && ( TransactionIdIsValid ( x -> xid ) || x -> isReplicated ) ) {
637641 MtmTransState * ts ;
638642 MtmLock (LW_EXCLUSIVE );
639643 if (x -> isPrepared ) {
640644 ts = hash_search (xid2state , & x -> xid , HASH_FIND , NULL );
641645 Assert (ts != NULL );
642646 } else {
643- MtmHashMap * hm = (MtmHashMap * )hash_search (gid2xid , x -> gid , HASH_REMOVE , NULL );
647+ MtmTransMap * hm = (MtmTransMap * )hash_search (gid2xid , x -> gid , HASH_REMOVE , NULL );
644648 Assert (hm != NULL );
645649 ts = hm -> state ;
646650 }
@@ -712,12 +716,18 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
712716
713717void MtmSetCurrentTransactionGID (char const * gid )
714718{
719+ MTM_TRACE ("Set current transaction GID %s\n" , gid );
715720 strcpy (dtmTx .gid , gid );
721+ dtmTx .isDistributed = true;
722+ dtmTx .isReplicated = true;
716723}
717724
718725void MtmSetCurrentTransactionCSN (csn_t csn )
719726{
727+ MTM_TRACE ("Set current transaction CSN %ld\n" , csn );
720728 dtmTx .csn = csn ;
729+ dtmTx .isDistributed = true;
730+ dtmTx .isReplicated = true;
721731}
722732
723733/*
@@ -731,7 +741,8 @@ void MtmSetCurrentTransactionCSN(csn_t csn)
731741 * Check state of replication slots. If some of them are too much lag behind wal, then drop this slots to avoid
732742 * WAL overflow
733743 */
734- static void MtmCheckSlots ()
744+ static void
745+ MtmCheckSlots ()
735746{
736747 if (MtmMaxRecoveryLag != 0 && dtm -> disabledNodeMask != 0 )
737748 {
@@ -1682,14 +1693,14 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
16821693 {
16831694 case TRANS_STMT_COMMIT :
16841695 if (dtmTx .isDistributed && dtmTx .containsDML ) {
1685- char gid { MUTLIMASTER_MAX_GID_SIZE ];
1686- MtmGenerateGid (& gid );
1696+ char gid [ MULTIMASTER_MAX_GID_SIZE ];
1697+ MtmGenerateGid (gid );
16871698 if (!IsTransactionBlock ()) {
16881699 elog (WARNING , "Start transaction block for %d" , dtmTx .xid );
16891700 CommitTransactionCommand ();
16901701 StartTransactionCommand ();
16911702 }
1692- if (!PrepareTransactionBlock (& gid ))
1703+ if (!PrepareTransactionBlock (gid ))
16931704 {
16941705 elog (WARNING , "Failed to prepare transaction %s" , gid );
16951706 /* report unsuccessful commit in completionTag */
0 commit comments