From 31d291c7010017072a7688e89e608739abd8df30 Mon Sep 17 00:00:00 2001 From: Ivan Dimov <78815270+idimov-keeper@users.noreply.github.com> Date: Tue, 5 May 2026 20:35:41 -0500 Subject: [PATCH 1/4] Hard-close PAM session tubes on workflow lease expiry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the workflow lease expires for `pam tunnel start` or `pam launch`, soft-close the tube now and escalate to keeper_pam_webrtc_rs's new force_close_tube after 3s. Force-close drops the local TCP listener and severs in-flight forwarded TCP streams (SSH, MySQL, etc.), which the prior soft-close did not — an active session would linger past expiry until the user disconnected manually. Escalation is gated on both endpoints supporting force_close_tube: hasattr(tube_registry, "force_close_tube") on the local crate AND remote SDP-advertised version >= FORCE_CLOSE_MIN_VERSION ("2.1.18"). Older peers fall back to soft close only. - tunnel_helpers: add escalate_close() shared helper plus FORCE_CLOSE_MIN_VERSION / FORCE_CLOSE_DELAY_SECONDS constants; consolidate _version_at_least here as the single source of truth (terminal_connection.py re-exports for back-compat with launch.py) - tunnel_and_connections: replace the previously-commented-out soft close in `pam tunnel start`'s lease-expiry callback with escalate_close - pam_launch/launch: wire escalate_close into `pam launch`'s _on_lease_expired (was only setting flags before, no tube close) --- keepercommander/commands/pam_launch/launch.py | 29 +++++ .../pam_launch/terminal_connection.py | 32 +----- .../tunnel/port_forward/tunnel_helpers.py | 100 ++++++++++++++++++ .../commands/tunnel_and_connections.py | 59 ++++------- 4 files changed, 152 insertions(+), 68 deletions(-) diff --git a/keepercommander/commands/pam_launch/launch.py b/keepercommander/commands/pam_launch/launch.py index 6d64cf993..f0dfe1e61 100644 --- a/keepercommander/commands/pam_launch/launch.py +++ b/keepercommander/commands/pam_launch/launch.py @@ -62,6 +62,8 @@ unregister_tunnel_session, unregister_conversation_key, get_keeper_tokens, + escalate_close, + CloseConnectionReasons, ) from ..tunnel.port_forward.TunnelGraph import TunnelDAG from .rust_log_filter import ( @@ -1452,7 +1454,13 @@ def signal_handler_fn(signum, frame): # the web vault (immediate teardown, no grace period, no reconnect). # The "Access expired" line is printed AFTER terminal reset in finally # so the message survives reset_local_terminal_after_pam_session(). + # On expiry we soft-close the tube and escalate to force_close_tube + # after FORCE_CLOSE_DELAY_SECONDS so any in-flight forwarded streams + # (SSH bytes etc.) are severed instead of lingering until the user + # disconnects manually. Escalation is gated on local hasattr + + # remote SDP version (FORCE_CLOSE_MIN_VERSION). lease_timer = None + force_close_timer_holder = {} # mutable holder so cleanup can cancel if workflow_expires_on_ms and workflow_expires_on_ms > 0: import time as _time seconds_until_expiry = (workflow_expires_on_ms / 1000.0) - _time.time() @@ -1461,10 +1469,31 @@ def signal_handler_fn(signum, frame): shutdown_requested = True else: import threading as _threading + _lease_tube_id = tunnel_result['tunnel'].get('tube_id') + _lease_tube_registry = tunnel_result['tunnel'].get('tube_registry') + def _on_lease_expired(): nonlocal shutdown_requested, lease_expired lease_expired = True shutdown_requested = True + if _lease_tube_id and _lease_tube_registry is not None: + # Fetch remote version lazily: the SDP answer arrives + # asynchronously; capturing eagerly at schedule time + # would race for short leases scheduled before SDP. + remote_ver = tunnel_result['tunnel'].get('remote_webrtc_version') + if not remote_ver: + sess = get_tunnel_session(_lease_tube_id) + remote_ver = ( + getattr(sess, 'remote_webrtc_version', None) + if sess else None + ) + force_close_timer_holder['t'] = escalate_close( + _lease_tube_registry, + _lease_tube_id, + remote_webrtc_version=remote_ver, + reason=CloseConnectionReasons.AdminClosed, + log_prefix=f"[lease-expiry launch tube={_lease_tube_id[:8]}] ", + ) lease_timer = _threading.Timer(seconds_until_expiry, _on_lease_expired) lease_timer.daemon = True lease_timer.start() diff --git a/keepercommander/commands/pam_launch/terminal_connection.py b/keepercommander/commands/pam_launch/terminal_connection.py index bf27de0be..970dfde05 100644 --- a/keepercommander/commands/pam_launch/terminal_connection.py +++ b/keepercommander/commands/pam_launch/terminal_connection.py @@ -57,6 +57,7 @@ MAIN_NONCE_LENGTH, SYMMETRIC_KEY_LENGTH, set_remote_description_and_parse_version, + _version_at_least, ) from ..tunnel.port_forward.TunnelGraph import TunnelDAG from ..pam.pam_dto import GatewayAction, GatewayActionWebRTCSession @@ -136,37 +137,6 @@ CONNECT_AS_MIN_VERSION = "2.1.6" -def _version_at_least(version: Optional[str], min_version: str) -> bool: - """ - Compare semantic versions. Returns True if version >= min_version. - - Args: - version: Parsed version (e.g. "2.1.4") or None (treated as unknown/old). - min_version: Minimum required version (e.g. "2.1.0"). - - Returns: - True if version is known and >= min_version; False if unknown or older. - """ - if not version: - return False - - def parse(v: str) -> tuple: - parts = [] - for p in v.split(".")[:3]: # major.minor.patch - try: - parts.append(int(p)) - except ValueError: - parts.append(0) - while len(parts) < 3: - parts.append(0) - return tuple(parts[:3]) - - try: - return parse(version) >= parse(min_version) - except Exception: - return False - - def _ensure_max_message_size_attribute(sdp_offer: Optional[str]) -> Optional[str]: """ Ensure the SDP offer advertises the same max-message-size attribute as Web Vault. diff --git a/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py b/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py index 628ecf47a..0f4ce1286 100644 --- a/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py +++ b/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py @@ -103,6 +103,106 @@ def set_remote_description_and_parse_version(tube_registry, tube_id, sdp, is_ans return remote_ver +def _version_at_least(version, min_version): + """ + Compare semantic versions. Returns True if `version` >= `min_version`. + + `version` of None or unparseable is treated as unknown/old (False). + """ + if not version: + return False + + def parse(v): + parts = [] + for p in v.split(".")[:3]: + try: + parts.append(int(p)) + except ValueError: + parts.append(0) + while len(parts) < 3: + parts.append(0) + return tuple(parts[:3]) + + try: + return parse(version) >= parse(min_version) + except Exception: + return False + + +# Minimum keeper-pam-webrtc-rs version that exposes force_close_tube. Both the +# local Rust crate AND the remote peer must satisfy this gate before Commander +# escalates a soft close to a force close. Local check uses hasattr (the binding +# attribute is missing on older crates), remote check uses the SDP-advertised +# version string. +FORCE_CLOSE_MIN_VERSION = "2.1.18" + +# Default delay between the soft close and the force-close escalation. Matches +# the consumer-side budget agreed with the gateway (gateway-side +# KEEPER_GATEWAY_FORCE_CLOSE_TIMEOUT is 6s; we run faster on the consumer because +# at lease expiry there is no reason to wait long). +FORCE_CLOSE_DELAY_SECONDS = 3.0 + + +def escalate_close( + tube_registry, + tube_id, + *, + remote_webrtc_version=None, + reason=None, + hard_after_seconds=FORCE_CLOSE_DELAY_SECONDS, + log_prefix="", +): + """ + Soft-close a tube now, then escalate to force_close_tube after + `hard_after_seconds` if both endpoints support it. + + The soft close stops new channel creation and emits CloseConnection control + frames; the force close (when available) drops the local TCP listener, + severs in-flight forwarded TCP streams (SSH, MySQL, etc.) and tears down + the peer connection on a short bounded budget. + + Returns the scheduled `threading.Timer` (or None if escalation is not + available) so callers can cancel it on a clean exit. + """ + if reason is None: + reason = CloseConnectionReasons.AdminClosed + + try: + tube_registry.close_tube(tube_id, reason=reason) + except Exception as e: + logging.debug(f"{log_prefix}soft close_tube failed: {e}") + + has_local = hasattr(tube_registry, "force_close_tube") + has_remote = _version_at_least(remote_webrtc_version, FORCE_CLOSE_MIN_VERSION) + if not has_local: + logging.debug( + f"{log_prefix}force_close_tube unavailable in local keeper_pam_webrtc_rs - " + f"soft close only" + ) + return None + if not has_remote: + logging.debug( + f"{log_prefix}remote keeper-pam-webrtc {remote_webrtc_version!r} < " + f"{FORCE_CLOSE_MIN_VERSION} - soft close only" + ) + return None + + def _do_force_close(): + try: + logging.info( + f"{log_prefix}escalating to force_close_tube({tube_id}) after " + f"{hard_after_seconds}s" + ) + tube_registry.force_close_tube(tube_id, reason=reason) + except Exception as e: + logging.debug(f"{log_prefix}force_close_tube failed: {e}") + + timer = threading.Timer(hard_after_seconds, _do_force_close) + timer.daemon = True + timer.start() + return timer + + # Constants NONCE_LENGTH = 12 MAIN_NONCE_LENGTH = 16 diff --git a/keepercommander/commands/tunnel_and_connections.py b/keepercommander/commands/tunnel_and_connections.py index 026be65c9..8ac5e2e0b 100644 --- a/keepercommander/commands/tunnel_and_connections.py +++ b/keepercommander/commands/tunnel_and_connections.py @@ -32,7 +32,7 @@ from .tunnel.port_forward.tunnel_helpers import find_open_port, get_config_uid, get_keeper_tokens, \ get_or_create_tube_registry, get_gateway_uid_from_record, resolve_record, resolve_pam_config, resolve_folder, \ remove_field, start_rust_tunnel, get_tunnel_session, unregister_tunnel_session, CloseConnectionReasons, \ - wait_for_tunnel_connection, create_rust_webrtc_settings + wait_for_tunnel_connection, create_rust_webrtc_settings, escalate_close from .tunnel_registry import ( PARENT_GRACE_SECONDS, is_pid_alive, @@ -1013,30 +1013,13 @@ def execute(self, params, **kwargs): self._print_keeperdb_proxy_banner(host, port, db_type_for_banner) # Workflow lease expiry handling. # - # Behavior note: at expiresOn we want to terminate the tunnel and - # any in-flight forwarded connections (web-vault-equivalent hard - # disconnect). However: - # * tube_registry.close_tube(tube_id) is a SOFT close — - # it marks the tube closed and stops new channel creation but - # does NOT terminate already-open forwarded channels. An SSH - # session active through the tube keeps relaying bytes until - # the SSH client itself closes (you'll see TURN - # "fail to refresh permissions" warnings while the existing - # 5-tuple keeps flowing). - # * The local TCP listener is owned by Rust - # (keeper_pam_webrtc_rs); Python has no handle to force-close - # it from here. - # So a true hard-kill is not currently possible from the Python - # layer. The server-side workflow lease still becomes invalid at - # expiresOn (the gateway will refuse new auth requests); only the - # already-running SSH session survives until natural disconnect. - # - # The previous implementation called - # `tube_registry.close_tube(_tube_id, reason=CloseConnectionReasons.Normal)` - # but it was a no-op against active channels and produced - # confusing "fail to refresh permissions" noise from the TURN - # relay. Kept commented out below for reference if a future - # keeper_pam_webrtc_rs release adds a hard-kill primitive. + # At expiresOn we soft-close the tube (stops new channels, sends + # CloseConnection control frames) and, after a short delay, escalate + # to force_close_tube which drops the local TCP listener and severs + # any active forwarded streams (SSH, MySQL, etc.). The escalation + # only fires when both the local Rust crate and the remote peer + # advertise FORCE_CLOSE_MIN_VERSION; older peers get the soft close + # only and the in-flight session lingers until natural disconnect. if workflow_expires_on_ms and workflow_expires_on_ms > 0: import time as _time seconds_until_expiry = (workflow_expires_on_ms / 1000.0) - _time.time() @@ -1058,21 +1041,23 @@ def _close_on_lease_expiry(_tube_id=tube_id, _record_uid=record_uid): try: print( f"\n{bcolors.WARNING}Tunnel access lease expired for " - f"{_record_uid}. Server will refuse new auth requests; " - f"any in-flight SSH session will continue until you " - f"disconnect it.{bcolors.ENDC}", + f"{_record_uid}. Closing the tunnel; any in-flight " + f"forwarded connections will be terminated." + f"{bcolors.ENDC}", flush=True, ) - # Soft-close — kept commented out: doesn't actually - # terminate active forwarded channels, only emits - # TURN permission-refresh errors. Re-enable once - # keeper_pam_webrtc_rs provides a hard-kill that - # also drops the local listener. - # tube_registry.close_tube(_tube_id, reason=CloseConnectionReasons.Normal) + sess = get_tunnel_session(_tube_id) + remote_ver = getattr(sess, 'remote_webrtc_version', None) if sess else None + escalate_close( + tube_registry, + _tube_id, + remote_webrtc_version=remote_ver, + reason=CloseConnectionReasons.AdminClosed, + log_prefix=f"[lease-expiry tunnel record={_record_uid}] ", + ) # Wake any --foreground / --run blocking wait so the - # process self-terminates instead of hanging past lease - # expiry. Default interactive mode does not register - # an event here — it has no blocking wait to break. + # process self-terminates. Default interactive mode + # does not register an event here. shutdown_event = _LEASE_SHUTDOWN_EVENTS_BY_RECORD.get(_record_uid) if shutdown_event is not None: shutdown_event.set() From 8fea79ff0ce0ff8be37a4b54ecff92ee8dc2e2ee Mon Sep 17 00:00:00 2001 From: Ivan Dimov <78815270+idimov-keeper@users.noreply.github.com> Date: Wed, 6 May 2026 12:58:40 -0500 Subject: [PATCH 2/4] Escalate-close when launch lease already expired at start --- keepercommander/commands/pam_launch/launch.py | 62 +++++++++---------- .../commands/tunnel_and_connections.py | 14 ++--- 2 files changed, 36 insertions(+), 40 deletions(-) diff --git a/keepercommander/commands/pam_launch/launch.py b/keepercommander/commands/pam_launch/launch.py index f0dfe1e61..a6114b7a7 100644 --- a/keepercommander/commands/pam_launch/launch.py +++ b/keepercommander/commands/pam_launch/launch.py @@ -1417,13 +1417,11 @@ def _start_cli_session( (banner printed there); do not create a second spinner or duplicate the launching line. preserve_crlf: When True (default), STDOUT keeps raw CRLF; False when ``pam launch -n`` / ``--normalize-crlf``. """ - import sys as _sys - # Non-interactive stdin guard: key-event mode requires a real TTY. # --stdin (pipe mode) is fine with redirected stdin, but key mode is not — # tty.setraw() will raise and character-at-a-time mapping makes no sense # for piped/scripted input. - if not use_stdin and not _sys.stdin.isatty(): + if not use_stdin and not sys.stdin.isatty(): if pre_connect_spinner is not None: pre_connect_spinner.stop() raise CommandError( @@ -1462,38 +1460,40 @@ def signal_handler_fn(signum, frame): lease_timer = None force_close_timer_holder = {} # mutable holder so cleanup can cancel if workflow_expires_on_ms and workflow_expires_on_ms > 0: - import time as _time - seconds_until_expiry = (workflow_expires_on_ms / 1000.0) - _time.time() - if seconds_until_expiry <= 0: + seconds_until_expiry = (workflow_expires_on_ms / 1000.0) - time.time() + _lease_tube_id = tunnel_result['tunnel'].get('tube_id') + _lease_tube_registry = tunnel_result['tunnel'].get('tube_registry') + + def _on_lease_expired(): + nonlocal shutdown_requested, lease_expired lease_expired = True shutdown_requested = True + if _lease_tube_id and _lease_tube_registry is not None: + # Fetch remote version lazily: the SDP answer arrives + # asynchronously; capturing eagerly at schedule time + # would race for short leases scheduled before SDP. + remote_ver = tunnel_result['tunnel'].get('remote_webrtc_version') + if not remote_ver: + sess = get_tunnel_session(_lease_tube_id) + remote_ver = ( + getattr(sess, 'remote_webrtc_version', None) + if sess else None + ) + force_close_timer_holder['t'] = escalate_close( + _lease_tube_registry, + _lease_tube_id, + remote_webrtc_version=remote_ver, + reason=CloseConnectionReasons.AdminClosed, + log_prefix=f"[lease-expiry launch tube={_lease_tube_id[:8]}] ", + ) + + if seconds_until_expiry <= 0: + # Already expired at session start: run the close-and-escalate + # path immediately so cleanup goes through the same flow as a + # mid-session expiry. + _on_lease_expired() else: import threading as _threading - _lease_tube_id = tunnel_result['tunnel'].get('tube_id') - _lease_tube_registry = tunnel_result['tunnel'].get('tube_registry') - - def _on_lease_expired(): - nonlocal shutdown_requested, lease_expired - lease_expired = True - shutdown_requested = True - if _lease_tube_id and _lease_tube_registry is not None: - # Fetch remote version lazily: the SDP answer arrives - # asynchronously; capturing eagerly at schedule time - # would race for short leases scheduled before SDP. - remote_ver = tunnel_result['tunnel'].get('remote_webrtc_version') - if not remote_ver: - sess = get_tunnel_session(_lease_tube_id) - remote_ver = ( - getattr(sess, 'remote_webrtc_version', None) - if sess else None - ) - force_close_timer_holder['t'] = escalate_close( - _lease_tube_registry, - _lease_tube_id, - remote_webrtc_version=remote_ver, - reason=CloseConnectionReasons.AdminClosed, - log_prefix=f"[lease-expiry launch tube={_lease_tube_id[:8]}] ", - ) lease_timer = _threading.Timer(seconds_until_expiry, _on_lease_expired) lease_timer.daemon = True lease_timer.start() diff --git a/keepercommander/commands/tunnel_and_connections.py b/keepercommander/commands/tunnel_and_connections.py index 8ac5e2e0b..32e00dc56 100644 --- a/keepercommander/commands/tunnel_and_connections.py +++ b/keepercommander/commands/tunnel_and_connections.py @@ -24,7 +24,7 @@ import sys import threading import time -from typing import List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from keeper_secrets_manager_core.utils import bytes_to_base64, base64_to_bytes, url_safe_str_to_bytes from .base import Command, GroupCommand, dump_report_data, RecordMixin @@ -54,14 +54,12 @@ # so a re-run of `pam tunnel start` in the same shell session doesn't leave # the original timer alive and produce duplicate "Tunnel access expired" # messages from the prior tunnel. -_LEASE_EXPIRY_TIMERS_BY_RECORD = {} # type: dict[str, threading.Timer] +_LEASE_EXPIRY_TIMERS_BY_RECORD: Dict[str, threading.Timer] = {} # Maps record_uid -> threading.Event used by --foreground / --run modes to break # their blocking wait when the workflow lease expires. Set by the mode block, # read by the lease-expiry callback. Default interactive mode does NOT register # (it has no blocking wait to interrupt; user SSH session continues naturally). -_LEASE_SHUTDOWN_EVENTS_BY_RECORD = {} # type: dict[str, threading.Event] -import threading as _lease_threading_module # noqa: E402 (used only by the tunnel-start timer) - +_LEASE_SHUTDOWN_EVENTS_BY_RECORD: Dict[str, threading.Event] = {} # Group Commands @@ -1021,8 +1019,7 @@ def execute(self, params, **kwargs): # advertise FORCE_CLOSE_MIN_VERSION; older peers get the soft close # only and the in-flight session lingers until natural disconnect. if workflow_expires_on_ms and workflow_expires_on_ms > 0: - import time as _time - seconds_until_expiry = (workflow_expires_on_ms / 1000.0) - _time.time() + seconds_until_expiry = (workflow_expires_on_ms / 1000.0) - time.time() tube_id = result.get('tube_id') if tube_id and seconds_until_expiry > 0: # Dedup: cancel any pending lease-expiry timer for this @@ -1066,7 +1063,7 @@ def _close_on_lease_expiry(_tube_id=tube_id, _record_uid=record_uid): finally: _LEASE_EXPIRY_TIMERS_BY_RECORD.pop(_record_uid, None) - timer = _lease_threading_module.Timer( + timer = threading.Timer( seconds_until_expiry, _close_on_lease_expiry, ) timer.daemon = True @@ -1688,7 +1685,6 @@ def _record(name: str, passed: bool, detail: str, ms: int): password=turn_password, ) if output_format == 'json': - import json print(json.dumps(rust_results, indent=2)) return 0 From ddf8332db3754f8138092172b4e82abd6324fbfe Mon Sep 17 00:00:00 2001 From: Ivan Dimov <78815270+idimov-keeper@users.noreply.github.com> Date: Wed, 6 May 2026 17:23:11 -0500 Subject: [PATCH 3/4] Suppress runaway TURN refresh-permission log leak; harden rust-log filter for concurrent sessions --- .../commands/pam_launch/rust_log_filter.py | 138 +++++++++++++++++- .../tunnel/port_forward/tunnel_helpers.py | 2 +- 2 files changed, 132 insertions(+), 8 deletions(-) diff --git a/keepercommander/commands/pam_launch/rust_log_filter.py b/keepercommander/commands/pam_launch/rust_log_filter.py index cded0e9bd..b1ee55e96 100644 --- a/keepercommander/commands/pam_launch/rust_log_filter.py +++ b/keepercommander/commands/pam_launch/rust_log_filter.py @@ -9,6 +9,79 @@ import threading +# Patterns for known leak messages from turn-0.11.0's relay-conn task. +# The webrtc-rs ICE agent does not synchronously cancel its TURN refresh task +# on PeerConnection.close(); the task survives indefinitely and re-fires every +# few minutes (TURN permission lifetime ~5 min, refresh at ~3/4 of that). Each +# iteration logs: +# "fail to refresh permissions: CreatePermission error response (error 400: Bad Request)" +# "refresh permissions failed" +# from turn-0.11.0/src/client/relay_conn.rs:528 / :618. +# +# Until the upstream leak is fixed, suppress these messages permanently — they +# are post-close stragglers from a deallocated TURN allocation and have no +# diagnostic value to the user. +_TURN_REFRESH_LEAK_PATTERNS = ( + 'fail to refresh permissions', + 'refresh permissions failed', +) + + +class _PermanentTurnLeakFilter(logging.Filter): + """Always drop the known turn-rs refresh-permission leak messages. + + Installed once at module import time on the root logger, never removed. + Independent of the session-scoped _RustWebrtcToDebugFilter — that one + flips with --debug; this one is an upstream-bug workaround that should + fire regardless of debug state. + """ + + def filter(self, record: logging.LogRecord) -> bool: + try: + msg = record.getMessage() + except Exception: + return True + for needle in _TURN_REFRESH_LEAK_PATTERNS: + if needle in msg: + return False + return True + + +# Loggers known to emit the leak. Both dot- and colon-separated names cover +# the Rust→Python bridge formats. ``turn`` and ``turn.client`` cover any +# parent that records may originate from depending on rust-log target style. +_TURN_LEAK_LOGGER_NAMES = ( + 'turn', + 'turn.client', + 'turn.client.relay_conn', + 'turn::client::relay_conn', +) + +_PERMANENT_TURN_FILTER = _PermanentTurnLeakFilter() + + +def _install_permanent_turn_filter(): + """Attach the content filter to the known leaky loggers AND root. + + Python's logger filters fire only at the originating logger (filters do + NOT re-check during propagation up the hierarchy via callHandlers), so we + must attach to the actual emitting logger names rather than relying on + root.addFilter alone. Idempotent — safe to call again. + """ + for name in _TURN_LEAK_LOGGER_NAMES: + log = logging.getLogger(name) + if _PERMANENT_TURN_FILTER not in log.filters: + log.addFilter(_PERMANENT_TURN_FILTER) + # Also attach to root in case the Rust→Python bridge ever logs directly to + # root (cheap belt-and-braces). + root = logging.getLogger() + if _PERMANENT_TURN_FILTER not in root.filters: + root.addFilter(_PERMANENT_TURN_FILTER) + + +_install_permanent_turn_filter() + + def _rust_webrtc_logger_name(name: str) -> bool: """True if logger name is from Rust/webrtc/turn so we treat its messages as DEBUG-only.""" if not name: @@ -71,6 +144,10 @@ def enter_pam_launch_terminal_rust_logging(): Downgrades Rust/webrtc/turn messages to DEBUG so they only show with --debug. Returns a token to pass to exit_pam_launch_terminal_rust_logging() on exit. """ + global _ACTIVE_SESSION_COUNT + with _ACTIVE_SESSION_LOCK: + _ACTIVE_SESSION_COUNT += 1 + root = logging.getLogger() flt = _RustWebrtcToDebugFilter() root.addFilter(flt) @@ -111,11 +188,35 @@ def enter_pam_launch_terminal_rust_logging(): # the Rust/webrtc log filter. The Rust tube shutdown runs on its own runtime # threads and can emit a final log record AFTER Python's session-exit path has # returned control to the REPL — e.g. ``webrtc-sctp stream N not found`` when -# the channel is torn down. Without a grace period, that late record arrives -# at a root logger whose filter has already been removed and leaks to the -# console. We keep the filter in place for a short window so such stragglers -# are still suppressed. -_DEFAULT_RUST_LOG_FILTER_GRACE_SEC = 2.5 +# the channel is torn down, or TURN ``fail to refresh permissions`` warnings +# from the relay-conn task as it observes the deallocated allocation. +# +# The window must outlive both: +# 1. The soft→hard close escalation in ``escalate_close`` +# (``FORCE_CLOSE_DELAY_SECONDS`` = 3 s) +# 2. A brief TURN refresh-task latency after the PeerConnection drop cascade +# +# Imported lazily below to avoid a top-level cycle (this module is imported +# during pam_launch init, before the tunnel helpers are loaded for some +# callers). +def _force_close_delay_seconds(): + try: + from ..tunnel.port_forward.tunnel_helpers import FORCE_CLOSE_DELAY_SECONDS + return FORCE_CLOSE_DELAY_SECONDS + except Exception: + return 3.0 + + +_DEFAULT_RUST_LOG_FILTER_GRACE_SEC = _force_close_delay_seconds() + 1.5 + +# Refcount of active pam-launch sessions that have rust-log filtering installed. +# Incremented in enter_*, decremented at the END of the grace timer in +# _do_exit_rust_logging. The restore work (removing class-level filters, +# restoring pre-session logger state) is only performed when this drops to 0, +# so a second `pam launch` started during the grace window of a prior one is +# not silently de-filtered when the prior session's timer fires. +_ACTIVE_SESSION_COUNT = 0 +_ACTIVE_SESSION_LOCK = threading.Lock() def _do_exit_rust_logging(token): @@ -124,9 +225,32 @@ def _do_exit_rust_logging(token): return flt, saved = token[0], token[1] original_logger_class = token[2] if len(token) > 2 else logging.Logger - logging.setLoggerClass(original_logger_class) + + # Always remove THIS session's filter instance from root so per-token + # filters don't pile up. The bulk class-based cleanup below only runs when + # we are the last active session. root = logging.getLogger() - root.removeFilter(flt) + try: + root.removeFilter(flt) + except Exception: + pass + + global _ACTIVE_SESSION_COUNT + with _ACTIVE_SESSION_LOCK: + _ACTIVE_SESSION_COUNT = max(0, _ACTIVE_SESSION_COUNT - 1) + last_session = _ACTIVE_SESSION_COUNT == 0 + if not last_session: + # Another pam launch session is still active (or in its own grace + # window); leave the class-level filter and saved state alone so its + # filtering keeps working. We already removed our specific instance + # from root above. + logging.debug( + "rust_log_filter: skipping restore, %d session(s) still active", + _ACTIVE_SESSION_COUNT, + ) + return + + logging.setLoggerClass(original_logger_class) # Remove downgrade filter from all Rust/webrtc loggers (we may have added the shared # filter to existing loggers, and _RustAwareLogger instances have their own filter) for name in list(logging.Logger.manager.loggerDict.keys()): diff --git a/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py b/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py index 0f4ce1286..f70c20f53 100644 --- a/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py +++ b/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py @@ -189,7 +189,7 @@ def escalate_close( def _do_force_close(): try: - logging.info( + logging.debug( f"{log_prefix}escalating to force_close_tube({tube_id}) after " f"{hard_after_seconds}s" ) From f8912f73543710c003a0a19dff20e47075374dfb Mon Sep 17 00:00:00 2001 From: Ivan Dimov <78815270+idimov-keeper@users.noreply.github.com> Date: Wed, 6 May 2026 19:28:40 -0500 Subject: [PATCH 4/4] Redraw keeper-shell prompt after async lease-expiry message --- .../tunnel/port_forward/tunnel_helpers.py | 38 +++++++++++++++++++ .../commands/tunnel_and_connections.py | 8 ++-- requirements.txt | 2 +- setup.cfg | 2 +- 4 files changed, 44 insertions(+), 6 deletions(-) diff --git a/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py b/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py index f70c20f53..11f18aece 100644 --- a/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py +++ b/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py @@ -143,6 +143,44 @@ def parse(v): FORCE_CLOSE_DELAY_SECONDS = 3.0 +def print_above_keeper_prompt(msg): + """Print ``msg`` so the keeper-shell prompt redraws itself underneath it. + + Strategy: + 1. If a prompt-toolkit app is running, call ``app.renderer.erase()`` — + this writes the ANSI sequences to fully erase the prompt area + (which may span multiple lines), leaving a clean cursor. + 2. Print the message + newline so the cursor advances below. + 3. Call ``app.invalidate()`` (thread-safe) to schedule a fresh prompt + render at the new cursor position. + + Falls back to plain ``print`` if no app is running. Avoids + ``run_in_terminal`` (returns a coroutine that needs to be awaited on + the app's event loop; scheduling that from a Timer thread is + version-fragile and leaks un-awaited coroutines). + """ + app = None + try: + from prompt_toolkit.application.current import get_app_or_none + app = get_app_or_none() + if app is not None and app.is_running: + try: + app.renderer.erase() + except Exception: + pass + except Exception: + app = None + + sys.stdout.write(msg + '\n') + sys.stdout.flush() + + if app is not None: + try: + app.invalidate() + except Exception: + pass + + def escalate_close( tube_registry, tube_id, diff --git a/keepercommander/commands/tunnel_and_connections.py b/keepercommander/commands/tunnel_and_connections.py index 32e00dc56..3034d3644 100644 --- a/keepercommander/commands/tunnel_and_connections.py +++ b/keepercommander/commands/tunnel_and_connections.py @@ -32,7 +32,8 @@ from .tunnel.port_forward.tunnel_helpers import find_open_port, get_config_uid, get_keeper_tokens, \ get_or_create_tube_registry, get_gateway_uid_from_record, resolve_record, resolve_pam_config, resolve_folder, \ remove_field, start_rust_tunnel, get_tunnel_session, unregister_tunnel_session, CloseConnectionReasons, \ - wait_for_tunnel_connection, create_rust_webrtc_settings, escalate_close + wait_for_tunnel_connection, create_rust_webrtc_settings, escalate_close, \ + print_above_keeper_prompt from .tunnel_registry import ( PARENT_GRACE_SECONDS, is_pid_alive, @@ -1036,12 +1037,11 @@ def execute(self, params, **kwargs): def _close_on_lease_expiry(_tube_id=tube_id, _record_uid=record_uid): try: - print( + print_above_keeper_prompt( f"\n{bcolors.WARNING}Tunnel access lease expired for " f"{_record_uid}. Closing the tunnel; any in-flight " f"forwarded connections will be terminated." - f"{bcolors.ENDC}", - flush=True, + f"{bcolors.ENDC}" ) sess = get_tunnel_session(_tube_id) remote_ver = getattr(sess, 'remote_webrtc_version', None) if sess else None diff --git a/requirements.txt b/requirements.txt index 41983e843..f67f0f6c7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,7 @@ requests>=2.31.0 cryptography>=46.0.6 protobuf>=5.29.6 keeper-secrets-manager-core>=16.6.0 -keeper_pam_webrtc_rs>=2.1.6 +keeper_pam_webrtc_rs>=2.1.17 pydantic>=2.6.4 flask pyngrok>=7.5.0 diff --git a/setup.cfg b/setup.cfg index ac7f6bfb8..3d6801983 100644 --- a/setup.cfg +++ b/setup.cfg @@ -50,7 +50,7 @@ install_requires = requests>=2.31.0 tabulate websockets - keeper_pam_webrtc_rs>=2.1.6 + keeper_pam_webrtc_rs>=2.1.17 pydantic>=2.6.4 fpdf2>=2.8.3 cbor2; sys_platform == "darwin" and python_version>='3.10'