From c92cb2b2db199a6ac697289f2197847770c0866e Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Tue, 23 Jun 2026 00:26:30 +0200 Subject: [PATCH 1/7] time zone refactor --- framework/python/src/core/session.py | 12 ++++++++---- framework/requirements.txt | 3 +++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/framework/python/src/core/session.py b/framework/python/src/core/session.py index d873a94a4..b4a509b79 100644 --- a/framework/python/src/core/session.py +++ b/framework/python/src/core/session.py @@ -29,6 +29,8 @@ from cryptography.x509.oid import NameOID from cryptography.hazmat.backends import default_backend +import tzlocal + NETWORK_KEY = 'network' DEVICE_INTF_KEY = 'device_intf' INTERNET_INTF_KEY = 'internet_intf' @@ -163,10 +165,12 @@ def __init__(self, root_dir): self.load_certs() # Fetch the timezone of the host system - tz = util.run_command('cat /etc/timezone') - # TODO: Check if timezone is fetched successfully - self._timezone = tz[0] - LOGGER.debug(f'System timezone is {self._timezone}') + try: + local_timezone = tzlocal.get_localzone() + return str(local_timezone) + except Exception as e: + LOGGER.error(f'Error getting local timezone: {e}') + return 'UTC' # MQTT client self._mqtt_client = mqtt.MQTT() diff --git a/framework/requirements.txt b/framework/requirements.txt index 5361caef1..032823769 100644 --- a/framework/requirements.txt +++ b/framework/requirements.txt @@ -44,3 +44,6 @@ APScheduler==3.10.4 Jinja2==3.1.6 beautifulsoup4==4.12.3 html5lib==1.1 + +#timezone +tzlocal==5.4.3 \ No newline at end of file From 3dc174b84ed354fc71aef817e058f69665059e69 Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Tue, 23 Jun 2026 00:31:37 +0200 Subject: [PATCH 2/7] pylint --- framework/python/src/core/session.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/framework/python/src/core/session.py b/framework/python/src/core/session.py index b4a509b79..035c1b28e 100644 --- a/framework/python/src/core/session.py +++ b/framework/python/src/core/session.py @@ -166,11 +166,11 @@ def __init__(self, root_dir): # Fetch the timezone of the host system try: - local_timezone = tzlocal.get_localzone() - return str(local_timezone) + self._timezone = str(tzlocal.get_localzone()) except Exception as e: LOGGER.error(f'Error getting local timezone: {e}') - return 'UTC' + self._timezone = 'UTC' + LOGGER.debug(f'System timezone is {self._timezone}') # MQTT client self._mqtt_client = mqtt.MQTT() From e992eb152e06424798cb024ba40795561845290d Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Mon, 22 Jun 2026 20:37:56 +0200 Subject: [PATCH 3/7] run framework tests script --- testing/unit/run_framework_test.sh | 59 ++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100755 testing/unit/run_framework_test.sh diff --git a/testing/unit/run_framework_test.sh b/testing/unit/run_framework_test.sh new file mode 100755 index 000000000..c16e74208 --- /dev/null +++ b/testing/unit/run_framework_test.sh @@ -0,0 +1,59 @@ +#!/bin/bash -e + +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Must be run from the root directory of Testrun +run_test(){ + + # Activate Python virtual environment + source venv/bin/activate + + # Add the framework sources + PYTHONPATH="$PWD/framework/python/src:$PWD/framework/python/src/common:$PWD/framework/python/src/core:$PWD/framework/python/src/framework" + + # Set the python path with all sources + export PYTHONPATH + + # Temporarily disable 'set -e' to capture exit code + set +e + + # Run all host level unit tests from within the venv + pytest testing/unit/framework + + # Capture the exit code + local exit_code=$? + + deactivate + + # Return the captured exit code to the caller + return $exit_code +} + + +# Call the run_test function with the provided arguments +run_test + +# Capture the exit code from the run_test function +exit_code=$? + +# If the exit code is not zero, print an error message +if [ $exit_code -ne 0 ]; then + echo "Tests failed with exit code $exit_code" +else + echo "All tests passed successfully." +fi + +# Exit with the captured exit code +exit $exit_code \ No newline at end of file From 9445793f5b7afd5830aff404fb5682047a8c7e92 Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Tue, 23 Jun 2026 11:08:58 +0200 Subject: [PATCH 4/7] utils unit tests --- testing/unit/framework/util_test.py | 232 +++++++++++++++++++++++----- 1 file changed, 194 insertions(+), 38 deletions(-) diff --git a/testing/unit/framework/util_test.py b/testing/unit/framework/util_test.py index ec8fd48fc..4b5ef8af8 100644 --- a/testing/unit/framework/util_test.py +++ b/testing/unit/framework/util_test.py @@ -1,4 +1,4 @@ -# Copyright 2024 Google LLC +# Copyright 2026 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,48 +14,204 @@ """Util tests""" -from collections import namedtuple -from unittest.mock import patch +from unittest.mock import patch, MagicMock +import pytest from common import util -from net_orc import ip_control - -Snicaddr = namedtuple('snicaddr', - ['family', 'address']) - -mock_addrs = { - 'eth0': [Snicaddr(17, '00:1A:2B:3C:4D:5E')], - 'wlan0': [Snicaddr(17, '66:77:88:99:AA:BB')], - 'enp0s3': [Snicaddr(17, '11:22:33:44:55:66')] -} - -@patch('psutil.net_if_addrs') -def test_get_sys_interfaces(mock_net_if_addrs): - mock_net_if_addrs.return_value = mock_addrs - # Expected result - expected = { - 'eth0': '00:1A:2B:3C:4D:5E', - 'enp0s3': '11:22:33:44:55:66' - } - result = ip_control.IPControl.get_sys_interfaces() - # Assert the result - assert result == expected + +# Tests for run_command() + +@patch('subprocess.Popen') +def test_run_command_success_with_output(mock_popen: MagicMock): + # Setup mock process + mock_process = MagicMock() + mock_process.communicate.return_value = (b'my_output\n', b'') + mock_process.returncode = 0 + mock_popen.return_value.__enter__.return_value = mock_process + + out, err = util.run_command('ls -la', output=True) + assert out == 'my_output' + assert err == b'' + mock_popen.assert_called_once() + + +@patch('subprocess.Popen') +def test_run_command_success_no_output(mock_popen: MagicMock): + mock_process = MagicMock() + mock_process.communicate.return_value = (b'', b'') + mock_process.returncode = 0 + mock_popen.return_value.__enter__.return_value = mock_process + + success = util.run_command('ls -la', output=False) + assert success is True + + +@patch('subprocess.Popen') +@patch.object(util, 'LOGGER') +def test_run_command_failure_with_logging( + mock_logger: MagicMock, + mock_popen: MagicMock +): + mock_process = MagicMock() + mock_process.communicate.return_value = (b'', b'Permission denied') + mock_process.returncode = 1 + mock_popen.return_value.__enter__.return_value = mock_process + + out, err = util.run_command('rm -rf /', output=True, supress_error=False) + assert out == '' + assert err == b'Permission denied' + mock_logger.error.assert_any_call('Command failed: rm -rf /') + + +@patch('subprocess.Popen') +@patch.object(util, 'LOGGER') +def test_run_command_failure_suppress_error( + mock_logger: MagicMock, + mock_popen: MagicMock +): + mock_process = MagicMock() + mock_process.communicate.return_value = (b'', b'Some error') + mock_process.returncode = 1 + mock_popen.return_value.__enter__.return_value = mock_process + + out, err = util.run_command('some_cmd', output=True, supress_error=True) + assert out == '' + assert err == b'Some error' + mock_logger.error.assert_not_called() + + +# Tests for interface_exists() + +@patch('netifaces.interfaces') +def test_interface_exists(mock_interfaces: MagicMock): + mock_interfaces.return_value = ['lo', 'eth0', 'wlan0'] + assert util.interface_exists('eth0') is True + assert util.interface_exists('eth1') is False + + +# Tests for prettify() + +def test_prettify_mac(): + # Prettify expects a string (or a mock byte-like string) + mac_string = '\x00\x0a\x95\x9d\x68\x16' + assert util.prettify(mac_string) == '00:0a:95:9d:68:16' + + +# Tests for get_sudo_user() + +@patch.dict('os.environ', {'SUDO_USER': 'root_user'}) +def test_get_sudo_user_present(): + assert util.get_sudo_user() == 'root_user' + + +@patch.dict('os.environ', {}, clear=True) +def test_get_sudo_user_absent(): + assert util.get_sudo_user() is None + +# Tests for get_pwd_user() + +@patch('os.getuid') +@patch('pwd.getpwuid') +def test_get_pwd_user_success( + mock_getpwuid: MagicMock, + mock_getuid: MagicMock +): + mock_getuid.return_value = 1000 + mock_pw = MagicMock() + mock_pw.pw_name = 'testuser' + mock_getpwuid.return_value = mock_pw + + assert util.get_pwd_user() == 'testuser' + mock_getpwuid.assert_called_once_with(1000) -def test_diff_dicts(): +# Tests for get_host_user() + +@patch.object(util, 'get_sudo_user') +def test_get_host_user_via_sudo(mock_get_sudo_user: MagicMock): + mock_get_sudo_user.return_value = 'sudo_dev' + assert util.get_host_user() == 'sudo_dev' + + +@patch('util.get_sudo_user') +@patch('os.getlogin') +def test_get_host_user_via_getlogin( + mock_getlogin: MagicMock, + mock_get_sudo_user: MagicMock +): + mock_get_sudo_user.return_value = None + mock_getlogin.return_value = 'login_dev' + assert util.get_host_user() == 'login_dev' + + +@patch('util.get_sudo_user') +@patch('os.getlogin') +@patch('getpass.getuser') +def test_get_host_user_via_getpass( + mock_getuser: MagicMock, + mock_getlogin: MagicMock, + mock_get_sudo_user: MagicMock +): + mock_get_sudo_user.return_value = None + mock_getlogin.side_effect = OSError('No tty') + mock_getuser.return_value = 'getpass_dev' + assert util.get_host_user() == 'getpass_dev' + + +# Tests for set_file_owner() + +@patch(f'{util.__name__}.run_command') +def test_set_file_owner(mock_run_command: MagicMock): + util.set_file_owner('/path/to/file', 'admin') + mock_run_command.assert_called_once_with('chown -R admin /path/to/file') + + +# Tests for get_module_display_name() + +@pytest.mark.parametrize('search_name, expected_display', [ + ('ntp', 'NTP'), + ('dns', 'DNS'), + ('connection', 'Connection'), + ('services', 'Services'), + ('tls', 'TLS'), + ('protocol', 'Protocol'), + ('unknown_mod', 'Unknown'), +]) +def test_get_module_display_name( + search_name: str, + expected_display: str +): + assert util.get_module_display_name(search_name) == expected_display + + +# Tests for diff_dicts() + +def test_diff_dicts_identical(): d1 = {'a': 1, 'b': 2} d2 = {'a': 1, 'b': 2} - #Assert equal dicts - assert not util.diff_dicts(d1, d2) - d2 = {'a': 1, 'c': 3} - expected = {'items_removed': {'b': 2},'items_added': {'c': 3}} - #Assert items added adn removed - assert util.diff_dicts(d1, d2) == expected + assert util.diff_dicts(d1, d2) == {} + + +def test_diff_dicts_items_removed(): + d1 = {'a': 1, 'b': 2} + d2 = {'a': 1} + assert util.diff_dicts(d1, d2) == { + 'items_removed': {'b': 2} + } + + +def test_diff_dicts_items_added(): d1 = {'a': 1} - d2 = {'b': 2} - expected = { - 'items_removed': {'a': 1}, - 'items_added': {'b': 2} + d2 = {'a': 1, 'c': 3} + assert util.diff_dicts(d1, d2) == { + 'items_added': {'c': 3} } - #Assert completely different dicts - assert util.diff_dicts(d1, d2) == expected + + +def test_diff_dicts_mixed(): + d1 = {'a': 1, 'b': 2} + d2 = {'a': 1, 'c': 3} + assert util.diff_dicts(d1, d2) == { + 'items_removed': {'b': 2}, + 'items_added': {'c': 3} + } \ No newline at end of file From 51ae087a5dffb475c12db27fd991cf29167ba9cb Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Tue, 23 Jun 2026 11:14:19 +0200 Subject: [PATCH 5/7] pylint --- testing/unit/framework/util_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/unit/framework/util_test.py b/testing/unit/framework/util_test.py index 4b5ef8af8..9cfa68c97 100644 --- a/testing/unit/framework/util_test.py +++ b/testing/unit/framework/util_test.py @@ -214,4 +214,4 @@ def test_diff_dicts_mixed(): assert util.diff_dicts(d1, d2) == { 'items_removed': {'b': 2}, 'items_added': {'c': 3} - } \ No newline at end of file + } From 8eb1a4a67141491dbecdf2f7b4346eff5141d5c7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Tue, 23 Jun 2026 12:23:55 +0200 Subject: [PATCH 6/7] session unit tests --- testing/unit/framework/session_test.py | 402 +++++++++++++++++++++++-- 1 file changed, 375 insertions(+), 27 deletions(-) diff --git a/testing/unit/framework/session_test.py b/testing/unit/framework/session_test.py index 8c48c6046..d1207e66d 100644 --- a/testing/unit/framework/session_test.py +++ b/testing/unit/framework/session_test.py @@ -1,4 +1,4 @@ -# Copyright 2024 Google LLC +# Copyright 2026 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,44 +14,392 @@ """Session methods tests""" -from unittest.mock import patch +import datetime +import os +import pytest +import pytz +from unittest.mock import patch, MagicMock, mock_open + +from common.statuses import TestResult +from common.device import Device from core import session -class MockUtil: - """mock util functions""" +def create_mock_cert( + common_name: str = "TestCert", + org_name: str = "TestOrg", + expired: bool = False, + self_signed: bool = True +) -> MagicMock: + mock_cert = MagicMock() + mock_cn_attr = MagicMock() + mock_cn_attr.value = common_name + mock_cert.subject.get_attributes_for_oid.return_value = [mock_cn_attr] + + mock_org_attr = MagicMock() + mock_org_attr.value = org_name + mock_cert.issuer.get_attributes_for_oid.return_value = [mock_org_attr] + date_now = datetime.datetime.now(pytz.utc) + if expired: + mock_cert.not_valid_after_utc = date_now - datetime.timedelta(days=1) + else: + mock_cert.not_valid_after_utc = date_now + datetime.timedelta(days=365) + + if self_signed: + mock_cert.issuer = mock_cert.subject + else: + mock_cert.issuer = MagicMock() + return mock_cert + - @staticmethod - def get_sys_interfaces(): - return {"eth0": "00:1A:2B:3C:4D:5E", "eth1": "66:77:88:99:AA:BB"} +@pytest.fixture +def mock_dependencies(): + """Fixture to globally mock system-level side effects during __init__.""" + with patch("session.util.get_host_user", return_value="testuser"), \ + patch("session.util.run_command", return_value=("1.0.0", b"")), \ + patch("session.mqtt.MQTT") as mock_mqtt_cls, \ + patch("session.IPControl") as mock_ip_control, \ + patch("os.path.isfile", return_value=False), \ + patch("os.path.exists", return_value=True), \ + patch("os.listdir", return_value=[]), \ + patch("builtins.open", mock_open(read_data="[]")): - @staticmethod - def diff_dicts(d1, d2): # pylint: disable=W0613 - return { - "items_added": {"eth1": "66:77:88:99:AA:BB"}, - "items_removed": {"eth2": "00:1B:2C:3D:4E:5F"}, + # Prepare a mock MQTT client instance + mock_mqtt_inst = MagicMock() + mock_mqtt_cls.return_value = mock_mqtt_inst + + yield { + "mqtt": mock_mqtt_inst, + "ip_control": mock_ip_control } -class TestrunSessionMock(session.TestrunSession): - def __init__(self): # pylint: disable=W0231 - self._ifaces = {"eth0": "00:1A:2B:3C:4D:5E", "eth2": "66:77:88:99:AA:BB"} +@pytest.fixture +def session_instance(mock_dependencies: dict): #pylint: disable=W0613, W0621 + """Fixture to instantiate a TestrunSession with clean mocked states.""" + sess = session.TestrunSession(root_dir="/fake/root") + return sess + +def test_session_init_default_config( + session_instance: session.TestrunSession #pylint: disable=W0621 +): + """Test that default configuration dictionary is correctly generated.""" + assert session_instance.get_config()["startup_timeout"] == 60 + assert session_instance.get_config()["log_level"] == "INFO" + assert session_instance.get_host_user() == "testuser" + assert session_instance.get_timezone() == "UTC" -util = MockUtil() +@patch("session.util.run_command") +def test_load_version_via_dpkg( + mock_run_cmd: MagicMock, + mock_dependencies: dict #pylint: disable=W0613, W0621 + ): + """Test that version is fetched successfully from dpkg database.""" + mock_run_cmd.return_value = ("2.3.1", b"") # No stderr + sess = session.TestrunSession(root_dir="/fake/root") + assert sess.get_version() == "2.3.1" -@patch("common.util.get_sys_interfaces", side_effect=util.get_sys_interfaces) -@patch("common.util.diff_dicts", side_effect=util.diff_dicts) -def test_detect_network_adapters_change( - mock_get_sys_interfaces, # pylint: disable=W0613 - mock_diff_dicts, # pylint: disable=W0613 +@patch("session.util.run_command") +@patch("os.path.exists") +def test_load_version_via_make_control( + mock_exists: MagicMock, + mock_run_cmd: MagicMock, + mock_dependencies: dict #pylint: disable=W0613, W0621 ): - testrun_session = TestrunSessionMock() + """Test version fallback to make control file if dpkg fails.""" + mock_run_cmd.side_effect = [ + ("", b"error_dpkg"), + ("Version: 1.4.2-beta", b""), + ("", b"error_dpkg"), + ("Version: 1.4.2-beta", b""), + ] + mock_exists.return_value = True + sess = session.TestrunSession(root_dir="/fake/root") + assert sess.get_version() == "1.4.2-beta" + + +# 3. Target Device Repository Tests + +def test_device_repository_operations( + session_instance: session.TestrunSession #pylint: disable=W0621 +): + """Test adding, fetching, and removing devices from session repository.""" + device = Device() + device.device_folder = "Raspberry_Pi" + device.manufacturer = "Raspberry" + device.model = "Pi 4" + device.mac_addr = "00:11:22:33:44:55" + + session_instance.add_device(device) + assert session_instance.get_device_by_name("raspberry_pi") == device + assert session_instance.get_device_by_name("non_existent") is None + assert session_instance.get_device_by_mac_addr("001122334455") == device + session_instance.remove_device(device) + assert len(session_instance.get_device_repository()) == 0 + + +# 4. Test Results Aggregation Tests + +def test_add_test_result_new( + session_instance: session.TestrunSession +): #pylint: disable=W0621 + """Test adding a brand new test result.""" + result = MagicMock() + result.name = "DNS Security Check" + result.description = "Checks DNS encryption" + result.details = "Everything is secure" + result.recommendations = [] + result.required_result = "Compliant" + result.result = TestResult.COMPLIANT + + session_instance.add_test_result(result) + assert len(session_instance.get_test_results()) == 1 + + +def test_add_test_result_update_existing( + session_instance: session.TestrunSession +): #pylint: disable=W0621 + initial_result = MagicMock() + initial_result.name = "NTP Sync" + initial_result.description = "NTP Initial State" + initial_result.details = "Checking sync..." + initial_result.required_result = "Compliant" + initial_result.result = TestResult.IN_PROGRESS + initial_result.recommendations = None + + session_instance._results = [initial_result] #pylint: disable=W0212 + + updated_result = MagicMock() + updated_result.name = "NTP Sync" + updated_result.description = "NTP Verification Success" + updated_result.details = ["Sync complete.", "Server resolved."] + updated_result.recommendations = ["Keep server stable"] + updated_result.result = TestResult.COMPLIANT + + session_instance.add_test_result(updated_result) + + assert initial_result.description == "NTP Verification Success" + assert initial_result.details == "Sync complete. Server resolved." + assert initial_result.recommendations == ["Keep server stable"] + assert initial_result.result == TestResult.COMPLIANT + + +def test_add_test_result_informational_coercion( + session_instance: session.TestrunSession +): #pylint: disable=W0621 + initial_result = MagicMock() + initial_result.name = "TLS Cipher Suit" + initial_result.required_result = "Informational" + initial_result.result = TestResult.IN_PROGRESS + initial_result.recommendations = None + + session_instance._results = [initial_result] #pylint: disable=W0212 + + updated_result = MagicMock() + updated_result.name = "TLS Cipher Suit" + updated_result.result = TestResult.NON_COMPLIANT + updated_result.recommendations = ["Upgrade ciphers"] + + session_instance.add_test_result(updated_result) + assert initial_result.result == TestResult.INFORMATIONAL + assert initial_result.optional_recommendations == ["Upgrade ciphers"] + assert initial_result.recommendations is None + + +# 5. Risk Profile Validation Tests + +def test_validate_profile_json_invalid_cases( + session_instance: session.TestrunSession +): #pylint: disable=W0621 + """Test validation errors for improperly structured risk profiles.""" + # Missing name + assert session_instance.validate_profile_json( + {"status": "Valid"}) is False + # Empty Name + assert session_instance.validate_profile_json( + {"name": " ", "status": "Valid"} + ) is False + # Name contains forbidden characters + assert session_instance.validate_profile_json( + {"name": "Profile/Invalid", "status": "Valid"} + ) is False - # Test added and removed - result = testrun_session.detect_network_adapters_change() - assert result == { - "adapters_added": {"eth1": "66:77:88:99:AA:BB"}, - "adapters_removed": {"eth2": "00:1B:2C:3D:4E:5F"}, + +def test_validate_profile_json_question_validation( + session_instance: session.TestrunSession +): #pylint: disable=W0621 + # Define simple profile formats + session_instance._profile_format_json = [ #pylint: disable=W0212 + { + "question": "Q1", + "type": "select", + "options": ["Yes", "No"] + }, + {"question": "Q2", + "type": "select-multiple", + "options": ["Option A", "Option B"] + } + ] + + # Valid profile JSON + valid_profile = { + "name": "My Valid Profile", + "status": "Valid", + "questions": [ + {"question": "Q1", "answer": "Yes"}, + {"question": "Q2", "answer": [0, 1]} + ] + } + assert session_instance.validate_profile_json(valid_profile) is True + + # Invalid string answer in select-multiple + invalid_profile_type = { + "name": "Invalid Profile", + "status": "Valid", + "questions": [ + { + "question": "Q2", + "answer": "Option A" + } + ] } + assert session_instance.validate_profile_json(invalid_profile_type) is False + + # Invalid option selected + invalid_select_option = { + "name": "Invalid Profile", + "status": "Valid", + "questions": [ + {"question": "Q1", + "answer": "Maybe" + } + ] + } + assert session_instance.validate_profile_json(invalid_select_option) is False + + +# 6. Certificate Management Tests + +@patch("os.path.exists", return_value=False) +def test_check_cert_file_name( + mock_exists: MagicMock, #pylint: disable=W0613 + session_instance: session.TestrunSession #pylint: disable=W0621 +): + assert session_instance.check_cert_file_name("unique_cert_name.pem") is True + + +@patch("session.x509.load_pem_x509_certificate") +@patch("builtins.open", new_callable=mock_open) +@patch("session.util.run_command") +def test_upload_cert_success( + mock_run_cmd: MagicMock, + mock_file_open: MagicMock, + mock_load_cert: MagicMock, + session_instance: session.TestrunSession #pylint: disable=W0621 +): + mock_cert = create_mock_cert( + common_name="GoogleRootCA", + org_name="Google LLC", + expired=False + ) + mock_load_cert.return_value = mock_cert + + cert_obj = session_instance.upload_cert( + filename="google_root.pem", + content=b"fake_pem_bytes" + ) + + assert cert_obj["name"] == "GoogleRootCA" + assert cert_obj["status"] == "Valid" + + mock_file_open.assert_called_once_with( + os.path.join( + session.CERTS_PATH, "google_root.pem" + ), + "wb") + mock_run_cmd.assert_called_once_with( + f"chown -R testuser {session.CERTS_PATH}" + ) + + +@patch("session.x509.load_pem_x509_certificate") +def test_upload_cert_missing_cn( + mock_load_cert: MagicMock, + session_instance: session.TestrunSession #pylint: disable=W0621 +): + mock_cert = MagicMock() + mock_cert.subject.get_attributes_for_oid.return_value = [] + mock_load_cert.return_value = mock_cert + with pytest.raises( + ValueError, + match="Certificate is missing the common name" + ): + session_instance.upload_cert( + filename="broken_cert.pem", + content=b"raw_bytes" + ) + + +@patch("session.x509.load_pem_x509_certificate") +@patch("session.os.listdir", return_value=["existing_root.pem"]) +@patch("builtins.open", new_callable=mock_open) +def test_load_certs( + mock_file: MagicMock, #pylint: disable=W0613 + mock_listdir: MagicMock, #pylint: disable=W0613 + mock_load_cert: MagicMock, + session_instance: session.TestrunSession #pylint: disable=W0621 + ): + mock_cert = create_mock_cert( + common_name="MyAuthority", + org_name="MyCorp", + expired=False, + self_signed=True + ) + mock_load_cert.return_value = mock_cert + session_instance.load_certs() + + assert len(session_instance.get_certs()) == 1 + cert = session_instance.get_certs()[0] + assert cert["name"] == "MyAuthority" + assert cert["type"] == "root" + + +@patch("session.os.remove") +def test_delete_cert_success( + mock_remove: MagicMock, + session_instance: session.TestrunSession #pylint: disable=W0621 +): + session_instance._certs = [ #pylint: disable=W0212 + {"filename": "test.pem", "name": "Test"} + ] + success = session_instance.delete_cert("test.pem") + assert success is True + assert len(session_instance.get_certs()) == 0 + mock_remove.assert_called_once_with( + os.path.join(session.CERTS_PATH, + "test.pem") + ) + + +# 7. Network Change Detection Tests + +@patch("session.util.diff_dicts") +def test_detect_network_adapters_change( + mock_diff: MagicMock, + session_instance: session.TestrunSession #pylint: disable=W0621 + ): + with patch.object(session.IPControl, "get_sys_interfaces") as mock_get_sys: + mock_get_sys.return_value = {"eth0": "up", "wlan0": "down"} + session_instance._ifaces = {"eth0": "up"} #pylint: disable=W0212 + mock_diff.return_value = { + "items_added": {"wlan0": "down"} + } + changes = session_instance.detect_network_adapters_change() + assert "adapters_added" in changes + assert changes["adapters_added"] == {"wlan0": "down"} + assert "adapters_removed" not in changes + # Verify that session_instance updated its local _ifaces state + assert session_instance.get_ifaces() == {"eth0": "up", "wlan0": "down"} + From 13ecd0a8c0de4cb66f633ee056bf4e9b7d6dd4a0 Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Tue, 23 Jun 2026 12:59:59 +0200 Subject: [PATCH 7/7] add framework tests to github actions --- .github/workflows/testing.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 2a964013c..6cb02f3f4 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -106,6 +106,9 @@ jobs: - name: Run tests for tls module shell: bash {0} run: bash testing/unit/run_test_module.sh tls captures certAuth certs reports root_certs output + - name: Run framework tests + shell: bash {0} + run: bash testing/unit/run_framework_test.sh - name: Run tests for risk profiles shell: bash {0} run: bash testing/unit/run_report_test.sh testing/unit/risk_profile/risk_profile_test.py