@@ -59,6 +59,7 @@ typedef struct
5959 TransactionId minXid ; /* XID of oldest transaction visible by any active transaction (local or global) */
6060 TransactionId nextXid ; /* next XID for local transaction */
6161 size_t nReservedXids ; /* number of XIDs reserved for local transactions */
62+ int64 disabledNodeMask ;
6263 int nNodes ;
6364 pg_atomic_uint32 nReceivers ;
6465 bool initialized ;
@@ -74,13 +75,16 @@ typedef struct
7475#define DTM_SHMEM_SIZE (64*1024*1024)
7576#define DTM_HASH_SIZE 1003
7677
78+ #define BIT_SET (mask , bit ) ((mask) & ((int64)1 << (bit)))
79+
7780void _PG_init (void );
7881void _PG_fini (void );
7982
8083PG_MODULE_MAGIC ;
8184
8285PG_FUNCTION_INFO_V1 (mm_start_replication );
8386PG_FUNCTION_INFO_V1 (mm_stop_replication );
87+ PG_FUNCTION_INFO_V1 (mm_disable_node );
8488
8589static Snapshot DtmGetSnapshot (Snapshot snapshot );
8690static void DtmMergeWithGlobalSnapshot (Snapshot snapshot );
@@ -108,6 +112,7 @@ static void DtmBackgroundWorker(Datum arg);
108112static void MMMarkTransAsLocal (TransactionId xid );
109113static BgwPool * MMPoolConstructor (void );
110114static bool MMRunUtilityStmt (PGconn * conn , char const * sql );
115+ static void MMBroadcastUtilityStmt (char const * sql , bool ignoreError );
111116
112117static HTAB * xid_in_doubt ;
113118static HTAB * local_trans ;
@@ -737,6 +742,7 @@ static void DtmInitialize()
737742 dtm -> nReservedXids = 0 ;
738743 dtm -> minXid = InvalidTransactionId ;
739744 dtm -> nNodes = MMNodes ;
745+ dtm -> disabledNodeMask = 0 ;
740746 pg_atomic_write_u32 (& dtm -> nReceivers , 0 );
741747 dtm -> initialized = false;
742748 BgwPoolInit (& dtm -> pool , MMExecutor , MMDatabaseName , MMQueueSize );
@@ -1209,6 +1215,22 @@ mm_stop_replication(PG_FUNCTION_ARGS)
12091215 PG_RETURN_VOID ();
12101216}
12111217
1218+ Datum
1219+ mm_disable_node (PG_FUNCTION_ARGS )
1220+ {
1221+ int nodeId = PG_GETARG_INT32 (0 );
1222+ if (!BIT_SET (dtm -> disabledNodeMask , nodeId ))
1223+ {
1224+ dtm -> disabledNodeMask |= ((int64 )1 << nodeId );
1225+ dtm -> nNodes -= 1 ;
1226+ if (!IsTransactionBlock ())
1227+ {
1228+ MMBroadcastUtilityStmt (psprintf ("select mm_disable_node(%d)" , nodeId ), true);
1229+ }
1230+ }
1231+ PG_RETURN_VOID ();
1232+ }
1233+
12121234/*
12131235 * Execute statement with specified parameters and check its result
12141236 */
@@ -1224,6 +1246,95 @@ static bool MMRunUtilityStmt(PGconn* conn, char const* sql)
12241246 return ret ;
12251247}
12261248
1249+ static void MMBroadcastUtilityStmt (char const * sql , bool ignoreError )
1250+ {
1251+ char * conn_str = pstrdup (MMConnStrs );
1252+ char * conn_str_end = conn_str + strlen (conn_str );
1253+ int i = 0 ;
1254+ int64 disabledNodeMask = dtm -> disabledNodeMask ;
1255+ int failedNode = -1 ;
1256+ char const * errorMsg = NULL ;
1257+ PGconn * * conns = palloc0 (sizeof (PGconn * )* MMNodes );
1258+
1259+ while (conn_str < conn_str_end )
1260+ {
1261+ char * p = strchr (conn_str , ',' );
1262+ if (p == NULL ) {
1263+ p = conn_str_end ;
1264+ }
1265+ * p = '\0' ;
1266+ if (!BIT_SET (disabledNodeMask , i ))
1267+ {
1268+ conns [i ] = PQconnectdb (conn_str );
1269+ if (PQstatus (conns [i ]) != CONNECTION_OK )
1270+ {
1271+ if (ignoreError )
1272+ {
1273+ PQfinish (conns [i ]);
1274+ conns [i ] = NULL ;
1275+ } else {
1276+ failedNode = i ;
1277+ do {
1278+ PQfinish (conns [i ]);
1279+ } while (-- i >= 0 );
1280+ elog (ERROR , "Failed to establish connection '%s' to node %d" , conn_str , failedNode );
1281+ }
1282+ }
1283+ }
1284+ conn_str = p + 1 ;
1285+ i += 1 ;
1286+ }
1287+ Assert (i == MMNodes );
1288+
1289+ for (i = 0 ; i < MMNodes ; i ++ )
1290+ {
1291+ if (conns [i ])
1292+ {
1293+ if (!MMRunUtilityStmt (conns [i ], "BEGIN TRANSACTION" ) && !ignoreError )
1294+ {
1295+ errorMsg = "Failed to start transaction at node %d" ;
1296+ failedNode = i ;
1297+ break ;
1298+ }
1299+ if (!MMRunUtilityStmt (conns [i ], sql ) && !ignoreError )
1300+ {
1301+ errorMsg = "Failed to run command at node %d" ;
1302+ failedNode = i ;
1303+ break ;
1304+ }
1305+ }
1306+ }
1307+ if (failedNode >= 0 && !ignoreError )
1308+ {
1309+ for (i = 0 ; i < MMNodes ; i ++ )
1310+ {
1311+ if (conns [i ])
1312+ {
1313+ MMRunUtilityStmt (conns [i ], "ROLLBACK TRANSACTION" );
1314+ }
1315+ }
1316+ } else {
1317+ for (i = 0 ; i < MMNodes ; i ++ )
1318+ {
1319+ if (conns [i ] && !MMRunUtilityStmt (conns [i ], "COMMIT TRANSACTION" ) && !ignoreError )
1320+ {
1321+ errorMsg = "Commit failed at node %d" ;
1322+ failedNode = i ;
1323+ }
1324+ }
1325+ }
1326+ for (i = 0 ; i < MMNodes ; i ++ )
1327+ {
1328+ if (conns [i ])
1329+ {
1330+ PQfinish (conns [i ]);
1331+ }
1332+ }
1333+ if (!ignoreError && failedNode >= 0 )
1334+ {
1335+ elog (ERROR , errorMsg , failedNode + 1 );
1336+ }
1337+ }
12271338
12281339static void MMProcessUtility (Node * parsetree , const char * queryString ,
12291340 ProcessUtilityContext context , ParamListInfo params ,
@@ -1267,67 +1378,7 @@ static void MMProcessUtility(Node *parsetree, const char *queryString,
12671378 MMIsDistributedTrans = false;
12681379 }
12691380 } else {
1270- char * conn_str = pstrdup (MMConnStrs );
1271- char * conn_str_end = conn_str + strlen (conn_str );
1272- int i = 0 ;
1273- int failedNode = -1 ;
1274- char const * errorMsg = NULL ;
1275- PGconn * * conns ;
1276- conns = palloc (sizeof (PGconn * )* MMNodes );
1277-
1278- while (conn_str < conn_str_end ) {
1279- char * p = strchr (conn_str , ',' );
1280- if (p == NULL ) {
1281- p = conn_str_end ;
1282- }
1283- * p = '\0' ;
1284- conns [i ] = PQconnectdb (conn_str );
1285- if (PQstatus (conns [i ]) != CONNECTION_OK )
1286- {
1287- failedNode = i ;
1288- do {
1289- PQfinish (conns [i ]);
1290- } while (-- i >= 0 );
1291- elog (ERROR , "Failed to establish connection '%s' to node %d" , conn_str , failedNode );
1292- }
1293- conn_str = p + 1 ;
1294- i += 1 ;
1295- }
1296- Assert (i == MMNodes );
1297-
1298- for (i = 0 ; i < MMNodes ; i ++ ) {
1299- if (!MMRunUtilityStmt (conns [i ], "BEGIN TRANSACTION" ))
1300- {
1301- errorMsg = "Failed to start transaction at node %d" ;
1302- failedNode = i ;
1303- break ;
1304- }
1305- if (!MMRunUtilityStmt (conns [i ], queryString ))
1306- {
1307- errorMsg = "Failed to run command at node %d" ;
1308- failedNode = i ;
1309- break ;
1310- }
1311- }
1312- if (failedNode >= 0 )
1313- {
1314- for (i = 0 ; i < MMNodes ; i ++ ) {
1315- MMRunUtilityStmt (conns [i ], "ROLLBACK TRANSACTION" );
1316- }
1317- } else {
1318- for (i = 0 ; i < MMNodes ; i ++ ) {
1319- if (!MMRunUtilityStmt (conns [i ], "COMMIT TRANSACTION" )) {
1320- errorMsg = "Commit failed at node %d" ;
1321- failedNode = i ;
1322- }
1323- }
1324- }
1325- for (i = 0 ; i < MMNodes ; i ++ ) {
1326- PQfinish (conns [i ]);
1327- }
1328- if (failedNode >= 0 ) {
1329- elog (ERROR , errorMsg , failedNode + 1 );
1330- }
1381+ MMBroadcastUtilityStmt (queryString , false);
13311382 }
13321383}
13331384static void
0 commit comments