2727 * synchronization has finished.
2828 *
2929 * The stream position synchronization works in multiple steps.
30- * - Sync finishes copy and sets table state as SYNCWAIT and waits
31- * for state to change in a loop.
30+ * - Sync finishes copy and sets worker state as SYNCWAIT and waits for
31+ * state to change in a loop.
3232 * - Apply periodically checks tables that are synchronizing for SYNCWAIT.
33- * When the desired state appears it will compare its position in the
34- * stream with the SYNCWAIT position and based on that changes the
35- * state to based on following rules:
36- * - if the apply is in front of the sync in the WAL stream the new
37- * state is set to CATCHUP and apply loops until the sync process
38- * catches up to the same LSN as apply
39- * - if the sync is in front of the apply in the WAL stream the new
40- * state is set to SYNCDONE
41- * - if both apply and sync are at the same position in the WAL stream
42- * the state of the table is set to READY
43- * - If the state was set to CATCHUP sync will read the stream and
44- * apply changes until it catches up to the specified stream
45- * position and then sets state to READY and signals apply that it
46- * can stop waiting and exits, if the state was set to something
47- * else than CATCHUP the sync process will simply end.
48- * - If the state was set to SYNCDONE by apply, the apply will
49- * continue tracking the table until it reaches the SYNCDONE stream
50- * position at which point it sets state to READY and stops tracking.
33+ * When the desired state appears, it will set the worker state to
34+ * CATCHUP and start loop-waiting until either the table state is set
35+ * to SYNCDONE or the sync worker exits.
36+ * - After the sync worker has seen the state change to CATCHUP, it will
37+ * read the stream and apply changes (acting like an apply worker) until
38+ * it catches up to the specified stream position. Then it sets the
39+ * state to SYNCDONE. There might be zero changes applied between
40+ * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
41+ * apply worker.
42+ * - Once the state was set to SYNCDONE, the apply will continue tracking
43+ * the table until it reaches the SYNCDONE stream position, at which
44+ * point it sets state to READY and stops tracking. Again, there might
45+ * be zero changes in between.
46+ *
47+ * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
48+ * SYNCDONE -> READY.
5149 *
5250 * The catalog pg_subscription_rel is used to keep information about
53- * subscribed tables and their state and some transient state during
54- * data synchronization is kept in shared memory.
51+ * subscribed tables and their state. Some transient state during data
52+ * synchronization is kept in shared memory. The states SYNCWAIT and
53+ * CATCHUP only appear in memory.
5554 *
5655 * Example flows look like this:
5756 * - Apply is in front:
5857 * sync:8
59- * -> set SYNCWAIT
58+ * -> set in memory SYNCWAIT
6059 * apply:10
61- * -> set CATCHUP
60+ * -> set in memory CATCHUP
6261 * -> enter wait-loop
6362 * sync:10
64- * -> set READY
63+ * -> set in catalog SYNCDONE
6564 * -> exit
6665 * apply:10
6766 * -> exit wait-loop
6867 * -> continue rep
68+ * apply:11
69+ * -> set in catalog READY
6970 * - Sync in front:
7071 * sync:10
71- * -> set SYNCWAIT
72+ * -> set in memory SYNCWAIT
7273 * apply:8
73- * -> set SYNCDONE
74+ * -> set in memory CATCHUP
7475 * -> continue per-table filtering
7576 * sync:10
77+ * -> set in catalog SYNCDONE
7678 * -> exit
7779 * apply:10
78- * -> set READY
80+ * -> set in catalog READY
7981 * -> stop per-table filtering
8082 * -> continue rep
8183 *-------------------------------------------------------------------------
100102#include "replication/walreceiver.h"
101103#include "replication/worker_internal.h"
102104
105+ #include "utils/snapmgr.h"
103106#include "storage/ipc.h"
104107
105108#include "utils/builtins.h"
@@ -130,69 +133,107 @@ finish_sync_worker(void)
130133 /* And flush all writes. */
131134 XLogFlush (GetXLogWriteRecPtr ());
132135
133- /* Find the main apply worker and signal it. */
134- logicalrep_worker_wakeup (MyLogicalRepWorker -> subid , InvalidOid );
135-
136136 StartTransactionCommand ();
137137 ereport (LOG ,
138138 (errmsg ("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished" ,
139139 MySubscription -> name , get_rel_name (MyLogicalRepWorker -> relid ))));
140140 CommitTransactionCommand ();
141141
142+ /* Find the main apply worker and signal it. */
143+ logicalrep_worker_wakeup (MyLogicalRepWorker -> subid , InvalidOid );
144+
142145 /* Stop gracefully */
143146 proc_exit (0 );
144147}
145148
146149/*
147- * Wait until the table synchronization change.
150+ * Wait until the relation synchronization state is set in catalog to the
151+ * expected one.
148152 *
149- * If called from apply worker, it will wait for the synchronization worker to
150- * change table state in shmem. If called from synchronization worker, it
151- * will wait for apply worker to change table state in shmem.
153+ * Used when transitioning from CATCHUP state to SYNCDONE.
152154 *
153- * Returns false if the opposite worker has disappeared or the table state has
154- * been reset.
155+ * Returns false if the synchronization worker has disappeared or the table state
156+ * has been reset.
155157 */
156158static bool
157- wait_for_sync_status_change (Oid relid , char origstate )
159+ wait_for_relation_state_change (Oid relid , char expected_state )
158160{
159161 int rc ;
160- char state = origstate ;
162+ char state ;
161163
162164 for (;;)
163165 {
164166 LogicalRepWorker * worker ;
167+ XLogRecPtr statelsn ;
165168
166169 CHECK_FOR_INTERRUPTS ();
167170
171+ /* XXX use cache invalidation here to improve performance? */
172+ PushActiveSnapshot (GetLatestSnapshot ());
173+ state = GetSubscriptionRelState (MyLogicalRepWorker -> subid ,
174+ relid , & statelsn , true);
175+ PopActiveSnapshot ();
176+
177+ if (state == SUBREL_STATE_UNKNOWN )
178+ return false;
179+
180+ if (state == expected_state )
181+ return true;
182+
183+ /* Check if the sync worker is still running and bail if not. */
168184 LWLockAcquire (LogicalRepWorkerLock , LW_SHARED );
169185
170186 /* Check if the opposite worker is still running and bail if not. */
171187 worker = logicalrep_worker_find (MyLogicalRepWorker -> subid ,
172188 am_tablesync_worker () ? InvalidOid : relid ,
173189 false);
190+ LWLockRelease (LogicalRepWorkerLock );
174191 if (!worker )
175- {
176- LWLockRelease (LogicalRepWorkerLock );
177192 return false;
178- }
179193
180- /*
181- * If I'm the synchronization worker, look at my own state. Otherwise
182- * look at the state of the synchronization worker we found above.
183- */
184- if (am_tablesync_worker ())
185- worker = MyLogicalRepWorker ;
194+ rc = WaitLatch (& MyProc -> procLatch ,
195+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
196+ 1000L , WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE );
186197
187- Assert (worker -> relid == relid );
188- state = worker -> relstate ;
198+ /* emergency bailout if postmaster has died */
199+ if (rc & WL_POSTMASTER_DEATH )
200+ proc_exit (1 );
189201
190- LWLockRelease (LogicalRepWorkerLock );
202+ ResetLatch (& MyProc -> procLatch );
203+ }
191204
192- if (state == SUBREL_STATE_UNKNOWN )
205+ return false;
206+ }
207+
208+ /*
209+ * Wait until the apply worker changes the state of our synchronization
210+ * worker to the expected one.
211+ *
212+ * Used when transitioning from SYNCWAIT state to CATCHUP.
213+ *
214+ * Returns false if the apply worker has disappeared or table state has been
215+ * reset.
216+ */
217+ static bool
218+ wait_for_worker_state_change (char expected_state )
219+ {
220+ int rc ;
221+
222+ for (;;)
223+ {
224+ LogicalRepWorker * worker ;
225+
226+ CHECK_FOR_INTERRUPTS ();
227+
228+ /* Bail if the apply worker has died. */
229+ LWLockAcquire (LogicalRepWorkerLock , LW_SHARED );
230+ worker = logicalrep_worker_find (MyLogicalRepWorker -> subid ,
231+ InvalidOid , false);
232+ LWLockRelease (LogicalRepWorkerLock );
233+ if (!worker )
193234 return false;
194235
195- if (state != origstate )
236+ if (MyLogicalRepWorker -> relstate == expected_state )
196237 return true;
197238
198239 rc = WaitLatch (& MyProc -> procLatch ,
@@ -222,10 +263,9 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
222263 * Handle table synchronization cooperation from the synchronization
223264 * worker.
224265 *
225- * If the sync worker is in catch up mode and reached the predetermined
226- * synchronization point in the WAL stream, mark the table as READY and
227- * finish. If it caught up too far, set to SYNCDONE and finish. Things will
228- * then proceed in the "sync in front" scenario.
266+ * If the sync worker is in CATCHUP state and reached (or passed) the
267+ * predetermined synchronization point in the WAL stream, mark the table as
268+ * SYNCDONE and finish.
229269 */
230270static void
231271process_syncing_tables_for_sync (XLogRecPtr current_lsn )
@@ -239,10 +279,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
239279 {
240280 TimeLineID tli ;
241281
242- MyLogicalRepWorker -> relstate =
243- (current_lsn == MyLogicalRepWorker -> relstate_lsn )
244- ? SUBREL_STATE_READY
245- : SUBREL_STATE_SYNCDONE ;
282+ MyLogicalRepWorker -> relstate = SUBREL_STATE_SYNCDONE ;
246283 MyLogicalRepWorker -> relstate_lsn = current_lsn ;
247284
248285 SpinLockRelease (& MyLogicalRepWorker -> relmutex );
@@ -274,17 +311,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
274311 * at least wal_retrieve_retry_interval.
275312 *
276313 * For tables that are being synchronized already, check if sync workers
277- * either need action from the apply worker or have finished.
278- *
279- * The usual scenario is that the apply got ahead of the sync while the sync
280- * ran, and then the action needed by apply is to mark a table for CATCHUP and
281- * wait for the catchup to happen. In the less common case that sync worker
282- * got in front of the apply worker, the table is marked as SYNCDONE but not
283- * ready yet, as it needs to be tracked until apply reaches the same position
284- * to which it was synced.
314+ * either need action from the apply worker or have finished. This is the
315+ * SYNCWAIT to CATCHUP transition.
285316 *
286- * If the synchronization position is reached, then the table can be marked as
287- * READY and is no longer tracked.
317+ * If the synchronization position is reached (SYNCDONE), then the table can
318+ * be marked as READY and is no longer tracked.
288319 */
289320static void
290321process_syncing_tables_for_apply (XLogRecPtr current_lsn )
@@ -358,7 +389,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
358389 last_start_times = NULL ;
359390 }
360391
361- /* Process all tables that are being synchronized. */
392+ /*
393+ * Process all tables that are being synchronized.
394+ */
362395 foreach (lc , table_states )
363396 {
364397 SubscriptionRelState * rstate = (SubscriptionRelState * ) lfirst (lc );
@@ -416,45 +449,29 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
416449 if (syncworker && rstate -> state == SUBREL_STATE_SYNCWAIT )
417450 {
418451 /*
419- * There are three possible synchronization situations here.
420- *
421- * a) Apply is in front of the table sync: We tell the table
422- * sync to CATCHUP.
423- *
424- * b) Apply is behind the table sync: We tell the table sync
425- * to mark the table as SYNCDONE and finish.
426- *
427- * c) Apply and table sync are at the same position: We tell
428- * table sync to mark the table as READY and finish.
429- *
430- * In any case we'll need to wait for table sync to change the
431- * state in catalog and only then continue ourselves.
452+ * Tell sync worker it can catchup now. We'll wait for it so
453+ * it does not get lost.
432454 */
433- if (current_lsn > rstate -> lsn )
434- {
435- rstate -> state = SUBREL_STATE_CATCHUP ;
436- rstate -> lsn = current_lsn ;
437- }
438- else if (current_lsn == rstate -> lsn )
439- {
440- rstate -> state = SUBREL_STATE_READY ;
441- rstate -> lsn = current_lsn ;
442- }
443- else
444- rstate -> state = SUBREL_STATE_SYNCDONE ;
445-
446455 SpinLockAcquire (& syncworker -> relmutex );
447- syncworker -> relstate = rstate -> state ;
448- syncworker -> relstate_lsn = rstate -> lsn ;
456+ syncworker -> relstate = SUBREL_STATE_CATCHUP ;
457+ syncworker -> relstate_lsn =
458+ Max (syncworker -> relstate_lsn , current_lsn );
449459 SpinLockRelease (& syncworker -> relmutex );
450460
451461 /* Signal the sync worker, as it may be waiting for us. */
452462 logicalrep_worker_wakeup_ptr (syncworker );
453463
454464 /*
455- * Enter busy loop and wait for synchronization status change.
465+ * Enter busy loop and wait for synchronization worker to
466+ * reach expected state (or die trying).
456467 */
457- wait_for_sync_status_change (rstate -> relid , rstate -> state );
468+ if (!started_tx )
469+ {
470+ StartTransactionCommand ();
471+ started_tx = true;
472+ }
473+ wait_for_relation_state_change (rstate -> relid ,
474+ SUBREL_STATE_SYNCDONE );
458475 }
459476
460477 /*
@@ -889,19 +906,28 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
889906 MyLogicalRepWorker -> relstate_lsn = * origin_startpos ;
890907 SpinLockRelease (& MyLogicalRepWorker -> relmutex );
891908
909+ /* Wait for main apply worker to tell us to catchup. */
910+ wait_for_worker_state_change (SUBREL_STATE_CATCHUP );
911+
892912 /*
893- * Wait for main apply worker to either tell us to catchup or
894- * that we are done.
913+ * There are now two possible states here:
914+ * a) Sync is behind the apply. If that's the case we need to
915+ * catch up with it by consuming the logical replication
916+ * stream up to the relstate_lsn. For that, we exit this
917+ * function and continue in ApplyWorkerMain().
918+ * b) Sync is caught up with the apply. So it can just set
919+ * the state to SYNCDONE and finish.
895920 */
896- wait_for_sync_status_change (MyLogicalRepWorker -> relid ,
897- MyLogicalRepWorker -> relstate );
898- if (MyLogicalRepWorker -> relstate != SUBREL_STATE_CATCHUP )
921+ if (* origin_startpos >= MyLogicalRepWorker -> relstate_lsn )
899922 {
900- /* Update the new state. */
923+ /*
924+ * Update the new state in catalog. No need to bother
925+ * with the shmem state as we are exiting for good.
926+ */
901927 SetSubscriptionRelState (MyLogicalRepWorker -> subid ,
902928 MyLogicalRepWorker -> relid ,
903- MyLogicalRepWorker -> relstate ,
904- MyLogicalRepWorker -> relstate_lsn );
929+ SUBREL_STATE_SYNCDONE ,
930+ * origin_startpos );
905931 finish_sync_worker ();
906932 }
907933 break ;
0 commit comments