Merged
78 changes: 76 additions & 2 deletions keepercommander/commands/pam_import/README.md
@@ -303,7 +303,7 @@ Each Machine (pamMachine, pamDatabase, pamDirectory) can specify **Administrativ
> **Note 3:** Post-rotation scripts (a.k.a. `scripts`) are executed in the following order: `pamUser` scripts run after any **successful** rotation for that user, `pamMachine` scripts after any **successful** rotation on the machine, and `pamConfiguration` scripts after any rotation using that configuration.
> **Note 4:** When `allow_supply_user` is false and JIT ephemeral is not used, the vault may require a launch credential; the import can provide it via `launch_credentials` in the resource's `connection` block.

JIT and KeeperAI settings below are shared across all resource types (pamMachine, pamDatabase, pamDirectory) except User and RBI (pamRemoteBrowser) records.
JIT and KeeperAI settings below are shared across all resource types (pamMachine, pamDatabase, pamDirectory) except User and RBI (pamRemoteBrowser) records. **Workflow** (approvals / checkout / temporal restrictions) is supported on all four resource types: pamMachine, pamDatabase, pamDirectory, **and** pamRemoteBrowser.

<details>
<summary>Just-In-Time Access (JIT)</summary>
@@ -406,6 +406,79 @@ JIT and KeeperAI settings below are shared across all resource types (pamMachine
```
</details>
<details>
<summary>Workflow (Approvals, Checkout, Temporal Access)</summary>

Workflow controls how privileged access to a resource is gated: how many approvals are needed, whether sessions require check-out, MFA, or a reason/ticket, which time windows permit access, and who can approve (with optional escalation). Workflow is applied via the Keeper Router **after** the resource record and DAG/JIT/AI steps are complete and is not stored on the record itself.

**How to Configure:** Add `pam_settings.options.workflow` to any pamMachine, pamDatabase, pamDirectory, or pamRemoteBrowser. The workflow object maps directly to the Web Vault's "Workflow" tab on a resource record.

```json
{
  "pam_settings": {
    "options": {
      "workflow": {
        "approvals_needed": 2,
        "checkout_needed": true,
        "start_access_on_approval": false,
        "require_reason": true,
        "require_ticket": false,
        "require_mfa": true,
        "access_duration": "8h",
        "allowed_times": {
          "allowed_days": ["mon", "tue", "wed", "thu", "fri"],
          "time_ranges": [
            { "start": "09:00", "end": "17:30" }
          ],
          "timezone": "America/New_York"
        },
        "approvers": [
          {
            "principal": { "type": "user", "email": "primary.approver@example.com" },
            "escalation": false
          },
          {
            "principal": { "type": "user", "email": "second.approver@example.com" },
            "escalation": false
          },
          {
            "principal": {
              "type": "team",
              "team_uid_base64url": "REPLACE_TEAM_UID_BASE64URL"
            },
            "escalation": true,
            "escalation_after": "45m"
          }
        ]
      }
    }
  }
}
```

**Field reference:**
- `approvals_needed` *(int, default `0`)* — number of approvals required to grant access.
- `checkout_needed` *(bool, default `false`)* — require explicit check-out before launching a session.
- `start_access_on_approval` *(bool, default `false`)* — start the access window the moment approval is granted (rather than at session launch).
- `require_reason` / `require_ticket` *(bool, default `false`)* — prompt the user for a reason / ticket reference at request time.
- `require_mfa` *(bool, default `false`)* — require MFA at session launch.
- `access_duration` *(string, default `"1d"`)* — how long approved access remains valid. Accepts `Xm` / `Xh` / `Xd` (e.g. `"30m"`, `"8h"`, `"2d"`); a bare integer is interpreted as minutes. Must be positive.
- `allowed_times.allowed_days` *(list of strings)* — restrict access to these weekdays. Accepts 3-letter (`mon`..`sun`) or full names (`monday`..`sunday`), case-insensitive.
- `allowed_times.time_ranges` *(list of `{start, end}` objects)* — one or more allowed daily time windows in `HH:MM` (24-hour) format. **Multiple ranges per day are supported.** A single range whose `end` is earlier than its `start` (e.g. an overnight `22:00–06:00`) **should be split into two ranges** that both fall inside one day (e.g. `22:00–23:59` and `00:00–06:00`).
- `allowed_times.timezone` *(string)* — IANA timezone name (e.g. `"UTC"`, `"America/New_York"`). **Required when `time_ranges` is non-empty.**
- `approvers[]` — list of approver entries.
- `principal.type` — `"user"` or `"team"`.
- For users: `principal.email` (must exist in the enterprise).
- For teams: `principal.team_uid_base64url` (the team's vault UID, base64url-encoded; validated against the local team cache during import — unknown UIDs fail in dry-run).
- `escalation` *(bool)* — whether this approver is in the escalation chain.
- `escalation_after` *(duration string, optional)* — wait this long before escalating to this approver. **Requires `escalation: true`.**
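
For example, a nightly 22:00–06:00 maintenance window (which crosses midnight) would be declared as two same-day ranges. This fragment is a sketch reusing the fields documented above; the days and timezone are example values:

```json
"allowed_times": {
  "allowed_days": ["mon", "tue", "wed", "thu", "fri"],
  "time_ranges": [
    { "start": "22:00", "end": "23:59" },
    { "start": "00:00", "end": "06:00" }
  ],
  "timezone": "UTC"
}
```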

**Behavior notes:**
- **Trivial workflow is a no-op.** If none of `approvals_needed > 0`, `checkout_needed`, `require_mfa`, `start_access_on_approval`, `allowed_times.allowed_days`, or `allowed_times.time_ranges` is set, the workflow block is treated as absent and no Router call is made.
- **Pre-flight validation runs in `--dry-run`.** Bad durations, malformed `HH:MM`, missing timezone, escalation rule violations, and unknown team UIDs are reported during dry-run before any vault writes.
- **Dry-run skips the Router calls.** Workflow is applied (Router create/update + approver reconcile) only on a real run.
- **`extend` only applies workflow to newly created resources** (existing resources are not touched).
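
The duration grammar validated during pre-flight (`access_duration`, `escalation_after`) can be sketched in a few lines. This is an illustrative re-implementation of the documented rule only, not the actual `WorkflowFormatter.parse_duration`:

```python
import re

def parse_duration_ms(value):
    """Parse 'Xm' / 'Xh' / 'Xd' (bare integer = minutes) into milliseconds.

    Sketch of the documented rule; the real parser lives in
    WorkflowFormatter.parse_duration and may differ in details.
    """
    text = str(value).strip().lower()
    m = re.fullmatch(r'(\d+)([mhd]?)', text)
    if not m:
        raise ValueError(f'invalid duration: {value!r}')
    n, unit = int(m.group(1)), m.group(2) or 'm'  # bare integer -> minutes
    ms = n * {'m': 60_000, 'h': 3_600_000, 'd': 86_400_000}[unit]
    if ms <= 0:
        raise ValueError('duration must be positive')
    return ms
```

So `"8h"` becomes 28,800,000 ms and a bare `45` becomes 2,700,000 ms, matching the default of `"1d"` = 86,400,000 ms used when the key is omitted.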
</details>
<details>
<summary>pam_data.resources.pamMachine (RDP)</summary>

```json
Expand Down Expand Up @@ -435,7 +508,8 @@ JIT and KeeperAI settings below are shared across all resource types (pamMachine
"ai_threat_detection": "off",
"ai_terminate_session_on_detection": "off",
"jit_settings": {},
"ai_settings": {}
"ai_settings": {},
"workflow": {}
},
"allow_supply_host": false,
"port_forward": {
163 changes: 158 additions & 5 deletions keepercommander/commands/pam_import/base.py
@@ -22,9 +22,11 @@
from typing import Any, Dict, Optional, List, Union

from ..record_edit import RecordAddCommand as RecordEditAddCommand
from ..workflow.helpers import RecordResolver, WorkflowFormatter
from ... import api, attachment, utils, vault, vault_extensions, \
    record_facades, record_management
from ...display import bcolors
from ...error import CommandError
from ...recordv3 import RecordV3


@@ -69,7 +71,8 @@
    "pam_settings": {
        "options" : {
            "jit_settings": {},
            "ai_settings": {}
            "ai_settings": {},
            "workflow": {}
        },
        "connection" : {}
    },
@@ -611,6 +614,144 @@ def load(cls, data: Union[str, dict]):
        return obj


class PamWorkflowOptions:
    """Parsed workflow settings from pam_settings.options.workflow.
    Not stored on record fields nor in DAG; applied via Krouter after record/DAG creation.
    """

    _DEFAULT_DURATION_MS = 86_400_000  # "1d"

    def __init__(self):
        self.approvals_needed: int = 0
        self.checkout_needed: bool = False
        self.start_access_on_approval: bool = False
        self.require_reason: bool = False
        self.require_ticket: bool = False
        self.require_mfa: bool = False
        self.access_duration_ms: int = self._DEFAULT_DURATION_MS
        self.allowed_days: List[str] = []  # canonical 3-letter tokens: "mon".."sun"
        self.time_ranges: List[dict] = []  # each: {"start": "HH:MM", "end": "HH:MM"}
        self.timezone: str = ""
        self.approvers: List[dict] = []  # each: {principal_type, email, team_uid_b64, escalation, escalation_after_ms}

    @staticmethod
    def _parse_duration(value) -> int:
        """Return milliseconds. Raises CommandError on invalid/non-positive value.
        Delegates to WorkflowFormatter.parse_duration; adds a None -> default-1d shim
        (the CLI command always supplies a string, but the JSON import may omit the key).
        """
        if value is None:
            return PamWorkflowOptions._DEFAULT_DURATION_MS
        return WorkflowFormatter.parse_duration(str(value))

    @classmethod
    def load(cls, data) -> Optional['PamWorkflowOptions']:
        """Parse workflow JSON dict. Returns None when absent / null / trivial (V2 guard)."""
        if not data or not isinstance(data, dict):
            return None

        obj = cls()
        obj.approvals_needed = max(0, int(data.get('approvals_needed', 0) or 0))
        obj.checkout_needed = bool(data.get('checkout_needed', False))
        obj.start_access_on_approval = bool(data.get('start_access_on_approval', False))
        obj.require_reason = bool(data.get('require_reason', False))
        obj.require_ticket = bool(data.get('require_ticket', False))
        obj.require_mfa = bool(data.get('require_mfa', False))

        # V9: access_duration — default "1d"
        obj.access_duration_ms = cls._parse_duration(data.get('access_duration'))

        # allowed_times
        at = data.get('allowed_times') or {}
        if isinstance(at, dict):
            days_raw = at.get('allowed_days') or []
            if isinstance(days_raw, list):
                for day in days_raw:
                    d = str(day).lower().strip()
                    if d not in WorkflowFormatter.DAY_PARSE_MAP:
                        raise CommandError('', f'workflow: invalid allowed_times.allowed_days token "{day}"')
                    obj.allowed_days.append(d[:3])  # store as "mon".."sun"

            ranges_raw = at.get('time_ranges') or []
            if isinstance(ranges_raw, list):
                for r in ranges_raw:
                    if isinstance(r, dict):
                        start = str(r.get('start', '') or '').strip()
                        end = str(r.get('end', '') or '').strip()
                        if start and end:
                            obj.time_ranges.append({'start': start, 'end': end})

            obj.timezone = str(at.get('timezone', '') or '').strip()

        # V8: time_ranges non-empty => timezone required
        if obj.time_ranges and not obj.timezone:
            raise CommandError('', 'workflow: allowed_times.time_ranges requires timezone')

        # approvers
        for idx, a in enumerate(data.get('approvers') or []):
            if not isinstance(a, dict):
                continue
            principal = a.get('principal') or {}
            if not isinstance(principal, dict):
                continue
            ptype = str(principal.get('type', '') or '').lower()
            escalation = bool(a.get('escalation', False))
            esc_after_raw = a.get('escalation_after')
            esc_after_ms = cls._parse_duration(esc_after_raw) if esc_after_raw else 0
            # V7: escalation_after requires escalation: true
            if esc_after_ms and not escalation:
                raise CommandError('', f'workflow: approvers[{idx}] escalation_after requires escalation: true')
            if ptype == 'user':
                email = str(principal.get('email', '') or '').strip()
                if not email:
                    raise CommandError('', f'workflow: approvers[{idx}] user principal requires non-empty email')
                obj.approvers.append({
                    'principal_type': 'user', 'email': email, 'team_uid_b64': None,
                    'escalation': escalation, 'escalation_after_ms': esc_after_ms,
                })
            elif ptype == 'team':
                uid_b64 = str(principal.get('team_uid_base64url', '') or '').strip()
                if not uid_b64:
                    raise CommandError('', f'workflow: approvers[{idx}] team principal requires non-empty team_uid_base64url')
                obj.approvers.append({
                    'principal_type': 'team', 'email': None, 'team_uid_b64': uid_b64,
                    'escalation': escalation, 'escalation_after_ms': esc_after_ms,
                })
            else:
                raise CommandError('', f'workflow: approvers[{idx}] principal.type must be "user" or "team", got "{ptype}"')

        # V2: non-trivial guard — at least one meaningful flag must be set
        is_trivial = (
            obj.approvals_needed == 0
            and not obj.start_access_on_approval
            and not obj.checkout_needed
            and not obj.require_mfa
            and not obj.allowed_days
            and not obj.time_ranges
        )
        if is_trivial:
            return None  # nothing to persist; caller treats as delete/no-op

        # V4 warning: approvals_needed > 0 with no approvers
        if obj.approvals_needed > 0 and not obj.approvers:
            logging.warning('workflow: approvals_needed > 0 but no approvers specified')

        return obj

    def validate_principals(self, params, resource_title: str = '') -> None:
        """Validate team UIDs via RecordResolver.validate_team (which checks both
        team_cache and enterprise.teams). Raises CommandError on first unknown UID.
        """
        for idx, a in enumerate(self.approvers):
            if a['principal_type'] != 'team':
                continue
            try:
                RecordResolver.validate_team(params, a['team_uid_b64'])
            except CommandError as e:
                prefix = f'Resource "{resource_title}": ' if resource_title else ''
                raise CommandError('', f'{prefix}workflow approvers[{idx}]: {e.message or str(e)}')


class DagJitSettingsObject():
    def __init__(self):
        self.create_ephemeral: bool = False
@@ -2900,10 +3041,12 @@ class PamRemoteBrowserSettings:
    def __init__(
        self,
        options: Optional[DagSettingsObject] = None,
        connection: Optional[ConnectionSettingsHTTP] = None
        connection: Optional[ConnectionSettingsHTTP] = None,
        workflow: Optional[PamWorkflowOptions] = None,
    ):
        self.options = options
        self.connection = connection
        self.workflow = workflow  # not on record nor in DAG; applied via Krouter

    @classmethod
    def load(cls, data: Optional[Union[str, dict]]):
@@ -2912,9 +3055,14 @@ def load(cls, data: Optional[Union[str, dict]]):
        except: logging.error(f"PAM RBI Settings field failed to load from: {str(data)[:80]}...")
        if not isinstance(data, dict): return obj

        options = DagSettingsObject.load(data.get("options", {}))
        options_dict = data.get("options", {}) or {}
        options = DagSettingsObject.load(options_dict)
        if not is_empty_instance(options):
            obj.options = options
        if isinstance(options_dict, dict):
            workflow_value = options_dict.get("workflow")
            if workflow_value is not None:
                obj.workflow = PamWorkflowOptions.load(workflow_value)

        cdata = data.get("connection", {})
        # TO DO: if isinstance(cdata, str): lookup_by_name(pam_data.connections)
@@ -2944,13 +3092,15 @@ def __init__(
        options: Optional[DagSettingsObject] = None,
        jit_settings: Optional[DagJitSettingsObject] = None,
        ai_settings: Optional[DagAiSettingsObject] = None,
        workflow: Optional[PamWorkflowOptions] = None,
    ):
        self.allowSupplyHost = allowSupplyHost
        self.connection = connection
        self.portForward = portForward
        self.options = options
        self.jit_settings = jit_settings
        self.ai_settings = ai_settings
        self.workflow = workflow  # not on record nor in DAG; applied via Krouter

    # PamConnectionSettings excludes ConnectionSettingsHTTP
    pam_connection_classes = [
@@ -2981,8 +3131,8 @@ def is_empty(self):
        empty = is_empty_instance(self.options)
        empty = empty and is_empty_instance(self.portForward)
        empty = empty and is_empty_instance(self.connection, ["protocol"])
        # NB! JIT and AI settings are in import json but not in record json (just DAG json)
        empty = empty and self.jit_settings is None and self.ai_settings is None
        # NB! JIT, AI, workflow are in import json but not in record json (not DAG either for workflow)
        empty = empty and self.jit_settings is None and self.ai_settings is None and self.workflow is None
        return empty

    @classmethod
@@ -3008,6 +3158,9 @@ def load(cls, data: Union[str, dict]):
            ai_settings = DagAiSettingsObject.load(ai_value)
            if ai_settings:
                obj.ai_settings = ai_settings
            workflow_value = options_dict.get("workflow")
            if workflow_value is not None:
                obj.workflow = PamWorkflowOptions.load(workflow_value)

        portForward = PamPortForwardSettings.load(data.get("port_forward", {}))
        if not is_empty_instance(portForward):
11 changes: 11 additions & 0 deletions keepercommander/commands/pam_import/edit.py
@@ -19,6 +19,7 @@
from typing import Any, Dict, Optional, List, Union

from .keeper_ai_settings import set_resource_jit_settings, set_resource_keeper_ai_settings, refresh_meta_to_latest, refresh_link_to_config_to_latest
from .workflow_apply import apply_workflow, validate_workflow_principals
from .base import (
    PAM_RESOURCES_RECORD_TYPES,
    PROJECT_IMPORT_JSON_TEMPLATE,
@@ -1642,6 +1643,9 @@ def process_data(self, params, project):
            resolve_domain_admin(pce, users)
            # only resolve here - create after machine and user creation

        # pre-flight: validate workflow team UIDs before any vault writes (runs in dry-run too)
        validate_workflow_principals(params, resources)

        # dry run
        if project["options"].get("dry_run", False) is True:
            print("Will import file data here...")
@@ -1696,6 +1700,9 @@ def process_data(self, params, project):
                args["connections"] = True
                args["v_type"] = RefType.PAM_BROWSER
                tdag.set_resource_allowed(**args)
                rbi_wf = getattr(getattr(mach, 'rbi_settings', None), 'workflow', None)
                if rbi_wf:
                    apply_workflow(params, mach.uid, mach.title or '', rbi_wf)
            else:  # machine/db/directory
                args = parse_command_options(mach, True)
                if admin_uid: args["admin"] = admin_uid
@@ -1739,6 +1746,10 @@ def process_data(self, params, project):
                if ai:
                    refresh_link_to_config_to_latest(params, mach.uid, pam_cfg_uid)

                ps_wf = getattr(getattr(mach, 'pam_settings', None), 'workflow', None)
                if ps_wf:
                    apply_workflow(params, mach.uid, mach.title or '', ps_wf)

                # Machine - create its users (if any)
                users = getattr(mach, "users", [])
                users = users if isinstance(users, list) else []