@@ -218,6 +218,8 @@ static bool recoveryPauseAtTarget = true;
218218static TransactionId recoveryTargetXid ;
219219static TimestampTz recoveryTargetTime ;
220220static char * recoveryTargetName ;
221+ static int min_recovery_apply_delay = 0 ;
222+ static TimestampTz recoveryDelayUntilTime ;
221223
222224/* options taken from recovery.conf for XLOG streaming */
223225static bool StandbyModeRequested = false;
@@ -728,8 +730,10 @@ static bool holdingAllSlots = false;
728730
729731static void readRecoveryCommandFile (void );
730732static void exitArchiveRecovery (TimeLineID endTLI , XLogSegNo endLogSegNo );
731- static bool recoveryStopsHere (XLogRecord * record , bool * includeThis );
733+ static bool recoveryStopsHere (XLogRecord * record , bool * includeThis , bool * delayThis );
732734static void recoveryPausesHere (void );
735+ static void recoveryApplyDelay (void );
736+ static bool SetRecoveryDelayUntilTime (TimestampTz xtime );
733737static void SetLatestXTime (TimestampTz xtime );
734738static void SetCurrentChunkStartTime (TimestampTz xtime );
735739static void CheckRequiredParameterValues (void );
@@ -5476,6 +5480,19 @@ readRecoveryCommandFile(void)
54765480 (errmsg_internal ("trigger_file = '%s'" ,
54775481 TriggerFile )));
54785482 }
5483+ else if (strcmp (item -> name , "min_recovery_apply_delay" ) == 0 )
5484+ {
5485+ const char * hintmsg ;
5486+
5487+ if (!parse_int (item -> value , & min_recovery_apply_delay , GUC_UNIT_MS ,
5488+ & hintmsg ))
5489+ ereport (ERROR ,
5490+ (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
5491+ errmsg ("parameter \"%s\" requires a temporal value" , "min_recovery_apply_delay" ),
5492+ hintmsg ? errhint ("%s" , _ (hintmsg )) : 0 ));
5493+ ereport (DEBUG2 ,
5494+ (errmsg ("min_recovery_apply_delay = '%s'" , item -> value )));
5495+ }
54795496 else
54805497 ereport (FATAL ,
54815498 (errmsg ("unrecognized recovery parameter \"%s\"" ,
@@ -5625,10 +5642,11 @@ exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo)
56255642 * We also track the timestamp of the latest applied COMMIT/ABORT
56265643 * record in XLogCtl->recoveryLastXTime, for logging purposes.
56275644 * Also, some information is saved in recoveryStopXid et al for use in
5628- * annotating the new timeline's history file.
5645+ * annotating the new timeline's history file; and recoveryDelayUntilTime
5646+ * is updated, for time-delayed standbys.
56295647 */
56305648static bool
5631- recoveryStopsHere (XLogRecord * record , bool * includeThis )
5649+ recoveryStopsHere (XLogRecord * record , bool * includeThis , bool * delayThis )
56325650{
56335651 bool stopsHere ;
56345652 uint8 record_info ;
@@ -5645,20 +5663,31 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
56455663
56465664 recordXactCommitData = (xl_xact_commit_compact * ) XLogRecGetData (record );
56475665 recordXtime = recordXactCommitData -> xact_time ;
5666+
5667+ * delayThis = SetRecoveryDelayUntilTime (recordXactCommitData -> xact_time );
56485668 }
56495669 else if (record -> xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT )
56505670 {
56515671 xl_xact_commit * recordXactCommitData ;
56525672
56535673 recordXactCommitData = (xl_xact_commit * ) XLogRecGetData (record );
56545674 recordXtime = recordXactCommitData -> xact_time ;
5675+
5676+ * delayThis = SetRecoveryDelayUntilTime (recordXactCommitData -> xact_time );
56555677 }
56565678 else if (record -> xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT )
56575679 {
56585680 xl_xact_abort * recordXactAbortData ;
56595681
56605682 recordXactAbortData = (xl_xact_abort * ) XLogRecGetData (record );
56615683 recordXtime = recordXactAbortData -> xact_time ;
5684+
5685+ /*
5686+ * We deliberately choose not to delay aborts since they have no
5687+ * effect on MVCC. We already allow replay of records that don't
5688+ * have a timestamp, so there is already opportunity for issues
5689+ * caused by early conflicts on standbys.
5690+ */
56625691 }
56635692 else if (record -> xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT )
56645693 {
@@ -5667,6 +5696,8 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
56675696 recordRestorePointData = (xl_restore_point * ) XLogRecGetData (record );
56685697 recordXtime = recordRestorePointData -> rp_time ;
56695698 strncpy (recordRPName , recordRestorePointData -> rp_name , MAXFNAMELEN );
5699+
5700+ * delayThis = SetRecoveryDelayUntilTime (recordRestorePointData -> rp_time );
56705701 }
56715702 else
56725703 return false;
@@ -5833,6 +5864,66 @@ SetRecoveryPause(bool recoveryPause)
58335864 SpinLockRelease (& xlogctl -> info_lck );
58345865}
58355866
5867+ static bool
5868+ SetRecoveryDelayUntilTime (TimestampTz xtime )
5869+ {
5870+ if (min_recovery_apply_delay != 0 )
5871+ {
5872+ recoveryDelayUntilTime =
5873+ TimestampTzPlusMilliseconds (xtime , min_recovery_apply_delay );
5874+
5875+ return true;
5876+ }
5877+
5878+ return false;
5879+ }
5880+ /*
5881+ * When min_recovery_apply_delay is set, we wait long enough to make sure
5882+ * certain record types are applied at least that interval behind the master.
5883+ * See recoveryStopsHere().
5884+ *
5885+ * Note that the delay is calculated between the WAL record log time and
5886+ * the current time on standby. We would prefer to keep track of when this
5887+ * standby received each WAL record, which would allow a more consistent
5888+ * approach and one not affected by time synchronisation issues, but that
5889+ * is significantly more effort and complexity for little actual gain in
5890+ * usability.
5891+ */
5892+ static void
5893+ recoveryApplyDelay (void )
5894+ {
5895+ while (true)
5896+ {
5897+ long secs ;
5898+ int microsecs ;
5899+
5900+ ResetLatch (& XLogCtl -> recoveryWakeupLatch );
5901+
5902+ /* might change the trigger file's location */
5903+ HandleStartupProcInterrupts ();
5904+
5905+ if (CheckForStandbyTrigger ())
5906+ break ;
5907+
5908+ /*
5909+ * Wait for difference between GetCurrentTimestamp() and
5910+ * recoveryDelayUntilTime
5911+ */
5912+ TimestampDifference (GetCurrentTimestamp (), recoveryDelayUntilTime ,
5913+ & secs , & microsecs );
5914+
5915+ if (secs <= 0 && microsecs <=0 )
5916+ break ;
5917+
5918+ elog (DEBUG2 , "recovery apply delay %ld seconds, %d milliseconds" ,
5919+ secs , microsecs / 1000 );
5920+
5921+ WaitLatch (& XLogCtl -> recoveryWakeupLatch ,
5922+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
5923+ secs * 1000L + microsecs / 1000 );
5924+ }
5925+ }
5926+
58365927/*
58375928 * Save timestamp of latest processed commit/abort record.
58385929 *
@@ -6660,6 +6751,7 @@ StartupXLOG(void)
66606751 {
66616752 bool recoveryContinue = true;
66626753 bool recoveryApply = true;
6754+ bool recoveryDelay = false;
66636755 ErrorContextCallback errcallback ;
66646756 TimestampTz xtime ;
66656757
@@ -6719,7 +6811,7 @@ StartupXLOG(void)
67196811 /*
67206812 * Have we reached our recovery target?
67216813 */
6722- if (recoveryStopsHere (record , & recoveryApply ))
6814+ if (recoveryStopsHere (record , & recoveryApply , & recoveryDelay ))
67236815 {
67246816 if (recoveryPauseAtTarget )
67256817 {
@@ -6734,6 +6826,25 @@ StartupXLOG(void)
67346826 break ;
67356827 }
67366828
6829+ /*
6830+ * If we've been asked to lag the master, wait on
6831+ * latch until enough time has passed.
6832+ */
6833+ if (recoveryDelay )
6834+ {
6835+ recoveryApplyDelay ();
6836+
6837+ /*
6838+ * We test for paused recovery again here. If
6839+ * user sets delayed apply, it may be because
6840+ * they expect to pause recovery in case of
6841+ * problems, so we must test again here otherwise
6842+ * pausing during the delay-wait wouldn't work.
6843+ */
6844+ if (xlogctl -> recoveryPause )
6845+ recoveryPausesHere ();
6846+ }
6847+
67376848 /* Setup error traceback support for ereport() */
67386849 errcallback .callback = rm_redo_error_callback ;
67396850 errcallback .arg = (void * ) record ;
0 commit comments