Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 39 additions & 10 deletions keepercommander/commands/pam_launch/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
unregister_tunnel_session,
unregister_conversation_key,
get_keeper_tokens,
escalate_close,
CloseConnectionReasons,
)
from ..tunnel.port_forward.TunnelGraph import TunnelDAG
from .rust_log_filter import (
Expand Down Expand Up @@ -1415,13 +1417,11 @@ def _start_cli_session(
(banner printed there); do not create a second spinner or duplicate the launching line.
preserve_crlf: When True (default), STDOUT keeps raw CRLF; False when ``pam launch -n`` / ``--normalize-crlf``.
"""
import sys as _sys

# Non-interactive stdin guard: key-event mode requires a real TTY.
# --stdin (pipe mode) is fine with redirected stdin, but key mode is not —
# tty.setraw() will raise and character-at-a-time mapping makes no sense
# for piped/scripted input.
if not use_stdin and not _sys.stdin.isatty():
if not use_stdin and not sys.stdin.isatty():
if pre_connect_spinner is not None:
pre_connect_spinner.stop()
raise CommandError(
Expand Down Expand Up @@ -1452,19 +1452,48 @@ def signal_handler_fn(signum, frame):
# the web vault (immediate teardown, no grace period, no reconnect).
# The "Access expired" line is printed AFTER terminal reset in finally
# so the message survives reset_local_terminal_after_pam_session().
# On expiry we soft-close the tube and escalate to force_close_tube
# after FORCE_CLOSE_DELAY_SECONDS so any in-flight forwarded streams
# (SSH bytes etc.) are severed instead of lingering until the user
# disconnects manually. Escalation is gated on local hasattr +
# remote SDP version (FORCE_CLOSE_MIN_VERSION).
lease_timer = None
force_close_timer_holder = {} # mutable holder so cleanup can cancel
if workflow_expires_on_ms and workflow_expires_on_ms > 0:
import time as _time
seconds_until_expiry = (workflow_expires_on_ms / 1000.0) - _time.time()
if seconds_until_expiry <= 0:
seconds_until_expiry = (workflow_expires_on_ms / 1000.0) - time.time()
_lease_tube_id = tunnel_result['tunnel'].get('tube_id')
_lease_tube_registry = tunnel_result['tunnel'].get('tube_registry')

def _on_lease_expired():
nonlocal shutdown_requested, lease_expired
lease_expired = True
shutdown_requested = True
if _lease_tube_id and _lease_tube_registry is not None:
# Fetch remote version lazily: the SDP answer arrives
# asynchronously; capturing eagerly at schedule time
# would race for short leases scheduled before SDP.
remote_ver = tunnel_result['tunnel'].get('remote_webrtc_version')
if not remote_ver:
sess = get_tunnel_session(_lease_tube_id)
remote_ver = (
getattr(sess, 'remote_webrtc_version', None)
if sess else None
)
force_close_timer_holder['t'] = escalate_close(
_lease_tube_registry,
_lease_tube_id,
remote_webrtc_version=remote_ver,
reason=CloseConnectionReasons.AdminClosed,
log_prefix=f"[lease-expiry launch tube={_lease_tube_id[:8]}] ",
)

if seconds_until_expiry <= 0:
# Already expired at session start: run the close-and-escalate
# path immediately so cleanup goes through the same flow as a
# mid-session expiry.
_on_lease_expired()
else:
import threading as _threading
def _on_lease_expired():
nonlocal shutdown_requested, lease_expired
lease_expired = True
shutdown_requested = True
lease_timer = _threading.Timer(seconds_until_expiry, _on_lease_expired)
lease_timer.daemon = True
lease_timer.start()
Expand Down
138 changes: 131 additions & 7 deletions keepercommander/commands/pam_launch/rust_log_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,79 @@
import threading


# Patterns for known leak messages from turn-0.11.0's relay-conn task.
# The webrtc-rs ICE agent does not synchronously cancel its TURN refresh task
# on PeerConnection.close(); the task survives indefinitely and re-fires every
# few minutes (TURN permission lifetime ~5 min, refresh at ~3/4 of that). Each
# iteration logs:
# "fail to refresh permissions: CreatePermission error response (error 400: Bad Request)"
# "refresh permissions failed"
# from turn-0.11.0/src/client/relay_conn.rs:528 / :618.
#
# Until the upstream leak is fixed, suppress these messages permanently — they
# are post-close stragglers from a deallocated TURN allocation and have no
# diagnostic value to the user.
_TURN_REFRESH_LEAK_PATTERNS = (
'fail to refresh permissions',
'refresh permissions failed',
)


class _PermanentTurnLeakFilter(logging.Filter):
"""Always drop the known turn-rs refresh-permission leak messages.

Installed once at module import time on the root logger, never removed.
Independent of the session-scoped _RustWebrtcToDebugFilter — that one
flips with --debug; this one is an upstream-bug workaround that should
fire regardless of debug state.
"""

def filter(self, record: logging.LogRecord) -> bool:
try:
msg = record.getMessage()
except Exception:
return True
for needle in _TURN_REFRESH_LEAK_PATTERNS:
if needle in msg:
return False
return True


# Loggers known to emit the leak. Both dot- and colon-separated names cover
# the Rust→Python bridge formats. ``turn`` and ``turn.client`` cover any
# parent that records may originate from depending on rust-log target style.
_TURN_LEAK_LOGGER_NAMES = (
'turn',
'turn.client',
'turn.client.relay_conn',
'turn::client::relay_conn',
)

_PERMANENT_TURN_FILTER = _PermanentTurnLeakFilter()


def _install_permanent_turn_filter():
"""Attach the content filter to the known leaky loggers AND root.

Python's logger filters fire only at the originating logger (filters do
NOT re-check during propagation up the hierarchy via callHandlers), so we
must attach to the actual emitting logger names rather than relying on
root.addFilter alone. Idempotent — safe to call again.
"""
for name in _TURN_LEAK_LOGGER_NAMES:
log = logging.getLogger(name)
if _PERMANENT_TURN_FILTER not in log.filters:
log.addFilter(_PERMANENT_TURN_FILTER)
# Also attach to root in case the Rust→Python bridge ever logs directly to
# root (cheap belt-and-braces).
root = logging.getLogger()
if _PERMANENT_TURN_FILTER not in root.filters:
root.addFilter(_PERMANENT_TURN_FILTER)


_install_permanent_turn_filter()


def _rust_webrtc_logger_name(name: str) -> bool:
"""True if logger name is from Rust/webrtc/turn so we treat its messages as DEBUG-only."""
if not name:
Expand Down Expand Up @@ -71,6 +144,10 @@ def enter_pam_launch_terminal_rust_logging():
Downgrades Rust/webrtc/turn messages to DEBUG so they only show with --debug.
Returns a token to pass to exit_pam_launch_terminal_rust_logging() on exit.
"""
global _ACTIVE_SESSION_COUNT
with _ACTIVE_SESSION_LOCK:
_ACTIVE_SESSION_COUNT += 1

root = logging.getLogger()
flt = _RustWebrtcToDebugFilter()
root.addFilter(flt)
Expand Down Expand Up @@ -111,11 +188,35 @@ def enter_pam_launch_terminal_rust_logging():
# the Rust/webrtc log filter. The Rust tube shutdown runs on its own runtime
# threads and can emit a final log record AFTER Python's session-exit path has
# returned control to the REPL — e.g. ``webrtc-sctp stream N not found`` when
# the channel is torn down. Without a grace period, that late record arrives
# at a root logger whose filter has already been removed and leaks to the
# console. We keep the filter in place for a short window so such stragglers
# are still suppressed.
_DEFAULT_RUST_LOG_FILTER_GRACE_SEC = 2.5
# the channel is torn down, or TURN ``fail to refresh permissions`` warnings
# from the relay-conn task as it observes the deallocated allocation.
#
# The window must outlive both:
# 1. The soft→hard close escalation in ``escalate_close``
# (``FORCE_CLOSE_DELAY_SECONDS`` = 3 s)
# 2. A brief TURN refresh-task latency after the PeerConnection drop cascade
#
# Imported lazily below to avoid a top-level cycle (this module is imported
# during pam_launch init, before the tunnel helpers are loaded for some
# callers).
def _force_close_delay_seconds():
try:
from ..tunnel.port_forward.tunnel_helpers import FORCE_CLOSE_DELAY_SECONDS
return FORCE_CLOSE_DELAY_SECONDS
except Exception:
return 3.0


_DEFAULT_RUST_LOG_FILTER_GRACE_SEC = _force_close_delay_seconds() + 1.5

# Refcount of active pam-launch sessions that have rust-log filtering installed.
# Incremented in enter_*, decremented at the END of the grace timer in
# _do_exit_rust_logging. The restore work (removing class-level filters,
# restoring pre-session logger state) is only performed when this drops to 0,
# so a second `pam launch` started during the grace window of a prior one is
# not silently de-filtered when the prior session's timer fires.
_ACTIVE_SESSION_COUNT = 0
_ACTIVE_SESSION_LOCK = threading.Lock()


def _do_exit_rust_logging(token):
Expand All @@ -124,9 +225,32 @@ def _do_exit_rust_logging(token):
return
flt, saved = token[0], token[1]
original_logger_class = token[2] if len(token) > 2 else logging.Logger
logging.setLoggerClass(original_logger_class)

# Always remove THIS session's filter instance from root so per-token
# filters don't pile up. The bulk class-based cleanup below only runs when
# we are the last active session.
root = logging.getLogger()
root.removeFilter(flt)
try:
root.removeFilter(flt)
except Exception:
pass

global _ACTIVE_SESSION_COUNT
with _ACTIVE_SESSION_LOCK:
_ACTIVE_SESSION_COUNT = max(0, _ACTIVE_SESSION_COUNT - 1)
last_session = _ACTIVE_SESSION_COUNT == 0
if not last_session:
# Another pam launch session is still active (or in its own grace
# window); leave the class-level filter and saved state alone so its
# filtering keeps working. We already removed our specific instance
# from root above.
logging.debug(
"rust_log_filter: skipping restore, %d session(s) still active",
_ACTIVE_SESSION_COUNT,
)
return

logging.setLoggerClass(original_logger_class)
# Remove downgrade filter from all Rust/webrtc loggers (we may have added the shared
# filter to existing loggers, and _RustAwareLogger instances have their own filter)
for name in list(logging.Logger.manager.loggerDict.keys()):
Expand Down
32 changes: 1 addition & 31 deletions keepercommander/commands/pam_launch/terminal_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
MAIN_NONCE_LENGTH,
SYMMETRIC_KEY_LENGTH,
set_remote_description_and_parse_version,
_version_at_least,
)
from ..tunnel.port_forward.TunnelGraph import TunnelDAG
from ..pam.pam_dto import GatewayAction, GatewayActionWebRTCSession
Expand Down Expand Up @@ -136,37 +137,6 @@
CONNECT_AS_MIN_VERSION = "2.1.6"


def _version_at_least(version: Optional[str], min_version: str) -> bool:
"""
Compare semantic versions. Returns True if version >= min_version.

Args:
version: Parsed version (e.g. "2.1.4") or None (treated as unknown/old).
min_version: Minimum required version (e.g. "2.1.0").

Returns:
True if version is known and >= min_version; False if unknown or older.
"""
if not version:
return False

def parse(v: str) -> tuple:
parts = []
for p in v.split(".")[:3]: # major.minor.patch
try:
parts.append(int(p))
except ValueError:
parts.append(0)
while len(parts) < 3:
parts.append(0)
return tuple(parts[:3])

try:
return parse(version) >= parse(min_version)
except Exception:
return False


def _ensure_max_message_size_attribute(sdp_offer: Optional[str]) -> Optional[str]:
"""
Ensure the SDP offer advertises the same max-message-size attribute as Web Vault.
Expand Down
Loading
Loading