References
Source code
https://github.com/guest271314/websocket-stream-impl
Implementation
class WebSocketStreamImpl {
#ws;
#readableController;
#writableController;
#readable;
#writable;
#handleCloseEvent;
#closedPromise = Promise.withResolvers();
#openedPromise = Promise.withResolvers();
opened;
closed;
url;
signal;
protocols = [];
constructor(url, options = {}) {
try {
this.url = url;
if (options?.protocols) {
this.protocols = options.protocols;
}
if (options?.signal) {
this.signal = options.signal;
this.signal.addEventListener("abort", async (e) => {
try {
this.#closedPromise.reject(new DOMException("WebSocket handshake was aborted", "ABORT_ERR"));
this.#openedPromise.reject(new DOMException("WebSocket handshake was aborted", "ABORT_ERR"));
} catch (e2) {
console.log(e2);
}
});
}
this.#openedPromise.promise.catch(() => {});
this.#closedPromise.promise.catch(() => {});
this.closed = this.#closedPromise.promise.catch((e) => {
throw e;
});
this.opened = new Promise(async (resolve, reject) => {
try {
const aborted = this.signal?.aborted;
if (aborted) {
return reject(this.#openedPromise);
}
} catch {
return;
}
const args = [url, { protocols: this.protocols }];
if (this.protocols.length === 0) {
args.pop();
}
this.#handleCloseEvent = function handleCloseEvent(e) {
const { code, reason } = e;
try {
if (this.#readable.locked) {
this.#readableController.close();
}
if (this.#writable.locked) {
this.#writable.close().catch(() => {});
}
this.#closedPromise.resolve({ closeCode: code, reason });
} catch (e2) {}
};
this.#ws = new WebSocket(...args);
this.#ws.binaryType = "arraybuffer";
this.#ws.addEventListener("close", this.#handleCloseEvent.bind(this));
this.#ws.addEventListener("error", (e) => {
this.#closedPromise.reject(e);
}, { once: true });
this.#ws.addEventListener("message", (e) => {
this.#readableController.enqueue(e.data);
});
this.#ws.addEventListener("open", (e) => {
this.#readable = new ReadableStream({
start: (c) => {
this.#readableController = c;
},
cancel: async (reason) => {
console.log(reason);
await this.#writable.close();
}
});
this.#writable = new WritableStream({
start: (c) => {
this.#writableController = c;
},
write: (value) => {
this.#ws.send(value);
},
close: () => {
console.log("close");
this.#readableController.close();
this.#ws.close();
},
abort: (reason) => {}
});
resolve({
readable: this.#readable,
writable: this.#writable
});
});
}).catch((e) => {
throw e;
});
} catch (e) {
throw e;
}
}
close({ closeCode = 1000, reason = "" } = {}) {
if (navigator.userAgent.includes("Node")) {
this.#handleCloseEvent({ code: closeCode, reason });
} else {
this.#ws.close(closeCode, reason);
}
}
}
export { WebSocketStreamImpl }
Usage
import { WebSocketStreamImpl } from "./websocket-stream-impl.js";
if (!Object.hasOwn(globalThis, "WebSocketStream")) {
globalThis.WebSocketStream = class WebSocketStream extends WebSocketStreamImpl {};
}
(async() => {
globalThis.WebSocketStream = class WebSocketStream
extends WebSocketStreamImpl {};
var u8 = new Uint8Array(1024 ** 2 * 20).fill(1);
var decoder = new TextDecoder();
var encoder = new TextEncoder();
var wss = new WebSocketStream("ws://127.0.0.1:44818");
console.log(wss);
var { readable, writable } = await wss.opened.catch(console.warn);
var reader = readable.getReader();
var writer = writable.getWriter();
Promise.allSettled([writable.closed, readable.closed, wss.closed]).then((
[, , { value }],
) => console.log(value)).catch(console.error);
async function stream(data) {
const len = 65536;
let bytes = 0;
if (typeof data === "string") {
for (let i = 0; i < data.length; i += len) {
await writer.ready;
await writer.write(data.slice(i, i + len));
const { value, done } = await reader.read();
bytes += value.length;
}
} else {
for (let i = 0; i < data.length; i += len) {
const uint8 = data.subarray(i, i + len);
await writer.ready;
await writer.write(uint8);
const { value, done } = await reader.read();
bytes += value.byteLength;
}
}
return bytes;
}
var binaryResult = await stream(u8).catch((e) => e);
var textResult = await stream("text").catch((e) => e);
console.log({
binaryResult,
textResult,
});
wss.close({
closeCode: 4999,
reason: "Done streaming",
});
})();
Tests
nodev26.0.0-nightly20251107a78f7d9e02deno3.0.0-rc.0+1404815tjsv24.12.0bun1.3.2chromeVersion 144.0.7515.0 (Developer Build) (64-bit)firefox147.0a1 (2025-11-10) (64-bit)
user@debian:~/bin$ node wss.js
WebSocketStream {
opened: Promise { <pending> },
closed: Promise { <pending> },
url: 'ws://127.0.0.1:44818',
signal: undefined,
protocols: []
}
{ binaryResult: 20971520, textResult: 4 }
{ closeCode: 4999, reason: 'Done streaming' }
^C
user@debian:~/bin$ deno -A wss.js
WebSocketStream {
opened: Promise { <pending> },
closed: Promise { <pending> },
url: "ws://127.0.0.1:44818",
signal: undefined,
protocols: []
}
{ binaryResult: 20971520, textResult: 4 }
{ closeCode: 4999, reason: "Done streaming" }
user@debian:~/bin$ tjs run wss.js
{
opened: {},
closed: {},
url: 'ws://127.0.0.1:44818',
signal: undefined,
protocols: []
}
{ binaryResult: 20971520, textResult: 4 }
{ closeCode: 4999, reason: 'Done streaming' }
^C
user@debian:~/bin$ bun wss.js
WebSocketStream {
opened: Promise { <pending> },
closed: Promise { <pending> },
url: "ws://127.0.0.1:44818",
signal: undefined,
protocols: [],
close: [Function: close],
}
{
binaryResult: 20971520,
textResult: 4,
}
{
closeCode: 4999,
reason: "Done streaming",
}
Yes, node hangs on closing the WebSocket due to Node.js Undici implementation of WebSocket. And yes, CTRL+C is necessary to exit from tjs (txiki.js). Not necessary using deno or bun.