PostgreSQL Source Code git master
walreceiverfuncs.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * walreceiverfuncs.c
4 *
5 * This file contains functions used by the startup process to communicate
6 * with the walreceiver process. Functions implementing walreceiver itself
7 * are in walreceiver.c.
8 *
9 * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
10 *
11 *
12 * IDENTIFICATION
13 * src/backend/replication/walreceiverfuncs.c
14 *
15 *-------------------------------------------------------------------------
16 */
#include "postgres.h"

#include <sys/stat.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include <signal.h>

#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
33
35
36/*
37 * How long to wait for walreceiver to start up after requesting
38 * postmaster to launch it. In seconds.
39 */
40#define WALRCV_STARTUP_TIMEOUT 10
41
42/* Report shared memory space needed by WalRcvShmemInit */
43Size
45{
46 Size size = 0;
47
48 size = add_size(size, sizeof(WalRcvData));
49
50 return size;
51}
52
53/* Allocate and initialize walreceiver-related shared memory */
54void
56{
57 bool found;
58
59 WalRcv = (WalRcvData *)
60 ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
61
62 if (!found)
63 {
64 /* First time through, so initialize */
71 }
72}
73
74/* Is walreceiver running (or starting up)? */
75bool
77{
78 WalRcvData *walrcv = WalRcv;
80 pg_time_t startTime;
81
82 SpinLockAcquire(&walrcv->mutex);
83
84 state = walrcv->walRcvState;
85 startTime = walrcv->startTime;
86
87 SpinLockRelease(&walrcv->mutex);
88
89 /*
90 * If it has taken too long for walreceiver to start up, give up. Setting
91 * the state to STOPPED ensures that if walreceiver later does start up
92 * after all, it will see that it's not supposed to be running and die
93 * without doing anything.
94 */
96 {
97 pg_time_t now = (pg_time_t) time(NULL);
98
99 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
100 {
101 bool stopped = false;
102
103 SpinLockAcquire(&walrcv->mutex);
104 if (walrcv->walRcvState == WALRCV_STARTING)
105 {
106 state = walrcv->walRcvState = WALRCV_STOPPED;
107 stopped = true;
108 }
109 SpinLockRelease(&walrcv->mutex);
110
111 if (stopped)
113 }
114 }
115
116 if (state != WALRCV_STOPPED)
117 return true;
118 else
119 return false;
120}
121
122/* Return the state of the walreceiver. */
125{
126 WalRcvData *walrcv = WalRcv;
128
129 SpinLockAcquire(&walrcv->mutex);
130 state = walrcv->walRcvState;
131 SpinLockRelease(&walrcv->mutex);
132
133 return state;
134}
135
136/*
137 * Is walreceiver running and streaming (or at least attempting to connect,
138 * or starting up)?
139 */
140bool
142{
143 WalRcvData *walrcv = WalRcv;
145 pg_time_t startTime;
146
147 SpinLockAcquire(&walrcv->mutex);
148
149 state = walrcv->walRcvState;
150 startTime = walrcv->startTime;
151
152 SpinLockRelease(&walrcv->mutex);
153
154 /*
155 * If it has taken too long for walreceiver to start up, give up. Setting
156 * the state to STOPPED ensures that if walreceiver later does start up
157 * after all, it will see that it's not supposed to be running and die
158 * without doing anything.
159 */
160 if (state == WALRCV_STARTING)
161 {
162 pg_time_t now = (pg_time_t) time(NULL);
163
164 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
165 {
166 bool stopped = false;
167
168 SpinLockAcquire(&walrcv->mutex);
169 if (walrcv->walRcvState == WALRCV_STARTING)
170 {
171 state = walrcv->walRcvState = WALRCV_STOPPED;
172 stopped = true;
173 }
174 SpinLockRelease(&walrcv->mutex);
175
176 if (stopped)
178 }
179 }
180
183 return true;
184 else
185 return false;
186}
187
188/*
189 * Stop walreceiver (if running) and wait for it to die.
190 * Executed by the Startup process.
191 */
192void
194{
195 WalRcvData *walrcv = WalRcv;
196 pid_t walrcvpid = 0;
197 bool stopped = false;
198
199 /*
200 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
201 * mode once it's finished, and will also request postmaster to not
202 * restart itself.
203 */
204 SpinLockAcquire(&walrcv->mutex);
205 switch (walrcv->walRcvState)
206 {
207 case WALRCV_STOPPED:
208 break;
209 case WALRCV_STARTING:
210 walrcv->walRcvState = WALRCV_STOPPED;
211 stopped = true;
212 break;
213
214 case WALRCV_STREAMING:
215 case WALRCV_WAITING:
218 /* fall through */
219 case WALRCV_STOPPING:
220 walrcvpid = walrcv->pid;
221 break;
222 }
223 SpinLockRelease(&walrcv->mutex);
224
225 /* Unnecessary but consistent. */
226 if (stopped)
228
229 /*
230 * Signal walreceiver process if it was still running.
231 */
232 if (walrcvpid != 0)
233 kill(walrcvpid, SIGTERM);
234
235 /*
236 * Wait for walreceiver to acknowledge its death by setting state to
237 * WALRCV_STOPPED.
238 */
240 while (WalRcvRunning())
242 WAIT_EVENT_WAL_RECEIVER_EXIT);
244}
245
246/*
247 * Request postmaster to start walreceiver.
248 *
249 * "recptr" indicates the position where streaming should begin. "conninfo"
250 * is a libpq connection string to use. "slotname" is, optionally, the name
251 * of a replication slot to acquire. "create_temp_slot" indicates to create
252 * a temporary slot when no "slotname" is given.
253 *
254 * WAL receivers do not directly load GUC parameters used for the connection
255 * to the primary, and rely on the values passed down by the caller of this
256 * routine instead. Hence, the addition of any new parameters should happen
257 * through this code path.
258 */
259void
260RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
261 const char *slotname, bool create_temp_slot)
262{
263 WalRcvData *walrcv = WalRcv;
264 bool launch = false;
265 pg_time_t now = (pg_time_t) time(NULL);
266 ProcNumber walrcv_proc;
267
268 /*
269 * We always start at the beginning of the segment. That prevents a broken
270 * segment (i.e., with no records in the first half of a segment) from
271 * being created by XLOG streaming, which might cause trouble later on if
272 * the segment is e.g archived.
273 */
274 if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
275 recptr -= XLogSegmentOffset(recptr, wal_segment_size);
276
277 SpinLockAcquire(&walrcv->mutex);
278
279 /* It better be stopped if we try to restart it */
280 Assert(walrcv->walRcvState == WALRCV_STOPPED ||
281 walrcv->walRcvState == WALRCV_WAITING);
282
283 if (conninfo != NULL)
284 strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
285 else
286 walrcv->conninfo[0] = '\0';
287
288 /*
289 * Use configured replication slot if present, and ignore the value of
290 * create_temp_slot as the slot name should be persistent. Otherwise, use
291 * create_temp_slot to determine whether this WAL receiver should create a
292 * temporary slot by itself and use it, or not.
293 */
294 if (slotname != NULL && slotname[0] != '\0')
295 {
296 strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
297 walrcv->is_temp_slot = false;
298 }
299 else
300 {
301 walrcv->slotname[0] = '\0';
302 walrcv->is_temp_slot = create_temp_slot;
303 }
304
305 if (walrcv->walRcvState == WALRCV_STOPPED)
306 {
307 launch = true;
309 }
310 else
312 walrcv->startTime = now;
313
314 /*
315 * If this is the first startup of walreceiver (on this timeline),
316 * initialize flushedUpto and latestChunkStart to the starting point.
317 */
318 if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
319 {
320 walrcv->flushedUpto = recptr;
321 walrcv->receivedTLI = tli;
322 walrcv->latestChunkStart = recptr;
323 }
324 walrcv->receiveStart = recptr;
325 walrcv->receiveStartTLI = tli;
326
327 walrcv_proc = walrcv->procno;
328
329 SpinLockRelease(&walrcv->mutex);
330
331 if (launch)
333 else if (walrcv_proc != INVALID_PROC_NUMBER)
334 SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
335}
336
337/*
338 * Returns the last+1 byte position that walreceiver has flushed.
339 *
340 * Optionally, returns the previous chunk start, that is the first byte
341 * written in the most recent walreceiver flush cycle. Callers not
342 * interested in that value may pass NULL for latestChunkStart. Same for
343 * receiveTLI.
344 */
347{
348 WalRcvData *walrcv = WalRcv;
349 XLogRecPtr recptr;
350
351 SpinLockAcquire(&walrcv->mutex);
352 recptr = walrcv->flushedUpto;
353 if (latestChunkStart)
354 *latestChunkStart = walrcv->latestChunkStart;
355 if (receiveTLI)
356 *receiveTLI = walrcv->receivedTLI;
357 SpinLockRelease(&walrcv->mutex);
358
359 return recptr;
360}
361
362/*
363 * Returns the last+1 byte position that walreceiver has written.
364 * This returns a recently written value without taking a lock.
365 */
368{
369 WalRcvData *walrcv = WalRcv;
370
371 return pg_atomic_read_u64(&walrcv->writtenUpto);
372}
373
374/*
375 * Returns the replication apply delay in ms or -1
376 * if the apply delay info is not available
377 */
378int
380{
381 WalRcvData *walrcv = WalRcv;
382 XLogRecPtr receivePtr;
383 XLogRecPtr replayPtr;
384 TimestampTz chunkReplayStartTime;
385
386 SpinLockAcquire(&walrcv->mutex);
387 receivePtr = walrcv->flushedUpto;
388 SpinLockRelease(&walrcv->mutex);
389
390 replayPtr = GetXLogReplayRecPtr(NULL);
391
392 if (receivePtr == replayPtr)
393 return 0;
394
395 chunkReplayStartTime = GetCurrentChunkReplayStartTime();
396
397 if (chunkReplayStartTime == 0)
398 return -1;
399
400 return TimestampDifferenceMilliseconds(chunkReplayStartTime,
402}
403
404/*
405 * Returns the network latency in ms, note that this includes any
406 * difference in clock settings between the servers, as well as timezone.
407 */
408int
410{
411 WalRcvData *walrcv = WalRcv;
412 TimestampTz lastMsgSendTime;
413 TimestampTz lastMsgReceiptTime;
414
415 SpinLockAcquire(&walrcv->mutex);
416 lastMsgSendTime = walrcv->lastMsgSendTime;
417 lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
418 SpinLockRelease(&walrcv->mutex);
419
420 return TimestampDifferenceMilliseconds(lastMsgSendTime,
421 lastMsgReceiptTime);
422}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:451
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:465
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
#define MemSet(start, val, len)
Definition: c.h:1024
size_t Size
Definition: c.h:615
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition: timestamp.h:39
Assert(PointerIsAligned(start, uint64))
void SetLatch(Latch *latch)
Definition: latch.c:290
#define NAMEDATALEN
int64 pg_time_t
Definition: pgtime.h:23
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:165
@ PMSIGNAL_START_WALRECEIVER
Definition: pmsignal.h:42
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define GetPGProcByNumber(n)
Definition: proc.h:440
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int ProcNumber
Definition: procnumber.h:24
Size add_size(Size s1, Size s2)
Definition: shmem.c:494
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:388
#define SpinLockInit(lock)
Definition: spin.h:57
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:111
TimeLineID receiveStartTLI
Definition: walreceiver.h:87
TimeLineID receivedTLI
Definition: walreceiver.h:97
char slotname[NAMEDATALEN]
Definition: walreceiver.h:136
pid_t pid
Definition: walreceiver.h:68
XLogRecPtr latestChunkStart
Definition: walreceiver.h:105
XLogRecPtr receiveStart
Definition: walreceiver.h:86
XLogRecPtr flushedUpto
Definition: walreceiver.h:96
ProcNumber procno
Definition: walreceiver.h:67
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:72
bool is_temp_slot
Definition: walreceiver.h:142
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:155
pg_time_t startTime
Definition: walreceiver.h:78
TimestampTz lastMsgSendTime
Definition: walreceiver.h:110
WalRcvState walRcvState
Definition: walreceiver.h:71
slock_t mutex
Definition: walreceiver.h:147
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:123
Definition: regguts.h:323
#define MAXCONNINFO
Definition: walreceiver.h:37
WalRcvState
Definition: walreceiver.h:46
@ WALRCV_STARTING
Definition: walreceiver.h:48
@ WALRCV_STOPPED
Definition: walreceiver.h:47
@ WALRCV_RESTARTING
Definition: walreceiver.h:52
@ WALRCV_STREAMING
Definition: walreceiver.h:50
@ WALRCV_WAITING
Definition: walreceiver.h:51
@ WALRCV_STOPPING
Definition: walreceiver.h:53
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool WalRcvStreaming(void)
void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot)
WalRcvData * WalRcv
XLogRecPtr GetWalRcvWriteRecPtr(void)
void ShutdownWalRcv(void)
#define WALRCV_STARTUP_TIMEOUT
WalRcvState WalRcvGetState(void)
bool WalRcvRunning(void)
int GetReplicationApplyDelay(void)
void WalRcvShmemInit(void)
Size WalRcvShmemSize(void)
int GetReplicationTransferLatency(void)
#define kill(pid, sig)
Definition: win32_port.h:493
int wal_segment_size
Definition: xlog.c:145
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:63
static TimeLineID receiveTLI
Definition: xlogrecovery.c:266
TimestampTz GetCurrentChunkReplayStartTime(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)