
I use RxJS to process data replayed from a file. Each data item has a timereceived property. While replaying, I want to create buffers containing all data items that were originally received within a given timespan x. In other words: an item is added to the current buffer as long as the time between the first item in that buffer and the current item is less than x.

Example test:

it('should be able to create buffers based on time received', function() {
    // given
    let source = require('rx').Observable.from([
        {val: 1, timereceived: 10},
        {val: 2, timereceived: 20},
        {val: 3, timereceived: 100},
        {val: 4, timereceived: 110},
        {val: 5, timereceived: 120}
    ]);

    // when
    let actual = app.bufferWithTimeReceived(source, 105).toArray();

    // then
    console.log(actual);
    assert.equal(actual.length, 2); // first contains val 1-3, second val 4-5
})

If I were receiving the data in real time instead of replaying it from a file, I could simply use bufferWithTime.

Update with another example

    // given
    let source = require('rxjs/Rx').Observable.from([
        {val: 1, timereceived: 10},
        {val: 2, timereceived: 20},
        {val: 3, timereceived: 100},
        {val: 4, timereceived: 110},
        {val: 5, timereceived: 120},
        {val: 6, timereceived: 9920},
        {val: 7, timereceived: 9930}
    ]);

    // when
    app.bufferWithTimeReceived(source, 30).subscribe(console.log);


    // then
    // expected output would be [val 1-2][val 3-5][val 6-7] (empty arrays in between would be ok)

Update end
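
For reference, the rule described above can be sketched on a plain array, without RxJS. This is only an illustration of the intended grouping, not a solution to the Observable problem; bufferByTimeReceived is a hypothetical helper name, and the items are assumed to arrive sorted by timereceived.

```javascript
// Plain-array sketch of the buffering rule (no RxJS involved).
// `bufferByTimeReceived` is a hypothetical helper name.
function bufferByTimeReceived(items, timespan) {
  const buffers = [];
  let current = [];
  for (const item of items) {
    // Close the current buffer once the item is `timespan` or more
    // after the first item of that buffer.
    if (current.length > 0 &&
        item.timereceived - current[0].timereceived >= timespan) {
      buffers.push(current);
      current = [];
    }
    current.push(item);
  }
  // Flush the last, still-open buffer.
  if (current.length > 0) buffers.push(current);
  return buffers;
}

const items = [
  {val: 1, timereceived: 10},
  {val: 2, timereceived: 20},
  {val: 3, timereceived: 100},
  {val: 4, timereceived: 110},
  {val: 5, timereceived: 120},
  {val: 6, timereceived: 9920},
  {val: 7, timereceived: 9930}
];

const vals = bufferByTimeReceived(items, 30)
  .map(buffer => buffer.map(item => item.val));
console.log(JSON.stringify(vals)); // [[1,2],[3,4,5],[6,7]]
```

Note that under this rule a timespan of 105 applied to the first five items puts val 4 into the first buffer, because 110 - 10 = 100 is still less than 105.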

I have tried several approaches. My latest attempt was:

exports.bufferWithTimeReceived = (source, timespan) => {
    return Rx.Observable.defer(() => Rx.Observable.create(function (observer) {
        let currBuffer = [];
        source.subscribe(x => {
            if (currBuffer.length == 0)
                currBuffer = [x];
            else {
                if (x.timereceived-currBuffer[0].timereceived < timespan)
                    currBuffer.push(x);
                else {
                    observer.onNext(currBuffer);
                    currBuffer = [x];
                }
            }
        },
        (err)=>observer.onError(err),
        ()=>observer.onCompleted());
    }));
};

Unfortunately, all I get is ToArrayObservable { source: Defer { _f: [Function] } } as an error message, which is not very helpful. I also wondered whether "Rx - Divide stream into segments (lists) by condition" could help me.

Bonus question: Any hint how I could make this buffer overlapping?

2 Answers

I'd do it like this, but note that this is RxJS 5. However, mergeAll should be available in RxJS 4 as well (maybe under a different name).

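const { Observable } = require('rxjs/Rx'); // RxJS 5

const THRESHOLD = 105;

const data = [
  {val: 1, timereceived: 10},
  {val: 2, timereceived: 20},
  {val: 3, timereceived: 100},
  {val: 4, timereceived: 110},
  {val: 5, timereceived: 120}
];

// Group items into fixed buckets of THRESHOLD time units, collect each
// group into an array, then merge the per-group arrays into one stream.
Observable.from(data)
  .groupBy(item => parseInt(item.timereceived / THRESHOLD))
  .map(observable => observable.toArray())
  .mergeAll()
  .subscribe(console.log);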

The groupBy operator creates a new Observable for each key returned by parseInt(item.timereceived / THRESHOLD). I map each group to toArray() because I want to collect all of its items before re-emitting them. mergeAll() then subscribes to all of these Observables (2 in this case) and re-emits whatever they emit, which is always a single array per group.

This produces the following output:

[ { val: 1, timereceived: 10 }, { val: 2, timereceived: 20 }, { val: 3, timereceived: 100 } ]
[ { val: 4, timereceived: 110 }, { val: 5, timereceived: 120 } ]
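
To see which key each item receives, the keying step can be mimicked on a plain array. This is a minimal sketch without RxJS; bucketKeys is a hypothetical helper name, and Math.floor stands in for parseInt, which gives the same result for the non-negative timestamps used here. It also shows that the buckets are aligned to multiples of the threshold rather than to the first item of each buffer, so a threshold of 30 would separate val 5 from val 3 and 4.

```javascript
// Plain-array sketch of the groupBy keying (no RxJS involved).
// `bucketKeys` is a hypothetical helper name.
function bucketKeys(items, threshold) {
  // Index of the fixed-width bucket that each timestamp falls into.
  return items.map(item => Math.floor(item.timereceived / threshold));
}

const data = [
  {val: 1, timereceived: 10},
  {val: 2, timereceived: 20},
  {val: 3, timereceived: 100},
  {val: 4, timereceived: 110},
  {val: 5, timereceived: 120}
];

console.log(JSON.stringify(bucketKeys(data, 105))); // [0,0,0,1,1]
console.log(JSON.stringify(bucketKeys(data, 30)));  // [0,0,3,3,4]
```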

1 Comment

Thanks a lot. RxJS 5 is OK. Unfortunately, this does not solve my problem, because I need to calculate the deltas between the candidate items for one buffer at a time. I updated the question with another example to clarify. Thanks in advance!

Thanks to martin's answer I thought again about groupBy and this seems to work for me:

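exports.bufferWithTimeReceived = (source, timespan) => {
    let currTime; // timereceived of the first item in the current buffer
    return source.groupBy(x => {
            // Start a new buffer on the first item, or once an item is
            // timespan or more after the first item of the current buffer.
            // (Comparing against undefined also handles timereceived === 0.)
            if (currTime === undefined || x.timereceived - currTime >= timespan)
                currTime = x.timereceived;
            return currTime;
        })
        .map(observable => observable.toArray())
        .mergeAll();
};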

But I wonder if bufferWhen could offer a more elegant solution?!
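
Regarding the bonus question about overlapping buffers: the question does not define how the buffers should overlap, so the following is only one possible reading, sketched on a plain array rather than with RxJS. It assumes that every item opens its own buffer, which then contains that item plus all later items received less than timespan after it. overlappingBuffers is a hypothetical helper name, and the items are assumed to be sorted by timereceived.

```javascript
// Plain-array sketch of one possible "overlapping" semantics (no RxJS involved).
// `overlappingBuffers` is a hypothetical helper name.
function overlappingBuffers(items, timespan) {
  return items.map((first, i) => {
    const buffer = [];
    // Collect `first` and every later item received less than
    // `timespan` after it; stop at the first item outside the window.
    for (let j = i; j < items.length; j++) {
      if (items[j].timereceived - first.timereceived >= timespan) break;
      buffer.push(items[j]);
    }
    return buffer;
  });
}

const replayed = [
  {val: 1, timereceived: 10},
  {val: 2, timereceived: 20},
  {val: 3, timereceived: 100},
  {val: 4, timereceived: 110},
  {val: 5, timereceived: 120}
];

const overlapVals = overlappingBuffers(replayed, 30)
  .map(buffer => buffer.map(item => item.val));
console.log(JSON.stringify(overlapVals)); // [[1,2],[2],[3,4,5],[4,5],[5]]
```

A different definition, for example opening a new buffer at fixed time intervals as bufferWithTime does with a time shift, would produce different buffers.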
