
I'm trying to calculate the average true range (ATR) on a time series dataset stored in Postgres. Its calculation requires a 14-period exponential moving average of the true range, which, based on the answer here, is obtained using:

with recursive p as (
    select *,
           greatest(
               high - low,
               abs(high - lag(close) over (order by timestamp)),
               abs(low - lag(close) over (order by timestamp))
           ) as true_range,
           row_number() over (order by timestamp) as seqnum
    from core_price
    where timestamp > 1751329927
      and symbol_id = 1
    limit 20000
),
cte as (
    select seqnum,
           high,
           low,
           close,
           timestamp,
           true_range,
           true_range::float as average_true_range
    from p
    where seqnum = 1

    union all

    select p.seqnum,
           p.high,
           p.low,
           p.close,
           p.timestamp,
           p.true_range,
           (cte.average_true_range * 13.0 / 14 + p.true_range / 14) as average_true_range
    from cte
    join p on p.seqnum = cte.seqnum + 1
)
select *
from cte
order by seqnum;

It calculates the correct values; however, it's very slow (takes 3 s on an M4 Max MBP for 20,000 rows), and there are millions of rows, so it's not by any means efficient or acceptable. The same calculation takes milliseconds in pandas using df.ewm over the entire period of 7M rows. How can it be optimized?

One possible optimization is to replace the EMA with an SMA, which can be calculated quickly in Postgres without recursion using window functions, but that doesn't calculate the standard ATR and is less sensitive to recent changes in price, so I'm not interested in that approach.
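For reference, a minimal sketch of that rejected SMA approach (reusing the same core_price columns as above) would be something like:

with p as (
    select *,
           greatest(
               high - low,
               abs(high - lag(close) over w),
               abs(low  - lag(close) over w)
           ) as true_range
    from core_price
    where symbol_id = 1
    window w as (order by timestamp)
)
select *,
       -- plain 14-period moving average instead of an EMA
       avg(true_range) over (order by timestamp
                             rows between 13 preceding and current row) as sma_atr
from p;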

4 Comments

  • Sample input data and expected output are essential. (Nov 9 at 10:31)
  • Are you able to add a custom window function using CREATE FUNCTION and CREATE AGGREGATE? (Nov 9 at 10:39)
  • See the example with a custom average function (stackoverflow.com/questions/8871426/…). (Nov 9 at 11:22)
  • High, low and close imply daily figures. Surely no individual symbol has 20k days of data you want to apply a moving average to? Even if you do want to apply a moving average over a 20k-entry time series, value 1 is going to have next to zero impact after a couple of weeks, so why not allow a window size limit of 28 or even 56? (Nov 9 at 21:22)

2 Answers


It is interesting to try a custom aggregate function used as a window function.

It is also worth comparing a query with such an aggregate function against the other methods (recursion and self-join). I'll take the example from the question and answer (How to calculate an exponential moving average on postgres?).

EMA calculation (https://en.wikipedia.org/wiki/Exponential_smoothing)

 T(0)=v(0)
 T(n)=T(n-1)*alpha + v(n)*(1-alpha)

CREATE FUNCTION and CREATE AGGREGATE.

create or replace function ema_func(state numeric, inval numeric, alpha numeric)
  returns numeric
  language plpgsql as $$
begin
     -- logger
  insert into logproc(state, inval,alpha)
  values(state,inval,alpha);
     -- aggregate calculation
  return case
         when state is null then inval         -- first call
         else state * alpha+ (1-alpha) * inval -- next calls 
         end;
end
$$;

create aggregate ema(numeric, numeric) (sfunc = ema_func, stype = numeric);

I added logging (table logproc) to see which calls to the user function are made during aggregation, and with which parameters.
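The log table itself is not defined above; something like this is assumed (the serial id just records call order):

create table logproc(
  id    serial primary key,  -- call order
  state numeric,
  inval numeric,
  alpha numeric
);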

WITH data AS(
  SELECT *,
    GREATEST(
      high - low,
      ABS(high - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp)),
      ABS(low -  LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp))
    ) AS true_range
  FROM core_price
)
select *
   ,cast( ema(true_range, 13.0/14.0) over(w)  as  decimal(38,34)) as average_true_range
from data window w as (partition by symbol_id order by timestamp) 
;
| id | symbol_id | timestamp | low | high | close | true_range | average_true_range |
| 1 | 1 | 1 | 1.000000 | 3.000000 | 2.000000 | 2.000000 | 2.0000000000000000000000000000000000 |
| 2 | 1 | 2 | 2.000000 | 6.000000 | 4.000000 | 4.000000 | 2.1428571428571428571400000000000000 |
| 3 | 1 | 3 | 4.000000 | 6.000000 | 5.000000 | 2.000000 | 2.1326530612244897959159183673469388 |
| 4 | 1 | 4 | 3.000000 | 5.000000 | 4.000000 | 2.000000 | 2.1231778425655976676363994169096210 |
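As a quick sanity check, the second row follows directly from the recurrence above (previous state 2.0 weighted by 13/14, new true_range 4.0 weighted by 1/14):

 2.0 * 13/14 + 4.0 * 1/14 = 30/14 = 2.142857...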

Let's look at the log table.
For the first call, "state" is null.

select id, inval, alpha, state from logproc limit 20;

| id | inval | alpha | state |
| 1 | 2.000000 | 0.92857142857142857143 | null |
| 2 | 4.000000 | 0.92857142857142857143 | 2.000000 |
| 3 | 2.000000 | 0.92857142857142857143 | 2.14285714285714285714000000 |
| 4 | 2.000000 | 0.92857142857142857143 | 2.1326530612244897959159183673469387755102000000 |

fiddle

It is useful to look at the query execution plans.
I created a table with 4K rows; considering @MatBailie's comment on the question, this should be enough.
I can tell right away that the custom aggregate function performs very well.

Queries and EXPLAIN ANALYZE

-- original query by user31749517
EXPLAIN ANALYZE VERBOSE 
with recursive p as (
    select *,
           greatest(
               high - low,
               abs(high - lag(close) over (order by timestamp)),
               abs(low - lag(close) over (order by timestamp))
           ) as true_range,
           row_number() over (order by timestamp) as seqnum
    from core_price
),
cte as (
    select id,seqnum, high, low, close, timestamp, true_range,
           true_range::float as average_true_range
    from p
    where seqnum = 1

    union all

    select p.id,p.seqnum,p.high,p.low,p.close,p.timestamp,p.true_range,
       (cte.average_true_range * 13.0 / 14 + p.true_range / 14) as average_true_range
    from cte
    join p on p.seqnum = cte.seqnum + 1
)
select *
from cte
order by seqnum;

QUERY PLAN

| Sort  (cost=2800.82..2836.05 rows=14092 width=134) (actual time=3648.743..3649.070 rows=4000 loops=1) |
|   Output: cte.id, cte.seqnum, cte.high, cte.low, cte.close, cte."timestamp", cte.true_range, cte.average_true_range |
|   Sort Key: cte.seqnum |
|   Sort Method: quicksort  Memory: 659kB |
|   CTE p |
|     ->  WindowAgg  (cost=188.79..270.90 rows=2346 width=130) (actual time=3.652..10.382 rows=4000 loops=1) |
|           Output: core_price.id, core_price.symbol_id, core_price."timestamp", core_price.low, core_price.high, core_price.close, GREATEST((core_price.high - core_price.low), abs((core_price.high - lag(core_price.close) OVER (?))), abs((core_price.low - lag(core_price.close) OVER (?)))), row_number() OVER (?) |
|           ->  Sort  (cost=188.79..194.65 rows=2346 width=90) (actual time=3.497..3.826 rows=4000 loops=1) |
|                 Output: core_price."timestamp", core_price.id, core_price.symbol_id, core_price.low, core_price.high, core_price.close |
|                 Sort Key: core_price."timestamp" |
|                 Sort Method: quicksort  Memory: 409kB |
|                 ->  Seq Scan on public.core_price  (cost=0.00..57.46 rows=2346 width=90) (actual time=0.023..1.336 rows=4000 loops=1) |
|                       Output: core_price."timestamp", core_price.id, core_price.symbol_id, core_price.low, core_price.high, core_price.close |
|   CTE cte |
|     ->  Recursive Union  (cost=0.00..1276.96 rows=14092 width=134) (actual time=3.661..3637.728 rows=4000 loops=1) |
|           ->  CTE Scan on p  (cost=0.00..52.81 rows=12 width=134) (actual time=3.660..14.008 rows=1 loops=1) |
|                 Output: p.id, p.seqnum, p.high, p.low, p.close, p."timestamp", p.true_range, (p.true_range)::double precision |
|                 Filter: (p.seqnum = 1) |
|                 Rows Removed by Filter: 3999 |
|           ->  Hash Join  (cost=3.90..94.23 rows=1408 width=134) (actual time=0.464..0.902 rows=1 loops=4000) |
|                 Output: p_1.id, p_1.seqnum, p_1.high, p_1.low, p_1.close, p_1."timestamp", p_1.true_range, (((cte_1.average_true_range * '13'::double precision) / '14'::double precision) + ((p_1.true_range / '14'::numeric))::double precision) |
|                 Hash Cond: (p_1.seqnum = (cte_1.seqnum + 1)) |
|                 ->  CTE Scan on p p_1  (cost=0.00..46.92 rows=2346 width=126) (actual time=0.000..0.345 rows=4000 loops=4000) |
|                       Output: p_1.id, p_1.symbol_id, p_1."timestamp", p_1.low, p_1.high, p_1.close, p_1.true_range, p_1.seqnum |
|                 ->  Hash  (cost=2.40..2.40 rows=120 width=16) (actual time=0.001..0.001 rows=1 loops=4000) |
|                       Output: cte_1.average_true_range, cte_1.seqnum |
|                       Buckets: 1024  Batches: 1  Memory Usage: 9kB |
|                       ->  WorkTable Scan on cte cte_1  (cost=0.00..2.40 rows=120 width=16) (actual time=0.000..0.000 rows=1 loops=4000) |
|                             Output: cte_1.average_true_range, cte_1.seqnum |
|   ->  CTE Scan on cte  (cost=0.00..281.84 rows=14092 width=134) (actual time=3.662..3643.207 rows=4000 loops=1) |
|         Output: cte.id, cte.seqnum, cte.high, cte.low, cte.close, cte."timestamp", cte.true_range, cte.average_true_range |
| Planning Time: 0.802 ms |
| Execution Time: 3650.009 ms |

The next query creates a huge intermediate rowset: 4,000 rows --> 8,002,000 rows. Aggregating these rows takes up the main part of the query time.
In practice, I think it is possible to limit the depth of the summation to on the order of 50-100 rows for alpha = 13.0/14.0:
power(13.0/14.0, 100) = 0.0006
power(13.0/14.0, 60)  = 0.0117

Then the query will run noticeably faster.

-- query example by MatBailie
EXPLAIN ANALYZE VERBOSE 
WITH enumerated AS(
  SELECT
    *,
    GREATEST(
      high - low,
      ABS(high - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp)),
      ABS(low -  LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp))
    ) AS true_range,
    ROW_NUMBER() OVER (PARTITION BY symbol_id ORDER BY timestamp) AS seqnum
  FROM     core_price
)
SELECT c.id,
  c.symbol_id,
  c.timestamp,
  SUM(
    POWER(13.0 / 14.0, c.seqnum - p.seqnum)
    *
    CASE
      WHEN p.seqnum = 1 THEN p.true_range
                        ELSE p.true_range / 14.0
    END
  ) as average_true_range
FROM enumerated   AS c  -- current
INNER JOIN enumerated   AS p  -- preceding
    ON  p.symbol_id  = c.symbol_id
    AND p.timestamp <= c.timestamp
GROUP BY c.id,c.symbol_id, c.timestamp
ORDER BY c.id,c.symbol_id, c.timestamp

QUERY PLAN

| Sort  (cost=1368.09..1368.68 rows=235 width=44) (actual time=15570.760..15570.996 rows=4000 loops=1) |
|   Output: c.id, c.symbol_id, c."timestamp", (sum((power(0.92857142857142857143, ((c.seqnum - p.seqnum))::numeric) * CASE WHEN (p.seqnum = 1) THEN p.true_range ELSE (p.true_range / 14.0) END))) |
|   Sort Key: c.id, c.symbol_id, c."timestamp" |
|   Sort Method: quicksort  Memory: 409kB |
|   CTE enumerated |
|     ->  WindowAgg  (cost=188.79..276.76 rows=2346 width=130) (actual time=1.669..8.050 rows=4000 loops=1) |
|           Output: core_price.id, core_price.symbol_id, core_price."timestamp", core_price.low, core_price.high, core_price.close, GREATEST((core_price.high - core_price.low), abs((core_price.high - lag(core_price.close) OVER (?))), abs((core_price.low - lag(core_price.close) OVER (?)))), row_number() OVER (?) |
|           ->  Sort  (cost=188.79..194.65 rows=2346 width=90) (actual time=1.656..1.970 rows=4000 loops=1) |
|                 Output: core_price.symbol_id, core_price."timestamp", core_price.id, core_price.low, core_price.high, core_price.close |
|                 Sort Key: core_price.symbol_id, core_price."timestamp" |
|                 Sort Method: quicksort  Memory: 409kB |
|                 ->  Seq Scan on public.core_price  (cost=0.00..57.46 rows=2346 width=90) (actual time=0.015..0.714 rows=4000 loops=1) |
|                       Output: core_price.symbol_id, core_price."timestamp", core_price.id, core_price.low, core_price.high, core_price.close |
|   ->  HashAggregate  (cost=1079.14..1082.07 rows=235 width=44) (actual time=15566.871..15569.373 rows=4000 loops=1) |
|         Output: c.id, c.symbol_id, c."timestamp", sum((power(0.92857142857142857143, ((c.seqnum - p.seqnum))::numeric) * CASE WHEN (p.seqnum = 1) THEN p.true_range ELSE (p.true_range / 14.0) END)) |
|         Group Key: c.id, c.symbol_id, c."timestamp" |
|         Batches: 1  Memory Usage: 2257kB |
|         ->  Merge Join  (cost=356.50..849.81 rows=9173 width=60) (actual time=13.322..3899.459 rows=8002000 loops=1) |
|               Output: c.id, c.symbol_id, c."timestamp", c.seqnum, p.seqnum, p.true_range |
|               Merge Cond: (c.symbol_id = p.symbol_id) |
|               Join Filter: (p."timestamp" <= c."timestamp") |
|               Rows Removed by Join Filter: 7998000 |
|               ->  Sort  (cost=178.25..184.11 rows=2346 width=20) (actual time=11.523..12.697 rows=4000 loops=1) |
|                     Output: c.id, c.symbol_id, c."timestamp", c.seqnum |
|                     Sort Key: c.symbol_id |
|                     Sort Method: quicksort  Memory: 409kB |
|                     ->  CTE Scan on enumerated c  (cost=0.00..46.92 rows=2346 width=20) (actual time=1.671..10.584 rows=4000 loops=1) |
|                           Output: c.id, c.symbol_id, c."timestamp", c.seqnum |
|               ->  Sort  (cost=178.25..184.11 rows=2346 width=48) (actual time=1.788..1012.801 rows=15996001 loops=1) |
|                     Output: p.seqnum, p.true_range, p.symbol_id, p."timestamp" |
|                     Sort Key: p.symbol_id |
|                     Sort Method: quicksort  Memory: 409kB |
|                     ->  CTE Scan on enumerated p  (cost=0.00..46.92 rows=2346 width=48) (actual time=0.002..0.747 rows=4000 loops=1) |
|                           Output: p.seqnum, p.true_range, p.symbol_id, p."timestamp" |
| Planning Time: 0.362 ms |
| Execution Time: 15571.376 ms |

-- query using custom aggregate function
EXPLAIN ANALYZE VERBOSE 
WITH data AS(
  SELECT *,
    GREATEST(
      high - low,
      ABS(high - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp)),
      ABS(low -  LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp))
    ) AS true_range
  FROM core_price
)
select *
   ,cast( ema(true_range, 13.0/14.0) over(w)  as  decimal(38,34)) as average_true_range
from data window w as (PARTITION BY symbol_id ORDER BY timestamp) 
;

QUERY PLAN

| WindowAgg  (cost=0.28..442.59 rows=4000 width=98) (actual time=0.458..138.641 rows=4000 loops=1) |
|   Output: data.id, data.symbol_id, data."timestamp", data.low, data.high, data.close, data.true_range, (ema(data.true_range, 0.92857142857142857143) OVER (?))::numeric(38,34) |
|   ->  Subquery Scan on data  (cost=0.28..362.59 rows=4000 width=68) (actual time=0.031..12.633 rows=4000 loops=1) |
|         Output: data.symbol_id, data."timestamp", data.id, data.low, data.high, data.close, data.true_range |
|         ->  WindowAgg  (cost=0.28..322.59 rows=4000 width=68) (actual time=0.030..11.816 rows=4000 loops=1) |
|               Output: core_price.id, core_price.symbol_id, core_price."timestamp", core_price.low, core_price.high, core_price.close, GREATEST((core_price.high - core_price.low), abs((core_price.high - lag(core_price.close) OVER (?))), abs((core_price.low - lag(core_price.close) OVER (?)))) |
|               ->  Index Scan using ix_coreprice_symbol_timestamp on public.core_price  (cost=0.28..192.59 rows=4000 width=36) (actual time=0.019..3.398 rows=4000 loops=1) |
|                     Output: core_price.symbol_id, core_price."timestamp", core_price.id, core_price.low, core_price.high, core_price.close |
| Planning Time: 0.302 ms |
| Execution Time: 139.140 ms |
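Note the Index Scan in this plan: it relies on a composite index matching the window's PARTITION BY / ORDER BY, presumably something like:

create index ix_coreprice_symbol_timestamp on core_price (symbol_id, timestamp);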

fiddle

Query plan for 20K rows

QUERY PLAN
WindowAgg  (cost=0.29..1789.32 rows=11523 width=152) (actual time=0.528..775.657 rows=20000 loops=1)
  Output: data.id, data.symbol_id, data."timestamp", data.low, data.high, data.close, data.true_range, (ema(data.true_range, 0.92857142857142857143) OVER (?))::numeric(38,34)
  ->  Subquery Scan on data  (cost=0.29..1558.86 rows=11523 width=122) (actual time=0.033..67.436 rows=20000 loops=1)
        Output: data.symbol_id, data."timestamp", data.id, data.low, data.high, data.close, data.true_range
        ->  WindowAgg  (cost=0.29..1443.63 rows=11523 width=122) (actual time=0.032..62.921 rows=20000 loops=1)
              Output: core_price.id, core_price.symbol_id, core_price."timestamp", core_price.low, core_price.high, core_price.close, GREATEST((core_price.high - core_price.low), abs((core_price.high - lag(core_price.close) OVER (?))), abs((core_price.low - lag(core_price.close) OVER (?))))
              ->  Index Scan using ix_coreprice_symbol_timestamp on public.core_price  (cost=0.29..1069.13 rows=11523 width=90) (actual time=0.020..19.198 rows=20000 loops=1)
                    Output: core_price.symbol_id, core_price."timestamp", core_price.id, core_price.low, core_price.high, core_price.close
Planning Time: 0.234 ms
Execution Time: 778.026 ms

Fiddle


2 Comments

You'd need MSFUNC etc. for a moving aggregate (window function), otherwise performance is going to be terrible. Also, sql language and PARALLEL SAFE would improve perf, and IMMUTABLE once you remove the logging. What should happen for null rows is also not clear; you may want to specify STRICT etc. (See the sketch below.)
Thank you for the comment. I'll additionally look at PARALLEL SAFE. As for STRICT, it is quite possible to apply it, even though the data should not contain NULLs in essence. MSFUNC is not needed in the current form (we use UNBOUNDED PRECEDING). If it becomes possible to limit the window, then of course that option will need to be applied - it can give a significant performance improvement.
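For illustration only, a cleaned-up variant along the lines of that comment might look like this (logging removed, plain SQL instead of plpgsql; the names ema_func_sql / ema_sql are placeholders and this sketch is untested here):

create or replace function ema_func_sql(state numeric, inval numeric, alpha numeric)
  returns numeric
  language sql
  immutable
  parallel safe
as $$
  select case
           when state is null then inval              -- first call
           else state * alpha + (1 - alpha) * inval   -- subsequent calls
         end;
$$;

create aggregate ema_sql(numeric, numeric) (
  sfunc = ema_func_sql,
  stype = numeric,
  parallel = safe
);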

Valnik's custom aggregate function is better, but here's a standard SQL method that avoids recursion...

WITH
  enumerated AS
(
  SELECT
    *,
    GREATEST(
      high - low,
      ABS(high - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp)),
      ABS(low -  LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp))
    )
      AS true_range,
    ROW_NUMBER() OVER (PARTITION BY symbol_id ORDER BY timestamp)
      AS seqnum
  FROM
    core_price
)
SELECT
  c.symbol_id,
  c.timestamp,
  SUM(
    POWER(13.0 / 14.0, c.seqnum - p.seqnum)
    *
    CASE
      WHEN p.seqnum = 1 THEN p.true_range
                        ELSE p.true_range / 14.0
    END
  )
FROM
  enumerated   AS c  -- current
INNER JOIN
  enumerated   AS p  -- preceding
    ON  p.symbol_id  = c.symbol_id
    AND p.timestamp <= c.timestamp
GROUP BY
  c.symbol_id,
  c.timestamp
ORDER BY
  c.symbol_id,
  c.timestamp

It basically rearranges the equation so that...

  • c = 13/14
  • v0 = x0 * c^0
  • v1 = x0 * c^1 + x1 * c^0
  • v2 = x0 * c^2 + x1 * c^1 + x2 * c^0
  • v3 = x0 * c^3 + x1 * c^2 + x2 * c^1 + x3 * c^0

With a small hack (the case expression) to deal with the divide by 14 for all inputs, except the first.
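Because each older term is scaled down by another factor of 13/14 per row, the self-join can also be bounded to a fixed lookback (as discussed in the comments below), which turns the quadratic join into roughly n * window_size rows. A sketch of that approximation, using 365 rows since (13/14)^365 ≈ 1.8e-12:

WITH enumerated AS
(
  SELECT
    *,
    GREATEST(
      high - low,
      ABS(high - LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp)),
      ABS(low -  LAG(close) OVER (PARTITION BY symbol_id ORDER BY timestamp))
    ) AS true_range,
    ROW_NUMBER() OVER (PARTITION BY symbol_id ORDER BY timestamp) AS seqnum
  FROM core_price
)
SELECT
  c.symbol_id,
  c.timestamp,
  SUM(
    POWER(13.0 / 14.0, c.seqnum - p.seqnum)
    *
    CASE WHEN p.seqnum = 1 THEN p.true_range ELSE p.true_range / 14.0 END
  ) AS approx_average_true_range
FROM enumerated AS c
INNER JOIN enumerated AS p
    ON  p.symbol_id = c.symbol_id
    AND p.seqnum   <= c.seqnum           -- only current and preceding rows
    AND p.seqnum    > c.seqnum - 365     -- bounded lookback; dropped terms are negligible
GROUP BY c.symbol_id, c.timestamp
ORDER BY c.symbol_id, c.timestamp;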

https://dbfiddle.uk/ZoLrrTJg

10 Comments

FYI, dbfiddle.uk/lqrRoqrs: failing for running too long with 1000 randomly generated rows ...
1) 10k, not 1k. 2) Runs fine if you include a decent primary key ;) dbfiddle.uk/WvCi54aN
The recursive version, yes, not the other one... dbfiddle.uk/lqrRoqrs
Ahah, you're right, I didn't spot that the fiddle you linked only had the recursive code.
Could limit the moving average to the last 365 entries, as by 365 iterations, the starting value is effectively reduced to zero (multiplied by 1.78891855815385E−12); dbfiddle.uk/bxVe4bih
This performs much worse than the query in question. I killed it after 1 min on 20,000 data points.
Worth a try though. Maybe with a 365 row window (see comments above) it will perform better?
Or better still, use the custom aggregate function recommended in comments to your question?
Could somebody test the performance with the custom aggregate function?
I tried to test the custom aggregate function in my answer. Take a look, please.
