From 70a6f12de46b075455e0fcd65e50d51b7da1cec9 Mon Sep 17 00:00:00 2001 From: Micah Roberts Date: Wed, 6 May 2026 14:42:37 -0600 Subject: [PATCH] Add TURN/STUN end-to-end probes and per-test flags to pam tunnel diagnose; fix log noise MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tunnel_and_connections.py: - Add --turn-test flag: establishes a real WebRTC/TURN connection through the gateway without proxying traffic, reproducing the full ICE path - Add --stun-only flag: same probe with TURN credentials stripped on both sides, restricting ICE to host/srflx candidates; confirms the reflexive path works independently of the relay. Mutually exclusive with --turn-test - Add --probe-duration (default 30s) to hold the connection open; use 360+ to trigger and verify survival of the ~300s permission refresh cycle - Add --probe-count for concurrent connection load testing - Add --stress-test: connection cycling, throughput across 64B–256KB frame sizes, and concurrent load; implies --turn-test - Add --test-dns/--test-aws/--test-tcp/--test-udp/--test-ice/--test-webrtc flags as per-test alternatives to the comma-separated --test string; both styles can be combined and are merged into a single validated set - Add _yellow() color helper for diagnostic output - Section header, connect label, hold message, throughput, and stability labels all adapt to STUN vs TURN mode tunnel_helpers.py: - Add probe_stun_only param to start_rust_tunnel: strips turn_url, turn_username, turn_password from webrtc_settings so Commander's ICE agent gathers no relay candidates; sends stun_only=True to gateway - Add turn.client.relay_conn to webrtc crate suppression lists so the periodic "fail to refresh permissions" error no longer leaks to terminal - Add NullHandler to all loggers configured with propagate=False; without a handler Python's lastResort StreamHandler fires when callHandlers() finds found=0, bypassing propagate=False --- 
.../tunnel/port_forward/tunnel_helpers.py | 116 ++- .../commands/tunnel_and_connections.py | 688 +++++++++++++++++- 2 files changed, 776 insertions(+), 28 deletions(-) diff --git a/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py b/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py index 628ecf47a..17a898a56 100644 --- a/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py +++ b/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py @@ -449,12 +449,15 @@ def __init__(self, name, level=logging.NOTSET): 'webrtc', 'webrtc_ice', 'webrtc_mdns', 'webrtc_dtls', 'webrtc_sctp', 'turn', 'stun', 'webrtc_ice.agent.agent_internal', 'webrtc_ice.agent.agent_gather', 'webrtc_ice.mdns', - 'webrtc_mdns.conn', 'webrtc.peer_connection', 'turn.client' + 'webrtc_mdns.conn', 'webrtc.peer_connection', 'turn.client', + 'turn.client.relay_conn', ] for crate_name in webrtc_crates: crate_logger = logging.getLogger(crate_name) crate_logger.setLevel(logging.WARNING) crate_logger.propagate = False + if not crate_logger.handlers: + crate_logger.addHandler(logging.NullHandler()) # Suppress specific noisy keeper_pam_webrtc_rs sub-modules even in debug mode # These log debug info at ERROR level, so we need to disable them entirely @@ -466,6 +469,8 @@ def __init__(self, name, level=logging.NOTSET): noisy_logger = logging.getLogger(logger_name) noisy_logger.setLevel(logging.CRITICAL + 1) # Disable completely noisy_logger.propagate = False + if not noisy_logger.handlers: + noisy_logger.addHandler(logging.NullHandler()) logging.debug(f"Rust loggers enabled at DEBUG level") enabled_loggers = [name for name in logging.Logger.manager.loggerDict.keys() @@ -492,18 +497,23 @@ def __init__(self, name, level=logging.NOTSET): suppress_logger = logging.getLogger(logger_name) suppress_logger.setLevel(logging.CRITICAL + 1) # Disable completely suppress_logger.propagate = False + if not suppress_logger.handlers: + suppress_logger.addHandler(logging.NullHandler()) # 
Suppress noisy webrtc dependency logs when not debugging webrtc_crates = [ 'webrtc', 'webrtc_ice', 'webrtc_mdns', 'webrtc_dtls', 'webrtc_sctp', 'turn', 'stun', 'webrtc_ice.agent.agent_internal', 'webrtc_ice.agent.agent_gather', 'webrtc_ice.mdns', - 'webrtc_mdns.conn', 'webrtc.peer_connection', 'turn.client' + 'webrtc_mdns.conn', 'webrtc.peer_connection', 'turn.client', + 'turn.client.relay_conn', ] for crate_name in webrtc_crates: crate_logger = logging.getLogger(crate_name) crate_logger.setLevel(logging.ERROR) crate_logger.propagate = False + if not crate_logger.handlers: + crate_logger.addHandler(logging.NullHandler()) def get_or_create_tube_registry(params): @@ -1192,9 +1202,14 @@ def route_message_to_rust(response_item, tube_registry): answer_sdp = None if isinstance(data_json, dict): logging.debug(f"πŸ”“ Decrypted payload type: {data_json.get('type', 'unknown')}, keys: {list(data_json.keys())}") - answer_sdp = data_json.get('answer') or data_json.get('sdp') + # 'answer' field is already base64 (initial answer); 'sdp' field is plain text (ICE restart answer) + answer_sdp = data_json.get('answer') + if not answer_sdp: + raw_sdp = data_json.get('sdp') + if raw_sdp: + answer_sdp = base64.b64encode(raw_sdp.encode('utf-8')).decode('ascii') elif data_text.strip().startswith('v=') and 'm=' in data_text: - answer_sdp = data_text.strip() + answer_sdp = base64.b64encode(data_text.strip().encode('utf-8')).decode('ascii') logging.debug("Decrypted data appears to be raw SDP (not JSON), using as answer") if answer_sdp: @@ -1210,7 +1225,18 @@ def route_message_to_rust(response_item, tube_registry): if not tube_id: logging.error(f"No tube ID found for conversation: {conversation_id} (also tried URL-safe version)") else: - set_remote_description_and_parse_version(tube_registry, tube_id, answer_sdp, is_answer=True) + try: + set_remote_description_and_parse_version(tube_registry, tube_id, answer_sdp, is_answer=True) + except RuntimeError as _rte: + if "Invalid signaling state 
transition from Stable" in str(_rte): + # ICE restart answer arrived after the connection already + # re-established via another path β€” safe to ignore. + logging.debug( + f"ICE restart answer arrived for already-stable tube " + f"{tube_id} ({conversation_id}) β€” ignoring late answer" + ) + else: + raise logging.debug("Connection state: SDP answer received, connecting...") session = get_tunnel_session(tube_id) @@ -1224,7 +1250,9 @@ def route_message_to_rust(response_item, tube_registry): logging.warning(f"No signal handler found for tube {tube_id} to send buffered candidates") elif isinstance(data_json, dict) and ("offer" in data_json or data_json.get("type") == "offer"): # Gateway is sending us an ICE restart offer - offer_sdp = data_json.get('sdp') or data_json.get('offer') + # 'sdp' field from gateway is plain SDP (not base64); encode for Rust + raw_offer = data_json.get('sdp') or data_json.get('offer') + offer_sdp = base64.b64encode(raw_offer.encode('utf-8')).decode('ascii') if raw_offer else None if offer_sdp: logging.debug(f"Received ICE restart offer from Gateway for conversation: {conversation_id}") @@ -1365,12 +1393,18 @@ def route_message_to_rust(response_item, tube_registry): errors = payload_data.get('errors', ['']) logging.error(f"Gateway returned errors for {conversation_id}: {errors}") + elif not payload_data.get('is_ok', True): + # Gateway returned an explicit error (is_ok=False) β€” log the message and move on. + # This includes auth failures (401 on get_leafs), overload responses, etc. 
+ logging.error( + f"Gateway error for {conversation_id}: {payload_data.get('data', 'unknown error')}" + ) elif payload_data.get('data', '') == '': logging.debug("Empty data field an acknowledgment, no action needed") elif payload_data.get('data') and "ice candidate added" in payload_data.get('data').lower(): logging.debug("Received ice candidate added") else: - logging.warning(f"Unhandled payload type for {conversation_id}: {payload_data}") + logging.debug(f"Unhandled payload for {conversation_id}: {payload_data}") else: logging.warning(f"No encrypted payload in message for conversation: {conversation_id}") @@ -1475,7 +1509,7 @@ class TunnelSignalHandler: def __init__(self, params, record_uid, gateway_uid, symmetric_key, base64_nonce, conversation_id, tube_registry, tube_id=None, trickle_ice=False, websocket_router=None, - conversation_type='tunnel', router_tokens=None, http_session=None): + conversation_type='tunnel', router_tokens=None, http_session=None, silent=False): self.params = params self.record_uid = record_uid self.gateway_uid = gateway_uid @@ -1486,6 +1520,8 @@ def __init__(self, params, record_uid, gateway_uid, symmetric_key, base64_nonce, self.tube_registry = tube_registry self.tube_id = tube_id self.trickle_ice = trickle_ice + self.silent = silent # Suppress connection-established display (probe/stress mode) + self.tube_close_initiated = False # Set when Rust initiates close (AdminClosed/Normal); skip redundant cleanup close_tube call self.connection_success_shown = False # Track if we've shown success messages self.connection_connected = False # Track if WebRTC connection is established self.ice_sending_in_progress = False # Serialize ICE candidate sending @@ -1507,7 +1543,22 @@ def __init__(self, params, record_uid, gateway_uid, symmetric_key, base64_nonce, raise Exception("Trickle ICE requires WebSocket support - install with: pip install websockets") def signal_from_rust(self, response: dict): - """Signal callback to handle Rust events and 
gateway communication""" + """Signal callback to handle Rust events and gateway communication. + + Called by Rust via PyO3. Any unhandled exception here crosses the FFI + boundary as a PyErr which can destabilise the tube, so we wrap the whole + body and log rather than propagate. + """ + try: + self._signal_from_rust_inner(response) + except Exception as _sig_err: + logging.error( + f"Unhandled exception in signal_from_rust " + f"(kind={response.get('kind','?')}, tube={response.get('tube_id','?')}): {_sig_err}", + exc_info=True, + ) + + def _signal_from_rust_inner(self, response: dict): signal_kind = response.get('kind', '') tube_id = response.get('tube_id', '') data = response.get('data', '') @@ -1552,7 +1603,7 @@ def signal_from_rust(self, response: dict): self.connection_success_shown = True # Get tunnel session for record details - if session: + if session and not self.silent: logging.info(f"\n{bcolors.OKGREEN}Connection established successfully.{bcolors.ENDC}") # Display record title if available @@ -1623,6 +1674,8 @@ def signal_from_rust(self, response: dict): logging.error(f"{bcolors.FAIL}Tunnel closed due to critical failure - '{tube_id}': {close_reason.name}{bcolors.ENDC}") elif close_reason.is_user_initiated(): + # Rust is already handling the close β€” skip redundant close_tube in cleanup() + self.tube_close_initiated = True logging.debug(f"{bcolors.OKBLUE}User-initiated closure of tunnel '{tube_id}': {close_reason.name}{bcolors.ENDC}") elif close_reason.is_retryable(): @@ -2007,10 +2060,14 @@ def _send_restart_offer(self, restart_sdp, tube_id): Similar to _send_ice_candidate_immediately but sends an offer instead of candidates. """ try: - # Format as offer payload for gateway + # Format as offer payload for gateway. + # restart_sdp arrives base64-encoded from Rust (API contract), but the payload + # SDP field is raw text β€” gateway and vault both decode base64 before using it + # with the Rust boundary (gateway) or WebRTC API (vault). 
+ raw_sdp = bytes_to_string(base64.b64decode(restart_sdp)) offer_payload = { "type": "offer", - "sdp": restart_sdp, + "sdp": raw_sdp, "ice_restart": True # Flag to indicate this is an ICE restart offer } string_data = json.dumps(offer_payload) @@ -2074,7 +2131,7 @@ def _send_restart_offer(self, restart_sdp, tube_id): def cleanup(self): """Cleanup resources""" # Close the tube in Rust registry if it exists - if self.tube_id and self.tube_registry: + if self.tube_id and self.tube_registry and not self.tube_close_initiated: try: logging.debug(f"Closing tube {self.tube_id} in cleanup") self.tube_registry.close_tube(self.tube_id, reason=CloseConnectionReasons.Error) @@ -2096,7 +2153,7 @@ def cleanup(self): logging.debug("TunnelSignalHandler cleaned up") def start_rust_tunnel(params, record_uid, gateway_uid, host, port, - seed, target_host, target_port, socks, trickle_ice=True, record_title=None, allow_supply_host=False, two_factor_value=None): + seed, target_host, target_port, socks, trickle_ice=True, record_title=None, allow_supply_host=False, two_factor_value=None, kind='start', probe_duration=30, probe_turn_only=False, probe_stun_only=False): """ Start a tunnel using Rust WebRTC with trickle ICE via HTTP POST and WebSocket responses. @@ -2190,6 +2247,19 @@ def start_rust_tunnel(params, record_uid, gateway_uid, host, port, params, host, port, target_host, target_port, socks, nonce ) + # For probe mode with turn_only, force relay-only ICE on Commander's side too. + # Both peers must use RTCIceTransportPolicy::Relay for the connection to be + # pure TURN β€” otherwise the controlling peer (Commander) offers host/srflx + # candidates and ICE selects a direct path, bypassing the relay. + if probe_turn_only: + webrtc_settings["turn_only"] = True + elif probe_stun_only: + # Strip TURN credentials so Commander's ICE agent only gathers host/srflx + # candidates β€” no relay path is available on either side. 
+ webrtc_settings.pop("turn_url", None) + webrtc_settings.pop("turn_username", None) + webrtc_settings.pop("turn_password", None) + # Determine conversation type (tunnel or protocol-specific) conversation_type = webrtc_settings.get('conversationType', 'tunnel') @@ -2267,7 +2337,8 @@ def start_rust_tunnel(params, record_uid, gateway_uid, host, port, trickle_ice=trickle_ice, conversation_type=conversation_type, router_tokens=router_tokens, - http_session=http_session + http_session=http_session, + silent=(kind == 'probe'), # Suppress connection display for probe/stress mode ) # Store signal handler reference so we can send buffered candidates later @@ -2381,6 +2452,18 @@ def start_rust_tunnel(params, record_uid, gateway_uid, host, port, # Prepare the offer data data = {"offer": offer.get("offer")} + # For probe mode embed the flag in the encrypted payload so the gateway + # routes to its lightweight probe path. We still send kind='start' below + # so the Keeper router injects routerToken (required for TURN auth). 
+ if kind == 'probe': + data["probe"] = True + if probe_duration != 30: + data["probe_duration"] = probe_duration + if probe_turn_only: + data["turn_only"] = True + if probe_stun_only: + data["stun_only"] = True + # If allowSupplyHost is enabled, include the target host and port in the payload if allow_supply_host: data["host"] = { @@ -2563,7 +2646,8 @@ def start_rust_tunnel(params, record_uid, gateway_uid, host, port, "websocket_thread": websocket_thread, "conversation_id": conversation_id_original, # Use original, not base64 encoded "tube_registry": tube_registry, - "status": "connecting" # Indicates async connection in progress + "status": "connecting", # Indicates async connection in progress + "local_port": tunnel_session.port, # Actual bound port (may differ from requested) } except Exception as e: diff --git a/keepercommander/commands/tunnel_and_connections.py b/keepercommander/commands/tunnel_and_connections.py index 026be65c9..60400116a 100644 --- a/keepercommander/commands/tunnel_and_connections.py +++ b/keepercommander/commands/tunnel_and_connections.py @@ -1296,6 +1296,39 @@ class PAMTunnelDiagnoseCommand(Command): help='Comma-separated list of specific WebRTC tests to run. 
Available: ' 'dns_resolution,aws_connectivity,tcp_connectivity,udp_binding,' 'ice_configuration,webrtc_peer_connection') + pam_cmd_parser.add_argument('--test-dns', required=False, dest='test_dns', action='store_true', + help='Run the dns_resolution WebRTC test only') + pam_cmd_parser.add_argument('--test-aws', required=False, dest='test_aws', action='store_true', + help='Run the aws_connectivity WebRTC test only') + pam_cmd_parser.add_argument('--test-tcp', required=False, dest='test_tcp', action='store_true', + help='Run the tcp_connectivity WebRTC test only') + pam_cmd_parser.add_argument('--test-udp', required=False, dest='test_udp', action='store_true', + help='Run the udp_binding WebRTC test only') + pam_cmd_parser.add_argument('--test-ice', required=False, dest='test_ice', action='store_true', + help='Run the ice_configuration WebRTC test only') + pam_cmd_parser.add_argument('--test-webrtc', required=False, dest='test_webrtc', action='store_true', + help='Run the webrtc_peer_connection WebRTC test only') + pam_cmd_parser.add_argument('--turn-test', required=False, dest='turn_test', action='store_true', + help='Run an end-to-end TURN relay probe through the gateway. ' + 'Establishes a real WebRTC/TURN connection without proxying any traffic, ' + 'reproducing the full ICE negotiation path. Requires a record argument.') + pam_cmd_parser.add_argument('--probe-duration', required=False, dest='probe_duration', type=int, default=30, + help='How long (seconds) to hold the TURN probe connection open after it connects. ' + 'Default 30s. Use 360+ to trigger a TURN permission refresh cycle (~300s TTL) ' + 'and verify the connection survives it. Requires --turn-test.') + pam_cmd_parser.add_argument('--probe-count', required=False, dest='probe_count', type=int, default=1, + help='Number of simultaneous TURN probes to run. Use >1 to reproduce the ' + '"35 concurrent connections" CreatePermission failure scenario. 
' + 'Requires --turn-test.') + pam_cmd_parser.add_argument('--stress-test', required=False, dest='stress_test', action='store_true', + help='Full WebRTC stress test through the TURN relay: connection cycling ' + '(openβ†’dataβ†’close repeated), throughput across 64B/8KB/64KB/256KB ' + 'frame sizes, and concurrent connection load. Implies --turn-test. ' + 'Requires a record.') + pam_cmd_parser.add_argument('--stun-only', required=False, dest='stun_test', action='store_true', + help='Run an end-to-end probe restricted to STUN/reflexive candidates β€” ' + 'no TURN relay. Confirms that peer-to-peer ICE works when the relay ' + 'is bypassed. Mutually exclusive with --turn-test. Requires a record.') def get_parser(self): return PAMTunnelDiagnoseCommand.pam_cmd_parser @@ -1316,6 +1349,8 @@ def _bright(cls, t: str) -> str: return cls._c('1;92', t) @classmethod def _dim(cls, t: str) -> str: return cls._c('2;32', t) @classmethod + def _yellow(cls, t: str) -> str: return cls._c('1;93', t) + @classmethod def _red(cls, t: str) -> str: return cls._c('1;91', t) @classmethod def _check(cls) -> str: return cls._bright('\u2713') @@ -1345,7 +1380,8 @@ def _print_header(cls): def _print_result(cls, name: str, passed: bool, detail: str, ms: int, indent: int = 4): icon = cls._check() if passed else cls._cross() ms_str = cls._dim(f' {ms}ms') - body = f'{cls._green(name)} \u00b7 {cls._green(detail)}' if detail else cls._green(name) + _color = cls._green if passed else cls._red + body = f'{_color(name)} \u00b7 {_color(detail)}' if detail else _color(name) print(f'{" " * indent}{icon} {body}{ms_str}') # ── STUN ────────────────────────────────────────────────────────────────── @@ -1552,7 +1588,49 @@ def execute(self, params, **kwargs): timeout = kwargs.get('timeout', 30) verbose = kwargs.get('verbose', False) output_format = kwargs.get('format', 'table') - test_filter = kwargs.get('test_filter') + turn_test = kwargs.get('turn_test', False) + + # Build unified WebRTC test filter from --test 
string and/or individual --test-* flags + _test_flag_map = { + 'test_dns': 'dns_resolution', + 'test_aws': 'aws_connectivity', + 'test_tcp': 'tcp_connectivity', + 'test_udp': 'udp_binding', + 'test_ice': 'ice_configuration', + 'test_webrtc': 'webrtc_peer_connection', + } + _allowed_tests = set(_test_flag_map.values()) + test_filter_set: set = set() + _test_str = kwargs.get('test_filter') + if _test_str: + _requested = {t.strip() for t in _test_str.split(',')} + _invalid = _requested - _allowed_tests + if _invalid: + raise CommandError('pam tunnel diagnose', + f'Invalid test names: {", ".join(_invalid)}. ' + f'Available: {", ".join(sorted(_allowed_tests))}') + test_filter_set = _requested + for _dest, _name in _test_flag_map.items(): + if kwargs.get(_dest, False): + test_filter_set.add(_name) + stress_test = kwargs.get('stress_test', False) + stun_test = kwargs.get('stun_test', False) + if stress_test: + turn_test = True # --stress-test implies --turn-test + if stun_test and turn_test: + raise CommandError('pam tunnel diagnose', + '--stun-only and --turn-test are mutually exclusive') + if stun_test: + turn_test = True # reuse the probe section + probe_duration = kwargs.get('probe_duration', 30) + probe_count = kwargs.get('probe_count', 1) + probe_stun_only = stun_test + probe_turn_only = not stun_test and turn_test # TURN-only when --turn-test but not --stun-only + + if (turn_test or stress_test) and not record_name: + raise CommandError('pam tunnel diagnose', + '--turn-test / --stun-only requires a record argument: ' + 'pam tunnel diagnose --turn-test') server = params.server # e.g. 
"keepersecurity.com" or "https://qa.keepersecurity.com" server_host = get_keeper_server_hostname(server) @@ -1684,15 +1762,8 @@ def _record(name: str, passed: bool, detail: str, ms: int): logging.debug(f'Could not get TURN credentials: {exc}', exc_info=True) settings = {'use_turn': True, 'turn_only': False} - if test_filter: - allowed = {'dns_resolution', 'aws_connectivity', 'tcp_connectivity', - 'udp_binding', 'ice_configuration', 'webrtc_peer_connection'} - requested = {t.strip() for t in test_filter.split(',')} - invalid = requested - allowed - if invalid: - print(f"{bcolors.FAIL}Invalid test names: {', '.join(invalid)}{bcolors.ENDC}") - return 1 - settings['test_filter'] = list(requested) + if test_filter_set: + settings['test_filter'] = list(test_filter_set) try: rust_results = tube_registry.test_webrtc_connectivity( @@ -1903,7 +1974,600 @@ def _val(v): print() - # ── section 6: technical details ────────────────────────────────────── + # ── section 6: TURN / STUN-only end-to-end probe ───────────────────── + if turn_test: + _probe_label = 'STUN-Only End-to-End Probe' if stun_test else 'TURN End-to-End Probe' + print(f'{self._bullet()} {self._bright(_probe_label)} ' + f'({probe_count} connection{"s" if probe_count > 1 else ""}, ' + f'hold {probe_duration}s)') + print(f' {self._sep()}') + try: + probe_registry = get_or_create_tube_registry(params) + if not probe_registry: + raise RuntimeError('Rust WebRTC library not available') + + api.sync_down(params) + probe_record_obj = RecordMixin.resolve_single_record(params, record_name) + probe_record_uid = probe_record_obj.record_uid if probe_record_obj else record_name + probe_record = vault.KeeperRecord.load(params, probe_record_uid) + if probe_record is None: + raise RuntimeError( + f'Record "{record_name}" not found in vault β€” ' + f'run "sync-down" first, or pass the record UID directly' + ) + if not isinstance(probe_record, vault.TypedRecord): + raise RuntimeError( + f'Record "{record_name}" is a legacy v2 
record (type: {type(probe_record).__name__}) β€” ' + f'--turn-test requires a PAM typed record (pamMachine, pamDirectory, etc.)' + ) + + seed_field = probe_record.get_typed_field('trafficEncryptionSeed') + if not seed_field: + raise RuntimeError( + f'Record "{record_name}" (type: {probe_record.record_type}) has no ' + f'trafficEncryptionSeed field β€” ' + f'pass a PAM resource record (pamMachine / pamDirectory / pamUser), ' + f'not a pamConfiguration record' + ) + probe_seed = base64_to_bytes(seed_field.get_default_value(str).encode('utf-8')) + + probe_gateway_uid = get_gateway_uid_from_record(params, vault, probe_record.record_uid) + if not probe_gateway_uid: + raise RuntimeError( + f'No gateway linked to record "{record_name}" β€” ' + f'the record must be in a PAM config that has an active gateway' + ) + + # --- Launch probe_count tunnels concurrently --- + import concurrent.futures as _cf + + def _run_one_probe(probe_idx): + """Launch one probe tunnel and return a result dict.""" + probe_port = find_open_port(tried_ports=[], host='127.0.0.1') + if not probe_port: + return {'idx': probe_idx, 'success': False, 'error': 'no open port'} + t0 = time.monotonic() + result = start_rust_tunnel( + params=params, + record_uid=probe_record.record_uid, + gateway_uid=probe_gateway_uid, + host='127.0.0.1', + port=probe_port, + seed=probe_seed, + target_host='127.0.0.1', + target_port=1, + socks=False, + trickle_ice=True, + record_title=probe_record.title, + kind='probe', + probe_duration=probe_duration, + probe_turn_only=probe_turn_only, + probe_stun_only=probe_stun_only, + ) + offer_ms = int((time.monotonic() - t0) * 1000) + if not result or not result.get('success'): + return {'idx': probe_idx, 'success': False, + 'error': (result or {}).get('error', 'offer failed'), 'offer_ms': offer_ms} + return {'idx': probe_idx, 'success': True, 'offer_ms': offer_ms, + 'tube_id': result['tube_id'], 'registry': result['tube_registry'], + 'signal_handler': result.get('signal_handler'), 
't0': t0, + 'port': result.get('local_port', probe_port)} + + t_all = time.monotonic() + with _cf.ThreadPoolExecutor(max_workers=min(probe_count, 20)) as pool: + probe_futures = [pool.submit(_run_one_probe, i) for i in range(probe_count)] + probe_launches = [f.result() for f in _cf.as_completed(probe_futures)] + launch_ms = int((time.monotonic() - t_all) * 1000) + + launched_ok = [p for p in probe_launches if p['success']] + launched_fail = [p for p in probe_launches if not p['success']] + + _record( + f'Probe offer{"s" if probe_count > 1 else ""} sent', + len(launched_fail) == 0, + f'{len(launched_ok)}/{probe_count} sent in {launch_ms}ms' + + (f' β€” failed: {[p["error"] for p in launched_fail]}' if launched_fail else ''), + launch_ms, + ) + + if not launched_ok: + raise RuntimeError('All probes failed to launch') + + # --- Wait for each probe to reach Connected --- + connect_deadline = time.monotonic() + timeout + for p in launched_ok: + p['connected_ms'] = None + p['final_state'] = 'pending' + + while time.monotonic() < connect_deadline: + pending = [p for p in launched_ok if p['connected_ms'] is None + and p['final_state'] not in ('failed', 'closed', 'timeout')] + if not pending: + break + for p in pending: + try: + state = p['registry'].get_connection_state(p['tube_id']) + except Exception: + state = 'closed' + state_l = (state or '').lower() + if state_l == 'connected': + p['connected_ms'] = int((time.monotonic() - p['t0']) * 1000) + p['final_state'] = 'connected' + elif state_l in ('failed', 'closed'): + p['final_state'] = state_l + time.sleep(0.2) + + for p in launched_ok: + if p['connected_ms'] is None and p['final_state'] == 'pending': + p['final_state'] = 'timeout' + + connected_probes = [p for p in launched_ok if p['connected_ms'] is not None] + failed_probes = [p for p in launched_ok if p['connected_ms'] is None] + avg_connect_ms = int(sum(p['connected_ms'] for p in connected_probes) / len(connected_probes)) \ + if connected_probes else 0 + + _record( 
+ 'STUN peer connected' if stun_test else 'TURN relay connected', + len(failed_probes) == 0, + f'{len(connected_probes)}/{len(launched_ok)} connected' + + (f', avg {avg_connect_ms}ms' if connected_probes else '') + + (f' β€” not connected: {[p["final_state"] for p in failed_probes]}' if failed_probes else ''), + avg_connect_ms, + ) + + # --- Hold phase: monitor state transitions for probe_duration seconds --- + if connected_probes and probe_duration > 0: + _path_label = 'STUN' if stun_test else 'TURN' + print(f' Holding {len(connected_probes)} connection{"s" if len(connected_probes) > 1 else ""} ' + f'for {probe_duration}s to monitor {_path_label} stability...') + + # --- Throughput test: RTT + bulk throughput via the echo tunnel --- + for p in connected_probes: + p['throughput_mbps'] = None + p['rtt_ms'] = None + local_port = p.get('port') + logging.debug(f'Throughput test: probe-{p["idx"]} port={local_port} keys={list(p.keys())}') + if not local_port: + logging.warning(f'Throughput test: probe-{p["idx"]} has no local port β€” skipping') + continue + try: + import socket as _socket + # The Rust TCP listener binds after the data channel opens, + # which can lag the ICE 'connected' state by a short window. + # Retry a few times with a brief pause before giving up. + sock = None + for _attempt in range(5): + try: + s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) + s.settimeout(5) + s.connect(('127.0.0.1', local_port)) + sock = s + break + except OSError: + s.close() + if _attempt < 4: + time.sleep(0.5) + if sock is None: + logging.warning( + f'Throughput test: probe-{p["idx"]} could not connect to ' + f'127.0.0.1:{local_port} after 5 attempts β€” ' + f'Rust listener may not be ready' + ) + continue + + sock.settimeout(15) # 15s: 256 KB at ~17 KB/s minimum + + # RTT: single small ping + t_rtt = time.monotonic() + sock.sendall(b'PING') + sock.recv(4) + p['rtt_ms'] = int((time.monotonic() - t_rtt) * 1000) + + # Throughput: 256 KB in 8 KB chunks. 
+ # Each sendall matches MAX_READ_SIZE so it maps to one WebRTC message. + # At the minimum passing threshold (50 KB/s) this completes in ~5s, + # leaving 10s of headroom before the 15s timeout. + chunk = b'X' * 8192 + total_bytes = 256 * 1024 + sent = 0 + t_start = time.monotonic() + while sent < total_bytes: + sock.sendall(chunk) + sent += len(chunk) + recv = 0 + while recv < total_bytes: + data = sock.recv(65536) + if not data: + break + recv += len(data) + elapsed = time.monotonic() - t_start + p['throughput_mbps'] = round((recv / elapsed) / 1_000_000, 2) + sock.close() + except Exception as tput_err: + logging.warning(f'Throughput test error for probe-{p["idx"]} (port={local_port}): {tput_err}', exc_info=True) + + # Report throughput results + tput_results = [p for p in connected_probes if p['throughput_mbps'] is not None] + if tput_results: + avg_tput = round(sum(p['throughput_mbps'] for p in tput_results) / len(tput_results), 2) + avg_rtt = int(sum(p['rtt_ms'] for p in tput_results if p['rtt_ms']) / len(tput_results)) + # Fixed floor: the probe sends 256 KB from a cold SCTP association, + # so measured throughput is dominated by slow-start, not RTT. + # An RTT-aware formula would demand higher throughput at low RTT, + # producing false failures on fast paths. 0.03 MB/s (30 KB/s) is + # achievable even during SCTP ramp-up at 400ms RTT, and any relay + # delivering less than that is genuinely broken. + _tput_threshold = 0.03 + _record( + f'{_path_label} throughput', + avg_tput >= _tput_threshold, + f'{avg_tput} MB/s avg over {_path_label} path Β· RTT {avg_rtt}ms', + int(avg_tput * 1000), + ) + else: + print(f' (throughput test skipped β€” no data returned from echo path)') + + # Base hold_end on connection time, not throughput-test completion. + # The gateway auto-closes probe_duration seconds after the probe STARTED, + # so align the monitoring window to the first connected probe's t0. 
+ earliest_t0 = min(p['t0'] for p in connected_probes) + hold_end = earliest_t0 + probe_duration + (avg_connect_ms / 1000) + # Per-probe tracking: count disconnects and reconnects + for p in connected_probes: + p['disconnects'] = 0 + p['reconnects'] = 0 + p['last_state'] = 'connected' + p['died'] = False + + while time.monotonic() < hold_end: + for p in connected_probes: + if p['died']: + continue + try: + state = p['registry'].get_connection_state(p['tube_id']) + except Exception: + state = 'closed' + state_l = (state or '').lower() + + if state_l != p['last_state']: + elapsed = int((time.monotonic() - p['t0'])) + if state_l == 'disconnected': + p['disconnects'] += 1 + print(f' [{elapsed}s] probe-{p["idx"]}: ' + f'{self._yellow("DISCONNECTED")} β€” ICE restart should fire') + elif state_l == 'connected' and p['last_state'] == 'disconnected': + p['reconnects'] += 1 + print(f' [{elapsed}s] probe-{p["idx"]}: ' + f'{self._green("RECONNECTED")} via ICE restart') + elif state_l == 'failed': + p['died'] = True + print(f' [{elapsed}s] probe-{p["idx"]}: ' + f'{self._red("DIED")} (state=failed) β€” ' + f'ICE failed, tube removed from registry') + elif state_l == 'closed': + # 'closed' is the normal probe auto-close at probe_duration. + # Only count as death if it fired well before the deadline. 
+ remaining = hold_end - time.monotonic()
+ if remaining > 10:
+ p['died'] = True
+ print(f' [{elapsed}s] probe-{p["idx"]}: '
+ f'{self._red("DIED")} (state=closed, {int(remaining)}s early) — '
+ f'tube removed unexpectedly')
+ else:
+ print(f' [{elapsed}s] probe-{p["idx"]}: '
+ f'{self._green("CLOSED")} (probe auto-close)')
+ p['last_state'] = state_l
+ time.sleep(1.0)
+
+ # Summarise hold phase
+ survived = [p for p in connected_probes if not p['died']]
+ died = [p for p in connected_probes if p['died']]
+ total_disc = sum(p['disconnects'] for p in connected_probes)
+ total_rec = sum(p['reconnects'] for p in connected_probes)
+
+ _record(
+ f'{_path_label} stability ({probe_duration}s hold)',
+ len(died) == 0,
+ f'{len(survived)}/{len(connected_probes)} survived'
+ + (f', {total_disc} disconnect(s), {total_rec} ICE restart(s)' if total_disc else ', no interruptions')
+ + (f' — {len(died)} died permanently' if died else ''),
+ probe_duration * 1000,
+ )
+
+ if total_disc > 0 and total_rec == total_disc:
+ print(f' {self._green("ICE restart fix working:")} all disconnects recovered automatically')
+ elif total_disc > 0 and total_rec < total_disc:
+ print(f' {self._red("ICE restart fix incomplete:")} {total_disc - total_rec} disconnect(s) did not recover')
+
+ # --- Clean up all probes ---
+ for p in probe_launches:
+ if not p['success']:
+ continue
+ try:
+ p['registry'].close_tube(p['tube_id'])
+ except Exception:
+ pass
+ sh = p.get('signal_handler')
+ if sh:
+ try:
+ sh.cleanup()
+ except Exception:
+ pass
+
+ except Exception as exc:
+ _record('TURN probe', False, str(exc)[:70], 0)
+ logging.debug('TURN probe error', exc_info=True)
+
+ print()
+
+ # ── section 7: stress test ────────────────────────────────────────────
+ if stress_test:
+ print(f'{self._bullet()} {self._bright("WebRTC Stress Test")} (TURN relay)')
+ print(f' {self._sep()}')
+ if not record_name:
+ print(f' {self._cross()} {self._red("--stress-test requires a record argument")}')
+ else:
+ import socket as _sock
+ import concurrent.futures as _cf2
+
+ def _one_stress_probe(sp=None):
+ """Single connected probe for stress use — returns (port, registry, tube_id, sh) or None.
+ Pass sp to use a pre-allocated port (for concurrent calls); omit for sequential use.
+ """
+ if sp is None:
+ sp = find_open_port(tried_ports=[], host='127.0.0.1')
+ if not sp:
+ return None
+ r = start_rust_tunnel(
+ params=params,
+ record_uid=probe_record.record_uid,
+ gateway_uid=probe_gateway_uid,
+ host='127.0.0.1', port=sp,
+ seed=probe_seed, target_host='127.0.0.1', target_port=1,
+ socks=False, trickle_ice=True,
+ record_title=probe_record.title,
+ kind='probe', probe_duration=120, probe_turn_only=probe_turn_only,
+ )
+ if not r or not r.get('success'):
+ return None
+ deadline = time.monotonic() + 20
+ reg = r['tube_registry']
+ tid = r['tube_id']
+ while time.monotonic() < deadline:
+ if (reg.get_connection_state(tid) or '').lower() == 'connected':
+ return sp, reg, tid, r.get('signal_handler')
+ time.sleep(0.2)
+ try:
+ reg.close_tube(tid)
+ except Exception: pass
+
+ return None
+
+ def _tput_via_port(port):
+ """Connect to local tunnel port, measure RTT and aggregate throughput.
+ Sends 256 KB as 32 × 8 KB messages — matching the Rust channel's
+ MAX_READ_SIZE exactly so the measurement reflects real wire behaviour.
+ Returns (mbps, rtt_ms) or (None, None) on failure.
+ """
+ try:
+ s = _sock.socket(_sock.AF_INET, _sock.SOCK_STREAM)
+ s.settimeout(15)
+ s.connect(('127.0.0.1', port))
+ # Warmup ping — also gives us RTT for the threshold formula.
+ _t_rtt = time.monotonic()
+ s.sendall(b'\x00' * 64)
+ s.recv(64)
+ rtt_ms = int((time.monotonic() - _t_rtt) * 1000)
+ # Bulk: 256 KB in 8 KB chunks so each send maps to one WebRTC
+ # message (RECEIVE_MTU = 8 KB in webrtc-data). More chunks =
+ # more frames in-flight = better pipelining signal.
+ total = 256 * 1024
+ chunk = b'\x01' * 8192
+ t0 = time.monotonic()
+ sent = 0
+ while sent < total:
+ s.sendall(chunk)
+ sent += len(chunk)
+ got = 0
+ while got < total:
+ d = s.recv(65536)
+ if not d:
+ break
+ got += len(d)
+ elapsed = time.monotonic() - t0
+ s.close()
+ return round(got / elapsed / 1_000_000, 3), rtt_ms
+ except Exception as e:
+ logging.debug(f'Stress tput error: {e}')
+ return None, None
+
+ CYCLES = 5
+ CONCURRENCY = 5
+
+ # --- 1. Cycle test: open → data → close, repeated ---
+ print(f' {self._bright("1. Connection cycling")} ({CYCLES} open/data/close cycles)')
+ cycle_ok = 0
+ for cycle in range(CYCLES):
+ info = _one_stress_probe()
+ if info:
+ sp, reg, tid, sh = info
+ try:
+ reg.close_tube(tid)
+ if sh: sh.tube_close_initiated = True
+ except Exception: pass
+ if sh:
+ try: sh.cleanup()
+ except Exception: pass
+ cycle_ok += 1
+ _record('Cycle open/close', cycle_ok == CYCLES,
+ f'{cycle_ok}/{CYCLES} cycles completed', cycle_ok * 1000)
+
+ # --- 2. Throughput ---
+ print(f' {self._bright("2. Throughput")} (32 × 8 KB messages over TURN relay)')
+ info = _one_stress_probe()
+ if info:
+ sp, reg, tid, sh = info
+ mbps, rtt_ms = _tput_via_port(sp)
+ if mbps is not None:
+ # RTT-aware threshold: same formula as TURN probe section.
+ # 32 × 8 KB in-flight; SCTP window / RTT gives the pipelining ceiling.
+ _rtt = max(rtt_ms or 1000, 1)
+ _threshold = max(0.05, round(65536 / _rtt * 0.5 / 1000, 3))
+ _record('TURN throughput',
+ mbps >= _threshold,
+ f'{mbps} MB/s · RTT {rtt_ms}ms',
+ int(mbps * 1000))
+ else:
+ _record('TURN throughput', False, 'could not connect for throughput test', 0)
+ try:
+ reg.close_tube(tid)
+ if sh: sh.tube_close_initiated = True
+ except Exception: pass
+ if sh:
+ try: sh.cleanup()
+ except Exception: pass
+ else:
+ _record('TURN throughput', False, 'could not connect for throughput test', 0)
+
+ # --- 3. 
Concurrent connections --- + # Pre-allocate all ports sequentially so no two workers race on find_open_port. + print(f' {self._bright(f"3. Concurrent connections")} ({CONCURRENCY} simultaneous)') + conc_ports, tried = [], [] + for _ in range(CONCURRENCY): + p = find_open_port(tried_ports=tried, host='127.0.0.1') + if p: + conc_ports.append(p) + tried.append(p) + with _cf2.ThreadPoolExecutor(max_workers=CONCURRENCY) as pool: + conc_futures = [pool.submit(_one_stress_probe, p) for p in conc_ports] + conc_results = [f.result() for f in _cf2.as_completed(conc_futures)] + conc_ok = sum(1 for r in conc_results if r is not None) + for r in conc_results: + if r: + sp, reg, tid, sh = r + try: + reg.close_tube(tid) + if sh: sh.tube_close_initiated = True + except Exception: pass + if sh: + try: sh.cleanup() + except Exception: pass + + _record(f'Concurrent {CONCURRENCY}x', conc_ok == CONCURRENCY, + f'{conc_ok}/{CONCURRENCY} connected simultaneously', conc_ok * 1000) + + # --- 4. Interactive latency under bulk load --- + # Two TCP connections to the same probe port = two conn_no streams + # on the same WebRTC data channel. conn_no=1 sends 512 KB bulk; + # conn_no=2 sends 64-byte pings every 200ms and measures RTT. + # This tests whether the EventDrivenSender saw-tooth fix allows + # interactive frames to interleave with bulk frames. + print(f' {self._bright("4. 
Interactive latency under load")}') + import threading as _threading + info = _one_stress_probe() + if info: + sp, reg, tid, sh = info + rtt_under_load: list = [] + _bulk_done = _threading.Event() + + # Baseline: single ping before bulk starts + _baseline_rtt = None + try: + _bs = _sock.socket(_sock.AF_INET, _sock.SOCK_STREAM) + _bs.settimeout(5) + _bs.connect(('127.0.0.1', sp)) + _t0 = time.monotonic() + _bs.sendall(b'B' * 64) + _bs.recv(64) + _baseline_rtt = int((time.monotonic() - _t0) * 1000) + _bs.close() + except Exception as _e: + logging.debug(f'Latency baseline error: {_e}') + + def _bulk_sender(): + try: + s = _sock.socket(_sock.AF_INET, _sock.SOCK_STREAM) + s.settimeout(30) + s.connect(('127.0.0.1', sp)) + total = 512 * 1024 + chunk = b'\x02' * 8192 + sent = 0 + while sent < total: + s.sendall(chunk) + sent += len(chunk) + got = 0 + while got < total: + d = s.recv(65536) + if not d: + break + got += len(d) + s.close() + except Exception as _e: + logging.debug(f'Bulk sender error: {_e}') + finally: + _bulk_done.set() + + def _latency_sampler(): + try: + s = _sock.socket(_sock.AF_INET, _sock.SOCK_STREAM) + s.settimeout(5) + s.connect(('127.0.0.1', sp)) + while not _bulk_done.is_set(): + try: + _t = time.monotonic() + s.sendall(b'P' * 64) + s.recv(64) + rtt_under_load.append( + int((time.monotonic() - _t) * 1000) + ) + except OSError: + break + time.sleep(0.2) + s.close() + except Exception as _e: + logging.debug(f'Latency sampler error: {_e}') + + _bt = _threading.Thread(target=_bulk_sender, daemon=True) + _lt = _threading.Thread(target=_latency_sampler, daemon=True) + _lt.start() + _bt.start() + _bt.join(timeout=30) + _bulk_done.set() + _lt.join(timeout=2) + + try: + reg.close_tube(tid) + if sh: sh.tube_close_initiated = True + except Exception: pass + if sh: + try: sh.cleanup() + except Exception: pass + + if rtt_under_load and _baseline_rtt: + _avg = int(sum(rtt_under_load) / len(rtt_under_load)) + _max = max(rtt_under_load) + # Pass if average latency 
under load stays within 5× baseline.
+ # With the 50-frame drain batch, interactive frames interleave
+ # between bursts; with the old 2000-frame batch the multiplier
+ # could reach 100× or more at TURN relay speeds.
+ _ok = _avg <= _baseline_rtt * 5
+ _record(
+ 'Latency under load',
+ _ok,
+ f'avg {_avg}ms, max {_max}ms · baseline {_baseline_rtt}ms'
+ f' · {len(rtt_under_load)} samples',
+ _avg,
+ )
+ elif not rtt_under_load:
+ _record('Latency under load', False, 'no samples collected', 0)
+ else:
+ _record('Latency under load', False, 'baseline RTT unavailable', 0)
+ else:
+ _record('Latency under load', False, 'could not connect probe', 0)
+
+ print()
+
+ # ── section 8: technical details ────────────────────────── print(f'{self._bullet()} {self._bright("Technical Details")}') print(f' {self._sep()}')