diff --git a/.github/workflows/build-swordfish-image.yml b/.github/workflows/build-swordfish-image.yml index 686107e..8023848 100644 --- a/.github/workflows/build-swordfish-image.yml +++ b/.github/workflows/build-swordfish-image.yml @@ -18,15 +18,9 @@ on: - "uv.lock" - ".dockerignore" - ".github/workflows/build-swordfish-image.yml" - pull_request: - branches: [main] - paths: - - "infra/rune/image/**" - - "swordfish/**" - - "pyproject.toml" - - "uv.lock" - - ".dockerignore" - - ".github/workflows/build-swordfish-image.yml" + # Keep this off pull_request: the base image lives in private ACR, so PR + # runners cannot pull it anonymously. Use local `make test` + `ci/lint` for + # PR validation, and build the image from main or workflow_dispatch. workflow_dispatch: inputs: liger_version: diff --git a/.gitignore b/.gitignore index f332ca8..1c4580e 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,4 @@ infra/airun/generated/ # local junk scratch/ tmp/ +.tmp/ diff --git a/Makefile b/Makefile index 6856c51..d03371b 100644 --- a/Makefile +++ b/Makefile @@ -63,6 +63,8 @@ RUNE_PVC ?= training-nfs RUNE_IMAGE_REF ?= voiceagentcr.azurecr.io/swordfish-bench:latest RUNE_RELEASE_TAG ?= rune-cli-v0.2.0 RUNE_REPO ?= azure-management-and-platforms/aks-ai-runtime +KUBE_CONTEXT ?= +GPU_OPERATOR_NAMESPACE ?= gpu-operator GITHUB_HOST ?= github.com RUNE_PY_SPEC ?= rune-py @ git+https://github.com/azure-management-and-platforms/aks-ai-runtime.git@$(RUNE_RELEASE_TAG)\#subdirectory=applications/rune-py SUBMIT_BENCH = uv run python -m swordfish.runner submit-bench \ @@ -76,7 +78,8 @@ SUBMIT_BENCH = uv run python -m swordfish.runner submit-bench \ rune-submit-gemm-matrix \ rune-submit-liger-rmsnorm-a100 rune-submit-liger-swiglu-a100 \ rune-submit-liger-fsdp-a100-baseline rune-submit-liger-fsdp-a100-liger \ - rune-convert-ncu + rune-convert-ncu \ + airun-a100-ncu-status airun-a100-ncu-pause airun-a100-ncu-restore rune-profiles: uv run python -m swordfish.runner generate-rune-profiles --out $(RUNE_PROFILE_PACK) @@ -93,6 +96,21 @@ rune-install-profiles: rune-profiles-check @echo "verify with: rune profile list" @echo "expected: 4 swordfish-bench-* and 4 swordfish-fsdp-* profiles" +airun-a100-ncu-status: + uv run python -m swordfish.runner a100-ncu-window status \ + --namespace $(GPU_OPERATOR_NAMESPACE) \ + $(if $(KUBE_CONTEXT),--context $(KUBE_CONTEXT),) + +airun-a100-ncu-pause: + uv run python -m swordfish.runner a100-ncu-window pause \ + --namespace $(GPU_OPERATOR_NAMESPACE) \ + $(if $(KUBE_CONTEXT),--context $(KUBE_CONTEXT),) + +airun-a100-ncu-restore: + uv run python -m swordfish.runner a100-ncu-window restore \ + --namespace $(GPU_OPERATOR_NAMESPACE) \ + $(if $(KUBE_CONTEXT),--context $(KUBE_CONTEXT),) + rune-submit-gemm-a100: $(SUBMIT_BENCH) --workload gemm --arch a100 \ --name $(RUNE_NAME_PREFIX)-$(RUNE_RUN_ID)-a100 \ diff --git a/README.md b/README.md index 90c9da5..3d741d9 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,19 @@ uv run python -m swordfish.runner bench-transformer \ --out /tmp/swordfish-transformer-smoke.json ``` +The standalone reduction-kernel track starts with `vectorsum_v2`: + +```bash +uv run python -m swordfish.runner bench-vectorsum \ + --backend torch \ + --size 1638400 \ + --dtype fp32 \ + --repeats 1 --warmup 0 --iters 1 \ + --device cpu --allow-cpu \ + --arch-label a100 \ + --out /tmp/swordfish-vectorsum-smoke.json +``` + To time a full training step instead of inference-only forward, use `--mode train-step`. This runs forward, loss, backward, and an AdamW optimizer step on the same tiny GPT-style reference: @@ -111,6 +124,7 @@ make rune-install-profiles # one-time symlink uv run python -m swordfish.runner list-experiments uv run python -m swordfish.runner explain-experiment liger-fsdp --arch a100 +uv run python -m swordfish.runner explain-experiment vectorsum-v2 --arch a100 # preview the rendered Job manifest (no cluster contact) uv run python -m swordfish.runner submit-experiment gemm --arch h100 \ @@ -181,6 +195,18 @@ still requires the short DCGM exporter pause documented in `docs/airun/a100-ncu-blocker.md`. H100 NVL and H200 NCU run without extra capabilities. +Trace handoff is standardized under `runs/traces/`: + +```bash +uv run python -m swordfish.runner bundle-traces my-bench:ncu \ + --context voice-agent-flex \ + --bundle-name my-bench-handoff +``` + +That fetches the result JSON plus `.ncu-rep` / `.nsys-rep` / torch trace into an +expanded directory and writes `runs/traces/my-bench-handoff.tar.gz` for copying +to a Mac or handing to Hermes. + After all three jobs have produced final JSON files, use the strict completion gate: @@ -228,6 +254,12 @@ driver loader instead of pretending `torch.add` is raw PTX. Future raw-PTX benchmarks should plug into the same backend interface so timing, correctness, NCU, and JSON output do not fork per kernel. +`bench-vectorsum` is the first standalone reduction target. `torch` is the fp32 +reference path; `triton` is a two-stage block reduction that writes fp32 partials +and a scalar output while preserving the common result schema. The root-level +`submission.py` mirrors that Triton reduction as a self-contained evaluator +entrypoint for environments where the `swordfish` package is not installed. + ## License MIT diff --git a/docs/airun/a100-ncu-blocker.md b/docs/airun/a100-ncu-blocker.md index af1d2f5..83995a6 100644 --- a/docs/airun/a100-ncu-blocker.md +++ b/docs/airun/a100-ncu-blocker.md @@ -133,20 +133,30 @@ from A100 nodes. The rendered Job included `securityContext.capabilities.add: When A100 NCU profiling is needed: 1. Confirm the benchmark can run normally without profiling. -2. Temporarily patch `nvidia-dcgm-exporter` so it does not schedule on - `gpu=a100` nodes. -3. Wait until no `nvidia-dcgm-exporter-*` pods are running on the target A100 - nodes. -4. Run `make airun-a100-ncu-preflight`; it should fail if a running DCGM exporter - pod is still on a Ready target A100 node or if the A100 arch config lacks - `SYS_ADMIN`. -5. Run the NCU benchmark job through a dedicated A100 NCU profile +2. Use the repo helper to open the A100 profiling window: + + ```bash + make airun-a100-ncu-pause KUBE_CONTEXT=voice-agent-flex + ``` + + This patches `nvidia-dcgm-exporter` away from A100 nodes and deletes any + already-running A100 exporter pods so the DaemonSet does not keep querying + the profiling metric groups during Nsight Compute. +3. Run `make airun-a100-ncu-status KUBE_CONTEXT=voice-agent-flex`; it should + show no A100 exporter pods. +4. Run the NCU benchmark job through a dedicated A100 NCU profile (`swordfish-bench-a100-ncu` for one-GPU jobs or `swordfish-fsdp-a100-ncu` for 8-GPU jobs). The normal dispatch path selects these automatically for `--arch a100 --profile-mode ncu`. -6. Copy the final JSON/CSV artifacts locally. -7. Restore the DaemonSet immediately and confirm `rollout status - ds/nvidia-dcgm-exporter` succeeds. +5. Copy or bundle the final JSON/trace artifacts locally. +6. Restore the DaemonSet immediately: + + ```bash + make airun-a100-ncu-restore KUBE_CONTEXT=voice-agent-flex + ``` + + Confirm the status reaches the expected ready count before leaving the + window unattended. Do not leave DCGM disabled after profiling. The permanent lesson is that A100 NCU and DCGM exporter profiling groups contend for the same driver profiling diff --git a/docs/airun/triage-log.md b/docs/airun/triage-log.md index 8d8dceb..cf1a8a1 100644 --- a/docs/airun/triage-log.md +++ b/docs/airun/triage-log.md @@ -67,3 +67,58 @@ - Time to root cause: ~20 min - Fix: added Rune renderer support for `spec.runtime.securityContext.capabilities.add`, generated `swordfish-bench-a100-ncu` / `swordfish-fsdp-a100-ncu`, installed the patched local `rune`, temporarily excluded `gpu=a100` nodes from `nvidia-dcgm-exporter`, ran `swordfish-a100-ncu-rune-0502192934` with `--profile-mode ncu`, converted/fetched `profile.ncu-summary.csv`, then restored DCGM to 6/6 Ready. - Lesson: Profile-mode alone is not enough for A100; the easy path must select an elevated A100 NCU profile and still run inside a controlled DCGM pause window. + +## 2026-05-04 — A100/H200 FSDP comparison submit blocked by context and H200 capacity +- Initial suspicion: L5 +- Actual root cause: L5 (cluster context / transient node-pool capacity) — the first Rune submit targeted the current `chokevin-aks` context, which had no `ray` namespace; after switching to `voice-agent-flex`, the first H200 comparison leg had no schedulable H200 node and hit scheduler/autoscaler max-size events. +- Layers ruled out before finding it: L2 for the target context, because `kernel-mode-training`, `kernel-mode-large-memory`, and `team-kernel-mode-reserved-cq` existed with no initial pending workloads; L3/L4 for A100, because pinned A100 jobs admitted, scheduled to `NVIDIA-A100-SXM4-80GB`, and completed with result JSON + NSYS profiles. +- Time to root cause: ~15 min +- Fix: exposed `--context` and `--image` through `submit-experiment`, submitted against `voice-agent-flex`, deleted the initially blocked H200 jobs, pinned reruns to `voiceagentcr.azurecr.io/airun/swordfish-bench:bf92726-dirty` instead of cached `:dev`, and reran H200 once two Ready `NVIDIA-H200` nodes appeared. +- Follow-up: the completed pinned comparison (`sf-fsdp-pin-{a100,h200}-*`) showed `tb-no-limit` as the best overlap lead on both A100 and H200; H200 recovered during the session, so this was a transient capacity/context blocker rather than a persistent H200 experiment blocker. +- Lesson: For Rune sweeps, pass the kube context explicitly and pin the image tag; `:dev` plus `IfNotPresent` can reuse stale runner code even after the ACR tag has moved, and H200 must be preflighted for live schedulable nodes before using it as a comparison leg. + +## 2026-05-04 — vectorsum A100 capture-policy sweep pod Pending after admission +- Initial suspicion: L3 +- Actual root cause: L3 (k8s scheduler) — `vs-v2-capture-policy-05041233` was admitted by Kueue but rendered an impossible selector: `nvidia.com/gpu.product=NVIDIA-A100-SXM4-80GB` together with `rune.ai/gpu-class=h200-nvlink-141gb`. +- Layers ruled out before finding it: L2, because the Workload was `QuotaReserved` and `Admitted` in `team-kernel-mode-reserved-cq`. +- Time to root cause: ~10 min +- Fix: deleted the stuck admitted job and reran the benchmark with `--gpu-class a100-nvlink-80gb`; dry-run confirmed the selector changed to `rune.ai/gpu-class=a100-nvlink-80gb`. +- Verification: fixed-selector reruns scheduled on `aks-gpu-33826946-vmss000001` and wrote A100 result JSON. +- Lesson: When using a nominal A100 Rune profile, still dry-run/check the rendered `rune.ai/gpu-class`; a stale or inherited H200 GPU-class selector can make an A100 pod unschedulable even though Kueue admits it. + +## 2026-05-05 — local session could not submit vectorsum NCU sweep +- Initial suspicion: L5 +- Actual root cause: L5 (local kube/tooling context) — this session only had the `chokevin-aks` context, which has no `ray` namespace and no Kueue `ClusterQueue` CRD; `voice-agent-flex` / `voice-agent-flex-admin` were not configured, and neither `rune` nor `rune-py` was installed on PATH. +- Layers ruled out before finding it: application/runner, because `make test` passed, `bench-vectorsum` wrote valid local JSON, and the Python dispatch layer rendered the intended A100/H100/H200 `rune submit ... --profile-mode ncu` commands with explicit `--gpu-class`. +- Time to root cause: ~10 min +- Fix: local-only packaging completed; `.tmp/` run artifacts are ignored, the standalone `submission.py` evaluator entrypoint is documented, and the vector-sum/FSDP command render path is verified. No safe cluster submission is possible from this environment without a configured `voice-agent-flex` context and Rune bootstrap/auth. +- Follow-up: run the rendered `vectorsum-v2` NCU sweep from a workstation/session with `rune`, `ray` namespace access, and `voice-agent-flex` configured. For A100, use the elevated NCU profile/DCGM pause procedure from the prior A100 NCU entries. +- Lesson: before any Rune sweep, preflight both the local toolchain (`rune`, `rune-py`, GitHub auth if bootstrapping) and kube context (`ray` namespace + Kueue CRDs). A green Python dispatch layer is not sufficient proof that the current shell can submit jobs. + +## 2026-05-05 — vectorsum-v2 NCU sweep completed on H100/H200, A100 counters blocked +- Initial suspicion: L4 for A100 NCU, application/kernel behavior for H100/H200 tuning. +- Actual root cause: H100/H200 NCU completed successfully; A100 still failed at L4 with `ERR_NVGPUCTRPERM` even after selecting `swordfish-bench-a100-ncu` and temporarily excluding `gpu=a100` nodes from `nvidia-dcgm-exporter`. The A100 workload wrote a passing result JSON, so this is a profiler-counter permission/configuration blocker rather than a vector-sum runtime failure. +- Layers ruled out before finding it: L2/L3/L5 for this sweep, because all three jobs admitted and scheduled on Ready GPU nodes in `voice-agent-flex`; H100/H200 L4, because both generated and converted `profile.ncu-rep`; application correctness, because the fetched result JSONs reported `matches_reference=true`. +- Time to root cause: ~25 min after Rune/tooling was restored in-session. +- Fix/workaround: built a session-local Rune binary, installed swordfish profiles, submitted standalone `submission.py` against `voiceagentcr.azurecr.io/airun/autoresearch-pytorch-ray:dev`, fetched/converted H100/H200 NCU reports with `inspect-run --convert-ncu`, and restored the DCGM exporter DaemonSet to 6/6 pods after the A100 profiling window. +- Evidence: run id `230122`; H100 `_partial_sum_kernel` took 302 invocations / 1.86 ms / 62.2% of kernel time, with `_final_sum_kernel` another 1.01 ms / 33.9%; H200 `_partial_sum_kernel` took 302 invocations / 1.38 ms / 61.7%, with `_final_sum_kernel` another 765.24 us / 34.3%. Both top kernels showed low SM utilization and modest memory utilization, so the first tuning target is launch/reduction-structure overhead, not raw bandwidth. +- Timing caveat: latency JSON from NCU-profiled H100/H200 runs is inflated by NCU replay. A separate no-profile latency pass (`sf-vectorsum-v2-lat-230122-{a100,h100,h200}`) measured mean latency of 0.008638 ms on A100, 0.004723 ms on H100, and 0.004449 ms on H200 for the same size/repeats/iters, all with `matches_reference=true`. +- Lesson: For `vectorsum-v2`, use NCU runs for kernel attribution and no-profile runs for latency. A100 NCU remains a cluster/operator profiler-counter blocker even when the elevated profile and DCGM exclusion procedure are followed; do not block H100/H200 tuning on A100 counters. + +## 2026-05-05 — FSDP overlap follow-up ran via standalone script +- Initial suspicion: application/image contract, then FSDP overlap behavior. +- Actual root cause: the profile image `voiceagentcr.azurecr.io/airun/swordfish-bench:dev` did not contain the new FSDP wrap/prefetch/all-gather flags (`run_liger_fsdp_step` still accepted only `profile_steady_state`), so the follow-up could not safely use `python -m swordfish.runner submit-experiment` without rebuilding/pushing the image. A session-local standalone script reproduced the dirty FSDP runner logic and was submitted through Rune instead. +- Layers ruled out before running: L2/L3/L5, because `voice-agent-flex` admitted/scheduled all A100/H200 8-GPU jobs; runtime dependency availability, because the image still had `transformers`, `liger-kernel`, torchrun, and Nsight Systems. +- Time to root cause: ~10 min for image-contract probe, then normal job runtime. +- Fix/workaround: submitted `sf-fsdp-ovl-230122-{a100,h200}-{root,tb}` with steady-state NSYS output under `/data//profile/profile.nsys-rep`, plus no-profile latency checks `sf-fsdp-lat-230122-{a100,h200}-{root,tb}`. All eight jobs completed and fetched JSON; the four NSYS `.nsys-rep` files were also fetched locally under `runs/inspect/fsdp-overlap-230122/`. +- Results: no-profile latency favored transformer-block/no-limit slightly while reducing peak reserved memory: A100 root/default 845.15 ms / 19.39k tok/s / 63.71 GiB versus tb/no-limit 823.07 ms / 19.91k tok/s / 40.18 GiB; H200 root/default 374.70 ms / 43.73k tok/s / 63.71 GiB versus tb/no-limit 361.63 ms / 45.31k tok/s / 40.18 GiB. NSYS-wrapped timing inverted in favor of root/default (A100 1905.23 ms root vs 2254.85 ms tb; H200 1144.41 ms root vs 1448.47 ms tb), so use those traces for overlap attribution, not as the latency scoreboard. +- Lesson: Until the FSDP flag code is in the image, treat standalone-script jobs as the valid follow-up path and explicitly separate no-profile latency from NSYS trace capture. The tb/no-limit variant is still a small throughput win and a large memory win in clean timing, but the NSYS traces need offline inspection before claiming improved communication overlap. + +## 2026-05-06 — A100 NCU profile lacked SYS_ADMIN in rendered pod +- Initial suspicion: L4 +- Actual root cause: L4 (GPU/profiler permission) exposed a Rune renderer/tooling bug — `swordfish-bench-a100-ncu` declared `runtime.securityContext.capabilities.add: [SYS_ADMIN]`, but the submitted A100 NCU pod had an empty container `securityContext`, so Nsight Compute failed with `ERR_NVGPUCTRPERM`. +- Layers ruled out before finding it: L2/L3/L5, because the failed A100 job was admitted, scheduled onto `aks-gpu-33826946-vmss000001`, and wrote a correct result JSON; workload correctness, because `matches_reference=true`; H100/H200 L4, because their NCU reports converted successfully. +- Time to root cause: ~10 min after inspecting the rendered Job and profile. +- Fix: patched Rune's submit renderer to propagate `spec.runtime.securityContext` into the main container, rebuilt the session-local Rune binary, and added a Swordfish dispatch preflight that refuses real A100 NCU submits when dry-run output lacks `SYS_ADMIN`. Added `a100-ncu-window` / Make helpers to pause and restore DCGM exporter on A100 nodes, deleting existing A100 exporter pods after the affinity patch. +- Verification: patched Rune dry-run renders `securityContext.capabilities.add: [SYS_ADMIN]`; A100 smoke `sf-ncu-smoke-001148-a100` completed under a DCGM exclusion window and fetched a 48,773,066-byte `profile.ncu-rep`; `bundle-traces` produced `runs/traces/sf-ncu-smoke-001148-a100-hermes.tar.gz`. DCGM exporter was restored to 6 desired / 6 ready pods afterward. +- Lesson: A100 NCU seamlessness needs three guards together: a Rune binary that propagates profile security context, a DCGM exclusion window that also deletes already-running A100 exporter pods, and a stable trace bundle path for handoff instead of ad hoc `runs/inspect` directories. diff --git a/docs/benchmarking.md b/docs/benchmarking.md index 990f28f..e0e4815 100644 --- a/docs/benchmarking.md +++ b/docs/benchmarking.md @@ -30,17 +30,24 @@ or rune's native `--profile-mode=ncu` (binary `.ncu-rep`), and finally attaches the profiler summary to the unprofiled timing JSON. This keeps NCU replay overhead out of `metrics.latency`. -If Nsight Compute reports `ERR_NVGPUCTRPERM`, the node driver is restricting -performance counters. NCU on A100 needs container `SYS_ADMIN`, which rune -profiles cannot currently request — this is a known limitation tracked in -`docs/airun/a100-ncu-blocker.md`. Do not mark NCU complete unless the attached -CSV/rep contains every required metric. +If Nsight Compute reports `ERR_NVGPUCTRPERM`, first check that the submitted pod +actually rendered `securityContext.capabilities.add: [SYS_ADMIN]`. Current +Swordfish dispatch preflights A100 `--profile-mode ncu` submits and refuses to +run if the local Rune binary drops the `swordfish-*-a100-ncu` profile security +context. Rebuild/install Rune before retrying; otherwise the job will fail after +reserving the A100. If Nsight Compute then changes to `Profiling failed because a driver resource was unavailable`, check for DCGM/profiler contention. The A100 NCU lane requires temporarily excluding `nvidia-dcgm-exporter` from A100 nodes during the NCU profiling window, then restoring the DaemonSet and confirming rollout. This is -an operational profiling window, not a permanent monitoring change. +an operational profiling window, not a permanent monitoring change: + +```bash +make airun-a100-ncu-pause KUBE_CONTEXT=voice-agent-flex +# submit the A100 NCU job +make airun-a100-ncu-restore KUBE_CONTEXT=voice-agent-flex +``` Use the matrix validator as the completion gate for cross-GPU GEMM runs: @@ -135,6 +142,24 @@ the full Speed-of-Light dashboard with occupancy, source-attributed SASS, and baseline comparison sets — the same view kernel engineers use to drive Liger / Triton kernel work. +For multi-job handoff to a Mac or to Hermes, bundle traces into the stable local +handoff root: + +```bash +uv run python -m swordfish.runner bundle-traces \ + sf-vectorsum-v2-ncu-230122-h100:ncu \ + sf-fsdp-ovl-230122-h200-tb:nsys \ + --context voice-agent-flex \ + --bundle-name kernel-handoff-230122 +``` + +This writes `runs/traces/kernel-handoff-230122/manifest.json` plus one +subdirectory per job, and creates `runs/traces/kernel-handoff-230122.tar.gz`. +The expanded directory is convenient for local inspection; the tarball is the +single file to send to Hermes or copy back to a Mac. Each manifest entry records +the local files and the original PVC path, e.g. +`/data//profile/profile.ncu-rep`. + ### Reading kernel-level detail without ncu-ui (`ncu-summary`) The `.ncu-rep` binary format is proprietary and only readable with NVIDIA's diff --git a/docs/research/what-can-beat-liger-fsdp-a100-20260503.md b/docs/research/what-can-beat-liger-fsdp-a100-20260503.md new file mode 100644 index 0000000..96c7eac --- /dev/null +++ b/docs/research/what-can-beat-liger-fsdp-a100-20260503.md @@ -0,0 +1,64 @@ +# Research: What can beat Liger for our FSDP A100 test? + +**Date:** 2026-05-03 +**Asker:** Kevin / Swordfish kernel lab +**Decision:** ADAPT + +## Question + +For our Llama-3-8B-like bf16 FSDP1 train-step test on 8xA100, is there an existing kernel or training stack that is likely to beat Liger Kernel? + +## TL;DR + +No public source shows a clean drop-in replacement that beats Liger on the exact same Hugging Face + PyTorch FSDP1 + bf16 + 8xA100 setup. The credible "beat Liger" paths are stack changes: PyTorch FSDP2/`torch.compile`/TorchTitan for a close PyTorch-native variant, or Megatron-Core/Transformer Engine for a higher-rewrite NVIDIA stack. For Swordfish, keep Liger as the current same-test baseline and run a targeted ADAPT experiment against FSDP2/compile before considering a Megatron rewrite. + +## What I read + +| Source | Type | Date | What it said | +|---|---|---:|---| +| [Liger-Kernel paper](https://arxiv.org/html/2410.10989v3) | paper | 2024 | Liger reports average 20% training-throughput gain and 60% GPU-memory reduction via Triton fusion/chunking, and explicitly supports FSDP/DeepSpeed/DDP. | +| [Liger-Kernel README](https://github.com/linkedin/Liger-Kernel) | code-repo | 2026 | Its headline benchmark is exactly close to ours: Llama-3-8B, batch 8, bf16, AdamW, gradient checkpointing, FSDP1 on 8 A100s. | +| [PyTorch: Maximizing Training Throughput](https://pytorch.org/blog/maximizing-training-throughput/) | vendor-blog | 2024 | `torch.compile` + selective activation checkpointing raised 7B A100 MFU from 57% to 68%, with 10-23% MFU gains across model sizes. | +| [SimpleFSDP paper](https://arxiv.org/abs/2411.00284) | paper | 2024 | Compiler-friendly FSDP can trace communication and reorder/bucket IR nodes for overlap, reporting up to 68.67% throughput improvement vs eager FSDP2 when composed with other techniques. | +| [AWS/Meta TorchTitan Llama 3 blog](https://aws.amazon.com/blogs/machine-learning/efficient-pre-training-of-llama-3-like-model-architectures-using-torchtitan-on-amazon-sagemaker/) | vendor-blog | 2024 | TorchTitan pretrains Llama-3-8B-like models with FSDP2, `torch.compile`, and FP8, showing 38.23% throughput speedup on H100. | +| [NVIDIA Transformer Engine docs](https://docs.nvidia.com/deeplearning/transformer-engine/user-guide/index.html) | official-docs | 2026 | TE provides optimized Transformer blocks and FP8 support on Hopper/Ada/Blackwell, plus BF16/FP16 optimizations on Ampere and later. | +| [Megatron-LM README](https://github.com/NVIDIA/Megatron-LM) | code-repo | 2026 | Megatron-Core is a GPU-optimized training library with TP/PP/DP/CP/EP, BF16/FP8/FP4, and explicit communication-overlap optimizations. | +| [DeepSpeed training docs](https://www.deepspeed.ai/training/) | official-docs | 2026 | DeepSpeed/ZeRO focuses on memory, communication, and scale; it can combine ZeRO data parallelism with model parallelism for speed and scale. | +| [Unsloth multi-GPU docs](https://unsloth.ai/docs/basics/multi-gpu-training-with-unsloth) | official-docs | 2026 | Unsloth supports multi-GPU through Accelerate/DeepSpeed, but says the process is complex/manual and official multi-GPU support is still coming. | +| [FlashAttention paper](https://arxiv.org/abs/2205.14135) | paper | 2022 | FlashAttention trains Transformers faster by IO-aware exact attention, but it is an attention kernel, not a full Llama/FSDP replacement. | + +(Read budget: 10 sources across paper, code-repo, official-docs, and vendor/practitioner blogs. Stopped because the answer was clear enough for a next experiment.) + +## Findings + +1. **Liger is the strongest same-shape baseline, not just a random kernel pack.** Its README names the same benchmark family as ours: Llama-3-8B, bf16, AdamW, gradient checkpointing, FSDP1, 8 A100s. The paper says the gain comes from fusion/chunking, matching our trace where memory/elementwise time fell sharply. + +2. **The closest credible challenger is PyTorch-native FSDP2 + `torch.compile`/TorchTitan, not another one-line kernel.** PyTorch reports 10-23% MFU gains from compile on A100 7B/13B/34B/70B runs; SimpleFSDP attacks exactly our open bottleneck by tracing collectives for compute/communication overlap. This maps directly to our exposed-NCCL problem. + +3. **Megatron-Core + Transformer Engine can probably beat Liger as an end-to-end training stack, but it changes the experiment.** Megatron brings tensor/pipeline/context parallelism and communication overlap; TE brings optimized transformer blocks and FP8 on newer GPUs. On A100 bf16, TE may still help, but the strongest TE story is Hopper+FP8, not our exact A100 bf16 FSDP1 row. + +4. **FlashAttention/xFormers are unlikely to beat Liger alone in this trace.** Our trace is already dominated by GEMM, attention, and NCCL after Liger, and attention is only one slice. FlashAttention is essential tech, but swapping attention alone is not a full replacement for Liger's RMSNorm/SwiGLU/CE fusion and will not solve exposed FSDP collectives. + +5. **Unsloth is not the next benchmark for this exact test.** Its public claims are strong for fine-tuning and memory efficiency, but its own docs say multi-GPU is still manual/complex. That makes it a poor immediate contender for 8xA100 full bf16 FSDP pretraining-step reproduction. + +## Counter-evidence + +The strongest counter-case is that PyTorch `torch.compile` and TorchTitan may already beat our Liger row if we port the test: PyTorch reports 7B A100 MFU rising from 57% to 68%, and SimpleFSDP claims compiler-visible collective overlap can reduce communication exposure. That is directly relevant because our Liger root trace still had fully exposed NCCL. + +The weakness is apples-to-apples. Those sources are not the exact Llama-3-8B/HF/FSDP1/8xA100/Liger comparison. TorchTitan's Llama-3-8B blog result is H100 with FP8 features, and SimpleFSDP compares against eager FSDP2, not Liger+FSDP1. Also, the PyTorch torchtune+Liger blog says Liger composes with `torch.compile`; if compile helps, the best result may be **Liger plus compile**, not compile instead of Liger. + +## Decision: ADAPT + +Do not replace Liger yet. Adapt the benchmark matrix to test the closest credible challengers: Liger+`torch.compile` if feasible, FSDP2/TorchTitan-style compile, and only then Megatron-Core/Transformer Engine if we are willing to change model/runtime structure. + +## What this means in practice + +- **First concrete move:** Add a Swordfish row for PyTorch-native compile/FSDP2 or TorchTitan-style Llama-3-8B 8xA100, with the same steady-state NSYS overlap analysis and tokens/sec schema. +- **Watch-fors:** If compile/FSDP2 reduces exposed NCCL without regressing step time, it is a real challenger. If it only improves eager elementwise work, combine it with Liger instead of replacing Liger. +- **Out of scope for this research:** H100 FP8-only wins; inference-only kernels; LoRA-only or QLoRA-only fine-tuning; convergence/quality beyond parity checks. + +## Open questions / what I'd read next + +1. Does TorchTitan/FSDP2 currently support a close Llama-3-8B bf16 8xA100 config without H100-only FP8 assumptions? +2. Can Liger's monkey patch coexist with `torch.compile` for our exact FSDP runner without graph breaks? +3. Does Megatron-Core have a minimal Llama-3-8B BF16 A100 recipe whose checkpoint/model semantics are close enough to compare fairly? diff --git a/submission.py b/submission.py new file mode 100644 index 0000000..33d2a20 --- /dev/null +++ b/submission.py @@ -0,0 +1,371 @@ +#!/usr/bin/env python3 +"""Standalone vectorsum_v2 submission entrypoint. + +The evaluator imports ``custom_kernel`` from this file and passes the generated +``(input_tensor, output_tensor)`` tuple. Keep this file self-contained: it should +not depend on the local swordfish package being installed in the evaluation +container. +""" + +from __future__ import annotations + +import argparse +import json +import math +import os +import socket +import time +from pathlib import Path + +import torch + +BLOCK_SIZE = 8192 +PARTIAL_NUM_WARPS = 8 +FINAL_NUM_WARPS = 16 +PARTIAL_NUM_STAGES = 1 + +try: + import triton + import triton.language as tl +except ImportError: # pragma: no cover - evaluator has Triton; local CPU import stays safe. + triton = None + tl = None + + +if triton is not None and tl is not None: + + @triton.jit + def _partial_sum_kernel(x_ptr, partials_ptr, n_elements: int, BLOCK_SIZE: tl.constexpr): + pid = tl.program_id(0) + offsets = pid * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE) + mask = offsets < n_elements + values = tl.load(x_ptr + offsets, mask=mask, other=0.0, cache_modifier=".cg") + partial = tl.sum(values, axis=0) + tl.store(partials_ptr + pid, partial) + + @triton.jit + def _final_sum_kernel(partials_ptr, out_ptr, n_partials: int, BLOCK_SIZE: tl.constexpr): + offsets = tl.arange(0, BLOCK_SIZE) + mask = offsets < n_partials + values = tl.load(partials_ptr + offsets, mask=mask, other=0.0) + total = tl.sum(values, axis=0) + tl.store(out_ptr, total) + +else: + _partial_sum_kernel = None + _final_sum_kernel = None + + +_PARTIALS = None +_PARTIALS_DEVICE = None +_PARTIALS_N = 0 +_N_PARTIALS = 0 +_FINAL_BLOCK_SIZE = 0 +_GRAPH = None +_GRAPH_X = None +_GRAPH_OUTPUT = None +_GRAPH_DATA = None +_GRAPH_PARTIALS = None +_GRAPH_N = 0 +_GRAPH_REPLAY = None +_GRAPH_RESULT = None +_GRAPH_CAPTURE_WARMUP = 0 + + +def _launch_sum(x, output, partials, n_elements: int, n_partials: int, final_block_size: int): + _partial_sum_kernel[(n_partials,)]( + x, + partials, + n_elements, + BLOCK_SIZE=BLOCK_SIZE, + num_warps=PARTIAL_NUM_WARPS, + num_stages=PARTIAL_NUM_STAGES, + ) + _final_sum_kernel[(1,)]( + partials, + output, + n_partials, + BLOCK_SIZE=final_block_size, + num_warps=FINAL_NUM_WARPS, + ) + + +def _make_custom_kernel(): + last_x_obj = None + last_output_obj = None + graph_x_obj = None + graph_obj = None + replay_fn = None + result_tensor = None + partials_obj = None + graph_output = None + eager_partials = None + eager_output = None + eager_result = None + eager_x_obj = None + eager_device = None + eager_n = 0 + eager_n_partials = 0 + eager_final_block = 0 + + def custom_kernel(data): + nonlocal eager_device, eager_final_block, eager_n, eager_n_partials + nonlocal eager_output, eager_partials, eager_result, eager_x_obj + nonlocal graph_obj, graph_output, graph_x_obj + nonlocal last_output_obj, last_x_obj, partials_obj, replay_fn, result_tensor + + x, output = data + if replay_fn is not None and graph_x_obj is x: + replay_fn() + return result_tensor + + if triton is None or _partial_sum_kernel is None or _final_sum_kernel is None: + raise RuntimeError("custom_kernel requires Triton") + + if eager_partials is not None and eager_x_obj is x: + if ( + x.device.type == "cuda" + and hasattr(torch.cuda, "CUDAGraph") + and last_x_obj is x + and last_output_obj is output + ): + graph_output = torch.empty((1,), device=x.device, dtype=torch.float32) + partials = torch.empty((eager_n_partials,), device=x.device, dtype=torch.float32) + + _launch_sum(x, graph_output, partials, eager_n, eager_n_partials, eager_final_block) + torch.cuda.synchronize() + + graph = torch.cuda.CUDAGraph() + with torch.cuda.graph(graph): + _launch_sum( + x, graph_output, partials, eager_n, eager_n_partials, eager_final_block + ) + + replay_fn = graph.replay + result = graph_output.reshape(-1)[0] + graph_x_obj = x + graph_obj = graph + result_tensor = result + partials_obj = partials + for _ in range(_GRAPH_CAPTURE_WARMUP): + replay_fn() + torch.cuda.synchronize() + replay_fn() + return result + + _launch_sum( + x, + eager_output, + eager_partials, + eager_n, + eager_n_partials, + eager_final_block, + ) + last_x_obj = x + last_output_obj = output + return eager_result + + n_elements = x.numel() + device = x.device.index + if eager_partials is None or eager_n != n_elements or eager_device != device: + eager_n = n_elements + eager_device = device + n_partials = triton.cdiv(n_elements, BLOCK_SIZE) + eager_n_partials = n_partials + eager_final_block = triton.next_power_of_2(n_partials) + eager_partials = torch.empty((n_partials,), device=x.device, dtype=torch.float32) + eager_output = torch.empty((1,), device=x.device, dtype=torch.float32) + eager_result = eager_output.reshape(-1)[0] + replay_fn = None + graph_x_obj = None + + if ( + x.device.type == "cuda" + and hasattr(torch.cuda, "CUDAGraph") + and last_x_obj is x + and last_output_obj is output + ): + graph_output = torch.empty((1,), device=x.device, dtype=torch.float32) + partials = torch.empty((eager_n_partials,), device=x.device, dtype=torch.float32) + + _launch_sum(x, graph_output, partials, n_elements, eager_n_partials, eager_final_block) + torch.cuda.synchronize() + + graph = torch.cuda.CUDAGraph() + with torch.cuda.graph(graph): + _launch_sum( + x, graph_output, partials, n_elements, eager_n_partials, eager_final_block + ) + + replay_fn = graph.replay + result = graph_output.reshape(-1)[0] + graph_x_obj = x + graph_obj = graph + result_tensor = result + partials_obj = partials + for _ in range(_GRAPH_CAPTURE_WARMUP): + replay_fn() + torch.cuda.synchronize() + replay_fn() + return result + + _launch_sum( + x, + eager_output, + eager_partials, + n_elements, + eager_n_partials, + eager_final_block, + ) + eager_x_obj = x + last_x_obj = x + last_output_obj = output + return eager_result + + return custom_kernel + + +custom_kernel = _make_custom_kernel() + + +def _latency_stats(samples): + ordered = sorted(samples) + n = len(ordered) + + def percentile(p): + if n == 1: + return ordered[0] + k = (n - 1) * p / 100.0 + lo = int(k) + hi = min(lo + 1, n - 1) + frac = k - lo + return ordered[lo] + (ordered[hi] - ordered[lo]) * frac + + mean = sum(samples) / n + return { + "samples_ms": samples, + "mean_ms": mean, + "median_ms": percentile(50), + "p50_ms": percentile(50), + "p90_ms": percentile(90), + "p99_ms": percentile(99), + "min_ms": min(samples), + "max_ms": max(samples), + "std_ms": math.sqrt(sum((x - mean) ** 2 for x in samples) / n) if n else 0.0, + } + + +def _time_cuda(fn, *, warmup, iters): + for _ in range(warmup): + fn() + torch.cuda.synchronize() + start = torch.cuda.Event(enable_timing=True) + end = torch.cuda.Event(enable_timing=True) + start.record() + for _ in range(iters): + fn() + end.record() + torch.cuda.synchronize() + return start.elapsed_time(end) / iters + + +def _run_benchmark(args): + if triton is None: + raise RuntimeError("Triton is required for standalone vectorsum_v2") + if not torch.cuda.is_available(): + raise RuntimeError("CUDA is required for standalone vectorsum_v2") + if args.dtype != "fp32": + raise ValueError("standalone submission.py currently supports --dtype fp32 only") + + device = torch.device("cuda") + torch.manual_seed(args.seed) + gen = torch.Generator(device=device) + gen.manual_seed(args.seed) + x = torch.randn((args.size,), device=device, dtype=torch.float32, generator=gen).contiguous() + output = torch.empty((1,), device=device, dtype=torch.float32) + + def run_once(): + return custom_kernel((x, output)) + + samples = [ + _time_cuda(run_once, warmup=args.warmup, iters=args.iters) for _ in range(args.repeats) + ] + result_tensor = run_once() + torch.cuda.synchronize() + + expected = x.to(torch.float64).sum().to(torch.float32) + max_abs_error = float(torch.abs(result_tensor.detach() - expected.detach()).item()) + atol = max(1e-5, 1e-3 * math.sqrt(args.size)) + matches = bool(torch.allclose(result_tensor.detach(), expected.detach(), atol=atol, rtol=1e-5)) + stats = _latency_stats(samples) + mean_ms = stats["mean_ms"] + estimated_bytes = args.size * 4 + + gpu_name = torch.cuda.get_device_name(device) + out = { + "schema_version": "swordfish.runner.v1", + "benchmark": "vectorsum_v2_standalone", + "config": { + "scope": "vector_sum", + "backend": "triton-submission", + "shape": {"size": args.size}, + "size": args.size, + "dtype": args.dtype, + "repeats": args.repeats, + "warmup": args.warmup, + "iters": args.iters, + "seed": args.seed, + "block_size": BLOCK_SIZE, + "cuda_graph_capture": True, + }, + "env": { + "host": socket.gethostname(), + "gpu_name": gpu_name, + "gpu_class": args.arch_label, + "torch": torch.__version__, + "cuda": torch.version.cuda, + "triton": getattr(triton, "__version__", "unknown"), + "swordfish_sha": os.environ.get("SWORDFISH_SHA", "standalone-submission"), + }, + "correctness": { + "finite_output": bool(torch.isfinite(result_tensor).item()), + "matches_reference": matches, + "max_abs_error": max_abs_error, + "atol": atol, + "rtol": 1e-5, + "output_shape": list(result_tensor.shape), + "output_fp32": float(result_tensor.item()), + "reference_fp32": float(expected.item()), + }, + "metrics": { + "latency": stats, + "elements": args.size, + "elements_per_second": args.size / (mean_ms / 1000.0), + "input_bytes": estimated_bytes, + "estimated_bytes": estimated_bytes, + "estimated_bandwidth_tbps": (estimated_bytes / (mean_ms / 1000.0)) / 1e12, + }, + "command": ["submission.py", *os.sys.argv[1:]], + "timestamp_unix": time.time(), + } + out_path = Path(args.out) + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text(json.dumps(out, indent=2, sort_keys=True)) + print(f"wrote {out_path}") + + +def _parse_args(): + parser = argparse.ArgumentParser(description="standalone vectorsum_v2 benchmark") + parser.add_argument("--size", type=int, default=1_638_400) + parser.add_argument("--dtype", choices=["fp32"], default="fp32") + parser.add_argument("--repeats", type=int, default=5) + parser.add_argument("--warmup", type=int, default=10) + parser.add_argument("--iters", type=int, default=50) + parser.add_argument("--seed", type=int, default=0) + parser.add_argument("--arch-label", choices=["a100", "h100", "h200"], required=True) + parser.add_argument("--out", required=True) + return parser.parse_args() + + +if __name__ == "__main__": + _run_benchmark(_parse_args()) diff --git a/swordfish/dispatch/__init__.py b/swordfish/dispatch/__init__.py index 5a01175..5599dca 100644 --- a/swordfish/dispatch/__init__.py +++ b/swordfish/dispatch/__init__.py @@ -46,6 +46,7 @@ ) from swordfish.dispatch.rune import ( RuneCommandError, + RuneProfileSecurityError, RuneSubmit, RuneSubmitResult, ) @@ -58,6 +59,7 @@ LigerPerkernelMatrix, LigerPerkernelRun, TorchGemmRun, + VectorSumRun, ) from swordfish.dispatch.topology import ( find_topology_policy, @@ -82,10 +84,12 @@ "ResolvedExperiment", "ResultFetchError", "RuneCommandError", + "RuneProfileSecurityError", "RuneSubmit", "RuneSubmitGetMissingAnnotationsError", "RuneSubmitResult", "TorchGemmRun", + "VectorSumRun", "build_and_push_dev_image", "build_run_for_experiment", "fetch_result", diff --git a/swordfish/dispatch/experiments.py b/swordfish/dispatch/experiments.py index 02ef4bf..fcd44c3 100644 --- a/swordfish/dispatch/experiments.py +++ b/swordfish/dispatch/experiments.py @@ -17,15 +17,22 @@ LigerFsdpRun, LigerPerkernelRun, TorchGemmRun, + VectorSumRun, default_fsdp_profile_for, default_profile_for, ) -ExperimentWorkload = Literal["gemm", "liger-rmsnorm", "liger-swiglu", "liger-fsdp"] +ExperimentWorkload = Literal[ + "gemm", + "vectorsum-v2", + "liger-rmsnorm", + "liger-swiglu", + "liger-fsdp", +] ProfileFamily = Literal["bench", "fsdp"] -ExperimentRun = TorchGemmRun | LigerPerkernelRun | LigerFsdpRun +ExperimentRun = TorchGemmRun | VectorSumRun | LigerPerkernelRun | LigerFsdpRun -COMMON_RUN_OVERRIDES = {"name", "profile_mode", "result_root", "script"} +COMMON_RUN_OVERRIDES = {"name", "profile_mode", "result_root", "script", "context", "image"} @dataclass(frozen=True) @@ -81,6 +88,22 @@ class ResolvedExperiment: }, description="One-GPU torch/cuBLAS GEMM baseline.", ), + "vectorsum-v2": ExperimentSpec( + name="vectorsum-v2", + workload="vectorsum-v2", + profile_family="bench", + allowed_arches=ARCHES, + defaults={ + "backend": "triton", + "size": 1_638_400, + "dtype": "fp32", + "repeats": 5, + "warmup": 10, + "iters": 50, + "block_size": 8192, + }, + description="One-GPU vector sum reduction target.", + ), "liger-rmsnorm": ExperimentSpec( name="liger-rmsnorm", workload="liger-rmsnorm", @@ -124,6 +147,11 @@ class ResolvedExperiment: "iters": 5, "nproc_per_node": 8, "gradient_checkpointing": True, + "profile_steady_state": False, + "fsdp_wrap_policy": "root", + "fsdp_backward_prefetch": "default", + "fsdp_forward_prefetch": False, + "fsdp_limit_all_gathers": True, }, description="8-GPU Llama train-step reproduction row for baseline or Liger FSDP.", ), @@ -212,7 +240,8 @@ def build_run_for_experiment( "profile_mode": values.get("profile_mode"), "result_root": values.get("result_root"), "script": values.get("script"), - "profile": resolved.profile, + "context": values.get("context"), + "image": values.get("image"), } common = {k: v for k, v in common.items() if v is not None} @@ -230,6 +259,19 @@ def build_run_for_experiment( **common, ) + if spec.workload == "vectorsum-v2": + return VectorSumRun( + arch=arch, + backend=str(values["backend"]), + size=int(values["size"]), + dtype=str(values["dtype"]), + repeats=int(values["repeats"]), + warmup=int(values["warmup"]), + iters=int(values["iters"]), + block_size=int(values["block_size"]), + **common, + ) + if spec.workload in {"liger-rmsnorm", "liger-swiglu"}: kernel = spec.workload.removeprefix("liger-") return LigerPerkernelRun( @@ -256,6 +298,11 @@ def build_run_for_experiment( iters=int(values["iters"]), nproc_per_node=int(values["nproc_per_node"]), gradient_checkpointing=bool(values["gradient_checkpointing"]), + profile_steady_state=bool(values["profile_steady_state"]), + fsdp_wrap_policy=str(values["fsdp_wrap_policy"]), + fsdp_backward_prefetch=str(values["fsdp_backward_prefetch"]), + fsdp_forward_prefetch=bool(values["fsdp_forward_prefetch"]), + fsdp_limit_all_gathers=bool(values["fsdp_limit_all_gathers"]), **common, ) diff --git a/swordfish/dispatch/rune.py b/swordfish/dispatch/rune.py index b7b8c06..0df2221 100644 --- a/swordfish/dispatch/rune.py +++ b/swordfish/dispatch/rune.py @@ -27,6 +27,10 @@ def __init__(self, args: list[str], returncode: int, stdout: str, stderr: str): ) +class RuneProfileSecurityError(RuntimeError): + """Raised when the local rune binary drops profile-required pod security.""" + + @dataclass(frozen=True) class RuneSubmitResult: name: str @@ -147,10 +151,13 @@ def submit( """ from swordfish.dispatch.topology import topology_policy_env - args = self.to_args(dry_run=dry_run) env = {**os.environ, **self.env} if auto_topology_policy: env.update(topology_policy_env()) + if dry_run is None and _requires_sys_admin_ncu_guard(self): + self._preflight_sys_admin_ncu(env=env) + + args = self.to_args(dry_run=dry_run) proc = subprocess.run( args, text=True, @@ -169,3 +176,30 @@ def submit( stdout=proc.stdout, stderr=proc.stderr, ) + + def _preflight_sys_admin_ncu(self, *, env: dict[str, str]) -> None: + args = self.to_args(dry_run="client") + proc = subprocess.run( + args, + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=False, + env=env, + ) + if proc.returncode != 0: + raise RuneCommandError(args, proc.returncode, proc.stdout, proc.stderr) + rendered = proc.stdout + if "securityContext:" not in rendered or "SYS_ADMIN" not in rendered: + raise RuneProfileSecurityError( + "A100 NCU requires container securityContext.capabilities.add=[SYS_ADMIN], " + f"but {self.rune_bin} dry-run did not render it for profile {self.profile!r}. " + "Rebuild/install a Rune binary that supports profile spec.runtime.securityContext " + "before submitting, otherwise Nsight Compute fails with ERR_NVGPUCTRPERM." + ) + + +def _requires_sys_admin_ncu_guard(run: RuneSubmit) -> bool: + return ( + run.profile_mode == "ncu" and run.profile is not None and run.profile.endswith("-a100-ncu") + ) diff --git a/swordfish/dispatch/runs.py b/swordfish/dispatch/runs.py index d22430d..876d310 100644 --- a/swordfish/dispatch/runs.py +++ b/swordfish/dispatch/runs.py @@ -124,6 +124,10 @@ def _check_raw_preset_guard(*, preset: str | None, allow_raw_preset: bool) -> No # format the legacy SWORDFISH_PROFILE script-side path produces). Downstream # tooling that expects CSV needs to call `ncu --import` to convert. PROFILE_EXTENSIONS = {"ncu": "ncu-rep", "nsys": "nsys-rep", "torch": "json"} +VECTOR_SUM_BACKENDS = ("torch", "triton") +VECTOR_SUM_DTYPES = ("fp16", "bf16", "fp32") +VECTOR_SUM_DEFAULT_SIZE = 1_638_400 +VECTOR_SUM_DEFAULT_BLOCK_SIZE = 8192 _NAME_RE = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$") @@ -601,6 +605,138 @@ def submit(self, *, dry_run: str | None = None, check: bool = True) -> RuneSubmi return self.to_rune_submit().submit(dry_run=dry_run, check=check) +@dataclass +class VectorSumRun: + """One vectorsum_v2 reduction benchmark on a single arch.""" + + arch: str = "a100" + backend: str = "triton" + size: int = VECTOR_SUM_DEFAULT_SIZE + dtype: str = "fp32" + repeats: int = 5 + warmup: int = 10 + iters: int = 50 + block_size: int = VECTOR_SUM_DEFAULT_BLOCK_SIZE + name: str | None = None + namespace: str = DEFAULT_NAMESPACE + context: str | None = None + image: str = DEFAULT_IMAGE + script: str | Path = DEFAULT_BENCH_SCRIPT + pvc: str = DEFAULT_PVC + result_root: str = DEFAULT_RESULT_ROOT + preset: str | None = None + allow_raw_preset: bool = False + profile: str | None = None + extra_args: list[str] = field(default_factory=list) + container_env: dict[str, str] = field(default_factory=dict) + rune_bin: str = "rune" + profile_mode: str | None = None + + def __post_init__(self) -> None: + if self.arch not in ARCH_TO_PRESET: + raise ValueError( + f"unknown arch {self.arch!r}; expected one of {sorted(ARCH_TO_PRESET)}" + ) + if self.backend not in VECTOR_SUM_BACKENDS: + raise ValueError(f"backend {self.backend!r} not in {VECTOR_SUM_BACKENDS}") + if self.dtype not in VECTOR_SUM_DTYPES: + raise ValueError(f"dtype {self.dtype!r} not in {VECTOR_SUM_DTYPES}") + if min(self.size, self.repeats, self.iters, self.block_size) <= 0 or self.warmup < 0: + raise ValueError( + "size, repeats, iters, and block_size must be positive; warmup must be non-negative" + ) + if self.block_size & (self.block_size - 1) != 0: + raise ValueError("block_size must be a power of two") + if self.preset and self.profile: + raise ValueError("preset and profile are mutually exclusive") + _check_raw_preset_guard(preset=self.preset, allow_raw_preset=self.allow_raw_preset) + if self.profile_mode and self.profile_mode not in PROFILE_MODES: + raise ValueError(f"profile_mode {self.profile_mode!r} not in {PROFILE_MODES}") + + @property + def resolved_name(self) -> str: + return _normalize_name( + self.name or f"sf-vectorsum-v2-{self.backend}-{self.size}-{self.arch}" + ) + + @property + def resolved_preset(self) -> str: + if self.profile: + return "" + return self.preset or ARCH_TO_PRESET[self.arch] + + @property + def resolved_profile(self) -> str | None: + if self.profile: + return self.profile + if self.preset: + return None + return default_profile_for_mode(self.arch, self.profile_mode) + + @property + def out_path(self) -> str: + if self.name is not None: + return f"{self.result_root}/vectorsum-v2/{self.resolved_name}.json" + return f"{self.result_root}/vectorsum-v2/{self.backend}-{self.size}-{self.arch}.json" + + @property + def forwarded_args(self) -> list[str]: + return [ + "bench-vectorsum", + "--backend", + self.backend, + "--size", + str(self.size), + "--dtype", + self.dtype, + "--repeats", + str(self.repeats), + "--warmup", + str(self.warmup), + "--iters", + str(self.iters), + "--device", + "auto", + "--arch-label", + self.arch, + "--block-size", + str(self.block_size), + "--out", + self.out_path, + ] + + def to_rune_submit(self) -> RuneSubmit: + rune_native_mode, container_env = _resolve_torch_profile( + self.profile_mode, self.resolved_name, self.container_env + ) + kwargs: dict = dict( + name=self.resolved_name, + image=self.image, + script=self.script, + namespace=self.namespace, + context=self.context, + volumes=[f"data=pvc:{self.pvc}"], + extra_args=_inject_gpu_class(self.arch, self.extra_args), + forwarded_args=self.forwarded_args, + container_env=container_env, + rune_bin=self.rune_bin, + profile_mode=rune_native_mode, + output=self.out_path, + ) + resolved_profile = self.resolved_profile + if resolved_profile: + kwargs["profile"] = resolved_profile + else: + kwargs["preset"] = self.resolved_preset + return RuneSubmit(**kwargs) + + def to_command(self, *, dry_run: str | None = None) -> str: + return self.to_rune_submit().to_command(dry_run=dry_run) + + def submit(self, *, dry_run: str | None = None, check: bool = True) -> RuneSubmitResult: + return self.to_rune_submit().submit(dry_run=dry_run, check=check) + + @dataclass class LigerFsdpRun: """One end-to-end Llama train-step row for the Liger FSDP reproduction.""" @@ -618,6 +754,10 @@ class LigerFsdpRun: nproc_per_node: int = 8 gradient_checkpointing: bool = True profile_steady_state: bool = False + fsdp_wrap_policy: str = "root" + fsdp_backward_prefetch: str = "default" + fsdp_forward_prefetch: bool = False + fsdp_limit_all_gathers: bool = True name: str | None = None namespace: str = DEFAULT_NAMESPACE context: str | None = None @@ -644,6 +784,17 @@ def __post_init__(self) -> None: raise ValueError("model_source must be 'reference' or 'transformers'") if self.model_preset not in {"tiny", "llama3-8b"}: raise ValueError("model_preset must be 'tiny' or 'llama3-8b'") + if self.fsdp_wrap_policy not in {"root", "transformer-block"}: + raise ValueError("fsdp_wrap_policy must be 'root' or 'transformer-block'") + if self.fsdp_backward_prefetch not in { + "default", + "backward-pre", + "backward-post", + "none", + }: + raise ValueError( + "fsdp_backward_prefetch must be one of: default, backward-pre, backward-post, none" + ) if self.preset and self.profile: raise ValueError("preset and profile are mutually exclusive") _check_raw_preset_guard(preset=self.preset, allow_raw_preset=self.allow_raw_preset) @@ -674,6 +825,8 @@ def resolved_profile(self) -> str | None: @property def out_path(self) -> str: + if self.name is not None: + return f"{self.result_root}/liger-fsdp/{self.resolved_name}.json" return f"{self.result_root}/liger-fsdp/{self.model_preset}-{self.mode}-{self.arch}.json" @property @@ -731,6 +884,14 @@ def forwarded_args(self) -> list[str]: args.append("--no-gradient-checkpointing") if self.profile_steady_state: args.append("--profile-steady-state") + if self.fsdp_wrap_policy != "root": + args.extend(["--fsdp-wrap-policy", self.fsdp_wrap_policy]) + if self.fsdp_backward_prefetch != "default": + args.extend(["--fsdp-backward-prefetch", self.fsdp_backward_prefetch]) + if self.fsdp_forward_prefetch: + args.append("--fsdp-forward-prefetch") + if not self.fsdp_limit_all_gathers: + args.append("--no-fsdp-limit-all-gathers") return args def to_rune_submit(self) -> RuneSubmit: diff --git a/swordfish/kernels/vector_sum.py b/swordfish/kernels/vector_sum.py new file mode 100644 index 0000000..4eb256a --- /dev/null +++ b/swordfish/kernels/vector_sum.py @@ -0,0 +1,108 @@ +"""Vector-sum reduction kernels.""" + +from __future__ import annotations + +import torch + +DEFAULT_BLOCK_SIZE = 8192 +DEFAULT_NUM_STAGES = 1 +DEFAULT_FINAL_NUM_WARPS = 16 + +try: + import triton + import triton.language as tl +except ImportError: # pragma: no cover - exercised on CUDA hosts in integration runs. + triton = None + tl = None + + +def _is_power_of_two(value: int) -> bool: + return value > 0 and value & (value - 1) == 0 + + +def _require_triton() -> None: + if triton is None or tl is None or _partial_sum_kernel is None or _final_sum_kernel is None: + raise RuntimeError("vectorsum_v2 Triton backend requires the triton package") + + +def torch_vector_sum_reference(x: torch.Tensor, out: torch.Tensor | None = None) -> torch.Tensor: + """Reference reduction: fp64 accumulation, returned as fp32 like the target task.""" + result = x.to(torch.float64).sum().to(torch.float32) + if out is None: + return result + out.reshape(-1)[0].copy_(result) + return out + + +if triton is not None and tl is not None: + + @triton.jit + def _partial_sum_kernel(x_ptr, partials_ptr, n_elements: int, BLOCK_SIZE: tl.constexpr): + pid = tl.program_id(0) + offsets = pid * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE) + mask = offsets < n_elements + values = tl.load(x_ptr + offsets, mask=mask, other=0.0, cache_modifier=".cg") + partial = tl.sum(values, axis=0) + tl.store(partials_ptr + pid, partial) + + @triton.jit + def _final_sum_kernel(partials_ptr, out_ptr, n_partials: int, BLOCK_SIZE: tl.constexpr): + offsets = tl.arange(0, BLOCK_SIZE) + mask = offsets < n_partials + values = tl.load(partials_ptr + offsets, mask=mask, other=0.0) + total = tl.sum(values, axis=0) + tl.store(out_ptr, total) + +else: + _partial_sum_kernel = None + _final_sum_kernel = None + + +def partial_count(n_elements: int, block_size: int = DEFAULT_BLOCK_SIZE) -> int: + if n_elements <= 0: + raise ValueError("n_elements must be positive") + if not _is_power_of_two(block_size): + raise ValueError("block_size must be a positive power of two") + return (n_elements + block_size - 1) // block_size + + +def triton_vector_sum( + x: torch.Tensor, + out: torch.Tensor, + partials: torch.Tensor, + *, + block_size: int = DEFAULT_BLOCK_SIZE, +) -> torch.Tensor: + """Reduce a contiguous 1D CUDA tensor into an fp32 scalar using two Triton kernels.""" + _require_triton() + if x.device.type != "cuda" or out.device.type != "cuda" or partials.device.type != "cuda": + raise RuntimeError("vectorsum_v2 Triton backend requires CUDA tensors") + if x.ndim != 1: + raise ValueError("vectorsum_v2 input must be a 1D tensor") + if not x.is_contiguous(): + raise ValueError("vectorsum_v2 Triton backend requires a contiguous input tensor") + if out.numel() != 1 or out.dtype != torch.float32: + raise ValueError("vectorsum_v2 output must be a single fp32 scalar tensor") + + n_elements = x.numel() + n_partials = partial_count(n_elements, block_size) + if partials.numel() < n_partials or partials.dtype != torch.float32: + raise ValueError("vectorsum_v2 partials must have at least partial_count fp32 elements") + + final_block_size = 1 << (n_partials - 1).bit_length() + _partial_sum_kernel[(n_partials,)]( + x, + partials, + n_elements, + BLOCK_SIZE=block_size, + num_warps=8, + num_stages=DEFAULT_NUM_STAGES, + ) + _final_sum_kernel[(1,)]( + partials, + out, + n_partials, + BLOCK_SIZE=final_block_size, + num_warps=DEFAULT_FINAL_NUM_WARPS, + ) + return out diff --git a/swordfish/runner/cli.py b/swordfish/runner/cli.py index 97495ad..5801c66 100644 --- a/swordfish/runner/cli.py +++ b/swordfish/runner/cli.py @@ -9,7 +9,9 @@ from swordfish.dispatch import ( LigerPerkernelRun, + ResultFetchError, TorchGemmRun, + VectorSumRun, build_run_for_experiment, format_experiment_explain, format_experiment_table, @@ -19,6 +21,13 @@ from swordfish.quant.marlin_triton import run_w4a16_benchmark, write_w4a16_result from swordfish.runner.backends import available_gemm_backends from swordfish.runner.compare import write_results_comparison +from swordfish.runner.dcgm_window import ( + DcgmWindowError, + dcgm_window_status, + format_dcgm_status, + pause_a100_dcgm, + restore_dcgm, +) from swordfish.runner.index import write_result_index from swordfish.runner.liger_perkernel import ( DEFAULT_DTYPE as LIGER_DEFAULT_DTYPE, @@ -41,7 +50,14 @@ from swordfish.runner.schema import attach_ncu_summary from swordfish.runner.status import write_completion_report from swordfish.runner.torch_gemm import run_gemm_benchmark, write_result +from swordfish.runner.trace_bundle import bundle_traces, parse_trace_job_spec from swordfish.runner.upstream import TARGET_LABELS, write_upstream_packet +from swordfish.runner.vector_sum import ( + DEFAULT_BLOCK_SIZE as VECTOR_SUM_DEFAULT_BLOCK_SIZE, + VECTOR_SUM_BENCHMARK_SIZES, + available_vector_sum_backends, + run_vector_sum_benchmark, +) from swordfish.transformer.bench import ( run_transformer_forward_benchmark, run_transformer_train_step_benchmark, @@ -72,6 +88,28 @@ def _cmd_run_gemm(args: argparse.Namespace) -> int: return 0 +def _cmd_bench_vector_sum(args: argparse.Namespace) -> int: + argv = sys.argv if args.argv is None else args.argv + with torch_profiler_context(resolve_torch_profile_out()): + result = run_vector_sum_benchmark( + backend=args.backend, + size=args.size, + dtype=args.dtype, + repeats=args.repeats, + warmup=args.warmup, + iters=args.iters, + device_name=args.device, + allow_cpu=args.allow_cpu, + arch_label=args.arch_label, + seed=args.seed, + block_size=args.block_size, + ) + result["command"] = argv + write_result(result, args.out) + print(f"wrote {args.out}", file=sys.stderr) + return 0 + + def _cmd_run_liger_perkernel(args: argparse.Namespace) -> int: argv = sys.argv if args.argv is None else args.argv with torch_profiler_context(resolve_torch_profile_out()): @@ -119,6 +157,10 @@ def _cmd_run_liger_fsdp_step(args: argparse.Namespace) -> int: weight_decay=args.weight_decay, gradient_checkpointing=args.gradient_checkpointing, profile_steady_state=args.profile_steady_state, + fsdp_wrap_policy=args.fsdp_wrap_policy, + fsdp_backward_prefetch=args.fsdp_backward_prefetch, + fsdp_forward_prefetch=args.fsdp_forward_prefetch, + fsdp_limit_all_gathers=args.fsdp_limit_all_gathers, ) if result is None: return 0 @@ -311,6 +353,20 @@ def _build_submit_run(args: argparse.Namespace): profile_mode=args.profile_mode, **common, ) + if args.workload == "vectorsum-v2": + return VectorSumRun( + arch=args.arch, + backend=args.backend, + size=args.size, + dtype=args.dtype or "fp32", + repeats=repeats, + warmup=warmup, + iters=iters, + name=args.name, + profile_mode=args.profile_mode, + block_size=args.block_size, + **common, + ) if args.workload == "liger-fsdp": repeats = args.repeats if args.repeats is not None else 3 warmup = args.warmup if args.warmup is not None else 1 @@ -333,6 +389,10 @@ def _build_submit_run(args: argparse.Namespace): profile_mode=args.profile_mode, gradient_checkpointing=args.gradient_checkpointing, profile_steady_state=args.profile_steady_state, + fsdp_wrap_policy=args.fsdp_wrap_policy, + fsdp_backward_prefetch=args.fsdp_backward_prefetch, + fsdp_forward_prefetch=args.fsdp_forward_prefetch, + fsdp_limit_all_gathers=args.fsdp_limit_all_gathers, **common, ) kernel = "rmsnorm" if args.workload == "liger-rmsnorm" else "swiglu" @@ -361,6 +421,8 @@ def _cmd_submit_bench(args: argparse.Namespace) -> int: def _experiment_overrides(args: argparse.Namespace) -> dict[str, object]: keys = ( "backend", + "size", + "block_size", "m", "n", "k", @@ -373,11 +435,18 @@ def _experiment_overrides(args: argparse.Namespace) -> dict[str, object]: "micro_batch_size", "seq_len", "gradient_checkpointing", + "profile_steady_state", + "fsdp_wrap_policy", + "fsdp_backward_prefetch", + "fsdp_forward_prefetch", + "fsdp_limit_all_gathers", "nproc_per_node", "name", "profile_mode", "result_root", "script", + "context", + "image", ) out: dict[str, object] = {} for key in keys: @@ -668,6 +737,63 @@ def _cmd_convert_ncu(args: argparse.Namespace) -> int: return 0 +def _cmd_bundle_traces(args: argparse.Namespace) -> int: + try: + jobs = [ + parse_trace_job_spec(raw, default_profile_mode=args.profile_mode) for raw in args.jobs + ] + result = bundle_traces( + jobs, + bundle_name=args.bundle_name, + local_root=args.local_root, + namespace=args.namespace, + context=args.context, + pvc=args.pvc, + rune_bin=args.rune_bin, + overwrite=args.overwrite, + create_archive=args.create_archive, + ) + except (ValueError, ResultFetchError) as exc: + print(f"error: {exc}", file=sys.stderr) + return 2 if isinstance(exc, ValueError) else 1 + + print(f"bundle dir: {result.bundle_dir}", file=sys.stderr) + print(f"manifest: {result.manifest_path}", file=sys.stderr) + if result.archive_path: + print(f"archive: {result.archive_path}", file=sys.stderr) + return 0 + + +def _cmd_a100_ncu_window(args: argparse.Namespace) -> int: + try: + common = { + "context": args.context, + "namespace": args.namespace, + "daemonset": args.daemonset, + } + if args.action == "status": + status = dcgm_window_status( + **common, + app_label=args.app_label, + a100_selector=args.a100_selector, + ) + elif args.action == "pause": + status = pause_a100_dcgm( + **common, + app_label=args.app_label, + a100_selector=args.a100_selector, + timeout_seconds=args.timeout_seconds, + ) + else: + status = restore_dcgm(**common, timeout_seconds=args.timeout_seconds) + except DcgmWindowError as exc: + print(f"error: {exc}", file=sys.stderr) + return 1 + + print(format_dcgm_status(status)) + return 0 + + def _cmd_generate_rune_profiles(args: argparse.Namespace) -> int: rendered = render_pack_yaml() out: Path = args.out @@ -711,6 +837,36 @@ def build_parser() -> argparse.ArgumentParser: run.add_argument("--out", type=Path, required=True) run.set_defaults(func=_cmd_run_gemm) + vector_sum = sub.add_parser( + "bench-vectorsum", + help="run one vectorsum_v2 reduction benchmark", + ) + vector_sum.add_argument( + "--backend", + choices=available_vector_sum_backends(), + default="torch", + ) + vector_sum.add_argument( + "--size", + type=int, + default=VECTOR_SUM_BENCHMARK_SIZES[0], + help="1D tensor size; benchmark target sizes are " + + ", ".join(str(size) for size in VECTOR_SUM_BENCHMARK_SIZES), + ) + vector_sum.add_argument("--dtype", choices=["fp16", "bf16", "fp32"], default="fp32") + vector_sum.add_argument("--repeats", type=int, default=5) + vector_sum.add_argument("--warmup", type=int, default=10) + vector_sum.add_argument("--iters", type=int, default=50) + vector_sum.add_argument("--device", default="auto") + vector_sum.add_argument( + "--allow-cpu", action="store_true", help="allow CPU timing for local smoke tests" + ) + vector_sum.add_argument("--arch-label", choices=["a100", "h100", "h200"], default=None) + vector_sum.add_argument("--seed", type=int, default=0) + vector_sum.add_argument("--block-size", type=int, default=VECTOR_SUM_DEFAULT_BLOCK_SIZE) + vector_sum.add_argument("--out", type=Path, required=True) + vector_sum.set_defaults(func=_cmd_bench_vector_sum) + liger = sub.add_parser( "liger-perkernel", help="run one paired baseline-vs-Liger per-kernel benchmark", @@ -781,6 +937,30 @@ def build_parser() -> argparse.ArgumentParser: default=True, help="enable model gradient checkpointing (default: enabled)", ) + fsdp.add_argument( + "--fsdp-wrap-policy", + choices=["root", "transformer-block"], + default="root", + help="FSDP wrapping granularity: root preserves the original one-wrapper setup; " + "transformer-block wraps decoder blocks for communication overlap experiments", + ) + fsdp.add_argument( + "--fsdp-backward-prefetch", + choices=["default", "backward-pre", "backward-post", "none"], + default="default", + help="FSDP backward prefetch policy override", + ) + fsdp.add_argument( + "--fsdp-forward-prefetch", + action="store_true", + help="enable FSDP forward prefetch for static-graph overlap experiments", + ) + fsdp.add_argument( + "--fsdp-limit-all-gathers", + action=argparse.BooleanOptionalAction, + default=True, + help="enable FSDP all-gather rate limiting (default: enabled)", + ) fsdp.add_argument( "--nproc-per-node", type=int, @@ -963,7 +1143,7 @@ def build_parser() -> argparse.ArgumentParser: ) submit.add_argument( "--workload", - choices=["gemm", "liger-rmsnorm", "liger-swiglu", "liger-fsdp"], + choices=["gemm", "vectorsum-v2", "liger-rmsnorm", "liger-swiglu", "liger-fsdp"], required=True, ) submit.add_argument("--arch", choices=["a100", "h100", "h200"], required=True) @@ -983,6 +1163,18 @@ def build_parser() -> argparse.ArgumentParser: submit.add_argument("--m", type=int, default=4096, help="GEMM only") submit.add_argument("--n", type=int, default=4096, help="GEMM only") submit.add_argument("--k", type=int, default=4096, help="GEMM only") + submit.add_argument( + "--size", + type=int, + default=VECTOR_SUM_BENCHMARK_SIZES[0], + help="vectorsum-v2 only: 1D tensor size", + ) + submit.add_argument( + "--block-size", + type=int, + default=VECTOR_SUM_DEFAULT_BLOCK_SIZE, + help="vectorsum-v2 only: Triton block size", + ) submit.add_argument( "--dtype", default=None, @@ -1042,6 +1234,29 @@ def build_parser() -> argparse.ArgumentParser: default=True, help="liger-fsdp only: enable model gradient checkpointing", ) + submit.add_argument( + "--fsdp-wrap-policy", + choices=["root", "transformer-block"], + default="root", + help="liger-fsdp only: FSDP wrapping granularity", + ) + submit.add_argument( + "--fsdp-backward-prefetch", + choices=["default", "backward-pre", "backward-post", "none"], + default="default", + help="liger-fsdp only: FSDP backward prefetch policy override", + ) + submit.add_argument( + "--fsdp-forward-prefetch", + action="store_true", + help="liger-fsdp only: enable FSDP forward prefetch", + ) + submit.add_argument( + "--fsdp-limit-all-gathers", + action=argparse.BooleanOptionalAction, + default=True, + help="liger-fsdp only: enable FSDP all-gather rate limiting", + ) submit.add_argument( "--nproc-per-node", type=int, @@ -1058,9 +1273,11 @@ def build_parser() -> argparse.ArgumentParser: ) submit.add_argument( "--backend", - choices=available_gemm_backends(), + choices=tuple( + sorted(set(available_gemm_backends()) | set(available_vector_sum_backends())) + ), default="torch", - help="GEMM only", + help="GEMM and vectorsum-v2 only", ) submit.add_argument( "--result-root", @@ -1100,6 +1317,16 @@ def build_parser() -> argparse.ArgumentParser: submit_exp.add_argument("--arch", choices=["a100", "h100", "h200"], required=True) submit_exp.add_argument("--name", default=None, help="override generated job name") submit_exp.add_argument("--profile-mode", choices=["ncu", "nsys", "torch"], default=None) + submit_exp.add_argument( + "--context", + default=None, + help="kubectl context for the rune submit invocation", + ) + submit_exp.add_argument( + "--image", + default=None, + help="override the profile runtime image for the rune submit invocation", + ) submit_exp.add_argument( "--dry-run", choices=["client", "server"], @@ -1114,6 +1341,8 @@ def build_parser() -> argparse.ArgumentParser: submit_exp.add_argument("--m", type=int, default=None, help="gemm only") submit_exp.add_argument("--n", type=int, default=None, help="gemm only") submit_exp.add_argument("--k", type=int, default=None, help="gemm only") + submit_exp.add_argument("--size", type=int, default=None, help="vectorsum-v2 only") + submit_exp.add_argument("--block-size", type=int, default=None, help="vectorsum-v2 only") submit_exp.add_argument( "--dtype", default=None, @@ -1158,6 +1387,39 @@ def build_parser() -> argparse.ArgumentParser: default=None, help="liger-fsdp only: enable model gradient checkpointing", ) + submit_exp.add_argument( + "--profile-steady-state", + action=argparse.BooleanOptionalAction, + default=None, + help=( + "liger-fsdp only: bracket measured iterations with cudaProfilerStart/Stop; " + "when combined with --profile-mode nsys, capture excludes setup/warmup" + ), + ) + submit_exp.add_argument( + "--fsdp-wrap-policy", + choices=["root", "transformer-block"], + default=None, + help="liger-fsdp only: FSDP wrapping granularity", + ) + submit_exp.add_argument( + "--fsdp-backward-prefetch", + choices=["default", "backward-pre", "backward-post", "none"], + default=None, + help="liger-fsdp only: FSDP backward prefetch policy override", + ) + submit_exp.add_argument( + "--fsdp-forward-prefetch", + action=argparse.BooleanOptionalAction, + default=None, + help="liger-fsdp only: enable FSDP forward prefetch", + ) + submit_exp.add_argument( + "--fsdp-limit-all-gathers", + action=argparse.BooleanOptionalAction, + default=None, + help="liger-fsdp only: enable FSDP all-gather rate limiting", + ) submit_exp.add_argument( "--nproc-per-node", type=int, @@ -1166,9 +1428,11 @@ def build_parser() -> argparse.ArgumentParser: ) submit_exp.add_argument( "--backend", - choices=available_gemm_backends(), + choices=tuple( + sorted(set(available_gemm_backends()) | set(available_vector_sum_backends())) + ), default=None, - help="gemm only", + help="gemm and vectorsum-v2 only", ) submit_exp.add_argument( "--result-root", @@ -1253,6 +1517,105 @@ def build_parser() -> argparse.ArgumentParser: ) inspect.set_defaults(func=_cmd_inspect_run, open=True) + bundle = sub.add_parser( + "bundle-traces", + help="fetch result JSON + profile traces into runs/traces// and " + "write a tar.gz handoff bundle for Mac/Hermes inspection", + ) + bundle.add_argument( + "jobs", + nargs="+", + help="Rune job names to fetch; use NAME:PROFILE_MODE to fetch traces " + "(PROFILE_MODE is ncu, nsys, or torch)", + ) + bundle.add_argument( + "--profile-mode", + choices=["ncu", "nsys", "torch"], + default=None, + help="default profile mode for jobs without a NAME:PROFILE_MODE suffix", + ) + bundle.add_argument( + "--bundle-name", + default=None, + help="stable bundle directory/archive name (default: trace-YYYYmmdd-HHMMSS)", + ) + bundle.add_argument( + "--local-root", + type=Path, + default=Path("runs/traces"), + help="local handoff root (default: runs/traces)", + ) + bundle.add_argument( + "--namespace", + default="ray", + help="kubernetes namespace the jobs ran in (default: ray)", + ) + bundle.add_argument( + "--context", + default=None, + help="kubectl context (default: current)", + ) + bundle.add_argument( + "--pvc", + default="training-nfs", + help="PVC name containing profile artifacts (default: training-nfs)", + ) + bundle.add_argument( + "--rune-bin", + default="rune", + help="rune binary to use for submit get (default: rune)", + ) + bundle.add_argument( + "--overwrite", + action="store_true", + help="re-fetch even when local copies already exist", + ) + bundle.add_argument( + "--no-archive", + dest="create_archive", + action="store_false", + help="only write the expanded bundle directory, not the tar.gz archive", + ) + bundle.set_defaults(func=_cmd_bundle_traces, create_archive=True) + + a100_window = sub.add_parser( + "a100-ncu-window", + help="pause/restore the DCGM exporter A100 window required for A100 Nsight Compute", + ) + a100_window.add_argument("action", choices=["status", "pause", "restore"]) + a100_window.add_argument( + "--context", + default=None, + help="kubectl context (default: current)", + ) + a100_window.add_argument( + "--namespace", + default="gpu-operator", + help="GPU operator namespace (default: gpu-operator)", + ) + a100_window.add_argument( + "--daemonset", + default="nvidia-dcgm-exporter", + help="DCGM exporter DaemonSet name (default: nvidia-dcgm-exporter)", + ) + a100_window.add_argument( + "--app-label", + default="app=nvidia-dcgm-exporter", + help="label selector for DCGM exporter pods (default: app=nvidia-dcgm-exporter)", + ) + a100_window.add_argument( + "--a100-selector", + default="nvidia.com/gpu.product=NVIDIA-A100-SXM4-80GB", + help="node selector identifying target A100 nodes", + ) + a100_window.add_argument( + "--timeout-seconds", + type=int, + default=600, + help="pause/restore wait timeout (default: 600)", + ) + a100_window.set_defaults(func=_cmd_a100_ncu_window) + ncu_summary = sub.add_parser( "ncu-summary", help="pretty-print a per-kernel summary of an Nsight Compute report " diff --git a/swordfish/runner/dcgm_window.py b/swordfish/runner/dcgm_window.py new file mode 100644 index 0000000..aef45a1 --- /dev/null +++ b/swordfish/runner/dcgm_window.py @@ -0,0 +1,215 @@ +"""A100 Nsight Compute profiling window helpers for airun.""" + +from __future__ import annotations + +import json +import subprocess +import time +from dataclasses import dataclass + + +class DcgmWindowError(RuntimeError): + pass + + +@dataclass(frozen=True) +class DcgmPod: + name: str + node: str + phase: str + ready: bool + + +@dataclass(frozen=True) +class DcgmWindowStatus: + a100_nodes: tuple[str, ...] + a100_exporter_pods: tuple[DcgmPod, ...] + desired: int + ready: int + updated: int + available: int + + @property + def a100_clear(self) -> bool: + return not self.a100_exporter_pods + + +def dcgm_window_status( + *, + context: str | None = None, + namespace: str = "gpu-operator", + daemonset: str = "nvidia-dcgm-exporter", + app_label: str = "app=nvidia-dcgm-exporter", + a100_selector: str = "nvidia.com/gpu.product=NVIDIA-A100-SXM4-80GB", +) -> DcgmWindowStatus: + a100_nodes = _a100_nodes(context=context, selector=a100_selector) + pods = _dcgm_pods(context=context, namespace=namespace, app_label=app_label) + ds = _kubectl_json( + ["-n", namespace, "get", "ds", daemonset, "-o", "json"], + context=context, + ) + status = ds.get("status", {}) + a100_set = set(a100_nodes) + return DcgmWindowStatus( + a100_nodes=tuple(a100_nodes), + a100_exporter_pods=tuple(pod for pod in pods if pod.node in a100_set), + desired=int(status.get("desiredNumberScheduled", 0)), + ready=int(status.get("numberReady", 0)), + updated=int(status.get("updatedNumberScheduled", 0)), + available=int(status.get("numberAvailable", 0)), + ) + + +def pause_a100_dcgm( + *, + context: str | None = None, + namespace: str = "gpu-operator", + daemonset: str = "nvidia-dcgm-exporter", + app_label: str = "app=nvidia-dcgm-exporter", + a100_selector: str = "nvidia.com/gpu.product=NVIDIA-A100-SXM4-80GB", + timeout_seconds: int = 300, + poll_interval_seconds: float = 5.0, +) -> DcgmWindowStatus: + """Exclude A100 nodes from DCGM exporter and delete existing A100 exporter pods.""" + patch = { + "spec": { + "template": { + "spec": { + "affinity": { + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "nvidia.com/gpu.product", + "operator": "NotIn", + "values": ["NVIDIA-A100-SXM4-80GB"], + } + ] + } + ] + } + } + } + } + } + } + } + _kubectl( + ["-n", namespace, "patch", "ds", daemonset, "--type", "merge", "-p", json.dumps(patch)], + context=context, + ) + status = dcgm_window_status( + context=context, + namespace=namespace, + daemonset=daemonset, + app_label=app_label, + a100_selector=a100_selector, + ) + if status.a100_exporter_pods: + _kubectl( + [ + "-n", + namespace, + "delete", + "pod", + *(pod.name for pod in status.a100_exporter_pods), + "--wait=true", + "--timeout=120s", + ], + context=context, + ) + deadline = time.monotonic() + timeout_seconds + while True: + status = dcgm_window_status( + context=context, + namespace=namespace, + daemonset=daemonset, + app_label=app_label, + a100_selector=a100_selector, + ) + if status.a100_clear: + return status + if time.monotonic() >= deadline: + names = ", ".join(pod.name for pod in status.a100_exporter_pods) + raise DcgmWindowError(f"DCGM exporter still running on A100 nodes: {names}") + time.sleep(poll_interval_seconds) + + +def restore_dcgm( + *, + context: str | None = None, + namespace: str = "gpu-operator", + daemonset: str = "nvidia-dcgm-exporter", + timeout_seconds: int = 600, +) -> DcgmWindowStatus: + patch = {"spec": {"template": {"spec": {"affinity": None}}}} + _kubectl( + ["-n", namespace, "patch", "ds", daemonset, "--type", "merge", "-p", json.dumps(patch)], + context=context, + ) + _kubectl( + ["-n", namespace, "rollout", "status", f"ds/{daemonset}", f"--timeout={timeout_seconds}s"], + context=context, + ) + return dcgm_window_status(context=context, namespace=namespace, daemonset=daemonset) + + +def format_dcgm_status(status: DcgmWindowStatus) -> str: + pods = ", ".join(f"{p.name}@{p.node}:{p.phase}" for p in status.a100_exporter_pods) or "none" + return "\n".join( + [ + f"A100 nodes: {', '.join(status.a100_nodes) or 'none'}", + f"A100 exporter pods: {pods}", + "DaemonSet: " + f"desired={status.desired} ready={status.ready} " + f"updated={status.updated} available={status.available}", + ] + ) + + +def _a100_nodes(*, context: str | None, selector: str) -> list[str]: + payload = _kubectl_json(["get", "nodes", "-l", selector, "-o", "json"], context=context) + return [item["metadata"]["name"] for item in payload.get("items", [])] + + +def _dcgm_pods(*, context: str | None, namespace: str, app_label: str) -> list[DcgmPod]: + payload = _kubectl_json( + ["-n", namespace, "get", "pods", "-l", app_label, "-o", "json"], + context=context, + ) + out: list[DcgmPod] = [] + for item in payload.get("items", []): + statuses = item.get("status", {}).get("containerStatuses", []) + ready = bool(statuses and all(s.get("ready") for s in statuses)) + out.append( + DcgmPod( + name=item["metadata"]["name"], + node=item.get("spec", {}).get("nodeName", ""), + phase=item.get("status", {}).get("phase", ""), + ready=ready, + ) + ) + return out + + +def _kubectl_json(args: list[str], *, context: str | None) -> dict: + proc = _kubectl(args, context=context) + return json.loads(proc.stdout) + + +def _kubectl(args: list[str], *, context: str | None) -> subprocess.CompletedProcess[str]: + argv = ["kubectl"] + if context: + argv += ["--context", context] + argv += args + proc = subprocess.run( + argv, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False + ) + if proc.returncode != 0: + raise DcgmWindowError( + f"kubectl failed ({proc.returncode}): {' '.join(argv)}\n" + f"{proc.stderr.strip() or proc.stdout.strip()}" + ) + return proc diff --git a/swordfish/runner/liger_fsdp.py b/swordfish/runner/liger_fsdp.py index 626a814..27b5def 100644 --- a/swordfish/runner/liger_fsdp.py +++ b/swordfish/runner/liger_fsdp.py @@ -15,6 +15,7 @@ import time from contextlib import contextmanager from dataclasses import asdict, dataclass +from functools import partial from importlib import metadata from collections.abc import Iterator from typing import Any, Literal @@ -27,11 +28,13 @@ from swordfish.runner.schema import TRAINING_SCHEMA_VERSION, latency_stats from swordfish.runner.torch_gemm import _resolve_device, capture_env from swordfish.transformer.config import GPTConfig -from swordfish.transformer.model import GPTLanguageModel +from swordfish.transformer.model import GPTDecoderBlock, GPTLanguageModel LigerMode = Literal["baseline", "liger"] ModelSource = Literal["reference", "transformers"] ModelPreset = Literal["tiny", "llama3-8b"] +FsdpWrapPolicy = Literal["root", "transformer-block"] +FsdpBackwardPrefetch = Literal["default", "backward-pre", "backward-post", "none"] @dataclass(frozen=True) @@ -204,15 +207,22 @@ def _maybe_wrap_fsdp( *, state: DistributedState, dtype: torch.dtype, + model_source: ModelSource, + fsdp_wrap_policy: FsdpWrapPolicy, + fsdp_backward_prefetch: FsdpBackwardPrefetch, + fsdp_forward_prefetch: bool, + fsdp_limit_all_gathers: bool, ) -> tuple[nn.Module, str]: if state.world_size == 1: return model, "single_process" from torch.distributed.fsdp import ( # noqa: PLC0415 + BackwardPrefetch, FullyShardedDataParallel as FSDP, MixedPrecision, ShardingStrategy, ) + from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy # noqa: PLC0415 mixed_precision = None if dtype in {torch.float16, torch.bfloat16}: @@ -222,13 +232,36 @@ def _maybe_wrap_fsdp( buffer_dtype=dtype, ) + auto_wrap_policy = None + if fsdp_wrap_policy == "transformer-block": + if model_source == "transformers": + from transformers.models.llama.modeling_llama import LlamaDecoderLayer # noqa: PLC0415 + + transformer_layer_cls = {LlamaDecoderLayer} + else: + transformer_layer_cls = {GPTDecoderBlock} + auto_wrap_policy = partial( + transformer_auto_wrap_policy, + transformer_layer_cls=transformer_layer_cls, + ) + + fsdp_kwargs: dict[str, Any] = { + "sharding_strategy": ShardingStrategy.FULL_SHARD, + "mixed_precision": mixed_precision, + "use_orig_params": True, + "auto_wrap_policy": auto_wrap_policy, + "forward_prefetch": fsdp_forward_prefetch, + "limit_all_gathers": fsdp_limit_all_gathers, + } + if fsdp_backward_prefetch != "default": + fsdp_kwargs["backward_prefetch"] = { + "backward-pre": BackwardPrefetch.BACKWARD_PRE, + "backward-post": BackwardPrefetch.BACKWARD_POST, + "none": None, + }[fsdp_backward_prefetch] + return ( - FSDP( - model, - sharding_strategy=ShardingStrategy.FULL_SHARD, - mixed_precision=mixed_precision, - use_orig_params=True, - ), + FSDP(model, **fsdp_kwargs), "FSDP1", ) @@ -312,6 +345,10 @@ def run_liger_fsdp_step( weight_decay: float = 0.1, gradient_checkpointing: bool = True, profile_steady_state: bool = False, + fsdp_wrap_policy: FsdpWrapPolicy = "root", + fsdp_backward_prefetch: FsdpBackwardPrefetch = "default", + fsdp_forward_prefetch: bool = False, + fsdp_limit_all_gathers: bool = True, ) -> dict[str, Any] | None: """Run one baseline or Liger-patched training-step benchmark. @@ -335,6 +372,12 @@ def run_liger_fsdp_step( ) if lr <= 0 or weight_decay < 0: raise ValueError("lr must be positive and weight_decay must be non-negative") + if fsdp_wrap_policy not in {"root", "transformer-block"}: + raise ValueError("fsdp_wrap_policy must be 'root' or 'transformer-block'") + if fsdp_backward_prefetch not in {"default", "backward-pre", "backward-post", "none"}: + raise ValueError( + "fsdp_backward_prefetch must be one of: default, backward-pre, backward-post, none" + ) spec = MODEL_PRESETS[model_preset] if seq_len > spec.block_size: @@ -362,7 +405,16 @@ def run_liger_fsdp_step( else: model = _build_reference_model(spec, device=state.device, dtype=torch_dtype) - model, distributed_strategy = _maybe_wrap_fsdp(model, state=state, dtype=torch_dtype) + model, distributed_strategy = _maybe_wrap_fsdp( + model, + state=state, + dtype=torch_dtype, + model_source=model_source, + fsdp_wrap_policy=fsdp_wrap_policy, + fsdp_backward_prefetch=fsdp_backward_prefetch, + fsdp_forward_prefetch=fsdp_forward_prefetch, + fsdp_limit_all_gathers=fsdp_limit_all_gathers, + ) model.train() optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay) @@ -500,6 +552,12 @@ def step_once(*, phase: Literal["warmup", "measure"]) -> torch.Tensor: "weight_decay": weight_decay, "gradient_checkpointing": gradient_checkpointing, "gradient_checkpointing_use_reentrant": (False if gradient_checkpointing else None), + "fsdp": { + "wrap_policy": fsdp_wrap_policy, + "backward_prefetch": fsdp_backward_prefetch, + "forward_prefetch": fsdp_forward_prefetch, + "limit_all_gathers": fsdp_limit_all_gathers, + }, "profile": { "nvtx_ranges": True, "steady_state_cuda_profiler_api": profile_steady_state, diff --git a/swordfish/runner/trace_bundle.py b/swordfish/runner/trace_bundle.py new file mode 100644 index 0000000..2eb87c0 --- /dev/null +++ b/swordfish/runner/trace_bundle.py @@ -0,0 +1,152 @@ +"""Fetch profiled Rune jobs into a stable trace handoff bundle.""" + +from __future__ import annotations + +import json +import tarfile +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path + +from swordfish.dispatch import FetchedRunArtifacts, fetch_run_artifacts + +PROFILE_EXTENSIONS = {"ncu": "ncu-rep", "nsys": "nsys-rep", "torch": "json"} + + +@dataclass(frozen=True) +class TraceJobSpec: + name: str + profile_mode: str | None + + +@dataclass(frozen=True) +class TraceBundleResult: + bundle_name: str + bundle_dir: Path + manifest_path: Path + archive_path: Path | None + jobs: tuple[TraceJobSpec, ...] + + +def parse_trace_job_spec(raw: str, *, default_profile_mode: str | None = None) -> TraceJobSpec: + """Parse ``NAME[:PROFILE_MODE]`` for trace handoff commands.""" + if ":" in raw: + name, mode = raw.rsplit(":", 1) + profile_mode = mode or None + else: + name = raw + profile_mode = default_profile_mode + name = name.strip() + if not name: + raise ValueError("trace job name cannot be empty") + if profile_mode is not None and profile_mode not in PROFILE_EXTENSIONS: + allowed = ", ".join(sorted(PROFILE_EXTENSIONS)) + raise ValueError(f"profile mode {profile_mode!r} not in: {allowed}") + return TraceJobSpec(name=name, profile_mode=profile_mode) + + +def bundle_traces( + jobs: list[TraceJobSpec] | tuple[TraceJobSpec, ...], + *, + bundle_name: str | None = None, + local_root: Path | str = Path("runs/traces"), + namespace: str = "ray", + context: str | None = None, + pvc: str = "training-nfs", + rune_bin: str = "rune", + overwrite: bool = False, + create_archive: bool = True, +) -> TraceBundleResult: + """Fetch result JSON + traces for jobs and write a portable tarball. + + Local layout is intentionally stable so either a human or Hermes can pick it + up without knowing Rune's PVC internals: + + ``runs/traces///.json`` + ``runs/traces///.{ncu-rep|nsys-rep|json}`` + ``runs/traces//manifest.json`` + ``runs/traces/.tar.gz`` + """ + if not jobs: + raise ValueError("at least one job is required") + + root = Path(local_root) + resolved_name = bundle_name or f"trace-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}" + bundle_dir = root / resolved_name + bundle_dir.mkdir(parents=True, exist_ok=True) + + fetched_jobs: list[dict[str, object]] = [] + specs = tuple(jobs) + for spec in specs: + fetched = fetch_run_artifacts( + name=spec.name, + profile_mode=spec.profile_mode, + local_dir=bundle_dir / spec.name, + namespace=namespace, + context=context, + pvc=pvc, + rune_bin=rune_bin, + overwrite=overwrite, + ) + fetched_jobs.append(_manifest_entry(fetched, bundle_dir=bundle_dir, pvc=pvc)) + + manifest = { + "schema_version": "swordfish.trace-bundle.v1", + "bundle_name": resolved_name, + "created_at": datetime.now(timezone.utc).isoformat(), + "namespace": namespace, + "context": context, + "pvc": pvc, + "jobs": fetched_jobs, + } + manifest_path = bundle_dir / "manifest.json" + manifest_path.write_text(json.dumps(manifest, indent=2, sort_keys=True) + "\n") + + archive_path = None + if create_archive: + archive_path = root / f"{resolved_name}.tar.gz" + archive_path.parent.mkdir(parents=True, exist_ok=True) + with tarfile.open(archive_path, mode="w:gz") as archive: + archive.add(bundle_dir, arcname=resolved_name) + + return TraceBundleResult( + bundle_name=resolved_name, + bundle_dir=bundle_dir, + manifest_path=manifest_path, + archive_path=archive_path, + jobs=specs, + ) + + +def _manifest_entry( + fetched: FetchedRunArtifacts, + *, + bundle_dir: Path, + pvc: str, +) -> dict[str, object]: + files = [_file_entry(fetched.result_json, bundle_dir)] + remote_profile_path = None + if fetched.profile_artifact is not None and fetched.profile_mode is not None: + files.append(_file_entry(fetched.profile_artifact, bundle_dir)) + ext = PROFILE_EXTENSIONS[fetched.profile_mode] + remote_profile_path = f"/data/{fetched.name}/profile/profile.{ext}" + return { + "name": fetched.name, + "profile_mode": fetched.profile_mode, + "result_json": str(fetched.result_json.relative_to(bundle_dir)), + "profile_artifact": ( + str(fetched.profile_artifact.relative_to(bundle_dir)) + if fetched.profile_artifact is not None + else None + ), + "remote_profile_path": remote_profile_path, + "remote_profile_pvc": pvc if remote_profile_path else None, + "files": files, + } + + +def _file_entry(path: Path, bundle_dir: Path) -> dict[str, object]: + return { + "path": str(path.relative_to(bundle_dir)), + "bytes": path.stat().st_size, + } diff --git a/swordfish/runner/vector_sum.py b/swordfish/runner/vector_sum.py new file mode 100644 index 0000000..dac2b6e --- /dev/null +++ b/swordfish/runner/vector_sum.py @@ -0,0 +1,271 @@ +"""Benchmark harness for the vectorsum_v2 reduction target.""" + +from __future__ import annotations + +import math +from dataclasses import dataclass +from typing import Any, Callable + +import torch + +from swordfish.kernels.vector_sum import DEFAULT_BLOCK_SIZE, partial_count +from swordfish.runner.backends import TORCH_DTYPES +from swordfish.runner.schema import ( + DTYPE_BYTES, + SCHEMA_VERSION, + latency_stats, + pct_of_peak, + peak_for, + tbps_from_ms, +) +from swordfish.runner.torch_gemm import _resolve_device, _time_cpu, _time_cuda, capture_env + +VECTOR_SUM_BENCHMARK_SIZES = ( + 1_638_400, + 3_276_800, + 6_553_600, + 13_107_200, + 26_214_400, + 52_428_800, +) + +VectorSumRunner = Callable[["VectorSumState"], torch.Tensor] + + +@dataclass(frozen=True) +class VectorSumState: + x: torch.Tensor + out: torch.Tensor + partials: torch.Tensor | None + runner: VectorSumRunner + block_size: int + + +def make_vector_sum_input( + *, + size: int, + dtype: str, + device: torch.device, + seed: int, +) -> torch.Tensor: + """Generate the target input: normal fp32 data with deterministic scale+offset.""" + gen = torch.Generator(device=device.type) + gen.manual_seed(seed) + data = torch.randn( + (size,), device=device, dtype=TORCH_DTYPES[dtype], generator=gen + ).contiguous() + + offset_gen = torch.Generator(device=device.type) + offset_gen.manual_seed(seed + 1) + scale_gen = torch.Generator(device=device.type) + scale_gen.manual_seed(seed + 2) + + offset = (torch.rand(1, device=device, generator=offset_gen) * 200 - 100).item() + scale = (torch.rand(1, device=device, generator=scale_gen) * 9.9 + 0.1).item() + return (data * scale + offset).contiguous() + + +def available_vector_sum_backends() -> tuple[str, ...]: + return ("torch", "triton") + + +def _prepare_torch( + *, + size: int, + dtype: str, + device: torch.device, + seed: int, + block_size: int, +) -> VectorSumState: + from swordfish.kernels.vector_sum import torch_vector_sum_reference + + x = make_vector_sum_input(size=size, dtype=dtype, device=device, seed=seed) + out = torch.empty((1,), device=device, dtype=torch.float32) + + def run(state: VectorSumState) -> torch.Tensor: + return torch_vector_sum_reference(state.x, state.out) + + return VectorSumState(x=x, out=out, partials=None, runner=run, block_size=block_size) + + +def _prepare_triton( + *, + size: int, + dtype: str, + device: torch.device, + seed: int, + block_size: int, +) -> VectorSumState: + if device.type != "cuda": + raise RuntimeError("vectorsum_v2 Triton backend requires a CUDA device") + from swordfish.kernels.vector_sum import triton_vector_sum + + x = make_vector_sum_input(size=size, dtype=dtype, device=device, seed=seed) + out = torch.empty((1,), device=device, dtype=torch.float32) + partials = torch.empty((partial_count(size, block_size),), device=device, dtype=torch.float32) + + def run(state: VectorSumState) -> torch.Tensor: + if state.partials is None: + raise RuntimeError("vectorsum_v2 Triton state is missing partials") + return triton_vector_sum( + state.x, + state.out, + state.partials, + block_size=state.block_size, + ) + + return VectorSumState(x=x, out=out, partials=partials, runner=run, block_size=block_size) + + +def _prepare_state( + *, + backend: str, + size: int, + dtype: str, + device: torch.device, + seed: int, + block_size: int, +) -> VectorSumState: + if backend == "torch": + return _prepare_torch( + size=size, + dtype=dtype, + device=device, + seed=seed, + block_size=block_size, + ) + if backend == "triton": + return _prepare_triton( + size=size, + dtype=dtype, + device=device, + seed=seed, + block_size=block_size, + ) + raise ValueError( + f"unknown vectorsum_v2 backend {backend!r}; expected one of {available_vector_sum_backends()}" + ) + + +def _reference_check(state: VectorSumState, *, size: int, dtype: str) -> dict[str, Any]: + from swordfish.kernels.vector_sum import torch_vector_sum_reference + + expected = torch_vector_sum_reference(state.x).reshape(-1)[0].detach() + actual = state.out.reshape(-1)[0].detach() + max_abs_error = float(torch.abs(actual - expected).item()) + + # Floating-point reductions are order-dependent. The benchmark input is + # N(0, 1), so expected numerical noise grows roughly with sqrt(N). + dtype_scale = 2.0 if dtype in {"fp16", "bf16"} else 1.0 + atol = dtype_scale * max(1e-5, 1e-3 * math.sqrt(size)) + rtol = 1e-5 if dtype == "fp32" else 1e-3 + matches = torch.allclose(actual, expected, atol=atol, rtol=rtol) + + return { + "reference_backend": "torch", + "matches_reference": matches, + "max_abs_error": max_abs_error, + "atol": atol, + "rtol": rtol, + "output_shape": list(state.out.shape), + "output_fp32": float(actual.item()), + "reference_fp32": float(expected.item()), + } + + +def run_vector_sum_benchmark( + *, + size: int, + dtype: str = "fp32", + repeats: int, + warmup: int, + iters: int, + device_name: str = "auto", + allow_cpu: bool = False, + arch_label: str | None = None, + seed: int = 0, + backend: str = "torch", + block_size: int = DEFAULT_BLOCK_SIZE, +) -> dict[str, Any]: + if dtype not in TORCH_DTYPES: + raise ValueError(f"unknown dtype {dtype!r}; expected one of {sorted(TORCH_DTYPES)}") + if backend not in available_vector_sum_backends(): + raise ValueError( + f"unknown vectorsum_v2 backend {backend!r}; expected one of {available_vector_sum_backends()}" + ) + if min(size, repeats, iters, block_size) <= 0 or warmup < 0: + raise ValueError( + "size, repeats, iters, and block_size must be positive; warmup must be non-negative" + ) + partial_count(size, block_size) + + device = _resolve_device(device_name, allow_cpu=allow_cpu) + state = _prepare_state( + backend=backend, + size=size, + dtype=dtype, + device=device, + seed=seed, + block_size=block_size, + ) + + timer = _time_cuda if device.type == "cuda" else _time_cpu + samples_ms = [ + timer(lambda: state.runner(state), warmup=warmup, iters=iters) for _ in range(repeats) + ] + stats = latency_stats(samples_ms) + + state.runner(state) + if device.type == "cuda": + torch.cuda.synchronize(device) + + finite = bool(torch.isfinite(state.out).all().item()) + reference = _reference_check(state, size=size, dtype=dtype) + env = capture_env(device, arch_label=arch_label) + gpu_class = env["gpu_class"] + + input_bytes = size * DTYPE_BYTES[dtype] + output_bytes = 4 + partials_bytes = 0 + if backend == "triton": + partials_bytes = partial_count(size, block_size) * 4 + estimated_bytes = input_bytes + output_bytes + 2 * partials_bytes + mean_ms = stats["mean_ms"] + bandwidth_tbps = tbps_from_ms(estimated_bytes, mean_ms) + hbm_peak = peak_for(gpu_class, dtype, "hbm_tbps") + + return { + "schema_version": SCHEMA_VERSION, + "benchmark": "vectorsum_v2", + "config": { + "scope": "vector_sum", + "backend": backend, + "shape": {"size": size}, + "size": size, + "dtype": dtype, + "repeats": repeats, + "warmup": warmup, + "iters": iters, + "seed": seed, + "block_size": block_size, + }, + "env": env, + "correctness": { + "finite_output": finite, + **reference, + }, + "metrics": { + "latency": stats, + "elements": size, + "elements_per_second": size / (mean_ms / 1000.0) + if mean_ms > 0 and not math.isnan(mean_ms) + else float("nan"), + "input_bytes": input_bytes, + "output_bytes": output_bytes, + "partials_bytes": partials_bytes, + "estimated_bytes": estimated_bytes, + "estimated_bandwidth_tbps": bandwidth_tbps, + "hbm_peak_tbps": hbm_peak, + "estimated_hbm_sol_pct": pct_of_peak(bandwidth_tbps, hbm_peak), + }, + } diff --git a/tests/test_dispatch.py b/tests/test_dispatch.py index 23c5e0b..993aa4b 100644 --- a/tests/test_dispatch.py +++ b/tests/test_dispatch.py @@ -2,6 +2,8 @@ from __future__ import annotations +import json +import tarfile import tempfile from pathlib import Path @@ -13,9 +15,11 @@ LigerFsdpRun, LigerPerkernelMatrix, LigerPerkernelRun, + RuneProfileSecurityError, RuneSubmit, RuneSubmitGetMissingAnnotationsError, TorchGemmRun, + VectorSumRun, build_run_for_experiment, fetch_via_rune_submit_get, list_experiments, @@ -116,6 +120,57 @@ def test_rune_submit_rejects_unknown_profile_mode(): RuneSubmit(name="j", preset="p", script="s.sh", profile_mode="vtune") +def test_a100_ncu_submit_preflights_sys_admin_security_context(monkeypatch): + calls: list[list[str]] = [] + + def fake_run(args, **_kwargs): + calls.append(list(args)) + + class P: + returncode = 0 + stdout = "securityContext:\n capabilities:\n add:\n - SYS_ADMIN\n" + stderr = "" + + return P() + + monkeypatch.setattr("subprocess.run", fake_run) + run = RuneSubmit( + name="a100-ncu", + profile="swordfish-bench-a100-ncu", + script="bench.sh", + profile_mode="ncu", + ) + + result = run.submit() + + assert result.submitted + assert len(calls) == 2 + assert calls[0][-1] == "client" + assert "--dry-run" in calls[0] + assert "--dry-run" not in calls[1] + + +def test_a100_ncu_submit_blocks_when_rune_drops_sys_admin(monkeypatch): + def fake_run(args, **_kwargs): + class P: + returncode = 0 + stdout = "apiVersion: batch/v1\nkind: Job\n" + stderr = "" + + return P() + + monkeypatch.setattr("subprocess.run", fake_run) + run = RuneSubmit( + name="a100-ncu", + profile="swordfish-bench-a100-ncu", + script="bench.sh", + profile_mode="ncu", + ) + + with pytest.raises(RuneProfileSecurityError, match="SYS_ADMIN"): + run.submit() + + def test_liger_perkernel_run_defaults_to_swordfish_profile_pack(): """Default submit path uses the swordfish-bench- profile (not raw preset) so edits to swordfish/dispatch/profiles.py flow into actual jobs.""" @@ -324,6 +379,43 @@ def test_torch_gemm_run_profile_mode_ncu_still_uses_rune_native(): assert not any(e.startswith("SWORDFISH_PROFILE=") for e in env_args) +def test_vector_sum_run_defaults_to_triton_profile_and_unique_output(): + run = VectorSumRun(arch="a100", size=52_428_800) + submit = run.to_rune_submit() + + assert run.resolved_name == "sf-vectorsum-v2-triton-52428800-a100" + assert submit.profile == "swordfish-bench-a100" + assert submit.output == "/data/swordfish/week1/vectorsum-v2/triton-52428800-a100.json" + assert "--gpu-class" in submit.extra_args + assert "a100-nvlink-80gb" in submit.extra_args + + +def test_vector_sum_run_forwarded_args_include_reduction_contract(): + run = VectorSumRun(arch="h200", backend="torch", size=1_638_400, block_size=2048) + forwarded = run.forwarded_args + + assert forwarded[0] == "bench-vectorsum" + assert "--backend" in forwarded + assert forwarded[forwarded.index("--backend") + 1] == "torch" + assert "--size" in forwarded + assert forwarded[forwarded.index("--size") + 1] == "1638400" + assert "--block-size" in forwarded + assert forwarded[forwarded.index("--block-size") + 1] == "2048" + assert "--arch-label" in forwarded + assert "h200" in forwarded + + +def test_vector_sum_run_profile_mode_torch_uses_in_process_profiler(): + run = VectorSumRun(arch="a100", profile_mode="torch") + submit = run.to_rune_submit() + args = submit.to_args() + + assert "--profile-mode" not in args + env_args = [args[i + 1] for i, a in enumerate(args) if a == "--env"] + assert "SWORDFISH_PROFILE=torch" in env_args + assert any(e.endswith("/profile/profile.json") for e in env_args) + + def test_liger_perkernel_run_profile_mode_allows_custom_script(): """The 'profile_mode only with default bench script' restriction is gone: rune wraps any cmd at the renderer level, so custom scripts work.""" @@ -384,6 +476,24 @@ def test_liger_fsdp_run_forwarded_args_include_torchrun_contract(): assert "/data/swordfish/week1/liger-fsdp/llama3-8b-liger-a100.json" in forwarded +def test_liger_fsdp_run_custom_name_uses_unique_output_path(): + run = LigerFsdpRun( + arch="a100", + mode="liger", + name="sf-fsdp-liger-knob-tb-no-limit-05031248-a100", + fsdp_wrap_policy="transformer-block", + fsdp_limit_all_gathers=False, + ) + submit = run.to_rune_submit() + + assert ( + run.out_path + == "/data/swordfish/week1/liger-fsdp/sf-fsdp-liger-knob-tb-no-limit-05031248-a100.json" + ) + assert submit.output == run.out_path + assert run.out_path in run.forwarded_args + + def test_liger_fsdp_run_to_command_renders_dry_run(): run = LigerFsdpRun(arch="h100", mode="baseline", name="fsdp_smoke") cmd = run.to_command(dry_run="client") @@ -430,6 +540,25 @@ def test_liger_fsdp_run_profile_steady_state_sets_runner_and_nsys_capture_env(): assert "NSYS_CAPTURE_RANGE_END=stop" in env_args +def test_liger_fsdp_run_forwarded_args_include_fsdp_overlap_knobs(): + run = LigerFsdpRun( + arch="a100", + mode="liger", + fsdp_wrap_policy="transformer-block", + fsdp_backward_prefetch="backward-post", + fsdp_forward_prefetch=True, + fsdp_limit_all_gathers=False, + ) + forwarded = run.forwarded_args + + assert "--fsdp-wrap-policy" in forwarded + assert forwarded[forwarded.index("--fsdp-wrap-policy") + 1] == "transformer-block" + assert "--fsdp-backward-prefetch" in forwarded + assert forwarded[forwarded.index("--fsdp-backward-prefetch") + 1] == "backward-post" + assert "--fsdp-forward-prefetch" in forwarded + assert "--no-fsdp-limit-all-gathers" in forwarded + + # --------------------------------------------------------------------------- # experiment registry # --------------------------------------------------------------------------- @@ -438,8 +567,15 @@ def test_liger_fsdp_run_profile_steady_state_sets_runner_and_nsys_capture_env(): def test_experiment_registry_lists_current_workloads(): specs = {spec.name: spec for spec in list_experiments()} - assert set(specs) == {"gemm", "liger-fsdp", "liger-rmsnorm", "liger-swiglu"} + assert set(specs) == { + "gemm", + "vectorsum-v2", + "liger-fsdp", + "liger-rmsnorm", + "liger-swiglu", + } assert specs["gemm"].profile_family == "bench" + assert specs["vectorsum-v2"].profile_family == "bench" assert specs["liger-fsdp"].profile_family == "fsdp" @@ -469,11 +605,71 @@ def test_build_run_for_experiment_uses_resolved_profile(): assert run.m == 1024 and run.n == 2048 and run.k == 4096 +def test_build_run_for_vectorsum_experiment_uses_bench_profile_and_overrides(): + run = build_run_for_experiment( + "vectorsum-v2", + "h200", + { + "backend": "triton", + "size": 52_428_800, + "dtype": "fp32", + "block_size": 2048, + }, + ) + submit = run.to_rune_submit() + + assert isinstance(run, VectorSumRun) + assert submit.profile == "swordfish-bench-h200" + assert submit.preset is None + assert run.size == 52_428_800 + assert run.block_size == 2048 + assert "--size" in run.forwarded_args + assert "52428800" in run.forwarded_args + + +def test_build_run_for_vectorsum_experiment_uses_a100_ncu_profile(): + run = build_run_for_experiment( + "vectorsum-v2", + "a100", + {"profile_mode": "ncu"}, + ) + submit = run.to_rune_submit() + + assert isinstance(run, VectorSumRun) + assert submit.profile == "swordfish-bench-a100-ncu" + assert "--profile-mode" in submit.to_args() + + +def test_build_run_for_liger_fsdp_experiment_uses_a100_ncu_profile(): + run = build_run_for_experiment( + "liger-fsdp", + "a100", + {"mode": "liger", "profile_mode": "ncu"}, + ) + submit = run.to_rune_submit() + + assert isinstance(run, LigerFsdpRun) + assert submit.profile == "swordfish-fsdp-a100-ncu" + assert "--profile-mode" in submit.to_args() + + def test_build_run_for_liger_fsdp_experiment_uses_fsdp_profile_and_overrides(): run = build_run_for_experiment( "liger-fsdp", "a100", - {"mode": "liger", "repeats": 1, "warmup": 0, "iters": 1}, + { + "mode": "liger", + "repeats": 1, + "warmup": 0, + "iters": 1, + "profile_steady_state": True, + "fsdp_wrap_policy": "transformer-block", + "fsdp_backward_prefetch": "backward-pre", + "fsdp_forward_prefetch": True, + "fsdp_limit_all_gathers": False, + "context": "voice-agent-flex", + "image": "voiceagentcr.azurecr.io/airun/swordfish-bench:bf92726-dirty", + }, ) submit = run.to_rune_submit() @@ -482,6 +678,16 @@ def test_build_run_for_liger_fsdp_experiment_uses_fsdp_profile_and_overrides(): assert submit.preset is None assert "--liger-mode" in run.forwarded_args assert "liger" in run.forwarded_args + assert "--profile-steady-state" in run.forwarded_args + assert "--fsdp-wrap-policy" in run.forwarded_args + assert "transformer-block" in run.forwarded_args + assert "--fsdp-backward-prefetch" in run.forwarded_args + assert "backward-pre" in run.forwarded_args + assert "--fsdp-forward-prefetch" in run.forwarded_args + assert "--no-fsdp-limit-all-gathers" in run.forwarded_args + assert submit.context == "voice-agent-flex" + assert submit.image == "voiceagentcr.azurecr.io/airun/swordfish-bench:bf92726-dirty" + assert "--context" in submit.to_args() def test_every_registered_experiment_resolves_to_generated_profile_pack(): @@ -1122,6 +1328,8 @@ def fake_submit(self, *, dry_run=None, **kwargs): captured["profile"] = self.to_rune_submit().profile captured["dry_run"] = dry_run captured["mode"] = self.mode + captured["context"] = self.context + captured["image"] = self.image from swordfish.dispatch.rune import RuneSubmitResult return RuneSubmitResult( @@ -1141,6 +1349,10 @@ def fake_submit(self, *, dry_run=None, **kwargs): "a100", "--liger-mode", "liger", + "--context", + "voice-agent-flex", + "--image", + "voiceagentcr.azurecr.io/airun/swordfish-bench:bf92726-dirty", "--dry-run", "client", ] @@ -1151,6 +1363,8 @@ def fake_submit(self, *, dry_run=None, **kwargs): "profile": "swordfish-fsdp-a100", "dry_run": "client", "mode": "liger", + "context": "voice-agent-flex", + "image": "voiceagentcr.azurecr.io/airun/swordfish-bench:bf92726-dirty", } @@ -1460,6 +1674,155 @@ def _raise_unavailable(_path): assert "brew install" in err and "ncu-summary" in err +# --------------------------------------------------------------------------- +# bundle-traces CLI / Hermes handoff bundle +# --------------------------------------------------------------------------- + + +def test_parse_trace_job_spec_accepts_inline_profile_mode(): + from swordfish.runner.trace_bundle import parse_trace_job_spec + + spec = parse_trace_job_spec("sf-job:nsys") + + assert spec.name == "sf-job" + assert spec.profile_mode == "nsys" + + +def test_bundle_traces_fetches_jobs_and_writes_manifest_archive(monkeypatch, tmp_path): + from swordfish.dispatch.results import FetchedRunArtifacts + from swordfish.runner.trace_bundle import TraceJobSpec, bundle_traces + + calls: list[dict] = [] + + def fake_fetch_run_artifacts(**kwargs): + calls.append(kwargs) + local_dir = Path(kwargs["local_dir"]) + local_dir.mkdir(parents=True, exist_ok=True) + name = kwargs["name"] + result_json = local_dir / f"{name}.json" + result_json.write_text('{"ok": true}\n') + profile_artifact = None + if kwargs["profile_mode"] == "ncu": + profile_artifact = local_dir / f"{name}.ncu-rep" + profile_artifact.write_bytes(b"NCU") + return FetchedRunArtifacts( + name=name, + local_dir=local_dir, + result_json=result_json, + profile_artifact=profile_artifact, + profile_mode=kwargs["profile_mode"], + ) + + monkeypatch.setattr( + "swordfish.runner.trace_bundle.fetch_run_artifacts", fake_fetch_run_artifacts + ) + + result = bundle_traces( + [TraceJobSpec("job-a", "ncu"), TraceJobSpec("job-b", None)], + bundle_name="handoff", + local_root=tmp_path, + namespace="ray", + context="voice-agent-flex", + pvc="training-nfs", + ) + + assert [c["name"] for c in calls] == ["job-a", "job-b"] + assert result.archive_path == tmp_path / "handoff.tar.gz" + manifest = json.loads(result.manifest_path.read_text()) + assert manifest["schema_version"] == "swordfish.trace-bundle.v1" + assert manifest["jobs"][0]["remote_profile_path"] == "/data/job-a/profile/profile.ncu-rep" + assert manifest["jobs"][1]["profile_artifact"] is None + with tarfile.open(result.archive_path) as archive: + names = set(archive.getnames()) + assert "handoff/manifest.json" in names + assert "handoff/job-a/job-a.ncu-rep" in names + + +def test_bundle_traces_cli_wires_args(monkeypatch, tmp_path, capsys): + from swordfish.runner import cli + + captured: dict = {} + + def fake_bundle_traces(jobs, **kwargs): + captured["jobs"] = jobs + captured.update(kwargs) + from swordfish.runner.trace_bundle import TraceBundleResult + + bundle_dir = tmp_path / "b" + manifest = bundle_dir / "manifest.json" + archive = tmp_path / "b.tar.gz" + return TraceBundleResult( + bundle_name="b", + bundle_dir=bundle_dir, + manifest_path=manifest, + archive_path=archive, + jobs=tuple(jobs), + ) + + monkeypatch.setattr(cli, "bundle_traces", fake_bundle_traces) + + rc = cli.main( + [ + "bundle-traces", + "job1:ncu", + "job2", + "--profile-mode", + "nsys", + "--bundle-name", + "b", + "--local-root", + str(tmp_path), + "--context", + "voice-agent-flex", + "--overwrite", + ] + ) + + assert rc == 0 + assert [j.name for j in captured["jobs"]] == ["job1", "job2"] + assert [j.profile_mode for j in captured["jobs"]] == ["ncu", "nsys"] + assert captured["bundle_name"] == "b" + assert captured["context"] == "voice-agent-flex" + assert captured["overwrite"] is True + assert "archive:" in capsys.readouterr().err + + +def test_a100_ncu_window_cli_pause_wires_helpers(monkeypatch, capsys): + from swordfish.runner import cli + from swordfish.runner.dcgm_window import DcgmWindowStatus + + captured: dict = {} + + def fake_pause(**kwargs): + captured.update(kwargs) + return DcgmWindowStatus( + a100_nodes=("a100-node",), + a100_exporter_pods=(), + desired=4, + ready=4, + updated=4, + available=4, + ) + + monkeypatch.setattr(cli, "pause_a100_dcgm", fake_pause) + + rc = cli.main( + [ + "a100-ncu-window", + "pause", + "--context", + "voice-agent-flex", + "--timeout-seconds", + "123", + ] + ) + + assert rc == 0 + assert captured["context"] == "voice-agent-flex" + assert captured["timeout_seconds"] == 123 + assert "A100 exporter pods: none" in capsys.readouterr().out + + # ----------------------------------------------------------------------------- # Cluster-side .ncu-rep -> .ncu-summary.csv converter (swordfish.dispatch.ncu_convert) # ----------------------------------------------------------------------------- diff --git a/tests/test_runner.py b/tests/test_runner.py index 276dd40..dbb4de7 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -13,6 +13,7 @@ raw_ptx_blocker, torch_vector_add_reference, ) +from swordfish.kernels.vector_sum import torch_vector_sum_reference, triton_vector_sum from swordfish.quant.marlin_triton import ( dequantize_weight_int4, pack_int4_signed, @@ -44,6 +45,10 @@ write_result, ) from swordfish.runner.upstream import render_upstream_packet +from swordfish.runner.vector_sum import ( + VECTOR_SUM_BENCHMARK_SIZES, + run_vector_sum_benchmark, +) def test_gpu_class_from_name(): @@ -230,6 +235,12 @@ def test_liger_fsdp_reference_train_step_cpu_smoke(): assert result["config"]["shape"]["global_batch_size"] == 1 assert result["config"]["shape"]["world_size"] == 1 assert result["config"]["liger"]["applied"] is False + assert result["config"]["fsdp"] == { + "wrap_policy": "root", + "backward_prefetch": "default", + "forward_prefetch": False, + "limit_all_gathers": True, + } assert result["config"]["profile"]["nvtx_ranges"] is True assert result["config"]["profile"]["steady_state_cuda_profiler_api"] is False assert result["config"]["profile"]["step_phases"] == [ @@ -465,6 +476,238 @@ def test_raw_ptx_vector_add_artifact_and_blocker(): ptx_vector_add(a, b, torch.empty_like(a)) +def test_vectorsum_v2_benchmark_sizes_match_target_shapes(): + assert VECTOR_SUM_BENCHMARK_SIZES == ( + 1_638_400, + 3_276_800, + 6_553_600, + 13_107_200, + 26_214_400, + 52_428_800, + ) + + +def test_vectorsum_v2_torch_reference_sums_to_fp32_scalar(): + import torch + + x = torch.tensor([1.0, 16_777_216.0, -16_777_216.0], dtype=torch.float32) + out = torch.empty((1,), dtype=torch.float32) + + result = torch_vector_sum_reference(x, out) + + assert result is out + assert out.shape == (1,) + assert out.item() == pytest.approx(1.0) + + +def test_vectorsum_v2_triton_backend_rejects_cpu_before_launch(): + import torch + + x = torch.ones((8,), dtype=torch.float32) + out = torch.empty((), dtype=torch.float32) + partials = torch.empty((1,), dtype=torch.float32) + + with pytest.raises(RuntimeError, match="requires.*CUDA|requires the triton package"): + triton_vector_sum(x, out, partials) + + +def test_submission_exports_custom_kernel(): + import importlib + + submission = importlib.import_module("submission") + + assert callable(submission.custom_kernel) + + +def test_submission_custom_kernel_returns_scalar_view(monkeypatch): + import importlib + import math + import torch + + submission = importlib.import_module("submission") + + class FakeTriton: + @staticmethod + def cdiv(a, b): + return math.ceil(a / b) + + @staticmethod + def next_power_of_2(value): + return 1 << (value - 1).bit_length() + + class FakePartialKernel: + def __getitem__(self, grid): + return self + + def __call__(self, *args, **kwargs): + return None + + class FakeFinalKernel: + def __getitem__(self, grid): + return self + + def __call__(self, partials, output, *args, **kwargs): + output.reshape(-1)[0].fill_(3.0) + + monkeypatch.setattr(submission, "triton", FakeTriton) + monkeypatch.setattr(submission, "_partial_sum_kernel", FakePartialKernel()) + monkeypatch.setattr(submission, "_final_sum_kernel", FakeFinalKernel()) + monkeypatch.setattr(submission, "_PARTIALS", None) + monkeypatch.setattr(submission, "_PARTIALS_DEVICE", None) + monkeypatch.setattr(submission, "_PARTIALS_N", 0) + monkeypatch.setattr(submission, "_N_PARTIALS", 0) + monkeypatch.setattr(submission, "_FINAL_BLOCK_SIZE", 0) + monkeypatch.setattr(submission, "_GRAPH", None) + monkeypatch.setattr(submission, "_GRAPH_X", None) + monkeypatch.setattr(submission, "_GRAPH_OUTPUT", None) + monkeypatch.setattr(submission, "_GRAPH_DATA", None) + monkeypatch.setattr(submission, "_GRAPH_PARTIALS", None) + monkeypatch.setattr(submission, "_GRAPH_N", 0) + monkeypatch.setattr(submission, "_GRAPH_REPLAY", None) + monkeypatch.setattr(submission, "_GRAPH_RESULT", None) + + output = torch.empty(1, dtype=torch.float32) + result = submission.custom_kernel((torch.ones(4, dtype=torch.float32), output)) + cached_partials = submission._PARTIALS + result_again = submission.custom_kernel((torch.ones(4, dtype=torch.float32), output)) + + assert result.shape == torch.Size([]) + assert result.item() == pytest.approx(3.0) + assert result_again.shape == torch.Size([]) + assert result_again.item() == pytest.approx(3.0) + assert submission._PARTIALS is cached_partials + + +def test_submission_does_not_capture_graph_for_new_output(monkeypatch): + import importlib + import math + import types + + submission = importlib.import_module("submission") + + class FakeDevice: + type = "cuda" + index = 0 + + class FakeTensor: + def __init__(self, name, numel=1): + self.name = name + self.device = FakeDevice() + self.value = 0.0 + self._numel = numel + + def numel(self): + return self._numel + + def reshape(self, *args): + return self + + def __getitem__(self, index): + return self + + class FakeTriton: + @staticmethod + def cdiv(a, b): + return math.ceil(a / b) + + @staticmethod + def next_power_of_2(value): + return 1 << (value - 1).bit_length() + + class FakePartialKernel: + def __getitem__(self, grid): + return self + + def __call__(self, *args, **kwargs): + return None + + class FakeFinalKernel: + def __getitem__(self, grid): + return self + + def __call__(self, partials, output, *args, **kwargs): + output.value = 3.0 + + class FakeGraph: + captures = 0 + + def __init__(self): + FakeGraph.captures += 1 + + def replay(self): + return None + + class FakeGraphContext: + def __init__(self, graph): + self.graph = graph + + def __enter__(self): + return self.graph + + def __exit__(self, exc_type, exc, tb): + return False + + monkeypatch.setattr(submission, "triton", FakeTriton) + monkeypatch.setattr(submission, "_partial_sum_kernel", FakePartialKernel()) + monkeypatch.setattr(submission, "_final_sum_kernel", FakeFinalKernel()) + monkeypatch.setattr( + submission.torch, + "empty", + lambda shape, device=None, dtype=None: FakeTensor( + "empty", shape[0] if isinstance(shape, tuple) else shape + ), + ) + monkeypatch.setattr( + submission.torch, + "cuda", + types.SimpleNamespace( + CUDAGraph=FakeGraph, + graph=lambda graph: FakeGraphContext(graph), + synchronize=lambda: None, + ), + ) + + kernel = submission._make_custom_kernel() + x = FakeTensor("x", numel=4) + first_output = FakeTensor("first_output") + second_output = FakeTensor("second_output") + + kernel((x, first_output)) + kernel((x, second_output)) + + assert FakeGraph.captures == 0 + + kernel((x, second_output)) + + assert FakeGraph.captures == 1 + + +def test_vectorsum_v2_torch_benchmark_cpu_smoke(): + result = run_vector_sum_benchmark( + backend="torch", + size=64, + dtype="fp32", + repeats=1, + warmup=0, + iters=1, + device_name="cpu", + allow_cpu=True, + arch_label="a100", + ) + + assert result["benchmark"] == "vectorsum_v2" + assert validate_result_protocol(result) == [] + assert result["config"]["scope"] == "vector_sum" + assert result["config"]["backend"] == "torch" + assert result["config"]["shape"] == {"size": 64} + assert result["env"]["gpu_class"] == "a100" + assert result["correctness"]["finite_output"] is True + assert result["correctness"]["matches_reference"] is True + assert result["correctness"]["output_shape"] == [1] + assert result["metrics"]["elements"] == 64 + assert result["metrics"]["latency"]["mean_ms"] > 0 + + def test_marlin_int4_pack_round_trip_odd_columns(): import torch @@ -885,7 +1128,75 @@ def test_validate_training_result_protocol_reports_missing_fields(): ) -_FIXTURES_DIR = Path(__file__).parent.parent / "runs" / "airun" / "week1" +def _write_ncu_gemm_fixture(tmp_path: Path, name: str, top_kernel: str) -> Path: + """Write a small long-form NCU CSV shaped like the real GEMM captures. + + The real week-1 NCU CSVs are large run artifacts and are intentionally not + required for unit tests. This fixture preserves the contracts the parser + needs to support: multiple invocations per kernel, canonical metric names, + cuBLAS-style kernel names, and a dominant matmul kernel. + """ + + path = tmp_path / name + rows = [ + [ + '"ID"', + '"Kernel Name"', + '"Block Size"', + '"Grid Size"', + '"Metric Name"', + '"Metric Unit"', + '"Metric Value"', + ] + ] + + def add_invocation( + inv_id: int, + kernel: str, + *, + duration_ns: float, + sm: float, + mem: float, + dram: float, + block: str = "(384,1,1)", + grid: str = "(2,66,1)", + ) -> None: + for metric, unit, value in [ + ("gpu__time_duration.sum", "ns", duration_ns), + ("sm__throughput.avg.pct_of_peak_sustained_elapsed", "%", sm), + ("gpu__compute_memory_throughput.avg.pct_of_peak_sustained_elapsed", "%", mem), + ("dram__throughput.avg.pct_of_peak_sustained_elapsed", "%", dram), + ]: + rows.append( + [ + f'"{inv_id}"', + f'"{kernel}"', + f'"{block}"', + f'"{grid}"', + f'"{metric}"', + f'"{unit}"', + f'"{value}"', + ] + ) + + add_invocation(0, top_kernel, duration_ns=1_000_000, sm=90.0, mem=70.0, dram=16.0) + add_invocation(1, top_kernel, duration_ns=1_010_000, sm=91.0, mem=71.0, dram=15.0) + add_invocation(2, top_kernel, duration_ns=990_000, sm=89.0, mem=69.0, dram=17.0) + add_invocation( + 3, "at::vectorized_elementwise_kernel", duration_ns=3_000, sm=20, mem=72, dram=72 + ) + add_invocation(4, "at::reduce_kernel", duration_ns=2_000, sm=18, mem=75, dram=75) + add_invocation( + 5, + "at::distribution_elementwise_grid_stride_kernel", + duration_ns=1_000, + sm=74, + mem=12, + dram=3, + ) + + path.write_text("\n".join(",".join(row) for row in rows)) + return path def test_short_name_strips_void_return_type_and_template_args(): @@ -930,46 +1241,64 @@ def test_percentile_linear_interpolation_matches_numpy_default(): assert _percentile([1.0, 2.0, 3.0, 4.0], 50) == 2.5 -def test_parse_ncu_csv_full_against_h100_gemm_fixture(): - """The H100 GEMM fixture: 9 kernels, 309 invocations, 1236 metric rows. - - cuBLAS-via-nvjet should dominate (~99% of time) at ~90% SM throughput. - These numbers are stable across re-runs of the bench in week 1. - """ - summary = parse_ncu_csv_full(_FIXTURES_DIR / "torch-gemm-h100.ncu.csv") - assert summary.rows == 1236 - assert summary.unique_kernels == 9 - assert summary.total_invocations == 309 +def test_parse_ncu_csv_full_against_h100_gemm_fixture(tmp_path): + """cuBLAS-via-nvjet should dominate at roughly 90% SM throughput.""" + csv_path = _write_ncu_gemm_fixture( + tmp_path, + "torch-gemm-h100.ncu.csv", + "nvjet_hsh_128x256_64x4_2x1_v_bz_coopA_NNN", + ) + summary = parse_ncu_csv_full(csv_path) + assert summary.rows == 24 + assert summary.unique_kernels == 4 + assert summary.total_invocations == 6 assert summary.total_time_ns > 0 # Top kernel must be the cuBLAS H100 SXM5 SGEMM and ~99% of time. top = summary.kernels[0] assert top.short_name.startswith("nvjet_hsh_") - assert top.invocations == 300 + assert top.invocations == 3 pct_top = top.total_time_ns / summary.total_time_ns assert pct_top > 0.99, f"expected nvjet to dominate; got {pct_top:.2%}" # Per-metric SoL means must be in the right ballpark. sm = top.metrics["sm__throughput.avg.pct_of_peak_sustained_elapsed"] assert 80 < sm.mean < 100 - assert sm.samples == 300 + assert sm.samples == 3 -def test_parse_ncu_csv_full_against_a100_gemm_fixture(): +def test_parse_ncu_csv_full_against_a100_gemm_fixture(tmp_path): """A100 GEMM: dominated by `ampere_fp16_s16816gemm_*` (cuBLAS pre-Hopper).""" - summary = parse_ncu_csv_full(_FIXTURES_DIR / "torch-gemm-a100.ncu.csv") - assert summary.rows == 1236 + csv_path = _write_ncu_gemm_fixture( + tmp_path, + "torch-gemm-a100.ncu.csv", + "ampere_fp16_s16816gemm_fp16_256x128_ldg8_f2f_stages_32x3_nn", + ) + summary = parse_ncu_csv_full(csv_path) + assert summary.rows == 24 top = summary.kernels[0] assert "ampere" in top.short_name and "gemm" in top.short_name - assert top.invocations == 300 + assert top.invocations == 3 -def test_parse_ncu_csv_full_against_h200_gemm_fixture_uses_different_nvjet_variant(): +def test_parse_ncu_csv_full_against_h200_gemm_fixture_uses_different_nvjet_variant(tmp_path): """H200 picks a different cuBLAS tile shape than H100 (256x128 vs 128x256). This test exists because catching that difference is exactly the kind of insight the tool is supposed to enable. """ - h100 = parse_ncu_csv_full(_FIXTURES_DIR / "torch-gemm-h100.ncu.csv") - h200 = parse_ncu_csv_full(_FIXTURES_DIR / "torch-gemm-h200.ncu.csv") + h100 = parse_ncu_csv_full( + _write_ncu_gemm_fixture( + tmp_path, + "torch-gemm-h100.ncu.csv", + "nvjet_hsh_128x256_64x4_2x1_v_bz_coopA_NNN", + ) + ) + h200 = parse_ncu_csv_full( + _write_ncu_gemm_fixture( + tmp_path, + "torch-gemm-h200.ncu.csv", + "nvjet_hsh_256x128_64x4_1x2_h_bz_coopA_NNT", + ) + ) h100_top = h100.kernels[0].short_name h200_top = h200.kernels[0].short_name assert h100_top.startswith("nvjet_hsh_") and h200_top.startswith("nvjet_hsh_") @@ -1034,19 +1363,25 @@ def test_parse_ncu_csv_full_pivots_multiple_invocations_into_one_kernel_row(tmp_ assert sm.max == 70.0 -def test_format_summary_text_renders_top_n_table_and_truncation_notice(): - summary = parse_ncu_csv_full(_FIXTURES_DIR / "torch-gemm-h100.ncu.csv") +def test_format_summary_text_renders_top_n_table_and_truncation_notice(tmp_path): + summary = parse_ncu_csv_full( + _write_ncu_gemm_fixture( + tmp_path, + "torch-gemm-h100.ncu.csv", + "nvjet_hsh_128x256_64x4_2x1_v_bz_coopA_NNN", + ) + ) out = format_summary_text(summary, top_n=3) # Header lines. assert "NCU summary:" in out - assert "rows=1236" in out - assert "unique_kernels=9" in out + assert "rows=24" in out + assert "unique_kernels=4" in out # Column header. assert "kernel" in out and "SM%" in out and "DRAM%" in out # Top kernel rendered. assert "nvjet_hsh_" in out - # Truncation notice for the 6 kernels not shown. - assert "6 more kernels not shown" in out + # Truncation notice for the 1 kernel not shown. + assert "1 more kernels not shown" in out def test_format_summary_text_handles_empty_summary_gracefully(tmp_path): @@ -1063,23 +1398,28 @@ def test_format_summary_text_handles_empty_summary_gracefully(tmp_path): # --------------------------------------------------------------------------- -def test_ncu_summary_cli_prints_table_and_returns_zero(capsys): +def test_ncu_summary_cli_prints_table_and_returns_zero(tmp_path, capsys): from swordfish.runner import cli + csv_path = _write_ncu_gemm_fixture( + tmp_path, + "torch-gemm-h100.ncu.csv", + "nvjet_hsh_128x256_64x4_2x1_v_bz_coopA_NNN", + ) rc = cli.main( [ "ncu-summary", - str(_FIXTURES_DIR / "torch-gemm-h100.ncu.csv"), + str(csv_path), "--top", - "5", + "3", ] ) out = capsys.readouterr().out assert rc == 0 assert "NCU summary:" in out assert "nvjet_hsh_" in out - # --top 5 means 4 kernels not shown (9 total). - assert "4 more kernels not shown" in out + # --top 3 means 1 kernel not shown (4 total). + assert "1 more kernels not shown" in out def test_ncu_summary_cli_returns_nonzero_on_unparseable_csv(tmp_path, capsys):