diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index 067302aba9..7d1625395f 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -326,6 +326,43 @@ export class EventRepository { }); } + async cancelEvents(events: TaskEventRecord[], cancelledAt: Date, reason: string) { + const eventsToCancel = events.filter((event) => event.isPartial); + + if (eventsToCancel.length === 0) { + return; + } + + await this.insertMany( + eventsToCancel.map((event) => ({ + ...omit(event, "id"), + isPartial: false, + isError: false, + isCancelled: true, + status: "ERROR", + links: event.links ?? [], + events: [ + { + name: "cancellation", + time: cancelledAt, + properties: { + reason, + }, + }, + ...((event.events as any[]) ?? []), + ], + duration: calculateDurationFromStart(event.startTime, cancelledAt), + properties: event.properties as Attributes, + metadata: event.metadata as Attributes, + style: event.style as Attributes, + output: event.output as Attributes, + outputType: event.outputType, + payload: event.payload as Attributes, + payloadType: event.payloadType, + })) + ); + } + async crashEvent({ event, crashedAt, @@ -394,28 +431,35 @@ export class EventRepository { queryOptions, startCreatedAt, endCreatedAt, - { spanId: true, isPartial: true, isCancelled: true } + { spanId: true, isPartial: true, isCancelled: true }, + undefined, + { limit: 500 } ); - const filteredTaskEvents = taskEvents.filter((event) => { - // Event must be partial - if (!event.isPartial) return false; - - // If the event is cancelled, it is not incomplete - if (event.isCancelled) return false; - - if (allowCompleteDuplicate) { - return true; + // Optimize the filtering by pre-processing the data + const completeEventSpanIds = new Set(); + const incompleteEvents: Array<{ spanId: string }> = []; + + // Single pass to categorize events and build lookup structures + for (const event of taskEvents) { + if (!event.isPartial && !event.isCancelled) { + // This is a complete event + completeEventSpanIds.add(event.spanId); + } else if (event.isPartial && !event.isCancelled) { + // This is a potentially incomplete event + incompleteEvents.push(event); } + // Skip cancelled events as they are not incomplete + } - // There must not be another complete event with the same spanId - const hasCompleteDuplicate = taskEvents.some( - (otherEvent) => - otherEvent.spanId === event.spanId && !otherEvent.isPartial && !otherEvent.isCancelled - ); + // Filter incomplete events, excluding those with complete duplicates + const filteredTaskEvents = allowCompleteDuplicate + ? incompleteEvents + : incompleteEvents.filter((event) => !completeEventSpanIds.has(event.spanId)); - return !hasCompleteDuplicate; - }); + if (filteredTaskEvents.length === 0) { + return []; + } return this.#queryEvents( storeTable, diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index 2767e613db..2c2ed59a8a 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -309,12 +309,9 @@ export function registerRunEngineEventBusHandlers() { run.completedAt ?? undefined ); - await Promise.all( - inProgressEvents.map((event) => { - const error = createJsonErrorObject(run.error); - return eventRepository.cancelEvent(event, time, error.message); - }) - ); + const error = createJsonErrorObject(run.error); + + await eventRepository.cancelEvents(inProgressEvents, time, error.message); } catch (error) { logger.error("[runCancelled] Failed to cancel event", { error: error instanceof Error ? error.message : error, diff --git a/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts b/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts index 78aec652f7..1a8c136e5c 100644 --- a/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts +++ b/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts @@ -95,13 +95,10 @@ export class CancelTaskRunServiceV1 extends BaseService { logger.debug("Cancelling in-progress events", { inProgressEvents: inProgressEvents.map((event) => event.id), + eventCount: inProgressEvents.length, }); - await Promise.all( - inProgressEvents.map((event) => { - return eventRepository.cancelEvent(event, opts.cancelledAt, opts.reason); - }) - ); + await eventRepository.cancelEvents(inProgressEvents, opts.cancelledAt, opts.reason); // Cancel any in progress attempts if (opts.cancelAttempts) { diff --git a/apps/webapp/app/v3/taskEventStore.server.ts b/apps/webapp/app/v3/taskEventStore.server.ts index 1f926e89f8..7a87d29b35 100644 --- a/apps/webapp/app/v3/taskEventStore.server.ts +++ b/apps/webapp/app/v3/taskEventStore.server.ts @@ -77,7 +77,7 @@ export class TaskEventStore { endCreatedAt?: Date, select?: TSelect, orderBy?: Prisma.TaskEventOrderByWithRelationInput, - options?: { includeDebugLogs?: boolean } + options?: { includeDebugLogs?: boolean; limit?: number } ): Promise[]> { let finalWhere: Prisma.TaskEventWhereInput = where; @@ -111,6 +111,7 @@ export class TaskEventStore { }, select, orderBy, + take: options?.limit, })) as Prisma.TaskEventGetPayload<{ select: TSelect }>[]; } else { // When partitioning is not enabled, we ignore the createdAt range. @@ -121,6 +122,7 @@ export class TaskEventStore { }, select, orderBy, + take: options?.limit, })) as Prisma.TaskEventGetPayload<{ select: TSelect }>[]; } }