@@ -78,13 +78,21 @@ static SlruCtlData CommitTsCtlData;
7878#define CommitTsCtl (&CommitTsCtlData)
7979
8080/*
81- * We keep a cache of the last value set in shared memory. This is protected
82- * by CommitTsLock.
81+ * We keep a cache of the last value set in shared memory.
82+ *
83+ * This is also good place to keep the activation status. We keep this
84+ * separate from the GUC so that the standby can activate the module if the
85+ * primary has it active independently of the value of the GUC.
86+ *
87+ * This is protected by CommitTsLock. In some places, we use commitTsActive
88+ * without acquiring the lock; where this happens, a comment explains the
89+ * rationale for it.
8390 */
8491typedef struct CommitTimestampShared
8592{
8693 TransactionId xidLastCommit ;
8794 CommitTimestampEntry dataLastCommit ;
95+ bool commitTsActive ;
8896} CommitTimestampShared ;
8997
9098CommitTimestampShared * commitTsShared ;
@@ -93,14 +101,6 @@ CommitTimestampShared *commitTsShared;
93101/* GUC variable */
94102bool track_commit_timestamp ;
95103
96- /*
97- * When this is set, commit_ts is force-enabled during recovery. This is so
98- * that a standby can replay WAL records coming from a master with the setting
99- * enabled. (Note that this doesn't enable SQL access to the data; it's
100- * effectively write-only until the GUC itself is enabled.)
101- */
102- static bool enable_during_recovery ;
103-
104104static void SetXidCommitTsInPage (TransactionId xid , int nsubxids ,
105105 TransactionId * subxids , TimestampTz ts ,
106106 RepOriginId nodeid , int pageno );
@@ -109,7 +109,7 @@ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
109109static int ZeroCommitTsPage (int pageno , bool writeXlog );
110110static bool CommitTsPagePrecedes (int page1 , int page2 );
111111static void ActivateCommitTs (void );
112- static void DeactivateCommitTs (bool do_wal );
112+ static void DeactivateCommitTs (void );
113113static void WriteZeroPageXlogRec (int pageno );
114114static void WriteTruncateXlogRec (int pageno );
115115static void WriteSetTimestampXlogRec (TransactionId mainxid , int nsubxids ,
@@ -149,10 +149,14 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
149149 TransactionId newestXact ;
150150
151151 /*
152- * No-op if the module is not enabled, but allow writes in a standby
153- * during recovery.
152+ * No-op if the module is not active.
153+ *
154+ * An unlocked read here is fine, because in a standby (the only place
155+ * where the flag can change in flight) this routine is only called by
156+ * the recovery process, which is also the only process which can change
157+ * the flag.
154158 */
155- if (!track_commit_timestamp && ! enable_during_recovery )
159+ if (!commitTsShared -> commitTsActive )
156160 return ;
157161
158162 /*
@@ -283,30 +287,45 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
283287 TransactionId oldestCommitTs ;
284288 TransactionId newestCommitTs ;
285289
290+ /* error if the given Xid doesn't normally commit */
291+ if (!TransactionIdIsNormal (xid ))
292+ ereport (ERROR ,
293+ (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
294+ errmsg ("cannot retrieve commit timestamp for transaction %u" , xid )));
295+
296+ LWLockAcquire (CommitTsLock , LW_SHARED );
297+
286298 /* Error if module not enabled */
287- if (!track_commit_timestamp )
299+ if (!commitTsShared -> commitTsActive )
288300 ereport (ERROR ,
289301 (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
290302 errmsg ("could not get commit timestamp data" ),
291303 errhint ("Make sure the configuration parameter \"%s\" is set." ,
292304 "track_commit_timestamp" )));
293305
294- /* error if the given Xid doesn't normally commit */
295- if (!TransactionIdIsNormal (xid ))
296- ereport (ERROR ,
297- (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
298- errmsg ("cannot retrieve commit timestamp for transaction %u" , xid )));
299-
300306 /*
301- * Return empty if the requested value is outside our valid range.
307+ * If we're asked for the cached value, return that. Otherwise, fall
308+ * through to read from SLRU.
302309 */
303- LWLockAcquire (CommitTsLock , LW_SHARED );
310+ if (commitTsShared -> xidLastCommit == xid )
311+ {
312+ * ts = commitTsShared -> dataLastCommit .time ;
313+ if (nodeid )
314+ * nodeid = commitTsShared -> dataLastCommit .nodeid ;
315+
316+ LWLockRelease (CommitTsLock );
317+ return * ts != 0 ;
318+ }
319+
304320 oldestCommitTs = ShmemVariableCache -> oldestCommitTs ;
305321 newestCommitTs = ShmemVariableCache -> newestCommitTs ;
306322 /* neither is invalid, or both are */
307323 Assert (TransactionIdIsValid (oldestCommitTs ) == TransactionIdIsValid (newestCommitTs ));
308324 LWLockRelease (CommitTsLock );
309325
326+ /*
327+ * Return empty if the requested value is outside our valid range.
328+ */
310329 if (!TransactionIdIsValid (oldestCommitTs ) ||
311330 TransactionIdPrecedes (xid , oldestCommitTs ) ||
312331 TransactionIdPrecedes (newestCommitTs , xid ))
@@ -317,27 +336,6 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
317336 return false;
318337 }
319338
320- /*
321- * Use an unlocked atomic read on our cached value in shared memory; if
322- * it's a hit, acquire a lock and read the data, after verifying that it's
323- * still what we initially read. Otherwise, fall through to read from
324- * SLRU.
325- */
326- if (commitTsShared -> xidLastCommit == xid )
327- {
328- LWLockAcquire (CommitTsLock , LW_SHARED );
329- if (commitTsShared -> xidLastCommit == xid )
330- {
331- * ts = commitTsShared -> dataLastCommit .time ;
332- if (nodeid )
333- * nodeid = commitTsShared -> dataLastCommit .nodeid ;
334-
335- LWLockRelease (CommitTsLock );
336- return * ts != 0 ;
337- }
338- LWLockRelease (CommitTsLock );
339- }
340-
341339 /* lock is acquired by SimpleLruReadPage_ReadOnly */
342340 slotno = SimpleLruReadPage_ReadOnly (CommitTsCtl , pageno , xid );
343341 memcpy (& entry ,
@@ -366,15 +364,16 @@ GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
366364{
367365 TransactionId xid ;
368366
367+ LWLockAcquire (CommitTsLock , LW_SHARED );
368+
369369 /* Error if module not enabled */
370- if (!track_commit_timestamp )
370+ if (!commitTsShared -> commitTsActive )
371371 ereport (ERROR ,
372372 (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
373373 errmsg ("could not get commit timestamp data" ),
374374 errhint ("Make sure the configuration parameter \"%s\" is set." ,
375375 "track_commit_timestamp" )));
376376
377- LWLockAcquire (CommitTsLock , LW_SHARED );
378377 xid = commitTsShared -> xidLastCommit ;
379378 if (ts )
380379 * ts = commitTsShared -> dataLastCommit .time ;
@@ -493,6 +492,7 @@ CommitTsShmemInit(void)
493492 commitTsShared -> xidLastCommit = InvalidTransactionId ;
494493 TIMESTAMP_NOBEGIN (commitTsShared -> dataLastCommit .time );
495494 commitTsShared -> dataLastCommit .nodeid = InvalidRepOriginId ;
495+ commitTsShared -> commitTsActive = false;
496496 }
497497 else
498498 Assert (found );
@@ -566,7 +566,7 @@ CompleteCommitTsInitialization(void)
566566 * any leftover data.
567567 */
568568 if (!track_commit_timestamp )
569- DeactivateCommitTs (true );
569+ DeactivateCommitTs ();
570570}
571571
572572/*
@@ -588,11 +588,11 @@ CommitTsParameterChange(bool newvalue, bool oldvalue)
588588 */
589589 if (newvalue )
590590 {
591- if (!track_commit_timestamp && ! oldvalue )
591+ if (!commitTsShared -> commitTsActive )
592592 ActivateCommitTs ();
593593 }
594- else if (! track_commit_timestamp && oldvalue )
595- DeactivateCommitTs (false );
594+ else if (commitTsShared -> commitTsActive )
595+ DeactivateCommitTs ();
596596}
597597
598598/*
@@ -645,7 +645,7 @@ ActivateCommitTs(void)
645645 }
646646 LWLockRelease (CommitTsLock );
647647
648- /* Finally, create the current segment file, if necessary */
648+ /* Create the current segment file, if necessary */
649649 if (!SimpleLruDoesPhysicalPageExist (CommitTsCtl , pageno ))
650650 {
651651 int slotno ;
@@ -657,8 +657,10 @@ ActivateCommitTs(void)
657657 LWLockRelease (CommitTsControlLock );
658658 }
659659
660- /* We can now replay xlog records from this module */
661- enable_during_recovery = true;
660+ /* Change the activation status in shared memory. */
661+ LWLockAcquire (CommitTsLock , LW_EXCLUSIVE );
662+ commitTsShared -> commitTsActive = true;
663+ LWLockRelease (CommitTsLock );
662664}
663665
664666/*
@@ -672,21 +674,25 @@ ActivateCommitTs(void)
672674 * possibly-invalid data; also removes segments of old data.
673675 */
674676static void
675- DeactivateCommitTs (bool do_wal )
677+ DeactivateCommitTs (void )
676678{
677- TransactionId xid = ShmemVariableCache -> nextXid ;
678- int pageno = TransactionIdToCTsPage (xid );
679-
680679 /*
681- * Re-Initialize our idea of the latest page number.
680+ * Cleanup the status in the shared memory.
681+ *
682+ * We reset everything in the commitTsShared record to prevent user from
683+ * getting confusing data about last committed transaction on the standby
684+ * when the module was activated repeatedly on the primary.
682685 */
683- LWLockAcquire (CommitTsControlLock , LW_EXCLUSIVE );
684- CommitTsCtl -> shared -> latest_page_number = pageno ;
685- LWLockRelease (CommitTsControlLock );
686-
687686 LWLockAcquire (CommitTsLock , LW_EXCLUSIVE );
687+
688+ commitTsShared -> commitTsActive = false;
689+ commitTsShared -> xidLastCommit = InvalidTransactionId ;
690+ TIMESTAMP_NOBEGIN (commitTsShared -> dataLastCommit .time );
691+ commitTsShared -> dataLastCommit .nodeid = InvalidRepOriginId ;
692+
688693 ShmemVariableCache -> oldestCommitTs = InvalidTransactionId ;
689694 ShmemVariableCache -> newestCommitTs = InvalidTransactionId ;
695+
690696 LWLockRelease (CommitTsLock );
691697
692698 /*
@@ -697,10 +703,9 @@ DeactivateCommitTs(bool do_wal)
697703 * be overwritten anyway when we wrap around, but it seems better to be
698704 * tidy.)
699705 */
706+ LWLockAcquire (CommitTsControlLock , LW_EXCLUSIVE );
700707 (void ) SlruScanDirectory (CommitTsCtl , SlruScanDirCbDeleteAll , NULL );
701-
702- /* No longer enabled on recovery */
703- enable_during_recovery = false;
708+ LWLockRelease (CommitTsControlLock );
704709}
705710
706711/*
@@ -739,8 +744,13 @@ ExtendCommitTs(TransactionId newestXact)
739744{
740745 int pageno ;
741746
742- /* nothing to do if module not enabled */
743- if (!track_commit_timestamp && !enable_during_recovery )
747+ /*
748+ * Nothing to do if module not enabled. Note we do an unlocked read of the
749+ * flag here, which is okay because this routine is only called from
750+ * GetNewTransactionId, which is never called in a standby.
751+ */
752+ Assert (!InRecovery );
753+ if (!commitTsShared -> commitTsActive )
744754 return ;
745755
746756 /*
@@ -768,7 +778,7 @@ ExtendCommitTs(TransactionId newestXact)
768778 * Note that we don't need to flush XLOG here.
769779 */
770780void
771- TruncateCommitTs (TransactionId oldestXact , bool do_wal )
781+ TruncateCommitTs (TransactionId oldestXact )
772782{
773783 int cutoffPage ;
774784
@@ -784,8 +794,7 @@ TruncateCommitTs(TransactionId oldestXact, bool do_wal)
784794 return ; /* nothing to remove */
785795
786796 /* Write XLOG record */
787- if (do_wal )
788- WriteTruncateXlogRec (cutoffPage );
797+ WriteTruncateXlogRec (cutoffPage );
789798
790799 /* Now we can remove the old CommitTs segment(s) */
791800 SimpleLruTruncate (CommitTsCtl , cutoffPage );
0 commit comments