Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions src/cli/commands.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::io::{self, Write};
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

use crate::app::AppFacade;
use crate::discovery::MdnsDiscoveryService;
Expand All @@ -11,6 +13,13 @@ use super::output::{
};
use super::parse::{CliCommand, CliOptions};

const PLAYBACK_POLL_INTERVAL: Duration = Duration::from_millis(250);

enum CaptureExit {
UserRequestedStop,
PlaybackFailed(RairstreamError),
}

pub fn run_cli(cli: CliOptions) -> Result<(), RairstreamError> {
let mut facade = AppFacade::new(MdnsDiscoveryService::default())?;

Expand Down Expand Up @@ -55,10 +64,16 @@ pub fn run_cli(cli: CliOptions) -> Result<(), RairstreamError> {
CliCommand::PlayCapture { selectors } => {
let session = facade.play_capture(&selectors)?;
print_play_capture_started(&selectors);
wait_for_ctrl_c()?;
facade.stop_capture(session)?;
print_play_capture_stopped(&selectors);
Ok(())
match wait_for_ctrl_c_or_capture_end(&session)? {
CaptureExit::UserRequestedStop => {
facade.stop_capture(session)?;
print_play_capture_stopped(&selectors);
Ok(())
}
CaptureExit::PlaybackFailed(error) => {
stop_capture_after_failure(&mut facade, session, error)
}
}
}
}
}
Expand All @@ -77,12 +92,39 @@ fn prompt_pairing_pin(receiver_name: &str) -> Result<String, RairstreamError> {
Ok(pin.to_string())
}

fn wait_for_ctrl_c() -> Result<(), RairstreamError> {
let (sender, receiver) = std::sync::mpsc::channel();
fn wait_for_ctrl_c_or_capture_end(
session: &crate::session::PlaybackSession,
) -> Result<CaptureExit, RairstreamError> {
let (sender, receiver) = mpsc::channel();
ctrlc::set_handler(move || {
let _ = sender.send(());
})
.map_err(std::io::Error::other)?;
receiver.recv().map_err(std::io::Error::other)?;
Ok(())

loop {
match receiver.recv_timeout(PLAYBACK_POLL_INTERVAL) {
Ok(()) => return Ok(CaptureExit::UserRequestedStop),
Err(RecvTimeoutError::Timeout) => {
if let Some(error) = session.transport_error() {
return Ok(CaptureExit::PlaybackFailed(error));
}
}
Err(RecvTimeoutError::Disconnected) => {
return Err(std::io::Error::other("control-c listener disconnected").into());
}
}
}
}

fn stop_capture_after_failure(
facade: &mut AppFacade<MdnsDiscoveryService>,
session: crate::session::PlaybackSession,
error: RairstreamError,
) -> Result<(), RairstreamError> {
match facade.stop_capture(session) {
Ok(()) => Err(error),
Err(stop_error) => Err(RairstreamError::Playback {
message: format!("{error}; cleanup failed: {stop_error}"),
}),
}
}
29 changes: 18 additions & 11 deletions src/rtsp/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::io::{BufRead, BufReader, Read, Write};
use std::net::TcpStream;
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

Expand Down Expand Up @@ -135,7 +132,7 @@ struct RtspKeepaliveWorker {
cseq: u32,
interval: Duration,
command_rx: Receiver<RtspKeepaliveCommand>,
transport_terminated: Arc<AtomicBool>,
transport_error: Arc<Mutex<Option<AirPlayError>>>,
}

impl RtspKeepalive {
Expand All @@ -145,7 +142,7 @@ impl RtspKeepalive {
session_id: String,
initial_cseq: u32,
interval: Duration,
transport_terminated: Arc<AtomicBool>,
transport_error: Arc<Mutex<Option<AirPlayError>>>,
) -> Result<Self, AirPlayError> {
let endpoint = descriptor.device.endpoint();
let (command_tx, command_rx) = mpsc::channel();
Expand All @@ -159,7 +156,7 @@ impl RtspKeepalive {
cseq: initial_cseq,
interval,
command_rx,
transport_terminated,
transport_error,
}
.run();
})
Expand Down Expand Up @@ -214,6 +211,16 @@ impl RtspKeepalive {
}

impl RtspKeepaliveWorker {
fn record_transport_error(&self, error: AirPlayError) {
let mut state = self
.transport_error
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if state.is_none() {
*state = Some(error);
}
}

fn run(mut self) {
let endpoint = self.descriptor.device.endpoint();

Expand Down Expand Up @@ -249,7 +256,7 @@ impl RtspKeepaliveWorker {
error = %error,
"RTSP keepalive received failure response"
);
self.transport_terminated.store(true, Ordering::SeqCst);
self.record_transport_error(error);
break;
}
trace!(
Expand All @@ -266,7 +273,7 @@ impl RtspKeepaliveWorker {
error = %error,
"RTSP keepalive failed"
);
self.transport_terminated.store(true, Ordering::SeqCst);
self.record_transport_error(error);
break;
}
}
Expand Down Expand Up @@ -329,7 +336,7 @@ pub(crate) fn map_connection_error(error: std::io::Error) -> AirPlayError {
mod tests {
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, atomic::AtomicBool, mpsc};
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use std::time::Duration;

Expand Down Expand Up @@ -419,7 +426,7 @@ mod tests {
String::from("deadbeef"),
0,
Duration::from_secs(1),
Arc::new(AtomicBool::new(false)),
Arc::new(Mutex::new(None)),
)
.unwrap();

Expand Down
23 changes: 14 additions & 9 deletions src/session/raop.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
//! classic `RAOP` 会话、握手与连接生命周期实现。

use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};

use super::{AirPlayError, SessionDescriptor};
Expand Down Expand Up @@ -39,7 +36,7 @@ pub struct RaopConnection {
control_target: SocketAddr,
timing_responder: Option<TimingResponder>,
rtsp_keepalive: Option<SharedRtspKeepalive>,
transport_terminated: Arc<AtomicBool>,
transport_error: Arc<Mutex<Option<AirPlayError>>>,
}

impl RaopConnection {
Expand Down Expand Up @@ -76,7 +73,15 @@ impl RaopConnection {

#[must_use]
pub fn is_terminated(&self) -> bool {
self.transport_terminated.load(Ordering::SeqCst)
self.transport_error().is_some()
}

#[must_use]
pub fn transport_error(&self) -> Option<AirPlayError> {
self.transport_error
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}

fn stop_timing_responder(&mut self) {
Expand Down Expand Up @@ -281,14 +286,14 @@ impl RaopSession {
self.apply_record_response(&record_response)?;

let keepalive_interval = compute_rtsp_keepalive_interval(setup_reply.session_timeout_secs);
let transport_terminated = Arc::new(AtomicBool::new(false));
let transport_error = Arc::new(Mutex::new(None));
let rtsp_keepalive = SharedRtspKeepalive::start(
rtsp_client,
self.descriptor.clone(),
setup_reply.session_id.clone(),
self.cseq,
keepalive_interval,
Arc::clone(&transport_terminated),
Arc::clone(&transport_error),
)?;
let audio_target =
resolve_socket_addr(&self.descriptor.device.host, setup_reply.server_port)?;
Expand All @@ -306,7 +311,7 @@ impl RaopSession {
control_target,
timing_responder: Some(timing_responder),
rtsp_keepalive: Some(rtsp_keepalive),
transport_terminated,
transport_error,
})
}

Expand Down
Loading
Loading