-
Notifications
You must be signed in to change notification settings - Fork 6.8k
[Data] Fixed max_task_in_flight_per_actor
to be defined by max_concurrency
set by default
#57772
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request updates the default calculation for max_task_in_flight_per_actor
to be based on max_concurrency
, which is a sensible improvement. The changes also include better validation for the actor pool configuration and corresponding tests. The implementation looks solid. I have a couple of suggestions to improve the documentation and test code quality.
"""Validate autoscaling configuration. | ||
Args: | ||
autoscaling_config: The autoscaling configuration to validate. | ||
max_actor_concurrency: The maximum concurrency per actor. | ||
max_tasks_in_flight_per_actor: The maximum tasks in flight per actor. | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring for this function appears to be incorrect. The Args
section lists autoscaling_config
, max_actor_concurrency
, and max_tasks_in_flight_per_actor
, but these are not parameters of the function. The function's actual parameters are actor_pool
and op
. Please update the docstring to accurately describe the function's parameters.
"""Validate autoscaling configuration for an actor pool.
Args:
actor_pool: The actor pool to validate.
op: The physical operator that owns the actor pool.
"""
def test_autoscaling_config_validation_warnings( | ||
ray_start_10_cpus_shared, restore_data_context | ||
): | ||
"""Test that validation warnings are emitted when actor pool config won't allow scaling up.""" | ||
from unittest.mock import patch | ||
|
||
class SimpleMapper: | ||
"""Simple callable class for testing autoscaling validation.""" | ||
|
||
def __call__(self, row): | ||
# Map operates on rows which are dicts | ||
return {"value": row["id"] * 2} | ||
|
||
# Test #1: Invalid config (should warn) | ||
# - max_tasks_in_flight / max_concurrency == 1 | ||
# - Default upscaling threshold (200%) | ||
with patch( | ||
"ray.data._internal.actor_autoscaler.default_actor_autoscaler.logger.warning" | ||
) as mock_warning: | ||
ds = ray.data.range(2, override_num_blocks=2).map_batches( | ||
SimpleMapper, | ||
compute=ray.data.ActorPoolStrategy( | ||
max_tasks_in_flight_per_actor=1, | ||
), | ||
max_concurrency=1, | ||
) | ||
# Take just one item to minimize execution time | ||
ds.take_all() | ||
|
||
# Check that warning was called with expected message | ||
wanr_log_args_str = str(mock_warning.call_args_list) | ||
expected_message = ( | ||
"⚠️ Actor Pool configuration of the " | ||
"ActorPoolMapOperator[MapBatches(SimpleMapper)] will not allow it to scale up: " | ||
"configured utilization threshold (200.0%) couldn't be reached with " | ||
"configured max_concurrency=1 and max_tasks_in_flight_per_actor=1 " | ||
"(max utilization will be max_tasks_in_flight_per_actor / max_concurrency = 100%)" | ||
) | ||
|
||
assert expected_message in wanr_log_args_str | ||
|
||
# Test #2: Provided config is valid (no warnings) | ||
# - max_tasks_in_flight / max_concurrency == 2 (default) | ||
# - Default upscaling threshold (200%) | ||
with patch( | ||
"ray.data._internal.actor_autoscaler.default_actor_autoscaler.logger.warning" | ||
) as mock_warning: | ||
ds = ray.data.range(2, override_num_blocks=2).map_batches( | ||
SimpleMapper, | ||
compute=ray.data.ActorPoolStrategy( | ||
max_tasks_in_flight_per_actor=2, | ||
), | ||
max_concurrency=1, | ||
) | ||
ds.take_all() | ||
|
||
# Check that this warning hasn't been emitted | ||
wanr_log_args_str = str(mock_warning.call_args_list) | ||
expected_message = ( | ||
"⚠️ Actor Pool configuration of the " | ||
"ActorPoolMapOperator[MapBatches(SimpleMapper)] will not allow it to scale up: " | ||
) | ||
|
||
assert expected_message not in wanr_log_args_str | ||
|
||
# Test #3: Default config is valid (no warnings) | ||
# - max_tasks_in_flight / max_concurrency == 4 (default) | ||
# - Default upscaling threshold (200%) | ||
with patch( | ||
"ray.data._internal.actor_autoscaler.default_actor_autoscaler.logger.warning" | ||
) as mock_warning: | ||
ds = ray.data.range(2, override_num_blocks=2).map_batches( | ||
SimpleMapper, compute=ray.data.ActorPoolStrategy() | ||
) | ||
ds.take_all() | ||
|
||
# Check that this warning hasn't been emitted | ||
wanr_log_args_str = str(mock_warning.call_args_list) | ||
expected_message = ( | ||
"⚠️ Actor Pool configuration of the " | ||
"ActorPoolMapOperator[MapBatches(SimpleMapper)] will not allow it to scale up: " | ||
) | ||
|
||
assert expected_message not in wanr_log_args_str | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test function contains three separate test cases that are very similar in structure. To reduce code duplication and make the test more maintainable and readable, consider refactoring it to use pytest.mark.parametrize
.
Additionally, there is a recurring typo in the variable name wanr_log_args_str
, which should be warn_log_args_str
.
Here is an example of how you could refactor this test:
@pytest.mark.parametrize(
"actor_pool_strategy_kwargs, max_concurrency, should_warn",
[
# Test #1: Invalid config (should warn)
({"max_tasks_in_flight_per_actor": 1}, 1, True),
# Test #2: Provided config is valid (no warnings)
({"max_tasks_in_flight_per_actor": 2}, 1, False),
# Test #3: Default config is valid (no warnings)
({}, None, False),
],
)
def test_autoscaling_config_validation_warnings(
ray_start_10_cpus_shared,
restore_data_context,
actor_pool_strategy_kwargs,
max_concurrency,
should_warn,
):
# ... test logic ...
with patch(...) as mock_warning:
# ...
warn_log_args_str = str(mock_warning.call_args_list)
warning_message_prefix = (
"⚠️ Actor Pool configuration of the "
"ActorPoolMapOperator[MapBatches(SimpleMapper)] will not allow it to scale up:"
)
if should_warn:
assert warning_message_prefix in warn_log_args_str
else:
assert warning_message_prefix not in warn_log_args_str
By default set max_tasks_in_flight_per_actor to be multiple of max_concurrency Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
124ef15
to
27b3412
Compare
== actor_pool.max_tasks_in_flight_per_actor() | ||
and self._actor_pool_scaling_up_threshold > 1.0 | ||
max_tasks_in_flight_per_actor / max_concurrency | ||
< self._actor_pool_scaling_up_threshold |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Autoscaling Validation Fails on Zero Concurrency
The autoscaling configuration validation can raise a ZeroDivisionError
when max_concurrency
is 0. The logic divides by max_concurrency
without a zero-check. While max_concurrency=0
is an invalid configuration, users can set it, causing the validation to crash instead of handling this edge case gracefully.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks!
Description
Revisited the way
max_task_in_flight_per_actor
is determined by default:max_concurrency * DEFAULT_ACTOR_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR
Also, cleaned up validation inside the ActorPool verifying whether provided configuration is valid. Added test to assert that warning is logged properly with invalid configuration.
Related issues
Types of change
Checklist
Does this PR introduce breaking changes?
Testing:
Code Quality:
git commit -s
)Documentation:
doc/source/
(if applicable)Additional context