
What would be an efficient approach to replicate pandas rolling window functionality on live streaming data?

Suppose we want to maintain the sum of the last n observations: a collections.deque with the maxlen parameter is the way to go. But what if, instead of a fixed n, we need the sum of values from the last m seconds?
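
For the fixed-n case, a minimal sketch of that deque-based running sum might look like this (FixedWindowSum is just an illustrative name):

import collections

class FixedWindowSum:
    """Running sum of the last n observations (count-based window)."""
    def __init__(self, n):
        self.q = collections.deque(maxlen=n)
        self.sum = 0

    def append(self, value):
        # A full deque with maxlen evicts its oldest element on append,
        # so subtract that element's contribution from the sum first.
        if len(self.q) == self.q.maxlen:
            self.sum -= self.q[0]
        self.q.append(value)
        self.sum += value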

Using a pandas.Series would be inefficient, since the underlying numpy array is fixed in size: each append copies the whole array, which makes it unsuitable for handling live data streams.
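
For comparison, pandas does support time-based rolling windows on a datetime-indexed Series, but feeding it a live stream means rebuilding the array on every new observation. A rough illustration (the timestamps are made up):

import pandas as pd

s = pd.Series(
    [1.0, 2.0, 3.0],
    index=pd.to_datetime(["2018-07-24 00:00:00",
                          "2018-07-24 00:00:02",
                          "2018-07-24 00:00:06"]),
)
# Sum over a 5-second window ending at each observation.
print(s.rolling("5s").sum())

# Appending a new observation copies the whole underlying array:
s = pd.concat([s, pd.Series([4.0], index=[pd.Timestamp("2018-07-24 00:00:07")])])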

Something like cachetools.TTLCache is good for storing the live data, but inefficient for calculations: getting the sum would require iterating over every element each time.
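
To make that concrete, here is a rough sketch of what storing the stream in a cachetools.TTLCache could look like (the maxsize and key scheme are arbitrary choices); every call for the sum walks all live entries:

import itertools
from cachetools import TTLCache

cache = TTLCache(maxsize=1_000_000, ttl=5.0)   # keep values for 5 seconds
counter = itertools.count()                    # unique key per observation

def append(value):
    cache[next(counter)] = value

def get_sum():
    # O(n) on every call: iterates over all non-expired values.
    return sum(cache.values())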

Currently, my approach for maintaining a live data stream sum uses a collections.deque, a time-to-live parameter, and a while loop to discard old values:

import time
from collections import deque

class Accumulator:
    def __init__(self, ttl):
        self.ttl = ttl   # time-to-live of each observation, in seconds
        self.sum = 0     # running sum of the values still inside the window
        self.q = deque() # (timestamp, value) pairs, oldest on the left

    def append(self, value):
        self.q.append((time.time(), value))
        self.sum += value
        self.discard_old_values()

    def discard_old_values(self):
        cutoff_time = time.time() - self.ttl
        try:
            # Pop expired observations from the left and remove their contribution.
            while self.q[0][0] < cutoff_time:
                self.sum -= self.q.popleft()[1]
        except IndexError:
            # The deque is empty.
            pass

    def get_sum(self):
        self.discard_old_values()
        return self.sum

So the questions are:

  1. Can we do better?
  2. Is there a way to get rid of the ugly while loop here?
  3. What's the common name for this type of "TTL queue" data structure?
  4. Is there a popular Python library that already implements it?
  5. Is there a way to utilize pandas rolling windows on mutable collections?
1 Comment

This implementation is pretty good for what it does. Does it satisfy your requirements, or do you mean something else by "we need a sum of values from last m seconds"? Commented Jul 24, 2018 at 0:45

1 Answer

Here is my proposed improvement to discard_old_values:

def discard_old_values(self):
    die = time.time() - self.ttl
    while(len(self.q) > 0 and self.q[0][0] < die):
        self.sum -= self.q.popleft()[1]

Instead of calling time.time() for each element that may be discarded, I call it once and do the arithmetic to find the "die time" only once.

This saves the cost of throwing an exception, as well as the cost of multiple calls to time.time().
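
For reference, a quick usage sketch with the Accumulator from the question and this version of discard_old_values (the timings are illustrative):

acc = Accumulator(ttl=5.0)   # keep observations from the last 5 seconds
acc.append(10)
acc.append(20)
print(acc.get_sum())         # 30: both values are still inside the window
time.sleep(6)                # wait until both observations have expired
acc.append(5)
print(acc.get_sum())         # 5: the old values have been discarded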


1 Comment

Good point on moving time.time() call outside of the while loop. Edited the original post to reflect that. Not so sure about adding len(self.q) > 0 (which can be shortened to simply self.q) check for each element. I think catching IndexError just once should be faster.
