PostgreSQL Source Code git master
xlogwait.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * xlogwait.c
4 * Implements waiting for WAL operations to reach specific LSNs.
5 *
6 * Copyright (c) 2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/access/transam/xlogwait.c
10 *
11 * NOTES
12 * This file implements waiting for WAL operations to reach specific LSNs
13 * on both physical standby and primary servers. The core idea is simple:
14 * every process that wants to wait publishes the LSN it needs to the
15 * shared memory, and the appropriate process (startup on standby, or
16 * WAL writer/backend on primary) wakes it once that LSN has been reached.
17 *
18 * The shared memory used by this module comprises a procInfos
19 * per-backend array with the information of the awaited LSN for each
20 * of the backend processes. The elements of that array are organized
21 * into a pairing heap waitersHeap, which allows for very fast finding
22 * of the least awaited LSN.
23 *
24 * In addition, the least-awaited LSN is cached as minWaitedLSN. The
25 * waiter process publishes information about itself to the shared
26 * memory and waits on its latch until it is woken up by the appropriate
27 * process, the standby is promoted, or the postmaster dies. Then, it
28 * cleans up the information about itself in the shared memory.
29 *
30 * On standby servers: After replaying a WAL record, the startup process
31 * first performs a fast-path check, minWaitedLSN > replayLSN. If that
32 * condition does not hold, it examines waitersHeap and wakes up the
33 * backends whose awaited LSNs have been reached.
34 *
35 * On primary servers: After flushing WAL, the WAL writer or backend
36 * process performs a similar check against the flush LSN and wakes up
37 * waiters whose target flush LSNs have been reached.
38 *
39 *-------------------------------------------------------------------------
40 */
41
42#include "postgres.h"
43
44#include <float.h>
45#include <math.h>
46
47#include "access/xlog.h"
48#include "access/xlogrecovery.h"
49#include "access/xlogwait.h"
50#include "miscadmin.h"
51#include "pgstat.h"
52#include "storage/latch.h"
53#include "storage/proc.h"
54#include "storage/shmem.h"
55#include "utils/fmgrprotos.h"
56#include "utils/pg_lsn.h"
57#include "utils/snapmgr.h"
58
59
60static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
61 void *arg);
62
64
65/*
 * Report the amount of shared memory space needed for WaitLSNState.
 *
 * Returns the required size in bytes.
 */
66Size
68{
69 Size size;
70
 /* Fixed-size part: everything that precedes the flexible procInfos array. */
71 size = offsetof(WaitLSNState, procInfos);
73 return size;
74}
75
76/*
 * Initialize the WaitLSNState in the shared memory.
 *
 * Sets the global waitLSNState pointer to the structure obtained from
 * ShmemInitStruct().  The contents are initialized only when the structure
 * was not found, i.e. when ShmemInitStruct() reports found = false.
 */
77void
79{
80 bool found;
81
82 waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
84 &found);
85 if (!found)
86 {
87 int i;
88
89 /* Initialize the per-LSN-type heaps and tracking state */
90 for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
91 {
94 }
95
96 /* Zero the per-process info array */
97 memset(&waitLSNState->procInfos, 0,
99 }
100}
101
102/*
103 * Comparison function for LSN waiters heaps. Waiting processes are ordered by
104 * LSN, so that the waiter with the smallest LSN is at the top.
 *
 * Returns 1 if aproc waits for a smaller LSN than bproc, -1 if it waits for
 * a larger one, and 0 if both wait for the same LSN.  Note that the sign is
 * deliberately the reverse of a usual "less than => negative" comparator;
 * that is what places the smallest LSN at the top of the heap.
105 */
106static int
108{
111
 /* Smaller LSN compares as "greater", so it ends up at the top */
112 if (aproc->waitLSN < bproc->waitLSN)
113 return 1;
114 else if (aproc->waitLSN > bproc->waitLSN)
115 return -1;
116 else
117 return 0;
118}
119
120/*
121 * Update the minimum waited LSN for the specified LSN type.
 *
 * Every call site in this file invokes this function while holding
 * WaitLSNLock in exclusive mode.
122 */
123static void
125{
 /* Index of this LSN type in the per-type arrays */
127 int i = (int) lsnType;
128
129 Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
130
132 {
 /*
  * Map the heap node back to its owning WaitLSNProcInfo.
  * NOTE(review): node is presumably the top of this type's waiters heap
  * (the waiter with the smallest LSN) -- confirm.
  */
134 WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
135
 /* That waiter's target LSN becomes the new minimum */
136 minWaitedLSN = procInfo->waitLSN;
137 }
139}
140
141/*
142 * Add the current process to the appropriate waiters heap based on LSN type.
 *
 * lsn is the LSN this process wants to wait for; lsnType selects which
 * kind of LSN it is.  The process must not already be in a heap.  The whole
 * update is performed under WaitLSNLock held in exclusive mode, and the
 * minimum waited LSN for lsnType is refreshed before the lock is released.
143 */
144static void
146{
148 int i = (int) lsnType;
149
150 Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
151
152 LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
153
 /* Publish who is waiting, for which LSN, and of which type */
154 procInfo->procno = MyProcNumber;
155 procInfo->waitLSN = lsn;
156 procInfo->lsnType = lsnType;
157
 /* A process may be present in a waiters heap at most once */
158 Assert(!procInfo->inHeap);
160 procInfo->inHeap = true;
 /* The new waiter may have lowered the minimum */
161 updateMinWaitedLSN(lsnType);
162
163 LWLockRelease(WaitLSNLock);
164}
165
166/*
167 * Remove the current process from the appropriate waiters heap based on LSN
 * type.
 *
 * This is a no-op if the process is not in the heap (inHeap is false), so
 * it is safe to call after the process has already been removed.  The
 * inHeap flag is checked under WaitLSNLock held in exclusive mode; when a
 * removal happens, the minimum waited LSN for lsnType is refreshed.
168 */
169static void
171{
173 int i = (int) lsnType;
174
175 Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
176
177 LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
178
 /* The stored type must match the type we were registered with */
179 Assert(procInfo->lsnType == lsnType);
180
 /* Skip if we are no longer in the heap */
181 if (procInfo->inHeap)
182 {
184 procInfo->inHeap = false;
 /* The removed waiter may have been the minimum */
185 updateMinWaitedLSN(lsnType);
186 }
187
188 LWLockRelease(WaitLSNLock);
189}
190
191/*
192 * Size of the static array of procs to wake up by WaitLSNWakeup(), allocated
193 * on the stack. It should be large enough that a single iteration suffices
193 * in most cases.
194 */
195#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
196
197/*
198 * Remove waiters whose LSN has been reached from the heap and set their
199 * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
200 * and set latches for all waiters.
201 *
202 * This function first accumulates waiters to wake up into an array, then
203 * wakes them up without holding WaitLSNLock. The array size is static and
204 * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
205 * to wake up all the waiters at once in the vast majority of cases. However,
206 * if there are more waiters, this function will loop to process them in
207 * multiple chunks.
208 */
209static void
211{
213 int numWakeUpProcs;
214 int i = (int) lsnType;
215
216 Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
217
 /* Each pass collects and wakes at most WAKEUP_PROC_STATIC_ARRAY_SIZE procs */
218 do
219 {
220 numWakeUpProcs = 0;
221 LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
222
223 /*
224 * Iterate the waiters heap until we find an LSN not yet reached. Record
225 * process numbers to wake up, but send wakeups after releasing the lock.
226 */
228 {
230 WaitLSNProcInfo *procInfo;
231
232 /* Get procInfo using appropriate heap node */
233 procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
234
 /*
  * Stop at the first waiter whose target is still ahead.  An invalid
  * currentLSN disables this test, so every waiter is collected.
  */
235 if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
236 break;
237
238 Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
239 wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
241
242 /* Mark the waiter as no longer being in the heap */
243 procInfo->inHeap = false;
244
 /* Array is full: wake this chunk first, then loop for the rest */
245 if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
246 break;
247 }
248
249 updateMinWaitedLSN(lsnType);
250 LWLockRelease(WaitLSNLock);
251
252 /*
253 * Set latches for processes whose waited LSNs have been reached.
254 * Since SetLatch() is a time-consuming operation, we do this outside
255 * of WaitLSNLock. This is safe because procLatch is never freed, so
256 * at worst we may set a latch for the wrong process or for no process
257 * at all, which is harmless.
258 */
 /*
  * NOTE(review): 'i' was initialized above as the LSN-type index and is
  * overwritten by this loop.  If the heap accesses inside the do/while
  * index the per-type heap by 'i', a second pass (taken when a full chunk
  * was collected) would use the wrong index -- confirm, and consider a
  * separate loop counter.
  */
259 for (i = 0; i < numWakeUpProcs; i++)
260 SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
261
 /* A full chunk means more reached waiters may remain in the heap */
262 } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
263}
264
265/*
266 * Wake up processes waiting for the LSN of the given type to reach
 * currentLSN.
 *
 * Passing InvalidXLogRecPtr as currentLSN means "wake all waiters".
267 */
268void
270{
271 int i = (int) lsnType;
272
273 Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
274
275 /*
276 * Fast-path check: return early without calling wakeupWaiters(). The
 * fast path is bypassed when currentLSN is InvalidXLogRecPtr, which means
277 * "wake all waiters" (e.g., during promotion when recovery ends).
 * NOTE(review): the rest of the condition presumably compares the cached
 * minWaitedLSN for this type against currentLSN -- confirm.
278 */
279 if (XLogRecPtrIsValid(currentLSN) &&
281 return;
282
283 wakeupWaiters(lsnType, currentLSN);
284}
285
286/*
287 * Clean up LSN waiters for an exiting process.
 *
 * Does nothing if waitLSNState has not been set.
288 */
289void
291{
292 if (waitLSNState)
293 {
294 /*
295 * We do a fast-path check of the inHeap flag without the lock. This
296 * flag is set to true only by the process itself, so the only
297 * possible error is a false positive, and that is eliminated by the
298 * recheck inside deleteLSNWaiter().
299 */
302 }
303}
304
305/*
306 * Wait using MyLatch till the given LSN is reached, the replica gets
307 * promoted, the timeout expires, or the postmaster dies.
 *
 * lsnType selects which LSN is awaited: the replay LSN for
 * WAIT_LSN_TYPE_REPLAY, otherwise the flush LSN.  targetLSN is the LSN to
 * wait for.  A timeout > 0 bounds the wait (presumably in milliseconds --
 * TODO confirm against callers); otherwise the wait has no time limit.
308 *
309 * Returns WAIT_LSN_RESULT_SUCCESS if the target LSN was reached.
310 * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if called while not in recovery,
311 * or if the replica got promoted before the target LSN was reached.
 * Returns WAIT_LSN_RESULT_TIMEOUT if the wait ended on timeout without the
 * target LSN having been reached.
312 */
314WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
315{
316 XLogRecPtr currentLSN;
317 TimestampTz endtime = 0;
318 int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
319
320 /* Shouldn't be called when shmem isn't initialized */
322
323 /* Should have a valid proc number */
325
 /* Only a positive timeout arms the WL_TIMEOUT wake event */
326 if (timeout > 0)
327 {
329 wake_events |= WL_TIMEOUT;
330 }
331
332 /*
333 * Add our process to the waiters heap. It might happen that the target
334 * LSN gets reached before we have added ourselves. The check at the
335 * beginning of the loop below prevents that race condition.
336 */
337 addLSNWaiter(targetLSN, lsnType);
338
339 for (;;)
340 {
341 int rc;
 /* -1 is kept when no timeout was requested */
342 long delay_ms = -1;
343
 /* Fetch the current position for the kind of LSN we are waiting on */
344 if (lsnType == WAIT_LSN_TYPE_REPLAY)
345 currentLSN = GetXLogReplayRecPtr(NULL);
346 else
347 currentLSN = GetFlushRecPtr(NULL);
348
349 /* Check that recovery is still in progress (replay waits only) */
350 if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
351 {
352 /*
353 * Recovery has ended, but check whether the target LSN was already
354 * reached.
355 */
356 deleteLSNWaiter(lsnType);
357
358 if (PromoteIsTriggered() && targetLSN <= currentLSN)
361 }
362 else
363 {
364 /* Check if the waited LSN has been reached */
365 if (targetLSN <= currentLSN)
366 break;
367 }
368
369 if (timeout > 0)
370 {
 /* No time left: leave the loop; the caller sees a timeout */
372 if (delay_ms <= 0)
373 break;
374 }
375
377
378 rc = WaitLatch(MyLatch, wake_events, delay_ms,
379 (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
380
381 /*
382 * Emergency bailout if postmaster has died. This is to avoid the
383 * necessity for manual cleanup of all postmaster children.
384 */
385 if (rc & WL_POSTMASTER_DEATH)
387 errcode(ERRCODE_ADMIN_SHUTDOWN),
388 errmsg("terminating connection due to unexpected postmaster exit"),
389 errcontext("while waiting for LSN"));
390
391 if (rc & WL_LATCH_SET)
393 }
394
395 /*
396 * Delete our process from the shared memory heap. We might already have
397 * been deleted by the process that woke us up. The 'inHeap' flag
398 * prevents a double deletion.
399 */
400 deleteLSNWaiter(lsnType);
401
402 /*
403 * If we didn't reach the target LSN, we must have exited due to timeout.
404 */
405 if (targetLSN > currentLSN)
407
409}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:483
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:451
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:465
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
int64_t int64
Definition: c.h:540
#define PG_UINT64_MAX
Definition: c.h:603
size_t Size
Definition: c.h:615
int64 TimestampTz
Definition: timestamp.h:39
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define errcontext
Definition: elog.h:198
#define FATAL
Definition: elog.h:41
#define ereport(elevel,...)
Definition: elog.h:150
ProcNumber MyProcNumber
Definition: globals.c:90
int MaxBackends
Definition: globals.c:146
struct Latch * MyLatch
Definition: globals.c:63
Assert(PointerIsAligned(start, uint64))
int b
Definition: isn.c:74
int a
Definition: isn.c:73
int i
Definition: isn.c:77
void SetLatch(Latch *latch)
Definition: latch.c:290
void ResetLatch(Latch *latch)
Definition: latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_EXCLUSIVE
Definition: lwlock.h:112
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:184
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:126
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:60
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
Definition: pairingheap.c:159
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:144
#define pairingheap_is_empty(h)
Definition: pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51
void * arg
#define NUM_AUXILIARY_PROCS
Definition: proc.h:463
#define GetPGProcByNumber(n)
Definition: proc.h:440
int ProcNumber
Definition: procnumber.h:24
Size add_size(Size s1, Size s2)
Definition: shmem.c:494
Size mul_size(Size s1, Size s2)
Definition: shmem.c:511
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:388
ProcNumber procno
Definition: xlogwait.h:57
pairingheap_node heapNode
Definition: xlogwait.h:66
XLogRecPtr waitLSN
Definition: xlogwait.h:51
WaitLSNType lsnType
Definition: xlogwait.h:54
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition: xlogwait.h:91
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition: xlogwait.h:79
pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT]
Definition: xlogwait.h:85
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define WL_POSTMASTER_DEATH
Definition: waiteventset.h:38
bool RecoveryInProgress(void)
Definition: xlog.c:6406
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6571
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool PromoteIsTriggered(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void WaitLSNShmemInit(void)
Definition: xlogwait.c:78
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition: xlogwait.c:107
void WaitLSNCleanup(void)
Definition: xlogwait.c:290
#define WAKEUP_PROC_STATIC_ARRAY_SIZE
Definition: xlogwait.c:195
struct WaitLSNState * waitLSNState
Definition: xlogwait.c:63
static void updateMinWaitedLSN(WaitLSNType lsnType)
Definition: xlogwait.c:124
static void deleteLSNWaiter(WaitLSNType lsnType)
Definition: xlogwait.c:170
WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
Definition: xlogwait.c:314
static void wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition: xlogwait.c:210
static void addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
Definition: xlogwait.c:145
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition: xlogwait.c:269
Size WaitLSNShmemSize(void)
Definition: xlogwait.c:67
WaitLSNResult
Definition: xlogwait.h:26
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition: xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition: xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition: xlogwait.h:27
WaitLSNType
Definition: xlogwait.h:37
@ WAIT_LSN_TYPE_COUNT
Definition: xlogwait.h:40
@ WAIT_LSN_TYPE_REPLAY
Definition: xlogwait.h:38