Skip to content

Commit 3023dc0

Browse files
authored
Wait for streaming to complete before ending traces in async mode (#575)
1 parent b148144 commit 3023dc0

File tree

5 files changed

+142
-1
lines changed

5 files changed

+142
-1
lines changed

.changeset/mean-spoons-stand.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@openai/agents-core': patch
3+
---
4+
5+
Fixes a bug where `onTraceEnd` was called immediately after `onTraceStart` when streaming is enabled; the trace now ends only after the stream loop has completed.

packages/agents-core/src/result.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ export class StreamedRunResult<
243243
#completedPromiseResolve: (() => void) | undefined;
244244
#completedPromiseReject: ((err: unknown) => void) | undefined;
245245
#cancelled: boolean = false;
246+
#streamLoopPromise: Promise<void> | undefined;
246247

247248
constructor(
248249
result: {
@@ -413,4 +414,22 @@ export class StreamedRunResult<
413414
[Symbol.asyncIterator](): AsyncIterator<RunStreamEvent> {
414415
return this.#readableStream[Symbol.asyncIterator]();
415416
}
417+
418+
/**
419+
* @internal
420+
* Sets the stream loop promise that completes when the internal stream loop finishes.
421+
* This is used to defer trace end until all agent work is complete.
422+
*/
423+
_setStreamLoopPromise(promise: Promise<void>) {
424+
this.#streamLoopPromise = promise;
425+
}
426+
427+
/**
428+
* @internal
429+
* Returns a promise that resolves when the stream loop completes.
430+
* This is used by the tracing system to wait for all agent work before ending the trace.
431+
*/
432+
_getStreamLoopPromise(): Promise<void> | undefined {
433+
return this.#streamLoopPromise;
434+
}
416435
}

packages/agents-core/src/run.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1138,7 +1138,11 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
11381138
result.maxTurns = options.maxTurns ?? state._maxTurns;
11391139

11401140
// Continue the stream loop without blocking
1141-
this.#runStreamLoop(result, options, isResumedState).then(
1141+
const streamLoopPromise = this.#runStreamLoop(
1142+
result,
1143+
options,
1144+
isResumedState,
1145+
).then(
11421146
() => {
11431147
result._done();
11441148
},
@@ -1147,6 +1151,9 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
11471151
},
11481152
);
11491153

1154+
// Attach the stream loop promise so trace end waits for the loop to complete
1155+
result._setStreamLoopPromise(streamLoopPromise);
1156+
11501157
return result;
11511158
});
11521159
}

packages/agents-core/src/tracing/context.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { AsyncLocalStorage } from '@openai/agents-core/_shims';
22
import { Trace, TraceOptions } from './traces';
33
import { getGlobalTraceProvider } from './provider';
44
import { Span, SpanError } from './spans';
5+
import { StreamedRunResult } from '../result';
56

67
type ContextState = {
78
trace?: Trace;
@@ -58,6 +59,17 @@ function _wrapFunctionWithTraceLifecycle<T>(fn: (trace: Trace) => Promise<T>) {
5859

5960
await trace.start();
6061
const result = await fn(trace);
62+
63+
// If result is a StreamedRunResult, defer trace end until stream loop completes
64+
if (result instanceof StreamedRunResult) {
65+
const streamLoopPromise = result._getStreamLoopPromise();
66+
if (streamLoopPromise) {
67+
streamLoopPromise.finally(() => trace.end());
68+
return result;
69+
}
70+
}
71+
72+
// For non-streaming results, end trace synchronously
6173
await trace.end();
6274

6375
return result;

packages/agents-core/test/tracing.test.ts

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ import { withAgentSpan } from '../src/tracing/createSpans';
3737

3838
import { TraceProvider } from '../src/tracing/provider';
3939

40+
import { Runner } from '../src/run';
41+
import { Agent } from '../src/agent';
42+
import { FakeModel, fakeModelMessage, FakeModelProvider } from './stubs';
43+
import { Usage } from '../src/usage';
44+
import * as protocol from '../src/types/protocol';
45+
import { setDefaultModelProvider } from '../src/providers';
46+
4047
class TestExporter implements TracingExporter {
4148
public exported: Array<(Trace | Span<any>)[]> = [];
4249

@@ -259,6 +266,97 @@ describe('withTrace & span helpers (integration)', () => {
259266
expect(startedIds).toContain(capturedSpanId);
260267
expect(endedIds).toContain(capturedSpanId);
261268
});
269+
270+
it('streaming run waits for stream loop to complete before calling onTraceEnd', async () => {
271+
// Set up model provider
272+
setDefaultModelProvider(new FakeModelProvider());
273+
274+
const traceStartTimes: number[] = [];
275+
const traceEndTimes: number[] = [];
276+
const spanEndTimes: number[] = [];
277+
278+
class OrderTrackingProcessor implements TracingProcessor {
279+
async onTraceStart(_trace: Trace): Promise<void> {
280+
traceStartTimes.push(Date.now());
281+
}
282+
async onTraceEnd(_trace: Trace): Promise<void> {
283+
traceEndTimes.push(Date.now());
284+
}
285+
async onSpanStart(_span: Span<any>): Promise<void> {
286+
// noop
287+
}
288+
async onSpanEnd(_span: Span<any>): Promise<void> {
289+
spanEndTimes.push(Date.now());
290+
}
291+
async shutdown(): Promise<void> {
292+
/* noop */
293+
}
294+
async forceFlush(): Promise<void> {
295+
/* noop */
296+
}
297+
}
298+
299+
const orderProcessor = new OrderTrackingProcessor();
300+
setTraceProcessors([orderProcessor]);
301+
302+
// Create a fake model that supports streaming
303+
class StreamingFakeModel extends FakeModel {
304+
async *getStreamedResponse(
305+
_request: any,
306+
): AsyncIterable<protocol.StreamEvent> {
307+
const response = await this.getResponse(_request);
308+
yield {
309+
type: 'response_done',
310+
response: {
311+
id: 'resp-1',
312+
usage: {
313+
requests: 1,
314+
inputTokens: 0,
315+
outputTokens: 0,
316+
totalTokens: 0,
317+
},
318+
output: response.output,
319+
},
320+
} as any;
321+
}
322+
}
323+
324+
const agent = new Agent({
325+
name: 'TestAgent',
326+
model: new StreamingFakeModel([
327+
{
328+
output: [fakeModelMessage('Final output')],
329+
usage: new Usage(),
330+
},
331+
]),
332+
});
333+
334+
const runner = new Runner({
335+
tracingDisabled: false,
336+
});
337+
338+
// Run with streaming
339+
const result = await runner.run(agent, 'test input', { stream: true });
340+
341+
// Consume the stream
342+
for await (const _event of result) {
343+
// consume all events
344+
}
345+
346+
// Wait for completion
347+
await result.completed;
348+
349+
// onTraceEnd should be called after all spans have ended
350+
expect(traceStartTimes.length).toBe(1);
351+
expect(traceEndTimes.length).toBe(1);
352+
expect(spanEndTimes.length).toBeGreaterThan(0);
353+
354+
// The trace should end after all spans have ended
355+
const lastSpanEndTime = Math.max(...spanEndTimes);
356+
const traceEndTime = traceEndTimes[0];
357+
358+
expect(traceEndTime).toBeGreaterThanOrEqual(lastSpanEndTime);
359+
});
262360
});
263361

264362
// -----------------------------------------------------------------------------------------

0 commit comments

Comments (0)