Skip to content

Commit 6119d99

Browse files
committed
Add SSE fallback to some one way WS connections
1 parent a401805 commit 6119d99

File tree

9 files changed

+511
-109
lines changed

9 files changed

+511
-109
lines changed

src/api/coderApi.ts

Lines changed: 120 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,18 @@ import {
66
} from "axios";
77
import { Api } from "coder/site/src/api/api";
88
import {
9+
type ServerSentEvent,
910
type GetInboxNotificationResponse,
1011
type ProvisionerJobLog,
11-
type ServerSentEvent,
1212
type Workspace,
1313
type WorkspaceAgent,
1414
} from "coder/site/src/api/typesGenerated";
1515
import * as vscode from "vscode";
16-
import { type ClientOptions } from "ws";
16+
import { type ClientOptions, type CloseEvent, type ErrorEvent } from "ws";
1717

1818
import { CertificateError } from "../error";
1919
import { getHeaderCommand, getHeaders } from "../headers";
20+
import { EventStreamLogger } from "../logging/eventStreamLogger";
2021
import {
2122
createRequestMeta,
2223
logRequest,
@@ -29,11 +30,12 @@ import {
2930
HttpClientLogLevel,
3031
} from "../logging/types";
3132
import { sizeOf } from "../logging/utils";
32-
import { WsLogger } from "../logging/wsLogger";
33+
import { type UnidirectionalStream } from "../websocket/eventStreamConnection";
3334
import {
3435
OneWayWebSocket,
3536
type OneWayWebSocketInit,
3637
} from "../websocket/oneWayWebSocket";
38+
import { SseConnection } from "../websocket/sseConnection";
3739

3840
import { createHttpAgent } from "./utils";
3941

@@ -84,8 +86,9 @@ export class CoderApi extends Api {
8486
};
8587

8688
watchWorkspace = async (workspace: Workspace, options?: ClientOptions) => {
87-
return this.createWebSocket<ServerSentEvent>({
89+
return this.createWebSocketWithFallback<ServerSentEvent>({
8890
apiRoute: `/api/v2/workspaces/${workspace.id}/watch-ws`,
91+
fallbackApiRoute: `/api/v2/workspaces/${workspace.id}/watch`,
8992
options,
9093
});
9194
};
@@ -94,8 +97,9 @@ export class CoderApi extends Api {
9497
agentId: WorkspaceAgent["id"],
9598
options?: ClientOptions,
9699
) => {
97-
return this.createWebSocket<ServerSentEvent>({
100+
return this.createWebSocketWithFallback<ServerSentEvent>({
98101
apiRoute: `/api/v2/workspaceagents/${agentId}/watch-metadata-ws`,
102+
fallbackApiRoute: `/api/v2/workspaceagents/${agentId}/watch-metadata`,
99103
options,
100104
});
101105
};
@@ -137,6 +141,7 @@ export class CoderApi extends Api {
137141
const httpAgent = await createHttpAgent(
138142
vscode.workspace.getConfiguration(),
139143
);
144+
140145
const webSocket = new OneWayWebSocket<TData>({
141146
location: baseUrl,
142147
...configs,
@@ -152,28 +157,123 @@ export class CoderApi extends Api {
152157
},
153158
});
154159

155-
const wsUrl = new URL(webSocket.url);
156-
const pathWithQuery = wsUrl.pathname + wsUrl.search;
157-
const wsLogger = new WsLogger(this.output, pathWithQuery);
158-
wsLogger.logConnecting();
160+
this.attachStreamLogger(webSocket);
161+
return webSocket;
162+
}
159163

160-
webSocket.addEventListener("open", () => {
161-
wsLogger.logOpen();
162-
});
164+
private attachStreamLogger<TData>(
165+
connection: UnidirectionalStream<TData>,
166+
): void {
167+
const url = new URL(connection.url);
168+
const logger = new EventStreamLogger(
169+
this.output,
170+
url.pathname + url.search,
171+
url.protocol.startsWith("http") ? "SSE" : "WS",
172+
);
173+
logger.logConnecting();
163174

164-
webSocket.addEventListener("message", (event) => {
165-
wsLogger.logMessage(event.sourceEvent.data);
166-
});
175+
connection.addEventListener("open", () => logger.logOpen());
176+
connection.addEventListener("close", (event: CloseEvent) =>
177+
logger.logClose(event.code, event.reason),
178+
);
179+
connection.addEventListener("error", (event: ErrorEvent) =>
180+
logger.logError(event.error, event.message),
181+
);
182+
connection.addEventListener("message", (event) =>
183+
logger.logMessage(event.sourceEvent.data),
184+
);
185+
}
186+
187+
/**
188+
* Create a WebSocket connection with SSE fallback on 404
189+
*/
190+
private async createWebSocketWithFallback<TData = unknown>(configs: {
191+
apiRoute: string;
192+
fallbackApiRoute: string;
193+
searchParams?: Record<string, string> | URLSearchParams;
194+
options?: ClientOptions;
195+
}): Promise<UnidirectionalStream<TData>> {
196+
let webSocket: OneWayWebSocket<TData>;
197+
try {
198+
webSocket = await this.createWebSocket<TData>({
199+
apiRoute: configs.apiRoute,
200+
searchParams: configs.searchParams,
201+
options: configs.options,
202+
});
203+
} catch {
204+
// Failed to create WebSocket, use SSE fallback
205+
return this.createSseFallback<TData>(
206+
configs.fallbackApiRoute,
207+
configs.searchParams,
208+
);
209+
}
167210

168-
webSocket.addEventListener("close", (event) => {
169-
wsLogger.logClose(event.code, event.reason);
211+
return this.waitForConnection(webSocket, () =>
212+
this.createSseFallback<TData>(
213+
configs.fallbackApiRoute,
214+
configs.searchParams,
215+
),
216+
);
217+
}
218+
219+
private waitForConnection<TData>(
220+
connection: UnidirectionalStream<TData>,
221+
onNotFound?: () => Promise<UnidirectionalStream<TData>>,
222+
): Promise<UnidirectionalStream<TData>> {
223+
return new Promise((resolve, reject) => {
224+
const cleanup = () => {
225+
connection.removeEventListener("open", handleOpen);
226+
connection.removeEventListener("error", handleError);
227+
};
228+
229+
const handleOpen = () => {
230+
cleanup();
231+
resolve(connection);
232+
};
233+
234+
const handleError = (event: ErrorEvent) => {
235+
cleanup();
236+
const is404 =
237+
event.message?.includes("404") ||
238+
event.error?.message?.includes("404");
239+
240+
if (is404 && onNotFound) {
241+
connection.close();
242+
onNotFound().then(resolve).catch(reject);
243+
} else {
244+
reject(event.error || new Error(event.message));
245+
}
246+
};
247+
248+
connection.addEventListener("open", handleOpen);
249+
connection.addEventListener("error", handleError);
170250
});
251+
}
171252

172-
webSocket.addEventListener("error", (event) => {
173-
wsLogger.logError(event.error, event.message);
253+
/**
254+
* Create SSE fallback connection
255+
*/
256+
private async createSseFallback<TData = unknown>(
257+
apiRoute: string,
258+
searchParams?: Record<string, string> | URLSearchParams,
259+
): Promise<UnidirectionalStream<TData>> {
260+
this.output.warn(`WebSocket failed, using SSE fallback: ${apiRoute}`);
261+
262+
const baseUrlRaw = this.getAxiosInstance().defaults.baseURL;
263+
if (!baseUrlRaw) {
264+
throw new Error("No base URL set on REST client");
265+
}
266+
267+
const baseUrl = new URL(baseUrlRaw);
268+
const sseConnection = new SseConnection({
269+
location: baseUrl,
270+
apiRoute,
271+
searchParams,
272+
axiosInstance: this.getAxiosInstance(),
174273
});
175274

176-
return webSocket;
275+
this.attachStreamLogger(sseConnection);
276+
return this.waitForConnection(sseConnection);
177277
}
178278
}
179279

src/api/streamingFetchAdapter.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { type AxiosInstance } from "axios";
2+
import { type FetchLikeInit, type FetchLikeResponse } from "eventsource";
3+
import { type IncomingMessage } from "http";
4+
5+
/**
6+
* Creates a fetch adapter using an Axios instance that returns streaming responses.
7+
* This is used by EventSource to make authenticated SSE connections.
8+
*/
9+
export function createStreamingFetchAdapter(
10+
axiosInstance: AxiosInstance,
11+
): (url: string | URL, init?: FetchLikeInit) => Promise<FetchLikeResponse> {
12+
return async (
13+
url: string | URL,
14+
init?: FetchLikeInit,
15+
): Promise<FetchLikeResponse> => {
16+
const urlStr = url.toString();
17+
18+
const response = await axiosInstance.request<IncomingMessage>({
19+
url: urlStr,
20+
signal: init?.signal,
21+
headers: init?.headers,
22+
responseType: "stream",
23+
validateStatus: () => true, // Don't throw on any status code
24+
});
25+
26+
const stream = new ReadableStream({
27+
start(controller) {
28+
response.data.on("data", (chunk: Buffer) => {
29+
controller.enqueue(chunk);
30+
});
31+
32+
response.data.on("end", () => {
33+
controller.close();
34+
});
35+
36+
response.data.on("error", (err: Error) => {
37+
controller.error(err);
38+
});
39+
},
40+
41+
cancel() {
42+
response.data.destroy();
43+
return Promise.resolve();
44+
},
45+
});
46+
47+
return {
48+
body: {
49+
getReader: () => stream.getReader(),
50+
},
51+
url: urlStr,
52+
status: response.status,
53+
redirected: response.request?.res?.responseUrl !== urlStr,
54+
headers: {
55+
get: (name: string) => {
56+
const value = response.headers[name.toLowerCase()];
57+
return value === undefined ? null : String(value);
58+
},
59+
},
60+
};
61+
};
62+
}

src/logging/wsLogger.ts renamed to src/logging/eventStreamLogger.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,35 @@ const numFormatter = new Intl.NumberFormat("en", {
1212
compactDisplay: "short",
1313
});
1414

15-
export class WsLogger {
15+
export class EventStreamLogger {
1616
private readonly logger: Logger;
1717
private readonly url: string;
1818
private readonly id: string;
19+
private readonly protocol: string;
1920
private readonly startedAt: number;
2021
private openedAt?: number;
2122
private msgCount = 0;
2223
private byteCount = 0;
2324
private unknownByteCount = false;
2425

25-
constructor(logger: Logger, url: string) {
26+
constructor(logger: Logger, url: string, protocol: "WS" | "SSE") {
2627
this.logger = logger;
2728
this.url = url;
29+
this.protocol = protocol;
2830
this.id = createRequestId();
2931
this.startedAt = Date.now();
3032
}
3133

3234
logConnecting(): void {
33-
this.logger.trace(`→ WS ${shortId(this.id)} ${this.url}`);
35+
this.logger.trace(`→ ${this.protocol} ${shortId(this.id)} ${this.url}`);
3436
}
3537

3638
logOpen(): void {
3739
this.openedAt = Date.now();
3840
const time = formatTime(this.openedAt - this.startedAt);
39-
this.logger.trace(`← WS ${shortId(this.id)} connected ${this.url} ${time}`);
41+
this.logger.trace(
42+
`← ${this.protocol} ${shortId(this.id)} connected ${this.url} ${time}`,
43+
);
4044
}
4145

4246
logMessage(data: unknown): void {
@@ -62,15 +66,15 @@ export class WsLogger {
6266
const statsStr = ` [${stats.join(", ")}]`;
6367

6468
this.logger.trace(
65-
`▣ WS ${shortId(this.id)} closed ${this.url}${codeStr}${reasonStr}${statsStr}`,
69+
`▣ ${this.protocol} ${shortId(this.id)} closed ${this.url}${codeStr}${reasonStr}${statsStr}`,
6670
);
6771
}
6872

6973
logError(error: unknown, message: string): void {
7074
const time = formatTime(Date.now() - this.startedAt);
7175
const errorMsg = message || errToStr(error, "connection error");
7276
this.logger.error(
73-
`✗ WS ${shortId(this.id)} error ${this.url} ${time} - ${errorMsg}`,
77+
`✗ ${this.protocol} ${shortId(this.id)} error ${this.url} ${time} - ${errorMsg}`,
7478
error,
7579
);
7680
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { type WebSocketEventType } from "coder/site/src/utils/OneWayWebSocket";
2+
import {
3+
type CloseEvent,
4+
type Event as WsEvent,
5+
type ErrorEvent,
6+
type MessageEvent,
7+
} from "ws";
8+
9+
// Event payload types matching OneWayWebSocket
10+
export type ParsedMessageEvent<TData> = Readonly<
11+
| {
12+
sourceEvent: MessageEvent;
13+
parsedMessage: TData;
14+
parseError: undefined;
15+
}
16+
| {
17+
sourceEvent: MessageEvent;
18+
parsedMessage: undefined;
19+
parseError: Error;
20+
}
21+
>;
22+
23+
export type EventPayloadMap<TData> = {
24+
close: CloseEvent;
25+
error: ErrorEvent;
26+
message: ParsedMessageEvent<TData>;
27+
open: WsEvent;
28+
};
29+
30+
export type EventHandler<TData, TEvent extends WebSocketEventType> = (
31+
payload: EventPayloadMap<TData>[TEvent],
32+
) => void;
33+
34+
/**
35+
* Common interface for both WebSocket and SSE connections that handle event streams.
36+
* Matches the OneWayWebSocket interface for compatibility.
37+
*/
38+
export interface UnidirectionalStream<TData> {
39+
readonly url: string;
40+
addEventListener<TEvent extends WebSocketEventType>(
41+
eventType: TEvent,
42+
callback: EventHandler<TData, TEvent>,
43+
): void;
44+
45+
removeEventListener<TEvent extends WebSocketEventType>(
46+
eventType: TEvent,
47+
callback: EventHandler<TData, TEvent>,
48+
): void;
49+
50+
close(code?: number, reason?: string): void;
51+
}

0 commit comments

Comments
 (0)