From 3d6b3237e30c4bf6b6771e0f79b1e3f6b8a81c2a Mon Sep 17 00:00:00 2001 From: Kurtis Dinelle Date: Wed, 20 May 2026 10:49:17 -0700 Subject: [PATCH 1/5] Add relayable service event plumbing Assisted-by: GitHub Copilot:claude-opus-4.6 Signed-off-by: Kurtis Dinelle --- Cargo.lock | 1 + battery-service-relay/src/lib.rs | 3 + debug-service/src/debug_service.rs | 3 + embedded-service/src/event.rs | 154 +++++++++++++++++++++++++++ embedded-service/src/relay/mod.rs | 39 +++++++ thermal-service-interface/src/lib.rs | 11 ++ thermal-service-relay/src/lib.rs | 2 + time-alarm-service-relay/src/lib.rs | 3 + uart-service/Cargo.toml | 1 + uart-service/src/lib.rs | 63 ++++++++--- uart-service/src/task.rs | 59 +++++++--- 11 files changed, 311 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1671e7da6..a329b4113 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2239,6 +2239,7 @@ name = "uart-service" version = "0.1.0" dependencies = [ "defmt 0.3.100", + "embassy-futures", "embassy-sync", "embedded-io-async 0.7.0", "embedded-services", diff --git a/battery-service-relay/src/lib.rs b/battery-service-relay/src/lib.rs index 92988ae2f..406903d90 100644 --- a/battery-service-relay/src/lib.rs +++ b/battery-service-relay/src/lib.rs @@ -23,6 +23,9 @@ impl embedded_services::relay::mct { type RequestType = serialization::AcpiBatteryRequest; type ResultType = serialization::AcpiBatteryResult; + + // Temporary until figure out what events want to send + type EventType = (); } impl embedded_services::relay::mctp::RelayServiceHandler diff --git a/debug-service/src/debug_service.rs b/debug-service/src/debug_service.rs index ef9762307..85ade54be 100644 --- a/debug-service/src/debug_service.rs +++ b/debug-service/src/debug_service.rs @@ -37,6 +37,9 @@ impl Service { impl embedded_services::relay::mctp::RelayServiceHandlerTypes for Service { type RequestType = DebugRequest; type ResultType = DebugResult; + + // Temporary until figure out what events want to send + type EventType = (); } impl embedded_services::relay::mctp::RelayServiceHandler for Service { diff --git a/embedded-service/src/event.rs b/embedded-service/src/event.rs index a5b54bdff..93a068b70 100644 --- a/embedded-service/src/event.rs +++ b/embedded-service/src/event.rs @@ -191,3 +191,157 @@ impl, F: FnMut(I) -> O> Sender for MapSender { self.sender.send((self.map_fn)(event)) } } + +/// Applies a function on events received from the wrapped receiver. +pub struct MapReceiver, F: FnMut(I) -> O> { + receiver: R, + map_fn: F, + _phantom: PhantomData<(I, O)>, +} + +impl, F: FnMut(I) -> O> MapReceiver { + /// Create a new MapReceiver. + pub fn new(receiver: R, map_fn: F) -> Self { + Self { + receiver, + map_fn, + _phantom: PhantomData, + } + } +} + +impl, F: FnMut(I) -> O> Receiver for MapReceiver { + fn try_next(&mut self) -> Option { + self.receiver.try_next().map(&mut self.map_fn) + } + + async fn wait_next(&mut self) -> O { + (self.map_fn)(self.receiver.wait_next().await) + } +} + +/// Filters events from the wrapped receiver, only yielding events that pass the predicate. +/// +/// Events that do not pass the filter are consumed and discarded. +pub struct FilterReceiver, F: FnMut(&E) -> bool> { + receiver: R, + filter_fn: F, + _phantom: PhantomData, +} + +impl, F: FnMut(&E) -> bool> FilterReceiver { + /// Create a new FilterReceiver. + pub fn new(receiver: R, filter_fn: F) -> Self { + Self { + receiver, + filter_fn, + _phantom: PhantomData, + } + } +} + +impl, F: FnMut(&E) -> bool> Receiver for FilterReceiver { + fn try_next(&mut self) -> Option { + loop { + match self.receiver.try_next() { + Some(e) if (self.filter_fn)(&e) => return Some(e), + Some(_) => continue, + None => return None, + } + } + } + + async fn wait_next(&mut self) -> E { + loop { + let e = self.receiver.wait_next().await; + if (self.filter_fn)(&e) { + return e; + } + } + } +} + +/// A receiver that never produces events. +/// +/// This is mainly used to make it easier to construct a `MuxReceiver` +/// via macro since we don't need to handle the special start case +/// when chaining `with` calls. +pub struct NeverReceiver(PhantomData); + +impl NeverReceiver { + /// Create a new NeverReceiver. + pub fn new() -> Self { + Self(PhantomData) + } +} + +impl Default for NeverReceiver { + fn default() -> Self { + Self::new() + } +} + +impl Receiver for NeverReceiver { + fn try_next(&mut self) -> Option { + None + } + + async fn wait_next(&mut self) -> E { + core::future::pending().await + } +} + +/// Combines multiple receivers into one by racing them and returning +/// the first event that becomes available mapped to a common event type. +pub struct MuxReceiver, R: Receiver> { + left: L, + right: R, + _phantom: PhantomData, +} + +impl MuxReceiver, NeverReceiver> { + /// Create an empty MuxReceiver. + /// + /// Use `.with()` to add receivers. + pub fn new() -> Self { + Self { + left: NeverReceiver::new(), + right: NeverReceiver::new(), + _phantom: PhantomData, + } + } +} + +impl Default for MuxReceiver, NeverReceiver> { + fn default() -> Self { + Self::new() + } +} + +impl, R1: Receiver> MuxReceiver { + /// Add another receiver to multiplex with this one. + pub fn with, F: FnMut(I) -> E>( + self, + receiver: R2, + map_fn: F, + ) -> MuxReceiver> { + MuxReceiver { + left: self, + right: MapReceiver::new(receiver, map_fn), + _phantom: PhantomData, + } + } +} + +impl, R: Receiver> Receiver for MuxReceiver { + fn try_next(&mut self) -> Option { + self.left.try_next().or_else(|| self.right.try_next()) + } + + async fn wait_next(&mut self) -> E { + match embassy_futures::select::select(self.left.wait_next(), self.right.wait_next()).await { + embassy_futures::select::Either::First(e) => e, + embassy_futures::select::Either::Second(e) => e, + } + } +} diff --git a/embedded-service/src/relay/mod.rs b/embedded-service/src/relay/mod.rs index d92cfb97f..274a351df 100644 --- a/embedded-service/src/relay/mod.rs +++ b/embedded-service/src/relay/mod.rs @@ -116,6 +116,9 @@ pub mod mctp { /// The result type that this service handler processes type ResultType: super::SerializableResult; + + /// The event type that this service emits. + type EventType; } /// Trait for a service that can be relayed over an external bus (e.g. battery service, thermal service, time-alarm service) @@ -448,6 +451,14 @@ pub mod mctp { } + /// A common event type wrapper for all relayable service events. + #[derive(Debug)] + pub enum ServiceEvent { + $( + $service_name(<$service_handler_type as $crate::relay::mctp::RelayServiceHandlerTypes>::EventType), + )+ + } + pub struct $relay_type_name { $( [<$service_name:snake _handler>]: $service_handler_type, @@ -466,6 +477,33 @@ pub mod mctp { )+ } } + + /// Build an event multiplexer from the provided relayable services. + /// + /// This is generic over the receiver type used for each service, so callers + /// can decide at the call site which concrete `Receiver` impl to + /// supply for each relayed service (e.g. `NeverReceiver`, an embassy channel + /// `Receiver`, a `DynamicReceiver`, or a custom impl). + /// + /// The caller will then need to further filter this event mux to whatever + /// events they consider worth notifying the host about. + /// + /// Lastly, the caller will then need to map the events into a format the + /// relay service understands (e.g. a single u8 for uart-service). + pub fn event_mux< + $( + [<$service_name Rx>]: $crate::event::Receiver< + <$service_handler_type as $crate::relay::mctp::RelayServiceHandlerTypes>::EventType + >, + )+ + >( + $( + [<$service_name:snake _event_rx>]: [<$service_name Rx>], + )+ + ) -> impl $crate::event::Receiver { + $crate::event::MuxReceiver::new() + $(.with([<$service_name:snake _event_rx>], ServiceEvent::$service_name))+ + } } impl $crate::relay::mctp::RelayHandler for $relay_type_name { @@ -494,6 +532,7 @@ pub mod mctp { // Allows this generated relay type to be publicly re-exported pub use [< _odp_impl_ $relay_type_name:snake >]::$relay_type_name; + pub use [< _odp_impl_ $relay_type_name:snake >]::ServiceEvent; } // end paste! }; // end macro arm diff --git a/thermal-service-interface/src/lib.rs b/thermal-service-interface/src/lib.rs index b1e56a100..c901fb201 100644 --- a/thermal-service-interface/src/lib.rs +++ b/thermal-service-interface/src/lib.rs @@ -3,6 +3,17 @@ pub mod fan; pub mod sensor; +/// Thermal service event. +#[derive(Debug, PartialEq, Clone, Copy)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +#[non_exhaustive] +pub enum Event { + /// A sensor event occurred. + Sensor(u8, sensor::Event), + /// A fan event occurred. + Fan(u8, fan::Event), +} + /// Thermal service interface trait. pub trait ThermalService { /// Associated type for registered sensor services. diff --git a/thermal-service-relay/src/lib.rs b/thermal-service-relay/src/lib.rs index fceae864d..d896606b5 100644 --- a/thermal-service-relay/src/lib.rs +++ b/thermal-service-relay/src/lib.rs @@ -3,6 +3,7 @@ mod serialization; pub use serialization::{ThermalError, ThermalRequest, ThermalResponse, ThermalResult}; +use thermal_service_interface::Event as ThermalEvent; use thermal_service_interface::ThermalService; use thermal_service_interface::fan::{self, FanService}; use thermal_service_interface::sensor::{self, SensorService}; @@ -194,6 +195,7 @@ impl ThermalServiceRelayHandler { impl embedded_services::relay::mctp::RelayServiceHandlerTypes for ThermalServiceRelayHandler { type RequestType = ThermalRequest; type ResultType = ThermalResult; + type EventType = ThermalEvent; } impl embedded_services::relay::mctp::RelayServiceHandler for ThermalServiceRelayHandler { diff --git a/time-alarm-service-relay/src/lib.rs b/time-alarm-service-relay/src/lib.rs index ed444a394..b428b3cb6 100644 --- a/time-alarm-service-relay/src/lib.rs +++ b/time-alarm-service-relay/src/lib.rs @@ -20,6 +20,9 @@ impl TimeAlarmServiceRelayHandler { impl embedded_services::relay::mctp::RelayServiceHandlerTypes for TimeAlarmServiceRelayHandler { type RequestType = AcpiTimeAlarmRequest; type ResultType = AcpiTimeAlarmResult; + + // Temporary until figure out what events want to send + type EventType = (); } impl embedded_services::relay::mctp::RelayServiceHandler for TimeAlarmServiceRelayHandler { diff --git a/uart-service/Cargo.toml b/uart-service/Cargo.toml index beb373559..8aed7b44c 100644 --- a/uart-service/Cargo.toml +++ b/uart-service/Cargo.toml @@ -17,6 +17,7 @@ workspace = true embedded-services.workspace = true defmt = { workspace = true, optional = true } log = { workspace = true, optional = true } +embassy-futures.workspace = true embassy-sync.workspace = true mctp-rs = { workspace = true } embedded-io-async.workspace = true diff --git a/uart-service/src/lib.rs b/uart-service/src/lib.rs index e3ecffe6a..a0e8a621b 100644 --- a/uart-service/src/lib.rs +++ b/uart-service/src/lib.rs @@ -4,9 +4,6 @@ //! Use [`DefaultService`] for the SmbusEspi-medium baseline; use //! [`Service::new`] directly with another medium (e.g. DSP0253 serial) //! for non-SmbusEspi callers. -//! -//! Revisit: Will also need to consider how to handle notifications (likely need to have user -//! provide GPIO pin we can use). #![no_std] pub mod task; @@ -24,6 +21,27 @@ use mctp_rs::smbus_espi::{SmbusEspiMedium, SmbusEspiReplyContext}; const BUF_SIZE: usize = 256; const HOST_TX_QUEUE_SIZE: usize = 5; +// Persistent state for UART request reading +// +// Necessary to make sure the request reading process is cancel-safe +struct ReadState { + buf: [u8; BUF_SIZE], + filled: usize, +} + +impl ReadState { + fn new() -> Self { + Self { + buf: [0u8; BUF_SIZE], + filled: 0, + } + } + + fn reset(&mut self) { + self.filled = 0; + } +} + #[derive(Clone)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub(crate) struct HostResultMessage { @@ -107,13 +125,18 @@ impl Service { Ok(()) } - async fn wait_for_request(&self, uart: &mut T) -> Result<(), Error> { - // Incremental read loop: read bytes, ask the medium whether the - // assembled prefix is a complete frame, repeat until it is. - let mut buf = [0u8; BUF_SIZE]; - let mut filled = 0usize; - let packet_len = loop { - let dst = buf.get_mut(filled..).ok_or(Error::Serialize("buffer overrun"))?; + // Read bytes from UART until a complete request is assembled. + // + // # Cancel Safety + // + // This method is cancel-safe because partial read progress is stored in `state` + // and will be resumed on the next call. + async fn wait_for_request(&self, uart: &mut T, state: &mut ReadState) -> Result> { + loop { + let dst = state + .buf + .get_mut(state.filled..) + .ok_or(Error::Serialize("buffer overrun"))?; if dst.is_empty() { return Err(Error::Serialize("frame exceeds BUF_SIZE")); } @@ -121,23 +144,33 @@ impl Service { if n == 0 { return Err(Error::Comms); } - filled += n; + state.filled += n; match self .medium - .frame_complete(buf.get(..filled).ok_or(Error::Serialize("buffer overrun"))?) + .frame_complete( + state + .buf + .get(..state.filled) + .ok_or(Error::Serialize("buffer overrun"))?, + ) .map_err(Error::Mctp)? { - Some(len) => break len, + Some(len) => return Ok(len), None => continue, } - }; + } + } + // Deserialize the request and forward it to correct service for processing + async fn process_request(&self, state: &ReadState, packet_len: usize) -> Result<(), Error> { let mut assembly_buf = [0u8; BUF_SIZE]; let mut mctp_ctx = mctp_rs::MctpPacketContext::::new(self.medium, &mut assembly_buf); let message = mctp_ctx .deserialize_packet( - buf.get(..packet_len) + state + .buf + .get(..packet_len) .ok_or(Error::Serialize("frame exceeds BUF_SIZE"))?, ) .map_err(Error::Mctp)? diff --git a/uart-service/src/task.rs b/uart-service/src/task.rs index 06a2b8c05..92b7054c8 100644 --- a/uart-service/src/task.rs +++ b/uart-service/src/task.rs @@ -1,26 +1,59 @@ -use crate::{Error, Service}; +use crate::{Error, ReadState, Service}; +use embassy_futures::select::{Either, select}; use embedded_io_async::Read as UartRead; use embedded_io_async::Write as UartWrite; -use embedded_services::error; use embedded_services::relay::mctp::RelayHandler; +use embedded_services::{error, warn}; use mctp_rs::MctpMedium; pub async fn uart_service( uart_service: &Service, mut uart: T, + mut notifiable_events: impl embedded_services::event::Receiver, ) -> Result> { - // Note: eSPI service uses `select!` to seemingly allow asyncrhonous `responses` from services, - // but there are concerns around async cancellation here at least for UART service. - // - // Thus this assumes services will only send messages in response to requests from the host, - // so we handle this in order. + let mut read_state = ReadState::new(); + loop { - if let Err(e) = uart_service.wait_for_request(&mut uart).await { - log_error("request", &e); - } else { - let host_msg = uart_service.wait_for_response().await; - if let Err(e) = uart_service.process_response(&mut uart, host_msg).await { - log_error("response", &e); + match select( + uart_service.wait_for_request(&mut uart, &mut read_state), + notifiable_events.wait_next(), + ) + .await + { + Either::First(Ok(packet_len)) => { + if let Err(e) = uart_service.process_request(&read_state, packet_len).await { + log_error("request", &e); + } else { + let host_msg = uart_service.wait_for_response().await; + if let Err(e) = uart_service.process_response(&mut uart, host_msg).await { + log_error("response", &e); + } + } + read_state.reset(); + } + Either::First(Err(e)) => { + log_error("request", &e); + read_state.reset(); + } + Either::Second(event) => { + warn!( + "uart-service received notifiable event ({}) from relayable service", + event + ); + + // TODO: Here we would do something like: + // + // if let Err(_e) = uart.write_all(&[0x42, event]).await { + // error!("uart-service failed to send notification"); + // } else { + // warn!("uart-service sent notification for event {}", event); + // } + // + // Where we TX some starter byte(s) to tell host its about to receive a notification, + // then the notification ID itself. + // + // This is TODO until the whole stack is ready to receive notifications + // otherwise TXing here could break things. } } } From b9aa5c6099d6d9de8a84c6b87b9783c9eb11f5fc Mon Sep 17 00:00:00 2001 From: Kurtis Dinelle Date: Thu, 28 May 2026 08:05:13 -0700 Subject: [PATCH 2/5] Address Copilot suggestions --- embedded-service/src/event.rs | 2 +- uart-service/src/task.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/embedded-service/src/event.rs b/embedded-service/src/event.rs index 93a068b70..0bcd8efdb 100644 --- a/embedded-service/src/event.rs +++ b/embedded-service/src/event.rs @@ -291,7 +291,7 @@ impl Receiver for NeverReceiver { } } -/// Combines multiple receivers into one by racing them and returning +/// Combines multiple receivers into one by racing them (with left-bias) and returning /// the first event that becomes available mapped to a common event type. pub struct MuxReceiver, R: Receiver> { left: L, diff --git a/uart-service/src/task.rs b/uart-service/src/task.rs index 92b7054c8..80256615d 100644 --- a/uart-service/src/task.rs +++ b/uart-service/src/task.rs @@ -49,7 +49,7 @@ pub async fn uart_service Date: Mon, 1 Jun 2026 10:56:57 -0700 Subject: [PATCH 3/5] Improve ergonomics --- embedded-service/src/relay/mod.rs | 68 ++++++++----------- examples/rt685s-evk/src/bin/time_alarm.rs | 5 +- uart-service/src/lib.rs | 82 +++++++++++++---------- uart-service/src/task.rs | 14 ++-- 4 files changed, 84 insertions(+), 85 deletions(-) diff --git a/embedded-service/src/relay/mod.rs b/embedded-service/src/relay/mod.rs index 274a351df..168bb5943 100644 --- a/embedded-service/src/relay/mod.rs +++ b/embedded-service/src/relay/mod.rs @@ -172,6 +172,11 @@ pub mod mctp { &'a self, message: Self::RequestEnumType, ) -> impl core::future::Future + 'a; + + /// Returns a mutable reference to the multiplexed receiver of all relayable service events. + /// + /// Each service's events are mapped to the service's u8 ID. + fn receiver(&mut self) -> &mut impl crate::event::Receiver; } /// This macro generates a relay type over a collection of message types, which can be used by a relay service to @@ -451,62 +456,42 @@ pub mod mctp { } - /// A common event type wrapper for all relayable service events. - #[derive(Debug)] - pub enum ServiceEvent { - $( - $service_name(<$service_handler_type as $crate::relay::mctp::RelayServiceHandlerTypes>::EventType), - )+ - } - - pub struct $relay_type_name { + pub struct $relay_type_name> { $( [<$service_name:snake _handler>]: $service_handler_type, )+ + receiver: T, } - impl $relay_type_name { - pub fn new( - $( - [<$service_name:snake _handler>]: $service_handler_type, - )+ - ) -> Self { - Self { - $( - [<$service_name:snake _handler>], - )+ - } - } - - /// Build an event multiplexer from the provided relayable services. - /// - /// This is generic over the receiver type used for each service, so callers - /// can decide at the call site which concrete `Receiver` impl to - /// supply for each relayed service (e.g. `NeverReceiver`, an embassy channel - /// `Receiver`, a `DynamicReceiver`, or a custom impl). - /// - /// The caller will then need to further filter this event mux to whatever - /// events they consider worth notifying the host about. - /// - /// Lastly, the caller will then need to map the events into a format the - /// relay service understands (e.g. a single u8 for uart-service). - pub fn event_mux< + // Note: Need a concrete type here to satisfy the type system, + // so we just use `NeverReceiver` as a placeholder + impl $relay_type_name<$crate::event::NeverReceiver> { + pub fn new< $( [<$service_name Rx>]: $crate::event::Receiver< <$service_handler_type as $crate::relay::mctp::RelayServiceHandlerTypes>::EventType >, )+ >( + $( + [<$service_name:snake _handler>]: $service_handler_type, + )+ $( [<$service_name:snake _event_rx>]: [<$service_name Rx>], )+ - ) -> impl $crate::event::Receiver { - $crate::event::MuxReceiver::new() - $(.with([<$service_name:snake _event_rx>], ServiceEvent::$service_name))+ + ) -> $relay_type_name> { + let receiver = $crate::event::MuxReceiver::new() + $(.with([<$service_name:snake _event_rx>], |_| $service_id as u8))+; + $relay_type_name { + $( + [<$service_name:snake _handler>], + )+ + receiver, + } } } - impl $crate::relay::mctp::RelayHandler for $relay_type_name { + impl> $crate::relay::mctp::RelayHandler for $relay_type_name { type ServiceIdType = OdpService; type HeaderType = OdpHeader; type RequestEnumType = HostRequest; @@ -527,12 +512,15 @@ pub mod mctp { } } } + + fn receiver(&mut self) -> &mut impl $crate::event::Receiver { + &mut self.receiver + } } } // end mod __odp_impl // Allows this generated relay type to be publicly re-exported pub use [< _odp_impl_ $relay_type_name:snake >]::$relay_type_name; - pub use [< _odp_impl_ $relay_type_name:snake >]::ServiceEvent; } // end paste! }; // end macro arm diff --git a/examples/rt685s-evk/src/bin/time_alarm.rs b/examples/rt685s-evk/src/bin/time_alarm.rs index 8673d9096..f741efa0a 100644 --- a/examples/rt685s-evk/src/bin/time_alarm.rs +++ b/examples/rt685s-evk/src/bin/time_alarm.rs @@ -51,7 +51,10 @@ async fn main(spawner: embassy_executor::Spawner) { TimeAlarm, 0x0B, crate::TimeAlarmServiceRelayHandlerType; ); - let _relay_handler = EspiRelayHandler::new(TimeAlarmServiceRelayHandlerType::new(time_service)); + let _relay_handler = EspiRelayHandler::new( + TimeAlarmServiceRelayHandlerType::new(time_service), + embedded_services::event::NeverReceiver::new(), + ); // Here, you'd normally pass _relay_handler to your relay service (e.g. eSPI service). // In this example, we're not leveraging a relay service, so we'll just demonstrate some direct calls. diff --git a/uart-service/src/lib.rs b/uart-service/src/lib.rs index a0e8a621b..6c4060a61 100644 --- a/uart-service/src/lib.rs +++ b/uart-service/src/lib.rs @@ -81,8 +81,12 @@ pub enum Error { /// /// [`MctpPacketContext`]: mctp_rs::MctpPacketContext pub struct Service { + pub(crate) inner: ServiceInner, + pub(crate) relay_handler: R, +} + +pub(crate) struct ServiceInner { host_tx_queue: Channel, HOST_TX_QUEUE_SIZE>, - relay_handler: R, medium: M, reply_context: mctp_rs::MctpReplyContext, } @@ -90,13 +94,46 @@ pub struct Service { impl Service { pub fn new(relay_handler: R, medium: M, reply_context: mctp_rs::MctpReplyContext) -> Result> { Ok(Self { - host_tx_queue: Channel::new(), + inner: ServiceInner { + host_tx_queue: Channel::new(), + medium, + reply_context, + }, relay_handler, - medium, - reply_context, }) } + // Deserialize the request and forward it to correct service for processing + async fn process_request(&self, state: &ReadState, packet_len: usize) -> Result<(), Error> { + let mut assembly_buf = [0u8; BUF_SIZE]; + let mut mctp_ctx = mctp_rs::MctpPacketContext::::new(self.inner.medium, &mut assembly_buf); + + let message = mctp_ctx + .deserialize_packet( + state + .buf + .get(..packet_len) + .ok_or(Error::Serialize("frame exceeds BUF_SIZE"))?, + ) + .map_err(Error::Mctp)? + .ok_or(Error::Serialize("Partial message not supported"))?; + + let (header, body) = message.parse_as::().map_err(Error::Mctp)?; + trace!("Received host request"); + + let response = self.relay_handler.process_request(body).await; + self.inner.host_tx_queue + .try_send(HostResultMessage { + handler_service_id: header.get_service_id(), + message: response, + }) + .map_err(|_| Error::Comms)?; + + Ok(()) + } +} + +impl ServiceInner { async fn process_response( &self, uart: &mut T, @@ -125,6 +162,10 @@ impl Service { Ok(()) } + async fn wait_for_response(&self) -> HostResultMessage { + self.host_tx_queue.receive().await + } + // Read bytes from UART until a complete request is assembled. // // # Cancel Safety @@ -160,39 +201,6 @@ impl Service { } } } - - // Deserialize the request and forward it to correct service for processing - async fn process_request(&self, state: &ReadState, packet_len: usize) -> Result<(), Error> { - let mut assembly_buf = [0u8; BUF_SIZE]; - let mut mctp_ctx = mctp_rs::MctpPacketContext::::new(self.medium, &mut assembly_buf); - - let message = mctp_ctx - .deserialize_packet( - state - .buf - .get(..packet_len) - .ok_or(Error::Serialize("frame exceeds BUF_SIZE"))?, - ) - .map_err(Error::Mctp)? - .ok_or(Error::Serialize("Partial message not supported"))?; - - let (header, body) = message.parse_as::().map_err(Error::Mctp)?; - trace!("Received host request"); - - let response = self.relay_handler.process_request(body).await; - self.host_tx_queue - .try_send(HostResultMessage { - handler_service_id: header.get_service_id(), - message: response, - }) - .map_err(|_| Error::Comms)?; - - Ok(()) - } - - async fn wait_for_response(&self) -> HostResultMessage { - self.host_tx_queue.receive().await - } } /// Backwards-compatible alias for SmbusEspi-medium services. diff --git a/uart-service/src/task.rs b/uart-service/src/task.rs index 80256615d..cc4294a78 100644 --- a/uart-service/src/task.rs +++ b/uart-service/src/task.rs @@ -2,30 +2,30 @@ use crate::{Error, ReadState, Service}; use embassy_futures::select::{Either, select}; use embedded_io_async::Read as UartRead; use embedded_io_async::Write as UartWrite; +use embedded_services::event::Receiver; use embedded_services::relay::mctp::RelayHandler; use embedded_services::{error, warn}; use mctp_rs::MctpMedium; pub async fn uart_service( - uart_service: &Service, + mut service: Service, mut uart: T, - mut notifiable_events: impl embedded_services::event::Receiver, ) -> Result> { let mut read_state = ReadState::new(); loop { match select( - uart_service.wait_for_request(&mut uart, &mut read_state), - notifiable_events.wait_next(), + service.inner.wait_for_request(&mut uart, &mut read_state), + service.relay_handler.receiver().wait_next(), ) .await { Either::First(Ok(packet_len)) => { - if let Err(e) = uart_service.process_request(&read_state, packet_len).await { + if let Err(e) = service.process_request(&read_state, packet_len).await { log_error("request", &e); } else { - let host_msg = uart_service.wait_for_response().await; - if let Err(e) = uart_service.process_response(&mut uart, host_msg).await { + let host_msg = service.inner.wait_for_response().await; + if let Err(e) = service.inner.process_response(&mut uart, host_msg).await { log_error("response", &e); } } From e1b5d5c7851e951c3f44d3a4f8a231b2f1bbb106 Mon Sep 17 00:00:00 2001 From: Kurtis Dinelle Date: Mon, 1 Jun 2026 11:13:10 -0700 Subject: [PATCH 4/5] Fix format issue --- uart-service/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/uart-service/src/lib.rs b/uart-service/src/lib.rs index 6c4060a61..c0d62b151 100644 --- a/uart-service/src/lib.rs +++ b/uart-service/src/lib.rs @@ -122,7 +122,8 @@ impl Service { trace!("Received host request"); let response = self.relay_handler.process_request(body).await; - self.inner.host_tx_queue + self.inner + .host_tx_queue .try_send(HostResultMessage { handler_service_id: header.get_service_id(), message: response, From 5cab8331fdc95ab45d48ff02b97fe4a00547d290 Mon Sep 17 00:00:00 2001 From: Kurtis Dinelle Date: Mon, 1 Jun 2026 12:56:22 -0700 Subject: [PATCH 5/5] Add cancel-safety docs --- embedded-service/src/event.rs | 5 +++++ uart-service/src/lib.rs | 4 ++-- uart-service/src/task.rs | 5 +++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/embedded-service/src/event.rs b/embedded-service/src/event.rs index 0bcd8efdb..e1ebb4894 100644 --- a/embedded-service/src/event.rs +++ b/embedded-service/src/event.rs @@ -32,6 +32,11 @@ pub trait Receiver { /// Return none if there are no pending events fn try_next(&mut self) -> Option; /// Receive an event + /// + /// # Cancel Safety + /// + /// The implementation MUST be cancel-safe as the `wait_next` method + /// is typically called inside a `select!` and thus may be cancelled. fn wait_next(&mut self) -> impl Future; } diff --git a/uart-service/src/lib.rs b/uart-service/src/lib.rs index c0d62b151..9b6fa806c 100644 --- a/uart-service/src/lib.rs +++ b/uart-service/src/lib.rs @@ -171,8 +171,8 @@ impl ServiceInner { // // # Cancel Safety // - // This method is cancel-safe because partial read progress is stored in `state` - // and will be resumed on the next call. + // This method is cancel-safe (assuming the embedded-io-async `read` implementation is cancel-safe) + // because partial read progress is stored in `state` and will be resumed on the next call. async fn wait_for_request(&self, uart: &mut T, state: &mut ReadState) -> Result> { loop { let dst = state diff --git a/uart-service/src/task.rs b/uart-service/src/task.rs index cc4294a78..e90e4e33b 100644 --- a/uart-service/src/task.rs +++ b/uart-service/src/task.rs @@ -7,6 +7,11 @@ use embedded_services::relay::mctp::RelayHandler; use embedded_services::{error, warn}; use mctp_rs::MctpMedium; +/// Start the UART service task. +/// +/// # Requirements +/// +/// The `embedded-io-async` `Read` implementation used for the uart **MUST** be cancel-safe. pub async fn uart_service( mut service: Service, mut uart: T,