@@ -162,22 +162,78 @@ static void update_synced_slots_inactive_since(void);
162162 * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
163163 * modified, and decoding from the corresponding LSN's can reach a
164164 * consistent snapshot.
165+ *
166+ * *remote_slot_precedes will be true if the remote slot's LSN or xmin
167+ * precedes locally reserved position.
165168 */
166169static bool
167170update_local_synced_slot (RemoteSlot * remote_slot , Oid remote_dbid ,
168- bool * found_consistent_snapshot )
171+ bool * found_consistent_snapshot ,
172+ bool * remote_slot_precedes )
169173{
170174 ReplicationSlot * slot = MyReplicationSlot ;
171- bool slot_updated = false;
175+ bool updated_xmin_or_lsn = false;
176+ bool updated_config = false;
172177
173178 Assert (slot -> data .invalidated == RS_INVAL_NONE );
174179
175180 if (found_consistent_snapshot )
176181 * found_consistent_snapshot = false;
177182
178- if (remote_slot -> confirmed_lsn != slot -> data .confirmed_flush ||
179- remote_slot -> restart_lsn != slot -> data .restart_lsn ||
180- remote_slot -> catalog_xmin != slot -> data .catalog_xmin )
183+ if (remote_slot_precedes )
184+ * remote_slot_precedes = false;
185+
186+ /*
187+ * Don't overwrite if we already have a newer catalog_xmin and
188+ * restart_lsn.
189+ */
190+ if (remote_slot -> restart_lsn < slot -> data .restart_lsn ||
191+ TransactionIdPrecedes (remote_slot -> catalog_xmin ,
192+ slot -> data .catalog_xmin ))
193+ {
194+ /*
195+ * This can happen in following situations:
196+ *
197+ * If the slot is temporary, it means either the initial WAL location
198+ * reserved for the local slot is ahead of the remote slot's
199+ * restart_lsn or the initial xmin_horizon computed for the local slot
200+ * is ahead of the remote slot.
201+ *
202+ * If the slot is persistent, restart_lsn of the synced slot could
203+ * still be ahead of the remote slot. Since we use slot advance
204+ * functionality to keep snapbuild/slot updated, it is possible that
205+ * the restart_lsn is advanced to a later position than it has on the
206+ * primary. This can happen when slot advancing machinery finds
207+ * running xacts record after reaching the consistent state at a later
208+ * point than the primary where it serializes the snapshot and updates
209+ * the restart_lsn.
210+ *
211+ * We LOG the message if the slot is temporary as it can help the user
212+ * to understand why the slot is not sync-ready. In the case of a
213+ * persistent slot, it would be a more common case and won't directly
214+ * impact the users, so we used DEBUG1 level to log the message.
215+ */
216+ ereport (slot -> data .persistency == RS_TEMPORARY ? LOG : DEBUG1 ,
217+ errmsg ("could not sync slot \"%s\" as remote slot precedes local slot" ,
218+ remote_slot -> name ),
219+ errdetail ("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u." ,
220+ LSN_FORMAT_ARGS (remote_slot -> restart_lsn ),
221+ remote_slot -> catalog_xmin ,
222+ LSN_FORMAT_ARGS (slot -> data .restart_lsn ),
223+ slot -> data .catalog_xmin ));
224+
225+ if (remote_slot_precedes )
226+ * remote_slot_precedes = true;
227+ }
228+
229+ /*
230+ * Attempt to sync LSNs and xmins only if remote slot is ahead of local
231+ * slot.
232+ */
233+ else if (remote_slot -> confirmed_lsn > slot -> data .confirmed_flush ||
234+ remote_slot -> restart_lsn > slot -> data .restart_lsn ||
235+ TransactionIdFollows (remote_slot -> catalog_xmin ,
236+ slot -> data .catalog_xmin ))
181237 {
182238 /*
183239 * We can't directly copy the remote slot's LSN or xmin unless there
@@ -198,7 +254,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
198254 slot -> data .restart_lsn = remote_slot -> restart_lsn ;
199255 slot -> data .confirmed_flush = remote_slot -> confirmed_lsn ;
200256 slot -> data .catalog_xmin = remote_slot -> catalog_xmin ;
201- slot -> effective_catalog_xmin = remote_slot -> catalog_xmin ;
202257 SpinLockRelease (& slot -> mutex );
203258
204259 if (found_consistent_snapshot )
@@ -208,12 +263,18 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
208263 {
209264 LogicalSlotAdvanceAndCheckSnapState (remote_slot -> confirmed_lsn ,
210265 found_consistent_snapshot );
211- }
212266
213- ReplicationSlotsComputeRequiredXmin (false);
214- ReplicationSlotsComputeRequiredLSN ();
267+ /* Sanity check */
268+ if (slot -> data .confirmed_flush != remote_slot -> confirmed_lsn )
269+ ereport (ERROR ,
270+ errmsg_internal ("synchronized confirmed_flush for slot \"%s\" differs from remote slot" ,
271+ remote_slot -> name ),
272+ errdetail_internal ("Remote slot has LSN %X/%X but local slot has LSN %X/%X." ,
273+ LSN_FORMAT_ARGS (remote_slot -> confirmed_lsn ),
274+ LSN_FORMAT_ARGS (slot -> data .confirmed_flush )));
275+ }
215276
216- slot_updated = true;
277+ updated_xmin_or_lsn = true;
217278 }
218279
219280 if (remote_dbid != slot -> data .database ||
@@ -233,10 +294,37 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
233294 slot -> data .failover = remote_slot -> failover ;
234295 SpinLockRelease (& slot -> mutex );
235296
236- slot_updated = true;
297+ updated_config = true;
237298 }
238299
239- return slot_updated ;
300+ /*
301+ * We have to write the changed xmin to disk *before* we change the
302+ * in-memory value, otherwise after a crash we wouldn't know that some
303+ * catalog tuples might have been removed already.
304+ */
305+ if (updated_config || updated_xmin_or_lsn )
306+ {
307+ ReplicationSlotMarkDirty ();
308+ ReplicationSlotSave ();
309+ }
310+
311+ /*
312+ * Now the new xmin is safely on disk, we can let the global value
313+ * advance. We do not take ProcArrayLock or similar since we only advance
314+ * xmin here and there's not much harm done by a concurrent computation
315+ * missing that.
316+ */
317+ if (updated_xmin_or_lsn )
318+ {
319+ SpinLockAcquire (& slot -> mutex );
320+ slot -> effective_catalog_xmin = remote_slot -> catalog_xmin ;
321+ SpinLockRelease (& slot -> mutex );
322+
323+ ReplicationSlotsComputeRequiredXmin (false);
324+ ReplicationSlotsComputeRequiredLSN ();
325+ }
326+
327+ return updated_config || updated_xmin_or_lsn ;
240328}
241329
242330/*
@@ -460,14 +548,17 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
460548{
461549 ReplicationSlot * slot = MyReplicationSlot ;
462550 bool found_consistent_snapshot = false;
551+ bool remote_slot_precedes = false;
552+
553+ (void ) update_local_synced_slot (remote_slot , remote_dbid ,
554+ & found_consistent_snapshot ,
555+ & remote_slot_precedes );
463556
464557 /*
465558 * Check if the primary server has caught up. Refer to the comment atop
466559 * the file for details on this check.
467560 */
468- if (remote_slot -> restart_lsn < slot -> data .restart_lsn ||
469- TransactionIdPrecedes (remote_slot -> catalog_xmin ,
470- slot -> data .catalog_xmin ))
561+ if (remote_slot_precedes )
471562 {
472563 /*
473564 * The remote slot didn't catch up to locally reserved position.
@@ -476,23 +567,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
476567 * current location when recreating the slot in the next cycle. It may
477568 * take more time to create such a slot. Therefore, we keep this slot
478569 * and attempt the synchronization in the next cycle.
479- *
480- * XXX should this be changed to elog(DEBUG1) perhaps?
481570 */
482- ereport (LOG ,
483- errmsg ("could not sync slot \"%s\" as remote slot precedes local slot" ,
484- remote_slot -> name ),
485- errdetail ("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u." ,
486- LSN_FORMAT_ARGS (remote_slot -> restart_lsn ),
487- remote_slot -> catalog_xmin ,
488- LSN_FORMAT_ARGS (slot -> data .restart_lsn ),
489- slot -> data .catalog_xmin ));
490571 return false;
491572 }
492573
493- (void ) update_local_synced_slot (remote_slot , remote_dbid ,
494- & found_consistent_snapshot );
495-
496574 /*
497575 * Don't persist the slot if it cannot reach the consistent point from the
498576 * restart_lsn. See comments atop this file.
@@ -633,23 +711,20 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
633711 /*
634712 * Sanity check: As long as the invalidations are handled
635713 * appropriately as above, this should never happen.
714+ *
715+ * We don't need to check restart_lsn here. See the comments in
716+ * update_local_synced_slot() for details.
636717 */
637- if (remote_slot -> restart_lsn < slot -> data .restart_lsn )
638- elog (ERROR ,
639- "cannot synchronize local slot \"%s\" LSN(%X/%X)"
640- " to remote slot's LSN(%X/%X) as synchronization"
641- " would move it backwards" , remote_slot -> name ,
642- LSN_FORMAT_ARGS (slot -> data .restart_lsn ),
643- LSN_FORMAT_ARGS (remote_slot -> restart_lsn ));
644-
645- /* Make sure the slot changes persist across server restart */
646- if (update_local_synced_slot (remote_slot , remote_dbid , NULL ))
647- {
648- ReplicationSlotMarkDirty ();
649- ReplicationSlotSave ();
650-
651- slot_updated = true;
652- }
718+ if (remote_slot -> confirmed_lsn < slot -> data .confirmed_flush )
719+ ereport (ERROR ,
720+ errmsg_internal ("cannot synchronize local slot \"%s\"" ,
721+ remote_slot -> name ),
722+ errdetail_internal ("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X)." ,
723+ LSN_FORMAT_ARGS (slot -> data .confirmed_flush ),
724+ LSN_FORMAT_ARGS (remote_slot -> confirmed_lsn )));
725+
726+ slot_updated = update_local_synced_slot (remote_slot , remote_dbid ,
727+ NULL , NULL );
653728 }
654729 }
655730 /* Otherwise create the slot first. */
0 commit comments