Skip to content

Commit 7c07f54

Browse files
committed
Implement ring buffer
1 parent 9f2c4ed commit 7c07f54

File tree

3 files changed

+135
-36
lines changed

3 files changed

+135
-36
lines changed

src/backend/access/transam/xlog.c

Lines changed: 105 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@
7979
#include "utils/snapmgr.h"
8080
#include "utils/timestamp.h"
8181

82+
#include <sys/mman.h>
83+
84+
8285
extern uint32 bootstrap_data_checksum_version;
8386

8487
/* Unsupported old recovery command file names (relative to $PGDATA) */
@@ -206,6 +209,8 @@ bool InRecovery = false;
206209
/* Are we in Hot Standby mode? Only valid in startup process, see xlog.h */
207210
HotStandbyState standbyState = STANDBY_DISABLED;
208211

212+
bool useWalRingBuffer;
213+
209214
static XLogRecPtr LastRec;
210215

211216
/* Local copy of WalRcv->flushedUpto */
@@ -597,6 +602,7 @@ typedef struct XLogCtlInsert
597602
*/
598603
typedef struct XLogCtlData
599604
{
605+
XLogRingBuffer ringBuf;
600606
XLogCtlInsert Insert;
601607

602608
/* Protected by info_lck: */
@@ -735,6 +741,7 @@ static WALInsertLockPadded *WALInsertLocks = NULL;
735741
* We maintain an image of pg_control in shared memory.
736742
*/
737743
static ControlFileData *ControlFile = NULL;
744+
XLogRingBuffer volatile* XLogRing;
738745

739746
/*
740747
* Calculate the amount of space left on the page after 'endptr'. Beware
@@ -1512,6 +1519,45 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
15121519
int written;
15131520
XLogRecPtr CurrPos;
15141521
XLogPageHeader pagehdr;
1522+
size_t bufSize = XLOG_BLCKSZ * XLOGbuffers;
1523+
1524+
if (useWalRingBuffer)
1525+
{
1526+
SpinLockAcquire(&XLogRing->spinlock);
1527+
//elog(LOG, "writePos=%lx readPos=%lx StartPos=%lx EndPos=%lx writeOffs=%lx readOffs=%lx",
1528+
// XLogRing->writePos, XLogRing->readPos, StartPos, EndPos, XLogRing->writeOffs, XLogRing->readOffs);
1529+
if (XLogRing->writePos == 0)
1530+
{
1531+
/* First access to WAL */
1532+
Assert(XLogRing->readOffs == 0 && XLogRing->writeOffs == 0);
1533+
XLogRing->writePos = XLogRing->readPos = StartPos;
1534+
XLogRing->readOffs = XLogRing->writeOffs = StartPos % bufSize;
1535+
}
1536+
else
1537+
{
1538+
while (StartPos >= XLogRing->writePos)
1539+
{
1540+
size_t txSize = EndPos - XLogRing->writePos;
1541+
size_t readOffs = XLogRing->readOffs;
1542+
size_t writeOffs = XLogRing->writeOffs;
1543+
size_t available = readOffs <= writeOffs
1544+
? bufSize - writeOffs + readOffs - 1
1545+
: readOffs - writeOffs - 1;
1546+
if (txSize > available)
1547+
{
1548+
XLogRing->blockedWriter = MyLatch;
1549+
SpinLockRelease(&XLogRing->spinlock);
1550+
(void) WaitLatch(MyLatch,
1551+
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
1552+
-1, WAIT_EVENT_WAL_WRITER_MAIN);
1553+
SpinLockAcquire(&XLogRing->spinlock);
1554+
}
1555+
else
1556+
break;
1557+
}
1558+
}
1559+
SpinLockRelease(&XLogRing->spinlock);
1560+
}
15151561

15161562
/*
15171563
* Get a pointer to the right place in the right WAL buffer to start
@@ -1642,6 +1688,15 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
16421688

16431689
if (CurrPos != EndPos)
16441690
elog(PANIC, "space reserved for WAL record does not match what was written");
1691+
1692+
if (useWalRingBuffer && XLogRing->writePos < EndPos)
1693+
{
1694+
SpinLockAcquire(&XLogRing->spinlock);
1695+
//elog(LOG, "Write record %lx length %d", XLogRing->writePos, ((XLogRecord *)(XLogRing->pages + XLogRing->writeOffs))->xl_tot_len);
1696+
XLogRing->writeOffs = EndPos % bufSize;
1697+
XLogRing->writePos = EndPos;
1698+
SpinLockRelease(&XLogRing->spinlock);
1699+
}
16451700
}
16461701

16471702
/*
@@ -2129,6 +2184,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
21292184
XLogPageHeader NewPage;
21302185
int npages = 0;
21312186

2187+
if (useWalRingBuffer && opportunistic)
2188+
return;
2189+
21322190
LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
21332191

21342192
/*
@@ -2209,9 +2267,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
22092267
NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
22102268

22112269
Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
2212-
22132270
NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
2214-
22152271
/*
22162272
* Be sure to re-zero the buffer so that bytes beyond what we've
22172273
* written will look like zeroes and not valid XLOG records...
@@ -2466,7 +2522,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
24662522
LogwrtResult.Write = EndPtr;
24672523
ispartialpage = WriteRqst.Write < LogwrtResult.Write;
24682524

2469-
if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
2525+
if (!useWalRingBuffer &&
2526+
!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
24702527
wal_segment_size))
24712528
{
24722529
/*
@@ -2486,7 +2543,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
24862543
}
24872544

24882545
/* Make sure we have the current logfile open */
2489-
if (openLogFile < 0)
2546+
if (!useWalRingBuffer && openLogFile < 0)
24902547
{
24912548
XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
24922549
wal_segment_size);
@@ -2528,35 +2585,37 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
25282585
from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
25292586
nbytes = npages * (Size) XLOG_BLCKSZ;
25302587
nleft = nbytes;
2531-
do
2532-
{
2533-
errno = 0;
2534-
pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
2535-
written = pg_pwrite(openLogFile, from, nleft, startoffset);
2536-
pgstat_report_wait_end();
2537-
if (written <= 0)
2588+
if (!useWalRingBuffer)
2589+
{
2590+
do
25382591
{
2539-
char xlogfname[MAXFNAMELEN];
2540-
int save_errno;
2541-
2542-
if (errno == EINTR)
2543-
continue;
2592+
errno = 0;
2593+
pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
2594+
written = pg_pwrite(openLogFile, from, nleft, startoffset);
2595+
pgstat_report_wait_end();
2596+
if (written <= 0)
2597+
{
2598+
char xlogfname[MAXFNAMELEN];
2599+
int save_errno;
25442600

2545-
save_errno = errno;
2546-
XLogFileName(xlogfname, ThisTimeLineID, openLogSegNo,
2547-
wal_segment_size);
2548-
errno = save_errno;
2549-
ereport(PANIC,
2550-
(errcode_for_file_access(),
2551-
errmsg("could not write to log file %s "
2552-
"at offset %u, length %zu: %m",
2553-
xlogfname, startoffset, nleft)));
2554-
}
2555-
nleft -= written;
2556-
from += written;
2557-
startoffset += written;
2558-
} while (nleft > 0);
2601+
if (errno == EINTR)
2602+
continue;
25592603

2604+
save_errno = errno;
2605+
XLogFileName(xlogfname, ThisTimeLineID, openLogSegNo,
2606+
wal_segment_size);
2607+
errno = save_errno;
2608+
ereport(PANIC,
2609+
(errcode_for_file_access(),
2610+
errmsg("could not write to log file %s "
2611+
"at offset %u, length %zu: %m",
2612+
xlogfname, startoffset, nleft)));
2613+
}
2614+
nleft -= written;
2615+
from += written;
2616+
startoffset += written;
2617+
} while (nleft > 0);
2618+
}
25602619
npages = 0;
25612620

25622621
/*
@@ -2574,7 +2633,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
25742633
*/
25752634
if (finishing_seg)
25762635
{
2577-
issue_xlog_fsync(openLogFile, openLogSegNo);
2636+
if (!useWalRingBuffer)
2637+
issue_xlog_fsync(openLogFile, openLogSegNo);
25782638

25792639
/* signal that we need to wakeup walsenders later */
25802640
WalSndWakeupRequest();
@@ -2631,7 +2691,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
26312691
* fsync more than one file.
26322692
*/
26332693
if (sync_method != SYNC_METHOD_OPEN &&
2634-
sync_method != SYNC_METHOD_OPEN_DSYNC)
2694+
sync_method != SYNC_METHOD_OPEN_DSYNC &&
2695+
!useWalRingBuffer)
26352696
{
26362697
if (openLogFile >= 0 &&
26372698
!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
@@ -5175,6 +5236,13 @@ XLOGShmemInit(void)
51755236
SpinLockInit(&XLogCtl->info_lck);
51765237
SpinLockInit(&XLogCtl->ulsn_lck);
51775238
InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
5239+
5240+
if (useWalRingBuffer)
5241+
{
5242+
XLogRing = &XLogCtl->ringBuf;
5243+
XLogRing->pages = XLogCtl->pages;
5244+
SpinLockInit(&XLogRing->spinlock);
5245+
}
51785246
}
51795247

51805248
/*
@@ -5288,7 +5356,8 @@ BootStrapXLOG(void)
52885356

52895357
/* Create first XLOG segment file */
52905358
use_existent = false;
5291-
openLogFile = XLogFileInit(1, &use_existent, false);
5359+
if (!useWalRingBuffer)
5360+
openLogFile = XLogFileInit(1, &use_existent, false);
52925361

52935362
/*
52945363
* We needn't bother with Reserve/ReleaseExternalFD here, since we'll
@@ -5310,13 +5379,13 @@ BootStrapXLOG(void)
53105379
pgstat_report_wait_end();
53115380

53125381
pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC);
5313-
if (pg_fsync(openLogFile) != 0)
5382+
if (!useWalRingBuffer && pg_fsync(openLogFile) != 0)
53145383
ereport(PANIC,
53155384
(errcode_for_file_access(),
53165385
errmsg("could not fsync bootstrap write-ahead log file: %m")));
53175386
pgstat_report_wait_end();
53185387

5319-
if (close(openLogFile) != 0)
5388+
if (!useWalRingBuffer && close(openLogFile) != 0)
53205389
ereport(PANIC,
53215390
(errcode_for_file_access(),
53225391
errmsg("could not close bootstrap write-ahead log file: %m")));
@@ -11960,6 +12029,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1196012029
if (!XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf))
1196112030
{
1196212031
/* reset any error XLogReaderValidatePageHeader() might have set */
12032+
elog(LOG, "Invlid page header %lx error=%s", targetPagePtr, xlogreader->errormsg_buf);
1196312033
xlogreader->errormsg_buf[0] = '\0';
1196412034
goto next_record_is_invalid;
1196512035
}

src/backend/utils/misc/guc.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,6 +1272,16 @@ static struct config_bool ConfigureNamesBool[] =
12721272
true,
12731273
NULL, NULL, NULL
12741274
},
1275+
{
1276+
{"use_wal_ring_buffer", PGC_SIGHUP, WAL_SETTINGS,
1277+
gettext_noop("Send data to WAL sender through rig buffer"),
1278+
NULL
1279+
},
1280+
&useWalRingBuffer,
1281+
false,
1282+
NULL, NULL, NULL
1283+
},
1284+
12751285

12761286
{
12771287
{"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS,

src/include/access/xlog.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
#include "lib/stringinfo.h"
2020
#include "nodes/pg_list.h"
2121
#include "storage/fd.h"
22-
22+
#ifndef FRONTEND
23+
#include "storage/latch.h"
24+
#include "storage/s_lock.h"
25+
#endif
2326

2427
/* Sync methods */
2528
#define SYNC_METHOD_FSYNC 0
@@ -384,4 +387,20 @@ extern SessionBackupState get_backup_status(void);
384387
#define PROMOTE_SIGNAL_FILE "promote"
385388
#define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote"
386389

390+
#ifndef FRONTEND
391+
typedef struct
392+
{
393+
size_t readOffs;
394+
size_t writeOffs;
395+
XLogRecPtr readPos;
396+
XLogRecPtr writePos;
397+
char* pages;
398+
Latch* blockedWriter;
399+
slock_t spinlock;
400+
} XLogRingBuffer;
401+
402+
extern XLogRingBuffer volatile* XLogRing;
403+
extern bool useWalRingBuffer;
404+
#endif
405+
387406
#endif /* XLOG_H */

0 commit comments

Comments
 (0)