2424 * are treated as not a crash but approximately normal termination;
2525 * the walsender will exit quickly without sending any more XLOG records.
2626 *
27- * If the server is shut down, postmaster sends us SIGUSR2 after all
28- * regular backends have exited and the shutdown checkpoint has been written.
29- * This instructs walsender to send any outstanding WAL, including the
30- * shutdown checkpoint record, wait for it to be replicated to the standby,
31- * and then exit.
27+ * If the server is shut down, checkpointer sends us
28+ * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29+ * the backend is idle or runs an SQL query this causes the backend to
30+ * shutdown, if logical replication is in progress all existing WAL records
31+ * are processed followed by a shutdown. Otherwise this causes the walsender
32+ * to switch to the "stopping" state. In this state, the walsender will reject
33+ * any further replication commands. The checkpointer begins the shutdown
34+ * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35+ * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36+ * walsender to send any outstanding WAL, including the shutdown checkpoint
37+ * record, wait for it to be replicated to the standby, and then exit.
3238 *
3339 *
3440 * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
@@ -171,13 +177,14 @@ static bool WalSndCaughtUp = false;
171177
172178/* Flags set by signal handlers for later service in main loop */
173179static volatile sig_atomic_t got_SIGHUP = false;
174- static volatile sig_atomic_t walsender_ready_to_stop = false;
180+ static volatile sig_atomic_t got_SIGUSR2 = false;
181+ static volatile sig_atomic_t got_STOPPING = false;
175182
176183/*
177- * This is set while we are streaming. When not set, SIGUSR2 signal will be
178- * handled like SIGTERM. When set, the main loop is responsible for checking
179- * walsender_ready_to_stop and terminating when it's set (after streaming any
180- * remaining WAL).
184+ * This is set while we are streaming. When not set
185+ * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
186+ * the main loop is responsible for checking got_STOPPING and terminating when
187+ * it's set (after streaming any remaining WAL).
181188 */
182189static volatile sig_atomic_t replication_active = false;
183190
@@ -264,7 +271,8 @@ WalSndErrorCleanup(void)
264271 ReplicationSlotRelease ();
265272
266273 replication_active = false;
267- if (walsender_ready_to_stop )
274+
275+ if (got_STOPPING || got_SIGUSR2 )
268276 proc_exit (0 );
269277
270278 /* Revert back to startup state */
@@ -671,7 +679,7 @@ StartReplication(StartReplicationCmd *cmd)
671679 WalSndLoop (XLogSendPhysical );
672680
673681 replication_active = false;
674- if (walsender_ready_to_stop )
682+ if (got_STOPPING )
675683 proc_exit (0 );
676684 WalSndSetState (WALSNDSTATE_STARTUP );
677685
@@ -970,7 +978,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
970978 {
971979 ereport (LOG ,
972980 (errmsg ("terminating walsender process after promotion" )));
973- walsender_ready_to_stop = true;
981+ got_STOPPING = true;
974982 }
975983
976984 WalSndSetState (WALSNDSTATE_CATCHUP );
@@ -1024,7 +1032,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
10241032 ReplicationSlotRelease ();
10251033
10261034 replication_active = false;
1027- if (walsender_ready_to_stop )
1035+ if (got_STOPPING )
10281036 proc_exit (0 );
10291037 WalSndSetState (WALSNDSTATE_STARTUP );
10301038
@@ -1204,6 +1212,14 @@ WalSndWaitForWal(XLogRecPtr loc)
12041212 /* Check for input from the client */
12051213 ProcessRepliesIfAny ();
12061214
1215+ /*
1216+ * If we're shutting down, trigger pending WAL to be written out,
1217+ * otherwise we'd possibly end up waiting for WAL that never gets
1218+ * written, because walwriter has shut down already.
1219+ */
1220+ if (got_STOPPING )
1221+ XLogBackgroundFlush ();
1222+
12071223 /* Update our idea of the currently flushed position. */
12081224 if (!RecoveryInProgress ())
12091225 RecentFlushPtr = GetFlushRecPtr ();
@@ -1219,7 +1235,7 @@ WalSndWaitForWal(XLogRecPtr loc)
12191235 * RecentFlushPtr, so we can send all remaining data before shutting
12201236 * down.
12211237 */
1222- if (walsender_ready_to_stop )
1238+ if (got_STOPPING )
12231239 break ;
12241240
12251241 /*
@@ -1289,6 +1305,22 @@ exec_replication_command(const char *cmd_string)
12891305 MemoryContext cmd_context ;
12901306 MemoryContext old_context ;
12911307
1308+ /*
1309+ * If WAL sender has been told that shutdown is getting close, switch its
1310+ * status accordingly to handle the next replication commands correctly.
1311+ */
1312+ if (got_STOPPING )
1313+ WalSndSetState (WALSNDSTATE_STOPPING );
1314+
1315+ /*
1316+ * Throw error if in stopping mode. We need prevent commands that could
1317+ * generate WAL while the shutdown checkpoint is being written. To be
1318+ * safe, we just prohibit all new commands.
1319+ */
1320+ if (MyWalSnd -> state == WALSNDSTATE_STOPPING )
1321+ ereport (ERROR ,
1322+ (errmsg ("cannot execute new commands while WAL sender is in stopping mode" )));
1323+
12921324 /*
12931325 * Log replication command if log_replication_commands is enabled. Even
12941326 * when it's disabled, log the command with DEBUG1 level for backward
@@ -1884,7 +1916,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
18841916 * normal termination at shutdown, or a promotion, the walsender
18851917 * is not sure which.
18861918 */
1887- if (walsender_ready_to_stop )
1919+ if (got_SIGUSR2 )
18881920 WalSndDone (send_data );
18891921 }
18901922
@@ -2193,6 +2225,10 @@ XLogSendPhysical(void)
21932225 XLogRecPtr endptr ;
21942226 Size nbytes ;
21952227
2228+ /* If requested switch the WAL sender to the stopping state. */
2229+ if (got_STOPPING )
2230+ WalSndSetState (WALSNDSTATE_STOPPING );
2231+
21962232 if (streamingDoneSending )
21972233 {
21982234 WalSndCaughtUp = true;
@@ -2452,7 +2488,16 @@ XLogSendLogical(void)
24522488 * point, then we're caught up.
24532489 */
24542490 if (logical_decoding_ctx -> reader -> EndRecPtr >= GetFlushRecPtr ())
2491+ {
24552492 WalSndCaughtUp = true;
2493+
2494+ /*
2495+ * Have WalSndLoop() terminate the connection in an orderly
2496+ * manner, after writing out all the pending data.
2497+ */
2498+ if (got_STOPPING )
2499+ got_SIGUSR2 = true;
2500+ }
24562501 }
24572502
24582503 /* Update shared memory status */
@@ -2563,6 +2608,26 @@ WalSndRqstFileReload(void)
25632608 }
25642609}
25652610
2611+ /*
2612+ * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2613+ */
2614+ void
2615+ HandleWalSndInitStopping (void )
2616+ {
2617+ Assert (am_walsender );
2618+
2619+ /*
2620+ * If replication has not yet started, die like with SIGTERM. If
2621+ * replication is active, only set a flag and wake up the main loop. It
2622+ * will send any outstanding WAL, wait for it to be replicated to the
2623+ * standby, and then exit gracefully.
2624+ */
2625+ if (!replication_active )
2626+ kill (MyProcPid , SIGTERM );
2627+ else
2628+ got_STOPPING = true;
2629+ }
2630+
25662631/* SIGHUP: set flag to re-read config file at next convenient time */
25672632static void
25682633WalSndSigHupHandler (SIGNAL_ARGS )
@@ -2576,22 +2641,17 @@ WalSndSigHupHandler(SIGNAL_ARGS)
25762641 errno = save_errno ;
25772642}
25782643
2579- /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
2644+ /*
2645+ * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2646+ * sender should already have been switched to WALSNDSTATE_STOPPING at
2647+ * this point.
2648+ */
25802649static void
25812650WalSndLastCycleHandler (SIGNAL_ARGS )
25822651{
25832652 int save_errno = errno ;
25842653
2585- /*
2586- * If replication has not yet started, die like with SIGTERM. If
2587- * replication is active, only set a flag and wake up the main loop. It
2588- * will send any outstanding WAL, wait for it to be replicated to the
2589- * standby, and then exit gracefully.
2590- */
2591- if (!replication_active )
2592- kill (MyProcPid , SIGTERM );
2593-
2594- walsender_ready_to_stop = true;
2654+ got_SIGUSR2 = true;
25952655 SetLatch (MyLatch );
25962656
25972657 errno = save_errno ;
@@ -2689,6 +2749,77 @@ WalSndWakeup(void)
26892749 }
26902750}
26912751
2752+ /*
2753+ * Signal all walsenders to move to stopping state.
2754+ *
2755+ * This will trigger walsenders to move to a state where no further WAL can be
2756+ * generated. See this file's header for details.
2757+ */
2758+ void
2759+ WalSndInitStopping (void )
2760+ {
2761+ int i ;
2762+
2763+ for (i = 0 ; i < max_wal_senders ; i ++ )
2764+ {
2765+ WalSnd * walsnd = & WalSndCtl -> walsnds [i ];
2766+ pid_t pid ;
2767+
2768+ SpinLockAcquire (& walsnd -> mutex );
2769+ pid = walsnd -> pid ;
2770+ SpinLockRelease (& walsnd -> mutex );
2771+
2772+ if (pid == 0 )
2773+ continue ;
2774+
2775+ SendProcSignal (pid , PROCSIG_WALSND_INIT_STOPPING , InvalidBackendId );
2776+ }
2777+ }
2778+
2779+ /*
2780+ * Wait that all the WAL senders have quit or reached the stopping state. This
2781+ * is used by the checkpointer to control when the shutdown checkpoint can
2782+ * safely be performed.
2783+ */
2784+ void
2785+ WalSndWaitStopping (void )
2786+ {
2787+ for (;;)
2788+ {
2789+ int i ;
2790+ bool all_stopped = true;
2791+
2792+ for (i = 0 ; i < max_wal_senders ; i ++ )
2793+ {
2794+ WalSndState state ;
2795+ WalSnd * walsnd = & WalSndCtl -> walsnds [i ];
2796+
2797+ SpinLockAcquire (& walsnd -> mutex );
2798+
2799+ if (walsnd -> pid == 0 )
2800+ {
2801+ SpinLockRelease (& walsnd -> mutex );
2802+ continue ;
2803+ }
2804+
2805+ state = walsnd -> state ;
2806+ SpinLockRelease (& walsnd -> mutex );
2807+
2808+ if (state != WALSNDSTATE_STOPPING )
2809+ {
2810+ all_stopped = false;
2811+ break ;
2812+ }
2813+ }
2814+
2815+ /* safe to leave if confirmation is done for all WAL senders */
2816+ if (all_stopped )
2817+ return ;
2818+
2819+ pg_usleep (10000L ); /* wait for 10 msec */
2820+ }
2821+ }
2822+
26922823/* Set state for current walsender (only called in walsender) */
26932824void
26942825WalSndSetState (WalSndState state )
@@ -2722,6 +2853,8 @@ WalSndGetStateString(WalSndState state)
27222853 return "catchup" ;
27232854 case WALSNDSTATE_STREAMING :
27242855 return "streaming" ;
2856+ case WALSNDSTATE_STOPPING :
2857+ return "stopping" ;
27252858 }
27262859 return "UNKNOWN" ;
27272860}
0 commit comments