@@ -191,6 +191,8 @@ static int MtmWorkers;
191191static int MtmVacuumDelay ;
192192static int MtmMinRecoveryLag ;
193193static int MtmMaxRecoveryLag ;
194+ static int Mtm2PCPrepareRatio ;
195+ static int Mtm2PCMinTimeout ;
194196static bool MtmIgnoreTablesWithoutPk ;
195197
196198static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
@@ -766,8 +768,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
766768
767769}
768770
769- static time_t maxWakeupTime ;
770-
771771static void
772772MtmPostPrepareTransaction (MtmCurrentTrans * x )
773773{
@@ -783,25 +783,32 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
783783 tm -> state = ts ;
784784 ts -> votingCompleted = true;
785785 if (Mtm -> status != MTM_RECOVERY ) {
786- MtmSendNotificationMessage (ts , MtmUseDtm ? MSG_READY : MSG_PREPARED ); /* send notification to coordinator */
786+ if (MtmUseDtm ) {
787+ MtmSendNotificationMessage (ts , MSG_READY ); /* send notification to coordinator */
788+ } else {
789+ ts -> csn = MtmAssignCSN ();
790+ MtmSendNotificationMessage (ts , MSG_PREPARED ); /* send notification to coordinator */
791+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
792+ }
787793 } else {
788794 ts -> status = TRANSACTION_STATUS_UNKNOWN ;
789795 }
790796 MtmUnlock ();
791797 MtmResetTransaction (x );
792798 } else {
793- time_t wakeupTime ;
799+ time_t timeout = Max (Mtm2PCMinTimeout , (ts -> csn - ts -> snapshot )* Mtm2PCPrepareRatio /100000 ); /* usec->msec and percents */
800+ int result = 0 ;
794801 /* wait votes from all nodes */
795- while (!ts -> votingCompleted ) {
802+ while (!ts -> votingCompleted && !( result & WL_TIMEOUT )) {
796803 MtmUnlock ();
797- WaitLatch (& MyProc -> procLatch , WL_LATCH_SET , -1 );
804+ result = WaitLatch (& MyProc -> procLatch , WL_LATCH_SET | WL_TIMEOUT , timeout );
798805 ResetLatch (& MyProc -> procLatch );
799- wakeupTime = MtmGetCurrentTime () - ts -> wakeupTime ;
800- if (wakeupTime > maxWakeupTime ) {
801- maxWakeupTime = wakeupTime ;
802- }
803806 MtmLock (LW_SHARED );
804807 }
808+ if (!ts -> votingCompleted ) {
809+ ts -> status = TRANSACTION_STATUS_ABORTED ;
810+ elog (WARNING , "Transaction is aborted because of %d msec timeout expiration" , (int )timeout );
811+ }
805812 x -> status = ts -> status ;
806813 MTM_LOG3 ("%d: Result of vote: %d" , MyProcPid , ts -> status );
807814 MtmUnlock ();
@@ -989,11 +996,12 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
989996}
990997
991998void MtmWakeUpBackend (MtmTransState * ts )
992- {
993- MTM_LOG3 ("Wakeup backed procno=%d, pid=%d" , ts -> procno , ProcGlobal -> allProcs [ts -> procno ].pid );
994- ts -> votingCompleted = true;
995- ts -> wakeupTime = MtmGetCurrentTime ();
996- SetLatch (& ProcGlobal -> allProcs [ts -> procno ].procLatch );
999+ {
1000+ if (!ts -> votingCompleted ) {
1001+ MTM_LOG3 ("Wakeup backed procno=%d, pid=%d" , ts -> procno , ProcGlobal -> allProcs [ts -> procno ].pid );
1002+ ts -> votingCompleted = true;
1003+ SetLatch (& ProcGlobal -> allProcs [ts -> procno ].procLatch );
1004+ }
9971005}
9981006
9991007void MtmAbortTransaction (MtmTransState * ts )
@@ -1599,6 +1607,38 @@ _PG_init(void)
15991607 if (!process_shared_preload_libraries_in_progress )
16001608 return ;
16011609
1610+ DefineCustomIntVariable (
1611+ "multimaster.2pc_min_timeout" ,
1612+ "Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes" ,
1613+ "Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)" ,
1614+ & Mtm2PCMinTimeout ,
1615+ 10000 ,
1616+ 0 ,
1617+ INT_MAX ,
1618+ PGC_BACKEND ,
1619+ 0 ,
1620+ NULL ,
1621+ NULL ,
1622+ NULL
1623+ );
1624+
1625+ DefineCustomIntVariable (
1626+ "multimaster.2pc_prepare_ratio" ,
1627+ "Percent of prepare time for maximal time of second phase of two-pahse commit" ,
1628+ "Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)" ,
1629+ & Mtm2PCPrepareRatio ,
1630+ 100 ,
1631+ 0 ,
1632+ INT_MAX ,
1633+ PGC_BACKEND ,
1634+ 0 ,
1635+ NULL ,
1636+ NULL ,
1637+ NULL
1638+ );
1639+
1640+
1641+
16021642 DefineCustomIntVariable (
16031643 "multimaster.node_disable_delay" ,
16041644 "Minamal amount of time (sec) between node status change" ,
0 commit comments