diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index b01a2f7..99e33b9 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -1,4 +1,4 @@ -# Copyright 2026 Google LLC +# Copyright 2026 UCP Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ jobs: pre-commit: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v5 + - uses: actions/checkout@v7 - uses: actions/setup-python@v6 with: python-version: "3.12" diff --git a/.github/workflows/reusable-governance.yml b/.github/workflows/reusable-governance.yml new file mode 100644 index 0000000..030bcd9 --- /dev/null +++ b/.github/workflows/reusable-governance.yml @@ -0,0 +1,114 @@ +# Copyright 2026 UCP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Reusable Governance Gate + +on: + workflow_call: + inputs: + repo-name: + required: true + type: string + description: "Path to the governance rules configuration file" + secrets: + ORG_READ_TOKEN: + required: true + description: "Org-level read token for team membership checks" + +jobs: + evaluate: + name: Approvals + runs-on: ubuntu-latest + if: github.event.pull_request.draft == false + steps: + # 1. Check out the caller repository (the PR code) + - name: Check out PR code + uses: actions/checkout@v7 + + # 2. Check out the central governance scripts from the central repository + - name: Check out central governance scripts + uses: actions/checkout@v7 + with: + repository: ${{ github.repository_owner }}/.github + path: .github-central + token: ${{ secrets.ORG_READ_TOKEN }} + + # 3. Set up uv using the standard action + - name: Set up uv + uses: astral-sh/setup-uv@v8.1.0 + + # 4. Run governance gate check and capture the exit code + - name: Evaluate Required Approvals + id: validator + run: | + set +e + + uv run .github-central/org-tools/governance/scripts/pr_validator.py \ + --token "${{ secrets.ORG_READ_TOKEN }}" \ + --org "${{ github.repository_owner }}" \ + --repo "${{ github.repository }}" \ + --pr "${{ github.event.pull_request.number }}" \ + --repo-name "${{ inputs.repo-name }}" + + EVAL_EXIT_CODE=$? + set -e + + # Save the exit code to the GitHub step outputs + echo "exit_code=$EVAL_EXIT_CODE" >> "$GITHUB_OUTPUT" + + # --- NEW: Create the Job Summary and Annotation --- + if [ -f summary.txt ]; then + # 1. Write the rich Markdown report directly to the GitHub Job Summary page + cat summary.txt >> "$GITHUB_STEP_SUMMARY" + + # 2. Properly escape newlines (%0A) and carriage returns (%0D) + # to prevent truncation in the inline workflow annotations + SUMMARY_TEXT=$(cat summary.txt | tr -d '\r' | sed ':a;N;$!ba;s/\n/%0A/g') + + # 3. Post it as an annotation in the UI + if [ "$EVAL_EXIT_CODE" -eq 0 ]; then + echo "::notice title=Approval Status::$SUMMARY_TEXT" + else + echo "::error title=Missing Approvals::$SUMMARY_TEXT" + fi + fi + # ---------------------------------- + + exit 0 + + # 5. Post the static Commit Status to the GitHub API with a clickable log link + - name: Post Static Status Check + uses: actions/github-script@v9 + with: + script: | + // Read the exit code from the previous step + const exitCode = parseInt('${{ steps.validator.outputs.exit_code }}', 10); + + // Map the exit code to a GitHub status state + const state = (exitCode === 0) ? 'success' : 'failure'; + const description = (exitCode === 0) ? 'Governance approvals met' : 'Pending required approvals'; + + // Construct the exact URL to the current GitHub Actions workflow run logs + const runUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`; + + // Post the status to the specific commit hash + await github.rest.repos.createCommitStatus({ + owner: context.repo.owner, + repo: context.repo.repo, + sha: context.payload.pull_request.head.sha, + state: state, + context: 'Governance / Approvals', + description: description, + target_url: runUrl // <-- This makes the status check clickable + }); diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..dad725b --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,40 @@ +# Copyright 2026 UCP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Tests + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + run-tests: + name: Run Unit Tests + runs-on: ubuntu-latest + steps: + - name: Check out repository + uses: actions/checkout@v7 + + - name: Set up uv + uses: astral-sh/setup-uv@v8.1.0 + with: + python-version: "3.12" + + - name: Run governance tests + run: uv run --with PyGithub --with PyYAML python -m unittest discover -s org-tools/governance/tests + + - name: Run label-sync tests + run: uv run --with PyGithub --with PyYAML python org-tools/test_sync_labels.py diff --git a/org-tools/governance/README.md b/org-tools/governance/README.md new file mode 100644 index 0000000..7d2827a --- /dev/null +++ b/org-tools/governance/README.md @@ -0,0 +1,155 @@ +# GitHub Org Governance Tools - PR Validator + +This directory contains the GitHub Org Governance validator tools. The `pr_validator.py` script acts as a CI/CD gated workflow step to enforce custom review rules and team-approval requirements on Pull Requests, using dynamic team hierarchies, file glob rules, and proxy review capabilities. + +--- + +## Architecture Overview + +The system consists of the following key components: + +```mermaid +graph TD + A[reusable-governance.yml Workflow] --> B(pr_validator.py main) + subgraph pr_validator.py + B + E[PullRequestValidator] + end + subgraph governance_config_parser.py + C[GovernanceConfigParser] + D[GovernanceConfigValidator] + end + B --> C + B --> D + B --> E + C -->|Reads Config| F(python-sdk-rules.yml) + D -->|Static Sanity Analysis| F + E -->|Evaluates approvals| G[GitHub API] +``` + +- **`pr_validator.py`**: The entrypoint script and validator logic. + - `PullRequestValidator`: Fetches PR metadata (changed files, reviews, authors) from GitHub, queries team memberships, and evaluates reviews against the governance rules. +- **`governance_config_parser.py`**: Helper module for loading rules and analyzing configurations. + - `GovernanceConfigParser`: Parses YAML configuration files into typed python dataclasses (`Team`, `GovernanceRule`, `GovernanceConfig`, etc.). + - `GovernanceConfigValidator`: Performs static analysis on the repository files to detect rule design flaws (overlapping rules or files falling into default fallbacks). + +--- + +## Configuration Schema + +Governance rules are defined using YAML. Below is an overview of the configuration schema (e.g. `rules/python-sdk-rules.yml`): + +### 1. `team_hierarchy` + +Defines hierarchical roles with integer clearance tiers. Approvals from higher levels automatically satisfy requirements for lower levels (e.g. an approval from a level 4 member counts as a level 2 approval): + +```yaml +team_hierarchy: + devops-maintainers: 1 + maintainers: 2 + tech-council: 3 + governance-council: 4 +``` + +### 2. `fallback` + +Defines catch-all requirements applied to any file that does not match a specific custom rule: + +```yaml +fallback: + requires: + - min_team: "maintainers" + min_approvals: 1 +``` + +### 3. `rules` + +A list of review criteria matching specific file glob patterns: + +- `patterns`: File paths or wildcards (supports `*`, `**`, `?` globs) matching target files. +- `excludes`: Optional glob patterns to exclude from the rule. +- `requires`: The specific approval targets: + - `team`: Requires an approval from an exact team. + - `min_team`: Requires an approval from the specified team or any team higher in the hierarchy. + - `min_approvals`: Number of approvals required from members of that team/tier. + +```yaml +rules: + - name: "core_protocol_src" + patterns: + - "source/**" + requires: + - min_team: "tech-council" + min_approvals: 1 +``` + +### 4. `proxy_reviewers` + +A list of GitHub usernames allowed to approve PRs on behalf of any required team. + +```yaml +proxy_reviewers: + - user_a + - user_b +``` + +--- + +## How It Works + +### Step 1: Static Config Verification + +Before checking any PR, `pr_validator.py` instantiates the `GovernanceConfigValidator` to perform static validation checks: + +1. **Overlap Detection**: Verifies that no file in the repository matches more than one rule (to prevent conflicting requirements). +2. **Coverage Verification (No-Fallback)**: If strict coverage is desired, it verifies that every single tracked file in the repository matches at least one explicit rule. + +### Step 2: PR Evaluation Logic + +When evaluating a pull request: + +1. **Filter Author**: The PR author's own approvals are ignored. +2. **Match Rules**: Compares all modified files in the PR against the rules to collect the matching `RuleRequirement` sets. If a file matches no rule, the `fallback` requirements are applied. +3. **Resolve Approvals**: + - Retrieves active PR reviews from GitHub. + - Checks if the reviewer is a proxy reviewer. + - Queries team memberships for reviewers from the GitHub API. + - Maps team memberships to hierarchical levels. +4. **Evaluate Gates**: Validates that the approvals fulfill every required target. Prints results and writes a review summary to `summary.txt`. + +--- + +## CLI Usage + +Run the validator CLI command in the root of the checked-out repository: + +```bash +python3 org-tools/governance/scripts/pr_validator.py \ + --token "" \ + --org "" \ + --repo "" \ + --pr \ + --repo-name "" +``` + +### CLI Arguments: + +- `--token`: Org-level read token for GitHub REST API calls (requires permission to read team memberships). +- `--org`: The GitHub organization name. +- `--repo`: The GitHub repository name. +- `--pr`: The PR number to validate. +- `--repo-name`: A lookup key mapping to the rules configuration file (e.g. `python-sdk` maps to `org-tools/governance/rules/python-sdk-rules.yml`). + +--- + +## Running Unit Tests + +The test suite mock-patches the GitHub API and local git files to allow fast, isolated test runs: + +```bash +# Run pr_validator tests +python3 org-tools/governance/tests/test_pr_validator.py + +# Run config parser and validator tests +python3 org-tools/governance/tests/test_governance_config_parser.py +``` diff --git a/org-tools/governance/docs/validation_report.md b/org-tools/governance/docs/validation_report.md new file mode 100644 index 0000000..ca66ccf --- /dev/null +++ b/org-tools/governance/docs/validation_report.md @@ -0,0 +1,132 @@ +# βš–οΈ Understanding the Governance Gate Validation Report + +This guide explains the output of the **Governance Gate Validation Report** generated by the Pull Request validator. If your PR's governance check is pending or failing, this document will help you understand the report's structure and how to resolve any outstanding requirements. + +--- + +## πŸ“– Overview of the Report + +The validation report is written in Markdown and typically posted as log output. It consists of three main sections: + +1. **Overall Status Header**: A high-level indicator showing if the PR is mergeable or what high-level block is preventing it. +2. **Global PR Summary**: An aggregated view of all requirements across all changed files. +3. **File-by-File Breakdown**: A granular view showing exactly which files triggered which rules, and the approval status of each file. + +--- + +## 🚦 1. Overall Status Header + +The top of the report displays one of the following statuses, indicating the overall evaluation of your PR. + +### πŸ”΄ Failure States (Blocked) + +- **`## πŸ”΄ FAILURE: An authorized reviewer has requested changes.`** + - **What it means:** A reviewer who belongs to one of the teams required to approve your changed files has explicitly requested changes (a `CHANGES_REQUESTED` review). This strictly blocks the PR from merging, regardless of how many other approvals have been obtained. + - **How to resolve:** Address the reviewer's feedback, commit the changes, and ask them to approve or dismiss their previous review once satisfied. +- **`## πŸ”΄ FAILURE: Insufficient approvals.`** + - **What it means:** One or more file-level or team-level approval requirements have not been met. The PR needs additional reviews from authorized individuals. + - **How to resolve:** Refer to the **Global PR Summary** and **File-by-File Breakdown** sections below to see which approvals are missing. The submitter should simply wait for the required reviewers to be assigned and to submit their approvals. Do not manually assign reviewers, request reviews, or ping reviewers. +- **`## πŸ”΄ FAILURE: Unknown validation failure.`** + - **What it means:** An unexpected error or system exception occurred during validation. + - **How to resolve:** Check the CI pipeline logs and please **[file an issue in the .github repository](https://github.com/Universal-Commerce-Protocol/.github/issues/new?template=bug-report.yml)** + +--- + +## πŸš€ How to Resolve a Failing Governance Gate + +If your PR governance gate check is failing, follow these steps to resolve it: + +1. **Check for Draft Status**: If the status header is `πŸ”΄ FAILURE: Pull Request is in draft status.`, click **"Ready for review"** in GitHub. +2. **Check for Changes Requested**: If the status header is `πŸ”΄ FAILURE: An authorized reviewer has requested changes.`, locate their review, address their feedback, and request a re-review or ask them to approve/dismiss their review once resolved. +3. **Identify Missing Approvals**: Look at the **Global PR Summary** to find the requirements marked **πŸ”΄ No**. +4. **Wait for Reviewer Assignments and Approvals**: + - The DevOps team is responsible for routing and assigning the correct eligible reviewers. + - **Do not** manually search for, assign, or request reviews from users yourself. + - **Do not** ping assigned reviewers. Simply wait for the required reviews and approvals to be completed. + +--- + +## 🌍 2. Global PR Summary + +This section aggregates and deduplicates all requirements across all changed files in the PR, presenting them as a single consolidated list. + +### The Venn Diagram Model (Deduplication) + +If your PR modifies multiple files, the validator doesn't just add up all the approvals blindly. Instead, it uses a **Venn Diagram model**: + +- If multiple modified files require approval from the **same team or hierarchical level**, the requirements are merged. +- The validator keeps the **maximum** number of approvals (`min_approvals`) required by any single file for that target. E.g., if File A requires 1 approval from `tech-council` and File B requires 2 approvals from `tech-council`, the global requirement is merged to require **2 approvals** from `tech-council` (not 3). + +### Understanding Requirement Items + +Each requirement in the summary is formatted with detailed progress: + +```markdown +- **[Min Approvals] approval(s)** from [Target Team / Hierarchical Level] + - **Met:** [🟒 Yes / πŸ”΄ No] ([Approved Count]/[Min Approvals] approved - approved by: [Reviewers]) + - **Pending:** Needs [Remaining Count] approval(s) from [Target Team] ([Eligible Count] eligible reviewer(s) assigned). [Hint] +``` + +#### Key Fields: + +1. **Requirement Line**: Describes the rule (e.g., `* **2 approvals** from team 'tech-council'` or `* **1 approval** from team 'tech-council' or higher in the UCP governance hierarchy`). +2. **Met Line**: + - **🟒 Yes**: The requirement is fully satisfied. + - **πŸ”΄ No**: The requirement is not yet satisfied. + - **approved by**: Displays a comma-separated list of the authorized users whose reviews satisfied this requirement. +3. **Pending Line** (only shown if **Met** is **πŸ”΄ No**): + - Shows the number of remaining approvals needed. + - Displays the number of **eligible reviewers** currently assigned/requested on the PR who can satisfy this requirement. + - **Hint:** Explains the current status and next steps: + - _β€œWaiting for approval from...”_ β€” There are already enough eligible reviewers assigned to the PR. They just need to submit their approvals. + - _β€œWaiting for X more reviewer(s) to be assigned from...”_ β€” There are not enough eligible reviewers assigned to the PR to meet the requirement. **Note for contributors:** Please do **not** manually request reviews or assign reviewers yourself; the DevOps team will assign the appropriate eligible reviewers. Please simply wait for assignments. + +--- + +## πŸ“‚ 3. File-by-File Breakdown + +This section shows the validation status of every single changed file. This is extremely helpful when a PR spans multiple zones of ownership, allowing authors and reviewers to pin down exactly which file is blocking the PR. + +### Format: + +```markdown +- **File:** `path/to/file.py` + - **Status:** [🟒 SATISFIED / πŸ”΄ UNSATISFIED] + - **Requirements:** + [List of individual requirements for this file] +``` + +### How Rules are Matched to Files: + +1. **Rule Matching**: The validator compares the file path against the glob patterns defined in your governance configuration file (e.g., `rules/python-sdk-rules.yml`). +2. **Cumulative Rules**: If a file matches multiple rules, it must satisfy the requirements of **all** matching rules. +3. **Fallback**: If a file does not match any custom rule in the configuration, the catch-all `fallback` requirement is applied. + +--- + +## πŸ›‘οΈ Role Clearance Hierarchy + +The governance system supports a team hierarchy. Approvals from members of teams higher in the hierarchy can satisfy requirements for lower levels. + +For example, if the hierarchy is defined as: + +1. `devops-maintainers` (Level 1) +2. `maintainers` (Level 2) +3. `tech-council` (Level 3) +4. `governance-council` (Level 4) + +- A requirement for **`team 'maintainers' or higher in the UCP governance hierarchy`** (Level 2+) can be satisfied by approvals from members of `maintainers`, `tech-council`, or `governance-council`. +- A requirement for **`team 'tech-council' or higher in the UCP governance hierarchy`** (Level 3+) can be satisfied by approvals from members of `tech-council` or `governance-council`. +- A requirement for an exact team (e.g., **`team 'devops-maintainers'`**) must be approved by an explicit member of that team, regardless of hierarchy. + +--- + +## πŸ’¬ Getting Help & Reporting Issues + +If you suspect there is a bug or an unexpected problem with the governance validator script itself (for example, if a check is failing incorrectly or an error is raised), please **[file an issue in the .github repository](https://github.com/Universal-Commerce-Protocol/.github/issues/new)**. + +When reporting an issue, please include: + +- A link to the affected Pull Request. +- The copy-pasted contents of the **Governance Gate Validation Report**. +- Any relevant error logs or output from the GitHub Actions runner. diff --git a/org-tools/governance/rules/python-sdk-rules.yml b/org-tools/governance/rules/python-sdk-rules.yml new file mode 100644 index 0000000..0082ff9 --- /dev/null +++ b/org-tools/governance/rules/python-sdk-rules.yml @@ -0,0 +1,53 @@ +--- +rules: + # --------------------------------------------------------- + # Governance + # --------------------------------------------------------- + - name: "governance_and_licensing" + patterns: + - "LICENSE" + # Consider updating this path if renaming to owners.yaml + - ".github/CODEOWNERS" + requires: + # Strictly Governance Council (L4) + - team: "governance-council" + min_approvals: 2 + + # --------------------------------------------------------- + # Infrastructure, Tooling & Configuration + # --------------------------------------------------------- + - name: "infra_and_tooling" + patterns: + - ".github/**" + - ".gitignore" + - ".pre-commit-config.yaml" + - "pyproject.toml" + - "uv.lock" + - "generate_models.sh" + - "preprocess_schemas.py" + - "README.md" + - "src/**" + - "templates/**" + excludes: + - ".github/CODEOWNERS" # Covered by Governance rule + requires: + # Grants access to devops (L1), maintainers (L2), tech-council (L3), + # and governance-council (L4) + - min_team: "devops-maintainers" + min_approvals: 2 + +fallback: + requires: + # Default for all files (*). + # Strictest rule possible + - min_team: "governance-council" + min_approvals: 2 + +proxy_reviewers: + - amithanda + +team_hierarchy: + devops-maintainers: 1 + maintainers: 2 + tech-council: 3 + governance-council: 4 diff --git a/org-tools/governance/scripts/governance_config_parser.py b/org-tools/governance/scripts/governance_config_parser.py new file mode 100644 index 0000000..9cb2505 --- /dev/null +++ b/org-tools/governance/scripts/governance_config_parser.py @@ -0,0 +1,362 @@ +# Copyright 2026 UCP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import re +import subprocess +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional, Set + +import yaml + + +@dataclass(frozen=True) +class Team: + """Represents a review team group in the hierarchy.""" + + name: str + level: int + + @classmethod + def create(cls, name: str, level: int) -> "Team": + # Normalize to lowercase. GitHub team slugs are case-insensitive. + return cls(name=name.lower(), level=level) + + +@dataclass(frozen=True) +class RuleRequirement: + """Represents a requirement for a rule, specifying min approvals and the target Team.""" + + min_approvals: int + team: Optional[Team] = None + min_team: Optional[Team] = None + + def __post_init__(self): + if self.min_approvals is None or not isinstance(self.min_approvals, int): + raise ValueError("min_approvals must be an integer") + if self.min_approvals <= 0: + raise ValueError("min_approvals must be a positive integer") + if (self.team is None) == (self.min_team is None): + raise ValueError( + "RuleRequirement must specify exactly one of 'team' or 'min_team'" + ) + + +@dataclass(frozen=True) +class GovernanceRule: + """Represents a specific rule with name, file patterns, requirements, and exclusions.""" + + name: str + patterns: List[str] + requires_all: List[RuleRequirement] + excluded_patterns: List[str] = field(default_factory=list) + + # Internal compiled pattern fields + _compiled_patterns: List[re.Pattern] = field(init=False, repr=False) + _compiled_excludes: List[re.Pattern] = field(init=False, repr=False) + + def __post_init__(self): + # Pre-compile and cache regexes on instantiation + object.__setattr__( + self, + "_compiled_patterns", + [self._compile_pattern(p) for p in self.patterns], + ) + object.__setattr__( + self, + "_compiled_excludes", + [self._compile_pattern(p) for p in self.excluded_patterns], + ) + + def matches(self, file_path: str) -> bool: + """Checks if a file path matches the rule (matches patterns and is not excluded).""" + # Cross-platform normalization to POSIX forward-slash standard + posix_path = Path(file_path).as_posix() + for compiled_exclude in self._compiled_excludes: + if compiled_exclude.match(posix_path): + return False + for compiled_pattern in self._compiled_patterns: + if compiled_pattern.match(posix_path): + return True + return False + + @staticmethod + def _compile_pattern(pattern: str) -> re.Pattern: + """Translates a glob pattern into a compiled regex Pattern.""" + escaped = re.escape(pattern) + # Translate wildcards to regex equivalents: + # 1. '**/ ' matches zero or more directories + regex_str = escaped.replace(r"\*\*/", "(?:.*/)?") + # 2. Trailing '**' matches anything + regex_str = regex_str.replace(r"\*\*", ".*") + # 3. '*' matches any filename segment (excluding '/') + regex_str = regex_str.replace(r"\*", "[^/]*") + # 4. '?' matches a single character (excluding '/') + regex_str = regex_str.replace(r"\?", "[^/]") + return re.compile(rf"^{regex_str}$") + + +@dataclass +class GovernanceConfig: + """Represents the complete governance configuration.""" + + teams: Dict[str, Team] + rules: List[GovernanceRule] + fallback: List[RuleRequirement] + proxy_reviewers: Set[str] + + +@dataclass +class ValidationError: + """Represents a validation error in the governance rules configuration.""" + + file_path: str + message: str + + def __str__(self) -> str: + return self.message + + +class GovernanceConfigParser: + """Parser for reading, parsing, and validating governance rules configuration files.""" + + def __init__(self, repo_root: Optional[str] = None): + """Initializes the parser, optionally specifying a repository root.""" + self.repo_root = repo_root + + def parse_file(self, file_path: str) -> GovernanceConfig: + """Reads, parses, and validates the governance YAML file, mapping to typed dataclasses.""" + file_path_obj = Path(file_path) + if not file_path_obj.exists(): + raise FileNotFoundError(f"Governance file not found at '{file_path}'.") + + try: + with open(file_path_obj, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + except Exception as e: + raise ValueError(f"Failed to parse governance YAML: {e}") + + return self._parse(data) + + def _parse(self, data: Any) -> GovernanceConfig: + """Parses a dictionary representing governance rules configuration into typed dataclasses.""" + if not isinstance(data, dict): + raise ValueError("Governance configuration must be a YAML map.") + + teams = self._parse_team_hierarchy(data) + fallback = self._parse_fallback(data, teams) + rules = self._parse_rules(data, teams) + proxy_reviewers = self._parse_proxy_reviewers(data) + + return GovernanceConfig( + teams=teams, + rules=rules, + fallback=fallback, + proxy_reviewers=proxy_reviewers, + ) + + def _parse_team_hierarchy(self, data: dict) -> Dict[str, Team]: + """Parses the team group hierarchy ranks.""" + team_hierarchy_yaml = data.get("team_hierarchy", {}) + if not isinstance(team_hierarchy_yaml, dict): + raise ValueError("team_hierarchy must be a map.") + + hierarchy = {} + for team, level in team_hierarchy_yaml.items(): + if not isinstance(level, int): + raise ValueError( + f"Hierarchy rank level for team '{team}' must be an integer." + ) + hierarchy[team.lower()] = Team.create(name=team, level=level) + return hierarchy + + def _parse_proxy_reviewers(self, data: dict) -> Set[str]: + """Parses the set of proxy reviewers.""" + return set(data.get("proxy_reviewers", [])) + + def _parse_fallback( + self, data: dict, teams: Dict[str, Team] + ) -> List[RuleRequirement]: + """Parses the fallback rule requirements.""" + fallback_yaml = data.get("fallback", {}) + if not isinstance(fallback_yaml, dict): + raise ValueError("fallback configuration must be a map.") + requires = fallback_yaml.get("requires", []) + if not isinstance(requires, list): + raise ValueError("fallback requires must be a list.") + return self._parse_requirements(requires, teams) + + def _parse_rules(self, data: dict, teams: Dict[str, Team]) -> List[GovernanceRule]: + """Parses the list of governance rules.""" + rules_yaml = data.get("rules", []) + if not isinstance(rules_yaml, list): + raise ValueError("rules must be a list.") + + return [self._parse_rule(rule_data, teams) for rule_data in rules_yaml] + + def _parse_rule(self, rule_data: dict, teams: Dict[str, Team]) -> GovernanceRule: + """Parses a single governance rule.""" + name = rule_data.get("name", "Unnamed Rule") + patterns = rule_data.get("patterns", []) + if isinstance(patterns, str): + patterns = [patterns] + + # Support both 'excluded_patterns' and 'excludes' (for backward compatibility) + excludes = rule_data.get("excluded_patterns", rule_data.get("excludes", [])) + if isinstance(excludes, str): + excludes = [excludes] + + requires_all = self._parse_requirements(rule_data.get("requires", []), teams) + return GovernanceRule( + name=name, + patterns=patterns, + requires_all=requires_all, + excluded_patterns=excludes, + ) + + def _resolve_team( + self, team_name: str, teams: Dict[str, Team], is_min: bool = False + ) -> Team: + """Resolves a team name to a Team object, checking that it exists in the hierarchy.""" + team_name_lower = team_name.lower() + if team_name_lower not in teams: + prefix = "Min team" if is_min else "Team" + raise ValueError( + f"{prefix} '{team_name}' specified in requirements is not defined in team_hierarchy." + ) + return teams[team_name_lower] + + def _parse_requirements( + self, requires_yaml: List[Dict[str, Any]], teams: Dict[str, Team] + ) -> List[RuleRequirement]: + """Parses rule requirement constraints, resolving team string names to Team objects.""" + requirements = [] + for req in requires_yaml: + min_approvals = req.get("min_approvals") + if ( + min_approvals is None + or not isinstance(min_approvals, int) + or min_approvals <= 0 + ): + raise ValueError( + "Requirement must include a positive integer 'min_approvals'." + ) + + team_str = req.get("team") + min_team_str = req.get("min_team") + + team = ( + self._resolve_team(team_str, teams, is_min=False) + if team_str is not None + else None + ) + min_team = ( + self._resolve_team(min_team_str, teams, is_min=True) + if min_team_str is not None + else None + ) + + requirements.append( + RuleRequirement( + min_approvals=min_approvals, + team=team, + min_team=min_team, + ) + ) + return requirements + + +class GovernanceConfigValidator: + """Validator for verifying business rules against the repository state.""" + + def __init__(self, config: GovernanceConfig, repo_root: Optional[str] = None): + """Initializes the validator with a governance configuration and optional repo root.""" + self.config = config + self.repo_root = repo_root + + def validate_overlaps( + self, repo_path: Optional[str] = None + ) -> List[ValidationError]: + """Walks the repository's tracked files and returns a list of overlap errors.""" + resolved_path = repo_path or self.repo_root or os.getcwd() + tracked_files = self._get_tracked_files(resolved_path) + overlapping_files: Dict[str, List[str]] = {} + + for file_path in tracked_files: + matched_rule_names = [] + for rule in self.config.rules: + if rule.matches(file_path): + matched_rule_names.append(rule.name) + + if len(matched_rule_names) > 1: + overlapping_files[file_path] = matched_rule_names + + errors = [] + for file, rules in overlapping_files.items(): + errors.append( + ValidationError( + file_path=file, + message=f"File '{file}' matches multiple rules: {', '.join(rules)}", + ) + ) + return errors + + def validate_no_fallback( + self, repo_path: Optional[str] = None + ) -> List[ValidationError]: + """Walks the repository's tracked files and returns a list of errors for files falling into fallback rules.""" + resolved_path = repo_path or self.repo_root or os.getcwd() + tracked_files = self._get_tracked_files(resolved_path) + errors = [] + + for file_path in tracked_files: + matched_any = False + for rule in self.config.rules: + if rule.matches(file_path): + matched_any = True + break + + if not matched_any: + errors.append( + ValidationError( + file_path=file_path, + message=f"File '{file_path}' does not match any governance rule (falls back to fallback requirements)", + ) + ) + + return errors + + def _get_tracked_files(self, repo_path: str) -> List[str]: + """Gets all tracked and staged files in the repository using git ls-files.""" + result = subprocess.run( + [ + "git", + "-C", + repo_path, + "ls-files", + "--cached", + "--others", + "--exclude-standard", + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True, + ) + return [ + Path(line.strip()).as_posix() + for line in result.stdout.splitlines() + if line.strip() + ] diff --git a/org-tools/governance/scripts/pr_models.py b/org-tools/governance/scripts/pr_models.py new file mode 100644 index 0000000..28e8185 --- /dev/null +++ b/org-tools/governance/scripts/pr_models.py @@ -0,0 +1,156 @@ +# Copyright 2026 UCP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Data models and enums representing pull request state and validation results.""" + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum + +from governance_config_parser import RuleRequirement + + +class ReviewState(Enum): + """Represents the state of a GitHub pull request review.""" + + APPROVED = "approved" + CHANGES_REQUESTED = "changes_requested" + COMMENTED = "commented" + DISMISSED = "dismissed" + PENDING = "pending" + UNKNOWN = "unknown" + + @classmethod + def actionable_states(cls) -> set["ReviewState"]: + """States that affect PR mergeability.""" + return {cls.APPROVED, cls.CHANGES_REQUESTED, cls.DISMISSED} + + +class ValidationErrorReason(Enum): + """Reasons why a pull request might fail governance validation.""" + + DRAFT_PR = "draft_pr" + CHANGES_REQUESTED = "changes_requested" + INSUFFICIENT_APPROVALS = "insufficient_approvals" + + +class MergeableReason(Enum): + """Reasons why a pull request is considered mergeable.""" + + NO_CHANGED_FILES = "no_changed_files" + PROXY_OVERRIDE = "proxy_override" + RULES_SATISFIED = "rules_satisfied" + + +@dataclass(frozen=True) +class Review: + """Represents a pull request review with a user and state.""" + + user: str + state: ReviewState + submitted_at: datetime | None = None + + @classmethod + def create( + cls, user: str, state: ReviewState, submitted_at: datetime | None = None + ) -> "Review": + # Normalize to lowercase. GitHub usernames and team slugs are case-insensitive. + # Reference: https://docs.github.com/en/rest/teams/teams?apiVersion=2026-03-10#get-a-team-by-name + return cls(user=user.lower(), state=state, submitted_at=submitted_at) + + +@dataclass(frozen=True) +class PullRequest: + """Represents a GitHub pull request with its files, reviews, and requests.""" + + number: int + author: str + is_draft: bool + changed_files: list[str] + reviews: list[Review] + assigned_users: list[str] = field(default_factory=list) + assigned_teams: list[str] = field(default_factory=list) + + @classmethod + def create( + cls, + number: int, + author: str, + is_draft: bool, + changed_files: list[str], + reviews: list[Review], + assigned_users: list[str] = None, + assigned_teams: list[str] = None, + ) -> "PullRequest": + # Normalize to lowercase. GitHub usernames and team slugs are case-insensitive. + # Reference: https://docs.github.com/en/rest/teams/teams?apiVersion=2026-03-10#get-a-team-by-name + assigned_u = [u.lower() for u in assigned_users] if assigned_users else [] + assigned_t = [t.lower() for t in assigned_teams] if assigned_teams else [] + return cls( + number=number, + author=author.lower(), + is_draft=is_draft, + changed_files=changed_files, + reviews=reviews, + assigned_users=assigned_u, + assigned_teams=assigned_t, + ) + + +@dataclass(frozen=True) +class TeamMemberships: + """Stores a map of team slugs to their members' GitHub usernames.""" + + members_by_team: dict[str, set[str]] + + @classmethod + def create(cls, members_by_team: dict[str, set[str]]) -> "TeamMemberships": + # Normalize to lowercase. GitHub usernames and team slugs are case-insensitive. + # Reference: https://docs.github.com/en/rest/teams/teams?apiVersion=2026-03-10#get-a-team-by-name + normalized = { + team.lower(): {member.lower() for member in members} + for team, members in members_by_team.items() + } + return cls(members_by_team=normalized) + + +@dataclass(frozen=True) +class RequirementStatus: + """Represents the status of a specific rule requirement.""" + + requirement: RuleRequirement + approved_count: int = 0 + assigned_count: int = 0 + is_satisfied: bool = False + approvers: list[str] = field(default_factory=list) + + +@dataclass(frozen=True) +class FileValidationStatus: + """Represents the validation status of a single file in the PR.""" + + file_path: str + requirement_statuses: list[RequirementStatus] + is_satisfied: bool + + +@dataclass(frozen=True) +class ValidationResult: + """Represents the final result of a pull request validation.""" + + is_mergeable: bool + error: ValidationErrorReason | None = None + mergeable_reason: MergeableReason | None = None + requirement_statuses: list[RequirementStatus] = field(default_factory=list) + file_statuses: list[FileValidationStatus] = field(default_factory=list) diff --git a/org-tools/governance/scripts/pr_validator.py b/org-tools/governance/scripts/pr_validator.py new file mode 100755 index 0000000..01ad58e --- /dev/null +++ b/org-tools/governance/scripts/pr_validator.py @@ -0,0 +1,547 @@ +#!/usr/bin/env python3 +# Copyright 2026 UCP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# /// script +# dependencies = [ +# "PyGithub", +# "PyYAML", +# ] +# /// +"""Pull Request validator using governance rules and team memberships under the Venn Diagram model.""" + +import argparse +from datetime import datetime +from enum import Enum +import sys + +from github import Auth, Github, GithubException + +from governance_config_parser import ( + GovernanceConfig, + GovernanceConfigParser, + RuleRequirement, +) +from pr_models import ( + FileValidationStatus, + MergeableReason, + PullRequest, + RequirementStatus, + Review, + ReviewState, + TeamMemberships, + ValidationErrorReason, + ValidationResult, +) +from validation_logger import ValidationLogger + + +class RepoName(Enum): + """Supported repository names mapping to governance rules.""" + + UCP = "ucp" + PYTHON_SDK = "python-sdk" + + +REPO_RULES_MAPPING = { + RepoName.UCP: ".github-central/org-tools/governance/rules/ucp-rules.yml", + RepoName.PYTHON_SDK: ".github-central/org-tools/governance/rules/python-sdk-rules.yml", +} + + +class PullRequestValidator: + """Rule engine for validating Pull Requests against governance rules using the Venn Diagram model.""" + + def __init__(self, config: GovernanceConfig, memberships: TeamMemberships): + """Initialize the validator with config and memberships.""" + self.config = config + self.memberships = memberships + + def validate(self, pr: PullRequest) -> ValidationResult: + """Validate a Pull Request against the governance rules.""" + # 1. Draft PR Check + if pr.is_draft: + return ValidationResult( + is_mergeable=False, error=ValidationErrorReason.DRAFT_PR + ) + + # 2. Extract Latest Reviews + latest_reviews = self._get_relevant_reviews_by_user(pr.reviews) + + # 3. Proxy Voter Override Check + if self._has_proxy_override(latest_reviews): + return ValidationResult( + is_mergeable=True, + mergeable_reason=MergeableReason.PROXY_OVERRIDE, + ) + + # 4. Empty Changed Files Check + if not pr.changed_files: + return ValidationResult( + is_mergeable=True, + mergeable_reason=MergeableReason.NO_CHANGED_FILES, + ) + + # 5. Rule Matching + applicable_requirements = self._get_applicable_requirements(pr.changed_files) + + # 6. Changes Requested Check + authorized_reviewers = self._get_authorized_reviewers(applicable_requirements) + authorized_and_proxy_reviewers = set(authorized_reviewers) | set( + self.config.proxy_reviewers + ) + if any( + latest_reviews.get(reviewer) == ReviewState.CHANGES_REQUESTED + for reviewer in authorized_and_proxy_reviewers + ): + return ValidationResult( + is_mergeable=False, + error=ValidationErrorReason.CHANGES_REQUESTED, + ) + + # 7. Approvals & Assignments Evaluation (Global) + requirement_statuses = self._evaluate_requirements( + pr, latest_reviews, applicable_requirements + ) + + # 8. File-by-File Evaluation + file_statuses = self._evaluate_file_statuses(pr, latest_reviews) + + # 9. Determine overall mergeability + is_mergeable = not any( + not status.is_satisfied for status in requirement_statuses + ) + + if not is_mergeable: + return ValidationResult( + is_mergeable=False, + error=ValidationErrorReason.INSUFFICIENT_APPROVALS, + requirement_statuses=requirement_statuses, + file_statuses=file_statuses, + ) + + return ValidationResult( + is_mergeable=True, + mergeable_reason=MergeableReason.RULES_SATISFIED, + requirement_statuses=requirement_statuses, + file_statuses=file_statuses, + ) + + def _evaluate_file_statuses( + self, + pr: PullRequest, + latest_reviews: dict[str, ReviewState], + ) -> list[FileValidationStatus]: + """Evaluates and generates validation statuses for each changed file in the PR.""" + requested_users_set = set(pr.assigned_users) + for team in pr.assigned_teams: + requested_users_set.update( + self.memberships.members_by_team.get(team, set()) + ) + + valid_approvals_set = { + user + for user, state in latest_reviews.items() + if state == ReviewState.APPROVED + and (user != pr.author or pr.author in self.config.proxy_reviewers) + } + + file_statuses = [] + for file in pr.changed_files: + # Find rules matching this specific file + file_rules = [] + for rule in self.config.rules: + if rule.matches(file): + file_rules.append(rule) + + file_requirements = [] + if file_rules: + unique_file_rules = list({r.name: r for r in file_rules}.values()) + for r in unique_file_rules: + file_requirements.extend(r.requires_all) + else: + file_requirements.extend(self.config.fallback) + + # Merge file-specific requirements + file_requirements = self._merge_requirements(file_requirements) + + file_req_statuses = [] + file_satisfied = True + for req in file_requirements: + # Under the Venn model, all valid approvals satisfying req are approvers + approvers = [ + user + for user in valid_approvals_set + if self._is_user_authorized_for_requirement(user, req) + ] + approved_count = len(approvers) + # Count assigned eligible reviewers for this requirement + assigned_count = sum( + 1 + for assigned_user in requested_users_set + if self._is_user_authorized_for_requirement(assigned_user, req) + ) + + # Check if this requirement is satisfied for this file + req_satisfied = approved_count >= req.min_approvals + if not req_satisfied: + file_satisfied = False + + file_req_statuses.append( + RequirementStatus( + requirement=req, + approved_count=approved_count, + assigned_count=assigned_count, + is_satisfied=req_satisfied, + approvers=sorted(approvers), + ) + ) + + file_statuses.append( + FileValidationStatus( + file_path=file, + requirement_statuses=file_req_statuses, + is_satisfied=file_satisfied, + ) + ) + return file_statuses + + def _has_proxy_override(self, latest_reviews: dict[str, ReviewState]) -> bool: + """Check if a proxy reviewer has approved the PR, bypassing rules.""" + return any( + state == ReviewState.APPROVED and user in self.config.proxy_reviewers + for user, state in latest_reviews.items() + ) + + def _get_applicable_requirements( + self, changed_files: list[str] + ) -> list[RuleRequirement]: + """Identify all rule requirements that apply to the PR's changed files. + + If multiple rules match, their requirements are merged by keeping the + maximum min_approvals for each unique target team/level to prevent + redundant approval demands. + """ + matched_rules = [] + has_fallback = False + + for file in changed_files: + matched_any = False + for rule in self.config.rules: + if rule.matches(file): + matched_rules.append(rule) + matched_any = True + if not matched_any: + has_fallback = True + + # De-duplicate matched rules by name, preserving insertion order + unique_rules = list({r.name: r for r in matched_rules}.values()) + + requirements = [] + for r in unique_rules: + requirements.extend(r.requires_all) + if has_fallback: + requirements.extend(self.config.fallback) + + return self._merge_requirements(requirements) + + def _merge_requirements( + self, requirements: list[RuleRequirement] + ) -> list[RuleRequirement]: + """Merge a list of requirements by keeping the maximum min_approvals for each target.""" + merged: dict[tuple[str, str], RuleRequirement] = {} + for req in requirements: + if req.team is not None: + key = ("team", req.team.name) + elif req.min_team is not None: + key = ("min_team", req.min_team.name) + else: + continue + + if key not in merged: + merged[key] = req + else: + existing = merged[key] + if req.min_approvals > existing.min_approvals: + merged[key] = req + + return list(merged.values()) + + def _get_relevant_reviews_by_user( + self, reviews: list[Review] + ) -> dict[str, ReviewState]: + """Resolve the latest review state for each user. + + Skips non-actionable states like COMMENTED. + """ + # Sort reviews by submitted_at (if available) to ensure chronological processing + sorted_reviews = sorted( + reviews, + key=lambda x: ( + x.submitted_at if x.submitted_at is not None else datetime.min + ), + ) + + relevant_reviews: dict[str, ReviewState] = {} + for r in sorted_reviews: + if r.state in ReviewState.actionable_states(): + relevant_reviews[r.user] = r.state + return relevant_reviews + + def _get_user_level(self, user: str) -> int: + """Calculate the user's hierarchy level. + + Uses the max level of any team they belong to. + """ + max_level = 0 + for team, members in self.memberships.members_by_team.items(): + if user in members: + team_info = self.config.teams.get(team) + level = team_info.level if team_info is not None else 0 + if level > max_level: + max_level = level + return max_level + + def _is_user_authorized_for_requirement( + self, user: str, req: RuleRequirement + ) -> bool: + """Check if a user satisfies the team or hierarchical level requirement.""" + if req.team is not None: + team_members = self.memberships.members_by_team.get(req.team.name, set()) + return user in team_members + elif req.min_team is not None: + min_level = req.min_team.level + user_level = self._get_user_level(user) + return user_level >= min_level + return False + + def _get_authorized_reviewers( + self, requirements: list[RuleRequirement] + ) -> set[str]: + """Retrieve users who are authorized to satisfy any of the requirements. + + Gets them from team memberships. + """ + all_users = set() + for members in self.memberships.members_by_team.values(): + all_users.update(members) + + authorized = set() + for user in all_users: + for req in requirements: + if self._is_user_authorized_for_requirement(user, req): + authorized.add(user) + break + return authorized + + def _evaluate_requirements( + self, + pr: PullRequest, + latest_reviews: dict[str, ReviewState], + requirements: list[RuleRequirement], + ) -> list[RequirementStatus]: + """Evaluate each requirement's approvals and assignments count under the Venn Diagram model.""" + # Filter valid approvals (excluding author's self-approvals unless + # they are a proxy) + valid_approvals = { + user + for user, state in latest_reviews.items() + if state == ReviewState.APPROVED + and (user != pr.author or pr.author in self.config.proxy_reviewers) + } + + # Resolve all unique users currently requested to review (individually + # or via team) + requested_users_set = set(pr.assigned_users) + for team in pr.assigned_teams: + requested_users_set.update( + self.memberships.members_by_team.get(team, set()) + ) + + requirement_statuses = [] + for req in requirements: + # Under the Venn model, all valid approvals satisfying req are approvers + approvers = [ + user + for user in valid_approvals + if self._is_user_authorized_for_requirement(user, req) + ] + approved_count = len(approvers) + # Count assigned (requested eligible reviewers) independently + assigned_count = sum( + 1 + for assigned_user in requested_users_set + if self._is_user_authorized_for_requirement(assigned_user, req) + ) + requirement_statuses.append( + RequirementStatus( + requirement=req, + approved_count=approved_count, + assigned_count=assigned_count, + is_satisfied=approved_count >= req.min_approvals, + approvers=sorted(approvers), + ) + ) + + return requirement_statuses + + +def fetch_team_memberships( + g: Github, org_name: str, config: GovernanceConfig +) -> TeamMemberships: + """Fetch team membership lists from GitHub. + + Gets memberships for all teams in the hierarchy. + """ + members_by_team = {} + try: + org = g.get_organization(org_name) + except GithubException as e: + raise RuntimeError(f"Failed to fetch organization '{org_name}': {e}") from e + + for team_slug in config.teams: + try: + team = org.get_team_by_slug(team_slug) + members = {m.login for m in team.get_members()} + members_by_team[team_slug] = members + except GithubException as e: + raise RuntimeError( + f"Could not fetch members for team '{team_slug}': {e}" + ) from e + return TeamMemberships.create(members_by_team=members_by_team) + + +def fetch_pull_request(g: Github, repo_name: str, pr_number: int) -> PullRequest: + """Fetch PR files, reviews, and requested reviewers from GitHub. + + Maps them to the PullRequest dataclass. + """ + try: + repo = g.get_repo(repo_name) + pr = repo.get_pull(pr_number) + + # Fetch changed files + changed_files = [f.filename for f in pr.get_files()] + + # Fetch reviews + reviews = [] + for r in pr.get_reviews(): + # Map string state to ReviewState enum + try: + state = ReviewState(r.state.lower()) if r.state else ReviewState.UNKNOWN + except ValueError: + # Fall back to UNKNOWN for unsupported or unknown states + state = ReviewState.UNKNOWN + reviews.append( + Review.create( + user=r.user.login, state=state, submitted_at=r.submitted_at + ) + ) + + # Fetch review requests (assigned users and teams) + review_requests = pr.get_review_requests() + assigned_users = [u.login for u in review_requests[0]] + assigned_teams = [t.slug for t in review_requests[1]] + + return PullRequest.create( + number=pr.number, + author=pr.user.login, + is_draft=pr.draft, + changed_files=changed_files, + reviews=reviews, + assigned_users=assigned_users, + assigned_teams=assigned_teams, + ) + except GithubException as e: + raise RuntimeError(f"Failed to fetch PR info from GitHub: {e}") from e + + +def main() -> None: + """Run the governance gate check on a pull request.""" + parser = argparse.ArgumentParser( + description=( + "Run governance gate check on a pull request using PullRequestValidator." + ) + ) + parser.add_argument( + "--token", required=True, help="GitHub token for authentication." + ) + parser.add_argument("--org", required=True, help="GitHub Organization name.") + parser.add_argument( + "--repo", + required=True, + help="GitHub Repository name (e.g. 'your-organization/your-repo').", + ) + parser.add_argument("--pr", type=int, required=True, help="Pull Request number.") + parser.add_argument( + "--repo-name", + required=True, + help="The name of the repository (e.g. 'ucp').", + ) + args = parser.parse_args() + + try: + repo_enum = RepoName(args.repo_name) + except ValueError: + print( + f"❌ ERROR: Invalid repository name '{args.repo_name}'. Must be one of: " + f"{[e.value for e in RepoName]}", + file=sys.stderr, + ) + sys.exit(1) + + rules_file = REPO_RULES_MAPPING.get(repo_enum) + if not rules_file: + print( + f"❌ ERROR: No governance rules mapped for repository '{repo_enum.value}'.", + file=sys.stderr, + ) + sys.exit(1) + + try: + # 1. Load config + config = GovernanceConfigParser().parse_file(rules_file) + + # 2. Authenticate and fetch data + auth = Auth.Token(args.token) + g = Github(auth=auth) + + # 3. Fetch team memberships & PR details + memberships = fetch_team_memberships(g, args.org, config) + pr = fetch_pull_request(g, args.repo, args.pr) + + # 4. Validate PR + validator = PullRequestValidator(config, memberships) + result = validator.validate(pr) + + # 5. Output result and format messages + logger = ValidationLogger(result) + logger.write_summary() + if not result.is_mergeable: + sys.exit(1) + sys.exit(0) + + except Exception as e: + error_msg = f"ERROR: {e}" + print(f"❌ {error_msg}", file=sys.stderr) + try: + with open("summary.txt", "w", encoding="utf-8") as f: + f.write(error_msg) + except Exception: + pass + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/org-tools/governance/scripts/validation_logger.py b/org-tools/governance/scripts/validation_logger.py new file mode 100644 index 0000000..f6d9e14 --- /dev/null +++ b/org-tools/governance/scripts/validation_logger.py @@ -0,0 +1,161 @@ +# Copyright 2026 UCP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ValidationLogger for formatting and writing PR validation summaries in Markdown.""" + +from pr_models import ( + MergeableReason, + RequirementStatus, + ValidationErrorReason, + ValidationResult, +) + + +class ValidationLogger: + """Logs the validation result and creates the summary file.""" + + GUIDE_URL = ( + "https://github.com/Universal-Commerce-Protocol/.github/blob/main/" + "org-tools/governance/docs/validation_report.md" + ) + ISSUE_URL = "https://github.com/Universal-Commerce-Protocol/.github/issues" + + def __init__(self, result: ValidationResult): + self.result = result + + def write_summary(self, filename: str = "summary.txt") -> None: + """Writes the summary and detailed result to the specified file.""" + report = self.generate_report() + with open(filename, "w", encoding="utf-8") as f: + f.write(report) + + def generate_report(self) -> str: + """Generates a beautiful Markdown validation report.""" + lines = [] + lines.append("# βš–οΈ Governance Gate Validation Report") + lines.append("") + + # Overall Status Header + if self.result.is_mergeable: + if self.result.mergeable_reason == MergeableReason.NO_CHANGED_FILES: + lines.append("## 🟒 SUCCESS: No changed files in this Pull Request.") + elif self.result.mergeable_reason == MergeableReason.PROXY_OVERRIDE: + lines.append( + "## 🟒 SUCCESS: Emergency override exception approved by Proxy Reviewer." + ) + else: + lines.append("## 🟒 SUCCESS: All governance rules satisfied.") + else: + if self.result.error == ValidationErrorReason.DRAFT_PR: + lines.append("## πŸ”΄ FAILURE: Pull Request is in draft status.") + elif self.result.error == ValidationErrorReason.CHANGES_REQUESTED: + lines.append( + "## πŸ”΄ FAILURE: An authorized reviewer has requested changes." + ) + elif self.result.error == ValidationErrorReason.INSUFFICIENT_APPROVALS: + lines.append("## πŸ”΄ FAILURE: Insufficient approvals.") + else: + lines.append("## πŸ”΄ FAILURE: Unknown validation failure.") + + # 1. Global PR Summary + if self.result.requirement_statuses: + lines.append("---") + lines.append("") + lines.append("### 🌍 1. GLOBAL PR SUMMARY") + lines.append("") + lines.append("Merged requirements across all changed files:") + lines.append("") + for status in self.result.requirement_statuses: + lines.append(self._format_requirement_status(status, indent="")) + lines.append("") + + # 2. File-by-File Breakdown + if self.result.file_statuses: + lines.append("") + lines.append("---") + lines.append("") + lines.append("### πŸ“‚ 2. FILE-BY-FILE BREAKDOWN") + lines.append("") + for f_status in self.result.file_statuses: + status_icon = ( + "🟒 SATISFIED" if f_status.is_satisfied else "πŸ”΄ UNSATISFIED" + ) + lines.append(f"* **File:** `{f_status.file_path}`") + lines.append(f" * **Status:** {status_icon}") + lines.append(" * **Requirements:**") + for status in f_status.requirement_statuses: + formatted_status = self._format_requirement_status( + status, indent=" " + ) + lines.append(formatted_status) + lines.append("") + + # 3. Help Guide Reference + if not self.result.is_mergeable: + lines.append("") + lines.append("---") + lines.append("") + lines.append( + "πŸ’‘ **Need help?** For a detailed explanation of this report and how to resolve " + f"failures, see the [Governance Gate Validation Guide]({self.GUIDE_URL})." + ) + lines.append("") + lines.append( + "If you suspect a bug or issue with this validator script, please " + f"[report an issue in the .github repository]({self.ISSUE_URL})." + ) + + return "\n".join(lines) + + def _format_requirement_status( + self, status: RequirementStatus, indent: str = "" + ) -> str: + """Formats the status details of a single requirement with Markdown styling.""" + req = status.requirement + team_desc = ( + f"team '{req.team.name}'" + if req.team + else f"team '{req.min_team.name}' or higher in the UCP governance hierarchy" + ) + + # A) The requirement description + req_line = f"{indent}* **{req.min_approvals} approval{'s' if req.min_approvals > 1 else ''}** from {team_desc}" + + # B) Which requirements have been met + met_str = "🟒 Yes" if status.is_satisfied else "πŸ”΄ No" + approved_by_suffix = ( + f" - approved by: {', '.join(status.approvers)}" if status.approvers else "" + ) + met_line = ( + f"{indent} * **Met:** {met_str} ({status.approved_count}/{req.min_approvals} " + f"approved{approved_by_suffix})" + ) + + # C) Which still need to be met (Pending if not satisfied, omitted if satisfied) + if status.is_satisfied: + return f"{req_line}\n{met_line}" + + missing = req.min_approvals - status.approved_count + if status.assigned_count >= missing: + hint = f"Waiting for approval from {team_desc}." + else: + needed = missing - status.assigned_count + hint = f"Waiting for {needed} more reviewer(s) to be assigned from {team_desc}." + pending_line = ( + f"{indent} * **Pending:** Needs {missing} approval{'s' if missing > 1 else ''} " + f"from {team_desc} ({status.assigned_count} eligible " + f"reviewer{'s' if status.assigned_count != 1 else ''} assigned). {hint}" + ) + + return f"{req_line}\n{met_line}\n{pending_line}" diff --git a/org-tools/governance/tests/test_governance_config_parser.py b/org-tools/governance/tests/test_governance_config_parser.py new file mode 100644 index 0000000..341548b --- /dev/null +++ b/org-tools/governance/tests/test_governance_config_parser.py @@ -0,0 +1,354 @@ +# Copyright 2026 UCP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the governance config parser.""" + +import os +import sys +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + +sys.path.insert( + 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../scripts")) +) + + +from governance_config_parser import ( + GovernanceConfig, + GovernanceConfigParser, + GovernanceRule, + GovernanceConfigValidator, + RuleRequirement, + Team, +) + + +class TestGovernanceConfigParser(unittest.TestCase): + """Tests for GovernanceConfigParser class.""" + + def test_parse_valid_dict(self): + """Test parsing a valid dictionary.""" + yaml_data = { + "team_hierarchy": { + "devops": 1, + "maintainers": 2, + "tech-council": 3, + }, + "proxy_reviewers": ["proxy-user"], + "fallback": {"requires": [{"min_team": "maintainers", "min_approvals": 1}]}, + "rules": [ + { + "name": "Core Rule", + "patterns": ["source/**/*.py"], + "excluded_patterns": ["source/special/**/*.py"], + "requires": [{"team": "tech-council", "min_approvals": 2}], + } + ], + } + + parser = GovernanceConfigParser() + config = parser._parse(yaml_data) + + # Assert Teams + self.assertEqual( + config.teams, + { + "devops": Team("devops", 1), + "maintainers": Team("maintainers", 2), + "tech-council": Team("tech-council", 3), + }, + ) + + # Assert Proxy Reviewers + self.assertEqual(config.proxy_reviewers, {"proxy-user"}) + + # Assert Fallback + self.assertEqual(len(config.fallback), 1) + self.assertEqual(config.fallback[0].min_approvals, 1) + self.assertEqual(config.fallback[0].min_team, Team("maintainers", 2)) + self.assertIsNone(config.fallback[0].team) + + # Assert Rules + self.assertEqual(len(config.rules), 1) + rule = config.rules[0] + self.assertEqual(rule.name, "Core Rule") + self.assertEqual(rule.patterns, ["source/**/*.py"]) + self.assertEqual(rule.excluded_patterns, ["source/special/**/*.py"]) + self.assertEqual(len(rule.requires_all), 1) + self.assertEqual(rule.requires_all[0].min_approvals, 2) + self.assertEqual(rule.requires_all[0].team, Team("tech-council", 3)) + + def test_parse_file_not_found(self): + """Test parse_file raises FileNotFoundError for missing file.""" + parser = GovernanceConfigParser() + with self.assertRaises(FileNotFoundError): + parser.parse_file("non-existent-file.yml") + + def test_parse_invalid_yaml_syntax(self): + """Test parse_file raises ValueError for invalid YAML.""" + parser = GovernanceConfigParser() + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) / "invalid-config.yml" + temp_path.write_text("invalid: yaml: : syntax", encoding="utf-8") + + with self.assertRaises(ValueError) as ctx: + parser.parse_file(str(temp_path)) + self.assertIn("Failed to parse governance YAML", str(ctx.exception)) + + def test_parse_invalid_root_type(self): + """Test _parse raises ValueError when data is not a map.""" + parser = GovernanceConfigParser() + with self.assertRaises(ValueError) as ctx: + parser._parse(["not", "a", "map"]) + self.assertIn("must be a YAML map", str(ctx.exception)) + + def test_min_approvals_validation(self): + """Test validations for min_approvals values.""" + # 1. min_approvals missing/None + with self.assertRaises(ValueError) as ctx: + RuleRequirement(min_approvals=None, team=Team("devops", 1)) + self.assertIn("must be an integer", str(ctx.exception)) + + # 2. min_approvals negative + with self.assertRaises(ValueError) as ctx: + RuleRequirement(min_approvals=-2, team=Team("devops", 1)) + self.assertIn("must be a positive integer", str(ctx.exception)) + + # 3. min_approvals zero + with self.assertRaises(ValueError) as ctx: + RuleRequirement(min_approvals=0, team=Team("devops", 1)) + self.assertIn("must be a positive integer", str(ctx.exception)) + + def test_rule_requirement_mutual_exclusion(self): + """Test that team and min_team are mutually exclusive.""" + # Specifying both team and min_team + with self.assertRaises(ValueError) as ctx: + RuleRequirement( + min_approvals=1, team=Team("devops", 1), min_team=Team("maintainers", 2) + ) + self.assertIn("specify exactly one of", str(ctx.exception)) + + # Specifying neither + with self.assertRaises(ValueError) as ctx: + RuleRequirement(min_approvals=1, team=None, min_team=None) + self.assertIn("specify exactly one of", str(ctx.exception)) + + def test_missing_team_in_hierarchy(self): + """Test that referenced team must be defined in hierarchy.""" + yaml_data = { + "team_hierarchy": {"devops": 1}, + "rules": [ + { + "name": "Rule with Typo", + "patterns": ["*"], + "requires": [{"team": "devops-typo", "min_approvals": 1}], + } + ], + } + parser = GovernanceConfigParser() + with self.assertRaises(ValueError) as ctx: + parser._parse(yaml_data) + self.assertIn("not defined in team_hierarchy", str(ctx.exception)) + + def test_fallback_type_validation(self): + """Test that fallback requires block must be a list.""" + # Fallback requires is not a list + yaml_data = { + "team_hierarchy": {"devops": 1}, + "fallback": {"requires": "not-a-list"}, + } + parser = GovernanceConfigParser() + with self.assertRaises(ValueError) as ctx: + parser._parse(yaml_data) + self.assertIn("fallback requires must be a list", str(ctx.exception)) + + def test_parse_mixed_case_normalization(self): + """Test that team names in hierarchy, rules, and fallback are normalized to lowercase.""" + yaml_data = { + "team_hierarchy": { + "Tech-Council": 3, + "MAINTAINERS": 2, + }, + "proxy_reviewers": ["proxy-user"], + "fallback": {"requires": [{"min_team": "MAINTAINERS", "min_approvals": 1}]}, + "rules": [ + { + "name": "Core Rule", + "patterns": ["source/**/*.py"], + "requires": [{"team": "Tech-Council", "min_approvals": 2}], + } + ], + } + + parser = GovernanceConfigParser() + config = parser._parse(yaml_data) + + # Verify teams dictionary keys and Team object names are lowercased + self.assertEqual( + config.teams, + { + "tech-council": Team("tech-council", 3), + "maintainers": Team("maintainers", 2), + }, + ) + + # Verify fallback and rules resolved team references are lowercased + self.assertEqual(config.fallback[0].min_team.name, "maintainers") + self.assertEqual(config.rules[0].requires_all[0].team.name, "tech-council") + + +class TestGovernanceConfigValidator(unittest.TestCase): + """Tests for GovernanceConfigValidator class.""" + + def setUp(self): + """Set up test fixtures.""" + self.hierarchy = { + "devops": Team("devops", 1), + "maintainers": Team("maintainers", 2), + } + + @patch("governance_config_parser.GovernanceConfigValidator._get_tracked_files") + def test_validate_overlaps_success(self, mock_get_files): + """Test validate_overlaps returns no errors when rules do not overlap.""" + mock_get_files.return_value = [ + "src/main.py", + "docs/index.md", + "docs/specification/index.md", + ] + + rule_source = GovernanceRule( + name="Source", patterns=["src/**/*.py"], requires_all=[] + ) + rule_docs_general = GovernanceRule( + name="Docs General", + patterns=["docs/**/*.md"], + excluded_patterns=["docs/specification/**/*.md"], + requires_all=[], + ) + rule_docs_spec = GovernanceRule( + name="Docs Spec", patterns=["docs/specification/**/*.md"], requires_all=[] + ) + + config = GovernanceConfig( + teams=self.hierarchy, + rules=[rule_source, rule_docs_general, rule_docs_spec], + fallback=[], + proxy_reviewers=set(), + ) + + validator = GovernanceConfigValidator(config) + errors = validator.validate_overlaps(repo_path="/dummy") + self.assertEqual(len(errors), 0) + + @patch("governance_config_parser.GovernanceConfigValidator._get_tracked_files") + def test_validate_overlaps_failure(self, mock_get_files): + """Test validate_overlaps returns errors when rules overlap.""" + mock_get_files.return_value = ["docs/specification/index.md"] + + rule_docs_general = GovernanceRule( + name="Docs General", patterns=["docs/**/*.md"], requires_all=[] + ) + rule_docs_spec = GovernanceRule( + name="Docs Spec", patterns=["docs/specification/**/*.md"], requires_all=[] + ) + + config = GovernanceConfig( + teams=self.hierarchy, + rules=[rule_docs_general, rule_docs_spec], + fallback=[], + proxy_reviewers=set(), + ) + + validator = GovernanceConfigValidator(config) + errors = validator.validate_overlaps(repo_path="/dummy") + self.assertEqual(len(errors), 1) + self.assertIn("matches multiple rules", errors[0].message) + + def test_rule_pattern_matching_edge_cases(self): + """Tests the custom glob-to-regex engine in GovernanceRule.""" + rule = GovernanceRule( + name="Regex Test", + patterns=["src/**/*.py", "tests/*.py", "docs/file_?.md"], + requires_all=[], + ) + + # Should Match + self.assertTrue(rule.matches("src/main.py")) + self.assertTrue(rule.matches("src/nested/deep/main.py")) + self.assertTrue(rule.matches("tests/test_main.py")) + self.assertTrue(rule.matches("docs/file_1.md")) + self.assertTrue(rule.matches("docs/file_A.md")) + + # Should Not Match + self.assertFalse(rule.matches("src_backup/main.py")) # prefix mismatch + self.assertFalse( + rule.matches("tests/nested/test_main.py") + ) # Single * shouldn't cross directories + self.assertFalse(rule.matches("docs/file_10.md")) # ? is only one character + self.assertFalse(rule.matches("src/main.js")) # wrong extension + + @patch("governance_config_parser.GovernanceConfigValidator._get_tracked_files") + def test_validate_no_fallback_success(self, mock_get_files): + """Test validate_no_fallback returns no errors when all files match a rule.""" + mock_get_files.return_value = [ + "src/main.py", + "docs/index.md", + ] + + rule_source = GovernanceRule( + "Source", patterns=["src/**/*.py"], requires_all=[] + ) + rule_docs = GovernanceRule("Docs", patterns=["docs/**/*.md"], requires_all=[]) + + config = GovernanceConfig( + teams=self.hierarchy, + rules=[rule_source, rule_docs], + fallback=[], + proxy_reviewers=set(), + ) + + validator = GovernanceConfigValidator(config) + errors = validator.validate_no_fallback(repo_path="/dummy") + self.assertEqual(len(errors), 0) + + @patch("governance_config_parser.GovernanceConfigValidator._get_tracked_files") + def test_validate_no_fallback_failure(self, mock_get_files): + """Test validate_no_fallback returns errors for files not matching any rule.""" + mock_get_files.return_value = [ + "src/main.py", + "unmatched_file.txt", + ] + + rule_source = GovernanceRule( + "Source", patterns=["src/**/*.py"], requires_all=[] + ) + + config = GovernanceConfig( + teams=self.hierarchy, + rules=[rule_source], + fallback=[], + proxy_reviewers=set(), + ) + + validator = GovernanceConfigValidator(config) + errors = validator.validate_no_fallback(repo_path="/dummy") + self.assertEqual(len(errors), 1) + self.assertEqual(errors[0].file_path, "unmatched_file.txt") + self.assertIn("does not match any governance rule", errors[0].message) + + +if __name__ == "__main__": + unittest.main() diff --git a/org-tools/governance/tests/test_pr_models.py b/org-tools/governance/tests/test_pr_models.py new file mode 100644 index 0000000..5c5084b --- /dev/null +++ b/org-tools/governance/tests/test_pr_models.py @@ -0,0 +1,83 @@ +# Copyright 2026 UCP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the Pull Request dataclasses and models.""" + +import os +import sys +import unittest + +sys.path.insert( + 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../scripts")) +) + +from pr_models import ( + PullRequest, + Review, + ReviewState, + TeamMemberships, +) + + +class TestPRModels(unittest.TestCase): + """Tests for the lowercase normalization in PR models.""" + + def test_review_lowercase_normalization(self): + """Test that Review user names are normalized to lowercase.""" + review = Review.create(user="ALICE", state=ReviewState.APPROVED) + self.assertEqual(review.user, "alice") + + def test_pull_request_lowercase_normalization(self): + """Test that PullRequest fields are normalized to lowercase.""" + review_input = Review.create(user="Bob", state=ReviewState.APPROVED) + pr = PullRequest.create( + number=42, + author="CHARLIE", + is_draft=False, + changed_files=["README.md"], + reviews=[review_input], + assigned_users=["Dave", "EVE"], + assigned_teams=["Tech-Council", "ADMINS"], + ) + + expected = PullRequest( + number=42, + author="charlie", + is_draft=False, + changed_files=["README.md"], + reviews=[Review(user="bob", state=ReviewState.APPROVED)], + assigned_users=["dave", "eve"], + assigned_teams=["tech-council", "admins"], + ) + self.assertEqual(expected, pr) + + def test_team_memberships_lowercase_normalization(self): + """Test that TeamMemberships keys and member sets are normalized to lowercase.""" + memberships = TeamMemberships.create( + members_by_team={ + "Tech-Council": {"Alice", "BOB"}, + "ADMINS": {"charlie", "Dave"}, + } + ) + self.assertEqual( + memberships.members_by_team, + { + "tech-council": {"alice", "bob"}, + "admins": {"charlie", "dave"}, + }, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/org-tools/governance/tests/test_pr_validator.py b/org-tools/governance/tests/test_pr_validator.py new file mode 100644 index 0000000..8795b75 --- /dev/null +++ b/org-tools/governance/tests/test_pr_validator.py @@ -0,0 +1,804 @@ +# Copyright 2026 UCP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the pull request validator.""" + +import os +import sys +import unittest +from unittest.mock import MagicMock, patch + +sys.path.insert( + 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../scripts")) +) + + +# Mock the 'github' module before importing our script +class MockGithubException(Exception): + def __init__(self, status=None, data=None, headers=None): + super().__init__(f"Github error {status}: {data}") + self.status = status + self.data = data + self.headers = headers + + +mock_github = MagicMock() +mock_github.GithubException = MockGithubException +sys.modules["github"] = mock_github + + +from pr_validator import ( # noqa: E402 + PullRequestValidator, + fetch_team_memberships, +) +from pr_models import ( # noqa: E402 + MergeableReason, + PullRequest, + Review, + ReviewState, + TeamMemberships, + ValidationErrorReason, +) +from governance_config_parser import ( # noqa: E402 + GovernanceConfig, + GovernanceConfigParser, + GovernanceRule, + RuleRequirement, + Team, +) + + +class TestPullRequestValidator(unittest.TestCase): + """Tests for TestPullRequestValidator class.""" + + def setUp(self): + """Set up test fixtures.""" + # 1. Setup team hierarchy + self.hierarchy = { + "devops": Team(name="devops", level=1), + "maintainers": Team(name="maintainers", level=2), + "tech-council": Team(name="tech-council", level=3), + "governance-council": Team(name="governance-council", level=4), + } + + # 2. Setup rules + self.rules = [ + GovernanceRule( + name="Governance & Licensing", + patterns=["LICENSE", ".github/CODEOWNERS"], + requires_all=[ + RuleRequirement( + min_approvals=1, team=self.hierarchy["governance-council"] + ) + ], + ), + GovernanceRule( + name="Core Protocol Source", + patterns=["source/**"], + requires_all=[ + RuleRequirement( + min_approvals=1, min_team=self.hierarchy["tech-council"] + ) + ], + ), + ] + + # 3. Setup fallback + self.fallback = [ + RuleRequirement(min_approvals=1, min_team=self.hierarchy["maintainers"]) + ] + + # 4. Setup proxy reviewers + self.proxy_reviewers = {"proxy1", "proxy2"} + + # 5. Setup config + self.config = GovernanceConfig( + teams=self.hierarchy, + rules=self.rules, + fallback=self.fallback, + proxy_reviewers=self.proxy_reviewers, + ) + + # 6. Setup memberships + self.memberships = TeamMemberships( + members_by_team={ + "devops": {"dev1", "dev2"}, + "maintainers": {"maint1", "maint2", "tc-member1"}, + "tech-council": {"tc-member1", "tc-member2"}, + "governance-council": { + "gov-member1", + "gov-member2", + "proxy1", + }, + } + ) + + # 7. Setup validator + self.validator = PullRequestValidator(self.config, self.memberships) + + def test_draft_pr_fails_quickly(self): + """Test that draft pull requests fail validation quickly.""" + pr = PullRequest( + number=1, + author="author1", + is_draft=True, + changed_files=["source/main.py"], + reviews=[], + ) + res = self.validator.validate(pr) + self.assertFalse(res.is_mergeable) + self.assertEqual(res.error, ValidationErrorReason.DRAFT_PR) + + def test_no_changed_files_passes(self): + """Test that pull requests with no changed files pass validation.""" + pr = PullRequest( + number=1, author="author1", is_draft=False, changed_files=[], reviews=[] + ) + res = self.validator.validate(pr) + self.assertTrue(res.is_mergeable) + self.assertEqual(res.mergeable_reason, MergeableReason.NO_CHANGED_FILES) + + def test_proxy_voter_override_passes(self): + """Test that proxy voter overrides pass validation.""" + # Proxy override bypasses changes requested and missing approvals + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[ + Review(user="tc-member1", state=ReviewState.CHANGES_REQUESTED), + Review(user="proxy1", state=ReviewState.APPROVED), + ], + ) + res = self.validator.validate(pr) + self.assertTrue(res.is_mergeable) + self.assertEqual(res.mergeable_reason, MergeableReason.PROXY_OVERRIDE) + + def test_standard_rule_success(self): + """Test that standard rules are successfully validated.""" + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[ + Review(user="tc-member1", state=ReviewState.APPROVED), + ], + ) + res = self.validator.validate(pr) + self.assertTrue(res.is_mergeable) + self.assertEqual(res.mergeable_reason, MergeableReason.RULES_SATISFIED) + + def test_insufficient_approvals_fails(self): + """Test that validation fails if approvals are insufficient.""" + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[ + Review(user="dev1", state=ReviewState.APPROVED), + ], + ) + res = self.validator.validate(pr) + self.assertFalse(res.is_mergeable) + self.assertEqual(res.error, ValidationErrorReason.INSUFFICIENT_APPROVALS) + + def test_fallback_applied_correctly(self): + """Test that fallback rules are applied correctly.""" + # dev1 (level 1) approval should fail for fallback (needs level >= 2) + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["docs/index.md"], + reviews=[ + Review(user="dev1", state=ReviewState.APPROVED), + ], + ) + res = self.validator.validate(pr) + self.assertFalse(res.is_mergeable) + self.assertEqual(res.error, ValidationErrorReason.INSUFFICIENT_APPROVALS) + + # maint1 (level 2) approval should pass + pr_ok = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["docs/index.md"], + reviews=[ + Review(user="maint1", state=ReviewState.APPROVED), + ], + ) + res_ok = self.validator.validate(pr_ok) + self.assertTrue(res_ok.is_mergeable) + self.assertEqual(res_ok.mergeable_reason, MergeableReason.RULES_SATISFIED) + + def test_venn_diagram_higher_level_counts(self): + """Test that higher level approvals satisfy lower level requirements.""" + # gov-member1 (level 4) satisfies level >= 2 fallback requirement + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["docs/index.md"], + reviews=[ + Review(user="gov-member1", state=ReviewState.APPROVED), + ], + ) + res = self.validator.validate(pr) + self.assertTrue(res.is_mergeable) + + def test_specific_team_requirement(self): + """Test validation of specific team requirements.""" + # tc-member1 (level 3) is not in governance-council, should fail + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["LICENSE"], + reviews=[ + Review(user="tc-member1", state=ReviewState.APPROVED), + ], + ) + res = self.validator.validate(pr) + self.assertFalse(res.is_mergeable) + self.assertEqual(res.error, ValidationErrorReason.INSUFFICIENT_APPROVALS) + + # gov-member1 is in governance-council, should pass + pr_ok = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["LICENSE"], + reviews=[ + Review(user="gov-member1", state=ReviewState.APPROVED), + ], + ) + res_ok = self.validator.validate(pr_ok) + self.assertTrue(res_ok.is_mergeable) + + def test_changes_requested_blocks(self): + """Test that changes requested block validation.""" + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[ + Review(user="tc-member1", state=ReviewState.APPROVED), + Review(user="tc-member2", state=ReviewState.CHANGES_REQUESTED), + ], + ) + res = self.validator.validate(pr) + self.assertFalse(res.is_mergeable) + self.assertEqual(res.error, ValidationErrorReason.CHANGES_REQUESTED) + + def test_unauthorized_changes_requested_does_not_block(self): + """Test that unauthorized changes requested do not block validation.""" + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[ + Review(user="tc-member1", state=ReviewState.APPROVED), + Review(user="dev1", state=ReviewState.CHANGES_REQUESTED), + ], + ) + res = self.validator.validate(pr) + self.assertTrue(res.is_mergeable) + + def test_proxy_changes_requested_blocks(self): + """Test that changes requested by a proxy reviewer block validation.""" + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[ + Review(user="tc-member1", state=ReviewState.APPROVED), + Review(user="proxy2", state=ReviewState.CHANGES_REQUESTED), + ], + ) + res = self.validator.validate(pr) + self.assertFalse(res.is_mergeable) + self.assertEqual(res.error, ValidationErrorReason.CHANGES_REQUESTED) + + def test_self_approval_restrictions(self): + """Test restrictions on self-approval.""" + # Author tc-member1 cannot approve their own PR + pr = PullRequest( + number=1, + author="tc-member1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[ + Review(user="tc-member1", state=ReviewState.APPROVED), + ], + ) + res = self.validator.validate(pr) + self.assertFalse(res.is_mergeable) + self.assertEqual(res.error, ValidationErrorReason.INSUFFICIENT_APPROVALS) + + # Proxy voter proxy1 can approve their own PR + pr_proxy = PullRequest( + number=1, + author="proxy1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[ + Review(user="proxy1", state=ReviewState.APPROVED), + ], + ) + res_proxy = self.validator.validate(pr_proxy) + self.assertTrue(res_proxy.is_mergeable) + self.assertEqual(res_proxy.mergeable_reason, MergeableReason.PROXY_OVERRIDE) + + def test_dismissed_reviews(self): + """Test that dismissed reviews are handled correctly.""" + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[ + Review(user="tc-member1", state=ReviewState.APPROVED), + Review(user="tc-member1", state=ReviewState.DISMISSED), + ], + ) + res = self.validator.validate(pr) + self.assertFalse(res.is_mergeable) + + def test_commented_reviews_ignored(self): + """Test that commented reviews are ignored.""" + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[ + Review(user="tc-member1", state=ReviewState.APPROVED), + Review(user="tc-member1", state=ReviewState.COMMENTED), + ], + ) + res = self.validator.validate(pr) + self.assertTrue(res.is_mergeable) + + def test_multi_level_requirement(self): + """Test validation of multi-level requirements.""" + custom_rule = GovernanceRule( + name="Venn Rule", + patterns=["*"], + requires_all=[ + RuleRequirement( + min_approvals=2, min_team=self.hierarchy["governance-council"] + ), + RuleRequirement( + min_approvals=4, min_team=self.hierarchy["maintainers"] + ), + ], + ) + config = GovernanceConfig( + teams=self.hierarchy, + rules=[custom_rule], + fallback=[], + proxy_reviewers=set(), + ) + validator = PullRequestValidator(config, self.memberships) + + # Case A: 2 level-4, 2 level-2 approvals (Total 4) + # Under the Venn Diagram model, the 2 level-4 approvals count towards the level-2 + # requirement as well, satisfying both requirements with 4 total unique approvals. + pr_ok_double_dip = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["file.txt"], + reviews=[ + Review(user="gov-member1", state=ReviewState.APPROVED), + Review(user="gov-member2", state=ReviewState.APPROVED), + Review(user="maint1", state=ReviewState.APPROVED), + Review(user="maint2", state=ReviewState.APPROVED), + ], + ) + res_ok_double_dip = validator.validate(pr_ok_double_dip) + self.assertTrue(res_ok_double_dip.is_mergeable) + + # Case B: 3 level-4 approvals, total only 3 approvals. + # Fails because total approvals (3) is less than the level-2 requirement (4). + pr_fail_too_few = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["file.txt"], + reviews=[ + Review(user="gov-member1", state=ReviewState.APPROVED), + Review(user="gov-member2", state=ReviewState.APPROVED), + Review(user="proxy1", state=ReviewState.APPROVED), + ], + ) + res_fail_too_few = validator.validate(pr_fail_too_few) + self.assertFalse(res_fail_too_few.is_mergeable) + self.assertEqual( + res_fail_too_few.error, ValidationErrorReason.INSUFFICIENT_APPROVALS + ) + + # Case C: 2 level-4, 4 level-2 approvals (Total 6 unique, satisfies both) + # 2 assigned to level-4, 4 assigned to level-2. + pr_ok = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["file.txt"], + reviews=[ + Review(user="gov-member1", state=ReviewState.APPROVED), + Review(user="gov-member2", state=ReviewState.APPROVED), + Review(user="maint1", state=ReviewState.APPROVED), + Review(user="maint2", state=ReviewState.APPROVED), + Review(user="tc-member1", state=ReviewState.APPROVED), + Review(user="tc-member2", state=ReviewState.APPROVED), + ], + ) + res_ok = validator.validate(pr_ok) + self.assertTrue(res_ok.is_mergeable) + + def test_requirement_status_reporting(self): + """Test requirement status reporting in validation result.""" + # We check source/main.py (requires 1 from min_team tech-council) + # We request tc-member2 to review (1 eligible reviewer assigned) + # but have 0 approvals. + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[], + assigned_users=["tc-member2"], + assigned_teams=[], + ) + res = self.validator.validate(pr) + self.assertFalse(res.is_mergeable) + self.assertEqual(res.error, ValidationErrorReason.INSUFFICIENT_APPROVALS) + self.assertEqual(len(res.requirement_statuses), 1) + + status = res.requirement_statuses[0] + self.assertEqual(status.requirement.min_team, self.hierarchy["tech-council"]) + self.assertEqual(status.requirement.min_approvals, 1) + self.assertEqual(status.approved_count, 0) + self.assertEqual(status.assigned_count, 1) + self.assertFalse(status.is_satisfied) + + def test_governance_config_parser(self): + """Test parsing of governance configuration.""" + yaml_data = { + "team_hierarchy": { + "devops": 1, + "maintainers": 2, + "governance-council": 3, + }, + "proxy_reviewers": ["proxy1"], + "fallback": {"requires": [{"min_team": "maintainers", "min_approvals": 1}]}, + "rules": [ + { + "name": "Licensing", + "patterns": ["LICENSE"], + "requires": [{"team": "governance-council", "min_approvals": 2}], + } + ], + } + + config = GovernanceConfigParser()._parse(yaml_data) + + self.assertEqual( + config.teams, + { + "devops": Team("devops", 1), + "maintainers": Team("maintainers", 2), + "governance-council": Team("governance-council", 3), + }, + ) + self.assertEqual(config.proxy_reviewers, {"proxy1"}) + + self.assertEqual(len(config.fallback), 1) + self.assertEqual(config.fallback[0].min_team, Team("maintainers", 2)) + self.assertEqual(config.fallback[0].min_approvals, 1) + + self.assertEqual(len(config.rules), 1) + rule = config.rules[0] + self.assertEqual(rule.name, "Licensing") + self.assertEqual(rule.patterns, ["LICENSE"]) + self.assertEqual(len(rule.requires_all), 1) + self.assertEqual(rule.requires_all[0].team, Team("governance-council", 3)) + self.assertEqual(rule.requires_all[0].min_approvals, 2) + + def test_wildcard_matching(self): + """Test wildcard pattern matching.""" + # Test 1: Recursive Glob pattern + rule_recursive = GovernanceRule( + name="Recursive", patterns=["source/**/*.py"], requires_all=[] + ) + self.assertTrue(rule_recursive.matches("source/main.py")) + self.assertTrue(rule_recursive.matches("source/a/b/c/main.py")) + self.assertFalse(rule_recursive.matches("other/main.py")) + + # Test 2: Shallow Glob pattern + rule_shallow = GovernanceRule( + name="Shallow", patterns=["source/*.py"], requires_all=[] + ) + self.assertTrue(rule_shallow.matches("source/main.py")) + self.assertFalse(rule_shallow.matches("source/a/main.py")) + self.assertFalse(rule_shallow.matches("main.py")) + + def test_rule_excludes(self): + """Test rule pattern exclusion matching.""" + # Rule A matches source/** but excludes source/special/** + rule = GovernanceRule( + name="Source Rule", + patterns=["source/**"], + requires_all=[ + RuleRequirement( + min_approvals=1, min_team=self.hierarchy["tech-council"] + ) + ], + excluded_patterns=["source/special/**"], + ) + config = GovernanceConfig( + teams=self.hierarchy, + rules=[rule], + fallback=[], + proxy_reviewers=set(), + ) + validator = PullRequestValidator(config, self.memberships) + + # Case A: Matches source/main.py (not excluded) + pr_ok = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[Review(user="tc-member1", state=ReviewState.APPROVED)], + ) + res_ok = validator.validate(pr_ok) + self.assertTrue(res_ok.is_mergeable) + + # Case B: Excluded source/special/helper.py (should use fallback rule) + config_with_fallback = GovernanceConfig( + teams=self.hierarchy, + rules=[rule], + fallback=[ + RuleRequirement(min_approvals=1, min_team=self.hierarchy["maintainers"]) + ], + proxy_reviewers=set(), + ) + validator_with_fallback = PullRequestValidator( + config_with_fallback, self.memberships + ) + + pr_excluded_fail = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["source/special/helper.py"], + reviews=[ + Review(user="dev1", state=ReviewState.APPROVED) + ], # dev1 (L1) not enough for L2 fallback + ) + res_excluded_fail = validator_with_fallback.validate(pr_excluded_fail) + self.assertFalse(res_excluded_fail.is_mergeable) + self.assertEqual( + res_excluded_fail.error, ValidationErrorReason.INSUFFICIENT_APPROVALS + ) + + def test_merge_requirements_multiple_rules(self): + """Test that requirements from multiple rules are merged by taking the maximum approvals needed.""" + # Rule 1 (devops-rule): needs 2 devops approvals + # Rule 2 (central-rule): needs 1 devops approval and 1 tech-council approval + # Merged requirements should be: 2 devops approvals and 1 tech-council approval. + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + # Changed files matching both rules + changed_files=["source/ops/deploy.sh", "source/central/core.py"], + reviews=[ + # 2 devops approvals + Review(user="dev1", state=ReviewState.APPROVED), + Review(user="dev2", state=ReviewState.APPROVED), + # 1 tech-council approval + Review(user="tc-member1", state=ReviewState.APPROVED), + ], + ) + res = self.validator.validate(pr) + self.assertTrue(res.is_mergeable) + self.assertEqual(res.mergeable_reason, MergeableReason.RULES_SATISFIED) + + +class TestFetchTeamMemberships(unittest.TestCase): + """Tests for fetch_team_memberships function.""" + + def test_fetch_success(self): + """Test fetch_team_memberships successfully fetches team memberships.""" + mock_github = MagicMock() + mock_org = MagicMock() + mock_github.get_organization.return_value = mock_org + + mock_team1 = MagicMock() + mock_member1 = MagicMock() + mock_member1.login = "user1" + mock_member2 = MagicMock() + mock_member2.login = "user2" + mock_team1.get_members.return_value = [mock_member1, mock_member2] + + mock_team2 = MagicMock() + mock_member3 = MagicMock() + mock_member3.login = "user3" + mock_team2.get_members.return_value = [mock_member3] + + def get_team_side_effect(slug): + if slug == "devops": + return mock_team1 + elif slug == "maintainers": + return mock_team2 + raise MockGithubException(status=404, data={"message": "Not Found"}) + + mock_org.get_team_by_slug.side_effect = get_team_side_effect + + config = GovernanceConfig( + teams={ + "devops": Team("devops", 1), + "maintainers": Team("maintainers", 2), + }, + rules=[], + fallback=[], + proxy_reviewers=set(), + ) + + memberships = fetch_team_memberships(mock_github, "my-org", config) + + self.assertEqual( + memberships.members_by_team, + { + "devops": {"user1", "user2"}, + "maintainers": {"user3"}, + }, + ) + mock_github.get_organization.assert_called_once_with("my-org") + + def test_fetch_org_fails(self): + """Test fetch_team_memberships raises RuntimeError when org fetch fails.""" + mock_github = MagicMock() + mock_github.get_organization.side_effect = MockGithubException( + status=404, data={"message": "Org Not Found"} + ) + + config = GovernanceConfig( + teams={"devops": Team("devops", 1)}, + rules=[], + fallback=[], + proxy_reviewers=set(), + ) + + with self.assertRaises(RuntimeError) as ctx: + fetch_team_memberships(mock_github, "my-org", config) + + self.assertIn("Failed to fetch organization 'my-org'", str(ctx.exception)) + + def test_fetch_team_fails(self): + """Test fetch_team_memberships raises RuntimeError when a team fetch fails.""" + mock_github = MagicMock() + mock_org = MagicMock() + mock_github.get_organization.return_value = mock_org + + mock_org.get_team_by_slug.side_effect = MockGithubException( + status=404, data={"message": "Team Not Found"} + ) + + config = GovernanceConfig( + teams={"devops": Team("devops", 1)}, + rules=[], + fallback=[], + proxy_reviewers=set(), + ) + + with self.assertRaises(RuntimeError) as ctx: + fetch_team_memberships(mock_github, "my-org", config) + + self.assertIn("Could not fetch members for team 'devops'", str(ctx.exception)) + + +class TestPRValidatorMain(unittest.TestCase): + """Tests for the main function of pr_validator.""" + + @patch("pr_validator.sys.exit") + @patch("pr_validator.print") + @patch("pr_validator.argparse.ArgumentParser.parse_args") + def test_main_invalid_repo_name(self, mock_parse_args, mock_print, mock_exit): + """Test main fails when repo name is invalid.""" + from pr_validator import main + + mock_args = MagicMock() + mock_args.repo_name = "invalid-repo" + mock_parse_args.return_value = mock_args + mock_exit.side_effect = SystemExit(1) + + with self.assertRaises(SystemExit) as cm: + main() + + self.assertEqual(cm.exception.code, 1) + mock_exit.assert_called_once_with(1) + mock_print.assert_any_call( + "❌ ERROR: Invalid repository name 'invalid-repo'. Must be one of: ['ucp', 'python-sdk']", + file=sys.stderr, + ) + + @patch("pr_validator.sys.exit") + @patch("pr_validator.ValidationLogger") + @patch("pr_validator.PullRequestValidator") + @patch("pr_validator.fetch_pull_request") + @patch("pr_validator.fetch_team_memberships") + @patch("pr_validator.Github") + @patch("pr_validator.Auth.Token") + @patch("pr_validator.GovernanceConfigParser") + @patch("pr_validator.argparse.ArgumentParser.parse_args") + def test_main_valid_repo_name( + self, + mock_parse_args, + mock_parser_class, + mock_token, + mock_github_class, + mock_fetch_members, + mock_fetch_pr, + mock_validator_class, + mock_logger_class, + mock_exit, + ): + """Test main succeeds and parses file with valid repo name.""" + from pr_validator import main, RepoName, REPO_RULES_MAPPING + + mock_args = MagicMock() + mock_args.token = "token" + mock_args.org = "org" + mock_args.repo = "repo" + mock_args.pr = 123 + mock_args.repo_name = "ucp" + mock_parse_args.return_value = mock_args + mock_exit.side_effect = SystemExit(0) + + # Mock parse_file to return a dummy config + mock_parser = MagicMock() + mock_parser_class.return_value = mock_parser + dummy_config = MagicMock() + mock_parser.parse_file.return_value = dummy_config + + # Mock validation result to be mergeable + mock_validator = MagicMock() + mock_validator_class.return_value = mock_validator + mock_result = MagicMock() + mock_result.is_mergeable = True + mock_result.mergeable_reason = None + mock_validator.validate.return_value = mock_result + + with self.assertRaises(SystemExit) as cm: + main() + + self.assertEqual(cm.exception.code, 0) + # Check parse_file was called with the mapped rules file + mock_parser.parse_file.assert_called_once_with(REPO_RULES_MAPPING[RepoName.UCP]) + mock_exit.assert_called_once_with(0) + + +if __name__ == "__main__": + unittest.main() diff --git a/org-tools/governance/tests/test_validation_logger.py b/org-tools/governance/tests/test_validation_logger.py new file mode 100644 index 0000000..88a5e1f --- /dev/null +++ b/org-tools/governance/tests/test_validation_logger.py @@ -0,0 +1,482 @@ +# Copyright 2026 UCP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the ValidationLogger class.""" + +import os +import sys +import unittest +from unittest.mock import patch + +sys.path.insert( + 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../scripts")) +) + +from pr_models import ( + FileValidationStatus, + MergeableReason, + RequirementStatus, + ValidationErrorReason, + ValidationResult, +) +from governance_config_parser import RuleRequirement, Team +from validation_logger import ValidationLogger + +HELP_SUFFIX = ( + "\n" + "\n" + "---\n" + "\n" + "πŸ’‘ **Need help?** For a detailed explanation of this report and how to resolve " + "failures, see the [Governance Gate Validation Guide](https://github.com/Universal-Commerce-Protocol/.github/blob/main/org-tools/governance/docs/validation_report.md).\n" + "\n" + "If you suspect a bug or issue with this validator script, please " + "[report an issue in the .github repository](https://github.com/Universal-Commerce-Protocol/.github/issues)." +) + + +class TestValidationLogger(unittest.TestCase): + """Tests for ValidationLogger class.""" + + @patch("builtins.open", new_callable=unittest.mock.mock_open) + def test_mergeable_no_changed_files(self, mock_open): + """Test summary output when PR is mergeable due to no changed files.""" + res = ValidationResult( + is_mergeable=True, + mergeable_reason=MergeableReason.NO_CHANGED_FILES, + ) + logger = ValidationLogger(res) + logger.write_summary("dummy.txt") + + mock_open.assert_called_once_with("dummy.txt", "w", encoding="utf-8") + handle = mock_open() + written_content = "".join(call.args[0] for call in handle.write.call_args_list) + + expected = ( + "# βš–οΈ Governance Gate Validation Report\n" + "\n" + "## 🟒 SUCCESS: No changed files in this Pull Request." + ) + self.assertEqual(expected, written_content) + + @patch("builtins.open", new_callable=unittest.mock.mock_open) + def test_mergeable_proxy_override(self, mock_open): + """Test summary output when PR is mergeable due to proxy override.""" + res = ValidationResult( + is_mergeable=True, + mergeable_reason=MergeableReason.PROXY_OVERRIDE, + ) + logger = ValidationLogger(res) + logger.write_summary("dummy.txt") + + mock_open.assert_called_once_with("dummy.txt", "w", encoding="utf-8") + handle = mock_open() + written_content = "".join(call.args[0] for call in handle.write.call_args_list) + + expected = ( + "# βš–οΈ Governance Gate Validation Report\n" + "\n" + "## 🟒 SUCCESS: Emergency override exception approved by Proxy Reviewer." + ) + self.assertEqual(expected, written_content) + + @patch("builtins.open", new_callable=unittest.mock.mock_open) + def test_mergeable_rules_satisfied(self, mock_open): + """Test summary output when PR is mergeable due to rules satisfied.""" + res = ValidationResult( + is_mergeable=True, + mergeable_reason=MergeableReason.RULES_SATISFIED, + ) + logger = ValidationLogger(res) + logger.write_summary("dummy.txt") + + mock_open.assert_called_once_with("dummy.txt", "w", encoding="utf-8") + handle = mock_open() + written_content = "".join(call.args[0] for call in handle.write.call_args_list) + + expected = ( + "# βš–οΈ Governance Gate Validation Report\n" + "\n" + "## 🟒 SUCCESS: All governance rules satisfied." + ) + self.assertEqual(expected, written_content) + + @patch("builtins.open", new_callable=unittest.mock.mock_open) + def test_error_draft_pr(self, mock_open): + """Test summary output when PR validation fails due to draft status.""" + res = ValidationResult( + is_mergeable=False, + error=ValidationErrorReason.DRAFT_PR, + ) + logger = ValidationLogger(res) + logger.write_summary("dummy.txt") + + mock_open.assert_called_once_with("dummy.txt", "w", encoding="utf-8") + handle = mock_open() + written_content = "".join(call.args[0] for call in handle.write.call_args_list) + + expected = ( + "# βš–οΈ Governance Gate Validation Report\n" + "\n" + "## πŸ”΄ FAILURE: Pull Request is in draft status." + ) + HELP_SUFFIX + self.assertEqual(expected, written_content) + + @patch("builtins.open", new_callable=unittest.mock.mock_open) + def test_error_changes_requested(self, mock_open): + """Test summary output when PR validation fails due to changes requested.""" + res = ValidationResult( + is_mergeable=False, + error=ValidationErrorReason.CHANGES_REQUESTED, + ) + logger = ValidationLogger(res) + logger.write_summary("dummy.txt") + + mock_open.assert_called_once_with("dummy.txt", "w", encoding="utf-8") + handle = mock_open() + written_content = "".join(call.args[0] for call in handle.write.call_args_list) + + expected = ( + "# βš–οΈ Governance Gate Validation Report\n" + "\n" + "## πŸ”΄ FAILURE: An authorized reviewer has requested changes." + ) + HELP_SUFFIX + self.assertEqual(expected, written_content) + + @patch("builtins.open", new_callable=unittest.mock.mock_open) + def test_error_insufficient_approvals_summary(self, mock_open): + """Test summary message when PR has insufficient approvals.""" + team = Team(name="tech-council", level=3) + req = RuleRequirement(min_approvals=2, team=team) + status = RequirementStatus( + requirement=req, + approved_count=1, + assigned_count=0, + is_satisfied=False, + ) + res = ValidationResult( + is_mergeable=False, + error=ValidationErrorReason.INSUFFICIENT_APPROVALS, + requirement_statuses=[status], + ) + logger = ValidationLogger(res) + logger.write_summary("dummy.txt") + + mock_open.assert_called_once_with("dummy.txt", "w", encoding="utf-8") + handle = mock_open() + written_content = "".join(call.args[0] for call in handle.write.call_args_list) + + expected = ( + "# βš–οΈ Governance Gate Validation Report\n" + "\n" + "## πŸ”΄ FAILURE: Insufficient approvals.\n" + "---\n" + "\n" + "### 🌍 1. GLOBAL PR SUMMARY\n" + "\n" + "Merged requirements across all changed files:\n" + "\n" + "* **2 approvals** from team 'tech-council'\n" + " * **Met:** πŸ”΄ No (1/2 approved)\n" + " * **Pending:** Needs 1 approval from team 'tech-council' (0 eligible reviewers assigned). " + "Waiting for 1 more reviewer(s) to be assigned from team 'tech-council'.\n" + ) + HELP_SUFFIX + self.assertEqual(expected, written_content) + + @patch("builtins.open", new_callable=unittest.mock.mock_open) + def test_write_summary_waiting_for_approval(self, mock_open): + """Test write_summary outputs 'Waiting for approval' when reviewers are assigned.""" + team = Team(name="tech-council", level=3) + req = RuleRequirement(min_approvals=2, team=team) + status = RequirementStatus( + requirement=req, + approved_count=1, + assigned_count=1, # 1 approved + 1 assigned = 2 (meets min_approvals) + is_satisfied=False, + ) + res = ValidationResult( + is_mergeable=False, + error=ValidationErrorReason.INSUFFICIENT_APPROVALS, + requirement_statuses=[status], + ) + logger = ValidationLogger(res) + logger.write_summary("dummy.txt") + + mock_open.assert_called_once_with("dummy.txt", "w", encoding="utf-8") + handle = mock_open() + written_content = "".join(call.args[0] for call in handle.write.call_args_list) + + expected = ( + "# βš–οΈ Governance Gate Validation Report\n" + "\n" + "## πŸ”΄ FAILURE: Insufficient approvals.\n" + "---\n" + "\n" + "### 🌍 1. GLOBAL PR SUMMARY\n" + "\n" + "Merged requirements across all changed files:\n" + "\n" + "* **2 approvals** from team 'tech-council'\n" + " * **Met:** πŸ”΄ No (1/2 approved)\n" + " * **Pending:** Needs 1 approval from team 'tech-council' (1 eligible reviewer assigned). " + "Waiting for approval from team 'tech-council'.\n" + ) + HELP_SUFFIX + self.assertEqual(expected, written_content) + + @patch("builtins.open", new_callable=unittest.mock.mock_open) + def test_write_summary_assign_more_reviewers(self, mock_open): + """Test write_summary outputs 'Assign X more reviewers' when not enough are assigned.""" + team = Team(name="tech-council", level=3) + req = RuleRequirement(min_approvals=2, team=team) + status = RequirementStatus( + requirement=req, + approved_count=0, + assigned_count=1, # 0 approved + 1 assigned = 1 (needs 1 more assigned) + is_satisfied=False, + ) + res = ValidationResult( + is_mergeable=False, + error=ValidationErrorReason.INSUFFICIENT_APPROVALS, + requirement_statuses=[status], + ) + logger = ValidationLogger(res) + logger.write_summary("dummy.txt") + + mock_open.assert_called_once_with("dummy.txt", "w", encoding="utf-8") + handle = mock_open() + written_content = "".join(call.args[0] for call in handle.write.call_args_list) + + expected = ( + "# βš–οΈ Governance Gate Validation Report\n" + "\n" + "## πŸ”΄ FAILURE: Insufficient approvals.\n" + "---\n" + "\n" + "### 🌍 1. GLOBAL PR SUMMARY\n" + "\n" + "Merged requirements across all changed files:\n" + "\n" + "* **2 approvals** from team 'tech-council'\n" + " * **Met:** πŸ”΄ No (0/2 approved)\n" + " * **Pending:** Needs 2 approvals from team 'tech-council' (1 eligible reviewer assigned). " + "Waiting for 1 more reviewer(s) to be assigned from team 'tech-council'.\n" + ) + HELP_SUFFIX + self.assertEqual(expected, written_content) + + @patch("builtins.open", new_callable=unittest.mock.mock_open) + def test_write_summary_with_approvers(self, mock_open): + """Test write_summary outputs approver details for partially filled slots.""" + team = Team(name="tech-council", level=3) + req = RuleRequirement(min_approvals=2, team=team) + status = RequirementStatus( + requirement=req, + approved_count=1, + assigned_count=1, + is_satisfied=False, + approvers=["alice"], + ) + res = ValidationResult( + is_mergeable=False, + error=ValidationErrorReason.INSUFFICIENT_APPROVALS, + requirement_statuses=[status], + ) + logger = ValidationLogger(res) + logger.write_summary("dummy.txt") + + mock_open.assert_called_once_with("dummy.txt", "w", encoding="utf-8") + handle = mock_open() + written_content = "".join(call.args[0] for call in handle.write.call_args_list) + + expected = ( + "# βš–οΈ Governance Gate Validation Report\n" + "\n" + "## πŸ”΄ FAILURE: Insufficient approvals.\n" + "---\n" + "\n" + "### 🌍 1. GLOBAL PR SUMMARY\n" + "\n" + "Merged requirements across all changed files:\n" + "\n" + "* **2 approvals** from team 'tech-council'\n" + " * **Met:** πŸ”΄ No (1/2 approved - approved by: alice)\n" + " * **Pending:** Needs 1 approval from team 'tech-council' (1 eligible reviewer assigned). " + "Waiting for approval from team 'tech-council'.\n" + ) + HELP_SUFFIX + self.assertEqual(expected, written_content) + + @patch("builtins.open", new_callable=unittest.mock.mock_open) + def test_write_summary_satisfied_requirement_omits_pending(self, mock_open): + """Test write_summary omits the pending/unmet line entirely when requirement is satisfied.""" + team = Team(name="tech-council", level=3) + req = RuleRequirement(min_approvals=2, team=team) + status = RequirementStatus( + requirement=req, + approved_count=2, + assigned_count=0, + is_satisfied=True, + approvers=["alice", "bob"], + ) + res = ValidationResult( + is_mergeable=True, + mergeable_reason=MergeableReason.RULES_SATISFIED, + requirement_statuses=[status], + ) + logger = ValidationLogger(res) + logger.write_summary("dummy.txt") + + mock_open.assert_called_once_with("dummy.txt", "w", encoding="utf-8") + handle = mock_open() + written_content = "".join(call.args[0] for call in handle.write.call_args_list) + + expected = ( + "# βš–οΈ Governance Gate Validation Report\n" + "\n" + "## 🟒 SUCCESS: All governance rules satisfied.\n" + "---\n" + "\n" + "### 🌍 1. GLOBAL PR SUMMARY\n" + "\n" + "Merged requirements across all changed files:\n" + "\n" + "* **2 approvals** from team 'tech-council'\n" + " * **Met:** 🟒 Yes (2/2 approved - approved by: alice, bob)\n" + ) + self.assertEqual(expected, written_content) + + @patch("builtins.open", new_callable=unittest.mock.mock_open) + def test_write_summary_with_file_statuses(self, mock_open): + """Test write_summary fully formats the file-by-file breakdown section.""" + team = Team(name="tech-council", level=3) + req = RuleRequirement(min_approvals=2, team=team) + + status_satisfied = RequirementStatus( + requirement=req, + approved_count=2, + assigned_count=0, + is_satisfied=True, + approvers=["alice", "bob"], + ) + + status_unsatisfied = RequirementStatus( + requirement=req, + approved_count=0, + assigned_count=1, + is_satisfied=False, + ) + + file_status_ok = FileValidationStatus( + file_path="src/main.py", + requirement_statuses=[status_satisfied], + is_satisfied=True, + ) + + file_status_pending = FileValidationStatus( + file_path="src/auth.py", + requirement_statuses=[status_unsatisfied], + is_satisfied=False, + ) + + res = ValidationResult( + is_mergeable=False, + error=ValidationErrorReason.INSUFFICIENT_APPROVALS, + requirement_statuses=[status_unsatisfied], + file_statuses=[file_status_ok, file_status_pending], + ) + logger = ValidationLogger(res) + logger.write_summary("dummy.txt") + + mock_open.assert_called_once_with("dummy.txt", "w", encoding="utf-8") + handle = mock_open() + written_content = "".join(call.args[0] for call in handle.write.call_args_list) + + expected = ( + "# βš–οΈ Governance Gate Validation Report\n" + "\n" + "## πŸ”΄ FAILURE: Insufficient approvals.\n" + "---\n" + "\n" + "### 🌍 1. GLOBAL PR SUMMARY\n" + "\n" + "Merged requirements across all changed files:\n" + "\n" + "* **2 approvals** from team 'tech-council'\n" + " * **Met:** πŸ”΄ No (0/2 approved)\n" + " * **Pending:** Needs 2 approvals from team 'tech-council' (1 eligible reviewer assigned). " + "Waiting for 1 more reviewer(s) to be assigned from team 'tech-council'.\n" + "\n" + "\n" + "---\n" + "\n" + "### πŸ“‚ 2. FILE-BY-FILE BREAKDOWN\n" + "\n" + "* **File:** `src/main.py`\n" + " * **Status:** 🟒 SATISFIED\n" + " * **Requirements:**\n" + " * **2 approvals** from team 'tech-council'\n" + " * **Met:** 🟒 Yes (2/2 approved - approved by: alice, bob)\n" + "\n" + "* **File:** `src/auth.py`\n" + " * **Status:** πŸ”΄ UNSATISFIED\n" + " * **Requirements:**\n" + " * **2 approvals** from team 'tech-council'\n" + " * **Met:** πŸ”΄ No (0/2 approved)\n" + " * **Pending:** Needs 2 approvals from team 'tech-council' (1 eligible reviewer assigned). " + "Waiting for 1 more reviewer(s) to be assigned from team 'tech-council'.\n" + ) + HELP_SUFFIX + self.assertEqual(expected, written_content) + + @patch("builtins.open", new_callable=unittest.mock.mock_open) + def test_write_summary_with_hierarchical_clearance(self, mock_open): + """Test write_summary outputs correct terminology for hierarchical clearance rules.""" + min_team = Team(name="tech-council", level=3) + req = RuleRequirement(min_approvals=1, min_team=min_team) + status = RequirementStatus( + requirement=req, + approved_count=0, + assigned_count=0, + is_satisfied=False, + ) + res = ValidationResult( + is_mergeable=False, + error=ValidationErrorReason.INSUFFICIENT_APPROVALS, + requirement_statuses=[status], + ) + logger = ValidationLogger(res) + logger.write_summary("dummy.txt") + + mock_open.assert_called_once_with("dummy.txt", "w", encoding="utf-8") + handle = mock_open() + written_content = "".join(call.args[0] for call in handle.write.call_args_list) + + expected = ( + "# βš–οΈ Governance Gate Validation Report\n" + "\n" + "## πŸ”΄ FAILURE: Insufficient approvals.\n" + "---\n" + "\n" + "### 🌍 1. GLOBAL PR SUMMARY\n" + "\n" + "Merged requirements across all changed files:\n" + "\n" + "* **1 approval** from team 'tech-council' or higher in the UCP governance hierarchy\n" + " * **Met:** πŸ”΄ No (0/1 approved)\n" + " * **Pending:** Needs 1 approval from team 'tech-council' or higher in the UCP governance hierarchy " + "(0 eligible reviewers assigned). Waiting for 1 more reviewer(s) to be assigned from team " + "'tech-council' or higher in the UCP governance hierarchy.\n" + ) + HELP_SUFFIX + self.assertEqual(expected, written_content) + + +if __name__ == "__main__": + unittest.main()