2121#include "storage/bufmgr.h"
2222#include "storage/proc.h"
2323#include "storage/procarray.h"
24+ #include "storage/read_stream.h"
2425#include "storage/smgr.h"
2526#include "utils/rel.h"
2627#include "utils/snapmgr.h"
@@ -41,6 +42,17 @@ typedef struct corrupt_items
4142 ItemPointer tids ;
4243} corrupt_items ;
4344
45+ /* for collect_corrupt_items_read_stream_next_block */
46+ struct collect_corrupt_items_read_stream_private
47+ {
48+ bool all_frozen ;
49+ bool all_visible ;
50+ BlockNumber current_blocknum ;
51+ BlockNumber last_exclusive ;
52+ Relation rel ;
53+ Buffer vmbuffer ;
54+ };
55+
4456PG_FUNCTION_INFO_V1 (pg_visibility_map );
4557PG_FUNCTION_INFO_V1 (pg_visibility_map_rel );
4658PG_FUNCTION_INFO_V1 (pg_visibility );
@@ -478,6 +490,8 @@ collect_visibility_data(Oid relid, bool include_pd)
478490 BlockNumber blkno ;
479491 Buffer vmbuffer = InvalidBuffer ;
480492 BufferAccessStrategy bstrategy = GetAccessStrategy (BAS_BULKREAD );
493+ BlockRangeReadStreamPrivate p ;
494+ ReadStream * stream = NULL ;
481495
482496 rel = relation_open (relid , AccessShareLock );
483497
@@ -489,6 +503,20 @@ collect_visibility_data(Oid relid, bool include_pd)
489503 info -> next = 0 ;
490504 info -> count = nblocks ;
491505
506+ /* Create a stream if reading main fork. */
507+ if (include_pd )
508+ {
509+ p .current_blocknum = 0 ;
510+ p .last_exclusive = nblocks ;
511+ stream = read_stream_begin_relation (READ_STREAM_FULL ,
512+ bstrategy ,
513+ rel ,
514+ MAIN_FORKNUM ,
515+ block_range_read_stream_cb ,
516+ & p ,
517+ 0 );
518+ }
519+
492520 for (blkno = 0 ; blkno < nblocks ; ++ blkno )
493521 {
494522 int32 mapbits ;
@@ -513,8 +541,7 @@ collect_visibility_data(Oid relid, bool include_pd)
513541 Buffer buffer ;
514542 Page page ;
515543
516- buffer = ReadBufferExtended (rel , MAIN_FORKNUM , blkno , RBM_NORMAL ,
517- bstrategy );
544+ buffer = read_stream_next_buffer (stream , NULL );
518545 LockBuffer (buffer , BUFFER_LOCK_SHARE );
519546
520547 page = BufferGetPage (buffer );
@@ -525,6 +552,12 @@ collect_visibility_data(Oid relid, bool include_pd)
525552 }
526553 }
527554
555+ if (include_pd )
556+ {
557+ Assert (read_stream_next_buffer (stream , NULL ) == InvalidBuffer );
558+ read_stream_end (stream );
559+ }
560+
528561 /* Clean up. */
529562 if (vmbuffer != InvalidBuffer )
530563 ReleaseBuffer (vmbuffer );
@@ -610,6 +643,38 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
610643 }
611644}
612645
646+ /*
647+ * Callback function to get next block for read stream object used in
648+ * collect_corrupt_items() function.
649+ */
650+ static BlockNumber
651+ collect_corrupt_items_read_stream_next_block (ReadStream * stream ,
652+ void * callback_private_data ,
653+ void * per_buffer_data )
654+ {
655+ struct collect_corrupt_items_read_stream_private * p = callback_private_data ;
656+
657+ for (; p -> current_blocknum < p -> last_exclusive ; p -> current_blocknum ++ )
658+ {
659+ bool check_frozen = false;
660+ bool check_visible = false;
661+
662+ /* Make sure we are interruptible. */
663+ CHECK_FOR_INTERRUPTS ();
664+
665+ if (p -> all_frozen && VM_ALL_FROZEN (p -> rel , p -> current_blocknum , & p -> vmbuffer ))
666+ check_frozen = true;
667+ if (p -> all_visible && VM_ALL_VISIBLE (p -> rel , p -> current_blocknum , & p -> vmbuffer ))
668+ check_visible = true;
669+ if (!check_visible && !check_frozen )
670+ continue ;
671+
672+ return p -> current_blocknum ++ ;
673+ }
674+
675+ return InvalidBlockNumber ;
676+ }
677+
613678/*
614679 * Returns a list of items whose visibility map information does not match
615680 * the status of the tuples on the page.
@@ -628,12 +693,13 @@ static corrupt_items *
628693collect_corrupt_items (Oid relid , bool all_visible , bool all_frozen )
629694{
630695 Relation rel ;
631- BlockNumber nblocks ;
632696 corrupt_items * items ;
633- BlockNumber blkno ;
634697 Buffer vmbuffer = InvalidBuffer ;
635698 BufferAccessStrategy bstrategy = GetAccessStrategy (BAS_BULKREAD );
636699 TransactionId OldestXmin = InvalidTransactionId ;
700+ struct collect_corrupt_items_read_stream_private p ;
701+ ReadStream * stream ;
702+ Buffer buffer ;
637703
638704 rel = relation_open (relid , AccessShareLock );
639705
@@ -643,8 +709,6 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
643709 if (all_visible )
644710 OldestXmin = GetStrictOldestNonRemovableTransactionId (rel );
645711
646- nblocks = RelationGetNumberOfBlocks (rel );
647-
648712 /*
649713 * Guess an initial array size. We don't expect many corrupted tuples, so
650714 * start with a small array. This function uses the "next" field to track
@@ -658,34 +722,38 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
658722 items -> count = 64 ;
659723 items -> tids = palloc (items -> count * sizeof (ItemPointerData ));
660724
725+ p .current_blocknum = 0 ;
726+ p .last_exclusive = RelationGetNumberOfBlocks (rel );
727+ p .rel = rel ;
728+ p .vmbuffer = InvalidBuffer ;
729+ p .all_frozen = all_frozen ;
730+ p .all_visible = all_visible ;
731+ stream = read_stream_begin_relation (READ_STREAM_FULL ,
732+ bstrategy ,
733+ rel ,
734+ MAIN_FORKNUM ,
735+ collect_corrupt_items_read_stream_next_block ,
736+ & p ,
737+ 0 );
738+
661739 /* Loop over every block in the relation. */
662- for ( blkno = 0 ; blkno < nblocks ; ++ blkno )
740+ while (( buffer = read_stream_next_buffer ( stream , NULL )) != InvalidBuffer )
663741 {
664- bool check_frozen = false;
665- bool check_visible = false;
666- Buffer buffer ;
742+ bool check_frozen = all_frozen ;
743+ bool check_visible = all_visible ;
667744 Page page ;
668745 OffsetNumber offnum ,
669746 maxoff ;
747+ BlockNumber blkno ;
670748
671749 /* Make sure we are interruptible. */
672750 CHECK_FOR_INTERRUPTS ();
673751
674- /* Use the visibility map to decide whether to check this page. */
675- if (all_frozen && VM_ALL_FROZEN (rel , blkno , & vmbuffer ))
676- check_frozen = true;
677- if (all_visible && VM_ALL_VISIBLE (rel , blkno , & vmbuffer ))
678- check_visible = true;
679- if (!check_visible && !check_frozen )
680- continue ;
681-
682- /* Read and lock the page. */
683- buffer = ReadBufferExtended (rel , MAIN_FORKNUM , blkno , RBM_NORMAL ,
684- bstrategy );
685752 LockBuffer (buffer , BUFFER_LOCK_SHARE );
686753
687754 page = BufferGetPage (buffer );
688755 maxoff = PageGetMaxOffsetNumber (page );
756+ blkno = BufferGetBlockNumber (buffer );
689757
690758 /*
691759 * The visibility map bits might have changed while we were acquiring
@@ -778,10 +846,13 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
778846
779847 UnlockReleaseBuffer (buffer );
780848 }
849+ read_stream_end (stream );
781850
782851 /* Clean up. */
783852 if (vmbuffer != InvalidBuffer )
784853 ReleaseBuffer (vmbuffer );
854+ if (p .vmbuffer != InvalidBuffer )
855+ ReleaseBuffer (p .vmbuffer );
785856 relation_close (rel , AccessShareLock );
786857
787858 /*
0 commit comments