6262#include "replication/origin.h"
6363#include "replication/syncrep.h"
6464#include "replication/walsender.h"
65+ #include "replication/logicalfuncs.h"
6566#include "storage/fd.h"
6667#include "storage/ipc.h"
6768#include "storage/predicate.h"
@@ -174,6 +175,8 @@ static void RemoveGXact(GlobalTransaction gxact);
174175static char twophase_buf [10 * 1024 ];
175176static int twophase_pos = 0 ;
176177size_t bogus_write (int fd , char * buf , size_t nbytes );
178+
179+ static char * XlogReadTwoPhaseData (XLogRecPtr lsn );
177180// LWLock *xlogreclock;
178181
179182/*
@@ -1156,8 +1159,10 @@ EndPrepare(GlobalTransaction gxact)
11561159 XLogFlush (gxact -> prepare_lsn );
11571160
11581161
1159- fprintf (stderr , "WAL %s->prepare_xlogptr = %X/%X \n" , gxact -> gid , (uint32 ) (gxact -> prepare_xlogptr >> 32 ), (uint32 ) (gxact -> prepare_xlogptr ));
1160- fprintf (stderr , "WAL %s->prepare_lsn = %X/%X \n" , gxact -> gid , (uint32 ) (gxact -> prepare_lsn >> 32 ), (uint32 ) (gxact -> prepare_lsn ));
1162+ // fprintf(stderr, "WAL %s->prepare_xlogptr = %X/%X \n",
1163+ // gxact->gid, (uint32) (gxact->prepare_xlogptr >> 32), (uint32) (gxact->prepare_xlogptr));
1164+ // fprintf(stderr, "WAL %s->prepare_lsn = %X/%X \n",
1165+ // gxact->gid, (uint32) (gxact->prepare_lsn >> 32), (uint32) (gxact->prepare_lsn));
11611166
11621167
11631168 /* If we crash now, we have prepared: WAL replay will fix things */
@@ -1404,7 +1409,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14041409 /*
14051410 * Read and validate the state file
14061411 */
1407- buf = ReadTwoPhaseFile (xid , true);
1412+ // buf = ReadTwoPhaseFile(xid, true);
1413+ buf = XlogReadTwoPhaseData (gxact -> prepare_xlogptr );
14081414 if (buf == NULL )
14091415 ereport (ERROR ,
14101416 (errcode (ERRCODE_DATA_CORRUPTED ),
@@ -2251,15 +2257,16 @@ RecordTransactionAbortPrepared(TransactionId xid,
22512257/**********************************************************************************/
22522258
22532259
2254- static int xlogreadfd = -1 ;
2255- static XLogSegNo xlogreadsegno = -1 ;
2256- static char xlogfpath [MAXPGPATH ];
2260+ // static int xlogreadfd = -1;
2261+ // static XLogSegNo xlogreadsegno = -1;
2262+ // static char xlogfpath[MAXPGPATH];
2263+
2264+ // typedef struct XLogPageReadPrivate
2265+ // {
2266+ // const char *datadir;
2267+ // TimeLineID tli;
2268+ // } XLogPageReadPrivate;
22572269
2258- typedef struct XLogPageReadPrivate
2259- {
2260- const char * datadir ;
2261- TimeLineID tli ;
2262- } XLogPageReadPrivate ;
22632270
22642271size_t
22652272bogus_write (int fd , char * buf , size_t nbytes )
@@ -2270,165 +2277,39 @@ bogus_write(int fd, char *buf, size_t nbytes)
22702277}
22712278
22722279
2273- static int SimpleXLogPageRead (XLogReaderState * xlogreader ,
2274- XLogRecPtr targetPagePtr ,
2275- int reqLen , XLogRecPtr targetRecPtr , char * readBuf ,
2276- TimeLineID * pageTLI );
2277-
2278-
2279- /* XLogreader callback function, to read a WAL page */
2280- static int
2281- SimpleXLogPageRead (XLogReaderState * xlogreader , XLogRecPtr targetPagePtr ,
2282- int reqLen , XLogRecPtr targetRecPtr , char * readBuf ,
2283- TimeLineID * pageTLI )
2280+ static char *
2281+ XlogReadTwoPhaseData (XLogRecPtr lsn )
22842282{
2285- XLogPageReadPrivate * private = ( XLogPageReadPrivate * ) xlogreader -> private_data ;
2286- uint32 targetPageOff ;
2287- XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY ;
2283+ XLogRecord * record ;
2284+ XLogReaderState * xlogreader ;
2285+ char * errormsg ;
22882286
2289- XLByteToSeg (targetPagePtr , targetSegNo );
2290- targetPageOff = targetPagePtr % XLogSegSize ;
2287+ fprintf (stderr , "XlogReadTwoPhaseData called\n" );
22912288
2292- /*
2293- * See if we need to switch to a new segment because the requested record
2294- * is not in the currently open one.
2295- */
2296- if (xlogreadfd >= 0 && !XLByteInSeg (targetPagePtr , xlogreadsegno ))
2297- {
2298- close (xlogreadfd );
2299- xlogreadfd = -1 ;
2300- }
2289+ xlogreader = XLogReaderAllocate (& logical_read_local_xlog_page , NULL );
2290+ if (xlogreader == NULL )
2291+ fprintf (stderr , "xlogreader == NULL\n" );
23012292
2302- XLByteToSeg (targetPagePtr , xlogreadsegno );
2303-
2304- if (xlogreadfd < 0 )
2293+ record = XLogReadRecord (xlogreader , lsn , & errormsg );
2294+ if (record == NULL )
23052295 {
2306- char xlogfname [MAXFNAMELEN ];
2307-
2308- XLogFileName (xlogfname , private -> tli , xlogreadsegno );
2309-
2310- snprintf (xlogfpath , MAXPGPATH , "%s/" XLOGDIR "/%s" , private -> datadir , xlogfname );
2311-
2312- xlogreadfd = open (xlogfpath , O_RDONLY | PG_BINARY , 0 );
2313-
2314- if (xlogreadfd < 0 )
2315- {
2316- printf (_ ("could not open file \"%s\": %s\n" ), xlogfpath ,
2317- strerror (errno ));
2318- return -1 ;
2319- }
2296+ fprintf (stderr , "XLogReadRecord error\n" );
23202297 }
23212298
2322- /*
2323- * At this point, we have the right segment open.
2324- */
2325- Assert (xlogreadfd != -1 );
2299+ // memcpy(twophase_buf + twophase_pos, buf, nbytes);
2300+ // twophase_pos += nbytes;
2301+ // return nbytes;
23262302
2327- /* Read the requested page */
2328- if (lseek (xlogreadfd , (off_t ) targetPageOff , SEEK_SET ) < 0 )
2329- {
2330- printf (_ ("could not seek in file \"%s\": %s\n" ), xlogfpath ,
2331- strerror (errno ));
2332- return -1 ;
2333- }
2334-
2335- if (read (xlogreadfd , readBuf , XLOG_BLCKSZ ) != XLOG_BLCKSZ )
2336- {
2337- printf (_ ("could not read from file \"%s\": %s\n" ), xlogfpath ,
2338- strerror (errno ));
2339- return -1 ;
2340- }
2341-
2342- Assert (targetSegNo == xlogreadsegno );
2303+ // XLogReaderFree(xlogreader);
2304+ // if (xlogreadfd != -1)
2305+ // {
2306+ // close(xlogreadfd);
2307+ // xlogreadfd = -1;
2308+ // }
23432309
2344- * pageTLI = private -> tli ;
2345- return XLOG_BLCKSZ ;
2310+ return XLogRecGetData (xlogreader );
23462311}
23472312
2348- // XLogRecPtr
2349- // readOneRecord(const char *datadir, XLogRecPtr ptr, TimeLineID tli);
2350-
2351- // XLogRecPtr
2352- // readOneRecord(const char *datadir, XLogRecPtr ptr, TimeLineID tli)
2353- // {
2354- // XLogRecord *record;
2355- // XLogReaderState *xlogreader;
2356- // char *errormsg;
2357- // XLogPageReadPrivate private;
2358- // XLogRecPtr endptr;
2359-
2360- // private.datadir = datadir;
2361- // private.tli = tli;
2362- // xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
2363- // if (xlogreader == NULL)
2364- // pg_fatal("out of memory\n");
2365-
2366- // record = XLogReadRecord(xlogreader, ptr, &errormsg);
2367- // if (record == NULL)
2368- // {
2369- // if (errormsg)
2370- // pg_fatal("could not read WAL record at %X/%X: %s\n",
2371- // (uint32) (ptr >> 32), (uint32) (ptr), errormsg);
2372- // else
2373- // pg_fatal("could not read WAL record at %X/%X\n",
2374- // (uint32) (ptr >> 32), (uint32) (ptr));
2375- // }
2376- // endptr = xlogreader->EndRecPtr;
2377-
2378- // XLogReaderFree(xlogreader);
2379- // if (xlogreadfd != -1)
2380- // {
2381- // close(xlogreadfd);
2382- // xlogreadfd = -1;
2383- // }
2384-
2385- // return endptr;
2386- // }
2387-
2388-
2389- // static char *
2390- // XlogReadTwoPhaseData(XLogRecPtr lsn, bool give_warnings, TimeLineID tli)
2391- // {
2392- // XLogRecord *record;
2393- // XLogReaderState *xlogreader;
2394- // XLogPageReadPrivate private;
2395-
2396- // private.datadir = datadir;
2397- // private.tli = tli;
2398- // xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
2399- // if (xlogreader == NULL)
2400- // pg_fatal("out of memory\n");
2401-
2402- // record = XLogReadRecord(xlogreader, ptr, &errormsg);
2403- // if (record == NULL)
2404- // {
2405- // if (errormsg)
2406- // pg_fatal("could not read WAL record at %X/%X: %s\n",
2407- // (uint32) (ptr >> 32), (uint32) (ptr), errormsg);
2408- // else
2409- // pg_fatal("could not read WAL record at %X/%X\n",
2410- // (uint32) (ptr >> 32), (uint32) (ptr));
2411- // }
2412- // endptr = xlogreader->EndRecPtr;
2413-
2414- // XLogReaderFree(xlogreader);
2415- // if (xlogreadfd != -1)
2416- // {
2417- // close(xlogreadfd);
2418- // xlogreadfd = -1;
2419- // }
2420-
2421- // return XLogRecGetData(record)
2422- // }
2423-
2424-
2425-
2426-
2427-
2428-
2429-
2430-
2431-
24322313
24332314
24342315
0 commit comments