4

I have a function that streams data in batches via a callback.

The function awaits the callback for each batch before fetching the next one, and the promise it returns resolves once all batches are finished.

(I'm using TypeScript annotations to help with readability)

async function callbackStream(fn: (batch: Array<number>) => Promise<void>) {}

How do I turn this function into an async generator that yields one value at a time?

async function* generatorStream(): AsyncIterableIterator<number> {}
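For concreteness, here is a hypothetical implementation of callbackStream with the shape described above (the three hard-coded batches are purely illustrative; the real function fetches data):

```typescript
// Hypothetical stand-in for the real callbackStream: it "fetches" three
// hard-coded batches and awaits the callback between them.
async function callbackStream(fn: (batch: Array<number>) => Promise<void>): Promise<void> {
    const batches = [[1, 2], [3, 4], [5]];
    for (const batch of batches) {
        // A real implementation would fetch the next batch here.
        await fn(batch);
    }
}
```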

This has proven to be quite a difficult task.

I've toyed around with this problem and I've built something that works, but it's very convoluted and I can't justify merging this code and making others on my team deal with it.


Here's my current implementation:

I'm using this helper function that creates a "deferred" promise, which helps with passing promises around callbacks.

interface DeferredPromise<T> {
    resolve: (value: T) => void
    reject: (error: any) => void
    promise: Promise<T>
}

function deferred<T>(): DeferredPromise<T> {
    let resolve
    let reject
    const promise = new Promise<T>((res, rej) => {
        resolve = res
        reject = rej
    })
    return {
        resolve: resolve as (value: T) => void,
        reject: reject as (error: any) => void,
        promise,
    }
}

Next I have this hairball of logic that linearizes the promise callbacks into a chain, where each promise resolves to a batch along with a next function that returns another promise for the following batch.

type Done = { done: true }
type More = { done: false; value: Array<number>; next: () => Promise<Result> }
type Result = More | Done

async function chainedPromises() {
    let current = deferred<Result>()

    callbackStream(async batch => {
        const next = deferred<null>()
        current.resolve({
            done: false,
            value: batch,
            next: () => {
                current = deferred<Result>()
                next.resolve(null)
                return current.promise
            },
        })
        await next.promise
    }).then(() => {
        current.resolve({ done: true })
    })

    return current.promise
}

From here, creating a generator that yields one item at a time isn't very difficult:

async function* generatorStream(): AsyncIterableIterator<number> {
    let next = chainedPromises
    while (true) {
        const result = await next()
        if (result.done) {
            return
        }
        for (const item of result.value) {
            yield item
        }
        next = result.next
    }
}
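The payoff is that consumers can then iterate one value at a time with a plain for await loop. Here is a self-contained stub with the target shape (the hard-coded batches stand in for the real stream):

```typescript
// A stub generatorStream with the target signature, so the consumption
// pattern below runs on its own; the real one would wrap callbackStream.
async function* generatorStream(): AsyncIterableIterator<number> {
    for (const batch of [[1, 2], [3, 4], [5]]) {
        yield* batch;
    }
}

async function consume(): Promise<number[]> {
    const seen: number[] = [];
    for await (const n of generatorStream()) {
        seen.push(n);
    }
    return seen;
}
```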

I think we can all agree that the intermediate chainedPromises function is very confusing and convoluted. Is there any way I can transform callbackStream into generatorStream in a way that is easy to understand and easy to follow? I don't mind using a library if it's well established, but I would also appreciate a simple implementation from first principles.

  • Yeah, it looks like your code is overcomplicating something here. If you're using a generator, why even have a callback or deferred? Looking at your code, it's a little tricky to figure out what you're trying to achieve. If it's a stream that generates blocks, then that's where an async generator will shine. Commented Jun 14, 2018 at 19:41
  • "Each batch will await the callback function" - so the lib already understands promises? Can you maybe show how it is implemented? Commented Jun 14, 2018 at 20:26
  • I'm guessing that callbackStream is actually implemented in a fashion that would fit better to return an iterator than to take a callback, so changing the implementation would be much simpler than wrapping it in a way to fit the iterator interface. Commented Jun 14, 2018 at 20:36
  • Hey @Bergi you're totally right. 2 hours later and it all clicked. Node Streams, Generators, and the whole async iterable iterator abstraction... Commented Jun 14, 2018 at 23:30

6 Answers

3

You need an event bucket, here is an example:

function bucket() {
  const stack = [],
        iterate = bucket();

  var next;

  async function* bucket() {
    while (true) {
      yield new Promise((res) => {
        if (stack.length > 0) {
          return res(stack.shift());
        }
        next = res;
      });
    }
  }

  iterate.push = (itm) => {
    if (next) {
      next(itm);
      next = false;
      return;
    }
    stack.push(itm);
  };

  return iterate;
}

;(async function() {
  let evts = new bucket();

  setInterval(()=>{
    evts.push(Date.now());
    evts.push(Date.now() + '++');
  }, 1000);

  for await (let evt of evts) {
    console.log(evt);
  }
})();

3 Comments

I've written a typescript compliant version of this, if anyone else found this answer helpful but needs TS support: github.com/Olian04/typescript-helpers/blob/master/src/lib/…
@Olian04 the link above is 404'ing. Mind updating please?
3

Here's a more modern TypeScript BufferedIterator implementation that's inspired by @NSD's "bucket" approach: https://github.com/felipecsl/obgen/blob/master/src/bufferedIterator.ts

This class implements the AsyncIterableIterator<T> interface.

Sample usage:

(async () => {
  const buffer = new BufferedIterator();
  const callback = (c: string) => buffer.emit(c);
  callback("a");
  callback("b");
  callback("c");
  delay(1000).then(() => callback("d"));
  delay(2000).then(() => callback("e"));
  // make sure you call end() to indicate the end of the stream if applicable
  delay(3000).then(() => buffer.end());
  for await (const value of buffer) {
    console.log(value);
  }
  // or use drain() to collect all items into an array
  // console.log(await buffer.drain());
  console.log("done");
})();
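The real implementation lives in the linked repo; as a rough idea of its shape, here is a minimal sketch of a BufferedIterator-like class inferred from the sample above (the emit() and end() names come from the sample; the internals are assumptions, not the library's actual code):

```typescript
// Minimal sketch: values are queued by emit(); a pending next() is woken
// when a value arrives or when end() marks the stream as finished.
class BufferedIterator<T> implements AsyncIterableIterator<T> {
    private queue: T[] = [];
    private wake: (() => void) | null = null;
    private done = false;

    emit(value: T) {
        this.queue.push(value);
        this.wake?.();
    }

    end() {
        this.done = true;
        this.wake?.();
    }

    async next(): Promise<IteratorResult<T>> {
        // Wait until there is either a buffered value or the stream ended.
        while (this.queue.length === 0 && !this.done) {
            await new Promise<void>((resolve) => (this.wake = resolve));
            this.wake = null;
        }
        if (this.queue.length > 0) {
            return { value: this.queue.shift()!, done: false };
        }
        return { value: undefined, done: true };
    }

    [Symbol.asyncIterator]() {
        return this;
    }
}
```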

3 Comments

Really appreciate you stumbling by and posting this. Very valuable. I wonder, if there are any performance concerns or implications of using BufferedIterator, that one should be aware of
@Mal there's a TODO in that class for handling max capacity, since the buffer will currently grow unbounded. That means it will keep growing for as long as data keeps being emitted to the stream. If you have a high volume of data that's not being consumed, it could lead to out-of-memory errors. One could easily add a max capacity if needed. I might add support for that in the future
I imagine there would be an interest in an updated version supporting max capacity / backpressure handling, so please keep us updated. I have yet to encounter such a solution, but I was excited that BufferedIterator worked out of the box for me!
2

No, I don't think there's a way to implement this transformation in a way that's easy to understand and easy to follow. However, I would recommend dropping the deferreds (you're never rejecting anyway) and just using the Promise constructor. I'd also implement an asynchronous generator right away.

function queue() {
    let resolve = _ => {};
    const q = {
        put(value) {
            resolve(value);
            q.promise = new Promise(r => { resolve = r; });
        },
        promise: null,
    }
    q.put(); // generate first promise
    return q;
}
function toAsyncIterator(callbackStream) {
    const query = queue();
    const result = queue();
    const end = callbackStream(batch => {
        result.put(batch);
        return query.promise;
    }).then(value => ({value, done: true}));
    end.catch(e => void e); // prevent unhandled promise rejection warnings
    return {
        [Symbol.asyncIterator]() { return this; },
        next(x) {
            query.put(x);
            return Promise.race([
                end,
                result.promise.then(value => ({value, done:false}))
            ]);
        }
    }
}
async function* batchToAsyncIterator(batchCallbackStream) {
    for await (const batch of toAsyncIterator(batchCallbackStream)) {
        // for (const val of batch) yield val;
        // or simpler:
        yield* batch;
    }
}
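For reference, the same ping-pong idea can be condensed into a single TypeScript async generator. The fakeCallbackStream stub and the type annotations below are mine, not part of the answer above; this is a sketch of the technique, not the answer's exact code:

```typescript
interface Queue<T> {
    put(value: T): void;
    promise: Promise<T>;
}

// Same ping-pong helper as above: put() resolves the current promise and
// swaps in a fresh pending one.
function queue<T>(): Queue<T> {
    let resolve!: (value: T) => void;
    const q: Queue<T> = {
        put(value: T) {
            resolve(value);
            q.promise = new Promise<T>((r) => { resolve = r; });
        },
        promise: new Promise<T>((r) => { resolve = r; }),
    };
    return q;
}

// Hypothetical stand-in for the question's callbackStream.
async function fakeCallbackStream(fn: (batch: number[]) => Promise<void>): Promise<void> {
    for (const batch of [[1, 2], [3, 4], [5]]) {
        await Promise.resolve(); // stand-in for actually fetching the batch
        await fn(batch);
    }
}

async function* streamToAsyncGenerator(
    stream: (fn: (batch: number[]) => Promise<void>) => Promise<void>
): AsyncIterableIterator<number> {
    const query = queue<void>();
    const result = queue<number[]>();
    const end: Promise<null> = stream((batch) => {
        result.put(batch);    // hand the batch to the consumer
        return query.promise; // block until the consumer asks for more
    }).then(() => null);
    while (true) {
        query.put(undefined); // ask the producer for the next batch
        const batch = await Promise.race([end, result.promise]);
        if (batch === null) return; // stream finished
        yield* batch;
    }
}
```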

2 Comments

I got this to work on modern Node by fixing the following things: adding the missing parenthesis on result.promise.then(value => ({value, done:false})), replacing yield* with yield, and, in queue, making put take an argument and pass it on to resolve.
@Kevin Thanks, fixed. The yield* is on purpose as it helps to convert the arrays (batches, as in the OPs code) into individual elements yielded by the generator. If you don't need that, just call toAsyncIterator directly instead of batchToAsyncIterator.
0

Would a TypeScript solution work for you?

It should handle the case where the callback is called faster than the promise is resolved, multiple times in a row. The callback can be a method with the signature callback(error, result, index). The stream is considered finished when the callback is called with no arguments. Usage:

asAsyncOf(this.storage, this.storage.each);

Solution:

function asAsyncOf<T1, T2, T3, T4, Y>(c, fn: { (a: T1, a1: T2, a2: T3, a3: T4, cb: { (err?, res?: Y, index?: number): boolean }): void }, a: T1, a1: T2, a2: T3, a3: T4): AsyncGenerator<Y>
function asAsyncOf<T1, T2, T3, Y>(c, fn: { (a: T1, a1: T2, a2: T3, cb: { (err?, res?: Y, index?: number): boolean }): void }, a: T1, a1: T2, a3: T3): AsyncGenerator<Y>
function asAsyncOf<T1, T2, Y>(c, fn: { (a: T1, a1: T2, cb: {(err?, res?: Y, index?: number): boolean}): void}, a: T1, a1: T2): AsyncGenerator<Y>
function asAsyncOf<T, Y>(c, fn: { (a: T, cb: { (err?, res?: Y, index?: number): boolean }): void }, a: T): AsyncGenerator<Y>
function asAsyncOf<Y>(c, fn: { (cb: {(err?, res?: Y, index?: number): boolean}): void}): AsyncGenerator<Y>
async function* asAsyncOf(context, fn, ...args) {
    let next = (result?) => { };
    let fail = (err) => { };
    let finish = {};
    const items = [];
    let started = true;
    try {
        fn.apply(context, [...args, function (err, result, index) {
            const nextArgs = [].slice.call(arguments, 0);
            if (nextArgs.length === 0) {
                started = false;
                next(finish);
                return true;
            }
            if (err) {
                fail(err);
                return true;
            }
            items.push(result);
            next(result);
        }]);
    } catch (ex) {
        fail(ex);
    }
    while (true) {
        const promise = started ? new Promise((resolve, error) => {
            next = resolve;
            fail = error;
        }) : Promise.resolve(finish);
        const record = await promise;
        if (record === finish) {
            while (items.length) {
                const item = items.shift();
                yield item;
            }
            return;
        }
        while (items.length) {
            const item = items.shift();
            yield item;
        }
    }
}
export { asAsyncOf };

Comments

0

I have built this lib for Deno.js, but the code should work everywhere (I'll publish it to npm soon).

https://deno.land/x/async_generator_callback

https://github.com/LMGU-Technik/async-generator-callback

Adapted example to the ones in this question:

import { AsyncGeneratorCallback } from "https://deno.land/x/async_generator_callback";

const foo = new AsyncGeneratorCallback<string>();

foo.call("A");
foo.call("B");

setTimeout(()=>{
    foo.call("C");

    // cleanup, pending promise would prevent process from exiting
    // supports the new typescript `using` keyword
    foo[Symbol.dispose]();
}, 1000);

for await (const arg of foo) {
    console.log(arg)
}

Comments

0

This is my TS-native approach, based on @Bergi's work:

type Start<T> = (onItem: (item: T) => void, onClose: (err: unknown) => void) => void;
function toAsyncIterator2<T>(startStream: Start<T>) {
    let wakeConsumer: (() => void) | undefined

    let close!: (err: unknown) => void;
    const closePromise = new Promise((resolve) => { close = resolve; });

    const queue: T[] = [];

    return {
        [Symbol.asyncIterator]() {
            startStream((item: T) => {
                queue.push(item)

                if (wakeConsumer) {
                    wakeConsumer();
                    wakeConsumer = undefined;
                }
            }, close);

            return {
                async next() {
                    if (0 < queue.length) {
                        return { value: queue.shift()!, done: false };
                    }

                    const wakePromise = new Promise<void>((resolve) => {
                        wakeConsumer = resolve;
                    });

                    await Promise.race([wakePromise, closePromise]);
                    if (0 < queue.length) {
                        return { value: queue.shift()!, done: false };
                    }
                    return { value: undefined, done: true }
                }
            }
        }
    }
}

Comments
