@@ -229,6 +229,8 @@ static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Rela
 								 bool isconcurrent, int request);
 static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
 static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static double _brin_parallel_heapscan(BrinBuildState *buildstate);
+static double _brin_parallel_merge(BrinBuildState *buildstate);
 static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
 											   Relation heap, Relation index);
 static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
@@ -1201,6 +1203,9 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 			tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
 									   TUPLESORT_NONE);
 
+		/* scan the relation and merge per-worker results */
+		reltuples = _brin_parallel_merge(state);
+
 		_brin_end_parallel(state->bs_leader, state);
 	}
 	else						/* no parallel index build */
@@ -1233,14 +1238,10 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		brin_fill_empty_ranges(state,
 							   state->bs_currRangeStart,
 							   state->bs_maxRangeStart);
-
-		/* track the number of relation tuples */
-		state->bs_reltuples = reltuples;
 	}
 
 	/* release resources */
 	idxtuples = state->bs_numtuples;
-	reltuples = state->bs_reltuples;
 	brinRevmapTerminate(state->bs_rmAccess);
 	terminate_brin_buildstate(state);
 
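With these hunks, the parallel branch of brinbuild() takes the heap tuple count from the return value of _brin_parallel_merge() rather than copying state->bs_reltuples afterwards. A simplified sketch of that branch after the change (coordinate setup and error handling elided; only names that appear in this diff are used):

	/* leader-side sort state, shared with the worker tuplesorts */
	state->bs_sortstate =
		tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
								   TUPLESORT_NONE);

	/* wait for the workers, then sort and merge their partial results */
	reltuples = _brin_parallel_merge(state);

	/* accumulate instrumentation, release the snapshot, exit parallel mode */
	_brin_end_parallel(state->bs_leader, state);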
@@ -2329,6 +2330,22 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
 	return true;
 }
 
+/*
+ * Create parallel context, and launch workers for leader.
+ *
+ * buildstate argument should be initialized (with the exception of the
+ * tuplesort states, which may later be created based on shared
+ * state initially set up here).
+ *
+ * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
+ *
+ * request is the target number of parallel worker processes to launch.
+ *
+ * Sets buildstate's BrinLeader, which caller must use to shut down parallel
+ * mode by passing it to _brin_end_parallel() at the very end of its index
+ * build.  If not even a single worker process can be launched, this is
+ * never set, and caller should proceed with a serial index build.
+ */
 static void
 _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 					 bool isconcurrent, int request)
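The new comment spells out the caller-side contract: _brin_begin_parallel() may fail to launch any workers, in which case bs_leader stays unset and the caller continues serially. In brinbuild() that check roughly follows the pattern below (a sketch, not the verbatim source; the IndexInfo fields are the ones brinbuild() already receives and are not part of this hunk):

	if (indexInfo->ii_ParallelWorkers > 0)
		_brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
							 indexInfo->ii_ParallelWorkers);

	if (state->bs_leader)
	{
		/* at least one participant: parallel scan, then _brin_parallel_merge() */
	}
	else
	{
		/* no workers launched: fall through to the serial heap scan */
	}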
@@ -2517,27 +2534,87 @@ static void
 _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 {
 	int			i;
-	BrinTuple  *btup;
-	BrinMemTuple *memtuple = NULL;
-	Size		tuplen;
-	BrinShared *brinshared = brinleader->brinshared;
-	BlockNumber prevblkno = InvalidBlockNumber;
-	MemoryContext rangeCxt,
-				oldCxt;
 
 	/* Shutdown worker processes */
 	WaitForParallelWorkersToFinish(brinleader->pcxt);
 
 	/*
-	 * If we didn't actually launch workers, we still have to make sure to
-	 * exit parallel mode.
+	 * Next, accumulate WAL usage.  (This must wait for the workers to finish,
+	 * or we might get incomplete data.)
 	 */
-	if (!state)
-		goto cleanup;
+	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
+		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
+
+	/* Free last reference to MVCC snapshot, if one was used */
+	if (IsMVCCSnapshot(brinleader->snapshot))
+		UnregisterSnapshot(brinleader->snapshot);
+	DestroyParallelContext(brinleader->pcxt);
+	ExitParallelMode();
+}
+
+/*
+ * Within leader, wait for end of heap scan.
+ *
+ * When called, parallel heap scan started by _brin_begin_parallel() will
+ * already be underway within worker processes (when leader participates
+ * as a worker, we should end up here just as workers are finishing).
+ *
+ * Returns the total number of heap tuples scanned.
+ */
+static double
+_brin_parallel_heapscan(BrinBuildState *state)
+{
+	BrinShared *brinshared = state->bs_leader->brinshared;
+	int			nparticipanttuplesorts;
+
+	nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts;
+	for (;;)
+	{
+		SpinLockAcquire(&brinshared->mutex);
+		if (brinshared->nparticipantsdone == nparticipanttuplesorts)
+		{
+			/* copy the data into leader state */
+			state->bs_reltuples = brinshared->reltuples;
+			state->bs_numtuples = brinshared->indtuples;
 
-	/* copy the data into leader state (we have to wait for the workers) */
-	state->bs_reltuples = brinshared->reltuples;
-	state->bs_numtuples = brinshared->indtuples;
+			SpinLockRelease(&brinshared->mutex);
+			break;
+		}
+		SpinLockRelease(&brinshared->mutex);
+
+		ConditionVariableSleep(&brinshared->workersdonecv,
+							   WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
+	}
+
+	ConditionVariableCancelSleep();
+
+	return state->bs_reltuples;
+}
+
+/*
+ * Within leader, wait for end of heap scan and merge per-worker results.
+ *
+ * After waiting for all workers to finish, merge the per-worker results into
+ * the complete index.  The results from each worker are sorted by block number
+ * (start of the page range).  While combining the per-worker results we merge
+ * summaries for the same page range, and also fill in empty summaries for
+ * ranges without any tuples.
+ *
+ * Returns the total number of heap tuples scanned.
+ */
+static double
+_brin_parallel_merge(BrinBuildState *state)
+{
+	BrinTuple  *btup;
+	BrinMemTuple *memtuple = NULL;
+	Size		tuplen;
+	BlockNumber prevblkno = InvalidBlockNumber;
+	MemoryContext rangeCxt,
+				oldCxt;
+	double		reltuples;
+
+	/* wait for workers to scan table and produce partial results */
+	reltuples = _brin_parallel_heapscan(state);
 
 	/* do the actual sort in the leader */
 	tuplesort_performsort(state->bs_sortstate);
@@ -2569,7 +2646,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	while ((btup = tuplesort_getbrintuple(state->bs_sortstate, &tuplen, true)) != NULL)
 	{
 		/* Ranges should be multiples of pages_per_range for the index. */
-		Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
+		Assert(btup->bt_blkno % state->bs_leader->brinshared->pagesPerRange == 0);
 
 		/*
 		 * Do we need to union summaries for the same page range?
@@ -2665,20 +2742,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	MemoryContextSwitchTo(oldCxt);
 	MemoryContextDelete(rangeCxt);
 
-	/*
-	 * Next, accumulate WAL usage. (This must wait for the workers to finish,
-	 * or we might get incomplete data.)
-	 */
-	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
-		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
-
-cleanup:
-
-	/* Free last reference to MVCC snapshot, if one was used */
-	if (IsMVCCSnapshot(brinleader->snapshot))
-		UnregisterSnapshot(brinleader->snapshot);
-	DestroyParallelContext(brinleader->pcxt);
-	ExitParallelMode();
+	return reltuples;
 }
 
 /*
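The wait loop in _brin_parallel_heapscan() expects every participant to bump nparticipantsdone and signal workersdonecv once its portion of the scan has been sorted. That worker-side step is outside this diff; a hedged sketch of the expected counterpart, using only the shared fields referenced above:

	/* at the end of each participant's scan-and-sort phase */
	SpinLockAcquire(&brinshared->mutex);
	brinshared->nparticipantsdone++;
	brinshared->reltuples += state->bs_reltuples;	/* heap tuples scanned here */
	brinshared->indtuples += state->bs_numtuples;	/* index tuples formed here */
	SpinLockRelease(&brinshared->mutex);

	/* wake the leader waiting in _brin_parallel_heapscan() */
	ConditionVariableSignal(&brinshared->workersdonecv);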