11/*-------------------------------------------------------------------------
22 *
33 * waitlsn.c
4- * Implements waiting for the given LSN, which is used in
4+ * Implements waiting for the given replay LSN, which is used in
55 * CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8).
66 *
77 * Copyright (c) 2024, PostgreSQL Global Development Group
2626#include "storage/latch.h"
2727#include "storage/proc.h"
2828#include "storage/shmem.h"
29+ #include "utils/fmgrprotos.h"
2930#include "utils/pg_lsn.h"
3031#include "utils/snapmgr.h"
31- #include "utils/fmgrprotos.h"
3232#include "utils/wait_event_types.h"
3333
34- /* Add to / delete from shared memory array */
35- static void addLSNWaiter (XLogRecPtr lsn );
36- static void deleteLSNWaiter (void );
34+ static int lsn_cmp (const pairingheap_node * a , const pairingheap_node * b ,
35+ void * arg );
3736
3837struct WaitLSNState * waitLSN = NULL ;
39- static volatile sig_atomic_t haveShmemItem = false;
4038
41- /*
42- * Report the amount of shared memory space needed for WaitLSNState
43- */
39+ /* Report the amount of shared memory space needed for WaitLSNState. */
4440Size
4541WaitLSNShmemSize (void )
4642{
@@ -51,7 +47,7 @@ WaitLSNShmemSize(void)
5147 return size ;
5248}
5349
54- /* Initialize the WaitLSNState in the shared memory */
50+ /* Initialize the WaitLSNState in the shared memory. */
5551void
5652WaitLSNShmemInit (void )
5753{
@@ -62,81 +58,93 @@ WaitLSNShmemInit(void)
6258 & found );
6359 if (!found )
6460 {
65- SpinLockInit (& waitLSN -> mutex );
66- waitLSN -> numWaitedProcs = 0 ;
67- pg_atomic_init_u64 (& waitLSN -> minLSN , PG_UINT64_MAX );
61+ SpinLockInit (& waitLSN -> waitersHeapMutex );
62+ pg_atomic_init_u64 (& waitLSN -> minWaitedLSN , PG_UINT64_MAX );
63+ pairingheap_initialize (& waitLSN -> waitersHeap , lsn_cmp , NULL );
64+ memset (& waitLSN -> procInfos , 0 , MaxBackends * sizeof (WaitLSNProcInfo ));
6865 }
6966}
7067
7168/*
72- * Add the information about the LSN waiter backend to the shared memory
73- * array .
69+ * Comparison function for waitLSN->waitersHeap heap. Waiting processes are
70+ * ordered by lsn, so that the waiter with smallest lsn is at the top .
7471 */
75- static void
76- addLSNWaiter ( XLogRecPtr lsn )
72+ static int
73+ lsn_cmp ( const pairingheap_node * a , const pairingheap_node * b , void * arg )
7774{
78- WaitLSNProcInfo cur ;
79- int i ;
75+ const WaitLSNProcInfo * aproc = pairingheap_const_container ( WaitLSNProcInfo , phNode , a ) ;
76+ const WaitLSNProcInfo * bproc = pairingheap_const_container ( WaitLSNProcInfo , phNode , b ) ;
8077
81- cur .procnum = MyProcNumber ;
82- cur .waitLSN = lsn ;
78+ if (aproc -> waitLSN < bproc -> waitLSN )
79+ return 1 ;
80+ else if (aproc -> waitLSN > bproc -> waitLSN )
81+ return -1 ;
82+ else
83+ return 0 ;
84+ }
8385
84- SpinLockAcquire (& waitLSN -> mutex );
86+ /*
87+ * Update waitLSN->minWaitedLSN according to the current state of
88+ * waitLSN->waitersHeap.
89+ */
90+ static void
91+ updateMinWaitedLSN (void )
92+ {
93+ XLogRecPtr minWaitedLSN = PG_UINT64_MAX ;
8594
86- for ( i = 0 ; i < waitLSN -> numWaitedProcs ; i ++ )
95+ if (! pairingheap_is_empty ( & waitLSN -> waitersHeap ) )
8796 {
88- if (waitLSN -> procInfos [i ].waitLSN >= cur .waitLSN )
89- {
90- WaitLSNProcInfo tmp ;
97+ pairingheap_node * node = pairingheap_first (& waitLSN -> waitersHeap );
9198
92- tmp = waitLSN -> procInfos [i ];
93- waitLSN -> procInfos [i ] = cur ;
94- cur = tmp ;
95- }
99+ minWaitedLSN = pairingheap_container (WaitLSNProcInfo , phNode , node )-> waitLSN ;
96100 }
97- waitLSN -> procInfos [i ] = cur ;
98- waitLSN -> numWaitedProcs ++ ;
99101
100- pg_atomic_write_u64 (& waitLSN -> minLSN , waitLSN -> procInfos [i ].waitLSN );
101- SpinLockRelease (& waitLSN -> mutex );
102+ pg_atomic_write_u64 (& waitLSN -> minWaitedLSN , minWaitedLSN );
102103}
103104
104105/*
105- * Delete the information about the LSN waiter backend from the shared memory
106- * array.
106+ * Put the current process into the heap of LSN waiters.
107107 */
108108static void
109- deleteLSNWaiter ( void )
109+ addLSNWaiter ( XLogRecPtr lsn )
110110{
111- int i ;
112- bool found = false;
111+ WaitLSNProcInfo * procInfo = & waitLSN -> procInfos [MyProcNumber ];
113112
114- SpinLockAcquire ( & waitLSN -> mutex );
113+ Assert (! procInfo -> inHeap );
115114
116- for (i = 0 ; i < waitLSN -> numWaitedProcs ; i ++ )
117- {
118- if (waitLSN -> procInfos [i ].procnum == MyProcNumber )
119- found = true;
115+ procInfo -> procnum = MyProcNumber ;
116+ procInfo -> waitLSN = lsn ;
120117
121- if (found && i < waitLSN -> numWaitedProcs - 1 )
122- {
123- waitLSN -> procInfos [i ] = waitLSN -> procInfos [i + 1 ];
124- }
125- }
118+ SpinLockAcquire (& waitLSN -> waitersHeapMutex );
126119
127- if (!found )
120+ pairingheap_add (& waitLSN -> waitersHeap , & procInfo -> phNode );
121+ procInfo -> inHeap = true;
122+ updateMinWaitedLSN ();
123+
124+ SpinLockRelease (& waitLSN -> waitersHeapMutex );
125+ }
126+
127+ /*
128+ * Remove the current process from the heap of LSN waiters if it's there.
129+ */
130+ static void
131+ deleteLSNWaiter (void )
132+ {
133+ WaitLSNProcInfo * procInfo = & waitLSN -> procInfos [MyProcNumber ];
134+
135+ SpinLockAcquire (& waitLSN -> waitersHeapMutex );
136+
137+ if (!procInfo -> inHeap )
128138 {
129- SpinLockRelease (& waitLSN -> mutex );
139+ SpinLockRelease (& waitLSN -> waitersHeapMutex );
130140 return ;
131141 }
132- waitLSN -> numWaitedProcs -- ;
133142
134- if (waitLSN -> numWaitedProcs != 0 )
135- pg_atomic_write_u64 (& waitLSN -> minLSN , waitLSN -> procInfos [i ].waitLSN );
136- else
137- pg_atomic_write_u64 (& waitLSN -> minLSN , PG_UINT64_MAX );
143+ pairingheap_remove (& waitLSN -> waitersHeap , & procInfo -> phNode );
144+ procInfo -> inHeap = false;
145+ updateMinWaitedLSN ();
138146
139- SpinLockRelease (& waitLSN -> mutex );
147+ SpinLockRelease (& waitLSN -> waitersHeapMutex );
140148}
141149
142150/*
@@ -148,41 +156,33 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
148156{
149157 int i ;
150158 int * wakeUpProcNums ;
151- int numWakeUpProcs ;
159+ int numWakeUpProcs = 0 ;
152160
153161 wakeUpProcNums = palloc (sizeof (int ) * MaxBackends );
154162
155- SpinLockAcquire (& waitLSN -> mutex );
163+ SpinLockAcquire (& waitLSN -> waitersHeapMutex );
156164
157165 /*
158- * Remember processes, whose waited LSNs are already replayed. We should
159- * set their latches later after spinlock release .
166+ * Iterate the pairing heap of waiting processes till we find LSN not yet
167+ * replayed. Record the process numbers to set their latches later.
160168 */
161- for ( i = 0 ; i < waitLSN -> numWaitedProcs ; i ++ )
169+ while (! pairingheap_is_empty ( & waitLSN -> waitersHeap ) )
162170 {
171+ pairingheap_node * node = pairingheap_first (& waitLSN -> waitersHeap );
172+ WaitLSNProcInfo * procInfo = pairingheap_container (WaitLSNProcInfo , phNode , node );
173+
163174 if (!XLogRecPtrIsInvalid (currentLSN ) &&
164- waitLSN -> procInfos [ i ]. waitLSN > currentLSN )
175+ procInfo -> waitLSN > currentLSN )
165176 break ;
166177
167- wakeUpProcNums [i ] = waitLSN -> procInfos [i ].procnum ;
178+ wakeUpProcNums [numWakeUpProcs ++ ] = procInfo -> procnum ;
179+ (void ) pairingheap_remove_first (& waitLSN -> waitersHeap );
180+ procInfo -> inHeap = false;
168181 }
169182
170- /*
171- * Immediately remove those processes from the shmem array. Otherwise,
172- * shmem array items will be here till corresponding processes wake up and
173- * delete themselves.
174- */
175- numWakeUpProcs = i ;
176- for (i = 0 ; i < waitLSN -> numWaitedProcs - numWakeUpProcs ; i ++ )
177- waitLSN -> procInfos [i ] = waitLSN -> procInfos [i + numWakeUpProcs ];
178- waitLSN -> numWaitedProcs -= numWakeUpProcs ;
179-
180- if (waitLSN -> numWaitedProcs != 0 )
181- pg_atomic_write_u64 (& waitLSN -> minLSN , waitLSN -> procInfos [i ].waitLSN );
182- else
183- pg_atomic_write_u64 (& waitLSN -> minLSN , PG_UINT64_MAX );
183+ updateMinWaitedLSN ();
184184
185- SpinLockRelease (& waitLSN -> mutex );
185+ SpinLockRelease (& waitLSN -> waitersHeapMutex );
186186
187187 /*
188188 * Set latches for processes, whose waited LSNs are already replayed. This
@@ -204,7 +204,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
204204void
205205WaitLSNCleanup (void )
206206{
207- if (haveShmemItem )
207+ if (waitLSN -> procInfos [ MyProcNumber ]. inHeap )
208208 deleteLSNWaiter ();
209209}
210210
@@ -222,7 +222,7 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
222222 Assert (waitLSN );
223223
224224 /* Should be only called by a backend */
225- Assert (MyBackendType == B_BACKEND );
225+ Assert (MyBackendType == B_BACKEND && MyProcNumber <= MaxBackends );
226226
227227 if (!RecoveryInProgress ())
228228 ereport (ERROR ,
@@ -238,7 +238,6 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
238238 endtime = TimestampTzPlusMilliseconds (GetCurrentTimestamp (), timeout );
239239
240240 addLSNWaiter (targetLSN );
241- haveShmemItem = true;
242241
243242 for (;;)
244243 {
@@ -280,17 +279,12 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
280279 if (targetLSN > currentLSN )
281280 {
282281 deleteLSNWaiter ();
283- haveShmemItem = false;
284282 ereport (ERROR ,
285283 (errcode (ERRCODE_QUERY_CANCELED ),
286284 errmsg ("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X" ,
287285 LSN_FORMAT_ARGS (targetLSN ),
288286 LSN_FORMAT_ARGS (currentLSN ))));
289287 }
290- else
291- {
292- haveShmemItem = false;
293- }
294288}
295289
296290Datum
0 commit comments