Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 100 additions & 16 deletions keepercommander/commands/tunnel/port_forward/tunnel_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,12 +449,15 @@ def __init__(self, name, level=logging.NOTSET):
'webrtc', 'webrtc_ice', 'webrtc_mdns', 'webrtc_dtls',
'webrtc_sctp', 'turn', 'stun', 'webrtc_ice.agent.agent_internal',
'webrtc_ice.agent.agent_gather', 'webrtc_ice.mdns',
'webrtc_mdns.conn', 'webrtc.peer_connection', 'turn.client'
'webrtc_mdns.conn', 'webrtc.peer_connection', 'turn.client',
'turn.client.relay_conn',
]
for crate_name in webrtc_crates:
crate_logger = logging.getLogger(crate_name)
crate_logger.setLevel(logging.WARNING)
crate_logger.propagate = False
if not crate_logger.handlers:
crate_logger.addHandler(logging.NullHandler())

# Suppress specific noisy keeper_pam_webrtc_rs sub-modules even in debug mode
# These log debug info at ERROR level, so we need to disable them entirely
Expand All @@ -466,6 +469,8 @@ def __init__(self, name, level=logging.NOTSET):
noisy_logger = logging.getLogger(logger_name)
noisy_logger.setLevel(logging.CRITICAL + 1) # Disable completely
noisy_logger.propagate = False
if not noisy_logger.handlers:
noisy_logger.addHandler(logging.NullHandler())

logging.debug(f"Rust loggers enabled at DEBUG level")
enabled_loggers = [name for name in logging.Logger.manager.loggerDict.keys()
Expand All @@ -492,18 +497,23 @@ def __init__(self, name, level=logging.NOTSET):
suppress_logger = logging.getLogger(logger_name)
suppress_logger.setLevel(logging.CRITICAL + 1) # Disable completely
suppress_logger.propagate = False
if not suppress_logger.handlers:
suppress_logger.addHandler(logging.NullHandler())

# Suppress noisy webrtc dependency logs when not debugging
webrtc_crates = [
'webrtc', 'webrtc_ice', 'webrtc_mdns', 'webrtc_dtls',
'webrtc_sctp', 'turn', 'stun', 'webrtc_ice.agent.agent_internal',
'webrtc_ice.agent.agent_gather', 'webrtc_ice.mdns',
'webrtc_mdns.conn', 'webrtc.peer_connection', 'turn.client'
'webrtc_mdns.conn', 'webrtc.peer_connection', 'turn.client',
'turn.client.relay_conn',
]
for crate_name in webrtc_crates:
crate_logger = logging.getLogger(crate_name)
crate_logger.setLevel(logging.ERROR)
crate_logger.propagate = False
if not crate_logger.handlers:
crate_logger.addHandler(logging.NullHandler())


def get_or_create_tube_registry(params):
Expand Down Expand Up @@ -1192,9 +1202,14 @@ def route_message_to_rust(response_item, tube_registry):
answer_sdp = None
if isinstance(data_json, dict):
logging.debug(f"🔓 Decrypted payload type: {data_json.get('type', 'unknown')}, keys: {list(data_json.keys())}")
answer_sdp = data_json.get('answer') or data_json.get('sdp')
# 'answer' field is already base64 (initial answer); 'sdp' field is plain text (ICE restart answer)
answer_sdp = data_json.get('answer')
if not answer_sdp:
raw_sdp = data_json.get('sdp')
if raw_sdp:
answer_sdp = base64.b64encode(raw_sdp.encode('utf-8')).decode('ascii')
elif data_text.strip().startswith('v=') and 'm=' in data_text:
answer_sdp = data_text.strip()
answer_sdp = base64.b64encode(data_text.strip().encode('utf-8')).decode('ascii')
logging.debug("Decrypted data appears to be raw SDP (not JSON), using as answer")

if answer_sdp:
Expand All @@ -1210,7 +1225,18 @@ def route_message_to_rust(response_item, tube_registry):
if not tube_id:
logging.error(f"No tube ID found for conversation: {conversation_id} (also tried URL-safe version)")
else:
set_remote_description_and_parse_version(tube_registry, tube_id, answer_sdp, is_answer=True)
try:
set_remote_description_and_parse_version(tube_registry, tube_id, answer_sdp, is_answer=True)
except RuntimeError as _rte:
if "Invalid signaling state transition from Stable" in str(_rte):
# ICE restart answer arrived after the connection already
# re-established via another path — safe to ignore.
logging.debug(
f"ICE restart answer arrived for already-stable tube "
f"{tube_id} ({conversation_id}) — ignoring late answer"
)
else:
raise
logging.debug("Connection state: SDP answer received, connecting...")

session = get_tunnel_session(tube_id)
Expand All @@ -1224,7 +1250,9 @@ def route_message_to_rust(response_item, tube_registry):
logging.warning(f"No signal handler found for tube {tube_id} to send buffered candidates")
elif isinstance(data_json, dict) and ("offer" in data_json or data_json.get("type") == "offer"):
# Gateway is sending us an ICE restart offer
offer_sdp = data_json.get('sdp') or data_json.get('offer')
# 'sdp' field from gateway is plain SDP (not base64); encode for Rust
raw_offer = data_json.get('sdp') or data_json.get('offer')
offer_sdp = base64.b64encode(raw_offer.encode('utf-8')).decode('ascii') if raw_offer else None

if offer_sdp:
logging.debug(f"Received ICE restart offer from Gateway for conversation: {conversation_id}")
Expand Down Expand Up @@ -1365,12 +1393,18 @@ def route_message_to_rust(response_item, tube_registry):
errors = payload_data.get('errors', [''])
logging.error(f"Gateway returned errors for {conversation_id}: {errors}")

elif not payload_data.get('is_ok', True):
# Gateway returned an explicit error (is_ok=False) — log the message and move on.
# This includes auth failures (401 on get_leafs), overload responses, etc.
logging.error(
f"Gateway error for {conversation_id}: {payload_data.get('data', 'unknown error')}"
)
elif payload_data.get('data', '') == '':
logging.debug("Empty data field an acknowledgment, no action needed")
elif payload_data.get('data') and "ice candidate added" in payload_data.get('data').lower():
logging.debug("Received ice candidate added")
else:
logging.warning(f"Unhandled payload type for {conversation_id}: {payload_data}")
logging.debug(f"Unhandled payload for {conversation_id}: {payload_data}")
else:
logging.warning(f"No encrypted payload in message for conversation: {conversation_id}")

Expand Down Expand Up @@ -1475,7 +1509,7 @@ class TunnelSignalHandler:

def __init__(self, params, record_uid, gateway_uid, symmetric_key, base64_nonce, conversation_id,
tube_registry, tube_id=None, trickle_ice=False, websocket_router=None,
conversation_type='tunnel', router_tokens=None, http_session=None):
conversation_type='tunnel', router_tokens=None, http_session=None, silent=False):
self.params = params
self.record_uid = record_uid
self.gateway_uid = gateway_uid
Expand All @@ -1486,6 +1520,8 @@ def __init__(self, params, record_uid, gateway_uid, symmetric_key, base64_nonce,
self.tube_registry = tube_registry
self.tube_id = tube_id
self.trickle_ice = trickle_ice
self.silent = silent # Suppress connection-established display (probe/stress mode)
self.tube_close_initiated = False # Set when Rust initiates close (AdminClosed/Normal); skip redundant cleanup close_tube call
self.connection_success_shown = False # Track if we've shown success messages
self.connection_connected = False # Track if WebRTC connection is established
self.ice_sending_in_progress = False # Serialize ICE candidate sending
Expand All @@ -1507,7 +1543,22 @@ def __init__(self, params, record_uid, gateway_uid, symmetric_key, base64_nonce,
raise Exception("Trickle ICE requires WebSocket support - install with: pip install websockets")

def signal_from_rust(self, response: dict):
"""Signal callback to handle Rust events and gateway communication"""
"""Signal callback to handle Rust events and gateway communication.

Called by Rust via PyO3. Any unhandled exception here crosses the FFI
boundary as a PyErr which can destabilise the tube, so we wrap the whole
body and log rather than propagate.
"""
try:
self._signal_from_rust_inner(response)
except Exception as _sig_err:
logging.error(
f"Unhandled exception in signal_from_rust "
f"(kind={response.get('kind','?')}, tube={response.get('tube_id','?')}): {_sig_err}",
exc_info=True,
)

def _signal_from_rust_inner(self, response: dict):
signal_kind = response.get('kind', '')
tube_id = response.get('tube_id', '')
data = response.get('data', '')
Expand Down Expand Up @@ -1552,7 +1603,7 @@ def signal_from_rust(self, response: dict):
self.connection_success_shown = True

# Get tunnel session for record details
if session:
if session and not self.silent:
logging.info(f"\n{bcolors.OKGREEN}Connection established successfully.{bcolors.ENDC}")

# Display record title if available
Expand Down Expand Up @@ -1623,6 +1674,8 @@ def signal_from_rust(self, response: dict):
logging.error(f"{bcolors.FAIL}Tunnel closed due to critical failure - '{tube_id}': {close_reason.name}{bcolors.ENDC}")

elif close_reason.is_user_initiated():
# Rust is already handling the close — skip redundant close_tube in cleanup()
self.tube_close_initiated = True
logging.debug(f"{bcolors.OKBLUE}User-initiated closure of tunnel '{tube_id}': {close_reason.name}{bcolors.ENDC}")

elif close_reason.is_retryable():
Expand Down Expand Up @@ -2007,10 +2060,14 @@ def _send_restart_offer(self, restart_sdp, tube_id):
Similar to _send_ice_candidate_immediately but sends an offer instead of candidates.
"""
try:
# Format as offer payload for gateway
# Format as offer payload for gateway.
# restart_sdp arrives base64-encoded from Rust (API contract), but the payload
# SDP field is raw text — gateway and vault both decode base64 before using it
# with the Rust boundary (gateway) or WebRTC API (vault).
raw_sdp = bytes_to_string(base64.b64decode(restart_sdp))
offer_payload = {
"type": "offer",
"sdp": restart_sdp,
"sdp": raw_sdp,
"ice_restart": True # Flag to indicate this is an ICE restart offer
}
string_data = json.dumps(offer_payload)
Expand Down Expand Up @@ -2074,7 +2131,7 @@ def _send_restart_offer(self, restart_sdp, tube_id):
def cleanup(self):
"""Cleanup resources"""
# Close the tube in Rust registry if it exists
if self.tube_id and self.tube_registry:
if self.tube_id and self.tube_registry and not self.tube_close_initiated:
try:
logging.debug(f"Closing tube {self.tube_id} in cleanup")
self.tube_registry.close_tube(self.tube_id, reason=CloseConnectionReasons.Error)
Expand All @@ -2096,7 +2153,7 @@ def cleanup(self):
logging.debug("TunnelSignalHandler cleaned up")

def start_rust_tunnel(params, record_uid, gateway_uid, host, port,
seed, target_host, target_port, socks, trickle_ice=True, record_title=None, allow_supply_host=False, two_factor_value=None):
seed, target_host, target_port, socks, trickle_ice=True, record_title=None, allow_supply_host=False, two_factor_value=None, kind='start', probe_duration=30, probe_turn_only=False, probe_stun_only=False):
"""
Start a tunnel using Rust WebRTC with trickle ICE via HTTP POST and WebSocket responses.

Expand Down Expand Up @@ -2190,6 +2247,19 @@ def start_rust_tunnel(params, record_uid, gateway_uid, host, port,
params, host, port, target_host, target_port, socks, nonce
)

# For probe mode with turn_only, force relay-only ICE on Commander's side too.
# Both peers must use RTCIceTransportPolicy::Relay for the connection to be
# pure TURN — otherwise the controlling peer (Commander) offers host/srflx
# candidates and ICE selects a direct path, bypassing the relay.
if probe_turn_only:
webrtc_settings["turn_only"] = True
elif probe_stun_only:
# Strip TURN credentials so Commander's ICE agent only gathers host/srflx
# candidates — no relay path is available on either side.
webrtc_settings.pop("turn_url", None)
webrtc_settings.pop("turn_username", None)
webrtc_settings.pop("turn_password", None)

# Determine conversation type (tunnel or protocol-specific)
conversation_type = webrtc_settings.get('conversationType', 'tunnel')

Expand Down Expand Up @@ -2267,7 +2337,8 @@ def start_rust_tunnel(params, record_uid, gateway_uid, host, port,
trickle_ice=trickle_ice,
conversation_type=conversation_type,
router_tokens=router_tokens,
http_session=http_session
http_session=http_session,
silent=(kind == 'probe'), # Suppress connection display for probe/stress mode
)

# Store signal handler reference so we can send buffered candidates later
Expand Down Expand Up @@ -2381,6 +2452,18 @@ def start_rust_tunnel(params, record_uid, gateway_uid, host, port,
# Prepare the offer data
data = {"offer": offer.get("offer")}

# For probe mode embed the flag in the encrypted payload so the gateway
# routes to its lightweight probe path. We still send kind='start' below
# so the Keeper router injects routerToken (required for TURN auth).
if kind == 'probe':
data["probe"] = True
if probe_duration != 30:
data["probe_duration"] = probe_duration
if probe_turn_only:
data["turn_only"] = True
if probe_stun_only:
data["stun_only"] = True

# If allowSupplyHost is enabled, include the target host and port in the payload
if allow_supply_host:
data["host"] = {
Expand Down Expand Up @@ -2563,7 +2646,8 @@ def start_rust_tunnel(params, record_uid, gateway_uid, host, port,
"websocket_thread": websocket_thread,
"conversation_id": conversation_id_original, # Use original, not base64 encoded
"tube_registry": tube_registry,
"status": "connecting" # Indicates async connection in progress
"status": "connecting", # Indicates async connection in progress
"local_port": tunnel_session.port, # Actual bound port (may differ from requested)
}

except Exception as e:
Expand Down
Loading
Loading