@@ -223,6 +223,68 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
223223 * ----------------------------------------------------------------
224224 */
225225
226+ /*
227+ * Streaming read API callback for parallel sequential scans. Returns the next
228+ * block the caller wants from the read stream or InvalidBlockNumber when done.
229+ */
230+ static BlockNumber
231+ heap_scan_stream_read_next_parallel (ReadStream * stream ,
232+ void * callback_private_data ,
233+ void * per_buffer_data )
234+ {
235+ HeapScanDesc scan = (HeapScanDesc ) callback_private_data ;
236+
237+ Assert (ScanDirectionIsForward (scan -> rs_dir ));
238+ Assert (scan -> rs_base .rs_parallel );
239+
240+ if (unlikely (!scan -> rs_inited ))
241+ {
242+ /* parallel scan */
243+ table_block_parallelscan_startblock_init (scan -> rs_base .rs_rd ,
244+ scan -> rs_parallelworkerdata ,
245+ (ParallelBlockTableScanDesc ) scan -> rs_base .rs_parallel );
246+
247+ /* may return InvalidBlockNumber if there are no more blocks */
248+ scan -> rs_prefetch_block = table_block_parallelscan_nextpage (scan -> rs_base .rs_rd ,
249+ scan -> rs_parallelworkerdata ,
250+ (ParallelBlockTableScanDesc ) scan -> rs_base .rs_parallel );
251+ scan -> rs_inited = true;
252+ }
253+ else
254+ {
255+ scan -> rs_prefetch_block = table_block_parallelscan_nextpage (scan -> rs_base .rs_rd ,
256+ scan -> rs_parallelworkerdata , (ParallelBlockTableScanDesc )
257+ scan -> rs_base .rs_parallel );
258+ }
259+
260+ return scan -> rs_prefetch_block ;
261+ }
262+
263+ /*
264+ * Streaming read API callback for serial sequential and TID range scans.
265+ * Returns the next block the caller wants from the read stream or
266+ * InvalidBlockNumber when done.
267+ */
268+ static BlockNumber
269+ heap_scan_stream_read_next_serial (ReadStream * stream ,
270+ void * callback_private_data ,
271+ void * per_buffer_data )
272+ {
273+ HeapScanDesc scan = (HeapScanDesc ) callback_private_data ;
274+
275+ if (unlikely (!scan -> rs_inited ))
276+ {
277+ scan -> rs_prefetch_block = heapgettup_initial_block (scan , scan -> rs_dir );
278+ scan -> rs_inited = true;
279+ }
280+ else
281+ scan -> rs_prefetch_block = heapgettup_advance_block (scan ,
282+ scan -> rs_prefetch_block ,
283+ scan -> rs_dir );
284+
285+ return scan -> rs_prefetch_block ;
286+ }
287+
226288/* ----------------
227289 * initscan - scan code common to heap_beginscan and heap_rescan
228290 * ----------------
@@ -325,6 +387,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
325387 scan -> rs_cbuf = InvalidBuffer ;
326388 scan -> rs_cblock = InvalidBlockNumber ;
327389
390+ /*
391+ * Initialize to ForwardScanDirection because it is most common and
392+ * because heap scans go forward before going backward (e.g. CURSORs).
393+ */
394+ scan -> rs_dir = ForwardScanDirection ;
395+ scan -> rs_prefetch_block = InvalidBlockNumber ;
396+
328397 /* page-at-a-time fields are always invalid when not rs_inited */
329398
330399 /*
@@ -508,12 +577,14 @@ heap_prepare_pagescan(TableScanDesc sscan)
508577/*
509578 * heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
510579 *
511- * Read the next block of the scan relation into a buffer and pin that buffer
512- * before saving it in the scan descriptor.
580+ * Read the next block of the scan relation from the read stream and save it
581+ * in the scan descriptor. It is already pinned .
513582 */
514583static inline void
515584heap_fetch_next_buffer (HeapScanDesc scan , ScanDirection dir )
516585{
586+ Assert (scan -> rs_read_stream );
587+
517588 /* release previous scan buffer, if any */
518589 if (BufferIsValid (scan -> rs_cbuf ))
519590 {
@@ -528,25 +599,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
528599 */
529600 CHECK_FOR_INTERRUPTS ();
530601
531- if (unlikely (!scan -> rs_inited ))
602+ /*
603+ * If the scan direction is changing, reset the prefetch block to the
604+ * current block. Otherwise, we will incorrectly prefetch the blocks
605+ * between the prefetch block and the current block again before
606+ * prefetching blocks in the new, correct scan direction.
607+ */
608+ if (unlikely (scan -> rs_dir != dir ))
532609 {
533- scan -> rs_cblock = heapgettup_initial_block (scan , dir );
610+ scan -> rs_prefetch_block = scan -> rs_cblock ;
611+ read_stream_reset (scan -> rs_read_stream );
612+ }
534613
535- /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
536- Assert (scan -> rs_cblock != InvalidBlockNumber ||
537- !BufferIsValid (scan -> rs_cbuf ));
614+ scan -> rs_dir = dir ;
538615
539- scan -> rs_inited = true;
540- }
541- else
542- scan -> rs_cblock = heapgettup_advance_block (scan , scan -> rs_cblock ,
543- dir );
544-
545- /* read block if valid */
546- if (BlockNumberIsValid (scan -> rs_cblock ))
547- scan -> rs_cbuf = ReadBufferExtended (scan -> rs_base .rs_rd , MAIN_FORKNUM ,
548- scan -> rs_cblock , RBM_NORMAL ,
549- scan -> rs_strategy );
616+ scan -> rs_cbuf = read_stream_next_buffer (scan -> rs_read_stream , NULL );
617+ if (BufferIsValid (scan -> rs_cbuf ))
618+ scan -> rs_cblock = BufferGetBlockNumber (scan -> rs_cbuf );
550619}
551620
552621/*
@@ -560,34 +629,18 @@ static pg_noinline BlockNumber
560629heapgettup_initial_block (HeapScanDesc scan , ScanDirection dir )
561630{
562631 Assert (!scan -> rs_inited );
632+ Assert (scan -> rs_base .rs_parallel == NULL );
563633
564634 /* When there are no pages to scan, return InvalidBlockNumber */
565635 if (scan -> rs_nblocks == 0 || scan -> rs_numblocks == 0 )
566636 return InvalidBlockNumber ;
567637
568638 if (ScanDirectionIsForward (dir ))
569639 {
570- /* serial scan */
571- if (scan -> rs_base .rs_parallel == NULL )
572- return scan -> rs_startblock ;
573- else
574- {
575- /* parallel scan */
576- table_block_parallelscan_startblock_init (scan -> rs_base .rs_rd ,
577- scan -> rs_parallelworkerdata ,
578- (ParallelBlockTableScanDesc ) scan -> rs_base .rs_parallel );
579-
580- /* may return InvalidBlockNumber if there are no more blocks */
581- return table_block_parallelscan_nextpage (scan -> rs_base .rs_rd ,
582- scan -> rs_parallelworkerdata ,
583- (ParallelBlockTableScanDesc ) scan -> rs_base .rs_parallel );
584- }
640+ return scan -> rs_startblock ;
585641 }
586642 else
587643 {
588- /* backward parallel scan not supported */
589- Assert (scan -> rs_base .rs_parallel == NULL );
590-
591644 /*
592645 * Disable reporting to syncscan logic in a backwards scan; it's not
593646 * very likely anyone else is doing the same thing at the same time,
@@ -699,50 +752,43 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
699752static inline BlockNumber
700753heapgettup_advance_block (HeapScanDesc scan , BlockNumber block , ScanDirection dir )
701754{
702- if (ScanDirectionIsForward (dir ))
755+ Assert (scan -> rs_base .rs_parallel == NULL );
756+
757+ if (likely (ScanDirectionIsForward (dir )))
703758 {
704- if (scan -> rs_base .rs_parallel == NULL )
705- {
706- block ++ ;
759+ block ++ ;
707760
708- /* wrap back to the start of the heap */
709- if (block >= scan -> rs_nblocks )
710- block = 0 ;
761+ /* wrap back to the start of the heap */
762+ if (block >= scan -> rs_nblocks )
763+ block = 0 ;
711764
712- /*
713- * Report our new scan position for synchronization purposes. We
714- * don't do that when moving backwards, however. That would just
715- * mess up any other forward-moving scanners.
716- *
717- * Note: we do this before checking for end of scan so that the
718- * final state of the position hint is back at the start of the
719- * rel. That's not strictly necessary, but otherwise when you run
720- * the same query multiple times the starting position would shift
721- * a little bit backwards on every invocation, which is confusing.
722- * We don't guarantee any specific ordering in general, though.
723- */
724- if (scan -> rs_base .rs_flags & SO_ALLOW_SYNC )
725- ss_report_location (scan -> rs_base .rs_rd , block );
726-
727- /* we're done if we're back at where we started */
728- if (block == scan -> rs_startblock )
729- return InvalidBlockNumber ;
765+ /*
766+ * Report our new scan position for synchronization purposes. We don't
767+ * do that when moving backwards, however. That would just mess up any
768+ * other forward-moving scanners.
769+ *
770+ * Note: we do this before checking for end of scan so that the final
771+ * state of the position hint is back at the start of the rel. That's
772+ * not strictly necessary, but otherwise when you run the same query
773+ * multiple times the starting position would shift a little bit
774+ * backwards on every invocation, which is confusing. We don't
775+ * guarantee any specific ordering in general, though.
776+ */
777+ if (scan -> rs_base .rs_flags & SO_ALLOW_SYNC )
778+ ss_report_location (scan -> rs_base .rs_rd , block );
730779
731- /* check if the limit imposed by heap_setscanlimits() is met */
732- if (scan -> rs_numblocks != InvalidBlockNumber )
733- {
734- if (-- scan -> rs_numblocks == 0 )
735- return InvalidBlockNumber ;
736- }
780+ /* we're done if we're back at where we started */
781+ if (block == scan -> rs_startblock )
782+ return InvalidBlockNumber ;
737783
738- return block ;
739- }
740- else
784+ /* check if the limit imposed by heap_setscanlimits() is met */
785+ if (scan -> rs_numblocks != InvalidBlockNumber )
741786 {
742- return table_block_parallelscan_nextpage (scan -> rs_base .rs_rd ,
743- scan -> rs_parallelworkerdata , (ParallelBlockTableScanDesc )
744- scan -> rs_base .rs_parallel );
787+ if (-- scan -> rs_numblocks == 0 )
788+ return InvalidBlockNumber ;
745789 }
790+
791+ return block ;
746792 }
747793 else
748794 {
@@ -879,6 +925,7 @@ heapgettup(HeapScanDesc scan,
879925
880926 scan -> rs_cbuf = InvalidBuffer ;
881927 scan -> rs_cblock = InvalidBlockNumber ;
928+ scan -> rs_prefetch_block = InvalidBlockNumber ;
882929 tuple -> t_data = NULL ;
883930 scan -> rs_inited = false;
884931}
@@ -974,6 +1021,7 @@ heapgettup_pagemode(HeapScanDesc scan,
9741021 ReleaseBuffer (scan -> rs_cbuf );
9751022 scan -> rs_cbuf = InvalidBuffer ;
9761023 scan -> rs_cblock = InvalidBlockNumber ;
1024+ scan -> rs_prefetch_block = InvalidBlockNumber ;
9771025 tuple -> t_data = NULL ;
9781026 scan -> rs_inited = false;
9791027}
@@ -1069,6 +1117,33 @@ heap_beginscan(Relation relation, Snapshot snapshot,
10691117
10701118 initscan (scan , key , false);
10711119
1120+ scan -> rs_read_stream = NULL ;
1121+
1122+ /*
1123+ * Set up a read stream for sequential scans and TID range scans. This
1124+ * should be done after initscan() because initscan() allocates the
1125+ * BufferAccessStrategy object passed to the streaming read API.
1126+ */
1127+ if (scan -> rs_base .rs_flags & SO_TYPE_SEQSCAN ||
1128+ scan -> rs_base .rs_flags & SO_TYPE_TIDRANGESCAN )
1129+ {
1130+ ReadStreamBlockNumberCB cb ;
1131+
1132+ if (scan -> rs_base .rs_parallel )
1133+ cb = heap_scan_stream_read_next_parallel ;
1134+ else
1135+ cb = heap_scan_stream_read_next_serial ;
1136+
1137+ scan -> rs_read_stream = read_stream_begin_relation (READ_STREAM_SEQUENTIAL ,
1138+ scan -> rs_strategy ,
1139+ scan -> rs_base .rs_rd ,
1140+ MAIN_FORKNUM ,
1141+ cb ,
1142+ scan ,
1143+ 0 );
1144+ }
1145+
1146+
10721147 return (TableScanDesc ) scan ;
10731148}
10741149
@@ -1111,6 +1186,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
11111186
11121187 Assert (scan -> rs_empty_tuples_pending == 0 );
11131188
1189+ /*
1190+ * The read stream is reset on rescan. This must be done before
1191+ * initscan(), as some state referred to by read_stream_reset() is reset
1192+ * in initscan().
1193+ */
1194+ if (scan -> rs_read_stream )
1195+ read_stream_reset (scan -> rs_read_stream );
1196+
11141197 /*
11151198 * reinitialize scan descriptor
11161199 */
@@ -1135,6 +1218,12 @@ heap_endscan(TableScanDesc sscan)
11351218
11361219 Assert (scan -> rs_empty_tuples_pending == 0 );
11371220
1221+ /*
1222+ * Must free the read stream before freeing the BufferAccessStrategy.
1223+ */
1224+ if (scan -> rs_read_stream )
1225+ read_stream_end (scan -> rs_read_stream );
1226+
11381227 /*
11391228 * decrement relation reference count and free scan descriptor storage
11401229 */
0 commit comments