Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,26 @@ hpc_submission_settings832:
conda_env_path: /global/cfs/cdirs/als/data_mover/8.3.2/envs/dino_demo
seg_scripts_dir: /global/cfs/cdirs/als/data_mover/8.3.2/tomography_segmentation_scripts/moon_seg/forge_feb_seg_model_demo/
dino_checkpoint_path: /global/cfs/cdirs/als/data_mover/8.3.2/tomography_segmentation_scripts/dino/best_moon.ckpt

# ── LEAF SEGMENTATION SETTINGS ───────────────────────────────────────────
nersc_segmentation_dinov3_leaf:
# ── SLURM resource allocation ─────────────────────────────────────────────
qos: realtime
account: als
constraint: gpu
reservation: ""
num_nodes: 4
ntasks-per-node: 1
nproc_per_node: 4
gpus-per-node: 4
cpus-per-task: 128
walltime: "00:59:00"
# ── Inference parameters ──────────────────────────────────────────────────
script_name: "src.inference_dino_v3"
project: "leaf"
batch_size: 4
# ── Paths ─────────────────────────────────────────────────────────────────
cfs_path: /global/cfs/cdirs/als/data_mover/8.3.2
conda_env_path: /global/cfs/cdirs/als/data_mover/8.3.2/envs/dino_demo
seg_scripts_dir: /global/cfs/cdirs/als/data_mover/8.3.2/tomography_segmentation_scripts/leaf_seg/forge_feb_seg_model_demo/
dino_checkpoint_path: /global/cfs/cdirs/als/data_mover/8.3.2/tomography_segmentation_scripts/dino/best_leaf.ckpt
137 changes: 137 additions & 0 deletions orchestration/_tests/test_bl832/test_nersc.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,23 @@ def mock_config832(mocker):
"conda_env_path": "/mock/conda/combine",
"seg_scripts_dir": "/mock/seg_scripts/combine",
}
mock_config.nersc_segment_dinov3_leaf_settings = {
"qos": "regular",
"account": "mock_account",
"constraint": "gpu",
"reservation": "",
"num_nodes": 4,
"ntasks-per-node": 1,
"nproc_per_node": 4,
"gpus-per-node": 4,
"cpus-per-task": 128,
"walltime": "00:59:00",
"batch_size": 4,
"cfs_path": "/mock/cfs",
"conda_env_path": "/mock/conda/dino_leaf",
"seg_scripts_dir": "/mock/seg_scripts/dino_leaf",
"dino_checkpoint_path": "/mock/dino_leaf.ckpt",
}

mocker.patch("orchestration.flows.bl832.nersc.Config832", return_value=mock_config)
return mock_config
Expand Down Expand Up @@ -794,3 +811,123 @@ def test_moon_segment_flow_no_sam3_no_combine(mocker, mock_config832, mock_recon

mock_sam3_task.submit.assert_not_called()
mock_combine_task.submit.assert_not_called()


# ──────────────────────────────────────────────────────────────────────────────
# segmentation_dinov3 with project="leaf" (controller level)
# ──────────────────────────────────────────────────────────────────────────────

def test_segmentation_dinov3_leaf_success(mocker, mock_sfapi_client, mock_config832):
    """segmentation_dinov3(project="leaf") submits one Perlmutter job and returns True."""
    from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
    from sfapi_client.compute import Machine

    # Stub out the sleep call and the Variable lookup referenced by the nersc module.
    mocker.patch("orchestration.flows.bl832.nersc.Variable.get", return_value={"defaults": True})
    mocker.patch("orchestration.flows.bl832.nersc.time.sleep")

    hpc = NERSCTomographyHPCController(client=mock_sfapi_client, config=mock_config832)
    outcome = hpc.segmentation_dinov3(recon_folder_path="folder/recfile", project="leaf")

    # The compute handle must be requested for Perlmutter and used for exactly one submission.
    compute = mock_sfapi_client.compute
    compute.assert_called_with(Machine.perlmutter)
    compute.return_value.submit_job.assert_called_once()
    assert outcome is True


def test_segmentation_dinov3_leaf_submission_failure(mocker, mock_sfapi_client, mock_config832):
    """segmentation_dinov3(project="leaf") returns False when submit_job raises."""
    from orchestration.flows.bl832.nersc import NERSCTomographyHPCController

    # Make the job submission itself raise.
    submit_job = mock_sfapi_client.compute.return_value.submit_job
    submit_job.side_effect = Exception("No GPU nodes")

    # Stub out the sleep call and the Variable lookup referenced by the nersc module.
    mocker.patch("orchestration.flows.bl832.nersc.Variable.get", return_value={"defaults": True})
    mocker.patch("orchestration.flows.bl832.nersc.time.sleep")

    hpc = NERSCTomographyHPCController(client=mock_sfapi_client, config=mock_config832)
    outcome = hpc.segmentation_dinov3(recon_folder_path="folder/recfile", project="leaf")

    assert outcome is False


# ──────────────────────────────────────────────────────────────────────────────
# nersc_leaf_segment_flow (recon + DINOv3-leaf only, no SAM3, no combine)
# ──────────────────────────────────────────────────────────────────────────────

def test_leaf_segment_flow_succeeds(mocker, mock_config832, mock_recon_success):
    """Leaf flow returns True when reconstruction and the DINOv3 leaf task both succeed.

    Also checks that the DINOv3 task is submitted exactly once with
    project="leaf" and the mocked config object.
    """
    from orchestration.flows.bl832.nersc import nersc_leaf_segment_flow

    # Controller whose reconstruct() reports success.
    recon_controller = mocker.MagicMock()
    recon_controller.reconstruct.return_value = mock_recon_success
    mocker.patch("orchestration.flows.bl832.nersc.get_controller", return_value=recon_controller)

    # Transfer and segmentation tasks both resolve to True.
    transfer_task = mocker.patch("orchestration.flows.bl832.nersc.globus_transfer_task")
    transfer_task.submit.return_value = _make_future(mocker, True)

    mocker.patch("orchestration.flows.bl832.nersc.get_prune_controller", return_value=mocker.MagicMock())

    dino_task = mocker.patch("orchestration.flows.bl832.nersc.nersc_segmentation_dinov3_task")
    dino_task.submit.return_value = _make_future(mocker, True)

    flow_result = nersc_leaf_segment_flow(file_path="folder/file.h5", num_nodes=4, config=None)

    assert flow_result is True
    recon_controller.reconstruct.assert_called_once()
    # config=None is passed in, yet the task is expected to receive the mocked
    # config (presumably via the patched Config832 in the fixture).
    dino_task.submit.assert_called_once_with(
        recon_folder_path="folder/recfile",
        config=mock_config832,
        project="leaf",
    )


def test_leaf_segment_flow_seg_failure(mocker, mock_config832, mock_recon_success):
    """Leaf flow returns False when reconstruction succeeds but the DINOv3 leaf task fails."""
    from orchestration.flows.bl832.nersc import nersc_leaf_segment_flow

    # Controller whose reconstruct() reports success.
    recon_controller = mocker.MagicMock()
    recon_controller.reconstruct.return_value = mock_recon_success
    mocker.patch("orchestration.flows.bl832.nersc.get_controller", return_value=recon_controller)

    # NOTE(review): the transfer mock also resolves to False here, so this test
    # does not isolate the segmentation failure from a transfer failure --
    # confirm that is intended.
    transfer_task = mocker.patch("orchestration.flows.bl832.nersc.globus_transfer_task")
    transfer_task.submit.return_value = _make_future(mocker, False)

    mocker.patch("orchestration.flows.bl832.nersc.get_prune_controller", return_value=mocker.MagicMock())

    # Segmentation task resolves to False.
    dino_task = mocker.patch("orchestration.flows.bl832.nersc.nersc_segmentation_dinov3_task")
    dino_task.submit.return_value = _make_future(mocker, False)

    flow_result = nersc_leaf_segment_flow(file_path="folder/file.h5", num_nodes=4, config=None)

    assert flow_result is False


def test_leaf_segment_flow_recon_failure(mocker, mock_config832):
    """Leaf flow raises ValueError when the controller reports a failed reconstruction."""
    from orchestration.flows.bl832.nersc import nersc_leaf_segment_flow

    # reconstruct() returns an unsuccessful result with no job id or timing.
    failed_recon = {"success": False, "job_id": None, "timing": None}
    recon_controller = mocker.MagicMock()
    recon_controller.reconstruct.return_value = failed_recon

    mocker.patch("orchestration.flows.bl832.nersc.get_controller", return_value=recon_controller)
    mocker.patch("orchestration.flows.bl832.nersc.globus_transfer_task")
    mocker.patch("orchestration.flows.bl832.nersc.get_prune_controller", return_value=mocker.MagicMock())

    with pytest.raises(ValueError, match="Reconstruction at NERSC failed"):
        nersc_leaf_segment_flow(file_path="folder/file.h5", num_nodes=4, config=None)


def test_leaf_segment_flow_no_sam3_no_combine(mocker, mock_config832, mock_recon_success):
    """Leaf flow never submits the SAM3 or combine-segmentations tasks."""
    from orchestration.flows.bl832.nersc import nersc_leaf_segment_flow

    # Controller whose reconstruct() reports success.
    recon_controller = mocker.MagicMock()
    recon_controller.reconstruct.return_value = mock_recon_success
    mocker.patch("orchestration.flows.bl832.nersc.get_controller", return_value=recon_controller)

    transfer_task = mocker.patch("orchestration.flows.bl832.nersc.globus_transfer_task")
    transfer_task.submit.return_value = _make_future(mocker, True)

    mocker.patch("orchestration.flows.bl832.nersc.get_prune_controller", return_value=mocker.MagicMock())

    # Patch all three segmentation-related tasks; only DINOv3 gets a result configured.
    sam3_task = mocker.patch("orchestration.flows.bl832.nersc.nersc_segmentation_sam3_task")
    combine_task = mocker.patch("orchestration.flows.bl832.nersc.nersc_combine_segmentations_task")
    dino_task = mocker.patch("orchestration.flows.bl832.nersc.nersc_segmentation_dinov3_task")
    dino_task.submit.return_value = _make_future(mocker, True)

    nersc_leaf_segment_flow(file_path="folder/file.h5", num_nodes=4, config=None)

    # Neither of the tasks outside the leaf pipeline may have been submitted.
    sam3_task.submit.assert_not_called()
    combine_task.submit.assert_not_called()
1 change: 1 addition & 0 deletions orchestration/flows/bl832/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ def _beam_specific_config(self) -> None:
self.nersc_segment_dinov3_settings = self.config["hpc_submission_settings832"]["nersc_segmentation_dinov3"]
self.nersc_combine_segmentation_settings = self.config["hpc_submission_settings832"]["nersc_combine_segmentations"]
self.nersc_segment_dinov3_moon_settings = self.config["hpc_submission_settings832"]["nersc_segmentation_dinov3_moon"]
self.nersc_segment_dinov3_leaf_settings = self.config["hpc_submission_settings832"]["nersc_segmentation_dinov3_leaf"]
13 changes: 13 additions & 0 deletions orchestration/flows/bl832/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class FlowParameterMapper:
"num_nodes",
"config"],
"nersc_moon_segment_flow/nersc_moon_segment_flow": [
"file_path",
"num_nodes",
"config"],
"nersc_leaf_segment_flow/nersc_leaf_segment_flow": [
"file_path",
"num_nodes",
"config"]
Expand Down Expand Up @@ -70,6 +74,7 @@ def setup_decision_settings(
nersc_recon: bool,
nersc_petiole_segment: bool,
nersc_moon_segment: bool,
nersc_leaf_segment: bool,
new_file_832: bool
) -> dict:
"""
Expand All @@ -88,13 +93,15 @@ def setup_decision_settings(
f"nersc_recon={nersc_recon}, "
f"nersc_petiole_segment={nersc_petiole_segment}, "
f"nersc_moon_segment={nersc_moon_segment}, "
f"nersc_leaf_segment={nersc_leaf_segment}, "
f"new_file_832={new_file_832}")
# Define which flows to run based on the input settings
settings = {
"alcf_recon_flow/alcf_recon_flow": alcf_recon,
"nersc_recon_flow/nersc_recon_flow": nersc_recon,
"nersc_petiole_segment_flow/nersc_petiole_segment_flow": nersc_petiole_segment,
"nersc_moon_segment_flow/nersc_moon_segment_flow": nersc_moon_segment,
"nersc_leaf_segment_flow/nersc_leaf_segment_flow": nersc_leaf_segment,
"new_832_file_flow/new_file_832": new_file_832
}
# Save the settings in a JSON block for later retrieval by other flows
Expand Down Expand Up @@ -186,6 +193,12 @@ async def dispatcher(
)
tasks.append(run_recon_flow_async("nersc_moon_segment_flow/nersc_moon_segment_flow", moon_params))

if decision_settings.get("nersc_leaf_segment_flow/nersc_leaf_segment_flow"):
leaf_params = FlowParameterMapper.get_flow_parameters(
"nersc_leaf_segment_flow/nersc_leaf_segment_flow", available_params
)
tasks.append(run_recon_flow_async("nersc_leaf_segment_flow/nersc_leaf_segment_flow", leaf_params))

# Run ALCF and NERSC flows in parallel, if any
if tasks:
try:
Expand Down
Loading
Loading