0
\$\begingroup\$

References

Source code

https://github.com/guest271314/websocket-stream-impl

Implementation

class WebSocketStreamImpl {
  #ws;
  #readableController;
  #writableController;
  #readable;
  #writable;
  #handleCloseEvent;
  #closedPromise = Promise.withResolvers();
  #openedPromise = Promise.withResolvers();
  opened;
  closed;
  url;
  signal;
  protocols = [];
  constructor(url, options = {}) {
    try {
      this.url = url;
      if (options?.protocols) {
        this.protocols = options.protocols;
      }
      if (options?.signal) {
        this.signal = options.signal;
        this.signal.addEventListener("abort", async (e) => {
          try {
            this.#closedPromise.reject(new DOMException("WebSocket handshake was aborted", "ABORT_ERR"));
            this.#openedPromise.reject(new DOMException("WebSocket handshake was aborted", "ABORT_ERR"));
          } catch (e2) {
            console.log(e2);
          }
        });
      }
      this.#openedPromise.promise.catch(() => {});
      this.#closedPromise.promise.catch(() => {});
      this.closed = this.#closedPromise.promise.catch((e) => {
        throw e;
      });
      this.opened = new Promise(async (resolve, reject) => {
        try {
          const aborted = this.signal?.aborted;
          if (aborted) {
            return reject(this.#openedPromise);
          }
        } catch {
          return;
        }
        const args = [url, { protocols: this.protocols }];
        if (this.protocols.length === 0) {
          args.pop();
        }
        this.#handleCloseEvent = function handleCloseEvent(e) {
          const { code, reason } = e;
          try {
            if (this.#readable.locked) {
              this.#readableController.close();
            }
            if (this.#writable.locked) {
              this.#writable.close().catch(() => {});
            }
            this.#closedPromise.resolve({ closeCode: code, reason });
          } catch (e2) {}
        };
        this.#ws = new WebSocket(...args);
        this.#ws.binaryType = "arraybuffer";
        this.#ws.addEventListener("close", this.#handleCloseEvent.bind(this));
        this.#ws.addEventListener("error", (e) => {
          this.#closedPromise.reject(e);
        }, { once: true });
        this.#ws.addEventListener("message", (e) => {
          this.#readableController.enqueue(e.data);
        });
        this.#ws.addEventListener("open", (e) => {
          this.#readable = new ReadableStream({
            start: (c) => {
              this.#readableController = c;
            },
            cancel: async (reason) => {
              console.log(reason);
              await this.#writable.close();
            }
          });
          this.#writable = new WritableStream({
            start: (c) => {
              this.#writableController = c;
            },
            write: (value) => {
              this.#ws.send(value);
            },
            close: () => {
              console.log("close");
              this.#readableController.close();
              this.#ws.close();
            },
            abort: (reason) => {}
          });
          resolve({
            readable: this.#readable,
            writable: this.#writable
          });
        });
      }).catch((e) => {
        throw e;
      });
    } catch (e) {
      throw e;
    }
  }
  close({ closeCode = 1000, reason = "" } = {}) {
    if (navigator.userAgent.includes("Node")) {
      this.#handleCloseEvent({ code: closeCode, reason });
    } else {
      this.#ws.close(closeCode, reason);
    }
  }
}

export { WebSocketStreamImpl }

Usage

import { WebSocketStreamImpl } from "./websocket-stream-impl.js";
// Always route the demo through the polyfill, even where the runtime ships a
// native WebSocketStream, so that this implementation is what gets exercised.
globalThis.WebSocketStream = class WebSocketStream extends WebSocketStreamImpl {};

// Demo: send a binary and a text payload in chunks to a local server, read
// one message back per chunk, report the totals, then close the connection.
(async () => {
  // 20 MiB of 0x01 bytes.
  const payload = new Uint8Array(1024 ** 2 * 20).fill(1);

  const wss = new WebSocketStream("ws://127.0.0.1:44818");
  console.log(wss);

  let connection;
  try {
    connection = await wss.opened;
  } catch (err) {
    // Without a connection there is nothing to stream.
    console.warn(err);
    return;
  }
  const { readable, writable } = connection;

  const reader = readable.getReader();
  const writer = writable.getWriter();

  // Log the close info (or the failure) once the connection has ended.
  wss.closed.then((closeInfo) => console.log(closeInfo), console.error);

  /**
   * Writes `data` in 65536-unit chunks and reads one message back per chunk.
   *
   * @param {string | Uint8Array} data - Payload to send.
   * @returns {Promise<number>} Sum of the sizes of the messages read back
   *   (`length` for strings, `byteLength` otherwise).
   */
  async function stream(data) {
    const chunkSize = 65536;
    const isText = typeof data === "string";
    let received = 0;

    for (let offset = 0; offset < data.length; offset += chunkSize) {
      const end = offset + chunkSize;
      const chunk = isText ? data.slice(offset, end) : data.subarray(offset, end);
      await writer.ready;
      await writer.write(chunk);
      const { value, done } = await reader.read();
      if (done) {
        // The readable side ended early; report what was received so far.
        break;
      }
      received += typeof value === "string" ? value.length : value.byteLength;
    }
    return received;
  }

  // Failures are reported as the result value instead of aborting the demo.
  const binaryResult = await stream(payload).catch((e) => e);
  const textResult = await stream("text").catch((e) => e);

  console.log({
    binaryResult,
    textResult,
  });

  wss.close({
    closeCode: 4999,
    reason: "Done streaming",
  });
})().catch(console.error);

Tests

  • node v26.0.0-nightly20251107a78f7d9e02
  • deno 3.0.0-rc.0+1404815
  • tjs v24.12.0
  • bun 1.3.2
  • chrome Version 144.0.7515.0 (Developer Build) (64-bit)
  • firefox 147.0a1 (2025-11-10) (64-bit)
user@debian:~/bin$ node wss.js
WebSocketStream {
  opened: Promise { <pending> },
  closed: Promise { <pending> },
  url: 'ws://127.0.0.1:44818',
  signal: undefined,
  protocols: []
}
{ binaryResult: 20971520, textResult: 4 }
{ closeCode: 4999, reason: 'Done streaming' }
^C
user@debian:~/bin$ deno -A wss.js
WebSocketStream {
  opened: Promise { <pending> },
  closed: Promise { <pending> },
  url: "ws://127.0.0.1:44818",
  signal: undefined,
  protocols: []
}
{ binaryResult: 20971520, textResult: 4 }
{ closeCode: 4999, reason: "Done streaming" }
user@debian:~/bin$ tjs run wss.js
{
 opened: {},
  closed: {},
  url: 'ws://127.0.0.1:44818',
  signal: undefined,
  protocols: []
}
{ binaryResult: 20971520, textResult: 4 }
{ closeCode: 4999, reason: 'Done streaming' }
^C
user@debian:~/bin$ bun wss.js
WebSocketStream {
  opened: Promise { <pending> },
  closed: Promise { <pending> },
  url: "ws://127.0.0.1:44818",
  signal: undefined,
  protocols: [],
  close: [Function: close],
}
{
  binaryResult: 20971520,
  textResult: 4,
}
{
  closeCode: 4999,
  reason: "Done streaming",
}

Note that `node` hangs after the WebSocket is closed, because of the Undici-based WebSocket implementation in Node.js, and CTRL+C is also needed to exit from `tjs` (txiki.js). Neither is necessary when using `deno` or `bun`.

\$\endgroup\$

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.