@@ -131,7 +131,7 @@ static TransactionManager MtmTM = {
131131 MtmGetName
132132};
133133
134- static char const * const MtmNodeStatusMnem [] =
134+ char const * const MtmNodeStatusMnem [] =
135135{
136136 "Intialization" ,
137137 "Offline" ,
@@ -602,7 +602,7 @@ static void MtmPrecommitTransaction(MtmCurrentTrans* x)
602602 x -> xid = GetCurrentTransactionId ();
603603
604604 if (dtm -> disabledNodeMask != 0 ) {
605- MtmUpdateClusterStatus ( );
605+ MtmRefreshClusterStatus (true );
606606 if (dtm -> status != MTM_ONLINE ) {
607607 elog (ERROR , "Abort current transaction because this cluster node is not online" );
608608 }
@@ -1096,7 +1096,7 @@ _PG_fini(void)
10961096 */
10971097
10981098
1099- void MtmClusterSwitchMode (MtmNodeStatus mode )
1099+ void MtmSwitchClusterMode (MtmNodeStatus mode )
11001100{
11011101 dtm -> status = mode ;
11021102 elog (WARNING , "Switch to %s mode" , MtmNodeStatusMnem [mode ]);
@@ -1121,7 +1121,8 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
11211121 Assert (dtm -> status == MTM_RECOVERY );
11221122 } else if (dtm -> status == MTM_RECOVERY ) {
11231123 /* When recovery is completed we get normal transaction ID and switch to normal mode */
1124- MtmClusterSwitchMode (MTM_ONLINE );
1124+ dtm -> recoverySlot = 0 ;
1125+ MtmSwitchClusterMode (MTM_ONLINE );
11251126 }
11261127 dtmTx .gtid = * gtid ;
11271128 dtmTx .xid = GetCurrentTransactionId ();
@@ -1137,9 +1138,8 @@ void MtmReceiverStarted(int nodeId)
11371138 if (!BIT_CHECK (dtm -> pglogicalNodeMask , nodeId - 1 )) {
11381139 BIT_SET (dtm -> pglogicalNodeMask , nodeId - 1 );
11391140 if (++ dtm -> nReceivers == dtm -> nNodes - 1 ) {
1140- elog (WARNING , "All receivers are started, switch to normal mode" );
11411141 Assert (dtm -> status == MTM_CONNECTED );
1142- dtm -> status = MTM_ONLINE ;
1142+ MtmSwitchClusterMode ( MTM_OFFLINE ) ;
11431143 }
11441144 }
11451145 SpinLockRelease (& dtm -> spinlock );
@@ -1632,14 +1632,14 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
16321632
16331633 ByteBufferAlloc (& buf );
16341634 EnumerateLocks (MtmSerializeLock , & buf );
1635- PaxosSet (psprintf ("lock-graph-%d" , MtmNodeId ), buf .data , buf .used );
1635+ PaxosSet (psprintf ("lock-graph-%d" , MtmNodeId ), buf .data , buf .used , true );
16361636 MtmGraphInit (& graph );
16371637 MtmGraphAdd (& graph , (GlobalTransactionId * )buf .data , buf .used /sizeof (GlobalTransactionId ));
16381638 ByteBufferFree (& buf );
16391639 for (i = 0 ; i < MtmNodes ; i ++ ) {
16401640 if (i + 1 != MtmNodeId && !BIT_CHECK (dtm -> disabledNodeMask , i )) {
16411641 int size ;
1642- void * data = PaxosGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL );
1642+ void * data = PaxosGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true );
16431643 if (data == NULL ) {
16441644 hasDeadlock = true; /* Just temporary hack until no Paxos */
16451645 } else {
@@ -1655,12 +1655,12 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
16551655}
16561656
16571657static void
1658- MtmBuildConnectivityMatrix (nodemask_t * matrix )
1658+ MtmBuildConnectivityMatrix (nodemask_t * matrix , bool nowait )
16591659{
16601660 int i , j , n = MtmNodes ;
16611661 for (i = 0 ; i < n ; i ++ ) {
16621662 if (i + 1 != MtmNodeId ) {
1663- void * data = PaxosGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL );
1663+ void * data = PaxosGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
16641664 matrix [i ] = * (nodemask_t * )data ;
16651665 } else {
16661666 matrix [i ] = dtm -> connectivityMask ;
@@ -1678,14 +1678,14 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix)
16781678 * Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
16791679 * This function returns false if current node is excluded from cluster, true otherwise
16801680 */
1681- void MtmUpdateClusterStatus ( void )
1681+ void MtmRefreshClusterStatus ( bool nowait )
16821682{
16831683 nodemask_t mask , clique ;
16841684 nodemask_t matrix [MAX_NODES ];
16851685 int clique_size ;
16861686 int i ;
16871687
1688- MtmBuildConnectivityMatrix (matrix );
1688+ MtmBuildConnectivityMatrix (matrix , nowait );
16891689
16901690 clique = MtmFindMaxClique (matrix , MtmNodes , & clique_size );
16911691 if (clique_size >= MtmNodes /2 + 1 ) { /* have quorum */
@@ -1708,11 +1708,11 @@ void MtmUpdateClusterStatus(void)
17081708 if (BIT_CHECK (dtm -> disabledNodeMask , MtmNodeId - 1 )) {
17091709 if (dtm -> status == MTM_ONLINE ) {
17101710 /* I was excluded from cluster:( */
1711- MtmClusterSwitchMode (MTM_OFFLINE );
1711+ MtmSwitchClusterMode (MTM_OFFLINE );
17121712 }
17131713 } else if (dtm -> status == MTM_OFFLINE ) {
17141714 /* Should we somehow restart logical receivers? */
1715- MtmClusterSwitchMode (MTM_RECOVERY );
1715+ MtmSwitchClusterMode (MTM_RECOVERY );
17161716 }
17171717 } else {
17181718 elog (WARNING , "Clique %lx has no quorum" , clique );
@@ -1722,30 +1722,30 @@ void MtmUpdateClusterStatus(void)
17221722void MtmOnNodeDisconnect (int nodeId )
17231723{
17241724 BIT_SET (dtm -> connectivityMask , nodeId - 1 );
1725- PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & dtm -> connectivityMask , sizeof dtm -> connectivityMask );
1725+ PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & dtm -> connectivityMask , sizeof dtm -> connectivityMask , false );
17261726
17271727 /* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
17281728 MtmSleep (MtmKeepaliveTimeout );
17291729
1730- MtmUpdateClusterStatus ( );
1730+ MtmRefreshClusterStatus (false );
17311731}
17321732
17331733void MtmOnNodeConnect (int nodeId )
17341734{
17351735 BIT_CLEAR (dtm -> connectivityMask , nodeId - 1 );
1736- PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & dtm -> connectivityMask , sizeof dtm -> connectivityMask );
1736+ PaxosSet (psprintf ("node-mask-%d" , MtmNodeId ), & dtm -> connectivityMask , sizeof dtm -> connectivityMask , false );
17371737}
17381738
17391739/*
17401740 * Paxos function stubs (until them are miplemented)
17411741 */
1742- void * PaxosGet (char const * key , int * size , PaxosTimestamp * ts )
1742+ void * PaxosGet (char const * key , int * size , PaxosTimestamp * ts , bool nowait )
17431743{
17441744 if (size != NULL ) {
17451745 * size = 0 ;
17461746 }
17471747 return NULL ;
17481748}
17491749
1750- void PaxosSet (char const * key , void const * value , int size )
1750+ void PaxosSet (char const * key , void const * value , int size , bool nowait )
17511751{}
0 commit comments