From 68451d3bf46b3ffd4c7ce1351ff81e9a186ed8f2 Mon Sep 17 00:00:00 2001 From: Martin Sawczynski Date: Fri, 1 May 2026 17:44:23 +0100 Subject: [PATCH] feat(pam-extended): add pam extended schedule + rule commands New keepercommander/commands/pam_extended/ package: - PamExtendedCommand group (schedule, rule sub-groups) - PamExtendedScheduleSetCommand: set PAMRotationSchedule.scheduleData via REST - PamExtendedScheduleDeleteCommand: noSchedule=True via REST - PamExtendedScheduleListCommand: read current schedules - PamExtendedRuleAddCommand: ADD PAMElementData via pam/modify - PamExtendedRuleDeleteCommand: DELETE PAMElementData via pam/modify - PamExtendedRuleListCommand: read rules from PAM DAG Co-authored-by: Cursor --- .../commands/pam_extended/__init__.py | 0 .../pam_extended/discovery_rule_commands.py | 185 ++++++++++++++++++ .../commands/pam_extended/group_command.py | 58 ++++++ .../pam_extended/schedule_commands.py | 169 ++++++++++++++++ 4 files changed, 412 insertions(+) create mode 100644 keepercommander/commands/pam_extended/__init__.py create mode 100644 keepercommander/commands/pam_extended/discovery_rule_commands.py create mode 100644 keepercommander/commands/pam_extended/group_command.py create mode 100644 keepercommander/commands/pam_extended/schedule_commands.py diff --git a/keepercommander/commands/pam_extended/__init__.py b/keepercommander/commands/pam_extended/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/keepercommander/commands/pam_extended/discovery_rule_commands.py b/keepercommander/commands/pam_extended/discovery_rule_commands.py new file mode 100644 index 000000000..4bf99c3bc --- /dev/null +++ b/keepercommander/commands/pam_extended/discovery_rule_commands.py @@ -0,0 +1,185 @@ +# _ __ +# | |/ /___ ___ _ __ ___ _ _ ® +# | ' ] + pam extended rule add --type --cidr --config-uid + pam extended rule delete --config-uid +""" +from __future__ import annotations + +import argparse +import json +import logging +import os +from typing import TYPE_CHECKING + +from ..base import ArgparseCommand +from ...error import CommandError +from ... import utils + +if TYPE_CHECKING: + from ...params import KeeperParams + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _get_dag_rules(params: "KeeperParams", config_uid: str) -> list[dict]: + """Return discovery rules from the PAM DAG for a configuration.""" + try: + from ...keeper_dag.types import PamGraphId + from ...keeper_dag.vertex import DAGVertex + except ImportError: + return [] + + config_uid_bytes = utils.base64_url_decode(config_uid) + dag = getattr(params, "pam_dag", None) + if dag is None: + return [] + + config_vertex = dag.get_vertex(config_uid_bytes) + if config_vertex is None: + return [] + + rules_vertex = config_vertex.get_child(PamGraphId.DISCOVERY_RULES) + if rules_vertex is None: + return [] + + rows = [] + for child in rules_vertex.children: + data = child.data + if isinstance(data, (bytes, bytearray)): + try: + data = json.loads(data) + except Exception: + data = {} + rows.append({"uid": child.uid.hex() if isinstance(child.uid, bytes) else child.uid, + **data}) + return rows + + +def _modify_dag_rule(params: "KeeperParams", config_uid: str, + operation: str, rule_data: dict, + element_uid: bytes | None = None) -> None: + """Apply an ADD / UPDATE / DELETE operation on a discovery rule DAG element.""" + from ...proto import pam_pb2 + from ...api import communicate_rest + + op_map = {"ADD": pam_pb2.PAMOperationType.ADD, + "UPDATE": pam_pb2.PAMOperationType.UPDATE, + "DELETE": pam_pb2.PAMOperationType.DELETE} + if operation not in op_map: + raise CommandError(f"Unknown operation: {operation}") + + config_uid_bytes = utils.base64_url_decode(config_uid) + element_uid_bytes = element_uid or os.urandom(16) + + data_op = pam_pb2.PAMDataOperation() + data_op.operationType = op_map[operation] + + element = pam_pb2.PAMElementData() + element.elementUid = element_uid_bytes + element.parentUid = config_uid_bytes + element.data = json.dumps(rule_data).encode() + data_op.element.CopyFrom(element) + + rq = pam_pb2.PAMModifyRequest() + rq.operations.append(data_op) + communicate_rest(params, rq, "pam/modify", rs_type=pam_pb2.PAMModifyResult) + + +# --------------------------------------------------------------------------- +# Commands +# --------------------------------------------------------------------------- + +class PamExtendedRuleListCommand(ArgparseCommand): + """``pam extended rule list``.""" + + def __init__(self) -> None: + parser = argparse.ArgumentParser(prog="list", description="List PAM discovery rules") + parser.add_argument("--config-uid", dest="config_uid", required=True) + parser.add_argument("--format", dest="fmt", choices=["table", "json"], default="table") + super().__init__(parser) + + def execute(self, params: "KeeperParams", **kwargs) -> None: + rows = _get_dag_rules(params, kwargs["config_uid"]) + if kwargs.get("fmt") == "json": + print(json.dumps(rows, indent=2)) + else: + if not rows: + print("No discovery rules found.") + return + for r in rows: + print(f" {r.get('uid', '?')} name={r.get('name', '?')} " + f"type={r.get('target_type', '?')} cidr={r.get('target_cidr', '?')}") + + +class PamExtendedRuleAddCommand(ArgparseCommand): + """``pam extended rule add``.""" + + def __init__(self) -> None: + parser = argparse.ArgumentParser(prog="add", description="Add a PAM discovery rule") + parser.add_argument("name", help="Rule name") + parser.add_argument( + "--type", dest="target_type", + choices=["machine", "user", "database"], default="machine", + ) + parser.add_argument("--cidr", dest="target_cidr", required=True, help="Target CIDR range") + parser.add_argument( + "--protocol", dest="protocol", + choices=["ssh", "rdp", "database"], default="ssh", + ) + parser.add_argument("--config-uid", dest="config_uid", required=True) + parser.add_argument( + "--credential-uid", dest="credential_uid", default=None, + help="Credential record UID", + ) + super().__init__(parser) + + def execute(self, params: "KeeperParams", **kwargs) -> None: + rule_data = { + "name": kwargs["name"], + "target_type": kwargs.get("target_type", "machine"), + "target_cidr": kwargs["target_cidr"], + "protocol": kwargs.get("protocol", "ssh"), + } + if kwargs.get("credential_uid"): + rule_data["credential_uid_ref"] = kwargs["credential_uid"] + + _modify_dag_rule(params, kwargs["config_uid"], "ADD", rule_data) + print(f"Discovery rule '{kwargs['name']}' added to config {kwargs['config_uid']}") + + +class PamExtendedRuleDeleteCommand(ArgparseCommand): + """``pam extended rule delete``.""" + + def __init__(self) -> None: + parser = argparse.ArgumentParser(prog="delete", description="Delete a PAM discovery rule") + parser.add_argument("uid", help="Rule element UID (hex)") + parser.add_argument("--config-uid", dest="config_uid", required=True) + super().__init__(parser) + + def execute(self, params: "KeeperParams", **kwargs) -> None: + element_uid = bytes.fromhex(kwargs["uid"]) + _modify_dag_rule( + params, kwargs["config_uid"], "DELETE", {}, + element_uid=element_uid, + ) + print(f"Discovery rule {kwargs['uid']} deleted from config {kwargs['config_uid']}") diff --git a/keepercommander/commands/pam_extended/group_command.py b/keepercommander/commands/pam_extended/group_command.py new file mode 100644 index 000000000..b0f6c07b4 --- /dev/null +++ b/keepercommander/commands/pam_extended/group_command.py @@ -0,0 +1,58 @@ +# _ __ +# | |/ /___ ___ _ __ ___ _ _ ® +# | ' None: + super().__init__("Manage PAM rotation schedules") + self.register_command_new(PamExtendedScheduleListCommand(), "list") + self.register_command_new(PamExtendedScheduleSetCommand(), "set") + self.register_command_new(PamExtendedScheduleDeleteCommand(), "delete") + + +class PamExtendedRuleGroup(GroupCommandNew): + """``pam extended rule`` sub-group.""" + + def __init__(self) -> None: + super().__init__("Manage PAM discovery rules") + self.register_command_new(PamExtendedRuleListCommand(), "list") + self.register_command_new(PamExtendedRuleAddCommand(), "add") + self.register_command_new(PamExtendedRuleDeleteCommand(), "delete") + + +class PamExtendedCommand(GroupCommandNew): + """``pam extended`` — advanced PAM schedule and discovery-rule management.""" + + def __init__(self) -> None: + super().__init__("Advanced PAM schedule and discovery-rule management") + self.register_command_new(PamExtendedScheduleGroup(), "schedule") + self.register_command_new(PamExtendedRuleGroup(), "rule") diff --git a/keepercommander/commands/pam_extended/schedule_commands.py b/keepercommander/commands/pam_extended/schedule_commands.py new file mode 100644 index 000000000..701c812ab --- /dev/null +++ b/keepercommander/commands/pam_extended/schedule_commands.py @@ -0,0 +1,169 @@ +# _ __ +# | |/ /___ ___ _ __ ___ _ _ ® +# | ' ] + pam extended schedule set --cron [--config-uid ] + pam extended schedule delete [--config-uid ] +""" +from __future__ import annotations + +import argparse +import json +import logging +from typing import TYPE_CHECKING + +from ..base import ArgparseCommand +from ...error import CommandError + +if TYPE_CHECKING: + from ...params import KeeperParams + +logger = logging.getLogger(__name__) + + +def _set_schedule(params: "KeeperParams", record_uid: str, + config_uid: str, cron_expr: str, notify_emails: list[str] | None = None) -> None: + """Write a named rotation schedule via the PAM rotation REST endpoint.""" + from ...proto import pam_pb2 + from ...api import communicate_rest + + rq = pam_pb2.PAMRotationSchedule() + rq.recordUid = bytes.fromhex(record_uid) if len(record_uid) == 32 else record_uid.encode() + if config_uid: + rq.configurationUid = ( + bytes.fromhex(config_uid) if len(config_uid) == 32 else config_uid.encode() + ) + schedule_data: dict = {"type": "cron", "cron": cron_expr} + if notify_emails: + schedule_data["notifyEmails"] = notify_emails + rq.scheduleData = json.dumps(schedule_data) + communicate_rest(params, rq, "pam/set_pam_rotation_schedule") + + +def _delete_schedule(params: "KeeperParams", record_uid: str) -> None: + """Remove a rotation schedule (set noSchedule=True).""" + from ...proto import pam_pb2 + from ...api import communicate_rest + + rq = pam_pb2.PAMRotationSchedule() + rq.recordUid = bytes.fromhex(record_uid) if len(record_uid) == 32 else record_uid.encode() + rq.noSchedule = True + communicate_rest(params, rq, "pam/set_pam_rotation_schedule") + + +def _list_schedules(params: "KeeperParams", config_uid: str | None = None) -> list[dict]: + """Return rotation schedules visible to the authenticated user.""" + from ...proto import pam_pb2 + from ...api import communicate_rest + + rq = pam_pb2.PAMGenericUidsRequest() if config_uid else pam_pb2.PAMGenericUidRequest.__new__( + pam_pb2.PAMGenericUidRequest + ) + rs = communicate_rest( + params, rq, + "pam/get_rotation_schedules", + rs_type=pam_pb2.PAMRotationSchedulesResponse, + ) + rows = [] + for s in rs.schedules: + entry: dict = { + "record_uid": s.recordUid.hex() if isinstance(s.recordUid, bytes) else s.recordUid, + "no_schedule": s.noSchedule, + } + if s.scheduleData: + try: + entry["schedule"] = json.loads(s.scheduleData) + except Exception: + entry["schedule_raw"] = s.scheduleData + rows.append(entry) + return rows + + +class PamExtendedScheduleListCommand(ArgparseCommand): + """``pam extended schedule list`` — list rotation schedules.""" + + def __init__(self) -> None: + parser = argparse.ArgumentParser( + prog="list", description="List PAM rotation schedules" + ) + parser.add_argument( + "--config-uid", dest="config_uid", default=None, + help="Filter by PAM configuration UID", + ) + parser.add_argument( + "--format", dest="fmt", choices=["table", "json"], default="table", + ) + super().__init__(parser) + + def execute(self, params: "KeeperParams", **kwargs) -> None: + rows = _list_schedules(params, config_uid=kwargs.get("config_uid")) + if kwargs.get("fmt") == "json": + print(json.dumps(rows, indent=2)) + else: + if not rows: + print("No rotation schedules found.") + return + for r in rows: + sched = r.get("schedule", {}) + cron = sched.get("cron", "(none)") + print(f" {r['record_uid']} cron={cron}") + + +class PamExtendedScheduleSetCommand(ArgparseCommand): + """``pam extended schedule set`` — create or update a named rotation schedule.""" + + def __init__(self) -> None: + parser = argparse.ArgumentParser( + prog="set", description="Create or update a PAM rotation schedule" + ) + parser.add_argument("uid_ref", help="PAM record UID") + parser.add_argument("--cron", dest="cron", required=True, help="Cron expression (5-field)") + parser.add_argument( + "--config-uid", dest="config_uid", default=None, + help="PAM configuration UID (optional)", + ) + parser.add_argument( + "--notify", dest="notify", action="append", default=None, + metavar="EMAIL", help="Email(s) to notify on schedule fire", + ) + super().__init__(parser) + + def execute(self, params: "KeeperParams", **kwargs) -> None: + uid_ref: str = kwargs["uid_ref"] + cron: str = kwargs["cron"] + config_uid: str | None = kwargs.get("config_uid") + notify: list[str] | None = kwargs.get("notify") + + _set_schedule(params, uid_ref, config_uid or "", cron, notify_emails=notify) + logger.info("Rotation schedule set: uid=%s cron=%s", uid_ref, cron) + print(f"Rotation schedule set for {uid_ref} (cron: {cron})") + + +class PamExtendedScheduleDeleteCommand(ArgparseCommand): + """``pam extended schedule delete`` — remove a rotation schedule.""" + + def __init__(self) -> None: + parser = argparse.ArgumentParser( + prog="delete", description="Remove a PAM rotation schedule" + ) + parser.add_argument("uid_ref", help="PAM record UID") + super().__init__(parser) + + def execute(self, params: "KeeperParams", **kwargs) -> None: + uid_ref: str = kwargs["uid_ref"] + _delete_schedule(params, uid_ref) + logger.info("Rotation schedule deleted: uid=%s", uid_ref) + print(f"Rotation schedule removed for {uid_ref}")