Skip to content

Improve efficiency of cancelling in-progress TaskEvent records in v3 and v4 #2112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 61 additions & 17 deletions apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,43 @@ export class EventRepository {
});
}

/**
 * Cancels every still-partial event in `events` using a single `insertMany` call.
 *
 * Events whose `isPartial` flag is falsy are ignored; when none remain, nothing is
 * inserted. Each remaining event is copied without its `id`, flagged as cancelled
 * (`isPartial: false`, `isError: false`, `isCancelled: true`, `status: "ERROR"`) and
 * given a leading "cancellation" entry in its `events` list.
 *
 * @param events - Candidate records; only those with `isPartial` set are processed.
 * @param cancelledAt - Time stamped on the cancellation entry and passed to
 *   `calculateDurationFromStart` together with the event's `startTime`.
 * @param reason - Stored as `properties.reason` on the cancellation entry.
 */
async cancelEvents(events: TaskEventRecord[], cancelledAt: Date, reason: string) {
  const partialEvents = events.filter((candidate) => candidate.isPartial);

  // Nothing is in progress, so there is nothing to write.
  if (!partialEvents.length) {
    return;
  }

  await this.insertMany(
    partialEvents.map((partial) => {
      // Entries already recorded on the span; kept after the new cancellation entry.
      const priorEntries = (partial.events as any[]) ?? [];
      const duration = calculateDurationFromStart(partial.startTime, cancelledAt);

      return {
        // Copy everything except the id, so the cancelled record does not reuse it.
        ...omit(partial, "id"),
        isPartial: false,
        isError: false,
        isCancelled: true,
        status: "ERROR",
        links: partial.links ?? [],
        events: [
          {
            name: "cancellation",
            time: cancelledAt,
            properties: {
              reason,
            },
          },
          ...priorEntries,
        ],
        duration,
        properties: partial.properties as Attributes,
        metadata: partial.metadata as Attributes,
        style: partial.style as Attributes,
        output: partial.output as Attributes,
        outputType: partial.outputType,
        payload: partial.payload as Attributes,
        payloadType: partial.payloadType,
      };
    })
  );
}

async crashEvent({
event,
crashedAt,
Expand Down Expand Up @@ -394,28 +431,35 @@ export class EventRepository {
queryOptions,
startCreatedAt,
endCreatedAt,
{ spanId: true, isPartial: true, isCancelled: true }
{ spanId: true, isPartial: true, isCancelled: true },
undefined,
{ limit: 500 }
);

const filteredTaskEvents = taskEvents.filter((event) => {
// Event must be partial
if (!event.isPartial) return false;

// If the event is cancelled, it is not incomplete
if (event.isCancelled) return false;

if (allowCompleteDuplicate) {
return true;
// Optimize the filtering by pre-processing the data
const completeEventSpanIds = new Set<string>();
const incompleteEvents: Array<{ spanId: string }> = [];

// Single pass to categorize events and build lookup structures
for (const event of taskEvents) {
if (!event.isPartial && !event.isCancelled) {
// This is a complete event
completeEventSpanIds.add(event.spanId);
} else if (event.isPartial && !event.isCancelled) {
// This is a potentially incomplete event
incompleteEvents.push(event);
}
// Skip cancelled events as they are not incomplete
}

// There must not be another complete event with the same spanId
const hasCompleteDuplicate = taskEvents.some(
(otherEvent) =>
otherEvent.spanId === event.spanId && !otherEvent.isPartial && !otherEvent.isCancelled
);
// Filter incomplete events, excluding those with complete duplicates
const filteredTaskEvents = allowCompleteDuplicate
? incompleteEvents
: incompleteEvents.filter((event) => !completeEventSpanIds.has(event.spanId));

return !hasCompleteDuplicate;
});
if (filteredTaskEvents.length === 0) {
return [];
}

return this.#queryEvents(
storeTable,
Expand Down
9 changes: 3 additions & 6 deletions apps/webapp/app/v3/runEngineHandlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,9 @@ export function registerRunEngineEventBusHandlers() {
run.completedAt ?? undefined
);

await Promise.all(
inProgressEvents.map((event) => {
const error = createJsonErrorObject(run.error);
return eventRepository.cancelEvent(event, time, error.message);
})
);
const error = createJsonErrorObject(run.error);

await eventRepository.cancelEvents(inProgressEvents, time, error.message);
} catch (error) {
logger.error("[runCancelled] Failed to cancel event", {
error: error instanceof Error ? error.message : error,
Expand Down
7 changes: 2 additions & 5 deletions apps/webapp/app/v3/services/cancelTaskRunV1.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,10 @@ export class CancelTaskRunServiceV1 extends BaseService {

logger.debug("Cancelling in-progress events", {
inProgressEvents: inProgressEvents.map((event) => event.id),
eventCount: inProgressEvents.length,
});

await Promise.all(
inProgressEvents.map((event) => {
return eventRepository.cancelEvent(event, opts.cancelledAt, opts.reason);
})
);
await eventRepository.cancelEvents(inProgressEvents, opts.cancelledAt, opts.reason);

// Cancel any in progress attempts
if (opts.cancelAttempts) {
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/taskEventStore.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class TaskEventStore {
endCreatedAt?: Date,
select?: TSelect,
orderBy?: Prisma.TaskEventOrderByWithRelationInput,
options?: { includeDebugLogs?: boolean }
options?: { includeDebugLogs?: boolean; limit?: number }
): Promise<Prisma.TaskEventGetPayload<{ select: TSelect }>[]> {
let finalWhere: Prisma.TaskEventWhereInput = where;

Expand Down Expand Up @@ -111,6 +111,7 @@ export class TaskEventStore {
},
select,
orderBy,
take: options?.limit,
})) as Prisma.TaskEventGetPayload<{ select: TSelect }>[];
} else {
// When partitioning is not enabled, we ignore the createdAt range.
Expand All @@ -121,6 +122,7 @@ export class TaskEventStore {
},
select,
orderBy,
take: options?.limit,
})) as Prisma.TaskEventGetPayload<{ select: TSelect }>[];
}
}
Expand Down