@@ -724,6 +724,18 @@ typedef struct XLogCtlData
724724 XLogRecPtr lastFpwDisableRecPtr ;
725725
726726 slock_t info_lck ; /* locks shared variables shown above */
727+
728+ /*
729+ * Variables used to track segment-boundary-crossing WAL records. See
730+ * RegisterSegmentBoundary. Protected by segtrack_lck.
731+ */
732+ XLogSegNo lastNotifiedSeg ;
733+ XLogSegNo earliestSegBoundary ;
734+ XLogRecPtr earliestSegBoundaryEndPtr ;
735+ XLogSegNo latestSegBoundary ;
736+ XLogRecPtr latestSegBoundaryEndPtr ;
737+
738+ slock_t segtrack_lck ; /* locks shared variables shown above */
727739} XLogCtlData ;
728740
729741static XLogCtlData * XLogCtl = NULL ;
@@ -920,6 +932,7 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
920932 XLogSegNo * endlogSegNo );
921933static void UpdateLastRemovedPtr (char * filename );
922934static void ValidateXLOGDirectoryStructure (void );
935+ static void RegisterSegmentBoundary (XLogSegNo seg , XLogRecPtr pos );
923936static void CleanupBackupHistory (void );
924937static void UpdateMinRecoveryPoint (XLogRecPtr lsn , bool force );
925938static XLogRecord * ReadRecord (XLogReaderState * xlogreader ,
@@ -1154,23 +1167,56 @@ XLogInsertRecord(XLogRecData *rdata,
11541167 END_CRIT_SECTION ();
11551168
11561169 /*
1157- * Update shared LogwrtRqst.Write, if we crossed page boundary.
1170+ * If we crossed page boundary, update LogwrtRqst.Write; if we crossed
1171+ * segment boundary, register that and wake up walwriter.
11581172 */
11591173 if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ )
11601174 {
1175+ XLogSegNo StartSeg ;
1176+ XLogSegNo EndSeg ;
1177+
1178+ XLByteToSeg (StartPos , StartSeg , wal_segment_size );
1179+ XLByteToSeg (EndPos , EndSeg , wal_segment_size );
1180+
1181+ /*
1182+ * Register our crossing the segment boundary if that occurred.
1183+ *
1184+ * Note that we did not use XLByteToPrevSeg() for determining the
1185+ * ending segment. This is so that a record that fits perfectly into
1186+ * the end of the segment causes the latter to get marked ready for
1187+ * archival immediately.
1188+ */
1189+ if (StartSeg != EndSeg && XLogArchivingActive ())
1190+ RegisterSegmentBoundary (EndSeg , EndPos );
1191+
1192+ /*
1193+ * Advance LogwrtRqst.Write so that it includes new block(s).
1194+ *
1195+ * We do this after registering the segment boundary so that the
1196+ * comparison with the flushed pointer below can use the latest value
1197+ * known globally.
1198+ */
11611199 SpinLockAcquire (& XLogCtl -> info_lck );
1162- /* advance global request to include new block(s) */
11631200 if (XLogCtl -> LogwrtRqst .Write < EndPos )
11641201 XLogCtl -> LogwrtRqst .Write = EndPos ;
11651202 /* update local result copy while I have the chance */
11661203 LogwrtResult = XLogCtl -> LogwrtResult ;
11671204 SpinLockRelease (& XLogCtl -> info_lck );
1205+
1206+ /*
1207+ * There's a chance that the record was already flushed to disk and we
1208+ * missed marking segments as ready for archive. If this happens, we
1209+ * nudge the WALWriter, which will take care of notifying segments as
1210+ * needed.
1211+ */
1212+ if (StartSeg != EndSeg && XLogArchivingActive () &&
1213+ LogwrtResult .Flush >= EndPos && ProcGlobal -> walwriterLatch )
1214+ SetLatch (ProcGlobal -> walwriterLatch );
11681215 }
11691216
11701217 /*
11711218 * If this was an XLOG_SWITCH record, flush the record and the empty
1172- * padding space that fills the rest of the segment, and perform
1173- * end-of-segment actions (eg, notifying archiver).
1219+ * padding space that fills the rest of the segment.
11741220 */
11751221 if (isLogSwitch )
11761222 {
@@ -2421,6 +2467,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
24212467
24222468 /* We should always be inside a critical section here */
24232469 Assert (CritSectionCount > 0 );
2470+ Assert (LWLockHeldByMe (WALWriteLock ));
24242471
24252472 /*
24262473 * Update local LogwrtResult (caller probably did this already, but...)
@@ -2586,11 +2633,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
25862633 * later. Doing it here ensures that one and only one backend will
25872634 * perform this fsync.
25882635 *
2589- * This is also the right place to notify the Archiver that the
2590- * segment is ready to copy to archival storage, and to update the
2591- * timer for archive_timeout, and to signal for a checkpoint if
2592- * too many logfile segments have been used since the last
2593- * checkpoint.
2636+ * If WAL archiving is active, we attempt to notify the archiver
2637+ * of any segments that are now ready for archival.
2638+ *
2639+ * This is also the right place to update the timer for
2640+ * archive_timeout and to signal for a checkpoint if too many
2641+ * logfile segments have been used since the last checkpoint.
25942642 */
25952643 if (finishing_seg )
25962644 {
@@ -2602,7 +2650,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
26022650 LogwrtResult .Flush = LogwrtResult .Write ; /* end of page */
26032651
26042652 if (XLogArchivingActive ())
2605- XLogArchiveNotifySeg ( openLogSegNo );
2653+ NotifySegmentsReadyForArchive ( LogwrtResult . Flush );
26062654
26072655 XLogCtl -> lastSegSwitchTime = (pg_time_t ) time (NULL );
26082656 XLogCtl -> lastSegSwitchLSN = LogwrtResult .Flush ;
@@ -2690,6 +2738,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
26902738 XLogCtl -> LogwrtRqst .Flush = LogwrtResult .Flush ;
26912739 SpinLockRelease (& XLogCtl -> info_lck );
26922740 }
2741+
2742+ if (XLogArchivingActive ())
2743+ NotifySegmentsReadyForArchive (LogwrtResult .Flush );
26932744}
26942745
26952746/*
@@ -4328,6 +4379,131 @@ ValidateXLOGDirectoryStructure(void)
43284379 }
43294380}
43304381
4382+ /*
4383+ * RegisterSegmentBoundary
4384+ *
4385+ * WAL records that are split across a segment boundary require special
4386+ * treatment for archiving: the initial segment must not be archived until
4387+ * the end segment has been flushed, in case we crash before we have
4388+ * the chance to flush the end segment (because after recovery we would
4389+ * overwrite that WAL record with a different one, and so the file we
4390+ * archived no longer represents truth.) This also applies to streaming
4391+ * physical replication.
4392+ *
4393+ * To handle this, we keep track of the LSN of WAL records that cross
4394+ * segment boundaries. Two such are sufficient: the ones with the
4395+ * earliest and the latest end pointers we know about, since the flush
4396+ * position advances monotonically. WAL record writers register
4397+ * boundary-crossing records here, which is used by .ready file creation
4398+ * to delay until the end segment is known flushed.
4399+ */
4400+ static void
4401+ RegisterSegmentBoundary (XLogSegNo seg , XLogRecPtr endpos )
4402+ {
4403+ XLogSegNo segno PG_USED_FOR_ASSERTS_ONLY ;
4404+
4405+ /* verify caller computed segment number correctly */
4406+ AssertArg ((XLByteToSeg (endpos , segno , wal_segment_size ), segno == seg ));
4407+
4408+ SpinLockAcquire (& XLogCtl -> segtrack_lck );
4409+
4410+ /*
4411+ * If no segment boundaries are registered, store the new segment boundary
4412+ * in earliestSegBoundary. Otherwise, store the greater segment
4413+ * boundaries in latestSegBoundary.
4414+ */
4415+ if (XLogCtl -> earliestSegBoundary == MaxXLogSegNo )
4416+ {
4417+ XLogCtl -> earliestSegBoundary = seg ;
4418+ XLogCtl -> earliestSegBoundaryEndPtr = endpos ;
4419+ }
4420+ else if (seg > XLogCtl -> earliestSegBoundary &&
4421+ (XLogCtl -> latestSegBoundary == MaxXLogSegNo ||
4422+ seg > XLogCtl -> latestSegBoundary ))
4423+ {
4424+ XLogCtl -> latestSegBoundary = seg ;
4425+ XLogCtl -> latestSegBoundaryEndPtr = endpos ;
4426+ }
4427+
4428+ SpinLockRelease (& XLogCtl -> segtrack_lck );
4429+ }
4430+
4431+ /*
4432+ * NotifySegmentsReadyForArchive
4433+ *
4434+ * Mark segments as ready for archival, given that it is safe to do so.
4435+ * This function is idempotent.
4436+ */
4437+ void
4438+ NotifySegmentsReadyForArchive (XLogRecPtr flushRecPtr )
4439+ {
4440+ XLogSegNo latest_boundary_seg ;
4441+ XLogSegNo last_notified ;
4442+ XLogSegNo flushed_seg ;
4443+ XLogSegNo seg ;
4444+ bool keep_latest ;
4445+
4446+ XLByteToSeg (flushRecPtr , flushed_seg , wal_segment_size );
4447+
4448+ SpinLockAcquire (& XLogCtl -> segtrack_lck );
4449+
4450+ if (XLogCtl -> latestSegBoundary <= flushed_seg &&
4451+ XLogCtl -> latestSegBoundaryEndPtr <= flushRecPtr )
4452+ {
4453+ latest_boundary_seg = XLogCtl -> latestSegBoundary ;
4454+ keep_latest = false;
4455+ }
4456+ else if (XLogCtl -> earliestSegBoundary <= flushed_seg &&
4457+ XLogCtl -> earliestSegBoundaryEndPtr <= flushRecPtr )
4458+ {
4459+ latest_boundary_seg = XLogCtl -> earliestSegBoundary ;
4460+ keep_latest = true;
4461+ }
4462+ else
4463+ {
4464+ SpinLockRelease (& XLogCtl -> segtrack_lck );
4465+ return ;
4466+ }
4467+
4468+ last_notified = XLogCtl -> lastNotifiedSeg ;
4469+
4470+ /*
4471+ * Update shared memory and discard segment boundaries that are no longer
4472+ * needed.
4473+ *
4474+ * It is safe to update shared memory before we attempt to create the
4475+ * .ready files. If our calls to XLogArchiveNotifySeg() fail,
4476+ * RemoveOldXlogFiles() will retry it as needed.
4477+ */
4478+ if (last_notified < latest_boundary_seg - 1 )
4479+ XLogCtl -> lastNotifiedSeg = latest_boundary_seg - 1 ;
4480+
4481+ if (keep_latest )
4482+ {
4483+ XLogCtl -> earliestSegBoundary = XLogCtl -> latestSegBoundary ;
4484+ XLogCtl -> earliestSegBoundaryEndPtr = XLogCtl -> latestSegBoundaryEndPtr ;
4485+ }
4486+ else
4487+ {
4488+ XLogCtl -> earliestSegBoundary = MaxXLogSegNo ;
4489+ XLogCtl -> earliestSegBoundaryEndPtr = InvalidXLogRecPtr ;
4490+ }
4491+
4492+ XLogCtl -> latestSegBoundary = MaxXLogSegNo ;
4493+ XLogCtl -> latestSegBoundaryEndPtr = InvalidXLogRecPtr ;
4494+
4495+ SpinLockRelease (& XLogCtl -> segtrack_lck );
4496+
4497+ /*
4498+ * Notify archiver about segments that are ready for archival (by creating
4499+ * the corresponding .ready files).
4500+ */
4501+ for (seg = last_notified + 1 ; seg < latest_boundary_seg ; seg ++ )
4502+ XLogArchiveNotifySeg (seg , false);
4503+
4504+ PgArchWakeup ();
4505+ }
4506+
43314507/*
43324508 * Remove previous backup history files. This also retries creation of
43334509 * .ready files for any backup history files for which XLogArchiveNotify
@@ -5230,9 +5406,17 @@ XLOGShmemInit(void)
52305406
52315407 SpinLockInit (& XLogCtl -> Insert .insertpos_lck );
52325408 SpinLockInit (& XLogCtl -> info_lck );
5409+ SpinLockInit (& XLogCtl -> segtrack_lck );
52335410 SpinLockInit (& XLogCtl -> ulsn_lck );
52345411 InitSharedLatch (& XLogCtl -> recoveryWakeupLatch );
52355412 ConditionVariableInit (& XLogCtl -> recoveryNotPausedCV );
5413+
5414+ /* Initialize stuff for marking segments as ready for archival. */
5415+ XLogCtl -> lastNotifiedSeg = MaxXLogSegNo ;
5416+ XLogCtl -> earliestSegBoundary = MaxXLogSegNo ;
5417+ XLogCtl -> earliestSegBoundaryEndPtr = InvalidXLogRecPtr ;
5418+ XLogCtl -> latestSegBoundary = MaxXLogSegNo ;
5419+ XLogCtl -> latestSegBoundaryEndPtr = InvalidXLogRecPtr ;
52365420}
52375421
52385422/*
@@ -7873,6 +8057,20 @@ StartupXLOG(void)
78738057 XLogCtl -> LogwrtRqst .Write = EndOfLog ;
78748058 XLogCtl -> LogwrtRqst .Flush = EndOfLog ;
78758059
8060+ /*
8061+ * Initialize XLogCtl->lastNotifiedSeg to the previous WAL file.
8062+ */
8063+ if (XLogArchivingActive ())
8064+ {
8065+ XLogSegNo EndOfLogSeg ;
8066+
8067+ XLByteToSeg (EndOfLog , EndOfLogSeg , wal_segment_size );
8068+
8069+ SpinLockAcquire (& XLogCtl -> segtrack_lck );
8070+ XLogCtl -> lastNotifiedSeg = EndOfLogSeg - 1 ;
8071+ SpinLockRelease (& XLogCtl -> segtrack_lck );
8072+ }
8073+
78768074 /*
78778075 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
78788076 * record before resource manager writes cleanup WAL records or checkpoint
@@ -8000,7 +8198,7 @@ StartupXLOG(void)
80008198 XLogArchiveCleanup (partialfname );
80018199
80028200 durable_rename (origpath , partialpath , ERROR );
8003- XLogArchiveNotify (partialfname );
8201+ XLogArchiveNotify (partialfname , true );
80048202 }
80058203 }
80068204 }
0 commit comments