From 57dc546d1da9990ffae706022603126620d42a53 Mon Sep 17 00:00:00 2001 From: yyoyoian-pixel <279225925+yyoyoian-pixel@users.noreply.github.com> Date: Wed, 13 May 2026 02:31:08 +0200 Subject: [PATCH 1/2] feat(tunnel): pipelined polls with adaptive depth and overlapped client reads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three improvements to full-tunnel throughput and latency: 1. **Overlapped client reads**: tunnel_loop reads from the client socket concurrently with the batch reply wait via tokio::select!, buffering upload data for the next op instead of blocking on a fresh read timeout. 2. **Pipelined polls with seq echo**: add a per-op sequence number echoed by the tunnel-node so the client can reorder out-of-order replies. Sessions with sustained data flow (consecutive_data >= 2) ramp up to INFLIGHT_ACTIVE (10) polls in flight, with 1s stagger between sends so they land in separate batches. Drops to serial on first empty reply. 3. **Adaptive pipeline depth**: idle sessions stay at depth 1 (no extra polls). Data-bearing sessions gradually ramp 1→2→3→...→10. At most MAX_ELEVATED_PER_DEPLOYMENT (6) sessions per deployment can be elevated simultaneously, preventing semaphore exhaustion. Elevation slots are released immediately on first empty reply or session close. Wire protocol: BatchOp and TunnelResponse gain an optional `seq` field. Fully backward compatible — old tunnel-nodes ignore the field, new clients fall back to serial (depth 1) when resp.seq is None. Tunnel-node: LONGPOLL_DEADLINE reduced from 15s to 4s for faster poll turnaround while keeping persistent connections (Telegram) stable. Includes bench-pipeline.sh for comparing serial vs pipelined throughput. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/bench-pipeline.sh | 140 +++++++++ src/domain_fronter.rs | 4 + src/tunnel_client.rs | 645 +++++++++++++++++++++++++++++--------- tunnel-node/src/main.rs | 53 ++-- 4 files changed, 666 insertions(+), 176 deletions(-) create mode 100755 scripts/bench-pipeline.sh diff --git a/scripts/bench-pipeline.sh b/scripts/bench-pipeline.sh new file mode 100755 index 0000000..65fd2ab --- /dev/null +++ b/scripts/bench-pipeline.sh @@ -0,0 +1,140 @@ +#!/usr/bin/env bash +# +# bench-pipeline.sh — compare throughput: serial (depth=1) vs pipelined (depth=10) +# +# Builds mhrv-rs twice (patching the INFLIGHT_ACTIVE constant), runs each +# as a local SOCKS5 proxy, downloads through the full tunnel, reports. +# +# Usage: +# ./scripts/bench-pipeline.sh [CONFIG_FILE] +# +# Default: config.json + +set -euo pipefail + +CONFIG="${1:-config.json}" +RUNS=3 +SOCKS_PORT=18088 +HTTP_PORT=18087 +TEST_URL="https://speed.cloudflare.com/__down?bytes=5000000" +SRC="src/tunnel_client.rs" +TMPDIR_BENCH=$(mktemp -d) + +cleanup() { + rm -rf "$TMPDIR_BENCH" + kill $PROXY_PID 2>/dev/null || true + # Restore original constant + sed -i '' "s/^const INFLIGHT_ACTIVE: usize = [0-9]*/const INFLIGHT_ACTIVE: usize = 10/" "$SRC" 2>/dev/null || true +} +trap cleanup EXIT + +if [ ! 
-f "$CONFIG" ]; then + echo "ERROR: Config not found: $CONFIG" + exit 1 +fi + +echo "╔══════════════════════════════════════════════╗" +echo "║ Pipeline Throughput Benchmark ║" +echo "╠══════════════════════════════════════════════╣" +echo "║ Config: $CONFIG" +echo "║ Test URL: $TEST_URL" +echo "║ Runs: $RUNS per mode" +echo "╚══════════════════════════════════════════════╝" +echo "" + +# Write a temp config with our ports +TEMP_CONFIG="$TMPDIR_BENCH/config.json" +python3 -c " +import json +with open('$CONFIG') as f: + c = json.load(f) +c['listen_port'] = $HTTP_PORT +c['socks5_port'] = $SOCKS_PORT +c['log_level'] = 'warn' +with open('$TEMP_CONFIG', 'w') as f: + json.dump(c, f) +" + +run_test() { + local label="$1" + local binary="$2" + echo "━━━ $label ━━━" + + $binary -c "$TEMP_CONFIG" & + PROXY_PID=$! + sleep 3 + + if ! kill -0 $PROXY_PID 2>/dev/null; then + echo " ERROR: Proxy failed to start" + return + fi + + # Wait for proxy + for attempt in $(seq 1 15); do + if curl -s --socks5-hostname localhost:$SOCKS_PORT --connect-timeout 5 -o /dev/null https://www.google.com 2>/dev/null; then + break + fi + sleep 1 + done + + local total_bytes=0 + local total_time=0 + + for i in $(seq 1 $RUNS); do + local result + result=$(curl -s --socks5-hostname localhost:$SOCKS_PORT \ + -o /dev/null \ + -w '%{size_download} %{time_total} %{speed_download}' \ + --connect-timeout 30 \ + --max-time 90 \ + "$TEST_URL" 2>/dev/null || echo "0 999 0") + + local bytes time_s speed + bytes=$(echo "$result" | awk '{print $1}') + time_s=$(echo "$result" | awk '{print $2}') + speed=$(echo "$result" | awk '{printf "%.0f", $3/1024}') + + total_bytes=$((total_bytes + ${bytes%.*})) + total_time=$(echo "$total_time + $time_s" | bc) + + printf " Run %d: %.1fs %s KB/s\n" "$i" "$time_s" "$speed" + done + + local avg_speed avg_time + avg_speed=$(echo "scale=1; $total_bytes / $total_time / 1024" | bc 2>/dev/null || echo "0") + avg_time=$(echo "scale=1; $total_time / $RUNS" | bc 2>/dev/null || echo "0") + 
printf " ➜ Average: %s KB/s (%.1fs per download)\n\n" "$avg_speed" "$avg_time" + + kill $PROXY_PID 2>/dev/null || true + wait $PROXY_PID 2>/dev/null || true + sleep 1 + + echo "$label|$avg_speed|$avg_time" >> "$TMPDIR_BENCH/results.txt" +} + +# Build serial (depth=1) +echo "Building serial mode (INFLIGHT_ACTIVE=1)..." +sed -i '' "s/^const INFLIGHT_ACTIVE: usize = [0-9]*/const INFLIGHT_ACTIVE: usize = 1/" "$SRC" +cargo build --release 2>&1 | tail -1 +cp target/release/mhrv-rs "$TMPDIR_BENCH/mhrv-serial" + +# Build pipelined (depth=10) +echo "Building pipelined mode (INFLIGHT_ACTIVE=10)..." +sed -i '' "s/^const INFLIGHT_ACTIVE: usize = [0-9]*/const INFLIGHT_ACTIVE: usize = 10/" "$SRC" +cargo build --release 2>&1 | tail -1 +cp target/release/mhrv-rs "$TMPDIR_BENCH/mhrv-pipelined" + +echo "" + +# Run tests +run_test "Serial (depth=1)" "$TMPDIR_BENCH/mhrv-serial" +run_test "Pipelined (depth=10)" "$TMPDIR_BENCH/mhrv-pipelined" + +# Summary +echo "╔══════════════════════════════════════════════╗" +echo "║ RESULTS ║" +echo "╠══════════════════════════════════════════════╣" +while IFS='|' read -r label speed time; do + printf "║ %-25s %6s KB/s %5ss\n" "$label" "$speed" "$time" +done < "$TMPDIR_BENCH/results.txt" +echo "╚══════════════════════════════════════════════╝" diff --git a/src/domain_fronter.rs b/src/domain_fronter.rs index 3bd788f..57b2b60 100644 --- a/src/domain_fronter.rs +++ b/src/domain_fronter.rs @@ -514,6 +514,8 @@ pub struct TunnelResponse { /// `e` only when this is `None` and compatibility is needed. #[serde(default)] pub code: Option, + #[serde(default)] + pub seq: Option, } /// A single op in a batch tunnel request. @@ -528,6 +530,8 @@ pub struct BatchOp { pub port: Option, #[serde(skip_serializing_if = "Option::is_none")] pub d: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub seq: Option, } /// Batch tunnel response from Apps Script / tunnel node. 
diff --git a/src/tunnel_client.rs b/src/tunnel_client.rs index cb6ce12..c8a0372 100644 --- a/src/tunnel_client.rs +++ b/src/tunnel_client.rs @@ -5,7 +5,7 @@ //! Each Apps Script deployment (account) gets its own concurrency pool of //! 30 in-flight requests — matching the per-account Apps Script limit. -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; // `AtomicU64` from `std::sync::atomic` requires hardware-backed 64-bit // atomics, which 32-bit MIPS (`mipsel-unknown-linux-musl` — our OpenWRT // router target) does not provide — the std type isn't even defined @@ -20,6 +20,7 @@ use std::time::{Duration, Instant}; use base64::engine::general_purpose::STANDARD as B64; use base64::Engine; use bytes::{Bytes, BytesMut}; +use futures_util::stream::{FuturesUnordered, StreamExt}; use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::sync::{mpsc, oneshot, Semaphore}; @@ -57,6 +58,19 @@ const REPLY_TIMEOUT: Duration = Duration::from_secs(35); /// connect saves one Apps Script round-trip per new flow. const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50); +/// Baseline pipeline depth when idle (no data flowing). +const INFLIGHT_IDLE: usize = 1; + +/// Maximum pipeline depth when data is actively flowing. Ramps up on +/// data-bearing replies, drops back to IDLE after consecutive empties. +const INFLIGHT_ACTIVE: usize = 10; + +/// How many consecutive empty replies before dropping from active to idle depth. +const INFLIGHT_COOLDOWN: u32 = 3; + +/// Max sessions that can run at elevated pipeline depth per deployment. +const MAX_ELEVATED_PER_DEPLOYMENT: u64 = 6; + /// Adaptive coalesce defaults: after each new op arrives, wait another /// step for more ops. Resets on every arrival, up to max from the first /// op. Overridable via config `coalesce_step_ms` / `coalesce_max_ms`. 
@@ -173,6 +187,7 @@ enum MuxMsg { Data { sid: String, data: Bytes, + seq: Option, reply: BatchedReply, }, UdpOpen { @@ -208,6 +223,7 @@ struct PendingOp { /// only `connect_data`, which uses presence of `d` as the signal /// that the caller is opting into the bundled-first-bytes flow). encode_empty: bool, + seq: Option, } pub struct TunnelMux { @@ -280,6 +296,9 @@ pub struct TunnelMux { /// `(host, port)`, value is the expiry instant. Plain Mutex is /// fine: it's touched once per CONNECT (cheap) and once per failure. unreachable_cache: Mutex>, + /// How many sessions are currently at elevated pipeline depth (> INFLIGHT_IDLE). + elevated_sessions: AtomicU64, + max_elevated: u64, } impl TunnelMux { @@ -310,7 +329,7 @@ impl TunnelMux { ); let step = if coalesce_step_ms > 0 { coalesce_step_ms } else { DEFAULT_COALESCE_STEP_MS }; let max = if coalesce_max_ms > 0 { coalesce_max_ms } else { DEFAULT_COALESCE_MAX_MS }; - tracing::info!("batch coalesce: step={}ms max={}ms", step, max); + tracing::info!("batch coalesce: step={}ms max={}ms, pipeline max depth: {}", step, max, INFLIGHT_ACTIVE); let (tx, rx) = mpsc::channel(512); tokio::spawn(mux_loop(rx, fronter, step, max)); Arc::new(Self { @@ -326,6 +345,8 @@ impl TunnelMux { preread_win_total_us: AtomicU64::new(0), preread_total_events: AtomicU64::new(0), unreachable_cache: Mutex::new(HashMap::new()), + elevated_sessions: AtomicU64::new(0), + max_elevated: MAX_ELEVATED_PER_DEPLOYMENT * unique_n as u64, }) } @@ -679,10 +700,11 @@ async fn mux_loop(mut rx: mpsc::Receiver, fronter: Arc, c port: Some(port), data: Some(data), encode_empty: true, + seq: None, }; accum.push_or_fire(op, op_bytes, reply, &sems, &fronter).await; } - MuxMsg::Data { sid, data, reply } => { + MuxMsg::Data { sid, data, seq, reply } => { let op_bytes = encoded_len(data.len()); let op = PendingOp { op: "data", @@ -691,6 +713,7 @@ async fn mux_loop(mut rx: mpsc::Receiver, fronter: Arc, c port: None, data: if data.is_empty() { None } else { Some(data) }, 
encode_empty: false, + seq, }; accum.push_or_fire(op, op_bytes, reply, &sems, &fronter).await; } @@ -708,6 +731,7 @@ async fn mux_loop(mut rx: mpsc::Receiver, fronter: Arc, c port: Some(port), data: if data.is_empty() { None } else { Some(data) }, encode_empty: false, + seq: None, }; accum.push_or_fire(op, op_bytes, reply, &sems, &fronter).await; } @@ -720,6 +744,7 @@ async fn mux_loop(mut rx: mpsc::Receiver, fronter: Arc, c port: None, data: if data.is_empty() { None } else { Some(data) }, encode_empty: false, + seq: None, }; accum.push_or_fire(op, op_bytes, reply, &sems, &fronter).await; } @@ -739,6 +764,7 @@ async fn mux_loop(mut rx: mpsc::Receiver, fronter: Arc, c port: None, data: None, encode_empty: false, + seq: None, }); } @@ -843,6 +869,7 @@ fn encode_pending(p: PendingOp) -> BatchOp { host: p.host, port: p.port, d, + seq: p.seq, } } @@ -1249,6 +1276,13 @@ fn is_connect_data_unsupported_error_str(e: &str) -> bool { (e.contains("unknown op") || e.contains("unknown tunnel op")) && e.contains("connect_data") } +/// Metadata for one in-flight Data op, returned alongside its reply. +struct InflightMeta { + seq: u64, + was_empty_poll: bool, + send_at: Instant, +} + async fn tunnel_loop( sock: &mut TcpStream, sid: &str, @@ -1256,174 +1290,335 @@ async fn tunnel_loop( mut pending_client_data: Option, ) -> std::io::Result<()> { let (mut reader, mut writer) = sock.split(); - // `BytesMut` + `read_buf` + a per-read decision between - // `split().freeze()` (zero-copy) and `copy_from_slice` + `clear` - // (right-sized copy, buffer reused). - // - // Why the split decision: `bytes` 1.x refcounts the *whole* - // backing allocation, so a frozen `Bytes` from a partial read - // pins all `READ_CHUNK` bytes until it drops. 
Under semaphore - // saturation or reply timeouts, dozens of small TLS records or - // HTTP/2 frames can each retain ~64 KB instead of their actual - // payload size — order-of-magnitude memory regression on - // constrained targets (router builds with 64 MB RAM). - // - // Threshold: at ≥ half-buffer the saved memcpy outweighs the - // wasted slack, and these reads are typically streaming bulk - // transfer where the `Bytes` flushes through the mux quickly. - // Below that, copy out and `clear()` so the same allocation - // serves the next read — equivalent memory profile to the old - // `vec![0u8; 65536]` + `to_vec()` code on small-read workloads. const READ_CHUNK: usize = 65536; - const ZERO_COPY_THRESHOLD: usize = READ_CHUNK / 2; let mut buf = BytesMut::with_capacity(READ_CHUNK); let mut consecutive_empty = 0u32; + let mut buffered_upload: Option = None; + let mut upload_closed = false; + + let mut next_send_seq: u64 = 0; + let mut next_write_seq: u64 = 0; + let mut pending_writes: BTreeMap = BTreeMap::new(); + let inflight_cap = INFLIGHT_ACTIVE; + let mut max_inflight = INFLIGHT_IDLE.min(inflight_cap); + let mut eof_seen = false; + let mut consecutive_data: u32 = 0; + let mut is_elevated = false; + + enum ReplyOutcome { + Ok(TunnelResponse, String), + BatchErr(String), + Timeout, + Dropped, + } + type ReplyFut = std::pin::Pin + Send>>; + let mut inflight: FuturesUnordered = FuturesUnordered::new(); loop { - // Cadence depends on whether the tunnel-node is doing long-poll - // drains. With long-poll, the server holds empty polls open up - // to its `LONGPOLL_DEADLINE` (~5 s currently), so the client - // can keep this read timeout short — the wait is on the wire, - // not here. Against *legacy* tunnel-nodes (no long-poll, fast - // empty replies), the same short cadence + always-poll behavior - // would generate continuous round-trips on idle sessions and - // burn Apps Script quota. 
- // - // Both the read timeout and the skip-empty-when-idle decision - // are gated on `all_legacy` — i.e. *every known deployment is - // currently legacy*. Per-deployment "skip when this script is - // legacy" sounds appealing but is unsafe: the next op's - // deployment is chosen by `next_script_id()` only when the - // batch fires, so the loop can't predict where the empty poll - // will land. Suppressing polls based on the *previous* reply's - // script would stall remote→client data on mixed setups — - // round-robin would never reach the long-poll-capable peer for - // this session if every iteration short-circuits before - // sending. Cost of the conservative gate: legacy peers see - // some wasted empty polls when at least one peer is healthy, - // bounded by round-robin fan-out. Worth it to keep pushed - // bytes flowing. let all_legacy = mux.all_servers_legacy(); - let client_data = if let Some(data) = pending_client_data.take() { - Some(data) - } else { - let read_timeout = match (all_legacy, consecutive_empty) { - (_, 0) => Duration::from_millis(20), - (_, 1) => Duration::from_millis(80), - (_, 2) => Duration::from_millis(200), - (false, _) => Duration::from_millis(500), - (true, _) => Duration::from_secs(30), - }; - buf.reserve(READ_CHUNK); - match tokio::time::timeout(read_timeout, reader.read_buf(&mut buf)).await { - Ok(Ok(0)) => break, - Ok(Ok(n)) => { - consecutive_empty = 0; - if n >= ZERO_COPY_THRESHOLD { - // Big read: split off the filled region. The - // frozen `Bytes` is at-least-half-full, so the - // saved 64 KB memcpy outweighs the brief - // retention until the mux drains. - Some(buf.split().freeze()) - } else { - // Small read: copy out a payload-sized `Bytes` - // and `clear()` so the buffer is reused on the - // next iter (no `reserve` allocation needed - // because the alloc stays uniquely owned). - // Bounds retention to actual data even when - // the mux is backpressured. 
- let owned = Bytes::copy_from_slice(&buf[..n]); - buf.clear(); - Some(owned) + // When no ops are in flight and we can send, we must gather data + // (possibly blocking on client read) before entering the select. + // When ops ARE in flight, send empty polls to keep the pipeline full. + if !eof_seen && inflight.is_empty() { + let client_data = if let Some(data) = pending_client_data.take() { + Some(data) + } else if let Some(data) = buffered_upload.take() { + consecutive_empty = 0; + Some(data) + } else if upload_closed { + None + } else { + let read_timeout = match (all_legacy, consecutive_empty) { + (_, 0) => Duration::from_millis(20), + (_, 1) => Duration::from_millis(80), + (_, 2) => Duration::from_millis(200), + (false, 3..=5) => Duration::from_secs(3), + (false, _) => Duration::from_secs(8), + (true, _) => Duration::from_secs(30), + }; + buf.reserve(READ_CHUNK); + match tokio::time::timeout(read_timeout, reader.read_buf(&mut buf)).await { + Ok(Ok(0)) => break, + Ok(Ok(n)) => { + consecutive_empty = 0; + Some(extract_bytes(&mut buf, n)) } + Ok(Err(_)) => break, + Err(_) => None, } - Ok(Err(_)) => break, - Err(_) => None, - } - }; - - // Skip empty polls only when *every* deployment is legacy. With - // even one long-poll-capable peer, round-robin will land some - // empty polls there where the server holds them open and can - // deliver pushed bytes — that's the whole point of long-poll, - // so we must keep emitting. See the `all_legacy` comment above - // for why a per-deployment gate here would stall mixed setups. 
- if all_legacy && client_data.is_none() && consecutive_empty > 3 { - continue; - } - - let data = client_data.unwrap_or_else(Bytes::new); - let was_empty_poll = data.is_empty(); - - let (reply_tx, reply_rx) = oneshot::channel(); - let send_at = Instant::now(); - mux.send(MuxMsg::Data { - sid: sid.to_string(), - data, - reply: reply_tx, - }) - .await; + }; - // Bounded-wait on reply: if the batch this op landed in is slow - // (dead target on the tunnel-node side), don't block this session - // forever — timeout and let it retry on the next tick. - let (resp, script_id) = match tokio::time::timeout(REPLY_TIMEOUT, reply_rx).await { - Ok(Ok(Ok((r, sid_used)))) => (r, sid_used), - Ok(Ok(Err(e))) => { - tracing::debug!("tunnel data error: {}", e); - break; - } - Ok(Err(_)) => break, // channel dropped - Err(_) => { - tracing::warn!("sess {}: reply timeout, retrying", &sid[..sid.len().min(8)]); - consecutive_empty = consecutive_empty.saturating_add(1); + if all_legacy && client_data.is_none() && consecutive_empty > 3 { continue; } - }; - // Per-deployment legacy detection: an empty-in/empty-out round - // trip that finishes well under `LEGACY_DETECT_THRESHOLD` is - // structurally impossible on a long-poll-capable tunnel-node - // (the server holds the response either until data arrives or - // until its long-poll deadline). One observation marks *this - // specific* deployment as legacy for `LEGACY_RECOVER_AFTER`; - // peers stay on the fast path. The aggregate `all_legacy` gate - // only flips once *every* deployment has been so marked. 
- if was_empty_poll { - let reply_was_empty = resp.d.as_deref().map(str::is_empty).unwrap_or(true); - if reply_was_empty && send_at.elapsed() < LEGACY_DETECT_THRESHOLD { - mux.mark_server_no_longpoll(&script_id); + let data = client_data.unwrap_or_else(Bytes::new); + let was_empty_poll = data.is_empty(); + let seq = next_send_seq; + next_send_seq += 1; + let (reply_tx, reply_rx) = oneshot::channel(); + let send_at = Instant::now(); + mux.send(MuxMsg::Data { + sid: sid.to_string(), + data, + seq: Some(seq), + reply: reply_tx, + }) + .await; + tracing::debug!( + "sess {}: send seq={}, inflight=1, {}", + &sid[..sid.len().min(8)], + seq, + if was_empty_poll { "poll" } else { "data" }, + ); + let meta = InflightMeta { seq, was_empty_poll, send_at }; + inflight.push(Box::pin(async move { + match tokio::time::timeout(REPLY_TIMEOUT, reply_rx).await { + Ok(Ok(Ok((r, sid)))) => (meta, ReplyOutcome::Ok(r, sid)), + Ok(Ok(Err(e))) => (meta, ReplyOutcome::BatchErr(e)), + Ok(Err(_)) => (meta, ReplyOutcome::Dropped), + Err(_) => (meta, ReplyOutcome::Timeout), + } + })); + + // Pre-fill pipeline when depth > IDLE: send additional polls + // with a brief pause between each so the mux_loop fires them + // in separate batches. At idle depth, refill-on-reply is enough. 
+ while max_inflight > INFLIGHT_IDLE && inflight.len() < max_inflight { + tokio::time::sleep(Duration::from_millis(5)).await; + let seq = next_send_seq; + next_send_seq += 1; + let (reply_tx, reply_rx) = oneshot::channel(); + let send_at = Instant::now(); + tracing::debug!( + "sess {}: send seq={}, inflight={}, poll", + &sid[..sid.len().min(8)], + seq, + inflight.len() + 1, + ); + mux.send(MuxMsg::Data { + sid: sid.to_string(), + data: Bytes::new(), + seq: Some(seq), + reply: reply_tx, + }) + .await; + let meta = InflightMeta { seq, was_empty_poll: true, send_at }; + inflight.push(Box::pin(async move { + match tokio::time::timeout(REPLY_TIMEOUT, reply_rx).await { + Ok(Ok(Ok((r, sid)))) => (meta, ReplyOutcome::Ok(r, sid)), + Ok(Ok(Err(e))) => (meta, ReplyOutcome::BatchErr(e)), + Ok(Err(_)) => (meta, ReplyOutcome::Dropped), + Err(_) => (meta, ReplyOutcome::Timeout), + } + })); } } - if let Some(ref e) = resp.e { - tracing::debug!("tunnel error: {}", e); + if inflight.is_empty() && eof_seen { break; } + if inflight.is_empty() { + continue; + } - let got_data = match write_tunnel_response(&mut writer, &resp).await? { - WriteOutcome::Wrote => true, - WriteOutcome::NoData => false, - WriteOutcome::BadBase64 => { - // Tunnel-node gave us garbage; tear the session down but - // do NOT propagate as an io error — the caller's Close - // guard will clean up on the tunnel-node side. - break; - } - }; + let can_read = !upload_closed && buffered_upload.is_none(); + + // Wait for replies, overlapping with client reads. + tokio::select! 
{ + biased; + + Some((meta, outcome)) = inflight.next() => { + match outcome { + ReplyOutcome::Ok(resp, script_id) => { + let has_data = resp.d.as_ref().map(|d| !d.is_empty()).unwrap_or(false); + tracing::debug!( + "sess {}: recv seq={}, rtt={:?}, data={}, inflight={}", + &sid[..sid.len().min(8)], + meta.seq, + meta.send_at.elapsed(), + has_data, + inflight.len(), + ); + if meta.was_empty_poll { + let reply_was_empty = resp.d.as_deref().map(str::is_empty).unwrap_or(true); + if reply_was_empty && meta.send_at.elapsed() < LEGACY_DETECT_THRESHOLD { + mux.mark_server_no_longpoll(&script_id); + } + } - if resp.eof.unwrap_or(false) { - break; - } + if resp.seq.is_none() { + max_inflight = 1; + } + + if let Some(ref e) = resp.e { + tracing::debug!("tunnel error: {}", e); + break; + } + + let is_eof = resp.eof.unwrap_or(false); + let resp_has_seq = resp.seq.is_some(); + + if meta.seq == next_write_seq { + let got_data = match write_tunnel_response(&mut writer, &resp).await? { + WriteOutcome::Wrote => true, + WriteOutcome::NoData => false, + WriteOutcome::BadBase64 => break, + }; + next_write_seq += 1; + if got_data { + consecutive_empty = 0; + consecutive_data = consecutive_data.saturating_add(1); + } else { + consecutive_empty = consecutive_empty.saturating_add(1); + consecutive_data = 0; + } + if is_eof { eof_seen = true; } + + while let Some(entry) = pending_writes.first_entry() { + if *entry.key() != next_write_seq { break; } + let (_, (buffered_resp, _)) = entry.remove_entry(); + let buf_eof = buffered_resp.eof.unwrap_or(false); + match write_tunnel_response(&mut writer, &buffered_resp).await? 
{ + WriteOutcome::Wrote => { consecutive_empty = 0; } + WriteOutcome::NoData => { + consecutive_empty = consecutive_empty.saturating_add(1); + } + WriteOutcome::BadBase64 => break, + } + next_write_seq += 1; + if buf_eof { eof_seen = true; } + } + } else { + pending_writes.insert(meta.seq, (resp, script_id)); + } + + // Adaptive pipeline depth: ramp up when data is + // flowing, drop back when idle. At most + // MAX_ELEVATED_SESSIONS can be above idle depth. + if resp_has_seq { + let prev = max_inflight; + if consecutive_data >= 1 && max_inflight < inflight_cap && !is_elevated { + tracing::debug!( + "sess {}: elevation check: counter={}, cap={}", + &sid[..sid.len().min(8)], + mux.elevated_sessions.load(Ordering::Relaxed), + mux.max_elevated, + ); + } + if consecutive_empty >= 1 && is_elevated { + max_inflight = INFLIGHT_IDLE.min(inflight_cap); + mux.elevated_sessions.fetch_sub(1, Ordering::Relaxed); + is_elevated = false; + } else if consecutive_data >= 2 && max_inflight < inflight_cap { + if !is_elevated { + let cur = mux.elevated_sessions.load(Ordering::Relaxed); + if cur < mux.max_elevated { + mux.elevated_sessions.fetch_add(1, Ordering::Relaxed); + is_elevated = true; + max_inflight = (max_inflight + 1).min(inflight_cap); + } + } else { + max_inflight = (max_inflight + 1).min(inflight_cap); + } + } + if max_inflight != prev { + tracing::debug!( + "sess {}: pipeline depth {} → {}", + &sid[..sid.len().min(8)], + prev, max_inflight, + ); + } + } + + // Fill pipeline slots. When depth is above idle + // (data flowing), fill all slots with 5ms spacing + // so polls land in separate batches. At idle depth, + // send just one refill. + while !eof_seen && inflight.len() < max_inflight && consecutive_empty < 3 { + // Stagger polls 1s apart so they land in + // separate batches. Skip the delay for the + // first refill (immediate) and at idle depth. 
+ if inflight.len() > 0 && max_inflight > INFLIGHT_IDLE { + tokio::time::sleep(Duration::from_secs(1)).await; + } + let data = if let Some(d) = pending_client_data.take() { + d + } else if let Some(d) = buffered_upload.take() { + consecutive_empty = 0; + d + } else { + Bytes::new() + }; + let was_empty_poll = data.is_empty(); + let seq = next_send_seq; + next_send_seq += 1; + let (reply_tx, reply_rx) = oneshot::channel(); + let send_at = Instant::now(); + tracing::debug!( + "sess {}: send seq={}, inflight={}", + &sid[..sid.len().min(8)], + seq, + inflight.len() + 1, + ); + mux.send(MuxMsg::Data { + sid: sid.to_string(), + data, + seq: Some(seq), + reply: reply_tx, + }) + .await; + let meta = InflightMeta { seq, was_empty_poll, send_at }; + inflight.push(Box::pin(async move { + match tokio::time::timeout(REPLY_TIMEOUT, reply_rx).await { + Ok(Ok(Ok((r, sid)))) => (meta, ReplyOutcome::Ok(r, sid)), + Ok(Ok(Err(e))) => (meta, ReplyOutcome::BatchErr(e)), + Ok(Err(_)) => (meta, ReplyOutcome::Dropped), + Err(_) => (meta, ReplyOutcome::Timeout), + } + })); + // At idle depth, just one refill. + if max_inflight <= INFLIGHT_IDLE { + break; + } + } + } + ReplyOutcome::BatchErr(e) => { + tracing::debug!("tunnel data error: {}", e); + break; + } + ReplyOutcome::Timeout => { + tracing::warn!( + "sess {}: reply timeout (seq {}), retrying", + &sid[..sid.len().min(8)], + meta.seq + ); + consecutive_empty = consecutive_empty.saturating_add(1); + } + ReplyOutcome::Dropped => { + break; + } + } + } - if got_data { - consecutive_empty = 0; - } else { - consecutive_empty = consecutive_empty.saturating_add(1); + // Read from client socket while waiting for replies. 
+ _ = async { + buf.reserve(READ_CHUNK); + match reader.read_buf(&mut buf).await { + Ok(0) => { upload_closed = true; } + Ok(n) => { + buffered_upload = Some(extract_bytes(&mut buf, n)); + } + Err(_) => { upload_closed = true; } + } + }, if can_read => {} } } + if is_elevated { + mux.elevated_sessions.fetch_sub(1, Ordering::Relaxed); + } Ok(()) } @@ -1461,6 +1656,20 @@ where } } +/// Extract bytes from the read buffer, applying the zero-copy threshold. +/// Reads >= half the buffer use split+freeze (zero-copy); smaller reads +/// copy out and clear so the buffer allocation is reused. +fn extract_bytes(buf: &mut BytesMut, n: usize) -> Bytes { + const ZERO_COPY_THRESHOLD: usize = 65536 / 2; + if n >= ZERO_COPY_THRESHOLD { + buf.split().freeze() + } else { + let owned = Bytes::copy_from_slice(&buf[..n]); + buf.clear(); + owned + } +} + pub fn decode_udp_packets(resp: &TunnelResponse) -> Result>, String> { let Some(pkts) = resp.pkts.as_ref() else { return Ok(Vec::new()); @@ -1489,6 +1698,7 @@ mod tests { eof: None, e: e.map(str::to_string), code: code.map(str::to_string), + seq: None, } } @@ -1719,6 +1929,8 @@ mod tests { preread_win_total_us: AtomicU64::new(0), preread_total_events: AtomicU64::new(0), unreachable_cache: Mutex::new(HashMap::new()), + elevated_sessions: AtomicU64::new(0), + max_elevated: MAX_ELEVATED_PER_DEPLOYMENT * num_scripts as u64, }); (mux, rx) } @@ -1757,7 +1969,7 @@ mod tests { .expect("mux channel closed unexpectedly"); match msg { - MuxMsg::Data { sid, data, reply } => { + MuxMsg::Data { sid, data, reply, .. } => { assert_eq!(sid, "sid-under-test"); assert_eq!(&data[..], b"CLIENTHELLO"); // Reply with eof so tunnel_loop unwinds cleanly. @@ -1769,6 +1981,7 @@ mod tests { eof: Some(true), e: None, code: None, + seq: Some(0), }, "test-script".to_string(), ))); @@ -1786,6 +1999,23 @@ mod tests { ), } + // With pipelining, a second op may already be in flight. Reply + // to any remaining messages so the loop can exit cleanly. 
+ let mut seq = 1u64; + while let Ok(Some(msg)) = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await { + if let MuxMsg::Data { reply, .. } = msg { + let _ = reply.send(Ok(( + TunnelResponse { + sid: Some("sid-under-test".into()), + d: None, pkts: None, eof: Some(true), + e: None, code: None, seq: Some(seq), + }, + "test-script".to_string(), + ))); + seq += 1; + } + } + let _ = tokio::time::timeout(Duration::from_secs(2), loop_handle) .await .expect("tunnel_loop did not exit after eof"); @@ -1829,19 +2059,20 @@ mod tests { // the aggregate gate stays false and the loop keeps polling. // The 60 s timeout below is paused-time, so it only "elapses" // if rx.recv() truly never resolves (i.e. the loop has stalled). - for i in 0..6u32 { + let mut received = 0u32; + while received < 6 { let msg = tokio::time::timeout(Duration::from_secs(60), rx.recv()) .await .unwrap_or_else(|_| panic!( "loop stopped emitting at iteration {} — regression: per-deployment skip-when-idle stalled session even though long-poll-capable peer was available", - i + received )) .expect("mux channel closed unexpectedly"); match msg { - MuxMsg::Data { sid, data, reply } => { + MuxMsg::Data { sid, data, seq, reply } => { assert_eq!(sid, "sid-mixed"); assert!(data.is_empty(), "expected empty poll, got {} bytes", data.len()); - let last = i == 5; + let last = received == 5; let _ = reply.send(Ok(( TunnelResponse { sid: Some("sid-mixed".into()), @@ -1850,13 +2081,15 @@ mod tests { eof: if last { Some(true) } else { None }, e: None, code: None, + seq, }, "script-A".to_string(), ))); + received += 1; } _ => panic!( "iteration {}: expected Data poll, got a different MuxMsg variant", - i + received ), } } @@ -2057,6 +2290,7 @@ mod tests { port: None, data: Some(Bytes::from_static(b"x")), encode_empty: false, + seq: None, }; let mk_reply = || oneshot::channel::>().0; @@ -2102,6 +2336,7 @@ mod tests { port: None, data: Some(Bytes::from_static(b"hello")), encode_empty: false, + seq: None, }; let b 
= encode_pending(op); assert_eq!(b.op, "data"); @@ -2119,6 +2354,7 @@ mod tests { port: None, data: None, encode_empty: false, + seq: None, }; assert!(encode_pending(empty_poll).d.is_none()); @@ -2130,6 +2366,7 @@ mod tests { port: None, data: None, encode_empty: false, + seq: None, }; assert!(encode_pending(udp_poll).d.is_none()); @@ -2141,6 +2378,7 @@ mod tests { port: None, data: None, encode_empty: false, + seq: None, }; assert!(encode_pending(close).d.is_none()); } @@ -2158,6 +2396,7 @@ mod tests { port: Some(443), data: Some(Bytes::new()), encode_empty: true, + seq: None, }; let b = encode_pending(op); assert_eq!(b.op, "connect_data"); @@ -2173,6 +2412,7 @@ mod tests { port: Some(443), data: Some(Bytes::from_static(b"\x16\x03\x01")), // ClientHello prefix encode_empty: true, + seq: None, }; let b = encode_pending(op); assert_eq!(b.d.as_deref(), Some(B64.encode(b"\x16\x03\x01").as_str())); @@ -2197,4 +2437,103 @@ mod tests { // Five record_* calls, so trigger counter is at 5. assert_eq!(mux.preread_total_events.load(Ordering::Relaxed), 5); } + + /// Client data written to the socket *during* the reply wait must be + /// buffered and sent in a subsequent op — not blocked until the reply + /// arrives and a fresh read-timeout elapses. + #[tokio::test] + async fn tunnel_loop_reads_client_during_reply_wait() { + use tokio::io::AsyncWriteExt; + use tokio::net::TcpListener; + + let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap(); + let addr = listener.local_addr().unwrap(); + let accept = tokio::spawn(async move { listener.accept().await.unwrap().0 }); + let mut client = TcpStream::connect(addr).await.unwrap(); + let mut server_side = accept.await.unwrap(); + + let (mux, mut rx) = mux_for_test(); + + let loop_handle = tokio::spawn({ + let mux = mux.clone(); + async move { tunnel_loop(&mut server_side, "sid-overlap", &mux, None).await } + }); + + // With pipelining (N=2), the loop may send two ops before we + // can write client data. 
Collect all initial ops, reply to each, + // then write data and check a subsequent op carries it. + let mut pending_replies: Vec = Vec::new(); + let mut seq: u64 = 0; + + // Drain initial ops (up to N=2). + while let Ok(Some(msg)) = tokio::time::timeout(Duration::from_millis(500), rx.recv()).await { + if let MuxMsg::Data { reply, .. } = msg { + pending_replies.push(reply); + } + if pending_replies.len() >= INFLIGHT_ACTIVE { break; } + } + + // Write client data while replies are pending. + client.write_all(b"UPLOAD_DATA").await.unwrap(); + client.flush().await.unwrap(); + tokio::time::sleep(Duration::from_millis(50)).await; + + // Reply to all pending ops (no eof, no data). + for reply in pending_replies.drain(..) { + let _ = reply.send(Ok(( + TunnelResponse { + sid: Some("sid-overlap".into()), + d: None, pkts: None, eof: None, + e: None, code: None, seq: Some(seq), + }, + "test-script".to_string(), + ))); + seq += 1; + } + + // Now check that a subsequent op carries the buffered upload data. + let mut found_upload = false; + for _ in 0..4 { + let msg = match tokio::time::timeout(Duration::from_secs(2), rx.recv()).await { + Ok(Some(m)) => m, + _ => break, + }; + if let MuxMsg::Data { data, reply, .. } = msg { + if &data[..] == b"UPLOAD_DATA" { + found_upload = true; + } + let _ = reply.send(Ok(( + TunnelResponse { + sid: Some("sid-overlap".into()), + d: None, pkts: None, + eof: Some(found_upload), + e: None, code: None, seq: Some(seq), + }, + "test-script".to_string(), + ))); + seq += 1; + if found_upload { break; } + } + } + assert!(found_upload, "upload data must appear in a subsequent op"); + + // Drain any remaining in-flight ops. + while let Ok(Some(msg)) = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await { + if let MuxMsg::Data { reply, .. 
} = msg { + let _ = reply.send(Ok(( + TunnelResponse { + sid: Some("sid-overlap".into()), + d: None, pkts: None, eof: Some(true), + e: None, code: None, seq: Some(seq), + }, + "test-script".to_string(), + ))); + seq += 1; + } + } + + let _ = tokio::time::timeout(Duration::from_secs(2), loop_handle) + .await + .expect("tunnel_loop did not exit after eof"); + } } diff --git a/tunnel-node/src/main.rs b/tunnel-node/src/main.rs index e3bc3c0..bde35f4 100644 --- a/tunnel-node/src/main.rs +++ b/tunnel-node/src/main.rs @@ -71,7 +71,7 @@ const STRAGGLER_SETTLE_MAX: Duration = Duration::from_millis(1000); /// `BATCH_TIMEOUT` (30 s) and Apps Script's UrlFetch ceiling (~60 s). /// Tested on censored networks in Iran where users reported smoother /// Telegram video playback and fewer session resets at this value. -const LONGPOLL_DEADLINE: Duration = Duration::from_secs(15); +const LONGPOLL_DEADLINE: Duration = Duration::from_secs(4); /// Bound on each UDP session's inbound queue. Beyond this we drop oldest /// to keep recent voice/media packets moving — a stale RTP frame is @@ -643,17 +643,19 @@ struct TunnelResponse { #[serde(skip_serializing_if = "Option::is_none")] eof: Option, #[serde(skip_serializing_if = "Option::is_none")] e: Option, #[serde(skip_serializing_if = "Option::is_none")] code: Option, + #[serde(skip_serializing_if = "Option::is_none")] seq: Option, } impl TunnelResponse { fn error(msg: impl Into) -> Self { - Self { sid: None, d: None, pkts: None, eof: None, e: Some(msg.into()), code: None } + Self { sid: None, d: None, pkts: None, eof: None, e: Some(msg.into()), code: None, seq: None } } fn unsupported_op(op: &str) -> Self { Self { sid: None, d: None, pkts: None, eof: None, e: Some(format!("unknown op: {}", op)), code: Some(CODE_UNSUPPORTED_OP.into()), + seq: None, } } } @@ -675,6 +677,7 @@ struct BatchOp { #[serde(default)] host: Option, #[serde(default)] port: Option, #[serde(default)] d: Option, // base64 data + #[serde(default)] seq: Option, } 
#[derive(Serialize)] @@ -797,8 +800,8 @@ async fn handle_batch( // map lock isn't held across the per-session read_buf / packets // mutex acquisition — without this, every other batch (and every // connect/close op) head-of-line-blocks behind the drain. - let mut tcp_drains: Vec<(usize, String, Arc)> = Vec::new(); - let mut udp_drains: Vec<(usize, String, Arc)> = Vec::new(); + let mut tcp_drains: Vec<(usize, String, Arc, Option)> = Vec::new(); + let mut udp_drains: Vec<(usize, String, Arc, Option)> = Vec::new(); // True iff the batch contained any op that performed a real action // upstream — a new connection or a non-empty data write. A batch of // only empty "data" / "udp_data" polls (and possibly closes) leaves @@ -899,9 +902,9 @@ async fn handle_batch( } } } - tcp_drains.push((i, sid, inner)); + tcp_drains.push((i, sid, inner, op.seq)); } else { - results.push((i, eof_response(sid))); + results.push((i, eof_response(sid, op.seq))); } } "udp_data" => { @@ -942,9 +945,9 @@ async fn handle_batch( if had_uplink { *inner.last_active.lock().await = Instant::now(); } - udp_drains.push((i, sid, inner)); + udp_drains.push((i, sid, inner, op.seq)); } else { - results.push((i, eof_response(sid))); + results.push((i, eof_response(sid, op.seq))); } } "close" => { @@ -964,11 +967,11 @@ async fn handle_batch( match join { Ok((i, NewConn::Connect(r))) => results.push((i, r)), Ok((i, NewConn::ConnectData(Ok((sid, inner))))) => { - tcp_drains.push((i, sid, inner)); + tcp_drains.push((i, sid, inner, None)); } Ok((i, NewConn::ConnectData(Err(r)))) => results.push((i, r)), Ok((i, NewConn::UdpOpen(Ok((sid, inner))))) => { - udp_drains.push((i, sid, inner)); + udp_drains.push((i, sid, inner, None)); } Ok((i, NewConn::UdpOpen(Err(r)))) => results.push((i, r)), Err(e) => { @@ -999,9 +1002,9 @@ async fn handle_batch( // don't need to re-acquire the sessions map lock here. Cloning // the Arc is just a refcount bump. 
let tcp_inners: Vec> = - tcp_drains.iter().map(|(_, _, inner)| inner.clone()).collect(); + tcp_drains.iter().map(|(_, _, inner, _)| inner.clone()).collect(); let udp_inners: Vec> = - udp_drains.iter().map(|(_, _, inner)| inner.clone()).collect(); + udp_drains.iter().map(|(_, _, inner, _)| inner.clone()).collect(); // Wake on whichever side has work first. The previous // `tokio::join!` was conjunctive — a TCP burst still paid the @@ -1086,13 +1089,13 @@ async fn handle_batch( // Apps Script's 50 MiB response ceiling. This cap stops one session // short of the cliff; deferred sessions drain on the next poll. let mut remaining_budget: usize = BATCH_RESPONSE_BUDGET; - for (i, sid, inner) in &tcp_drains { + for (i, sid, inner, seq) in &tcp_drains { let (data, eof) = drain_now(inner, remaining_budget).await; let drained = data.len(); if eof { tcp_eof_sids.push(sid.clone()); } - results.push((*i, tcp_drain_response(sid.clone(), data, eof))); + results.push((*i, tcp_drain_response(sid.clone(), data, eof, *seq))); remaining_budget = remaining_budget.saturating_sub(drained); if remaining_budget == 0 { // Budget exhausted; remaining sessions in `tcp_drains` keep @@ -1119,12 +1122,12 @@ async fn handle_batch( // trap that motivated the TCP-side fix reappears, and tracking // eof from the drain return rather than the atomic catches it. 
let mut udp_eof_sids: Vec = Vec::new(); - for (i, sid, inner) in &udp_drains { + for (i, sid, inner, seq) in &udp_drains { let (packets, eof) = drain_udp_now(inner).await; if eof { udp_eof_sids.push(sid.clone()); } - results.push((*i, udp_drain_response(sid.clone(), packets, eof))); + results.push((*i, udp_drain_response(sid.clone(), packets, eof, *seq))); } if !udp_eof_sids.is_empty() { let mut sessions = state.udp_sessions.lock().await; @@ -1147,7 +1150,7 @@ async fn handle_batch( (StatusCode::OK, [(header::CONTENT_TYPE, "application/json")], json) } -fn tcp_drain_response(sid: String, data: Vec, eof: bool) -> TunnelResponse { +fn tcp_drain_response(sid: String, data: Vec, eof: bool, seq: Option) -> TunnelResponse { TunnelResponse { sid: Some(sid), d: if data.is_empty() { None } else { Some(B64.encode(&data)) }, @@ -1155,10 +1158,11 @@ fn tcp_drain_response(sid: String, data: Vec, eof: bool) -> TunnelResponse { eof: Some(eof), e: None, code: None, + seq, } } -fn udp_drain_response(sid: String, packets: Vec>, eof: bool) -> TunnelResponse { +fn udp_drain_response(sid: String, packets: Vec>, eof: bool, seq: Option) -> TunnelResponse { let pkts = if packets.is_empty() { None } else { @@ -1171,10 +1175,11 @@ fn udp_drain_response(sid: String, packets: Vec>, eof: bool) -> TunnelRe eof: Some(eof), e: None, code: None, + seq, } } -fn eof_response(sid: String) -> TunnelResponse { +fn eof_response(sid: String, seq: Option) -> TunnelResponse { TunnelResponse { sid: Some(sid), d: None, @@ -1182,6 +1187,7 @@ fn eof_response(sid: String) -> TunnelResponse { eof: Some(true), e: None, code: None, + seq, } } @@ -1228,7 +1234,7 @@ async fn handle_connect(state: &AppState, host: Option, port: Option {}:{}", sid, host, port); state.sessions.lock().await.insert(sid.clone(), session); - TunnelResponse { sid: Some(sid), d: None, pkts: None, eof: Some(false), e: None, code: None } + TunnelResponse { sid: Some(sid), d: None, pkts: None, eof: Some(false), e: None, code: None, seq: None } 
} /// Open a session and write the client's first bytes in one round trip. @@ -1350,6 +1356,7 @@ async fn handle_connect_data_single( eof: Some(eof), e: None, code: None, + seq: None, } } @@ -1398,7 +1405,7 @@ async fn handle_data_single(state: &AppState, sid: Option, data: Option< sid: Some(sid), d: if data.is_empty() { None } else { Some(B64.encode(&data)) }, pkts: None, - eof: Some(eof), e: None, code: None, + eof: Some(eof), e: None, code: None, seq: None, } } @@ -1415,7 +1422,7 @@ async fn handle_close(state: &AppState, sid: Option) -> TunnelResponse { s.reader_handle.abort(); tracing::info!("udp session {} closed by client", sid); } - TunnelResponse { sid: Some(sid), d: None, pkts: None, eof: Some(true), e: None, code: None } + TunnelResponse { sid: Some(sid), d: None, pkts: None, eof: Some(true), e: None, code: None, seq: None } } // --------------------------------------------------------------------------- @@ -2343,7 +2350,7 @@ mod tests { ); // The `udp_drain_response` helper threads eof into `eof: Some(true)`. - let resp = udp_drain_response("zombie".into(), pkts, eof); + let resp = udp_drain_response("zombie".into(), pkts, eof, None); assert_eq!(resp.eof, Some(true)); assert!(resp.pkts.is_none()); } From 488c8affc18f248d4280e1b99834ba69c069a7ad Mon Sep 17 00:00:00 2001 From: yyoyoian-pixel <279225925+yyoyoian-pixel@users.noreply.github.com> Date: Wed, 13 May 2026 02:39:02 +0200 Subject: [PATCH 2/2] fix(tunnel-node): revert LONGPOLL_DEADLINE to 2s Co-Authored-By: Claude Opus 4.6 (1M context) --- tunnel-node/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tunnel-node/src/main.rs b/tunnel-node/src/main.rs index bde35f4..f60e08e 100644 --- a/tunnel-node/src/main.rs +++ b/tunnel-node/src/main.rs @@ -71,7 +71,7 @@ const STRAGGLER_SETTLE_MAX: Duration = Duration::from_millis(1000); /// `BATCH_TIMEOUT` (30 s) and Apps Script's UrlFetch ceiling (~60 s). 
/// Tested on censored networks in Iran where users reported smoother /// Telegram video playback and fewer session resets at this value. -const LONGPOLL_DEADLINE: Duration = Duration::from_secs(4); +const LONGPOLL_DEADLINE: Duration = Duration::from_secs(2); /// Bound on each UDP session's inbound queue. Beyond this we drop oldest /// to keep recent voice/media packets moving — a stale RTP frame is