Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/strong-seas-impress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Decouple zod
1 change: 1 addition & 0 deletions apps/webapp/app/components/jobs/JobsTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ function classForJobStatus(status: JobRunStatus) {
case "WAITING_ON_CONNECTIONS":
case "PENDING":
case "UNRESOLVED_AUTH":
case "INVALID_PAYLOAD":
return "text-rose-500";
default:
return "";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/components/run/RunCompletedDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { CodeBlock } from "~/components/code/CodeBlock";
import { DateTime } from "~/components/primitives/DateTime";
import { Paragraph } from "~/components/primitives/Paragraph";
import { RunStatusIcon, RunStatusLabel } from "~/components/runs/RunStatuses";
import { MatchedRun, useRun } from "~/hooks/useRun";
import { MatchedRun } from "~/hooks/useRun";
import { formatDuration } from "~/utils";
import {
RunPanel,
Expand Down
8 changes: 7 additions & 1 deletion apps/webapp/app/components/runs/RunStatuses.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ export function hasFinished(status: JobRunStatus): boolean {
status === "ABORTED" ||
status === "TIMED_OUT" ||
status === "CANCELED" ||
status === "UNRESOLVED_AUTH"
status === "UNRESOLVED_AUTH" ||
status === "INVALID_PAYLOAD"
);
}

Expand Down Expand Up @@ -49,6 +50,7 @@ export function RunStatusIcon({ status, className }: { status: JobRunStatus; cla
case "TIMED_OUT":
return <ExclamationTriangleIcon className={cn(runStatusClassNameColor(status), className)} />;
case "UNRESOLVED_AUTH":
case "INVALID_PAYLOAD":
return <XCircleIcon className={cn(runStatusClassNameColor(status), className)} />;
case "WAITING_ON_CONNECTIONS":
return <WrenchIcon className={cn(runStatusClassNameColor(status), className)} />;
Expand Down Expand Up @@ -77,6 +79,7 @@ export function runBasicStatus(status: JobRunStatus): RunBasicStatus {
case "UNRESOLVED_AUTH":
case "CANCELED":
case "ABORTED":
case "INVALID_PAYLOAD":
return "FAILED";
case "SUCCESS":
return "COMPLETED";
Expand Down Expand Up @@ -111,6 +114,8 @@ export function runStatusTitle(status: JobRunStatus): string {
return "Canceled";
case "UNRESOLVED_AUTH":
return "Unresolved auth";
case "INVALID_PAYLOAD":
return "Invalid payload";
default: {
const _exhaustiveCheck: never = status;
throw new Error(`Non-exhaustive match for value: ${status}`);
Expand All @@ -130,6 +135,7 @@ export function runStatusClassNameColor(status: JobRunStatus): string {
return "text-amber-300";
case "FAILURE":
case "UNRESOLVED_AUTH":
case "INVALID_PAYLOAD":
return "text-rose-500";
case "TIMED_OUT":
return "text-amber-300";
Expand Down
17 changes: 16 additions & 1 deletion apps/webapp/app/services/runs/performRunExecutionV1.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
CachedTaskSchema,
RunJobError,
RunJobInvalidPayloadError,
RunJobResumeWithTask,
RunJobRetryWithTask,
RunJobSuccess,
Expand Down Expand Up @@ -348,6 +349,11 @@ export class PerformRunExecutionV1Service {

break;
}
case "INVALID_PAYLOAD": {
await this.#failRunWithInvalidPayloadError(execution, safeBody.data);

break;
}
default: {
const _exhaustiveCheck: never = status;
throw new Error(`Non-exhaustive match for value: ${status}`);
Expand Down Expand Up @@ -453,6 +459,15 @@ export class PerformRunExecutionV1Service {
});
}

async #failRunWithInvalidPayloadError(
execution: FoundRunExecution,
data: RunJobInvalidPayloadError
) {
return await $transaction(this.#prismaClient, async (tx) => {
await this.#failRunExecution(tx, execution, data.errors, "INVALID_PAYLOAD");
});
}

async #retryRunWithTask(execution: FoundRunExecution, data: RunJobRetryWithTask) {
const { run } = execution;

Expand Down Expand Up @@ -572,7 +587,7 @@ export class PerformRunExecutionV1Service {
prisma: PrismaClientOrTransaction,
execution: FoundRunExecution,
output: Record<string, any>,
status: "FAILURE" | "ABORTED" | "UNRESOLVED_AUTH" = "FAILURE"
status: "FAILURE" | "ABORTED" | "UNRESOLVED_AUTH" | "INVALID_PAYLOAD" = "FAILURE"
): Promise<void> {
const { run } = execution;

Expand Down
25 changes: 24 additions & 1 deletion apps/webapp/app/services/runs/performRunExecutionV2.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
CachedTask,
RunJobError,
RunJobInvalidPayloadError,
RunJobResumeWithTask,
RunJobRetryWithTask,
RunJobSuccess,
Expand Down Expand Up @@ -360,6 +361,11 @@ export class PerformRunExecutionV2Service {

break;
}
case "INVALID_PAYLOAD": {
await this.#failRunWithInvalidPayloadError(run, safeBody.data, durationInMs);

break;
}
default: {
const _exhaustiveCheck: never = status;
throw new Error(`Non-exhaustive match for value: ${status}`);
Expand Down Expand Up @@ -455,6 +461,23 @@ export class PerformRunExecutionV2Service {
});
}

async #failRunWithInvalidPayloadError(
execution: FoundRun,
data: RunJobInvalidPayloadError,
durationInMs: number
) {
return await $transaction(this.#prismaClient, async (tx) => {
await this.#failRunExecution(
tx,
"EXECUTE_JOB",
execution,
data.errors,
"INVALID_PAYLOAD",
durationInMs
);
});
}

async #retryRunWithTask(
run: FoundRun,
data: RunJobRetryWithTask,
Expand Down Expand Up @@ -579,7 +602,7 @@ export class PerformRunExecutionV2Service {
reason: "EXECUTE_JOB" | "PREPROCESS",
run: FoundRun,
output: Record<string, any>,
status: "FAILURE" | "ABORTED" | "TIMED_OUT" | "UNRESOLVED_AUTH" = "FAILURE",
status: "FAILURE" | "ABORTED" | "TIMED_OUT" | "UNRESOLVED_AUTH" | "INVALID_PAYLOAD" = "FAILURE",
durationInMs: number = 0
): Promise<void> {
await $transaction(prisma, async (tx) => {
Expand Down
10 changes: 9 additions & 1 deletion packages/core/src/schemas/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ulid } from "ulid";
import { z } from "zod";
import { Prettify } from "../types";
import { addMissingVersionField } from "./addMissingVersionField";
import { ErrorWithStackSchema } from "./errors";
import { ErrorWithStackSchema, SchemaErrorSchema } from "./errors";
import { EventRuleSchema } from "./eventFilter";
import { ConnectionAuthSchema, IntegrationConfigSchema } from "./integrations";
import { DeserializedJsonSchema, SerializableJsonSchema } from "./json";
Expand Down Expand Up @@ -437,6 +437,13 @@ export const RunJobErrorSchema = z.object({

export type RunJobError = z.infer<typeof RunJobErrorSchema>;

export const RunJobInvalidPayloadErrorSchema = z.object({
status: z.literal("INVALID_PAYLOAD"),
errors: z.array(SchemaErrorSchema),
});

export type RunJobInvalidPayloadError = z.infer<typeof RunJobInvalidPayloadErrorSchema>;

export const RunJobUnresolvedAuthErrorSchema = z.object({
status: z.literal("UNRESOLVED_AUTH_ERROR"),
issues: z.record(z.object({ id: z.string(), error: z.string() })),
Expand Down Expand Up @@ -477,6 +484,7 @@ export type RunJobSuccess = z.infer<typeof RunJobSuccessSchema>;
export const RunJobResponseSchema = z.discriminatedUnion("status", [
RunJobErrorSchema,
RunJobUnresolvedAuthErrorSchema,
RunJobInvalidPayloadErrorSchema,
RunJobResumeWithTaskSchema,
RunJobRetryWithTaskSchema,
RunJobCanceledWithTaskSchema,
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/schemas/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,10 @@ export const ErrorWithStackSchema = z.object({
});

export type ErrorWithStack = z.infer<typeof ErrorWithStackSchema>;

export const SchemaErrorSchema = z.object({
path: z.array(z.string()),
message: z.string(),
});

export type SchemaError = z.infer<typeof SchemaErrorSchema>;
1 change: 1 addition & 0 deletions packages/core/src/schemas/runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const RunStatusSchema = z.union([
z.literal("ABORTED"),
z.literal("CANCELED"),
z.literal("UNRESOLVED_AUTH"),
z.literal("INVALID_PAYLOAD"),
]);

export const RunTaskSchema = z.object({
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterEnum
ALTER TYPE "JobRunStatus" ADD VALUE 'INVALID_PAYLOAD';
1 change: 1 addition & 0 deletions packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ enum JobRunStatus {
ABORTED
CANCELED
UNRESOLVED_AUTH
INVALID_PAYLOAD
}

model JobRunExecution {
Expand Down
3 changes: 1 addition & 2 deletions packages/trigger-sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@
"ulid": "^2.3.0",
"uuid": "^9.0.0",
"ws": "^8.11.0",
"zod": "3.21.4",
"zod-error": "1.5.0"
"zod": "3.21.4"
},
"devDependencies": {
"@trigger.dev/tsconfig": "workspace:*",
Expand Down
6 changes: 5 additions & 1 deletion packages/trigger-sdk/src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ErrorWithStack, ServerTask } from "@trigger.dev/core";
import { ErrorWithStack, SchemaError, ServerTask } from "@trigger.dev/core";

export class ResumeWithTaskError {
constructor(public task: ServerTask) {}
Expand All @@ -16,6 +16,10 @@ export class CanceledWithTaskError {
constructor(public task: ServerTask) {}
}

export class ParsedPayloadSchemaError {
constructor(public schemaErrors: SchemaError[]) {}
}

/** Use this function if you're using a `try/catch` block to catch errors.
* It checks if a thrown error is a special internal error that you should ignore.
* If this returns `true` then you must rethrow the error: `throw err;`
Expand Down
11 changes: 10 additions & 1 deletion packages/trigger-sdk/src/triggerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ import {
StatusUpdate,
} from "@trigger.dev/core";
import { ApiClient } from "./apiClient";
import { CanceledWithTaskError, ResumeWithTaskError, RetryWithTaskError } from "./errors";
import {
CanceledWithTaskError,
ParsedPayloadSchemaError,
ResumeWithTaskError,
RetryWithTaskError,
} from "./errors";
import { TriggerIntegration } from "./integrations";
import { IO } from "./io";
import { createIOWithIntegrations } from "./ioWithIntegrations";
Expand Down Expand Up @@ -712,6 +717,10 @@ export class TriggerClient {

return { status: "SUCCESS", output };
} catch (error) {
if (error instanceof ParsedPayloadSchemaError) {
return { status: "INVALID_PAYLOAD", errors: error.schemaErrors };
}

if (error instanceof ResumeWithTaskError) {
return { status: "RESUME_WITH_TASK", task: error.task };
}
Expand Down
15 changes: 11 additions & 4 deletions packages/trigger-sdk/src/triggers/eventTrigger.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { EventFilter, TriggerMetadata, deepMergeFilters } from "@trigger.dev/core";
import { z } from "zod";
import { Job } from "../job";
import { TriggerClient } from "../triggerClient";
import { EventSpecification, EventSpecificationExample, Trigger } from "../types";
import { EventSpecification, EventSpecificationExample, SchemaParser, Trigger } from "../types";
import { formatSchemaErrors } from "../utils/formatSchemaErrors";
import { ParsedPayloadSchemaError } from "../errors";

type EventTriggerOptions<TEventSpecification extends EventSpecification<any>> = {
event: TEventSpecification;
Expand Down Expand Up @@ -50,7 +51,7 @@ type TriggerOptions<TEvent> = {
/** A [Zod](https://trigger.dev/docs/documentation/guides/zod) schema that defines the shape of the event payload.
* The default is `z.any()` which is `any`.
* */
schema?: z.Schema<TEvent>;
schema?: SchemaParser<TEvent>;
/** You can use this to filter events based on the source. */
source?: string;
/** Used to filter which events trigger the Job
Expand Down Expand Up @@ -94,7 +95,13 @@ export function eventTrigger<TEvent extends any = any>(
examples: options.examples,
parsePayload: (rawPayload: any) => {
if (options.schema) {
return options.schema.parse(rawPayload);
const results = options.schema.safeParse(rawPayload);

if (!results.success) {
throw new ParsedPayloadSchemaError(formatSchemaErrors(results.error.issues));
}

return results.data;
}

return rawPayload as any;
Expand Down
8 changes: 3 additions & 5 deletions packages/trigger-sdk/src/triggers/externalSource.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { z } from "zod";

import {
DisplayProperty,
EventFilter,
Expand All @@ -16,7 +14,7 @@ import { IOWithIntegrations, TriggerIntegration } from "../integrations";
import { IO } from "../io";
import { Job } from "../job";
import { TriggerClient } from "../triggerClient";
import type { EventSpecification, Trigger, TriggerContext } from "../types";
import type { EventSpecification, SchemaParser, Trigger, TriggerContext } from "../types";
import { slugifyId } from "../utils";
import { SerializableJson } from "@trigger.dev/core";
import { ConnectionAuth } from "@trigger.dev/core";
Expand Down Expand Up @@ -154,8 +152,8 @@ type ExternalSourceOptions<
> = {
id: string;
version: string;
schema: z.Schema<TParams>;
optionSchema?: z.Schema<TTriggerOptionDefinitions>;
schema: SchemaParser<TParams>;
optionSchema?: SchemaParser<TTriggerOptionDefinitions>;
integration: TIntegration;
register: RegisterFunction<TIntegration, TParams, TChannel, TTriggerOptionDefinitions>;
filter?: FilterFunction<TParams, TTriggerOptionDefinitions>;
Expand Down
13 changes: 13 additions & 0 deletions packages/trigger-sdk/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,16 @@ export interface EventSpecification<TEvent extends any> {

export type EventTypeFromSpecification<TEventSpec extends EventSpecification<any>> =
TEventSpec extends EventSpecification<infer TEvent> ? TEvent : never;

export type SchemaParserIssue = { path: PropertyKey[]; message: string };

export type SchemaParserResult<T> =
| {
success: true;
data: T;
}
| { success: false; error: { issues: SchemaParserIssue[] } };

export type SchemaParser<T extends unknown = unknown> = {
safeParse: (a: unknown) => SchemaParserResult<T>;
};
9 changes: 9 additions & 0 deletions packages/trigger-sdk/src/utils/formatSchemaErrors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import type { SchemaError } from "@trigger.dev/core";
import { SchemaParserIssue } from "../types";

export function formatSchemaErrors(errors: SchemaParserIssue[]): SchemaError[] {
return errors.map((error) => {
const { path, message } = error;
return { path: path.map(String), message };
});
}
2 changes: 0 additions & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion references/job-catalog/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
"@types/node": "20.4.2",
"typescript": "5.1.6",
"zod": "3.21.4",
"@trigger.dev/airtable": "workspace:*",
"@trigger.dev/linear": "workspace:*"
},
"trigger.dev": {
Expand Down
Loading