Skip to content
103 changes: 103 additions & 0 deletions bec_ipython_client/tests/end-2-end/test_actors_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from __future__ import annotations

import time
from typing import TYPE_CHECKING

import pytest

from bec_lib.bl_states import DeviceWithinLimitsStateConfig
from bec_lib.builtin_actor_hli import BuiltinActorHli
from bec_lib.logger import bec_logger
from bec_lib.messages import ScanQueueStatus

logger = bec_logger.logger

if TYPE_CHECKING: # pragma: no cover
from ophyd_devices.sim.sim_test_devices import SimDeviceWithSignalDelay

from bec_ipython_client.main import BECIPythonClient


@pytest.fixture
def bec_with_delay_device(bec_ipython_client_fixture):
bec = bec_ipython_client_fixture
bec.builtin_actors.scan_interlock.enabled = True
dev = bec.device_manager.devices
dev.ramp_up.min_val.put(0)
dev.ramp_up.max_val.put(400)
dev.ramp_up.value.put(400)
dev.ramp_up.delay.put(2)
dev.ramp_up.rampup_time.put(2)
yield bec, dev.ramp_up
dev.ramp_up.stop()


@pytest.fixture
def ramp_up_bl_state(bec_with_delay_device):
bec, _ = bec_with_delay_device
ramp_up_state_config = DeviceWithinLimitsStateConfig(
name="beam_intensity_sufficient", device="ramp_up", low_limit=200
)
bec.beamline_states.add(ramp_up_state_config)
yield bec_with_delay_device
bec.beamline_states.delete("beam_intensity_sufficient")
bec.builtin_actors.scan_interlock.clear_all()


def _wait_for(pred, timeout=10, retries=100):
for i in range(retries):
if pred():
return True
time.sleep(timeout / retries)
return pred()


# pylint: disable=protected-accesstest
@pytest.mark.timeout(100)
def test_scan_interlock(
ramp_up_bl_state: tuple[BECIPythonClient, SimDeviceWithSignalDelay], bec_with_delay_device
Comment thread
d-perl marked this conversation as resolved.
):
bec, ramp_up = ramp_up_bl_state
actors: BuiltinActorHli = bec.builtin_actors
assert bec.beamline_states.beam_intensity_sufficient.get()["status"] == "valid"
assert actors.scan_interlock.enabled
current_q_status_msg: ScanQueueStatus = bec.queue.queue_storage.current_scan_queue["primary"]
assert current_q_status_msg.status == "RUNNING"
actors.scan_interlock.add_state_to_interlock("beam_intensity_sufficient", "valid")

assert _wait_for(lambda: "beam_intensity_sufficient" in actors.scan_interlock.states_watched)

def _beam_down():
return (ramp_up.value.get() < 200) and bec.beamline_states.beam_intensity_sufficient.get()[
"status"
] == "invalid"

def _beam_up():
return (ramp_up.value.get() > 200) and bec.beamline_states.beam_intensity_sufficient.get()[
"status"
] == "valid"

ramp_up.start.set(1)
scan = bec.scans.line_scan(
"samx", -5, 5, steps=10, exp_time=0.5, relative=False, hide_report=True
)

assert _wait_for(_beam_down)
assert _wait_for(
lambda: bec.queue.queue_storage.current_scan_queue["primary"].status == "LOCKED"
)
assert scan.status == "STOPPED"
assert _wait_for(_beam_up)
assert _wait_for(
lambda: bec.queue.queue_storage.current_scan_queue["primary"].status == "RUNNING"
)

def second_scan_has_run():
if len(bec.history) < 2:
return False
return (
bec.history[-2].metadata["bec"]["status"] == "aborted"
and bec.history[-1].metadata["bec"]["status"] == "closed"
)

assert _wait_for(second_scan_has_run)
12 changes: 11 additions & 1 deletion bec_lib/bec_lib/bl_state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class BeamlineStateGet(TypedDict):
TypedDict for the return value of the get method of a beamline state client.
"""

status: str
status: messages.BlStateStatus
label: str


Expand Down Expand Up @@ -200,6 +200,16 @@ def delete(self, state_name: str) -> None:
self._delete_state(state_name)
self._publish_states()

def get_status_by_name(self, name: str) -> messages.BlStateStatus | None:
Comment thread
d-perl marked this conversation as resolved.
"""
Get current value of a given state, or None if it does not exist.
Args:
state_name (str): The name of the state for which to get the value.
"""
if not isinstance(state := getattr(self, name, None), BeamlineStateClientBase):
return
return state.get()["status"]

def show_all(self):
"""
Pretty print all beamline states using rich.
Expand Down
104 changes: 104 additions & 0 deletions bec_lib/bec_lib/builtin_actor_hli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from typing import TYPE_CHECKING

from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import (
BlStateStatus,
BuiltinActorStateChangeNotification,
ScanInterlockModifyStateTableMessage,
)

if TYPE_CHECKING:
from bec_lib.client import BECClient

VAR_PREFIX = "_BuiltinActors"


def builtin_actor_enabled_var(actor_name: str):
return f"{VAR_PREFIX}/enabled/{actor_name}"


class ScanInterlockHli:
def __init__(self, client: "BECClient", parent: "BuiltinActorHli") -> None:
self._client = client
self._parent = parent
self._actor_name = "ScanInterlockActor"

@property
def enabled(self):
return self._parent.check_enabled(self._actor_name)

@enabled.setter
def enabled(self, enabled: bool):
if enabled:
self._parent.set_enabled(self._actor_name)
else:
self._parent.set_disabled(self._actor_name)

@property
def states_watched(self) -> dict[str, BlStateStatus]:
"""Return the table of beamline states currently watched by the scan interlock actor"""
if msg := self._client.connector.get(MessageEndpoints.scan_interlock_states()):
return msg.states_watched
return {}

def add_state_to_interlock(self, state_name: str, required_value: BlStateStatus):
"""
Add a beamline state and its status to watch to the ScanInterlockActor. If the state no
longer has this status, an interlock will be placed on the primary scan queue.
Args:
state_name (str): the state to watch
status (Literal["valid","invalid","warning","unknown"]): the status to watch for.
"""
self._client.connector.xadd(
MessageEndpoints.modify_interlock_table(),
{
"data": ScanInterlockModifyStateTableMessage(
action="add", state_name=state_name, status=required_value
)
},
)

def remove_state_from_interlock(self, state_name: str):
"""
No longer watch the given state for the scan interlock.
Args:
state_name (str): the state to watch
"""
self._client.connector.xadd(
MessageEndpoints.modify_interlock_table(),
{"data": ScanInterlockModifyStateTableMessage(action="remove", state_name=state_name)},
)

def clear_all(self):
"""
Remove all beamline states from the interlock watch table
Args:
state_name (str): the state to watch
"""
self._client.connector.xadd(
MessageEndpoints.modify_interlock_table(),
{"data": ScanInterlockModifyStateTableMessage(action="remove_all")},
)


class BuiltinActorHli:
def __init__(self, client: "BECClient") -> None:
self._client = client
self.scan_interlock = ScanInterlockHli(self._client, self)

def _notify(self, actor_name):
self._client.connector.send(
MessageEndpoints.builtin_actor_update_req_notif(),
BuiltinActorStateChangeNotification(actor_name=actor_name),
)

def check_enabled(self, actor_name: str):
return bool(self._client.get_global_var(builtin_actor_enabled_var(actor_name)))

def set_enabled(self, actor_name: str):
self._client.set_global_var(builtin_actor_enabled_var(actor_name), True)
self._notify(actor_name)

def set_disabled(self, actor_name: str):
self._client.set_global_var(builtin_actor_enabled_var(actor_name), False)
self._notify(actor_name)
2 changes: 2 additions & 0 deletions bec_lib/bec_lib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from bec_lib.alarm_handler import AlarmHandler, Alarms
from bec_lib.bec_service import BECService
from bec_lib.bl_state_manager import BeamlineStateManager
from bec_lib.builtin_actor_hli import BuiltinActorHli
from bec_lib.callback_handler import CallbackHandler, EventType
from bec_lib.config_helper import ConfigHelperUser
from bec_lib.dap_plugins import DAPPlugins
Expand Down Expand Up @@ -163,6 +164,7 @@ def __init__(
self._system_user = ""
self.beamline_states = None
self.messaging: MessagingContainer = None # type: ignore
self.builtin_actors = BuiltinActorHli(self)

def __new__(cls, *args, forced=False, **kwargs):
if forced or BECClient._client is None:
Expand Down
8 changes: 8 additions & 0 deletions bec_lib/bec_lib/configs/demo_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2160,3 +2160,11 @@ waveform:
enabled: true
readOnly: false
softwareTrigger: true
ramp_up:
deviceClass: ophyd_devices.sim.sim_test_devices.SimDeviceWithSignalDelay
readoutPriority: "baseline"
deviceTags:
- signal delay
- ramp up
enabled: true
readOnly: false
38 changes: 38 additions & 0 deletions bec_lib/bec_lib/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -1679,6 +1679,44 @@ def actor_request_response() -> EndpointInfo:
message_op=MessageOp.SEND,
)

@staticmethod
def builtin_actor_update_req_notif() -> EndpointInfo:
"""Endpoint to notify a builtin actor of a pending state change request."""
endpoint = f"{EndpointType.INTERNAL.value}/actor/builtin/state_change_request_notification"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.BuiltinActorStateChangeNotification,
message_op=MessageOp.SEND,
)

@staticmethod
def builtin_actor_update_notif(actor_name: str) -> EndpointInfo:
"""Endpoint to notify clients of builtin actor state changes."""
endpoint = f"{EndpointType.INFO.value}/actor/builtin/{actor_name}/state_change_done"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.BuiltinActorStateUpdatedNotification,
message_op=MessageOp.SEND,
)

@staticmethod
def modify_interlock_table() -> EndpointInfo:
endpoint = f"{EndpointType.INTERNAL.value}/actor/builtin/scan_interlock/table_mod"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.ScanInterlockModifyStateTableMessage,
message_op=MessageOp.STREAM,
)

@staticmethod
def scan_interlock_states() -> EndpointInfo:
endpoint = f"{EndpointType.INFO.value}/actor/builtin/scan_interlock/current_states_watched"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.ScanInterlockStateTableContent,
message_op=MessageOp.KEY_VALUE,
)

@staticmethod
def gui_registry_state(gui_id: str):
"""
Expand Down
41 changes: 40 additions & 1 deletion bec_lib/bec_lib/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -2005,6 +2005,9 @@ class GameLeaderboardMessage(BECMessage):
leaderboard: list[GameScoreMessage]


BlStateStatus = Literal["valid", "invalid", "warning", "unknown"]


class BeamlineStateMessage(BECMessage):
"""
Message for beamline state updates
Expand All @@ -2017,7 +2020,7 @@ class BeamlineStateMessage(BECMessage):

msg_type: ClassVar[str] = "beamline_state_message"
name: str
status: Literal["valid", "invalid", "warning", "unknown"]
status: BlStateStatus
label: str
timestamp: float = Field(default_factory=time.time)

Expand Down Expand Up @@ -2051,6 +2054,42 @@ class AvailableBeamlineStatesMessage(BECMessage):
states: list[BeamlineStateConfig]


class BuiltinActorStateChangeNotification(BECMessage):
"""Notify builtin actors of a pending state change request"""

msg_type: ClassVar[str] = "builtin_actor_state_change_request"
actor_name: str


class BuiltinActorStateUpdatedNotification(BECMessage):
"""Notify clients of completed state changes for builtin actors"""

msg_type: ClassVar[str] = "builtin_actor_state_updated_notification"
actor_name: str


class ScanInterlockModifyStateTableMessage(BECMessage):
msg_type: ClassVar[str] = "scan_interlock_modify_state_table"
action: Literal["add", "remove", "remove_all"]
state_name: str | None = None
status: BlStateStatus | None = None

@model_validator(mode="after")
def _validate(self):
if self.action == "add" and (self.status is None or self.state_name is None):
raise ValueError("Must specify a name and status when adding a state")
if self.action in ["remove", "remove_all"] and self.status is not None:
raise ValueError("May not specify a status when removing a state")
if self.action == "remove_all" and self.state_name is not None:
raise ValueError("May not specify a state when removing all states")
return self


class ScanInterlockStateTableContent(BECMessage):
msg_type: ClassVar[str] = "scan_interlock_state_table_content"
states_watched: dict[str, BlStateStatus]


class ActorStartRequestMessage(BECMessage):
"""Specify an actor class by module and name, to be instantiated and started by the actor
manager."""
Expand Down
Loading
Loading