diff --git a/.github/workflows/reusable-governance.yml b/.github/workflows/reusable-governance.yml index 030bcd9..ad40b7c 100644 --- a/.github/workflows/reusable-governance.yml +++ b/.github/workflows/reusable-governance.yml @@ -17,10 +17,10 @@ name: Reusable Governance Gate on: workflow_call: inputs: - repo-name: - required: true + rules-file: + required: false type: string - description: "Path to the governance rules configuration file" + description: "Optional path to the governance rules configuration file (e.g., .github-central/org-tools/governance/rules/python-sdk-rules.yml). If not provided, resolved via --repo mapping." secrets: ORG_READ_TOKEN: required: true @@ -54,12 +54,18 @@ jobs: run: | set +e + RULES_FILE_ARG="" + if [ -n "${{ inputs.rules-file }}" ]; then + RULES_FILE_ARG="--rules-file ${{ inputs.rules-file }}" + fi + uv run .github-central/org-tools/governance/scripts/pr_validator.py \ --token "${{ secrets.ORG_READ_TOKEN }}" \ --org "${{ github.repository_owner }}" \ --repo "${{ github.repository }}" \ --pr "${{ github.event.pull_request.number }}" \ - --repo-name "${{ inputs.repo-name }}" + $RULES_FILE_ARG + EVAL_EXIT_CODE=$? set -e diff --git a/org-tools/governance/README.md b/org-tools/governance/README.md index 7d2827a..86d6674 100644 --- a/org-tools/governance/README.md +++ b/org-tools/governance/README.md @@ -85,7 +85,7 @@ rules: ### 4. `proxy_reviewers` -A list of GitHub usernames allowed to approve PRs on behalf of any required team. +A list of GitHub usernames allowed to approve PRs on behalf of any required team (see [GOVERNANCE.md](../../GOVERNANCE.md#governing-council-gc) for details on proxy voting rights). ```yaml proxy_reviewers: @@ -121,24 +121,24 @@ When evaluating a pull request: ## CLI Usage -Run the validator CLI command in the root of the checked-out repository: +Run the validator CLI command from this directory: ```bash -python3 org-tools/governance/scripts/pr_validator.py \ +python3 scripts/pr_validator.py \ --token "" \ --org "" \ - --repo "" \ + --repo "" \ --pr \ - --repo-name "" + --rules-file "" ``` ### CLI Arguments: - `--token`: Org-level read token for GitHub REST API calls (requires permission to read team memberships). - `--org`: The GitHub organization name. -- `--repo`: The GitHub repository name. +- `--repo`: The full GitHub repository path (e.g. `Universal-Commerce-Protocol/python-sdk`). If `--rules-file` is not provided, the script resolves the rules file by convention: it looks for a file named `-rules.yml` under `org-tools/governance/rules/` (e.g., `python-sdk-rules.yml`). - `--pr`: The PR number to validate. -- `--repo-name`: A lookup key mapping to the rules configuration file (e.g. `python-sdk` maps to `org-tools/governance/rules/python-sdk-rules.yml`). +- `--rules-file`: Optional path to the governance rules YAML file. If provided, it overrides the convention-based path resolution. --- diff --git a/org-tools/governance/scripts/governance_config_parser.py b/org-tools/governance/scripts/governance_config_parser.py index 9cb2505..fc47051 100644 --- a/org-tools/governance/scripts/governance_config_parser.py +++ b/org-tools/governance/scripts/governance_config_parser.py @@ -19,41 +19,10 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Set +from pr_models import Team, RuleRequirement, merge_requirements import yaml -@dataclass(frozen=True) -class Team: - """Represents a review team group in the hierarchy.""" - - name: str - level: int - - @classmethod - def create(cls, name: str, level: int) -> "Team": - # Normalize to lowercase. GitHub team slugs are case-insensitive. - return cls(name=name.lower(), level=level) - - -@dataclass(frozen=True) -class RuleRequirement: - """Represents a requirement for a rule, specifying min approvals and the target Team.""" - - min_approvals: int - team: Optional[Team] = None - min_team: Optional[Team] = None - - def __post_init__(self): - if self.min_approvals is None or not isinstance(self.min_approvals, int): - raise ValueError("min_approvals must be an integer") - if self.min_approvals <= 0: - raise ValueError("min_approvals must be a positive integer") - if (self.team is None) == (self.min_team is None): - raise ValueError( - "RuleRequirement must specify exactly one of 'team' or 'min_team'" - ) - - @dataclass(frozen=True) class GovernanceRule: """Represents a specific rule with name, file patterns, requirements, and exclusions.""" @@ -117,6 +86,30 @@ class GovernanceConfig: fallback: List[RuleRequirement] proxy_reviewers: Set[str] + def _get_requirements_for_file(self, file_path: str) -> List[RuleRequirement]: + """Get the merged requirements that apply to a specific file.""" + matched_rules = [] + for rule in self.rules: + if rule.matches(file_path): + matched_rules.append(rule) + + if not matched_rules: + return self.fallback + + # De-duplicate rules by name, preserving order + unique_rules = list({r.name: r for r in matched_rules}.values()) + requirements = [] + for r in unique_rules: + requirements.extend(r.requires_all) + + return merge_requirements(requirements) + + def get_applicable_requirements( + self, changed_files: List[str] + ) -> Dict[str, List[RuleRequirement]]: + """Identify all rule requirements that apply to each of the PR's changed files.""" + return {file: self._get_requirements_for_file(file) for file in changed_files} + @dataclass class ValidationError: @@ -184,7 +177,7 @@ def _parse_team_hierarchy(self, data: dict) -> Dict[str, Team]: def _parse_proxy_reviewers(self, data: dict) -> Set[str]: """Parses the set of proxy reviewers.""" - return set(data.get("proxy_reviewers", [])) + return {r.lower() for r in data.get("proxy_reviewers", [])} def _parse_fallback( self, data: dict, teams: Dict[str, Team] diff --git a/org-tools/governance/scripts/pr_models.py b/org-tools/governance/scripts/pr_models.py index 28e8185..8cbc595 100644 --- a/org-tools/governance/scripts/pr_models.py +++ b/org-tools/governance/scripts/pr_models.py @@ -18,8 +18,6 @@ from datetime import datetime from enum import Enum -from governance_config_parser import RuleRequirement - class ReviewState(Enum): """Represents the state of a GitHub pull request review.""" @@ -53,6 +51,104 @@ class MergeableReason(Enum): RULES_SATISFIED = "rules_satisfied" +@dataclass(frozen=True) +class Team: + """Represents a review team group in the hierarchy.""" + + name: str + level: int + + @classmethod + def create(cls, name: str, level: int) -> "Team": + # Normalize to lowercase. GitHub team slugs are case-insensitive. + return cls(name=name.lower(), level=level) + + +@dataclass(frozen=True) +class User: + """Represents a user in the context of the governance rules engine.""" + + username: str + teams: set[str] + level: int + + @classmethod + def create(cls, username: str, memberships: "TeamMemberships") -> "User": + """Resolve a username into a rich User domain object.""" + username_lower = username.lower() + user_teams = { + team + for team, members in memberships.members_by_team.items() + if username_lower in members + } + + # Calculate their max hierarchy level + max_level = max((t.level for t in user_teams), default=0) + + # Map Team objects to their string names for the User dataclass + team_names = {t.name for t in user_teams} + return cls(username=username_lower, teams=team_names, level=max_level) + + +class RequirementTargetType(Enum): + """Represents the type of requirement target (exact team vs. hierarchical min team).""" + + TEAM = "team" + MIN_TEAM = "min_team" + + +@dataclass(frozen=True) +class RuleRequirement: + """Represents a requirement for a rule, specifying min approvals and the target Team.""" + + min_approvals: int + team: Team | None = None + min_team: Team | None = None + + def __post_init__(self): + if self.min_approvals is None or not isinstance(self.min_approvals, int): + raise ValueError("min_approvals must be an integer") + if self.min_approvals <= 0: + raise ValueError("min_approvals must be a positive integer") + if (self.team is None and self.min_team is None) or ( + self.team is not None and self.min_team is not None + ): + raise ValueError( + "RuleRequirement must specify exactly one of 'team' or 'min_team'" + ) + + def is_satisfied_by(self, user: User) -> bool: + """Check if a user satisfies this team or hierarchical level requirement.""" + if self.team is not None: + return self.team.name in user.teams + elif self.min_team is not None: + return user.level >= self.min_team.level + return False + + @property + def target_key(self) -> tuple[RequirementTargetType, str]: + """A unique tuple key representing the target of this requirement.""" + if self.team is not None: + return (RequirementTargetType.TEAM, self.team.name) + elif self.min_team is not None: + return (RequirementTargetType.MIN_TEAM, self.min_team.name) + raise ValueError("Invalid RuleRequirement: both team and min_team are None") + + +def merge_requirements(requirements: list[RuleRequirement]) -> list[RuleRequirement]: + """Merge a list of requirements by keeping the maximum min_approvals for each target.""" + merged: dict[tuple[RequirementTargetType, str], RuleRequirement] = {} + for req in requirements: + key = req.target_key + if key not in merged: + merged[key] = req + else: + existing = merged[key] + if req.min_approvals > existing.min_approvals: + merged[key] = req + return list(merged.values()) + + @dataclass(frozen=True) class Review: """Represents a pull request review with a user and state.""" @@ -79,8 +175,8 @@ class PullRequest: is_draft: bool changed_files: list[str] reviews: list[Review] - assigned_users: list[str] = field(default_factory=list) - assigned_teams: list[str] = field(default_factory=list) + assigned_user_names: list[str] = field(default_factory=list) + assigned_team_names: list[str] = field(default_factory=list) @classmethod def create( @@ -90,38 +186,74 @@ def create( is_draft: bool, changed_files: list[str], reviews: list[Review], - assigned_users: list[str] = None, - assigned_teams: list[str] = None, + assigned_user_names: list[str] = None, + assigned_team_names: list[str] = None, ) -> "PullRequest": # Normalize to lowercase. GitHub usernames and team slugs are case-insensitive. # Reference: https://docs.github.com/en/rest/teams/teams?apiVersion=2026-03-10#get-a-team-by-name - assigned_u = [u.lower() for u in assigned_users] if assigned_users else [] - assigned_t = [t.lower() for t in assigned_teams] if assigned_teams else [] + assigned_u = ( + [u.lower() for u in assigned_user_names] if assigned_user_names else [] + ) + assigned_t = ( + [t.lower() for t in assigned_team_names] if assigned_team_names else [] + ) return cls( number=number, author=author.lower(), is_draft=is_draft, changed_files=changed_files, reviews=reviews, - assigned_users=assigned_u, - assigned_teams=assigned_t, + assigned_user_names=assigned_u, + assigned_team_names=assigned_t, + ) + + @property + def latest_actionable_reviews_by_username(self) -> dict[str, ReviewState]: + """Resolve the latest actionable review state for each user. + + Skips non-actionable states like COMMENTED. + """ + # Sort reviews by submitted_at (if available) to ensure chronological processing + sorted_reviews = sorted( + self.reviews, + key=lambda x: ( + x.submitted_at if x.submitted_at is not None else datetime.min + ), + ) + + relevant_reviews: dict[str, ReviewState] = {} + for r in sorted_reviews: + if r.state in ReviewState.actionable_states(): + relevant_reviews[r.user] = r.state + return relevant_reviews + + def has_proxy_override(self, proxy_reviewers: set[str]) -> bool: + """Check if a proxy reviewer has approved the PR, bypassing rules.""" + latest = self.latest_actionable_reviews_by_username + return any( + state == ReviewState.APPROVED and user in proxy_reviewers + for user, state in latest.items() ) @dataclass(frozen=True) class TeamMemberships: - """Stores a map of team slugs to their members' GitHub usernames.""" + """Stores a map of Team objects to their members' GitHub usernames.""" - members_by_team: dict[str, set[str]] + members_by_team: dict[Team, set[str]] @classmethod - def create(cls, members_by_team: dict[str, set[str]]) -> "TeamMemberships": + def create( + cls, members_by_team: dict[str, set[str]], teams: dict[str, Team] + ) -> "TeamMemberships": # Normalize to lowercase. GitHub usernames and team slugs are case-insensitive. # Reference: https://docs.github.com/en/rest/teams/teams?apiVersion=2026-03-10#get-a-team-by-name - normalized = { - team.lower(): {member.lower() for member in members} - for team, members in members_by_team.items() - } + normalized = {} + for team_slug, members in members_by_team.items(): + team_obj = teams.get(team_slug.lower()) + if team_obj is None: + team_obj = Team.create(name=team_slug, level=0) + normalized[team_obj] = {member.lower() for member in members} return cls(members_by_team=normalized) diff --git a/org-tools/governance/scripts/pr_validator.py b/org-tools/governance/scripts/pr_validator.py index 01ad58e..84d7498 100755 --- a/org-tools/governance/scripts/pr_validator.py +++ b/org-tools/governance/scripts/pr_validator.py @@ -22,8 +22,7 @@ """Pull Request validator using governance rules and team memberships under the Venn Diagram model.""" import argparse -from datetime import datetime -from enum import Enum +import os import sys from github import Auth, Github, GithubException @@ -41,23 +40,91 @@ Review, ReviewState, TeamMemberships, + User, ValidationErrorReason, ValidationResult, + merge_requirements, ) from validation_logger import ValidationLogger -class RepoName(Enum): - """Supported repository names mapping to governance rules.""" +class GitHubClient: + """Handles all interactions with the GitHub API.""" - UCP = "ucp" - PYTHON_SDK = "python-sdk" + def __init__(self, g: Github): + self.g = g + def fetch_team_memberships( + self, org_name: str, config: GovernanceConfig + ) -> TeamMemberships: + """Fetch team membership lists from GitHub. -REPO_RULES_MAPPING = { - RepoName.UCP: ".github-central/org-tools/governance/rules/ucp-rules.yml", - RepoName.PYTHON_SDK: ".github-central/org-tools/governance/rules/python-sdk-rules.yml", -} + Gets memberships for all teams in the hierarchy. + """ + members_by_team = {} + try: + org = self.g.get_organization(org_name) + except GithubException as e: + raise RuntimeError(f"Failed to fetch organization '{org_name}': {e}") from e + + for team_slug in config.teams: + try: + team = org.get_team_by_slug(team_slug) + members = {m.login for m in team.get_members()} + members_by_team[team_slug] = members + except GithubException as e: + raise RuntimeError( + f"Could not fetch members for team '{team_slug}': {e}" + ) from e + return TeamMemberships.create( + members_by_team=members_by_team, teams=config.teams + ) + + def fetch_pull_request(self, repo_name: str, pr_number: int) -> PullRequest: + """Fetch PR files, reviews, and requested reviewers from GitHub. + + Maps them to the PullRequest dataclass. + """ + try: + repo = self.g.get_repo(repo_name) + pr = repo.get_pull(pr_number) + + # Fetch changed files + changed_files = [f.filename for f in pr.get_files()] + + # Fetch reviews + reviews = [] + for r in pr.get_reviews(): + # Map string state to ReviewState enum + try: + state = ( + ReviewState(r.state.lower()) if r.state else ReviewState.UNKNOWN + ) + except ValueError: + # Fall back to UNKNOWN for unsupported or unknown states + state = ReviewState.UNKNOWN + reviews.append( + Review.create( + user=r.user.login, state=state, submitted_at=r.submitted_at + ) + ) + + # Fetch review requests (assigned users and teams) + review_requests = pr.get_review_requests() + assigned_user_names = [u.login for u in review_requests[0]] + assigned_team_names = [t.slug for t in review_requests[1]] + + return PullRequest.create( + number=pr.number, + author=pr.user.login, + is_draft=pr.draft, + changed_files=changed_files, + reviews=reviews, + assigned_user_names=assigned_user_names, + assigned_team_names=assigned_team_names, + ) + except GithubException as e: + raise RuntimeError(f"Failed to fetch PR info from GitHub: {e}") from e class PullRequestValidator: @@ -77,10 +144,10 @@ def validate(self, pr: PullRequest) -> ValidationResult: ) # 2. Extract Latest Reviews - latest_reviews = self._get_relevant_reviews_by_user(pr.reviews) + latest_reviews = pr.latest_actionable_reviews_by_username # 3. Proxy Voter Override Check - if self._has_proxy_override(latest_reviews): + if pr.has_proxy_override(self.config.proxy_reviewers): return ValidationResult( is_mergeable=True, mergeable_reason=MergeableReason.PROXY_OVERRIDE, @@ -94,10 +161,29 @@ def validate(self, pr: PullRequest) -> ValidationResult: ) # 5. Rule Matching - applicable_requirements = self._get_applicable_requirements(pr.changed_files) + requirements_by_file = self.config.get_applicable_requirements(pr.changed_files) + unmerged_requirements = [] + for reqs in requirements_by_file.values(): + unmerged_requirements.extend(reqs) + merged_requirements = merge_requirements(unmerged_requirements) + + # 6. Resolve Reviews and Assignments once + approver_usernames, assigned_usernames = ( + self._get_all_approvers_and_assigned_usernames(pr) + ) - # 6. Changes Requested Check - authorized_reviewers = self._get_authorized_reviewers(applicable_requirements) + # 7. Approvals & Assignments Evaluation (Global) + requirement_statuses = self._evaluate_requirements( + merged_requirements, approver_usernames, assigned_usernames + ) + + # 8. File-by-File Evaluation + file_statuses = self._evaluate_file_statuses( + requirements_by_file, approver_usernames, assigned_usernames + ) + + # 9. Changes Requested Check + authorized_reviewers = self._get_authorized_reviewers(merged_requirements) authorized_and_proxy_reviewers = set(authorized_reviewers) | set( self.config.proxy_reviewers ) @@ -108,17 +194,11 @@ def validate(self, pr: PullRequest) -> ValidationResult: return ValidationResult( is_mergeable=False, error=ValidationErrorReason.CHANGES_REQUESTED, + requirement_statuses=requirement_statuses, + file_statuses=file_statuses, ) - # 7. Approvals & Assignments Evaluation (Global) - requirement_statuses = self._evaluate_requirements( - pr, latest_reviews, applicable_requirements - ) - - # 8. File-by-File Evaluation - file_statuses = self._evaluate_file_statuses(pr, latest_reviews) - - # 9. Determine overall mergeability + # 10. Determine overall mergeability is_mergeable = not any( not status.is_satisfied for status in requirement_statuses ) @@ -138,337 +218,108 @@ def validate(self, pr: PullRequest) -> ValidationResult: file_statuses=file_statuses, ) - def _evaluate_file_statuses( + def _evaluate_requirement( self, - pr: PullRequest, - latest_reviews: dict[str, ReviewState], - ) -> list[FileValidationStatus]: - """Evaluates and generates validation statuses for each changed file in the PR.""" - requested_users_set = set(pr.assigned_users) - for team in pr.assigned_teams: - requested_users_set.update( - self.memberships.members_by_team.get(team, set()) - ) - - valid_approvals_set = { - user - for user, state in latest_reviews.items() - if state == ReviewState.APPROVED - and (user != pr.author or pr.author in self.config.proxy_reviewers) - } - - file_statuses = [] - for file in pr.changed_files: - # Find rules matching this specific file - file_rules = [] - for rule in self.config.rules: - if rule.matches(file): - file_rules.append(rule) - - file_requirements = [] - if file_rules: - unique_file_rules = list({r.name: r for r in file_rules}.values()) - for r in unique_file_rules: - file_requirements.extend(r.requires_all) - else: - file_requirements.extend(self.config.fallback) - - # Merge file-specific requirements - file_requirements = self._merge_requirements(file_requirements) - - file_req_statuses = [] - file_satisfied = True - for req in file_requirements: - # Under the Venn model, all valid approvals satisfying req are approvers - approvers = [ - user - for user in valid_approvals_set - if self._is_user_authorized_for_requirement(user, req) - ] - approved_count = len(approvers) - # Count assigned eligible reviewers for this requirement - assigned_count = sum( - 1 - for assigned_user in requested_users_set - if self._is_user_authorized_for_requirement(assigned_user, req) - ) - - # Check if this requirement is satisfied for this file - req_satisfied = approved_count >= req.min_approvals - if not req_satisfied: - file_satisfied = False - - file_req_statuses.append( - RequirementStatus( - requirement=req, - approved_count=approved_count, - assigned_count=assigned_count, - is_satisfied=req_satisfied, - approvers=sorted(approvers), - ) - ) - - file_statuses.append( - FileValidationStatus( - file_path=file, - requirement_statuses=file_req_statuses, - is_satisfied=file_satisfied, - ) - ) - return file_statuses - - def _has_proxy_override(self, latest_reviews: dict[str, ReviewState]) -> bool: - """Check if a proxy reviewer has approved the PR, bypassing rules.""" - return any( - state == ReviewState.APPROVED and user in self.config.proxy_reviewers - for user, state in latest_reviews.items() - ) - - def _get_applicable_requirements( - self, changed_files: list[str] - ) -> list[RuleRequirement]: - """Identify all rule requirements that apply to the PR's changed files. - - If multiple rules match, their requirements are merged by keeping the - maximum min_approvals for each unique target team/level to prevent - redundant approval demands. - """ - matched_rules = [] - has_fallback = False - - for file in changed_files: - matched_any = False - for rule in self.config.rules: - if rule.matches(file): - matched_rules.append(rule) - matched_any = True - if not matched_any: - has_fallback = True - - # De-duplicate matched rules by name, preserving insertion order - unique_rules = list({r.name: r for r in matched_rules}.values()) - - requirements = [] - for r in unique_rules: - requirements.extend(r.requires_all) - if has_fallback: - requirements.extend(self.config.fallback) - - return self._merge_requirements(requirements) - - def _merge_requirements( - self, requirements: list[RuleRequirement] - ) -> list[RuleRequirement]: - """Merge a list of requirements by keeping the maximum min_approvals for each target.""" - merged: dict[tuple[str, str], RuleRequirement] = {} - for req in requirements: - if req.team is not None: - key = ("team", req.team.name) - elif req.min_team is not None: - key = ("min_team", req.min_team.name) - else: - continue - - if key not in merged: - merged[key] = req - else: - existing = merged[key] - if req.min_approvals > existing.min_approvals: - merged[key] = req - - return list(merged.values()) - - def _get_relevant_reviews_by_user( - self, reviews: list[Review] - ) -> dict[str, ReviewState]: - """Resolve the latest review state for each user. - - Skips non-actionable states like COMMENTED. - """ - # Sort reviews by submitted_at (if available) to ensure chronological processing - sorted_reviews = sorted( - reviews, - key=lambda x: ( - x.submitted_at if x.submitted_at is not None else datetime.min - ), + req: RuleRequirement, + approver_usernames: set[str], + requested_users_set: set[str], + ) -> RequirementStatus: + """Calculate and return status for a requirement under the Venn Diagram model.""" + approver_users = [User.create(u, self.memberships) for u in approver_usernames] + assigned_users = [User.create(u, self.memberships) for u in requested_users_set] + + approvers = [u.username for u in approver_users if req.is_satisfied_by(u)] + assigned_count = sum(1 for u in assigned_users if req.is_satisfied_by(u)) + approved_count = len(approvers) + return RequirementStatus( + requirement=req, + approved_count=approved_count, + assigned_count=assigned_count, + is_satisfied=approved_count >= req.min_approvals, + approvers=sorted(approvers), ) - relevant_reviews: dict[str, ReviewState] = {} - for r in sorted_reviews: - if r.state in ReviewState.actionable_states(): - relevant_reviews[r.user] = r.state - return relevant_reviews - - def _get_user_level(self, user: str) -> int: - """Calculate the user's hierarchy level. - - Uses the max level of any team they belong to. - """ - max_level = 0 - for team, members in self.memberships.members_by_team.items(): - if user in members: - team_info = self.config.teams.get(team) - level = team_info.level if team_info is not None else 0 - if level > max_level: - max_level = level - return max_level - - def _is_user_authorized_for_requirement( - self, user: str, req: RuleRequirement - ) -> bool: - """Check if a user satisfies the team or hierarchical level requirement.""" - if req.team is not None: - team_members = self.memberships.members_by_team.get(req.team.name, set()) - return user in team_members - elif req.min_team is not None: - min_level = req.min_team.level - user_level = self._get_user_level(user) - return user_level >= min_level - return False - - def _get_authorized_reviewers( - self, requirements: list[RuleRequirement] - ) -> set[str]: - """Retrieve users who are authorized to satisfy any of the requirements. - - Gets them from team memberships. - """ - all_users = set() - for members in self.memberships.members_by_team.values(): - all_users.update(members) - - authorized = set() - for user in all_users: - for req in requirements: - if self._is_user_authorized_for_requirement(user, req): - authorized.add(user) - break - return authorized - def _evaluate_requirements( self, - pr: PullRequest, - latest_reviews: dict[str, ReviewState], requirements: list[RuleRequirement], + approver_usernames: set[str], + assigned_usernames: set[str], ) -> list[RequirementStatus]: """Evaluate each requirement's approvals and assignments count under the Venn Diagram model.""" - # Filter valid approvals (excluding author's self-approvals unless - # they are a proxy) - valid_approvals = { - user - for user, state in latest_reviews.items() - if state == ReviewState.APPROVED - and (user != pr.author or pr.author in self.config.proxy_reviewers) - } - - # Resolve all unique users currently requested to review (individually - # or via team) - requested_users_set = set(pr.assigned_users) - for team in pr.assigned_teams: - requested_users_set.update( - self.memberships.members_by_team.get(team, set()) - ) - requirement_statuses = [] for req in requirements: - # Under the Venn model, all valid approvals satisfying req are approvers - approvers = [ - user - for user in valid_approvals - if self._is_user_authorized_for_requirement(user, req) - ] - approved_count = len(approvers) - # Count assigned (requested eligible reviewers) independently - assigned_count = sum( - 1 - for assigned_user in requested_users_set - if self._is_user_authorized_for_requirement(assigned_user, req) - ) - requirement_statuses.append( - RequirementStatus( - requirement=req, - approved_count=approved_count, - assigned_count=assigned_count, - is_satisfied=approved_count >= req.min_approvals, - approvers=sorted(approvers), - ) + status = self._evaluate_requirement( + req, approver_usernames, assigned_usernames ) + requirement_statuses.append(status) return requirement_statuses + def _get_authorized_reviewers( + self, requirements: list[RuleRequirement] + ) -> set[str]: + """Retrieve users who are authorized to satisfy any of the requirements.""" + all_users = { + user + for members in self.memberships.members_by_team.values() + for user in members + } -def fetch_team_memberships( - g: Github, org_name: str, config: GovernanceConfig -) -> TeamMemberships: - """Fetch team membership lists from GitHub. - - Gets memberships for all teams in the hierarchy. - """ - members_by_team = {} - try: - org = g.get_organization(org_name) - except GithubException as e: - raise RuntimeError(f"Failed to fetch organization '{org_name}': {e}") from e - - for team_slug in config.teams: - try: - team = org.get_team_by_slug(team_slug) - members = {m.login for m in team.get_members()} - members_by_team[team_slug] = members - except GithubException as e: - raise RuntimeError( - f"Could not fetch members for team '{team_slug}': {e}" - ) from e - return TeamMemberships.create(members_by_team=members_by_team) - + authorized = set() + for username in all_users: + user = User.create(username, self.memberships) + for req in requirements: + if req.is_satisfied_by(user): + authorized.add(username) + break + return authorized -def fetch_pull_request(g: Github, repo_name: str, pr_number: int) -> PullRequest: - """Fetch PR files, reviews, and requested reviewers from GitHub. + def _get_all_approvers_and_assigned_usernames( + self, pr: PullRequest + ) -> tuple[set[str], set[str]]: + """Resolve valid approvals set and requested reviewers set for a PR.""" + assigned = set(pr.assigned_user_names) + for team in pr.assigned_team_names: + if team_obj := self.config.teams.get(team): + assigned.update(self.memberships.members_by_team.get(team_obj, set())) + + approvers = set() + for user, state in pr.latest_actionable_reviews_by_username.items(): + if state == ReviewState.APPROVED: + if user != pr.author: + approvers.add(user) + else: + assigned.add(user) - Maps them to the PullRequest dataclass. - """ - try: - repo = g.get_repo(repo_name) - pr = repo.get_pull(pr_number) + assigned.difference_update(approvers) + return approvers, assigned - # Fetch changed files - changed_files = [f.filename for f in pr.get_files()] + def _evaluate_file_statuses( + self, + requirements_by_file: dict[str, list[RuleRequirement]], + approver_usernames: set[str], + assigned_usernames: set[str], + ) -> list[FileValidationStatus]: + """Evaluates and generates validation statuses for each changed file in the PR.""" + file_statuses = [] + for file, file_requirements in requirements_by_file.items(): + file_req_statuses = self._evaluate_requirements( + file_requirements, approver_usernames, assigned_usernames + ) + file_satisfied = all(status.is_satisfied for status in file_req_statuses) - # Fetch reviews - reviews = [] - for r in pr.get_reviews(): - # Map string state to ReviewState enum - try: - state = ReviewState(r.state.lower()) if r.state else ReviewState.UNKNOWN - except ValueError: - # Fall back to UNKNOWN for unsupported or unknown states - state = ReviewState.UNKNOWN - reviews.append( - Review.create( - user=r.user.login, state=state, submitted_at=r.submitted_at + file_statuses.append( + FileValidationStatus( + file_path=file, + requirement_statuses=file_req_statuses, + is_satisfied=file_satisfied, ) ) - - # Fetch review requests (assigned users and teams) - review_requests = pr.get_review_requests() - assigned_users = [u.login for u in review_requests[0]] - assigned_teams = [t.slug for t in review_requests[1]] - - return PullRequest.create( - number=pr.number, - author=pr.user.login, - is_draft=pr.draft, - changed_files=changed_files, - reviews=reviews, - assigned_users=assigned_users, - assigned_teams=assigned_teams, - ) - except GithubException as e: - raise RuntimeError(f"Failed to fetch PR info from GitHub: {e}") from e + return file_statuses -def main() -> None: - """Run the governance gate check on a pull request.""" +def parse_args(args: list[str] | None = None) -> argparse.Namespace: + """Parse command line arguments.""" parser = argparse.ArgumentParser( description=( "Run governance gate check on a pull request using PullRequestValidator." @@ -485,45 +336,51 @@ def main() -> None: ) parser.add_argument("--pr", type=int, required=True, help="Pull Request number.") parser.add_argument( - "--repo-name", - required=True, - help="The name of the repository (e.g. 'ucp').", + "--rules-file", + help="Path to the governance rules YAML file. If not provided, resolved via --repo mapping.", ) - args = parser.parse_args() - - try: - repo_enum = RepoName(args.repo_name) - except ValueError: - print( - f"❌ ERROR: Invalid repository name '{args.repo_name}'. Must be one of: " - f"{[e.value for e in RepoName]}", - file=sys.stderr, + return parser.parse_args(args) + + +def run_validation(args: argparse.Namespace) -> ValidationResult: + """Execute the core validation flow.""" + if args.rules_file: + rules_file = args.rules_file + else: + # Resolve rules file path by convention: rules/-rules.yml + repo_suffix = args.repo.split("/")[-1] + rules_file = ( + f".github-central/org-tools/governance/rules/{repo_suffix}-rules.yml" ) - sys.exit(1) - rules_file = REPO_RULES_MAPPING.get(repo_enum) - if not rules_file: - print( - f"❌ ERROR: No governance rules mapped for repository '{repo_enum.value}'.", - file=sys.stderr, + if not os.path.exists(rules_file): + raise FileNotFoundError( + f"Governance rules file not found at '{rules_file}'. " + f"Please ensure it exists or specify a custom path using --rules-file." ) - sys.exit(1) - try: - # 1. Load config - config = GovernanceConfigParser().parse_file(rules_file) + # 1. Load config + config = GovernanceConfigParser().parse_file(rules_file) + + # 2. Authenticate and fetch data + auth = Auth.Token(args.token) + g = Github(auth=auth) + github_client = GitHubClient(g) - # 2. Authenticate and fetch data - auth = Auth.Token(args.token) - g = Github(auth=auth) + # 3. Fetch team memberships & PR details + memberships = github_client.fetch_team_memberships(args.org, config) + pr = github_client.fetch_pull_request(args.repo, args.pr) - # 3. Fetch team memberships & PR details - memberships = fetch_team_memberships(g, args.org, config) - pr = fetch_pull_request(g, args.repo, args.pr) + # 4. Validate PR + validator = PullRequestValidator(config, memberships) + return validator.validate(pr) - # 4. Validate PR - validator = PullRequestValidator(config, memberships) - result = validator.validate(pr) + +def main() -> None: + """Run the governance gate check on a pull request.""" + try: + args = parse_args() + result = run_validation(args) # 5. Output result and format messages logger = ValidationLogger(result) diff --git a/org-tools/governance/tests/test_pr_models.py b/org-tools/governance/tests/test_pr_models.py index 5c5084b..7e54a2e 100644 --- a/org-tools/governance/tests/test_pr_models.py +++ b/org-tools/governance/tests/test_pr_models.py @@ -27,6 +27,11 @@ Review, ReviewState, TeamMemberships, + RuleRequirement, + Team, + RequirementTargetType, + User, + merge_requirements, ) @@ -47,8 +52,8 @@ def test_pull_request_lowercase_normalization(self): is_draft=False, changed_files=["README.md"], reviews=[review_input], - assigned_users=["Dave", "EVE"], - assigned_teams=["Tech-Council", "ADMINS"], + assigned_user_names=["Dave", "EVE"], + assigned_team_names=["Tech-Council", "ADMINS"], ) expected = PullRequest( @@ -57,8 +62,8 @@ def test_pull_request_lowercase_normalization(self): is_draft=False, changed_files=["README.md"], reviews=[Review(user="bob", state=ReviewState.APPROVED)], - assigned_users=["dave", "eve"], - assigned_teams=["tech-council", "admins"], + assigned_user_names=["dave", "eve"], + assigned_team_names=["tech-council", "admins"], ) self.assertEqual(expected, pr) @@ -68,16 +73,73 @@ def test_team_memberships_lowercase_normalization(self): members_by_team={ "Tech-Council": {"Alice", "BOB"}, "ADMINS": {"charlie", "Dave"}, - } + }, + teams={ + "tech-council": Team.create("tech-council", 3), + "admins": Team.create("admins", 2), + }, ) self.assertEqual( memberships.members_by_team, { + Team.create("tech-council", 3): {"alice", "bob"}, + Team.create("admins", 2): {"charlie", "dave"}, + }, + ) + + def test_user_create_factory(self): + """Test that User.create resolves teams, level, and normalizes username.""" + teams = { + "tech-council": Team.create("tech-council", 3), + "admins": Team.create("admins", 2), + } + memberships = TeamMemberships.create( + members_by_team={ "tech-council": {"alice", "bob"}, - "admins": {"charlie", "dave"}, + "admins": {"bob", "charlie"}, }, + teams=teams, ) + # User in multiple teams, max level should be the highest (3) + user_bob = User.create("BOB", memberships) + self.assertEqual(user_bob.username, "bob") + self.assertEqual(user_bob.teams, {"tech-council", "admins"}) + self.assertEqual(user_bob.level, 3) + + # User in one team + user_alice = User.create("Alice", memberships) + self.assertEqual(user_alice.username, "alice") + self.assertEqual(user_alice.teams, {"tech-council"}) + self.assertEqual(user_alice.level, 3) + + # User in no teams + user_dave = User.create("dave", memberships) + self.assertEqual(user_dave.username, "dave") + self.assertEqual(user_dave.teams, set()) + self.assertEqual(user_dave.level, 0) + + def test_merge_requirements(self): + """Test that merge_requirements correctly merges requirements.""" + team_devops = Team.create(name="devops", level=1) + team_admin = Team.create(name="admins", level=2) + + req1 = RuleRequirement(min_approvals=1, team=team_devops) + req2 = RuleRequirement(min_approvals=3, team=team_devops) + req3 = RuleRequirement(min_approvals=2, team=team_admin) + + merged = merge_requirements([req1, req2, req3]) + + self.assertEqual(len(merged), 2) + + devops_req = next(r for r in merged if r.team == team_devops) + admin_req = next(r for r in merged if r.team == team_admin) + + self.assertEqual(devops_req.min_approvals, 3) + self.assertEqual(admin_req.min_approvals, 2) + self.assertEqual(devops_req.target_key, (RequirementTargetType.TEAM, "devops")) + self.assertEqual(admin_req.target_key, (RequirementTargetType.TEAM, "admins")) + if __name__ == "__main__": unittest.main() diff --git a/org-tools/governance/tests/test_pr_validator.py b/org-tools/governance/tests/test_pr_validator.py index 8795b75..cd5ef1e 100644 --- a/org-tools/governance/tests/test_pr_validator.py +++ b/org-tools/governance/tests/test_pr_validator.py @@ -40,7 +40,8 @@ def __init__(self, status=None, data=None, headers=None): from pr_validator import ( # noqa: E402 PullRequestValidator, - fetch_team_memberships, + GitHubClient, + main, ) from pr_models import ( # noqa: E402 MergeableReason, @@ -111,7 +112,7 @@ def setUp(self): ) # 6. Setup memberships - self.memberships = TeamMemberships( + self.memberships = TeamMemberships.create( members_by_team={ "devops": {"dev1", "dev2"}, "maintainers": {"maint1", "maint2", "tc-member1"}, @@ -121,7 +122,8 @@ def setUp(self): "gov-member2", "proxy1", }, - } + }, + teams=self.hierarchy, ) # 7. Setup validator @@ -285,6 +287,8 @@ def test_changes_requested_blocks(self): res = self.validator.validate(pr) self.assertFalse(res.is_mergeable) self.assertEqual(res.error, ValidationErrorReason.CHANGES_REQUESTED) + self.assertEqual(len(res.requirement_statuses), 1) + self.assertEqual(len(res.file_statuses), 1) def test_unauthorized_changes_requested_does_not_block(self): """Test that unauthorized changes requested do not block validation.""" @@ -316,6 +320,8 @@ def test_proxy_changes_requested_blocks(self): res = self.validator.validate(pr) self.assertFalse(res.is_mergeable) self.assertEqual(res.error, ValidationErrorReason.CHANGES_REQUESTED) + self.assertEqual(len(res.requirement_statuses), 1) + self.assertEqual(len(res.file_statuses), 1) def test_self_approval_restrictions(self): """Test restrictions on self-approval.""" @@ -333,20 +339,6 @@ def test_self_approval_restrictions(self): self.assertFalse(res.is_mergeable) self.assertEqual(res.error, ValidationErrorReason.INSUFFICIENT_APPROVALS) - # Proxy voter proxy1 can approve their own PR - pr_proxy = PullRequest( - number=1, - author="proxy1", - is_draft=False, - changed_files=["source/main.py"], - reviews=[ - Review(user="proxy1", state=ReviewState.APPROVED), - ], - ) - res_proxy = self.validator.validate(pr_proxy) - self.assertTrue(res_proxy.is_mergeable) - self.assertEqual(res_proxy.mergeable_reason, MergeableReason.PROXY_OVERRIDE) - def test_dismissed_reviews(self): """Test that dismissed reviews are handled correctly.""" pr = PullRequest( @@ -466,8 +458,8 @@ def test_requirement_status_reporting(self): is_draft=False, changed_files=["source/main.py"], reviews=[], - assigned_users=["tc-member2"], - assigned_teams=[], + assigned_user_names=["tc-member2"], + assigned_team_names=[], ) res = self.validator.validate(pr) self.assertFalse(res.is_mergeable) @@ -481,6 +473,42 @@ def test_requirement_status_reporting(self): self.assertEqual(status.assigned_count, 1) self.assertFalse(status.is_satisfied) + def test_assigned_team_expansion_reporting(self): + """Test that assigned teams are expanded to their eligible members in status reporting.""" + # We check source/main.py (requires 1 from min_team tech-council) + # We assign the tech-council team to review. + # Members of tech-council are tc-member1 and tc-member2. + # Both are eligible reviewers. + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["source/main.py"], + reviews=[], + assigned_user_names=[], + assigned_team_names=["tech-council"], + ) + res = self.validator.validate(pr) + self.assertFalse(res.is_mergeable) + self.assertEqual(res.error, ValidationErrorReason.INSUFFICIENT_APPROVALS) + self.assertEqual(len(res.requirement_statuses), 1) + + status = res.requirement_statuses[0] + self.assertEqual(status.requirement.min_team, self.hierarchy["tech-council"]) + self.assertEqual(status.requirement.min_approvals, 1) + self.assertEqual(status.approved_count, 0) + # assigned_count should be 2 because tech-council has 2 members (tc-member1, tc-member2) + # and both satisfy the min_team hierarchy requirement. + self.assertEqual(status.assigned_count, 2) + self.assertFalse(status.is_satisfied) + + # Directly verify the internal helper resolves the specific usernames + approvers, assigned = self.validator._get_all_approvers_and_assigned_usernames( + pr + ) + self.assertEqual(approvers, set()) + self.assertEqual(assigned, {"tc-member1", "tc-member2"}) + def test_governance_config_parser(self): """Test parsing of governance configuration.""" yaml_data = { @@ -625,9 +653,68 @@ def test_merge_requirements_multiple_rules(self): self.assertTrue(res.is_mergeable) self.assertEqual(res.mergeable_reason, MergeableReason.RULES_SATISFIED) + def test_one_approved_one_changes_requested_assigned_count(self): + """Test that a user who has requested changes counts towards assigned_count, while an approver does not.""" + # 1. Setup a custom hierarchy and config requiring 2 approvals + hierarchy = { + "maintainers": Team(name="maintainers", level=2), + } + rules = [ + GovernanceRule( + name="Test Rule", + patterns=["*"], + requires_all=[ + RuleRequirement(min_approvals=2, min_team=hierarchy["maintainers"]) + ], + ) + ] + config = GovernanceConfig( + teams=hierarchy, + rules=rules, + fallback=[], + proxy_reviewers=set(), + ) + memberships = TeamMemberships.create( + members_by_team={ + "maintainers": {"maint1", "maint2", "maint3"}, + }, + teams=hierarchy, + ) + validator = PullRequestValidator(config, memberships) + + # PR where: + # - maint1 has approved + # - maint2 has requested changes + # both are in 'maintainers'. + pr = PullRequest( + number=1, + author="author1", + is_draft=False, + changed_files=["file.txt"], + reviews=[ + Review(user="maint1", state=ReviewState.APPROVED), + Review(user="maint2", state=ReviewState.CHANGES_REQUESTED), + ], + ) + + res = validator.validate(pr) + self.assertFalse(res.is_mergeable) + self.assertEqual(res.error, ValidationErrorReason.CHANGES_REQUESTED) + + self.assertEqual(len(res.requirement_statuses), 1) + status = res.requirement_statuses[0] + self.assertEqual(status.requirement.min_team, hierarchy["maintainers"]) + self.assertEqual(status.requirement.min_approvals, 2) + self.assertEqual(status.approved_count, 1) + self.assertEqual( + status.assigned_count, 1 + ) # Only maint2 is assigned/active and not approved + self.assertFalse(status.is_satisfied) + self.assertEqual(status.approvers, ["maint1"]) + class TestFetchTeamMemberships(unittest.TestCase): - """Tests for fetch_team_memberships function.""" + """Tests for GitHubClient.fetch_team_memberships method.""" def test_fetch_success(self): """Test fetch_team_memberships successfully fetches team memberships.""" @@ -666,13 +753,14 @@ def get_team_side_effect(slug): proxy_reviewers=set(), ) - memberships = fetch_team_memberships(mock_github, "my-org", config) + github_client = GitHubClient(mock_github) + memberships = github_client.fetch_team_memberships("my-org", config) self.assertEqual( memberships.members_by_team, { - "devops": {"user1", "user2"}, - "maintainers": {"user3"}, + Team("devops", 1): {"user1", "user2"}, + Team("maintainers", 2): {"user3"}, }, ) mock_github.get_organization.assert_called_once_with("my-org") @@ -691,8 +779,9 @@ def test_fetch_org_fails(self): proxy_reviewers=set(), ) + github_client = GitHubClient(mock_github) with self.assertRaises(RuntimeError) as ctx: - fetch_team_memberships(mock_github, "my-org", config) + github_client.fetch_team_memberships("my-org", config) self.assertIn("Failed to fetch organization 'my-org'", str(ctx.exception)) @@ -713,8 +802,9 @@ def test_fetch_team_fails(self): proxy_reviewers=set(), ) + github_client = GitHubClient(mock_github) with self.assertRaises(RuntimeError) as ctx: - fetch_team_memberships(mock_github, "my-org", config) + github_client.fetch_team_memberships("my-org", config) self.assertIn("Could not fetch members for team 'devops'", str(ctx.exception)) @@ -724,57 +814,117 @@ class TestPRValidatorMain(unittest.TestCase): @patch("pr_validator.sys.exit") @patch("pr_validator.print") + @patch("pr_validator.os.path.exists") @patch("pr_validator.argparse.ArgumentParser.parse_args") - def test_main_invalid_repo_name(self, mock_parse_args, mock_print, mock_exit): - """Test main fails when repo name is invalid.""" - from pr_validator import main - + def test_main_missing_rules_file_fallback( + self, mock_parse_args, mock_exists, mock_print, mock_exit + ): + """Test main fails when resolved convention rules file does not exist.""" mock_args = MagicMock() - mock_args.repo_name = "invalid-repo" + mock_args.repo = "Universal-Commerce-Protocol/non-existent-repo" + mock_args.rules_file = None mock_parse_args.return_value = mock_args + mock_exists.return_value = False mock_exit.side_effect = SystemExit(1) - with self.assertRaises(SystemExit) as cm: + with self.assertRaises(SystemExit): main() - self.assertEqual(cm.exception.code, 1) mock_exit.assert_called_once_with(1) mock_print.assert_any_call( - "❌ ERROR: Invalid repository name 'invalid-repo'. Must be one of: ['ucp', 'python-sdk']", + "❌ ERROR: Governance rules file not found at '.github-central/org-tools/governance/rules/non-existent-repo-rules.yml'. Please ensure it exists or specify a custom path using --rules-file.", file=sys.stderr, ) @patch("pr_validator.sys.exit") @patch("pr_validator.ValidationLogger") @patch("pr_validator.PullRequestValidator") - @patch("pr_validator.fetch_pull_request") - @patch("pr_validator.fetch_team_memberships") + @patch("pr_validator.GitHubClient") @patch("pr_validator.Github") @patch("pr_validator.Auth.Token") + @patch("pr_validator.os.path.exists") @patch("pr_validator.GovernanceConfigParser") @patch("pr_validator.argparse.ArgumentParser.parse_args") - def test_main_valid_repo_name( + def test_main_convention_success( self, mock_parse_args, mock_parser_class, + mock_exists, mock_token, mock_github_class, - mock_fetch_members, - mock_fetch_pr, + mock_github_client_class, mock_validator_class, mock_logger_class, mock_exit, ): - """Test main succeeds and parses file with valid repo name.""" - from pr_validator import main, RepoName, REPO_RULES_MAPPING + """Test main succeeds and resolves rules file path by convention when rules-file is not provided.""" + mock_args = MagicMock() + mock_args.token = "token" + mock_args.org = "org" + mock_args.repo = "Universal-Commerce-Protocol/python-sdk" + mock_args.pr = 123 + mock_args.rules_file = None + mock_parse_args.return_value = mock_args + mock_exists.return_value = True + mock_exit.side_effect = SystemExit(0) + + # Mock parse_file to return a dummy config + mock_parser = MagicMock() + mock_parser_class.return_value = mock_parser + dummy_config = MagicMock() + mock_parser.parse_file.return_value = dummy_config + + # Mock gateway + mock_gateway = MagicMock() + mock_github_client_class.return_value = mock_gateway + # Mock validation result to be mergeable + mock_validator = MagicMock() + mock_validator_class.return_value = mock_validator + mock_result = MagicMock() + mock_result.is_mergeable = True + mock_result.mergeable_reason = None + mock_validator.validate.return_value = mock_result + + with self.assertRaises(SystemExit): + main() + + # Check parse_file was called with the convention-resolved path + mock_parser.parse_file.assert_called_once_with( + ".github-central/org-tools/governance/rules/python-sdk-rules.yml" + ) + mock_exit.assert_called_once_with(0) + + @patch("pr_validator.sys.exit") + @patch("pr_validator.ValidationLogger") + @patch("pr_validator.PullRequestValidator") + @patch("pr_validator.GitHubClient") + @patch("pr_validator.Github") + @patch("pr_validator.Auth.Token") + @patch("pr_validator.os.path.exists") + @patch("pr_validator.GovernanceConfigParser") + @patch("pr_validator.argparse.ArgumentParser.parse_args") + def test_main_with_rules_file( + self, + mock_parse_args, + mock_parser_class, + mock_exists, + mock_token, + mock_github_class, + mock_github_client_class, + mock_validator_class, + mock_logger_class, + mock_exit, + ): + """Test main succeeds with a custom rules file, bypassing repo mapping.""" mock_args = MagicMock() mock_args.token = "token" mock_args.org = "org" - mock_args.repo = "repo" + mock_args.repo = "Universal-Commerce-Protocol/arbitrary-repo" mock_args.pr = 123 - mock_args.repo_name = "ucp" + mock_args.rules_file = "custom-rules.yml" mock_parse_args.return_value = mock_args + mock_exists.return_value = True mock_exit.side_effect = SystemExit(0) # Mock parse_file to return a dummy config @@ -783,6 +933,10 @@ def test_main_valid_repo_name( dummy_config = MagicMock() mock_parser.parse_file.return_value = dummy_config + # Mock gateway + mock_gateway = MagicMock() + mock_github_client_class.return_value = mock_gateway + # Mock validation result to be mergeable mock_validator = MagicMock() mock_validator_class.return_value = mock_validator @@ -795,8 +949,8 @@ def test_main_valid_repo_name( main() self.assertEqual(cm.exception.code, 0) - # Check parse_file was called with the mapped rules file - mock_parser.parse_file.assert_called_once_with(REPO_RULES_MAPPING[RepoName.UCP]) + # Check parse_file was called with the custom rules file + mock_parser.parse_file.assert_called_once_with("custom-rules.yml") mock_exit.assert_called_once_with(0)