@@ -1049,6 +1049,7 @@ MtmCheckClusterLock()
10491049 Mtm -> nNodes += Mtm -> nLockers ;
10501050 Mtm -> nLockers = 0 ;
10511051 Mtm -> nodeLockerMask = 0 ;
1052+ MtmCheckQuorum ();
10521053 }
10531054 }
10541055 break ;
@@ -1058,14 +1059,17 @@ MtmCheckClusterLock()
10581059/**
10591060 * Build internode connectivity mask. 1 - means that node is disconnected.
10601061 */
1061- static void
1062+ static bool
10621063MtmBuildConnectivityMatrix (nodemask_t * matrix , bool nowait )
10631064{
10641065 int i , j , n = MtmNodes ;
10651066 for (i = 0 ; i < n ; i ++ ) {
10661067 if (i + 1 != MtmNodeId ) {
10671068 void * data = PaxosGet (psprintf ("node-mask-%d" , i + 1 ), NULL , NULL , nowait );
1068- matrix [i ] = data ? * (nodemask_t * )data : 0 ;
1069+ if (data == NULL ) {
1070+ return false;
1071+ }
1072+ matrix [i ] = * (nodemask_t * )data ;
10691073 } else {
10701074 matrix [i ] = Mtm -> connectivityMask ;
10711075 }
@@ -1076,21 +1080,25 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
10761080 matrix [i ] |= ((matrix [j ] >> i ) & 1 ) << j ;
10771081 }
10781082 }
1083+ return true;
10791084}
10801085
10811086
10821087/**
10831088 * Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
10841089 * This function returns false if the connectivity matrix could not be built (RAFT is not available), true otherwise
10851090 */
1086- void MtmRefreshClusterStatus (bool nowait )
1091+ bool MtmRefreshClusterStatus (bool nowait )
10871092{
10881093 nodemask_t mask , clique ;
10891094 nodemask_t matrix [MAX_NODES ];
10901095 int clique_size ;
10911096 int i ;
10921097
1093- MtmBuildConnectivityMatrix (matrix , nowait );
1098+ if (!MtmBuildConnectivityMatrix (matrix , nowait )) {
1099+ /* RAFT is not available */
1100+ return false;
1101+ }
10941102
10951103 clique = MtmFindMaxClique (matrix , MtmNodes , & clique_size );
10961104 if (clique_size >= MtmNodes /2 + 1 ) { /* have quorum */
@@ -1110,6 +1118,7 @@ void MtmRefreshClusterStatus(bool nowait)
11101118 BIT_CLEAR (Mtm -> disabledNodeMask , i );
11111119 }
11121120 }
1121+ MtmCheckQuorum ();
11131122 MtmUnlock ();
11141123 if (BIT_CHECK (Mtm -> disabledNodeMask , MtmNodeId - 1 )) {
11151124 if (Mtm -> status == MTM_ONLINE ) {
@@ -1122,9 +1131,27 @@ void MtmRefreshClusterStatus(bool nowait)
11221131 }
11231132 } else {
11241133 elog (WARNING , "Clique %lx has no quorum" , clique );
1134+ Mtm -> status = MTM_IN_MINORITY ;
11251135 }
1136+ return true;
11261137}
11271138
1139+ void MtmCheckQuorum (void )
1140+ {
1141+ if (Mtm -> nNodes < MtmNodes /2 + 1 ) {
1142+ if (Mtm -> status == MTM_ONLINE ) { /* out of quorum */
1143+ elog (WARNING , "Node is in minority: disabled mask %lx" , Mtm -> disabledNodeMask );
1144+ Mtm -> status = MTM_IN_MINORITY ;
1145+ }
1146+ } else {
1147+ if (Mtm -> status == MTM_IN_MINORITY ) {
1148+ elog (WARNING , "Node is in majority: dissbled mask %lx" , Mtm -> disabledNodeMask );
1149+ Mtm -> status = MTM_ONLINE ;
1150+ }
1151+ }
1152+ }
1153+
1154+
11281155void MtmOnNodeDisconnect (int nodeId )
11291156{
11301157 BIT_SET (Mtm -> connectivityMask , nodeId - 1 );
@@ -1133,7 +1160,15 @@ void MtmOnNodeDisconnect(int nodeId)
11331160 /* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
11341161 MtmSleep (MtmKeepaliveTimeout );
11351162
1136- MtmRefreshClusterStatus (false);
1163+ if (!MtmRefreshClusterStatus (false)) {
1164+ MtmLock (LW_EXCLUSIVE );
1165+ if (!BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) {
1166+ BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1167+ Mtm -> nNodes -= 1 ;
1168+ MtmCheckQuorum ();
1169+ }
1170+ MtmUnlock ();
1171+ }
11371172}
11381173
11391174void MtmOnNodeConnect (int nodeId )
@@ -1635,6 +1670,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16351670 }
16361671 BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
16371672 Mtm -> nNodes -= 1 ;
1673+ MtmCheckQuorum ();
16381674 if (!MtmIsBroadcast ())
16391675 {
16401676 MtmBroadcastUtilityStmt (psprintf ("select mtm.drop_node(%d,%s)" , nodeId , dropSlot ? "true" : "false" ), true);
@@ -1649,6 +1685,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16491685static void
16501686MtmReplicationShutdownHook (struct PGLogicalShutdownHookArgs * args )
16511687{
1688+ elog (WARNING , "Logical replication to node %d is stopped" , MtmReplicationNodeId );
16521689 MtmOnNodeDisconnect (MtmReplicationNodeId );
16531690}
16541691
0 commit comments