2525#include "catalog/pg_subscription.h"
2626#include "catalog/pg_subscription_rel.h"
2727#include "funcapi.h"
28+ #include "lib/dshash.h"
2829#include "libpq/pqsignal.h"
2930#include "miscadmin.h"
3031#include "pgstat.h"
@@ -64,20 +65,47 @@ typedef struct LogicalRepCtxStruct
6465 /* Supervisor process. */
6566 pid_t launcher_pid ;
6667
68+ /* Hash table holding last start times of subscriptions' apply workers. */
69+ dsa_handle last_start_dsa ;
70+ dshash_table_handle last_start_dsh ;
71+
6772 /* Background workers. */
6873 LogicalRepWorker workers [FLEXIBLE_ARRAY_MEMBER ];
6974} LogicalRepCtxStruct ;
7075
7176static LogicalRepCtxStruct * LogicalRepCtx ;
7277
78+ /* an entry in the last-start-times shared hash table */
79+ typedef struct LauncherLastStartTimesEntry
80+ {
81+ Oid subid ; /* OID of logrep subscription (hash key) */
82+ TimestampTz last_start_time ; /* last time its apply worker was started */
83+ } LauncherLastStartTimesEntry ;
84+
85+ /* parameters for the last-start-times shared hash table */
86+ static const dshash_parameters dsh_params = {
87+ sizeof (Oid ),
88+ sizeof (LauncherLastStartTimesEntry ),
89+ dshash_memcmp ,
90+ dshash_memhash ,
91+ LWTRANCHE_LAUNCHER_HASH
92+ };
93+
94+ static dsa_area * last_start_times_dsa = NULL ;
95+ static dshash_table * last_start_times = NULL ;
96+
97+ static bool on_commit_launcher_wakeup = false;
98+
99+
73100static void ApplyLauncherWakeup (void );
74101static void logicalrep_launcher_onexit (int code , Datum arg );
75102static void logicalrep_worker_onexit (int code , Datum arg );
76103static void logicalrep_worker_detach (void );
77104static void logicalrep_worker_cleanup (LogicalRepWorker * worker );
78105static int logicalrep_pa_worker_count (Oid subid );
79-
80- static bool on_commit_launcher_wakeup = false;
106+ static void logicalrep_launcher_attach_dshmem (void );
107+ static void ApplyLauncherSetWorkerStartTime (Oid subid , TimestampTz start_time );
108+ static TimestampTz ApplyLauncherGetWorkerStartTime (Oid subid );
81109
82110
83111/*
@@ -894,6 +922,9 @@ ApplyLauncherShmemInit(void)
894922
895923 memset (LogicalRepCtx , 0 , ApplyLauncherShmemSize ());
896924
925+ LogicalRepCtx -> last_start_dsa = DSM_HANDLE_INVALID ;
926+ LogicalRepCtx -> last_start_dsh = DSM_HANDLE_INVALID ;
927+
897928 /* Initialize memory and spin locks for each worker slot. */
898929 for (slot = 0 ; slot < max_logical_replication_workers ; slot ++ )
899930 {
@@ -905,6 +936,105 @@ ApplyLauncherShmemInit(void)
905936 }
906937}
907938
939+ /*
940+ * Initialize or attach to the dynamic shared hash table that stores the
941+ * last-start times, if not already done.
942+ * This must be called before accessing the table.
943+ */
944+ static void
945+ logicalrep_launcher_attach_dshmem (void )
946+ {
947+ MemoryContext oldcontext ;
948+
949+ /* Quick exit if we already did this. */
950+ if (LogicalRepCtx -> last_start_dsh != DSM_HANDLE_INVALID &&
951+ last_start_times != NULL )
952+ return ;
953+
954+ /* Otherwise, use a lock to ensure only one process creates the table. */
955+ LWLockAcquire (LogicalRepWorkerLock , LW_EXCLUSIVE );
956+
957+ /* Be sure any local memory allocated by DSA routines is persistent. */
958+ oldcontext = MemoryContextSwitchTo (TopMemoryContext );
959+
960+ if (LogicalRepCtx -> last_start_dsh == DSM_HANDLE_INVALID )
961+ {
962+ /* Initialize dynamic shared hash table for last-start times. */
963+ last_start_times_dsa = dsa_create (LWTRANCHE_LAUNCHER_DSA );
964+ dsa_pin (last_start_times_dsa );
965+ dsa_pin_mapping (last_start_times_dsa );
966+ last_start_times = dshash_create (last_start_times_dsa , & dsh_params , 0 );
967+
968+ /* Store handles in shared memory for other backends to use. */
969+ LogicalRepCtx -> last_start_dsa = dsa_get_handle (last_start_times_dsa );
970+ LogicalRepCtx -> last_start_dsh = dshash_get_hash_table_handle (last_start_times );
971+ }
972+ else if (!last_start_times )
973+ {
974+ /* Attach to existing dynamic shared hash table. */
975+ last_start_times_dsa = dsa_attach (LogicalRepCtx -> last_start_dsa );
976+ dsa_pin_mapping (last_start_times_dsa );
977+ last_start_times = dshash_attach (last_start_times_dsa , & dsh_params ,
978+ LogicalRepCtx -> last_start_dsh , 0 );
979+ }
980+
981+ MemoryContextSwitchTo (oldcontext );
982+ LWLockRelease (LogicalRepWorkerLock );
983+ }
984+
985+ /*
986+ * Set the last-start time for the subscription.
987+ */
988+ static void
989+ ApplyLauncherSetWorkerStartTime (Oid subid , TimestampTz start_time )
990+ {
991+ LauncherLastStartTimesEntry * entry ;
992+ bool found ;
993+
994+ logicalrep_launcher_attach_dshmem ();
995+
996+ entry = dshash_find_or_insert (last_start_times , & subid , & found );
997+ entry -> last_start_time = start_time ;
998+ dshash_release_lock (last_start_times , entry );
999+ }
1000+
1001+ /*
1002+ * Return the last-start time for the subscription, or 0 if there isn't one.
1003+ */
1004+ static TimestampTz
1005+ ApplyLauncherGetWorkerStartTime (Oid subid )
1006+ {
1007+ LauncherLastStartTimesEntry * entry ;
1008+ TimestampTz ret ;
1009+
1010+ logicalrep_launcher_attach_dshmem ();
1011+
1012+ entry = dshash_find (last_start_times , & subid , false);
1013+ if (entry == NULL )
1014+ return 0 ;
1015+
1016+ ret = entry -> last_start_time ;
1017+ dshash_release_lock (last_start_times , entry );
1018+
1019+ return ret ;
1020+ }
1021+
1022+ /*
1023+ * Remove the last-start-time entry for the subscription, if one exists.
1024+ *
1025+ * This has two use-cases: to remove the entry related to a subscription
1026+ * that's been deleted or disabled (just to avoid leaking shared memory),
1027+ * and to allow immediate restart of an apply worker that has exited
1028+ * due to subscription parameter changes.
1029+ */
1030+ void
1031+ ApplyLauncherForgetWorkerStartTime (Oid subid )
1032+ {
1033+ logicalrep_launcher_attach_dshmem ();
1034+
1035+ (void ) dshash_delete_key (last_start_times , & subid );
1036+ }
1037+
9081038/*
9091039 * Wakeup the launcher on commit if requested.
9101040 */
@@ -947,8 +1077,6 @@ ApplyLauncherWakeup(void)
9471077void
9481078ApplyLauncherMain (Datum main_arg )
9491079{
950- TimestampTz last_start_time = 0 ;
951-
9521080 ereport (DEBUG1 ,
9531081 (errmsg_internal ("logical replication launcher started" )));
9541082
@@ -976,65 +1104,71 @@ ApplyLauncherMain(Datum main_arg)
9761104 ListCell * lc ;
9771105 MemoryContext subctx ;
9781106 MemoryContext oldctx ;
979- TimestampTz now ;
9801107 long wait_time = DEFAULT_NAPTIME_PER_CYCLE ;
9811108
9821109 CHECK_FOR_INTERRUPTS ();
9831110
984- now = GetCurrentTimestamp ();
1111+ /* Use temporary context to avoid leaking memory across cycles. */
1112+ subctx = AllocSetContextCreate (TopMemoryContext ,
1113+ "Logical Replication Launcher sublist" ,
1114+ ALLOCSET_DEFAULT_SIZES );
1115+ oldctx = MemoryContextSwitchTo (subctx );
9851116
986- /* Limit the start retry to once a wal_retrieve_retry_interval */
987- if ( TimestampDifferenceExceeds ( last_start_time , now ,
988- wal_retrieve_retry_interval ) )
1117+ /* Start any missing workers for enabled subscriptions. */
1118+ sublist = get_subscription_list ();
1119+ foreach ( lc , sublist )
9891120 {
990- /* Use temporary context for the database list and worker info. */
991- subctx = AllocSetContextCreate ( TopMemoryContext ,
992- "Logical Replication Launcher sublist" ,
993- ALLOCSET_DEFAULT_SIZES ) ;
994- oldctx = MemoryContextSwitchTo ( subctx ) ;
1121+ Subscription * sub = ( Subscription * ) lfirst ( lc );
1122+ LogicalRepWorker * w ;
1123+ TimestampTz last_start ;
1124+ TimestampTz now ;
1125+ long elapsed ;
9951126
996- /* search for subscriptions to start or stop. */
997- sublist = get_subscription_list ();
998-
999- /* Start the missing workers for enabled subscriptions. */
1000- foreach (lc , sublist )
1001- {
1002- Subscription * sub = (Subscription * ) lfirst (lc );
1003- LogicalRepWorker * w ;
1127+ if (!sub -> enabled )
1128+ continue ;
10041129
1005- if (!sub -> enabled )
1006- continue ;
1007-
1008- LWLockAcquire (LogicalRepWorkerLock , LW_SHARED );
1009- w = logicalrep_worker_find (sub -> oid , InvalidOid , false);
1010- LWLockRelease (LogicalRepWorkerLock );
1011-
1012- if (w == NULL )
1013- {
1014- last_start_time = now ;
1015- wait_time = wal_retrieve_retry_interval ;
1130+ LWLockAcquire (LogicalRepWorkerLock , LW_SHARED );
1131+ w = logicalrep_worker_find (sub -> oid , InvalidOid , false);
1132+ LWLockRelease (LogicalRepWorkerLock );
10161133
1017- logicalrep_worker_launch (sub -> dbid , sub -> oid , sub -> name ,
1018- sub -> owner , InvalidOid , DSM_HANDLE_INVALID );
1019- }
1020- }
1134+ if (w != NULL )
1135+ continue ; /* worker is running already */
10211136
1022- /* Switch back to original memory context. */
1023- MemoryContextSwitchTo (oldctx );
1024- /* Clean the temporary memory. */
1025- MemoryContextDelete (subctx );
1026- }
1027- else
1028- {
10291137 /*
1030- * The wait in previous cycle was interrupted in less than
1031- * wal_retrieve_retry_interval since last worker was started, this
1032- * usually means crash of the worker, so we should retry in
1033- * wal_retrieve_retry_interval again.
1138+ * If the worker is eligible to start now, launch it. Otherwise,
1139+ * adjust wait_time so that we'll wake up as soon as it can be
1140+ * started.
1141+ *
1142+ * Each subscription's apply worker can only be restarted once per
1143+ * wal_retrieve_retry_interval, so that errors do not cause us to
1144+ * repeatedly restart the worker as fast as possible. In cases
1145+ * where a restart is expected (e.g., subscription parameter
1146+ * changes), another process should remove the last-start entry
1147+ * for the subscription so that the worker can be restarted
1148+ * without waiting for wal_retrieve_retry_interval to elapse.
10341149 */
1035- wait_time = wal_retrieve_retry_interval ;
1150+ last_start = ApplyLauncherGetWorkerStartTime (sub -> oid );
1151+ now = GetCurrentTimestamp ();
1152+ if (last_start == 0 ||
1153+ (elapsed = TimestampDifferenceMilliseconds (last_start , now )) >= wal_retrieve_retry_interval )
1154+ {
1155+ ApplyLauncherSetWorkerStartTime (sub -> oid , now );
1156+ logicalrep_worker_launch (sub -> dbid , sub -> oid , sub -> name ,
1157+ sub -> owner , InvalidOid ,
1158+ DSM_HANDLE_INVALID );
1159+ }
1160+ else
1161+ {
1162+ wait_time = Min (wait_time ,
1163+ wal_retrieve_retry_interval - elapsed );
1164+ }
10361165 }
10371166
1167+ /* Switch back to original memory context. */
1168+ MemoryContextSwitchTo (oldctx );
1169+ /* Clean the temporary memory. */
1170+ MemoryContextDelete (subctx );
1171+
10381172 /* Wait for more work. */
10391173 rc = WaitLatch (MyLatch ,
10401174 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH ,
0 commit comments