diff --git a/test/distributed/fsdp/test_distributed_checkpoint.py b/test/distributed/fsdp/test_distributed_checkpoint.py
index 42111efc8922..607eb73f8c27 100644
--- a/test/distributed/fsdp/test_distributed_checkpoint.py
+++ b/test/distributed/fsdp/test_distributed_checkpoint.py
@@ -89,7 +89,7 @@ def test_distributed_checkpoint(self, state_dict_type) -> None:
 
 
 # TODO: add resharding test case.
-devices = ("cuda", "hpu")
-instantiate_device_type_tests(TestDistributedCheckpoint, globals(), only_for=devices)
+devices = ("cuda", "hpu", "xpu")
+instantiate_device_type_tests(TestDistributedCheckpoint, globals(), only_for=devices, allow_xpu=True)
 if __name__ == "__main__":
     run_tests()
diff --git a/test/distributed/fsdp/test_fsdp_apply.py b/test/distributed/fsdp/test_fsdp_apply.py
index fe614b54d64d..d56ac09ebe5a 100644
--- a/test/distributed/fsdp/test_fsdp_apply.py
+++ b/test/distributed/fsdp/test_fsdp_apply.py
@@ -113,7 +113,7 @@ def test_apply_in_summon_raises_error(self):
             transformer.apply(self._init_linear_weights)
 
 
-devices = ("cuda", "hpu")
-instantiate_device_type_tests(TestApply, globals(), only_for=devices)
+devices = ("cuda", "hpu", "xpu")
+instantiate_device_type_tests(TestApply, globals(), only_for=devices, allow_xpu=True)
 if __name__ == "__main__":
     run_tests()
diff --git a/test/distributed/fsdp/test_fsdp_checkpoint.py b/test/distributed/fsdp/test_fsdp_checkpoint.py
index 9fa69a99caf3..28576857e487 100644
--- a/test/distributed/fsdp/test_fsdp_checkpoint.py
+++ b/test/distributed/fsdp/test_fsdp_checkpoint.py
@@ -334,7 +334,7 @@ def test_checkpoint_submodule(self, device, use_reentrant: bool):
             self.assertTrue(p1.grad.allclose(p2.grad))
 
 
-devices = ("cuda", "hpu")
-instantiate_device_type_tests(TestFSDPCheckpointSubmodule, globals(), only_for=devices)
+devices = ("cuda", "hpu", "xpu")
+instantiate_device_type_tests(TestFSDPCheckpointSubmodule, globals(), only_for=devices, allow_xpu=True)
 if __name__ == "__main__":
     run_tests()
diff --git a/test/distributed/fsdp/test_fsdp_clip_grad_norm.py b/test/distributed/fsdp/test_fsdp_clip_grad_norm.py
index 05327fbda163..0482b059ff8b 100644
--- a/test/distributed/fsdp/test_fsdp_clip_grad_norm.py
+++ b/test/distributed/fsdp/test_fsdp_clip_grad_norm.py
@@ -338,7 +338,7 @@ def _test_no_gradients(self, device, use_orig_params: bool):
         self.assertEqual(total_norm, torch.tensor(0.0, device=self.device_type))
 
 
-devices = ("cuda", "hpu")
-instantiate_device_type_tests(TestClipGradNorm, globals(), only_for=devices)
+devices = ("cuda", "hpu", "xpu")
+instantiate_device_type_tests(TestClipGradNorm, globals(), only_for=devices, allow_xpu=True)
 if __name__ == "__main__":
     run_tests()
diff --git a/test/distributed/fsdp/test_fsdp_comm.py b/test/distributed/fsdp/test_fsdp_comm.py
index fd8d6798a173..53cfbf81fadd 100644
--- a/test/distributed/fsdp/test_fsdp_comm.py
+++ b/test/distributed/fsdp/test_fsdp_comm.py
@@ -382,7 +382,7 @@ def forward(self, x: torch.Tensor):
         model.module.mlps._wait_unshard_streams_on_current_stream()
 
 
-devices = ("cuda", "hpu")
-instantiate_device_type_tests(TestCommunication, globals(), only_for=devices)
-instantiate_device_type_tests(TestExplicitUnshard, globals(), only_for=devices)
+devices = ("cuda", "hpu", "xpu")
+instantiate_device_type_tests(TestCommunication, globals(), only_for=devices, allow_xpu=True)
+instantiate_device_type_tests(TestExplicitUnshard, globals(), only_for=devices, allow_xpu=True)
 if __name__ == "__main__":
diff --git a/test/distributed/fsdp/test_fsdp_core.py b/test/distributed/fsdp/test_fsdp_core.py
index 3fb1961099f5..bd29ab66af48 100644
--- a/test/distributed/fsdp/test_fsdp_core.py
+++ b/test/distributed/fsdp/test_fsdp_core.py
@@ -512,11 +512,11 @@ def _patch_use_unsharded_views(self, new_use_unsharded_views: Callable):
             FlatParamHandle._use_unsharded_views = orig_use_unsharded_views
 
 
-devices = ("cuda", "hpu")
-instantiate_device_type_tests(TestHooks, globals(), only_for=devices)
-instantiate_device_type_tests(TestParityWithDDP, globals(), only_for=devices)
-instantiate_device_type_tests(TestNoGrad, globals(), only_for=devices)
-instantiate_device_type_tests(TestParamInit, globals(), only_for=devices)
-instantiate_device_type_tests(TestAutograd, globals(), only_for=devices)
+devices = ("cuda", "hpu", "xpu")
+instantiate_device_type_tests(TestHooks, globals(), only_for=devices, allow_xpu=True)
+instantiate_device_type_tests(TestParityWithDDP, globals(), only_for=devices, allow_xpu=True)
+instantiate_device_type_tests(TestNoGrad, globals(), only_for=devices, allow_xpu=True)
+instantiate_device_type_tests(TestParamInit, globals(), only_for=devices, allow_xpu=True)
+instantiate_device_type_tests(TestAutograd, globals(), only_for=devices, allow_xpu=True)
 if __name__ == "__main__":
     run_tests()
diff --git a/test/distributed/fsdp/test_fsdp_dtensor_state_dict.py b/test/distributed/fsdp/test_fsdp_dtensor_state_dict.py
index 838950c4409f..18e497b625b4 100644
--- a/test/distributed/fsdp/test_fsdp_dtensor_state_dict.py
+++ b/test/distributed/fsdp/test_fsdp_dtensor_state_dict.py
@@ -285,9 +285,9 @@ def test_raises_warning_or_errors(self):
             FSDP.optim_state_dict(model, optim)
 
 
-devices = ("cuda", "hpu")
+devices = ("cuda", "hpu", "xpu")
 instantiate_device_type_tests(
-    TestFSDPWithDeviceMeshAndDTensor, globals(), only_for=devices
+    TestFSDPWithDeviceMeshAndDTensor, globals(), only_for=devices, allow_xpu=True
 )
 if __name__ == "__main__":
     run_tests()
diff --git a/test/distributed/fsdp/test_fsdp_exec_order.py b/test/distributed/fsdp/test_fsdp_exec_order.py
index 5d4a0f5b39f5..5be4dbf950fa 100644
--- a/test/distributed/fsdp/test_fsdp_exec_order.py
+++ b/test/distributed/fsdp/test_fsdp_exec_order.py
@@ -211,7 +211,7 @@ def test_train_eval(self, device, sharding_strategy: ShardingStrategy):
         # an `AssertionError` will be raised above for both sharding strategies
 
 
-devices = ("cuda", "hpu")
-instantiate_device_type_tests(TestFSDPExecOrder, globals(), only_for=devices)
+devices = ("cuda", "hpu", "xpu")
+instantiate_device_type_tests(TestFSDPExecOrder, globals(), only_for=devices, allow_xpu=True)
 if __name__ == "__main__":
     run_tests()
diff --git a/test/distributed/fsdp/test_fsdp_fine_tune.py b/test/distributed/fsdp/test_fsdp_fine_tune.py
index dacec1999f53..aea7a8f5834e 100644
--- a/test/distributed/fsdp/test_fsdp_fine_tune.py
+++ b/test/distributed/fsdp/test_fsdp_fine_tune.py
@@ -404,7 +404,7 @@ def _test_parity_with_non_frozen_fsdp(
             self.assertEqual(param, ref_param)
 
 
-devices = ("cuda", "hpu")
-instantiate_device_type_tests(TestFSDPFineTune, globals(), only_for=devices)
+devices = ("cuda", "hpu", "xpu")
+instantiate_device_type_tests(TestFSDPFineTune, globals(), only_for=devices, allow_xpu=True)
 if __name__ == "__main__":
     run_tests()
diff --git a/test/distributed/fsdp/test_fsdp_fx.py b/test/distributed/fsdp/test_fsdp_fx.py
index 3f019544cf79..f4270c89cd1d 100644
--- a/test/distributed/fsdp/test_fsdp_fx.py
+++ b/test/distributed/fsdp/test_fsdp_fx.py
@@ -113,7 +113,7 @@ def test_symbolic_tracing_outputs(self):
         self.assertEqual(exec_info.visited_params, set(exec_info.param_forward_order))
 
 
-devices = ("cuda", "hpu")
-instantiate_device_type_tests(TestSymbolicTracing, globals(), only_for=devices)
+devices = ("cuda", "hpu", "xpu")
+instantiate_device_type_tests(TestSymbolicTracing, globals(), only_for=devices, allow_xpu=True)
if __name__ == "__main__": run_tests() diff --git a/test/distributed/fsdp/test_fsdp_input.py b/test/distributed/fsdp/test_fsdp_input.py index 15effbdd591a..9a58eaf97762 100644 --- a/test/distributed/fsdp/test_fsdp_input.py +++ b/test/distributed/fsdp/test_fsdp_input.py @@ -70,7 +70,7 @@ def forward(self, input): optim.zero_grad() -devices = ("cuda", "hpu") -instantiate_device_type_tests(TestInput, globals(), only_for=devices) +devices = ("cuda", "hpu", "xpu") +instantiate_device_type_tests(TestInput, globals(), only_for=devices, allow_xpu=True) if __name__ == "__main__": run_tests() diff --git a/test/distributed/fsdp/test_fsdp_multiple_forward.py b/test/distributed/fsdp/test_fsdp_multiple_forward.py index e888c424c4cc..c4df240c37f1 100644 --- a/test/distributed/fsdp/test_fsdp_multiple_forward.py +++ b/test/distributed/fsdp/test_fsdp_multiple_forward.py @@ -73,7 +73,7 @@ def test_multi_forward(self): self.assertEqual(ddp_state, fsdp_state) -devices = ("cpu", "hpu") +devices = ("cpu", "hpu", "xpu") instantiate_device_type_tests(TestMultiForward, globals(), only_for=devices) if __name__ == "__main__": run_tests() diff --git a/test/distributed/fsdp/test_fsdp_multiple_wrapping.py b/test/distributed/fsdp/test_fsdp_multiple_wrapping.py index 06a1a9646f91..7bf457a80657 100644 --- a/test/distributed/fsdp/test_fsdp_multiple_wrapping.py +++ b/test/distributed/fsdp/test_fsdp_multiple_wrapping.py @@ -61,7 +61,7 @@ def test_multiple_wrapping(self, device): self.assertEqual(output, rewrapped_output) -devices = ("cuda", "hpu") -instantiate_device_type_tests(TestMultipleWrapping, globals(), only_for=devices) +devices = ("cuda", "hpu", "xpu") +instantiate_device_type_tests(TestMultipleWrapping, globals(), only_for=devices, allow_xpu=True) if __name__ == "__main__": run_tests() diff --git a/test/distributed/fsdp/test_fsdp_overlap.py b/test/distributed/fsdp/test_fsdp_overlap.py index d076563750e6..7aa1f9016891 100644 --- a/test/distributed/fsdp/test_fsdp_overlap.py +++ b/test/distributed/fsdp/test_fsdp_overlap.py @@ -256,9 +256,9 @@ def world_size(self): return 2 -devices = ("cuda", "hpu") +devices = ("cuda", "hpu", "xpu") instantiate_device_type_tests( - TestForwardOverlapWorldSizeOne, globals(), only_for=devices + TestForwardOverlapWorldSizeOne, globals(), only_for=devices, allow_xpu=True ) if __name__ == "__main__": run_tests() diff --git a/test/distributed/fsdp/test_fsdp_pure_fp16.py b/test/distributed/fsdp/test_fsdp_pure_fp16.py index c90cf277d947..20c2f927651f 100644 --- a/test/distributed/fsdp/test_fsdp_pure_fp16.py +++ b/test/distributed/fsdp/test_fsdp_pure_fp16.py @@ -151,7 +151,7 @@ def _test_fp16_dtypes( self.assertEqual(param.grad.dtype, torch.float16) -devices = ("cuda", "hpu") -instantiate_device_type_tests(TestPureFP16, globals(), only_for=devices) +devices = ("cuda", "hpu", "xpu") +instantiate_device_type_tests(TestPureFP16, globals(), only_for=devices, allow_xpu=True) if __name__ == "__main__": run_tests() diff --git a/test/distributed/fsdp/test_fsdp_traversal.py b/test/distributed/fsdp/test_fsdp_traversal.py index 875933dadc60..da88cedde457 100644 --- a/test/distributed/fsdp/test_fsdp_traversal.py +++ b/test/distributed/fsdp/test_fsdp_traversal.py @@ -61,7 +61,7 @@ def test_fsdp_modules(self): ) -devices = ("cuda", "hpu") -instantiate_device_type_tests(TestTraversal, globals(), only_for=devices) +devices = ("cuda", "hpu", "xpu") +instantiate_device_type_tests(TestTraversal, globals(), only_for=devices, allow_xpu=True) if __name__ == "__main__": run_tests() diff --git 
index 83378ef1ba4c..1e4d9851adaf 100644
--- a/test/distributed/fsdp/test_fsdp_uneven.py
+++ b/test/distributed/fsdp/test_fsdp_uneven.py
@@ -68,7 +68,7 @@ def test_one_iteration(self, device):
         self.assertEqual(ref_weight_out, weight_out)
 
 
-devices = ("cuda", "hpu")
-instantiate_device_type_tests(TestUnevenParamShard, globals(), only_for=devices)
+devices = ("cuda", "hpu", "xpu")
+instantiate_device_type_tests(TestUnevenParamShard, globals(), only_for=devices, allow_xpu=True)
 if __name__ == "__main__":
     run_tests()
diff --git a/test/distributed/fsdp/test_hsdp_dtensor_state_dict.py b/test/distributed/fsdp/test_hsdp_dtensor_state_dict.py
index 1ec6c367e701..0b7a6f1072cf 100644
--- a/test/distributed/fsdp/test_hsdp_dtensor_state_dict.py
+++ b/test/distributed/fsdp/test_hsdp_dtensor_state_dict.py
@@ -324,9 +324,9 @@ def forward(self, x):
             self.assertIsInstance(state["exp_avg_sq"], torch.Tensor)
 
 
-devices = ("cuda", "hpu")
+devices = ("cuda", "hpu", "xpu")
 instantiate_device_type_tests(
-    TestHSDPWithDeviceMeshAndDTensor, globals(), only_for=devices
+    TestHSDPWithDeviceMeshAndDTensor, globals(), only_for=devices, allow_xpu=True
 )
 if __name__ == "__main__":
     run_tests()
diff --git a/test/distributed/fsdp/test_utils.py b/test/distributed/fsdp/test_utils.py
index a1359b99ee40..5a3fce5122e6 100644
--- a/test/distributed/fsdp/test_utils.py
+++ b/test/distributed/fsdp/test_utils.py
@@ -129,7 +129,7 @@ def fill_fn(x):
         self.assertEqual(torch.sum(x), 0)
 
 
-devices = ("cuda", "hpu")
-instantiate_device_type_tests(TestUtils, globals(), only_for=devices)
+devices = ("cuda", "hpu", "xpu")
+instantiate_device_type_tests(TestUtils, globals(), only_for=devices, allow_xpu=True)
 if __name__ == "__main__":
     run_tests()
diff --git a/test/distributed/tensor/test_dtensor_compile.py b/test/distributed/tensor/test_dtensor_compile.py
index c1d119793ddf..37551afd9ea9 100644
--- a/test/distributed/tensor/test_dtensor_compile.py
+++ b/test/distributed/tensor/test_dtensor_compile.py
@@ -43,6 +43,7 @@
     skipIfTorchDynamo,
     TEST_CUDA,
     TEST_HPU,
+    TEST_XPU,
 )
 from torch.testing._internal.distributed._tensor.common_dtensor import (
     DTensorTestBase,
@@ -108,7 +109,14 @@ def tearDown(self):
 
     @property
     def device_type(self) -> str:
-        return "cuda" if TEST_CUDA else "hpu" if TEST_HPU else "cpu"
+        if TEST_CUDA:
+            return "cuda"
+        elif TEST_HPU:
+            return "hpu"
+        elif TEST_XPU:
+            return "xpu"
+        else:
+            return "cpu"
 
     @property
     def world_size(self) -> int:
diff --git a/test/distributed/tensor/test_random_ops.py b/test/distributed/tensor/test_random_ops.py
index e0aadd45bfd7..69e9f9217eed 100644
--- a/test/distributed/tensor/test_random_ops.py
+++ b/test/distributed/tensor/test_random_ops.py
@@ -19,7 +19,7 @@
 )
 from torch.distributed.tensor.debug import CommDebugMode
 from torch.distributed.tensor.parallel import ColwiseParallel, parallelize_module
-from torch.testing._internal.common_utils import run_tests, TEST_HPU
+from torch.testing._internal.common_utils import run_tests, TEST_HPU, TEST_XPU
 from torch.testing._internal.distributed._tensor.common_dtensor import (
     DTensorTestBase,
     skip_if_lt_x_gpu,
@@ -27,8 +27,12 @@
     with_comms,
 )
 
-
-TYPE_DEVICE = "hpu" if TEST_HPU else "cuda"
+if TEST_XPU:
+    TYPE_DEVICE = "xpu"
+elif TEST_HPU:
+    TYPE_DEVICE = "hpu"
+else:
+    TYPE_DEVICE = "cuda"
 
 
 class DistTensorRandomInitTest(DTensorTestBase):
diff --git a/test/distributed/tensor/test_redistribute.py b/test/distributed/tensor/test_redistribute.py
index adff7e386b12..3a84d24fb22b 100644
--- a/test/distributed/tensor/test_redistribute.py
+++ b/test/distributed/tensor/test_redistribute.py
@@ -9,7 +9,7 @@
 from torch.distributed.device_mesh import init_device_mesh
 from torch.distributed.tensor._collective_utils import shard_dim_alltoall
 from torch.distributed.tensor.debug import CommDebugMode
-from torch.testing._internal.common_utils import run_tests, TEST_CUDA, TEST_HPU
+from torch.testing._internal.common_utils import run_tests, TEST_CUDA, TEST_HPU, TEST_XPU
 from torch.testing._internal.distributed._tensor.common_dtensor import (
     DTensorTestBase,
     with_comms,
@@ -366,7 +366,7 @@ def test_redistribute_shard_dim_change(self):
                 local_out_dt = out_dt.to_local()
                 local_expected_dt = expected_dt.to_local()
                 self.assertEqual(out_dt.to_local(), expected_dt.to_local())
-                if TEST_HPU or TEST_CUDA:
+                if TEST_HPU or TEST_CUDA or TEST_XPU:
                     self.assertEqual(
                         comm_mode.get_comm_counts()[
                             torch.ops._dtensor.shard_dim_alltoall
diff --git a/test/distributed/test_backends.py b/test/distributed/test_backends.py
index baf78bb62db1..7c91156477b2 100644
--- a/test/distributed/test_backends.py
+++ b/test/distributed/test_backends.py
@@ -44,7 +44,7 @@ def test_create_pg(self, device) -> None:
         dist.destroy_process_group()
 
 
-devices = ["cpu", "cuda", "hpu"]
-instantiate_device_type_tests(TestMiscCollectiveUtils, globals(), only_for=devices)
+devices = ["cpu", "cuda", "hpu", "xpu"]
+instantiate_device_type_tests(TestMiscCollectiveUtils, globals(), only_for=devices, allow_xpu=True)
 
 if __name__ == "__main__":
diff --git a/test/distributed/test_functional_api.py b/test/distributed/test_functional_api.py
index b31fdeb94e67..2fc4270d386c 100644
--- a/test/distributed/test_functional_api.py
+++ b/test/distributed/test_functional_api.py
@@ -34,6 +34,7 @@
     skipIfHpu,
     TEST_CUDA,
    TEST_HPU,
+    TEST_XPU,
     TestCase,
 )
 
@@ -66,6 +67,9 @@
     DEVICE = "hpu"
 elif TEST_CUDA:
     devices.append("cuda")
+elif TEST_XPU:
+    devices.append("xpu")
+    DEVICE = "xpu"
 
 
 def new_subgroups(group_size: int, pg_tag=None):
@@ -474,6 +478,8 @@ def allred_mesh_dim(input):
 # And then set the BACKEND variable appropriately.
 if TEST_HPU:
     BACKEND = dist.Backend.HCCL
+elif TEST_XPU:
+    BACKEND = dist.Backend.XCCL
 
 
 # allows you to check for multiple accelerator irrespective of device type
@@ -486,6 +492,9 @@ def exit_if_lt_x_accelerators(x):
     elif TEST_HPU:
         if torch.hpu.device_count() < x:
             sys.exit(TEST_SKIPS[f"multi-hpu-{x}"].exit_code)
+    elif TEST_XPU:
+        if torch.xpu.device_count() < x:
+            sys.exit(TEST_SKIPS[f"multi-xpu-{x}"].exit_code)
 
 
 def with_comms(func=None):
diff --git a/torch/testing/_internal/common_device_type.py b/torch/testing/_internal/common_device_type.py
index 3e712799d809..c0c4f846a3ff 100644
--- a/torch/testing/_internal/common_device_type.py
+++ b/torch/testing/_internal/common_device_type.py
@@ -1586,6 +1586,10 @@ class dtypesIfCUDA(dtypes):
     def __init__(self, *args):
         super().__init__(*args, device_type="cuda")
 
+# Overrides specified dtypes on XPU.
+class dtypesIfXPU(dtypes):
+    def __init__(self, *args):
+        super().__init__(*args, device_type="xpu")
 
 class dtypesIfMPS(dtypes):
     def __init__(self, *args):
@@ -1951,6 +1955,8 @@ def skipMPS(fn):
 def skipHPU(fn):
     return skipHPUIf(True, "test doesn't work on HPU backend")(fn)
 
+def skipXPU(fn):
+    return skipXPUIf(True, "test doesn't work on XPU backend")(fn)
 
 def skipPRIVATEUSE1(fn):
     return skipPRIVATEUSE1If(True, "test doesn't work on privateuse1 backend")(fn)
diff --git a/torch/testing/_internal/common_distributed.py b/torch/testing/_internal/common_distributed.py
index 8e043e00e757..d5c4e8e7c812 100644
--- a/torch/testing/_internal/common_distributed.py
+++ b/torch/testing/_internal/common_distributed.py
@@ -44,6 +44,7 @@
     TestCase,
     run_tests,
     TEST_HPU,
+    TEST_XPU,
 )
 from torch.testing._internal.distributed.multi_threaded_pg import (
     _install_threaded_pg,
@@ -105,6 +106,8 @@ class DistTestCases:
     backend_feature["plugin"] = set()
     if TEST_HPU:
         backend_feature["hpu"] = {"hccl"}
+    if TEST_XPU:
+        backend_feature["xpu"] = {"xccl"}
 
 
 def skip_if_no_gpu(func):
@@ -120,6 +123,8 @@ def wrapper(*args, **kwargs):
             sys.exit(TEST_SKIPS[f"multi-gpu-{world_size}"].exit_code)
         if TEST_HPU and torch.hpu.device_count < world_size:
             sys.exit(TEST_SKIPS[f"multi-gpu-{world_size}"].exit_code)
+        if TEST_XPU and torch.xpu.device_count() < world_size:
+            sys.exit(TEST_SKIPS[f"multi-xpu-{world_size}"].exit_code)
 
         return func(*args, **kwargs)
 
@@ -199,6 +204,8 @@ def wrapper(*args, **kwargs):
             return func(*args, **kwargs)
         if TEST_HPU and torch.hpu.device_count() >= x:
             return func(*args, **kwargs)
+        if TEST_XPU and torch.xpu.device_count() >= x:
+            return func(*args, **kwargs)
         sys.exit(TEST_SKIPS[f"multi-gpu-{x}"].exit_code)
 
     return wrapper
@@ -510,7 +517,8 @@ def init_multigpu_helper(world_size: int, backend: str):
         nGPUs = torch.cuda.device_count()
     if TEST_HPU:
         nGPUs = torch.hpu.device_count()
-
+    if TEST_XPU:
+        nGPUs = torch.xpu.device_count()
     visible_devices = range(nGPUs)
 
     # If rank is less than or equal to number of available GPU's
@@ -953,8 +961,8 @@ def create_pg(self, device):
             rank=self.rank,
             store=store
         )
-        if "nccl" in self.backend(device):
-            torch.cuda.set_device(self.rank)
+        if "nccl" in self.backend(device) or "xccl" in self.backend(device):
+            torch.accelerator.set_device_index(self.rank)
         return torch.distributed.distributed_c10d._get_default_group()
 
     def rank_to_device(self, device):
@@ -1347,7 +1355,7 @@ def _dynamo_dist_per_rank_init(rank, world_size, init_pg=True, fake_pg=False):
     # To avoid multiple inheritance from _dynamo.test_case.TestCase and MultiProcessTestCase,
    # Just manually implement the most important part of the dynamo behavior to reset/clear.
     if not fake_pg:
-        torch.cuda.set_device(rank)
+        torch.accelerator.set_device_index(rank)
     os.environ['MASTER_ADDR'] = 'localhost'
     os.environ['MASTER_PORT'] = '6789'
     if init_pg:
diff --git a/torch/testing/_internal/common_fsdp.py b/torch/testing/_internal/common_fsdp.py
index 9fb27463e336..03c9298440e0 100644
--- a/torch/testing/_internal/common_fsdp.py
+++ b/torch/testing/_internal/common_fsdp.py
@@ -59,6 +59,7 @@
     get_cycles_per_ms,
     TEST_CUDA,
     TEST_HPU,
+    TEST_XPU,
 )
 from torch.utils._triton import has_triton
 
@@ -72,6 +73,10 @@
 elif TEST_HPU:
     DEVICE_TYPE = "hpu:0"
     DISTRIBUTED_BACKEND = "hccl"
+elif TEST_XPU:
+    DEVICE_TYPE = "xpu"
+    DISTRIBUTED_BACKEND = "xccl"
+    DEVICE_COUNT = torch.xpu.device_count()
 else:
     DEVICE_TYPE = "cpu"
     DISTRIBUTED_BACKEND = "gloo"
@@ -647,7 +652,7 @@ def forward(self, x):
     def get_loss(self, input, output):
         loss = self.module.get_loss(input, output)  # type: ignore[operator]
         if self.delay_after_loss_ms > 0:
-            if TEST_HPU:
+            if TEST_HPU or TEST_XPU:
                 time.sleep(self.delay_after_loss_ms / 1000)
             elif TEST_CUDA:
                 torch.cuda._sleep(int(self.delay_after_loss_ms * get_cycles_per_ms()))
@@ -663,7 +668,7 @@ def _delayed_reduce_scatter(*args, **kwargs):
                 torch.cuda._sleep(
                     int(self.delay_before_reduction_ms * get_cycles_per_ms())
                 )
-            elif TEST_HPU:
+            elif TEST_HPU or TEST_XPU:
                 time.sleep(self.delay_before_reduction_ms / 1000)
             return orig_reduce_scatter(*args, **kwargs)
 
@@ -796,7 +801,7 @@ def _delayed_reshard(*args, **kwargs):
                 torch.cuda._sleep(
                     int(self.delay_before_free_ms * get_cycles_per_ms())
                 )
-            elif TEST_HPU:
+            elif TEST_HPU or TEST_XPU:
                 time.sleep(self.delay_before_free_ms / 1000)
             return orig_reshard(*args, **kwargs)
 
@@ -1209,8 +1214,8 @@ def _run(cls, rank, test_name, file_name, pipe, **kwargs):
 
         device_ids = None
         device_id = self.rank % DEVICE_COUNT
-        if TEST_CUDA:
-            torch.cuda.set_device(device_id)
+        if TEST_CUDA or TEST_XPU:
+            torch.accelerator.set_device_index(device_id)
             device_ids = [device_id]
 
         # Execute barrier prior to running test to ensure that every process
@@ -1435,7 +1440,7 @@ def _test_fsdp_parity(
             self.assertRaisesRegex(
                 RuntimeError,
                 "An FSDP-managed module with parameter CPU offloading enabled "
-                "has parameters on cuda",
+                "has parameters on (cuda|xpu)",
             )
             if expects_device_error
             else nullcontext()
         )
diff --git a/torch/testing/_internal/common_utils.py b/torch/testing/_internal/common_utils.py
index 0a49160cfcea..74b6bcd73586 100644
--- a/torch/testing/_internal/common_utils.py
+++ b/torch/testing/_internal/common_utils.py
@@ -5179,14 +5179,19 @@ def get_cycles_per_ms() -> float:
     """
 
     def measure() -> float:
-        start = torch.cuda.Event(enable_timing=True)
-        end = torch.cuda.Event(enable_timing=True)
-        start.record()
-        torch.cuda._sleep(1000000)
-        end.record()
-        end.synchronize()
-        cycles_per_ms = 1000000 / start.elapsed_time(end)
-        return cycles_per_ms
+        if torch.cuda.is_available():
+            start = torch.cuda.Event(enable_timing=True)
+            end = torch.cuda.Event(enable_timing=True)
+            start.record()
+            torch.cuda._sleep(1000000)
+            end.record()
+            end.synchronize()
+            cycles_per_ms = 1000000 / start.elapsed_time(end)
+            return cycles_per_ms
+        elif torch.xpu.is_available():
+            # XPU has no _sleep()-style calibration; use a fixed placeholder value.
+            cycles_per_ms = 1000000 / 1000.0
+            return cycles_per_ms
 
     # Get 10 values and remove the 2 max and 2 min and return the avg.
     # This is to avoid system disturbance that skew the results, e.g.
diff --git a/torch/testing/_internal/distributed/_shard/sharded_tensor/__init__.py b/torch/testing/_internal/distributed/_shard/sharded_tensor/__init__.py
index 8fce5a8313f3..8a853d3088ee 100644
--- a/torch/testing/_internal/distributed/_shard/sharded_tensor/__init__.py
+++ b/torch/testing/_internal/distributed/_shard/sharded_tensor/__init__.py
@@ -20,7 +20,7 @@ def world_size(self):
         return TEST_GPU_NUM
 
     def init_pg(self, backend="nccl"):
-        if backend not in ["nccl", "gloo", "mpi"]:
+        if backend not in ["nccl", "gloo", "mpi", "xccl"]:
             raise RuntimeError(f"Backend {backend} not supported!")
 
         dist.init_process_group(
@@ -31,8 +31,8 @@ def init_pg(self, backend="nccl"):
         )
 
         # set device for nccl pg for collectives
-        if backend == "nccl":
-            torch.cuda.set_device(self.rank)
+        if backend == "nccl" or backend == "xccl":
+            torch.accelerator.set_device_index(self.rank)
 
     def init_rpc(self):
diff --git a/torch/testing/_internal/distributed/_tensor/common_dtensor.py b/torch/testing/_internal/distributed/_tensor/common_dtensor.py
index ca4545a91f66..ea94d78b369f 100644
--- a/torch/testing/_internal/distributed/_tensor/common_dtensor.py
+++ b/torch/testing/_internal/distributed/_tensor/common_dtensor.py
@@ -32,6 +32,7 @@
 from torch.testing._internal.common_utils import (
     TEST_HPU,
     TEST_CUDA,
+    TEST_XPU,
 )
 from torch.testing._internal.common_distributed import (
     MultiProcessTestCase,
@@ -52,6 +53,10 @@
     DEVICE_TYPE = "hpu"
     PG_BACKEND = "hccl"
     DEVICE_COUNT = _get_device_module("hpu").device_count()
+elif TEST_XPU:
+    DEVICE_TYPE = "xpu"
+    PG_BACKEND = "xccl"
+    DEVICE_COUNT = _get_device_module("xpu").device_count()
 else:
     DEVICE_TYPE = "cpu"
     PG_BACKEND = "gloo"
@@ -321,7 +326,14 @@ def world_size(self) -> int:
 
     @property
     def backend(self) -> str:
-        backend = "nccl" if TEST_CUDA else "hccl" if TEST_HPU else "gloo"
+        if TEST_CUDA:
+            backend = "nccl"
+        elif TEST_HPU:
+            backend = "hccl"
+        elif TEST_XPU:
+            backend = "xccl"
+        else:
+            backend = "gloo"
         return backend
 
     def build_device_mesh(self) -> DeviceMesh:
@@ -331,13 +343,13 @@ def init_pg(self, eager_init) -> None:
         if "nccl" in self.backend and torch.cuda.device_count() < self.world_size:
             sys.exit(TEST_SKIPS[f"multi-gpu-{self.world_size}"].exit_code)
 
-        if self.backend not in ["nccl", "gloo", "mpi", "cpu:gloo,cuda:nccl", "hccl"]:
+        if self.backend not in ["nccl", "gloo", "mpi", "cpu:gloo,cuda:nccl", "hccl", "xccl"]:
             raise RuntimeError(f"Backend {self.backend} not supported!")
 
         device_id = None
-        if "nccl" in self.backend:
+        if "nccl" in self.backend or "xccl" in self.backend:
             # set device for nccl pg for collectives
-            torch.cuda.set_device(self.rank)
+            torch.accelerator.set_device_index(self.rank)
             # we only need to set device_id for nccl backend with eager init
             device_id = torch.device(f"{self.device_type}:{self.rank}") if eager_init else None
         # For nccl backend, bind the device to the process if device_id is not None
@@ -391,10 +403,10 @@ def wrapper(
         self, *args: tuple[object], **kwargs: dict[str, Any]  # type: ignore[misc]
     ) -> None:
         # if enough GPU we can use GPU, otherwise we fallback to CPU
-        if not TEST_CUDA or torch.cuda.device_count() < self.world_size:
-            self.device_type = "cpu"
-        else:
-            self.device_type = DEVICE_TYPE
+        if DEVICE_TYPE != "cpu" and DEVICE_COUNT < self.world_size:
+            self.device_type = "cpu"
+        else:
+            self.device_type = DEVICE_TYPE
 
         self.init_pg(eager_init)
 
diff --git a/torch/testing/_internal/distributed/distributed_test.py b/torch/testing/_internal/distributed/distributed_test.py
index b56c0a9f17f6..9f8fc335cfbb 100644
--- a/torch/testing/_internal/distributed/distributed_test.py
+++ b/torch/testing/_internal/distributed/distributed_test.py
@@ -1278,7 +1278,7 @@ def test_coalescing_manager(self):
         world_size = dist.get_world_size()
         rank_to_GPU = init_multigpu_helper(world_size, BACKEND)
         device_id = rank_to_GPU[rank][0]
-        torch.cuda.set_device(device_id)
+        torch.accelerator.set_device_index(device_id)
         num_colls = 2
         size_per_coll = 8
         small_tensors = [
@@ -1312,7 +1312,7 @@ def test_coalescing_manager_async(self):
         world_size = dist.get_world_size()
         rank_to_GPU = init_multigpu_helper(world_size, BACKEND)
         device_id = rank_to_GPU[rank][0]
-        torch.cuda.set_device(device_id)
+        torch.accelerator.set_device_index(device_id)
         num_colls = 2
         size_per_coll = 8
         small_tensors = [
@@ -1345,7 +1345,7 @@ def test_batch_isend_irecv_nccl(self):
         world_size = dist.get_world_size()
         rank_to_GPU = init_multigpu_helper(world_size, BACKEND)
         device_id = rank_to_GPU[rank][0]
-        torch.cuda.set_device(device_id)
+        torch.accelerator.set_device_index(device_id)
         p2p_op_list = []
         recv_tensors = [None for _ in range(world_size)]
         expected_tensors = [None for _ in range(world_size)]
@@ -1385,7 +1385,7 @@ def test_batch_isend_irecv_ring_exchange_nccl(self):
         world_size = dist.get_world_size()
         rank_to_GPU = init_multigpu_helper(world_size, BACKEND)
         device_id = rank_to_GPU[rank][0]
-        torch.cuda.set_device(device_id)
+        torch.accelerator.set_device_index(device_id)
         send_tensor = _build_tensor(world_size, device_id=device_id)
         recv_tensor = _build_tensor(world_size, value=-1, device_id=device_id)
 
@@ -1438,7 +1438,7 @@ def test_batch_isend_irecv_no_rank_zero_nccl(self):
         rank = dist.get_rank()
         rank_to_GPU = init_multigpu_helper(dist.get_world_size(), BACKEND)
         device_id = rank_to_GPU[rank][0]
-        torch.cuda.set_device(device_id)
+        torch.accelerator.set_device_index(device_id)
         p2p_op_list = []
 
         if rank == 1:
@@ -1559,7 +1559,7 @@ def _test_send_recv_nccl(self, profiler_ctx=None):
         world_size = dist.get_world_size()
         rank_to_GPU = init_multigpu_helper(world_size, BACKEND)
         device_id = rank_to_GPU[rank][0]
-        torch.cuda.set_device(device_id)
+        torch.accelerator.set_device_index(device_id)
         tensor = _build_tensor(rank + 1, device_id=device_id)
 
         profiler_cls = profiler_ctx if profiler_ctx is not None else nullcontext()
@@ -2046,7 +2046,7 @@ def test_broadcast_cuda(self):
         group, group_id, rank = self._init_global_test()
         rank_to_GPU = init_multigpu_helper(dist.get_world_size(), BACKEND)
         device_id = rank_to_GPU[rank][0]
-        torch.cuda.set_device(device_id)
+        torch.accelerator.set_device_index(device_id)
         self._test_broadcast_helper(group, group_id, rank, True, rank_to_GPU)
 
     @skip_if_small_worldsize
@@ -2073,7 +2073,7 @@ def test_nccl_high_priority_stream(self):
         group, _, rank = self._init_global_test()
         rank_to_GPU = init_multigpu_helper(dist.get_world_size(), BACKEND)
         device_id = rank_to_GPU[rank][0]
-        torch.cuda.set_device(device_id)
+        torch.accelerator.set_device_index(device_id)
 
         new_port = str(MASTER_PORT + 1)
         os.environ["MASTER_PORT"] = new_port
@@ -2152,7 +2152,7 @@ def test_reduce_sum_cuda(self):
         group, group_id, rank = self._init_global_test()
         rank_to_GPU = init_multigpu_helper(dist.get_world_size(), BACKEND)
         device_id = rank_to_GPU[rank][0]
-        torch.cuda.set_device(device_id)
+        torch.accelerator.set_device_index(device_id)
         self._test_reduce_helper(
             group,
             group_id,