From cd77fde278a0aaa55616632444babc95e799c7d2 Mon Sep 17 00:00:00 2001 From: Kevin Burchfield Date: Mon, 18 May 2026 11:03:22 -0400 Subject: [PATCH 1/4] Feature(authorization): Introduce the foundation of the Authorization API to the duo_client_python --- README.md | 2 ++ duo_client/__init__.py | 2 ++ duo_client/authorization.py | 57 +++++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+) create mode 100644 duo_client/authorization.py diff --git a/README.md b/README.md index d096467..dbc6584 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ **Activity** - The activity endpoint is in public preview and subject to change +**Authorization** - The authorization endpoint is in public preview and subject to change + ## Tested Against Python Versions * 3.7 * 3.8 diff --git a/duo_client/__init__.py b/duo_client/__init__.py index 3fb5d01..84e4680 100644 --- a/duo_client/__init__.py +++ b/duo_client/__init__.py @@ -1,10 +1,12 @@ from .accounts import Accounts from .admin import Admin from .auth import Auth +from .authorization import Authorization from .client import __version__ __all__ = [ 'Accounts', 'Admin', 'Auth', + 'Authorization', ] diff --git a/duo_client/authorization.py b/duo_client/authorization.py new file mode 100644 index 0000000..6a04bc5 --- /dev/null +++ b/duo_client/authorization.py @@ -0,0 +1,57 @@ +""" +Duo Security Authorization API reference client implementation. +""" +from . import client + + +class Authorization(client.Client): + def ping(self): + """ + Determine if the Duo Authorization service is up and responding. + + Returns information about the service state: { + 'time': , + } + """ + return self.json_api_call('GET', '/authorize/v1/ping', {}) + + def check(self): + """ + Determine if the integration key, secret key, and signature + generation are valid. + + Returns information about the service state: { + 'time': , + } + """ + return self.json_api_call('GET', '/authorize/v1/check', {}) + + def evaluate(self, access_token, mcp_server_id, mcp_server_name='', tool=None): + """ + Evaluate authorization policy for MCP server capabilities. + + Returns: { + 'allowed_capabilities': , + 'authorized': , + 'expires_at': , + 'user_id': , + 'non_human_identity': , + 'policy_version_id': , + } + """ + params = { + 'access_token': access_token, + 'mcp_server_id': mcp_server_id, + 'mcp_server_name': mcp_server_name, + } + if tool is not None: + params['tool'] = tool + response = self.json_api_call('POST', '/authorize/v1/mcp_capabilities/evaluate', params) + return { + 'allowed_capabilities': response.get('allowed_capabilities'), + 'authorized': response.get('authorized'), + 'expires_at': response.get('expires_at'), + 'user_id': response.get('user_id'), + 'non_human_identity': response.get('non_human_identity'), + 'policy_version_id': response.get('policy_version_id'), + } From a12e8dda2f72bb11eae4679d31a213cf419b1206 Mon Sep 17 00:00:00 2001 From: Kevin Burchfield Date: Mon, 18 May 2026 11:22:42 -0400 Subject: [PATCH 2/4] Add tests for the authorization client --- tests/authorization/__init__.py | 0 tests/authorization/base.py | 15 +++++++ tests/authorization/test_authorization.py | 55 +++++++++++++++++++++++ 3 files changed, 70 insertions(+) create mode 100644 tests/authorization/__init__.py create mode 100644 tests/authorization/base.py create mode 100644 tests/authorization/test_authorization.py diff --git a/tests/authorization/__init__.py b/tests/authorization/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/authorization/base.py b/tests/authorization/base.py new file mode 100644 index 0000000..3debbee --- /dev/null +++ b/tests/authorization/base.py @@ -0,0 +1,15 @@ +import unittest +from .. import util +import duo_client.authorization + + +class TestAuthorization(unittest.TestCase): + + def setUp(self): + self.client = duo_client.authorization.Authorization( + 'test_ikey', 'test_skey', 'example.com') + self.client._connect = lambda: util.MockHTTPConnection() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py new file mode 100644 index 0000000..16da3be --- /dev/null +++ b/tests/authorization/test_authorization.py @@ -0,0 +1,55 @@ +import unittest +from .base import TestAuthorization +from .. import util + + +class TestPing(TestAuthorization): + + def test_ping(self): + response = self.client.ping() + self.assertEqual(response['method'], 'GET') + self.assertIn('/authorize/v1/ping', response['uri']) + + +class TestCheck(TestAuthorization): + + def test_check(self): + response = self.client.check() + self.assertEqual(response['method'], 'GET') + self.assertIn('/authorize/v1/check', response['uri']) + + +class TestEvaluate(TestAuthorization): + + def setUp(self): + super().setUp() + self.mock_conn = util.MockHTTPConnection() + self.client._connect = lambda: self.mock_conn + + def test_evaluate(self): + response = self.client.evaluate( + access_token='test_token', + mcp_server_id='server_123', + ) + self.assertEqual(self.mock_conn.method, 'POST') + self.assertIn('/authorize/v1/mcp_capabilities/evaluate', self.mock_conn.uri) + self.assertIn('allowed_capabilities', response) + self.assertIn('authorized', response) + self.assertIn('expires_at', response) + self.assertIn('user_id', response) + self.assertIn('non_human_identity', response) + self.assertIn('policy_version_id', response) + + def test_evaluate_with_optional_params(self): + response = self.client.evaluate( + access_token='test_token', + mcp_server_id='server_123', + mcp_server_name='my_server', + tool='my_tool', + ) + self.assertEqual(self.mock_conn.method, 'POST') + self.assertIn('/authorize/v1/mcp_capabilities/evaluate', self.mock_conn.uri) + + +if __name__ == '__main__': + unittest.main() From cbba2596836673569e344410d41c693df000888c Mon Sep 17 00:00:00 2001 From: Kevin Burchfield Date: Mon, 18 May 2026 11:28:31 -0400 Subject: [PATCH 3/4] Move the evaluate input into a dataclass --- duo_client/authorization.py | 26 +++++++++++++++++------ tests/authorization/test_authorization.py | 7 ++++-- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/duo_client/authorization.py b/duo_client/authorization.py index 6a04bc5..92705e1 100644 --- a/duo_client/authorization.py +++ b/duo_client/authorization.py @@ -1,9 +1,21 @@ """ Duo Security Authorization API reference client implementation. """ +from dataclasses import dataclass +from typing import ClassVar, Final, Optional + from . import client +@dataclass +class McpCapabilities: + route_fragment: ClassVar[Final[str]] = 'mcp_capabilities' + access_token: str + mcp_server_id: str + mcp_server_name: str = '' + tool: Optional[str] = None + + class Authorization(client.Client): def ping(self): """ @@ -26,7 +38,7 @@ def check(self): """ return self.json_api_call('GET', '/authorize/v1/check', {}) - def evaluate(self, access_token, mcp_server_id, mcp_server_name='', tool=None): + def evaluate(self, input: McpCapabilities): """ Evaluate authorization policy for MCP server capabilities. @@ -40,13 +52,13 @@ def evaluate(self, access_token, mcp_server_id, mcp_server_name='', tool=None): } """ params = { - 'access_token': access_token, - 'mcp_server_id': mcp_server_id, - 'mcp_server_name': mcp_server_name, + 'access_token': input.access_token, + 'mcp_server_id': input.mcp_server_id, + 'mcp_server_name': input.mcp_server_name, } - if tool is not None: - params['tool'] = tool - response = self.json_api_call('POST', '/authorize/v1/mcp_capabilities/evaluate', params) + if input.tool is not None: + params['tool'] = input.tool + response = self.json_api_call('POST', f'/authorize/v1/{input.route_fragment}/evaluate', params) return { 'allowed_capabilities': response.get('allowed_capabilities'), 'authorized': response.get('authorized'), diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py index 16da3be..9b2e59e 100644 --- a/tests/authorization/test_authorization.py +++ b/tests/authorization/test_authorization.py @@ -1,6 +1,7 @@ import unittest from .base import TestAuthorization from .. import util +from duo_client.authorization import McpCapabilities class TestPing(TestAuthorization): @@ -27,10 +28,11 @@ def setUp(self): self.client._connect = lambda: self.mock_conn def test_evaluate(self): - response = self.client.evaluate( + capabilities = McpCapabilities( access_token='test_token', mcp_server_id='server_123', ) + response = self.client.evaluate(capabilities) self.assertEqual(self.mock_conn.method, 'POST') self.assertIn('/authorize/v1/mcp_capabilities/evaluate', self.mock_conn.uri) self.assertIn('allowed_capabilities', response) @@ -41,12 +43,13 @@ def test_evaluate(self): self.assertIn('policy_version_id', response) def test_evaluate_with_optional_params(self): - response = self.client.evaluate( + capabilities = McpCapabilities( access_token='test_token', mcp_server_id='server_123', mcp_server_name='my_server', tool='my_tool', ) + response = self.client.evaluate(capabilities) self.assertEqual(self.mock_conn.method, 'POST') self.assertIn('/authorize/v1/mcp_capabilities/evaluate', self.mock_conn.uri) From 9ebe1ca5ed839e5ddfbf37272a0797dadd2f1b33 Mon Sep 17 00:00:00 2001 From: Kevin Burchfield Date: Mon, 18 May 2026 11:32:30 -0400 Subject: [PATCH 4/4] Add example for evaluating mcp capabilities --- .../evaluate_mcp_capabilities.py | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 examples/Authorization/evaluate_mcp_capabilities.py diff --git a/examples/Authorization/evaluate_mcp_capabilities.py b/examples/Authorization/evaluate_mcp_capabilities.py new file mode 100644 index 0000000..08bef24 --- /dev/null +++ b/examples/Authorization/evaluate_mcp_capabilities.py @@ -0,0 +1,99 @@ +""" +Example of Duo Authorization API MCP capabilities evaluation +""" + +from argparse import ArgumentParser, Namespace +import duo_client +from duo_client.authorization import McpCapabilities +import getpass + + +def _get_arg(args: Namespace, name: str, prompt: str, secure=False): + """Read arg from CLI flags or stdin, using getpass when sensitive information should not be echoed to tty""" + value = getattr(args, name) + if value is not None: + return value + + if secure is True: + return getpass.getpass(prompt) + else: + return input(prompt) + + +def prompt_for_credentials(args: Namespace) -> dict: + """Collect required API credentials from command line prompts + + :return: dictionary containing Duo Authorization API ikey, skey and hostname strings + """ + + ikey = _get_arg(args, "ikey", 'Duo Authorization API integration key ("DI..."): ') + skey = _get_arg(args, "skey", 'Duo Authorization API integration secret key: ', secure=True) + host = _get_arg(args, "api_host", 'Duo Authorization API hostname ("api-....duosecurity.com"): ') + access_token = _get_arg(args, "access_token", 'Access token: ', secure=True) + mcp_server_id = _get_arg(args, "mcp_server_id", 'MCP Server ID: ') + + return { + "IKEY": ikey, + "SKEY": skey, + "APIHOST": host, + "ACCESS_TOKEN": access_token, + "MCP_SERVER_ID": mcp_server_id, + } + + +def main(): + """Main program entry point""" + + parser = ArgumentParser() + parser.add_argument("--ikey", type=str) + parser.add_argument("--skey", type=str) + parser.add_argument("--api-host", type=str) + parser.add_argument("--access-token", type=str) + parser.add_argument("--mcp-server-id", type=str) + parser.add_argument("--mcp-server-name", type=str, default='') + parser.add_argument("--tool", type=str, default=None) + args = parser.parse_args() + + inputs = prompt_for_credentials(args) + + authz_client = duo_client.Authorization( + ikey=inputs['IKEY'], + skey=inputs['SKEY'], + host=inputs['APIHOST'], + ) + + # Verify that the Duo service is available + duo_ping = authz_client.ping() + if 'time' in duo_ping: + print("\nDuo Authorization service check completed successfully.") + else: + print(f"Error: {duo_ping}") + + # Verify that IKEY and SKEY information provided are valid + duo_check = authz_client.check() + if 'time' in duo_check: + print("IKEY and SKEY provided have been verified.") + else: + print(f"Error: {duo_check}") + + # Evaluate MCP capabilities + capabilities = McpCapabilities( + access_token=inputs['ACCESS_TOKEN'], + mcp_server_id=inputs['MCP_SERVER_ID'], + mcp_server_name=args.mcp_server_name, + tool=args.tool, + ) + + print(f"\nEvaluating MCP capabilities for server {inputs['MCP_SERVER_ID']}...") + result = authz_client.evaluate(capabilities) + + print(f"\nAuthorized: {result['authorized']}") + print(f"Allowed capabilities: {result['allowed_capabilities']}") + print(f"User ID: {result['user_id']}") + print(f"Non-human identity: {result['non_human_identity']}") + print(f"Policy version ID: {result['policy_version_id']}") + print(f"Expires at: {result['expires_at']}") + + +if __name__ == '__main__': + main()