Skip to content
254 changes: 254 additions & 0 deletions orchestration/_tests/test_prune_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
import pytest
from prefect.testing.utilities import prefect_test_harness


def _make_future(mocker, value=None):
f = mocker.MagicMock()
f.result.return_value = value
return f


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    """Keep Prefect's test harness active for the whole test session.

    Autouse and session-scoped, so the harness context is entered once before
    the first test and exited after the last one.
    """
    harness = prefect_test_harness()
    with harness:
        yield


@pytest.fixture(scope="session")
def prune_module():
    """Expose the objects under test as a name -> object mapping.

    The project imports are done lazily inside the fixture rather than at
    module import time.
    NOTE(review): presumably so they happen only after the session-wide
    Prefect test harness is active -- confirm.

    :return: dict keyed by the imported object's name.
    """
    from orchestration.prune_controller import (
        FileSystemPruneController,
        GlobusPruneController,
        get_prune_controller,
        PruneMethod,
    )
    from orchestration.transfer_endpoints import FileSystemEndpoint
    from orchestration.globus.transfer import GlobusEndpoint

    return dict(
        FileSystemPruneController=FileSystemPruneController,
        GlobusPruneController=GlobusPruneController,
        get_prune_controller=get_prune_controller,
        PruneMethod=PruneMethod,
        FileSystemEndpoint=FileSystemEndpoint,
        GlobusEndpoint=GlobusEndpoint,
    )


# ── FileSystemPruneController.prune() ────────────────────────────────────────

class TestFileSystemPruneControllerPrune:
    """Tests for ``FileSystemPruneController.prune()`` with its Prefect collaborators mocked."""

    @staticmethod
    def _stub_run_logger(mocker):
        """Replace ``get_run_logger`` in the module under test with a mock logger factory."""
        mocker.patch(
            "orchestration.prune_controller.get_run_logger",
            return_value=mocker.MagicMock(),
        )

    @pytest.fixture
    def controller(self, prune_module, mocker):
        """Controller under test, built around a mocked beamline config."""
        cfg = mocker.MagicMock()
        cfg.beamline_id = "8.3.2"
        controller_cls = prune_module["FileSystemPruneController"]
        return controller_cls(cfg)

    @pytest.fixture
    def fs_endpoint(self, prune_module, tmp_path):
        """Filesystem endpoint rooted at the per-test temporary directory."""
        endpoint_cls = prune_module["FileSystemEndpoint"]
        return endpoint_cls(name="src", root_path=str(tmp_path), uri="test://src")

    def test_prune_no_file_path(self, controller, fs_endpoint, mocker):
        """A missing file path makes ``prune`` return False."""
        self._stub_run_logger(mocker)
        outcome = controller.prune(file_path=None, source_endpoint=fs_endpoint)
        assert outcome is False

    def test_prune_no_source_endpoint(self, controller, mocker):
        """A missing source endpoint makes ``prune`` return False."""
        self._stub_run_logger(mocker)
        outcome = controller.prune(file_path="raw/test.h5", source_endpoint=None)
        assert outcome is False

    def test_prune_negative_days(self, controller, fs_endpoint, mocker):
        """A negative delay makes ``prune`` return False."""
        self._stub_run_logger(mocker)
        outcome = controller.prune(
            file_path="raw/test.h5",
            source_endpoint=fs_endpoint,
            days_from_now=-1,
        )
        assert outcome is False

    def test_prune_immediate_success(self, controller, fs_endpoint, mocker):
        """With a zero delay the prune flow is called once with the expected arguments."""
        self._stub_run_logger(mocker)
        flow_mock = mocker.patch("orchestration.prune_controller.prune_filesystem_endpoint")

        outcome = controller.prune(
            file_path="raw/test.h5",
            source_endpoint=fs_endpoint,
            days_from_now=0,
        )

        assert outcome is True
        flow_mock.assert_called_once_with(
            relative_path="raw/test.h5",
            source_endpoint=fs_endpoint,
            check_endpoint=None,
            config=controller.config,
        )

    def test_prune_immediate_exception(self, controller, fs_endpoint, mocker):
        """An exception raised by the immediate flow is reported as False."""
        self._stub_run_logger(mocker)
        mocker.patch(
            "orchestration.prune_controller.prune_filesystem_endpoint",
            side_effect=RuntimeError("flow failed"),
        )

        outcome = controller.prune(
            file_path="raw/test.h5",
            source_endpoint=fs_endpoint,
            days_from_now=0,
        )
        assert outcome is False

    def test_prune_deferred_blocks_on_future(self, controller, fs_endpoint, mocker):
        """Race-condition regression: scheduling goes through ``.submit()`` and waits on ``.result()``."""
        self._stub_run_logger(mocker)
        scheduler = mocker.patch("orchestration.prune_controller.schedule_prefect_flow")
        pending = _make_future(mocker)
        scheduler.submit.return_value = pending

        outcome = controller.prune(
            file_path="raw/test.h5",
            source_endpoint=fs_endpoint,
            days_from_now=7,
        )

        assert outcome is True
        scheduler.submit.assert_called_once()
        pending.result.assert_called_once()

    def test_prune_deferred_exception(self, controller, fs_endpoint, mocker):
        """A failure while submitting the scheduling task is reported as False."""
        self._stub_run_logger(mocker)
        scheduler = mocker.patch("orchestration.prune_controller.schedule_prefect_flow")
        scheduler.submit.side_effect = RuntimeError("scheduling failed")

        outcome = controller.prune(
            file_path="raw/test.h5",
            source_endpoint=fs_endpoint,
            days_from_now=7,
        )
        assert outcome is False


# ── FileSystemPruneController.prune_no_prefect() ─────────────────────────────

class TestFileSystemPruneControllerNoPrefect:
    """Tests for ``FileSystemPruneController.prune_no_prefect()`` against real temporary directories."""

    @pytest.fixture
    def controller(self, prune_module, mocker):
        """Controller under test, built around a mocked beamline config."""
        cfg = mocker.MagicMock()
        cfg.beamline_id = "8.3.2"
        controller_cls = prune_module["FileSystemPruneController"]
        return controller_cls(cfg)

    @pytest.fixture
    def src_endpoint(self, prune_module, tmp_path):
        """Source endpoint rooted at ``<tmp_path>/src`` (the directory itself is not created here)."""
        src_root = tmp_path / "src"
        endpoint_cls = prune_module["FileSystemEndpoint"]
        return endpoint_cls(name="src", root_path=str(src_root), uri="test://src")

    @pytest.fixture
    def check_endpoint(self, prune_module, tmp_path):
        """Check endpoint rooted at ``<tmp_path>/check`` (the directory itself is not created here)."""
        check_root = tmp_path / "check"
        endpoint_cls = prune_module["FileSystemEndpoint"]
        return endpoint_cls(name="check", root_path=str(check_root), uri="test://check")

    def test_no_file_path(self, controller, src_endpoint):
        """An empty file path returns False."""
        outcome = controller.prune_no_prefect(file_path="", source_endpoint=src_endpoint)
        assert outcome is False

    def test_no_source_endpoint(self, controller):
        """A missing source endpoint returns False."""
        outcome = controller.prune_no_prefect(file_path="raw/test.h5", source_endpoint=None)
        assert outcome is False

    def test_path_not_exist(self, controller, src_endpoint, tmp_path):
        """A path absent under an existing source root returns False."""
        src_root = tmp_path / "src"
        src_root.mkdir(parents=True, exist_ok=True)

        outcome = controller.prune_no_prefect(
            file_path="raw/missing.h5",
            source_endpoint=src_endpoint,
        )
        assert outcome is False

    def test_check_endpoint_path_missing(self, controller, src_endpoint, check_endpoint, tmp_path):
        """The file exists at the source but not under the check endpoint, so the call returns False."""
        raw_dir = tmp_path / "src" / "raw"
        raw_dir.mkdir(parents=True)
        (raw_dir / "test.h5").write_text("data")
        # The check root exists but deliberately does not contain the file.
        (tmp_path / "check").mkdir(parents=True, exist_ok=True)

        outcome = controller.prune_no_prefect(
            file_path="raw/test.h5",
            source_endpoint=src_endpoint,
            check_endpoint=check_endpoint,
        )
        assert outcome is False

    def test_prune_file_success(self, controller, src_endpoint, tmp_path):
        """Pruning an existing file returns True and removes it from disk."""
        raw_dir = tmp_path / "src" / "raw"
        raw_dir.mkdir(parents=True)
        victim = raw_dir / "test.h5"
        victim.write_text("data")

        outcome = controller.prune_no_prefect(
            file_path="raw/test.h5",
            source_endpoint=src_endpoint,
        )

        assert outcome is True
        assert not victim.exists()

    def test_prune_directory_success(self, controller, src_endpoint, tmp_path):
        """Pruning a non-empty directory returns True and removes it from disk."""
        scan_dir = tmp_path / "src" / "raw" / "scan001"
        scan_dir.mkdir(parents=True)
        (scan_dir / "file.h5").write_text("data")

        outcome = controller.prune_no_prefect(
            file_path="raw/scan001",
            source_endpoint=src_endpoint,
        )

        assert outcome is True
        assert not scan_dir.exists()


# ── GlobusPruneController.prune() ─────────────────────────────────────────────

class TestGlobusPruneControllerPrune:
    """Tests for ``GlobusPruneController.prune()`` with its Prefect collaborators mocked."""

    @staticmethod
    def _stub_run_logger(mocker):
        """Replace ``get_run_logger`` in the module under test with a mock logger factory."""
        mocker.patch(
            "orchestration.prune_controller.get_run_logger",
            return_value=mocker.MagicMock(),
        )

    @pytest.fixture
    def controller(self, prune_module, mocker):
        """Controller under test, built around a mocked beamline config."""
        cfg = mocker.MagicMock()
        cfg.beamline_id = "8.3.2"
        controller_cls = prune_module["GlobusPruneController"]
        return controller_cls(cfg)

    @pytest.fixture
    def globus_endpoint(self, prune_module):
        """Globus endpoint built from fixed placeholder values."""
        endpoint_cls = prune_module["GlobusEndpoint"]
        return endpoint_cls(
            uuid="uuid-src-1",
            uri="globus://src",
            root_path="/data/raw",
            name="src832",
        )

    def test_prune_no_file_path(self, controller, globus_endpoint, mocker):
        """A missing file path makes ``prune`` return False."""
        self._stub_run_logger(mocker)
        outcome = controller.prune(file_path=None, source_endpoint=globus_endpoint)
        assert outcome is False

    def test_prune_no_source_endpoint(self, controller, mocker):
        """A missing source endpoint makes ``prune`` return False."""
        self._stub_run_logger(mocker)
        outcome = controller.prune(file_path="raw/test.h5", source_endpoint=None)
        assert outcome is False

    def test_prune_negative_days(self, controller, globus_endpoint, mocker):
        """A negative delay makes ``prune`` return False."""
        self._stub_run_logger(mocker)
        outcome = controller.prune(
            file_path="raw/test.h5",
            source_endpoint=globus_endpoint,
            days_from_now=-1,
        )
        assert outcome is False

    def test_prune_immediate_success(self, controller, globus_endpoint, mocker):
        """With a zero delay the prune flow is called once with the expected arguments."""
        self._stub_run_logger(mocker)
        flow_mock = mocker.patch("orchestration.prune_controller.prune_globus_endpoint")

        outcome = controller.prune(
            file_path="raw/test.h5",
            source_endpoint=globus_endpoint,
            days_from_now=0,
        )

        assert outcome is True
        flow_mock.assert_called_once_with(
            relative_path="raw/test.h5",
            source_endpoint=globus_endpoint,
            check_endpoint=None,
            config=controller.config,
        )

    def test_prune_immediate_exception(self, controller, globus_endpoint, mocker):
        """An exception raised by the immediate flow is reported as False."""
        self._stub_run_logger(mocker)
        mocker.patch(
            "orchestration.prune_controller.prune_globus_endpoint",
            side_effect=RuntimeError("flow failed"),
        )

        outcome = controller.prune(
            file_path="raw/test.h5",
            source_endpoint=globus_endpoint,
            days_from_now=0,
        )
        assert outcome is False

    def test_prune_deferred_blocks_on_future(self, controller, globus_endpoint, mocker):
        """Race-condition regression: scheduling goes through ``.submit()`` and waits on ``.result()``."""
        self._stub_run_logger(mocker)
        scheduler = mocker.patch("orchestration.prune_controller.schedule_prefect_flow")
        pending = _make_future(mocker)
        scheduler.submit.return_value = pending

        outcome = controller.prune(
            file_path="raw/test.h5",
            source_endpoint=globus_endpoint,
            days_from_now=30,
        )

        assert outcome is True
        scheduler.submit.assert_called_once()
        pending.result.assert_called_once()

    def test_prune_deferred_exception(self, controller, globus_endpoint, mocker):
        """A failure surfaced by the future's ``.result()`` is reported as False."""
        self._stub_run_logger(mocker)
        scheduler = mocker.patch("orchestration.prune_controller.schedule_prefect_flow")
        pending = _make_future(mocker)
        pending.result.side_effect = RuntimeError("scheduling failed")
        scheduler.submit.return_value = pending

        outcome = controller.prune(
            file_path="raw/test.h5",
            source_endpoint=globus_endpoint,
            days_from_now=30,
        )
        assert outcome is False


# ── get_prune_controller() ────────────────────────────────────────────────────

class TestGetPruneController:
    """Tests for the ``get_prune_controller()`` factory."""

    @pytest.fixture
    def mock_config(self, mocker):
        """Mocked beamline config passed to the factory."""
        config = mocker.MagicMock()
        config.beamline_id = "8.3.2"
        return config

    def test_get_globus_controller(self, prune_module, mock_config):
        """``PruneMethod.GLOBUS`` yields a ``GlobusPruneController``."""
        factory = prune_module["get_prune_controller"]
        method = prune_module["PruneMethod"].GLOBUS
        built = factory(method, mock_config)
        assert isinstance(built, prune_module["GlobusPruneController"])

    def test_get_filesystem_controller(self, prune_module, mock_config):
        """``PruneMethod.SIMPLE`` yields a ``FileSystemPruneController``."""
        factory = prune_module["get_prune_controller"]
        method = prune_module["PruneMethod"].SIMPLE
        built = factory(method, mock_config)
        assert isinstance(built, prune_module["FileSystemPruneController"])
15 changes: 9 additions & 6 deletions orchestration/prune_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import shutil
from typing import Generic, Optional, TypeVar

from prefect import flow
from prefect import flow, get_run_logger
from prefect.variables import Variable

from orchestration.config import BeamlineConfig
Expand Down Expand Up @@ -107,6 +107,7 @@ def prune(
:param days_from_now: Delay in days before pruning; if 0.0, prune immediately. If <0, throws error.
:return: True if pruning was successful or scheduled successfully, False otherwise
"""
logger = get_run_logger()
if not file_path:
logger.error("No file_path provided for pruning operation")
return False
Expand Down Expand Up @@ -145,7 +146,7 @@ def prune(
f"in {days_from_now.total_seconds()/86400:.1f} days")

try:
schedule_prefect_flow(
future = schedule_prefect_flow.submit(
deployment_name="prune_filesystem_endpoint/prune_filesystem_endpoint",
flow_run_name=flow_name,
parameters={
Expand All @@ -156,6 +157,7 @@ def prune(
},
duration_from_now=days_from_now,
)
future.result()
logger.info(f"Successfully scheduled pruning task for {days_from_now.total_seconds()/86400:.1f} days from now")
return True
except Exception as e:
Expand Down Expand Up @@ -300,6 +302,7 @@ def prune(
:param days_from_now: Delay before pruning; if 0, prune immediately. If <0, throws error.
:return: True if pruning was successful or scheduled successfully, False otherwise
"""
logger = get_run_logger()
if not file_path:
logger.error("No file_path provided for pruning operation")
return False
Expand All @@ -314,7 +317,7 @@ def prune(

# globus_settings = JSON.load("globus-settings").value
# max_wait_seconds = globus_settings["max_wait_seconds"]
flow_name = f"prune_from_{source_endpoint.name}"
flow_name = f"prune_{file_path}_from_{source_endpoint.name}"
logger.info(f"Setting up pruning of '{file_path}' from '{source_endpoint.name}'")

# convert float days → timedelta
Expand All @@ -340,25 +343,25 @@ def prune(
f"in {days_from_now.total_seconds()/86400:.1f} days")

try:
schedule_prefect_flow.submit(
future = schedule_prefect_flow.submit(
deployment_name="prune_globus_endpoint/prune_globus_endpoint",
flow_run_name=flow_name,
parameters={
"relative_path": file_path,
"source_endpoint": source_endpoint,
"check_endpoint": check_endpoint,
# "config": self.config
},
duration_from_now=days_from_now,
)
future.result()
logger.info(f"Successfully scheduled pruning task for {days_from_now.total_seconds()/86400:.1f} days from now")
return True
except Exception as e:
logger.error(f"Failed to schedule pruning task: {str(e)}", exc_info=True)
return False


@flow(name="prune_globus_endpoint")
@flow(name="prune_globus_endpoint", flow_run_name="prune_{relative_path}_from_{source_endpoint.name}")
def prune_globus_endpoint(
relative_path: str,
source_endpoint: GlobusEndpoint,
Expand Down
Loading