3838#include "replication/logicallauncher.h"
3939#include "replication/logicalworker.h"
4040#include "replication/slot.h"
41+ #include "replication/walreceiver.h"
4142#include "replication/worker_internal.h"
4243
4344#include "storage/ipc.h"
@@ -76,6 +77,7 @@ static void ApplyLauncherWakeup(void);
7677static void logicalrep_launcher_onexit (int code , Datum arg );
7778static void logicalrep_worker_onexit (int code , Datum arg );
7879static void logicalrep_worker_detach (void );
80+ static void logicalrep_worker_cleanup (LogicalRepWorker * worker );
7981
8082/* Flags set by signal handlers */
8183volatile sig_atomic_t got_SIGHUP = false;
@@ -154,34 +156,49 @@ get_subscription_list(void)
154156/*
155157 * Wait for a background worker to start up and attach to the shmem context.
156158 *
157- * This is like WaitForBackgroundWorkerStartup(), except that we wait for
158- * attaching, not just start and we also just exit if postmaster died .
159+ * This is only needed for cleaning up the shared memory in case the worker
160+ * fails to attach .
159161 */
160- static bool
162+ static void
161163WaitForReplicationWorkerAttach (LogicalRepWorker * worker ,
162164 BackgroundWorkerHandle * handle )
163165{
164166 BgwHandleStatus status ;
165167 int rc ;
168+ uint16 generation ;
169+
170+ /* Remember generation for future identification. */
171+ generation = worker -> generation ;
166172
167173 for (;;)
168174 {
169175 pid_t pid ;
170176
171177 CHECK_FOR_INTERRUPTS ();
172178
179+ LWLockAcquire (LogicalRepWorkerLock , LW_SHARED );
180+
181+ /* Worker either died or has started; no need to do anything. */
182+ if (!worker -> in_use || worker -> proc )
183+ {
184+ LWLockRelease (LogicalRepWorkerLock );
185+ return ;
186+ }
187+
188+ LWLockRelease (LogicalRepWorkerLock );
189+
190+ /* Check if worker has died before attaching, and clean up after it. */
173191 status = GetBackgroundWorkerPid (handle , & pid );
174192
175- /*
176- * Worker started and attached to our shmem. This check is safe
177- * because only launcher ever starts the workers, so nobody can steal
178- * the worker slot.
179- */
180- if (status == BGWH_STARTED && worker -> proc )
181- return true;
182- /* Worker didn't start or died before attaching to our shmem. */
183193 if (status == BGWH_STOPPED )
184- return false;
194+ {
195+ LWLockAcquire (LogicalRepWorkerLock , LW_EXCLUSIVE );
196+ /* Ensure that this was indeed the worker we waited for. */
197+ if (generation == worker -> generation )
198+ logicalrep_worker_cleanup (worker );
199+ LWLockRelease (LogicalRepWorkerLock );
200+ return ;
201+ }
185202
186203 /*
187204 * We need timeout because we generally don't get notified via latch
@@ -197,7 +214,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
197214 ResetLatch (MyLatch );
198215 }
199216
200- return false ;
217+ return ;
201218}
202219
203220/*
@@ -216,8 +233,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
216233 for (i = 0 ; i < max_logical_replication_workers ; i ++ )
217234 {
218235 LogicalRepWorker * w = & LogicalRepCtx -> workers [i ];
219- if (w -> subid == subid && w -> relid == relid &&
220- (!only_running || ( w -> proc && IsBackendPid ( w -> proc -> pid )) ))
236+ if (w -> in_use && w -> subid == subid && w -> relid == relid &&
237+ (!only_running || w -> proc ))
221238 {
222239 res = w ;
223240 break ;
@@ -236,8 +253,11 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
236253{
237254 BackgroundWorker bgw ;
238255 BackgroundWorkerHandle * bgw_handle ;
256+ int i ;
239257 int slot ;
240258 LogicalRepWorker * worker = NULL ;
259+ int nsyncworkers ;
260+ TimestampTz now ;
241261
242262 ereport (LOG ,
243263 (errmsg ("starting logical replication worker for subscription \"%s\"" ,
@@ -255,17 +275,73 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
255275 */
256276 LWLockAcquire (LogicalRepWorkerLock , LW_EXCLUSIVE );
257277
278+ retry :
258279 /* Find unused worker slot. */
259- for (slot = 0 ; slot < max_logical_replication_workers ; slot ++ )
280+ for (i = 0 ; i < max_logical_replication_workers ; i ++ )
260281 {
261- if (!LogicalRepCtx -> workers [slot ].proc )
282+ LogicalRepWorker * w = & LogicalRepCtx -> workers [i ];
283+
284+ if (!w -> in_use )
262285 {
263- worker = & LogicalRepCtx -> workers [slot ];
286+ worker = w ;
287+ slot = i ;
264288 break ;
265289 }
266290 }
267291
268- /* Bail if not found */
292+ nsyncworkers = logicalrep_sync_worker_count (subid );
293+
294+ now = GetCurrentTimestamp ();
295+
296+ /*
297+ * If we didn't find a free slot, try to do garbage collection. The
298+ * reason we do this is because if some worker failed to start up and its
299+ * parent has crashed while waiting, the in_use state was never cleared.
300+ */
301+ if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription )
302+ {
303+ bool did_cleanup = false;
304+
305+ for (i = 0 ; i < max_logical_replication_workers ; i ++ )
306+ {
307+ LogicalRepWorker * w = & LogicalRepCtx -> workers [i ];
308+
309+ /*
310+ * If the worker was marked in use but didn't manage to attach in
311+ * time, clean it up.
312+ */
313+ if (w -> in_use && !w -> proc &&
314+ TimestampDifferenceExceeds (w -> launch_time , now ,
315+ wal_receiver_timeout ))
316+ {
317+ elog (WARNING ,
318+ "logical replication worker for subscription \"%d\" took too long to start; canceled" ,
319+ worker -> subid );
320+
321+ logicalrep_worker_cleanup (w );
322+ did_cleanup = true;
323+ }
324+ }
325+
326+ if (did_cleanup )
327+ goto retry ;
328+ }
329+
330+ /*
331+ * If we reached the sync worker limit per subscription, just exit
332+ * silently as we might get here because of an otherwise harmless race
333+ * condition.
334+ */
335+ if (nsyncworkers >= max_sync_workers_per_subscription )
336+ {
337+ LWLockRelease (LogicalRepWorkerLock );
338+ return ;
339+ }
340+
341+ /*
342+ * However if there are no more free worker slots, inform user about it
343+ * before exiting.
344+ */
269345 if (worker == NULL )
270346 {
271347 LWLockRelease (LogicalRepWorkerLock );
@@ -276,7 +352,10 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
276352 return ;
277353 }
278354
279- /* Prepare the worker info. */
355+ /* Prepare the worker slot. */
356+ worker -> launch_time = now ;
357+ worker -> in_use = true;
358+ worker -> generation ++ ;
280359 worker -> proc = NULL ;
281360 worker -> dbid = dbid ;
282361 worker -> userid = userid ;
331410logicalrep_worker_stop (Oid subid , Oid relid )
332411{
333412 LogicalRepWorker * worker ;
413+ uint16 generation ;
334414
335415 LWLockAcquire (LogicalRepWorkerLock , LW_SHARED );
336416
@@ -343,11 +423,17 @@ logicalrep_worker_stop(Oid subid, Oid relid)
343423 return ;
344424 }
345425
426+ /*
427+ * Remember which generation was our worker so we can check if what we see
428+ * is still the same one.
429+ */
430+ generation = worker -> generation ;
431+
346432 /*
347433 * If we found worker but it does not have proc set it is starting up,
348434 * wait for it to finish and then kill it.
349435 */
350- while (worker && !worker -> proc )
436+ while (worker -> in_use && !worker -> proc )
351437 {
352438 int rc ;
353439
@@ -370,10 +456,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
370456 LWLockAcquire (LogicalRepWorkerLock , LW_SHARED );
371457
372458 /*
373- * Worker is no longer associated with subscription. It must have
374- * exited, nothing more for us to do.
459+ * Check whether the worker slot is no longer used, which would mean
460+ * that the worker has exited, or whether the worker generation is
461+ * different, meaning that a different worker has taken the slot.
375462 */
376- if (worker -> subid == InvalidOid )
463+ if (! worker -> in_use || worker -> generation != generation )
377464 {
378465 LWLockRelease (LogicalRepWorkerLock );
379466 return ;
@@ -394,7 +481,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
394481 int rc ;
395482
396483 LWLockAcquire (LogicalRepWorkerLock , LW_SHARED );
397- if (!worker -> proc )
484+ if (!worker -> proc || worker -> generation != generation )
398485 {
399486 LWLockRelease (LogicalRepWorkerLock );
400487 break ;
@@ -453,11 +540,23 @@ logicalrep_worker_attach(int slot)
453540 Assert (slot >= 0 && slot < max_logical_replication_workers );
454541 MyLogicalRepWorker = & LogicalRepCtx -> workers [slot ];
455542
543+ if (!MyLogicalRepWorker -> in_use )
544+ {
545+ LWLockRelease (LogicalRepWorkerLock );
546+ ereport (ERROR ,
547+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
548+ errmsg ("logical replication worker slot %d is empty, cannot attach" ,
549+ slot )));
550+ }
551+
456552 if (MyLogicalRepWorker -> proc )
553+ {
554+ LWLockRelease (LogicalRepWorkerLock );
457555 ereport (ERROR ,
458556 (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
459- errmsg ("logical replication worker slot %d already used by "
460- "another worker" , slot )));
557+ errmsg ("logical replication worker slot %d is already used by "
558+ "another worker, cannot attach" , slot )));
559+ }
461560
462561 MyLogicalRepWorker -> proc = MyProc ;
463562 before_shmem_exit (logicalrep_worker_onexit , (Datum ) 0 );
@@ -474,14 +573,27 @@ logicalrep_worker_detach(void)
474573 /* Block concurrent access. */
475574 LWLockAcquire (LogicalRepWorkerLock , LW_EXCLUSIVE );
476575
477- MyLogicalRepWorker -> dbid = InvalidOid ;
478- MyLogicalRepWorker -> userid = InvalidOid ;
479- MyLogicalRepWorker -> subid = InvalidOid ;
480- MyLogicalRepWorker -> proc = NULL ;
576+ logicalrep_worker_cleanup (MyLogicalRepWorker );
481577
482578 LWLockRelease (LogicalRepWorkerLock );
483579}
484580
581+ /*
582+ * Clean up worker info.
583+ */
584+ static void
585+ logicalrep_worker_cleanup (LogicalRepWorker * worker )
586+ {
587+ Assert (LWLockHeldByMeInMode (LogicalRepWorkerLock , LW_EXCLUSIVE ));
588+
589+ worker -> in_use = false;
590+ worker -> proc = NULL ;
591+ worker -> dbid = InvalidOid ;
592+ worker -> userid = InvalidOid ;
593+ worker -> subid = InvalidOid ;
594+ worker -> relid = InvalidOid ;
595+ }
596+
485597/*
486598 * Cleanup function for logical replication launcher.
487599 *
@@ -732,12 +844,11 @@ ApplyLauncherMain(Datum main_arg)
732844
733845 if (sub -> enabled && w == NULL )
734846 {
735- logicalrep_worker_launch (sub -> dbid , sub -> oid , sub -> name ,
736- sub -> owner , InvalidOid );
737847 last_start_time = now ;
738848 wait_time = wal_retrieve_retry_interval ;
739- /* Limit to one worker per mainloop cycle. */
740- break ;
849+
850+ logicalrep_worker_launch (sub -> dbid , sub -> oid , sub -> name ,
851+ sub -> owner , InvalidOid );
741852 }
742853 }
743854
0 commit comments