PostgreSQL Source Code git master
localbuf.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * localbuf.c
4 * local buffer manager. Fast buffer manager for temporary tables,
5 * which never need to be WAL-logged or checkpointed, etc.
6 *
7 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994-5, Regents of the University of California
9 *
10 *
11 * IDENTIFICATION
12 * src/backend/storage/buffer/localbuf.c
13 *
14 *-------------------------------------------------------------------------
15 */
#include "postgres.h"

#include "access/parallel.h"
#include "executor/instrument.h"
#include "pgstat.h"
#include "storage/aio.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "utils/guc_hooks.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/resowner.h"
30
31
32/*#define LBDEBUG*/
33
34/* entry for buffer lookup hashtable */
35typedef struct
36{
37 BufferTag key; /* Tag of a disk page */
38 int id; /* Associated local buffer's index */
40
41/* Note: this macro only works on local buffers, not shared ones! */
42#define LocalBufHdrGetBlock(bufHdr) \
43 LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
44
45int NLocBuffer = 0; /* until buffers are initialized */
46
50
51static int nextFreeLocalBufId = 0;
52
53static HTAB *LocalBufHash = NULL;
54
55/* number of local buffers pinned at least once */
56static int NLocalPinnedBuffers = 0;
57
58
59static void InitLocalBuffers(void);
60static Block GetLocalBufferStorage(void);
61static Buffer GetLocalVictimBuffer(void);
62
63
64/*
65 * PrefetchLocalBuffer -
66 * initiate asynchronous read of a block of a relation
67 *
68 * Do PrefetchBuffer's work for temporary relations.
69 * No-op if prefetching isn't compiled in.
70 */
73 BlockNumber blockNum)
74{
75 PrefetchBufferResult result = {InvalidBuffer, false};
76 BufferTag newTag; /* identity of requested block */
77 LocalBufferLookupEnt *hresult;
78
79 InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
80
81 /* Initialize local buffers if first request in this session */
82 if (LocalBufHash == NULL)
84
85 /* See if the desired buffer already exists */
86 hresult = (LocalBufferLookupEnt *)
87 hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
88
89 if (hresult)
90 {
91 /* Yes, so nothing to do */
92 result.recent_buffer = -hresult->id - 1;
93 }
94 else
95 {
96#ifdef USE_PREFETCH
97 /* Not in buffers, so initiate prefetch */
98 if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
99 smgrprefetch(smgr, forkNum, blockNum, 1))
100 {
101 result.initiated_io = true;
102 }
103#endif /* USE_PREFETCH */
104 }
105
106 return result;
107}
108
109
110/*
111 * LocalBufferAlloc -
112 * Find or create a local buffer for the given page of the given relation.
113 *
114 * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
115 * any locking since this is all local. We support only default access
116 * strategy (hence, usage_count is always advanced).
117 */
120 bool *foundPtr)
121{
122 BufferTag newTag; /* identity of requested block */
123 LocalBufferLookupEnt *hresult;
124 BufferDesc *bufHdr;
125 Buffer victim_buffer;
126 int bufid;
127 bool found;
128
129 InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
130
131 /* Initialize local buffers if first request in this session */
132 if (LocalBufHash == NULL)
134
136
137 /* See if the desired buffer already exists */
138 hresult = (LocalBufferLookupEnt *)
139 hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
140
141 if (hresult)
142 {
143 bufid = hresult->id;
144 bufHdr = GetLocalBufferDescriptor(bufid);
145 Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
146
147 *foundPtr = PinLocalBuffer(bufHdr, true);
148 }
149 else
150 {
151 uint32 buf_state;
152
153 victim_buffer = GetLocalVictimBuffer();
154 bufid = -victim_buffer - 1;
155 bufHdr = GetLocalBufferDescriptor(bufid);
156
157 hresult = (LocalBufferLookupEnt *)
158 hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
159 if (found) /* shouldn't happen */
160 elog(ERROR, "local buffer hash table corrupted");
161 hresult->id = bufid;
162
163 /*
164 * it's all ours now.
165 */
166 bufHdr->tag = newTag;
167
168 buf_state = pg_atomic_read_u32(&bufHdr->state);
169 buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
170 buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
171 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
172
173 *foundPtr = false;
174 }
175
176 return bufHdr;
177}
178
179/*
180 * Like FlushBuffer(), just for local buffers.
181 */
182void
184{
185 instr_time io_start;
186 Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
187
189
190 /*
191 * Try to start an I/O operation. There currently are no reasons for
192 * StartLocalBufferIO to return false, so we raise an error in that case.
193 */
194 if (!StartLocalBufferIO(bufHdr, false, false))
195 elog(ERROR, "failed to start write IO on local buffer");
196
197 /* Find smgr relation for buffer */
198 if (reln == NULL)
199 reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
201
202 PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
203
205
206 /* And write... */
207 smgrwrite(reln,
208 BufTagGetForkNum(&bufHdr->tag),
209 bufHdr->tag.blockNum,
210 localpage,
211 false);
212
213 /* Temporary table I/O does not use Buffer Access Strategies */
215 IOOP_WRITE, io_start, 1, BLCKSZ);
216
217 /* Mark not-dirty */
218 TerminateLocalBufferIO(bufHdr, true, 0, false);
219
221}
222
223static Buffer
225{
226 int victim_bufid;
227 int trycounter;
228 BufferDesc *bufHdr;
229
231
232 /*
233 * Need to get a new buffer. We use a clock-sweep algorithm (essentially
234 * the same as what freelist.c does now...)
235 */
236 trycounter = NLocBuffer;
237 for (;;)
238 {
239 victim_bufid = nextFreeLocalBufId;
240
243
244 bufHdr = GetLocalBufferDescriptor(victim_bufid);
245
246 if (LocalRefCount[victim_bufid] == 0)
247 {
248 uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
249
250 if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
251 {
252 buf_state -= BUF_USAGECOUNT_ONE;
253 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
254 trycounter = NLocBuffer;
255 }
256 else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
257 {
258 /*
259 * This can be reached if the backend initiated AIO for this
260 * buffer and then errored out.
261 */
262 }
263 else
264 {
265 /* Found a usable buffer */
266 PinLocalBuffer(bufHdr, false);
267 break;
268 }
269 }
270 else if (--trycounter == 0)
272 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
273 errmsg("no empty local buffer available")));
274 }
275
276 /*
277 * lazy memory allocation: allocate space on first use of a buffer.
278 */
279 if (LocalBufHdrGetBlock(bufHdr) == NULL)
280 {
281 /* Set pointer for use by BufferGetBlock() macro */
283 }
284
285 /*
286 * this buffer is not referenced but it might still be dirty. if that's
287 * the case, write it out before reusing it!
288 */
289 if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY)
290 FlushLocalBuffer(bufHdr, NULL);
291
292 /*
293 * Remove the victim buffer from the hashtable and mark as invalid.
294 */
295 if (pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID)
296 {
297 InvalidateLocalBuffer(bufHdr, false);
298
300 }
301
302 return BufferDescriptorGetBuffer(bufHdr);
303}
304
305/* see GetPinLimit() */
306uint32
308{
309 /* Every backend has its own temporary buffers, and can pin them all. */
310 return num_temp_buffers;
311}
312
313/* see GetAdditionalPinLimit() */
314uint32
316{
319}
320
321/* see LimitAdditionalPins() */
322void
324{
325 uint32 max_pins;
326
327 if (*additional_pins <= 1)
328 return;
329
330 /*
331 * In contrast to LimitAdditionalPins() other backends don't play a role
332 * here. We can allow up to NLocBuffer pins in total, but it might not be
333 * initialized yet so read num_temp_buffers.
334 */
336
337 if (*additional_pins >= max_pins)
338 *additional_pins = max_pins;
339}
340
341/*
342 * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
343 * temporary buffers.
344 */
347 ForkNumber fork,
348 uint32 flags,
349 uint32 extend_by,
350 BlockNumber extend_upto,
351 Buffer *buffers,
352 uint32 *extended_by)
353{
354 BlockNumber first_block;
355 instr_time io_start;
356
357 /* Initialize local buffers if first request in this session */
358 if (LocalBufHash == NULL)
360
361 LimitAdditionalLocalPins(&extend_by);
362
363 for (uint32 i = 0; i < extend_by; i++)
364 {
365 BufferDesc *buf_hdr;
366 Block buf_block;
367
368 buffers[i] = GetLocalVictimBuffer();
369 buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
370 buf_block = LocalBufHdrGetBlock(buf_hdr);
371
372 /* new buffers are zero-filled */
373 MemSet(buf_block, 0, BLCKSZ);
374 }
375
376 first_block = smgrnblocks(BMR_GET_SMGR(bmr), fork);
377
378 if (extend_upto != InvalidBlockNumber)
379 {
380 /*
381 * In contrast to shared relations, nothing could change the relation
382 * size concurrently. Thus we shouldn't end up finding that we don't
383 * need to do anything.
384 */
385 Assert(first_block <= extend_upto);
386
387 Assert((uint64) first_block + extend_by <= extend_upto);
388 }
389
390 /* Fail if relation is already at maximum possible length */
391 if ((uint64) first_block + extend_by >= MaxBlockNumber)
393 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
394 errmsg("cannot extend relation %s beyond %u blocks",
395 relpath(BMR_GET_SMGR(bmr)->smgr_rlocator, fork).str,
397
398 for (uint32 i = 0; i < extend_by; i++)
399 {
400 int victim_buf_id;
401 BufferDesc *victim_buf_hdr;
402 BufferTag tag;
403 LocalBufferLookupEnt *hresult;
404 bool found;
405
406 victim_buf_id = -buffers[i] - 1;
407 victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
408
409 /* in case we need to pin an existing buffer below */
411
412 InitBufferTag(&tag, &BMR_GET_SMGR(bmr)->smgr_rlocator.locator, fork,
413 first_block + i);
414
415 hresult = (LocalBufferLookupEnt *)
416 hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
417 if (found)
418 {
419 BufferDesc *existing_hdr;
420 uint32 buf_state;
421
423
424 existing_hdr = GetLocalBufferDescriptor(hresult->id);
425 PinLocalBuffer(existing_hdr, false);
426 buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
427
428 /*
429 * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
430 */
431 buf_state = pg_atomic_read_u32(&existing_hdr->state);
432 Assert(buf_state & BM_TAG_VALID);
433 Assert(!(buf_state & BM_DIRTY));
434 buf_state &= ~BM_VALID;
435 pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
436
437 /* no need to loop for local buffers */
438 StartLocalBufferIO(existing_hdr, true, false);
439 }
440 else
441 {
442 uint32 buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
443
444 Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
445
446 victim_buf_hdr->tag = tag;
447
448 buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
449
450 pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
451
452 hresult->id = victim_buf_id;
453
454 StartLocalBufferIO(victim_buf_hdr, true, false);
455 }
456 }
457
459
460 /* actually extend relation */
461 smgrzeroextend(BMR_GET_SMGR(bmr), fork, first_block, extend_by, false);
462
464 io_start, 1, extend_by * BLCKSZ);
465
466 for (uint32 i = 0; i < extend_by; i++)
467 {
468 Buffer buf = buffers[i];
469 BufferDesc *buf_hdr;
470 uint32 buf_state;
471
472 buf_hdr = GetLocalBufferDescriptor(-buf - 1);
473
474 buf_state = pg_atomic_read_u32(&buf_hdr->state);
475 buf_state |= BM_VALID;
476 pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
477 }
478
479 *extended_by = extend_by;
480
482
483 return first_block;
484}
485
486/*
487 * MarkLocalBufferDirty -
488 * mark a local buffer dirty
489 */
490void
492{
493 int bufid;
494 BufferDesc *bufHdr;
495 uint32 buf_state;
496
497 Assert(BufferIsLocal(buffer));
498
499#ifdef LBDEBUG
500 fprintf(stderr, "LB DIRTY %d\n", buffer);
501#endif
502
503 bufid = -buffer - 1;
504
505 Assert(LocalRefCount[bufid] > 0);
506
507 bufHdr = GetLocalBufferDescriptor(bufid);
508
509 buf_state = pg_atomic_read_u32(&bufHdr->state);
510
511 if (!(buf_state & BM_DIRTY))
513
514 buf_state |= BM_DIRTY;
515
516 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
517}
518
519/*
520 * Like StartBufferIO, but for local buffers
521 */
522bool
523StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
524{
525 uint32 buf_state;
526
527 /*
528 * With AIO the buffer could have IO in progress, e.g. when there are two
529 * scans of the same relation. Either wait for the other IO or return
530 * false.
531 */
532 if (pgaio_wref_valid(&bufHdr->io_wref))
533 {
534 PgAioWaitRef iow = bufHdr->io_wref;
535
536 if (nowait)
537 return false;
538
539 pgaio_wref_wait(&iow);
540 }
541
542 /* Once we get here, there is definitely no I/O active on this buffer */
543
544 /* Check if someone else already did the I/O */
545 buf_state = pg_atomic_read_u32(&bufHdr->state);
546 if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
547 {
548 return false;
549 }
550
551 /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
552
553 /* local buffers don't track IO using resowners */
554
555 return true;
556}
557
558/*
559 * Like TerminateBufferIO, but for local buffers
560 */
561void
562TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
563 bool release_aio)
564{
565 /* Only need to adjust flags */
566 uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
567
568 /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
569
570 /* Clear earlier errors, if this IO failed, it'll be marked again */
571 buf_state &= ~BM_IO_ERROR;
572
573 if (clear_dirty)
574 buf_state &= ~BM_DIRTY;
575
576 if (release_aio)
577 {
578 /* release pin held by IO subsystem, see also buffer_stage_common() */
579 Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
580 buf_state -= BUF_REFCOUNT_ONE;
581 pgaio_wref_clear(&bufHdr->io_wref);
582 }
583
584 buf_state |= set_flag_bits;
585 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
586
587 /* local buffers don't track IO using resowners */
588
589 /* local buffers don't use the IO CV, as no other process can see buffer */
590
591 /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
592}
593
594/*
595 * InvalidateLocalBuffer -- mark a local buffer invalid.
596 *
597 * If check_unreferenced is true, error out if the buffer is still
598 * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
599 * as part of changing the identity of a buffer, instead of just dropping the
600 * buffer.
601 *
602 * See also InvalidateBuffer().
603 */
604void
605InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
606{
607 Buffer buffer = BufferDescriptorGetBuffer(bufHdr);
608 int bufid = -buffer - 1;
609 uint32 buf_state;
610 LocalBufferLookupEnt *hresult;
611
612 /*
613 * It's possible that we started IO on this buffer before e.g. aborting
614 * the transaction that created a table. We need to wait for that IO to
615 * complete before removing / reusing the buffer.
616 */
617 if (pgaio_wref_valid(&bufHdr->io_wref))
618 {
619 PgAioWaitRef iow = bufHdr->io_wref;
620
621 pgaio_wref_wait(&iow);
622 Assert(!pgaio_wref_valid(&bufHdr->io_wref));
623 }
624
625 buf_state = pg_atomic_read_u32(&bufHdr->state);
626
627 /*
628 * We need to test not just LocalRefCount[bufid] but also the BufferDesc
629 * itself, as the latter is used to represent a pin by the AIO subsystem.
630 * This can happen if AIO is initiated and then the query errors out.
631 */
632 if (check_unreferenced &&
633 (LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
634 elog(ERROR, "block %u of %s is still referenced (local %d)",
635 bufHdr->tag.blockNum,
638 BufTagGetForkNum(&bufHdr->tag)).str,
639 LocalRefCount[bufid]);
640
641 /* Remove entry from hashtable */
642 hresult = (LocalBufferLookupEnt *)
643 hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
644 if (!hresult) /* shouldn't happen */
645 elog(ERROR, "local buffer hash table corrupted");
646 /* Mark buffer invalid */
647 ClearBufferTag(&bufHdr->tag);
648 buf_state &= ~BUF_FLAG_MASK;
649 buf_state &= ~BUF_USAGECOUNT_MASK;
650 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
651}
652
653/*
654 * DropRelationLocalBuffers
655 * This function removes from the buffer pool all the pages of the
656 * specified relation that have block numbers >= firstDelBlock.
657 * (In particular, with firstDelBlock = 0, all pages are removed.)
658 * Dirty pages are simply dropped, without bothering to write them
659 * out first. Therefore, this is NOT rollback-able, and so should be
660 * used only with extreme caution!
661 *
662 * See DropRelationBuffers in bufmgr.c for more notes.
663 */
664void
666 int nforks, BlockNumber *firstDelBlock)
667{
668 int i;
669 int j;
670
671 for (i = 0; i < NLocBuffer; i++)
672 {
674 uint32 buf_state;
675
676 buf_state = pg_atomic_read_u32(&bufHdr->state);
677
678 if (!(buf_state & BM_TAG_VALID) ||
679 !BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
680 continue;
681
682 for (j = 0; j < nforks; j++)
683 {
684 if (BufTagGetForkNum(&bufHdr->tag) == forkNum[j] &&
685 bufHdr->tag.blockNum >= firstDelBlock[j])
686 {
687 InvalidateLocalBuffer(bufHdr, true);
688 break;
689 }
690 }
691 }
692}
693
694/*
695 * DropRelationAllLocalBuffers
696 * This function removes from the buffer pool all pages of all forks
697 * of the specified relation.
698 *
699 * See DropRelationsAllBuffers in bufmgr.c for more notes.
700 */
701void
703{
704 int i;
705
706 for (i = 0; i < NLocBuffer; i++)
707 {
709 uint32 buf_state;
710
711 buf_state = pg_atomic_read_u32(&bufHdr->state);
712
713 if ((buf_state & BM_TAG_VALID) &&
714 BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
715 {
716 InvalidateLocalBuffer(bufHdr, true);
717 }
718 }
719}
720
721/*
722 * InitLocalBuffers -
723 * init the local buffer cache. Since most queries (esp. multi-user ones)
724 * don't involve local buffers, we delay allocating actual memory for the
725 * buffers until we need them; just make the buffer headers here.
726 */
727static void
729{
730 int nbufs = num_temp_buffers;
731 HASHCTL info;
732 int i;
733
734 /*
735 * Parallel workers can't access data in temporary tables, because they
736 * have no visibility into the local buffers of their leader. This is a
737 * convenient, low-cost place to provide a backstop check for that. Note
738 * that we don't wish to prevent a parallel worker from accessing catalog
739 * metadata about a temp table, so checks at higher levels would be
740 * inappropriate.
741 */
742 if (IsParallelWorker())
744 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
745 errmsg("cannot access temporary tables during a parallel operation")));
746
747 /* Allocate and zero buffer headers and auxiliary arrays */
749 LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
750 LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
753 (errcode(ERRCODE_OUT_OF_MEMORY),
754 errmsg("out of memory")));
755
757
758 /* initialize fields that need to start off nonzero */
759 for (i = 0; i < nbufs; i++)
760 {
762
763 /*
764 * negative to indicate local buffer. This is tricky: shared buffers
765 * start with 0. We have to start with -2. (Note that the routine
766 * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
767 * is -1.)
768 */
769 buf->buf_id = -i - 2;
770
771 pgaio_wref_clear(&buf->io_wref);
772
773 /*
774 * Intentionally do not initialize the buffer's atomic variable
775 * (besides zeroing the underlying memory above). That way we get
776 * errors on platforms without atomics, if somebody (re-)introduces
777 * atomic operations for local buffers.
778 */
779 }
780
781 /* Create the lookup hash table */
782 info.keysize = sizeof(BufferTag);
783 info.entrysize = sizeof(LocalBufferLookupEnt);
784
785 LocalBufHash = hash_create("Local Buffer Lookup Table",
786 nbufs,
787 &info,
789
790 if (!LocalBufHash)
791 elog(ERROR, "could not initialize local buffer hash table");
792
793 /* Initialization done, mark buffers allocated */
794 NLocBuffer = nbufs;
795}
796
797/*
798 * XXX: We could have a slightly more efficient version of PinLocalBuffer()
799 * that does not support adjusting the usagecount - but so far it does not
800 * seem worth the trouble.
801 *
802 * Note that ResourceOwnerEnlarge() must have been done already.
803 */
804bool
805PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
806{
807 uint32 buf_state;
808 Buffer buffer = BufferDescriptorGetBuffer(buf_hdr);
809 int bufid = -buffer - 1;
810
811 buf_state = pg_atomic_read_u32(&buf_hdr->state);
812
813 if (LocalRefCount[bufid] == 0)
814 {
816 buf_state += BUF_REFCOUNT_ONE;
817 if (adjust_usagecount &&
819 {
820 buf_state += BUF_USAGECOUNT_ONE;
821 }
822 pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
823
824 /*
825 * See comment in PinBuffer().
826 *
827 * If the buffer isn't allocated yet, it'll be marked as defined in
828 * GetLocalBufferStorage().
829 */
830 if (LocalBufHdrGetBlock(buf_hdr) != NULL)
832 }
833 LocalRefCount[bufid]++;
836
837 return buf_state & BM_VALID;
838}
839
840void
842{
845}
846
847void
849{
850 int buffid = -buffer - 1;
851
852 Assert(BufferIsLocal(buffer));
853 Assert(LocalRefCount[buffid] > 0);
855
856 if (--LocalRefCount[buffid] == 0)
857 {
858 BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
859 uint32 buf_state;
860
862
863 buf_state = pg_atomic_read_u32(&buf_hdr->state);
864 Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
865 buf_state -= BUF_REFCOUNT_ONE;
866 pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
867
868 /* see comment in UnpinBufferNoOwner */
870 }
871}
872
873/*
874 * GUC check_hook for temp_buffers
875 */
876bool
878{
879 /*
880 * Once local buffers have been initialized, it's too late to change this.
881 * However, if this is only a test call, allow it.
882 */
884 {
885 GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
886 return false;
887 }
888 return true;
889}
890
891/*
892 * GetLocalBufferStorage - allocate memory for a local buffer
893 *
894 * The idea of this function is to aggregate our requests for storage
895 * so that the memory manager doesn't see a whole lot of relatively small
896 * requests. Since we'll never give back a local buffer once it's created
897 * within a particular process, no point in burdening memmgr with separately
898 * managed chunks.
899 */
900static Block
902{
903 static char *cur_block = NULL;
904 static int next_buf_in_block = 0;
905 static int num_bufs_in_block = 0;
906 static int total_bufs_allocated = 0;
907 static MemoryContext LocalBufferContext = NULL;
908
909 char *this_buf;
910
911 Assert(total_bufs_allocated < NLocBuffer);
912
913 if (next_buf_in_block >= num_bufs_in_block)
914 {
915 /* Need to make a new request to memmgr */
916 int num_bufs;
917
918 /*
919 * We allocate local buffers in a context of their own, so that the
920 * space eaten for them is easily recognizable in MemoryContextStats
921 * output. Create the context on first use.
922 */
923 if (LocalBufferContext == NULL)
924 LocalBufferContext =
926 "LocalBufferContext",
928
929 /* Start with a 16-buffer request; subsequent ones double each time */
930 num_bufs = Max(num_bufs_in_block * 2, 16);
931 /* But not more than what we need for all remaining local bufs */
932 num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
933 /* And don't overflow MaxAllocSize, either */
934 num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
935
936 /* Buffers should be I/O aligned. */
937 cur_block = MemoryContextAllocAligned(LocalBufferContext,
938 num_bufs * BLCKSZ,
940 0);
941
942 next_buf_in_block = 0;
943 num_bufs_in_block = num_bufs;
944 }
945
946 /* Allocate next buffer in current memory block */
947 this_buf = cur_block + next_buf_in_block * BLCKSZ;
948 next_buf_in_block++;
949 total_bufs_allocated++;
950
951 /*
952 * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it
953 * here. The block is actually undefined, but we want consistency with
954 * the regular case of not needing to allocate memory. This is
955 * specifically needed when method_io_uring.c fills the block, because
956 * Valgrind doesn't recognize io_uring reads causing undefined memory to
957 * become defined.
958 */
959 VALGRIND_MAKE_MEM_DEFINED(this_buf, BLCKSZ);
960
961 return (Block) this_buf;
962}
963
/*
 * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
 *
 * This is just like CheckForBufferLeaks(), but for local buffers.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
	if (LocalRefCount)
	{
		int			RefCountErrors = 0;
		int			i;

		for (i = 0; i < NLocBuffer; i++)
		{
			if (LocalRefCount[i] != 0)
			{
				Buffer		b = -i - 1;
				char	   *s;

				s = DebugPrintBufferRefcount(b);
				elog(WARNING, "local buffer refcount leak: %s", s);
				pfree(s);

				RefCountErrors++;
			}
		}
		Assert(RefCountErrors == 0);
	}
#endif
}
996
997/*
998 * AtEOXact_LocalBuffers - clean up at end of transaction.
999 *
1000 * This is just like AtEOXact_Buffers, but for local buffers.
1001 */
1002void
1004{
1006}
1007
/*
 * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
 *
 * This is just like AtProcExit_Buffers, but for local buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
	/*
	 * We shouldn't be holding any remaining pins; if we are, and assertions
	 * aren't enabled, we'll fail later in DropRelationBuffers while trying to
	 * drop the temp rels.
	 */
	CheckForLocalBufferLeaks();
}
bool pgaio_wref_valid(PgAioWaitRef *iow)
Definition: aio.c:971
void pgaio_wref_clear(PgAioWaitRef *iow)
Definition: aio.c:964
void pgaio_wref_wait(PgAioWaitRef *iow)
Definition: aio.c:991
static void pg_atomic_unlocked_write_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition: atomics.h:293
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition: atomics.h:237
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
#define MaxBlockNumber
Definition: block.h:35
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
#define BufferIsLocal(buffer)
Definition: buf.h:37
#define BM_MAX_USAGE_COUNT
Definition: buf_internals.h:86
static void InitBufferTag(BufferTag *tag, const RelFileLocator *rlocator, ForkNumber forkNum, BlockNumber blockNum)
#define BM_TAG_VALID
Definition: buf_internals.h:71
#define BUF_USAGECOUNT_MASK
Definition: buf_internals.h:53
static ForkNumber BufTagGetForkNum(const BufferTag *tag)
#define BUF_REFCOUNT_ONE
Definition: buf_internals.h:51
static bool BufferTagsEqual(const BufferTag *tag1, const BufferTag *tag2)
static bool BufTagMatchesRelFileLocator(const BufferTag *tag, const RelFileLocator *rlocator)
#define BUF_FLAG_MASK
Definition: buf_internals.h:56
#define BM_DIRTY
Definition: buf_internals.h:69
#define BM_JUST_DIRTIED
Definition: buf_internals.h:74
#define BUF_STATE_GET_USAGECOUNT(state)
Definition: buf_internals.h:60
static void ClearBufferTag(BufferTag *tag)
static void ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer)
struct buftag BufferTag
static void ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer)
#define BUF_USAGECOUNT_ONE
Definition: buf_internals.h:54
#define BUF_STATE_GET_REFCOUNT(state)
Definition: buf_internals.h:59
static RelFileLocator BufTagGetRelFileLocator(const BufferTag *tag)
#define BM_VALID
Definition: buf_internals.h:70
static BufferDesc * GetLocalBufferDescriptor(uint32 id)
static Buffer BufferDescriptorGetBuffer(const BufferDesc *bdesc)
bool track_io_timing
Definition: bufmgr.c:147
char * DebugPrintBufferRefcount(Buffer buffer)
Definition: bufmgr.c:4166
void * Block
Definition: bufmgr.h:26
#define BMR_GET_SMGR(bmr)
Definition: bufmgr.h:118
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1541
PageData * Page
Definition: bufpage.h:81
#define Min(x, y)
Definition: c.h:1008
#define Max(x, y)
Definition: c.h:1002
int32_t int32
Definition: c.h:539
uint64_t uint64
Definition: c.h:544
uint32_t uint32
Definition: c.h:543
#define MemSet(start, val, len)
Definition: c.h:1024
#define fprintf(file, fmt, msg)
Definition: cubescan.l:21
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:358
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define FATAL
Definition: elog.h:41
#define WARNING
Definition: elog.h:36
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
int io_direct_flags
Definition: fd.c:168
#define IO_DIRECT_DATA
Definition: fd.h:54
#define MaxAllocSize
Definition: fe_memutils.h:22
ProcNumber MyProcNumber
Definition: globals.c:90
#define newval
#define GUC_check_errdetail
Definition: guc.h:505
GucSource
Definition: guc.h:112
@ PGC_S_TEST
Definition: guc.h:125
int num_temp_buffers
Definition: guc_tables.c:553
Assert(PointerIsAligned(start, uint64))
const char * str
#define calloc(a, b)
Definition: header.h:55
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_REMOVE
Definition: hsearch.h:115
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define IsParallelWorker()
Definition: parallel.h:60
BufferUsage pgBufferUsage
Definition: instrument.c:20
int b
Definition: isn.c:74
int j
Definition: isn.c:78
int i
Definition: isn.c:77
int32 * LocalRefCount
Definition: localbuf.c:49
void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
Definition: localbuf.c:183
void UnpinLocalBuffer(Buffer buffer)
Definition: localbuf.c:841
bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
Definition: localbuf.c:523
static HTAB * LocalBufHash
Definition: localbuf.c:53
static int NLocalPinnedBuffers
Definition: localbuf.c:56
void AtEOXact_LocalBuffers(bool isCommit)
Definition: localbuf.c:1003
#define LocalBufHdrGetBlock(bufHdr)
Definition: localbuf.c:42
static void CheckForLocalBufferLeaks(void)
Definition: localbuf.c:970
uint32 GetAdditionalLocalPinLimit(void)
Definition: localbuf.c:315
static Block GetLocalBufferStorage(void)
Definition: localbuf.c:901
static int nextFreeLocalBufId
Definition: localbuf.c:51
bool check_temp_buffers(int *newval, void **extra, GucSource source)
Definition: localbuf.c:877
void AtProcExit_LocalBuffers(void)
Definition: localbuf.c:1014
bool PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
Definition: localbuf.c:805
static void InitLocalBuffers(void)
Definition: localbuf.c:728
void LimitAdditionalLocalPins(uint32 *additional_pins)
Definition: localbuf.c:323
uint32 GetLocalPinLimit(void)
Definition: localbuf.c:307
static Buffer GetLocalVictimBuffer(void)
Definition: localbuf.c:224
void MarkLocalBufferDirty(Buffer buffer)
Definition: localbuf.c:491
void DropRelationAllLocalBuffers(RelFileLocator rlocator)
Definition: localbuf.c:702
void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits, bool release_aio)
Definition: localbuf.c:562
void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
Definition: localbuf.c:605
int NLocBuffer
Definition: localbuf.c:45
PrefetchBufferResult PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum)
Definition: localbuf.c:72
BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr, ForkNumber fork, uint32 flags, uint32 extend_by, BlockNumber extend_upto, Buffer *buffers, uint32 *extended_by)
Definition: localbuf.c:346
Block * LocalBufferBlockPointers
Definition: localbuf.c:48
void UnpinLocalBufferNoOwner(Buffer buffer)
Definition: localbuf.c:848
void DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum, int nforks, BlockNumber *firstDelBlock)
Definition: localbuf.c:665
BufferDesc * LocalBufferDescriptors
Definition: localbuf.c:47
BufferDesc * LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, bool *foundPtr)
Definition: localbuf.c:119
void pfree(void *pointer)
Definition: mcxt.c:1594
void * MemoryContextAllocAligned(MemoryContext context, Size size, Size alignto, int flags)
Definition: mcxt.c:1460
MemoryContext TopMemoryContext
Definition: mcxt.c:166
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
#define VALGRIND_MAKE_MEM_NOACCESS(addr, size)
Definition: memdebug.h:27
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define PG_IO_ALIGN_SIZE
static rewind_source * source
Definition: pg_rewind.c:89
static char * buf
Definition: pg_test_fsync.c:72
@ IOOBJECT_TEMP_RELATION
Definition: pgstat.h:278
@ IOCONTEXT_NORMAL
Definition: pgstat.h:289
@ IOOP_EXTEND
Definition: pgstat.h:314
@ IOOP_EVICT
Definition: pgstat.h:307
@ IOOP_WRITE
Definition: pgstat.h:316
instr_time pgstat_prepare_io_time(bool track_io_guc)
Definition: pgstat_io.c:91
void pgstat_count_io_op(IOObject io_object, IOContext io_context, IOOp io_op, uint32 cnt, uint64 bytes)
Definition: pgstat_io.c:68
void pgstat_count_io_op_time(IOObject io_object, IOContext io_context, IOOp io_op, instr_time start_time, uint32 cnt, uint64 bytes)
Definition: pgstat_io.c:122
ForkNumber
Definition: relpath.h:56
#define relpath(rlocator, forknum)
Definition: relpath.h:150
#define relpathbackend(rlocator, backend, forknum)
Definition: relpath.h:141
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
void ResourceOwnerEnlarge(ResourceOwner owner)
Definition: resowner.c:449
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:819
SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend)
Definition: smgr.c:240
void smgrzeroextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nblocks, bool skipFsync)
Definition: smgr.c:649
bool smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nblocks)
Definition: smgr.c:678
static void smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void *buffer, bool skipFsync)
Definition: smgr.h:131
BufferTag tag
pg_atomic_uint32 state
PgAioWaitRef io_wref
int64 local_blks_written
Definition: instrument.h:33
int64 local_blks_dirtied
Definition: instrument.h:32
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
Definition: dynahash.c:222
Buffer recent_buffer
Definition: bufmgr.h:61
RelFileLocator locator
RelFileLocatorBackend smgr_rlocator
Definition: smgr.h:38
BlockNumber blockNum