4545 * fsynced
4646 * * If COMMIT happens after checkpoint then backend reads state data from
4747 * files
48- * * In case of crash replay will move data from xlog to files, if that
49- * hasn't happened before. XXX TODO - move to shmem in replay also
48+ * * A simplified version of the same scenario happens during recovery and
49+ * replication. See the comments on the KnownPreparedXact structure.
5050 *
5151 *-------------------------------------------------------------------------
5252 */
@@ -181,6 +181,35 @@ static GlobalTransaction MyLockedGxact = NULL;
181181
182182static bool twophaseExitRegistered = false;
183183
184+ /*
185+ * During replay and replication KnownPreparedList holds info about active prepared
186+ * transactions that haven't been moved to files yet. We will need that info by the end
187+ * of recovery (including promotion) to restore the memory state of those transactions.
188+ *
189+ * The naive approach here is to move each PREPARE record to disk, fsync it and not have
190+ * that list at all, but that provokes a lot of unnecessary fsyncs on small files,
191+ * causing the replica to be slower than the master.
192+ *
193+ * Replay of twophase records happens by the following rules:
194+ * * On PREPARE redo KnownPreparedAdd() is called to add that transaction to
195+ * KnownPreparedList and no further action is taken.
196+ * * On checkpoint we iterate through KnownPreparedList, move all prepare
197+ * records that are behind redo_horizon to files and delete those items from the list.
198+ * * On COMMIT/ABORT we delete the file or the entry in KnownPreparedList.
199+ * * At the end of recovery we move all known prepared transactions to disk
200+ * to allow RecoverPreparedTransactions/StandbyRecoverPreparedTransactions
201+ * to do their work.
202+ */
203+ typedef struct KnownPreparedXact
204+ {
205+ TransactionId xid ; /* xid taken from the PREPARE record's header */
206+ XLogRecPtr prepare_start_lsn ; /* ReadRecPtr of the PREPARE record */
207+ XLogRecPtr prepare_end_lsn ; /* EndRecPtr of the PREPARE record */
208+ dlist_node list_node ; /* link in KnownPreparedList */
209+ } KnownPreparedXact ;
210+
211+ static dlist_head KnownPreparedList = DLIST_STATIC_INIT (KnownPreparedList );
212+
184213static void RecordTransactionCommitPrepared (TransactionId xid ,
185214 int nchildren ,
186215 TransactionId * children ,
@@ -200,82 +229,6 @@ static void RemoveGXact(GlobalTransaction gxact);
200229
201230static void XlogReadTwoPhaseData (XLogRecPtr lsn , char * * buf , int * len );
202231
203-
204- dlist_head StandbyTwoPhaseStateData = DLIST_STATIC_INIT (StandbyTwoPhaseStateData );
205-
206- typedef struct StandbyPreparedTransaction
207- {
208- TransactionId xid ;
209- XLogRecPtr prepare_start_lsn ;
210- XLogRecPtr prepare_end_lsn ;
211- dlist_node list_node ;
212- } StandbyPreparedTransaction ;
213-
214- void
215- StandbyCheckPointTwoPhase (XLogRecPtr redo_horizon )
216- {
217- dlist_mutable_iter miter ;
218- int serialized_xacts = 0 ;
219-
220- // Assert(RecoveryInProgress());
221-
222- TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START ();
223-
224- dlist_foreach_modify (miter , & StandbyTwoPhaseStateData )
225- {
226- StandbyPreparedTransaction * xact = dlist_container (StandbyPreparedTransaction ,
227- list_node , miter .cur );
228-
229- if (redo_horizon == InvalidXLogRecPtr || xact -> prepare_end_lsn <= redo_horizon )
230- {
231- char * buf ;
232- int len ;
233-
234- XlogReadTwoPhaseData (xact -> prepare_start_lsn , & buf , & len );
235- RecreateTwoPhaseFile (xact -> xid , buf , len );
236- pfree (buf );
237- dlist_delete (miter .cur );
238- serialized_xacts ++ ;
239- }
240- }
241-
242- TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE ();
243-
244- if (log_checkpoints && serialized_xacts > 0 )
245- ereport (LOG ,
246- (errmsg_plural ("%u two-phase state file was written "
247- "for long-running prepared transactions" ,
248- "%u two-phase state files were written "
249- "for long-running prepared transactions" ,
250- serialized_xacts ,
251- serialized_xacts )));
252- }
253-
254- // XXX: rename to remove_standby_state
255- void
256- StandbyAtCommit (TransactionId xid )
257- {
258- dlist_mutable_iter miter ;
259-
260- Assert (RecoveryInProgress ());
261-
262- dlist_foreach_modify (miter , & StandbyTwoPhaseStateData )
263- {
264- StandbyPreparedTransaction * xact = dlist_container (StandbyPreparedTransaction ,
265- list_node , miter .cur );
266-
267- if (xact -> xid == xid )
268- {
269- dlist_delete (miter .cur );
270- return ;
271- }
272- }
273-
274- RemoveTwoPhaseFile (xid , false);
275- }
276-
277-
278-
279232/*
280233 * Initialization of shared memory
281234 */
@@ -1729,18 +1682,25 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
17291682 serialized_xacts )));
17301683}
17311684
1685+ /*
1686+ * KnownPreparedAdd.
1687+ *
1688+ * Store the xid of a prepared transaction together with the start/end LSN of its
1689+ * PREPARE record in KnownPreparedList. This is called during redo of a prepare record
1690+ * so that we keep a list of prepared transactions not yet moved to 2PC files.
1691+ */
17321692void
1733- StandbyAtPrepare (XLogReaderState * record )
1693+ KnownPreparedAdd (XLogReaderState * record )
17341694{
1735- StandbyPreparedTransaction * xact ;
1695+ KnownPreparedXact * xact ;
17361696 TwoPhaseFileHeader * hdr = (TwoPhaseFileHeader * ) XLogRecGetData (record );
17371697
1738- xact = (StandbyPreparedTransaction * ) palloc (sizeof (StandbyPreparedTransaction ));
1698+ xact = (KnownPreparedXact * ) palloc (sizeof (KnownPreparedXact ));
17391699 xact -> xid = hdr -> xid ;
17401700 xact -> prepare_start_lsn = record -> ReadRecPtr ;
17411701 xact -> prepare_end_lsn = record -> EndRecPtr ;
17421702
1743- dlist_push_tail (& StandbyTwoPhaseStateData , & xact -> list_node );
1703+ dlist_push_tail (& KnownPreparedList , & xact -> list_node );
17441704}
17451705
17461706/*
@@ -1781,7 +1741,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
17811741 int nxids = 0 ;
17821742 int allocsize = 0 ;
17831743
1784- StandbyCheckPointTwoPhase ( 0 );
1744+ KnownPreparedRecreateFiles ( InvalidXLogRecPtr );
17851745
17861746 cldir = AllocateDir (TWOPHASE_DIR );
17871747 while ((clde = ReadDir (cldir , TWOPHASE_DIR )) != NULL )
@@ -2254,3 +2214,88 @@ RecordTransactionAbortPrepared(TransactionId xid,
22542214 */
22552215 SyncRepWaitForLSN (recptr , false);
22562216}
2217+
2218+ /*
2219+ * KnownPreparedRemoveByXid
2220+ *
2221+ * Forget about a prepared transaction. Called during commit/abort.
2222+ */
2223+ void
2224+ KnownPreparedRemoveByXid (TransactionId xid )
2225+ {
2226+ dlist_mutable_iter miter ;
2227+
2228+ Assert (RecoveryInProgress ());
2229+
2230+ dlist_foreach_modify (miter , & KnownPreparedList )
2231+ {
2232+ KnownPreparedXact * xact = dlist_container (KnownPreparedXact ,
2233+ list_node , miter .cur );
2234+
2235+ if (xact -> xid == xid )
2236+ {
2237+ dlist_delete (miter .cur );
2238+ /*
2239+ * The entry was still in KnownPreparedList, so its state file isn't on disk
2240+ * yet and we are done. NOTE(review): the node is unlinked but not pfree'd -- confirm.
2241+ */
2242+ return ;
2243+ }
2244+ }
2245+
2246+ /*
2247+ * No list entry was found, so the state file should already have been moved to disk.
2248+ * Aborting recovery because an unneeded file is absent doesn't seem to be a good idea,
2249+ * so call remove with giveWarning=false.
2250+ */
2251+ RemoveTwoPhaseFile (xid , false);
2252+ }
2253+
2254+ /*
2255+ * KnownPreparedRecreateFiles
2256+ *
2257+ * Moves prepare records from WAL to files. Called during checkpoint replay
2258+ * or from PrescanPreparedTransactions.
2259+ *
2260+ * redo_horizon = InvalidXLogRecPtr indicates that all transactions from
2261+ * KnownPreparedList should be moved to disk.
2262+ */
2263+ void
2264+ KnownPreparedRecreateFiles (XLogRecPtr redo_horizon )
2265+ {
2266+ dlist_mutable_iter miter ;
2267+ int serialized_xacts = 0 ; /* files written by this call; NOTE(review): int, but formatted with %u below */
2268+
2269+ Assert (RecoveryInProgress ());
2270+
2271+ TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START ();
2272+
2273+ dlist_foreach_modify (miter , & KnownPreparedList )
2274+ {
2275+ KnownPreparedXact * xact = dlist_container (KnownPreparedXact ,
2276+ list_node , miter .cur );
2277+
2278+ if (xact -> prepare_end_lsn <= redo_horizon || redo_horizon == InvalidXLogRecPtr )
2279+ {
2280+ char * buf ;
2281+ int len ;
2282+
2283+ XlogReadTwoPhaseData (xact -> prepare_start_lsn , & buf , & len );
2284+ RecreateTwoPhaseFile (xact -> xid , buf , len );
2285+ pfree (buf );
2286+ dlist_delete (miter .cur ); /* NOTE(review): xact itself is not pfree'd here -- confirm */
2287+ serialized_xacts ++ ;
2288+ }
2289+ }
2290+
2291+ TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE ();
2292+
2293+ if (log_checkpoints && serialized_xacts > 0 )
2294+ ereport (LOG ,
2295+ (errmsg_plural ("%u two-phase state file was written "
2296+ "for long-running prepared transactions" ,
2297+ "%u two-phase state files were written "
2298+ "for long-running prepared transactions" ,
2299+ serialized_xacts ,
2300+ serialized_xacts )));
2301+ }
0 commit comments