-
Notifications
You must be signed in to change notification settings - Fork 49
Add relayable service event plumbing #866
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,11 @@ pub trait Receiver<E> { | |
| /// Return none if there are no pending events | ||
| fn try_next(&mut self) -> Option<E>; | ||
| /// Receive an event | ||
| /// | ||
| /// # Cancel Safety | ||
| /// | ||
| /// The implementation MUST be cancel-safe as the `wait_next` method | ||
| /// is typically called inside a `select!` and thus may be cancelled. | ||
| fn wait_next(&mut self) -> impl Future<Output = E>; | ||
| } | ||
|
|
||
|
|
@@ -191,3 +196,157 @@ impl<I, O, S: Sender<O>, F: FnMut(I) -> O> Sender<I> for MapSender<I, O, S, F> { | |
| self.sender.send((self.map_fn)(event)) | ||
| } | ||
| } | ||
|
|
||
| /// Applies a function on events received from the wrapped receiver. | ||
| pub struct MapReceiver<I, O, R: Receiver<I>, F: FnMut(I) -> O> { | ||
| receiver: R, | ||
| map_fn: F, | ||
| _phantom: PhantomData<(I, O)>, | ||
| } | ||
|
|
||
| impl<I, O, R: Receiver<I>, F: FnMut(I) -> O> MapReceiver<I, O, R, F> { | ||
| /// Create a new MapReceiver. | ||
| pub fn new(receiver: R, map_fn: F) -> Self { | ||
| Self { | ||
| receiver, | ||
| map_fn, | ||
| _phantom: PhantomData, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<I, O, R: Receiver<I>, F: FnMut(I) -> O> Receiver<O> for MapReceiver<I, O, R, F> { | ||
| fn try_next(&mut self) -> Option<O> { | ||
| self.receiver.try_next().map(&mut self.map_fn) | ||
| } | ||
|
|
||
| async fn wait_next(&mut self) -> O { | ||
| (self.map_fn)(self.receiver.wait_next().await) | ||
| } | ||
| } | ||
|
|
||
| /// Filters events from the wrapped receiver, only yielding events that pass the predicate. | ||
| /// | ||
| /// Events that do not pass the filter are consumed and discarded. | ||
| pub struct FilterReceiver<E, R: Receiver<E>, F: FnMut(&E) -> bool> { | ||
| receiver: R, | ||
| filter_fn: F, | ||
| _phantom: PhantomData<E>, | ||
| } | ||
|
|
||
| impl<E, R: Receiver<E>, F: FnMut(&E) -> bool> FilterReceiver<E, R, F> { | ||
| /// Create a new FilterReceiver. | ||
| pub fn new(receiver: R, filter_fn: F) -> Self { | ||
| Self { | ||
| receiver, | ||
| filter_fn, | ||
| _phantom: PhantomData, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<E, R: Receiver<E>, F: FnMut(&E) -> bool> Receiver<E> for FilterReceiver<E, R, F> { | ||
| fn try_next(&mut self) -> Option<E> { | ||
| loop { | ||
| match self.receiver.try_next() { | ||
| Some(e) if (self.filter_fn)(&e) => return Some(e), | ||
| Some(_) => continue, | ||
| None => return None, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| async fn wait_next(&mut self) -> E { | ||
| loop { | ||
| let e = self.receiver.wait_next().await; | ||
| if (self.filter_fn)(&e) { | ||
| return e; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// A receiver that never produces events. | ||
| /// | ||
| /// This is mainly used to make it easier to construct a `MuxReceiver` | ||
| /// via macro since we don't need to handle the special start case | ||
| /// when chaining `with` calls. | ||
| pub struct NeverReceiver<E>(PhantomData<E>); | ||
|
|
||
| impl<E> NeverReceiver<E> { | ||
| /// Create a new NeverReceiver. | ||
| pub fn new() -> Self { | ||
| Self(PhantomData) | ||
| } | ||
| } | ||
|
|
||
| impl<E> Default for NeverReceiver<E> { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl<E> Receiver<E> for NeverReceiver<E> { | ||
| fn try_next(&mut self) -> Option<E> { | ||
| None | ||
| } | ||
|
|
||
| async fn wait_next(&mut self) -> E { | ||
| core::future::pending().await | ||
| } | ||
| } | ||
|
|
||
| /// Combines multiple receivers into one by racing them (with left-bias) and returning | ||
| /// the first event that becomes available mapped to a common event type. | ||
| pub struct MuxReceiver<E, L: Receiver<E>, R: Receiver<E>> { | ||
| left: L, | ||
| right: R, | ||
| _phantom: PhantomData<E>, | ||
| } | ||
|
|
||
| impl<E> MuxReceiver<E, NeverReceiver<E>, NeverReceiver<E>> { | ||
| /// Create an empty MuxReceiver. | ||
| /// | ||
| /// Use `.with()` to add receivers. | ||
| pub fn new() -> Self { | ||
| Self { | ||
| left: NeverReceiver::new(), | ||
| right: NeverReceiver::new(), | ||
| _phantom: PhantomData, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<E> Default for MuxReceiver<E, NeverReceiver<E>, NeverReceiver<E>> { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl<E, L: Receiver<E>, R1: Receiver<E>> MuxReceiver<E, L, R1> { | ||
| /// Add another receiver to multiplex with this one. | ||
| pub fn with<I, R2: Receiver<I>, F: FnMut(I) -> E>( | ||
| self, | ||
| receiver: R2, | ||
| map_fn: F, | ||
| ) -> MuxReceiver<E, Self, MapReceiver<I, E, R2, F>> { | ||
| MuxReceiver { | ||
| left: self, | ||
| right: MapReceiver::new(receiver, map_fn), | ||
| _phantom: PhantomData, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<E, L: Receiver<E>, R: Receiver<E>> Receiver<E> for MuxReceiver<E, L, R> { | ||
| fn try_next(&mut self) -> Option<E> { | ||
| self.left.try_next().or_else(|| self.right.try_next()) | ||
| } | ||
|
felipebalbi marked this conversation as resolved.
|
||
|
|
||
| async fn wait_next(&mut self) -> E { | ||
| match embassy_futures::select::select(self.left.wait_next(), self.right.wait_next()).await { | ||
| embassy_futures::select::Either::First(e) => e, | ||
| embassy_futures::select::Either::Second(e) => e, | ||
| } | ||
| } | ||
|
Comment on lines
+346
to
+351
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I'll add documentation to the Receiver trait that methods must be cancel-safe and that should not be problematic because all |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,7 +51,10 @@ async fn main(spawner: embassy_executor::Spawner) { | |
| TimeAlarm, 0x0B, crate::TimeAlarmServiceRelayHandlerType; | ||
| ); | ||
|
|
||
| let _relay_handler = EspiRelayHandler::new(TimeAlarmServiceRelayHandlerType::new(time_service)); | ||
| let _relay_handler = EspiRelayHandler::new( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. re: this:
I think what I'm pushing for is to have the shape of this be something like this instead: let _relay_handler = EspiRelayHandler::new(
TimeAlarmServiceRelayHandlerType::new(time_service, embedded_services::event::NeverReceiver::new()),
);and then the espiRelayHandler can interrogate the TimeAlarmServiceRelayHandlerType for a receiver to its notification channel, which will likely be of a different type than the receiver that's passed into the TimeAlarmServiceRelayHandlerType's constructor because the shape of notifications is going to be different across different transport protocols (e.g. MCTP might only support a This approach has a few advantages:
Based on the work I've done on HID stuff I'm pretty convinced it's possible to do this and satisfy the borrow checker, it just requires a few weird-looking (to my eye, anyway) constructs around when you listen to the channel - happy to chat more if you're running into problems there :) |
||
| TimeAlarmServiceRelayHandlerType::new(time_service), | ||
| embedded_services::event::NeverReceiver::new(), | ||
| ); | ||
|
|
||
| // Here, you'd normally pass _relay_handler to your relay service (e.g. eSPI service). | ||
| // In this example, we're not leveraging a relay service, so we'll just demonstrate some direct calls. | ||
|
|
||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does
.with() to add receiversmean?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So when you call
MuxReceiver::new()it basically returns an empty receiver. To "multiplex" additional receivers you need to callMuxReceiver::with(...)which consumesselfand spits out a newMuxReceiverwith the additional receiver multiplexed into it. You can keep chainingwithcalls indefinitely to keep adding receivers.Designed it this way because it was easier than trying to just take a list of N receivers statically upfront or just a slice of indeterminate length. This also feels a bit more ergonomic to select over because an array or slice of Receivers means I need to first map the array of receivers to an array of futures, then perform the select, etc.