@@ -121,7 +121,10 @@ typedef struct GlobalTransactionData
121121 BackendId dummyBackendId ; /* similar to backend id for backends */
122122 TimestampTz prepared_at ; /* time of preparation */
123123 XLogRecPtr prepare_lsn ; /* XLOG offset of prepare record end */
124- XLogRecPtr prepare_xlogptr ; /* XLOG offset of prepare record start */
124+ XLogRecPtr prepare_xlogptr ; /* XLOG offset of prepare record start
125+ * or NULL if the twophase data was moved to a file
126+ * after a checkpoint. NOTE(review): CheckPointTwoPhase resets prepare_lsn, not this field -- confirm.
127+ */
125128 Oid owner ; /* ID of user that executed the xact */
126129 BackendId locking_backend ; /* backend currently working on the xact */
127130 bool valid ; /* TRUE if PGPROC entry is in proc array */
@@ -1303,21 +1306,23 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
13031306
13041307 /*
13051308 * Read and validate 2PC state data.
1306- * NB: Here we can face the situation where checkpoint can happend
1307- * between condition check and xlog read. To prevent that I'm holding
1308- * delayChkpt. Other possible scenario is try to read xlog and if it fails
1309- * try to read file.
1309+ * State data can be stored in xlog or in files depending on checkpoint
1310+ * status. One way to read that data is to delay checkpoint (delayChkpt) and
1311+ * compare gxact->prepare_lsn with the current xlog horizon. But keeping in mind
1312+ * that most 2PC transactions will be committed right after prepare, we
1313+ * can just try to read xlog and in case of error read the file. Also, this
1314+ * happens under LockGXact, so nobody can commit our transaction between
1315+ * the xlog and file reads.
13101316 */
1311- MyPgXact -> delayChkpt = true;
1312- if (gxact -> prepare_lsn <= GetRedoRecPtr ()){
1313- buf = ReadTwoPhaseFile (xid , true);
1314- file_used = true;
1317+ if (gxact -> prepare_lsn )
1318+ {
1319+ XlogReadTwoPhaseData (gxact -> prepare_xlogptr , & buf , NULL );
13151320 }
13161321 else
13171322 {
1318- XlogReadTwoPhaseData (gxact -> prepare_xlogptr , & buf , NULL );
1323+ buf = ReadTwoPhaseFile (xid , true);
1324+ file_used = true;
13191325 }
1320- MyPgXact -> delayChkpt = false;
13211326
13221327 /*
13231328 * Disassemble the header area
@@ -1560,24 +1565,35 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
15601565 int len ;
15611566 char * buf ;
15621567
1568+ fprintf (stderr , "=== Checkpoint: redo_horizon=%lX\n" , redo_horizon );
1569+
15631570 if (max_prepared_xacts <= 0 )
15641571 return ; /* nothing to do */
15651572
15661573 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START ();
15671574
1575+ /*
1576+ * Here we do all the I/O while holding TwoPhaseStateLock.
1577+ * It's also possible to move the I/O out of the lock, but then on
1578+ * every error we would have to check whether somebody committed our
1579+ * transaction in a different backend. Let's leave this optimisation
1580+ * for the future, in case somebody finds that this place is a
1581+ * bottleneck.
1582+ *
1583+ */
15681584 LWLockAcquire (TwoPhaseStateLock , LW_SHARED );
1569-
15701585 for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
15711586 {
15721587 GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
15731588 PGXACT * pgxact = & ProcGlobal -> allPgXact [gxact -> pgprocno ];
15741589
1575- if (gxact -> valid && gxact -> prepare_lsn <= redo_horizon ){
1590+ if (gxact -> valid && gxact -> prepare_lsn && gxact -> prepare_lsn <= redo_horizon ){
15761591 XlogReadTwoPhaseData (gxact -> prepare_xlogptr , & buf , & len );
15771592 RecreateTwoPhaseFile (pgxact -> xid , buf , len );
1593+ gxact -> prepare_lsn = (XLogRecPtr ) NULL ;
1594+ pfree (buf );
15781595 }
15791596 }
1580-
15811597 LWLockRelease (TwoPhaseStateLock );
15821598
15831599 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE ();
@@ -2094,7 +2110,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
20942110
20952111/**********************************************************************************/
20962112
2097- void
2113+ static void
20982114XlogReadTwoPhaseData (XLogRecPtr lsn , char * * buf , int * len )
20992115{
21002116 XLogRecord * record ;
@@ -2106,17 +2122,14 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
21062122 elog (ERROR , "failed to open xlogreader for reading 2PC data" );
21072123
21082124 record = XLogReadRecord (xlogreader , lsn , & errormsg );
2109-
21102125 if (record == NULL )
2111- elog (ERROR , "failed to find 2PC data in xlog" );
2126+ elog (ERROR , "failed to read 2PC record from xlog" );
21122127
21132128 if (len != NULL )
21142129 * len = XLogRecGetDataLen (xlogreader );
2115- else
2116- elog (ERROR , "failed to read 2PC data from xlog: xore length" );
21172130
2118- * buf = palloc (sizeof (char )* ( * len ));
2119- memcpy (* buf , XLogRecGetData (xlogreader ), sizeof (char )* ( * len ));
2131+ * buf = palloc (sizeof (char )* XLogRecGetDataLen ( xlogreader ));
2132+ memcpy (* buf , XLogRecGetData (xlogreader ), sizeof (char )* XLogRecGetDataLen ( xlogreader ));
21202133
21212134 XLogReaderFree (xlogreader );
21222135}
0 commit comments