diff --git a/keepercommander/commands/base.py b/keepercommander/commands/base.py index d075197a6..44ba651c4 100644 --- a/keepercommander/commands/base.py +++ b/keepercommander/commands/base.py @@ -212,6 +212,10 @@ def register_enterprise_commands(commands, aliases, command_info): device_management.register_enterprise_commands(commands) device_management.register_enterprise_command_info(aliases, command_info) + from . import sso_cloud + sso_cloud.register_commands(commands) + sso_cloud.register_command_info(aliases, command_info) + if sys.version_info.major > 3 or (sys.version_info.major == 3 and sys.version_info.minor >= 9): from.pedm import pedm_admin pedm_command = pedm_admin.PedmCommand() diff --git a/keepercommander/commands/sso_cloud/__init__.py b/keepercommander/commands/sso_cloud/__init__.py new file mode 100644 index 000000000..6f60037a8 --- /dev/null +++ b/keepercommander/commands/sso_cloud/__init__.py @@ -0,0 +1,74 @@ +# _ __ +# | |/ /___ ___ _ __ ___ _ _ ® +# | ' Any + target = kwargs.get('target') + svc = self.find_sso_service(params, target) + sp_id = svc['sso_service_provider_id'] + self.ensure_cloud_sso(svc, target) + + settings_to_set = kwargs.get('setting') or [] + settings_to_reset = kwargs.get('reset') or [] + + if not settings_to_set and not settings_to_reset: + raise CommandError('sso-cloud', 'Provide at least one --set KEY=VALUE or --reset KEY argument.') + + config_rs = self.get_selected_configuration(params, sp_id, config_target=kwargs.get('config')) + config_id = config_rs.ssoSpConfigurationId + + available_settings = {} + for sv in config_rs.ssoCloudSettingValue: + available_settings[sv.settingName.lower()] = sv + + rq = ssocloud.SsoCloudConfigurationRequest() + rq.ssoServiceProviderId = sp_id + rq.ssoSpConfigurationId = config_id + + for setting_str in settings_to_set: + pos = setting_str.find('=') + if pos < 1: + raise CommandError('sso-cloud', f'Invalid setting format "{setting_str}". Expected KEY=VALUE.') + + key = setting_str[:pos].strip() + value = setting_str[pos + 1:].strip() + + existing = available_settings.get(key.lower()) + if not existing: + raise CommandError('sso-cloud', f'Unknown setting: "{key}". ' + f'Use "sso-cloud get" to see available settings.') + if not existing.isEditable: + raise CommandError('sso-cloud', f'Setting "{key}" is read-only.') + + action = ssocloud.SsoCloudSettingAction() + action.settingName = existing.settingName + action.operation = ssocloud.SET + action.value = value + rq.ssoCloudSettingAction.append(action) + + for key in settings_to_reset: + existing = available_settings.get(key.strip().lower()) + if not existing: + raise CommandError('sso-cloud', f'Unknown setting: "{key}".') + if not existing.isEditable: + raise CommandError('sso-cloud', f'Setting "{key}" is read-only.') + + action = ssocloud.SsoCloudSettingAction() + action.settingName = existing.settingName + action.operation = ssocloud.RESET_TO_DEFAULT + rq.ssoCloudSettingAction.append(action) + + updated_rs = api.communicate_rest( + params, rq, 'sso/config/sso_cloud_configuration_setting_set', + rs_type=ssocloud.SsoCloudConfigurationResponse) + + logging.info('Configuration updated successfully.') + self.dump_configuration(updated_rs) + + +class SsoCloudValidateCommand(EnterpriseCommand, SsoCloudMixin): + def get_parser(self): + return sso_cloud_validate_parser + + def execute(self, params, **kwargs): + # type: (Any, **Any) -> Any + target = kwargs.get('target') + svc = self.find_sso_service(params, target) + sp_id = svc['sso_service_provider_id'] + self.ensure_cloud_sso(svc, target) + + config_rs = self.get_selected_configuration(params, sp_id, config_target=kwargs.get('config')) + config_id = config_rs.ssoSpConfigurationId + + rq = ssocloud.SsoCloudConfigurationValidationRequest() + rq.ssoSpConfigurationId.append(config_id) + + rs = api.communicate_rest( + params, rq, 'sso/config/sso_cloud_configuration_validate', + rs_type=ssocloud.SsoCloudConfigurationValidationResponse) + + all_valid = True + for vc in rs.validationContent: + if vc.isSuccessful: + logging.info('Configuration "%s" (ID: %s) is valid.', + config_rs.name, vc.ssoSpConfigurationId) + else: + all_valid = False + logging.warning('Configuration "%s" (ID: %s) has validation errors:', + config_rs.name, vc.ssoSpConfigurationId) + for msg in vc.errorMessage: + logging.warning(' - %s', msg) + + if all_valid: + logging.info('SSO Cloud configuration is ready for use.') diff --git a/keepercommander/commands/sso_cloud/constants.py b/keepercommander/commands/sso_cloud/constants.py new file mode 100644 index 000000000..c863e80c6 --- /dev/null +++ b/keepercommander/commands/sso_cloud/constants.py @@ -0,0 +1,228 @@ +# _ __ +# | |/ /___ ___ _ __ ___ _ _ ® +# | ' Create Application > Regular Web App'), + ('idp', 'Enable Addons > SAML2 WEB APP'), + ('idp', 'In the Usage tab > Download IdP Metadata XML'), + ('idp', 'In the Settings tab > paste the ACS Endpoint into "Application Callback URL":'), + ('value', '{acs_endpoint}'), + ('idp', 'Replace the Settings editor JSON with the below (Entity ID pre-filled in audience):'), + ('json', '{auth0_json}'), + ('idp', 'Click "Debug" to verify, then Save'), + ('cmd', 'sso-cloud upload "{name}" --file '), + ], + }, + 'azure': { + 'portal_name': 'Azure Entra ID', + 'portal_url': 'https://portal.azure.com', + 'steps': [ + ('cmd', 'sso-cloud download "{name}" --output sp-metadata.xml'), + ('idp', 'In Azure portal, navigate to Microsoft Entra ID'), + ('idp', 'Go to Enterprise Applications > New Application'), + ('idp', 'Search "Keeper Password Manager" > Create'), + ('idp', 'Go to Set up Single sign-on > SAML'), + ('idp', 'Click "Upload metadata file" and upload sp-metadata.xml'), + ('note', 'Azure auto-fills Entity ID and Reply URL from the metadata'), + ('idp', 'Paste the IdP Initiated Login Endpoint into "Sign on URL":'), + ('value', '{idp_login_endpoint}'), + ('idp', 'Save the Basic SAML Configuration'), + ('idp', 'Click on "No, I\'ll test later" when asked for the test SSO login'), + ('idp', 'In Attributes and Claims card > Edit: delete the 4 extra Additional Claims'), + ('note', 'Verify: NameID/Email = user.userprincipalname (or user.mail)'), + ('idp', 'Reload page, under SAML Signing Certificate > Download "Federation Metadata XML"'), + ('cmd', 'sso-cloud upload "{name}" --file --force-authn'), + ], + }, + 'okta': { + 'portal_name': 'Okta', + 'portal_url': 'https://login.okta.com', + 'steps': [ + ('idp', 'Go to Applications > Create App Integration > SAML 2.0'), + ('idp', 'Paste the ACS Endpoint into "Single sign-on URL":'), + ('value', '{acs_endpoint}'), + ('idp', 'Paste the Entity ID into "Audience URI (SP Entity ID)":'), + ('value', '{entity_id}'), + ('idp', 'Set Name ID format to EmailAddress'), + ('idp', 'Add attribute statements: Email, First, Last'), + ('idp', 'Finish, then go to Sign On tab > Download IdP Metadata'), + ('cmd', 'sso-cloud upload "{name}" --file '), + ], + }, + 'google': { + 'portal_name': 'Google Workspace', + 'portal_url': 'https://admin.google.com', + 'steps': [ + ('idp', 'Go to Apps > Web and mobile apps > Add App > Add custom SAML app'), + ('idp', 'Download IdP Metadata from the Google IdP Information step'), + ('idp', 'Paste the ACS Endpoint into "ACS URL":'), + ('value', '{acs_endpoint}'), + ('idp', 'Paste the Entity ID into "Entity ID":'), + ('value', '{entity_id}'), + ('idp', 'Set Name ID format to EMAIL'), + ('idp', 'Add attribute mappings for email, first name, last name'), + ('cmd', 'sso-cloud upload "{name}" --file '), + ], + }, + 'jumpcloud': { + 'portal_name': 'JumpCloud', + 'portal_url': 'https://console.jumpcloud.com', + 'steps': [ + ('idp', 'Go to SSO Applications > Add New Application > Custom SAML App'), + ('idp', 'Paste the ACS Endpoint into "ACS URL":'), + ('value', '{acs_endpoint}'), + ('idp', 'Paste the Entity ID into "SP Entity ID":'), + ('value', '{entity_id}'), + ('idp', 'Set SAMLSubject NameID to email'), + ('idp', 'Add attribute mappings for email, first name, last name'), + ('idp', 'Activate the application, then download IdP Metadata'), + ('cmd', 'sso-cloud upload "{name}" --file '), + ], + }, +} diff --git a/keepercommander/commands/sso_cloud/log_commands.py b/keepercommander/commands/sso_cloud/log_commands.py new file mode 100644 index 000000000..5c4dfa53e --- /dev/null +++ b/keepercommander/commands/sso_cloud/log_commands.py @@ -0,0 +1,131 @@ +# _ __ +# | |/ /___ ___ _ __ ___ _ _ ® +# | ' Any + target = kwargs.get('target') + svc = self.find_sso_service(params, target) + sp_id = svc['sso_service_provider_id'] + + rq = ssocloud.SsoCloudSAMLLogRequest() + rq.ssoServiceProviderId = sp_id + + rs = api.communicate_rest( + params, rq, 'sso/config/sso_cloud_log_saml_get', + rs_type=ssocloud.SsoCloudSAMLLogResponse) + + if not rs.entry: + logging.info('No SAML log entries found for SP "%s".', svc.get('name', target)) + return + + fmt = kwargs.get('format') + verbose = kwargs.get('verbose', False) + + if fmt == 'json': + entries = [] + for entry in rs.entry: + e = { + 'server_time': entry.serverTime, + 'direction': entry.direction, + 'message_type': entry.messageType, + 'message_issued': entry.messageIssued, + 'from_entity_id': entry.fromEntityId, + 'saml_status': entry.samlStatus, + 'is_signed': entry.isSigned, + 'is_ok': entry.isOK, + } + if verbose: + e['relay_state'] = entry.relayState + e['saml_content'] = entry.samlContent + entries.append(e) + output = json.dumps(entries, indent=2) + output_path = kwargs.get('output') + if output_path: + try: + with open(os.path.expanduser(output_path), 'w') as f: + f.write(output) + logging.info('Log output written to %s', output_path) + except IOError as e: + raise CommandError('sso-cloud', f'Failed to write log output file "{output_path}": {e}') + else: + print(output) + return + + table = [] + headers = ['time', 'direction', 'type', 'status', 'signed', 'ok'] + if verbose: + headers.append('from_entity') + for entry in rs.entry: + row = [ + entry.serverTime, + entry.direction, + entry.messageType, + entry.samlStatus, + 'Yes' if entry.isSigned else 'No', + 'Yes' if entry.isOK else 'No', + ] + if verbose: + row.append(entry.fromEntityId) + table.append(row) + + dump_report_data(table, headers=headers, fmt=fmt, filename=kwargs.get('output')) + + if verbose: + logging.info('') + for i, entry in enumerate(rs.entry): + logging.info('--- Entry %d: %s %s ---', i + 1, entry.direction, entry.messageType) + if entry.relayState: + logging.info('Relay State: %s', entry.relayState) + if entry.samlContent: + logging.info('SAML Content:\n%s', entry.samlContent) + logging.info('') + + +class SsoCloudLogClearCommand(EnterpriseCommand, SsoCloudMixin): + def get_parser(self): + return sso_cloud_log_clear_parser + + def execute(self, params, **kwargs): + # type: (Any, **Any) -> Any + target = kwargs.get('target') + svc = self.find_sso_service(params, target) + sp_id = svc['sso_service_provider_id'] + + rq = ssocloud.SsoCloudSAMLLogRequest() + rq.ssoServiceProviderId = sp_id + + api.communicate_rest( + params, rq, 'sso/config/sso_cloud_log_saml_clear', + rs_type=ssocloud.SsoCloudSAMLLogResponse) + + logging.info('SAML log entries cleared for SP "%s".', svc.get('name', target)) diff --git a/keepercommander/commands/sso_cloud/metadata_commands.py b/keepercommander/commands/sso_cloud/metadata_commands.py new file mode 100644 index 000000000..33f42d9a9 --- /dev/null +++ b/keepercommander/commands/sso_cloud/metadata_commands.py @@ -0,0 +1,126 @@ +# _ __ +# | |/ /___ ___ _ __ ___ _ _ ® +# | ' Any + target = kwargs.get('target') + svc = self.find_sso_service(params, target) + sp_id = svc['sso_service_provider_id'] + self.ensure_cloud_sso(svc, target) + + filepath = kwargs.get('file', '') + filepath = os.path.expanduser(filepath) + if not os.path.isfile(filepath): + raise CommandError('sso-cloud', f'File not found: "{filepath}"') + + with open(filepath, 'rb') as f: + file_content = f.read() + + filename = os.path.basename(filepath) + + config_rs = self.get_selected_configuration(params, sp_id, config_target=kwargs.get('config')) + config_id = config_rs.ssoSpConfigurationId + + rq = ssocloud.SsoCloudIdpMetadataRequest() + rq.ssoSpConfigurationId = config_id + rq.filename = filename + rq.content = file_content + + rs = api.communicate_rest( + params, rq, 'sso/config/sso_cloud_upload_idp_metadata', + rs_type=ssocloud.SsoCloudConfigurationValidationResponse) + + has_errors = False + for vc in rs.validationContent: + if vc.isSuccessful: + logging.info('IdP metadata uploaded and validated successfully for configuration %s.', + vc.ssoSpConfigurationId) + else: + has_errors = True + logging.warning('Validation errors for configuration %s:', vc.ssoSpConfigurationId) + for msg in vc.errorMessage: + logging.warning(' - %s', msg) + + if not has_errors: + logging.info('File "%s" uploaded to configuration "%s" (ID: %s).', + filename, config_rs.name, config_id) + + if kwargs.get('force_authn'): + setting_rq = ssocloud.SsoCloudConfigurationRequest() + setting_rq.ssoServiceProviderId = sp_id + setting_rq.ssoSpConfigurationId = config_id + action = ssocloud.SsoCloudSettingAction() + action.settingName = 'sso_idp_force_login_mode' + action.operation = ssocloud.SET + action.value = 'true' + setting_rq.ssoCloudSettingAction.append(action) + api.communicate_rest( + params, setting_rq, 'sso/config/sso_cloud_configuration_setting_set', + rs_type=ssocloud.SsoCloudConfigurationResponse) + logging.info('ForceAuthn enabled.') + + +class SsoCloudDownloadMetadataCommand(EnterpriseCommand, SsoCloudMixin): + def get_parser(self): + return sso_cloud_download_parser + + def execute(self, params, **kwargs): + # type: (KeeperParams, **Any) -> Any + target = kwargs.get('target') + svc = self.find_sso_service(params, target) + sp_id = svc['sso_service_provider_id'] + self.ensure_cloud_sso(svc, target) + + server_base = params.rest_context.server_base + if not server_base.endswith('/'): + server_base += '/' + metadata_url = f'{server_base}sso/saml/metadata/{sp_id}' + + rs = http_requests.get(metadata_url, timeout=30) + if rs.status_code != 200: + raise CommandError('sso-cloud', + f'Failed to download SP metadata (HTTP {rs.status_code}): {rs.text[:200]}') + + xml_content = rs.text + output_path = kwargs.get('output') + if output_path: + output_path = os.path.expanduser(output_path) + try: + with open(output_path, 'w', encoding='utf-8') as f: + f.write(xml_content) + logging.info('SP metadata saved to: %s', output_path) + except IOError as e: + raise CommandError('sso-cloud', f'Failed to write metadata file "{output_path}": {e}') + else: + print(xml_content) diff --git a/keepercommander/commands/sso_cloud/mixin.py b/keepercommander/commands/sso_cloud/mixin.py new file mode 100644 index 000000000..a5d37d09f --- /dev/null +++ b/keepercommander/commands/sso_cloud/mixin.py @@ -0,0 +1,323 @@ +# _ __ +# | |/ /___ ___ _ __ ___ _ _ ® +# | ' dict + """Resolve an SSO service provider by ID or name from enterprise data.""" + if not target: + raise CommandError('sso-cloud', 'SSO Service Provider name or ID is required.') + + sso_services = params.enterprise.get('sso_services', []) + if not sso_services: + raise CommandError('sso-cloud', 'No SSO Cloud service providers found in this enterprise.') + + try: + target_id = int(target) + for svc in sso_services: + if svc.get('sso_service_provider_id') == target_id: + return svc + except (ValueError, TypeError): + logging.debug('Target "%s" is not numeric, searching by name.', target) + + target_lower = target.lower() + matches = [s for s in sso_services if s.get('name', '').lower() == target_lower] + if len(matches) == 1: + return matches[0] + if len(matches) > 1: + raise CommandError('sso-cloud', + f'Multiple SSO service providers match "{target}". Use the SP ID instead.') + + raise CommandError('sso-cloud', + f'SSO Service Provider "{target}" not found. ' + f'Run "ed -f" to refresh enterprise data, then "sso list" to verify.') + + @staticmethod + def ensure_cloud_sso(svc, target=''): + # type: (dict, str) -> None + """Warn if the SP doesn't appear as Cloud SSO in cached enterprise data.""" + if not svc.get('is_cloud'): + logging.debug('SSO Service Provider "%s" is_cloud flag is not set in enterprise cache. ' + 'Proceeding anyway — the server will enforce if invalid.', + svc.get('name', target)) + + @staticmethod + def get_node_name(params, node_id): + # type: (KeeperParams, int) -> str + """Resolve a node ID to its display name.""" + for node in params.enterprise.get('nodes', []): + if node['node_id'] == node_id: + if node.get('parent_id', 0) > 0: + return node['data'].get('displayname') or str(node_id) + else: + return params.enterprise.get('enterprise_name', str(node_id)) + return str(node_id) + + @staticmethod + def get_selected_configuration(params, sp_id, config_target=None): + # type: (KeeperParams, int, Optional[str]) -> ssocloud.SsoCloudConfigurationResponse + """Fetch the active or specified configuration for a service provider.""" + list_rq = ssocloud.SsoCloudServiceProviderConfigurationListRequest() + list_rq.ssoServiceProviderId = sp_id + list_rs = api.communicate_rest( + params, list_rq, 'sso/config/sso_cloud_sp_configuration_get', + rs_type=ssocloud.SsoCloudServiceProviderConfigurationListResponse) + + owned = [c for c in list_rs.configurationItem + if not c.ssoServiceProviderId or sp_id in c.ssoServiceProviderId] + + if not owned: + raise CommandError('sso-cloud', f'No configurations found for SP ID {sp_id}.') + + config_item = None + if config_target: + try: + config_id = int(config_target) + config_item = next( + (c for c in owned if c.ssoSpConfigurationId == config_id), None) + except ValueError: + pass + + if not config_item: + config_lower = config_target.lower() + matches = [c for c in owned if c.name.lower() == config_lower] + if len(matches) == 1: + config_item = matches[0] + elif len(matches) > 1: + raise CommandError('sso-cloud', + f'Multiple configurations match "{config_target}". Use Configuration ID.') + + if not config_item: + raise CommandError('sso-cloud', f'Configuration "{config_target}" not found.') + else: + config_item = next((c for c in owned if c.isSelected), None) + if not config_item: + config_item = owned[0] + + get_rq = ssocloud.SsoCloudConfigurationRequest() + get_rq.ssoServiceProviderId = sp_id + get_rq.ssoSpConfigurationId = config_item.ssoSpConfigurationId + return api.communicate_rest( + params, get_rq, 'sso/config/sso_cloud_configuration_get', + rs_type=ssocloud.SsoCloudConfigurationResponse) + + @staticmethod + def format_setting_value(setting): + # type: (ssocloud.SsoCloudSettingValue) -> str + """Format a setting value for display, handling special cases.""" + value = setting.value or '' + if setting.isFromFile: + if value and len(value) > 80: + return f'[{len(value)} bytes]' + if setting.settingName == 'sso_idp_type_id': + try: + idp_type = int(value) + return IDP_TYPE_NAMES.get(idp_type, f'Unknown ({value})') + except (ValueError, TypeError): + pass + return value + + @staticmethod + def _extract_sp_values(config_rs): + # type: (ssocloud.SsoCloudConfigurationResponse) -> dict + keys = ('sso_sp_entity_id', 'sso_sp_acs_endpoint', 'sso_sp_login_endpoint', + 'sso_sp_logout_endpoint', 'sso_sp_slo_endpoint', + 'sso_idp_initiated_login_endpoint', 'sso_sp_domain') + result = {} + for sv in config_rs.ssoCloudSettingValue: + if sv.settingName in keys: + result[sv.settingName] = sv.value or '' + return result + + @staticmethod + def _get_idp_type_name(config_rs): + # type: (ssocloud.SsoCloudConfigurationResponse) -> Optional[str] + for sv in config_rs.ssoCloudSettingValue: + if sv.settingName == 'sso_idp_type_id' and sv.value: + try: + return IDP_ENUM_TO_KEY.get(int(sv.value)) + except (ValueError, TypeError): + pass + return None + + @staticmethod + def show_idp_guidance(config_rs, sp_name=''): + # type: (ssocloud.SsoCloudConfigurationResponse, str) -> None + """Show IdP-specific setup guidance with formatted output.""" + idp_type_name = SsoCloudMixin._get_idp_type_name(config_rs) + if not idp_type_name: + return + guidance = IDP_SETUP_GUIDANCE.get(idp_type_name) + if not guidance: + return + + sp = SsoCloudMixin._extract_sp_values(config_rs) + portal = guidance['portal_name'] + display_name = sp_name or str(config_rs.ssoServiceProviderId) + + vals = { + 'name': display_name, + 'entity_id': sp.get('sso_sp_entity_id', ''), + 'acs_endpoint': sp.get('sso_sp_acs_endpoint', ''), + 'login_endpoint': sp.get('sso_sp_login_endpoint', ''), + 'idp_login_endpoint': sp.get('sso_idp_initiated_login_endpoint', ''), + 'slo_endpoint': sp.get('sso_sp_slo_endpoint', ''), + 'auth0_json': AUTH0_SAML_JSON_TEMPLATE.format( + entity_id=sp.get('sso_sp_entity_id', '')), + } + + BAR = '\u2500' * 60 + CMD_TAG = '[Commander]' + IDP_TAG = f'[{portal}]' + + print('') + print(f'{portal} SSO Setup Guide') + print(BAR) + print(guidance.get('portal_url', '')) + print('') + + step_num = 0 + for kind, text in guidance['steps']: + filled = text.format(**vals) + + if kind == 'value': + print(f' {filled}') + print('') + elif kind == 'json': + for json_line in filled.splitlines(): + print(f' {json_line}') + print('') + elif kind == 'note': + print(f' * {filled}') + elif kind == 'cmd': + step_num += 1 + print(f'{step_num:>2}. {CMD_TAG} My Vault> {filled}') + else: + step_num += 1 + print(f'{step_num:>2}. {IDP_TAG} {filled}') + + print('') + + @staticmethod + def dump_configuration(config_rs, fmt=None, filename=None): + # type: (ssocloud.SsoCloudConfigurationResponse, Optional[str], Optional[str]) -> None + """Display configuration details.""" + logging.info('') + logging.info('{0:>40s}: {1}'.format('Service Provider ID', config_rs.ssoServiceProviderId)) + logging.info('{0:>40s}: {1}'.format('Configuration ID', config_rs.ssoSpConfigurationId)) + logging.info('{0:>40s}: {1}'.format('Configuration Name', config_rs.name)) + logging.info('{0:>40s}: {1}'.format('Protocol', config_rs.protocol)) + logging.info('{0:>40s}: {1}'.format('Last Modified', config_rs.lastModified)) + + if fmt == 'json': + settings_list = [] + for sv in config_rs.ssoCloudSettingValue: + settings_list.append({ + 'setting_id': sv.settingId, + 'setting_name': sv.settingName, + 'label': sv.label, + 'value': sv.value, + 'editable': sv.isEditable, + 'required': sv.isRequired, + 'from_file': sv.isFromFile, + 'last_modified': sv.lastModified, + }) + output = json.dumps({ + 'sso_service_provider_id': config_rs.ssoServiceProviderId, + 'sso_sp_configuration_id': config_rs.ssoSpConfigurationId, + 'name': config_rs.name, + 'protocol': config_rs.protocol, + 'last_modified': config_rs.lastModified, + 'settings': settings_list + }, indent=2) + if filename: + try: + with open(filename, 'w') as f: + f.write(output) + logging.info('Output written to %s', filename) + except IOError as e: + raise CommandError('sso-cloud', f'Failed to write output file "{filename}": {e}') + else: + print(output) + return + + settings_by_name = {} # type: Dict[str, ssocloud.SsoCloudSettingValue] + for sv in config_rs.ssoCloudSettingValue: + settings_by_name[sv.settingName] = sv + + for group_label, setting_names in SETTING_GROUPS.items(): + group_settings = [settings_by_name.get(name) for name in setting_names] + group_settings = [s for s in group_settings if s is not None] + if not group_settings: + continue + + logging.info('') + logging.info(' --- %s ---', group_label) + for sv in group_settings: + if sv.isFromFile and sv.value and len(sv.value) > 80: + display_value = f'[{len(sv.value)} bytes]' + else: + display_value = SsoCloudMixin.format_setting_value(sv) + + editable_marker = '' if sv.isEditable else ' (read-only)' + required_marker = ' *' if sv.isRequired else '' + logging.info('{0:>40s}: {1}{2}{3}'.format( + sv.label or sv.settingName, display_value, required_marker, editable_marker)) + + ungrouped_names = set() + for group_names in SETTING_GROUPS.values(): + ungrouped_names.update(group_names) + ungrouped = [sv for name, sv in settings_by_name.items() if name not in ungrouped_names] + if ungrouped: + logging.info('') + logging.info(' --- Other Settings ---') + for sv in ungrouped: + display_value = SsoCloudMixin.format_setting_value(sv) + logging.info('{0:>40s}: {1}'.format(sv.label or sv.settingName, display_value)) + + logging.info('') + + @staticmethod + def dump_sso_services(params, fmt=None, filename=None): + # type: (KeeperParams, Optional[str], Optional[str]) -> None + """Display all SSO service providers as a table.""" + sso_services = params.enterprise.get('sso_services', []) + table = [] + headers = ['sp_id', 'name', 'node_id', 'node_name', 'active', 'is_cloud'] + if fmt and fmt != 'json': + headers = [field_to_title(x) for x in headers] + for svc in sso_services: + sp_id = svc.get('sso_service_provider_id') + name = svc.get('name', '') + node_id = svc.get('node_id', 0) + node_name = SsoCloudMixin.get_node_name(params, node_id) if node_id else 'N/A' + active = svc.get('active', False) + is_cloud = svc.get('is_cloud', False) + table.append([sp_id, name, node_id, node_name, active, is_cloud]) + return dump_report_data(table, headers=headers, fmt=fmt, filename=filename) diff --git a/keepercommander/commands/sso_cloud/parsers.py b/keepercommander/commands/sso_cloud/parsers.py new file mode 100644 index 000000000..91a7d12a9 --- /dev/null +++ b/keepercommander/commands/sso_cloud/parsers.py @@ -0,0 +1,116 @@ +# _ __ +# | |/ /___ ___ _ __ ___ _ _ ® +# | ' Any + name = kwargs.get('name') + if not name: + logging.warning('"--name" option is required for "create" command') + return + + node_name = kwargs.get('node') + nodes = list(self.resolve_nodes(params, node_name)) + if len(nodes) == 0: + raise CommandError('sso-cloud', f'Node "{node_name}" not found.') + if len(nodes) > 1: + raise CommandError('sso-cloud', f'Node name "{node_name}" is not unique. Use Node ID.') + target_node = nodes[0] + node_id = target_node['node_id'] + + existing = params.enterprise.get('sso_services', []) + for svc in existing: + if svc.get('node_id') == node_id: + raise CommandError('sso-cloud', + f'Node already has an SSO service provider: ' + f'"{svc.get("name")}" (ID: {svc.get("sso_service_provider_id")})') + + tree_key = params.enterprise.get('unencrypted_tree_key') + if not tree_key: + raise CommandError('sso-cloud', 'Enterprise tree key not available. Ensure enterprise data is loaded.') + + sp_data_key = crypto.get_random_bytes(32) + encrypted_sp_data_key = crypto.encrypt_aes_v1(sp_data_key, tree_key) + + rq = { + 'command': 'sso_service_provider_add', + 'sso_service_provider_id': self.get_enterprise_id(params), + 'node_id': node_id, + 'name': name, + 'sp_data_key': utils.base64_url_encode(encrypted_sp_data_key), + 'invite_new_users': True, + 'is_cloud': True, + } + rs = api.communicate(params, rq) + sp_id = rs.get('sso_service_provider_id') or rq['sso_service_provider_id'] + logging.info('SSO Service Provider created: %s (ID: %s)', name, sp_id) + + config_name = kwargs.get('config_name') or 'Default' + config_rq = ssocloud.SsoCloudConfigurationRequest() + config_rq.ssoServiceProviderId = sp_id + config_rq.name = config_name + config_rq.ssoAuthProtocolType = ssocloud.SAML2 + + config_rs = api.communicate_rest( + params, config_rq, 'sso/config/sso_cloud_configuration_add', + rs_type=ssocloud.SsoCloudConfigurationResponse) + + config_id = config_rs.ssoSpConfigurationId + logging.info('SAML2 Configuration created: "%s" (ID: %s)', config_name, config_id) + + setting_rq = ssocloud.SsoCloudConfigurationRequest() + setting_rq.ssoServiceProviderId = sp_id + setting_rq.ssoSpConfigurationId = config_id + + idp_type_name = kwargs['idp_type'] + idp_type_enum = IDP_TYPE_NAME_TO_ENUM.get(idp_type_name.lower()) + if idp_type_enum is not None: + action = ssocloud.SsoCloudSettingAction() + action.settingName = 'sso_idp_type_id' + action.operation = ssocloud.SET + action.value = str(idp_type_enum) + setting_rq.ssoCloudSettingAction.append(action) + + domain = kwargs.get('domain') + if domain: + action = ssocloud.SsoCloudSettingAction() + action.settingName = 'sso_sp_domain' + action.operation = ssocloud.SET + action.value = domain + setting_rq.ssoCloudSettingAction.append(action) + + if setting_rq.ssoCloudSettingAction: + api.communicate_rest( + params, setting_rq, 'sso/config/sso_cloud_configuration_setting_set', + rs_type=ssocloud.SsoCloudConfigurationResponse) + if idp_type_enum is not None: + logging.info('IdP type set to: %s', IDP_TYPE_NAMES.get(idp_type_enum, idp_type_name)) + if domain: + logging.info('Enterprise domain set to: %s', domain) + + api.query_enterprise(params, force=True) + + fmt = kwargs.get('format') + if fmt == 'json': + import json as json_mod + result = { + 'sso_service_provider_id': sp_id, + 'name': name, + 'node_id': node_id, + 'config_id': config_id, + 'config_name': config_name, + 'idp_type': idp_type_name, + } + if domain: + result['domain'] = domain + try: + config_rs = self.get_selected_configuration(params, sp_id) + settings = {} + for sv in config_rs.ssoCloudSettingValue: + settings[sv.settingName] = sv.value or '' + result['settings'] = settings + except Exception as e: + logging.debug('Failed to fetch settings for JSON output: %s', e) + print(json_mod.dumps(result, indent=2)) + else: + logging.info('') + logging.info('Next steps:') + logging.info(' sso-cloud guide "%s" View IdP-specific setup instructions', name) + logging.info(' sso-cloud get "%s" View configuration details & endpoints', name) + + +class SsoCloudDeleteCommand(EnterpriseCommand, SsoCloudMixin): + def get_parser(self): + return sso_cloud_delete_parser + + def execute(self, params, **kwargs): + # type: (KeeperParams, **Any) -> Any + target = kwargs.get('target') + svc = self.find_sso_service(params, target) + sp_id = svc['sso_service_provider_id'] + sp_name = svc.get('name', target) + self.ensure_cloud_sso(svc, target) + + config_target = kwargs.get('config') + if config_target: + self._delete_configuration(params, sp_id, config_target, kwargs.get('force')) + else: + self._delete_service_provider(params, sp_id, sp_name, kwargs.get('force')) + + @staticmethod + def _delete_configuration(params, sp_id, config_target, force): + # type: (KeeperParams, int, str, bool) -> None + config_rs = SsoCloudMixin.get_selected_configuration(params, sp_id, config_target=config_target) + config_id = config_rs.ssoSpConfigurationId + config_name = config_rs.name + + if not force: + answer = user_choice( + f'Are you sure you want to delete configuration "{config_name}" (ID: {config_id})?', + 'yn', default='n') + if answer.lower() != 'y': + logging.info('Delete cancelled.') + return + + rq = ssocloud.SsoCloudConfigurationRequest() + rq.ssoServiceProviderId = sp_id + rq.ssoSpConfigurationId = config_id + + api.communicate_rest( + params, rq, 'sso/config/sso_cloud_configuration_delete', + rs_type=ssocloud.SsoCloudConfigurationResponse) + + logging.info('Configuration "%s" (ID: %s) deleted.', config_name, config_id) + api.query_enterprise(params, force=True) + + @staticmethod + def _delete_service_provider(params, sp_id, sp_name, force): + # type: (KeeperParams, int, str, bool) -> None + if not force: + answer = user_choice( + f'Are you sure you want to delete SSO Service Provider "{sp_name}" (ID: {sp_id}) ' + f'and ALL its configurations?', 'yn', default='n') + if answer.lower() != 'y': + logging.info('Delete cancelled.') + return + + rq = { + 'command': 'sso_service_provider_delete', + 'sso_service_provider_id': sp_id, + } + api.communicate(params, rq) + + logging.info('SSO Service Provider "%s" (ID: %s) deleted.', sp_name, sp_id) + api.query_enterprise(params, force=True)