@@ -174,10 +174,8 @@ static void RemoveGXact(GlobalTransaction gxact);
174174
175175static char twophase_buf [10 * 1024 ];
176176static int twophase_pos = 0 ;
177- size_t bogus_write (int fd , char * buf , size_t nbytes );
178-
177+ size_t bogus_write (int fd , const void * buf , size_t nbytes );
179178static char * XlogReadTwoPhaseData (XLogRecPtr lsn );
180- // LWLock *xlogreclock;
181179
182180/*
183181 * Initialization of shared memory
@@ -997,6 +995,8 @@ StartPrepare(GlobalTransaction gxact)
997995
998996 save_state_data (& hdr , sizeof (TwoPhaseFileHeader ));
999997
998+ // fprintf(stderr, "StartPrepare: %s=(%d,%d,%d,%d)\n", hdr.gid, hdr.nsubxacts, hdr.ncommitrels, hdr.nabortrels, hdr.ninvalmsgs);
999+
10001000 /*
10011001 * Add the additional info about subxacts, deletable files and cache
10021002 * invalidation messages.
@@ -1033,13 +1033,13 @@ StartPrepare(GlobalTransaction gxact)
10331033void
10341034EndPrepare (GlobalTransaction gxact )
10351035{
1036- PGXACT * pgxact = & ProcGlobal -> allPgXact [gxact -> pgprocno ];
1037- TransactionId xid = pgxact -> xid ;
1036+ // PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1037+ // TransactionId xid = pgxact->xid;
10381038 TwoPhaseFileHeader * hdr ;
10391039 char path [MAXPGPATH ];
10401040 StateFileChunk * record ;
10411041 pg_crc32c statefile_crc ;
1042- pg_crc32c bogus_crc ;
1042+ // pg_crc32c bogus_crc;
10431043 int fd ;
10441044
10451045 /* Add the end sentinel to the list of 2PC records */
@@ -1144,26 +1144,13 @@ EndPrepare(GlobalTransaction gxact)
11441144 MyPgXact -> delayChkpt = true;
11451145
11461146 XLogBeginInsert ();
1147-
11481147 for (record = records .head ; record != NULL ; record = record -> next )
11491148 XLogRegisterData (record -> data , record -> len );
1150-
1151- // LWLockAcquire(xlogreclock, LW_EXCLUSIVE);
1152- LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
1153- gxact -> prepare_xlogptr = GetXLogInsertRecPtr ();
11541149 gxact -> prepare_lsn = XLogInsert (RM_XACT_ID , XLOG_XACT_PREPARE );
1155- LWLockRelease (TwoPhaseStateLock );
1156- // LWLockRelease(xlogreclock);
1157-
1158-
11591150 XLogFlush (gxact -> prepare_lsn );
1151+ gxact -> prepare_xlogptr = ProcLastRecPtr ;
11601152
1161-
1162- // fprintf(stderr, "WAL %s->prepare_xlogptr = %X/%X \n",
1163- // gxact->gid, (uint32) (gxact->prepare_xlogptr >> 32), (uint32) (gxact->prepare_xlogptr));
1164- // fprintf(stderr, "WAL %s->prepare_lsn = %X/%X \n",
1165- // gxact->gid, (uint32) (gxact->prepare_lsn >> 32), (uint32) (gxact->prepare_lsn));
1166-
1153+ // fprintf(stderr, "EndPrepare: %s={xlogptr:%X,lsn:%X, delta: %X}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
11671154
11681155 /* If we crash now, we have prepared: WAL replay will fix things */
11691156
@@ -1250,101 +1237,100 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
12501237static char *
12511238ReadTwoPhaseFile (TransactionId xid , bool give_warnings )
12521239{
1253- // char path[MAXPGPATH];
1254- // char *buf;
1255- // TwoPhaseFileHeader *hdr;
1256- // int fd;
1257- // struct stat stat;
1258- // uint32 crc_offset;
1259- // pg_crc32c calc_crc,
1260- // file_crc;
1261-
1262- // TwoPhaseFilePath(path, xid);
1240+ char path [MAXPGPATH ];
1241+ char * buf ;
1242+ TwoPhaseFileHeader * hdr ;
1243+ int fd ;
1244+ struct stat stat ;
1245+ uint32 crc_offset ;
1246+ pg_crc32c calc_crc ,
1247+ file_crc ;
12631248
1264- // fd = OpenTransientFile (path, O_RDONLY | PG_BINARY, 0 );
1249+ TwoPhaseFilePath (path , xid );
12651250
1266- // if (fd < 0)
1267- // {
1268- // if (give_warnings)
1269- // ereport(WARNING,
1270- // (errcode_for_file_access(),
1271- // errmsg("could not open two-phase state file \"%s\": %m",
1272- // path)));
1273- // return NULL;
1274- // }
1251+ fd = OpenTransientFile (path , O_RDONLY | PG_BINARY , 0 );
1252+ if (fd < 0 )
1253+ {
1254+ if (give_warnings )
1255+ ereport (WARNING ,
1256+ (errcode_for_file_access (),
1257+ errmsg ("could not open two-phase state file \"%s\": %m" ,
1258+ path )));
1259+ return NULL ;
1260+ }
12751261
12761262 /*
12771263 * Check file length. We can determine a lower bound pretty easily. We
12781264 * set an upper bound to avoid palloc() failure on a corrupt file, though
12791265 * we can't guarantee that we won't get an out of memory error anyway,
12801266 * even on a valid file.
12811267 */
1282- // if (fstat(fd, &stat))
1283- // {
1284- // CloseTransientFile(fd);
1285- // if (give_warnings)
1286- // ereport(WARNING,
1287- // (errcode_for_file_access(),
1288- // errmsg("could not stat two-phase state file \"%s\": %m",
1289- // path)));
1290- // return NULL;
1291- // }
1268+ if (fstat (fd , & stat ))
1269+ {
1270+ CloseTransientFile (fd );
1271+ if (give_warnings )
1272+ ereport (WARNING ,
1273+ (errcode_for_file_access (),
1274+ errmsg ("could not stat two-phase state file \"%s\": %m" ,
1275+ path )));
1276+ return NULL ;
1277+ }
12921278
1293- // if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1294- // MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1295- // sizeof(pg_crc32c)) ||
1296- // stat.st_size > MaxAllocSize)
1297- // {
1298- // CloseTransientFile(fd);
1299- // return NULL;
1300- // }
1279+ if (stat .st_size < (MAXALIGN (sizeof (TwoPhaseFileHeader )) +
1280+ MAXALIGN (sizeof (TwoPhaseRecordOnDisk )) +
1281+ sizeof (pg_crc32c )) ||
1282+ stat .st_size > MaxAllocSize )
1283+ {
1284+ CloseTransientFile (fd );
1285+ return NULL ;
1286+ }
13011287
1302- // crc_offset = stat.st_size - sizeof(pg_crc32c);
1303- // if (crc_offset != MAXALIGN(crc_offset))
1304- // {
1305- // CloseTransientFile(fd);
1306- // return NULL;
1307- // }
1288+ crc_offset = stat .st_size - sizeof (pg_crc32c );
1289+ if (crc_offset != MAXALIGN (crc_offset ))
1290+ {
1291+ CloseTransientFile (fd );
1292+ return NULL ;
1293+ }
13081294
1309- // / *
1310- // * OK, slurp in the file.
1311- // */
1312- // buf = (char *) palloc(stat.st_size);
1295+ /*
1296+ * OK, slurp in the file.
1297+ */
1298+ buf = (char * ) palloc (stat .st_size );
13131299
1314- // if (read(fd, buf, stat.st_size) != stat.st_size)
1315- // {
1316- // CloseTransientFile(fd);
1317- // if (give_warnings)
1318- // ereport(WARNING,
1319- // (errcode_for_file_access(),
1320- // errmsg("could not read two-phase state file \"%s\": %m",
1321- // path)));
1322- // pfree(buf);
1323- // return NULL;
1324- // }
1300+ if (read (fd , buf , stat .st_size ) != stat .st_size )
1301+ {
1302+ CloseTransientFile (fd );
1303+ if (give_warnings )
1304+ ereport (WARNING ,
1305+ (errcode_for_file_access (),
1306+ errmsg ("could not read two-phase state file \"%s\": %m" ,
1307+ path )));
1308+ pfree (buf );
1309+ return NULL ;
1310+ }
13251311
1326- // CloseTransientFile(fd);
1312+ CloseTransientFile (fd );
13271313
1328- // hdr = (TwoPhaseFileHeader *) buf;
1329- // if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1330- // {
1331- // pfree(buf);
1332- // return NULL;
1333- // }
1314+ hdr = (TwoPhaseFileHeader * ) buf ;
1315+ if (hdr -> magic != TWOPHASE_MAGIC || hdr -> total_len != stat .st_size )
1316+ {
1317+ pfree (buf );
1318+ return NULL ;
1319+ }
13341320
1335- // INIT_CRC32C(calc_crc);
1336- // COMP_CRC32C(calc_crc, buf, crc_offset);
1337- // FIN_CRC32C(calc_crc);
1321+ INIT_CRC32C (calc_crc );
1322+ COMP_CRC32C (calc_crc , buf , crc_offset );
1323+ FIN_CRC32C (calc_crc );
13381324
1339- // file_crc = *((pg_crc32c *) (buf + crc_offset));
1325+ file_crc = * ((pg_crc32c * ) (buf + crc_offset ));
13401326
1341- // if (!EQ_CRC32C(calc_crc, file_crc))
1342- // {
1343- // pfree(buf);
1344- // return NULL;
1345- // }
1327+ if (!EQ_CRC32C (calc_crc , file_crc ))
1328+ {
1329+ pfree (buf );
1330+ return NULL ;
1331+ }
13461332
1347- return twophase_buf ;
1333+ return buf ;
13481334}
13491335
13501336/*
@@ -1410,12 +1396,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14101396 * Read and validate the state file
14111397 */
14121398 // buf = ReadTwoPhaseFile(xid, true);
1399+ // buf = twophase_buf;
14131400 buf = XlogReadTwoPhaseData (gxact -> prepare_xlogptr );
1414- if (buf == NULL )
1415- ereport (ERROR ,
1416- (errcode (ERRCODE_DATA_CORRUPTED ),
1417- errmsg ("two-phase state file for transaction %u is corrupt" ,
1418- xid )));
14191401
14201402 /*
14211403 * Disassemble the header area
@@ -1435,6 +1417,15 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14351417 /* compute latestXid among all children */
14361418 latestXid = TransactionIdLatest (xid , hdr -> nsubxacts , children );
14371419
1420+
1421+ // fprintf(stderr, "FinishPrepared: %s=(%d,%d,%d,%d)\n", gxact->gid, hdr->nsubxacts, hdr->ncommitrels, hdr->nabortrels, hdr->ninvalmsgs);
1422+ // fprintf(stderr, "FinishPrepared: %s={xlogptr:%X,lsn:%X,delta:%X}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
1423+
1424+ Assert (hdr -> nsubxacts == 0 );
1425+ Assert (hdr -> ncommitrels == 0 );
1426+ Assert (hdr -> nabortrels == 0 );
1427+ Assert (hdr -> ninvalmsgs == 0 );
1428+
14381429 /*
14391430 * The order of operations here is critical: make the XLOG entry for
14401431 * commit or abort, then mark the transaction committed or aborted in
@@ -2246,30 +2237,11 @@ RecordTransactionAbortPrepared(TransactionId xid,
22462237 SyncRepWaitForLSN (recptr );
22472238}
22482239
2249-
2250-
2251-
2252-
2253-
2254-
2255-
2256-
22572240/**********************************************************************************/
22582241
22592242
2260- // static int xlogreadfd = -1;
2261- // static XLogSegNo xlogreadsegno = -1;
2262- // static char xlogfpath[MAXPGPATH];
2263-
2264- // typedef struct XLogPageReadPrivate
2265- // {
2266- // const char *datadir;
2267- // TimeLineID tli;
2268- // } XLogPageReadPrivate;
2269-
2270-
2271- size_t
2272- bogus_write (int fd , char * buf , size_t nbytes )
2243+ size_t
2244+ bogus_write (int fd , const void * buf , size_t nbytes )
22732245{
22742246 memcpy (twophase_buf + twophase_pos , buf , nbytes );
22752247 twophase_pos += nbytes ;
@@ -2284,8 +2256,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn)
22842256 XLogReaderState * xlogreader ;
22852257 char * errormsg ;
22862258
2287- fprintf (stderr , "XlogReadTwoPhaseData called\n" );
2288-
22892259 xlogreader = XLogReaderAllocate (& logical_read_local_xlog_page , NULL );
22902260 if (xlogreader == NULL )
22912261 fprintf (stderr , "xlogreader == NULL\n" );
@@ -2296,20 +2266,5 @@ XlogReadTwoPhaseData(XLogRecPtr lsn)
22962266 fprintf (stderr , "XLogReadRecord error\n" );
22972267 }
22982268
2299- // memcpy(twophase_buf + twophase_pos, buf, nbytes);
2300- // twophase_pos += nbytes;
2301- // return nbytes;
2302-
2303- // XLogReaderFree(xlogreader);
2304- // if (xlogreadfd != -1)
2305- // {
2306- // close(xlogreadfd);
2307- // xlogreadfd = -1;
2308- // }
2309-
23102269 return XLogRecGetData (xlogreader );
23112270}
2312-
2313-
2314-
2315-
0 commit comments