Skip to content

Commit 7ffa822

Browse files
authored
Improve workflow task deadlock and eviction (#806)
1 parent 92b7758 commit 7ffa822

File tree

5 files changed

+487
-63
lines changed

5 files changed

+487
-63
lines changed

temporalio/worker/_replayer.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,22 @@ def __init__(
5151
"""Create a replayer to replay workflows from history.
5252
5353
See :py:meth:`temporalio.worker.Worker.__init__` for a description of
54-
most of the arguments. The same arguments need to be passed to the
55-
replayer that were passed to the worker when the workflow originally
54+
most of the arguments. Most of the same arguments need to be passed to
55+
the replayer that were passed to the worker when the workflow originally
5656
ran.
57+
58+
Note, unlike the worker, for the replayer the workflow_task_executor
59+
will default to a new thread pool executor with no max_workers set that
60+
will be shared across all replay calls and never explicitly shut down.
61+
Users are encouraged to provide their own if needing more control.
5762
"""
5863
if not workflows:
5964
raise ValueError("At least one workflow must be specified")
6065
self._config = ReplayerConfig(
6166
workflows=list(workflows),
62-
workflow_task_executor=workflow_task_executor,
67+
workflow_task_executor=(
68+
workflow_task_executor or concurrent.futures.ThreadPoolExecutor()
69+
),
6370
workflow_runner=workflow_runner,
6471
unsandboxed_workflow_runner=unsandboxed_workflow_runner,
6572
namespace=namespace,
@@ -195,6 +202,7 @@ def on_eviction_hook(
195202
task_queue=task_queue,
196203
workflows=self._config["workflows"],
197204
workflow_task_executor=self._config["workflow_task_executor"],
205+
max_concurrent_workflow_tasks=5,
198206
workflow_runner=self._config["workflow_runner"],
199207
unsandboxed_workflow_runner=self._config["unsandboxed_workflow_runner"],
200208
data_converter=self._config["data_converter"],

temporalio/worker/_worker.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,10 @@ def __init__(
109109
workflow_task_executor: Thread pool executor for workflow tasks. If
110110
this is not present, a new
111111
:py:class:`concurrent.futures.ThreadPoolExecutor` will be
112-
created with ``max_workers`` set to ``max(os.cpu_count(), 4)``.
113-
The default one will be properly shutdown, but if one is
114-
provided, the caller is responsible for shutting it down after
112+
created with ``max_workers`` set to
113+
``max_concurrent_workflow_tasks`` if it is present, or 500
114+
otherwise. The default one will be properly shutdown, but if one
115+
is provided, the caller is responsible for shutting it down after
115116
the worker is shut down.
116117
workflow_runner: Runner for workflows.
117118
unsandboxed_workflow_runner: Runner for workflows that opt-out of
@@ -312,6 +313,7 @@ def __init__(
312313
task_queue=task_queue,
313314
workflows=workflows,
314315
workflow_task_executor=workflow_task_executor,
316+
max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,
315317
workflow_runner=workflow_runner,
316318
unsandboxed_workflow_runner=unsandboxed_workflow_runner,
317319
data_converter=client_config["data_converter"],

0 commit comments

Comments
 (0)