Skip to content

Commit 656bb4c

Browse files
committed
Various improvements to reconnecting websockets + More tests
1 parent 53037a3 commit 656bb4c

File tree

4 files changed

+261
-199
lines changed

4 files changed

+261
-199
lines changed

src/api/coderApi.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,11 @@ export class CoderApi extends Api {
240240
socketFactory,
241241
this.output,
242242
configs.apiRoute,
243+
undefined,
244+
() =>
245+
this.reconnectingSockets.delete(
246+
reconnectingSocket as ReconnectingWebSocket<unknown>,
247+
),
243248
);
244249

245250
this.reconnectingSockets.add(
Lines changed: 100 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { WebSocketEventType } from "coder/site/src/utils/OneWayWebSocket";
2+
import type { CloseEvent } from "ws";
23

34
import type { Logger } from "../logging/logger";
45

@@ -18,9 +19,6 @@ export type ReconnectingWebSocketOptions = {
1819
// 403 Forbidden, 410 Gone, 426 Upgrade Required, 1002/1003 Protocol errors
1920
const UNRECOVERABLE_CLOSE_CODES = new Set([403, 410, 426, 1002, 1003]);
2021

21-
// Custom close code for intentional reconnection (4000-4999 range is for private use)
22-
const CLOSE_CODE_RECONNECTING = 4000;
23-
2422
export class ReconnectingWebSocket<TData = unknown>
2523
implements UnidirectionalStream<TData>
2624
{
@@ -40,12 +38,15 @@ export class ReconnectingWebSocket<TData = unknown>
4038
#reconnectTimeoutId: NodeJS.Timeout | null = null;
4139
#isDisposed = false;
4240
#isConnecting = false;
41+
#pendingReconnect = false;
42+
readonly #onDispose?: () => void;
4343

4444
private constructor(
4545
socketFactory: SocketFactory<TData>,
4646
logger: Logger,
4747
apiRoute: string,
4848
options: ReconnectingWebSocketOptions = {},
49+
onDispose?: () => void,
4950
) {
5051
this.#socketFactory = socketFactory;
5152
this.#logger = logger;
@@ -56,21 +57,24 @@ export class ReconnectingWebSocket<TData = unknown>
5657
jitterFactor: options.jitterFactor ?? 0.1,
5758
};
5859
this.#backoffMs = this.#options.initialBackoffMs;
60+
this.#onDispose = onDispose;
5961
}
6062

6163
static async create<TData>(
6264
socketFactory: SocketFactory<TData>,
6365
logger: Logger,
6466
apiRoute: string,
6567
options: ReconnectingWebSocketOptions = {},
68+
onDispose?: () => void,
6669
): Promise<ReconnectingWebSocket<TData>> {
6770
const instance = new ReconnectingWebSocket<TData>(
6871
socketFactory,
6972
logger,
7073
apiRoute,
7174
options,
75+
onDispose,
7276
);
73-
await instance.#connect();
77+
await instance.connect();
7478
return instance;
7579
}
7680

@@ -85,10 +89,6 @@ export class ReconnectingWebSocket<TData = unknown>
8589
(this.#eventHandlers[event] as Set<EventHandler<TData, TEvent>>).add(
8690
callback,
8791
);
88-
89-
if (this.#currentSocket) {
90-
this.#currentSocket.addEventListener(event, callback);
91-
}
9292
}
9393

9494
removeEventListener<TEvent extends WebSocketEventType>(
@@ -98,95 +98,95 @@ export class ReconnectingWebSocket<TData = unknown>
9898
(this.#eventHandlers[event] as Set<EventHandler<TData, TEvent>>).delete(
9999
callback,
100100
);
101-
102-
if (this.#currentSocket) {
103-
this.#currentSocket.removeEventListener(event, callback);
104-
}
105101
}
106102

107-
close(code?: number, reason?: string): void {
103+
reconnect(): void {
108104
if (this.#isDisposed) {
109105
return;
110106
}
111107

112-
this.#isDisposed = true;
113-
114108
if (this.#reconnectTimeoutId !== null) {
115109
clearTimeout(this.#reconnectTimeoutId);
116110
this.#reconnectTimeoutId = null;
117111
}
118112

119-
if (this.#currentSocket) {
120-
this.#currentSocket.close(code, reason);
121-
this.#currentSocket = null;
113+
// If already connecting, schedule reconnect after current attempt
114+
if (this.#isConnecting) {
115+
this.#pendingReconnect = true;
116+
return;
122117
}
123118

124-
for (const set of Object.values(this.#eventHandlers)) {
125-
set.clear();
126-
}
119+
// connect() will close any existing socket
120+
this.connect().catch((error) => {
121+
if (!this.#isDisposed) {
122+
this.#logger.warn(
123+
`Manual reconnection failed for ${this.#apiRoute}: ${error instanceof Error ? error.message : String(error)}`,
124+
);
125+
this.scheduleReconnect();
126+
}
127+
});
127128
}
128129

129-
reconnect(): void {
130+
close(code?: number, reason?: string): void {
130131
if (this.#isDisposed) {
131132
return;
132133
}
133134

134-
if (this.#reconnectTimeoutId !== null) {
135-
clearTimeout(this.#reconnectTimeoutId);
136-
this.#reconnectTimeoutId = null;
137-
}
138-
135+
// Fire close handlers synchronously before disposing
139136
if (this.#currentSocket) {
140-
this.#currentSocket.close(CLOSE_CODE_RECONNECTING, "Reconnecting");
137+
this.executeHandlers("close", {
138+
code: code ?? 1000,
139+
reason: reason ?? "",
140+
wasClean: true,
141+
type: "close",
142+
target: this.#currentSocket,
143+
} as CloseEvent);
141144
}
145+
146+
this.dispose(code, reason);
142147
}
143148

144-
async #connect(): Promise<void> {
149+
private async connect(): Promise<void> {
145150
if (this.#isDisposed || this.#isConnecting) {
146151
return;
147152
}
148153

149154
this.#isConnecting = true;
150155
try {
156+
// Close any existing socket before creating a new one
157+
if (this.#currentSocket) {
158+
this.#currentSocket.close(1000, "Replacing connection");
159+
this.#currentSocket = null;
160+
}
161+
151162
const socket = await this.#socketFactory();
152163
this.#currentSocket = socket;
153164

154-
socket.addEventListener("open", () => {
165+
socket.addEventListener("open", (event) => {
155166
this.#backoffMs = this.#options.initialBackoffMs;
167+
this.executeHandlers("open", event);
156168
});
157169

158-
for (const handler of this.#eventHandlers.open) {
159-
socket.addEventListener("open", handler);
160-
}
161-
162-
for (const handler of this.#eventHandlers.message) {
163-
socket.addEventListener("message", handler);
164-
}
170+
socket.addEventListener("message", (event) => {
171+
this.executeHandlers("message", event);
172+
});
165173

166-
for (const handler of this.#eventHandlers.error) {
167-
socket.addEventListener("error", handler);
168-
}
174+
socket.addEventListener("error", (event) => {
175+
this.executeHandlers("error", event);
176+
});
169177

170178
socket.addEventListener("close", (event) => {
171-
for (const handler of this.#eventHandlers.close) {
172-
handler(event);
173-
}
174-
175179
if (this.#isDisposed) {
176180
return;
177181
}
178182

183+
this.executeHandlers("close", event);
184+
179185
if (UNRECOVERABLE_CLOSE_CODES.has(event.code)) {
180186
this.#logger.error(
181187
`WebSocket connection closed with unrecoverable error code ${event.code}`,
182188
);
183-
this.#isDisposed = true;
184-
return;
185-
}
186-
187-
// Reconnect if this was an intentional close for reconnection
188-
if (event.code === CLOSE_CODE_RECONNECTING) {
189-
this.#scheduleReconnect();
189+
this.dispose();
190190
return;
191191
}
192192

@@ -196,14 +196,19 @@ export class ReconnectingWebSocket<TData = unknown>
196196
}
197197

198198
// Reconnect on abnormal closures (e.g., 1006) or other unexpected codes
199-
this.#scheduleReconnect();
199+
this.scheduleReconnect();
200200
});
201201
} finally {
202202
this.#isConnecting = false;
203+
204+
if (this.#pendingReconnect) {
205+
this.#pendingReconnect = false;
206+
this.reconnect();
207+
}
203208
}
204209
}
205210

206-
#scheduleReconnect(): void {
211+
private scheduleReconnect(): void {
207212
if (this.#isDisposed || this.#reconnectTimeoutId !== null) {
208213
return;
209214
}
@@ -218,21 +223,58 @@ export class ReconnectingWebSocket<TData = unknown>
218223

219224
this.#reconnectTimeoutId = setTimeout(() => {
220225
this.#reconnectTimeoutId = null;
221-
// Errors already handled in #connect
222-
this.#connect().catch((error) => {
226+
this.connect().catch((error) => {
223227
if (!this.#isDisposed) {
224228
this.#logger.warn(
225229
`WebSocket connection failed for ${this.#apiRoute}: ${error instanceof Error ? error.message : String(error)}`,
226230
);
227-
this.#scheduleReconnect();
231+
this.scheduleReconnect();
228232
}
229233
});
230234
}, delayMs);
231235

232236
this.#backoffMs = Math.min(this.#backoffMs * 2, this.#options.maxBackoffMs);
233237
}
234238

235-
isDisposed(): boolean {
236-
return this.#isDisposed;
239+
private executeHandlers<TEvent extends WebSocketEventType>(
240+
event: TEvent,
241+
eventData: Parameters<EventHandler<TData, TEvent>>[0],
242+
): void {
243+
const handlers = this.#eventHandlers[event] as Set<
244+
EventHandler<TData, TEvent>
245+
>;
246+
for (const handler of handlers) {
247+
try {
248+
handler(eventData);
249+
} catch (error) {
250+
this.#logger.error(
251+
`Error in ${event} handler for ${this.#apiRoute}: ${error instanceof Error ? error.message : String(error)}`,
252+
);
253+
}
254+
}
255+
}
256+
257+
private dispose(code?: number, reason?: string): void {
258+
if (this.#isDisposed) {
259+
return;
260+
}
261+
262+
this.#isDisposed = true;
263+
264+
if (this.#reconnectTimeoutId !== null) {
265+
clearTimeout(this.#reconnectTimeoutId);
266+
this.#reconnectTimeoutId = null;
267+
}
268+
269+
if (this.#currentSocket) {
270+
this.#currentSocket.close(code, reason);
271+
this.#currentSocket = null;
272+
}
273+
274+
for (const set of Object.values(this.#eventHandlers)) {
275+
set.clear();
276+
}
277+
278+
this.#onDispose?.();
237279
}
238280
}

0 commit comments

Comments
 (0)