@@ -92,7 +92,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
9292 const char * slot_name );
9393static void pg_ctl_status (const char * pg_ctl_cmd , int rc );
9494static void start_standby_server (const struct CreateSubscriberOptions * opt ,
95- bool restricted_access );
95+ bool restricted_access ,
96+ bool restrict_logical_worker );
9697static void stop_standby_server (const char * datadir );
9798static void wait_for_end_recovery (const char * conninfo ,
9899 const struct CreateSubscriberOptions * opt );
@@ -102,6 +103,10 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf
102103static void set_replication_progress (PGconn * conn , const struct LogicalRepInfo * dbinfo ,
103104 const char * lsn );
104105static void enable_subscription (PGconn * conn , const struct LogicalRepInfo * dbinfo );
106+ static void check_and_drop_existing_subscriptions (PGconn * conn ,
107+ const struct LogicalRepInfo * dbinfo );
108+ static void drop_existing_subscriptions (PGconn * conn , const char * subname ,
109+ const char * dbname );
105110
106111#define USEC_PER_SEC 1000000
107112#define WAIT_INTERVAL 1 /* 1 second */
@@ -1025,6 +1030,87 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
10251030 exit (1 );
10261031}
10271032
1033+ /*
1034+ * Drop a specified subscription. This is to avoid duplicate subscriptions on
1035+ * the primary (publisher node) and the newly created subscriber. We
1036+ * shouldn't drop the associated slot as that would be used by the publisher
1037+ * node.
1038+ */
1039+ static void
1040+ drop_existing_subscriptions (PGconn * conn , const char * subname , const char * dbname )
1041+ {
1042+ PQExpBuffer query = createPQExpBuffer ();
1043+ PGresult * res ;
1044+
1045+ Assert (conn != NULL );
1046+
1047+ /*
1048+ * Construct a query string. These commands are allowed to be executed
1049+ * within a transaction.
1050+ */
1051+ appendPQExpBuffer (query , "ALTER SUBSCRIPTION %s DISABLE;" ,
1052+ subname );
1053+ appendPQExpBuffer (query , " ALTER SUBSCRIPTION %s SET (slot_name = NONE);" ,
1054+ subname );
1055+ appendPQExpBuffer (query , " DROP SUBSCRIPTION %s;" , subname );
1056+
1057+ pg_log_info ("dropping subscription \"%s\" on database \"%s\"" ,
1058+ subname , dbname );
1059+
1060+ if (!dry_run )
1061+ {
1062+ res = PQexec (conn , query -> data );
1063+
1064+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
1065+ {
1066+ pg_log_error ("could not drop a subscription \"%s\" settings: %s" ,
1067+ subname , PQresultErrorMessage (res ));
1068+ disconnect_database (conn , true);
1069+ }
1070+
1071+ PQclear (res );
1072+ }
1073+
1074+ destroyPQExpBuffer (query );
1075+ }
1076+
1077+ /*
1078+ * Retrieve and drop the pre-existing subscriptions.
1079+ */
1080+ static void
1081+ check_and_drop_existing_subscriptions (PGconn * conn ,
1082+ const struct LogicalRepInfo * dbinfo )
1083+ {
1084+ PQExpBuffer query = createPQExpBuffer ();
1085+ char * dbname ;
1086+ PGresult * res ;
1087+
1088+ Assert (conn != NULL );
1089+
1090+ dbname = PQescapeLiteral (conn , dbinfo -> dbname , strlen (dbinfo -> dbname ));
1091+
1092+ appendPQExpBuffer (query ,
1093+ "SELECT s.subname FROM pg_catalog.pg_subscription s "
1094+ "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1095+ "WHERE d.datname = %s" ,
1096+ dbname );
1097+ res = PQexec (conn , query -> data );
1098+
1099+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
1100+ {
1101+ pg_log_error ("could not obtain pre-existing subscriptions: %s" ,
1102+ PQresultErrorMessage (res ));
1103+ disconnect_database (conn , true);
1104+ }
1105+
1106+ for (int i = 0 ; i < PQntuples (res ); i ++ )
1107+ drop_existing_subscriptions (conn , PQgetvalue (res , i , 0 ),
1108+ dbinfo -> dbname );
1109+
1110+ PQclear (res );
1111+ destroyPQExpBuffer (query );
1112+ }
1113+
10281114/*
10291115 * Create the subscriptions, adjust the initial location for logical
10301116 * replication and enable the subscriptions. That's the last step for logical
@@ -1040,6 +1126,14 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
10401126 /* Connect to subscriber. */
10411127 conn = connect_database (dbinfo [i ].subconninfo , true);
10421128
1129+ /*
1130+ * We don't need the pre-existing subscriptions on the newly formed
1131+ * subscriber. They can connect to other publisher nodes and either
1132+ * get some unwarranted data or can lead to ERRORs in connecting to
1133+ * such nodes.
1134+ */
1135+ check_and_drop_existing_subscriptions (conn , & dbinfo [i ]);
1136+
10431137 /*
10441138 * Since the publication was created before the consistent LSN, it is
10451139 * available on the subscriber when the physical replica is promoted.
@@ -1314,7 +1408,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
13141408}
13151409
13161410static void
1317- start_standby_server (const struct CreateSubscriberOptions * opt , bool restricted_access )
1411+ start_standby_server (const struct CreateSubscriberOptions * opt , bool restricted_access ,
1412+ bool restrict_logical_worker )
13181413{
13191414 PQExpBuffer pg_ctl_cmd = createPQExpBuffer ();
13201415 int rc ;
@@ -1343,6 +1438,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
13431438 if (opt -> config_file != NULL )
13441439 appendPQExpBuffer (pg_ctl_cmd , " -o \"-c config_file=%s\"" ,
13451440 opt -> config_file );
1441+
1442+ /* Suppress to start logical replication if requested */
1443+ if (restrict_logical_worker )
1444+ appendPQExpBuffer (pg_ctl_cmd , " -o \"-c max_logical_replication_workers=0\"" );
1445+
13461446 pg_log_debug ("pg_ctl command is: %s" , pg_ctl_cmd -> data );
13471447 rc = system (pg_ctl_cmd -> data );
13481448 pg_ctl_status (pg_ctl_cmd -> data , rc );
@@ -2067,7 +2167,7 @@ main(int argc, char **argv)
20672167 * transformation steps.
20682168 */
20692169 pg_log_info ("starting the standby with command-line options" );
2070- start_standby_server (& opt , true);
2170+ start_standby_server (& opt , true, false );
20712171
20722172 /* Check if the standby server is ready for logical replication */
20732173 check_subscriber (dbinfo );
@@ -2098,10 +2198,11 @@ main(int argc, char **argv)
20982198
20992199 /*
21002200 * Start subscriber so the recovery parameters will take effect. Wait
2101- * until accepting connections.
2201+ * until accepting connections. We don't want to start logical replication
2202+ * during setup.
21022203 */
21032204 pg_log_info ("starting the subscriber" );
2104- start_standby_server (& opt , true);
2205+ start_standby_server (& opt , true, true );
21052206
21062207 /* Waiting the subscriber to be promoted */
21072208 wait_for_end_recovery (dbinfo [0 ].subconninfo , & opt );
0 commit comments