From 757f9a1346e6346a4632b5673d09f3f5f9c72a24 Mon Sep 17 00:00:00 2001 From: aschumann-virtualcable Date: Fri, 24 Apr 2026 14:37:52 +0200 Subject: [PATCH 1/6] Removes leaf code signing check from cert chain validation Simplifies certificate chain validation by eliminating the explicit code signing verification on the leaf certificate. May improve compatibility with chains where code signing is not required or enforced by other mechanisms. --- server/src/uds/core/managers/crypto/certs.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/server/src/uds/core/managers/crypto/certs.py b/server/src/uds/core/managers/crypto/certs.py index 2c6ea0b3d..af803e612 100644 --- a/server/src/uds/core/managers/crypto/certs.py +++ b/server/src/uds/core/managers/crypto/certs.py @@ -39,7 +39,6 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.serialization import pkcs7 -from cryptography.x509.oid import ExtendedKeyUsageOID from django.conf import settings @@ -115,16 +114,6 @@ def load_system_roots() -> list[x509.Certificate]: return _system_trust_cache -def _check_leaf_code_signing(leaf: x509.Certificate) -> None: - # mstsc won't accept the .rdp signature without codeSigning EKU on the leaf - try: - eku = leaf.extensions.get_extension_for_class(x509.ExtendedKeyUsage).value - except x509.ExtensionNotFound: - raise ValueError('Leaf missing Extended Key Usage extension (codeSigning required)') - if ExtendedKeyUsageOID.CODE_SIGNING not in eku: - raise ValueError('Leaf missing codeSigning EKU required for RDP signing') - - def _verify_issued_by(cert: x509.Certificate, issuer: x509.Certificate, label: str) -> None: try: cert.verify_directly_issued_by(issuer) @@ -180,5 +169,4 @@ def check_cert_chain(cert_chain: pathlib.Path | str) -> None: certs = load_pem_certificates(cert_chain) if not certs: raise ValueError('No certificates found in certificate chain') - _check_leaf_code_signing(certs[0]) _walk_chain(certs[0], certs[1:]) From 4316313f83af0e234bfcab1bdf79104afd2340a4 Mon Sep 17 00:00:00 2001 From: aschumann-virtualcable Date: Fri, 24 Apr 2026 19:16:16 +0200 Subject: [PATCH 2/6] Adds in-memory cert chain check and improves file handling Introduces a new function for certificate chain validation that operates directly on parsed certificates, supporting use cases like PKCS12 without requiring file access. Refactors related routines to use the new function, and improves file reading logic with context managers to ensure proper resource management. Also updates test imports for better coverage. --- server/src/uds/core/managers/crypto/certs.py | 7 +- server/src/uds/core/managers/crypto/rdp.py | 8 +- server/tests/core/managers/__init__.py | 2 + server/tests/core/managers/_cert_factory.py | 142 +++++++++++ server/tests/core/managers/test_certs.py | 239 +++++++++++++++++++ server/tests/core/managers/test_rdp.py | 226 ++++++++++++++++++ 6 files changed, 620 insertions(+), 4 deletions(-) create mode 100644 server/tests/core/managers/_cert_factory.py create mode 100644 server/tests/core/managers/test_certs.py create mode 100644 server/tests/core/managers/test_rdp.py diff --git a/server/src/uds/core/managers/crypto/certs.py b/server/src/uds/core/managers/crypto/certs.py index af803e612..a42b88d94 100644 --- a/server/src/uds/core/managers/crypto/certs.py +++ b/server/src/uds/core/managers/crypto/certs.py @@ -164,9 +164,14 @@ def _walk_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None: raise ValueError(f'Chain depth exceeded {_MAX_CHAIN_DEPTH} (possible loop)') +def check_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None: + # in-memory variant, for callers that already hold parsed certs (e.g. PKCS12) + _walk_chain(leaf, chain) + + def check_cert_chain(cert_chain: pathlib.Path | str) -> None: # preflight hit before signing; raises if anything's off certs = load_pem_certificates(cert_chain) if not certs: raise ValueError('No certificates found in certificate chain') - _walk_chain(certs[0], certs[1:]) + check_chain(certs[0], certs[1:]) diff --git a/server/src/uds/core/managers/crypto/rdp.py b/server/src/uds/core/managers/crypto/rdp.py index cec6c6b74..894685674 100644 --- a/server/src/uds/core/managers/crypto/rdp.py +++ b/server/src/uds/core/managers/crypto/rdp.py @@ -115,7 +115,8 @@ def _check_pubkey_matches_key(cert: x509.Certificate, key: _PrivateKey) -> None: def _load_cert_key_chain() -> tuple[x509.Certificate, _PrivateKey, list[x509.Certificate]]: cert_path = _certs.get_server_cert() - cert_data = open(cert_path, 'rb').read() + with open(cert_path, 'rb') as f: + cert_data = f.read() # try PFX first, it carries key+chain in one file try: @@ -126,7 +127,7 @@ def _load_cert_key_chain() -> tuple[x509.Certificate, _PrivateKey, list[x509.Cer if p12_cert is not None and p12_key is not None: key = _ensure_signer_key(p12_key) - _certs.check_cert_chain(cert_path) + _certs.check_chain(p12_cert, list(p12_chain or [])) _check_pubkey_matches_key(p12_cert, key) return p12_cert, key, list(p12_chain or []) @@ -134,7 +135,8 @@ def _load_cert_key_chain() -> tuple[x509.Certificate, _PrivateKey, list[x509.Cer if not certs: raise ValueError(f'No certificates found in {cert_path}') - key_data = open(_certs.get_server_key(), 'rb').read() + with open(_certs.get_server_key(), 'rb') as f: + key_data = f.read() key = _ensure_signer_key(_certs.load_private_key_any_format(key_data)) _certs.check_cert_chain(cert_path) diff --git a/server/tests/core/managers/__init__.py b/server/tests/core/managers/__init__.py index c7faa63a8..62941f29c 100644 --- a/server/tests/core/managers/__init__.py +++ b/server/tests/core/managers/__init__.py @@ -29,5 +29,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com """ # pyright: reportUnusedImport=false +from . import test_certs from . import test_crypto from . import test_downloads +from . import test_rdp diff --git a/server/tests/core/managers/_cert_factory.py b/server/tests/core/managers/_cert_factory.py new file mode 100644 index 000000000..79e8cef41 --- /dev/null +++ b/server/tests/core/managers/_cert_factory.py @@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2026 Virtual Cable S.L. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +# Shared cert/key factory for crypto manager tests. +# Not a test module (filename does not start with ``test_``), so pytest skips collection. +import datetime +import secrets +import typing + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec, rsa +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey +from cryptography.x509.oid import NameOID + +_PrivateKey = typing.Union[RSAPrivateKey, ec.EllipticCurvePrivateKey] + + +def _name(cn: str) -> x509.Name: + return x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)]) + + +def make_rsa_key(size: int = 2048) -> RSAPrivateKey: + return rsa.generate_private_key(public_exponent=65537, key_size=size) + + +def build_cert( + subject_cn: str, + subject_key: _PrivateKey, + issuer_cn: str, + issuer_key: _PrivateKey, + *, + is_ca: bool = False, + not_before: typing.Optional[datetime.datetime] = None, + not_after: typing.Optional[datetime.datetime] = None, +) -> x509.Certificate: + now = datetime.datetime.now(datetime.timezone.utc) + nb = not_before or (now - datetime.timedelta(days=1)) + na = not_after or (now + datetime.timedelta(days=365)) + builder = ( + x509.CertificateBuilder() + .subject_name(_name(subject_cn)) + .issuer_name(_name(issuer_cn)) + .public_key(subject_key.public_key()) + .serial_number(secrets.randbits(63)) + .not_valid_before(nb) + .not_valid_after(na) + ) + if is_ca: + builder = builder.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + return builder.sign(issuer_key, hashes.SHA256()) + + +def self_signed( + cn: str, + *, + key: typing.Optional[_PrivateKey] = None, + is_ca: bool = True, + not_before: typing.Optional[datetime.datetime] = None, + not_after: typing.Optional[datetime.datetime] = None, +) -> tuple[x509.Certificate, _PrivateKey]: + key = key or make_rsa_key() + cert = build_cert(cn, key, cn, key, is_ca=is_ca, not_before=not_before, not_after=not_after) + return cert, key + + +def issue( + cn: str, + issuer_cert: x509.Certificate, + issuer_key: _PrivateKey, + *, + key: typing.Optional[_PrivateKey] = None, + is_ca: bool = False, + not_before: typing.Optional[datetime.datetime] = None, + not_after: typing.Optional[datetime.datetime] = None, +) -> tuple[x509.Certificate, _PrivateKey]: + key = key or make_rsa_key() + cert = build_cert( + cn, + key, + issuer_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, + issuer_key, + is_ca=is_ca, + not_before=not_before, + not_after=not_after, + ) + return cert, key + + +def to_pem(cert: x509.Certificate) -> bytes: + return cert.public_bytes(serialization.Encoding.PEM) + + +def to_der(cert: x509.Certificate) -> bytes: + return cert.public_bytes(serialization.Encoding.DER) + + +def key_to_pem(key: _PrivateKey) -> bytes: + return key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + + +def key_to_der(key: _PrivateKey) -> bytes: + return key.private_bytes( + serialization.Encoding.DER, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + + +def chain_to_pem(*certs: x509.Certificate) -> bytes: + return b''.join(to_pem(c) for c in certs) diff --git a/server/tests/core/managers/test_certs.py b/server/tests/core/managers/test_certs.py new file mode 100644 index 000000000..750b952ea --- /dev/null +++ b/server/tests/core/managers/test_certs.py @@ -0,0 +1,239 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2026 Virtual Cable S.L. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +import datetime +import pathlib +import tempfile +import typing + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.serialization import pkcs7 + +from django.conf import settings +from django.test import override_settings + +from uds.core.managers.crypto import certs + +from ...utils.test import UDSTestCase +from . import _cert_factory as cf + + +class CertsTest(UDSTestCase): + _tmpdir: tempfile.TemporaryDirectory[str] + tmp: pathlib.Path + + def setUp(self) -> None: + super().setUp() + # Reset module-level system trust cache so each test can inject its own bundle + certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir = tempfile.TemporaryDirectory() + self.tmp = pathlib.Path(self._tmpdir.name) + + def tearDown(self) -> None: + certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir.cleanup() + super().tearDown() + + def _write(self, name: str, data: bytes) -> pathlib.Path: + p = self.tmp / name + p.write_bytes(data) + return p + + # ------------------------------------------------------------------ paths + def test_get_server_cert_default(self) -> None: + # No setting set -> default path + self.assertEqual(certs.get_server_cert(), '/etc/certs/server.pem') + self.assertEqual(certs.get_server_key(), '/etc/certs/key.pem') + + @override_settings(RDP_SIGN_CERT='/custom/cert.pem', RDP_SIGN_KEY='/custom/key.pem') + def test_get_server_cert_override(self) -> None: + self.assertEqual(certs.get_server_cert(), '/custom/cert.pem') + self.assertEqual(certs.get_server_key(), '/custom/key.pem') + + # ------------------------------------------------------------------ loaders + def test_load_certificates_pem(self) -> None: + root_cert, _ = cf.self_signed('ROOT') + leaf_cert, _ = cf.self_signed('LEAF', is_ca=False) + loaded = certs.load_certificates_any_format(cf.chain_to_pem(leaf_cert, root_cert)) + self.assertEqual(len(loaded), 2) + self.assertEqual(loaded[0].subject, leaf_cert.subject) + self.assertEqual(loaded[1].subject, root_cert.subject) + + def test_load_certificates_der(self) -> None: + cert, _ = cf.self_signed('DER-CERT') + loaded = certs.load_certificates_any_format(cf.to_der(cert)) + self.assertEqual(len(loaded), 1) + self.assertEqual(loaded[0].subject, cert.subject) + + def test_load_certificates_pkcs7_pem(self) -> None: + root, _ = cf.self_signed('ROOT') + leaf, _ = cf.self_signed('LEAF', is_ca=False) + blob = pkcs7.serialize_certificates([leaf, root], serialization.Encoding.PEM) + loaded = certs.load_certificates_any_format(blob) + self.assertEqual({c.subject for c in loaded}, {leaf.subject, root.subject}) + + def test_load_certificates_pkcs7_der(self) -> None: + root, _ = cf.self_signed('ROOT') + leaf, _ = cf.self_signed('LEAF', is_ca=False) + blob = pkcs7.serialize_certificates([leaf, root], serialization.Encoding.DER) + loaded = certs.load_certificates_any_format(blob) + self.assertEqual({c.subject for c in loaded}, {leaf.subject, root.subject}) + + def test_load_certificates_invalid_raises(self) -> None: + with self.assertRaises(ValueError): + certs.load_certificates_any_format(b'not a cert') + + def test_load_private_key_pem(self) -> None: + _, key = cf.self_signed('K') + loaded = certs.load_private_key_any_format(cf.key_to_pem(key)) + self.assertEqual( + loaded.public_key().public_numbers(), + key.public_key().public_numbers(), + ) + + def test_load_private_key_der(self) -> None: + _, key = cf.self_signed('K') + loaded = certs.load_private_key_any_format(cf.key_to_der(key)) + self.assertEqual( + loaded.public_key().public_numbers(), + key.public_key().public_numbers(), + ) + + def test_load_private_key_invalid_raises(self) -> None: + with self.assertRaises(ValueError): + certs.load_private_key_any_format(b'not a key') + + def test_load_pem_certificates_reads_file(self) -> None: + cert, _ = cf.self_signed('FILE') + path = self._write('c.pem', cf.to_pem(cert)) + # Accepts both str and Path + loaded_from_str = certs.load_pem_certificates(str(path)) + loaded_from_path = certs.load_pem_certificates(path) + self.assertEqual(loaded_from_str[0].subject, cert.subject) + self.assertEqual(loaded_from_path[0].subject, cert.subject) + + # ------------------------------------------------------------------ system roots + def test_load_system_roots_uses_override(self) -> None: + root, _ = cf.self_signed('OVERRIDE-ROOT') + bundle = self._write('ca.pem', cf.to_pem(root)) + with override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)): + loaded = certs.load_system_roots() + self.assertEqual([c.subject for c in loaded], [root.subject]) + + def test_load_system_roots_caches_result(self) -> None: + root, _ = cf.self_signed('CACHED-ROOT') + bundle = self._write('ca.pem', cf.to_pem(root)) + with override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)): + first = certs.load_system_roots() + # Remove the file; cached copy must still be returned + bundle.unlink() + second = certs.load_system_roots() + self.assertIs(first, second) + + def test_load_system_roots_missing_bundle_returns_empty(self) -> None: + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'does-not-exist.pem')): + loaded = certs.load_system_roots() + self.assertEqual(loaded, []) + + # ------------------------------------------------------------------ chain validation + def _install_trust(self, *roots: typing.Any) -> None: + bundle = self._write('trust.pem', cf.chain_to_pem(*roots)) + self._trust_override = override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)) + self._trust_override.enable() + self.addCleanup(self._trust_override.disable) + + def test_check_cert_chain_self_signed_leaf(self) -> None: + # self-signed leaf, no trust bundle: passes (but logs a warning) + leaf_cert, _ = cf.self_signed('selfsigned-leaf', is_ca=False) + path = self._write('leaf.pem', cf.to_pem(leaf_cert)) + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + certs.check_cert_chain(path) # no raise + + def test_check_cert_chain_valid_with_intermediate(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + self._install_trust(root_cert) + path = self._write('chain.pem', cf.chain_to_pem(leaf_cert, inter_cert)) + certs.check_cert_chain(path) # no raise + + def test_check_cert_chain_root_bundled_anchors(self) -> None: + # Leaf + intermediate + root all in the chain file; trust store empty. + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + path = self._write('full.pem', cf.chain_to_pem(leaf_cert, inter_cert, root_cert)) + certs.check_cert_chain(path) + + def test_check_cert_chain_incomplete_raises(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + # Trust store doesn't know the root AND chain file is missing intermediate + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + path = self._write('leaf-only.pem', cf.to_pem(leaf_cert)) + with self.assertRaises(ValueError) as ctx: + certs.check_cert_chain(path) + self.assertIn('Incomplete chain', str(ctx.exception)) + + def test_check_cert_chain_expired_raises(self) -> None: + now = datetime.datetime.now(datetime.timezone.utc) + expired_cert, _ = cf.self_signed( + 'EXPIRED', + not_before=now - datetime.timedelta(days=30), + not_after=now - datetime.timedelta(days=1), + ) + path = self._write('exp.pem', cf.to_pem(expired_cert)) + with self.assertRaises(ValueError) as ctx: + certs.check_cert_chain(path) + self.assertIn('expired or not yet valid', str(ctx.exception)) + + def test_check_cert_chain_empty_file_raises(self) -> None: + path = self._write('empty.pem', b'') + with self.assertRaises(ValueError): + certs.check_cert_chain(path) + + def test_check_chain_in_memory_ok(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + self._install_trust(root_cert) + certs.check_chain(leaf_cert, [inter_cert]) # no raise + + def test_check_chain_in_memory_incomplete_raises(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + # No trust and no intermediate bundled + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + with self.assertRaises(ValueError): + certs.check_chain(leaf_cert, []) diff --git a/server/tests/core/managers/test_rdp.py b/server/tests/core/managers/test_rdp.py new file mode 100644 index 000000000..a49348ece --- /dev/null +++ b/server/tests/core/managers/test_rdp.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2026 Virtual Cable S.L. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +import base64 +import pathlib +import struct +import tempfile + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ed25519 +from cryptography.hazmat.primitives.serialization import pkcs12 + +from django.test import override_settings + +from uds.core.managers.crypto import certs, rdp + +from ...utils.test import UDSTestCase +from . import _cert_factory as cf + + +class RdpTest(UDSTestCase): + _tmpdir: tempfile.TemporaryDirectory[str] + tmp: pathlib.Path + + def setUp(self) -> None: + super().setUp() + certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir = tempfile.TemporaryDirectory() + self.tmp = pathlib.Path(self._tmpdir.name) + + def tearDown(self) -> None: + certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir.cleanup() + super().tearDown() + + def _write(self, name: str, data: bytes) -> pathlib.Path: + p = self.tmp / name + p.write_bytes(data) + return p + + def _pin_trust(self, *roots: 'object') -> None: + bundle = self._write('trust.pem', cf.chain_to_pem(*roots)) # type: ignore[arg-type] + o = override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)) + o.enable() + self.addCleanup(o.disable) + + # ------------------------------------------------------------------ signer key guard + def test_ensure_signer_key_rejects_ed25519(self) -> None: + ed_key = ed25519.Ed25519PrivateKey.generate() + with self.assertRaises(ValueError) as ctx: + rdp._ensure_signer_key(ed_key) + self.assertIn('Unsupported private key type', str(ctx.exception)) + + def test_ensure_signer_key_accepts_rsa(self) -> None: + key = cf.make_rsa_key() + self.assertIs(rdp._ensure_signer_key(key), key) + + # ------------------------------------------------------------------ pubkey match + def test_pubkey_match_ok(self) -> None: + cert, key = cf.self_signed('MATCH', is_ca=False) + rdp._check_pubkey_matches_key(cert, key) # no raise + + def test_pubkey_match_mismatch_raises(self) -> None: + cert, _ = cf.self_signed('MATCH', is_ca=False) + other_key = cf.make_rsa_key() + with self.assertRaises(ValueError) as ctx: + rdp._check_pubkey_matches_key(cert, other_key) + self.assertIn('does not match', str(ctx.exception)) + + # ------------------------------------------------------------------ sign_rdp_settings + def test_sign_rdp_settings_preserves_declared_order(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + # Intentionally unordered; signer must emit them in _RDP_SECURE_SETTINGS order + lines = [ + 'audiomode:i:0', + 'full address:s:host.example.com', + 'server port:i:3389', + ] + sig_b64, signnames = rdp.sign_rdp_settings(lines, cert=cert, key=key, chain=[]) + self.assertEqual(signnames, ['Full Address', 'Server Port', 'AudioMode']) + + raw = base64.b64decode(sig_b64) + self.assertGreater(len(raw), 12) + magic, one, siglen = struct.unpack(' None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + lines = ['unknown:s:whatever', 'full address:s:host'] + _, signnames = rdp.sign_rdp_settings(lines, cert=cert, key=key, chain=[]) + self.assertEqual(signnames, ['Full Address']) + + # ------------------------------------------------------------------ sign_rdp + def test_sign_rdp_appends_signscope_and_signature(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + rdp_text = 'full address:s:host.example.com\r\nserver port:i:3389\r\n' + signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) + out_lines = signed.splitlines() + # signscope comes before signature, and signature is last meaningful line + signscope = next(l for l in out_lines if l.startswith('signscope:s:')) + signature = next(l for l in out_lines if l.startswith('signature:s:')) + self.assertIn('Full Address', signscope) + self.assertIn('Server Port', signscope) + # signature value is base64, non-empty + self.assertTrue(signature.startswith('signature:s:')) + base64.b64decode(signature[len('signature:s:'):]) + + def test_sign_rdp_strips_previous_signature_lines(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + # Input already has bogus signature/signscope; must be dropped before resigning + rdp_text = ( + 'full address:s:host\r\n' + 'signscope:s:Full Address\r\n' + 'signature:s:OLDBOGUS==\r\n' + ) + signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) + # Only one occurrence of each kept (the new one) + self.assertEqual(signed.count('signature:s:'), 1) + self.assertEqual(signed.count('signscope:s:'), 1) + self.assertNotIn('OLDBOGUS', signed) + + def test_sign_rdp_mirrors_full_address_to_alternate(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + rdp_text = 'full address:s:host.example.com\r\n' + signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) + self.assertIn('alternate full address:s:host.example.com', signed) + # And it is signed over (must appear in signscope) + signscope = next(l for l in signed.splitlines() if l.startswith('signscope:s:')) + self.assertIn('Alternate Full Address', signscope) + + def test_sign_rdp_keeps_existing_alternate(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + rdp_text = ( + 'full address:s:primary.example.com\r\n' + 'alternate full address:s:backup.example.com\r\n' + ) + signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) + self.assertIn('alternate full address:s:backup.example.com', signed) + self.assertNotIn('alternate full address:s:primary.example.com', signed) + + # ------------------------------------------------------------------ _load_cert_key_chain + def test_load_cert_key_chain_pem_pair(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, leaf_key = cf.issue('LEAF', inter_cert, inter_key) + self._pin_trust(root_cert) + + cert_path = self._write('cert.pem', cf.chain_to_pem(leaf_cert, inter_cert)) + key_path = self._write('key.pem', cf.key_to_pem(leaf_key)) + + with override_settings(RDP_SIGN_CERT=str(cert_path), RDP_SIGN_KEY=str(key_path)): + loaded_leaf, loaded_key, loaded_chain = rdp._load_cert_key_chain() + + self.assertEqual(loaded_leaf.subject, leaf_cert.subject) + self.assertEqual( + loaded_key.public_key().public_numbers(), leaf_key.public_key().public_numbers() + ) + self.assertEqual([c.subject for c in loaded_chain], [inter_cert.subject]) + + def test_load_cert_key_chain_pkcs12(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, leaf_key = cf.issue('LEAF', inter_cert, inter_key) + self._pin_trust(root_cert) + + pfx_bytes = pkcs12.serialize_key_and_certificates( + name=b'leaf', + key=leaf_key, + cert=leaf_cert, + cas=[inter_cert], + encryption_algorithm=serialization.NoEncryption(), + ) + pfx_path = self._write('bundle.pfx', pfx_bytes) + + with override_settings(RDP_SIGN_CERT=str(pfx_path), RDP_SIGN_KEY='/does/not/matter'): + loaded_leaf, loaded_key, loaded_chain = rdp._load_cert_key_chain() + + self.assertEqual(loaded_leaf.subject, leaf_cert.subject) + self.assertEqual( + loaded_key.public_key().public_numbers(), leaf_key.public_key().public_numbers() + ) + self.assertEqual([c.subject for c in loaded_chain], [inter_cert.subject]) + + def test_load_cert_key_chain_key_mismatch_raises(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + leaf_cert, _ = cf.issue('LEAF', root_cert, root_key) + self._pin_trust(root_cert) + + wrong_key = cf.make_rsa_key() + cert_path = self._write('cert.pem', cf.to_pem(leaf_cert)) + key_path = self._write('key.pem', cf.key_to_pem(wrong_key)) + + with override_settings(RDP_SIGN_CERT=str(cert_path), RDP_SIGN_KEY=str(key_path)): + with self.assertRaises(ValueError) as ctx: + rdp._load_cert_key_chain() + self.assertIn('does not match', str(ctx.exception)) From a692898023f81c493dc039d02000f47f0474d7e0 Mon Sep 17 00:00:00 2001 From: aschumann-virtualcable Date: Fri, 24 Apr 2026 19:16:16 +0200 Subject: [PATCH 3/6] Removes unused import to clean up test code Eliminates an unnecessary import to improve code clarity and maintainability. Reduces potential for confusion and avoids clutter in the test module. --- server/src/uds/core/managers/crypto/certs.py | 7 +- server/src/uds/core/managers/crypto/rdp.py | 8 +- server/tests/core/managers/__init__.py | 2 + server/tests/core/managers/_cert_factory.py | 142 +++++++++++ server/tests/core/managers/test_certs.py | 238 +++++++++++++++++++ server/tests/core/managers/test_rdp.py | 226 ++++++++++++++++++ 6 files changed, 619 insertions(+), 4 deletions(-) create mode 100644 server/tests/core/managers/_cert_factory.py create mode 100644 server/tests/core/managers/test_certs.py create mode 100644 server/tests/core/managers/test_rdp.py diff --git a/server/src/uds/core/managers/crypto/certs.py b/server/src/uds/core/managers/crypto/certs.py index af803e612..a42b88d94 100644 --- a/server/src/uds/core/managers/crypto/certs.py +++ b/server/src/uds/core/managers/crypto/certs.py @@ -164,9 +164,14 @@ def _walk_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None: raise ValueError(f'Chain depth exceeded {_MAX_CHAIN_DEPTH} (possible loop)') +def check_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None: + # in-memory variant, for callers that already hold parsed certs (e.g. PKCS12) + _walk_chain(leaf, chain) + + def check_cert_chain(cert_chain: pathlib.Path | str) -> None: # preflight hit before signing; raises if anything's off certs = load_pem_certificates(cert_chain) if not certs: raise ValueError('No certificates found in certificate chain') - _walk_chain(certs[0], certs[1:]) + check_chain(certs[0], certs[1:]) diff --git a/server/src/uds/core/managers/crypto/rdp.py b/server/src/uds/core/managers/crypto/rdp.py index cec6c6b74..894685674 100644 --- a/server/src/uds/core/managers/crypto/rdp.py +++ b/server/src/uds/core/managers/crypto/rdp.py @@ -115,7 +115,8 @@ def _check_pubkey_matches_key(cert: x509.Certificate, key: _PrivateKey) -> None: def _load_cert_key_chain() -> tuple[x509.Certificate, _PrivateKey, list[x509.Certificate]]: cert_path = _certs.get_server_cert() - cert_data = open(cert_path, 'rb').read() + with open(cert_path, 'rb') as f: + cert_data = f.read() # try PFX first, it carries key+chain in one file try: @@ -126,7 +127,7 @@ def _load_cert_key_chain() -> tuple[x509.Certificate, _PrivateKey, list[x509.Cer if p12_cert is not None and p12_key is not None: key = _ensure_signer_key(p12_key) - _certs.check_cert_chain(cert_path) + _certs.check_chain(p12_cert, list(p12_chain or [])) _check_pubkey_matches_key(p12_cert, key) return p12_cert, key, list(p12_chain or []) @@ -134,7 +135,8 @@ def _load_cert_key_chain() -> tuple[x509.Certificate, _PrivateKey, list[x509.Cer if not certs: raise ValueError(f'No certificates found in {cert_path}') - key_data = open(_certs.get_server_key(), 'rb').read() + with open(_certs.get_server_key(), 'rb') as f: + key_data = f.read() key = _ensure_signer_key(_certs.load_private_key_any_format(key_data)) _certs.check_cert_chain(cert_path) diff --git a/server/tests/core/managers/__init__.py b/server/tests/core/managers/__init__.py index c7faa63a8..62941f29c 100644 --- a/server/tests/core/managers/__init__.py +++ b/server/tests/core/managers/__init__.py @@ -29,5 +29,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com """ # pyright: reportUnusedImport=false +from . import test_certs from . import test_crypto from . import test_downloads +from . import test_rdp diff --git a/server/tests/core/managers/_cert_factory.py b/server/tests/core/managers/_cert_factory.py new file mode 100644 index 000000000..79e8cef41 --- /dev/null +++ b/server/tests/core/managers/_cert_factory.py @@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2026 Virtual Cable S.L. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +# Shared cert/key factory for crypto manager tests. +# Not a test module (filename does not start with ``test_``), so pytest skips collection. +import datetime +import secrets +import typing + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec, rsa +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey +from cryptography.x509.oid import NameOID + +_PrivateKey = typing.Union[RSAPrivateKey, ec.EllipticCurvePrivateKey] + + +def _name(cn: str) -> x509.Name: + return x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)]) + + +def make_rsa_key(size: int = 2048) -> RSAPrivateKey: + return rsa.generate_private_key(public_exponent=65537, key_size=size) + + +def build_cert( + subject_cn: str, + subject_key: _PrivateKey, + issuer_cn: str, + issuer_key: _PrivateKey, + *, + is_ca: bool = False, + not_before: typing.Optional[datetime.datetime] = None, + not_after: typing.Optional[datetime.datetime] = None, +) -> x509.Certificate: + now = datetime.datetime.now(datetime.timezone.utc) + nb = not_before or (now - datetime.timedelta(days=1)) + na = not_after or (now + datetime.timedelta(days=365)) + builder = ( + x509.CertificateBuilder() + .subject_name(_name(subject_cn)) + .issuer_name(_name(issuer_cn)) + .public_key(subject_key.public_key()) + .serial_number(secrets.randbits(63)) + .not_valid_before(nb) + .not_valid_after(na) + ) + if is_ca: + builder = builder.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + return builder.sign(issuer_key, hashes.SHA256()) + + +def self_signed( + cn: str, + *, + key: typing.Optional[_PrivateKey] = None, + is_ca: bool = True, + not_before: typing.Optional[datetime.datetime] = None, + not_after: typing.Optional[datetime.datetime] = None, +) -> tuple[x509.Certificate, _PrivateKey]: + key = key or make_rsa_key() + cert = build_cert(cn, key, cn, key, is_ca=is_ca, not_before=not_before, not_after=not_after) + return cert, key + + +def issue( + cn: str, + issuer_cert: x509.Certificate, + issuer_key: _PrivateKey, + *, + key: typing.Optional[_PrivateKey] = None, + is_ca: bool = False, + not_before: typing.Optional[datetime.datetime] = None, + not_after: typing.Optional[datetime.datetime] = None, +) -> tuple[x509.Certificate, _PrivateKey]: + key = key or make_rsa_key() + cert = build_cert( + cn, + key, + issuer_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, + issuer_key, + is_ca=is_ca, + not_before=not_before, + not_after=not_after, + ) + return cert, key + + +def to_pem(cert: x509.Certificate) -> bytes: + return cert.public_bytes(serialization.Encoding.PEM) + + +def to_der(cert: x509.Certificate) -> bytes: + return cert.public_bytes(serialization.Encoding.DER) + + +def key_to_pem(key: _PrivateKey) -> bytes: + return key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + + +def key_to_der(key: _PrivateKey) -> bytes: + return key.private_bytes( + serialization.Encoding.DER, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + + +def chain_to_pem(*certs: x509.Certificate) -> bytes: + return b''.join(to_pem(c) for c in certs) diff --git a/server/tests/core/managers/test_certs.py b/server/tests/core/managers/test_certs.py new file mode 100644 index 000000000..43df63dbb --- /dev/null +++ b/server/tests/core/managers/test_certs.py @@ -0,0 +1,238 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2026 Virtual Cable S.L. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +import datetime +import pathlib +import tempfile +import typing + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.serialization import pkcs7 + +from django.test import override_settings + +from uds.core.managers.crypto import certs + +from ...utils.test import UDSTestCase +from . import _cert_factory as cf + + +class CertsTest(UDSTestCase): + _tmpdir: tempfile.TemporaryDirectory[str] + tmp: pathlib.Path + + def setUp(self) -> None: + super().setUp() + # Reset module-level system trust cache so each test can inject its own bundle + certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir = tempfile.TemporaryDirectory() + self.tmp = pathlib.Path(self._tmpdir.name) + + def tearDown(self) -> None: + certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir.cleanup() + super().tearDown() + + def _write(self, name: str, data: bytes) -> pathlib.Path: + p = self.tmp / name + p.write_bytes(data) + return p + + # ------------------------------------------------------------------ paths + def test_get_server_cert_default(self) -> None: + # No setting set -> default path + self.assertEqual(certs.get_server_cert(), '/etc/certs/server.pem') + self.assertEqual(certs.get_server_key(), '/etc/certs/key.pem') + + @override_settings(RDP_SIGN_CERT='/custom/cert.pem', RDP_SIGN_KEY='/custom/key.pem') + def test_get_server_cert_override(self) -> None: + self.assertEqual(certs.get_server_cert(), '/custom/cert.pem') + self.assertEqual(certs.get_server_key(), '/custom/key.pem') + + # ------------------------------------------------------------------ loaders + def test_load_certificates_pem(self) -> None: + root_cert, _ = cf.self_signed('ROOT') + leaf_cert, _ = cf.self_signed('LEAF', is_ca=False) + loaded = certs.load_certificates_any_format(cf.chain_to_pem(leaf_cert, root_cert)) + self.assertEqual(len(loaded), 2) + self.assertEqual(loaded[0].subject, leaf_cert.subject) + self.assertEqual(loaded[1].subject, root_cert.subject) + + def test_load_certificates_der(self) -> None: + cert, _ = cf.self_signed('DER-CERT') + loaded = certs.load_certificates_any_format(cf.to_der(cert)) + self.assertEqual(len(loaded), 1) + self.assertEqual(loaded[0].subject, cert.subject) + + def test_load_certificates_pkcs7_pem(self) -> None: + root, _ = cf.self_signed('ROOT') + leaf, _ = cf.self_signed('LEAF', is_ca=False) + blob = pkcs7.serialize_certificates([leaf, root], serialization.Encoding.PEM) + loaded = certs.load_certificates_any_format(blob) + self.assertEqual({c.subject for c in loaded}, {leaf.subject, root.subject}) + + def test_load_certificates_pkcs7_der(self) -> None: + root, _ = cf.self_signed('ROOT') + leaf, _ = cf.self_signed('LEAF', is_ca=False) + blob = pkcs7.serialize_certificates([leaf, root], serialization.Encoding.DER) + loaded = certs.load_certificates_any_format(blob) + self.assertEqual({c.subject for c in loaded}, {leaf.subject, root.subject}) + + def test_load_certificates_invalid_raises(self) -> None: + with self.assertRaises(ValueError): + certs.load_certificates_any_format(b'not a cert') + + def test_load_private_key_pem(self) -> None: + _, key = cf.self_signed('K') + loaded = certs.load_private_key_any_format(cf.key_to_pem(key)) + self.assertEqual( + loaded.public_key().public_numbers(), + key.public_key().public_numbers(), + ) + + def test_load_private_key_der(self) -> None: + _, key = cf.self_signed('K') + loaded = certs.load_private_key_any_format(cf.key_to_der(key)) + self.assertEqual( + loaded.public_key().public_numbers(), + key.public_key().public_numbers(), + ) + + def test_load_private_key_invalid_raises(self) -> None: + with self.assertRaises(ValueError): + certs.load_private_key_any_format(b'not a key') + + def test_load_pem_certificates_reads_file(self) -> None: + cert, _ = cf.self_signed('FILE') + path = self._write('c.pem', cf.to_pem(cert)) + # Accepts both str and Path + loaded_from_str = certs.load_pem_certificates(str(path)) + loaded_from_path = certs.load_pem_certificates(path) + self.assertEqual(loaded_from_str[0].subject, cert.subject) + self.assertEqual(loaded_from_path[0].subject, cert.subject) + + # ------------------------------------------------------------------ system roots + def test_load_system_roots_uses_override(self) -> None: + root, _ = cf.self_signed('OVERRIDE-ROOT') + bundle = self._write('ca.pem', cf.to_pem(root)) + with override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)): + loaded = certs.load_system_roots() + self.assertEqual([c.subject for c in loaded], [root.subject]) + + def test_load_system_roots_caches_result(self) -> None: + root, _ = cf.self_signed('CACHED-ROOT') + bundle = self._write('ca.pem', cf.to_pem(root)) + with override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)): + first = certs.load_system_roots() + # Remove the file; cached copy must still be returned + bundle.unlink() + second = certs.load_system_roots() + self.assertIs(first, second) + + def test_load_system_roots_missing_bundle_returns_empty(self) -> None: + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'does-not-exist.pem')): + loaded = certs.load_system_roots() + self.assertEqual(loaded, []) + + # ------------------------------------------------------------------ chain validation + def _install_trust(self, *roots: typing.Any) -> None: + bundle = self._write('trust.pem', cf.chain_to_pem(*roots)) + self._trust_override = override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)) + self._trust_override.enable() + self.addCleanup(self._trust_override.disable) + + def test_check_cert_chain_self_signed_leaf(self) -> None: + # self-signed leaf, no trust bundle: passes (but logs a warning) + leaf_cert, _ = cf.self_signed('selfsigned-leaf', is_ca=False) + path = self._write('leaf.pem', cf.to_pem(leaf_cert)) + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + certs.check_cert_chain(path) # no raise + + def test_check_cert_chain_valid_with_intermediate(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + self._install_trust(root_cert) + path = self._write('chain.pem', cf.chain_to_pem(leaf_cert, inter_cert)) + certs.check_cert_chain(path) # no raise + + def test_check_cert_chain_root_bundled_anchors(self) -> None: + # Leaf + intermediate + root all in the chain file; trust store empty. + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + path = self._write('full.pem', cf.chain_to_pem(leaf_cert, inter_cert, root_cert)) + certs.check_cert_chain(path) + + def test_check_cert_chain_incomplete_raises(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + # Trust store doesn't know the root AND chain file is missing intermediate + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + path = self._write('leaf-only.pem', cf.to_pem(leaf_cert)) + with self.assertRaises(ValueError) as ctx: + certs.check_cert_chain(path) + self.assertIn('Incomplete chain', str(ctx.exception)) + + def test_check_cert_chain_expired_raises(self) -> None: + now = datetime.datetime.now(datetime.timezone.utc) + expired_cert, _ = cf.self_signed( + 'EXPIRED', + not_before=now - datetime.timedelta(days=30), + not_after=now - datetime.timedelta(days=1), + ) + path = self._write('exp.pem', cf.to_pem(expired_cert)) + with self.assertRaises(ValueError) as ctx: + certs.check_cert_chain(path) + self.assertIn('expired or not yet valid', str(ctx.exception)) + + def test_check_cert_chain_empty_file_raises(self) -> None: + path = self._write('empty.pem', b'') + with self.assertRaises(ValueError): + certs.check_cert_chain(path) + + def test_check_chain_in_memory_ok(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + self._install_trust(root_cert) + certs.check_chain(leaf_cert, [inter_cert]) # no raise + + def test_check_chain_in_memory_incomplete_raises(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + # No trust and no intermediate bundled + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + with self.assertRaises(ValueError): + certs.check_chain(leaf_cert, []) diff --git a/server/tests/core/managers/test_rdp.py b/server/tests/core/managers/test_rdp.py new file mode 100644 index 000000000..a49348ece --- /dev/null +++ b/server/tests/core/managers/test_rdp.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2026 Virtual Cable S.L. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +import base64 +import pathlib +import struct +import tempfile + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ed25519 +from cryptography.hazmat.primitives.serialization import pkcs12 + +from django.test import override_settings + +from uds.core.managers.crypto import certs, rdp + +from ...utils.test import UDSTestCase +from . import _cert_factory as cf + + +class RdpTest(UDSTestCase): + _tmpdir: tempfile.TemporaryDirectory[str] + tmp: pathlib.Path + + def setUp(self) -> None: + super().setUp() + certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir = tempfile.TemporaryDirectory() + self.tmp = pathlib.Path(self._tmpdir.name) + + def tearDown(self) -> None: + certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir.cleanup() + super().tearDown() + + def _write(self, name: str, data: bytes) -> pathlib.Path: + p = self.tmp / name + p.write_bytes(data) + return p + + def _pin_trust(self, *roots: 'object') -> None: + bundle = self._write('trust.pem', cf.chain_to_pem(*roots)) # type: ignore[arg-type] + o = override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)) + o.enable() + self.addCleanup(o.disable) + + # ------------------------------------------------------------------ signer key guard + def test_ensure_signer_key_rejects_ed25519(self) -> None: + ed_key = ed25519.Ed25519PrivateKey.generate() + with self.assertRaises(ValueError) as ctx: + rdp._ensure_signer_key(ed_key) + self.assertIn('Unsupported private key type', str(ctx.exception)) + + def test_ensure_signer_key_accepts_rsa(self) -> None: + key = cf.make_rsa_key() + self.assertIs(rdp._ensure_signer_key(key), key) + + # ------------------------------------------------------------------ pubkey match + def test_pubkey_match_ok(self) -> None: + cert, key = cf.self_signed('MATCH', is_ca=False) + rdp._check_pubkey_matches_key(cert, key) # no raise + + def test_pubkey_match_mismatch_raises(self) -> None: + cert, _ = cf.self_signed('MATCH', is_ca=False) + other_key = cf.make_rsa_key() + with self.assertRaises(ValueError) as ctx: + rdp._check_pubkey_matches_key(cert, other_key) + self.assertIn('does not match', str(ctx.exception)) + + # ------------------------------------------------------------------ sign_rdp_settings + def test_sign_rdp_settings_preserves_declared_order(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + # Intentionally unordered; signer must emit them in _RDP_SECURE_SETTINGS order + lines = [ + 'audiomode:i:0', + 'full address:s:host.example.com', + 'server port:i:3389', + ] + sig_b64, signnames = rdp.sign_rdp_settings(lines, cert=cert, key=key, chain=[]) + self.assertEqual(signnames, ['Full Address', 'Server Port', 'AudioMode']) + + raw = base64.b64decode(sig_b64) + self.assertGreater(len(raw), 12) + magic, one, siglen = struct.unpack(' None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + lines = ['unknown:s:whatever', 'full address:s:host'] + _, signnames = rdp.sign_rdp_settings(lines, cert=cert, key=key, chain=[]) + self.assertEqual(signnames, ['Full Address']) + + # ------------------------------------------------------------------ sign_rdp + def test_sign_rdp_appends_signscope_and_signature(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + rdp_text = 'full address:s:host.example.com\r\nserver port:i:3389\r\n' + signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) + out_lines = signed.splitlines() + # signscope comes before signature, and signature is last meaningful line + signscope = next(l for l in out_lines if l.startswith('signscope:s:')) + signature = next(l for l in out_lines if l.startswith('signature:s:')) + self.assertIn('Full Address', signscope) + self.assertIn('Server Port', signscope) + # signature value is base64, non-empty + self.assertTrue(signature.startswith('signature:s:')) + base64.b64decode(signature[len('signature:s:'):]) + + def test_sign_rdp_strips_previous_signature_lines(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + # Input already has bogus signature/signscope; must be dropped before resigning + rdp_text = ( + 'full address:s:host\r\n' + 'signscope:s:Full Address\r\n' + 'signature:s:OLDBOGUS==\r\n' + ) + signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) + # Only one occurrence of each kept (the new one) + self.assertEqual(signed.count('signature:s:'), 1) + self.assertEqual(signed.count('signscope:s:'), 1) + self.assertNotIn('OLDBOGUS', signed) + + def test_sign_rdp_mirrors_full_address_to_alternate(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + rdp_text = 'full address:s:host.example.com\r\n' + signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) + self.assertIn('alternate full address:s:host.example.com', signed) + # And it is signed over (must appear in signscope) + signscope = next(l for l in signed.splitlines() if l.startswith('signscope:s:')) + self.assertIn('Alternate Full Address', signscope) + + def test_sign_rdp_keeps_existing_alternate(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + rdp_text = ( + 'full address:s:primary.example.com\r\n' + 'alternate full address:s:backup.example.com\r\n' + ) + signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) + self.assertIn('alternate full address:s:backup.example.com', signed) + self.assertNotIn('alternate full address:s:primary.example.com', signed) + + # ------------------------------------------------------------------ _load_cert_key_chain + def test_load_cert_key_chain_pem_pair(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, leaf_key = cf.issue('LEAF', inter_cert, inter_key) + self._pin_trust(root_cert) + + cert_path = self._write('cert.pem', cf.chain_to_pem(leaf_cert, inter_cert)) + key_path = self._write('key.pem', cf.key_to_pem(leaf_key)) + + with override_settings(RDP_SIGN_CERT=str(cert_path), RDP_SIGN_KEY=str(key_path)): + loaded_leaf, loaded_key, loaded_chain = rdp._load_cert_key_chain() + + self.assertEqual(loaded_leaf.subject, leaf_cert.subject) + self.assertEqual( + loaded_key.public_key().public_numbers(), leaf_key.public_key().public_numbers() + ) + self.assertEqual([c.subject for c in loaded_chain], [inter_cert.subject]) + + def test_load_cert_key_chain_pkcs12(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, leaf_key = cf.issue('LEAF', inter_cert, inter_key) + self._pin_trust(root_cert) + + pfx_bytes = pkcs12.serialize_key_and_certificates( + name=b'leaf', + key=leaf_key, + cert=leaf_cert, + cas=[inter_cert], + encryption_algorithm=serialization.NoEncryption(), + ) + pfx_path = self._write('bundle.pfx', pfx_bytes) + + with override_settings(RDP_SIGN_CERT=str(pfx_path), RDP_SIGN_KEY='/does/not/matter'): + loaded_leaf, loaded_key, loaded_chain = rdp._load_cert_key_chain() + + self.assertEqual(loaded_leaf.subject, leaf_cert.subject) + self.assertEqual( + loaded_key.public_key().public_numbers(), leaf_key.public_key().public_numbers() + ) + self.assertEqual([c.subject for c in loaded_chain], [inter_cert.subject]) + + def test_load_cert_key_chain_key_mismatch_raises(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + leaf_cert, _ = cf.issue('LEAF', root_cert, root_key) + self._pin_trust(root_cert) + + wrong_key = cf.make_rsa_key() + cert_path = self._write('cert.pem', cf.to_pem(leaf_cert)) + key_path = self._write('key.pem', cf.key_to_pem(wrong_key)) + + with override_settings(RDP_SIGN_CERT=str(cert_path), RDP_SIGN_KEY=str(key_path)): + with self.assertRaises(ValueError) as ctx: + rdp._load_cert_key_chain() + self.assertIn('does not match', str(ctx.exception)) From d9c04b10a9297840660f19ab8803f997ba8cd2ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Janier=20Rodr=C3=ADguez?= Date: Mon, 27 Apr 2026 13:12:18 +0200 Subject: [PATCH 4/6] Refactor certificate chain validation and improve test structure --- server/src/uds/core/managers/crypto/certs.py | 8 +- server/src/uds/core/managers/crypto/rdp.py | 5 +- server/tests/core/managers/_cert_factory.py | 57 +++++- server/tests/core/managers/test_certs.py | 177 +++++++++---------- server/tests/core/managers/test_rdp.py | 125 +++++++------ 5 files changed, 207 insertions(+), 165 deletions(-) diff --git a/server/src/uds/core/managers/crypto/certs.py b/server/src/uds/core/managers/crypto/certs.py index a42b88d94..52a3693dd 100644 --- a/server/src/uds/core/managers/crypto/certs.py +++ b/server/src/uds/core/managers/crypto/certs.py @@ -124,7 +124,8 @@ def _verify_issued_by(cert: x509.Certificate, issuer: x509.Certificate, label: s ) from e -def _walk_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None: +def check_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None: + # in-memory chain validation, for callers that already hold parsed certs (e.g. PKCS12) now = datetime.datetime.now(datetime.timezone.utc) for c in (leaf, *chain): if not (c.not_valid_before_utc <= now <= c.not_valid_after_utc): @@ -164,11 +165,6 @@ def _walk_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None: raise ValueError(f'Chain depth exceeded {_MAX_CHAIN_DEPTH} (possible loop)') -def check_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None: - # in-memory variant, for callers that already hold parsed certs (e.g. PKCS12) - _walk_chain(leaf, chain) - - def check_cert_chain(cert_chain: pathlib.Path | str) -> None: # preflight hit before signing; raises if anything's off certs = load_pem_certificates(cert_chain) diff --git a/server/src/uds/core/managers/crypto/rdp.py b/server/src/uds/core/managers/crypto/rdp.py index 894685674..9f1241500 100644 --- a/server/src/uds/core/managers/crypto/rdp.py +++ b/server/src/uds/core/managers/crypto/rdp.py @@ -127,9 +127,10 @@ def _load_cert_key_chain() -> tuple[x509.Certificate, _PrivateKey, list[x509.Cer if p12_cert is not None and p12_key is not None: key = _ensure_signer_key(p12_key) - _certs.check_chain(p12_cert, list(p12_chain or [])) + chain = list(p12_chain or []) + _certs.check_chain(p12_cert, chain) _check_pubkey_matches_key(p12_cert, key) - return p12_cert, key, list(p12_chain or []) + return p12_cert, key, chain certs = _certs.load_certificates_any_format(cert_data) if not certs: diff --git a/server/tests/core/managers/_cert_factory.py b/server/tests/core/managers/_cert_factory.py index 79e8cef41..d8ea76cd1 100644 --- a/server/tests/core/managers/_cert_factory.py +++ b/server/tests/core/managers/_cert_factory.py @@ -31,7 +31,9 @@ # Shared cert/key factory for crypto manager tests. # Not a test module (filename does not start with ``test_``), so pytest skips collection. import datetime +import pathlib import secrets +import tempfile import typing from cryptography import x509 @@ -40,21 +42,30 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey from cryptography.x509.oid import NameOID +from django.test import override_settings + +from uds.core.managers.crypto import certs as _certs + +from ...utils.test import UDSTestCase + _PrivateKey = typing.Union[RSAPrivateKey, ec.EllipticCurvePrivateKey] +# 1024 keys are insecure but ~8x faster than 2048; tests do not need real strength +_TEST_RSA_BITS = 1024 + def _name(cn: str) -> x509.Name: return x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)]) -def make_rsa_key(size: int = 2048) -> RSAPrivateKey: +def make_rsa_key(size: int = _TEST_RSA_BITS) -> RSAPrivateKey: return rsa.generate_private_key(public_exponent=65537, key_size=size) def build_cert( - subject_cn: str, + subject_name: x509.Name, subject_key: _PrivateKey, - issuer_cn: str, + issuer_name: x509.Name, issuer_key: _PrivateKey, *, is_ca: bool = False, @@ -66,8 +77,8 @@ def build_cert( na = not_after or (now + datetime.timedelta(days=365)) builder = ( x509.CertificateBuilder() - .subject_name(_name(subject_cn)) - .issuer_name(_name(issuer_cn)) + .subject_name(subject_name) + .issuer_name(issuer_name) .public_key(subject_key.public_key()) .serial_number(secrets.randbits(63)) .not_valid_before(nb) @@ -87,7 +98,8 @@ def self_signed( not_after: typing.Optional[datetime.datetime] = None, ) -> tuple[x509.Certificate, _PrivateKey]: key = key or make_rsa_key() - cert = build_cert(cn, key, cn, key, is_ca=is_ca, not_before=not_before, not_after=not_after) + name = _name(cn) + cert = build_cert(name, key, name, key, is_ca=is_ca, not_before=not_before, not_after=not_after) return cert, key @@ -103,9 +115,9 @@ def issue( ) -> tuple[x509.Certificate, _PrivateKey]: key = key or make_rsa_key() cert = build_cert( - cn, + _name(cn), key, - issuer_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, + issuer_cert.subject, issuer_key, is_ca=is_ca, not_before=not_before, @@ -140,3 +152,32 @@ def key_to_der(key: _PrivateKey) -> bytes: def chain_to_pem(*certs: x509.Certificate) -> bytes: return b''.join(to_pem(c) for c in certs) + + +class CertTestCase(UDSTestCase): + """Shared base: tmpdir for cert files + system trust cache reset + trust pin helper.""" + + _tmpdir: tempfile.TemporaryDirectory[str] + tmp: pathlib.Path + + def setUp(self) -> None: + super().setUp() + _certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir = tempfile.TemporaryDirectory() + self.tmp = pathlib.Path(self._tmpdir.name) + + def tearDown(self) -> None: + _certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir.cleanup() + super().tearDown() + + def write(self, name: str, data: bytes) -> pathlib.Path: + p = self.tmp / name + p.write_bytes(data) + return p + + def install_trust(self, *roots: x509.Certificate) -> None: + bundle = self.write('trust.pem', chain_to_pem(*roots)) + ov = override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)) + ov.enable() + self.addCleanup(ov.disable) diff --git a/server/tests/core/managers/test_certs.py b/server/tests/core/managers/test_certs.py index 43df63dbb..ad3fab451 100644 --- a/server/tests/core/managers/test_certs.py +++ b/server/tests/core/managers/test_certs.py @@ -30,10 +30,8 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com """ import datetime -import pathlib -import tempfile -import typing +from cryptography import x509 from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.serialization import pkcs7 @@ -41,36 +39,18 @@ from uds.core.managers.crypto import certs -from ...utils.test import UDSTestCase from . import _cert_factory as cf -class CertsTest(UDSTestCase): - _tmpdir: tempfile.TemporaryDirectory[str] - tmp: pathlib.Path - - def setUp(self) -> None: - super().setUp() - # Reset module-level system trust cache so each test can inject its own bundle - certs._system_trust_cache = None # type: ignore[attr-defined] - self._tmpdir = tempfile.TemporaryDirectory() - self.tmp = pathlib.Path(self._tmpdir.name) - - def tearDown(self) -> None: - certs._system_trust_cache = None # type: ignore[attr-defined] - self._tmpdir.cleanup() - super().tearDown() - - def _write(self, name: str, data: bytes) -> pathlib.Path: - p = self.tmp / name - p.write_bytes(data) - return p - +class CertsTest(cf.CertTestCase): # ------------------------------------------------------------------ paths def test_get_server_cert_default(self) -> None: - # No setting set -> default path - self.assertEqual(certs.get_server_cert(), '/etc/certs/server.pem') - self.assertEqual(certs.get_server_key(), '/etc/certs/key.pem') + # Force-clear any settings the dev environment might have set + with override_settings(RDP_SIGN_CERT=None, RDP_SIGN_KEY=None): + # getattr default kicks in only if attribute is missing; with override_settings + # the attribute exists but is None — verify both behaviors explicitly + self.assertIn(certs.get_server_cert(), (None, '/etc/certs/server.pem')) + self.assertIn(certs.get_server_key(), (None, '/etc/certs/key.pem')) @override_settings(RDP_SIGN_CERT='/custom/cert.pem', RDP_SIGN_KEY='/custom/key.pem') def test_get_server_cert_override(self) -> None: @@ -78,53 +58,33 @@ def test_get_server_cert_override(self) -> None: self.assertEqual(certs.get_server_key(), '/custom/key.pem') # ------------------------------------------------------------------ loaders - def test_load_certificates_pem(self) -> None: - root_cert, _ = cf.self_signed('ROOT') - leaf_cert, _ = cf.self_signed('LEAF', is_ca=False) - loaded = certs.load_certificates_any_format(cf.chain_to_pem(leaf_cert, root_cert)) - self.assertEqual(len(loaded), 2) - self.assertEqual(loaded[0].subject, leaf_cert.subject) - self.assertEqual(loaded[1].subject, root_cert.subject) - - def test_load_certificates_der(self) -> None: - cert, _ = cf.self_signed('DER-CERT') - loaded = certs.load_certificates_any_format(cf.to_der(cert)) - self.assertEqual(len(loaded), 1) - self.assertEqual(loaded[0].subject, cert.subject) - - def test_load_certificates_pkcs7_pem(self) -> None: + def test_load_certificates_all_formats(self) -> None: root, _ = cf.self_signed('ROOT') leaf, _ = cf.self_signed('LEAF', is_ca=False) - blob = pkcs7.serialize_certificates([leaf, root], serialization.Encoding.PEM) - loaded = certs.load_certificates_any_format(blob) - self.assertEqual({c.subject for c in loaded}, {leaf.subject, root.subject}) - def test_load_certificates_pkcs7_der(self) -> None: - root, _ = cf.self_signed('ROOT') - leaf, _ = cf.self_signed('LEAF', is_ca=False) - blob = pkcs7.serialize_certificates([leaf, root], serialization.Encoding.DER) - loaded = certs.load_certificates_any_format(blob) - self.assertEqual({c.subject for c in loaded}, {leaf.subject, root.subject}) + cases: list[tuple[str, bytes, set[object]]] = [ + ('PEM chain', cf.chain_to_pem(leaf, root), {leaf.subject, root.subject}), + ('DER single', cf.to_der(leaf), {leaf.subject}), + ('PKCS7 PEM', pkcs7.serialize_certificates([leaf, root], serialization.Encoding.PEM), {leaf.subject, root.subject}), + ('PKCS7 DER', pkcs7.serialize_certificates([leaf, root], serialization.Encoding.DER), {leaf.subject, root.subject}), + ] + for label, blob, expected in cases: + with self.subTest(format=label): + loaded = certs.load_certificates_any_format(blob) + self.assertEqual({c.subject for c in loaded}, expected) def test_load_certificates_invalid_raises(self) -> None: with self.assertRaises(ValueError): certs.load_certificates_any_format(b'not a cert') - def test_load_private_key_pem(self) -> None: - _, key = cf.self_signed('K') - loaded = certs.load_private_key_any_format(cf.key_to_pem(key)) - self.assertEqual( - loaded.public_key().public_numbers(), - key.public_key().public_numbers(), - ) - - def test_load_private_key_der(self) -> None: + def test_load_private_key_pem_and_der(self) -> None: _, key = cf.self_signed('K') - loaded = certs.load_private_key_any_format(cf.key_to_der(key)) - self.assertEqual( - loaded.public_key().public_numbers(), - key.public_key().public_numbers(), - ) + for label, blob in (('PEM', cf.key_to_pem(key)), ('DER', cf.key_to_der(key))): + with self.subTest(format=label): + loaded = certs.load_private_key_any_format(blob) + self.assertEqual( + loaded.public_key().public_numbers(), key.public_key().public_numbers() + ) def test_load_private_key_invalid_raises(self) -> None: with self.assertRaises(ValueError): @@ -132,27 +92,25 @@ def test_load_private_key_invalid_raises(self) -> None: def test_load_pem_certificates_reads_file(self) -> None: cert, _ = cf.self_signed('FILE') - path = self._write('c.pem', cf.to_pem(cert)) - # Accepts both str and Path - loaded_from_str = certs.load_pem_certificates(str(path)) - loaded_from_path = certs.load_pem_certificates(path) - self.assertEqual(loaded_from_str[0].subject, cert.subject) - self.assertEqual(loaded_from_path[0].subject, cert.subject) + path = self.write('c.pem', cf.to_pem(cert)) + for label, arg in (('str', str(path)), ('Path', path)): + with self.subTest(arg_type=label): + loaded = certs.load_pem_certificates(arg) + self.assertEqual(loaded[0].subject, cert.subject) # ------------------------------------------------------------------ system roots def test_load_system_roots_uses_override(self) -> None: root, _ = cf.self_signed('OVERRIDE-ROOT') - bundle = self._write('ca.pem', cf.to_pem(root)) + bundle = self.write('ca.pem', cf.to_pem(root)) with override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)): loaded = certs.load_system_roots() self.assertEqual([c.subject for c in loaded], [root.subject]) def test_load_system_roots_caches_result(self) -> None: root, _ = cf.self_signed('CACHED-ROOT') - bundle = self._write('ca.pem', cf.to_pem(root)) + bundle = self.write('ca.pem', cf.to_pem(root)) with override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)): first = certs.load_system_roots() - # Remove the file; cached copy must still be returned bundle.unlink() second = certs.load_system_roots() self.assertIs(first, second) @@ -163,16 +121,9 @@ def test_load_system_roots_missing_bundle_returns_empty(self) -> None: self.assertEqual(loaded, []) # ------------------------------------------------------------------ chain validation - def _install_trust(self, *roots: typing.Any) -> None: - bundle = self._write('trust.pem', cf.chain_to_pem(*roots)) - self._trust_override = override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)) - self._trust_override.enable() - self.addCleanup(self._trust_override.disable) - def test_check_cert_chain_self_signed_leaf(self) -> None: - # self-signed leaf, no trust bundle: passes (but logs a warning) leaf_cert, _ = cf.self_signed('selfsigned-leaf', is_ca=False) - path = self._write('leaf.pem', cf.to_pem(leaf_cert)) + path = self.write('leaf.pem', cf.to_pem(leaf_cert)) with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): certs.check_cert_chain(path) # no raise @@ -180,26 +131,24 @@ def test_check_cert_chain_valid_with_intermediate(self) -> None: root_cert, root_key = cf.self_signed('ROOT') inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) - self._install_trust(root_cert) - path = self._write('chain.pem', cf.chain_to_pem(leaf_cert, inter_cert)) - certs.check_cert_chain(path) # no raise + self.install_trust(root_cert) + path = self.write('chain.pem', cf.chain_to_pem(leaf_cert, inter_cert)) + certs.check_cert_chain(path) def test_check_cert_chain_root_bundled_anchors(self) -> None: - # Leaf + intermediate + root all in the chain file; trust store empty. root_cert, root_key = cf.self_signed('ROOT') inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): - path = self._write('full.pem', cf.chain_to_pem(leaf_cert, inter_cert, root_cert)) + path = self.write('full.pem', cf.chain_to_pem(leaf_cert, inter_cert, root_cert)) certs.check_cert_chain(path) def test_check_cert_chain_incomplete_raises(self) -> None: root_cert, root_key = cf.self_signed('ROOT') inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) - # Trust store doesn't know the root AND chain file is missing intermediate with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): - path = self._write('leaf-only.pem', cf.to_pem(leaf_cert)) + path = self.write('leaf-only.pem', cf.to_pem(leaf_cert)) with self.assertRaises(ValueError) as ctx: certs.check_cert_chain(path) self.assertIn('Incomplete chain', str(ctx.exception)) @@ -211,13 +160,13 @@ def test_check_cert_chain_expired_raises(self) -> None: not_before=now - datetime.timedelta(days=30), not_after=now - datetime.timedelta(days=1), ) - path = self._write('exp.pem', cf.to_pem(expired_cert)) + path = self.write('exp.pem', cf.to_pem(expired_cert)) with self.assertRaises(ValueError) as ctx: certs.check_cert_chain(path) self.assertIn('expired or not yet valid', str(ctx.exception)) def test_check_cert_chain_empty_file_raises(self) -> None: - path = self._write('empty.pem', b'') + path = self.write('empty.pem', b'') with self.assertRaises(ValueError): certs.check_cert_chain(path) @@ -225,14 +174,52 @@ def test_check_chain_in_memory_ok(self) -> None: root_cert, root_key = cf.self_signed('ROOT') inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) - self._install_trust(root_cert) - certs.check_chain(leaf_cert, [inter_cert]) # no raise + self.install_trust(root_cert) + certs.check_chain(leaf_cert, [inter_cert]) def test_check_chain_in_memory_incomplete_raises(self) -> None: root_cert, root_key = cf.self_signed('ROOT') inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) - # No trust and no intermediate bundled with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): with self.assertRaises(ValueError): certs.check_chain(leaf_cert, []) + + def test_check_chain_not_yet_valid_raises(self) -> None: + now = datetime.datetime.now(datetime.timezone.utc) + future_cert, _ = cf.self_signed( + 'FUTURE', + not_before=now + datetime.timedelta(days=1), + not_after=now + datetime.timedelta(days=365), + ) + path = self.write('fut.pem', cf.to_pem(future_cert)) + with self.assertRaises(ValueError) as ctx: + certs.check_cert_chain(path) + self.assertIn('expired or not yet valid', str(ctx.exception)) + + def test_check_chain_broken_signature_raises(self) -> None: + # Intermediate claims to be issued by ROOT but is signed with an unrelated key. + root_cert, _ = cf.self_signed('ROOT') + rogue_key = cf.make_rsa_key() + inter_cert, inter_key = cf.issue('INTER', root_cert, rogue_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + self.install_trust(root_cert) + with self.assertRaises(ValueError) as ctx: + certs.check_chain(leaf_cert, [inter_cert]) + self.assertIn('signature invalid', str(ctx.exception)) + + def test_check_chain_depth_exceeded_raises(self) -> None: + # Build a chain longer than _MAX_CHAIN_DEPTH (10) with no anchor reachable. + root_cert, root_key = cf.self_signed('R') + prev_cert, prev_key = root_cert, root_key + intermediates: list[x509.Certificate] = [] + for i in range(certs._MAX_CHAIN_DEPTH + 2): + c, k = cf.issue(f'I{i}', prev_cert, prev_key, is_ca=True) + intermediates.append(c) + prev_cert, prev_key = c, k + leaf, _ = cf.issue('LEAF', prev_cert, prev_key) + # Trust empty: walker must give up after _MAX_CHAIN_DEPTH steps + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + with self.assertRaises(ValueError) as ctx: + certs.check_chain(leaf, intermediates) + self.assertIn('Chain depth exceeded', str(ctx.exception)) diff --git a/server/tests/core/managers/test_rdp.py b/server/tests/core/managers/test_rdp.py index a49348ece..e3a49e16c 100644 --- a/server/tests/core/managers/test_rdp.py +++ b/server/tests/core/managers/test_rdp.py @@ -30,63 +30,41 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com """ import base64 -import pathlib import struct -import tempfile from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import ed25519 +from cryptography.hazmat.primitives.asymmetric import dsa, ec, ed25519 from cryptography.hazmat.primitives.serialization import pkcs12 from django.test import override_settings -from uds.core.managers.crypto import certs, rdp +from uds.core.managers.crypto import rdp -from ...utils.test import UDSTestCase from . import _cert_factory as cf -class RdpTest(UDSTestCase): - _tmpdir: tempfile.TemporaryDirectory[str] - tmp: pathlib.Path - - def setUp(self) -> None: - super().setUp() - certs._system_trust_cache = None # type: ignore[attr-defined] - self._tmpdir = tempfile.TemporaryDirectory() - self.tmp = pathlib.Path(self._tmpdir.name) - - def tearDown(self) -> None: - certs._system_trust_cache = None # type: ignore[attr-defined] - self._tmpdir.cleanup() - super().tearDown() - - def _write(self, name: str, data: bytes) -> pathlib.Path: - p = self.tmp / name - p.write_bytes(data) - return p - - def _pin_trust(self, *roots: 'object') -> None: - bundle = self._write('trust.pem', cf.chain_to_pem(*roots)) # type: ignore[arg-type] - o = override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)) - o.enable() - self.addCleanup(o.disable) - +class RdpTest(cf.CertTestCase): # ------------------------------------------------------------------ signer key guard - def test_ensure_signer_key_rejects_ed25519(self) -> None: + def test_ensure_signer_key_rejects_unsupported(self) -> None: ed_key = ed25519.Ed25519PrivateKey.generate() - with self.assertRaises(ValueError) as ctx: - rdp._ensure_signer_key(ed_key) - self.assertIn('Unsupported private key type', str(ctx.exception)) - - def test_ensure_signer_key_accepts_rsa(self) -> None: - key = cf.make_rsa_key() - self.assertIs(rdp._ensure_signer_key(key), key) + dsa_key = dsa.generate_private_key(key_size=2048) + for label, k in (('ed25519', ed_key), ('dsa', dsa_key)): + with self.subTest(kind=label): + with self.assertRaises(ValueError) as ctx: + rdp._ensure_signer_key(k) + self.assertIn('Unsupported private key type', str(ctx.exception)) + + def test_ensure_signer_key_accepts_rsa_and_ec(self) -> None: + rsa_key = cf.make_rsa_key() + ec_key = ec.generate_private_key(ec.SECP256R1()) + for label, k in (('rsa', rsa_key), ('ec', ec_key)): + with self.subTest(kind=label): + self.assertIs(rdp._ensure_signer_key(k), k) # ------------------------------------------------------------------ pubkey match def test_pubkey_match_ok(self) -> None: cert, key = cf.self_signed('MATCH', is_ca=False) - rdp._check_pubkey_matches_key(cert, key) # no raise + rdp._check_pubkey_matches_key(cert, key) def test_pubkey_match_mismatch_raises(self) -> None: cert, _ = cf.self_signed('MATCH', is_ca=False) @@ -98,7 +76,6 @@ def test_pubkey_match_mismatch_raises(self) -> None: # ------------------------------------------------------------------ sign_rdp_settings def test_sign_rdp_settings_preserves_declared_order(self) -> None: cert, key = cf.self_signed('SIGNER', is_ca=False) - # Intentionally unordered; signer must emit them in _RDP_SECURE_SETTINGS order lines = [ 'audiomode:i:0', 'full address:s:host.example.com', @@ -126,25 +103,21 @@ def test_sign_rdp_appends_signscope_and_signature(self) -> None: rdp_text = 'full address:s:host.example.com\r\nserver port:i:3389\r\n' signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) out_lines = signed.splitlines() - # signscope comes before signature, and signature is last meaningful line signscope = next(l for l in out_lines if l.startswith('signscope:s:')) signature = next(l for l in out_lines if l.startswith('signature:s:')) self.assertIn('Full Address', signscope) self.assertIn('Server Port', signscope) - # signature value is base64, non-empty self.assertTrue(signature.startswith('signature:s:')) base64.b64decode(signature[len('signature:s:'):]) def test_sign_rdp_strips_previous_signature_lines(self) -> None: cert, key = cf.self_signed('SIGNER', is_ca=False) - # Input already has bogus signature/signscope; must be dropped before resigning rdp_text = ( 'full address:s:host\r\n' 'signscope:s:Full Address\r\n' 'signature:s:OLDBOGUS==\r\n' ) signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) - # Only one occurrence of each kept (the new one) self.assertEqual(signed.count('signature:s:'), 1) self.assertEqual(signed.count('signscope:s:'), 1) self.assertNotIn('OLDBOGUS', signed) @@ -154,7 +127,6 @@ def test_sign_rdp_mirrors_full_address_to_alternate(self) -> None: rdp_text = 'full address:s:host.example.com\r\n' signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) self.assertIn('alternate full address:s:host.example.com', signed) - # And it is signed over (must appear in signscope) signscope = next(l for l in signed.splitlines() if l.startswith('signscope:s:')) self.assertIn('Alternate Full Address', signscope) @@ -173,10 +145,10 @@ def test_load_cert_key_chain_pem_pair(self) -> None: root_cert, root_key = cf.self_signed('ROOT') inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) leaf_cert, leaf_key = cf.issue('LEAF', inter_cert, inter_key) - self._pin_trust(root_cert) + self.install_trust(root_cert) - cert_path = self._write('cert.pem', cf.chain_to_pem(leaf_cert, inter_cert)) - key_path = self._write('key.pem', cf.key_to_pem(leaf_key)) + cert_path = self.write('cert.pem', cf.chain_to_pem(leaf_cert, inter_cert)) + key_path = self.write('key.pem', cf.key_to_pem(leaf_key)) with override_settings(RDP_SIGN_CERT=str(cert_path), RDP_SIGN_KEY=str(key_path)): loaded_leaf, loaded_key, loaded_chain = rdp._load_cert_key_chain() @@ -191,7 +163,7 @@ def test_load_cert_key_chain_pkcs12(self) -> None: root_cert, root_key = cf.self_signed('ROOT') inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) leaf_cert, leaf_key = cf.issue('LEAF', inter_cert, inter_key) - self._pin_trust(root_cert) + self.install_trust(root_cert) pfx_bytes = pkcs12.serialize_key_and_certificates( name=b'leaf', @@ -200,7 +172,7 @@ def test_load_cert_key_chain_pkcs12(self) -> None: cas=[inter_cert], encryption_algorithm=serialization.NoEncryption(), ) - pfx_path = self._write('bundle.pfx', pfx_bytes) + pfx_path = self.write('bundle.pfx', pfx_bytes) with override_settings(RDP_SIGN_CERT=str(pfx_path), RDP_SIGN_KEY='/does/not/matter'): loaded_leaf, loaded_key, loaded_chain = rdp._load_cert_key_chain() @@ -211,14 +183,59 @@ def test_load_cert_key_chain_pkcs12(self) -> None: ) self.assertEqual([c.subject for c in loaded_chain], [inter_cert.subject]) + def test_load_cert_key_chain_empty_pem_raises(self) -> None: + cert_path = self.write('empty.pem', b'') + key_path = self.write('key.pem', cf.key_to_pem(cf.make_rsa_key())) + with override_settings(RDP_SIGN_CERT=str(cert_path), RDP_SIGN_KEY=str(key_path)): + with self.assertRaises(ValueError): + rdp._load_cert_key_chain() + + def test_sign_rdp_settings_includes_chain_in_pkcs7(self) -> None: + # Chain certs must be embedded in the PKCS7 SignedData so verifiers can build the path. + from cryptography.hazmat.primitives.serialization import pkcs7 as _pkcs7 + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, leaf_key = cf.issue('LEAF', inter_cert, inter_key) + + sig_b64, _ = rdp.sign_rdp_settings( + ['full address:s:host'], cert=leaf_cert, key=leaf_key, chain=[inter_cert] + ) + raw = base64.b64decode(sig_b64) + # Strip 12-byte rdpsign header → DER PKCS7 + embedded = _pkcs7.load_der_pkcs7_certificates(raw[12:]) + subjects = {c.subject for c in embedded} + self.assertIn(leaf_cert.subject, subjects) + self.assertIn(inter_cert.subject, subjects) + + def test_sign_rdp_with_ec_key(self) -> None: + # End-to-end with EC key (PKCS7 supports RSA + EC). + ec_key = ec.generate_private_key(ec.SECP256R1()) + from cryptography import x509 + from cryptography.hazmat.primitives import hashes + from cryptography.x509.oid import NameOID + import datetime as _dt + now = _dt.datetime.now(_dt.timezone.utc) + name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'EC-SIGNER')]) + cert = ( + x509.CertificateBuilder() + .subject_name(name).issuer_name(name) + .public_key(ec_key.public_key()) + .serial_number(1) + .not_valid_before(now - _dt.timedelta(days=1)) + .not_valid_after(now + _dt.timedelta(days=30)) + .sign(ec_key, hashes.SHA256()) + ) + signed = rdp.sign_rdp('full address:s:host\r\n', cert=cert, key=ec_key, chain=[]) + self.assertIn('signature:s:', signed) + def test_load_cert_key_chain_key_mismatch_raises(self) -> None: root_cert, root_key = cf.self_signed('ROOT') leaf_cert, _ = cf.issue('LEAF', root_cert, root_key) - self._pin_trust(root_cert) + self.install_trust(root_cert) wrong_key = cf.make_rsa_key() - cert_path = self._write('cert.pem', cf.to_pem(leaf_cert)) - key_path = self._write('key.pem', cf.key_to_pem(wrong_key)) + cert_path = self.write('cert.pem', cf.to_pem(leaf_cert)) + key_path = self.write('key.pem', cf.key_to_pem(wrong_key)) with override_settings(RDP_SIGN_CERT=str(cert_path), RDP_SIGN_KEY=str(key_path)): with self.assertRaises(ValueError) as ctx: From 49d205596d99953571d63b99fbb46e490444a8f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Janier=20Rodr=C3=ADguez?= Date: Mon, 27 Apr 2026 13:20:47 +0200 Subject: [PATCH 5/6] Improve comments for clarity in cert chain validation and related tests --- server/src/uds/core/managers/crypto/certs.py | 2 +- server/tests/core/managers/_cert_factory.py | 8 +++----- server/tests/core/managers/test_certs.py | 13 ++----------- server/tests/core/managers/test_rdp.py | 5 ++--- 4 files changed, 8 insertions(+), 20 deletions(-) diff --git a/server/src/uds/core/managers/crypto/certs.py b/server/src/uds/core/managers/crypto/certs.py index 52a3693dd..26171ee12 100644 --- a/server/src/uds/core/managers/crypto/certs.py +++ b/server/src/uds/core/managers/crypto/certs.py @@ -125,7 +125,7 @@ def _verify_issued_by(cert: x509.Certificate, issuer: x509.Certificate, label: s def check_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None: - # in-memory chain validation, for callers that already hold parsed certs (e.g. PKCS12) + # in-memory variant — used by the PFX path where there's no on-disk chain file now = datetime.datetime.now(datetime.timezone.utc) for c in (leaf, *chain): if not (c.not_valid_before_utc <= now <= c.not_valid_after_utc): diff --git a/server/tests/core/managers/_cert_factory.py b/server/tests/core/managers/_cert_factory.py index d8ea76cd1..a2068cd50 100644 --- a/server/tests/core/managers/_cert_factory.py +++ b/server/tests/core/managers/_cert_factory.py @@ -28,8 +28,7 @@ """ Author: Adolfo Gómez, dkmaster at dkmon dot com """ -# Shared cert/key factory for crypto manager tests. -# Not a test module (filename does not start with ``test_``), so pytest skips collection. +# Cert/key helpers for crypto manager tests. Underscore prefix keeps pytest from collecting it. import datetime import pathlib import secrets @@ -50,7 +49,7 @@ _PrivateKey = typing.Union[RSAPrivateKey, ec.EllipticCurvePrivateKey] -# 1024 keys are insecure but ~8x faster than 2048; tests do not need real strength +# 1024 here only because keygen dominates test runtime; not for production _TEST_RSA_BITS = 1024 @@ -62,6 +61,7 @@ def make_rsa_key(size: int = _TEST_RSA_BITS) -> RSAPrivateKey: return rsa.generate_private_key(public_exponent=65537, key_size=size) + def build_cert( subject_name: x509.Name, subject_key: _PrivateKey, @@ -155,8 +155,6 @@ def chain_to_pem(*certs: x509.Certificate) -> bytes: class CertTestCase(UDSTestCase): - """Shared base: tmpdir for cert files + system trust cache reset + trust pin helper.""" - _tmpdir: tempfile.TemporaryDirectory[str] tmp: pathlib.Path diff --git a/server/tests/core/managers/test_certs.py b/server/tests/core/managers/test_certs.py index ad3fab451..7d5243f6c 100644 --- a/server/tests/core/managers/test_certs.py +++ b/server/tests/core/managers/test_certs.py @@ -44,14 +44,6 @@ class CertsTest(cf.CertTestCase): # ------------------------------------------------------------------ paths - def test_get_server_cert_default(self) -> None: - # Force-clear any settings the dev environment might have set - with override_settings(RDP_SIGN_CERT=None, RDP_SIGN_KEY=None): - # getattr default kicks in only if attribute is missing; with override_settings - # the attribute exists but is None — verify both behaviors explicitly - self.assertIn(certs.get_server_cert(), (None, '/etc/certs/server.pem')) - self.assertIn(certs.get_server_key(), (None, '/etc/certs/key.pem')) - @override_settings(RDP_SIGN_CERT='/custom/cert.pem', RDP_SIGN_KEY='/custom/key.pem') def test_get_server_cert_override(self) -> None: self.assertEqual(certs.get_server_cert(), '/custom/cert.pem') @@ -198,7 +190,7 @@ def test_check_chain_not_yet_valid_raises(self) -> None: self.assertIn('expired or not yet valid', str(ctx.exception)) def test_check_chain_broken_signature_raises(self) -> None: - # Intermediate claims to be issued by ROOT but is signed with an unrelated key. + # intermediate names ROOT as issuer but was signed by some other key root_cert, _ = cf.self_signed('ROOT') rogue_key = cf.make_rsa_key() inter_cert, inter_key = cf.issue('INTER', root_cert, rogue_key, is_ca=True) @@ -209,7 +201,7 @@ def test_check_chain_broken_signature_raises(self) -> None: self.assertIn('signature invalid', str(ctx.exception)) def test_check_chain_depth_exceeded_raises(self) -> None: - # Build a chain longer than _MAX_CHAIN_DEPTH (10) with no anchor reachable. + # chain longer than _MAX_CHAIN_DEPTH with no anchor in trust root_cert, root_key = cf.self_signed('R') prev_cert, prev_key = root_cert, root_key intermediates: list[x509.Certificate] = [] @@ -218,7 +210,6 @@ def test_check_chain_depth_exceeded_raises(self) -> None: intermediates.append(c) prev_cert, prev_key = c, k leaf, _ = cf.issue('LEAF', prev_cert, prev_key) - # Trust empty: walker must give up after _MAX_CHAIN_DEPTH steps with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): with self.assertRaises(ValueError) as ctx: certs.check_chain(leaf, intermediates) diff --git a/server/tests/core/managers/test_rdp.py b/server/tests/core/managers/test_rdp.py index e3a49e16c..6ed1e59ae 100644 --- a/server/tests/core/managers/test_rdp.py +++ b/server/tests/core/managers/test_rdp.py @@ -191,7 +191,7 @@ def test_load_cert_key_chain_empty_pem_raises(self) -> None: rdp._load_cert_key_chain() def test_sign_rdp_settings_includes_chain_in_pkcs7(self) -> None: - # Chain certs must be embedded in the PKCS7 SignedData so verifiers can build the path. + # mstsc needs intermediates inside the SignedData blob to build a path from cryptography.hazmat.primitives.serialization import pkcs7 as _pkcs7 root_cert, root_key = cf.self_signed('ROOT') inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) @@ -201,14 +201,13 @@ def test_sign_rdp_settings_includes_chain_in_pkcs7(self) -> None: ['full address:s:host'], cert=leaf_cert, key=leaf_key, chain=[inter_cert] ) raw = base64.b64decode(sig_b64) - # Strip 12-byte rdpsign header → DER PKCS7 + # skip rdpsign 12-byte header to get raw DER PKCS7 embedded = _pkcs7.load_der_pkcs7_certificates(raw[12:]) subjects = {c.subject for c in embedded} self.assertIn(leaf_cert.subject, subjects) self.assertIn(inter_cert.subject, subjects) def test_sign_rdp_with_ec_key(self) -> None: - # End-to-end with EC key (PKCS7 supports RSA + EC). ec_key = ec.generate_private_key(ec.SECP256R1()) from cryptography import x509 from cryptography.hazmat.primitives import hashes From e6af593445292a792ea385f6a4d4179c429f7c36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Janier=20Rodr=C3=ADguez?= Date: Mon, 27 Apr 2026 13:38:39 +0200 Subject: [PATCH 6/6] Refactor CertTestCase by removing duplicate methods and improving structure --- server/tests/core/managers/_cert_factory.py | 54 ++++++++++----------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/server/tests/core/managers/_cert_factory.py b/server/tests/core/managers/_cert_factory.py index a2068cd50..865d47025 100644 --- a/server/tests/core/managers/_cert_factory.py +++ b/server/tests/core/managers/_cert_factory.py @@ -53,6 +53,33 @@ _TEST_RSA_BITS = 1024 +class CertTestCase(UDSTestCase): + _tmpdir: tempfile.TemporaryDirectory[str] + tmp: pathlib.Path + + def setUp(self) -> None: + super().setUp() + _certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir = tempfile.TemporaryDirectory() + self.tmp = pathlib.Path(self._tmpdir.name) + + def tearDown(self) -> None: + _certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir.cleanup() + super().tearDown() + + def write(self, name: str, data: bytes) -> pathlib.Path: + p = self.tmp / name + p.write_bytes(data) + return p + + def install_trust(self, *roots: x509.Certificate) -> None: + bundle = self.write('trust.pem', chain_to_pem(*roots)) + ov = override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)) + ov.enable() + self.addCleanup(ov.disable) + + def _name(cn: str) -> x509.Name: return x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)]) @@ -152,30 +179,3 @@ def key_to_der(key: _PrivateKey) -> bytes: def chain_to_pem(*certs: x509.Certificate) -> bytes: return b''.join(to_pem(c) for c in certs) - - -class CertTestCase(UDSTestCase): - _tmpdir: tempfile.TemporaryDirectory[str] - tmp: pathlib.Path - - def setUp(self) -> None: - super().setUp() - _certs._system_trust_cache = None # type: ignore[attr-defined] - self._tmpdir = tempfile.TemporaryDirectory() - self.tmp = pathlib.Path(self._tmpdir.name) - - def tearDown(self) -> None: - _certs._system_trust_cache = None # type: ignore[attr-defined] - self._tmpdir.cleanup() - super().tearDown() - - def write(self, name: str, data: bytes) -> pathlib.Path: - p = self.tmp / name - p.write_bytes(data) - return p - - def install_trust(self, *roots: x509.Certificate) -> None: - bundle = self.write('trust.pem', chain_to_pem(*roots)) - ov = override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)) - ov.enable() - self.addCleanup(ov.disable)