From 95bdd1a2b0c49f15215e1841db1c4a0812281cc3 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 10:55:14 -0700 Subject: [PATCH 01/23] adding beegfs globus endpoint to config.yaml for bl832 --- orchestration/flows/bl832/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index 16b03629..8c3be5ac 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -27,6 +27,7 @@ def _beam_specific_config(self) -> None: self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"] self.alcf832_raw = self.endpoints["alcf832_raw"] self.alcf832_scratch = self.endpoints["alcf832_scratch"] + self.beegfs_raw = self.endpoints["bl832-beegfs-raw"] # SciCat self.scicat = self.config["scicat"] # NERSC HPC submission settings From 5f54a3ba950f384b2e4de02b0539067895b4e82f Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 10:55:49 -0700 Subject: [PATCH 02/23] updating move.py to copy to beegfs --- orchestration/flows/bl832/move.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index e5e10b02..3d0a5983 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -12,6 +12,7 @@ from orchestration.globus.transfer import GlobusEndpoint, start_transfer from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.prometheus_utils import PrometheusMetrics +from splash_flows.orchestration.transfer_controller import CopyMethod, get_transfer_controller API_KEY = os.getenv("API_KEY") @@ -156,6 +157,19 @@ def process_new_832_file_task( except Exception as e: logger.error(f"SciCat ingest failed with {e}") + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config, + prometheus_metrics=None + ) + + transfer_controller.copy( + file_path=relative_path, + source=config.data832, + destination=config.beegfs_raw + ) + logger.info(f"File successfully transferred from data832 to beegfs {file_path}") + logger.info("Initializing prune controller") prune_controller = get_prune_controller( prune_type=PruneMethod.GLOBUS, From 8c0b7276b0a8561f0124e221cf797755f5f4cce8 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 10:56:15 -0700 Subject: [PATCH 03/23] adding tiled[client] to requirements --- pyproject.toml | 4 +++- requirements.txt | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 7fee39f6..ede057ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,9 @@ dependencies = [ "python-dotenv", "pyyaml", "scicat_beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79", - "sfapi_client" + "sfapi_client", + "tiled[client]", + "watchfiles" ] [build-system] diff --git a/requirements.txt b/requirements.txt index 527e9c0d..84581b18 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,5 @@ python-dotenv pyyaml scicat-beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79 sfapi_client +tiled[client] +watchfiles \ No newline at end of file From 9a1bf3eaf8df590652828968ab56dba249f5db35 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 10:56:40 -0700 Subject: [PATCH 04/23] adding beegfs globus endpoint to config.yaml for bl832 --- config.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/config.yml b/config.yml index 4e3c6ed2..34d63014 100644 --- a/config.yml +++ b/config.yml @@ -139,6 +139,12 @@ globus: uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3 name: nersc832 + bl832-beegfs-raw: + root_path: /beamline_staging/bl832/raw/ + uri: beegfs.als.lbl.gov + uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a + name: bl832-beegfs-raw + globus_apps: als_transfer: client_id: ${GLOBUS_CLIENT_ID} From 03cd05e95ba41268fe37461f524f21b5078b56ec Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 10:57:02 -0700 Subject: [PATCH 05/23] adding orchestration/tiled.py for ingesting data on beegfs --- orchestration/tiled.py | 69 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 orchestration/tiled.py diff --git a/orchestration/tiled.py b/orchestration/tiled.py new file mode 100644 index 00000000..e871d7a8 --- /dev/null +++ b/orchestration/tiled.py @@ -0,0 +1,69 @@ +"""Register data files and Zarr/HDF5 stores to the Tiled catalog. + +Intended to run on a Ride worker. The file path must be accessible +to the Tiled server's filesystem. +""" +from dotenv import load_dotenv +import os +from pathlib import Path + +from prefect import flow, get_run_logger, task +from tiled.client import from_uri +from tiled.client.register import register + + +@task(name="register-file-to-tiled", task_run_name="register-{file_path}") +async def register_file_to_tiled( + file_path: Path, + catalog_path: str | None = None, + tiled_path: Path | None = None, +) -> None: + """Register a file or Zarr store to the Tiled catalog. + + Args: + file_path: Absolute path on the client filesystem (used for logging). + catalog_path: Optional sub-path within the Tiled catalog. + tiled_path: Path as seen by the Tiled server. Defaults to file_path. + """ + logger = get_run_logger() + load_dotenv() + tiled_uri = os.environ["TILED_URI"] + api_key = os.environ["TILED_SINGLE_USER_API_KEY"] + + server_path = tiled_path if tiled_path is not None else file_path + + client = from_uri(tiled_uri, api_key=api_key) + catalog = client[catalog_path] if catalog_path else client + + logger.info(f"Registering {file_path} → {server_path}") + await register(catalog, server_path, overwrite=False) + logger.info(f"Registered {server_path} to Tiled catalog") + + +@flow(name="register-to-tiled", flow_run_name="register-{file_path}") +async def register_to_tiled( + file_path: Path | str, + catalog_path: str | None = None, + tiled_path: Path | str | None = None, +) -> None: + """Register a file or Zarr store to the Tiled server. + + Args: + file_path: Path to the file or Zarr store (client filesystem). + catalog_path: Optional sub-path within the Tiled catalog. + tiled_path: Path as seen by the Tiled server. Defaults to file_path. + """ + logger = get_run_logger() + file_path = Path(file_path) + tiled_path = Path(tiled_path) if tiled_path else None + logger.info(f"Registering {file_path} to Tiled (catalog_path={catalog_path!r})") + await register_file_to_tiled(file_path, catalog_path=catalog_path, tiled_path=tiled_path) + + +if __name__ == "__main__": + import asyncio + + zarr = Path("/Users/david/Documents/data/tomo/scratch/rec20230606_152011_jong-seto_fungal-mycelia.zarr") + h5 = Path("/Users/david/Documents/data/tomo/raw/20241216_154449_ddd.h5") + asyncio.run(register_to_tiled(zarr)) + asyncio.run(register_to_tiled(h5)) From 04c25c30eb4ef80837aab9452f5e73231a3ad86d Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 14:45:28 -0700 Subject: [PATCH 06/23] cleaning up --- orchestration/tiled.py | 59 +++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/orchestration/tiled.py b/orchestration/tiled.py index e871d7a8..849d567a 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -12,52 +12,59 @@ from tiled.client.register import register -@task(name="register-file-to-tiled", task_run_name="register-{file_path}") +@task(name="register-file-to-tiled", task_run_name="register-{path}") async def register_file_to_tiled( - file_path: Path, - catalog_path: str | None = None, - tiled_path: Path | None = None, + path: Path, + prefix: str | None = None, + overwrite: bool = False, ) -> None: """Register a file or Zarr store to the Tiled catalog. Args: - file_path: Absolute path on the client filesystem (used for logging). - catalog_path: Optional sub-path within the Tiled catalog. - tiled_path: Path as seen by the Tiled server. Defaults to file_path. + path: Absolute path on the client filesystem (used for logging). + prefix: Optional sub-path within the Tiled catalog. + overwrite: Whether to overwrite existing entries in the Tiled catalog. """ logger = get_run_logger() load_dotenv() tiled_uri = os.environ["TILED_URI"] api_key = os.environ["TILED_SINGLE_USER_API_KEY"] - server_path = tiled_path if tiled_path is not None else file_path - client = from_uri(tiled_uri, api_key=api_key) - catalog = client[catalog_path] if catalog_path else client - logger.info(f"Registering {file_path} → {server_path}") - await register(catalog, server_path, overwrite=False) - logger.info(f"Registered {server_path} to Tiled catalog") + logger.info(f"Registering {path} to Tiled catalog at {tiled_uri} with prefix {prefix!r}") + try: + await register( + node=client, + path=path, + prefix=prefix or "/", + overwrite=overwrite + ) + except Exception as e: + raise RuntimeError( + f"Failed to register {path} to Tiled catalog at {tiled_uri} " + f"(prefix={prefix!r}): {e}" + ) from e + logger.info(f"Registered {path} to Tiled catalog") -@flow(name="register-to-tiled", flow_run_name="register-{file_path}") +@flow(name="register-to-tiled", flow_run_name="register-{path}") async def register_to_tiled( - file_path: Path | str, - catalog_path: str | None = None, - tiled_path: Path | str | None = None, + path: Path | str, + prefix: str | None = None, + overwrite: bool = False, ) -> None: """Register a file or Zarr store to the Tiled server. Args: - file_path: Path to the file or Zarr store (client filesystem). - catalog_path: Optional sub-path within the Tiled catalog. - tiled_path: Path as seen by the Tiled server. Defaults to file_path. + path: Path to the file or Zarr store (client filesystem). + prefix: Optional sub-path within the Tiled catalog. + overwrite: Whether to overwrite existing entries in the Tiled catalog. """ logger = get_run_logger() - file_path = Path(file_path) - tiled_path = Path(tiled_path) if tiled_path else None - logger.info(f"Registering {file_path} to Tiled (catalog_path={catalog_path!r})") - await register_file_to_tiled(file_path, catalog_path=catalog_path, tiled_path=tiled_path) + path = Path(path) + logger.info(f"Submitting task: register {path} to Tiled (prefix={prefix!r})") + await register_file_to_tiled(path, prefix=prefix, overwrite=overwrite) if __name__ == "__main__": @@ -65,5 +72,5 @@ async def register_to_tiled( zarr = Path("/Users/david/Documents/data/tomo/scratch/rec20230606_152011_jong-seto_fungal-mycelia.zarr") h5 = Path("/Users/david/Documents/data/tomo/raw/20241216_154449_ddd.h5") - asyncio.run(register_to_tiled(zarr)) - asyncio.run(register_to_tiled(h5)) + asyncio.run(register_to_tiled(zarr, prefix="scratch")) + asyncio.run(register_to_tiled(h5, prefix="raw")) From 104a134b0fdc5d0df4f8c44b26951af3845eacdf Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 14:46:32 -0700 Subject: [PATCH 07/23] Adding todo note --- orchestration/flows/bl832/move.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index 3d0a5983..b72d2d8d 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -170,6 +170,8 @@ def process_new_832_file_task( ) logger.info(f"File successfully transferred from data832 to beegfs {file_path}") + # TODO: we should trigger the tiled ingestion flow in orchestration.tiled, but that flow will be set up on Ride/beegfs + logger.info("Initializing prune controller") prune_controller = get_prune_controller( prune_type=PruneMethod.GLOBUS, From a940ddd801c5df724554b539fcef87d2bd7a9954 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 14:53:42 -0700 Subject: [PATCH 08/23] fixing bad import statement --- orchestration/flows/bl832/move.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index b72d2d8d..de5d30f9 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -12,7 +12,7 @@ from orchestration.globus.transfer import GlobusEndpoint, start_transfer from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.prometheus_utils import PrometheusMetrics -from splash_flows.orchestration.transfer_controller import CopyMethod, get_transfer_controller +from orchestration.transfer_controller import CopyMethod, get_transfer_controller API_KEY = os.getenv("API_KEY") From 0e70447640b1c18833c845a15fd37fe78d606bb5 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 21 Apr 2026 14:53:57 -0700 Subject: [PATCH 09/23] adding beegfs endpoint to unit tests --- orchestration/_tests/test_globus_flow.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index c75317e1..352c09cb 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -150,6 +150,7 @@ def __init__(self) -> None: MockSecret.for_endpoint("nersc832_alsdev_scratch")), "alcf832_raw": MockEndpoint("mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_raw")), "alcf832_scratch": MockEndpoint("mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_scratch")), + "beegfs_raw": MockEndpoint("mock_beegfs_raw_path", MockSecret.for_endpoint("beegfs_raw")) } # Mock apps @@ -169,6 +170,7 @@ def __init__(self) -> None: self.data832_raw = self.endpoints["data832_raw"] self.data832_scratch = self.endpoints["data832_scratch"] self.nersc832_alsdev_scratch = self.endpoints["nersc832_alsdev_scratch"] + self.beegfs_raw = self.endpoints["beegfs_raw"] self.scicat = config["scicat"] @@ -250,6 +252,7 @@ def test_alcf_recon_flow(mocker: MockFixture): "nersc832_alsdev_recon_scripts": mocker.MagicMock(), "alcf832_raw": mocker.MagicMock(), "alcf832_scratch": mocker.MagicMock(), + "bl832-beegfs-raw": mocker.MagicMock(), } ) mocker.patch( From 2bbe6d1aba8af01a46982eaab914423742b6eebb Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 24 Apr 2026 14:14:40 -0700 Subject: [PATCH 10/23] pointing beegfs root_path to the correct directory --- config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yml b/config.yml index 34d63014..6718a525 100644 --- a/config.yml +++ b/config.yml @@ -140,7 +140,7 @@ globus: name: nersc832 bl832-beegfs-raw: - root_path: /beamline_staging/bl832/raw/ + root_path: /global/beegfs/beamlines/bl832/raw/ uri: beegfs.als.lbl.gov uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a name: bl832-beegfs-raw From 8792ea62fc4387e36f557a4b9324963d9f32671b Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 24 Apr 2026 14:16:51 -0700 Subject: [PATCH 11/23] adding tags to ingested data --- orchestration/tiled.py | 83 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 68 insertions(+), 15 deletions(-) diff --git a/orchestration/tiled.py b/orchestration/tiled.py index 849d567a..8f9d68f4 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -17,20 +17,14 @@ async def register_file_to_tiled( path: Path, prefix: str | None = None, overwrite: bool = False, + tags: list[str] | None = None, ) -> None: - """Register a file or Zarr store to the Tiled catalog. - - Args: - path: Absolute path on the client filesystem (used for logging). - prefix: Optional sub-path within the Tiled catalog. - overwrite: Whether to overwrite existing entries in the Tiled catalog. - """ logger = get_run_logger() load_dotenv() tiled_uri = os.environ["TILED_URI"] - api_key = os.environ["TILED_SINGLE_USER_API_KEY"] + tiled_api_key = os.environ["TILED_API_KEY"] - client = from_uri(tiled_uri, api_key=api_key) + client = from_uri(tiled_uri, api_key=tiled_api_key) logger.info(f"Registering {path} to Tiled catalog at {tiled_uri} with prefix {prefix!r}") try: @@ -38,14 +32,54 @@ async def register_file_to_tiled( node=client, path=path, prefix=prefix or "/", - overwrite=overwrite + overwrite=overwrite, ) except Exception as e: raise RuntimeError( f"Failed to register {path} to Tiled catalog at {tiled_uri} " f"(prefix={prefix!r}): {e}" ) from e - logger.info(f"Registered {path} to Tiled catalog") + + if not tags: + return + + def _apply_tags(entry_node): + existing_blob = entry_node.access_blob + existing_tags = (existing_blob or {}).get("tags", []) + merged_tags = list(set(existing_tags) | set(tags)) + op = "replace" if existing_blob is not None else "add" + try: + entry_node.patch_metadata( + access_blob_patch=[{"op": op, "path": "", "value": {"tags": merged_tags}}], + ) + logger.info(f"Tagged {entry_node.uri} with {merged_tags}") + except Exception as e: + logger.warning(f"Could not tag {entry_node.uri}: {e}") + + # Navigate to prefix node after registration + node = client + for segment in (prefix or "").strip("/").split("/"): + if segment: + node = node[segment] + + if path.is_dir() and not path.suffix: + # TIFF directory: Tiled registers each file flat into the prefix node + for key in node: + _apply_tags(node[key]) + else: + # .h5 or .zarr: registered as single node, keyed by stem + # Even on COLLISION the entry exists — just try it directly + entry_key = path.stem + logger.info(f"Looking up entry key {entry_key!r} under {prefix!r}") + try: + _apply_tags(node[entry_key]) + except KeyError: + # Key not found even after registration — log all available keys to diagnose + available = sorted(node) + logger.warning( + f"Entry {entry_key!r} not found under {prefix!r}. " + f"Available keys: {available}" + ) @flow(name="register-to-tiled", flow_run_name="register-{path}") @@ -53,6 +87,7 @@ async def register_to_tiled( path: Path | str, prefix: str | None = None, overwrite: bool = False, + tags: list[str] | None = None, ) -> None: """Register a file or Zarr store to the Tiled server. @@ -64,13 +99,31 @@ async def register_to_tiled( logger = get_run_logger() path = Path(path) logger.info(f"Submitting task: register {path} to Tiled (prefix={prefix!r})") - await register_file_to_tiled(path, prefix=prefix, overwrite=overwrite) + await register_file_to_tiled(path, prefix=prefix, overwrite=overwrite, tags=tags) if __name__ == "__main__": import asyncio - zarr = Path("/Users/david/Documents/data/tomo/scratch/rec20230606_152011_jong-seto_fungal-mycelia.zarr") h5 = Path("/Users/david/Documents/data/tomo/raw/20241216_154449_ddd.h5") - asyncio.run(register_to_tiled(zarr, prefix="scratch")) - asyncio.run(register_to_tiled(h5, prefix="raw")) + tiffs = Path("/Users/david/Documents/data/tomo/rec20230224_132553_sea_shell/") + zarr = Path("/Users/david/Documents/data/tomo/scratch/rec20230606_152011_jong-seto_fungal-mycelia_flat-AQ_fungi2_fast.zarr") + + asyncio.run(register_to_tiled(path=h5, prefix="beamlines/bl832/raw/", tags=["bl832"], overwrite=False)) + asyncio.run(register_to_tiled(path=tiffs, prefix="beamlines/bl832/scratch", tags=["bl832", "dabramov"], overwrite=False)) + asyncio.run(register_to_tiled(path=zarr, prefix="beamlines/bl832/scratch", tags=["bl832"], overwrite=False)) + + load_dotenv() + client = from_uri(os.environ["TILED_URI"]) + checks = [ + (client["beamlines"]["bl832"]["raw"][h5.stem], ["bl832"], h5), + (client["beamlines"]["bl832"]["scratch"], ["bl832", "dabramov"], tiffs), + (client["beamlines"]["bl832"]["scratch"][zarr.stem], ["bl832"], zarr), + ] + for node, expected_tags, check_path in checks: + if check_path.is_dir() and not check_path.suffix: + key = next(iter(node)) + node = node[key] + actual = node.access_blob.get("tags", []) + status = "✓" if set(expected_tags) <= set(actual) else "✗" + print(f"{status} {node.uri}: tags={actual}") # prefix should be beamlines/bl832/raw// From 12b3def3cb9fbc49e7fcde303afb415011a7256b Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 4 May 2026 13:10:21 -0700 Subject: [PATCH 12/23] removing TILED_API_KEY from tiled.py --- orchestration/tiled.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/orchestration/tiled.py b/orchestration/tiled.py index 8f9d68f4..692d1b91 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -22,9 +22,8 @@ async def register_file_to_tiled( logger = get_run_logger() load_dotenv() tiled_uri = os.environ["TILED_URI"] - tiled_api_key = os.environ["TILED_API_KEY"] - client = from_uri(tiled_uri, api_key=tiled_api_key) + client = from_uri(tiled_uri) logger.info(f"Registering {path} to Tiled catalog at {tiled_uri} with prefix {prefix!r}") try: From 8fb836cc7b38ac00d6a378406f6ace47884d78e3 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 4 May 2026 15:38:42 -0700 Subject: [PATCH 13/23] removing check tags logic from main, moving it to a new method (check_tags). Also removing file paths from main, and setting as .env variables --- orchestration/tiled.py | 92 +++++++++++++++++++++++++++++++++--------- 1 file changed, 72 insertions(+), 20 deletions(-) diff --git a/orchestration/tiled.py b/orchestration/tiled.py index 692d1b91..99d12eb9 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -81,6 +81,62 @@ def _apply_tags(entry_node): ) +@task(name="check-tiled-tags", task_run_name="check-tags-{path}") +def check_tags( + path: Path | str, + prefix: str, + expected_tags: set[str], +) -> tuple[bool, list[str]]: + """Check whether a registered dataset has the expected tags applied. + + Navigates to the entry corresponding to ``path`` under ``prefix`` in the + Tiled catalog and compares its ``access_blob.tags`` against ``expected_tags``. + + For TIFF directories (a directory with no suffix), the first child entry + under the prefix node is checked, since ``register`` registers each TIFF + file flat into the prefix node. + + Args: + path: Path to the file or store that was registered. + prefix: Sub-path within the Tiled catalog where the entry was registered. + expected_tags: Tags that must be present on the entry. + + Returns: + A tuple ``(ok, actual_tags)`` where ``ok`` is True iff every tag in + ``expected_tags`` is present in the entry's ``access_blob.tags``. + + Raises: + KeyError: If the entry cannot be located under ``prefix``. + """ + logger = get_run_logger() + load_dotenv() + path = Path(path) + tiled_uri = os.environ["TILED_URI"] + client = from_uri(tiled_uri) + + # Navigate to the prefix node + node = client + for segment in prefix.strip("/").split("/"): + if segment: + node = node[segment] + + # For TIFF directories, register flattens files into the prefix node; + # for .h5 / .zarr, the entry is keyed by the path stem. + if path.is_dir() and not path.suffix: + key = next(iter(node)) + node = node[key] + else: + node = node[path.stem] + + actual = node.access_blob.get("tags", []) if node.access_blob else [] + ok = expected_tags <= set(actual) + logger.info( + f"{path.name} under {prefix!r}: " + f"expected={sorted(expected_tags)} actual={actual} ok={ok}" + ) + return ok, actual + + @flow(name="register-to-tiled", flow_run_name="register-{path}") async def register_to_tiled( path: Path | str, @@ -94,6 +150,7 @@ async def register_to_tiled( path: Path to the file or Zarr store (client filesystem). prefix: Optional sub-path within the Tiled catalog. overwrite: Whether to overwrite existing entries in the Tiled catalog. + tags: Optional list of tags to apply to the registered entry. """ logger = get_run_logger() path = Path(path) @@ -104,25 +161,20 @@ async def register_to_tiled( if __name__ == "__main__": import asyncio - h5 = Path("/Users/david/Documents/data/tomo/raw/20241216_154449_ddd.h5") - tiffs = Path("/Users/david/Documents/data/tomo/rec20230224_132553_sea_shell/") - zarr = Path("/Users/david/Documents/data/tomo/scratch/rec20230606_152011_jong-seto_fungal-mycelia_flat-AQ_fungi2_fast.zarr") - - asyncio.run(register_to_tiled(path=h5, prefix="beamlines/bl832/raw/", tags=["bl832"], overwrite=False)) - asyncio.run(register_to_tiled(path=tiffs, prefix="beamlines/bl832/scratch", tags=["bl832", "dabramov"], overwrite=False)) - asyncio.run(register_to_tiled(path=zarr, prefix="beamlines/bl832/scratch", tags=["bl832"], overwrite=False)) - load_dotenv() - client = from_uri(os.environ["TILED_URI"]) - checks = [ - (client["beamlines"]["bl832"]["raw"][h5.stem], ["bl832"], h5), - (client["beamlines"]["bl832"]["scratch"], ["bl832", "dabramov"], tiffs), - (client["beamlines"]["bl832"]["scratch"][zarr.stem], ["bl832"], zarr), + h5 = Path(os.environ["EXAMPLE_H5_PATH"]) + tiffs = Path(os.environ["EXAMPLE_TIFFS_PATH"]) + zarr = Path(os.environ["EXAMPLE_ZARR_PATH"]) + + cases = [ + (h5, "beamlines/bl832/raw/", {"bl832"}), + (tiffs, "beamlines/bl832/scratch", {"bl832", "dabramov"}), + (zarr, "beamlines/bl832/scratch", {"bl832"}), ] - for node, expected_tags, check_path in checks: - if check_path.is_dir() and not check_path.suffix: - key = next(iter(node)) - node = node[key] - actual = node.access_blob.get("tags", []) - status = "✓" if set(expected_tags) <= set(actual) else "✗" - print(f"{status} {node.uri}: tags={actual}") # prefix should be beamlines/bl832/raw// + + for path, prefix, tags in cases: + asyncio.run(register_to_tiled(path=path, prefix=prefix, tags=list(tags), overwrite=False)) + + for path, prefix, expected in cases: + ok, actual = check_tags.fn(path, prefix, expected) + print(f"{'✓' if ok else '✗'} {path.name}: expected={sorted(expected)} actual={actual}") From 669dbbe0e1bc07b910183ef6a7c6362a5467b1b3 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 4 May 2026 16:50:39 -0700 Subject: [PATCH 14/23] Removing requirements.txt, moving any missing deps to pyproject.toml, updating github action --- .github/workflows/python-app.yml | 6 ++---- pyproject.toml | 21 +++++++++++---------- requirements-dev.txt | 5 ----- requirements.txt | 21 --------------------- 4 files changed, 13 insertions(+), 40 deletions(-) delete mode 100644 requirements-dev.txt delete mode 100644 requirements.txt diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 1164dede..6345a494 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -17,13 +17,11 @@ jobs: with: python-version: 3.11 cache: 'pip' + cache-dependency-path: pyproject.toml - name: Install dependencies run: | python -m pip install --no-cache-dir --upgrade pip - pip install --no-cache-dir flake8 pytest - pip install --no-cache-dir . - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi + pip install -e . --group dev - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names diff --git a/pyproject.toml b/pyproject.toml index ede057ec..abf94468 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,6 +6,7 @@ readme = "README.md" requires-python = ">=3.11" dependencies = [ "authlib", + "dynaconf", "globus-compute-sdk @ git+https://github.com/globus/globus-compute.git@d1731340074be56861ec91d732bdff44f8e2b46e#subdirectory=compute_sdk", "globus-sdk>=3.0", "griffe>=0.49.0,<2.0.0", @@ -27,15 +28,15 @@ dependencies = [ "watchfiles" ] -[build-system] -requires = [ - "black", - "flake8", - "freezegun", - "pytest", - "pytest-mock", - "setuptools" +[dependency-groups] +dev = [ + "black", + "flake8", + "freezegun", + "pytest", + "pytest-mock", ] -build-backend = "setuptools.build_meta" - +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt deleted file mode 100644 index d5ac223e..00000000 --- a/requirements-dev.txt +++ /dev/null @@ -1,5 +0,0 @@ -pytest -pytest-mock -freezegun -flake8 -black \ No newline at end of file diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 84581b18..00000000 --- a/requirements.txt +++ /dev/null @@ -1,21 +0,0 @@ -authlib -dynaconf -globus-compute-sdk @ git+https://github.com/globus/globus-compute.git@d1731340074be56861ec91d732bdff44f8e2b46e#subdirectory=compute_sdk -globus-sdk>=3.0 -griffe>=0.49.0,<2.0.0 -h5py -httpx>=0.22.0 -mkdocs -mkdocs-material -mkdocs-mermaid2-plugin -numpy>=1.26.4 -pillow -prefect==3.4.2 -prometheus_client==0.21.1 -pydantic==2.11 -python-dotenv -pyyaml -scicat-beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79 -sfapi_client -tiled[client] -watchfiles \ No newline at end of file From a76742af20588386d54b67f765310f7fde307efb Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 5 May 2026 09:47:07 -0700 Subject: [PATCH 15/23] moving load_dotenv() to top of the module --- orchestration/tiled.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/orchestration/tiled.py b/orchestration/tiled.py index 99d12eb9..5616e16b 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -12,6 +12,9 @@ from tiled.client.register import register +load_dotenv() + + @task(name="register-file-to-tiled", task_run_name="register-{path}") async def register_file_to_tiled( path: Path, @@ -20,7 +23,6 @@ async def register_file_to_tiled( tags: list[str] | None = None, ) -> None: logger = get_run_logger() - load_dotenv() tiled_uri = os.environ["TILED_URI"] client = from_uri(tiled_uri) @@ -109,7 +111,6 @@ def check_tags( KeyError: If the entry cannot be located under ``prefix``. """ logger = get_run_logger() - load_dotenv() path = Path(path) tiled_uri = os.environ["TILED_URI"] client = from_uri(tiled_uri) From 402a34d2b236af8e10ff94b7e5ec9f4f7ee8e1e0 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 5 May 2026 14:50:44 -0700 Subject: [PATCH 16/23] moving _apply_tags() into a prefect task, logger.info -> logger.debug, cleanup. Files and tags register with tiled, and I can see them via the Tiled python api, but not in the Tiled UI for some reason (maybe a sign access tags are working...) --- orchestration/tiled.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/orchestration/tiled.py b/orchestration/tiled.py index 5616e16b..bd2f58af 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -44,19 +44,6 @@ async def register_file_to_tiled( if not tags: return - def _apply_tags(entry_node): - existing_blob = entry_node.access_blob - existing_tags = (existing_blob or {}).get("tags", []) - merged_tags = list(set(existing_tags) | set(tags)) - op = "replace" if existing_blob is not None else "add" - try: - entry_node.patch_metadata( - access_blob_patch=[{"op": op, "path": "", "value": {"tags": merged_tags}}], - ) - logger.info(f"Tagged {entry_node.uri} with {merged_tags}") - except Exception as e: - logger.warning(f"Could not tag {entry_node.uri}: {e}") - # Navigate to prefix node after registration node = client for segment in (prefix or "").strip("/").split("/"): @@ -66,14 +53,14 @@ def _apply_tags(entry_node): if path.is_dir() and not path.suffix: # TIFF directory: Tiled registers each file flat into the prefix node for key in node: - _apply_tags(node[key]) + _apply_tags(entry_node=node[key], tags=tags) else: # .h5 or .zarr: registered as single node, keyed by stem # Even on COLLISION the entry exists — just try it directly entry_key = path.stem logger.info(f"Looking up entry key {entry_key!r} under {prefix!r}") try: - _apply_tags(node[entry_key]) + _apply_tags(entry_node=node[entry_key], tags=tags) except KeyError: # Key not found even after registration — log all available keys to diagnose available = sorted(node) @@ -83,6 +70,24 @@ def _apply_tags(entry_node): ) +@task(name="apply-tags", task_run_name="apply-tags-{tags}") +def _apply_tags(entry_node, tags: list[str]) -> None: + logger = get_run_logger() + existing_blob = entry_node.access_blob + existing_tags = (existing_blob or {}).get("tags", []) + merged_tags = list(set(existing_tags) | set(tags)) + op = "replace" if existing_blob is not None else "add" + try: + # entry_node.update_metadata(access_tags=merged_tags) + entry_node.patch_metadata( + access_blob_patch=[{"op": op, "path": "", "value": {"tags": merged_tags}}], + ) + + logger.debug(f"Tagged {entry_node.uri} with {merged_tags}") + except Exception as e: + logger.debug(f"Could not tag {entry_node.uri}: {e}") + + @task(name="check-tiled-tags", task_run_name="check-tags-{path}") def check_tags( path: Path | str, @@ -162,7 +167,6 @@ async def register_to_tiled( if __name__ == "__main__": import asyncio - load_dotenv() h5 = Path(os.environ["EXAMPLE_H5_PATH"]) tiffs = Path(os.environ["EXAMPLE_TIFFS_PATH"]) zarr = Path(os.environ["EXAMPLE_ZARR_PATH"]) @@ -177,5 +181,5 @@ async def register_to_tiled( asyncio.run(register_to_tiled(path=path, prefix=prefix, tags=list(tags), overwrite=False)) for path, prefix, expected in cases: - ok, actual = check_tags.fn(path, prefix, expected) + ok, actual = check_tags(path, prefix, expected) print(f"{'✓' if ok else '✗'} {path.name}: expected={sorted(expected)} actual={actual}") From e270faeac48608560799c8526b2d7a0551db638c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 15:15:49 -0700 Subject: [PATCH 17/23] Adding register_file_to_tiled() call to move.py --- orchestration/flows/bl832/move.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index de5d30f9..14b645d7 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -12,6 +12,7 @@ from orchestration.globus.transfer import GlobusEndpoint, start_transfer from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.prometheus_utils import PrometheusMetrics +from orchestration.tiled import register_file_to_tiled from orchestration.transfer_controller import CopyMethod, get_transfer_controller @@ -170,7 +171,14 @@ def process_new_832_file_task( ) logger.info(f"File successfully transferred from data832 to beegfs {file_path}") - # TODO: we should trigger the tiled ingestion flow in orchestration.tiled, but that flow will be set up on Ride/beegfs + tiled_future = register_file_to_tiled( + path=config.beegfs_raw.root_path+relative_path, + prefix="beamlines/bl832/raw", + overwrite=False, + tags=["raw", "bl832"], + ) + # TODO: find proposal id in h5, make that a tag + tiled_future.result() # wait for registration to complete before scheduling deletes logger.info("Initializing prune controller") prune_controller = get_prune_controller( From d3b85283597d1d19f42f788c8fcd766bc5f79a8a Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 15:24:31 -0700 Subject: [PATCH 18/23] Adding calls to register_file_to_tiled() from nersc_recon_flow() for tiffs and zarr --- orchestration/flows/bl832/nersc.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index 42378d70..811b487c 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -16,12 +16,13 @@ from orchestration.flows.bl832.config import Config832 from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController -from orchestration.prune_controller import get_prune_controller, PruneMethod -from orchestration.transfer_controller import get_transfer_controller, CopyMethod from orchestration.flows.bl832.streaming_mixin import ( NerscStreamingMixin, SlurmJobBlock, cancellation_hook, monitor_streaming_job, save_block ) from orchestration.prefect import schedule_prefect_flow +from orchestration.prune_controller import get_prune_controller, PruneMethod +from orchestration.tiled import register_file_to_tiled +from orchestration.transfer_controller import get_transfer_controller, CopyMethod logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -1572,6 +1573,24 @@ def nersc_recon_flow( zarr_file_path=zarr_file_path ) + # Register the reconstructed TIFFs in tiled + tiled_tiffs_future = register_file_to_tiled( + path=config.beegfs_raw.root_path+tiff_file_path, + prefix="beamlines/bl832/scratch", + overwrite=False, + tags=["scratch", "bl832"], + ) + tiled_tiffs_future = tiled_tiffs_future.result() + + # Register the reconstructed ZARRs in tiled + tiled_zarr_future = register_file_to_tiled( + path=config.beegfs_raw.root_path+zarr_file_path, + prefix="beamlines/bl832/scratch", + overwrite=False, + tags=["scratch", "bl832"], + ) + tiled_zarr_future = tiled_zarr_future.result() + # TODO: Ingest into SciCat if nersc_reconstruction_success: return True From 308b1670380a06906589bc11e0dbeae050b4e7ca Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 15:24:57 -0700 Subject: [PATCH 19/23] Adding TILED_URI to .env.example --- .env.example | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.env.example b/.env.example index e3728e89..458e71e0 100644 --- a/.env.example +++ b/.env.example @@ -4,4 +4,5 @@ PREFECT_API_URL= PREFECT_API_KEY= PUSHGATEWAY_URL= JOB_NAME= -INSTANCE_LABEL= \ No newline at end of file +INSTANCE_LABEL= +TILED_URI= \ No newline at end of file From eb52e4b60c9c9da8070031576a1d59bc7bdfaea0 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 15:42:19 -0700 Subject: [PATCH 20/23] Adding BEAMLINE=<> to .env.example --- .env.example | 1 + 1 file changed, 1 insertion(+) diff --git a/.env.example b/.env.example index 458e71e0..9db497e7 100644 --- a/.env.example +++ b/.env.example @@ -1,3 +1,4 @@ +BEAMLINE=<832> GLOBUS_CLIENT_ID= GLOBUS_CLIENT_SECRET= PREFECT_API_URL= From a42b9597d1726bb3e74e4a6130ffc25babb26219 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 16:13:16 -0700 Subject: [PATCH 21/23] Making register_file_to_tiled() sync to fix async/sync nesting. Example: dispatcher (async) -> move.py (sync) -> register_file_to_tiled (async). Using Prefect's built in helper from prefect.utilities.asyncutils import run_coro_as_sync to run the async Tiled register() function synchronously --- orchestration/flows/bl832/move.py | 5 ++--- orchestration/flows/bl832/nersc.py | 6 ++---- orchestration/tiled.py | 26 ++++++++++++++------------ 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index 14b645d7..73709d04 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -171,14 +171,13 @@ def process_new_832_file_task( ) logger.info(f"File successfully transferred from data832 to beegfs {file_path}") - tiled_future = register_file_to_tiled( - path=config.beegfs_raw.root_path+relative_path, + register_file_to_tiled( + path=Path(config.beegfs_raw.root_path+relative_path), prefix="beamlines/bl832/raw", overwrite=False, tags=["raw", "bl832"], ) # TODO: find proposal id in h5, make that a tag - tiled_future.result() # wait for registration to complete before scheduling deletes logger.info("Initializing prune controller") prune_controller = get_prune_controller( diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index 811b487c..d7eba907 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -1574,22 +1574,20 @@ def nersc_recon_flow( ) # Register the reconstructed TIFFs in tiled - tiled_tiffs_future = register_file_to_tiled( + register_file_to_tiled( path=config.beegfs_raw.root_path+tiff_file_path, prefix="beamlines/bl832/scratch", overwrite=False, tags=["scratch", "bl832"], ) - tiled_tiffs_future = tiled_tiffs_future.result() # Register the reconstructed ZARRs in tiled - tiled_zarr_future = register_file_to_tiled( + register_file_to_tiled( path=config.beegfs_raw.root_path+zarr_file_path, prefix="beamlines/bl832/scratch", overwrite=False, tags=["scratch", "bl832"], ) - tiled_zarr_future = tiled_zarr_future.result() # TODO: Ingest into SciCat if nersc_reconstruction_success: diff --git a/orchestration/tiled.py b/orchestration/tiled.py index bd2f58af..90687c7b 100644 --- a/orchestration/tiled.py +++ b/orchestration/tiled.py @@ -8,6 +8,7 @@ from pathlib import Path from prefect import flow, get_run_logger, task +from prefect.utilities.asyncutils import run_coro_as_sync from tiled.client import from_uri from tiled.client.register import register @@ -16,24 +17,27 @@ @task(name="register-file-to-tiled", task_run_name="register-{path}") -async def register_file_to_tiled( - path: Path, +def register_file_to_tiled( + path: Path | str, prefix: str | None = None, overwrite: bool = False, tags: list[str] | None = None, ) -> None: logger = get_run_logger() + path = Path(path) tiled_uri = os.environ["TILED_URI"] client = from_uri(tiled_uri) logger.info(f"Registering {path} to Tiled catalog at {tiled_uri} with prefix {prefix!r}") try: - await register( - node=client, - path=path, - prefix=prefix or "/", - overwrite=overwrite, + run_coro_as_sync( # Bridge synchronous Prefect task to async Tiled client method + register( + node=client, + path=path, + prefix=prefix or "/", + overwrite=overwrite, + ) ) except Exception as e: raise RuntimeError( @@ -144,7 +148,7 @@ def check_tags( @flow(name="register-to-tiled", flow_run_name="register-{path}") -async def register_to_tiled( +def register_to_tiled( path: Path | str, prefix: str | None = None, overwrite: bool = False, @@ -161,12 +165,10 @@ async def register_to_tiled( logger = get_run_logger() path = Path(path) logger.info(f"Submitting task: register {path} to Tiled (prefix={prefix!r})") - await register_file_to_tiled(path, prefix=prefix, overwrite=overwrite, tags=tags) + register_file_to_tiled(path, prefix=prefix, overwrite=overwrite, tags=tags) if __name__ == "__main__": - import asyncio - h5 = Path(os.environ["EXAMPLE_H5_PATH"]) tiffs = Path(os.environ["EXAMPLE_TIFFS_PATH"]) zarr = Path(os.environ["EXAMPLE_ZARR_PATH"]) @@ -178,7 +180,7 @@ async def register_to_tiled( ] for path, prefix, tags in cases: - asyncio.run(register_to_tiled(path=path, prefix=prefix, tags=list(tags), overwrite=False)) + register_to_tiled(path=path, prefix=prefix, tags=list(tags), overwrite=False) for path, prefix, expected in cases: ok, actual = check_tags(path, prefix, expected) From 1be3639bc7c4a97f9b6d4e23afe9e4816eae8cae Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 16:53:26 -0700 Subject: [PATCH 22/23] adding beegfs copy to nersc.py --- orchestration/flows/bl832/nersc.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index d7eba907..d6ec56b4 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -1565,17 +1565,22 @@ def nersc_recon_flow( destination=config.data832_scratch ) - logger.info("Scheduling pruning tasks.") - schedule_pruning( - config=config, - raw_file_path=file_path, - tiff_file_path=tiff_file_path, - zarr_file_path=zarr_file_path + logger.info("Copy from NERSC /global/cfs/cdirs/als/data_mover/8.3.2/scratch to beegfs") + transfer_controller.copy( + file_path=tiff_file_path, + source=config.nersc832_alsdev_pscratch_scratch, + destination=config.beegfs_scratch + ) + + transfer_controller.copy( + file_path=zarr_file_path, + source=config.nersc832_alsdev_pscratch_scratch, + destination=config.beegfs_scratch ) # Register the reconstructed TIFFs in tiled register_file_to_tiled( - path=config.beegfs_raw.root_path+tiff_file_path, + path=Path(config.beegfs_scratch.root_path+tiff_file_path), prefix="beamlines/bl832/scratch", overwrite=False, tags=["scratch", "bl832"], @@ -1583,12 +1588,20 @@ def nersc_recon_flow( # Register the reconstructed ZARRs in tiled register_file_to_tiled( - path=config.beegfs_raw.root_path+zarr_file_path, + path=Path(config.beegfs_scratch.root_path+zarr_file_path), prefix="beamlines/bl832/scratch", overwrite=False, tags=["scratch", "bl832"], ) + logger.info("Scheduling pruning tasks.") + schedule_pruning( + config=config, + raw_file_path=file_path, + tiff_file_path=tiff_file_path, + zarr_file_path=zarr_file_path + ) + # TODO: Ingest into SciCat if nersc_reconstruction_success: return True From 819308ef7766e1fa1b94b23352b1b2c92482551c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 6 May 2026 16:55:22 -0700 Subject: [PATCH 23/23] create unit tests in test_tiled.py --- orchestration/_tests/test_tiled.py | 471 +++++++++++++++++++++++++++++ 1 file changed, 471 insertions(+) create mode 100644 orchestration/_tests/test_tiled.py diff --git a/orchestration/_tests/test_tiled.py b/orchestration/_tests/test_tiled.py new file mode 100644 index 00000000..deca5bc0 --- /dev/null +++ b/orchestration/_tests/test_tiled.py @@ -0,0 +1,471 @@ +"""Unit tests for ``orchestration.tiled``. + +Tests run inside a Prefect ephemeral server (``prefect_test_harness``) so the +real task engine is exercised, but Tiled itself is mocked — no real Tiled +server, no network calls. + +Coverage: + - ``register_file_to_tiled``: h5/zarr branch, TIFF directory branch, + no-tags skip, ``str`` path coercion, missing-entry warning, register failure. + - ``_apply_tags``: existing access_blob (replace), no blob (add), tag + deduplication, ``patch_metadata`` failure swallowed. + - ``check_tags``: h5 ok, h5 tag-missing, TIFF dir uses first child, + no access_blob, missing entry raises ``KeyError``. + - ``register_to_tiled`` flow: delegates with the right args, coerces ``str``. + +The async/sync bridge inside ``register_file_to_tiled`` (``run_coro_as_sync`` +calling Tiled's async ``register``) is exercised on every happy path: tests +patch ``register`` as an ``AsyncMock`` and let ``run_coro_as_sync`` actually +drive it. The error-path test patches ``run_coro_as_sync`` directly to inject +a failure. +""" +import warnings + +from prefect.testing.utilities import prefect_test_harness +import pytest +from pytest_mock import MockFixture + +from orchestration.tiled import ( + _apply_tags, + check_tags, + register_file_to_tiled, + register_to_tiled, +) + +warnings.filterwarnings("ignore", category=DeprecationWarning) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture(autouse=True, scope="session") +def prefect_test_fixture(): + """Provide an ephemeral Prefect server for the whole test session.""" + with prefect_test_harness(): + yield + + +@pytest.fixture(autouse=True) +def _tiled_uri_env(monkeypatch): + """Set ``TILED_URI`` so ``os.environ[...]`` reads in the module don't error.""" + monkeypatch.setenv("TILED_URI", "http://tiled.test") + + +# --------------------------------------------------------------------------- +# Mock Tiled node +# --------------------------------------------------------------------------- + +class MockNode: + """Stand-in for a Tiled node. + + Strict by design: ``node[missing_key]`` raises ``KeyError`` to match real + Tiled behavior, so tests for the missing-entry warning path actually work. + Build prefix chains explicitly with ``add_child`` or the + ``build_prefix_chain`` helper below. + """ + + def __init__(self, access_blob=None, uri="http://tiled.test/node"): + self.access_blob = access_blob + self.uri = uri + self._children: dict[str, "MockNode"] = {} + self.patch_calls: list[dict] = [] + + def add_child(self, key: str, node: "MockNode") -> "MockNode": + self._children[key] = node + return node + + def __getitem__(self, key): + # Strict: missing keys raise like a real Tiled node would + return self._children[key] + + def __iter__(self): + return iter(self._children) + + def patch_metadata(self, **kwargs): + self.patch_calls.append(kwargs) + + +def build_prefix_chain(segments: list[str], leaf: MockNode) -> MockNode: + """Build ``client[seg1][seg2]...[segN] -> leaf`` and return the root. + + Used to mirror the prefix-navigation loop in ``register_file_to_tiled`` + and ``check_tags`` (``for segment in prefix.strip("/").split("/")``). + """ + current = leaf + for segment in reversed(segments): + parent = MockNode() + parent.add_child(segment, current) + current = parent + return current + + +# --------------------------------------------------------------------------- +# _apply_tags +# --------------------------------------------------------------------------- + +def test_apply_tags_uses_add_op_when_no_existing_blob(): + """No access_blob present → JSON Patch op is ``add``.""" + node = MockNode(access_blob=None) + _apply_tags(entry_node=node, tags=["bl832"]) + + assert len(node.patch_calls) == 1 + patch = node.patch_calls[0]["access_blob_patch"][0] + assert patch["op"] == "add" + assert patch["path"] == "" + assert set(patch["value"]["tags"]) == {"bl832"} + + +def test_apply_tags_uses_replace_op_when_blob_exists(): + """Existing access_blob → op is ``replace`` and tags are merged.""" + node = MockNode(access_blob={"tags": ["existing"]}) + _apply_tags(entry_node=node, tags=["bl832"]) + + patch = node.patch_calls[0]["access_blob_patch"][0] + assert patch["op"] == "replace" + assert set(patch["value"]["tags"]) == {"existing", "bl832"} + + +def test_apply_tags_deduplicates_overlapping_tags(): + """Tag merging is a set union, so overlap doesn't produce duplicates.""" + node = MockNode(access_blob={"tags": ["bl832", "old"]}) + _apply_tags(entry_node=node, tags=["bl832", "new"]) + + patch = node.patch_calls[0]["access_blob_patch"][0] + merged = patch["value"]["tags"] + assert set(merged) == {"bl832", "old", "new"} + assert len(merged) == 3 # no duplicates + + +def test_apply_tags_swallows_patch_metadata_failure(mocker: MockFixture): + """If ``patch_metadata`` raises, ``_apply_tags`` logs but does not propagate.""" + node = MockNode(access_blob={"tags": []}) + mock_patch_metadata = mocker.patch.object( + node, "patch_metadata", side_effect=RuntimeError("permission denied") + ) + + # Should not raise + _apply_tags(entry_node=node, tags=["bl832"]) + + # Verify the call actually happened — the test is about swallowing, not skipping + mock_patch_metadata.assert_called_once() + + +# --------------------------------------------------------------------------- +# register_file_to_tiled — happy paths (exercise run_coro_as_sync bridge) +# --------------------------------------------------------------------------- + +def test_register_h5_with_tags_applies_to_stem(mocker: MockFixture, tmp_path): + """For an .h5 file, the entry keyed by ``path.stem`` should be tagged.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines", "bl832", "raw"], prefix_node) + + fake_register = mocker.AsyncMock(return_value=None) + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", fake_register) + + register_file_to_tiled( + path=h5, + prefix="beamlines/bl832/raw", + tags=["raw", "bl832"], + ) + + fake_register.assert_awaited_once() + assert len(entry_node.patch_calls) == 1 + tags_written = entry_node.patch_calls[0]["access_blob_patch"][0]["value"]["tags"] + assert set(tags_written) == {"raw", "bl832"} + + +def test_register_zarr_with_tags_applies_to_stem(mocker: MockFixture, tmp_path): + """A .zarr store follows the same stem-keyed lookup as .h5.""" + zarr = tmp_path / "sample.zarr" + # Real .zarr is a directory, but the code path is suffix-driven, so a file is fine + zarr.touch() + + entry_node = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("sample", entry_node) + client = build_prefix_chain(["beamlines", "bl832", "scratch"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) + + register_file_to_tiled( + path=zarr, + prefix="beamlines/bl832/scratch", + tags=["bl832"], + ) + + assert len(entry_node.patch_calls) == 1 + + +def test_register_no_tags_skips_apply_tags(mocker: MockFixture, tmp_path): + """When ``tags`` is None/empty, the tag-application branch is skipped entirely.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + prefix_node = MockNode() + mocker.patch("orchestration.tiled.from_uri", return_value=prefix_node) + fake_register = mocker.AsyncMock(return_value=None) + mocker.patch("orchestration.tiled.register", fake_register) + + register_file_to_tiled(path=h5) + + fake_register.assert_awaited_once() + assert prefix_node.patch_calls == [] + + +def test_register_string_path_is_coerced(mocker: MockFixture, tmp_path): + """Passing ``path`` as a ``str`` must be coerced to ``Path``. + + Regression test: without ``Path(path)``, the ``path.is_dir()`` and + ``path.suffix`` calls explode with ``AttributeError``. + """ + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines", "bl832", "raw"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) + + # Pass a string, not a Path — would fail without the Path(path) coercion + register_file_to_tiled( + path=str(h5), + prefix="beamlines/bl832/raw", + tags=["raw"], + ) + + assert len(entry_node.patch_calls) == 1 + + +def test_register_tiff_dir_tags_each_child(mocker: MockFixture, tmp_path): + """A directory with no suffix should tag every child under the prefix node.""" + tiff_dir = tmp_path / "tiffs" + tiff_dir.mkdir() + (tiff_dir / "frame_0000.tiff").touch() + (tiff_dir / "frame_0001.tiff").touch() + + child_a = MockNode(access_blob=None) + child_b = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("frame_0000", child_a) + prefix_node.add_child("frame_0001", child_b) + client = build_prefix_chain(["beamlines", "bl832", "scratch"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) + + register_file_to_tiled( + path=tiff_dir, + prefix="beamlines/bl832/scratch", + tags=["bl832"], + ) + + assert len(child_a.patch_calls) == 1 + assert len(child_b.patch_calls) == 1 + + +# --------------------------------------------------------------------------- +# register_file_to_tiled — edge cases +# --------------------------------------------------------------------------- + +def test_register_missing_entry_after_register_logs_warning(mocker: MockFixture, tmp_path): + """If ``node[stem]`` raises ``KeyError`` post-register, no exception leaks and a warning is logged.""" + h5 = tmp_path / "missing.h5" + h5.touch() + + # Prefix node has *some* entries but not the one we'll look up + prefix_node = MockNode() + prefix_node.add_child("other-entry", MockNode()) + client = build_prefix_chain(["beamlines", "bl832", "raw"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + mocker.patch("orchestration.tiled.register", mocker.AsyncMock(return_value=None)) + mock_logger = mocker.patch("orchestration.tiled.get_run_logger") + + # Should not raise — the KeyError is caught and logged with available keys + register_file_to_tiled( + path=h5, + prefix="beamlines/bl832/raw", + tags=["raw"], + ) + + mock_logger.return_value.warning.assert_called_once() + warning_msg = mock_logger.return_value.warning.call_args[0][0] + assert "missing" in warning_msg # stem of the file + assert "other-entry" in warning_msg # available keys listed in the message + + +def test_register_raises_runtime_error_on_failure(mocker: MockFixture, tmp_path): + """If the bridge or Tiled's ``register`` raises, wrap it in ``RuntimeError``.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + mocker.patch("orchestration.tiled.from_uri", return_value=MockNode()) + mocker.patch( + "orchestration.tiled.run_coro_as_sync", + side_effect=Exception("connection refused"), + ) + + with pytest.raises(RuntimeError, match="Failed to register .* connection refused"): + register_file_to_tiled(path=h5) + + +# --------------------------------------------------------------------------- +# check_tags +# --------------------------------------------------------------------------- + +def test_check_tags_returns_true_when_all_expected_present(mocker: MockFixture, tmp_path): + """Subset semantics: ok iff expected_tags <= actual_tags.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob={"tags": ["bl832", "raw", "extra"]}) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + ok, actual = check_tags( + path=h5, + prefix="beamlines", + expected_tags={"bl832", "raw"}, + ) + + assert ok is True + assert set(actual) == {"bl832", "raw", "extra"} + + +def test_check_tags_returns_false_when_tag_missing(mocker: MockFixture, tmp_path): + """A single missing expected tag flips ok to False.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob={"tags": ["bl832"]}) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + ok, actual = check_tags( + path=h5, + prefix="beamlines", + expected_tags={"bl832", "raw"}, + ) + + assert ok is False + assert actual == ["bl832"] + + +def test_check_tags_tiff_dir_uses_first_child(mocker: MockFixture, tmp_path): + """For a TIFF dir, ``check_tags`` reads the first child of the prefix node.""" + tiff_dir = tmp_path / "tiffs" + tiff_dir.mkdir() + (tiff_dir / "frame_0000.tiff").touch() + + first_child = MockNode(access_blob={"tags": ["bl832"]}) + prefix_node = MockNode() + prefix_node.add_child("frame_0000", first_child) + client = build_prefix_chain(["beamlines", "bl832", "scratch"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + ok, actual = check_tags( + path=tiff_dir, + prefix="beamlines/bl832/scratch", + expected_tags={"bl832"}, + ) + + assert ok is True + assert actual == ["bl832"] + + +def test_check_tags_no_access_blob_returns_empty(mocker: MockFixture, tmp_path): + """When ``access_blob`` is None, actual tags should be ``[]``.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + entry_node = MockNode(access_blob=None) + prefix_node = MockNode() + prefix_node.add_child("scan", entry_node) + client = build_prefix_chain(["beamlines"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + ok, actual = check_tags( + path=h5, + prefix="beamlines", + expected_tags={"bl832"}, + ) + + assert ok is False + assert actual == [] + + +def test_check_tags_missing_entry_raises_key_error(mocker: MockFixture, tmp_path): + """If the stem isn't present under the prefix, the lookup raises ``KeyError``.""" + h5 = tmp_path / "missing.h5" + h5.touch() + + prefix_node = MockNode() + prefix_node.add_child("other", MockNode()) + client = build_prefix_chain(["beamlines"], prefix_node) + + mocker.patch("orchestration.tiled.from_uri", return_value=client) + + with pytest.raises(KeyError): + check_tags( + path=h5, + prefix="beamlines", + expected_tags={"bl832"}, + ) + + +# --------------------------------------------------------------------------- +# register_to_tiled flow +# --------------------------------------------------------------------------- + +def test_register_to_tiled_flow_delegates_to_task(mocker: MockFixture, tmp_path): + """The flow is a thin wrapper that just calls the task with the same args.""" + h5 = tmp_path / "scan.h5" + h5.touch() + + mock_task = mocker.patch("orchestration.tiled.register_file_to_tiled") + + register_to_tiled( + path=h5, + prefix="beamlines/bl832", + overwrite=True, + tags=["bl832"], + ) + + mock_task.assert_called_once_with( + h5, + prefix="beamlines/bl832", + overwrite=True, + tags=["bl832"], + ) + + +def test_register_to_tiled_flow_coerces_string_path(mocker: MockFixture): + """Passing a string should be coerced to ``Path`` before forwarding.""" + from pathlib import Path + + mock_task = mocker.patch("orchestration.tiled.register_file_to_tiled") + + register_to_tiled(path="/data/sample.h5", prefix="any") + + forwarded_path = mock_task.call_args.args[0] + assert isinstance(forwarded_path, Path) + assert forwarded_path == Path("/data/sample.h5")