What would be an efficient approach to replicate pandas rolling window functionality on live streaming data?
Suppose we want to maintain the sum of the last n observations: collections.deque with the maxlen parameter is the way to go. But what if, instead of a fixed n, we need the sum of the values from the last m seconds?
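For reference, the fixed-n case can be sketched as follows. This is only a minimal illustration: the class name `LastNSum` is hypothetical, and it assumes n is at least 1.

```python
from collections import deque


class LastNSum:
    """Running sum over the last n observations (hypothetical helper)."""

    def __init__(self, n):
        if n < 1:
            raise ValueError("n must be at least 1")
        self.q = deque(maxlen=n)
        self.sum = 0

    def append(self, value):
        # A full deque(maxlen=n) silently drops its oldest item on append,
        # so subtract that item from the running sum before it disappears.
        if len(self.q) == self.q.maxlen:
            self.sum -= self.q[0]
        self.q.append(value)
        self.sum += value


acc = LastNSum(3)
for v in [1, 2, 3, 4]:
    acc.append(v)
print(acc.sum)  # 2 + 3 + 4
```

The eviction here is driven purely by the element count, which is why it does not carry over directly to a window defined in seconds.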
Using pandas.Series would be inefficient, since the underlying NumPy arrays have a fixed size and are thus unsuitable for handling live data streams: each append copies the whole array.
Something like cachetools.TTLCache is good for storing the live data but inefficient for calculations: getting the sum would require iterating over every element each time.
Currently, my approach for maintaining a live data stream sum uses collections.deque, a time-to-live parameter, and a while loop for discarding old values:
    import time
    from collections import deque

    class Accumulator:
        def __init__(self, ttl):
            self.ttl = ttl
            self.sum = 0
            self.q = deque()

        def append(self, value):
            self.q.append((time.time(), value))
            self.sum += value
            self.discard_old_values()

        def discard_old_values(self):
            cutoff_time = time.time() - self.ttl
            try:
                while self.q[0][0] < cutoff_time:
                    self.sum -= self.q.popleft()[1]
            except IndexError:
                pass

        def get_sum(self):
            self.discard_old_values()
            return self.sum
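To make the behaviour of this approach easier to check, here is a sketch of the same idea with the clock passed in as a parameter. The class name `TimedSum` and the `clock` argument are illustrative additions, not part of any library. It defaults to `time.monotonic`, which, unlike `time.time`, cannot jump backwards when the system clock is adjusted. It still uses a while loop, only with an explicit emptiness check instead of catching IndexError.

```python
import time
from collections import deque


class TimedSum:
    """Sum of values appended within the last `ttl` seconds (sketch)."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock  # hypothetical injection point, handy for tests
        self.sum = 0
        self.q = deque()    # (timestamp, value) pairs, oldest first

    def append(self, value):
        self.q.append((self.clock(), value))
        self.sum += value
        self._expire()

    def _expire(self):
        cutoff = self.clock() - self.ttl
        # Each entry is popped at most once over its lifetime, so the loop
        # costs amortized O(1) per appended value.
        while self.q and self.q[0][0] < cutoff:
            self.sum -= self.q.popleft()[1]

    def get_sum(self):
        self._expire()
        return self.sum


# Usage with a fake clock so the result does not depend on real time.
now = [0.0]
acc = TimedSum(ttl=10, clock=lambda: now[0])
acc.append(1)          # stamped at t = 0
now[0] = 5.0
acc.append(2)          # stamped at t = 5
now[0] = 12.0
print(acc.get_sum())   # the t = 0 entry is older than 10 s, so only 2 remains
```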
So the questions are:
- Can we do better?
- Is there a way to get rid of the ugly `while` loop here?
- What's the common name for this type of "TTL queue" data structure?
- Is there a popular Python library that already implements it?
- Is there a way to utilize `pandas` rolling windows on mutable collections?