From c5ac45b51e80cb08aff22479a123bca5e15687cf Mon Sep 17 00:00:00 2001 From: ByteColtX Date: Thu, 21 May 2026 23:40:14 +0800 Subject: [PATCH 1/3] feat(session): add configurable latency profiles --- src/app/facade.rs | 19 +++++- src/audio/mod.rs | 6 +- src/audio/raop.rs | 1 + src/cli/commands.rs | 15 +++-- src/cli/parse.rs | 120 +++++++++++++++++++++++++++++++------- src/rtsp/protocol.rs | 26 ++++++++- src/session/connect.rs | 4 +- src/session/mod.rs | 107 ++++++++++++++++++++++++++++++++- src/session/raop.rs | 38 ++++++++---- src/session/stream.rs | 16 ++++- src/ui/tray/mod.rs | 10 +++- tests/cli_play_capture.rs | 38 ++++++++++++ tests/cli_play_file.rs | 25 ++++++++ 13 files changed, 375 insertions(+), 50 deletions(-) diff --git a/src/app/facade.rs b/src/app/facade.rs index d9dc7ff..8958264 100644 --- a/src/app/facade.rs +++ b/src/app/facade.rs @@ -11,7 +11,7 @@ use crate::pairing::ReceiverCredentials; use crate::platform; use crate::receiver::{Receiver, selector}; use crate::session::{ - PlaybackSession, pair_receiver_with_pin, play_capture, play_file, + LatencyProfile, PlaybackSession, pair_receiver_with_pin, play_capture, play_file, request_pairing_pin_display as session_request_pairing_pin_display, }; use crate::storage::{paired_devices, receiver_cache}; @@ -240,7 +240,12 @@ where Ok(entry) } - pub fn play_file(&mut self, path: &Path, selectors: &[String]) -> Result<(), RairstreamError> { + pub fn play_file( + &mut self, + path: &Path, + selectors: &[String], + latency_profile: LatencyProfile, + ) -> Result<(), RairstreamError> { let receivers = self.ensure_receivers()?; let targets = selector::resolve_receivers(&receivers, selectors)?; self.state.session = SessionState::Streaming { @@ -251,6 +256,7 @@ where &targets, &self.config.paired_receivers, self.config.sender_volume_percent, + latency_profile, ); self.state.session = SessionState::Idle; result @@ -259,6 +265,7 @@ where pub fn play_capture( &mut self, selectors: &[String], + latency_profile: LatencyProfile, ) -> Result { let receivers = self.ensure_receivers()?; let targets = selector::resolve_receivers(&receivers, selectors)?; @@ -269,6 +276,7 @@ where &targets, &self.config.paired_receivers, self.config.sender_volume_percent, + latency_profile, ) { Ok(session) => Ok(session), Err(error) => { @@ -425,6 +433,7 @@ mod tests { use crate::receiver::{ AirPlayGeneration, AuthMethod, DeviceSupport, Receiver, ReceiverCapabilities, ReceiverKind, }; + use crate::session::LatencyProfile; use super::{AppFacade, DiscoveryService, SessionState, save_config}; @@ -785,7 +794,11 @@ mod tests { .as_nanos() )); - let result = facade.play_file(&missing_file, &[String::from("Living Room")]); + let result = facade.play_file( + &missing_file, + &[String::from("Living Room")], + LatencyProfile::safe(), + ); assert!(result.is_err()); assert_eq!(facade.state().session, SessionState::Idle); diff --git a/src/audio/mod.rs b/src/audio/mod.rs index 6881d43..12d58cc 100644 --- a/src/audio/mod.rs +++ b/src/audio/mod.rs @@ -15,9 +15,11 @@ use std::thread::{self, JoinHandle}; pub use decode::FileChunkDecoder; pub use raop::AudioResampler; -pub(crate) use raop::{CodecDescription, RAOP_FRAMES_PER_PACKET, RAOP_STARTUP_LATENCY_FRAMES}; #[cfg(test)] -pub(crate) use raop::{RAOP_SAMPLE_RATE_HZ, RAOP_STARTUP_LATENCY_MILLIS}; +pub(crate) use raop::RAOP_STARTUP_LATENCY_FRAMES; +pub(crate) use raop::{ + CodecDescription, RAOP_FRAMES_PER_PACKET, RAOP_SAMPLE_RATE_HZ, RAOP_STARTUP_LATENCY_MILLIS, +}; /// PCM 样本的数据语义。 #[derive(Debug, Clone, Copy, PartialEq, Eq)] diff --git a/src/audio/raop.rs b/src/audio/raop.rs index 769a9f2..1f992e4 100644 --- a/src/audio/raop.rs +++ b/src/audio/raop.rs @@ -13,6 +13,7 @@ pub(crate) const RAOP_CHANNELS: u16 = 2; pub(crate) const RAOP_BITS_PER_SAMPLE: u16 = 16; pub(crate) const RAOP_FRAMES_PER_PACKET: usize = 352; pub(crate) const RAOP_STARTUP_LATENCY_MILLIS: u32 = 250; +#[cfg(test)] pub(crate) const RAOP_STARTUP_LATENCY_FRAMES: u32 = RAOP_STARTUP_LATENCY_MILLIS * RAOP_SAMPLE_RATE_HZ / 1_000; diff --git a/src/cli/commands.rs b/src/cli/commands.rs index 986608c..a52d95a 100644 --- a/src/cli/commands.rs +++ b/src/cli/commands.rs @@ -62,13 +62,20 @@ pub fn run_cli(cli: CliOptions) -> Result<(), RairstreamError> { print_paired_removed(&entry); Ok(()) } - CliCommand::PlayFile { path, selectors } => { - facade.play_file(&path, &selectors)?; + CliCommand::PlayFile { + path, + selectors, + latency_profile, + } => { + facade.play_file(&path, &selectors, latency_profile)?; print_play_file_completed(&path, &selectors); Ok(()) } - CliCommand::PlayCapture { selectors } => { - let session = facade.play_capture(&selectors)?; + CliCommand::PlayCapture { + selectors, + latency_profile, + } => { + let session = facade.play_capture(&selectors, latency_profile)?; print_play_capture_started(&selectors); match wait_for_ctrl_c_or_capture_end(&session)? { CaptureExit::UserRequestedStop => { diff --git a/src/cli/parse.rs b/src/cli/parse.rs index cf03ca1..4a71753 100644 --- a/src/cli/parse.rs +++ b/src/cli/parse.rs @@ -3,6 +3,7 @@ use std::path::PathBuf; use crate::error::RairstreamError; +use crate::session::LatencyProfile; pub(crate) const CLI_USAGE: &str = "usage: rairstream [-h|--help] [-v|-vv] [--log-level ] "; @@ -14,8 +15,8 @@ pub(crate) const CLI_COMMAND_USAGE: &[&str] = &[ "pair --device [--pin ]", "paired list", "paired forget --device ", - "play file --device ...", - "play capture --device ...", + "play file --device ... [--latency ] [--buffer-ms ]", + "play capture --device ... [--latency ] [--buffer-ms ]", ]; #[derive(Debug, Clone, PartialEq, Eq)] @@ -36,9 +37,11 @@ pub enum CliCommand { PlayFile { path: PathBuf, selectors: Vec, + latency_profile: LatencyProfile, }, PlayCapture { selectors: Vec, + latency_profile: LatencyProfile, }, } @@ -49,6 +52,12 @@ pub struct CliOptions { pub verbosity: u8, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum LatencySelection { + Profile(LatencyProfile), + Custom, +} + impl CliOptions { #[must_use] pub fn log_filter(&self) -> &str { @@ -169,14 +178,20 @@ fn parse_play_command(args: &[String]) -> Result { let Some((path, selectors)) = tail.split_first() else { return Err(usage_error()); }; + let (selectors, latency_profile) = parse_play_selectors_and_latency(selectors)?; Ok(CliCommand::PlayFile { path: PathBuf::from(path), - selectors: parse_device_selectors(selectors)?, + selectors, + latency_profile, + }) + } + "capture" => { + let (selectors, latency_profile) = parse_play_selectors_and_latency(tail)?; + Ok(CliCommand::PlayCapture { + selectors, + latency_profile, }) } - "capture" => Ok(CliCommand::PlayCapture { - selectors: parse_device_selectors(tail)?, - }), _ => Err(usage_error()), } } @@ -224,23 +239,56 @@ fn normalize_cli_text(value: &str, label: &str) -> Result Result, RairstreamError> { +fn parse_play_selectors_and_latency( + args: &[String], +) -> Result<(Vec, LatencyProfile), RairstreamError> { let mut selectors = Vec::new(); + let mut latency_selection = None; + let mut custom_buffer_ms = None; let mut index = 0; while index < args.len() { - if args[index] != "--device" { - return Err(RairstreamError::InvalidCli { - message: format!("unexpected argument `{}`", args[index]), - }); + match args[index].as_str() { + "--device" => { + let value = args + .get(index + 1) + .ok_or_else(|| RairstreamError::InvalidCli { + message: String::from("--device requires a value"), + })?; + selectors.push(normalize_cli_text(value, "selector")?); + index += 2; + } + "--latency" => { + let value = args + .get(index + 1) + .ok_or_else(|| missing_value_error("--latency"))?; + latency_selection = Some(parse_latency_selection(value)?); + index += 2; + } + "--buffer-ms" => { + let value = args + .get(index + 1) + .ok_or_else(|| missing_value_error("--buffer-ms"))?; + custom_buffer_ms = Some(parse_buffer_ms(value)?); + index += 2; + } + unexpected if unexpected.starts_with("--latency=") => { + latency_selection = Some(parse_latency_selection( + unexpected.trim_start_matches("--latency="), + )?); + index += 1; + } + unexpected if unexpected.starts_with("--buffer-ms=") => { + custom_buffer_ms = Some(parse_buffer_ms( + unexpected.trim_start_matches("--buffer-ms="), + )?); + index += 1; + } + unexpected => { + return Err(RairstreamError::InvalidCli { + message: format!("unexpected argument `{unexpected}`"), + }); + } } - - let value = args - .get(index + 1) - .ok_or_else(|| RairstreamError::InvalidCli { - message: String::from("--device requires a value"), - })?; - selectors.push(normalize_cli_text(value, "selector")?); - index += 2; } if selectors.is_empty() { @@ -249,7 +297,39 @@ fn parse_device_selectors(args: &[String]) -> Result, RairstreamErro }); } - Ok(selectors) + let latency_profile = match (latency_selection, custom_buffer_ms) { + (Some(LatencySelection::Custom), None) => { + return Err(RairstreamError::InvalidCli { + message: String::from("--latency custom requires --buffer-ms "), + }); + } + (_, Some(buffer_ms)) => LatencyProfile::custom(buffer_ms), + (Some(LatencySelection::Profile(profile)), None) => profile, + (None, None) => LatencyProfile::safe(), + }; + + Ok((selectors, latency_profile)) +} + +fn parse_latency_selection(value: &str) -> Result { + match value { + "safe" => Ok(LatencySelection::Profile(LatencyProfile::safe())), + "normal" => Ok(LatencySelection::Profile(LatencyProfile::normal())), + "low" => Ok(LatencySelection::Profile(LatencyProfile::low())), + "realtime" => Ok(LatencySelection::Profile(LatencyProfile::realtime())), + "custom" => Ok(LatencySelection::Custom), + _ => Err(RairstreamError::InvalidCli { + message: format!("unsupported latency profile `{value}`"), + }), + } +} + +fn parse_buffer_ms(value: &str) -> Result { + value + .parse::() + .map_err(|_| RairstreamError::InvalidCli { + message: format!("--buffer-ms must be a non-negative integer, got `{value}`"), + }) } fn usage_error() -> RairstreamError { diff --git a/src/rtsp/protocol.rs b/src/rtsp/protocol.rs index b4691fb..3d5db36 100644 --- a/src/rtsp/protocol.rs +++ b/src/rtsp/protocol.rs @@ -3,7 +3,7 @@ use std::fmt::Write; use std::net::UdpSocket; -use crate::audio::{CodecDescription, RAOP_STARTUP_LATENCY_FRAMES}; +use crate::audio::CodecDescription; use crate::session::{AirPlayError, SessionDescriptor}; /// `RTSP` 请求方法。 @@ -442,7 +442,7 @@ fn build_pcm_sdp(descriptor: &SessionDescriptor, codec: &CodecDescription) -> St sender_ip, sender_ip, codec.rtpmap, - RAOP_STARTUP_LATENCY_FRAMES + descriptor.latency_profile.buffer_frames() ) } @@ -521,7 +521,7 @@ mod tests { use crate::receiver::{ AirPlayGeneration, AuthMethod, DeviceSupport, Receiver, ReceiverCapabilities, ReceiverKind, }; - use crate::session::{AirPlayError, SessionDescriptor}; + use crate::session::{AirPlayError, LatencyProfile, SessionDescriptor}; fn build_descriptor() -> SessionDescriptor { SessionDescriptor::new( @@ -712,6 +712,26 @@ mod tests { ); } + #[test] + fn announce_request_min_latency_uses_descriptor_buffer_profile() { + let mut descriptor = build_descriptor(); + descriptor.latency_profile = LatencyProfile::low(); + let request = build_announce_request(&descriptor, 8, &CodecDescription::pcm_stereo()); + let body = request.body_text().unwrap(); + + assert!(body.contains("a=min-latency:4410")); + } + + #[test] + fn announce_request_min_latency_allows_custom_zero_buffer() { + let mut descriptor = build_descriptor(); + descriptor.latency_profile = LatencyProfile::custom(0); + let request = build_announce_request(&descriptor, 8, &CodecDescription::pcm_stereo()); + let body = request.body_text().unwrap(); + + assert!(body.contains("a=min-latency:0")); + } + #[test] fn setup_request_contains_transport_header() { let descriptor = build_descriptor(); diff --git a/src/session/connect.rs b/src/session/connect.rs index 80c55d3..fdc6788 100644 --- a/src/session/connect.rs +++ b/src/session/connect.rs @@ -7,7 +7,7 @@ use crate::config::MAX_SENDER_VOLUME_PERCENT; use crate::error::RairstreamError; use crate::pairing::ReceiverCredentials; use crate::receiver::Receiver; -use crate::session::{PreparedSession, SessionConnection, SessionDescriptor}; +use crate::session::{LatencyProfile, PreparedSession, SessionConnection, SessionDescriptor}; use crate::transport::RaopAudioSink; use super::group::FanoutAudioSink; @@ -36,6 +36,7 @@ pub fn connect_receivers( input_format: AudioFormat, paired_receivers: &HashMap, sender_volume_percent: u16, + latency_profile: LatencyProfile, ) -> Result, RairstreamError> where S: BuildHasher, @@ -45,6 +46,7 @@ where for receiver in receivers { let mut descriptor = SessionDescriptor::new(receiver.clone(), input_format); descriptor.sender_volume_percent = sender_volume_percent; + descriptor.latency_profile = latency_profile; if let Some(credentials) = paired_receivers.get(&receiver.id).cloned() { descriptor = descriptor.with_receiver_credentials(credentials); } diff --git a/src/session/mod.rs b/src/session/mod.rs index 838f473..f58bec1 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -7,7 +7,7 @@ pub(crate) mod transport; use std::fmt::Write; -use crate::audio::AudioFormat; +use crate::audio::{AudioFormat, RAOP_SAMPLE_RATE_HZ}; use crate::pairing::ReceiverCredentials; use crate::receiver::Receiver; use thiserror::Error; @@ -27,6 +27,7 @@ pub struct SessionDescriptor { pub input_format: AudioFormat, pub frames_per_packet: usize, pub sender_volume_percent: u16, + pub latency_profile: LatencyProfile, pub receiver_credentials: Option, } @@ -38,6 +39,7 @@ impl SessionDescriptor { input_format, frames_per_packet: crate::audio::RAOP_FRAMES_PER_PACKET, sender_volume_percent: 100, + latency_profile: LatencyProfile::safe(), receiver_credentials: None, } } @@ -108,6 +110,80 @@ impl SessionDescriptor { } } +/// User-facing RAOP startup buffer profile. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct LatencyProfile { + kind: LatencyProfileKind, + buffer_ms: u32, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LatencyProfileKind { + Safe, + Normal, + Low, + Realtime, + Custom, +} + +impl LatencyProfile { + #[must_use] + pub const fn safe() -> Self { + Self { + kind: LatencyProfileKind::Safe, + buffer_ms: crate::audio::RAOP_STARTUP_LATENCY_MILLIS, + } + } + + #[must_use] + pub const fn normal() -> Self { + Self { + kind: LatencyProfileKind::Normal, + buffer_ms: 150, + } + } + + #[must_use] + pub const fn low() -> Self { + Self { + kind: LatencyProfileKind::Low, + buffer_ms: 100, + } + } + + #[must_use] + pub const fn realtime() -> Self { + Self { + kind: LatencyProfileKind::Realtime, + buffer_ms: 50, + } + } + + #[must_use] + pub const fn custom(buffer_ms: u32) -> Self { + Self { + kind: LatencyProfileKind::Custom, + buffer_ms, + } + } + + #[must_use] + pub const fn kind(self) -> LatencyProfileKind { + self.kind + } + + #[must_use] + pub const fn buffer_ms(self) -> u32 { + self.buffer_ms + } + + #[must_use] + pub fn buffer_frames(self) -> u32 { + let frames = u64::from(self.buffer_ms) * u64::from(RAOP_SAMPLE_RATE_HZ) / 1_000; + u32::try_from(frames).unwrap_or(u32::MAX) + } +} + fn hash_identifier_segment(hash: &mut u64, bytes: &[u8]) { for byte in bytes { *hash ^= u64::from(*byte); @@ -163,7 +239,7 @@ pub enum AirPlayError { #[cfg(test)] mod tests { - use super::{AirPlayError, SessionDescriptor}; + use super::{AirPlayError, LatencyProfile, LatencyProfileKind, SessionDescriptor}; use crate::audio::AudioFormat; use crate::audio::{ CodecDescription, RAOP_FRAMES_PER_PACKET, RAOP_SAMPLE_RATE_HZ, RAOP_STARTUP_LATENCY_FRAMES, @@ -196,6 +272,7 @@ mod tests { assert_eq!(descriptor.frames_per_packet, RAOP_FRAMES_PER_PACKET); assert_eq!(descriptor.sender_volume_percent, 100); + assert_eq!(descriptor.latency_profile, LatencyProfile::safe()); } #[test] @@ -293,6 +370,32 @@ mod tests { ); } + #[test] + fn latency_profiles_convert_millis_to_raop_frames() { + assert_eq!(LatencyProfile::safe().buffer_ms(), 250); + assert_eq!(LatencyProfile::safe().buffer_frames(), 11_025); + assert_eq!(LatencyProfile::normal().buffer_frames(), 6_615); + assert_eq!(LatencyProfile::low().buffer_frames(), 4_410); + assert_eq!(LatencyProfile::realtime().buffer_frames(), 2_205); + assert_eq!(LatencyProfile::custom(0).buffer_frames(), 0); + assert_eq!(LatencyProfile::custom(123).buffer_frames(), 5_424); + } + + #[test] + fn latency_profiles_report_kind() { + assert_eq!(LatencyProfile::safe().kind(), LatencyProfileKind::Safe); + assert_eq!(LatencyProfile::normal().kind(), LatencyProfileKind::Normal); + assert_eq!(LatencyProfile::low().kind(), LatencyProfileKind::Low); + assert_eq!( + LatencyProfile::realtime().kind(), + LatencyProfileKind::Realtime + ); + assert_eq!( + LatencyProfile::custom(10).kind(), + LatencyProfileKind::Custom + ); + } + #[test] fn raop_session_reports_transport_name() { assert_eq!(RaopSession::transport_name(), "raop"); diff --git a/src/session/raop.rs b/src/session/raop.rs index d633b20..86d47a5 100644 --- a/src/session/raop.rs +++ b/src/session/raop.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex}; use std::time::{SystemTime, UNIX_EPOCH}; use super::{AirPlayError, SessionDescriptor}; -use crate::audio::{CodecDescription, RAOP_STARTUP_LATENCY_FRAMES}; +use crate::audio::CodecDescription; use crate::rtsp::client::{ compute_rtsp_keepalive_interval, ensure_success, format_response_status, map_connection_error, }; @@ -126,6 +126,8 @@ impl RaopSession { pub fn connect(descriptor: &SessionDescriptor) -> Result { descriptor.validate()?; + let latency_profile = descriptor.latency_profile; + let buffer_frames = latency_profile.buffer_frames(); let (initial_sequence, initial_timestamp, audio_ssrc) = build_rtp_session_seed(descriptor); debug!( device_id = %descriptor.device.id, @@ -139,6 +141,9 @@ impl RaopSession { initial_sequence, initial_timestamp, audio_ssrc, + requested_buffer_ms = latency_profile.buffer_ms(), + requested_buffer_frames = buffer_frames, + latency_profile = ?latency_profile.kind(), "creating RAOP session" ); @@ -411,6 +416,7 @@ fn build_rtp_session_seed(descriptor: &SessionDescriptor) -> (u16, u32, u32) { let initial_sequence = u16::try_from(hash & 0xffff_u64).unwrap_or(1).max(1); let initial_timestamp = apply_startup_latency_offset( u32::try_from((hash >> 16) & 0xffff_ffff_u64).unwrap_or_default(), + descriptor.latency_profile.buffer_frames(), ); let audio_ssrc = u32::try_from((hash >> 8) & 0xffff_ffff_u64) .unwrap_or(1) @@ -419,8 +425,8 @@ fn build_rtp_session_seed(descriptor: &SessionDescriptor) -> (u16, u32, u32) { (initial_sequence, initial_timestamp, audio_ssrc) } -const fn apply_startup_latency_offset(base_timestamp: u32) -> u32 { - base_timestamp.wrapping_add(RAOP_STARTUP_LATENCY_FRAMES) +const fn apply_startup_latency_offset(base_timestamp: u32, startup_latency_frames: u32) -> u32 { + base_timestamp.wrapping_add(startup_latency_frames) } fn hash_identifier_segment(hash: &mut u64, bytes: &[u8]) { @@ -573,19 +579,19 @@ mod tests { #[test] fn startup_latency_offset_adds_raop_baseline_to_timestamp_seed() { assert_eq!( - super::apply_startup_latency_offset(0), + super::apply_startup_latency_offset(0, RAOP_STARTUP_LATENCY_FRAMES), RAOP_STARTUP_LATENCY_FRAMES ); assert_eq!( - super::apply_startup_latency_offset(1_000), + super::apply_startup_latency_offset(1_000, RAOP_STARTUP_LATENCY_FRAMES), 1_000 + RAOP_STARTUP_LATENCY_FRAMES ); } #[test] fn startup_latency_offset_preserves_timestamp_distance_from_baseline() { - let baseline = super::apply_startup_latency_offset(0); - let advanced = super::apply_startup_latency_offset(12_345); + let baseline = super::apply_startup_latency_offset(0, RAOP_STARTUP_LATENCY_FRAMES); + let advanced = super::apply_startup_latency_offset(12_345, RAOP_STARTUP_LATENCY_FRAMES); assert_eq!(advanced - baseline, 12_345); } @@ -593,7 +599,10 @@ mod tests { #[test] fn startup_latency_offset_reaches_u32_max_at_exact_rollover_threshold() { assert_eq!( - super::apply_startup_latency_offset(u32::MAX - RAOP_STARTUP_LATENCY_FRAMES), + super::apply_startup_latency_offset( + u32::MAX - RAOP_STARTUP_LATENCY_FRAMES, + RAOP_STARTUP_LATENCY_FRAMES + ), u32::MAX ); } @@ -601,7 +610,10 @@ mod tests { #[test] fn startup_latency_offset_wraps_to_zero_after_rollover_threshold() { assert_eq!( - super::apply_startup_latency_offset(u32::MAX - RAOP_STARTUP_LATENCY_FRAMES + 1), + super::apply_startup_latency_offset( + u32::MAX - RAOP_STARTUP_LATENCY_FRAMES + 1, + RAOP_STARTUP_LATENCY_FRAMES + ), 0 ); } @@ -609,11 +621,17 @@ mod tests { #[test] fn startup_latency_offset_wraps_at_u32_boundary() { assert_eq!( - super::apply_startup_latency_offset(u32::MAX), + super::apply_startup_latency_offset(u32::MAX, RAOP_STARTUP_LATENCY_FRAMES), RAOP_STARTUP_LATENCY_FRAMES - 1 ); } + #[test] + fn startup_latency_offset_uses_requested_buffer_frames() { + assert_eq!(super::apply_startup_latency_offset(1_000, 4_410), 5_410); + assert_eq!(super::apply_startup_latency_offset(1_000, 0), 1_000); + } + #[test] fn connect_accepts_common_windows_mix_profile() { let session = RaopSession::connect(&build_descriptor(AudioFormat { diff --git a/src/session/stream.rs b/src/session/stream.rs index e43341b..b65ae50 100644 --- a/src/session/stream.rs +++ b/src/session/stream.rs @@ -10,6 +10,7 @@ use crate::error::RairstreamError; use crate::pairing::ReceiverCredentials; use crate::receiver::Receiver; +use super::LatencyProfile; use super::connect::{ConnectedReceiver, build_group_sink, connect_receivers}; pub struct PlaybackSession { @@ -40,13 +41,19 @@ pub fn play_capture( receivers: &[Receiver], paired_receivers: &HashMap, sender_volume_percent: u16, + latency_profile: LatencyProfile, ) -> Result where S: BuildHasher, { let format = WindowsLoopbackCapture::preferred_format()?; - let connections = - connect_receivers(receivers, format, paired_receivers, sender_volume_percent)?; + let connections = connect_receivers( + receivers, + format, + paired_receivers, + sender_volume_percent, + latency_profile, + )?; let sink = match build_group_sink(&connections, format, sender_volume_percent) { Ok(sink) => sink, Err(error) => { @@ -71,6 +78,7 @@ pub fn play_file( receivers: &[Receiver], paired_receivers: &HashMap, sender_volume_percent: u16, + latency_profile: LatencyProfile, ) -> Result<(), RairstreamError> where S: BuildHasher, @@ -89,6 +97,7 @@ where first_chunk.format, paired_receivers, sender_volume_percent, + latency_profile, )?; let mut sink = match build_group_sink(&connections, first_chunk.format, sender_volume_percent) { Ok(sink) => sink, @@ -221,7 +230,7 @@ mod tests { use crate::receiver::{ AirPlayGeneration, AuthMethod, DeviceSupport, Receiver, ReceiverCapabilities, ReceiverKind, }; - use crate::session::AirPlayError; + use crate::session::{AirPlayError, LatencyProfile}; use super::{ PlaybackSession, chunk_duration, combine_playback_results, connect_receivers, stream_chunks, @@ -353,6 +362,7 @@ mod tests { AudioFormat::default(), &paired_receivers, 100, + LatencyProfile::safe(), ) .unwrap(); let session = PlaybackSession { diff --git a/src/ui/tray/mod.rs b/src/ui/tray/mod.rs index 6ccce89..3edb6f3 100644 --- a/src/ui/tray/mod.rs +++ b/src/ui/tray/mod.rs @@ -289,7 +289,10 @@ where ]; } - match self.facade.play_capture(&selected_ids) { + match self + .facade + .play_capture(&selected_ids, crate::session::LatencyProfile::safe()) + { Ok(session) => { self.active_session = Some(session); self.reconnect_due = None; @@ -379,7 +382,10 @@ where let attempt = *attempt; let _ = self.facade.discover(); - if let Ok(session) = self.facade.play_capture(&receiver_ids) { + if let Ok(session) = self + .facade + .play_capture(&receiver_ids, crate::session::LatencyProfile::safe()) + { self.active_session = Some(session); self.reconnect_due = None; self.runtime.start_streaming(receiver_ids); diff --git a/tests/cli_play_capture.rs b/tests/cli_play_capture.rs index 2c53706..714d62b 100644 --- a/tests/cli_play_capture.rs +++ b/tests/cli_play_capture.rs @@ -1,4 +1,5 @@ use rairstream::cli::{CliCommand, parse_cli}; +use rairstream::session::LatencyProfile; #[test] fn parse_play_capture_command_with_multiple_devices() { @@ -16,6 +17,43 @@ fn parse_play_capture_command_with_multiple_devices() { cli.command, CliCommand::PlayCapture { selectors: vec![String::from("Office"), String::from("Bedroom")], + latency_profile: LatencyProfile::safe(), } ); } + +#[test] +fn parse_play_capture_command_with_custom_zero_buffer() { + let cli = parse_cli([ + String::from("play"), + String::from("capture"), + String::from("--device"), + String::from("Office"), + String::from("--buffer-ms"), + String::from("0"), + ]) + .unwrap(); + + assert_eq!( + cli.command, + CliCommand::PlayCapture { + selectors: vec![String::from("Office")], + latency_profile: LatencyProfile::custom(0), + } + ); +} + +#[test] +fn parse_play_capture_custom_latency_requires_buffer_ms() { + let error = parse_cli([ + String::from("play"), + String::from("capture"), + String::from("--device"), + String::from("Office"), + String::from("--latency"), + String::from("custom"), + ]) + .unwrap_err(); + + assert!(error.to_string().contains("--latency custom requires")); +} diff --git a/tests/cli_play_file.rs b/tests/cli_play_file.rs index ce7e53f..3374ed2 100644 --- a/tests/cli_play_file.rs +++ b/tests/cli_play_file.rs @@ -1,6 +1,7 @@ use std::path::PathBuf; use rairstream::cli::{CliCommand, parse_cli}; +use rairstream::session::LatencyProfile; #[test] fn parse_play_file_command_with_multiple_devices() { @@ -20,6 +21,30 @@ fn parse_play_file_command_with_multiple_devices() { CliCommand::PlayFile { path: PathBuf::from("song.m4a"), selectors: vec![String::from("Living Room"), String::from("Kitchen")], + latency_profile: LatencyProfile::safe(), + } + ); +} + +#[test] +fn parse_play_file_command_with_low_latency_profile() { + let cli = parse_cli([ + String::from("play"), + String::from("file"), + String::from("song.m4a"), + String::from("--device"), + String::from("Living Room"), + String::from("--latency"), + String::from("low"), + ]) + .unwrap(); + + assert_eq!( + cli.command, + CliCommand::PlayFile { + path: PathBuf::from("song.m4a"), + selectors: vec![String::from("Living Room")], + latency_profile: LatencyProfile::low(), } ); } From 40bac731409006aaf5f87f086500dd70161c1a80 Mon Sep 17 00:00:00 2001 From: ByteColtX Date: Fri, 22 May 2026 00:08:50 +0800 Subject: [PATCH 2/3] feat(session): use smaller realtime packets --- src/audio/raop.rs | 30 ++++++++++++++++++++++++- src/session/connect.rs | 1 + src/session/mod.rs | 34 +++++++++++++++++++++++++++- src/session/raop.rs | 11 +++++++++ src/transport/sink.rs | 51 +++++++++++++++++++++++++++++++++++++++++- 5 files changed, 124 insertions(+), 3 deletions(-) diff --git a/src/audio/raop.rs b/src/audio/raop.rs index 1f992e4..ad8938c 100644 --- a/src/audio/raop.rs +++ b/src/audio/raop.rs @@ -45,6 +45,7 @@ impl CodecDescription { pub struct AudioResampler { source_format: AudioFormat, sender_volume_gain: f64, + frames_per_packet: usize, phase_numerator: u32, pending_input_frames: Vec<[f64; 2]>, pending_output_frames: Vec<[f64; 2]>, @@ -60,10 +61,20 @@ impl AudioResampler { pub fn with_sender_volume_percent( source_format: AudioFormat, sender_volume_percent: u16, + ) -> Self { + Self::with_config(source_format, sender_volume_percent, RAOP_FRAMES_PER_PACKET) + } + + #[must_use] + pub fn with_config( + source_format: AudioFormat, + sender_volume_percent: u16, + frames_per_packet: usize, ) -> Self { let mut resampler = Self { source_format, sender_volume_gain: 1.0, + frames_per_packet: frames_per_packet.max(1), phase_numerator: 0, pending_input_frames: Vec::new(), pending_output_frames: Vec::new(), @@ -106,7 +117,7 @@ impl AudioResampler { Ok(drain_pcm_packets( resampled_frames, - RAOP_FRAMES_PER_PACKET, + self.frames_per_packet, &mut self.pending_output_frames, )) } @@ -683,6 +694,23 @@ mod tests { assert!(packets.iter().all(|packet| packet.len() == 352 * 4)); } + #[test] + fn resampler_uses_configured_frames_per_packet() { + let format = AudioFormat::default(); + let mut resampler = AudioResampler::with_config(format, 100, 128); + let mut bytes = Vec::new(); + for _ in 0..256 { + bytes.extend_from_slice(&1000_i16.to_le_bytes()); + bytes.extend_from_slice(&(-1000_i16).to_le_bytes()); + } + let chunk = AudioChunk::new(format, bytes).unwrap(); + + let packets = resampler.push_chunk(&chunk).unwrap(); + + assert_eq!(packets.len(), 2); + assert!(packets.iter().all(|packet| packet.len() == 128 * 4)); + } + #[test] fn resampler_accepts_matching_raop_pcm_input() { let format = AudioFormat::default(); diff --git a/src/session/connect.rs b/src/session/connect.rs index fdc6788..346c1c5 100644 --- a/src/session/connect.rs +++ b/src/session/connect.rs @@ -47,6 +47,7 @@ where let mut descriptor = SessionDescriptor::new(receiver.clone(), input_format); descriptor.sender_volume_percent = sender_volume_percent; descriptor.latency_profile = latency_profile; + descriptor.frames_per_packet = latency_profile.frames_per_packet(); if let Some(credentials) = paired_receivers.get(&receiver.id).cloned() { descriptor = descriptor.with_receiver_credentials(credentials); } diff --git a/src/session/mod.rs b/src/session/mod.rs index f58bec1..609917f 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -7,7 +7,7 @@ pub(crate) mod transport; use std::fmt::Write; -use crate::audio::{AudioFormat, RAOP_SAMPLE_RATE_HZ}; +use crate::audio::{AudioFormat, RAOP_FRAMES_PER_PACKET, RAOP_SAMPLE_RATE_HZ}; use crate::pairing::ReceiverCredentials; use crate::receiver::Receiver; use thiserror::Error; @@ -182,6 +182,17 @@ impl LatencyProfile { let frames = u64::from(self.buffer_ms) * u64::from(RAOP_SAMPLE_RATE_HZ) / 1_000; u32::try_from(frames).unwrap_or(u32::MAX) } + + #[must_use] + pub const fn frames_per_packet(self) -> usize { + match self.kind { + LatencyProfileKind::Realtime => 128, + LatencyProfileKind::Safe + | LatencyProfileKind::Normal + | LatencyProfileKind::Low + | LatencyProfileKind::Custom => RAOP_FRAMES_PER_PACKET, + } + } } fn hash_identifier_segment(hash: &mut u64, bytes: &[u8]) { @@ -396,6 +407,27 @@ mod tests { ); } + #[test] + fn latency_profiles_select_packet_size() { + assert_eq!( + LatencyProfile::safe().frames_per_packet(), + RAOP_FRAMES_PER_PACKET + ); + assert_eq!( + LatencyProfile::normal().frames_per_packet(), + RAOP_FRAMES_PER_PACKET + ); + assert_eq!( + LatencyProfile::low().frames_per_packet(), + RAOP_FRAMES_PER_PACKET + ); + assert_eq!(LatencyProfile::realtime().frames_per_packet(), 128); + assert_eq!( + LatencyProfile::custom(0).frames_per_packet(), + RAOP_FRAMES_PER_PACKET + ); + } + #[test] fn raop_session_reports_transport_name() { assert_eq!(RaopSession::transport_name(), "raop"); diff --git a/src/session/raop.rs b/src/session/raop.rs index 86d47a5..4b226ab 100644 --- a/src/session/raop.rs +++ b/src/session/raop.rs @@ -528,6 +528,17 @@ mod tests { assert_eq!(session.sink_config().sender_volume_percent, 400); } + #[test] + fn connect_preserves_realtime_packet_size_in_sink_config() { + let mut descriptor = build_descriptor(AudioFormat::default()); + descriptor.latency_profile = crate::session::LatencyProfile::realtime(); + descriptor.frames_per_packet = descriptor.latency_profile.frames_per_packet(); + + let session = RaopSession::connect(&descriptor).unwrap(); + + assert_eq!(session.sink_config().frames_per_packet, 128); + } + #[test] fn connect_rejects_out_of_range_sender_volume_percent() { let mut descriptor = build_descriptor(AudioFormat::default()); diff --git a/src/transport/sink.rs b/src/transport/sink.rs index 1732f70..f4f1afa 100644 --- a/src/transport/sink.rs +++ b/src/transport/sink.rs @@ -69,7 +69,11 @@ impl RaopAudioSink { sync_interval_packets = transport.sink_config.sync_interval_packets, "initializing RAOP audio sink" ); - let mut resampler = AudioResampler::new(source_format); + let mut resampler = AudioResampler::with_config( + source_format, + 100, + transport.sink_config.frames_per_packet, + ); let initial_sender_volume_percent = sender_volume_percent .lock() .map_or(100, |sender_volume_percent| *sender_volume_percent); @@ -293,4 +297,49 @@ mod tests { assert_eq!(control_len, 20); assert_eq!(control_buffer[1] & 0x7f, 84); } + + #[test] + fn raop_audio_sink_uses_configured_packet_size() { + let audio_receiver = UdpSocket::bind("127.0.0.1:0").unwrap(); + let control_receiver = UdpSocket::bind("127.0.0.1:0").unwrap(); + audio_receiver + .set_read_timeout(Some(Duration::from_secs(1))) + .unwrap(); + control_receiver + .set_read_timeout(Some(Duration::from_secs(1))) + .unwrap(); + let transport = RaopStreamTransport { + audio_socket: UdpSocket::bind("127.0.0.1:0").unwrap(), + control_socket: UdpSocket::bind("127.0.0.1:0").unwrap(), + audio_target: audio_receiver.local_addr().unwrap(), + control_target: control_receiver.local_addr().unwrap(), + audio_ssrc: 0x1122_3344, + packet_counters: RaopPacketCounters::new(7, 11), + sink_config: RaopSinkConfig { + frames_per_packet: 128, + sync_interval_packets: 1, + sender_volume_percent: 100, + }, + }; + let format = AudioFormat::default(); + let mut sink = RaopAudioSink::new(format, transport, Arc::new(Mutex::new(100))); + let mut bytes = Vec::new(); + for _ in 0..128 { + bytes.extend_from_slice(&1000_i16.to_le_bytes()); + bytes.extend_from_slice(&(-1000_i16).to_le_bytes()); + } + let chunk = AudioChunk::new(format, bytes).unwrap(); + + sink.write(chunk).unwrap(); + + let mut audio_buffer = [0_u8; 1600]; + let mut control_buffer = [0_u8; 64]; + let (audio_len, _) = audio_receiver.recv_from(&mut audio_buffer).unwrap(); + let (control_len, _) = control_receiver.recv_from(&mut control_buffer).unwrap(); + + assert_eq!(audio_len, 12 + 128 * 4); + assert_eq!(control_len, 20); + assert_eq!(&audio_buffer[4..8], &11_u32.to_be_bytes()); + assert_eq!(&control_buffer[16..20], &(11_u32 + 128).to_be_bytes()); + } } From bc11a0ce007567a3b09367e2527960315c03f8ba Mon Sep 17 00:00:00 2001 From: ByteColtX Date: Fri, 22 May 2026 00:15:12 +0800 Subject: [PATCH 3/3] feat(cli): add custom packet frame tuning --- src/cli/parse.rs | 58 ++++++++++++++++++++++++++++----- src/session/mod.rs | 23 ++++++++++---- tests/cli_play_capture.rs | 67 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 132 insertions(+), 16 deletions(-) diff --git a/src/cli/parse.rs b/src/cli/parse.rs index 4a71753..ad40659 100644 --- a/src/cli/parse.rs +++ b/src/cli/parse.rs @@ -15,8 +15,8 @@ pub(crate) const CLI_COMMAND_USAGE: &[&str] = &[ "pair --device [--pin ]", "paired list", "paired forget --device ", - "play file --device ... [--latency ] [--buffer-ms ]", - "play capture --device ... [--latency ] [--buffer-ms ]", + "play file --device ... [--latency ] [--buffer-ms ] [--packet-frames ]", + "play capture --device ... [--latency ] [--buffer-ms ] [--packet-frames ]", ]; #[derive(Debug, Clone, PartialEq, Eq)] @@ -245,6 +245,7 @@ fn parse_play_selectors_and_latency( let mut selectors = Vec::new(); let mut latency_selection = None; let mut custom_buffer_ms = None; + let mut custom_packet_frames = None; let mut index = 0; while index < args.len() { match args[index].as_str() { @@ -271,6 +272,13 @@ fn parse_play_selectors_and_latency( custom_buffer_ms = Some(parse_buffer_ms(value)?); index += 2; } + "--packet-frames" => { + let value = args + .get(index + 1) + .ok_or_else(|| missing_value_error("--packet-frames"))?; + custom_packet_frames = Some(parse_packet_frames(value)?); + index += 2; + } unexpected if unexpected.starts_with("--latency=") => { latency_selection = Some(parse_latency_selection( unexpected.trim_start_matches("--latency="), @@ -283,6 +291,12 @@ fn parse_play_selectors_and_latency( )?); index += 1; } + unexpected if unexpected.starts_with("--packet-frames=") => { + custom_packet_frames = Some(parse_packet_frames( + unexpected.trim_start_matches("--packet-frames="), + )?); + index += 1; + } unexpected => { return Err(RairstreamError::InvalidCli { message: format!("unexpected argument `{unexpected}`"), @@ -297,15 +311,27 @@ fn parse_play_selectors_and_latency( }); } - let latency_profile = match (latency_selection, custom_buffer_ms) { - (Some(LatencySelection::Custom), None) => { + let latency_profile = match (latency_selection, custom_buffer_ms, custom_packet_frames) { + (Some(LatencySelection::Custom), None, None) => { return Err(RairstreamError::InvalidCli { - message: String::from("--latency custom requires --buffer-ms "), + message: String::from( + "--latency custom requires --buffer-ms or --packet-frames ", + ), }); } - (_, Some(buffer_ms)) => LatencyProfile::custom(buffer_ms), - (Some(LatencySelection::Profile(profile)), None) => profile, - (None, None) => LatencyProfile::safe(), + (_, Some(buffer_ms), packet_frames) => LatencyProfile::custom_with_packet_frames( + buffer_ms, + packet_frames.unwrap_or(crate::audio::RAOP_FRAMES_PER_PACKET), + ), + (_, None, Some(packet_frames)) => { + let buffer_ms = match latency_selection { + Some(LatencySelection::Profile(profile)) => profile.buffer_ms(), + Some(LatencySelection::Custom) | None => LatencyProfile::safe().buffer_ms(), + }; + LatencyProfile::custom_with_packet_frames(buffer_ms, packet_frames) + } + (Some(LatencySelection::Profile(profile)), None, None) => profile, + (None, None, None) => LatencyProfile::safe(), }; Ok((selectors, latency_profile)) @@ -332,6 +358,22 @@ fn parse_buffer_ms(value: &str) -> Result { }) } +fn parse_packet_frames(value: &str) -> Result { + let frames = value + .parse::() + .map_err(|_| RairstreamError::InvalidCli { + message: format!("--packet-frames must be a positive integer, got `{value}`"), + })?; + + if frames == 0 { + return Err(RairstreamError::InvalidCli { + message: String::from("--packet-frames must be greater than 0"), + }); + } + + Ok(frames) +} + fn usage_error() -> RairstreamError { RairstreamError::InvalidCli { message: String::from(CLI_USAGE), diff --git a/src/session/mod.rs b/src/session/mod.rs index 609917f..066edb3 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -115,6 +115,7 @@ impl SessionDescriptor { pub struct LatencyProfile { kind: LatencyProfileKind, buffer_ms: u32, + frames_per_packet: usize, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -132,6 +133,7 @@ impl LatencyProfile { Self { kind: LatencyProfileKind::Safe, buffer_ms: crate::audio::RAOP_STARTUP_LATENCY_MILLIS, + frames_per_packet: RAOP_FRAMES_PER_PACKET, } } @@ -140,6 +142,7 @@ impl LatencyProfile { Self { kind: LatencyProfileKind::Normal, buffer_ms: 150, + frames_per_packet: RAOP_FRAMES_PER_PACKET, } } @@ -148,6 +151,7 @@ impl LatencyProfile { Self { kind: LatencyProfileKind::Low, buffer_ms: 100, + frames_per_packet: RAOP_FRAMES_PER_PACKET, } } @@ -156,14 +160,21 @@ impl LatencyProfile { Self { kind: LatencyProfileKind::Realtime, buffer_ms: 50, + frames_per_packet: 128, } } #[must_use] pub const fn custom(buffer_ms: u32) -> Self { + Self::custom_with_packet_frames(buffer_ms, RAOP_FRAMES_PER_PACKET) + } + + #[must_use] + pub const fn custom_with_packet_frames(buffer_ms: u32, frames_per_packet: usize) -> Self { Self { kind: LatencyProfileKind::Custom, buffer_ms, + frames_per_packet, } } @@ -185,13 +196,7 @@ impl LatencyProfile { #[must_use] pub const fn frames_per_packet(self) -> usize { - match self.kind { - LatencyProfileKind::Realtime => 128, - LatencyProfileKind::Safe - | LatencyProfileKind::Normal - | LatencyProfileKind::Low - | LatencyProfileKind::Custom => RAOP_FRAMES_PER_PACKET, - } + self.frames_per_packet } } @@ -426,6 +431,10 @@ mod tests { LatencyProfile::custom(0).frames_per_packet(), RAOP_FRAMES_PER_PACKET ); + assert_eq!( + LatencyProfile::custom_with_packet_frames(25, 64).frames_per_packet(), + 64 + ); } #[test] diff --git a/tests/cli_play_capture.rs b/tests/cli_play_capture.rs index 714d62b..088543e 100644 --- a/tests/cli_play_capture.rs +++ b/tests/cli_play_capture.rs @@ -44,7 +44,7 @@ fn parse_play_capture_command_with_custom_zero_buffer() { } #[test] -fn parse_play_capture_custom_latency_requires_buffer_ms() { +fn parse_play_capture_custom_latency_requires_tuning_flag() { let error = parse_cli([ String::from("play"), String::from("capture"), @@ -57,3 +57,68 @@ fn parse_play_capture_custom_latency_requires_buffer_ms() { assert!(error.to_string().contains("--latency custom requires")); } + +#[test] +fn parse_play_capture_command_with_custom_packet_frames() { + let cli = parse_cli([ + String::from("play"), + String::from("capture"), + String::from("--device"), + String::from("Office"), + String::from("--buffer-ms"), + String::from("25"), + String::from("--packet-frames"), + String::from("128"), + ]) + .unwrap(); + + assert_eq!( + cli.command, + CliCommand::PlayCapture { + selectors: vec![String::from("Office")], + latency_profile: LatencyProfile::custom_with_packet_frames(25, 128), + } + ); +} + +#[test] +fn parse_play_capture_packet_frames_overrides_profile_packet_size() { + let cli = parse_cli([ + String::from("play"), + String::from("capture"), + String::from("--device"), + String::from("Office"), + String::from("--latency"), + String::from("realtime"), + String::from("--packet-frames"), + String::from("64"), + ]) + .unwrap(); + + assert_eq!( + cli.command, + CliCommand::PlayCapture { + selectors: vec![String::from("Office")], + latency_profile: LatencyProfile::custom_with_packet_frames(50, 64), + } + ); +} + +#[test] +fn parse_play_capture_rejects_zero_packet_frames() { + let error = parse_cli([ + String::from("play"), + String::from("capture"), + String::from("--device"), + String::from("Office"), + String::from("--packet-frames"), + String::from("0"), + ]) + .unwrap_err(); + + assert!( + error + .to_string() + .contains("--packet-frames must be greater") + ); +}