|
44 | 44 | #include "storage/proc.h" |
45 | 45 | #include "utils/syscache.h" |
46 | 46 | #include "replication/walsender.h" |
| 47 | +#include "replication/slot.h" |
47 | 48 | #include "port/atomics.h" |
48 | 49 | #include "tcop/utility.h" |
49 | 50 | #include "sockhub/sockhub.h" |
@@ -84,7 +85,7 @@ PG_MODULE_MAGIC; |
84 | 85 |
|
85 | 86 | PG_FUNCTION_INFO_V1(mm_start_replication); |
86 | 87 | PG_FUNCTION_INFO_V1(mm_stop_replication); |
87 | | -PG_FUNCTION_INFO_V1(mm_disable_node); |
| 88 | +PG_FUNCTION_INFO_V1(mm_drop_node); |
88 | 89 |
|
89 | 90 | static Snapshot DtmGetSnapshot(Snapshot snapshot); |
90 | 91 | static void DtmMergeWithGlobalSnapshot(Snapshot snapshot); |
@@ -1216,17 +1217,26 @@ mm_stop_replication(PG_FUNCTION_ARGS) |
1216 | 1217 | } |
1217 | 1218 |
|
1218 | 1219 | Datum |
1219 | | -mm_disable_node(PG_FUNCTION_ARGS) |
| 1220 | +mm_drop_node(PG_FUNCTION_ARGS) |
1220 | 1221 | { |
1221 | 1222 | int nodeId = PG_GETARG_INT32(0); |
1222 | | - if (!BIT_SET(dtm->disabledNodeMask, nodeId)) |
| 1223 | + bool dropSlot = PG_GETARG_BOOL(1); |
| 1224 | + if (!BIT_SET(dtm->disabledNodeMask, nodeId-1)) |
1223 | 1225 | { |
1224 | | - dtm->disabledNodeMask |= ((int64)1 << nodeId); |
| 1226 | + if (nodeId <= 0 || nodeId > dtm->nNodes) |
| 1227 | + { |
| 1228 | + elog(ERROR, "NodeID %d is out of range [1,%d]", nodeId, dtm->nNodes); |
| 1229 | + } |
| 1230 | + dtm->disabledNodeMask |= ((int64)1 << (nodeId-1)); |
1225 | 1231 | dtm->nNodes -= 1; |
1226 | 1232 | if (!IsTransactionBlock()) |
1227 | 1233 | { |
1228 | | - MMBroadcastUtilityStmt(psprintf("select mm_disable_node(%d)", nodeId), true); |
| 1234 | + MMBroadcastUtilityStmt(psprintf("select mm_drop_node(%d,%s)", nodeId, dropSlot ? "true" : "false"), true); |
1229 | 1235 | } |
| 1236 | + if (dropSlot) |
| 1237 | + { |
| 1238 | + ReplicationSlotDrop(psprintf("mm_slot_%d", nodeId)); |
| 1239 | + } |
1230 | 1240 | } |
1231 | 1241 | PG_RETURN_VOID(); |
1232 | 1242 | } |
|
0 commit comments