@@ -55,8 +55,10 @@ static int32 heap_compare_slots(Datum a, Datum b, void *arg);
5555static TupleTableSlot * gather_merge_getnext (GatherMergeState * gm_state );
5656static HeapTuple gm_readnext_tuple (GatherMergeState * gm_state , int nreader ,
5757 bool nowait , bool * done );
58- static void gather_merge_init (GatherMergeState * gm_state );
5958static void ExecShutdownGatherMergeWorkers (GatherMergeState * node );
59+ static void gather_merge_setup (GatherMergeState * gm_state );
60+ static void gather_merge_init (GatherMergeState * gm_state );
61+ static void gather_merge_clear_tuples (GatherMergeState * gm_state );
6062static bool gather_merge_readnext (GatherMergeState * gm_state , int reader ,
6163 bool nowait );
6264static void load_tuple_array (GatherMergeState * gm_state , int reader );
@@ -149,14 +151,17 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
149151 }
150152
151153 /*
152- * store the tuple descriptor into gather merge state, so we can use it
153- * later while initializing the gather merge slots.
154+ * Store the tuple descriptor into gather merge state, so we can use it
155+ * while initializing the gather merge slots.
154156 */
155157 if (!ExecContextForcesOids (& gm_state -> ps , & hasoid ))
156158 hasoid = false;
157159 tupDesc = ExecTypeFromTL (outerNode -> targetlist , hasoid );
158160 gm_state -> tupDesc = tupDesc ;
159161
162+ /* Now allocate the workspace for gather merge */
163+ gather_merge_setup (gm_state );
164+
160165 return gm_state ;
161166}
162167
@@ -340,6 +345,9 @@ ExecReScanGatherMerge(GatherMergeState *node)
340345 /* Make sure any existing workers are gracefully shut down */
341346 ExecShutdownGatherMergeWorkers (node );
342347
348+ /* Free any unused tuples, so we don't leak memory across rescans */
349+ gather_merge_clear_tuples (node );
350+
343351 /* Mark node so that shared state will be rebuilt at next call */
344352 node -> initialized = false;
345353 node -> gm_initialized = false;
@@ -370,49 +378,93 @@ ExecReScanGatherMerge(GatherMergeState *node)
370378}
371379
372380/*
373- * Initialize the Gather merge tuple read.
381+ * Set up the data structures that we'll need for Gather Merge.
382+ *
383+ * We allocate these once on the basis of gm->num_workers, which is an
384+ * upper bound for the number of workers we'll actually have. During
385+ * a rescan, we reset the structures to empty. This approach simplifies
386+ * not leaking memory across rescans.
374387 *
375- * Pull at least a single tuple from each worker + leader and set up the heap.
388+ * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
389+ * are for workers. The values placed into gm_heap correspond to indexes
390+ * in gm_slots[]. The gm_tuple_buffers[] array, however, is indexed from
391+ * 0 to n-1; it has no entry for the leader.
376392 */
377393static void
378- gather_merge_init (GatherMergeState * gm_state )
394+ gather_merge_setup (GatherMergeState * gm_state )
379395{
380- int nreaders = gm_state -> nreaders ;
381- bool nowait = true ;
396+ GatherMerge * gm = castNode ( GatherMerge , gm_state -> ps . plan ) ;
397+ int nreaders = gm -> num_workers ;
382398 int i ;
383399
384400 /*
385401 * Allocate gm_slots for the number of workers + one more slot for leader.
386- * Last slot is always for leader. Leader always calls ExecProcNode() to
387- * read the tuple which will return the TupleTableSlot. Later it will
388- * directly get assigned to gm_slot. So just initialize leader gm_slot
389- * with NULL. For other slots, code below will call
390- * ExecInitExtraTupleSlot() to create a slot for the worker's results.
402+ * Slot 0 is always for the leader. Leader always calls ExecProcNode() to
403+ * read the tuple, and then stores it directly into its gm_slots entry.
404+ * For other slots, code below will call ExecInitExtraTupleSlot() to
405+ * create a slot for the worker's results. Note that during any single
406+ * scan, we might have fewer than num_workers available workers, in which
407+ * case the extra array entries go unused.
391408 */
392- gm_state -> gm_slots =
393- palloc ((gm_state -> nreaders + 1 ) * sizeof (TupleTableSlot * ));
394- gm_state -> gm_slots [gm_state -> nreaders ] = NULL ;
395-
396- /* Initialize the tuple slot and tuple array for each worker */
397- gm_state -> gm_tuple_buffers =
398- (GMReaderTupleBuffer * ) palloc0 (sizeof (GMReaderTupleBuffer ) *
399- gm_state -> nreaders );
400- for (i = 0 ; i < gm_state -> nreaders ; i ++ )
409+ gm_state -> gm_slots = (TupleTableSlot * * )
410+ palloc0 ((nreaders + 1 ) * sizeof (TupleTableSlot * ));
411+
412+ /* Allocate the tuple slot and tuple array for each worker */
413+ gm_state -> gm_tuple_buffers = (GMReaderTupleBuffer * )
414+ palloc0 (nreaders * sizeof (GMReaderTupleBuffer ));
415+
416+ for (i = 0 ; i < nreaders ; i ++ )
401417 {
402418 /* Allocate the tuple array with length MAX_TUPLE_STORE */
403419 gm_state -> gm_tuple_buffers [i ].tuple =
404420 (HeapTuple * ) palloc0 (sizeof (HeapTuple ) * MAX_TUPLE_STORE );
405421
406- /* Initialize slot for worker */
407- gm_state -> gm_slots [i ] = ExecInitExtraTupleSlot (gm_state -> ps .state );
408- ExecSetSlotDescriptor (gm_state -> gm_slots [i ],
422+ /* Initialize tuple slot for worker */
423+ gm_state -> gm_slots [i + 1 ] = ExecInitExtraTupleSlot (gm_state -> ps .state );
424+ ExecSetSlotDescriptor (gm_state -> gm_slots [i + 1 ],
409425 gm_state -> tupDesc );
410426 }
411427
412428 /* Allocate the resources for the merge */
413- gm_state -> gm_heap = binaryheap_allocate (gm_state -> nreaders + 1 ,
429+ gm_state -> gm_heap = binaryheap_allocate (nreaders + 1 ,
414430 heap_compare_slots ,
415431 gm_state );
432+ }
433+
434+ /*
435+ * Initialize the Gather Merge.
436+ *
437+ * Reset data structures to ensure they're empty. Then pull at least one
438+ * tuple from leader + each worker (or set its "done" indicator), and set up
439+ * the heap.
440+ */
441+ static void
442+ gather_merge_init (GatherMergeState * gm_state )
443+ {
444+ int nreaders = gm_state -> nreaders ;
445+ bool nowait = true;
446+ int i ;
447+
448+ /* Assert that gather_merge_setup made enough space */
449+ Assert (nreaders <= castNode (GatherMerge , gm_state -> ps .plan )-> num_workers );
450+
451+ /* Reset leader's tuple slot to empty */
452+ gm_state -> gm_slots [0 ] = NULL ;
453+
454+ /* Reset the tuple slot and tuple array for each worker */
455+ for (i = 0 ; i < nreaders ; i ++ )
456+ {
457+ /* Reset tuple array to empty */
458+ gm_state -> gm_tuple_buffers [i ].nTuples = 0 ;
459+ gm_state -> gm_tuple_buffers [i ].readCounter = 0 ;
460+ /* Reset done flag to not-done */
461+ gm_state -> gm_tuple_buffers [i ].done = false;
462+ /* Ensure output slot is empty */
463+ ExecClearTuple (gm_state -> gm_slots [i + 1 ]);
464+ }
465+
466+ /* Reset binary heap to empty */
467+ binaryheap_reset (gm_state -> gm_heap );
416468
417469 /*
418470 * First, try to read a tuple from each worker (including leader) in
@@ -422,14 +474,13 @@ gather_merge_init(GatherMergeState *gm_state)
422474 * least one tuple) to the heap.
423475 */
424476reread :
425- for (i = 0 ; i < nreaders + 1 ; i ++ )
477+ for (i = 0 ; i <= nreaders ; i ++ )
426478 {
427479 CHECK_FOR_INTERRUPTS ();
428480
429- /* ignore this source if already known done */
430- if ((i < nreaders ) ?
431- !gm_state -> gm_tuple_buffers [i ].done :
432- gm_state -> need_to_scan_locally )
481+ /* skip this source if already known done */
482+ if ((i == 0 ) ? gm_state -> need_to_scan_locally :
483+ !gm_state -> gm_tuple_buffers [i - 1 ].done )
433484 {
434485 if (TupIsNull (gm_state -> gm_slots [i ]))
435486 {
@@ -450,9 +501,9 @@ gather_merge_init(GatherMergeState *gm_state)
450501 }
451502
452503 /* need not recheck leader, since nowait doesn't matter for it */
453- for (i = 0 ; i < nreaders ; i ++ )
504+ for (i = 1 ; i <= nreaders ; i ++ )
454505 {
455- if (!gm_state -> gm_tuple_buffers [i ].done &&
506+ if (!gm_state -> gm_tuple_buffers [i - 1 ].done &&
456507 TupIsNull (gm_state -> gm_slots [i ]))
457508 {
458509 nowait = false;
@@ -467,23 +518,23 @@ gather_merge_init(GatherMergeState *gm_state)
467518}
468519
469520/*
470- * Clear out the tuple table slots for each gather merge input.
521+ * Clear out the tuple table slot, and any unused pending tuples,
522+ * for each gather merge input.
471523 */
472524static void
473- gather_merge_clear_slots (GatherMergeState * gm_state )
525+ gather_merge_clear_tuples (GatherMergeState * gm_state )
474526{
475527 int i ;
476528
477529 for (i = 0 ; i < gm_state -> nreaders ; i ++ )
478530 {
479- pfree (gm_state -> gm_tuple_buffers [i ].tuple );
480- ExecClearTuple (gm_state -> gm_slots [i ]);
481- }
531+ GMReaderTupleBuffer * tuple_buffer = & gm_state -> gm_tuple_buffers [i ];
482532
483- /* Free tuple array as we don't need it any more */
484- pfree (gm_state -> gm_tuple_buffers );
485- /* Free the binaryheap, which was created for sort */
486- binaryheap_free (gm_state -> gm_heap );
533+ while (tuple_buffer -> readCounter < tuple_buffer -> nTuples )
534+ heap_freetuple (tuple_buffer -> tuple [tuple_buffer -> readCounter ++ ]);
535+
536+ ExecClearTuple (gm_state -> gm_slots [i + 1 ]);
537+ }
487538}
488539
489540/*
@@ -526,7 +577,7 @@ gather_merge_getnext(GatherMergeState *gm_state)
526577 if (binaryheap_empty (gm_state -> gm_heap ))
527578 {
528579 /* All the queues are exhausted, and so is the heap */
529- gather_merge_clear_slots (gm_state );
580+ gather_merge_clear_tuples (gm_state );
530581 return NULL ;
531582 }
532583 else
@@ -548,10 +599,10 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
548599 int i ;
549600
550601 /* Don't do anything if this is the leader. */
551- if (reader == gm_state -> nreaders )
602+ if (reader == 0 )
552603 return ;
553604
554- tuple_buffer = & gm_state -> gm_tuple_buffers [reader ];
605+ tuple_buffer = & gm_state -> gm_tuple_buffers [reader - 1 ];
555606
556607 /* If there's nothing in the array, reset the counters to zero. */
557608 if (tuple_buffer -> nTuples == tuple_buffer -> readCounter )
@@ -590,7 +641,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
590641 * If we're being asked to generate a tuple from the leader, then we just
591642 * call ExecProcNode as normal to produce one.
592643 */
593- if (gm_state -> nreaders == reader )
644+ if (reader == 0 )
594645 {
595646 if (gm_state -> need_to_scan_locally )
596647 {
@@ -601,7 +652,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
601652
602653 if (!TupIsNull (outerTupleSlot ))
603654 {
604- gm_state -> gm_slots [reader ] = outerTupleSlot ;
655+ gm_state -> gm_slots [0 ] = outerTupleSlot ;
605656 return true;
606657 }
607658 /* need_to_scan_locally serves as "done" flag for leader */
@@ -611,7 +662,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
611662 }
612663
613664 /* Otherwise, check the state of the relevant tuple buffer. */
614- tuple_buffer = & gm_state -> gm_tuple_buffers [reader ];
665+ tuple_buffer = & gm_state -> gm_tuple_buffers [reader - 1 ];
615666
616667 if (tuple_buffer -> nTuples > tuple_buffer -> readCounter )
617668 {
@@ -621,8 +672,8 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
621672 else if (tuple_buffer -> done )
622673 {
623674 /* Reader is known to be exhausted. */
624- DestroyTupleQueueReader (gm_state -> reader [reader ]);
625- gm_state -> reader [reader ] = NULL ;
675+ DestroyTupleQueueReader (gm_state -> reader [reader - 1 ]);
676+ gm_state -> reader [reader - 1 ] = NULL ;
626677 return false;
627678 }
628679 else
@@ -649,14 +700,14 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
649700 ExecStoreTuple (tup , /* tuple to store */
650701 gm_state -> gm_slots [reader ], /* slot in which to store the
651702 * tuple */
652- InvalidBuffer , /* buffer associated with this tuple */
653- true); /* pfree this pointer if not from heap */
703+ InvalidBuffer , /* no buffer associated with tuple */
704+ true); /* pfree tuple when done with it */
654705
655706 return true;
656707}
657708
658709/*
659- * Attempt to read a tuple from given reader .
710+ * Attempt to read a tuple from given worker .
660711 */
661712static HeapTuple
662713gm_readnext_tuple (GatherMergeState * gm_state , int nreader , bool nowait ,
@@ -671,7 +722,7 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
671722 CHECK_FOR_INTERRUPTS ();
672723
673724 /* Attempt to read a tuple. */
674- reader = gm_state -> reader [nreader ];
725+ reader = gm_state -> reader [nreader - 1 ];
675726
676727 /* Run TupleQueueReaders in per-tuple context */
677728 tupleContext = gm_state -> ps .ps_ExprContext -> ecxt_per_tuple_memory ;