diff --git a/conf/experimental/test/deepep_standard.toml b/conf/experimental/test/deepep_standard.toml index e50c4e6df..cfe8ea869 100644 --- a/conf/experimental/test/deepep_standard.toml +++ b/conf/experimental/test/deepep_standard.toml @@ -20,13 +20,12 @@ test_template_name = "DeepEP" [cmd_args] # Local .sqsh file: -# docker_image_url = "/.autodirect/mswg2/E2E/Regression_logs/squash/yoel/dp-benchmark-shuffle.sqsh" # Container registry (uses your Docker credentials): docker_image_url = "gitlab-master.nvidia.com/ybenabou/warehouse/deepep:dp-benchmark" mode = "standard" -tokens = 1024 +tokens = 4096 num_experts = 256 num_topk = 8 hidden_size = 7168 diff --git a/conf/experimental/test/nccl_test_alltoallv.toml b/conf/experimental/test/nccl_test_alltoallv.toml new file mode 100644 index 000000000..2062feb88 --- /dev/null +++ b/conf/experimental/test/nccl_test_alltoallv.toml @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name = "nccl_test_alltoallv" +description = "NCCL AlltoAllv" +test_template_name = "NcclTest" + +[cmd_args] +docker_image_url = "gitlab-master.nvidia.com/ybenabou/warehouse/deepep:dp-benchmark" +# Container provides /opt/nccl-tests/build/alltoallv_perf. +subtest_name = "alltoallv_perf_mpi" +nthreads = 1 +ngpus = 1 +minbytes = "512M" +maxbytes = "512M" +stepfactor = 2 +iters = 10 +warmup_iters = 1 +check = 1 +blocking = 0 +use_deepep_matrix = true diff --git a/conf/experimental/test_scenario/deepep_with_nccl_alltoallv.toml b/conf/experimental/test_scenario/deepep_with_nccl_alltoallv.toml new file mode 100644 index 000000000..111209b98 --- /dev/null +++ b/conf/experimental/test_scenario/deepep_with_nccl_alltoallv.toml @@ -0,0 +1,36 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name = "deepep-with-nccl-alltoallv" + +# First run the DeepEP benchmark which generates the traffic matrix +[[Tests]] +id = "Tests.deepep" +test_name = "deepep_standard" +num_nodes = 2 +nodes = ["dgx-gaia-55", "dgx-gaia-56"] +time_limit = "00:30:00" + +# Then run NCCL AlltoAllv test using the generated matrix +[[Tests]] +id = "Tests.nccl_alltoallv" +test_name = "nccl_alltoallv" +num_nodes = 2 +nodes = ["dgx-gaia-55", "dgx-gaia-56"] +time_limit = "00:30:00" + [[Tests.dependencies]] + type = "start_post_comp" + id = "Tests.deepep" diff --git a/conf/experimental/test_scenario/deepep_with_ucc_alltoallv.toml b/conf/experimental/test_scenario/deepep_with_ucc_alltoallv.toml new file mode 100644 index 000000000..0c58c8e32 --- /dev/null +++ b/conf/experimental/test_scenario/deepep_with_ucc_alltoallv.toml @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name = "deepep-with-ucc-alltoallv" + +# First run the DeepEP benchmark which generates the traffic matrix +[[Tests]] +id = "Tests.deepep" +test_name = "deepep_standard" +num_nodes = 2 +time_limit = "00:30:00" + +# Then run UCC AlltoAllv test using the generated matrix (auto-converted) +[[Tests]] +id = "Tests.ucc_alltoallv" +test_name = "ucc_alltoallv_deepep" +num_nodes = 2 +time_limit = "00:30:00" + [[Tests.dependencies]] + type = "start_post_comp" + id = "Tests.deepep" diff --git a/src/cloudai/registration.py b/src/cloudai/registration.py index 5ad03b8d0..94f22d6f8 100644 --- a/src/cloudai/registration.py +++ b/src/cloudai/registration.py @@ -82,6 +82,7 @@ def register_all(): DeepEPReportGenerationStrategy, DeepEPSlurmCommandGenStrategy, DeepEPTestDefinition, + DeepEPMoEThroughputReporter, ) from cloudai.workloads.jax_toolbox import ( GPTTestDefinition, @@ -301,6 +302,7 @@ def register_all(): Registry().add_report(VllmTestDefinition, VLLMBenchReportGenerationStrategy) Registry().add_scenario_report("per_test", PerTestReporter, ReportConfig(enable=True)) + Registry().add_scenario_report("deepep_moe_throughput", DeepEPMoEThroughputReporter, ReportConfig(enable=True)) Registry().add_scenario_report("status", StatusReporter, ReportConfig(enable=True)) Registry().add_scenario_report("dse", DSEReporter, ReportConfig(enable=True)) Registry().add_scenario_report("tarball", TarballReporter, ReportConfig(enable=True)) diff --git a/src/cloudai/workloads/deepep/__init__.py b/src/cloudai/workloads/deepep/__init__.py index f145a3b3a..a559eaa79 100644 --- a/src/cloudai/workloads/deepep/__init__.py +++ b/src/cloudai/workloads/deepep/__init__.py @@ -15,11 +15,13 @@ # limitations under the License. from .deepep import DeepEPCmdArgs, DeepEPTestDefinition +from .deepep_moe_throughput_reporter import DeepEPMoEThroughputReporter from .report_generation_strategy import DeepEPReportGenerationStrategy from .slurm_command_gen_strategy import DeepEPSlurmCommandGenStrategy __all__ = [ "DeepEPCmdArgs", + "DeepEPMoEThroughputReporter", "DeepEPReportGenerationStrategy", "DeepEPSlurmCommandGenStrategy", "DeepEPTestDefinition", diff --git a/src/cloudai/workloads/deepep/deepep.py b/src/cloudai/workloads/deepep/deepep.py index 1b01c88b6..44c09bd65 100644 --- a/src/cloudai/workloads/deepep/deepep.py +++ b/src/cloudai/workloads/deepep/deepep.py @@ -38,6 +38,7 @@ class DeepEPCmdArgs(CmdArgs): num_iterations: int = 50 shuffle_columns: bool = False use_kineto_profiler: bool = False + enable_tuning: bool = False num_sms: int = 24 num_qps_per_rank: int = 12 config_file_path: str = "/tmp/config.yaml" diff --git a/src/cloudai/workloads/deepep/deepep_combined_report.py b/src/cloudai/workloads/deepep/deepep_combined_report.py new file mode 100644 index 000000000..a29504048 --- /dev/null +++ b/src/cloudai/workloads/deepep/deepep_combined_report.py @@ -0,0 +1,63 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""DeepEP dependency helpers for Slurm UCC/NCCL.""" + +from __future__ import annotations + +from pathlib import Path + +from cloudai.core import TestRun + +DEEPEP_PREV_MOUNT = "/cloudai_deepep_prev" + + +def start_post_comp_chain(test_run: TestRun) -> list[TestRun]: + """Follow ``start_post_comp`` (e.g. UCC → NCCL → DeepEP).""" + dep = test_run.dependencies.get("start_post_comp") + if dep is None: + return [] + chain: list[TestRun] = [] + seen: set[int] = set() + cur: TestRun | None = dep.test_run + while cur is not None and id(cur) not in seen: + seen.add(id(cur)) + chain.append(cur) + nxt = cur.dependencies.get("start_post_comp") + cur = nxt.test_run if nxt else None + return chain + + +def _has_ucc_matrix_under(root: Path) -> bool: + if (root / "ucc_matrix.txt").is_file(): + return True + return any(root.glob("**/ucc_matrix.txt")) + + +def deepep_benchmark_root(test_run: TestRun) -> Path | None: + """DeepEP job directory (``ucc_matrix`` or BENCHMARK stdout), walking ``start_post_comp``.""" + for tr in start_post_comp_chain(test_run): + root = tr.output_path + if _has_ucc_matrix_under(root): + return root + st = root / "stdout.txt" + if st.is_file(): + try: + if "BENCHMARK: DeepEP Results" in st.read_text(errors="replace")[:250000]: + return root + except OSError: + continue + return None + + +def deepep_results_json_files(test_output_path: Path) -> list[Path]: + """All ``results.json`` paths under ``results/benchmark_*`` or top-level ``benchmark_*``.""" + found: list[Path] = [] + for pattern in ("results/benchmark_*_ranks_*", "benchmark_*_ranks_*"): + for d in sorted(test_output_path.glob(pattern)): + if d.is_dir(): + rj = d / "results.json" + if rj.is_file(): + found.append(rj) + return found diff --git a/src/cloudai/workloads/deepep/deepep_moe_throughput_reporter.py b/src/cloudai/workloads/deepep/deepep_moe_throughput_reporter.py new file mode 100644 index 000000000..7bef381f9 --- /dev/null +++ b/src/cloudai/workloads/deepep/deepep_moe_throughput_reporter.py @@ -0,0 +1,261 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Scenario-level MoE throughput summary: standalone SVG file.""" + +from __future__ import annotations + +import html +import json +import logging +import re +from pathlib import Path + +from cloudai._core.base_reporter import Reporter +from cloudai.workloads.deepep.deepep import DeepEPTestDefinition +from cloudai.workloads.deepep.deepep_combined_report import deepep_results_json_files +from cloudai.workloads.nccl_test.nccl import NCCLTestDefinition +from cloudai.workloads.nccl_test.performance_report_generation_strategy import extract_nccl_data +from cloudai.workloads.ucc_test.ucc import UCCTestDefinition + + +def _deepep_dispatch_combine_bars(test_output: Path) -> list[tuple[str, float, str]]: + """From latest ``results.json``: one bar per ``dispatch`` / ``combine`` row (``bus_bw_avg``).""" + paths = deepep_results_json_files(test_output) + if not paths: + return [] + latest = max(paths, key=lambda p: p.stat().st_mtime) + try: + rows = json.loads(latest.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as e: + logging.debug("DeepEP results.json unreadable %s: %s", latest, e) + return [] + if not isinstance(rows, list): + return [] + + by_op: dict[str, float] = {} + for row in rows: + if not isinstance(row, dict): + continue + op = row.get("operation") + if not isinstance(op, str) or "bus_bw_avg" not in row: + continue + op_l = op.lower() + if op_l not in ("dispatch", "combine"): + continue + try: + by_op[op_l] = float(row["bus_bw_avg"]) + except (TypeError, ValueError): + continue + + out: list[tuple[str, float, str]] = [] + # Stable order: dispatch then combine, only if present in JSON + if "dispatch" in by_op: + out.append(("DeepEP dispatch", by_op["dispatch"], "#2ca02c")) + if "combine" in by_op: + out.append(("DeepEP combine", by_op["combine"], "#31a354")) + return out + + +def _mean_ucc_bus_bw_gb_s(test_output: Path) -> float | None: + for name in ("stdout.txt", "ucc_perftest_capture.log"): + path = test_output / name + if not path.is_file(): + continue + v = _parse_ucc_perftest_mean_bus_avg(path) + if v is not None: + return v + return None + + +def _parse_ucc_perftest_mean_bus_avg(path: Path) -> float | None: + """Mean of ``Bus Bandwidth … avg`` column over numeric data rows (8 fields).""" + try: + text = path.read_text(encoding="utf-8", errors="replace") + except OSError: + return None + avgs: list[float] = [] + for line in text.splitlines(): + parts = re.split(r"\s+", line.strip()) + if len(parts) != 8: + continue + if not parts[0].isdigit() or not parts[1].isdigit(): + continue + try: + sz = float(parts[1]) + bavg = float(parts[5]) + except ValueError: + continue + if sz < 1048576: + continue + avgs.append(bavg) + if not avgs: + return None + return float(sum(avgs) / len(avgs)) + + +def _mean_nccl_oop_busbw_gb_s(test_output: Path) -> float | None: + rows, _, _, _ = extract_nccl_data(test_output / "stdout.txt") + if not rows: + return None + vals: list[float] = [] + for parts in rows: + try: + vals.append(float(parts[7])) + except (IndexError, ValueError): + continue + if not vals: + return None + return float(sum(vals) / len(vals)) + + +def _write_moe_throughput_svg( + path: Path, + *, + scenario_name: str, + labels: list[str], + values: list[float], + colors: list[str], + y_axis_label: str, +) -> None: + """Bar chart + value markers; standalone SVG.""" + n = len(labels) + ml, mr, mt = 72, 44, 72 + ih = 300 + mb = max(100, 36 + n * 18) + h = mt + ih + mb + w = max(720, min(1280, ml + mr + max(1, n) * 92)) + + iw = w - ml - mr + y0 = mt + ih + vmin, vmax = 0.0, max(values) * 1.12 if values else 1.0 + if vmax <= vmin: + vmax = vmin + 1.0 + + def ypx(v: float) -> float: + return y0 - (v - vmin) / (vmax - vmin) * ih + + slot = iw / max(n, 1) + bar_w = min(56.0, slot * 0.55) + centers = [ml + (i + 0.5) * slot for i in range(n)] + pts = [(cx, ypx(v)) for cx, v in zip(centers, values, strict=True)] + + parts: list[str] = [ + '', + f'', + '', + f'{html.escape(scenario_name)}', + f'{html.escape(y_axis_label)}', + f'', + f'', + ] + + for g in (0.25, 0.5, 0.75): + gy = y0 - g * ih + parts.append( + f'' + ) + gv = vmin + g * (vmax - vmin) + parts.append( + f'{gv:.1f}' + ) + + for cx, val, col, lab in zip(centers, values, colors, labels, strict=True): + top = ypx(val) + x1 = cx - bar_w / 2 + hbar = y0 - top + parts.append( + f'' + ) + + for (cx, cy), val, col, lab in zip(pts, values, colors, labels, strict=True): + parts.append( + f'' + ) + parts.append( + f'{val:.2f}' + ) + parts.append( + f'' + f"{html.escape(lab)}" + ) + + parts.append( + f'{html.escape(y_axis_label)}' + ) + + leg_y = y0 + 38 + parts.append(f'Summary') + for i, (lab, val, col) in enumerate(zip(labels, values, colors, strict=True)): + parts.append( + f'' + f'{html.escape(lab)}' + f": {val:.4f} GB/s" + ) + + parts.append("") + + path.write_text("\n".join(parts), encoding="utf-8") + + +class DeepEPMoEThroughputReporter(Reporter): + """After the scenario finishes, write one standalone SVG chart under the results root.""" + + def generate(self) -> None: + self.load_test_runs() + deepep_trs = [tr for tr in self.trs if isinstance(tr.test, DeepEPTestDefinition)] + if not deepep_trs: + logging.debug("Skipping deepep_moe_throughput: no DeepEP test in scenario.") + return + + categories: list[str] = [] + values: list[float] = [] + colors: list[str] = [] + + deepep_bars = _deepep_dispatch_combine_bars(deepep_trs[0].output_path) + if not deepep_bars: + logging.warning( + "Skipping deepep_moe_throughput: no dispatch/combine bus_bw_avg in DeepEP results.json under %s", + deepep_trs[0].output_path, + ) + return + for lab, val, col in deepep_bars: + categories.append(lab) + values.append(val) + colors.append(col) + + ucc_trs = [tr for tr in self.trs if isinstance(tr.test, UCCTestDefinition)] + if ucc_trs: + uval = _mean_ucc_bus_bw_gb_s(ucc_trs[0].output_path) + if uval is not None: + categories.append("UCC") + values.append(uval) + colors.append("#1f77b4") + else: + logging.debug("UCC test present but bus bandwidth not parsed from outputs.") + + nccl_trs = [tr for tr in self.trs if isinstance(tr.test, NCCLTestDefinition)] + if nccl_trs: + nval = _mean_nccl_oop_busbw_gb_s(nccl_trs[0].output_path) + if nval is not None: + categories.append("NCCL") + values.append(nval) + colors.append("#ff7f0e") + else: + logging.debug("NCCL test present but perf table not parsed from stdout.") + + out = self.results_root / f"{self.test_scenario.name}-moe-throughput.svg" + _write_moe_throughput_svg( + out, + scenario_name=self.test_scenario.name, + labels=categories, + values=values, + colors=colors, + y_axis_label="Mean bus bandwidth (GB/s)", + ) + logging.info("Generated MoE throughput comparison at %s", out) diff --git a/src/cloudai/workloads/deepep/report_generation_strategy.py b/src/cloudai/workloads/deepep/report_generation_strategy.py index fc33cd741..91a621496 100644 --- a/src/cloudai/workloads/deepep/report_generation_strategy.py +++ b/src/cloudai/workloads/deepep/report_generation_strategy.py @@ -25,6 +25,7 @@ from cloudai.core import ReportGenerationStrategy from cloudai.report_generator.tool.csv_report_tool import CSVReportTool from cloudai.util.lazy_imports import lazy +from cloudai.workloads.deepep.deepep_combined_report import deepep_results_json_files if TYPE_CHECKING: import pandas as pd @@ -40,34 +41,18 @@ def can_handle_directory(self) -> bool: Returns: bool: True if directory contains DeepEP results. """ - # Check for results subdirectories created by DeepEP directory_path = self.test_run.output_path - matching_dirs = list(directory_path.glob("results/benchmark_*_ranks_*")) - - if matching_dirs: - # Check if any of them has results.json - for result_dir in matching_dirs: - if (result_dir / "results.json").exists(): - return True - - return False + return bool(deepep_results_json_files(directory_path)) def generate_report(self) -> None: """Generate a report from DeepEP benchmark results.""" directory_path = self.test_run.output_path test_name = self.test_run.test.name - results_dirs = list(directory_path.glob("results/benchmark_*_ranks_*")) - - if not results_dirs: - return - all_results = [] - for result_dir in results_dirs: - results_json = result_dir / "results.json" - if not results_json.exists(): - continue + for results_json in deepep_results_json_files(directory_path): + result_dir = results_json.parent try: with open(results_json, "r") as f: @@ -76,6 +61,9 @@ def generate_report(self) -> None: logging.debug(f"Error parsing {results_json}: {e}") continue + if not isinstance(results_data, list): + continue + match = re.match(r"benchmark_(\d+)_ranks_(.+?)_(low_latency|standard)", result_dir.name) num_ranks, timestamp, mode = 0, "unknown", "unknown" if match: @@ -84,6 +72,8 @@ def generate_report(self) -> None: mode = match.group(3) for result in results_data: + if not isinstance(result, dict): + continue result["num_ranks"] = num_ranks result["timestamp"] = timestamp result["mode"] = mode @@ -99,6 +89,9 @@ def generate_report(self) -> None: "num_tokens", "hidden", "deepep_time", + "bus_bw_avg", + "bus_bw_min", + "bus_bw_max", "global_bw", "simple_rdma_bw", "simple_nvl_bw", diff --git a/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py b/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py index 60bbf3000..3dc021900 100644 --- a/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py @@ -27,7 +27,7 @@ class DeepEPSlurmCommandGenStrategy(SlurmCommandGenStrategy): def _append_head_node_detection(self, batch_script_content: List[str]) -> None: """ - Append bash commands to detect head node IP for torchrun. + Append bash commands to detect head node IP. Args: batch_script_content: The list of script lines to append to. @@ -44,6 +44,9 @@ def _append_head_node_detection(self, batch_script_content: List[str]) -> None: "echo Num Nodes: ${#nodes[@]}", "echo Head Node IP: $head_node_ip", "", + "export MASTER_ADDR=$head_node_ip", + "export MASTER_PORT=29500", + "", ] ) @@ -66,7 +69,7 @@ def _container_mounts(self) -> List[str]: self._generate_config_yaml(config_file_path, cmd_args) mounts = [ - f"{config_file_path.parent.absolute()}:{config_file_path.parent.absolute()}", + f"{config_file_path.absolute()}:{cmd_args.config_file_path}", f"{self.test_run.output_path.absolute()}:{cmd_args.results_dir}", ] @@ -87,23 +90,12 @@ def generate_test_command(self) -> List[str]: else: benchmark_script = "/workspace/dp-benchmark/benchmark/benchmark_ll.py" - _, nodes = self.system.get_nodes_by_spec(self.test_run.nnodes, self.test_run.nodes) - num_nodes = len(nodes) if nodes else self.test_run.nnodes - - config_file_path = self.test_run.output_path / "config.yaml" - command_parts = [ - "torchrun", - f"--nnodes={num_nodes}", - "--nproc_per_node=1", - "--rdzv_id=$RANDOM", - "--rdzv_backend=c10d", - "--rdzv_endpoint=$head_node_ip:29500", + return [ + "python", benchmark_script, - str(config_file_path.absolute()), + cmd_args.config_file_path, ] - return command_parts - def _generate_config_yaml(self, config_path: Path, cmd_args: DeepEPCmdArgs) -> None: """ Generate YAML configuration file for DeepEP benchmark. @@ -136,4 +128,7 @@ def _generate_config_yaml(self, config_path: Path, cmd_args: DeepEPCmdArgs) -> N def gen_srun_success_check(self) -> str: """Check if DeepEP benchmark completed successfully.""" output_file = self.test_run.output_path / "stdout.txt" - return f'grep -q "global_bw\\|deepep_time" {output_file} && echo 1 || echo 0' + return ( + 'grep -Eq "global_bw|RDMA BW \\(GB/s\\)|NVLink BW \\(GB/s\\)|Bus BW \\(GB/s\\)|Global BW \\(GB/s\\)" ' + f'{output_file} && echo 1 || echo 0' + ) diff --git a/src/cloudai/workloads/nccl_test/nccl.py b/src/cloudai/workloads/nccl_test/nccl.py index c0381c5b0..2968cf2af 100644 --- a/src/cloudai/workloads/nccl_test/nccl.py +++ b/src/cloudai/workloads/nccl_test/nccl.py @@ -29,6 +29,7 @@ class NCCLCmdArgs(CmdArgs): "all_reduce_perf_mpi", "all_gather_perf_mpi", "alltoall_perf_mpi", + "alltoallv_perf_mpi", "broadcast_perf_mpi", "gather_perf_mpi", "hypercube_perf_mpi", @@ -41,6 +42,7 @@ class NCCLCmdArgs(CmdArgs): "all_reduce_perf", "all_gather_perf", "alltoall_perf", + "alltoallv_perf", "broadcast_perf", "gather_perf", "hypercube_perf", @@ -55,6 +57,7 @@ class NCCLCmdArgs(CmdArgs): "all_reduce_perf_mpi", "all_gather_perf_mpi", "alltoall_perf_mpi", + "alltoallv_perf_mpi", "broadcast_perf_mpi", "gather_perf_mpi", "hypercube_perf_mpi", @@ -67,6 +70,7 @@ class NCCLCmdArgs(CmdArgs): "all_reduce_perf", "all_gather_perf", "alltoall_perf", + "alltoallv_perf", "broadcast_perf", "gather_perf", "hypercube_perf", @@ -97,6 +101,8 @@ class NCCLCmdArgs(CmdArgs): blocking: Union[int, list[int]] = 0 cudagraph: Union[int, list[int]] = 0 stepfactor: Optional[Union[int, list[int]]] = None + use_deepep_matrix: bool = False + alltoallv_matrix_container_path: str = "/tmp/traffic_matrix.txt" class NCCLTestDefinition(TestDefinition): diff --git a/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py b/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py index 1295187e0..80795a07c 100644 --- a/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py @@ -14,18 +14,77 @@ # See the License for the specific language governing permissions and # limitations under the License. +from pathlib import Path from typing import List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy +from cloudai.workloads.deepep.deepep_combined_report import DEEPEP_PREV_MOUNT, deepep_benchmark_root -from .nccl import NCCLTestDefinition +from .nccl import NCCLCmdArgs, NCCLTestDefinition + +_ALLTOALLV_MATRIX_ENV = "ALLTOALLV_MATRIX_FILE" +_NCCL_TESTS_ALLTOALLV_PERF = "/opt/nccl-tests/build/alltoallv_perf" + + +def _nccl_cmd_scalar(value: object) -> object: + if isinstance(value, list): + return value[0] if value else value + return value + + +def _nccl_matrix_path_under_deepep_output(dep_out: Path) -> Path | None: + """DeepEP writes nccl_matrix.txt under the dependency test output or a timestamped benchmark subdir.""" + direct = dep_out / "nccl_matrix.txt" + if direct.is_file(): + return direct + nested = sorted( + dep_out.glob("**/nccl_matrix.txt"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + return nested[0] if nested else None class NcclTestSlurmCommandGenStrategy(SlurmCommandGenStrategy): """Command generation strategy for NCCL tests on Slurm systems.""" + def _deepep_nccl_matrix_host_path(self) -> Path | None: + tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + if not tdef.cmd_args.use_deepep_matrix: + return None + root = deepep_benchmark_root(self.test_run) + if root is None: + return None + return _nccl_matrix_path_under_deepep_output(root) + def _container_mounts(self) -> List[str]: - return [] + tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + if not tdef.cmd_args.use_deepep_matrix: + return [] + + matrix_host = self._deepep_nccl_matrix_host_path() + if matrix_host is None: + return [] + + dest = tdef.cmd_args.alltoallv_matrix_container_path + mounts: List[str] = [f"{matrix_host.resolve()}:{dest}"] + + dr = deepep_benchmark_root(self.test_run) + if dr is not None: + mounts.append(f"{dr.resolve()}:{DEEPEP_PREV_MOUNT}:ro") + return mounts + + @property + def final_env_vars(self) -> dict[str, str | list[str]]: + env_vars = dict(super().final_env_vars) + tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + if tdef.cmd_args.use_deepep_matrix and self._deepep_nccl_matrix_host_path() is not None: + env_vars[_ALLTOALLV_MATRIX_ENV] = tdef.cmd_args.alltoallv_matrix_container_path + return env_vars + + @final_env_vars.setter + def final_env_vars(self, value: dict[str, str | list[str]]) -> None: + super().final_env_vars = value def image_path(self) -> str | None: tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) @@ -33,10 +92,35 @@ def image_path(self) -> str | None: def generate_test_command(self) -> List[str]: tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + if tdef.cmd_args.subtest_name == "alltoallv_perf_mpi": + a = tdef.cmd_args + parts: List[str] = [ + _NCCL_TESTS_ALLTOALLV_PERF, + "-b", + str(_nccl_cmd_scalar(a.minbytes)), + "-e", + str(_nccl_cmd_scalar(a.maxbytes)), + "-g", + str(_nccl_cmd_scalar(a.ngpus)), + "-w", + str(_nccl_cmd_scalar(a.warmup_iters)), + "-n", + str(_nccl_cmd_scalar(a.iters)), + ] + if self.test_run.test.extra_cmd_args: + parts.append(self.test_run.test.extra_args_str) + return parts + srun_command_parts = [f"{tdef.cmd_args.subtest_name}"] + skip_cli = { + "docker_image_url", + "subtest_name", + "use_deepep_matrix", + "alltoallv_matrix_container_path", + } nccl_test_args = tdef.cmd_args.model_dump().keys() for arg in nccl_test_args: - if arg in {"docker_image_url", "subtest_name"}: + if arg in skip_cli: continue value = getattr(tdef.cmd_args, arg) diff --git a/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py b/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py index 80e1a34c7..43f0df435 100644 --- a/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py @@ -14,18 +14,59 @@ # See the License for the specific language governing permissions and # limitations under the License. +from pathlib import Path from typing import List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy +from cloudai.workloads.deepep.deepep_combined_report import DEEPEP_PREV_MOUNT, deepep_benchmark_root from .ucc import UCCCmdArgs, UCCTestDefinition +_UCC_GEN_MATRIX_CONTAINER = "/opt/hpcx/ucc/tools/perf/generator/input_matrices.txt" + + +def _ucc_matrix_path_under_deepep_output(dep_out: Path) -> Path | None: + """DeepEP writes ucc_matrix.txt under a timestamped benchmark subdir; resolve either layout.""" + direct = dep_out / "ucc_matrix.txt" + if direct.is_file(): + return direct + nested = sorted( + dep_out.glob("**/ucc_matrix.txt"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + return nested[0] if nested else None + class UCCTestSlurmCommandGenStrategy(SlurmCommandGenStrategy): """Command generation strategy for UCC tests on Slurm systems.""" + def _deepep_ucc_matrix_host_path(self) -> Path | None: + tdef: UCCTestDefinition = cast(UCCTestDefinition, self.test_run.test) + if not tdef.cmd_args.use_deepep_matrix: + return None + dep_out = deepep_benchmark_root(self.test_run) + if dep_out is None: + return None + return _ucc_matrix_path_under_deepep_output(dep_out) + def _container_mounts(self) -> List[str]: - return [] + tdef: UCCTestDefinition = cast(UCCTestDefinition, self.test_run.test) + if not tdef.cmd_args.use_deepep_matrix: + return [] + + deepep_root = deepep_benchmark_root(self.test_run) + if deepep_root is None: + return [] + + matrix_host = self._deepep_ucc_matrix_host_path() + if matrix_host is None: + return [] + + return [ + f"{matrix_host.resolve()}:{_UCC_GEN_MATRIX_CONTAINER}", + f"{deepep_root.resolve()}:{DEEPEP_PREV_MOUNT}:ro", + ] def image_path(self) -> str | None: tdef: UCCTestDefinition = cast(UCCTestDefinition, self.test_run.test) @@ -41,6 +82,8 @@ def generate_test_command(self) -> List[str]: srun_command_parts.append(f"-e {tdef_cmd_args.e}") if tdef_cmd_args.gen is not None: srun_command_parts.append(f"--gen {tdef_cmd_args.gen}") + elif self._deepep_ucc_matrix_host_path() is not None: + srun_command_parts.append(f"--gen file:name={_UCC_GEN_MATRIX_CONTAINER}") srun_command_parts.append("-m cuda") srun_command_parts.append("-F") diff --git a/src/cloudai/workloads/ucc_test/ucc.py b/src/cloudai/workloads/ucc_test/ucc.py index 982387937..35c723a4c 100644 --- a/src/cloudai/workloads/ucc_test/ucc.py +++ b/src/cloudai/workloads/ucc_test/ucc.py @@ -69,6 +69,7 @@ class UCCCmdArgs(CmdArgs): b: Union[int, list[int]] = 1 e: Union[str, list[str]] = "8M" gen: Union[str, list[str], None] = None + use_deepep_matrix: bool = False class UCCTestDefinition(TestDefinition):