1010 * transvalue = transfunc(transvalue, input_value(s))
1111 * result = finalfunc(transvalue, direct_argument(s))
1212 *
13- * If a finalfunc is not supplied or finalizeAggs is false, then the result
14- * is just the ending value of transvalue.
15- *
16- * Other behavior is also supported and is controlled by the 'combineStates'
17- * and 'finalizeAggs'. 'combineStates' controls whether the trans func or
18- * the combine func is used during aggregation. When 'combineStates' is
19- * true we expect other (previously) aggregated states as input rather than
20- * input tuples. This mode facilitates multiple aggregate stages which
21- * allows us to support pushing aggregation down deeper into the plan rather
22- * than leaving it for the final stage. For example with a query such as:
23- *
24- * SELECT count(*) FROM (SELECT * FROM a UNION ALL SELECT * FROM b);
25- *
26- * with this functionality the planner has the flexibility to generate a
27- * plan which performs count(*) on table a and table b separately and then
28- * add a combine phase to combine both results. In this case the combine
29- * function would simply add both counts together.
30- *
31- * When multiple aggregate stages exist the planner should have set the
32- * 'finalizeAggs' to true only for the final aggregtion state, and each
33- * stage, apart from the very first one should have 'combineStates' set to
34- * true. This permits plans such as:
35- *
36- * Finalize Aggregate
37- * -> Partial Aggregate
38- * -> Partial Aggregate
39- *
40- * Combine functions which use pass-by-ref states should be careful to
41- * always update the 1st state parameter by adding the 2nd parameter to it,
42- * rather than the other way around. If the 1st state is NULL, then it's not
43- * sufficient to simply return the 2nd state, as the memory context is
44- * incorrect. Instead a new state should be created in the correct aggregate
45- * memory context and the 2nd state should be copied over.
46- *
47- * The 'serialStates' option can be used to allow multi-stage aggregation
48- * for aggregates with an INTERNAL state type. When this mode is disabled
49- * only a pointer to the INTERNAL aggregate states are passed around the
50- * executor. When enabled, INTERNAL states are serialized and deserialized
51- * as required; this is useful when data must be passed between processes.
13+ * If a finalfunc is not supplied then the result is just the ending
14+ * value of transvalue.
15+ *
16+ * Other behaviors can be selected by the "aggsplit" mode, which exists
17+ * to support partial aggregation. It is possible to:
18+ * * Skip running the finalfunc, so that the output is always the
19+ * final transvalue state.
20+ * * Substitute the combinefunc for the transfunc, so that transvalue
21+ * states (propagated up from a child partial-aggregation step) are merged
22+ * rather than processing raw input rows. (The statements below about
23+ * the transfunc apply equally to the combinefunc, when it's selected.)
24+ * * Apply the serializefunc to the output values (this only makes sense
25+ * when skipping the finalfunc, since the serializefunc works on the
26+ * transvalue data type).
27+ * * Apply the deserializefunc to the input values (this only makes sense
28+ * when using the combinefunc, for similar reasons).
29+ * It is the planner's responsibility to connect up Agg nodes using these
30+ * alternate behaviors in a way that makes sense, with partial aggregation
31+ * results being fed to nodes that expect them.
5232 *
5333 * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
5434 * input tuples and eliminate duplicates (if required) before performing
5535 * the above-depicted process. (However, we don't do that for ordered-set
5636 * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
57- * so far as this module is concerned.)
37+ * so far as this module is concerned.) Note that partial aggregation
38+ * is not supported in these cases, since we couldn't ensure global
39+ * ordering or distinctness of the inputs.
5840 *
5941 * If transfunc is marked "strict" in pg_proc and initcond is NULL,
6042 * then the first non-NULL input_value is assigned directly to transvalue,
@@ -862,8 +844,6 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
862844 int numGroupingSets = Max (aggstate -> phase -> numsets , 1 );
863845 int numTrans = aggstate -> numtrans ;
864846
865- Assert (!aggstate -> combineStates );
866-
867847 for (transno = 0 ; transno < numTrans ; transno ++ )
868848 {
869849 AggStatePerTrans pertrans = & aggstate -> pertrans [transno ];
@@ -948,9 +928,11 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
948928}
949929
950930/*
951- * combine_aggregates is used when running in 'combineState' mode. This
952- * advances each aggregate transition state by adding another transition state
953- * to it.
931+ * combine_aggregates replaces advance_aggregates in DO_AGGSPLIT_COMBINE
932+ * mode. The principal difference is that here we may need to apply the
933+ * deserialization function before running the transfn (which, in this mode,
934+ * is actually the aggregate's combinefn). Also, we know we don't need to
935+ * handle FILTER, DISTINCT, ORDER BY, or grouping sets.
954936 */
955937static void
956938combine_aggregates (AggState * aggstate , AggStatePerGroup pergroup )
@@ -960,14 +942,13 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
960942
961943 /* combine not supported with grouping sets */
962944 Assert (aggstate -> phase -> numsets == 0 );
963- Assert (aggstate -> combineStates );
964945
965946 for (transno = 0 ; transno < numTrans ; transno ++ )
966947 {
967948 AggStatePerTrans pertrans = & aggstate -> pertrans [transno ];
949+ AggStatePerGroup pergroupstate = & pergroup [transno ];
968950 TupleTableSlot * slot ;
969951 FunctionCallInfo fcinfo = & pertrans -> transfn_fcinfo ;
970- AggStatePerGroup pergroupstate = & pergroup [transno ];
971952
972953 /* Evaluate the current input expressions for this aggregate */
973954 slot = ExecProject (pertrans -> evalproj , NULL );
@@ -979,15 +960,12 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
979960 */
980961 if (OidIsValid (pertrans -> deserialfn_oid ))
981962 {
982- /*
983- * Don't call a strict deserialization function with NULL input. A
984- * strict deserialization function and a null value means we skip
985- * calling the combine function for this state. We assume that
986- * this would be a waste of time and effort anyway so just skip
987- * it.
988- */
963+ /* Don't call a strict deserialization function with NULL input */
989964 if (pertrans -> deserialfn .fn_strict && slot -> tts_isnull [0 ])
990- continue ;
965+ {
966+ fcinfo -> arg [1 ] = slot -> tts_values [0 ];
967+ fcinfo -> argnull [1 ] = slot -> tts_isnull [0 ];
968+ }
991969 else
992970 {
993971 FunctionCallInfo dsinfo = & pertrans -> deserialfn_fcinfo ;
@@ -1110,7 +1088,6 @@ advance_combine_function(AggState *aggstate,
11101088 pergroupstate -> transValueIsNull = fcinfo -> isnull ;
11111089
11121090 MemoryContextSwitchTo (oldContext );
1113-
11141091}
11151092
11161093
@@ -1415,7 +1392,7 @@ finalize_aggregate(AggState *aggstate,
14151392}
14161393
14171394/*
1418- * Compute the final value of one partial aggregate.
1395+ * Compute the output value of one partial aggregate.
14191396 *
14201397 * The serialization function will be run, and the result delivered, in the
14211398 * output-tuple context; caller's CurrentMemoryContext does not matter.
@@ -1432,8 +1409,8 @@ finalize_partialaggregate(AggState *aggstate,
14321409 oldContext = MemoryContextSwitchTo (aggstate -> ss .ps .ps_ExprContext -> ecxt_per_tuple_memory );
14331410
14341411 /*
1435- * serialfn_oid will be set if we must serialize the input state before
1436- * calling the combine function on the state.
1412+ * serialfn_oid will be set if we must serialize the transvalue before
1413+ * returning it
14371414 */
14381415 if (OidIsValid (pertrans -> serialfn_oid ))
14391416 {
@@ -1577,12 +1554,12 @@ finalize_aggregates(AggState *aggstate,
15771554 pergroupstate );
15781555 }
15791556
1580- if (aggstate -> finalizeAggs )
1581- finalize_aggregate (aggstate , peragg , pergroupstate ,
1582- & aggvalues [aggno ], & aggnulls [aggno ]);
1583- else
1557+ if (DO_AGGSPLIT_SKIPFINAL (aggstate -> aggsplit ))
15841558 finalize_partialaggregate (aggstate , peragg , pergroupstate ,
15851559 & aggvalues [aggno ], & aggnulls [aggno ]);
1560+ else
1561+ finalize_aggregate (aggstate , peragg , pergroupstate ,
1562+ & aggvalues [aggno ], & aggnulls [aggno ]);
15861563 }
15871564}
15881565
@@ -2114,10 +2091,10 @@ agg_retrieve_direct(AggState *aggstate)
21142091 */
21152092 for (;;)
21162093 {
2117- if (!aggstate -> combineStates )
2118- advance_aggregates (aggstate , pergroup );
2119- else
2094+ if (DO_AGGSPLIT_COMBINE (aggstate -> aggsplit ))
21202095 combine_aggregates (aggstate , pergroup );
2096+ else
2097+ advance_aggregates (aggstate , pergroup );
21212098
21222099 /* Reset per-input-tuple context after each tuple */
21232100 ResetExprContext (tmpcontext );
@@ -2225,10 +2202,10 @@ agg_fill_hash_table(AggState *aggstate)
22252202 entry = lookup_hash_entry (aggstate , outerslot );
22262203
22272204 /* Advance the aggregates */
2228- if (!aggstate -> combineStates )
2229- advance_aggregates (aggstate , entry -> pergroup );
2230- else
2205+ if (DO_AGGSPLIT_COMBINE (aggstate -> aggsplit ))
22312206 combine_aggregates (aggstate , entry -> pergroup );
2207+ else
2208+ advance_aggregates (aggstate , entry -> pergroup );
22322209
22332210 /* Reset per-input-tuple context after each tuple */
22342211 ResetExprContext (tmpcontext );
@@ -2352,18 +2329,16 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
23522329 aggstate -> aggs = NIL ;
23532330 aggstate -> numaggs = 0 ;
23542331 aggstate -> numtrans = 0 ;
2332+ aggstate -> aggsplit = node -> aggsplit ;
23552333 aggstate -> maxsets = 0 ;
23562334 aggstate -> hashfunctions = NULL ;
23572335 aggstate -> projected_set = -1 ;
23582336 aggstate -> current_set = 0 ;
23592337 aggstate -> peragg = NULL ;
23602338 aggstate -> pertrans = NULL ;
23612339 aggstate -> curpertrans = NULL ;
2362- aggstate -> agg_done = false;
2363- aggstate -> combineStates = node -> combineStates ;
2364- aggstate -> finalizeAggs = node -> finalizeAggs ;
2365- aggstate -> serialStates = node -> serialStates ;
23662340 aggstate -> input_done = false;
2341+ aggstate -> agg_done = false;
23672342 aggstate -> pergroup = NULL ;
23682343 aggstate -> grp_firstTuple = NULL ;
23692344 aggstate -> hashtable = NULL ;
@@ -2681,6 +2656,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
26812656
26822657 /* Planner should have assigned aggregate to correct level */
26832658 Assert (aggref -> agglevelsup == 0 );
2659+ /* ... and the split mode should match */
2660+ Assert (aggref -> aggsplit == aggstate -> aggsplit );
26842661
26852662 /* 1. Check for already processed aggs which can be re-used */
26862663 existing_aggno = find_compatible_peragg (aggref , aggstate , aggno ,
@@ -2724,7 +2701,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
27242701 * If this aggregation is performing state combines, then instead of
27252702 * using the transition function, we'll use the combine function
27262703 */
2727- if (aggstate -> combineStates )
2704+ if (DO_AGGSPLIT_COMBINE ( aggstate -> aggsplit ) )
27282705 {
27292706 transfn_oid = aggform -> aggcombinefn ;
27302707
@@ -2736,39 +2713,45 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
27362713 transfn_oid = aggform -> aggtransfn ;
27372714
27382715 /* Final function only required if we're finalizing the aggregates */
2739- if (aggstate -> finalizeAggs )
2740- peragg -> finalfn_oid = finalfn_oid = aggform -> aggfinalfn ;
2741- else
2716+ if (DO_AGGSPLIT_SKIPFINAL (aggstate -> aggsplit ))
27422717 peragg -> finalfn_oid = finalfn_oid = InvalidOid ;
2718+ else
2719+ peragg -> finalfn_oid = finalfn_oid = aggform -> aggfinalfn ;
27432720
27442721 serialfn_oid = InvalidOid ;
27452722 deserialfn_oid = InvalidOid ;
27462723
27472724 /*
2748- * Determine if we require serialization or deserialization of the
2749- * aggregate states. This is only required if the aggregate state is
2750- * internal.
2725+ * Check if serialization/deserialization is required. We only do it
2726+ * for aggregates that have transtype INTERNAL.
27512727 */
2752- if (aggstate -> serialStates && aggtranstype == INTERNALOID )
2728+ if (aggtranstype == INTERNALOID )
27532729 {
27542730 /*
2755- * The planner should only have generated an agg node with
2756- * serialStates if every aggregate with an INTERNAL state has
2757- * serialization/deserialization functions . Verify that.
2731+ * The planner should only have generated a serialize agg node if
2732+ * every aggregate with an INTERNAL state has a serialization
2733+ * function . Verify that.
27582734 */
2759- if (!OidIsValid (aggform -> aggserialfn ))
2760- elog (ERROR , "serialfunc not set during serialStates aggregation step" );
2761-
2762- if (!OidIsValid (aggform -> aggdeserialfn ))
2763- elog (ERROR , "deserialfunc not set during serialStates aggregation step" );
2735+ if (DO_AGGSPLIT_SERIALIZE (aggstate -> aggsplit ))
2736+ {
2737+ /* serialization only valid when not running finalfn */
2738+ Assert (DO_AGGSPLIT_SKIPFINAL (aggstate -> aggsplit ));
27642739
2765- /* serialization func only required when not finalizing aggs */
2766- if (! aggstate -> finalizeAggs )
2740+ if (! OidIsValid ( aggform -> aggserialfn ))
2741+ elog ( ERROR , "serialfunc not provided for serialization aggregation" );
27672742 serialfn_oid = aggform -> aggserialfn ;
2743+ }
2744+
2745+ /* Likewise for deserialization functions */
2746+ if (DO_AGGSPLIT_DESERIALIZE (aggstate -> aggsplit ))
2747+ {
2748+ /* deserialization only valid when combining states */
2749+ Assert (DO_AGGSPLIT_COMBINE (aggstate -> aggsplit ));
27682750
2769- /* deserialization func only required when combining states */
2770- if ( aggstate -> combineStates )
2751+ if (! OidIsValid ( aggform -> aggdeserialfn ))
2752+ elog ( ERROR , "deserialfunc not provided for deserialization aggregation" );
27712753 deserialfn_oid = aggform -> aggdeserialfn ;
2754+ }
27722755 }
27732756
27742757 /* Check that aggregate owner has permission to call component fns */
@@ -2853,7 +2836,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
28532836 }
28542837
28552838 /* get info about the output value's datatype */
2856- get_typlenbyval (aggref -> aggoutputtype ,
2839+ get_typlenbyval (aggref -> aggtype ,
28572840 & peragg -> resulttypeLen ,
28582841 & peragg -> resulttypeByVal );
28592842
@@ -2972,7 +2955,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
29722955 * transfn and transfn_oid fields of pertrans refer to the combine
29732956 * function rather than the transition function.
29742957 */
2975- if (aggstate -> combineStates )
2958+ if (DO_AGGSPLIT_COMBINE ( aggstate -> aggsplit ) )
29762959 {
29772960 Expr * combinefnexpr ;
29782961
0 commit comments