
Scenario

I'm given a function with an asynchronous callback, like:

let readFile: (path: string, callback: (line: string, eof: boolean) => void) => void

though I would prefer a function with an AsyncIterable/AsyncGenerator signature instead:

let readFileV2: (path: string) => AsyncIterable<string>

Problem

Without readFileV2, I have to read a file like this:

let file = await new Promise<string>((resolve) => {
    let contents = ''
    readFile('./myfile.txt', (line, eof) => {
        if (eof) { return resolve(contents) }
        contents += line + '\n'
    })
})

...while readFileV2 allows me to do it more cleanly:

let file = '';
for await (let line of readFileV2('./myfile.txt')) {
    file += line + '\n'
}

Question

Is there a way for me to transform readFile into readFileV2?

Updated for clarification:

Is there a general approach to transform a function with an async callback argument to an AsyncGenerator/AsyncIterable variant?

And can this approach be demonstrated on the readFile function above?

References

I see two related questions here, but they don't seem to provide a clear answer.


4 Answers


Disclaimer at the outset: I am answering the following question:

Given a data-providing function fn of a form like (...args: A, callback: (data: T, done: boolean) => void) => void, for some list of initial argument types A and data type T, how can we write a transform(fn) that produces a new function of the form (...args: A) => AsyncIterable<T>?

It is quite possible that this isn't the right thing to be doing in general, since consumers of AsyncIterable<T> may process data slowly or abort early, and a function of type (...args: [...A, (data: T, done: boolean) => void]) => void can't possibly react to that; it will call callback once per piece of data, whenever it wants, and it will not stop until it feels like it.
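To make that concrete, here is a hypothetical consumer (using the readFileV2 shape from the question) that stops after the first line; the callback-based producer underneath has no way to notice:

declare const readFileV2: (path: string) => AsyncIterable<string>;

async function firstLine(path: string): Promise<string | undefined> {
    for await (const line of readFileV2(path)) {
        return line; // abort early; the producer keeps calling back regardless
    }
    return undefined;
}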


Still, here is one possible implementation:

const transform = <A extends any[], T>(
    fn: (...args: [...args: A, callback: (val: T, done: boolean) => void]) => void
) => (...args: A): AsyncIterable<T> => {
    let values: Promise<[T, boolean]>[] = []; // queue of pending (value, done) pairs
    let resolve: (x: [T, boolean]) => void;
    values.push(new Promise(r => { resolve = r; }));
    fn(...args, (val: T, done: boolean) => {
        resolve([val, done]); // fulfill the promise at the tail of the queue
        values.push(new Promise(r => { resolve = r; })); // and queue a fresh one
    });
    return async function* () {
        let val: T;
        for (let i = 0, done = false; !done; i++) {
            [val, done] = await values[i];
            delete values[i]; // drop the consumed entry
            yield val;
        }
    }();
}

Essentially we maintain a queue of data values, values, which is written to inside the callback passed to fn and read from inside a generator function. This works through a chain of promises: the first promise is created manually, and each time data becomes available, the callback resolves the current promise and pushes a fresh promise onto the queue. The generator function awaits these promises, pulls data off the queue, and deletes the consumed entries.


To test it, someone needs to provide fn. Here's one possibility:

function sleep(ms: number) {
    return new Promise<void>(r => setTimeout(r, ms));
}

const provideData = async (name: string, callback: (line: string, eof: boolean) => void) => {
    const contents = [
        "This is line 1 of " + name, "and this is line 2",
        "and line 3", "and 4", "5",
        "and that's the end of " + name + "."
    ];
    for (const [line, eof] of contents.map((l, i, a) => [l, i >= a.length - 1] as const)) {
        await sleep(1000); // I guess it takes a second to read each line
        callback(line, eof);
    }
}

The provideData function accepts a callback and calls it once per second with successive lines of an array. And now we transform it:

const provideDataV2 = transform(provideData);
// let provideDataV2: (name: string) => AsyncIterable<string>

And let's test the transformer:

async function foo() {
    console.log(new Date().toLocaleTimeString(), "starting")
    const iter = provideDataV2("my data");
    await sleep(2500); // not ready to read yet, I guess    
    for await (let line of iter) {
        console.log(new Date().toLocaleTimeString(), line)
    }
    console.log(new Date().toLocaleTimeString(), "done")
}
foo()

/* 
[LOG]: "2:48:36 PM",  "starting" 
[LOG]: "2:48:37 PM",  "This is line 1 of my data" 
[LOG]: "2:48:38 PM",  "and this is line 2" 
[LOG]: "2:48:39 PM",  "and line 3" 
[LOG]: "2:48:40 PM",  "and 4" 
[LOG]: "2:48:41 PM",  "5" 
[LOG]: "2:48:42 PM",  "and that's the end of my data." 
[LOG]: "2:48:42 PM",  "done" 
*/

Looks good.

Is it perfect? Does it have weird side effects in weird situations (e.g., if you iterate it multiple times)? Should it handle errors in a particular way? Are there recommended solutions elsewhere? Not sure. This is just a possible implementation of transform that adheres to the contract laid out in the question as asked.

Playground link to code


3 Comments

You could go even further and use Promise<IteratorResult<T>> instead of a tuple, then just implement the iterator object yourself instead of using a generator function (a sketch of that variant appears after these comments)
@Bergi, is this what you mean? Do you think that's an improvement or should I leave it alone?
@jcalz Thanks a lot for your effort on this question and providing a solution. Just FYI I'm giving it a few more hours for edits/suggestions to tick in, and then I'll accept this answer
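For reference, here is a minimal sketch of what that variant might look like (an interpretation of the comment, not either commenter's actual code): the queue holds Promise<IteratorResult<T>> values directly, and the iterator object is hand-rolled:

const transformIR = <A extends any[], T>(
    fn: (...args: [...args: A, callback: (val: T, done: boolean) => void]) => void
) => (...args: A): AsyncIterable<T> => {
    const results: Promise<IteratorResult<T>>[] = []; // queue of IteratorResults
    let resolve!: (r: IteratorResult<T>) => void;
    results.push(new Promise(r => { resolve = r; }));
    fn(...args, (value, done) => {
        resolve({ value, done: false }); // deliver this value
        results.push(new Promise(r => { resolve = r; })); // queue the next slot
        if (done) resolve({ value: undefined, done: true }); // then terminate
    });
    let i = 0;
    return {
        [Symbol.asyncIterator]: () => ({
            next: () => i < results.length
                ? results[i++]
                : Promise.resolve({ value: undefined, done: true as const })
        })
    };
};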

This has been a Node.js-native API since v10; there's no need to reinvent it:

import {createReadStream} from 'fs';
import {createInterface} from 'readline';

function readFileLines(fileName: string): AsyncIterable<string> {
    const input = createReadStream(fileName);
    // readline.Interface is itself an AsyncIterable of lines
    return createInterface({input, crlfDelay: Infinity});
}

Testing it:

const lines = readFileLines('./test1.js');
for await(const l of lines) {
    console.log(l);
}

7 Comments

For clarity, the question in OP looks like "I'm given a function like let readFile: (path: string, callback: (line: string, eof: boolean) => void) => void though I would prefer a function like let readFileV2: (path: string) => AsyncIterable<string>. Is there a way for me to transform readFile into readFileV2?" And your answer is something like "don't do that, whoever gave you readFile should take it back and give you a better function"? Are you saying this is an XY problem? Or am I missing something about how this answer addresses the question?
@jcalz There are many examples out there of how to take a function that produces data step-by-step and convert it into a generator. But the OP is giving a function example that shouldn't be wrapped in a generator, because it produces all the data at once, so however you wrap it, you'll end up doing more work, and quite inefficiently. That's why, specifically for that readFile, the best advice is not to do it, and to resort to a more suitable file-read API that's available.
When you say "that readFile", what are you talking about? The toy implementation I put in my playground link? Again, I am not the OP, I was just throwing together a mock/stub function to test against. Hopefully what you're saying is applicable to all possible implementations of readFile (such as this) and not just the particular ones I've been using as tests.
Also, since the question said "I'm given a function" and this answer is "don't use that, use readFileLines instead", I'm still a little concerned about whether it's possible for the OP to take such advice. The OP presumably didn't write the function they are given, so they can't necessarily re-implement it... we don't know if the file system being probed is local, remote, virtual, etc, so I'd expect this answer to say something like "if the readFile function you are given is just reading from a local file system, you should throw it away and use the following instead:"
@Mal As I explained above, readFile makes a very poor example for a generic approach, because it is one function that should not be converted into a generator.

Yes.

I did this for Deno.serve, an HTTP server that takes a callback and an options object, like Deno.serve(req => respondWith(req), {port: 3000}).

Basically, the code is:

async function* emitterGen(opts){
  let _resolve,
      _req = new Promise((resolve,reject) => _resolve = resolve);
  Deno.serve( req => ( _resolve(req)
                     , _req = new Promise((resolve,reject) => _resolve = resolve)
                     )
            , opts
            );
  while (true){
    yield await _req;
  }
}

let reqEmitter = emitterGen({port: 3000});

for await (let req of reqEmitter){
  respondWith(req);
}

Obviously the code above is simplified, without exception handling. Yet it should be sufficient to answer your question.
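For instance, a hypothetical variant with an error path could reject the pending promise, so a failure surfaces as a throw from the for await...of loop. Note that serveWithErrors and its (req, err) callback are assumptions for illustration, not Deno.serve's actual signature:

async function* emitterGenSafe(opts){
  let _resolve, _reject,
      _req = new Promise((resolve, reject) => (_resolve = resolve, _reject = reject));
  // hypothetical server that reports failures via a second `err` argument
  serveWithErrors( (req, err) => ( err ? _reject(err) : _resolve(req)
                                 , _req = new Promise((resolve, reject) => (_resolve = resolve, _reject = reject))
                                 )
                 , opts
                 );
  while (true){
    yield await _req; // a rejected _req propagates out of the for await loop
  }
}

A consumer would then wrap its for await loop in try/catch to handle the rejection.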

Here is a working mock-up server which produces a random number (0-99) as the request (req) every random (0-999) ms and invokes cb (the handler) with it. It stops after 5 iterations.

function server(cb,ms){
  let count  = 5,
      looper = function(c = count,t = ms){
                 let stoid = setTimeout( req => ( cb(req)
                                                , --c && looper(c, Math.random()*1000 >>> 0)
                                                , clearTimeout(stoid)
                                                )
                                       , t
                                       , Math.random()*100 >>> 0
                                       )
               }
  looper();
}

async function* emitterGen(ms){
  let _resolve,
      _req = new Promise((resolve,reject) => _resolve = resolve);
  server( req => ( _resolve(req)
                 , _req = new Promise((resolve,reject) => _resolve = resolve)
                 )
        , ms
        );
  while (true){
    yield await _req;
  }
}

let reqEmitter = emitterGen(1000);

// since there is no top level await in SO snippets
(async function(){
  for await (let req of reqEmitter){
    console.log(`Received request is: ${req}`);
  }
})();

3 Comments

This solution might be risky, and certainly doesn't extrapolate. Specifically: your resolve() handler must run immediately after the new request comes in, before any other new request is fired, or you will overwrite and drop entries. Microtasks (resolve handlers) are guaranteed to run in the same task, so that's good, but if multiple new network requests could be emitted in the same task, that won't be good enough. I don't know how Deno.serve is implemented; maybe it guarantees a new task for each. But for a general solution, I think you would want an array of buffered values.
@mmocny Thanks for the comment. I cannot see any reason to employ a buffer. Invoking the callback is invoking _resolve with the received req, resolving the yielded _req, and resetting _req and _resolve with a new promise and its resolve function to be used for the next req to be received. In fact, we don't even need to use an async generator. We could drop the async keyword and use yield _req; instead of yield await _req; since _req is already a promise.
@mmocny OK, I see your concern. This works on the assumption that the consumer (the for await...of loop) consumes the received requests faster than they arrive, which might not be the case, since we might need disk access or heavy processing to prepare a response, and in the meantime another request may arrive. There can be two approaches. We may spawn a promisified thread and immediately return a promise that contains the eventual response. Or we may establish an async queue, which in fact I had implemented previously and which is easily applicable for this job (a sketch of such a queue follows below).
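A minimal sketch of that buffered queue, reusing the server mock from this answer: values pushed while the consumer is busy accumulate instead of overwriting the single pending promise.

async function* emitterGenBuffered(ms){
  const queue = []; // absorbs requests that arrive while the consumer is busy
  let wake;
  server(req => { queue.push(req); wake?.(); }, ms);
  while (true){
    while (queue.length) yield queue.shift(); // drain everything buffered
    await new Promise(resolve => { wake = resolve; }); // sleep until the next push
  }
}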

I created a class that can produce an async generator from any source:

/** Infinite async generator. Iterates messages pushed to it until closed. */
class Machina<T> {

  #open = true;
  #queue: T[] = [];
  #resolve: (() => void) | undefined;

  async * stream(): AsyncGenerator<T> {
    this.#open = true;

    while (this.#open) {
      if (this.#queue.length) {
        yield this.#queue.shift()!;
        continue;
      }

      await new Promise<void>((_resolve) => {
        this.#resolve = _resolve;
      });
    }
  }

  push(data: T): void {
    this.#queue.push(data);
    this.#resolve?.();
  }

  close(): void {
    this.#open = false;
    this.#resolve?.();
  }

}

export { Machina };

You can use it like this:

// Create the Machina instance
const machina = new Machina<string>();

// Async generator loop
async function getMessages() {
  for await (const msg of machina.stream()) {
    console.log(msg);
  }
}

// Start the generator
getMessages();

// Push messages to it
machina.push('hello!');
machina.push('whats up?');
machina.push('greetings');

// Stop the generator
machina.close();

For your specific case, something like this should work:

/** Read each line of the file as an AsyncGenerator. */
function readFileAsync(path: string): AsyncGenerator<string> {
  const machina = new Machina<string>();

  readFile(path, (line: string, eof: boolean) => {
    if (eof) {
      machina.close();
    } else {
      machina.push(line);
    }
  });

  return machina.stream();
}

// Usage
for await (const line of readFileAsync('file.txt')) {
  console.log(line);
}

How it works

  1. Calling machina.stream() creates the generator; the loop doesn't actually start until the consumer requests the first value, and it then pauses immediately because it's waiting on a promise that isn't resolved.
  2. Calling machina.push() adds an item to the buffer, and then unpauses it by resolving the promise. When it becomes unpaused, it empties the buffer into the stream and then pauses it again by awaiting a new promise.
  3. The consumer of machina.stream() receives the pushed items. You can do this repeatedly.

Other considerations:

  • Even if the Machina instance goes out of scope (i.e., ready to be garbage-collected), its promise will stay pending forever. So you need to manually call machina.close() when you are done streaming, if ever. Simply breaking out of the loop is not enough! (A defensive pattern for this is sketched after this list.)
  • I tried a slightly different design at first, without using a buffer. It turns out the buffer is needed, otherwise you cannot push multiple items in the same tick (all after the first will be dropped). But if you're only pushing one item per tick and are actively consuming the stream, the buffer will only ever contain one item. This is basically just a small memory consideration.
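As flagged in the first consideration, one defensive pattern (a sketch, not part of the original class) is to wrap consumption in try/finally so close() runs even on an early exit:

// close() runs even if the consumer exits the loop early or the body throws
async function getFirstMessage(machina: Machina<string>): Promise<string | undefined> {
  try {
    for await (const msg of machina.stream()) {
      return msg; // early exit: finally still runs
    }
  } finally {
    machina.close(); // resolve the pending promise so stream() can finish
  }
  return undefined;
}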

