It is interesting to try a custom aggregate function as a window function, and to compare a query using such an aggregate with the other approaches (recursion and self-join).
I take the example from the Q&A "How to calculate an exponential moving average on postgres?".
EMA calculation (https://en.wikipedia.org/wiki/Exponential_smoothing):
T(0) = v(0)
T(n) = T(n-1) * alpha + v(n) * (1 - alpha)
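For example, with alpha = 13/14 and the sample true_range values 2, 4, 2 used below, the recurrence gives:
T(0) = 2
T(1) = 2 * (13/14) + 4 * (1/14) = 30/14 ≈ 2.142857...
T(2) = (30/14) * (13/14) + 2 * (1/14) = 418/196 ≈ 2.132653...
These values match the average_true_range column in the result set below.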
CREATE FUNCTION and CREATE AGGREGATE.
create or replace function ema_func(state numeric, inval numeric, alpha numeric)
returns numeric
language plpgsql as $$
begin
  -- log every call made during aggregation (table logproc)
  insert into logproc(state, inval, alpha)
  values (state, inval, alpha);
  -- aggregate transition calculation
  return case
           when state is null then inval                -- first call
           else state * alpha + (1 - alpha) * inval     -- subsequent calls
         end;
end
$$;
create aggregate ema(numeric, numeric) (sfunc = ema_func, stype = numeric);
I added logging (table logproc) to see which calls the transition function receives during aggregation, and with which parameters.
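The logproc table itself is not shown in the question; a minimal definition such as the following is sufficient (the id column is an assumption here, used only to order the log records):
create table logproc(
  id    serial primary key, -- call order
  state numeric,            -- accumulated state passed in
  inval numeric,            -- current input value
  alpha numeric             -- smoothing factor
);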
WITH data AS(
SELECT *,
GREATEST(
high - low,
ABS(high - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp)),
ABS(low - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp))
) AS true_range
FROM core_price
)
select *
,cast( ema(true_range, 13.0/14.0) over(w) as decimal(38,34)) as average_true_range
from data window w as (partition by symbol_id order by timestamp)
;
| id | symbol_id | timestamp | low | high | close | true_range | average_true_range |
|----|-----------|-----------|-----|------|-------|------------|--------------------|
| 1 | 1 | 1 | 1.000000 | 3.000000 | 2.000000 | 2.000000 | 2.0000000000000000000000000000000000 |
| 2 | 1 | 2 | 2.000000 | 6.000000 | 4.000000 | 4.000000 | 2.1428571428571428571400000000000000 |
| 3 | 1 | 3 | 4.000000 | 6.000000 | 5.000000 | 2.000000 | 2.1326530612244897959159183673469388 |
| 4 | 1 | 4 | 3.000000 | 5.000000 | 4.000000 | 2.000000 | 2.1231778425655976676363994169096210 |
Let's look at the log table.
For the first call, state is null; each subsequent call receives the previous call's result as state, so the aggregate carries the running EMA from row to row.
select id,inval,alpha, state from logproc limit 20;
| id | inval | alpha | state |
|----|-------|-------|-------|
| 1 | 2.000000 | 0.92857142857142857143 | null |
| 2 | 4.000000 | 0.92857142857142857143 | 2.000000 |
| 3 | 2.000000 | 0.92857142857142857143 | 2.14285714285714285714000000 |
| 4 | 2.000000 | 0.92857142857142857143 | 2.1326530612244897959159183673469387755102000000 |
fiddle
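As a side note, the same aggregate can also be called as an ordinary aggregate with an ORDER BY inside the call, when only the final EMA value per symbol is needed. A sketch, reusing the same data CTE:
WITH data AS (
  SELECT *,
    GREATEST(
      high - low,
      ABS(high - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp)),
      ABS(low - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp))
    ) AS true_range
  FROM core_price
)
SELECT symbol_id,
       ema(true_range, 13.0/14.0 ORDER BY timestamp) AS final_average_true_range
FROM data
GROUP BY symbol_id;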
It is useful to look at the query execution plans.
I created a table with 4K rows; considering @MatBailie's comment under the question, this should be enough.
I can say right away that the custom aggregate function performs far better than the alternatives.
Queries and EXPLAIN ANALYZE output:
-- original query by user31749517
EXPLAIN ANALYZE VERBOSE
with recursive p as (
select *,
greatest(
high - low,
abs(high - lag(close) over (order by timestamp)),
abs(low - lag(close) over (order by timestamp))
) as true_range,
row_number() over (order by timestamp) as seqnum
from core_price
),
cte as (
select id,seqnum, high, low, close, timestamp, true_range,
true_range::float as average_true_range
from p
where seqnum = 1
union all
select p.id,p.seqnum,p.high,p.low,p.close,p.timestamp,p.true_range,
(cte.average_true_range * 13.0 / 14 + p.true_range / 14) as average_true_range
from cte
join p on p.seqnum = cte.seqnum + 1
)
select *
from cte
order by seqnum;
QUERY PLAN
| Sort (cost=2800.82..2836.05 rows=14092 width=134) (actual time=3648.743..3649.070 rows=4000 loops=1) |
| Output: cte.id, cte.seqnum, cte.high, cte.low, cte.close, cte."timestamp", cte.true_range, cte.average_true_range |
| Sort Key: cte.seqnum |
| Sort Method: quicksort Memory: 659kB |
| CTE p |
| -> WindowAgg (cost=188.79..270.90 rows=2346 width=130) (actual time=3.652..10.382 rows=4000 loops=1) |
| Output: core_price.id, core_price.symbol_id, core_price."timestamp", core_price.low, core_price.high, core_price.close, GREATEST((core_price.high - core_price.low), abs((core_price.high - lag(core_price.close) OVER (?))), abs((core_price.low - lag(core_price.close) OVER (?)))), row_number() OVER (?) |
| -> Sort (cost=188.79..194.65 rows=2346 width=90) (actual time=3.497..3.826 rows=4000 loops=1) |
| Output: core_price."timestamp", core_price.id, core_price.symbol_id, core_price.low, core_price.high, core_price.close |
| Sort Key: core_price."timestamp" |
| Sort Method: quicksort Memory: 409kB |
| -> Seq Scan on public.core_price (cost=0.00..57.46 rows=2346 width=90) (actual time=0.023..1.336 rows=4000 loops=1) |
| Output: core_price."timestamp", core\_price.id, core_price.symbol_id, core_price.low, core_price.high, core_price.close |
| CTE cte |
| -> Recursive Union (cost=0.00..1276.96 rows=14092 width=134) (actual time=3.661..3637.728 rows=4000 loops=1) |
| -> CTE Scan on p (cost=0.00..52.81 rows=12 width=134) (actual time=3.660..14.008 rows=1 loops=1) |
| Output: p.id, p.seqnum, p.high, p.low, p.close, p."timestamp", p.true_range, (p.true_range)::double precision |
| Filter: (p.seqnum = 1) |
| Rows Removed by Filter: 3999 |
| -> Hash Join (cost=3.90..94.23 rows=1408 width=134) (actual time=0.464..0.902 rows=1 loops=4000) |
| Output: p_1.id, p_1.seqnum, p_1.high, p_1.low, p_1.close, p_1."timestamp", p_1.true_range, (((cte_1.average_true_range * '13'::double precision) / '14'::double precision) + ((p_1.true_range / '14'::numeric))::double precision) |
| Hash Cond: (p_1.seqnum = (cte_1.seqnum + 1)) |
| -> CTE Scan on p p_1 (cost=0.00..46.92 rows=2346 width=126) (actual time=0.000..0.345 rows=4000 loops=4000) |
| Output: p_1.id, p_1.symbol_id, p_1."timestamp", p_1.low, p_1.high, p_1.close, p_1.true_range, p_1.seqnum |
| -> Hash (cost=2.40..2.40 rows=120 width=16) (actual time=0.001..0.001 rows=1 loops=4000) |
| Output: cte_1.average_true_range, cte_1.seqnum |
| Buckets: 1024 Batches: 1 Memory Usage: 9kB |
| -> WorkTable Scan on cte cte_1 (cost=0.00..2.40 rows=120 width=16) (actual time=0.000..0.000 rows=1 loops=4000) |
| Output: cte_1.average_true_range, cte_1.seqnum |
| -> CTE Scan on cte (cost=0.00..281.84 rows=14092 width=134) (actual time=3.662..3643.207 rows=4000 loops=1) |
| Output: cte.id, cte.seqnum, cte.high, cte.low, cte.close, cte."timestamp", cte.true_range, cte.average_true_range |
| Planning Time: 0.802 ms |
| Execution Time: 3650.009 ms |
In the recursive query above, each iteration rescans the 4,000-row CTE p (loops=4000 in the plan), which explains the long runtime.
The next query creates a huge intermediate rowset: 4,000 rows --> 8,002,000 rows. Aggregating these rows takes most of the query time.
In practice, I think the depth of summation can be limited to about 50-100 rows for alpha = 13.0/14.0, since the weight of older terms becomes negligible:
power(13.0/14.0, 100) = 0.0006
power(13.0/14.0, 60) = 0.0117
The query would then run noticeably faster (see the sketch after the execution plan below).
-- query example by MatBailie
EXPLAIN ANALYZE VERBOSE
WITH enumerated AS(
SELECT
*,
GREATEST(
high - low,
ABS(high - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp)),
ABS(low - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp))
) AS true_range,
ROW_NUMBER() OVER (PARTITION BY symbol_id ORDER BY timestamp) AS seqnum
FROM core_price
)
SELECT c.id,
c.symbol_id,
c.timestamp,
SUM(
POWER(13.0 / 14.0, c.seqnum - p.seqnum)
*
CASE
WHEN p.seqnum = 1 THEN p.true_range
ELSE p.true_range / 14.0
END
) as average_true_range
FROM enumerated AS c -- current
INNER JOIN enumerated AS p -- preceding
ON p.symbol_id = c.symbol_id
AND p.timestamp <= c.timestamp
GROUP BY c.id,c.symbol_id, c.timestamp
ORDER BY c.id,c.symbol_id, c.timestamp
QUERY PLAN
| Sort (cost=1368.09..1368.68 rows=235 width=44) (actual time=15570.760..15570.996 rows=4000 loops=1) |
| Output: c.id, c.symbol_id, c."timestamp", (sum((power(0.92857142857142857143, ((c.seqnum - p.seqnum))::numeric) * CASE WHEN (p.seqnum = 1) THEN p.true_range ELSE (p.true_range / 14.0) END))) |
| Sort Key: c.id, c.symbol_id, c."timestamp" |
| Sort Method: quicksort Memory: 409kB |
| CTE enumerated |
| -> WindowAgg (cost=188.79..276.76 rows=2346 width=130) (actual time=1.669..8.050 rows=4000 loops=1) |
| Output: core_price.id, core_price.symbol_id, core_price."timestamp", core_price.low, core_price.high, core_price.close, GREATEST((core_price.high - core_price.low), abs((core_price.high - lag(core_price.close) OVER (?))), abs((core_price.low - lag(core_price.close) OVER (?)))), row_number() OVER (?) |
| -> Sort (cost=188.79..194.65 rows=2346 width=90) (actual time=1.656..1.970 rows=4000 loops=1) |
| Output: core_price.symbol_id, core_price."timestamp", core_price.id, core_price.low, core_price.high, core_price.close |
| Sort Key: core_price.symbol_id, core_price."timestamp" |
| Sort Method: quicksort Memory: 409kB |
| -> Seq Scan on public.core_price (cost=0.00..57.46 rows=2346 width=90) (actual time=0.015..0.714 rows=4000 loops=1) |
| Output: core_price.symbol_id, core_price."timestamp", core_price.id, core_price.low, core_price.high, core_price.close |
| -> HashAggregate (cost=1079.14..1082.07 rows=235 width=44) (actual time=15566.871..15569.373 rows=4000 loops=1) |
| Output: c.id, c.symbol_id, c."timestamp", sum((power(0.92857142857142857143, ((c.seqnum - p.seqnum))::numeric) * CASE WHEN (p.seqnum = 1) THEN p.true_range ELSE (p.true_range / 14.0) END)) |
| Group Key: c.id, c.symbol_id, c."timestamp" |
| Batches: 1 Memory Usage: 2257kB |
| -> Merge Join (cost=356.50..849.81 rows=9173 width=60) (actual time=13.322..3899.459 rows=8002000 loops=1) |
| Output: c.id, c.symbol_id, c."timestamp", c.seqnum, p.seqnum, p.true_range |
| Merge Cond: (c.symbol_id = p.symbol_id) |
| Join Filter: (p."timestamp" <= c."timestamp") |
| Rows Removed by Join Filter: 7998000 |
| -> Sort (cost=178.25..184.11 rows=2346 width=20) (actual time=11.523..12.697 rows=4000 loops=1) |
| Output: c.id, c.symbol_id, c."timestamp", c.seqnum |
| Sort Key: c.symbol_id |
| Sort Method: quicksort Memory: 409kB |
| -> CTE Scan on enumerated c (cost=0.00..46.92 rows=2346 width=20) (actual time=1.671..10.584 rows=4000 loops=1) |
| Output: c.id, c.symbol_id, c."timestamp", c.seqnum |
| -> Sort (cost=178.25..184.11 rows=2346 width=48) (actual time=1.788..1012.801 rows=15996001 loops=1) |
| Output: p.seqnum, p.true_range, p.symbol_id, p."timestamp" |
| Sort Key: p.symbol_id |
| Sort Method: quicksort Memory: 409kB |
| -> CTE Scan on enumerated p (cost=0.00..46.92 rows=2346 width=48) (actual time=0.002..0.747 rows=4000 loops=1) |
| Output: p.seqnum, p.true_range, p.symbol_id, p."timestamp" |
| Planning Time: 0.362 ms |
| Execution Time: 15571.376 ms |
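As suggested above, the self-join depth can be bounded. A sketch (not benchmarked here) that limits each row to its 100 most recent predecessors; the dropped older terms carry a combined weight of about power(13.0/14.0, 100) = 0.0006, so the result changes only marginally:
WITH enumerated AS (
  SELECT *,
    GREATEST(
      high - low,
      ABS(high - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp)),
      ABS(low - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp))
    ) AS true_range,
    ROW_NUMBER() OVER (PARTITION BY symbol_id ORDER BY timestamp) AS seqnum
  FROM core_price
)
SELECT c.id, c.symbol_id, c.timestamp,
  SUM(
    POWER(13.0 / 14.0, c.seqnum - p.seqnum)
    * CASE WHEN p.seqnum = 1 THEN p.true_range ELSE p.true_range / 14.0 END
  ) AS average_true_range
FROM enumerated AS c
INNER JOIN enumerated AS p
  ON  p.symbol_id = c.symbol_id
  AND p.seqnum BETWEEN c.seqnum - 99 AND c.seqnum -- depth limited to 100 rows
GROUP BY c.id, c.symbol_id, c.timestamp
ORDER BY c.id, c.symbol_id, c.timestamp;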
-- query using custom aggregate function
EXPLAIN ANALYZE VERBOSE
WITH data AS(
SELECT *,
GREATEST(
high - low,
ABS(high - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp)),
ABS(low - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp))
) AS true_range
FROM core_price
)
select *
,cast( ema(true_range, 13.0/14.0) over(w) as decimal(38,34)) as average_true_range
from data window w as (PARTITION BY symbol_id ORDER BY timestamp)
;
QUERY PLAN
| WindowAgg (cost=0.28..442.59 rows=4000 width=98) (actual time=0.458..138.641 rows=4000 loops=1) |
| Output: data.id, data.symbol_id, data."timestamp", data.low, data.high, data.close, data.true_range, (ema(data.true_range, 0.92857142857142857143) OVER (?))::numeric(38,34) |
| -> Subquery Scan on data (cost=0.28..362.59 rows=4000 width=68) (actual time=0.031..12.633 rows=4000 loops=1) |
| Output: data.symbol_id, data."timestamp", data.id, data.low, data.high, data.close, data.true_range |
| -> WindowAgg (cost=0.28..322.59 rows=4000 width=68) (actual time=0.030..11.816 rows=4000 loops=1) |
| Output: core_price.id, core_price.symbol_id, core_price."timestamp", core_price.low, core_price.high, core_price.close, GREATEST((core_price.high - core_price.low), abs((core_price.high - lag(core_price.close) OVER (?))), abs((core_price.low - lag(core_price.close) OVER (?)))) |
| -> Index Scan using ix_coreprice_symbol_timestamp on public.core_price (cost=0.28..192.59 rows=4000 width=36) (actual time=0.019..3.398 rows=4000 loops=1) |
| Output: core_price.symbol_id, core_price."timestamp", core_price.id, core_price.low, core_price.high, core_price.close |
| Planning Time: 0.302 ms |
| Execution Time: 139.140 ms |
fiddle
Query plan for 20K rows. Execution time grows roughly linearly with the row count (139 ms for 4K rows vs. 778 ms for 20K rows):
QUERY PLAN
WindowAgg (cost=0.29..1789.32 rows=11523 width=152) (actual time=0.528..775.657 rows=20000 loops=1)
Output: data.id, data.symbol_id, data."timestamp", data.low, data.high, data.close, data.true_range, (ema(data.true_range, 0.92857142857142857143) OVER (?))::numeric(38,34)
-> Subquery Scan on data (cost=0.29..1558.86 rows=11523 width=122) (actual time=0.033..67.436 rows=20000 loops=1)
Output: data.symbol_id, data."timestamp", data.id, data.low, data.high, data.close, data.true_range
-> WindowAgg (cost=0.29..1443.63 rows=11523 width=122) (actual time=0.032..62.921 rows=20000 loops=1)
Output: core_price.id, core_price.symbol_id, core_price."timestamp", core_price.low, core_price.high, core_price.close, GREATEST((core_price.high - core_price.low), abs((core_price.high - lag(core_price.close) OVER (?))), abs((core_price.low - lag(core_price.close) OVER (?))))
-> Index Scan using ix_coreprice_symbol_timestamp on public.core_price (cost=0.29..1069.13 rows=11523 width=90) (actual time=0.020..19.198 rows=20000 loops=1)
Output: core_price.symbol_id, core_price."timestamp", core_price.id, core_price.low, core_price.high, core_price.close
Planning Time: 0.234 ms
Execution Time: 778.026 ms
Fiddle
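The logging insert exists only to demonstrate how the aggregate is called; for real workloads it should be removed. A sketch of a plain version (ema_func_nolog and ema_nolog are illustrative names):
create or replace function ema_func_nolog(state numeric, inval numeric, alpha numeric)
returns numeric
immutable
language sql as $$
  select case
           when state is null then inval            -- first call
           else state * alpha + (1 - alpha) * inval -- subsequent calls
         end;
$$;
create aggregate ema_nolog(numeric, numeric) (sfunc = ema_func_nolog, stype = numeric);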
See CREATE FUNCTION and CREATE AGGREGATE in the PostgreSQL documentation.