diff --git a/.env.example b/.env.example index e3728e89..9db497e7 100644 --- a/.env.example +++ b/.env.example @@ -1,7 +1,9 @@ +BEAMLINE=<832> GLOBUS_CLIENT_ID= GLOBUS_CLIENT_SECRET= PREFECT_API_URL= PREFECT_API_KEY= PUSHGATEWAY_URL= JOB_NAME= -INSTANCE_LABEL= \ No newline at end of file +INSTANCE_LABEL= +TILED_URI= \ No newline at end of file diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 1164dede..6345a494 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -17,13 +17,11 @@ jobs: with: python-version: 3.11 cache: 'pip' + cache-dependency-path: pyproject.toml - name: Install dependencies run: | python -m pip install --no-cache-dir --upgrade pip - pip install --no-cache-dir flake8 pytest - pip install --no-cache-dir . - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi + pip install -e . --group dev - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names diff --git a/config.yml b/config.yml index 4e3c6ed2..9102ce0e 100644 --- a/config.yml +++ b/config.yml @@ -139,6 +139,18 @@ globus: uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3 name: nersc832 + bl832-beegfs-raw: + root_path: /global/beegfs/beamlines/bl832/raw/ + uri: beegfs.als.lbl.gov + uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a + name: bl832-beegfs-raw + + bl832-beegfs-scratch: + root_path: /global/beegfs/beamlines/bl832/scratch/ + uri: beegfs.als.lbl.gov + uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a + name: bl832-beegfs-scratch + globus_apps: als_transfer: client_id: ${GLOBUS_CLIENT_ID} diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index c75317e1..beb21965 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -150,6 +150,8 @@ def __init__(self) -> None: MockSecret.for_endpoint("nersc832_alsdev_scratch")), "alcf832_raw": MockEndpoint("mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_raw")), "alcf832_scratch": MockEndpoint("mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_scratch")), + "beegfs_raw": MockEndpoint("mock_beegfs_raw_path", MockSecret.for_endpoint("beegfs_raw")), + "beegfs_scratch": MockEndpoint("mock_beegfs_scratch_path", MockSecret.for_endpoint("beegfs_scratch")) } # Mock apps @@ -169,6 +171,8 @@ def __init__(self) -> None: self.data832_raw = self.endpoints["data832_raw"] self.data832_scratch = self.endpoints["data832_scratch"] self.nersc832_alsdev_scratch = self.endpoints["nersc832_alsdev_scratch"] + self.beegfs_raw = self.endpoints["beegfs_raw"] + self.beegfs_scratch = self.endpoints["beegfs_scratch"] self.scicat = config["scicat"] @@ -202,10 +206,16 @@ async def mock_run_deployment(*args, **kwargs): # Mock asyncio.gather to avoid actual async task execution async def mock_gather(*args, **kwargs): + # Await any coroutines so we don't leak warnings, but don't care about results + for arg in args: + if asyncio.iscoroutine(arg): + try: + await arg + except Exception: + pass return [None] mocker.patch('asyncio.gather', new=mock_gather) - # Import decision flow after mocking the necessary components from orchestration.flows.bl832.dispatcher import dispatcher @@ -250,6 +260,8 @@ def test_alcf_recon_flow(mocker: MockFixture): "nersc832_alsdev_recon_scripts": mocker.MagicMock(), "alcf832_raw": mocker.MagicMock(), "alcf832_scratch": mocker.MagicMock(), + "bl832-beegfs-raw": mocker.MagicMock(), + "bl832-beegfs-scratch": mocker.MagicMock() } ) mocker.patch( @@ -306,6 +318,17 @@ def test_alcf_recon_flow(mocker: MockFixture): return_value=True ) + mocker.patch( + "orchestration.flows.bl832.alcf.schedule_prefect_flow", + return_value=None + ) + + # Patch ingestion to Tiled + mocker.patch( + "orchestration.flows.bl832.alcf.register_file_to_tiled", + return_value=None + ) + file_path = "/global/raw/transfer_tests/test.h5" # ---------- CASE 1: SUCCESS PATH ---------- @@ -315,7 +338,7 @@ def test_alcf_recon_flow(mocker: MockFixture): result = alcf_recon_flow(file_path=file_path, config=mock_config) assert result is True, "Flow should return True if HPC + Tiff->Zarr + transfers all succeed" - assert mock_transfer_controller.copy.call_count == 3, "Should do 3 transfers in success path" + assert mock_transfer_controller.copy.call_count == 4, "Should do 4 transfers in success path" mock_hpc_reconstruct.assert_called_once() mock_hpc_multires.assert_called_once() mock_schedule_pruning.assert_called_once() diff --git a/orchestration/_tests/test_tiled.py b/orchestration/_tests/test_tiled.py new file mode 100644 index 00000000..58e1e2e8 --- /dev/null +++ b/orchestration/_tests/test_tiled.py @@ -0,0 +1,473 @@ +"""Unit tests for ``orchestration.tiled``. + +Tests run inside a Prefect ephemeral server (``prefect_test_harness``) so the +real task engine is exercised, but Tiled itself is mocked — no real Tiled +server, no network calls. + +Coverage: + - ``register_file_to_tiled``: h5/zarr branch, TIFF directory branch, + no-tags skip, ``str`` path coercion, missing-entry warning, register failure. + - ``_apply_tags``: existing access_blob (replace), no blob (add), tag + deduplication, ``patch_metadata`` failure swallowed. + - ``check_tags``: h5 ok, h5 tag-missing, TIFF dir uses first child, + no access_blob, missing entry raises ``KeyError``. + - ``register_to_tiled`` flow: delegates with the right args, coerces ``str``. + +The async/sync bridge inside ``register_file_to_tiled`` (``run_coro_as_sync`` +calling Tiled's async ``register``) is exercised on every happy path: tests +patch ``register`` as an ``AsyncMock`` and let ``run_coro_as_sync`` actually +drive it. The error-path test patches ``run_coro_as_sync`` directly to inject +a failure. +""" +import warnings + +from prefect.testing.utilities import prefect_test_harness +import pytest +from pytest_mock import MockFixture + +from orchestration.tiled import ( + _apply_tags, + check_tags, + register_file_to_tiled, + register_to_tiled, +) + +warnings.filterwarnings("ignore", category=DeprecationWarning) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture(autouse=True, scope="session") +def prefect_test_fixture(): + """Provide an ephemeral Prefect server for the whole test session.""" + with prefect_test_harness(): + yield + + +@pytest.fixture(autouse=True) +def _tiled_uri_env(monkeypatch): + """Set ``TILED_URI`` so ``os.environ[...]`` reads in the module don't error.""" + monkeypatch.setenv("TILED_URI", "http://tiled.test") + + +# --------------------------------------------------------------------------- +# Mock Tiled node +# --------------------------------------------------------------------------- + +class MockNode: + """Stand-in for a Tiled node. + + Strict by design: ``node[missing_key]`` raises ``KeyError`` to match real + Tiled behavior, so tests for the missing-entry warning path actually work. + Build prefix chains explicitly with ``add_child`` or the + ``build_prefix_chain`` helper below. + """ + + def __init__(self, access_blob=None, uri="http://tiled.test/node"): + self.access_blob = access_blob + self.uri = uri + self._children: dict[str, "MockNode"] = {} + self.patch_calls: list[dict] = [] + + def add_child(self, key: str, node: "MockNode") -> "MockNode": + self._children[key] = node + return node + + def __getitem__(self, key): + # Strict: missing keys raise like a real Tiled node would + return self._children[key] + + def __iter__(self): + return iter(self._children) + + def patch_metadata(self, **kwargs): + self.patch_calls.append(kwargs) + + +def build_prefix_chain(segments: list[str], leaf: MockNode) -> MockNode: + """Build ``client[seg1][seg2]...[segN] -> leaf`` and return the root. + + Used to mirror the prefix-navigation loop in ``register_file_to_tiled`` + and ``check_tags`` (``for segment in prefix.strip("/").split("/")``). + """ + current = leaf + for segment in reversed(segments): + parent = MockNode() + parent.add_child(segment, current) + current = parent + return current + + +# --------------------------------------------------------------------------- +# _apply_tags +# --------------------------------------------------------------------------- + +def test_apply_tags_uses_add_op_when_no_existing_blob(): + """No access_blob present → JSON Patch op is ``add``.""" + node = MockNode(access_blob=None) + _apply_tags(entry_node=node, tags=["bl832"]) + + assert len(node.patch_calls) == 1 + patch = node.patch_calls[0]["access_blob_patch"][0] + assert patch["op"] == "add" + assert patch["path"] == "" + assert set(patch["value"]["tags"]) == {"bl832"} + + +def test_apply_tags_uses_replace_op_when_blob_exists(): + """Existing access_blob → op is ``replace`` and tags are merged.""" + node = MockNode(access_blob={"tags": ["existing"]}) + _apply_tags(entry_node=node, tags=["bl832"]) + + patch = node.patch_calls[0]["access_blob_patch"][0] + assert patch["op"] == "replace" + assert set(patch["value"]["tags"]) == {"existing", "bl832"} + + +def test_apply_tags_deduplicates_overlapping_tags(): + """Tag merging is a set union, so overlap doesn't produce duplicates.""" + node = MockNode(access_blob={"tags": ["bl832", "old"]}) + _apply_tags(entry_node=node, tags=["bl832", "new"]) + + patch = node.patch_calls[0]["access_blob_patch"][0] + merged = patch["value"]["tags"] + assert set(merged) == {"bl832", "old", "new"} + assert len(merged) == 3 # no duplicates + + +def test_apply_tags_swallows_patch_metadata_failure(mocker: MockFixture): + """If ``patch_metadata`` raises, ``_apply_tags`` logs but does not propagate.""" + node = MockNode(access_blob={"tags": []}) + mock_patch_metadata = mocker.patch.object( + node, "patch_metadata", side_effect=RuntimeError("permission denied") + ) + + # Should not raise + _apply_tags(entry_node=node, tags=["bl832"]) + + # Verify the call actually happened — the test is about swallowing, not skipping + mock_patch_metadata.assert_called_once() + + +# --------------------------------------------------------------------------- +# register_file_to_tiled — happy paths (exercise run_coro_as_sync bridge) +# --------------------------------------------------------------------------- + +def test_register_h5_with_tags_applies_to_stem(mocker: MockFixture, tmp_path): + """For an .h5 file, the entry keyed by ``path.stem`` should be tagged.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines", "bl832", "raw"], prefix_node) + + fake_register = mocker.AsyncMock(return_value=None) + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", fake_register) + + register_file_to_tiled( + path=h5, + prefix="beamlines/bl832/raw", + tags=["raw", "bl832"], + ) + + fake_register.assert_awaited_once() + assert len(entry_node.patch_calls) == 1 + tags_written = entry_node.patch_calls[0]["access_blob_patch"][0]["value"]["tags"] + assert set(tags_written) == {"raw", "bl832"} + + +def test_register_zarr_with_tags_applies_to_stem(mocker: MockFixture, tmp_path): + """A .zarr store follows the same stem-keyed lookup as .h5.""" + zarr = tmp_path / "sample.zarr" + # Real .zarr is a directory, but the code path is suffix-driven, so a file is fine + zarr.touch() + + entry_node = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("sample", entry_node) + client = build_prefix_chain(["beamlines", "bl832", "scratch"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) + + register_file_to_tiled( + path=zarr, + prefix="beamlines/bl832/scratch", + tags=["bl832"], + ) + + assert len(entry_node.patch_calls) == 1 + + +def test_register_no_tags_skips_apply_tags(mocker: MockFixture, tmp_path): + """When ``tags`` is None/empty, the tag-application branch is skipped entirely.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + prefix_node = MockNode() + mocker.patch("orchestration.tiled.from_uri", return_value=prefix_node) + fake_register = mocker.AsyncMock(return_value=None) + mocker.patch("orchestration.tiled.register", fake_register) + + register_file_to_tiled(path=h5) + + fake_register.assert_awaited_once() + assert prefix_node.patch_calls == [] + + +def test_register_string_path_is_coerced(mocker: MockFixture, tmp_path): + """Passing ``path`` as a ``str`` must be coerced to ``Path``. + + Regression test: without ``Path(path)``, the ``path.is_dir()`` and + ``path.suffix`` calls explode with ``AttributeError``. + """ + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines", "bl832", "raw"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) + + # Pass a string, not a Path — would fail without the Path(path) coercion + register_file_to_tiled( + path=str(h5), + prefix="beamlines/bl832/raw", + tags=["raw"], + ) + + assert len(entry_node.patch_calls) == 1 + + +def test_register_tiff_dir_tags_each_child(mocker: MockFixture, tmp_path): + """A directory with no suffix should tag every child under the prefix node.""" + tiff_dir = tmp_path / "tiffs" + tiff_dir.mkdir() + (tiff_dir / "frame_0000.tiff").touch() + (tiff_dir / "frame_0001.tiff").touch() + + child_a = MockNode(access_blob=None) + child_b = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("frame_0000", child_a) + prefix_node.add_child("frame_0001", child_b) + client = build_prefix_chain(["beamlines", "bl832", "scratch"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) + + register_file_to_tiled( + path=tiff_dir, + prefix="beamlines/bl832/scratch", + tags=["bl832"], + ) + + assert len(child_a.patch_calls) == 1 + assert len(child_b.patch_calls) == 1 + + +# --------------------------------------------------------------------------- +# register_file_to_tiled — edge cases +# --------------------------------------------------------------------------- + +def test_register_missing_entry_after_register_logs_warning(mocker: MockFixture, tmp_path): + h5 = tmp_path / "missing.h5" + h5.touch() + + prefix_node = MockNode() + prefix_node.add_child("other-entry", MockNode()) + client = build_prefix_chain(["beamlines", "bl832", "raw"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) + + # Patch the specific logger instance returned by get_run_logger + # rather than the function itself, to avoid Prefect engine confusion + import logging + mock_logger = mocker.MagicMock(spec=logging.Logger) + mocker.patch("orchestration.tiled.get_run_logger", return_value=mock_logger) + + register_file_to_tiled( + path=h5, + prefix="beamlines/bl832/raw", + tags=["raw"], + ) + + mock_logger.warning.assert_called_once() + warning_msg = mock_logger.warning.call_args[0][0] + assert "missing" in warning_msg + assert "other-entry" in warning_msg + + +def test_register_raises_runtime_error_on_failure(mocker: MockFixture, tmp_path): + h5 = tmp_path / "scan.h5" + h5.touch() + + mocker.patch("orchestration.tiled.from_uri", return_value=MockNode()) + mocker.patch("orchestration.tiled.register", return_value=None) # plain MagicMock, not AsyncMock + mocker.patch( + "orchestration.tiled.run_coro_as_sync", + side_effect=Exception("connection refused"), + ) + + with pytest.raises(RuntimeError, match="Failed to register .* connection refused"): + register_file_to_tiled(path=h5) + + +# --------------------------------------------------------------------------- +# check_tags +# --------------------------------------------------------------------------- + +def test_check_tags_returns_true_when_all_expected_present(mocker: MockFixture, tmp_path): + """Subset semantics: ok iff expected_tags <= actual_tags.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob={"tags": ["bl832", "raw", "extra"]}) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + ok, actual = check_tags( + path=h5, + prefix="beamlines", + expected_tags={"bl832", "raw"}, + ) + + assert ok is True + assert set(actual) == {"bl832", "raw", "extra"} + + +def test_check_tags_returns_false_when_tag_missing(mocker: MockFixture, tmp_path): + """A single missing expected tag flips ok to False.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob={"tags": ["bl832"]}) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + ok, actual = check_tags( + path=h5, + prefix="beamlines", + expected_tags={"bl832", "raw"}, + ) + + assert ok is False + assert actual == ["bl832"] + + +def test_check_tags_tiff_dir_uses_first_child(mocker: MockFixture, tmp_path): + """For a TIFF dir, ``check_tags`` reads the first child of the prefix node.""" + tiff_dir = tmp_path / "tiffs" + tiff_dir.mkdir() + (tiff_dir / "frame_0000.tiff").touch() + + first_child = MockNode(access_blob={"tags": ["bl832"]}) + prefix_node = MockNode() + prefix_node.add_child("frame_0000", first_child) + client = build_prefix_chain(["beamlines", "bl832", "scratch"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + ok, actual = check_tags( + path=tiff_dir, + prefix="beamlines/bl832/scratch", + expected_tags={"bl832"}, + ) + + assert ok is True + assert actual == ["bl832"] + + +def test_check_tags_no_access_blob_returns_empty(mocker: MockFixture, tmp_path): + """When ``access_blob`` is None, actual tags should be ``[]``.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + ok, actual = check_tags( + path=h5, + prefix="beamlines", + expected_tags={"bl832"}, + ) + + assert ok is False + assert actual == [] + + +def test_check_tags_missing_entry_raises_key_error(mocker: MockFixture, tmp_path): + """If the stem isn't present under the prefix, the lookup raises ``KeyError``.""" + h5 = tmp_path / "missing.h5" + h5.touch() + + prefix_node = MockNode() + prefix_node.add_child("other", MockNode()) + client = build_prefix_chain(["beamlines"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + with pytest.raises(KeyError): + check_tags( + path=h5, + prefix="beamlines", + expected_tags={"bl832"}, + ) + + +# --------------------------------------------------------------------------- +# register_to_tiled flow +# --------------------------------------------------------------------------- + +def test_register_to_tiled_flow_delegates_to_task(mocker: MockFixture, tmp_path): + """The flow is a thin wrapper that just calls the task with the same args.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + mock_task = mocker.patch("orchestration.tiled.register_file_to_tiled") + + register_to_tiled( + path=h5, + prefix="beamlines/bl832", + overwrite=True, + tags=["bl832"], + ) + + mock_task.assert_called_once_with( + h5, + prefix="beamlines/bl832", + overwrite=True, + tags=["bl832"], + ) + + +def test_register_to_tiled_flow_coerces_string_path(mocker: MockFixture): + """Passing a string should be coerced to ``Path`` before forwarding.""" + from pathlib import Path + + mock_task = mocker.patch("orchestration.tiled.register_file_to_tiled") + + register_to_tiled(path="/data/sample.h5", prefix="any") + + forwarded_path = mock_task.call_args.args[0] + assert isinstance(forwarded_path, Path) + assert forwarded_path == Path("/data/sample.h5") diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index bdf96ac2..28ac7813 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -14,6 +14,7 @@ from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController from orchestration.transfer_controller import get_transfer_controller, CopyMethod from orchestration.prefect import schedule_prefect_flow +from orchestration.tiled import register_file_to_tiled class ALCFTomographyHPCController(TomographyHPCController): @@ -456,6 +457,23 @@ def alcf_recon_flow( destination=config.data832_scratch ) + beegfs_zarr_transfer_success = transfer_controller.copy( + file_path=scratch_path_zarr, + source=config.alcf832_scratch, + destination=config.beegfs_scratch + ) + + if beegfs_zarr_transfer_success: + logger.info("Successfully transferred Zarr to beegfs. Now ingesting to Tiled.") + register_file_to_tiled( + path=Path(config.beegfs_scratch.root_path+scratch_path_zarr), + prefix="beamlines/bl832/scratch", + overwrite=False, + tags=["8.3.2", folder_name], + ) + else: + logger.error("Failed to transfer Zarr to beegfs, skipping registration to Tiled.") + # Place holder in case we want to transfer to NERSC for long term storage nersc_transfer_success = False diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index 16b03629..b07df64c 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -27,6 +27,8 @@ def _beam_specific_config(self) -> None: self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"] self.alcf832_raw = self.endpoints["alcf832_raw"] self.alcf832_scratch = self.endpoints["alcf832_scratch"] + self.beegfs_raw = self.endpoints["bl832-beegfs-raw"] + self.beegfs_scratch = self.endpoints["bl832-beegfs-scratch"] # SciCat self.scicat = self.config["scicat"] # NERSC HPC submission settings diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index e5e10b02..1d9ea158 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -12,6 +12,8 @@ from orchestration.globus.transfer import GlobusEndpoint, start_transfer from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.prometheus_utils import PrometheusMetrics +# from orchestration.tiled import register_file_to_tiled +# from orchestration.transfer_controller import CopyMethod, get_transfer_controller API_KEY = os.getenv("API_KEY") @@ -156,6 +158,29 @@ def process_new_832_file_task( except Exception as e: logger.error(f"SciCat ingest failed with {e}") + # Holding off from moving and registering Raw Data to Beegfs Tiled for storage concerns. + + # transfer_controller = get_transfer_controller( + # transfer_type=CopyMethod.GLOBUS, + # config=config, + # prometheus_metrics=None + # ) + + # transfer_controller.copy( + # file_path=relative_path, + # source=config.data832, + # destination=config.beegfs_raw + # ) + # logger.info(f"File successfully transferred from data832 to beegfs {file_path}") + + # register_file_to_tiled( + # path=Path(config.beegfs_raw.root_path+relative_path), + # prefix="beamlines/bl832/raw", + # overwrite=False, + # tags=["raw", "8.3.2"], + # ) + # TODO: find proposal id in h5, make that a tag + logger.info("Initializing prune controller") prune_controller = get_prune_controller( prune_type=PruneMethod.GLOBUS, diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index 42378d70..c89fff54 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -16,12 +16,13 @@ from orchestration.flows.bl832.config import Config832 from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController -from orchestration.prune_controller import get_prune_controller, PruneMethod -from orchestration.transfer_controller import get_transfer_controller, CopyMethod from orchestration.flows.bl832.streaming_mixin import ( NerscStreamingMixin, SlurmJobBlock, cancellation_hook, monitor_streaming_job, save_block ) from orchestration.prefect import schedule_prefect_flow +from orchestration.prune_controller import get_prune_controller, PruneMethod +from orchestration.tiled import register_file_to_tiled +from orchestration.transfer_controller import get_transfer_controller, CopyMethod logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -1564,6 +1565,37 @@ def nersc_recon_flow( destination=config.data832_scratch ) + logger.info("Copy from NERSC /global/cfs/cdirs/als/data_mover/8.3.2/scratch to beegfs") + # Holding off on copying tiffs to beegfs for now since they are large and we may not need them all. + # We can always copy them later if needed. + # transfer_controller.copy( + # file_path=tiff_file_path, + # source=config.nersc832_alsdev_pscratch_scratch, + # destination=config.beegfs_scratch + # ) + + # Register the reconstructed TIFFs in tiled + # register_file_to_tiled( + # path=Path(config.beegfs_scratch.root_path+tiff_file_path), + # prefix="beamlines/bl832/scratch", + # overwrite=False, + # tags=["scratch", "8.3.2", folder_name], + # ) + + # Register the reconstructed Zarr in tiled + transfer_controller.copy( + file_path=zarr_file_path, + source=config.nersc832_alsdev_pscratch_scratch, + destination=config.beegfs_scratch + ) + + register_file_to_tiled( + path=Path(config.beegfs_scratch.root_path+zarr_file_path), + prefix="beamlines/bl832/scratch", + overwrite=False, + tags=["8.3.2", folder_name], + ) + logger.info("Scheduling pruning tasks.") schedule_pruning( config=config, diff --git a/orchestration/tiled.py b/orchestration/tiled.py new file mode 100644 index 00000000..90687c7b --- /dev/null +++ b/orchestration/tiled.py @@ -0,0 +1,187 @@ +"""Register data files and Zarr/HDF5 stores to the Tiled catalog. + +Intended to run on a Ride worker. The file path must be accessible +to the Tiled server's filesystem. +""" +from dotenv import load_dotenv +import os +from pathlib import Path + +from prefect import flow, get_run_logger, task +from prefect.utilities.asyncutils import run_coro_as_sync +from tiled.client import from_uri +from tiled.client.register import register + + +load_dotenv() + + +@task(name="register-file-to-tiled", task_run_name="register-{path}") +def register_file_to_tiled( + path: Path | str, + prefix: str | None = None, + overwrite: bool = False, + tags: list[str] | None = None, +) -> None: + logger = get_run_logger() + path = Path(path) + tiled_uri = os.environ["TILED_URI"] + + client = from_uri(tiled_uri) + + logger.info(f"Registering {path} to Tiled catalog at {tiled_uri} with prefix {prefix!r}") + try: + run_coro_as_sync( # Bridge synchronous Prefect task to async Tiled client method + register( + node=client, + path=path, + prefix=prefix or "/", + overwrite=overwrite, + ) + ) + except Exception as e: + raise RuntimeError( + f"Failed to register {path} to Tiled catalog at {tiled_uri} " + f"(prefix={prefix!r}): {e}" + ) from e + + if not tags: + return + + # Navigate to prefix node after registration + node = client + for segment in (prefix or "").strip("/").split("/"): + if segment: + node = node[segment] + + if path.is_dir() and not path.suffix: + # TIFF directory: Tiled registers each file flat into the prefix node + for key in node: + _apply_tags(entry_node=node[key], tags=tags) + else: + # .h5 or .zarr: registered as single node, keyed by stem + # Even on COLLISION the entry exists — just try it directly + entry_key = path.stem + logger.info(f"Looking up entry key {entry_key!r} under {prefix!r}") + try: + _apply_tags(entry_node=node[entry_key], tags=tags) + except KeyError: + # Key not found even after registration — log all available keys to diagnose + available = sorted(node) + logger.warning( + f"Entry {entry_key!r} not found under {prefix!r}. " + f"Available keys: {available}" + ) + + +@task(name="apply-tags", task_run_name="apply-tags-{tags}") +def _apply_tags(entry_node, tags: list[str]) -> None: + logger = get_run_logger() + existing_blob = entry_node.access_blob + existing_tags = (existing_blob or {}).get("tags", []) + merged_tags = list(set(existing_tags) | set(tags)) + op = "replace" if existing_blob is not None else "add" + try: + # entry_node.update_metadata(access_tags=merged_tags) + entry_node.patch_metadata( + access_blob_patch=[{"op": op, "path": "", "value": {"tags": merged_tags}}], + ) + + logger.debug(f"Tagged {entry_node.uri} with {merged_tags}") + except Exception as e: + logger.debug(f"Could not tag {entry_node.uri}: {e}") + + +@task(name="check-tiled-tags", task_run_name="check-tags-{path}") +def check_tags( + path: Path | str, + prefix: str, + expected_tags: set[str], +) -> tuple[bool, list[str]]: + """Check whether a registered dataset has the expected tags applied. + + Navigates to the entry corresponding to ``path`` under ``prefix`` in the + Tiled catalog and compares its ``access_blob.tags`` against ``expected_tags``. + + For TIFF directories (a directory with no suffix), the first child entry + under the prefix node is checked, since ``register`` registers each TIFF + file flat into the prefix node. + + Args: + path: Path to the file or store that was registered. + prefix: Sub-path within the Tiled catalog where the entry was registered. + expected_tags: Tags that must be present on the entry. + + Returns: + A tuple ``(ok, actual_tags)`` where ``ok`` is True iff every tag in + ``expected_tags`` is present in the entry's ``access_blob.tags``. + + Raises: + KeyError: If the entry cannot be located under ``prefix``. + """ + logger = get_run_logger() + path = Path(path) + tiled_uri = os.environ["TILED_URI"] + client = from_uri(tiled_uri) + + # Navigate to the prefix node + node = client + for segment in prefix.strip("/").split("/"): + if segment: + node = node[segment] + + # For TIFF directories, register flattens files into the prefix node; + # for .h5 / .zarr, the entry is keyed by the path stem. + if path.is_dir() and not path.suffix: + key = next(iter(node)) + node = node[key] + else: + node = node[path.stem] + + actual = node.access_blob.get("tags", []) if node.access_blob else [] + ok = expected_tags <= set(actual) + logger.info( + f"{path.name} under {prefix!r}: " + f"expected={sorted(expected_tags)} actual={actual} ok={ok}" + ) + return ok, actual + + +@flow(name="register-to-tiled", flow_run_name="register-{path}") +def register_to_tiled( + path: Path | str, + prefix: str | None = None, + overwrite: bool = False, + tags: list[str] | None = None, +) -> None: + """Register a file or Zarr store to the Tiled server. + + Args: + path: Path to the file or Zarr store (client filesystem). + prefix: Optional sub-path within the Tiled catalog. + overwrite: Whether to overwrite existing entries in the Tiled catalog. + tags: Optional list of tags to apply to the registered entry. + """ + logger = get_run_logger() + path = Path(path) + logger.info(f"Submitting task: register {path} to Tiled (prefix={prefix!r})") + register_file_to_tiled(path, prefix=prefix, overwrite=overwrite, tags=tags) + + +if __name__ == "__main__": + h5 = Path(os.environ["EXAMPLE_H5_PATH"]) + tiffs = Path(os.environ["EXAMPLE_TIFFS_PATH"]) + zarr = Path(os.environ["EXAMPLE_ZARR_PATH"]) + + cases = [ + (h5, "beamlines/bl832/raw/", {"bl832"}), + (tiffs, "beamlines/bl832/scratch", {"bl832", "dabramov"}), + (zarr, "beamlines/bl832/scratch", {"bl832"}), + ] + + for path, prefix, tags in cases: + register_to_tiled(path=path, prefix=prefix, tags=list(tags), overwrite=False) + + for path, prefix, expected in cases: + ok, actual = check_tags(path, prefix, expected) + print(f"{'✓' if ok else '✗'} {path.name}: expected={sorted(expected)} actual={actual}") diff --git a/pyproject.toml b/pyproject.toml index 7fee39f6..abf94468 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,6 +6,7 @@ readme = "README.md" requires-python = ">=3.11" dependencies = [ "authlib", + "dynaconf", "globus-compute-sdk @ git+https://github.com/globus/globus-compute.git@d1731340074be56861ec91d732bdff44f8e2b46e#subdirectory=compute_sdk", "globus-sdk>=3.0", "griffe>=0.49.0,<2.0.0", @@ -22,18 +23,20 @@ dependencies = [ "python-dotenv", "pyyaml", "scicat_beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79", - "sfapi_client" + "sfapi_client", + "tiled[client]", + "watchfiles" ] -[build-system] -requires = [ - "black", - "flake8", - "freezegun", - "pytest", - "pytest-mock", - "setuptools" +[dependency-groups] +dev = [ + "black", + "flake8", + "freezegun", + "pytest", + "pytest-mock", ] -build-backend = "setuptools.build_meta" - +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt deleted file mode 100644 index d5ac223e..00000000 --- a/requirements-dev.txt +++ /dev/null @@ -1,5 +0,0 @@ -pytest -pytest-mock -freezegun -flake8 -black \ No newline at end of file diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 527e9c0d..00000000 --- a/requirements.txt +++ /dev/null @@ -1,19 +0,0 @@ -authlib -dynaconf -globus-compute-sdk @ git+https://github.com/globus/globus-compute.git@d1731340074be56861ec91d732bdff44f8e2b46e#subdirectory=compute_sdk -globus-sdk>=3.0 -griffe>=0.49.0,<2.0.0 -h5py -httpx>=0.22.0 -mkdocs -mkdocs-material -mkdocs-mermaid2-plugin -numpy>=1.26.4 -pillow -prefect==3.4.2 -prometheus_client==0.21.1 -pydantic==2.11 -python-dotenv -pyyaml -scicat-beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79 -sfapi_client