Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions battery-service-relay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ impl<S: battery_service_interface::BatteryService> embedded_services::relay::mct
{
type RequestType = serialization::AcpiBatteryRequest;
type ResultType = serialization::AcpiBatteryResult;

// Temporary until figure out what events want to send
type EventType = ();
}

impl<S: battery_service_interface::BatteryService> embedded_services::relay::mctp::RelayServiceHandler
Expand Down
3 changes: 3 additions & 0 deletions debug-service/src/debug_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ impl Service {
impl embedded_services::relay::mctp::RelayServiceHandlerTypes for Service {
type RequestType = DebugRequest;
type ResultType = DebugResult;

// Temporary until figure out what events want to send
type EventType = ();
}

impl embedded_services::relay::mctp::RelayServiceHandler for Service {
Expand Down
159 changes: 159 additions & 0 deletions embedded-service/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ pub trait Receiver<E> {
/// Return none if there are no pending events
fn try_next(&mut self) -> Option<E>;
/// Receive an event
///
/// # Cancel Safety
///
/// The implementation MUST be cancel-safe as the `wait_next` method
/// is typically called inside a `select!` and thus may be cancelled.
fn wait_next(&mut self) -> impl Future<Output = E>;
}

Expand Down Expand Up @@ -191,3 +196,157 @@ impl<I, O, S: Sender<O>, F: FnMut(I) -> O> Sender<I> for MapSender<I, O, S, F> {
self.sender.send((self.map_fn)(event))
}
}

/// Applies a function on events received from the wrapped receiver.
pub struct MapReceiver<I, O, R: Receiver<I>, F: FnMut(I) -> O> {
receiver: R,
map_fn: F,
_phantom: PhantomData<(I, O)>,
}

impl<I, O, R: Receiver<I>, F: FnMut(I) -> O> MapReceiver<I, O, R, F> {
/// Create a new MapReceiver.
pub fn new(receiver: R, map_fn: F) -> Self {
Self {
receiver,
map_fn,
_phantom: PhantomData,
}
}
}

impl<I, O, R: Receiver<I>, F: FnMut(I) -> O> Receiver<O> for MapReceiver<I, O, R, F> {
fn try_next(&mut self) -> Option<O> {
self.receiver.try_next().map(&mut self.map_fn)
}

async fn wait_next(&mut self) -> O {
(self.map_fn)(self.receiver.wait_next().await)
}
}

/// Filters events from the wrapped receiver, only yielding events that pass the predicate.
///
/// Events that do not pass the filter are consumed and discarded.
pub struct FilterReceiver<E, R: Receiver<E>, F: FnMut(&E) -> bool> {
receiver: R,
filter_fn: F,
_phantom: PhantomData<E>,
}

impl<E, R: Receiver<E>, F: FnMut(&E) -> bool> FilterReceiver<E, R, F> {
/// Create a new FilterReceiver.
pub fn new(receiver: R, filter_fn: F) -> Self {
Self {
receiver,
filter_fn,
_phantom: PhantomData,
}
}
}

impl<E, R: Receiver<E>, F: FnMut(&E) -> bool> Receiver<E> for FilterReceiver<E, R, F> {
fn try_next(&mut self) -> Option<E> {
loop {
match self.receiver.try_next() {
Some(e) if (self.filter_fn)(&e) => return Some(e),
Some(_) => continue,
None => return None,
}
}
}

async fn wait_next(&mut self) -> E {
loop {
let e = self.receiver.wait_next().await;
if (self.filter_fn)(&e) {
return e;
}
}
}
}

/// A receiver that never produces events.
///
/// This is mainly used to make it easier to construct a `MuxReceiver`
/// via macro since we don't need to handle the special start case
/// when chaining `with` calls.
pub struct NeverReceiver<E>(PhantomData<E>);

impl<E> NeverReceiver<E> {
/// Create a new NeverReceiver.
pub fn new() -> Self {
Self(PhantomData)
}
}

impl<E> Default for NeverReceiver<E> {
fn default() -> Self {
Self::new()
}
}

impl<E> Receiver<E> for NeverReceiver<E> {
fn try_next(&mut self) -> Option<E> {
None
}

async fn wait_next(&mut self) -> E {
core::future::pending().await
}
}

/// Combines multiple receivers into one by racing them (with left-bias) and returning
/// the first event that becomes available mapped to a common event type.
pub struct MuxReceiver<E, L: Receiver<E>, R: Receiver<E>> {
left: L,
right: R,
_phantom: PhantomData<E>,
}

impl<E> MuxReceiver<E, NeverReceiver<E>, NeverReceiver<E>> {
/// Create an empty MuxReceiver.
///
/// Use `.with()` to add receivers.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does .with() to add receivers mean?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when you call MuxReceiver::new() it basically returns an empty receiver. To "multiplex" additional receivers you need to call MuxReceiver::with(...) which consumes self and spits out a new MuxReceiver with the additional receiver multiplexed into it. You can keep chaining with calls indefinitely to keep adding receivers.

Designed it this way because it was easier than trying to just take a list of N receivers statically upfront or just a slice of indeterminate length. This also feels a bit more ergonomic to select over because an array or slice of Receivers means I need to first map the array of receivers to an array of futures, then perform the select, etc.

pub fn new() -> Self {
Self {
left: NeverReceiver::new(),
right: NeverReceiver::new(),
_phantom: PhantomData,
}
}
}

impl<E> Default for MuxReceiver<E, NeverReceiver<E>, NeverReceiver<E>> {
fn default() -> Self {
Self::new()
}
}

impl<E, L: Receiver<E>, R1: Receiver<E>> MuxReceiver<E, L, R1> {
/// Add another receiver to multiplex with this one.
pub fn with<I, R2: Receiver<I>, F: FnMut(I) -> E>(
self,
receiver: R2,
map_fn: F,
) -> MuxReceiver<E, Self, MapReceiver<I, E, R2, F>> {
MuxReceiver {
left: self,
right: MapReceiver::new(receiver, map_fn),
_phantom: PhantomData,
}
}
}

impl<E, L: Receiver<E>, R: Receiver<E>> Receiver<E> for MuxReceiver<E, L, R> {
fn try_next(&mut self) -> Option<E> {
self.left.try_next().or_else(|| self.right.try_next())
}
Comment thread
felipebalbi marked this conversation as resolved.

async fn wait_next(&mut self) -> E {
match embassy_futures::select::select(self.left.wait_next(), self.right.wait_next()).await {
embassy_futures::select::Either::First(e) => e,
embassy_futures::select::Either::Second(e) => e,
}
}
Comment on lines +346 to +351

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure L and R are always cancel safe? I'm asking because Receiver does not impose a cancel-safety contract:

Image

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I'll add documentation to the Receiver trait that methods must be cancel-safe and that should not be problematic because all embassy-sync futures are cancel-safe (see: embassy-rs/embassy#5484 (comment)).

}
39 changes: 33 additions & 6 deletions embedded-service/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ pub mod mctp {

/// The result type that this service handler processes
type ResultType: super::SerializableResult;

/// The event type that this service emits.
type EventType;
Comment thread
felipebalbi marked this conversation as resolved.
}

/// Trait for a service that can be relayed over an external bus (e.g. battery service, thermal service, time-alarm service)
Expand Down Expand Up @@ -169,6 +172,11 @@ pub mod mctp {
&'a self,
message: Self::RequestEnumType,
) -> impl core::future::Future<Output = Self::ResultEnumType> + 'a;

/// Returns a mutable reference to the multiplexed receiver of all relayable service events.
///
/// Each service's events are mapped to the service's u8 ID.
fn receiver(&mut self) -> &mut impl crate::event::Receiver<u8>;
}

/// This macro generates a relay type over a collection of message types, which can be used by a relay service to
Expand Down Expand Up @@ -448,27 +456,42 @@ pub mod mctp {
}


pub struct $relay_type_name {
pub struct $relay_type_name<T: $crate::event::Receiver<u8>> {
$(
[<$service_name:snake _handler>]: $service_handler_type,
)+
receiver: T,
}

impl $relay_type_name {
pub fn new(
// Note: Need a concrete type here to satisfy the type system,
// so we just use `NeverReceiver` as a placeholder
impl $relay_type_name<$crate::event::NeverReceiver<u8>> {
pub fn new<
$(
[<$service_name Rx>]: $crate::event::Receiver<
<$service_handler_type as $crate::relay::mctp::RelayServiceHandlerTypes>::EventType
>,
)+
>(
$(
[<$service_name:snake _handler>]: $service_handler_type,
)+
) -> Self {
Self {
$(
[<$service_name:snake _event_rx>]: [<$service_name Rx>],
)+
) -> $relay_type_name<impl $crate::event::Receiver<u8>> {
let receiver = $crate::event::MuxReceiver::new()
$(.with([<$service_name:snake _event_rx>], |_| $service_id as u8))+;
$relay_type_name {
$(
[<$service_name:snake _handler>],
)+
receiver,
}
}
}

impl $crate::relay::mctp::RelayHandler for $relay_type_name {
impl<T: $crate::event::Receiver<u8>> $crate::relay::mctp::RelayHandler for $relay_type_name<T> {
type ServiceIdType = OdpService;
type HeaderType = OdpHeader;
type RequestEnumType = HostRequest;
Expand All @@ -489,6 +512,10 @@ pub mod mctp {
}
}
}

fn receiver(&mut self) -> &mut impl $crate::event::Receiver<u8> {
&mut self.receiver
}
}
} // end mod __odp_impl

Expand Down
5 changes: 4 additions & 1 deletion examples/rt685s-evk/src/bin/time_alarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ async fn main(spawner: embassy_executor::Spawner) {
TimeAlarm, 0x0B, crate::TimeAlarmServiceRelayHandlerType;
);

let _relay_handler = EspiRelayHandler::new(TimeAlarmServiceRelayHandlerType::new(time_service));
let _relay_handler = EspiRelayHandler::new(

@williampMSFT williampMSFT Jun 4, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: this:

So now we don't need to manually pass in a receiver into the uart_service, it just gets it from the relay_handler.

I think what I'm pushing for is to have the shape of this be something like this instead:

    let _relay_handler = EspiRelayHandler::new(
        TimeAlarmServiceRelayHandlerType::new(time_service, embedded_services::event::NeverReceiver::new()),
    );

and then the espiRelayHandler can interrogate the TimeAlarmServiceRelayHandlerType for a receiver to its notification channel, which will likely be of a different type than the receiver that's passed into the TimeAlarmServiceRelayHandlerType's constructor because the shape of notifications is going to be different across different transport protocols (e.g. MCTP might only support a ()-typed message, whereas HID has to support messages with payloads).

This approach has a few advantages:

  • It's not possible to mix up the notifications for different services (i.e. you can't accidentally wire up the battery service's notification handler to the thermal service) (at least at this layer - the DIY pubsub thing between service <-> relay handler still has this problem, which is part of why I don't like the DIY pubsub thing, but I think fixing that is out of scope for this PR)
  • It's easier to read / reason about what shape a relay handler has because all that information is present in the trait definition, rather than being strewn about in the implementations of macros that consume instances of the trait - sort of the same difference between traditional C++ templates and C++20 concepts

Based on the work I've done on HID stuff I'm pretty convinced it's possible to do this and satisfy the borrow checker, it just requires a few weird-looking (to my eye, anyway) constructs around when you listen to the channel - happy to chat more if you're running into problems there :)

TimeAlarmServiceRelayHandlerType::new(time_service),
embedded_services::event::NeverReceiver::new(),
);

// Here, you'd normally pass _relay_handler to your relay service (e.g. eSPI service).
// In this example, we're not leveraging a relay service, so we'll just demonstrate some direct calls.
Expand Down
11 changes: 11 additions & 0 deletions thermal-service-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@
pub mod fan;
pub mod sensor;

/// Thermal service event.
#[derive(Debug, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[non_exhaustive]
pub enum Event {
/// A sensor event occurred.
Sensor(u8, sensor::Event),
/// A fan event occurred.
Fan(u8, fan::Event),
}

/// Thermal service interface trait.
pub trait ThermalService {
/// Associated type for registered sensor services.
Expand Down
2 changes: 2 additions & 0 deletions thermal-service-relay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
mod serialization;

pub use serialization::{ThermalError, ThermalRequest, ThermalResponse, ThermalResult};
use thermal_service_interface::Event as ThermalEvent;
use thermal_service_interface::ThermalService;
use thermal_service_interface::fan::{self, FanService};
use thermal_service_interface::sensor::{self, SensorService};
Expand Down Expand Up @@ -194,6 +195,7 @@ impl<T: ThermalService> ThermalServiceRelayHandler<T> {
impl<T: ThermalService> embedded_services::relay::mctp::RelayServiceHandlerTypes for ThermalServiceRelayHandler<T> {
type RequestType = ThermalRequest;
type ResultType = ThermalResult;
type EventType = ThermalEvent;
}

impl<T: ThermalService> embedded_services::relay::mctp::RelayServiceHandler for ThermalServiceRelayHandler<T> {
Expand Down
3 changes: 3 additions & 0 deletions time-alarm-service-relay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ impl<T: TimeAlarmService> TimeAlarmServiceRelayHandler<T> {
impl<T: TimeAlarmService> embedded_services::relay::mctp::RelayServiceHandlerTypes for TimeAlarmServiceRelayHandler<T> {
type RequestType = AcpiTimeAlarmRequest;
type ResultType = AcpiTimeAlarmResult;

// Temporary until figure out what events want to send
type EventType = ();
}

impl<T: TimeAlarmService> embedded_services::relay::mctp::RelayServiceHandler for TimeAlarmServiceRelayHandler<T> {
Expand Down
1 change: 1 addition & 0 deletions uart-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ workspace = true
embedded-services.workspace = true
defmt = { workspace = true, optional = true }
log = { workspace = true, optional = true }
embassy-futures.workspace = true
embassy-sync.workspace = true
mctp-rs = { workspace = true }
embedded-io-async.workspace = true
Expand Down
Loading
Loading