@@ -170,12 +170,7 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
170170 const TwoPhaseCallback callbacks []);
171171static void RemoveGXact (GlobalTransaction gxact );
172172
173-
174-
175- static char twophase_buf [10 * 1024 ];
176- static int twophase_pos = 0 ;
177- size_t bogus_write (int fd , const void * buf , size_t nbytes );
178- static char * XlogReadTwoPhaseData (XLogRecPtr lsn );
173+ static void XlogReadTwoPhaseData (XLogRecPtr lsn , char * * buf , int * len );
179174
180175/*
181176 * Initialization of shared memory
@@ -1033,14 +1028,8 @@ StartPrepare(GlobalTransaction gxact)
10331028void
10341029EndPrepare (GlobalTransaction gxact )
10351030{
1036- // PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1037- // TransactionId xid = pgxact->xid;
10381031 TwoPhaseFileHeader * hdr ;
1039- char path [MAXPGPATH ];
10401032 StateFileChunk * record ;
1041- pg_crc32c statefile_crc ;
1042- // pg_crc32c bogus_crc;
1043- int fd ;
10441033
10451034 /* Add the end sentinel to the list of 2PC records */
10461035 RegisterTwoPhaseRecord (TWOPHASE_RM_END_ID , 0 ,
@@ -1061,72 +1050,7 @@ EndPrepare(GlobalTransaction gxact)
10611050 errmsg ("two-phase state file maximum length exceeded" )));
10621051
10631052 /*
1064- * Create the 2PC state file.
1065- */
1066- // TwoPhaseFilePath(path, xid);
1067-
1068- // fd = OpenTransientFile(path,
1069- // O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1070- // S_IRUSR | S_IWUSR);
1071- fd = 1 ;
1072-
1073- if (fd < 0 )
1074- ereport (ERROR ,
1075- (errcode_for_file_access (),
1076- errmsg ("could not create two-phase state file \"%s\": %m" ,
1077- path )));
1078-
1079- /* Write data to file, and calculate CRC as we pass over it */
1080- INIT_CRC32C (statefile_crc );
1081-
1082- for (record = records .head ; record != NULL ; record = record -> next )
1083- {
1084- COMP_CRC32C (statefile_crc , record -> data , record -> len );
1085- if ((bogus_write (fd , record -> data , record -> len )) != record -> len )
1086- {
1087- CloseTransientFile (fd );
1088- ereport (ERROR ,
1089- (errcode_for_file_access (),
1090- errmsg ("could not write two-phase state file: %m" )));
1091- }
1092- }
1093-
1094- FIN_CRC32C (statefile_crc );
1095-
1096- // /*
1097- // * Write a deliberately bogus CRC to the state file; this is just paranoia
1098- // * to catch the case where four more bytes will run us out of disk space.
1099- // */
1100- // bogus_crc = ~statefile_crc;
1101-
1102- // if ((bogus_write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
1103- // {
1104- // CloseTransientFile(fd);
1105- // ereport(ERROR,
1106- // (errcode_for_file_access(),
1107- // errmsg("could not write two-phase state file: %m")));
1108- // }
1109-
1110- // /* Back up to prepare for rewriting the CRC */
1111- // if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0)
1112- // {
1113- // CloseTransientFile(fd);
1114- // ereport(ERROR,
1115- // (errcode_for_file_access(),
1116- // errmsg("could not seek in two-phase state file: %m")));
1117- // }
1118-
1119- /*
1120- * The state file isn't valid yet, because we haven't written the correct
1121- * CRC yet. Before we do that, insert entry in WAL and flush it to disk.
1122- *
1123- * Between the time we have written the WAL entry and the time we write
1124- * out the correct state file CRC, we have an inconsistency: the xact is
1125- * prepared according to WAL but not according to our on-disk state. We
1126- * use a critical section to force a PANIC if we are unable to complete
1127- * the write --- then, WAL replay should repair the inconsistency. The
1128- * odds of a PANIC actually occurring should be very tiny given that we
1129- * were able to write the bogus CRC above.
1053+ * Now write the 2PC state data to WAL.
11301054 *
11311055 * We have to set delayChkpt here, too; otherwise a checkpoint starting
11321056 * immediately after the WAL record is inserted could complete without
@@ -1148,25 +1072,11 @@ EndPrepare(GlobalTransaction gxact)
11481072 XLogRegisterData (record -> data , record -> len );
11491073 gxact -> prepare_lsn = XLogInsert (RM_XACT_ID , XLOG_XACT_PREPARE );
11501074 XLogFlush (gxact -> prepare_lsn );
1151- gxact -> prepare_xlogptr = ProcLastRecPtr ;
1152-
1153- // fprintf(stderr, "EndPrepare: %s={xlogptr:%X,lsn:%X, delta: %X}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
11541075
11551076 /* If we crash now, we have prepared: WAL replay will fix things */
11561077
1157- /* write correct CRC and close file */
1158- if ((bogus_write (fd , & statefile_crc , sizeof (pg_crc32c ))) != sizeof (pg_crc32c ))
1159- {
1160- CloseTransientFile (fd );
1161- ereport (ERROR ,
1162- (errcode_for_file_access (),
1163- errmsg ("could not write two-phase state file: %m" )));
1164- }
1165-
1166- // if (CloseTransientFile(fd) != 0)
1167- // ereport(ERROR,
1168- // (errcode_for_file_access(),
1169- // errmsg("could not close two-phase state file: %m")));
1078+ /* Remember the record's start location so it can be read back at commit */
1079+ gxact -> prepare_xlogptr = ProcLastRecPtr ;
11701080
11711081 /*
11721082 * Mark the prepared transaction as valid. As soon as xact.c marks
@@ -1198,6 +1108,11 @@ EndPrepare(GlobalTransaction gxact)
11981108
11991109 END_CRIT_SECTION ();
12001110
1111+
1112+ fprintf (stderr , "EndPrepare: %s=(%d,%d,%d,%d,%d)\n" , gxact -> gid , hdr -> xid , hdr -> nsubxacts , hdr -> ncommitrels , hdr -> nabortrels , hdr -> ninvalmsgs );
1113+ fprintf (stderr , "EndPrepare: %s={xlogptr:%lX,lsn:%lX,delta:%lX}\n" , gxact -> gid , gxact -> prepare_xlogptr , gxact -> prepare_lsn , gxact -> prepare_lsn - gxact -> prepare_xlogptr );
1114+
1115+
12011116 /*
12021117 * Wait for synchronous replication, if required.
12031118 *
@@ -1254,7 +1169,7 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
12541169 if (give_warnings )
12551170 ereport (WARNING ,
12561171 (errcode_for_file_access (),
1257- errmsg ("could not open two-phase state file \"%s\": %m" ,
1172+ errmsg ("ReadTwoPhaseFile: could not open two-phase state file \"%s\": %m" ,
12581173 path )));
12591174 return NULL ;
12601175 }
@@ -1395,9 +1310,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
13951310 /*
13961311 * Read and validate the state file
13971312 */
1398- // buf = ReadTwoPhaseFile(xid, true);
1399- // buf = twophase_buf;
1400- buf = XlogReadTwoPhaseData (gxact -> prepare_xlogptr );
1313+ if (gxact -> prepare_lsn <= GetRedoRecPtr ())
1314+ buf = ReadTwoPhaseFile (xid , true);
1315+ else
1316+ XlogReadTwoPhaseData (gxact -> prepare_xlogptr , & buf , NULL );
14011317
14021318 /*
14031319 * Disassemble the header area
@@ -1417,9 +1333,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14171333 /* compute latestXid among all children */
14181334 latestXid = TransactionIdLatest (xid , hdr -> nsubxacts , children );
14191335
1420-
1421- // fprintf(stderr, "FinishPrepared: %s=(%d,%d,%d,%d)\n", gxact->gid, hdr->nsubxacts, hdr->ncommitrels, hdr->nabortrels, hdr->ninvalmsgs);
1422- // fprintf(stderr, "FinishPrepared: %s={xlogptr:%X,lsn:%X,delta:%X}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
1336+ fprintf (stderr , "FinishPrepared: %s=(%d,%d,%d,%d,%d)\n" , gxact -> gid , hdr -> xid , hdr -> nsubxacts , hdr -> ncommitrels , hdr -> nabortrels , hdr -> ninvalmsgs );
1337+ fprintf (stderr , "FinishPrepared: %s={xlogptr:%lX,lsn:%lX,delta:%lX}\n" , gxact -> gid , gxact -> prepare_xlogptr , gxact -> prepare_lsn , gxact -> prepare_lsn - gxact -> prepare_xlogptr );
14231338
14241339 Assert (hdr -> nsubxacts == 0 );
14251340 Assert (hdr -> ncommitrels == 0 );
@@ -1508,7 +1423,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
15081423 /*
15091424 * And now we can clean up our mess.
15101425 */
1511- RemoveTwoPhaseFile (xid , true);
1426+ // RemoveTwoPhaseFile(xid, true);
15121427
15131428 RemoveGXact (gxact );
15141429 MyLockedGxact = NULL ;
@@ -1551,16 +1466,15 @@ ProcessRecords(char *bufptr, TransactionId xid,
15511466void
15521467RemoveTwoPhaseFile (TransactionId xid , bool giveWarning )
15531468{
1554- // char path[MAXPGPATH];
1555-
1556- // TwoPhaseFilePath(path, xid);
1557- // if (unlink(path))
1558- // if (errno != ENOENT || giveWarning)
1559- // ereport(WARNING,
1560- // (errcode_for_file_access(),
1561- // errmsg("could not remove two-phase state file \"%s\": %m",
1562- // path)));
1563- twophase_pos = 0 ;
1469+ char path [MAXPGPATH ];
1470+
1471+ TwoPhaseFilePath (path , xid );
1472+ if (unlink (path ))
1473+ if (errno != ENOENT || giveWarning )
1474+ ereport (WARNING ,
1475+ (errcode_for_file_access (),
1476+ errmsg ("could not remove two-phase state file \"%s\": %m" ,
1477+ path )));
15641478}
15651479
15661480/*
@@ -1575,6 +1489,8 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
15751489 pg_crc32c statefile_crc ;
15761490 int fd ;
15771491
1492+ fprintf (stderr , "RecreateTwoPhaseFile called xid=%d, len=%d\n" , xid , len );
1493+
15781494 /* Recompute CRC */
15791495 INIT_CRC32C (statefile_crc );
15801496 COMP_CRC32C (statefile_crc , content , len );
@@ -1646,6 +1562,7 @@ void
16461562CheckPointTwoPhase (XLogRecPtr redo_horizon )
16471563{
16481564 TransactionId * xids ;
1565+ XLogRecPtr * xlogptrs ;
16491566 int nxids ;
16501567 char path [MAXPGPATH ];
16511568 int i ;
@@ -1667,6 +1584,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
16671584 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START ();
16681585
16691586 xids = (TransactionId * ) palloc (max_prepared_xacts * sizeof (TransactionId ));
1587+ xlogptrs = (XLogRecPtr * ) palloc (max_prepared_xacts * sizeof (XLogRecPtr ));
16701588 nxids = 0 ;
16711589
16721590 LWLockAcquire (TwoPhaseStateLock , LW_SHARED );
@@ -1675,10 +1593,14 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
16751593 {
16761594 GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
16771595 PGXACT * pgxact = & ProcGlobal -> allPgXact [gxact -> pgprocno ];
1596+ int j ;
16781597
1679- if (gxact -> valid &&
1680- gxact -> prepare_lsn <= redo_horizon )
1681- xids [nxids ++ ] = pgxact -> xid ;
1598+ if (gxact -> valid && gxact -> prepare_lsn <= redo_horizon ){
1599+ j = nxids ++ ;
1600+ xids [j ] = pgxact -> xid ;
1601+ xlogptrs [j ] = gxact -> prepare_xlogptr ;
1602+ }
1603+
16821604 }
16831605
16841606 LWLockRelease (TwoPhaseStateLock );
@@ -1687,32 +1609,39 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
16871609 {
16881610 TransactionId xid = xids [i ];
16891611 int fd ;
1612+ int len ;
1613+ char * buf ;
16901614
16911615 TwoPhaseFilePath (path , xid );
16921616
1617+ fprintf (stderr , "CheckPointTwoPhase: %lX\n" , xlogptrs [i ]);
1618+
16931619 fd = OpenTransientFile (path , O_RDWR | PG_BINARY , 0 );
1694- if (fd < 0 )
1620+
1621+ if (fd < 0 && errno == ENOENT )
16951622 {
1696- if (errno == ENOENT )
1697- {
1698- /* OK if gxact is no longer valid */
1699- if (!TransactionIdIsPrepared (xid ))
1700- continue ;
1701- /* Restore errno in case it was changed */
1702- errno = ENOENT ;
1703- }
1704- ereport (ERROR ,
1705- (errcode_for_file_access (),
1706- errmsg ("could not open two-phase state file \"%s\": %m" ,
1707- path )));
1708- }
1623+ fprintf (stderr , "CheckPointTwoPhase: %d <-> %d \n" , errno , ENOENT );
17091624
1710- if (pg_fsync (fd ) != 0 )
1625+ /* OK if gxact is no longer valid */
1626+ if (!TransactionIdIsPrepared (xid ))
1627+ continue ;
1628+
1629+ /* Re-create file */
1630+ XlogReadTwoPhaseData (xlogptrs [i ], & buf , & len );
1631+ RecreateTwoPhaseFile (xid , buf , len );
1632+ fd = OpenTransientFile (path , O_RDWR | PG_BINARY , 0 );
1633+
1634+ if (fd < 0 )
1635+ ereport (ERROR ,
1636+ (errcode_for_file_access (),
1637+ errmsg ("CheckPointTwoPhase: could not open two-phase state file after re-creating \"%s\": %m" ,
1638+ path )));
1639+ }
1640+ else if (fd < 0 )
17111641 {
1712- CloseTransientFile (fd );
17131642 ereport (ERROR ,
17141643 (errcode_for_file_access (),
1715- errmsg ("could not fsync two-phase state file \"%s\": %m" ,
1644+ errmsg ("CheckPointTwoPhase: could not open two-phase state file \"%s\": %m" ,
17161645 path )));
17171646 }
17181647
@@ -2239,18 +2168,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
22392168
22402169/**********************************************************************************/
22412170
2242-
2243- size_t
2244- bogus_write (int fd , const void * buf , size_t nbytes )
2245- {
2246- memcpy (twophase_buf + twophase_pos , buf , nbytes );
2247- twophase_pos += nbytes ;
2248- return nbytes ;
2249- }
2250-
2251-
2252- static char *
2253- XlogReadTwoPhaseData (XLogRecPtr lsn )
2171+ void
2172+ XlogReadTwoPhaseData (XLogRecPtr lsn , char * * buf , int * len )
22542173{
22552174 XLogRecord * record ;
22562175 XLogReaderState * xlogreader ;
@@ -2261,10 +2180,16 @@ XlogReadTwoPhaseData(XLogRecPtr lsn)
22612180 fprintf (stderr , "xlogreader == NULL\n" );
22622181
22632182 record = XLogReadRecord (xlogreader , lsn , & errormsg );
2183+
22642184 if (record == NULL )
2265- {
22662185 fprintf (stderr , "XLogReadRecord error\n" );
2267- }
22682186
2269- return XLogRecGetData (xlogreader );
2187+ if (len != NULL )
2188+ * len = XLogRecGetDataLen (xlogreader );
2189+ * buf = XLogRecGetData (xlogreader );
22702190}
2191+
2192+
2193+
2194+
2195+
0 commit comments