3434#include "optimizer/planner.h"
3535#include "storage/spin.h"
3636#include "tcop/tcopprot.h"
37+ #include "utils/dsa.h"
3738#include "utils/memutils.h"
3839#include "utils/snapmgr.h"
3940
4748#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000003)
4849#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004)
4950#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005)
51+ #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006)
5052
5153#define PARALLEL_TUPLE_QUEUE_SIZE 65536
5254
@@ -345,6 +347,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
345347 int param_len ;
346348 int instrumentation_len = 0 ;
347349 int instrument_offset = 0 ;
350+ Size dsa_minsize = dsa_minimum_size ();
348351
349352 /* Allocate object for return value. */
350353 pei = palloc0 (sizeof (ParallelExecutorInfo ));
@@ -413,6 +416,10 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
413416 shm_toc_estimate_keys (& pcxt -> estimator , 1 );
414417 }
415418
419+ /* Estimate space for DSA area. */
420+ shm_toc_estimate_chunk (& pcxt -> estimator , dsa_minsize );
421+ shm_toc_estimate_keys (& pcxt -> estimator , 1 );
422+
416423 /* Everyone's had a chance to ask for space, so now create the DSM. */
417424 InitializeParallelDSM (pcxt );
418425
@@ -466,6 +473,29 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
466473 pei -> instrumentation = instrumentation ;
467474 }
468475
476+ /*
477+ * Create a DSA area that can be used by the leader and all workers.
478+ * (However, if we failed to create a DSM and are using private memory
479+ * instead, then skip this.)
480+ */
481+ if (pcxt -> seg != NULL )
482+ {
483+ char * area_space ;
484+
485+ area_space = shm_toc_allocate (pcxt -> toc , dsa_minsize );
486+ shm_toc_insert (pcxt -> toc , PARALLEL_KEY_DSA , area_space );
487+ pei -> area = dsa_create_in_place (area_space , dsa_minsize ,
488+ LWTRANCHE_PARALLEL_QUERY_DSA ,
489+ "parallel_query_dsa" ,
490+ pcxt -> seg );
491+ }
492+
493+ /*
494+ * Make the area available to executor nodes running in the leader. See
495+ * also ParallelQueryMain which makes it available to workers.
496+ */
497+ estate -> es_query_dsa = pei -> area ;
498+
469499 /*
470500 * Give parallel-aware nodes a chance to initialize their shared data.
471501 * This also initializes the elements of instrumentation->ps_instrument,
@@ -571,6 +601,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
571601void
572602ExecParallelCleanup (ParallelExecutorInfo * pei )
573603{
604+ if (pei -> area != NULL )
605+ {
606+ dsa_detach (pei -> area );
607+ pei -> area = NULL ;
608+ }
574609 if (pei -> pcxt != NULL )
575610 {
576611 DestroyParallelContext (pei -> pcxt );
@@ -728,6 +763,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
728763 QueryDesc * queryDesc ;
729764 SharedExecutorInstrumentation * instrumentation ;
730765 int instrument_options = 0 ;
766+ void * area_space ;
767+ dsa_area * area ;
731768
732769 /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
733770 receiver = ExecParallelGetReceiver (seg , toc );
@@ -739,10 +776,21 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
739776 /* Prepare to track buffer usage during query execution. */
740777 InstrStartParallelQuery ();
741778
742- /* Start up the executor, have it run the plan, and then shut it down. */
779+ /* Attach to the dynamic shared memory area. */
780+ area_space = shm_toc_lookup (toc , PARALLEL_KEY_DSA );
781+ area = dsa_attach_in_place (area_space , seg );
782+
783+ /* Start up the executor */
743784 ExecutorStart (queryDesc , 0 );
785+
786+ /* Special executor initialization steps for parallel workers */
787+ queryDesc -> planstate -> state -> es_query_dsa = area ;
744788 ExecParallelInitializeWorker (queryDesc -> planstate , toc );
789+
790+ /* Run the plan */
745791 ExecutorRun (queryDesc , ForwardScanDirection , 0L );
792+
793+ /* Shut down the executor */
746794 ExecutorFinish (queryDesc );
747795
748796 /* Report buffer usage during parallel execution. */
@@ -758,6 +806,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
758806 ExecutorEnd (queryDesc );
759807
760808 /* Cleanup. */
809+ dsa_detach (area );
761810 FreeQueryDesc (queryDesc );
762811 (* receiver -> rDestroy ) (receiver );
763812}
0 commit comments