Commit e1d88f5

Merge pull request #1218 from stainless-sdks/pakrym/support-streaming-background-responses
Support streaming background responses
2 parents: 1bd29ac + f8ed654

4 files changed: +166 −22 lines
New file — 42 additions & 0 deletions

@@ -0,0 +1,42 @@
+#!/usr/bin/env -S npm run tsn -T
+
+import OpenAI from 'openai';
+
+const openai = new OpenAI();
+
+async function main() {
+  const runner = openai.responses.stream({
+    model: 'gpt-4o-2024-08-06',
+    input: 'solve 8x + 31 = 2',
+    background: true,
+  });
+
+  let id: string | null = null;
+
+  for await (const event of runner) {
+    if (event.type == 'response.created') {
+      id = event.response.id;
+    }
+
+    console.log('event', event);
+    if (event.sequence_number == 10) {
+      break;
+    }
+  }
+
+  console.log('Interrupted. Continuing...');
+
+  const runner2 = openai.responses.stream({
+    response_id: id!,
+    starting_after: 10,
+  });
+
+  for await (const event of runner2) {
+    console.log('event', event);
+  }
+
+  const result = await runner2.finalResponse();
+  console.log(result);
+}
+
+main();
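This new example starts a background response as a stream, captures the response ID from the `response.created` event, deliberately breaks out of the loop after sequence number 10 to simulate an interruption, and then resumes the same stream by passing `response_id` and `starting_after` before reading the final result via `finalResponse()`.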

src/lib/ResponsesParser.ts

Lines changed: 4 additions & 1 deletion

@@ -1,6 +1,7 @@
 import { OpenAIError } from '../error';
 import type { ChatCompletionTool } from '../resources/chat/completions';
 import {
+  ResponseTextConfig,
   type FunctionTool,
   type ParsedContent,
   type ParsedResponse,
@@ -20,7 +21,9 @@ export type ResponseCreateParamsWithTools = ResponseCreateParamsBase & {
   tools?: ParseableToolsParams;
 };
 
-export type ExtractParsedContentFromParams<Params extends ResponseCreateParamsWithTools> =
+type TextConfigParams = { text?: ResponseTextConfig };
+
+export type ExtractParsedContentFromParams<Params extends TextConfigParams> =
   NonNullable<Params['text']>['format'] extends AutoParseableTextFormat<infer P> ? P : null;
 
 export function maybeParseResponse<
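The generic bound on `ExtractParsedContentFromParams` is loosened from the full create params to anything carrying a `text` config, which is what lets the new retrieve-by-id stream params reuse the same parsed-content inference. A rough type-level illustration (aliases `A` and `B` are hypothetical, and both param types are assumed to be imported):

// Any shape with an optional `text?: ResponseTextConfig` field now satisfies the bound.
type A = ExtractParsedContentFromParams<ResponseCreateParamsWithTools>; // as before
type B = ExtractParsedContentFromParams<ResponseStreamByIdParams>;      // now also valid, since it has `text?`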

src/lib/responses/ResponseStream.ts

Lines changed: 62 additions & 16 deletions

@@ -1,4 +1,5 @@
 import {
+  ResponseTextConfig,
   type ParsedResponse,
   type Response,
   type ResponseCreateParamsBase,
@@ -10,12 +11,40 @@ import { APIUserAbortError, OpenAIError } from '../../error';
 import OpenAI from '../../index';
 import { type BaseEvents, EventStream } from '../EventStream';
 import { type ResponseFunctionCallArgumentsDeltaEvent, type ResponseTextDeltaEvent } from './EventTypes';
-import { maybeParseResponse } from '../ResponsesParser';
+import { maybeParseResponse, ParseableToolsParams } from '../ResponsesParser';
+import { Stream } from 'openai/streaming';
 
-export type ResponseStreamParams = Omit<ResponseCreateParamsBase, 'stream'> & {
+export type ResponseStreamParams = ResponseCreateAndStreamParams | ResponseStreamByIdParams;
+
+export type ResponseCreateAndStreamParams = Omit<ResponseCreateParamsBase, 'stream'> & {
   stream?: true;
 };
 
+export type ResponseStreamByIdParams = {
+  /**
+   * The ID of the response to stream.
+   */
+  response_id: string;
+  /**
+   * If provided, the stream will start after the event with the given sequence number.
+   */
+  starting_after?: number;
+  /**
+   * Configuration options for a text response from the model. Can be plain text or
+   * structured JSON data. Learn more:
+   *
+   * - [Text inputs and outputs](https://platform.openai.com/docs/guides/text)
+   * - [Structured Outputs](https://platform.openai.com/docs/guides/structured-outputs)
+   */
+  text?: ResponseTextConfig;
+
+  /**
+   * An array of tools the model may call while generating a response. When continuing a stream, provide
+   * the same tools as the original request.
+   */
+  tools?: ParseableToolsParams;
+};
+
 type ResponseEvents = BaseEvents &
   Omit<
     {
@@ -52,7 +81,7 @@ export class ResponseStream<ParsedT = null>
   ): ResponseStream<ParsedT> {
     const runner = new ResponseStream<ParsedT>(params as ResponseCreateParamsStreaming);
     runner._run(() =>
-      runner._createResponse(client, params, {
+      runner._createOrRetrieveResponse(client, params, {
         ...options,
         headers: { ...options?.headers, 'X-Stainless-Helper-Method': 'stream' },
       }),
@@ -65,11 +94,17 @@ export class ResponseStream<ParsedT = null>
     this.#currentResponseSnapshot = undefined;
   }
 
-  #addEvent(this: ResponseStream<ParsedT>, event: ResponseStreamEvent) {
+  #addEvent(this: ResponseStream<ParsedT>, event: ResponseStreamEvent, starting_after: number | null) {
     if (this.ended) return;
 
+    const maybeEmit = (name: string, event: ResponseStreamEvent & { snapshot?: string }) => {
+      if (starting_after == null || event.sequence_number > starting_after) {
+        this._emit(name as any, event);
+      }
+    };
+
     const response = this.#accumulateResponse(event);
-    this._emit('event', event);
+    maybeEmit('event', event);
 
     switch (event.type) {
       case 'response.output_text.delta': {
@@ -86,7 +121,7 @@ export class ResponseStream<ParsedT = null>
          throw new OpenAIError(`expected content to be 'output_text', got ${content.type}`);
        }
 
-        this._emit('response.output_text.delta', {
+        maybeEmit('response.output_text.delta', {
          ...event,
          snapshot: content.text,
        });
@@ -99,16 +134,15 @@ export class ResponseStream<ParsedT = null>
          throw new OpenAIError(`missing output at index ${event.output_index}`);
        }
        if (output.type === 'function_call') {
-          this._emit('response.function_call_arguments.delta', {
+          maybeEmit('response.function_call_arguments.delta', {
            ...event,
            snapshot: output.arguments,
          });
        }
        break;
      }
      default:
-        // @ts-ignore
-        this._emit(event.type, event);
+        maybeEmit(event.type, event);
        break;
    }
  }
@@ -128,9 +162,9 @@ export class ResponseStream<ParsedT = null>
    return parsedResponse;
  }
 
-  protected async _createResponse(
+  protected async _createOrRetrieveResponse(
    client: OpenAI,
-    params: ResponseStreamingParams,
+    params: ResponseStreamParams,
    options?: Core.RequestOptions,
  ): Promise<ParsedResponse<ParsedT>> {
    const signal = options?.signal;
@@ -140,13 +174,25 @@ export class ResponseStream<ParsedT = null>
    }
    this.#beginRequest();
 
-    const stream = await client.responses.create(
-      { ...params, stream: true },
-      { ...options, signal: this.controller.signal },
-    );
+    let stream: Stream<ResponseStreamEvent> | undefined;
+    let starting_after: number | null = null;
+    if ('response_id' in params) {
+      stream = await client.responses.retrieve(
+        params.response_id,
+        { stream: true },
+        { ...options, signal: this.controller.signal, stream: true },
+      );
+      starting_after = params.starting_after ?? null;
+    } else {
+      stream = await client.responses.create(
+        { ...params, stream: true },
+        { ...options, signal: this.controller.signal },
+      );
+    }
+
    this._connected();
    for await (const event of stream) {
-      this.#addEvent(event);
+      this.#addEvent(event, starting_after);
    }
    if (stream.controller.signal?.aborted) {
      throw new APIUserAbortError();
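As a rough usage sketch of the new by-id path (the names `savedResponseId` and `lastSeenSequenceNumber` are placeholders, not part of the change): the helper retrieves the response as a stream and suppresses any events whose `sequence_number` is at or below `starting_after`.

// Minimal sketch, assuming `openai` is an already-constructed client and the
// placeholders were captured from an earlier, interrupted run.
const resumed = openai.responses.stream({
  response_id: savedResponseId,
  starting_after: lastSeenSequenceNumber,
  // When continuing a stream, pass the same `tools` / `text` config as the
  // original request so parsing behaves the same way (see ResponseStreamByIdParams).
});

for await (const event of resumed) {
  console.log('event', event); // events at or before `starting_after` are not re-emitted
}

const finalResponse = await resumed.finalResponse();
console.log(finalResponse.id);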

src/resources/responses/responses.ts

Lines changed: 58 additions & 5 deletions

@@ -112,21 +112,44 @@ export class Responses extends APIResource {
   * );
   * ```
   */
+
  retrieve(
    responseId: string,
-    query?: ResponseRetrieveParams,
+    query?: ResponseRetrieveParamsNonStreaming,
    options?: Core.RequestOptions,
  ): Core.APIPromise<Response>;
+  retrieve(
+    responseId: string,
+    query?: ResponseRetrieveParamsStreaming,
+    options?: Core.RequestOptions,
+  ): Core.APIPromise<Stream<ResponseStreamEvent>>;
  retrieve(responseId: string, options?: Core.RequestOptions): Core.APIPromise<Response>;
+  retrieve(
+    responseId: string,
+    query: ResponseRetrieveParams | Core.RequestOptions,
+    options?: Core.RequestOptions,
+  ): Core.APIPromise<Response> | Core.APIPromise<Stream<ResponseStreamEvent>>;
  retrieve(
    responseId: string,
    query: ResponseRetrieveParams | Core.RequestOptions = {},
    options?: Core.RequestOptions,
-  ): Core.APIPromise<Response> {
-    if (isRequestOptions(query)) {
+  ): Core.APIPromise<Response> | Core.APIPromise<Stream<ResponseStreamEvent>> {
+    if (isRequestOptions(query) && options === undefined) {
      return this.retrieve(responseId, {}, query);
    }
-    return this._client.get(`/responses/${responseId}`, { query, ...options });
+    return (
+      this._client.get(`/responses/${responseId}`, {
+        query,
+        ...options,
+        stream: query.stream ?? false,
+      }) as APIPromise<Response> | APIPromise<Stream<ResponseStreamEvent>>
+    )._thenUnwrap((rsp) => {
+      if ('object' in rsp && rsp.object === 'response') {
+        addOutputText(rsp as Response);
+      }
+
+      return rsp;
+    }) as APIPromise<Response> | APIPromise<Stream<ResponseStreamEvent>>;
  }
 
  /**
@@ -4204,6 +4227,11 @@ export interface ResponseWebSearchCallCompletedEvent {
   */
  item_id: string;
 
+  /**
+   * A sequence number for this chunk of the stream response.
+   */
+  sequence_number: number;
+
  /**
   * The index of the output item that the web search call is associated with.
   */
@@ -4224,6 +4252,11 @@ export interface ResponseWebSearchCallInProgressEvent {
   */
  item_id: string;
 
+  /**
+   * A sequence number for this chunk of the stream response.
+   */
+  sequence_number: number;
+
  /**
   * The index of the output item that the web search call is associated with.
   */
@@ -4244,6 +4277,11 @@ export interface ResponseWebSearchCallSearchingEvent {
   */
  item_id: string;
 
+  /**
+   * A sequence number for this chunk of the stream response.
+   */
+  sequence_number: number;
+
  /**
   * The index of the output item that the web search call is associated with.
   */
@@ -4825,14 +4863,29 @@ export interface ResponseCreateParamsStreaming extends ResponseCreateParamsBase
  stream: true;
 }
 
-export interface ResponseRetrieveParams {
+export type ResponseRetrieveParams = ResponseRetrieveParamsStreaming | ResponseRetrieveParamsNonStreaming;
+export interface ResponseRetrieveParamsBase {
  /**
   * Additional fields to include in the response. See the `include` parameter for
   * Response creation above for more information.
   */
  include?: Array<ResponseIncludable>;
+
+  starting_after?: number | null;
+  stream?: boolean | null;
 }
 
+export interface ResponseRetrieveParamsStreaming extends ResponseRetrieveParamsBase {
+  stream: true;
+}
+export interface ResponseRetrieveParamsNonStreaming extends ResponseRetrieveParamsBase {
+  stream?: false | null;
+}
+
+export namespace ResponseRetrieveParams {
+  export type ResponseRetrieveParamsStreaming = ResponsesAPI.ResponseRetrieveParamsStreaming;
+  export type ResponseRetrieveParamsNonStreaming = ResponsesAPI.ResponseRetrieveParamsNonStreaming;
+}
 Responses.InputItems = InputItems;
 
 export declare namespace Responses {
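A rough usage sketch of the new streaming retrieve overload (the response ID below is a placeholder): passing `stream: true` makes `retrieve` resolve to a `Stream<ResponseStreamEvent>` rather than a `Response`, and `starting_after` from `ResponseRetrieveParamsBase` can skip events that were already delivered.

// Minimal sketch, assuming an `openai` client is already constructed and
// 'resp_123' stands in for a real background response ID.
const eventStream = await openai.responses.retrieve('resp_123', {
  stream: true,
  starting_after: 10, // optional: resume after a known sequence number
});

for await (const event of eventStream) {
  console.log(event.type);
}

// Without `stream: true`, the same call still resolves to a plain Response object.
const full = await openai.responses.retrieve('resp_123');
console.log(full.status);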
