6969#define SUBOPT_DISABLE_ON_ERR 0x00000400
7070#define SUBOPT_PASSWORD_REQUIRED 0x00000800
7171#define SUBOPT_RUN_AS_OWNER 0x00001000
72- #define SUBOPT_LSN 0x00002000
73- #define SUBOPT_ORIGIN 0x00004000
72+ #define SUBOPT_FAILOVER 0x00002000
73+ #define SUBOPT_LSN 0x00004000
74+ #define SUBOPT_ORIGIN 0x00008000
75+
7476
7577/* check if the 'val' has 'bits' set */
7678#define IsSet (val , bits ) (((val) & (bits)) == (bits))
@@ -95,6 +97,7 @@ typedef struct SubOpts
9597 bool disableonerr ;
9698 bool passwordrequired ;
9799 bool runasowner ;
100+ bool failover ;
98101 char * origin ;
99102 XLogRecPtr lsn ;
100103} SubOpts ;
@@ -155,6 +158,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
155158 opts -> passwordrequired = true;
156159 if (IsSet (supported_opts , SUBOPT_RUN_AS_OWNER ))
157160 opts -> runasowner = false;
161+ if (IsSet (supported_opts , SUBOPT_FAILOVER ))
162+ opts -> failover = false;
158163 if (IsSet (supported_opts , SUBOPT_ORIGIN ))
159164 opts -> origin = pstrdup (LOGICALREP_ORIGIN_ANY );
160165
@@ -303,6 +308,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
303308 opts -> specified_opts |= SUBOPT_RUN_AS_OWNER ;
304309 opts -> runasowner = defGetBoolean (defel );
305310 }
311+ else if (IsSet (supported_opts , SUBOPT_FAILOVER ) &&
312+ strcmp (defel -> defname , "failover" ) == 0 )
313+ {
314+ if (IsSet (opts -> specified_opts , SUBOPT_FAILOVER ))
315+ errorConflictingDefElem (defel , pstate );
316+
317+ opts -> specified_opts |= SUBOPT_FAILOVER ;
318+ opts -> failover = defGetBoolean (defel );
319+ }
306320 else if (IsSet (supported_opts , SUBOPT_ORIGIN ) &&
307321 strcmp (defel -> defname , "origin" ) == 0 )
308322 {
@@ -388,6 +402,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
388402 errmsg ("%s and %s are mutually exclusive options" ,
389403 "connect = false" , "copy_data = true" )));
390404
405+ if (opts -> failover &&
406+ IsSet (opts -> specified_opts , SUBOPT_FAILOVER ))
407+ ereport (ERROR ,
408+ (errcode (ERRCODE_SYNTAX_ERROR ),
409+ errmsg ("%s and %s are mutually exclusive options" ,
410+ "connect = false" , "failover = true" )));
411+
391412 /* Change the defaults of other options. */
392413 opts -> enabled = false;
393414 opts -> create_slot = false;
@@ -591,7 +612,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
591612 SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
592613 SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
593614 SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
594- SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN );
615+ SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN );
595616 parse_subscription_options (pstate , stmt -> options , supported_opts , & opts );
596617
597618 /*
@@ -697,6 +718,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
697718 values [Anum_pg_subscription_subdisableonerr - 1 ] = BoolGetDatum (opts .disableonerr );
698719 values [Anum_pg_subscription_subpasswordrequired - 1 ] = BoolGetDatum (opts .passwordrequired );
699720 values [Anum_pg_subscription_subrunasowner - 1 ] = BoolGetDatum (opts .runasowner );
721+ values [Anum_pg_subscription_subfailover - 1 ] = BoolGetDatum (opts .failover );
700722 values [Anum_pg_subscription_subconninfo - 1 ] =
701723 CStringGetTextDatum (conninfo );
702724 if (opts .slot_name )
@@ -807,7 +829,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
807829 twophase_enabled = true;
808830
809831 walrcv_create_slot (wrconn , opts .slot_name , false, twophase_enabled ,
810- false , CRS_NOEXPORT_SNAPSHOT , NULL );
832+ opts . failover , CRS_NOEXPORT_SNAPSHOT , NULL );
811833
812834 if (twophase_enabled )
813835 UpdateTwoPhaseState (subid , LOGICALREP_TWOPHASE_STATE_ENABLED );
@@ -816,6 +838,24 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
816838 (errmsg ("created replication slot \"%s\" on publisher" ,
817839 opts .slot_name )));
818840 }
841+
842+ /*
843+ * If the slot_name is specified without the create_slot option,
844+ * it is possible that the user intends to use an existing slot on
845+ * the publisher, so here we alter the failover property of the
846+ * slot to match the failover value in subscription.
847+ *
848+ * We do not need to change the failover to false if the server
849+ * does not support failover (e.g. pre-PG17).
850+ */
851+ else if (opts .slot_name &&
852+ (opts .failover || walrcv_server_version (wrconn ) >= 170000 ))
853+ {
854+ walrcv_alter_slot (wrconn , opts .slot_name , opts .failover );
855+ ereport (NOTICE ,
856+ (errmsg ("changed the failover state of replication slot \"%s\" on publisher to %s" ,
857+ opts .slot_name , opts .failover ? "true" : "false" )));
858+ }
819859 }
820860 PG_FINALLY ();
821861 {
@@ -1132,7 +1172,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
11321172 SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
11331173 SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
11341174 SUBOPT_PASSWORD_REQUIRED |
1135- SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN );
1175+ SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
1176+ SUBOPT_ORIGIN );
11361177
11371178 parse_subscription_options (pstate , stmt -> options ,
11381179 supported_opts , & opts );
@@ -1211,6 +1252,31 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
12111252 replaces [Anum_pg_subscription_subrunasowner - 1 ] = true;
12121253 }
12131254
1255+ if (IsSet (opts .specified_opts , SUBOPT_FAILOVER ))
1256+ {
1257+ if (!sub -> slotname )
1258+ ereport (ERROR ,
1259+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1260+ errmsg ("cannot set %s for a subscription that does not have a slot name" ,
1261+ "failover" )));
1262+
1263+ /*
1264+ * Do not allow changing the failover state if the
1265+ * subscription is enabled. This is because the failover
1266+ * state of the slot on the publisher cannot be modified
1267+ * if the slot is currently acquired by the apply worker.
1268+ */
1269+ if (sub -> enabled )
1270+ ereport (ERROR ,
1271+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1272+ errmsg ("cannot set %s for enabled subscription" ,
1273+ "failover" )));
1274+
1275+ values [Anum_pg_subscription_subfailover - 1 ] =
1276+ BoolGetDatum (opts .failover );
1277+ replaces [Anum_pg_subscription_subfailover - 1 ] = true;
1278+ }
1279+
12141280 if (IsSet (opts .specified_opts , SUBOPT_ORIGIN ))
12151281 {
12161282 values [Anum_pg_subscription_suborigin - 1 ] =
@@ -1453,6 +1519,46 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
14531519 heap_freetuple (tup );
14541520 }
14551521
1522+ /*
1523+ * Try to acquire the connection necessary for altering slot.
1524+ *
1525+ * This has to be at the end because otherwise if there is an error while
1526+ * doing the database operations we won't be able to rollback altered
1527+ * slot.
1528+ */
1529+ if (replaces [Anum_pg_subscription_subfailover - 1 ])
1530+ {
1531+ bool must_use_password ;
1532+ char * err ;
1533+ WalReceiverConn * wrconn ;
1534+
1535+ /* Load the library providing us libpq calls. */
1536+ load_file ("libpqwalreceiver" , false);
1537+
1538+ /* Try to connect to the publisher. */
1539+ must_use_password = sub -> passwordrequired && !sub -> ownersuperuser ;
1540+ wrconn = walrcv_connect (sub -> conninfo , true, must_use_password ,
1541+ sub -> name , & err );
1542+ if (!wrconn )
1543+ ereport (ERROR ,
1544+ (errcode (ERRCODE_CONNECTION_FAILURE ),
1545+ errmsg ("could not connect to the publisher: %s" , err )));
1546+
1547+ PG_TRY ();
1548+ {
1549+ walrcv_alter_slot (wrconn , sub -> slotname , opts .failover );
1550+
1551+ ereport (NOTICE ,
1552+ (errmsg ("changed the failover state of replication slot \"%s\" on publisher to %s" ,
1553+ sub -> slotname , opts .failover ? "true" : "false" )));
1554+ }
1555+ PG_FINALLY ();
1556+ {
1557+ walrcv_disconnect (wrconn );
1558+ }
1559+ PG_END_TRY ();
1560+ }
1561+
14561562 table_close (rel , RowExclusiveLock );
14571563
14581564 ObjectAddressSet (myself , SubscriptionRelationId , subid );
0 commit comments