Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions .github/workflows/reusable-governance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ name: Reusable Governance Gate
on:
workflow_call:
inputs:
repo-name:
required: true
rules-file:
required: false
type: string
description: "Path to the governance rules configuration file"
description: "Optional path to the governance rules configuration file (e.g., .github-central/org-tools/governance/rules/python-sdk-rules.yml). If not provided, resolved via --repo mapping."
secrets:
ORG_READ_TOKEN:
required: true
Expand Down Expand Up @@ -54,12 +54,18 @@ jobs:
run: |
set +e

RULES_FILE_ARG=""
if [ -n "${{ inputs.rules-file }}" ]; then
RULES_FILE_ARG="--rules-file ${{ inputs.rules-file }}"
fi

uv run .github-central/org-tools/governance/scripts/pr_validator.py \
--token "${{ secrets.ORG_READ_TOKEN }}" \
--org "${{ github.repository_owner }}" \
--repo "${{ github.repository }}" \
--pr "${{ github.event.pull_request.number }}" \
--repo-name "${{ inputs.repo-name }}"
$RULES_FILE_ARG


EVAL_EXIT_CODE=$?
set -e
Expand Down
14 changes: 7 additions & 7 deletions org-tools/governance/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ rules:

### 4. `proxy_reviewers`

A list of GitHub usernames allowed to approve PRs on behalf of any required team.
A list of GitHub usernames allowed to approve PRs on behalf of any required team (see [GOVERNANCE.md](../../GOVERNANCE.md#governing-council-gc) for details on proxy voting rights).

```yaml
proxy_reviewers:
Expand Down Expand Up @@ -121,24 +121,24 @@ When evaluating a pull request:

## CLI Usage

Run the validator CLI command in the root of the checked-out repository:
Run the validator CLI command from this directory:

```bash
python3 org-tools/governance/scripts/pr_validator.py \
python3 scripts/pr_validator.py \
--token "<GITHUB_TOKEN>" \
--org "<ORG_NAME>" \
--repo "<REPO_NAME>" \
--repo "<REPO_PATH>" \
--pr <PR_NUMBER> \
--repo-name "<REPO_KEY>"
--rules-file "<PATH_TO_RULES_YAML>"
```

### CLI Arguments:

- `--token`: Org-level read token for GitHub REST API calls (requires permission to read team memberships).
- `--org`: The GitHub organization name.
- `--repo`: The GitHub repository name.
- `--repo`: The full GitHub repository path (e.g. `Universal-Commerce-Protocol/python-sdk`). If `--rules-file` is not provided, the script resolves the rules file by convention: it looks for a file named `<repo-name-suffix>-rules.yml` under `org-tools/governance/rules/` (e.g., `python-sdk-rules.yml`).
- `--pr`: The PR number to validate.
- `--repo-name`: A lookup key mapping to the rules configuration file (e.g. `python-sdk` maps to `org-tools/governance/rules/python-sdk-rules.yml`).
- `--rules-file`: Optional path to the governance rules YAML file. If provided, it overrides the convention-based path resolution.

---

Expand Down
59 changes: 26 additions & 33 deletions org-tools/governance/scripts/governance_config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,10 @@
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

from pr_models import Team, RuleRequirement, merge_requirements
import yaml


@dataclass(frozen=True)
class Team:
"""Represents a review team group in the hierarchy."""

name: str
level: int

@classmethod
def create(cls, name: str, level: int) -> "Team":
# Normalize to lowercase. GitHub team slugs are case-insensitive.
return cls(name=name.lower(), level=level)


@dataclass(frozen=True)
class RuleRequirement:
"""Represents a requirement for a rule, specifying min approvals and the target Team."""

min_approvals: int
team: Optional[Team] = None
min_team: Optional[Team] = None

def __post_init__(self):
if self.min_approvals is None or not isinstance(self.min_approvals, int):
raise ValueError("min_approvals must be an integer")
if self.min_approvals <= 0:
raise ValueError("min_approvals must be a positive integer")
if (self.team is None) == (self.min_team is None):
raise ValueError(
"RuleRequirement must specify exactly one of 'team' or 'min_team'"
)


@dataclass(frozen=True)
class GovernanceRule:
"""Represents a specific rule with name, file patterns, requirements, and exclusions."""
Expand Down Expand Up @@ -117,6 +86,30 @@ class GovernanceConfig:
fallback: List[RuleRequirement]
proxy_reviewers: Set[str]

def _get_requirements_for_file(self, file_path: str) -> List[RuleRequirement]:
"""Get the merged requirements that apply to a specific file."""
matched_rules = []
for rule in self.rules:
if rule.matches(file_path):
matched_rules.append(rule)

if not matched_rules:
return self.fallback

# De-duplicate rules by name, preserving order
unique_rules = list({r.name: r for r in matched_rules}.values())
requirements = []
for r in unique_rules:
requirements.extend(r.requires_all)

return merge_requirements(requirements)

def get_applicable_requirements(
self, changed_files: List[str]
) -> Dict[str, List[RuleRequirement]]:
"""Identify all rule requirements that apply to each of the PR's changed files."""
return {file: self._get_requirements_for_file(file) for file in changed_files}


@dataclass
class ValidationError:
Expand Down Expand Up @@ -184,7 +177,7 @@ def _parse_team_hierarchy(self, data: dict) -> Dict[str, Team]:

def _parse_proxy_reviewers(self, data: dict) -> Set[str]:
"""Parses the set of proxy reviewers."""
return set(data.get("proxy_reviewers", []))
return {r.lower() for r in data.get("proxy_reviewers", [])}

def _parse_fallback(
self, data: dict, teams: Dict[str, Team]
Expand Down
166 changes: 149 additions & 17 deletions org-tools/governance/scripts/pr_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
from datetime import datetime
from enum import Enum

from governance_config_parser import RuleRequirement


class ReviewState(Enum):
"""Represents the state of a GitHub pull request review."""
Expand Down Expand Up @@ -53,6 +51,104 @@ class MergeableReason(Enum):
RULES_SATISFIED = "rules_satisfied"


@dataclass(frozen=True)
class Team:
"""Represents a review team group in the hierarchy."""

name: str
level: int

@classmethod
def create(cls, name: str, level: int) -> "Team":
# Normalize to lowercase. GitHub team slugs are case-insensitive.
return cls(name=name.lower(), level=level)


@dataclass(frozen=True)
class User:
"""Represents a user in the context of the governance rules engine."""

username: str
teams: set[str]
level: int

@classmethod
def create(cls, username: str, memberships: "TeamMemberships") -> "User":
"""Resolve a username into a rich User domain object."""
username_lower = username.lower()
user_teams = {
team
for team, members in memberships.members_by_team.items()
if username_lower in members
}

# Calculate their max hierarchy level
max_level = max((t.level for t in user_teams), default=0)

# Map Team objects to their string names for the User dataclass
team_names = {t.name for t in user_teams}
return cls(username=username_lower, teams=team_names, level=max_level)


class RequirementTargetType(Enum):
"""Represents the type of requirement target (exact team vs. hierarchical min team)."""

TEAM = "team"
MIN_TEAM = "min_team"


@dataclass(frozen=True)
class RuleRequirement:
"""Represents a requirement for a rule, specifying min approvals and the target Team."""

min_approvals: int
team: Team | None = None
min_team: Team | None = None

def __post_init__(self):
if self.min_approvals is None or not isinstance(self.min_approvals, int):
raise ValueError("min_approvals must be an integer")
if self.min_approvals <= 0:
raise ValueError("min_approvals must be a positive integer")
if (self.team is None and self.min_team is None) or (
self.team is not None and self.min_team is not None
):
raise ValueError(
"RuleRequirement must specify exactly one of 'team' or 'min_team'"
)

def is_satisfied_by(self, user: User) -> bool:
"""Check if a user satisfies this team or hierarchical level requirement."""
if self.team is not None:
return self.team.name in user.teams
elif self.min_team is not None:
return user.level >= self.min_team.level
return False

@property
def target_key(self) -> tuple[RequirementTargetType, str]:
"""A unique tuple key representing the target of this requirement."""
if self.team is not None:
return (RequirementTargetType.TEAM, self.team.name)
elif self.min_team is not None:
return (RequirementTargetType.MIN_TEAM, self.min_team.name)
raise ValueError("Invalid RuleRequirement: both team and min_team are None")


def merge_requirements(requirements: list[RuleRequirement]) -> list[RuleRequirement]:
"""Merge a list of requirements by keeping the maximum min_approvals for each target."""
merged: dict[tuple[RequirementTargetType, str], RuleRequirement] = {}
for req in requirements:
key = req.target_key
if key not in merged:
merged[key] = req
else:
existing = merged[key]
if req.min_approvals > existing.min_approvals:
merged[key] = req
return list(merged.values())


@dataclass(frozen=True)
class Review:
"""Represents a pull request review with a user and state."""
Expand All @@ -79,8 +175,8 @@ class PullRequest:
is_draft: bool
changed_files: list[str]
reviews: list[Review]
assigned_users: list[str] = field(default_factory=list)
assigned_teams: list[str] = field(default_factory=list)
assigned_user_names: list[str] = field(default_factory=list)
assigned_team_names: list[str] = field(default_factory=list)

@classmethod
def create(
Expand All @@ -90,38 +186,74 @@ def create(
is_draft: bool,
changed_files: list[str],
reviews: list[Review],
assigned_users: list[str] = None,
assigned_teams: list[str] = None,
assigned_user_names: list[str] = None,
assigned_team_names: list[str] = None,
) -> "PullRequest":
# Normalize to lowercase. GitHub usernames and team slugs are case-insensitive.
# Reference: https://docs.github.com/en/rest/teams/teams?apiVersion=2026-03-10#get-a-team-by-name
assigned_u = [u.lower() for u in assigned_users] if assigned_users else []
assigned_t = [t.lower() for t in assigned_teams] if assigned_teams else []
assigned_u = (
[u.lower() for u in assigned_user_names] if assigned_user_names else []
)
assigned_t = (
[t.lower() for t in assigned_team_names] if assigned_team_names else []
)
return cls(
number=number,
author=author.lower(),
is_draft=is_draft,
changed_files=changed_files,
reviews=reviews,
assigned_users=assigned_u,
assigned_teams=assigned_t,
assigned_user_names=assigned_u,
assigned_team_names=assigned_t,
)

@property
def latest_actionable_reviews_by_username(self) -> dict[str, ReviewState]:
"""Resolve the latest actionable review state for each user.

Skips non-actionable states like COMMENTED.
"""
# Sort reviews by submitted_at (if available) to ensure chronological processing
sorted_reviews = sorted(
self.reviews,
key=lambda x: (
x.submitted_at if x.submitted_at is not None else datetime.min
),
)

relevant_reviews: dict[str, ReviewState] = {}
for r in sorted_reviews:
if r.state in ReviewState.actionable_states():
relevant_reviews[r.user] = r.state
return relevant_reviews

def has_proxy_override(self, proxy_reviewers: set[str]) -> bool:
"""Check if a proxy reviewer has approved the PR, bypassing rules."""
latest = self.latest_actionable_reviews_by_username
return any(
state == ReviewState.APPROVED and user in proxy_reviewers
for user, state in latest.items()
)


@dataclass(frozen=True)
class TeamMemberships:
"""Stores a map of team slugs to their members' GitHub usernames."""
"""Stores a map of Team objects to their members' GitHub usernames."""

members_by_team: dict[str, set[str]]
members_by_team: dict[Team, set[str]]

@classmethod
def create(cls, members_by_team: dict[str, set[str]]) -> "TeamMemberships":
def create(
cls, members_by_team: dict[str, set[str]], teams: dict[str, Team]
) -> "TeamMemberships":
# Normalize to lowercase. GitHub usernames and team slugs are case-insensitive.
# Reference: https://docs.github.com/en/rest/teams/teams?apiVersion=2026-03-10#get-a-team-by-name
normalized = {
team.lower(): {member.lower() for member in members}
for team, members in members_by_team.items()
}
normalized = {}
for team_slug, members in members_by_team.items():
team_obj = teams.get(team_slug.lower())
if team_obj is None:
team_obj = Team.create(name=team_slug, level=0)
normalized[team_obj] = {member.lower() for member in members}
return cls(members_by_team=normalized)


Expand Down
Loading
Loading