Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 31 additions & 17 deletions tests/asyncio/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ async def test_workflow_headers(qstash_client: AsyncQStash) -> None:
url=WORKFLOW_ENDPOINT,
initial_payload="my-payload",
env=None,
retries=None,
retries=1,
failure_url=WORKFLOW_ENDPOINT,
)

async def execute() -> None:
Expand Down Expand Up @@ -60,28 +61,41 @@ async def execute() -> None:
"queue": None,
"headers": {
"Content-Type": "application/json",
"Upstash-Callback": WORKFLOW_ENDPOINT,
"Upstash-Method": "PATCH",
"Upstash-Workflow-Init": "false",
"Upstash-Workflow-RunId": "wfr-id",
"Upstash-Workflow-Url": "https://www.my-website.com/api",
"Upstash-Feature-Set": "WF_NoDelete,InitialBody",
"Upstash-Failure-Callback-Forward-Upstash-Workflow-Is-Failure": "true",
"Upstash-Failure-Callback-Forward-Upstash-Workflow-Failure-Callback": "true",
"Upstash-Failure-Callback-Workflow-Runid": "wfr-id",
"Upstash-Failure-Callback-Workflow-Init": "false",
"Upstash-Failure-Callback-Workflow-Url": "https://www.my-website.com/api",
"Upstash-Failure-Callback-Workflow-Calltype": "failureCall",
"Upstash-Callback-Failure-Callback-Forward-Upstash-Workflow-Is-Failure": "true",
"Upstash-Callback-Failure-Callback-Forward-Upstash-Workflow-Failure-Callback": "true",
"Upstash-Callback-Failure-Callback-Workflow-Runid": "wfr-id",
"Upstash-Callback-Failure-Callback-Workflow-Init": "false",
"Upstash-Callback-Failure-Callback-Workflow-Url": "https://www.my-website.com/api",
"Upstash-Callback-Failure-Callback-Workflow-Calltype": "failureCall",
"Upstash-Failure-Callback-Retries": "1",
"Upstash-Callback-Failure-Callback-Retries": "1",
"Upstash-Retries": "10",
"Upstash-Callback-Retries": "1",
"Upstash-Forward-my-header": "my-value",
"Upstash-Callback": "https://www.my-website.com/api",
"Upstash-Callback-Workflow-RunId": "wfr-id",
"Upstash-Callback-Workflow-CallType": "fromCallback",
"Upstash-Callback-Workflow-Init": "false",
"Upstash-Callback-Workflow-Url": "https://www.my-website.com/api",
"Upstash-Callback-Feature-Set": "LazyFetch,InitialBody",
"Upstash-Callback-Forward-Upstash-Workflow-Callback": "true",
"Upstash-Callback-Forward-Upstash-Workflow-Concurrent": "1",
"Upstash-Callback-Forward-Upstash-Workflow-ContentType": "application/json",
"Upstash-Callback-Forward-Upstash-Workflow-StepId": "1",
"Upstash-Callback-Forward-Upstash-Workflow-StepName": "my-step",
"Upstash-Callback-Forward-Upstash-Workflow-StepType": "Call",
"Upstash-Callback-Retries": "3",
"Upstash-Callback-Workflow-CallType": "fromCallback",
"Upstash-Callback-Workflow-Init": "false",
"Upstash-Callback-Workflow-RunId": "wfr-id",
"Upstash-Callback-Workflow-Url": WORKFLOW_ENDPOINT,
"Upstash-Failure-Callback-Retries": "3",
"Upstash-Feature-Set": "WF_NoDelete,InitialBody",
"Upstash-Forward-my-header": "my-value",
"Upstash-Method": "PATCH",
"Upstash-Retries": str(retries),
"Upstash-Callback-Forward-Upstash-Workflow-Concurrent": "1",
"Upstash-Callback-Forward-Upstash-Workflow-ContentType": "application/json",
"Upstash-Workflow-CallType": "toCallback",
"Upstash-Workflow-Init": "false",
"Upstash-Workflow-RunId": "wfr-id",
"Upstash-Workflow-Url": WORKFLOW_ENDPOINT,
},
}
],
Expand Down
48 changes: 31 additions & 17 deletions tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def test_workflow_headers(qstash_client: QStash) -> None:
url=WORKFLOW_ENDPOINT,
initial_payload="my-payload",
env=None,
retries=None,
retries=1,
failure_url=WORKFLOW_ENDPOINT,
)

def execute() -> None:
Expand Down Expand Up @@ -59,28 +60,41 @@ def execute() -> None:
"queue": None,
"headers": {
"Content-Type": "application/json",
"Upstash-Callback": WORKFLOW_ENDPOINT,
"Upstash-Method": "PATCH",
"Upstash-Workflow-Init": "false",
"Upstash-Workflow-RunId": "wfr-id",
"Upstash-Workflow-Url": "https://www.my-website.com/api",
"Upstash-Feature-Set": "WF_NoDelete,InitialBody",
"Upstash-Failure-Callback-Forward-Upstash-Workflow-Is-Failure": "true",
"Upstash-Failure-Callback-Forward-Upstash-Workflow-Failure-Callback": "true",
"Upstash-Failure-Callback-Workflow-Runid": "wfr-id",
"Upstash-Failure-Callback-Workflow-Init": "false",
"Upstash-Failure-Callback-Workflow-Url": "https://www.my-website.com/api",
"Upstash-Failure-Callback-Workflow-Calltype": "failureCall",
"Upstash-Callback-Failure-Callback-Forward-Upstash-Workflow-Is-Failure": "true",
"Upstash-Callback-Failure-Callback-Forward-Upstash-Workflow-Failure-Callback": "true",
"Upstash-Callback-Failure-Callback-Workflow-Runid": "wfr-id",
"Upstash-Callback-Failure-Callback-Workflow-Init": "false",
"Upstash-Callback-Failure-Callback-Workflow-Url": "https://www.my-website.com/api",
"Upstash-Callback-Failure-Callback-Workflow-Calltype": "failureCall",
"Upstash-Failure-Callback-Retries": "1",
"Upstash-Callback-Failure-Callback-Retries": "1",
"Upstash-Retries": "10",
"Upstash-Callback-Retries": "1",
"Upstash-Forward-my-header": "my-value",
"Upstash-Callback": "https://www.my-website.com/api",
"Upstash-Callback-Workflow-RunId": "wfr-id",
"Upstash-Callback-Workflow-CallType": "fromCallback",
"Upstash-Callback-Workflow-Init": "false",
"Upstash-Callback-Workflow-Url": "https://www.my-website.com/api",
"Upstash-Callback-Feature-Set": "LazyFetch,InitialBody",
"Upstash-Callback-Forward-Upstash-Workflow-Callback": "true",
"Upstash-Callback-Forward-Upstash-Workflow-Concurrent": "1",
"Upstash-Callback-Forward-Upstash-Workflow-ContentType": "application/json",
"Upstash-Callback-Forward-Upstash-Workflow-StepId": "1",
"Upstash-Callback-Forward-Upstash-Workflow-StepName": "my-step",
"Upstash-Callback-Forward-Upstash-Workflow-StepType": "Call",
"Upstash-Callback-Retries": "3",
"Upstash-Callback-Workflow-CallType": "fromCallback",
"Upstash-Callback-Workflow-Init": "false",
"Upstash-Callback-Workflow-RunId": "wfr-id",
"Upstash-Callback-Workflow-Url": WORKFLOW_ENDPOINT,
"Upstash-Failure-Callback-Retries": "3",
"Upstash-Feature-Set": "WF_NoDelete,InitialBody",
"Upstash-Forward-my-header": "my-value",
"Upstash-Method": "PATCH",
"Upstash-Retries": str(retries),
"Upstash-Callback-Forward-Upstash-Workflow-Concurrent": "1",
"Upstash-Callback-Forward-Upstash-Workflow-ContentType": "application/json",
"Upstash-Workflow-CallType": "toCallback",
"Upstash-Workflow-Init": "false",
"Upstash-Workflow-RunId": "wfr-id",
"Upstash-Workflow-Url": WORKFLOW_ENDPOINT,
},
}
],
Expand Down
1 change: 1 addition & 0 deletions upstash_workflow/asyncio/context/auto_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ async def submit_steps_to_qstash(
self.context.retries,
lazy_step.retries if isinstance(lazy_step, _LazyCallStep) else None,
lazy_step.timeout if isinstance(lazy_step, _LazyCallStep) else None,
self.context.failure_url,
).headers

will_wait = (
Expand Down
2 changes: 2 additions & 0 deletions upstash_workflow/asyncio/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(
headers: Dict[str, str],
steps: List[DefaultStep],
url: str,
failure_url: Optional[str],
initial_payload: TInitialPayload,
env: Optional[Dict[str, Optional[str]]] = None,
retries: Optional[int] = None,
Expand All @@ -55,6 +56,7 @@ def __init__(
self.workflow_run_id: str = workflow_run_id
self._steps: List[DefaultStep] = steps
self.url: str = url
self.failure_url = failure_url
self.headers: Dict[str, str] = headers
self.request_payload: TInitialPayload = initial_payload
self.env: Dict[str, Optional[str]] = env or {}
Expand Down
1 change: 1 addition & 0 deletions upstash_workflow/asyncio/serve/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async def try_authentication(
initial_payload=context.request_payload,
env=context.env,
retries=context.retries,
failure_url=context.failure_url,
)

try:
Expand Down
37 changes: 33 additions & 4 deletions upstash_workflow/asyncio/serve/options.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,42 @@
import os
import json
import logging
from typing import Callable, Dict, Optional, cast, TypeVar
from typing import Callable, Dict, Optional, cast, TypeVar, Any, Generic, Awaitable
from qstash import AsyncQStash, Receiver
from upstash_workflow.workflow_types import _Response
from upstash_workflow.constants import DEFAULT_RETRIES
from upstash_workflow.types import (
_FinishCondition,
)
from upstash_workflow.asyncio.types import ServeBaseOptions
from upstash_workflow import AsyncWorkflowContext

_logger = logging.getLogger(__name__)
from dataclasses import dataclass

TResponse = TypeVar("TResponse")
_logger = logging.getLogger(__name__)
TInitialPayload = TypeVar("TInitialPayload")
TResponse = TypeVar("TResponse")


@dataclass
class ServeOptions(Generic[TInitialPayload, TResponse]):
qstash_client: AsyncQStash
initial_payload_parser: Callable[[str], TInitialPayload]
receiver: Optional[Receiver]
base_url: Optional[str]
env: Dict[str, Optional[str]]
retries: int
url: Optional[str]
failure_function: Optional[
Callable[[AsyncWorkflowContext, int, str, Dict[str, str]], Awaitable[Any]]
]
failure_url: Optional[str]


@dataclass
class ServeBaseOptions(
Generic[TInitialPayload, TResponse], ServeOptions[TInitialPayload, TResponse]
):
on_step_finish: Callable[[str, _FinishCondition], TResponse]


def _process_options(
Expand All @@ -26,6 +49,10 @@ def _process_options(
env: Optional[Dict[str, Optional[str]]] = None,
retries: Optional[int] = DEFAULT_RETRIES,
url: Optional[str] = None,
failure_function: Optional[
Callable[[AsyncWorkflowContext, int, str, Dict[str, str]], Awaitable[Any]]
] = None,
failure_url: Optional[str] = None,
) -> ServeBaseOptions[TInitialPayload, TResponse]:
environment = env if env is not None else dict(os.environ)

Expand Down Expand Up @@ -94,6 +121,8 @@ def _initial_payload_parser(initial_request: str) -> TInitialPayload:
env=environment,
retries=DEFAULT_RETRIES if retries is None else retries,
url=url,
failure_url=failure_url,
failure_function=failure_function,
)


Expand Down
39 changes: 38 additions & 1 deletion upstash_workflow/asyncio/serve/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from upstash_workflow.workflow_types import _Response, _AsyncRequest
from upstash_workflow.asyncio.workflow_parser import (
_get_payload,
_handle_failure,
)
from upstash_workflow.workflow_parser import _validate_request, _parse_request
from upstash_workflow.asyncio.workflow_requests import (
Expand Down Expand Up @@ -39,6 +40,10 @@ def _serve_base(
env: Optional[Dict[str, Optional[str]]] = None,
retries: Optional[int] = None,
url: Optional[str] = None,
failure_function: Optional[
Callable[[AsyncWorkflowContext, int, str, Dict[str, str]], Awaitable[Any]]
] = None,
failure_url: Optional[str] = None,
) -> Dict[str, Callable[[TRequest], Awaitable[TResponse]]]:
processed_options = _process_options(
qstash_client=qstash_client,
Expand All @@ -49,6 +54,8 @@ def _serve_base(
env=env,
retries=retries,
url=url,
failure_function=failure_function,
failure_url=failure_url,
)
qstash_client = processed_options.qstash_client
on_step_finish = processed_options.on_step_finish
Expand All @@ -58,9 +65,17 @@ def _serve_base(
env = processed_options.env
retries = processed_options.retries
url = processed_options.url
failure_url = processed_options.failure_url
failure_function = processed_options.failure_function

async def _handler(request: TRequest) -> TResponse:
workflow_url = _determine_urls(cast(_AsyncRequest, request), url, base_url)
workflow_url, workflow_failure_url = _determine_urls(
cast(_AsyncRequest, request),
url,
base_url,
False if failure_function is None else True,
failure_url,
)

request_payload = await _get_payload(request) or ""
_verify_request(
Expand All @@ -78,6 +93,20 @@ async def _handler(request: TRequest) -> TResponse:
raw_initial_payload = parse_request_response.raw_initial_payload
steps = parse_request_response.steps

failure_check = await _handle_failure(
request,
request_payload,
qstash_client,
initial_payload_parser,
route_function,
failure_function,
env,
retries,
)

if failure_check == "is-failure-callback":
return on_step_finish(workflow_run_id, "failure-callback")

workflow_context = AsyncWorkflowContext(
qstash_client=qstash_client,
workflow_run_id=workflow_run_id,
Expand All @@ -89,6 +118,7 @@ async def _handler(request: TRequest) -> TResponse:
url=workflow_url,
env=env,
retries=retries,
failure_url=workflow_failure_url,
)

auth_check = await _DisabledWorkflowContext[Any].try_authentication(
Expand All @@ -110,6 +140,7 @@ async def _handler(request: TRequest) -> TResponse:
raw_initial_payload,
qstash_client,
workflow_url,
workflow_failure_url,
retries,
)

Expand Down Expand Up @@ -153,6 +184,10 @@ def serve(
env: Optional[Dict[str, Optional[str]]] = None,
retries: Optional[int] = None,
url: Optional[str] = None,
failure_function: Optional[
Callable[[AsyncWorkflowContext, int, str, Dict[str, str]], Awaitable[Any]]
] = None,
failure_url: Optional[str] = None,
) -> Dict[str, Callable[[TRequest], Awaitable[TResponse]]]:
"""
Creates a method that handles incoming requests and runs the provided
Expand All @@ -178,4 +213,6 @@ def serve(
env=env,
retries=retries,
url=url,
failure_function=failure_function,
failure_url=failure_url,
)
31 changes: 0 additions & 31 deletions upstash_workflow/asyncio/types.py

This file was deleted.

Loading