diff --git a/Cargo.lock b/Cargo.lock index 1671e7da6..a329b4113 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2239,6 +2239,7 @@ name = "uart-service" version = "0.1.0" dependencies = [ "defmt 0.3.100", + "embassy-futures", "embassy-sync", "embedded-io-async 0.7.0", "embedded-services", diff --git a/battery-service-relay/src/lib.rs b/battery-service-relay/src/lib.rs index 92988ae2f..406903d90 100644 --- a/battery-service-relay/src/lib.rs +++ b/battery-service-relay/src/lib.rs @@ -23,6 +23,9 @@ impl embedded_services::relay::mct { type RequestType = serialization::AcpiBatteryRequest; type ResultType = serialization::AcpiBatteryResult; + + // Temporary until figure out what events want to send + type EventType = (); } impl embedded_services::relay::mctp::RelayServiceHandler diff --git a/debug-service/src/debug_service.rs b/debug-service/src/debug_service.rs index ef9762307..85ade54be 100644 --- a/debug-service/src/debug_service.rs +++ b/debug-service/src/debug_service.rs @@ -37,6 +37,9 @@ impl Service { impl embedded_services::relay::mctp::RelayServiceHandlerTypes for Service { type RequestType = DebugRequest; type ResultType = DebugResult; + + // Temporary until figure out what events want to send + type EventType = (); } impl embedded_services::relay::mctp::RelayServiceHandler for Service { diff --git a/embedded-service/src/event.rs b/embedded-service/src/event.rs index a5b54bdff..e1ebb4894 100644 --- a/embedded-service/src/event.rs +++ b/embedded-service/src/event.rs @@ -32,6 +32,11 @@ pub trait Receiver { /// Return none if there are no pending events fn try_next(&mut self) -> Option; /// Receive an event + /// + /// # Cancel Safety + /// + /// The implementation MUST be cancel-safe as the `wait_next` method + /// is typically called inside a `select!` and thus may be cancelled. fn wait_next(&mut self) -> impl Future; } @@ -191,3 +196,157 @@ impl, F: FnMut(I) -> O> Sender for MapSender { self.sender.send((self.map_fn)(event)) } } + +/// Applies a function on events received from the wrapped receiver. +pub struct MapReceiver, F: FnMut(I) -> O> { + receiver: R, + map_fn: F, + _phantom: PhantomData<(I, O)>, +} + +impl, F: FnMut(I) -> O> MapReceiver { + /// Create a new MapReceiver. + pub fn new(receiver: R, map_fn: F) -> Self { + Self { + receiver, + map_fn, + _phantom: PhantomData, + } + } +} + +impl, F: FnMut(I) -> O> Receiver for MapReceiver { + fn try_next(&mut self) -> Option { + self.receiver.try_next().map(&mut self.map_fn) + } + + async fn wait_next(&mut self) -> O { + (self.map_fn)(self.receiver.wait_next().await) + } +} + +/// Filters events from the wrapped receiver, only yielding events that pass the predicate. +/// +/// Events that do not pass the filter are consumed and discarded. +pub struct FilterReceiver, F: FnMut(&E) -> bool> { + receiver: R, + filter_fn: F, + _phantom: PhantomData, +} + +impl, F: FnMut(&E) -> bool> FilterReceiver { + /// Create a new FilterReceiver. + pub fn new(receiver: R, filter_fn: F) -> Self { + Self { + receiver, + filter_fn, + _phantom: PhantomData, + } + } +} + +impl, F: FnMut(&E) -> bool> Receiver for FilterReceiver { + fn try_next(&mut self) -> Option { + loop { + match self.receiver.try_next() { + Some(e) if (self.filter_fn)(&e) => return Some(e), + Some(_) => continue, + None => return None, + } + } + } + + async fn wait_next(&mut self) -> E { + loop { + let e = self.receiver.wait_next().await; + if (self.filter_fn)(&e) { + return e; + } + } + } +} + +/// A receiver that never produces events. +/// +/// This is mainly used to make it easier to construct a `MuxReceiver` +/// via macro since we don't need to handle the special start case +/// when chaining `with` calls. +pub struct NeverReceiver(PhantomData); + +impl NeverReceiver { + /// Create a new NeverReceiver. + pub fn new() -> Self { + Self(PhantomData) + } +} + +impl Default for NeverReceiver { + fn default() -> Self { + Self::new() + } +} + +impl Receiver for NeverReceiver { + fn try_next(&mut self) -> Option { + None + } + + async fn wait_next(&mut self) -> E { + core::future::pending().await + } +} + +/// Combines multiple receivers into one by racing them (with left-bias) and returning +/// the first event that becomes available mapped to a common event type. +pub struct MuxReceiver, R: Receiver> { + left: L, + right: R, + _phantom: PhantomData, +} + +impl MuxReceiver, NeverReceiver> { + /// Create an empty MuxReceiver. + /// + /// Use `.with()` to add receivers. + pub fn new() -> Self { + Self { + left: NeverReceiver::new(), + right: NeverReceiver::new(), + _phantom: PhantomData, + } + } +} + +impl Default for MuxReceiver, NeverReceiver> { + fn default() -> Self { + Self::new() + } +} + +impl, R1: Receiver> MuxReceiver { + /// Add another receiver to multiplex with this one. + pub fn with, F: FnMut(I) -> E>( + self, + receiver: R2, + map_fn: F, + ) -> MuxReceiver> { + MuxReceiver { + left: self, + right: MapReceiver::new(receiver, map_fn), + _phantom: PhantomData, + } + } +} + +impl, R: Receiver> Receiver for MuxReceiver { + fn try_next(&mut self) -> Option { + self.left.try_next().or_else(|| self.right.try_next()) + } + + async fn wait_next(&mut self) -> E { + match embassy_futures::select::select(self.left.wait_next(), self.right.wait_next()).await { + embassy_futures::select::Either::First(e) => e, + embassy_futures::select::Either::Second(e) => e, + } + } +} diff --git a/embedded-service/src/relay/mod.rs b/embedded-service/src/relay/mod.rs index d92cfb97f..168bb5943 100644 --- a/embedded-service/src/relay/mod.rs +++ b/embedded-service/src/relay/mod.rs @@ -116,6 +116,9 @@ pub mod mctp { /// The result type that this service handler processes type ResultType: super::SerializableResult; + + /// The event type that this service emits. + type EventType; } /// Trait for a service that can be relayed over an external bus (e.g. battery service, thermal service, time-alarm service) @@ -169,6 +172,11 @@ pub mod mctp { &'a self, message: Self::RequestEnumType, ) -> impl core::future::Future + 'a; + + /// Returns a mutable reference to the multiplexed receiver of all relayable service events. + /// + /// Each service's events are mapped to the service's u8 ID. + fn receiver(&mut self) -> &mut impl crate::event::Receiver; } /// This macro generates a relay type over a collection of message types, which can be used by a relay service to @@ -448,27 +456,42 @@ pub mod mctp { } - pub struct $relay_type_name { + pub struct $relay_type_name> { $( [<$service_name:snake _handler>]: $service_handler_type, )+ + receiver: T, } - impl $relay_type_name { - pub fn new( + // Note: Need a concrete type here to satisfy the type system, + // so we just use `NeverReceiver` as a placeholder + impl $relay_type_name<$crate::event::NeverReceiver> { + pub fn new< + $( + [<$service_name Rx>]: $crate::event::Receiver< + <$service_handler_type as $crate::relay::mctp::RelayServiceHandlerTypes>::EventType + >, + )+ + >( $( [<$service_name:snake _handler>]: $service_handler_type, )+ - ) -> Self { - Self { + $( + [<$service_name:snake _event_rx>]: [<$service_name Rx>], + )+ + ) -> $relay_type_name> { + let receiver = $crate::event::MuxReceiver::new() + $(.with([<$service_name:snake _event_rx>], |_| $service_id as u8))+; + $relay_type_name { $( [<$service_name:snake _handler>], )+ + receiver, } } } - impl $crate::relay::mctp::RelayHandler for $relay_type_name { + impl> $crate::relay::mctp::RelayHandler for $relay_type_name { type ServiceIdType = OdpService; type HeaderType = OdpHeader; type RequestEnumType = HostRequest; @@ -489,6 +512,10 @@ pub mod mctp { } } } + + fn receiver(&mut self) -> &mut impl $crate::event::Receiver { + &mut self.receiver + } } } // end mod __odp_impl diff --git a/examples/rt685s-evk/src/bin/time_alarm.rs b/examples/rt685s-evk/src/bin/time_alarm.rs index 8673d9096..f741efa0a 100644 --- a/examples/rt685s-evk/src/bin/time_alarm.rs +++ b/examples/rt685s-evk/src/bin/time_alarm.rs @@ -51,7 +51,10 @@ async fn main(spawner: embassy_executor::Spawner) { TimeAlarm, 0x0B, crate::TimeAlarmServiceRelayHandlerType; ); - let _relay_handler = EspiRelayHandler::new(TimeAlarmServiceRelayHandlerType::new(time_service)); + let _relay_handler = EspiRelayHandler::new( + TimeAlarmServiceRelayHandlerType::new(time_service), + embedded_services::event::NeverReceiver::new(), + ); // Here, you'd normally pass _relay_handler to your relay service (e.g. eSPI service). // In this example, we're not leveraging a relay service, so we'll just demonstrate some direct calls. diff --git a/thermal-service-interface/src/lib.rs b/thermal-service-interface/src/lib.rs index b1e56a100..c901fb201 100644 --- a/thermal-service-interface/src/lib.rs +++ b/thermal-service-interface/src/lib.rs @@ -3,6 +3,17 @@ pub mod fan; pub mod sensor; +/// Thermal service event. +#[derive(Debug, PartialEq, Clone, Copy)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +#[non_exhaustive] +pub enum Event { + /// A sensor event occurred. + Sensor(u8, sensor::Event), + /// A fan event occurred. + Fan(u8, fan::Event), +} + /// Thermal service interface trait. pub trait ThermalService { /// Associated type for registered sensor services. diff --git a/thermal-service-relay/src/lib.rs b/thermal-service-relay/src/lib.rs index fceae864d..d896606b5 100644 --- a/thermal-service-relay/src/lib.rs +++ b/thermal-service-relay/src/lib.rs @@ -3,6 +3,7 @@ mod serialization; pub use serialization::{ThermalError, ThermalRequest, ThermalResponse, ThermalResult}; +use thermal_service_interface::Event as ThermalEvent; use thermal_service_interface::ThermalService; use thermal_service_interface::fan::{self, FanService}; use thermal_service_interface::sensor::{self, SensorService}; @@ -194,6 +195,7 @@ impl ThermalServiceRelayHandler { impl embedded_services::relay::mctp::RelayServiceHandlerTypes for ThermalServiceRelayHandler { type RequestType = ThermalRequest; type ResultType = ThermalResult; + type EventType = ThermalEvent; } impl embedded_services::relay::mctp::RelayServiceHandler for ThermalServiceRelayHandler { diff --git a/time-alarm-service-relay/src/lib.rs b/time-alarm-service-relay/src/lib.rs index ed444a394..b428b3cb6 100644 --- a/time-alarm-service-relay/src/lib.rs +++ b/time-alarm-service-relay/src/lib.rs @@ -20,6 +20,9 @@ impl TimeAlarmServiceRelayHandler { impl embedded_services::relay::mctp::RelayServiceHandlerTypes for TimeAlarmServiceRelayHandler { type RequestType = AcpiTimeAlarmRequest; type ResultType = AcpiTimeAlarmResult; + + // Temporary until figure out what events want to send + type EventType = (); } impl embedded_services::relay::mctp::RelayServiceHandler for TimeAlarmServiceRelayHandler { diff --git a/uart-service/Cargo.toml b/uart-service/Cargo.toml index beb373559..8aed7b44c 100644 --- a/uart-service/Cargo.toml +++ b/uart-service/Cargo.toml @@ -17,6 +17,7 @@ workspace = true embedded-services.workspace = true defmt = { workspace = true, optional = true } log = { workspace = true, optional = true } +embassy-futures.workspace = true embassy-sync.workspace = true mctp-rs = { workspace = true } embedded-io-async.workspace = true diff --git a/uart-service/src/lib.rs b/uart-service/src/lib.rs index e3ecffe6a..9b6fa806c 100644 --- a/uart-service/src/lib.rs +++ b/uart-service/src/lib.rs @@ -4,9 +4,6 @@ //! Use [`DefaultService`] for the SmbusEspi-medium baseline; use //! [`Service::new`] directly with another medium (e.g. DSP0253 serial) //! for non-SmbusEspi callers. -//! -//! Revisit: Will also need to consider how to handle notifications (likely need to have user -//! provide GPIO pin we can use). #![no_std] pub mod task; @@ -24,6 +21,27 @@ use mctp_rs::smbus_espi::{SmbusEspiMedium, SmbusEspiReplyContext}; const BUF_SIZE: usize = 256; const HOST_TX_QUEUE_SIZE: usize = 5; +// Persistent state for UART request reading +// +// Necessary to make sure the request reading process is cancel-safe +struct ReadState { + buf: [u8; BUF_SIZE], + filled: usize, +} + +impl ReadState { + fn new() -> Self { + Self { + buf: [0u8; BUF_SIZE], + filled: 0, + } + } + + fn reset(&mut self) { + self.filled = 0; + } +} + #[derive(Clone)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub(crate) struct HostResultMessage { @@ -63,8 +81,12 @@ pub enum Error { /// /// [`MctpPacketContext`]: mctp_rs::MctpPacketContext pub struct Service { + pub(crate) inner: ServiceInner, + pub(crate) relay_handler: R, +} + +pub(crate) struct ServiceInner { host_tx_queue: Channel, HOST_TX_QUEUE_SIZE>, - relay_handler: R, medium: M, reply_context: mctp_rs::MctpReplyContext, } @@ -72,13 +94,47 @@ pub struct Service { impl Service { pub fn new(relay_handler: R, medium: M, reply_context: mctp_rs::MctpReplyContext) -> Result> { Ok(Self { - host_tx_queue: Channel::new(), + inner: ServiceInner { + host_tx_queue: Channel::new(), + medium, + reply_context, + }, relay_handler, - medium, - reply_context, }) } + // Deserialize the request and forward it to correct service for processing + async fn process_request(&self, state: &ReadState, packet_len: usize) -> Result<(), Error> { + let mut assembly_buf = [0u8; BUF_SIZE]; + let mut mctp_ctx = mctp_rs::MctpPacketContext::::new(self.inner.medium, &mut assembly_buf); + + let message = mctp_ctx + .deserialize_packet( + state + .buf + .get(..packet_len) + .ok_or(Error::Serialize("frame exceeds BUF_SIZE"))?, + ) + .map_err(Error::Mctp)? + .ok_or(Error::Serialize("Partial message not supported"))?; + + let (header, body) = message.parse_as::().map_err(Error::Mctp)?; + trace!("Received host request"); + + let response = self.relay_handler.process_request(body).await; + self.inner + .host_tx_queue + .try_send(HostResultMessage { + handler_service_id: header.get_service_id(), + message: response, + }) + .map_err(|_| Error::Comms)?; + + Ok(()) + } +} + +impl ServiceInner { async fn process_response( &self, uart: &mut T, @@ -107,13 +163,22 @@ impl Service { Ok(()) } - async fn wait_for_request(&self, uart: &mut T) -> Result<(), Error> { - // Incremental read loop: read bytes, ask the medium whether the - // assembled prefix is a complete frame, repeat until it is. - let mut buf = [0u8; BUF_SIZE]; - let mut filled = 0usize; - let packet_len = loop { - let dst = buf.get_mut(filled..).ok_or(Error::Serialize("buffer overrun"))?; + async fn wait_for_response(&self) -> HostResultMessage { + self.host_tx_queue.receive().await + } + + // Read bytes from UART until a complete request is assembled. + // + // # Cancel Safety + // + // This method is cancel-safe (assuming the embedded-io-async `read` implementation is cancel-safe) + // because partial read progress is stored in `state` and will be resumed on the next call. + async fn wait_for_request(&self, uart: &mut T, state: &mut ReadState) -> Result> { + loop { + let dst = state + .buf + .get_mut(state.filled..) + .ok_or(Error::Serialize("buffer overrun"))?; if dst.is_empty() { return Err(Error::Serialize("frame exceeds BUF_SIZE")); } @@ -121,44 +186,21 @@ impl Service { if n == 0 { return Err(Error::Comms); } - filled += n; + state.filled += n; match self .medium - .frame_complete(buf.get(..filled).ok_or(Error::Serialize("buffer overrun"))?) + .frame_complete( + state + .buf + .get(..state.filled) + .ok_or(Error::Serialize("buffer overrun"))?, + ) .map_err(Error::Mctp)? { - Some(len) => break len, + Some(len) => return Ok(len), None => continue, } - }; - - let mut assembly_buf = [0u8; BUF_SIZE]; - let mut mctp_ctx = mctp_rs::MctpPacketContext::::new(self.medium, &mut assembly_buf); - - let message = mctp_ctx - .deserialize_packet( - buf.get(..packet_len) - .ok_or(Error::Serialize("frame exceeds BUF_SIZE"))?, - ) - .map_err(Error::Mctp)? - .ok_or(Error::Serialize("Partial message not supported"))?; - - let (header, body) = message.parse_as::().map_err(Error::Mctp)?; - trace!("Received host request"); - - let response = self.relay_handler.process_request(body).await; - self.host_tx_queue - .try_send(HostResultMessage { - handler_service_id: header.get_service_id(), - message: response, - }) - .map_err(|_| Error::Comms)?; - - Ok(()) - } - - async fn wait_for_response(&self) -> HostResultMessage { - self.host_tx_queue.receive().await + } } } diff --git a/uart-service/src/task.rs b/uart-service/src/task.rs index 06a2b8c05..e90e4e33b 100644 --- a/uart-service/src/task.rs +++ b/uart-service/src/task.rs @@ -1,26 +1,64 @@ -use crate::{Error, Service}; +use crate::{Error, ReadState, Service}; +use embassy_futures::select::{Either, select}; use embedded_io_async::Read as UartRead; use embedded_io_async::Write as UartWrite; -use embedded_services::error; +use embedded_services::event::Receiver; use embedded_services::relay::mctp::RelayHandler; +use embedded_services::{error, warn}; use mctp_rs::MctpMedium; +/// Start the UART service task. +/// +/// # Requirements +/// +/// The `embedded-io-async` `Read` implementation used for the uart **MUST** be cancel-safe. pub async fn uart_service( - uart_service: &Service, + mut service: Service, mut uart: T, ) -> Result> { - // Note: eSPI service uses `select!` to seemingly allow asyncrhonous `responses` from services, - // but there are concerns around async cancellation here at least for UART service. - // - // Thus this assumes services will only send messages in response to requests from the host, - // so we handle this in order. + let mut read_state = ReadState::new(); + loop { - if let Err(e) = uart_service.wait_for_request(&mut uart).await { - log_error("request", &e); - } else { - let host_msg = uart_service.wait_for_response().await; - if let Err(e) = uart_service.process_response(&mut uart, host_msg).await { - log_error("response", &e); + match select( + service.inner.wait_for_request(&mut uart, &mut read_state), + service.relay_handler.receiver().wait_next(), + ) + .await + { + Either::First(Ok(packet_len)) => { + if let Err(e) = service.process_request(&read_state, packet_len).await { + log_error("request", &e); + } else { + let host_msg = service.inner.wait_for_response().await; + if let Err(e) = service.inner.process_response(&mut uart, host_msg).await { + log_error("response", &e); + } + } + read_state.reset(); + } + Either::First(Err(e)) => { + log_error("request", &e); + read_state.reset(); + } + Either::Second(event) => { + warn!( + "uart-service received notifiable event ({}) from relayable service", + event + ); + + // TODO: Here we would do something like: + // + // if let Err(_e) = uart.write_all(&[0x42, event]).await { + // error!("uart-service failed to send notification"); + // } else { + // warn!("uart-service sent notification for event {}", event); + // } + // + // Where we TX some starter byte(s) to tell host it's about to receive a notification, + // then the notification ID itself. + // + // This is TODO until the whole stack is ready to receive notifications + // otherwise TXing here could break things. } } }