@@ -89,6 +89,7 @@ PG_MODULE_MAGIC;
8989PG_FUNCTION_INFO_V1 (mtm_start_replication );
9090PG_FUNCTION_INFO_V1 (mtm_stop_replication );
9191PG_FUNCTION_INFO_V1 (mtm_drop_node );
92+ PG_FUNCTION_INFO_V1 (mtm_recover_node );
9293PG_FUNCTION_INFO_V1 (mtm_get_snapshot );
9394
9495static Snapshot MtmGetSnapshot (Snapshot snapshot );
@@ -1182,6 +1183,22 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
11821183 return dtm -> recoverySlot ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS ;
11831184}
11841185
1186+ void MtmRecoverNode (int nodeId )
1187+ {
1188+ if (nodeId <= 0 || nodeId > dtm -> nNodes )
1189+ {
1190+ elog (ERROR , "NodeID %d is out of range [1,%d]" , nodeId , dtm -> nNodes );
1191+ }
1192+ if (!BIT_CHECK (dtm -> disabledNodeMask , nodeId - 1 )) {
1193+ elog (ERROR , "Node %d was not disabled" , nodeId );
1194+ }
1195+ if (!IsTransactionBlock ())
1196+ {
1197+ MtmBroadcastUtilityStmt (psprintf ("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')" , nodeId ), true);
1198+ }
1199+ }
1200+
1201+
11851202void MtmDropNode (int nodeId , bool dropSlot )
11861203{
11871204 if (!BIT_CHECK (dtm -> disabledNodeMask , nodeId - 1 ))
@@ -1227,6 +1244,14 @@ mtm_drop_node(PG_FUNCTION_ARGS)
12271244 PG_RETURN_VOID ();
12281245}
12291246
1247+ Datum
1248+ mtm_recover_node (PG_FUNCTION_ARGS )
1249+ {
1250+ int nodeId = PG_GETARG_INT32 (0 );
1251+ MtmRecoverNode (nodeId );
1252+ PG_RETURN_VOID ();
1253+ }
1254+
12301255Datum
12311256mtm_get_snapshot (PG_FUNCTION_ARGS )
12321257{
@@ -1599,7 +1624,7 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
15991624 {
16001625 if ((proclock -> holdMask & LOCKBIT_ON (lm )) && (conflictMask & LOCKBIT_ON (lm )))
16011626 {
1602- MTM_TRACE ("%d: %u(%u) waits for %u(%u)\n" , getpid () , srcPgXact -> xid , proc -> pid , dstPgXact -> xid , proclock -> tag .myProc -> pid );
1627+ MTM_TRACE ("%d: %u(%u) waits for %u(%u)\n" , MyProcPid , srcPgXact -> xid , proc -> pid , dstPgXact -> xid , proclock -> tag .myProc -> pid );
16031628 MtmGetGtid (srcPgXact -> xid , & gtid ); /* transaction holding lock */
16041629 ByteBufferAppendInt32 (buf , gtid .node );
16051630 ByteBufferAppendInt32 (buf , gtid .xid );
@@ -1689,6 +1714,7 @@ void MtmRefreshClusterStatus(bool nowait)
16891714
16901715 clique = MtmFindMaxClique (matrix , MtmNodes , & clique_size );
16911716 if (clique_size >= MtmNodes /2 + 1 ) { /* have quorum */
1717+ elog (WARNING , "Find clique %lx, disabledNodeMask %lx" , clique , dtm -> disabledNodeMask );
16921718 MtmLock (LW_EXCLUSIVE );
16931719 mask = ~clique & (((nodemask_t )1 << MtmNodes )- 1 ) & ~dtm -> disabledNodeMask ; /* new disabled nodes mask */
16941720 for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
0 commit comments