Skip to content
6 changes: 3 additions & 3 deletions python/ray/_private/resource_isolation_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def _validate_and_get_system_reserved_cpu(
does not have enough available cpus.

"""
available_system_cpus = utils.get_num_cpus()
available_system_cpus = utils.get_num_cpus(truncate=False)

if available_system_cpus < ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES:
raise ValueError(
Expand Down Expand Up @@ -220,9 +220,9 @@ def _validate_and_get_system_reserved_cpu(
f"greater than or equal to {ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES}"
)

if system_reserved_cpu > available_system_cpus:
if system_reserved_cpu >= available_system_cpus:
raise ValueError(
f"The requested system_reserved_cpu={system_reserved_cpu} is greater than "
f"The requested system_reserved_cpu={system_reserved_cpu} is greater than or equal to "
f"the number of cpus available={available_system_cpus}. "
"Pick a smaller number of cpu cores to reserve for ray system processes."
)
Expand Down
7 changes: 5 additions & 2 deletions python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,8 @@ def _get_docker_cpus(

def get_num_cpus(
override_docker_cpu_warning: bool = ENV_DISABLE_DOCKER_CPU_WARNING,
) -> int:
truncate: bool = True,
) -> float:
"""
Get the number of CPUs available on this node.
Depending on the situation, use multiprocessing.cpu_count() or cgroups.
Expand All @@ -432,6 +433,7 @@ def get_num_cpus(
RAY_DISABLE_DOCKER_CPU_WARNING. By default, whether or not to log
the warning is determined by the env variable
RAY_DISABLE_DOCKER_CPU_WARNING.
truncate: truncates the return value and drops the decimal part.
"""
cpu_count = multiprocessing.cpu_count()
if os.environ.get("RAY_USE_MULTIPROCESSING_CPU_COUNT"):
Expand Down Expand Up @@ -473,7 +475,8 @@ def get_num_cpus(
f"truncated from {docker_count} to "
f"{int(docker_count)}."
)
docker_count = int(docker_count)
if truncate:
docker_count = int(docker_count)
cpu_count = docker_count

except Exception:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ def test_enabled_resource_isolation_with_default_config_picks_min_values(monkeyp
# NOTE: if you change the DEFAULT_MIN_SYSTEM_* constants, you may need to modify this test.
# if the total number of cpus is between [1,19] the system cgroup will a weight that is equal to 1 cpu core.
# if the total amount of memory is between [0.5GB, 4.8GB] the system cgroup will get 0.5GB + object store memory.
monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 1)
monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 2)
monkeypatch.setattr(
common_utils, "get_system_memory", lambda *args, **kwargs: 0.5 * (1024**3)
)
config = ResourceIsolationConfig(enable_resource_isolation=True)
assert config.system_reserved_cpu_weight == 10000
assert config.system_reserved_cpu_weight == 5000
assert config.system_reserved_memory == 500 * (1024**2)

monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 19)
Expand Down Expand Up @@ -176,7 +176,7 @@ def test_enabled_with_resource_overrides_less_than_minimum_defaults_raise_value_
)


def test_enabled_with_resource_overrides_greater_than_available_resources_raise_value_error(
def test_enabled_with_resource_overrides_gte_than_available_resources_raise_value_error(
monkeypatch,
):
# The following values in ray_constants define the maximum reserved values to run ray with resource isolation.
Expand All @@ -186,11 +186,9 @@ def test_enabled_with_resource_overrides_greater_than_available_resources_raise_
monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 32)
with pytest.raises(
ValueError,
match="The requested system_reserved_cpu=32.1 is greater than the number of cpus available=32",
match="The requested system_reserved_cpu=32.0 is greater than or equal to the number of cpus available=32",
):
ResourceIsolationConfig(
enable_resource_isolation=True, system_reserved_cpu=32.1
)
ResourceIsolationConfig(enable_resource_isolation=True, system_reserved_cpu=32)

monkeypatch.setattr(
common_utils, "get_system_memory", lambda *args, **kwargs: 10 * (1024**3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,23 @@ def cleanup_test_suite():
) as base_subtree_control_file:
base_subtree_control_file.write("-cpu -memory")
base_subtree_control_file.flush()
# 2) Move processes back into the leaf cgroup.
# 2) Move processes back into the root cgroup.
with open(_ROOT_CGROUP / "cgroup.procs", "w") as root_procs_file, open(
_LEAF_GROUP / "cgroup.procs", "r"
) as leaf_procs_file:
leaf_cgroup_lines = leaf_procs_file.readlines()
for line in leaf_cgroup_lines:
root_procs_file.write(line.strip())
root_procs_file.flush()
# 3) Move the current process back into the _ROOT_CGROUP
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wasn't previously required because the Driver (for ray.init) was moved by the test script into the LEAF_CGROUP_ and never touched again.

However, now it will be moved by the raylet back into _TEST_CGROUP when ray stops.

with open(_ROOT_CGROUP / "cgroup.procs", "w") as root_procs_file, open(
_TEST_CGROUP / "cgroup.procs", "r"
) as test_procs_file:
test_cgroup_lines = test_procs_file.readlines()
for line in test_cgroup_lines:
root_procs_file.write(line.strip())
root_procs_file.flush()

# 3) Delete the cgroups.
os.rmdir(_LEAF_GROUP)
os.rmdir(_TEST_CGROUP)
Expand Down Expand Up @@ -431,9 +440,6 @@ def test_ray_cli_start_resource_isolation_creates_cgroup_hierarchy_and_cleans_up
assert result.exit_code == 0
resource_isolation_config.add_object_store_memory(object_store_memory)
assert_cgroup_hierarchy_exists_for_node(node_id, resource_isolation_config)
assert_system_processes_are_in_system_cgroup(
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_START)
)

@ray.remote(num_cpus=1)
class Actor:
Expand All @@ -447,12 +453,17 @@ def get_pid(self):
for _ in range(num_cpus):
actor_refs.append(Actor.remote())
worker_pids = set()
worker_pids.add(str(os.getpid()))
for actor in actor_refs:
worker_pids.add(str(ray.get(actor.get_pid.remote())))
assert_system_processes_are_in_system_cgroup(
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_START)
)
assert_worker_processes_are_in_workers_cgroup(
node_id, resource_isolation_config, worker_pids
)
runner.invoke(scripts.stop)

assert_cgroup_hierarchy_cleaned_up_for_node(node_id, resource_isolation_config)


Expand Down Expand Up @@ -492,9 +503,6 @@ def test_ray_init_resource_isolation_creates_cgroup_hierarchy_and_cleans_up(
object_store_memory=object_store_memory,
)
assert_cgroup_hierarchy_exists_for_node(node_id, resource_isolation_config)
assert_system_processes_are_in_system_cgroup(
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_INIT)
)

@ray.remote(num_cpus=1)
class Actor:
Expand All @@ -508,8 +516,12 @@ def get_pid(self):
for _ in range(num_cpus):
actor_refs.append(Actor.remote())
worker_pids = set()
worker_pids.add(str(os.getpid()))
for actor in actor_refs:
worker_pids.add(str(ray.get(actor.get_pid.remote())))
assert_system_processes_are_in_system_cgroup(
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_INIT)
)
assert_worker_processes_are_in_workers_cgroup(
node_id, resource_isolation_config, worker_pids
)
Expand Down
5 changes: 3 additions & 2 deletions src/ray/common/cgroup2/cgroup_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,11 @@ Status CgroupManager::Initialize(int64_t system_reserved_cpu_weight,
RAY_RETURN_NOT_OK(cgroup_driver_->MoveAllProcesses(base_cgroup_, non_ray_cgroup_));
RegisterMoveAllProcesses(non_ray_cgroup_, base_cgroup_);

// NOTE: Since the raylet does not own the lifecycle of all system processes,
// there's no guarantee that there are no pids in the system leaf cgroup.
// NOTE: Since the raylet does not own the lifecycle of all system or worker processes,
// there's no guarantee that there are no pids in the system leaf or the workers cgroup.
// Therefore, pids need to be migrated out of the system cgroup to delete it.
RegisterMoveAllProcesses(system_leaf_cgroup_, base_cgroup_);
RegisterMoveAllProcesses(workers_cgroup_, base_cgroup_);

std::array<const std::string *, 2> cpu_controlled_cgroups{&base_cgroup_, &node_cgroup_};
std::array<const std::string *, 3> memory_controlled_cgroups{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,4 +679,44 @@ TEST_F(SysFsCgroupDriverIntegrationTest,
ASSERT_TRUE(terminate_s.ok()) << terminate_s.ToString();
}

TEST_F(SysFsCgroupDriverIntegrationTest,
AddProcessToCgroupSucceedsIfProcessAlreadyInCgroup) {
auto cgroup_or_status = TempCgroupDirectory::Create(test_cgroup_path_, S_IRWXU);
ASSERT_TRUE(cgroup_or_status.ok()) << cgroup_or_status.ToString();
auto cgroup = std::move(cgroup_or_status.value());
auto child_cgroup_or_status = TempCgroupDirectory::Create(cgroup->GetPath(), S_IRWXU);
ASSERT_TRUE(child_cgroup_or_status.ok()) << child_cgroup_or_status.ToString();
auto child_cgroup = std::move(child_cgroup_or_status.value());
StatusOr<std::pair<pid_t, int>> child_process_s =
StartChildProcessInCgroup(cgroup->GetPath());
ASSERT_TRUE(child_process_s.ok()) << child_process_s.ToString();
auto [child_pid, child_pidfd] = child_process_s.value();
SysFsCgroupDriver driver;
Status s =
driver.AddProcessToCgroup(child_cgroup->GetPath(), std::to_string(child_pid));
ASSERT_TRUE(s.ok()) << s.ToString();
Status s2 =
driver.AddProcessToCgroup(child_cgroup->GetPath(), std::to_string(child_pid));
ASSERT_TRUE(s2.ok()) << s2.ToString();
// Assert that the child's pid is actually in the new file.
std::string child_cgroup_procs_file_path = child_cgroup->GetPath() +
std::filesystem::path::preferred_separator +
"cgroup.procs";
std::ifstream child_cgroup_procs_file(child_cgroup_procs_file_path);
ASSERT_TRUE(child_cgroup_procs_file.is_open())
<< "Could not open file " << child_cgroup_procs_file_path << ".";
std::unordered_set<int> child_cgroup_pids;
int pid = -1;
while (child_cgroup_procs_file >> pid) {
ASSERT_FALSE(child_cgroup_procs_file.fail())
<< "Unable to read pid from file " << child_cgroup_procs_file_path;
child_cgroup_pids.emplace(pid);
}
EXPECT_EQ(child_cgroup_pids.size(), 1);
EXPECT_TRUE(child_cgroup_pids.find(child_pid) != child_cgroup_pids.end());
Status terminate_s =
TerminateChildProcessAndWaitForTimeout(child_pid, child_pidfd, 5000);
ASSERT_TRUE(terminate_s.ok()) << terminate_s.ToString();
}

} // namespace ray
2 changes: 0 additions & 2 deletions src/ray/common/cgroup2/sysfs_cgroup_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,6 @@ Status SysFsCgroupDriver::DisableController(const std::string &cgroup_path,
return Status::OK();
}

// What's the right thing here? If the controller is specified?
// The correct API would be specify where the controller should be enabled.
Status SysFsCgroupDriver::AddConstraint(const std::string &cgroup_path,
const std::string &constraint,
const std::string &constraint_value) {
Expand Down
10 changes: 5 additions & 5 deletions src/ray/common/cgroup2/tests/cgroup_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,11 @@ TEST(CgroupManagerTest, CreateSucceedsWithCleanupInOrder) {
}

// Processes must be moved third.
// Processes were moved both out of the system_leaf cgroup and the non_ray
// cgroup.
ASSERT_EQ(processes_moved->size(), 2);
std::array<std::string, 2> process_moved_cgroups{system_leaf_cgroup_path,
non_ray_cgroup_path};
// Processes were moved both out of the system_leaf, workers, and non_ray
// cgroups.
ASSERT_EQ(processes_moved->size(), 3);
std::array<std::string, 3> process_moved_cgroups{
system_leaf_cgroup_path, non_ray_cgroup_path, workers_cgroup_path};

// The order in which processes were moved back from leaf nodes to the base_cgroup
// does not matter.
Expand Down
5 changes: 4 additions & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,10 @@ Status NodeManager::RegisterForNewDriver(
worker->SetProcess(Process::FromPid(pid));
rpc::JobConfig job_config;
job_config.ParseFromString(message->serialized_job_config()->str());

Status s = cgroup_manager_->AddProcessToWorkersCgroup(std::to_string(pid));
RAY_CHECK(s.ok()) << absl::StrFormat(
"Failed to move the driver process into the workers cgroup with error %s",
s.ToString());
return worker_pool_.RegisterDriver(worker, job_config, send_reply_callback);
}

Expand Down