Skip to content

Commit c46fa88

Browse files
authored
DX-1791: Add Failure Function (#32)
* feat: add failure functions * fix: fmt and tests * fix: fmt * fix: pass failure_url as positional param * fix: don't set retries header if retry==3 * fix: mv async ServeOptions to options.py and delete async.types.ts * fix: rm cast * fix: failure_function return types as Any and Awaitable[Any]
1 parent 35351ff commit c46fa88

File tree

20 files changed

+471
-102
lines changed

20 files changed

+471
-102
lines changed

tests/asyncio/test_context.py

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ async def test_workflow_headers(qstash_client: AsyncQStash) -> None:
3030
url=WORKFLOW_ENDPOINT,
3131
initial_payload="my-payload",
3232
env=None,
33-
retries=None,
33+
retries=1,
34+
failure_url=WORKFLOW_ENDPOINT,
3435
)
3536

3637
async def execute() -> None:
@@ -60,28 +61,41 @@ async def execute() -> None:
6061
"queue": None,
6162
"headers": {
6263
"Content-Type": "application/json",
63-
"Upstash-Callback": WORKFLOW_ENDPOINT,
64+
"Upstash-Method": "PATCH",
65+
"Upstash-Workflow-Init": "false",
66+
"Upstash-Workflow-RunId": "wfr-id",
67+
"Upstash-Workflow-Url": "https://www.my-website.com/api",
68+
"Upstash-Feature-Set": "WF_NoDelete,InitialBody",
69+
"Upstash-Failure-Callback-Forward-Upstash-Workflow-Is-Failure": "true",
70+
"Upstash-Failure-Callback-Forward-Upstash-Workflow-Failure-Callback": "true",
71+
"Upstash-Failure-Callback-Workflow-Runid": "wfr-id",
72+
"Upstash-Failure-Callback-Workflow-Init": "false",
73+
"Upstash-Failure-Callback-Workflow-Url": "https://www.my-website.com/api",
74+
"Upstash-Failure-Callback-Workflow-Calltype": "failureCall",
75+
"Upstash-Callback-Failure-Callback-Forward-Upstash-Workflow-Is-Failure": "true",
76+
"Upstash-Callback-Failure-Callback-Forward-Upstash-Workflow-Failure-Callback": "true",
77+
"Upstash-Callback-Failure-Callback-Workflow-Runid": "wfr-id",
78+
"Upstash-Callback-Failure-Callback-Workflow-Init": "false",
79+
"Upstash-Callback-Failure-Callback-Workflow-Url": "https://www.my-website.com/api",
80+
"Upstash-Callback-Failure-Callback-Workflow-Calltype": "failureCall",
81+
"Upstash-Failure-Callback-Retries": "1",
82+
"Upstash-Callback-Failure-Callback-Retries": "1",
83+
"Upstash-Retries": "10",
84+
"Upstash-Callback-Retries": "1",
85+
"Upstash-Forward-my-header": "my-value",
86+
"Upstash-Callback": "https://www.my-website.com/api",
87+
"Upstash-Callback-Workflow-RunId": "wfr-id",
88+
"Upstash-Callback-Workflow-CallType": "fromCallback",
89+
"Upstash-Callback-Workflow-Init": "false",
90+
"Upstash-Callback-Workflow-Url": "https://www.my-website.com/api",
6491
"Upstash-Callback-Feature-Set": "LazyFetch,InitialBody",
6592
"Upstash-Callback-Forward-Upstash-Workflow-Callback": "true",
66-
"Upstash-Callback-Forward-Upstash-Workflow-Concurrent": "1",
67-
"Upstash-Callback-Forward-Upstash-Workflow-ContentType": "application/json",
6893
"Upstash-Callback-Forward-Upstash-Workflow-StepId": "1",
6994
"Upstash-Callback-Forward-Upstash-Workflow-StepName": "my-step",
7095
"Upstash-Callback-Forward-Upstash-Workflow-StepType": "Call",
71-
"Upstash-Callback-Retries": "3",
72-
"Upstash-Callback-Workflow-CallType": "fromCallback",
73-
"Upstash-Callback-Workflow-Init": "false",
74-
"Upstash-Callback-Workflow-RunId": "wfr-id",
75-
"Upstash-Callback-Workflow-Url": WORKFLOW_ENDPOINT,
76-
"Upstash-Failure-Callback-Retries": "3",
77-
"Upstash-Feature-Set": "WF_NoDelete,InitialBody",
78-
"Upstash-Forward-my-header": "my-value",
79-
"Upstash-Method": "PATCH",
80-
"Upstash-Retries": str(retries),
96+
"Upstash-Callback-Forward-Upstash-Workflow-Concurrent": "1",
97+
"Upstash-Callback-Forward-Upstash-Workflow-ContentType": "application/json",
8198
"Upstash-Workflow-CallType": "toCallback",
82-
"Upstash-Workflow-Init": "false",
83-
"Upstash-Workflow-RunId": "wfr-id",
84-
"Upstash-Workflow-Url": WORKFLOW_ENDPOINT,
8599
},
86100
}
87101
],

tests/test_context.py

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ def test_workflow_headers(qstash_client: QStash) -> None:
2929
url=WORKFLOW_ENDPOINT,
3030
initial_payload="my-payload",
3131
env=None,
32-
retries=None,
32+
retries=1,
33+
failure_url=WORKFLOW_ENDPOINT,
3334
)
3435

3536
def execute() -> None:
@@ -59,28 +60,41 @@ def execute() -> None:
5960
"queue": None,
6061
"headers": {
6162
"Content-Type": "application/json",
62-
"Upstash-Callback": WORKFLOW_ENDPOINT,
63+
"Upstash-Method": "PATCH",
64+
"Upstash-Workflow-Init": "false",
65+
"Upstash-Workflow-RunId": "wfr-id",
66+
"Upstash-Workflow-Url": "https://www.my-website.com/api",
67+
"Upstash-Feature-Set": "WF_NoDelete,InitialBody",
68+
"Upstash-Failure-Callback-Forward-Upstash-Workflow-Is-Failure": "true",
69+
"Upstash-Failure-Callback-Forward-Upstash-Workflow-Failure-Callback": "true",
70+
"Upstash-Failure-Callback-Workflow-Runid": "wfr-id",
71+
"Upstash-Failure-Callback-Workflow-Init": "false",
72+
"Upstash-Failure-Callback-Workflow-Url": "https://www.my-website.com/api",
73+
"Upstash-Failure-Callback-Workflow-Calltype": "failureCall",
74+
"Upstash-Callback-Failure-Callback-Forward-Upstash-Workflow-Is-Failure": "true",
75+
"Upstash-Callback-Failure-Callback-Forward-Upstash-Workflow-Failure-Callback": "true",
76+
"Upstash-Callback-Failure-Callback-Workflow-Runid": "wfr-id",
77+
"Upstash-Callback-Failure-Callback-Workflow-Init": "false",
78+
"Upstash-Callback-Failure-Callback-Workflow-Url": "https://www.my-website.com/api",
79+
"Upstash-Callback-Failure-Callback-Workflow-Calltype": "failureCall",
80+
"Upstash-Failure-Callback-Retries": "1",
81+
"Upstash-Callback-Failure-Callback-Retries": "1",
82+
"Upstash-Retries": "10",
83+
"Upstash-Callback-Retries": "1",
84+
"Upstash-Forward-my-header": "my-value",
85+
"Upstash-Callback": "https://www.my-website.com/api",
86+
"Upstash-Callback-Workflow-RunId": "wfr-id",
87+
"Upstash-Callback-Workflow-CallType": "fromCallback",
88+
"Upstash-Callback-Workflow-Init": "false",
89+
"Upstash-Callback-Workflow-Url": "https://www.my-website.com/api",
6390
"Upstash-Callback-Feature-Set": "LazyFetch,InitialBody",
6491
"Upstash-Callback-Forward-Upstash-Workflow-Callback": "true",
65-
"Upstash-Callback-Forward-Upstash-Workflow-Concurrent": "1",
66-
"Upstash-Callback-Forward-Upstash-Workflow-ContentType": "application/json",
6792
"Upstash-Callback-Forward-Upstash-Workflow-StepId": "1",
6893
"Upstash-Callback-Forward-Upstash-Workflow-StepName": "my-step",
6994
"Upstash-Callback-Forward-Upstash-Workflow-StepType": "Call",
70-
"Upstash-Callback-Retries": "3",
71-
"Upstash-Callback-Workflow-CallType": "fromCallback",
72-
"Upstash-Callback-Workflow-Init": "false",
73-
"Upstash-Callback-Workflow-RunId": "wfr-id",
74-
"Upstash-Callback-Workflow-Url": WORKFLOW_ENDPOINT,
75-
"Upstash-Failure-Callback-Retries": "3",
76-
"Upstash-Feature-Set": "WF_NoDelete,InitialBody",
77-
"Upstash-Forward-my-header": "my-value",
78-
"Upstash-Method": "PATCH",
79-
"Upstash-Retries": str(retries),
95+
"Upstash-Callback-Forward-Upstash-Workflow-Concurrent": "1",
96+
"Upstash-Callback-Forward-Upstash-Workflow-ContentType": "application/json",
8097
"Upstash-Workflow-CallType": "toCallback",
81-
"Upstash-Workflow-Init": "false",
82-
"Upstash-Workflow-RunId": "wfr-id",
83-
"Upstash-Workflow-Url": WORKFLOW_ENDPOINT,
8498
},
8599
}
86100
],

upstash_workflow/asyncio/context/auto_executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ async def submit_steps_to_qstash(
8484
self.context.retries,
8585
lazy_step.retries if isinstance(lazy_step, _LazyCallStep) else None,
8686
lazy_step.timeout if isinstance(lazy_step, _LazyCallStep) else None,
87+
self.context.failure_url,
8788
).headers
8889

8990
will_wait = (

upstash_workflow/asyncio/context/context.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def __init__(
4747
headers: Dict[str, str],
4848
steps: List[DefaultStep],
4949
url: str,
50+
failure_url: Optional[str],
5051
initial_payload: TInitialPayload,
5152
env: Optional[Dict[str, Optional[str]]] = None,
5253
retries: Optional[int] = None,
@@ -55,6 +56,7 @@ def __init__(
5556
self.workflow_run_id: str = workflow_run_id
5657
self._steps: List[DefaultStep] = steps
5758
self.url: str = url
59+
self.failure_url = failure_url
5860
self.headers: Dict[str, str] = headers
5961
self.request_payload: TInitialPayload = initial_payload
6062
self.env: Dict[str, Optional[str]] = env or {}

upstash_workflow/asyncio/serve/authorization.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ async def try_authentication(
3838
initial_payload=context.request_payload,
3939
env=context.env,
4040
retries=context.retries,
41+
failure_url=context.failure_url,
4142
)
4243

4344
try:

upstash_workflow/asyncio/serve/options.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,42 @@
11
import os
22
import json
33
import logging
4-
from typing import Callable, Dict, Optional, cast, TypeVar
4+
from typing import Callable, Dict, Optional, cast, TypeVar, Any, Generic, Awaitable
55
from qstash import AsyncQStash, Receiver
66
from upstash_workflow.workflow_types import _Response
77
from upstash_workflow.constants import DEFAULT_RETRIES
88
from upstash_workflow.types import (
99
_FinishCondition,
1010
)
11-
from upstash_workflow.asyncio.types import ServeBaseOptions
11+
from upstash_workflow import AsyncWorkflowContext
1212

13-
_logger = logging.getLogger(__name__)
13+
from dataclasses import dataclass
1414

15-
TResponse = TypeVar("TResponse")
15+
_logger = logging.getLogger(__name__)
1616
TInitialPayload = TypeVar("TInitialPayload")
17+
TResponse = TypeVar("TResponse")
18+
19+
20+
@dataclass
21+
class ServeOptions(Generic[TInitialPayload, TResponse]):
22+
qstash_client: AsyncQStash
23+
initial_payload_parser: Callable[[str], TInitialPayload]
24+
receiver: Optional[Receiver]
25+
base_url: Optional[str]
26+
env: Dict[str, Optional[str]]
27+
retries: int
28+
url: Optional[str]
29+
failure_function: Optional[
30+
Callable[[AsyncWorkflowContext, int, str, Dict[str, str]], Awaitable[Any]]
31+
]
32+
failure_url: Optional[str]
33+
34+
35+
@dataclass
36+
class ServeBaseOptions(
37+
Generic[TInitialPayload, TResponse], ServeOptions[TInitialPayload, TResponse]
38+
):
39+
on_step_finish: Callable[[str, _FinishCondition], TResponse]
1740

1841

1942
def _process_options(
@@ -26,6 +49,10 @@ def _process_options(
2649
env: Optional[Dict[str, Optional[str]]] = None,
2750
retries: Optional[int] = DEFAULT_RETRIES,
2851
url: Optional[str] = None,
52+
failure_function: Optional[
53+
Callable[[AsyncWorkflowContext, int, str, Dict[str, str]], Awaitable[Any]]
54+
] = None,
55+
failure_url: Optional[str] = None,
2956
) -> ServeBaseOptions[TInitialPayload, TResponse]:
3057
environment = env if env is not None else dict(os.environ)
3158

@@ -94,6 +121,8 @@ def _initial_payload_parser(initial_request: str) -> TInitialPayload:
94121
env=environment,
95122
retries=DEFAULT_RETRIES if retries is None else retries,
96123
url=url,
124+
failure_url=failure_url,
125+
failure_function=failure_function,
97126
)
98127

99128

upstash_workflow/asyncio/serve/serve.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from upstash_workflow.workflow_types import _Response, _AsyncRequest
66
from upstash_workflow.asyncio.workflow_parser import (
77
_get_payload,
8+
_handle_failure,
89
)
910
from upstash_workflow.workflow_parser import _validate_request, _parse_request
1011
from upstash_workflow.asyncio.workflow_requests import (
@@ -39,6 +40,10 @@ def _serve_base(
3940
env: Optional[Dict[str, Optional[str]]] = None,
4041
retries: Optional[int] = None,
4142
url: Optional[str] = None,
43+
failure_function: Optional[
44+
Callable[[AsyncWorkflowContext, int, str, Dict[str, str]], Awaitable[Any]]
45+
] = None,
46+
failure_url: Optional[str] = None,
4247
) -> Dict[str, Callable[[TRequest], Awaitable[TResponse]]]:
4348
processed_options = _process_options(
4449
qstash_client=qstash_client,
@@ -49,6 +54,8 @@ def _serve_base(
4954
env=env,
5055
retries=retries,
5156
url=url,
57+
failure_function=failure_function,
58+
failure_url=failure_url,
5259
)
5360
qstash_client = processed_options.qstash_client
5461
on_step_finish = processed_options.on_step_finish
@@ -58,9 +65,17 @@ def _serve_base(
5865
env = processed_options.env
5966
retries = processed_options.retries
6067
url = processed_options.url
68+
failure_url = processed_options.failure_url
69+
failure_function = processed_options.failure_function
6170

6271
async def _handler(request: TRequest) -> TResponse:
63-
workflow_url = _determine_urls(cast(_AsyncRequest, request), url, base_url)
72+
workflow_url, workflow_failure_url = _determine_urls(
73+
cast(_AsyncRequest, request),
74+
url,
75+
base_url,
76+
False if failure_function is None else True,
77+
failure_url,
78+
)
6479

6580
request_payload = await _get_payload(request) or ""
6681
_verify_request(
@@ -78,6 +93,20 @@ async def _handler(request: TRequest) -> TResponse:
7893
raw_initial_payload = parse_request_response.raw_initial_payload
7994
steps = parse_request_response.steps
8095

96+
failure_check = await _handle_failure(
97+
request,
98+
request_payload,
99+
qstash_client,
100+
initial_payload_parser,
101+
route_function,
102+
failure_function,
103+
env,
104+
retries,
105+
)
106+
107+
if failure_check == "is-failure-callback":
108+
return on_step_finish(workflow_run_id, "failure-callback")
109+
81110
workflow_context = AsyncWorkflowContext(
82111
qstash_client=qstash_client,
83112
workflow_run_id=workflow_run_id,
@@ -89,6 +118,7 @@ async def _handler(request: TRequest) -> TResponse:
89118
url=workflow_url,
90119
env=env,
91120
retries=retries,
121+
failure_url=workflow_failure_url,
92122
)
93123

94124
auth_check = await _DisabledWorkflowContext[Any].try_authentication(
@@ -110,6 +140,7 @@ async def _handler(request: TRequest) -> TResponse:
110140
raw_initial_payload,
111141
qstash_client,
112142
workflow_url,
143+
workflow_failure_url,
113144
retries,
114145
)
115146

@@ -153,6 +184,10 @@ def serve(
153184
env: Optional[Dict[str, Optional[str]]] = None,
154185
retries: Optional[int] = None,
155186
url: Optional[str] = None,
187+
failure_function: Optional[
188+
Callable[[AsyncWorkflowContext, int, str, Dict[str, str]], Awaitable[Any]]
189+
] = None,
190+
failure_url: Optional[str] = None,
156191
) -> Dict[str, Callable[[TRequest], Awaitable[TResponse]]]:
157192
"""
158193
Creates a method that handles incoming requests and runs the provided
@@ -178,4 +213,6 @@ def serve(
178213
env=env,
179214
retries=retries,
180215
url=url,
216+
failure_function=failure_function,
217+
failure_url=failure_url,
181218
)

upstash_workflow/asyncio/types.py

Lines changed: 0 additions & 31 deletions
This file was deleted.

0 commit comments

Comments
 (0)