2424 * are treated as not a crash but approximately normal termination;
2525 * the walsender will exit quickly without sending any more XLOG records.
2626 *
27- * If the server is shut down, postmaster sends us SIGUSR2 after all
28- * regular backends have exited and the shutdown checkpoint has been written.
29- * This instructs walsender to send any outstanding WAL, including the
30- * shutdown checkpoint record, wait for it to be replicated to the standby,
31- * and then exit.
27+ * If the server is shut down, checkpointer sends us
28+ * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29+ * the backend is idle or runs an SQL query this causes the backend to
30+ * shutdown, if logical replication is in progress all existing WAL records
31+ * are processed followed by a shutdown. Otherwise this causes the walsender
32+ * to switch to the "stopping" state. In this state, the walsender will reject
33+ * any further replication commands. The checkpointer begins the shutdown
34+ * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35+ * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36+ * walsender to send any outstanding WAL, including the shutdown checkpoint
37+ * record, wait for it to be replicated to the standby, and then exit.
3238 *
3339 *
3440 * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
@@ -177,13 +183,14 @@ static bool WalSndCaughtUp = false;
177183
178184/* Flags set by signal handlers for later service in main loop */
179185static volatile sig_atomic_t got_SIGHUP = false;
180- static volatile sig_atomic_t walsender_ready_to_stop = false;
186+ static volatile sig_atomic_t got_SIGUSR2 = false;
187+ static volatile sig_atomic_t got_STOPPING = false;
181188
182189/*
183- * This is set while we are streaming. When not set, SIGUSR2 signal will be
184- * handled like SIGTERM. When set, the main loop is responsible for checking
185- * walsender_ready_to_stop and terminating when it's set (after streaming any
186- * remaining WAL).
190+ * This is set while we are streaming. When not set
191+ * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
192+ * the main loop is responsible for checking got_STOPPING and terminating when
193+ * it's set (after streaming any remaining WAL).
187194 */
188195static volatile sig_atomic_t replication_active = false;
189196
@@ -300,7 +307,8 @@ WalSndErrorCleanup(void)
300307 ReplicationSlotCleanup ();
301308
302309 replication_active = false;
303- if (walsender_ready_to_stop )
310+
311+ if (got_STOPPING || got_SIGUSR2 )
304312 proc_exit (0 );
305313
306314 /* Revert back to startup state */
@@ -677,7 +685,7 @@ StartReplication(StartReplicationCmd *cmd)
677685 WalSndLoop (XLogSendPhysical );
678686
679687 replication_active = false;
680- if (walsender_ready_to_stop )
688+ if (got_STOPPING )
681689 proc_exit (0 );
682690 WalSndSetState (WALSNDSTATE_STARTUP );
683691
@@ -1055,7 +1063,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
10551063 {
10561064 ereport (LOG ,
10571065 (errmsg ("terminating walsender process after promotion" )));
1058- walsender_ready_to_stop = true;
1066+ got_STOPPING = true;
10591067 }
10601068
10611069 WalSndSetState (WALSNDSTATE_CATCHUP );
@@ -1106,7 +1114,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
11061114 ReplicationSlotRelease ();
11071115
11081116 replication_active = false;
1109- if (walsender_ready_to_stop )
1117+ if (got_STOPPING )
11101118 proc_exit (0 );
11111119 WalSndSetState (WALSNDSTATE_STARTUP );
11121120
@@ -1311,6 +1319,14 @@ WalSndWaitForWal(XLogRecPtr loc)
13111319 /* Check for input from the client */
13121320 ProcessRepliesIfAny ();
13131321
1322+ /*
1323+ * If we're shutting down, trigger pending WAL to be written out,
1324+ * otherwise we'd possibly end up waiting for WAL that never gets
1325+ * written, because walwriter has shut down already.
1326+ */
1327+ if (got_STOPPING )
1328+ XLogBackgroundFlush ();
1329+
13141330 /* Update our idea of the currently flushed position. */
13151331 if (!RecoveryInProgress ())
13161332 RecentFlushPtr = GetFlushRecPtr ();
@@ -1326,7 +1342,7 @@ WalSndWaitForWal(XLogRecPtr loc)
13261342 * RecentFlushPtr, so we can send all remaining data before shutting
13271343 * down.
13281344 */
1329- if (walsender_ready_to_stop )
1345+ if (got_STOPPING )
13301346 break ;
13311347
13321348 /*
@@ -1400,6 +1416,22 @@ exec_replication_command(const char *cmd_string)
14001416 MemoryContext cmd_context ;
14011417 MemoryContext old_context ;
14021418
1419+ /*
1420+ * If WAL sender has been told that shutdown is getting close, switch its
1421+ * status accordingly to handle the next replication commands correctly.
1422+ */
1423+ if (got_STOPPING )
1424+ WalSndSetState (WALSNDSTATE_STOPPING );
1425+
1426+ /*
1427+ * Throw error if in stopping mode. We need prevent commands that could
1428+ * generate WAL while the shutdown checkpoint is being written. To be
1429+ * safe, we just prohibit all new commands.
1430+ */
1431+ if (MyWalSnd -> state == WALSNDSTATE_STOPPING )
1432+ ereport (ERROR ,
1433+ (errmsg ("cannot execute new commands while WAL sender is in stopping mode" )));
1434+
14031435 /*
14041436 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
14051437 * command arrives. Clean up the old stuff if there's anything.
@@ -2128,7 +2160,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
21282160 * normal termination at shutdown, or a promotion, the walsender
21292161 * is not sure which.
21302162 */
2131- if (walsender_ready_to_stop )
2163+ if (got_SIGUSR2 )
21322164 WalSndDone (send_data );
21332165 }
21342166
@@ -2443,6 +2475,10 @@ XLogSendPhysical(void)
24432475 XLogRecPtr endptr ;
24442476 Size nbytes ;
24452477
2478+ /* If requested switch the WAL sender to the stopping state. */
2479+ if (got_STOPPING )
2480+ WalSndSetState (WALSNDSTATE_STOPPING );
2481+
24462482 if (streamingDoneSending )
24472483 {
24482484 WalSndCaughtUp = true;
@@ -2733,7 +2769,16 @@ XLogSendLogical(void)
27332769 * point, then we're caught up.
27342770 */
27352771 if (logical_decoding_ctx -> reader -> EndRecPtr >= GetFlushRecPtr ())
2772+ {
27362773 WalSndCaughtUp = true;
2774+
2775+ /*
2776+ * Have WalSndLoop() terminate the connection in an orderly
2777+ * manner, after writing out all the pending data.
2778+ */
2779+ if (got_STOPPING )
2780+ got_SIGUSR2 = true;
2781+ }
27372782 }
27382783
27392784 /* Update shared memory status */
@@ -2843,6 +2888,26 @@ WalSndRqstFileReload(void)
28432888 }
28442889}
28452890
2891+ /*
2892+ * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2893+ */
2894+ void
2895+ HandleWalSndInitStopping (void )
2896+ {
2897+ Assert (am_walsender );
2898+
2899+ /*
2900+ * If replication has not yet started, die like with SIGTERM. If
2901+ * replication is active, only set a flag and wake up the main loop. It
2902+ * will send any outstanding WAL, wait for it to be replicated to the
2903+ * standby, and then exit gracefully.
2904+ */
2905+ if (!replication_active )
2906+ kill (MyProcPid , SIGTERM );
2907+ else
2908+ got_STOPPING = true;
2909+ }
2910+
28462911/* SIGHUP: set flag to re-read config file at next convenient time */
28472912static void
28482913WalSndSigHupHandler (SIGNAL_ARGS )
@@ -2856,22 +2921,17 @@ WalSndSigHupHandler(SIGNAL_ARGS)
28562921 errno = save_errno ;
28572922}
28582923
2859- /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
2924+ /*
2925+ * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2926+ * sender should already have been switched to WALSNDSTATE_STOPPING at
2927+ * this point.
2928+ */
28602929static void
28612930WalSndLastCycleHandler (SIGNAL_ARGS )
28622931{
28632932 int save_errno = errno ;
28642933
2865- /*
2866- * If replication has not yet started, die like with SIGTERM. If
2867- * replication is active, only set a flag and wake up the main loop. It
2868- * will send any outstanding WAL, wait for it to be replicated to the
2869- * standby, and then exit gracefully.
2870- */
2871- if (!replication_active )
2872- kill (MyProcPid , SIGTERM );
2873-
2874- walsender_ready_to_stop = true;
2934+ got_SIGUSR2 = true;
28752935 SetLatch (MyLatch );
28762936
28772937 errno = save_errno ;
@@ -2969,6 +3029,77 @@ WalSndWakeup(void)
29693029 }
29703030}
29713031
3032+ /*
3033+ * Signal all walsenders to move to stopping state.
3034+ *
3035+ * This will trigger walsenders to move to a state where no further WAL can be
3036+ * generated. See this file's header for details.
3037+ */
3038+ void
3039+ WalSndInitStopping (void )
3040+ {
3041+ int i ;
3042+
3043+ for (i = 0 ; i < max_wal_senders ; i ++ )
3044+ {
3045+ WalSnd * walsnd = & WalSndCtl -> walsnds [i ];
3046+ pid_t pid ;
3047+
3048+ SpinLockAcquire (& walsnd -> mutex );
3049+ pid = walsnd -> pid ;
3050+ SpinLockRelease (& walsnd -> mutex );
3051+
3052+ if (pid == 0 )
3053+ continue ;
3054+
3055+ SendProcSignal (pid , PROCSIG_WALSND_INIT_STOPPING , InvalidBackendId );
3056+ }
3057+ }
3058+
3059+ /*
3060+ * Wait that all the WAL senders have quit or reached the stopping state. This
3061+ * is used by the checkpointer to control when the shutdown checkpoint can
3062+ * safely be performed.
3063+ */
3064+ void
3065+ WalSndWaitStopping (void )
3066+ {
3067+ for (;;)
3068+ {
3069+ int i ;
3070+ bool all_stopped = true;
3071+
3072+ for (i = 0 ; i < max_wal_senders ; i ++ )
3073+ {
3074+ WalSndState state ;
3075+ WalSnd * walsnd = & WalSndCtl -> walsnds [i ];
3076+
3077+ SpinLockAcquire (& walsnd -> mutex );
3078+
3079+ if (walsnd -> pid == 0 )
3080+ {
3081+ SpinLockRelease (& walsnd -> mutex );
3082+ continue ;
3083+ }
3084+
3085+ state = walsnd -> state ;
3086+ SpinLockRelease (& walsnd -> mutex );
3087+
3088+ if (state != WALSNDSTATE_STOPPING )
3089+ {
3090+ all_stopped = false;
3091+ break ;
3092+ }
3093+ }
3094+
3095+ /* safe to leave if confirmation is done for all WAL senders */
3096+ if (all_stopped )
3097+ return ;
3098+
3099+ pg_usleep (10000L ); /* wait for 10 msec */
3100+ }
3101+ }
3102+
29723103/* Set state for current walsender (only called in walsender) */
29733104void
29743105WalSndSetState (WalSndState state )
@@ -3002,6 +3133,8 @@ WalSndGetStateString(WalSndState state)
30023133 return "catchup" ;
30033134 case WALSNDSTATE_STREAMING :
30043135 return "streaming" ;
3136+ case WALSNDSTATE_STOPPING :
3137+ return "stopping" ;
30053138 }
30063139 return "UNKNOWN" ;
30073140}
0 commit comments