125 changes: 122 additions & 3 deletions src/domain_fronter.rs
@@ -3125,8 +3125,16 @@ impl DomainFronter
entry.stream.write_all(&payload).await?;
entry.stream.flush().await?;

// Use the configured `request_timeout_secs` for the header read,
// not the hardcoded 10 s default. With Apps Script cold starts
// routinely landing in the 8–12 s range, the 10 s cliff was
// firing as a false-positive batch timeout (issue #1088), killing
// every in-flight tunnel session under it. The outer
// `tokio::time::timeout(batch_timeout, ...)` in `fire_batch`
// remains the authoritative bound on total batch round-trip time.
let batch_timeout = self.batch_timeout();
let (mut status, mut resp_headers, mut resp_body) =
read_http_response(&mut entry.stream).await?;
read_http_response_with_header_timeout(&mut entry.stream, batch_timeout).await?;

// Follow redirect chain
for _ in 0..5 {
@@ -3139,7 +3147,8 @@ impl DomainFronter
);
entry.stream.write_all(req.as_bytes()).await?;
entry.stream.flush().await?;
let (s, h, b) = read_http_response(&mut entry.stream).await?;
let (s, h, b) =
read_http_response_with_header_timeout(&mut entry.stream, batch_timeout).await?;
status = s; resp_headers = h; resp_body = b;
}
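Both call sites above go through `DomainFronter::batch_timeout()`, which this diff never shows. A minimal sketch of its assumed shape, on the reading the surrounding comments suggest (a thin view over `Config::request_timeout_secs`; the `config` field name is hypothetical):

impl DomainFronter {
    /// Sketch only; the real accessor is defined outside this diff.
    fn batch_timeout(&self) -> Duration {
        // `request_timeout_secs` defaults to 30 s via
        // `default_request_timeout_secs` in config.rs; operators may
        // raise it (the test below uses 60 s).
        Duration::from_secs(self.config.request_timeout_secs)
    }
}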

@@ -4242,14 +4251,50 @@ fn parse_redirect(location: &str) -> (String, Option<String>) {

/// Read a single HTTP/1.1 response from the stream. Keep-alive safe: respects
/// Content-Length or chunked transfer-encoding.
///
/// Uses a 10 s *total* header-read deadline — the historical 10 s value
/// preserved for most callers (relay path, exit-node, etc.). Note the
/// semantics changed in this patch: the underlying loop now treats this
/// as an absolute deadline across all header reads, not a per-read budget
/// that would silently extend on drip-feed. The tunnel batch path overrides
/// the 10 s value via `read_http_response_with_header_timeout`, since the
/// configurable `request_timeout_secs` (default 30 s) is the authoritative
/// cliff there.
async fn read_http_response<S>(stream: &mut S) -> Result<(u16, Vec<(String, String)>, Vec<u8>), FronterError>
where
S: tokio::io::AsyncRead + Unpin,
{
read_http_response_with_header_timeout(stream, Duration::from_secs(10)).await
}

/// `read_http_response` with a caller-supplied header-read timeout. The
/// timeout applies only to the *initial* header-block read; the body-read
/// timeouts in this function are deliberately left at their fixed values
/// because once the response has started flowing, per-chunk stalls are a
/// separate signal from "Apps Script hasn't started writing yet."
///
/// The tunnel batch path passes `DomainFronter::batch_timeout()` so that
/// `Config::request_timeout_secs` is the *only* knob controlling how long
/// we wait for an Apps Script edge to start responding — the hardcoded 10 s
/// inner cliff was firing well before the outer `batch_timeout` in
/// `tunnel_client::fire_batch` could, masquerading as a 10 s "batch
/// timeout" in user logs (issue #1088).
async fn read_http_response_with_header_timeout<S>(
stream: &mut S,
header_read_timeout: Duration,
) -> Result<(u16, Vec<(String, String)>, Vec<u8>), FronterError>
where
S: tokio::io::AsyncRead + Unpin,
{
let mut buf = Vec::with_capacity(8192);
let mut tmp = [0u8; 8192];
// One deadline for the whole header read, not per-iteration. Otherwise
// a slow peer drip-feeding one byte just under `header_read_timeout`
// keeps this loop alive forever and defeats the outer `batch_timeout`
// wiring (the entire point of #1088's fix).
let deadline = tokio::time::Instant::now() + header_read_timeout;
let header_end = loop {
let n = timeout(Duration::from_secs(10), stream.read(&mut tmp)).await
let n = tokio::time::timeout_at(deadline, stream.read(&mut tmp)).await
.map_err(|_| FronterError::Timeout)??;
if n == 0 {
return Err(FronterError::BadResponse("connection closed before headers".into()));
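The hunk cuts off mid-loop, so here is the deadline pattern in isolation: a self-contained sketch with illustrative names (not this crate's API) of reading an HTTP header block under one absolute deadline, the same `timeout_at` technique the patched loop uses:

use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt};

async fn read_headers_with_total_deadline<S: AsyncRead + Unpin>(
    stream: &mut S,
    budget: Duration,
) -> std::io::Result<Vec<u8>> {
    // One instant shared by every iteration. The old shape,
    // `timeout(budget, stream.read(..))`, granted a fresh budget per
    // read, so a peer drip-feeding one byte per read never timed out.
    let deadline = tokio::time::Instant::now() + budget;
    let mut buf = Vec::new();
    let mut tmp = [0u8; 1024];
    loop {
        let n = tokio::time::timeout_at(deadline, stream.read(&mut tmp))
            .await
            .map_err(|_| std::io::Error::from(std::io::ErrorKind::TimedOut))??;
        if n == 0 {
            // EOF before the header terminator; the real function
            // returns a BadResponse error here instead.
            return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
        }
        buf.extend_from_slice(&tmp[..n]);
        if buf.windows(4).any(|w| w == b"\r\n\r\n") {
            return Ok(buf); // full header block received
        }
    }
}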
@@ -5013,6 +5058,80 @@ mod tests {
assert_eq!(got_body, body);
}

/// Issue #1088. The tunnel batch path passes `batch_timeout` (default
/// 30 s, configurable up to 300 s) to `read_http_response_with_header_timeout`
/// so Apps Script cold starts in the 8-12 s range no longer trip a
/// hardcoded 10 s cliff. A regression that re-introduces the old 10 s
/// inner timeout — or that ignores the parameter entirely — would let
/// cold-start batches fail in the field while passing every existing
/// test. This locks the parameter down: headers arriving at virtual
/// T=15 s must succeed when the caller asked for a 30 s budget.
#[tokio::test(start_paused = true)]
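// `start_paused = true` runs the test under tokio's paused clock:
// `sleep` and `timeout_at` consume virtual time that auto-advances
// whenever every task is idle, so the 15 s delay below finishes
// instantly in wall-clock terms.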
async fn read_http_response_respects_configured_header_timeout() {
use tokio::io::AsyncWriteExt;

let (mut client_side, mut server_side) = tokio::io::duplex(8192);
let response = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";

tokio::spawn(async move {
// Slow Apps Script edge: response doesn't start streaming
// for 15 s. Under a 10 s budget this would be Timeout; under
// the 30 s budget the caller passed it must succeed.
tokio::time::sleep(Duration::from_secs(15)).await;
server_side.write_all(response).await.unwrap();
});

let (status, _, body) = read_http_response_with_header_timeout(
&mut client_side,
Duration::from_secs(30),
)
.await
.expect("15 s response must succeed under 30 s header-read budget");
assert_eq!(status, 200);
assert!(body.is_empty());
}

/// The header-read deadline must be *total*, not reset on every read.
/// Without this, a peer that drip-feeds one byte just under the
/// per-read timeout keeps the loop alive forever and defeats the
/// outer `batch_timeout` wiring, which is the whole point of #1088's
/// fix. This is the regression that would survive a naive
/// revert to `timeout(d, stream.read(...))` inside the loop, because
/// every individual read completes well under `d`. With the
/// `timeout_at(deadline, ...)` form, total elapsed exceeds the
/// deadline and we get `FronterError::Timeout`.
#[tokio::test(start_paused = true)]
async fn read_http_response_header_deadline_is_total_not_per_read() {
use tokio::io::AsyncWriteExt;

let (mut client_side, mut server_side) = tokio::io::duplex(8192);
// Header block is 38 bytes; drip-feeding at 3 s/byte takes 114 s
// total. Each individual read returns within 3 s — well under
// the 10 s budget — so per-read semantics would NOT detect the
// stall.
let response = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec();

tokio::spawn(async move {
for byte in response {
tokio::time::sleep(Duration::from_secs(3)).await;
server_side.write_all(&[byte]).await.unwrap();
server_side.flush().await.unwrap();
}
});

let result = read_http_response_with_header_timeout(
&mut client_side,
Duration::from_secs(10),
)
.await;
assert!(
matches!(result, Err(FronterError::Timeout)),
"drip-feed slower than the total deadline must time out — \
got {:?}",
result.map(|(s, _, _)| s)
);
}

#[tokio::test]
async fn parse_exit_node_response_unwraps_exit_node_envelope() {
// The exit-node path through Apps Script returns exit node's JSON
127 changes: 95 additions & 32 deletions src/tunnel_client.rs
@@ -45,11 +45,12 @@ const MAX_BATCH_OPS: usize = 50;
// Script's typical response cliff — lives in `default_request_timeout_secs`
// in `config.rs`.

/// Timeout for a session waiting for its batch reply. If the batch task
/// is slow (e.g. one op in the batch has a dead target on the tunnel-node
/// side), the session gives up and retries on the next tick rather than
/// blocking indefinitely.
const REPLY_TIMEOUT: Duration = Duration::from_secs(35);
/// Slack added to the reply-timeout budget on top of `batch_timeout`.
/// Covers spawn/encode overhead and a small margin for clock skew, so
/// the session-side `reply_rx` doesn't fire just before `fire_batch`'s
/// HTTP round-trip would have completed. No retry budget here — each
/// batch makes exactly one attempt (see `fire_batch` docs).
const REPLY_TIMEOUT_SLACK: Duration = Duration::from_secs(5);

/// How long we'll briefly hold the client socket after the local
/// CONNECT/SOCKS5 handshake, waiting for the client's first bytes (the
@@ -280,6 +281,14 @@ pub struct TunnelMux {
/// `(host, port)`, value is the expiry instant. Plain Mutex<HashMap> is
/// fine: it's touched once per CONNECT (cheap) and once per failure.
unreachable_cache: Mutex<HashMap<(String, u16), Instant>>,
/// How long a session waits for its batch reply before giving up and
/// retry-polling on the next tick. Computed at construction from
/// `fronter.batch_timeout() + REPLY_TIMEOUT_SLACK` so the session-
/// side `reply_rx` always outlives `fire_batch`'s single HTTP
/// round-trip. Without runtime derivation, an operator who raises
/// `request_timeout_secs` would see sessions abandon replies just
/// before the batch would have completed.
reply_timeout: Duration,
}

impl TunnelMux {
Expand Down Expand Up @@ -311,6 +320,14 @@ impl TunnelMux {
let step = if coalesce_step_ms > 0 { coalesce_step_ms } else { DEFAULT_COALESCE_STEP_MS };
let max = if coalesce_max_ms > 0 { coalesce_max_ms } else { DEFAULT_COALESCE_MAX_MS };
tracing::info!("batch coalesce: step={}ms max={}ms", step, max);
// Reply timeout co-varies with `request_timeout_secs` so an
// operator who raises the batch budget doesn't have sessions
// abandoning replies just before the HTTP round-trip would
// have completed. See the `reply_timeout` field comment for
// the invariant.
let reply_timeout = fronter
.batch_timeout()
.saturating_add(REPLY_TIMEOUT_SLACK);
let (tx, rx) = mpsc::channel(512);
tokio::spawn(mux_loop(rx, fronter, step, max));
Arc::new(Self {
@@ -326,9 +343,17 @@
preread_win_total_us: AtomicU64::new(0),
preread_total_events: AtomicU64::new(0),
unreachable_cache: Mutex::new(HashMap::new()),
reply_timeout,
})
}

/// How long a session waits for its batch reply before retry-polling.
/// Co-varies with `Config::request_timeout_secs` so `fire_batch`'s
/// single HTTP round-trip is always covered.
pub fn reply_timeout(&self) -> Duration {
self.reply_timeout
}

async fn send(&self, msg: MuxMsg) {
let _ = self.tx.send(msg).await;
}
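Worked through with the defaults: `batch_timeout` is 30 s and `REPLY_TIMEOUT_SLACK` is 5 s, so the derived `reply_timeout` is 30 s + 5 s = 35 s, exactly the old hardcoded `REPLY_TIMEOUT`. Default-config deployments therefore see no behavior change; only raised `request_timeout_secs` values now propagate to the session side.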
@@ -849,9 +874,16 @@ fn encode_pending(p: PendingOp) -> BatchOp {
/// Pick a deployment, acquire its per-account concurrency slot, and spawn
/// a batch request task.
///
/// The batch HTTP round-trip is bounded by `BATCH_TIMEOUT` so a slow or
/// dead tunnel-node target cannot hold a pipeline slot (and block waiting
/// sessions) forever.
/// The batch HTTP round-trip is bounded by `DomainFronter::batch_timeout()`
/// so a slow or dead tunnel-node target cannot hold a pipeline slot (and
/// block waiting sessions) forever. Each batch makes a single attempt —
/// no client-side retry against a different deployment, because
/// tunnel-node's `drain_now` mutates the per-session buffer when building
/// a response, so a lost response means lost bytes (silent gap on the
/// client side). Without server-side ack / sequence support a replay
/// would either duplicate writes (payload ops) or silently skip bytes
/// (empty polls). Sessions whose batch times out re-poll on the next
/// tick — same recovery surface as pre-#1088.
async fn fire_batch(
sems: &Arc<HashMap<String, Arc<Semaphore>>>,
fronter: &Arc<DomainFronter>,
@@ -879,17 +911,18 @@ async fn fire_batch(

// Bounded-wait: if the batch takes longer than the configured
// batch timeout (Config::request_timeout_secs), all sessions in
// this batch get an error and can retry.
// this batch get an error and can retry-poll on the next tick.
let batch_timeout = f.batch_timeout();
let result = tokio::time::timeout(
batch_timeout,
f.tunnel_batch_request_to(&script_id, &data_ops),
)
.await;
let sid_short = &script_id[..script_id.len().min(8)];
tracing::info!(
"batch: {} ops → {}, rtt={:?}",
n_ops,
&script_id[..script_id.len().min(8)],
sid_short,
t0.elapsed()
);

@@ -925,7 +958,6 @@
})
.sum();
f.record_today(response_bytes);
let sid_short = &script_id[..script_id.len().min(8)];
for (idx, reply) in data_replies {
if let Some(resp) = batch_resp.r.get(idx) {
let _ = reply.send(Ok((resp.clone(), script_id.clone())));
@@ -948,25 +980,12 @@
f.record_timeout_strike(&script_id);
}
let err_msg = format!("{}", e);
let sid_short = &script_id[..script_id.len().min(8)];
// Detect the body string we ship as the v1.8.0 bad-auth
// decoy. v1.8.1 asserted "AUTH_KEY mismatch" outright, but
// #404 (w0l4i) found the same body comes back from Apps
// Script in 3 other unrelated cases too:
//
// 1. AUTH_KEY mismatch — our intentional decoy
// 2. Apps Script execution timeout — runtime hit the 6-min
// cap mid-call or a per-100s quota
// 3. Apps Script internal hiccup — Google-side flake,
// serves placeholder
// 4. ISP-side response truncation — #313 pattern, the
// response was assembled but ate an RST mid-flight
//
// So we surface all four candidates instead of asserting #1.
// Users can flip DIAGNOSTIC_MODE=true in Code.gs to disambiguate:
// only #1 still returns the decoy in diagnostic mode; the
// others return real JSON or different errors.
// Decoy / Apps-Script-flake detection. This body string can
// mean any of 4 unrelated things (AUTH_KEY mismatch, Apps
// Script execution timeout, Google-side flake, ISP-side
// truncation #313), so surface all candidates rather than
// asserting one. Operators can flip DIAGNOSTIC_MODE in
// Code.gs to disambiguate (#404).
if err_msg.contains("The script completed but did not return anything") {
tracing::error!(
"batch failed (script {}): got the v1.8.0 decoy/placeholder body — \
@@ -993,7 +1012,6 @@
// per-read timeout — count it the same way so a truly-stuck
// deployment exits round-robin fast.
f.record_timeout_strike(&script_id);
let sid_short = &script_id[..script_id.len().min(8)];
tracing::warn!(
"batch timed out after {:?} (script {}, {} ops)",
batch_timeout,
@@ -1368,7 +1386,7 @@ async fn tunnel_loop(
// Bounded-wait on reply: if the batch this op landed in is slow
// (dead target on the tunnel-node side), don't block this session
// forever — timeout and let it retry on the next tick.
let (resp, script_id) = match tokio::time::timeout(REPLY_TIMEOUT, reply_rx).await {
let (resp, script_id) = match tokio::time::timeout(mux.reply_timeout(), reply_rx).await {
Ok(Ok(Ok((r, sid_used)))) => (r, sid_used),
Ok(Ok(Err(e))) => {
tracing::debug!("tunnel data error: {}", e);
@@ -1719,10 +1737,55 @@ mod tests {
preread_win_total_us: AtomicU64::new(0),
preread_total_events: AtomicU64::new(0),
unreachable_cache: Mutex::new(HashMap::new()),
// Tests that exercise the reply-timeout path expect a
// generous fixed value here; production derives this from
// `fronter.batch_timeout()` (see `TunnelMux::start`).
reply_timeout: Duration::from_secs(35),
});
(mux, rx)
}

/// `TunnelMux::reply_timeout` must co-vary with the configured
/// `request_timeout_secs` plus `REPLY_TIMEOUT_SLACK`. Without this
/// runtime derivation, operators who raise `request_timeout_secs`
/// see sessions abandon `reply_rx` just before `fire_batch`'s
/// HTTP round-trip would have completed — silently orphaning
/// in-flight responses. The test muxes hardcode a value for
/// convenience, so a regression in `TunnelMux::start`'s formula
/// could ship unnoticed unless we exercise the real construction
/// path.
#[tokio::test]
async fn mux_reply_timeout_tracks_batch_timeout_plus_slack() {
use crate::config::Config;

// Pick a non-default `request_timeout_secs` so the assertion
// would fail under any hardcoded value (35 s in tests, 75 s in
// the previous patch).
let cfg: Config = serde_json::from_str(
r#"{
"mode": "apps_script",
"google_ip": "127.0.0.1",
"front_domain": "www.google.com",
"script_id": "TEST",
"auth_key": "test_auth_key",
"listen_host": "127.0.0.1",
"listen_port": 8085,
"log_level": "info",
"verify_ssl": true,
"request_timeout_secs": 60
}"#,
)
.unwrap();
let fronter = Arc::new(DomainFronter::new(&cfg).expect("test fronter must construct"));
let mux = TunnelMux::start(fronter, 0, 0);

assert_eq!(
mux.reply_timeout(),
Duration::from_secs(60) + REPLY_TIMEOUT_SLACK,
"reply_timeout must equal batch_timeout + REPLY_TIMEOUT_SLACK"
);
}

/// The buffered ClientHello from the pre-read window must reach the
/// tunnel-node as the first `Data` op on the fallback path. If this
/// regresses, every TLS handshake stalls until the 30 s read-timeout