diff --git a/src/domain_fronter.rs b/src/domain_fronter.rs
index 3bd788f..a41fe3e 100644
--- a/src/domain_fronter.rs
+++ b/src/domain_fronter.rs
@@ -3125,8 +3125,16 @@ impl DomainFronter {
         entry.stream.write_all(&payload).await?;
         entry.stream.flush().await?;
 
+        // Use the configured `request_timeout_secs` for the header read,
+        // not the hardcoded 10 s default. With Apps Script cold starts
+        // routinely landing in the 8–12 s range, the 10 s cliff was
+        // firing as a false-positive batch timeout (issue #1088), killing
+        // every in-flight tunnel session under it. The outer
+        // `tokio::time::timeout(batch_timeout, ...)` in `fire_batch`
+        // remains the authoritative bound on total batch round-trip time.
+        let batch_timeout = self.batch_timeout();
         let (mut status, mut resp_headers, mut resp_body) =
-            read_http_response(&mut entry.stream).await?;
+            read_http_response_with_header_timeout(&mut entry.stream, batch_timeout).await?;
 
         // Follow redirect chain
         for _ in 0..5 {
@@ -3139,7 +3147,8 @@ impl DomainFronter {
             );
             entry.stream.write_all(req.as_bytes()).await?;
             entry.stream.flush().await?;
-            let (s, h, b) = read_http_response(&mut entry.stream).await?;
+            let (s, h, b) =
+                read_http_response_with_header_timeout(&mut entry.stream, batch_timeout).await?;
             status = s; resp_headers = h; resp_body = b;
         }
 
@@ -4242,14 +4251,50 @@ fn parse_redirect(location: &str) -> (String, Option<String>) {
 /// Read a single HTTP/1.1 response from the stream. Keep-alive safe: respects
 /// Content-Length or chunked transfer-encoding.
+///
+/// Uses a 10 s *total* header-read deadline — the historical 10 s value
+/// preserved for most callers (relay path, exit-node, etc.). Note the
+/// semantics changed in this patch: the underlying loop now treats this
+/// as an absolute deadline across all header reads, not a per-read budget
+/// that would silently extend on drip-feed. The tunnel batch path overrides
+/// the 10 s value via `read_http_response_with_header_timeout`, since the
+/// configurable `request_timeout_secs` (default 30 s) is the authoritative
+/// cliff there.
 async fn read_http_response<S>(stream: &mut S) -> Result<(u16, Vec<(String, String)>, Vec<u8>), FronterError>
+where
+    S: tokio::io::AsyncRead + Unpin,
+{
+    read_http_response_with_header_timeout(stream, Duration::from_secs(10)).await
+}
+
+/// `read_http_response` with a caller-supplied header-read timeout. The
+/// timeout applies only to the *initial* header-block read; the body-read
+/// timeouts in this function are deliberately left at their fixed values
+/// because once the response has started flowing, per-chunk stalls are a
+/// separate signal from "Apps Script hasn't started writing yet."
+///
+/// The tunnel batch path passes `DomainFronter::batch_timeout()` so that
+/// `Config::request_timeout_secs` is the *only* knob controlling how long
+/// we wait for an Apps Script edge to start responding — the hardcoded 10 s
+/// inner cliff was firing well before the outer `batch_timeout` in
+/// `tunnel_client::fire_batch` could, masquerading as a 10 s "batch
+/// timeout" in user logs (issue #1088).
+async fn read_http_response_with_header_timeout<S>(
+    stream: &mut S,
+    header_read_timeout: Duration,
+) -> Result<(u16, Vec<(String, String)>, Vec<u8>), FronterError>
 where
     S: tokio::io::AsyncRead + Unpin,
 {
     let mut buf = Vec::with_capacity(8192);
     let mut tmp = [0u8; 8192];
+    // One deadline for the whole header read, not per-iteration. Otherwise
+    // a slow peer drip-feeding one byte just under `header_read_timeout`
+    // keeps this loop alive forever and defeats the outer `batch_timeout`
+    // wiring (the entire point of #1088's fix).
+    let deadline = tokio::time::Instant::now() + header_read_timeout;
     let header_end = loop {
-        let n = timeout(Duration::from_secs(10), stream.read(&mut tmp)).await
+        let n = tokio::time::timeout_at(deadline, stream.read(&mut tmp)).await
            .map_err(|_| FronterError::Timeout)??;
        if n == 0 {
            return Err(FronterError::BadResponse("connection closed before headers".into()));
        }
@@ -5013,6 +5058,80 @@ mod tests {
         assert_eq!(got_body, body);
     }
 
+    /// Issue #1088. The tunnel batch path passes `batch_timeout` (default
+    /// 30 s, configurable up to 300 s) to `read_http_response_with_header_timeout`
+    /// so Apps Script cold starts in the 8–12 s range no longer trip a
+    /// hardcoded 10 s cliff. A regression that re-introduces the old 10 s
+    /// inner timeout — or that ignores the parameter entirely — would let
+    /// cold-start batches fail in the field while passing every existing
+    /// test. This locks the parameter down: headers arriving at virtual
+    /// T=15 s must succeed when the caller asked for a 30 s budget.
+    #[tokio::test(start_paused = true)]
+    async fn read_http_response_respects_configured_header_timeout() {
+        use tokio::io::AsyncWriteExt;
+
+        let (mut client_side, mut server_side) = tokio::io::duplex(8192);
+        let response = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
+
+        tokio::spawn(async move {
+            // Slow Apps Script edge: response doesn't start streaming
+            // for 15 s. Under a 10 s budget this would be Timeout; under
+            // the 30 s budget the caller passed it must succeed.
+            tokio::time::sleep(Duration::from_secs(15)).await;
+            server_side.write_all(response).await.unwrap();
+        });
+
+        let (status, _, body) = read_http_response_with_header_timeout(
+            &mut client_side,
+            Duration::from_secs(30),
+        )
+        .await
+        .expect("15 s response must succeed under 30 s header-read budget");
+        assert_eq!(status, 200);
+        assert!(body.is_empty());
+    }
+
+    /// The header-read deadline must be *total*, not reset on every read.
+    /// Without this, a peer that drip-feeds one byte just under the
+    /// per-read timeout keeps the loop alive forever and defeats the
+    /// outer `batch_timeout` wiring — the whole point of
+    /// #1088's fix. This is the regression that would survive a naive
+    /// revert to `timeout(d, stream.read(...))` inside the loop, because
+    /// every individual read completes well under `d`. With the
+    /// `timeout_at(deadline, ...)` form, total elapsed exceeds the
+    /// deadline and we get `FronterError::Timeout`.
+    #[tokio::test(start_paused = true)]
+    async fn read_http_response_header_deadline_is_total_not_per_read() {
+        use tokio::io::AsyncWriteExt;
+
+        let (mut client_side, mut server_side) = tokio::io::duplex(8192);
+        // Header block is 38 bytes; drip-feeding at 3 s/byte takes 114 s
+        // total. Each individual read returns within 3 s — well under
+        // the 10 s budget — so per-read semantics would NOT detect the
+        // stall.
+ let response = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(); + + tokio::spawn(async move { + for byte in response { + tokio::time::sleep(Duration::from_secs(3)).await; + server_side.write_all(&[byte]).await.unwrap(); + server_side.flush().await.unwrap(); + } + }); + + let result = read_http_response_with_header_timeout( + &mut client_side, + Duration::from_secs(10), + ) + .await; + assert!( + matches!(result, Err(FronterError::Timeout)), + "drip-feed slower than the total deadline must time out — \ + got {:?}", + result.map(|(s, _, _)| s) + ); + } + #[tokio::test] async fn parse_exit_node_response_unwraps_exit_node_envelope() { // The exit-node path through Apps Script returns exit node's JSON diff --git a/src/tunnel_client.rs b/src/tunnel_client.rs index cb6ce12..d09817a 100644 --- a/src/tunnel_client.rs +++ b/src/tunnel_client.rs @@ -45,11 +45,12 @@ const MAX_BATCH_OPS: usize = 50; // Script's typical response cliff — lives in `default_request_timeout_secs` // in `config.rs`. -/// Timeout for a session waiting for its batch reply. If the batch task -/// is slow (e.g. one op in the batch has a dead target on the tunnel-node -/// side), the session gives up and retries on the next tick rather than -/// blocking indefinitely. -const REPLY_TIMEOUT: Duration = Duration::from_secs(35); +/// Slack added to the reply-timeout budget on top of `batch_timeout`. +/// Covers spawn/encode overhead and a small margin for clock skew, so +/// the session-side `reply_rx` doesn't fire just before `fire_batch`'s +/// HTTP round-trip would have completed. No retry budget here — each +/// batch makes exactly one attempt (see `fire_batch` docs). +const REPLY_TIMEOUT_SLACK: Duration = Duration::from_secs(5); /// How long we'll briefly hold the client socket after the local /// CONNECT/SOCKS5 handshake, waiting for the client's first bytes (the @@ -280,6 +281,14 @@ pub struct TunnelMux { /// `(host, port)`, value is the expiry instant. Plain Mutex is /// fine: it's touched once per CONNECT (cheap) and once per failure. unreachable_cache: Mutex>, + /// How long a session waits for its batch reply before giving up and + /// retry-polling on the next tick. Computed at construction from + /// `fronter.batch_timeout() + REPLY_TIMEOUT_SLACK` so the session- + /// side `reply_rx` always outlives `fire_batch`'s single HTTP + /// round-trip. Without runtime derivation, an operator who raises + /// `request_timeout_secs` would see sessions abandon replies just + /// before the batch would have completed. + reply_timeout: Duration, } impl TunnelMux { @@ -311,6 +320,14 @@ impl TunnelMux { let step = if coalesce_step_ms > 0 { coalesce_step_ms } else { DEFAULT_COALESCE_STEP_MS }; let max = if coalesce_max_ms > 0 { coalesce_max_ms } else { DEFAULT_COALESCE_MAX_MS }; tracing::info!("batch coalesce: step={}ms max={}ms", step, max); + // Reply timeout co-varies with `request_timeout_secs` so an + // operator who raises the batch budget doesn't have sessions + // abandoning replies just before the HTTP round-trip would + // have completed. See the `reply_timeout` field comment for + // the invariant. 
+        let reply_timeout = fronter
+            .batch_timeout()
+            .saturating_add(REPLY_TIMEOUT_SLACK);
         let (tx, rx) = mpsc::channel(512);
         tokio::spawn(mux_loop(rx, fronter, step, max));
         Arc::new(Self {
@@ -326,9 +343,17 @@ impl TunnelMux {
             preread_win_total_us: AtomicU64::new(0),
             preread_total_events: AtomicU64::new(0),
             unreachable_cache: Mutex::new(HashMap::new()),
+            reply_timeout,
         })
     }
 
+    /// How long a session waits for its batch reply before retry-polling.
+    /// Co-varies with `Config::request_timeout_secs` so `fire_batch`'s
+    /// single HTTP round-trip is always covered.
+    pub fn reply_timeout(&self) -> Duration {
+        self.reply_timeout
+    }
+
     async fn send(&self, msg: MuxMsg) {
         let _ = self.tx.send(msg).await;
     }
@@ -849,9 +874,16 @@ fn encode_pending(p: PendingOp) -> BatchOp {
 /// Pick a deployment, acquire its per-account concurrency slot, and spawn
 /// a batch request task.
 ///
-/// The batch HTTP round-trip is bounded by `BATCH_TIMEOUT` so a slow or
-/// dead tunnel-node target cannot hold a pipeline slot (and block waiting
-/// sessions) forever.
+/// The batch HTTP round-trip is bounded by `DomainFronter::batch_timeout()`
+/// so a slow or dead tunnel-node target cannot hold a pipeline slot (and
+/// block waiting sessions) forever. Each batch makes a single attempt —
+/// no client-side retry against a different deployment, because
+/// tunnel-node's `drain_now` mutates the per-session buffer when building
+/// a response, so a lost response means lost bytes (silent gap on the
+/// client side). Without server-side ack / sequence support a replay
+/// would either duplicate writes (payload ops) or silently skip bytes
+/// (empty polls). Sessions whose batch times out re-poll on the next
+/// tick — same recovery surface as pre-#1088.
 async fn fire_batch(
     sems: &Arc<Mutex<HashMap<String, Semaphore>>>,
     fronter: &Arc<DomainFronter>,
@@ -879,17 +911,18 @@ async fn fire_batch(
         // Bounded-wait: if the batch takes longer than the configured
         // batch timeout (Config::request_timeout_secs), all sessions in
-        // this batch get an error and can retry.
+        // this batch get an error and can retry-poll on the next tick.
         let batch_timeout = f.batch_timeout();
         let result = tokio::time::timeout(
             batch_timeout,
             f.tunnel_batch_request_to(&script_id, &data_ops),
         )
         .await;
+        let sid_short = &script_id[..script_id.len().min(8)];
         tracing::info!(
             "batch: {} ops → {}, rtt={:?}",
             n_ops,
-            &script_id[..script_id.len().min(8)],
+            sid_short,
             t0.elapsed()
         );
 
@@ -925,7 +958,6 @@ async fn fire_batch(
             })
             .sum();
         f.record_today(response_bytes);
-        let sid_short = &script_id[..script_id.len().min(8)];
         for (idx, reply) in data_replies {
             if let Some(resp) = batch_resp.r.get(idx) {
                 let _ = reply.send(Ok((resp.clone(), script_id.clone())));
@@ -948,25 +980,12 @@ async fn fire_batch(
                 f.record_timeout_strike(&script_id);
             }
             let err_msg = format!("{}", e);
-            let sid_short = &script_id[..script_id.len().min(8)];
-            // Detect the body string we ship as the v1.8.0 bad-auth
-            // decoy. v1.8.1 asserted "AUTH_KEY mismatch" outright, but
-            // #404 (w0l4i) found the same body comes back from Apps
-            // Script in 3 other unrelated cases too:
-            //
-            // 1. AUTH_KEY mismatch — our intentional decoy
-            // 2. Apps Script execution timeout — runtime hit 6-min
-            //    mid-call cap or per-100s quota
-            // 3. Apps Script internal hiccup — Google-side flake,
-            //    serves placeholder
-            // 4. ISP-side response truncation — #313 pattern, the
-            //    response was assembled
-            //    but ate an RST mid-flight
-            //
-            // So we surface all four candidates instead of asserting #1.
-            // Users can flip DIAGNOSTIC_MODE=true in Code.gs to disambiguate:
-            // only #1 still returns the decoy in diagnostic mode; the
-            // others return real JSON or different errors.
+            // Decoy / Apps-Script-flake detection. This body string can
+            // mean any of 4 unrelated things (AUTH_KEY mismatch, Apps
+            // Script execution timeout, Google-side flake, ISP-side
+            // truncation #313), so surface all candidates rather than
+            // asserting one. Operators can flip DIAGNOSTIC_MODE in
+            // Code.gs to disambiguate (#404).
             if err_msg.contains("The script completed but did not return anything") {
                 tracing::error!(
                     "batch failed (script {}): got the v1.8.0 decoy/placeholder body — \
@@ -993,7 +1012,6 @@ async fn fire_batch(
             // per-read timeout — count it the same way so a truly-stuck
             // deployment exits round-robin fast.
             f.record_timeout_strike(&script_id);
-            let sid_short = &script_id[..script_id.len().min(8)];
             tracing::warn!(
                 "batch timed out after {:?} (script {}, {} ops)",
                 batch_timeout,
@@ -1368,7 +1386,7 @@ async fn tunnel_loop(
         // Bounded-wait on reply: if the batch this op landed in is slow
         // (dead target on the tunnel-node side), don't block this session
         // forever — timeout and let it retry on the next tick.
-        let (resp, script_id) = match tokio::time::timeout(REPLY_TIMEOUT, reply_rx).await {
+        let (resp, script_id) = match tokio::time::timeout(mux.reply_timeout(), reply_rx).await {
             Ok(Ok(Ok((r, sid_used)))) => (r, sid_used),
             Ok(Ok(Err(e))) => {
                 tracing::debug!("tunnel data error: {}", e);
@@ -1719,10 +1737,55 @@ mod tests {
             preread_win_total_us: AtomicU64::new(0),
             preread_total_events: AtomicU64::new(0),
             unreachable_cache: Mutex::new(HashMap::new()),
+            // Tests that exercise the reply-timeout path expect a
+            // generous fixed value here; production derives this from
+            // `fronter.batch_timeout()` (see `TunnelMux::start`).
+            reply_timeout: Duration::from_secs(35),
         });
         (mux, rx)
     }
 
+    /// `TunnelMux::reply_timeout` must co-vary with the configured
+    /// `request_timeout_secs` plus `REPLY_TIMEOUT_SLACK`. Without this
+    /// runtime derivation, operators who raise `request_timeout_secs`
+    /// see sessions abandon `reply_rx` just before `fire_batch`'s
+    /// HTTP round-trip would have completed — silently orphaning
+    /// in-flight responses. The test muxes hardcode a value for
+    /// convenience, so a regression in `TunnelMux::start`'s formula
+    /// could ship unnoticed unless we exercise the real construction
+    /// path.
+    #[tokio::test]
+    async fn mux_reply_timeout_tracks_batch_timeout_plus_slack() {
+        use crate::config::Config;
+
+        // Pick a non-default `request_timeout_secs` so the assertion
+        // would fail under any hardcoded value (35 s in tests, 75 s in
+        // the previous patch).
+        let cfg: Config = serde_json::from_str(
+            r#"{
+                "mode": "apps_script",
+                "google_ip": "127.0.0.1",
+                "front_domain": "www.google.com",
+                "script_id": "TEST",
+                "auth_key": "test_auth_key",
+                "listen_host": "127.0.0.1",
+                "listen_port": 8085,
+                "log_level": "info",
+                "verify_ssl": true,
+                "request_timeout_secs": 60
+            }"#,
+        )
+        .unwrap();
+        let fronter = Arc::new(DomainFronter::new(&cfg).expect("test fronter must construct"));
+        let mux = TunnelMux::start(fronter, 0, 0);
+
+        assert_eq!(
+            mux.reply_timeout(),
+            Duration::from_secs(60) + REPLY_TIMEOUT_SLACK,
+            "reply_timeout must equal batch_timeout + REPLY_TIMEOUT_SLACK"
+        );
+    }
+
     /// The buffered ClientHello from the pre-read window must reach the
     /// tunnel-node as the first `Data` op on the fallback path. If this
     /// regresses, every TLS handshake stalls until the 30 s read-timeout
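
Reviewer sketch (not part of the patch): the entire fix rides on one tokio
distinction. `tokio::time::timeout` arms a fresh budget on every call, so
wrapping each `read` re-arms it on every byte received, while
`tokio::time::timeout_at` holds a single absolute deadline across all reads
in the loop. The minimal, self-contained illustration below is assumed code,
not taken from this repository: `read_headers`, the 100 ms drip pacing, and
the error mapping are illustrative only, and it assumes a plain tokio
dependency with the "full" feature set.

// Standalone sketch: one absolute deadline vs. per-read timeouts.
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

// Read until the \r\n\r\n that ends an HTTP header block, under a single
// absolute deadline: total wall time is bounded no matter how the peer
// paces its bytes. A per-read `timeout(budget, read)` here would re-arm
// on every byte and never trip on a slow drip-feed.
async fn read_headers<S: tokio::io::AsyncRead + Unpin>(
    stream: &mut S,
    budget: Duration,
) -> std::io::Result<Vec<u8>> {
    let deadline = tokio::time::Instant::now() + budget;
    let mut buf = Vec::new();
    let mut tmp = [0u8; 1024];
    loop {
        let n = tokio::time::timeout_at(deadline, stream.read(&mut tmp))
            .await
            .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "header deadline"))??;
        if n == 0 {
            return Err(std::io::ErrorKind::UnexpectedEof.into());
        }
        buf.extend_from_slice(&tmp[..n]);
        if buf.windows(4).any(|w| w == b"\r\n\r\n") {
            return Ok(buf);
        }
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (mut reader, mut writer) = tokio::io::duplex(1024);
    tokio::spawn(async move {
        // Drip one byte every 100 ms: each individual read completes
        // quickly, but the full 38-byte header block takes ~3.8 s.
        for byte in b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n" {
            tokio::time::sleep(Duration::from_millis(100)).await;
            if writer.write_all(&[*byte]).await.is_err() {
                break; // reader side gave up; stop dripping
            }
        }
    });
    // A 1 s *total* budget correctly fails against the drip-feed; a 1 s
    // *per-read* timeout would have sailed through, masking the stall.
    let result = read_headers(&mut reader, Duration::from_secs(1)).await;
    assert!(result.is_err());
    println!("total deadline caught the drip-feed: {:?}", result.err());
}

Run as-is this prints the timeout; swap `timeout_at(deadline, ...)` for
`timeout(budget, ...)` inside the loop and the assert fails after roughly
3.8 s, which is exactly the drip-feed regression the second new
domain_fronter test pins down.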