Skip to content

Commit 5acd6f2

Browse files
RobertCraigieyjp20
authored andcommitted
chore(internal): cleanup event stream helpers (#950)
* [wip]: refactor * a solution * Bind this * fix formatting --------- Co-authored-by: Young-Jin Park <[email protected]>
1 parent b0c2e26 commit 5acd6f2

File tree

5 files changed

+99
-395
lines changed

5 files changed

+99
-395
lines changed

src/lib/AbstractChatCompletionRunner.ts

Lines changed: 16 additions & 239 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
type ChatCompletionCreateParams,
99
type ChatCompletionTool,
1010
} from 'openai/resources/chat/completions';
11-
import { APIUserAbortError, OpenAIError } from 'openai/error';
11+
import { OpenAIError } from 'openai/error';
1212
import {
1313
type RunnableFunction,
1414
isRunnableFunctionWithParse,
@@ -20,75 +20,36 @@ import {
2020
ChatCompletionStreamingToolRunnerParams,
2121
} from './ChatCompletionStreamingRunner';
2222
import { isAssistantMessage, isFunctionMessage, isToolMessage } from './chatCompletionUtils';
23+
import { BaseEvents, EventStream } from './EventStream';
2324

2425
const DEFAULT_MAX_CHAT_COMPLETIONS = 10;
2526
export interface RunnerOptions extends Core.RequestOptions {
2627
/** How many requests to make before canceling. Default 10. */
2728
maxChatCompletions?: number;
2829
}
2930

30-
export abstract class AbstractChatCompletionRunner<
31-
Events extends CustomEvents<any> = AbstractChatCompletionRunnerEvents,
32-
> {
33-
controller: AbortController = new AbortController();
34-
35-
#connectedPromise: Promise<void>;
36-
#resolveConnectedPromise: () => void = () => {};
37-
#rejectConnectedPromise: (error: OpenAIError) => void = () => {};
38-
39-
#endPromise: Promise<void>;
40-
#resolveEndPromise: () => void = () => {};
41-
#rejectEndPromise: (error: OpenAIError) => void = () => {};
42-
43-
#listeners: { [Event in keyof Events]?: ListenersForEvent<Events, Event> } = {};
44-
31+
export class AbstractChatCompletionRunner<
32+
EventTypes extends AbstractChatCompletionRunnerEvents,
33+
> extends EventStream<EventTypes> {
4534
protected _chatCompletions: ChatCompletion[] = [];
4635
messages: ChatCompletionMessageParam[] = [];
4736

48-
#ended = false;
49-
#errored = false;
50-
#aborted = false;
51-
#catchingPromiseCreated = false;
52-
53-
constructor() {
54-
this.#connectedPromise = new Promise<void>((resolve, reject) => {
55-
this.#resolveConnectedPromise = resolve;
56-
this.#rejectConnectedPromise = reject;
57-
});
58-
59-
this.#endPromise = new Promise<void>((resolve, reject) => {
60-
this.#resolveEndPromise = resolve;
61-
this.#rejectEndPromise = reject;
62-
});
63-
64-
// Don't let these promises cause unhandled rejection errors.
65-
// we will manually cause an unhandled rejection error later
66-
// if the user hasn't registered any error listener or called
67-
// any promise-returning method.
68-
this.#connectedPromise.catch(() => {});
69-
this.#endPromise.catch(() => {});
70-
}
71-
72-
protected _run(executor: () => Promise<any>) {
73-
// Unfortunately if we call `executor()` immediately we get runtime errors about
74-
// references to `this` before the `super()` constructor call returns.
75-
setTimeout(() => {
76-
executor().then(() => {
77-
this._emitFinal();
78-
this._emit('end');
79-
}, this.#handleError);
80-
}, 0);
81-
}
82-
83-
protected _addChatCompletion(chatCompletion: ChatCompletion): ChatCompletion {
37+
protected _addChatCompletion(
38+
this: AbstractChatCompletionRunner<AbstractChatCompletionRunnerEvents>,
39+
chatCompletion: ChatCompletion,
40+
): ChatCompletion {
8441
this._chatCompletions.push(chatCompletion);
8542
this._emit('chatCompletion', chatCompletion);
8643
const message = chatCompletion.choices[0]?.message;
8744
if (message) this._addMessage(message as ChatCompletionMessageParam);
8845
return chatCompletion;
8946
}
9047

91-
protected _addMessage(message: ChatCompletionMessageParam, emit = true) {
48+
protected _addMessage(
49+
this: AbstractChatCompletionRunner<AbstractChatCompletionRunnerEvents>,
50+
message: ChatCompletionMessageParam,
51+
emit = true,
52+
) {
9253
if (!('content' in message)) message.content = null;
9354

9455
this.messages.push(message);
@@ -110,99 +71,6 @@ export abstract class AbstractChatCompletionRunner<
11071
}
11172
}
11273

113-
protected _connected() {
114-
if (this.ended) return;
115-
this.#resolveConnectedPromise();
116-
this._emit('connect');
117-
}
118-
119-
get ended(): boolean {
120-
return this.#ended;
121-
}
122-
123-
get errored(): boolean {
124-
return this.#errored;
125-
}
126-
127-
get aborted(): boolean {
128-
return this.#aborted;
129-
}
130-
131-
abort() {
132-
this.controller.abort();
133-
}
134-
135-
/**
136-
* Adds the listener function to the end of the listeners array for the event.
137-
* No checks are made to see if the listener has already been added. Multiple calls passing
138-
* the same combination of event and listener will result in the listener being added, and
139-
* called, multiple times.
140-
* @returns this ChatCompletionStream, so that calls can be chained
141-
*/
142-
on<Event extends keyof Events>(event: Event, listener: ListenerForEvent<Events, Event>): this {
143-
const listeners: ListenersForEvent<Events, Event> =
144-
this.#listeners[event] || (this.#listeners[event] = []);
145-
listeners.push({ listener });
146-
return this;
147-
}
148-
149-
/**
150-
* Removes the specified listener from the listener array for the event.
151-
* off() will remove, at most, one instance of a listener from the listener array. If any single
152-
* listener has been added multiple times to the listener array for the specified event, then
153-
* off() must be called multiple times to remove each instance.
154-
* @returns this ChatCompletionStream, so that calls can be chained
155-
*/
156-
off<Event extends keyof Events>(event: Event, listener: ListenerForEvent<Events, Event>): this {
157-
const listeners = this.#listeners[event];
158-
if (!listeners) return this;
159-
const index = listeners.findIndex((l) => l.listener === listener);
160-
if (index >= 0) listeners.splice(index, 1);
161-
return this;
162-
}
163-
164-
/**
165-
* Adds a one-time listener function for the event. The next time the event is triggered,
166-
* this listener is removed and then invoked.
167-
* @returns this ChatCompletionStream, so that calls can be chained
168-
*/
169-
once<Event extends keyof Events>(event: Event, listener: ListenerForEvent<Events, Event>): this {
170-
const listeners: ListenersForEvent<Events, Event> =
171-
this.#listeners[event] || (this.#listeners[event] = []);
172-
listeners.push({ listener, once: true });
173-
return this;
174-
}
175-
176-
/**
177-
* This is similar to `.once()`, but returns a Promise that resolves the next time
178-
* the event is triggered, instead of calling a listener callback.
179-
* @returns a Promise that resolves the next time given event is triggered,
180-
* or rejects if an error is emitted. (If you request the 'error' event,
181-
* returns a promise that resolves with the error).
182-
*
183-
* Example:
184-
*
185-
* const message = await stream.emitted('message') // rejects if the stream errors
186-
*/
187-
emitted<Event extends keyof Events>(
188-
event: Event,
189-
): Promise<
190-
EventParameters<Events, Event> extends [infer Param] ? Param
191-
: EventParameters<Events, Event> extends [] ? void
192-
: EventParameters<Events, Event>
193-
> {
194-
return new Promise((resolve, reject) => {
195-
this.#catchingPromiseCreated = true;
196-
if (event !== 'error') this.once('error', reject);
197-
this.once(event, resolve as any);
198-
});
199-
}
200-
201-
async done(): Promise<void> {
202-
this.#catchingPromiseCreated = true;
203-
await this.#endPromise;
204-
}
205-
20674
/**
20775
* @returns a promise that resolves with the final ChatCompletion, or rejects
20876
* if an error occurred or the stream ended prematurely without producing a ChatCompletion.
@@ -327,75 +195,7 @@ export abstract class AbstractChatCompletionRunner<
327195
return [...this._chatCompletions];
328196
}
329197

330-
#handleError = (error: unknown) => {
331-
this.#errored = true;
332-
if (error instanceof Error && error.name === 'AbortError') {
333-
error = new APIUserAbortError();
334-
}
335-
if (error instanceof APIUserAbortError) {
336-
this.#aborted = true;
337-
return this._emit('abort', error);
338-
}
339-
if (error instanceof OpenAIError) {
340-
return this._emit('error', error);
341-
}
342-
if (error instanceof Error) {
343-
const openAIError: OpenAIError = new OpenAIError(error.message);
344-
// @ts-ignore
345-
openAIError.cause = error;
346-
return this._emit('error', openAIError);
347-
}
348-
return this._emit('error', new OpenAIError(String(error)));
349-
};
350-
351-
protected _emit<Event extends keyof Events>(event: Event, ...args: EventParameters<Events, Event>) {
352-
// make sure we don't emit any events after end
353-
if (this.#ended) {
354-
return;
355-
}
356-
357-
if (event === 'end') {
358-
this.#ended = true;
359-
this.#resolveEndPromise();
360-
}
361-
362-
const listeners: ListenersForEvent<Events, Event> | undefined = this.#listeners[event];
363-
if (listeners) {
364-
this.#listeners[event] = listeners.filter((l) => !l.once) as any;
365-
listeners.forEach(({ listener }: any) => listener(...args));
366-
}
367-
368-
if (event === 'abort') {
369-
const error = args[0] as APIUserAbortError;
370-
if (!this.#catchingPromiseCreated && !listeners?.length) {
371-
Promise.reject(error);
372-
}
373-
this.#rejectConnectedPromise(error);
374-
this.#rejectEndPromise(error);
375-
this._emit('end');
376-
return;
377-
}
378-
379-
if (event === 'error') {
380-
// NOTE: _emit('error', error) should only be called from #handleError().
381-
382-
const error = args[0] as OpenAIError;
383-
if (!this.#catchingPromiseCreated && !listeners?.length) {
384-
// Trigger an unhandled rejection if the user hasn't registered any error handlers.
385-
// If you are seeing stack traces here, make sure to handle errors via either:
386-
// - runner.on('error', () => ...)
387-
// - await runner.done()
388-
// - await runner.finalChatCompletion()
389-
// - etc.
390-
Promise.reject(error);
391-
}
392-
this.#rejectConnectedPromise(error);
393-
this.#rejectEndPromise(error);
394-
this._emit('end');
395-
}
396-
}
397-
398-
protected _emitFinal() {
198+
protected override _emitFinal(this: AbstractChatCompletionRunner<AbstractChatCompletionRunnerEvents>) {
399199
const completion = this._chatCompletions[this._chatCompletions.length - 1];
400200
if (completion) this._emit('finalChatCompletion', completion);
401201
const finalMessage = this.#getFinalMessage();
@@ -650,27 +450,7 @@ export abstract class AbstractChatCompletionRunner<
650450
}
651451
}
652452

653-
type CustomEvents<Event extends string> = {
654-
[k in Event]: k extends keyof AbstractChatCompletionRunnerEvents ? AbstractChatCompletionRunnerEvents[k]
655-
: (...args: any[]) => void;
656-
};
657-
658-
type ListenerForEvent<Events extends CustomEvents<any>, Event extends keyof Events> = Event extends (
659-
keyof AbstractChatCompletionRunnerEvents
660-
) ?
661-
AbstractChatCompletionRunnerEvents[Event]
662-
: Events[Event];
663-
664-
type ListenersForEvent<Events extends CustomEvents<any>, Event extends keyof Events> = Array<{
665-
listener: ListenerForEvent<Events, Event>;
666-
once?: boolean;
667-
}>;
668-
type EventParameters<Events extends CustomEvents<any>, Event extends keyof Events> = Parameters<
669-
ListenerForEvent<Events, Event>
670-
>;
671-
672-
export interface AbstractChatCompletionRunnerEvents {
673-
connect: () => void;
453+
export interface AbstractChatCompletionRunnerEvents extends BaseEvents {
674454
functionCall: (functionCall: ChatCompletionMessage.FunctionCall) => void;
675455
message: (message: ChatCompletionMessageParam) => void;
676456
chatCompletion: (completion: ChatCompletion) => void;
@@ -680,8 +460,5 @@ export interface AbstractChatCompletionRunnerEvents {
680460
finalFunctionCall: (functionCall: ChatCompletionMessage.FunctionCall) => void;
681461
functionCallResult: (content: string) => void;
682462
finalFunctionCallResult: (content: string) => void;
683-
error: (error: OpenAIError) => void;
684-
abort: (error: APIUserAbortError) => void;
685-
end: () => void;
686463
totalUsage: (usage: CompletionUsage) => void;
687464
}

0 commit comments

Comments
 (0)