From 27cbbdedf06b17b5c720e806fd82a5ef5c00ef17 Mon Sep 17 00:00:00 2001 From: moss Date: Sun, 24 May 2026 20:32:40 +0700 Subject: [PATCH 1/3] feat(runtime): reflex.policies.export and weaving enforcement Export portable deterministic tool_guard specs from concerns for the bridge ReflexMonitor, plus JSON-RPC and enforcement helpers. Co-authored-by: Cursor --- .../opencoat_runtime_protocol/envelopes.py | 10 + .../concern/reflex_policy_export.py | 121 +++++++ .../opencoat_runtime_core/dcn/evolution.py | 8 + .../opencoat_runtime_core/weaving/__init__.py | 32 +- .../weaving/enforcement.py | 241 ++++++++++++++ .../ipc/jsonrpc_dispatch.py | 15 + .../tests/core/test_reflex_policy_export.py | 80 +++++ .../tests/core/test_weaving_enforcement.py | 298 ++++++++++++++++++ .../tests/daemon/test_jsonrpc_dispatch.py | 21 ++ 9 files changed, 825 insertions(+), 1 deletion(-) create mode 100644 packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py create mode 100644 packages/opencoat-runtime/opencoat_runtime_core/weaving/enforcement.py create mode 100644 packages/opencoat-runtime/tests/core/test_reflex_policy_export.py create mode 100644 packages/opencoat-runtime/tests/core/test_weaving_enforcement.py diff --git a/packages/opencoat-runtime-protocol/opencoat_runtime_protocol/envelopes.py b/packages/opencoat-runtime-protocol/opencoat_runtime_protocol/envelopes.py index 6b279b8..1ca9f3d 100644 --- a/packages/opencoat-runtime-protocol/opencoat_runtime_protocol/envelopes.py +++ b/packages/opencoat-runtime-protocol/opencoat_runtime_protocol/envelopes.py @@ -392,6 +392,16 @@ class ConcernMetrics(_Base): class Concern(_Base): id: str = Field(min_length=1) kind: ConcernKind = ConcernKind.CONCERN + # ── M-E0: MAN cell-type markers (ADR-0012 Decision 4, v0.3 §3) ────── + #: Aspect cell kind. ``"inhibitory"`` = A_reflex / InhibitoryReflex + #: (deterministic gate, hard enforcement). ``"excitatory"`` = A_cortex / + #: ExcitatoryNeuron (prompt-level, soft enforcement). + neuron_type: Literal["excitatory", "inhibitory"] = "excitatory" + #: Marks membership in the conserved core A_reflex. When ``True`` this + #: concern is excluded from ``⇩_slow`` structural rewrites (PlasticityEngine + #: / DCNEvolver). Implies ``neuron_type == "inhibitory"`` by convention. + reflex: bool = False + # ───────────────────────────────────────────────────────────────────── generated_type: str | None = None generated_tags: list[str] = Field(default_factory=list) name: str = Field(min_length=1) diff --git a/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py b/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py new file mode 100644 index 0000000..231d69a --- /dev/null +++ b/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py @@ -0,0 +1,121 @@ +"""Export portable in-proc reflex policy specs from the concern store (v0.3 §10.4). + +The bridge ``ReflexMonitor`` consumes the JSON returned by +``reflex.policies.export`` so hot-path tool guards can run synchronously +without ``joinpoint.submit`` on every ``before_tool_call``. +""" + +from __future__ import annotations + +from typing import Any, Literal + +from opencoat_runtime_protocol import AdviceType, Concern, LifecycleState, WeavingOperation + +ReflexCriticality = Literal["safety_critical", "advisory"] +ActionKind = Literal["tool_call"] + +_TOOL_JOINPOINTS = frozenset( + { + "before_tool_call", + "tool.before_call", + } +) + + +def _pointcut_keywords(concern: Concern) -> list[str]: + keys: list[str] = [] + for pc in concern.pointcuts: + jps = pc.joinpoints or [] + expr = pc.expression or "" + tool_pc = ( + not jps + or any(j in _TOOL_JOINPOINTS for j in jps) + or "before_tool_call" in expr + or "tool.before_call" in expr + ) + if not tool_pc: + continue + if pc.match and pc.match.any_keywords: + keys.extend(pc.match.any_keywords) + if concern.pointcut and concern.pointcut.match and concern.pointcut.match.any_keywords: + keys.extend(concern.pointcut.match.any_keywords) + seen: set[str] = set() + out: list[str] = [] + for k in keys: + if k in seen: + continue + seen.add(k) + out.append(k) + return out + + +def _is_hard_tool_block(concern: Concern) -> tuple[str, list[str]] | None: + """Return (deny_reason, needles) when concern is a hard tool guard block.""" + for adv in concern.advices: + template = adv.template or adv.advice_type + if template != AdviceType.TOOL_GUARD: + continue + effect = adv.effect or concern.weaving_policy + if effect is None: + continue + if effect.mode not in {WeavingOperation.BLOCK, WeavingOperation.SUPPRESS, WeavingOperation.ESCALATE}: + continue + target = effect.target or "" + if not (target == "tool_call" or target.startswith("tool_call.")): + continue + needles = _pointcut_keywords(concern) + if not needles: + continue + reason = (adv.content or concern.description or f"Blocked by {concern.id}").strip() + return reason, needles + + if concern.advice and concern.advice.type == AdviceType.TOOL_GUARD: + wp = concern.weaving_policy + if wp and wp.mode in { + WeavingOperation.BLOCK, + WeavingOperation.SUPPRESS, + WeavingOperation.ESCALATE, + }: + needles = _pointcut_keywords(concern) + if needles: + reason = (concern.advice.content or concern.description or f"Blocked by {concern.id}").strip() + return reason, needles + return None + + +def export_reflex_policies( + concerns: list[Concern], + *, + action_kind: ActionKind = "tool_call", +) -> dict[str, Any]: + """Build portable reflex policy export for the bridge TCB.""" + if action_kind != "tool_call": + return {"version": "0.1", "policies": []} + + policies: list[dict[str, Any]] = [] + for concern in concerns: + if concern.lifecycle_state in {LifecycleState.ARCHIVED, LifecycleState.MERGED}: + continue + hit = _is_hard_tool_block(concern) + if hit is None: + continue + reason, needles = hit + criticality: ReflexCriticality = "safety_critical" + policies.append( + { + "id": concern.id, + "criticality": criticality, + "action_kind": "tool_call", + "predicate": { + "kind": "args_contains", + "needles": needles, + }, + "deny_reason": reason, + } + ) + + policies.sort(key=lambda p: p["id"]) + return {"version": "0.1", "policies": policies} + + +__all__ = ["export_reflex_policies"] diff --git a/packages/opencoat-runtime/opencoat_runtime_core/dcn/evolution.py b/packages/opencoat-runtime/opencoat_runtime_core/dcn/evolution.py index 198e582..7bed85b 100644 --- a/packages/opencoat-runtime/opencoat_runtime_core/dcn/evolution.py +++ b/packages/opencoat-runtime/opencoat_runtime_core/dcn/evolution.py @@ -85,10 +85,18 @@ def optimize(self) -> int: return 0 def _active_catalog(self) -> list[Concern]: + """Return active concerns eligible for ⇩_slow structural rewrites. + + Concerns with ``reflex=True`` belong to the conserved core (A_reflex / + brainstem) and are **excluded** from merge/archive regardless of their + lifecycle state. This is the M-E0 invariant: A_reflex is not subject + to stochastic graph rewriting (MAN §1, ADR-0012 Decision 4). + """ return [ c for c in self._concern_store.iter_all() if (c.lifecycle_state or LifecycleState.CREATED.value).lower() in _ACTIVE_STATES + and not c.reflex # conserved core: exclude A_reflex from ⇩_slow ][: self._max_catalog] def _merge_declared(self, catalog: list[Concern]) -> int: diff --git a/packages/opencoat-runtime/opencoat_runtime_core/weaving/__init__.py b/packages/opencoat-runtime/opencoat_runtime_core/weaving/__init__.py index e44d80a..5fdded0 100644 --- a/packages/opencoat-runtime/opencoat_runtime_core/weaving/__init__.py +++ b/packages/opencoat-runtime/opencoat_runtime_core/weaving/__init__.py @@ -3,11 +3,41 @@ 11 operations × 8 levels (v0.1 §15). The default weaver is :class:`ConcernWeaver`; alternative weavers can subclass it and override :meth:`build`. + +M-E0: :mod:`enforcement` adds hard/soft classification for every +``WeavingOperation`` and ``AdviceType`` (ADR-0012 Decision 4). """ +from . import enforcement +from .enforcement import ( + ADVICE_TYPE_ENFORCEMENT, + HARD_ADVICE_TYPES, + HARD_OPERATIONS, + INHIBITORY_ADVICE_TYPES, + OPERATION_ENFORCEMENT, + AdviceTypeMeta, + OperationMeta, + advice_type_meta, + operation_meta, +) from .merge import merge_injections from .operations import OPERATIONS from .targets import WEAVING_TARGETS from .weaver import ConcernWeaver -__all__ = ["OPERATIONS", "WEAVING_TARGETS", "ConcernWeaver", "merge_injections"] +__all__ = [ + "ADVICE_TYPE_ENFORCEMENT", + "HARD_ADVICE_TYPES", + "HARD_OPERATIONS", + "INHIBITORY_ADVICE_TYPES", + "OPERATION_ENFORCEMENT", + "OPERATIONS", + "WEAVING_TARGETS", + "AdviceTypeMeta", + "ConcernWeaver", + "OperationMeta", + "advice_type_meta", + "enforcement", + "merge_injections", + "operation_meta", +] diff --git a/packages/opencoat-runtime/opencoat_runtime_core/weaving/enforcement.py b/packages/opencoat-runtime/opencoat_runtime_core/weaving/enforcement.py new file mode 100644 index 0000000..620cb8d --- /dev/null +++ b/packages/opencoat-runtime/opencoat_runtime_core/weaving/enforcement.py @@ -0,0 +1,241 @@ +"""Enforcement classification for WeavingOperation and AdviceType (M-E0). + +Maps every op/advice-type to: + - enforcement: "hard" | "soft" + - fail_mode: "deny" | "allow" + - neuron_type: "inhibitory" | "excitatory" (AdviceType only) + +Design grounding +---------------- +- Hard ops execute at in-proc synchronous gate points (A_reflex / InhibitoryReflex). + A missing or erring hard gate defaults to *deny* (fail-closed). +- Soft ops are prompt-level conditionings bounded by LLM instruction-following. + A missing soft aspect defaults to *allow* (fail-open). +- ``neuron_type`` classifies the aspect cell kind from v0.3 §3. + +References: ADR-0012 Decision 4, v0.3 §3.2/§3.3, MAN §1/§2. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Final, Literal + +from opencoat_runtime_protocol import AdviceType, WeavingOperation + +EnforcementClass = Literal["hard", "soft"] +FailMode = Literal["deny", "allow"] +NeuronType = Literal["inhibitory", "excitatory"] + + +@dataclass(frozen=True) +class OperationMeta: + """Enforcement metadata for a single :class:`WeavingOperation`.""" + + operation: WeavingOperation + enforcement: EnforcementClass + fail_mode: FailMode + + +@dataclass(frozen=True) +class AdviceTypeMeta: + """Enforcement metadata for a single :class:`AdviceType`.""" + + advice_type: AdviceType + enforcement: EnforcementClass + fail_mode: FailMode + neuron_type: NeuronType + + +# --------------------------------------------------------------------------- +# WeavingOperation classification +# +# Deterministic gate ops (used by A_reflex / InhibitoryReflex at effect +# boundaries) → hard / deny. +# All prompt-level conditioning ops → soft / allow. +# --------------------------------------------------------------------------- + +OPERATION_ENFORCEMENT: Final[dict[WeavingOperation, OperationMeta]] = { + # ── hard ───────────────────────────────────────────────────────────── + WeavingOperation.BLOCK: OperationMeta( + operation=WeavingOperation.BLOCK, + enforcement="hard", + fail_mode="deny", + ), + WeavingOperation.VERIFY: OperationMeta( + operation=WeavingOperation.VERIFY, + enforcement="hard", + fail_mode="deny", + ), + # ── soft ───────────────────────────────────────────────────────────── + WeavingOperation.INSERT: OperationMeta( + operation=WeavingOperation.INSERT, + enforcement="soft", + fail_mode="allow", + ), + WeavingOperation.REPLACE: OperationMeta( + operation=WeavingOperation.REPLACE, + enforcement="soft", + fail_mode="allow", + ), + WeavingOperation.SUPPRESS: OperationMeta( + operation=WeavingOperation.SUPPRESS, + enforcement="soft", + fail_mode="allow", + ), + WeavingOperation.ANNOTATE: OperationMeta( + operation=WeavingOperation.ANNOTATE, + enforcement="soft", + fail_mode="allow", + ), + WeavingOperation.WARN: OperationMeta( + operation=WeavingOperation.WARN, + enforcement="soft", + fail_mode="allow", + ), + WeavingOperation.REWRITE: OperationMeta( + operation=WeavingOperation.REWRITE, + enforcement="soft", + fail_mode="allow", + ), + WeavingOperation.DEFER: OperationMeta( + operation=WeavingOperation.DEFER, + enforcement="soft", + fail_mode="allow", + ), + WeavingOperation.ESCALATE: OperationMeta( + operation=WeavingOperation.ESCALATE, + enforcement="soft", + fail_mode="allow", + ), + WeavingOperation.COMPRESS: OperationMeta( + operation=WeavingOperation.COMPRESS, + enforcement="soft", + fail_mode="allow", + ), +} + +# --------------------------------------------------------------------------- +# AdviceType classification +# +# TOOL_GUARD / MEMORY_WRITE_GUARD / VERIFICATION_RULE are the A_reflex +# members in the initial conserved core — they map to InhibitoryReflex cell +# type (hard, fail-closed). All other advice types are ExcitatoryNeuron +# (soft, fail-open). +# --------------------------------------------------------------------------- + +ADVICE_TYPE_ENFORCEMENT: Final[dict[AdviceType, AdviceTypeMeta]] = { + # ── inhibitory / hard ──────────────────────────────────────────────── + AdviceType.TOOL_GUARD: AdviceTypeMeta( + advice_type=AdviceType.TOOL_GUARD, + enforcement="hard", + fail_mode="deny", + neuron_type="inhibitory", + ), + AdviceType.MEMORY_WRITE_GUARD: AdviceTypeMeta( + advice_type=AdviceType.MEMORY_WRITE_GUARD, + enforcement="hard", + fail_mode="deny", + neuron_type="inhibitory", + ), + AdviceType.VERIFICATION_RULE: AdviceTypeMeta( + advice_type=AdviceType.VERIFICATION_RULE, + enforcement="hard", + fail_mode="deny", + neuron_type="inhibitory", + ), + # ── excitatory / soft ──────────────────────────────────────────────── + AdviceType.REASONING_GUIDANCE: AdviceTypeMeta( + advice_type=AdviceType.REASONING_GUIDANCE, + enforcement="soft", + fail_mode="allow", + neuron_type="excitatory", + ), + AdviceType.PLANNING_GUIDANCE: AdviceTypeMeta( + advice_type=AdviceType.PLANNING_GUIDANCE, + enforcement="soft", + fail_mode="allow", + neuron_type="excitatory", + ), + AdviceType.DECISION_GUIDANCE: AdviceTypeMeta( + advice_type=AdviceType.DECISION_GUIDANCE, + enforcement="soft", + fail_mode="allow", + neuron_type="excitatory", + ), + AdviceType.RESPONSE_REQUIREMENT: AdviceTypeMeta( + advice_type=AdviceType.RESPONSE_REQUIREMENT, + enforcement="soft", + fail_mode="allow", + neuron_type="excitatory", + ), + AdviceType.REFLECTION_PROMPT: AdviceTypeMeta( + advice_type=AdviceType.REFLECTION_PROMPT, + enforcement="soft", + fail_mode="allow", + neuron_type="excitatory", + ), + AdviceType.REWRITE_GUIDANCE: AdviceTypeMeta( + advice_type=AdviceType.REWRITE_GUIDANCE, + enforcement="soft", + fail_mode="allow", + neuron_type="excitatory", + ), + AdviceType.SUPPRESS_INSTRUCTION: AdviceTypeMeta( + advice_type=AdviceType.SUPPRESS_INSTRUCTION, + enforcement="soft", + fail_mode="allow", + neuron_type="excitatory", + ), + AdviceType.ESCALATION_NOTICE: AdviceTypeMeta( + advice_type=AdviceType.ESCALATION_NOTICE, + enforcement="soft", + fail_mode="allow", + neuron_type="excitatory", + ), +} + +# Convenience sets for fast membership checks +HARD_OPERATIONS: Final[frozenset[WeavingOperation]] = frozenset( + op for op, meta in OPERATION_ENFORCEMENT.items() if meta.enforcement == "hard" +) +HARD_ADVICE_TYPES: Final[frozenset[AdviceType]] = frozenset( + at for at, meta in ADVICE_TYPE_ENFORCEMENT.items() if meta.enforcement == "hard" +) +INHIBITORY_ADVICE_TYPES: Final[frozenset[AdviceType]] = frozenset( + at for at, meta in ADVICE_TYPE_ENFORCEMENT.items() if meta.neuron_type == "inhibitory" +) + + +def operation_meta(op: WeavingOperation) -> OperationMeta: + """Return the :class:`OperationMeta` for *op*. + + Raises :exc:`KeyError` if *op* is not in the classification table — + this should never happen for valid enum members; the test suite asserts + full coverage. + """ + return OPERATION_ENFORCEMENT[op] + + +def advice_type_meta(at: AdviceType) -> AdviceTypeMeta: + """Return the :class:`AdviceTypeMeta` for *at*. + + Raises :exc:`KeyError` if *at* is not in the classification table. + """ + return ADVICE_TYPE_ENFORCEMENT[at] + + +__all__ = [ + "ADVICE_TYPE_ENFORCEMENT", + "HARD_ADVICE_TYPES", + "HARD_OPERATIONS", + "INHIBITORY_ADVICE_TYPES", + "OPERATION_ENFORCEMENT", + "AdviceTypeMeta", + "EnforcementClass", + "FailMode", + "NeuronType", + "OperationMeta", + "advice_type_meta", + "operation_meta", +] diff --git a/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py b/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py index 4704da6..69914eb 100644 --- a/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py +++ b/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py @@ -44,6 +44,10 @@ Params: ``{"concern_id"?: str, "limit"?: int}`` — forwards to :meth:`~opencoat_runtime_core.ports.DCNStore.activation_log`. +``reflex.policies.export`` + Params: ``{"action_kind"?: "tool_call"}``. Returns portable deterministic + policy specs for the bridge in-proc ``ReflexMonitor`` (v0.3 §10.4). + ``health.ping`` Result: ``{"ok": true}`` — proves the handler is wired without touching stores. @@ -58,6 +62,7 @@ from opencoat_runtime_core import OpenCOATRuntime from opencoat_runtime_core.concern import ConcernBuilder, ConcernExtractor +from opencoat_runtime_core.concern.reflex_policy_export import export_reflex_policies from opencoat_runtime_core.concern.chat_extract import chat_text_for_extraction from opencoat_runtime_protocol import Concern, ConcernInjection, JoinpointEvent from pydantic import ValidationError @@ -169,6 +174,7 @@ def __init__( "runtime.last_injection": self._runtime_last_injection, "runtime.llm_info": self._runtime_llm_info, "dcn.activation_log": self._dcn_activation_log, + "reflex.policies.export": self._reflex_policies_export, } def handle(self, message: str | dict[str, Any]) -> dict[str, Any] | None: @@ -411,5 +417,14 @@ def _dcn_activation_log(self, params: dict[str, Any] | list[Any]) -> list[Any]: rows = self._rt.dcn_store.activation_log(concern_id, limit=lim) return [dict(r) for r in rows] + def _reflex_policies_export(self, params: dict[str, Any] | list[Any]) -> dict[str, Any]: + """Portable deterministic reflex specs for in-proc bridge ``ReflexMonitor``.""" + p = _expect_params_dict(params) + action_kind = p.get("action_kind", "tool_call") + if action_kind not in ("tool_call",): + raise JsonRpcParamsError("action_kind must be 'tool_call'") + concerns = self._rt.concern_store.list() + return export_reflex_policies(concerns, action_kind=action_kind) + __all__ = ["JsonRpcHandler", "JsonRpcParamsError"] diff --git a/packages/opencoat-runtime/tests/core/test_reflex_policy_export.py b/packages/opencoat-runtime/tests/core/test_reflex_policy_export.py new file mode 100644 index 0000000..360f31b --- /dev/null +++ b/packages/opencoat-runtime/tests/core/test_reflex_policy_export.py @@ -0,0 +1,80 @@ +"""Tests for portable reflex policy export (bridge TCB sync).""" + +from __future__ import annotations + +from opencoat_runtime_core.concern.reflex_policy_export import export_reflex_policies +from opencoat_runtime_protocol import ( + AdviceKind, + AdviceType, + AopAdvice, + Concern, + PointcutDef, + WeavingLevel, + WeavingOperation, + WeavingPolicy, +) +from opencoat_runtime_protocol.envelopes import PointcutMatch + + +def _demo_tool_block() -> Concern: + return Concern( + id="demo-tool-block", + name="Demo — block destructive shell commands", + pointcuts=[ + PointcutDef( + id="pc-tool", + expression="before_tool_call()", + joinpoints=["before_tool_call"], + match=PointcutMatch(any_keywords=["rm -rf", "rm -rf"]), + ), + ], + advices=[ + AopAdvice( + id="adv-block", + kind=AdviceKind.BEFORE, + pointcut_ref="pc-tool", + content="Refusing destructive shell command.", + template=AdviceType.TOOL_GUARD, + effect=WeavingPolicy( + mode=WeavingOperation.BLOCK, + level=WeavingLevel.TOOL_LEVEL, + target="tool_call.arguments", + priority=0.9, + ), + ), + ], + ) + + +def test_export_demo_tool_block() -> None: + out = export_reflex_policies([_demo_tool_block()]) + assert out["version"] == "0.1" + assert len(out["policies"]) == 1 + row = out["policies"][0] + assert row["id"] == "demo-tool-block" + assert row["predicate"]["kind"] == "args_contains" + assert "rm -rf" in row["predicate"]["needles"] + + +def test_skips_soft_advice() -> None: + soft = Concern( + id="soft-hint", + name="hint", + advices=[ + AopAdvice( + id="a1", + kind=AdviceKind.BEFORE, + pointcut_ref="pc", + content="be careful", + template=AdviceType.REASONING_GUIDANCE, + effect=WeavingPolicy( + mode=WeavingOperation.INSERT, + level=WeavingLevel.PROMPT_LEVEL, + target="runtime_prompt.output_format", + ), + ), + ], + pointcuts=[PointcutDef(id="pc", expression="before_tool_call()")], + ) + out = export_reflex_policies([soft]) + assert out["policies"] == [] diff --git a/packages/opencoat-runtime/tests/core/test_weaving_enforcement.py b/packages/opencoat-runtime/tests/core/test_weaving_enforcement.py new file mode 100644 index 0000000..a77fd58 --- /dev/null +++ b/packages/opencoat-runtime/tests/core/test_weaving_enforcement.py @@ -0,0 +1,298 @@ +"""M-E0 acceptance tests for weaving enforcement classification. + +Verifies: +1. Every WeavingOperation is in the classification table (full coverage). +2. Every AdviceType is in the classification table (full coverage). +3. Hard / soft split is correct for operations and advice types. +4. Convenience sets (HARD_OPERATIONS, HARD_ADVICE_TYPES, INHIBITORY_ADVICE_TYPES) + are consistent with the full tables. +5. A_reflex members (reflex=True concerns) are excluded from DCNEvolver + ⇩_slow rewrites (merge and archive). + +References: ADR-0012 Decision 4, v0.3 §3, MAN §1. +""" + +from __future__ import annotations + +from datetime import UTC, datetime + +import pytest + +from opencoat_runtime_core.dcn.evolution import DCNEvolver +from opencoat_runtime_core.weaving.enforcement import ( + ADVICE_TYPE_ENFORCEMENT, + HARD_ADVICE_TYPES, + HARD_OPERATIONS, + INHIBITORY_ADVICE_TYPES, + OPERATION_ENFORCEMENT, + advice_type_meta, + operation_meta, +) +from opencoat_runtime_protocol import ( + ActivationState, + Advice, + AdviceType, + Concern, + LifecycleState, + Pointcut, + WeavingLevel, + WeavingOperation, + WeavingPolicy, +) +from opencoat_runtime_protocol.envelopes import PointcutMatch +from opencoat_runtime_storage.memory import MemoryConcernStore, MemoryDCNStore + +_NOW = datetime(2026, 5, 24, 12, 0, tzinfo=UTC) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _concern( + cid: str, + *, + keywords: list[str], + score: float = 0.6, + decay: float = 0.0, + lifecycle: str = LifecycleState.ACTIVE.value, + reflex: bool = False, + neuron_type: str = "excitatory", +) -> Concern: + return Concern( + id=cid, + name=cid, + description=cid, + lifecycle_state=lifecycle, + reflex=reflex, + neuron_type=neuron_type, # type: ignore[arg-type] + activation_state=ActivationState(score=score, decay=decay, active=True), + pointcut=Pointcut( + joinpoints=["before_tool_call"], + match=PointcutMatch(any_keywords=keywords), + ), + advice=Advice(type=AdviceType.TOOL_GUARD, content="block unsafe tools"), + weaving_policy=WeavingPolicy( + mode=WeavingOperation.BLOCK, + level=WeavingLevel.TOOL_LEVEL, + target="tool_call.arguments.*", + priority=0.9, + ), + ) + + +# --------------------------------------------------------------------------- +# 1. Full coverage — WeavingOperation +# --------------------------------------------------------------------------- + + +class TestOperationCoverage: + def test_all_operations_classified(self) -> None: + """Every WeavingOperation must appear in OPERATION_ENFORCEMENT.""" + missing = [op for op in WeavingOperation if op not in OPERATION_ENFORCEMENT] + assert missing == [], f"Unclassified WeavingOperation(s): {missing}" + + def test_table_has_no_extra_entries(self) -> None: + """Table must not contain values outside the enum.""" + valid = set(WeavingOperation) + extra = [op for op in OPERATION_ENFORCEMENT if op not in valid] + assert extra == [] + + def test_operation_meta_lookup(self) -> None: + for op in WeavingOperation: + meta = operation_meta(op) + assert meta.operation == op + assert meta.enforcement in ("hard", "soft") + assert meta.fail_mode in ("deny", "allow") + + def test_hard_operations_are_block_and_verify(self) -> None: + assert HARD_OPERATIONS == {WeavingOperation.BLOCK, WeavingOperation.VERIFY} + + def test_soft_operations_fail_open(self) -> None: + for op in WeavingOperation: + meta = operation_meta(op) + if meta.enforcement == "soft": + assert meta.fail_mode == "allow", ( + f"{op}: soft op must be fail_mode=allow, got {meta.fail_mode}" + ) + + def test_hard_operations_fail_closed(self) -> None: + for op in HARD_OPERATIONS: + meta = operation_meta(op) + assert meta.fail_mode == "deny", ( + f"{op}: hard op must be fail_mode=deny, got {meta.fail_mode}" + ) + + +# --------------------------------------------------------------------------- +# 2. Full coverage — AdviceType +# --------------------------------------------------------------------------- + + +class TestAdviceTypeCoverage: + def test_all_advice_types_classified(self) -> None: + """Every AdviceType must appear in ADVICE_TYPE_ENFORCEMENT.""" + missing = [at for at in AdviceType if at not in ADVICE_TYPE_ENFORCEMENT] + assert missing == [], f"Unclassified AdviceType(s): {missing}" + + def test_table_has_no_extra_entries(self) -> None: + valid = set(AdviceType) + extra = [at for at in ADVICE_TYPE_ENFORCEMENT if at not in valid] + assert extra == [] + + def test_advice_type_meta_lookup(self) -> None: + for at in AdviceType: + meta = advice_type_meta(at) + assert meta.advice_type == at + assert meta.enforcement in ("hard", "soft") + assert meta.fail_mode in ("deny", "allow") + assert meta.neuron_type in ("inhibitory", "excitatory") + + def test_hard_advice_types_are_guards_and_verification(self) -> None: + assert HARD_ADVICE_TYPES == { + AdviceType.TOOL_GUARD, + AdviceType.MEMORY_WRITE_GUARD, + AdviceType.VERIFICATION_RULE, + } + + def test_inhibitory_equals_hard_advice_types(self) -> None: + """All inhibitory advice types must be hard, and vice-versa.""" + assert INHIBITORY_ADVICE_TYPES == HARD_ADVICE_TYPES + + def test_soft_advice_types_fail_open(self) -> None: + for at in AdviceType: + meta = advice_type_meta(at) + if meta.enforcement == "soft": + assert meta.fail_mode == "allow" + assert meta.neuron_type == "excitatory" + + def test_hard_advice_types_fail_closed_inhibitory(self) -> None: + for at in HARD_ADVICE_TYPES: + meta = advice_type_meta(at) + assert meta.fail_mode == "deny" + assert meta.neuron_type == "inhibitory" + + +# --------------------------------------------------------------------------- +# 3. Concern model — neuron_type and reflex fields +# --------------------------------------------------------------------------- + + +class TestConcernFields: + def test_defaults(self) -> None: + c = Concern(id="c1", name="test concern") + assert c.neuron_type == "excitatory" + assert c.reflex is False + + def test_inhibitory_reflex_concern(self) -> None: + c = Concern( + id="reflex-1", + name="tool guard", + neuron_type="inhibitory", + reflex=True, + ) + assert c.neuron_type == "inhibitory" + assert c.reflex is True + + def test_model_copy_preserves_fields(self) -> None: + c = Concern(id="c2", name="base", neuron_type="inhibitory", reflex=True) + c2 = c.model_copy(update={"name": "updated"}) + assert c2.reflex is True + assert c2.neuron_type == "inhibitory" + + def test_serialisation_roundtrip(self) -> None: + c = Concern(id="c3", name="rt", neuron_type="inhibitory", reflex=True) + data = c.model_dump() + assert data["neuron_type"] == "inhibitory" + assert data["reflex"] is True + c2 = Concern.model_validate(data) + assert c2.neuron_type == "inhibitory" + assert c2.reflex is True + + +# --------------------------------------------------------------------------- +# 4. DCNEvolver — A_reflex excluded from ⇩_slow (M-E0 invariant) +# --------------------------------------------------------------------------- + + +class TestReflexExcludedFromEvolution: + def _make_evolver( + self, store: MemoryConcernStore, dcn: MemoryDCNStore + ) -> DCNEvolver: + return DCNEvolver( + concern_store=store, + dcn_store=dcn, + merge_min_keyword_overlap=2, + ) + + def test_reflex_concern_not_merged(self) -> None: + """A reflex concern must survive even when it overlaps keywords with another.""" + store = MemoryConcernStore() + dcn = MemoryDCNStore() + store.upsert(_concern("reflex-a", keywords=["tool", "guard"], reflex=True, neuron_type="inhibitory")) + store.upsert(_concern("soft-b", keywords=["tool", "guard"], score=0.3)) + result = self._make_evolver(store, dcn).run(_NOW) + assert result.merged == 0, "reflex concern must not be merged" + assert store.get("reflex-a") is not None + assert store.get("reflex-a").lifecycle_state not in { + LifecycleState.MERGED.value, + LifecycleState.ARCHIVED.value, + } + + def test_reflex_concern_not_archived_when_cold(self) -> None: + """A cold weakened reflex concern must not be archived by ⇩_slow.""" + store = MemoryConcernStore() + dcn = MemoryDCNStore() + store.upsert( + _concern( + "reflex-cold", + keywords=["memory"], + score=0.05, + decay=0.95, + lifecycle=LifecycleState.WEAKENED.value, + reflex=True, + neuron_type="inhibitory", + ) + ) + result = self._make_evolver(store, dcn).run(_NOW) + assert result.archived == 0, "reflex concern must not be archived" + c = store.get("reflex-cold") + assert c is not None + assert c.lifecycle_state == LifecycleState.WEAKENED.value + + def test_non_reflex_concern_still_merged(self) -> None: + """Non-reflex concerns are still subject to ⇩_slow as before.""" + store = MemoryConcernStore() + dcn = MemoryDCNStore() + store.upsert(_concern("dup-a", keywords=["NVDA", "分析"], score=0.8)) + store.upsert(_concern("dup-b", keywords=["NVDA", "分析"], score=0.3)) + result = self._make_evolver(store, dcn).run(_NOW) + assert result.merged == 1 + + def test_non_reflex_concern_still_archived(self) -> None: + """Non-reflex cold weakened concerns are still archived.""" + store = MemoryConcernStore() + dcn = MemoryDCNStore() + store.upsert( + _concern( + "soft-cold", + keywords=["x"], + score=0.05, + decay=0.95, + lifecycle=LifecycleState.WEAKENED.value, + ) + ) + result = self._make_evolver(store, dcn).run(_NOW) + assert result.archived == 1 + + def test_two_reflex_concerns_both_survive(self) -> None: + """Two reflex concerns with identical keywords must both survive.""" + store = MemoryConcernStore() + dcn = MemoryDCNStore() + store.upsert(_concern("r1", keywords=["tool", "safety"], reflex=True, neuron_type="inhibitory")) + store.upsert(_concern("r2", keywords=["tool", "safety"], reflex=True, neuron_type="inhibitory")) + result = self._make_evolver(store, dcn).run(_NOW) + assert result.merged == 0 + assert store.get("r1") is not None + assert store.get("r2") is not None diff --git a/packages/opencoat-runtime/tests/daemon/test_jsonrpc_dispatch.py b/packages/opencoat-runtime/tests/daemon/test_jsonrpc_dispatch.py index d60778f..510cfb8 100644 --- a/packages/opencoat-runtime/tests/daemon/test_jsonrpc_dispatch.py +++ b/packages/opencoat-runtime/tests/daemon/test_jsonrpc_dispatch.py @@ -590,3 +590,24 @@ def test_extract_from_chat_flag_upserts_concerns(self) -> None: assert "error" not in out listed = h.handle(_req("concern.list"))["result"] assert len(listed) >= 1 + + +class TestReflexPoliciesExport: + def test_exports_hard_tool_guard_block_policies(self) -> None: + from opencoat_runtime_cli.demo_concerns import demo_concerns + + store = MemoryConcernStore() + for c in demo_concerns(): + store.upsert(c) + rt = OpenCOATRuntime( + concern_store=store, + dcn_store=MemoryDCNStore(), + llm=StubLLMClient(), + ) + h = JsonRpcHandler(rt) + out = h.handle(_req("reflex.policies.export", {"action_kind": "tool_call"})) + assert "error" not in out + result = out["result"] + assert result["version"] == "0.1" + ids = [p["id"] for p in result["policies"]] + assert "demo-tool-block" in ids From 6677eeeea84f9e99207e7ea874dffd51bb45d763 Mon Sep 17 00:00:00 2001 From: moss Date: Sun, 24 May 2026 20:39:41 +0700 Subject: [PATCH 2/3] fix(runtime): harden reflex.policies.export advice and pointcut filtering Skip AopAdvice rows without template instead of accessing advice_type. Only include legacy concern.pointcut keywords when joinpoints target tools. Co-authored-by: Cursor --- .../concern/reflex_policy_export.py | 61 +++++++++++++++---- .../tests/core/test_reflex_policy_export.py | 60 +++++++++++++++++- 2 files changed, 107 insertions(+), 14 deletions(-) diff --git a/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py b/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py index 231d69a..2a22452 100644 --- a/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py +++ b/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py @@ -9,7 +9,15 @@ from typing import Any, Literal -from opencoat_runtime_protocol import AdviceType, Concern, LifecycleState, WeavingOperation +from opencoat_runtime_protocol import ( + AdviceType, + Concern, + JoinpointSelector, + LifecycleState, + Pointcut, + PointcutDef, + WeavingOperation, +) ReflexCriticality = Literal["safety_critical", "advisory"] ActionKind = Literal["tool_call"] @@ -22,22 +30,49 @@ ) +def _joinpoint_path(jp: str | JoinpointSelector) -> str: + if isinstance(jp, str): + return jp + if jp.path: + return jp.path + if jp.name: + return jp.name + return "" + + +def _expression_mentions_tool(expr: str) -> bool: + return "before_tool_call" in expr or "tool.before_call" in expr + + +def _pointcut_def_is_tool(pc: PointcutDef) -> bool: + jps = pc.joinpoints or [] + expr = pc.expression or "" + return ( + not jps + or any(_joinpoint_path(j) in _TOOL_JOINPOINTS for j in jps) + or _expression_mentions_tool(expr) + ) + + +def _legacy_pointcut_is_tool(pc: Pointcut) -> bool: + """Legacy ``concern.pointcut`` must explicitly target tool joinpoints.""" + jps = pc.joinpoints or [] + return any(_joinpoint_path(j) in _TOOL_JOINPOINTS for j in jps) + + def _pointcut_keywords(concern: Concern) -> list[str]: keys: list[str] = [] for pc in concern.pointcuts: - jps = pc.joinpoints or [] - expr = pc.expression or "" - tool_pc = ( - not jps - or any(j in _TOOL_JOINPOINTS for j in jps) - or "before_tool_call" in expr - or "tool.before_call" in expr - ) - if not tool_pc: + if not _pointcut_def_is_tool(pc): continue if pc.match and pc.match.any_keywords: keys.extend(pc.match.any_keywords) - if concern.pointcut and concern.pointcut.match and concern.pointcut.match.any_keywords: + if ( + concern.pointcut + and _legacy_pointcut_is_tool(concern.pointcut) + and concern.pointcut.match + and concern.pointcut.match.any_keywords + ): keys.extend(concern.pointcut.match.any_keywords) seen: set[str] = set() out: list[str] = [] @@ -52,8 +87,8 @@ def _pointcut_keywords(concern: Concern) -> list[str]: def _is_hard_tool_block(concern: Concern) -> tuple[str, list[str]] | None: """Return (deny_reason, needles) when concern is a hard tool guard block.""" for adv in concern.advices: - template = adv.template or adv.advice_type - if template != AdviceType.TOOL_GUARD: + template = adv.template + if template is None or template != AdviceType.TOOL_GUARD: continue effect = adv.effect or concern.weaving_policy if effect is None: diff --git a/packages/opencoat-runtime/tests/core/test_reflex_policy_export.py b/packages/opencoat-runtime/tests/core/test_reflex_policy_export.py index 360f31b..0c9112c 100644 --- a/packages/opencoat-runtime/tests/core/test_reflex_policy_export.py +++ b/packages/opencoat-runtime/tests/core/test_reflex_policy_export.py @@ -13,7 +13,7 @@ WeavingOperation, WeavingPolicy, ) -from opencoat_runtime_protocol.envelopes import PointcutMatch +from opencoat_runtime_protocol.envelopes import Pointcut, PointcutMatch def _demo_tool_block() -> Concern: @@ -56,6 +56,64 @@ def test_export_demo_tool_block() -> None: assert "rm -rf" in row["predicate"]["needles"] +def test_skips_untemplated_aop_advice() -> None: + concern = Concern( + id="untemplated-guard", + name="bad row", + pointcuts=[ + PointcutDef( + id="pc-tool", + joinpoints=["before_tool_call"], + match=PointcutMatch(any_keywords=["rm -rf"]), + ), + ], + advices=[ + AopAdvice( + id="a1", + kind=AdviceKind.BEFORE, + pointcut_ref="pc-tool", + content="would block", + template=None, + effect=WeavingPolicy( + mode=WeavingOperation.BLOCK, + level=WeavingLevel.TOOL_LEVEL, + target="tool_call.arguments", + ), + ), + ], + ) + out = export_reflex_policies([concern]) + assert out["policies"] == [] + + +def test_legacy_pointcut_requires_tool_joinpoint() -> None: + concern = Concern( + id="response-guard", + name="response only", + pointcut=Pointcut( + joinpoints=["before_response"], + match=PointcutMatch(any_keywords=["secret"]), + ), + advices=[ + AopAdvice( + id="a1", + kind=AdviceKind.BEFORE, + pointcut_ref="pc", + content="block", + template=AdviceType.TOOL_GUARD, + effect=WeavingPolicy( + mode=WeavingOperation.BLOCK, + level=WeavingLevel.TOOL_LEVEL, + target="tool_call.arguments", + ), + ), + ], + pointcuts=[PointcutDef(id="pc", expression="before_response()")], + ) + out = export_reflex_policies([concern]) + assert out["policies"] == [] + + def test_skips_soft_advice() -> None: soft = Concern( id="soft-hint", From 0e3beb0b45669bd90e0927dcd16dd90ebd0e3df4 Mon Sep 17 00:00:00 2001 From: moss Date: Sun, 24 May 2026 21:01:05 +0700 Subject: [PATCH 3/3] chore: fix ruff lint and format for PR #79 CI Sort imports, remove unused pytest, fix SIM300 asserts, format new modules. Co-authored-by: Cursor --- .../concern/reflex_policy_export.py | 10 ++++++-- .../opencoat_runtime_core/weaving/__init__.py | 2 +- .../ipc/jsonrpc_dispatch.py | 2 +- .../tests/core/test_weaving_enforcement.py | 24 ++++++++++--------- 4 files changed, 23 insertions(+), 15 deletions(-) diff --git a/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py b/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py index 2a22452..553629c 100644 --- a/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py +++ b/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py @@ -93,7 +93,11 @@ def _is_hard_tool_block(concern: Concern) -> tuple[str, list[str]] | None: effect = adv.effect or concern.weaving_policy if effect is None: continue - if effect.mode not in {WeavingOperation.BLOCK, WeavingOperation.SUPPRESS, WeavingOperation.ESCALATE}: + if effect.mode not in { + WeavingOperation.BLOCK, + WeavingOperation.SUPPRESS, + WeavingOperation.ESCALATE, + }: continue target = effect.target or "" if not (target == "tool_call" or target.startswith("tool_call.")): @@ -113,7 +117,9 @@ def _is_hard_tool_block(concern: Concern) -> tuple[str, list[str]] | None: }: needles = _pointcut_keywords(concern) if needles: - reason = (concern.advice.content or concern.description or f"Blocked by {concern.id}").strip() + reason = ( + concern.advice.content or concern.description or f"Blocked by {concern.id}" + ).strip() return reason, needles return None diff --git a/packages/opencoat-runtime/opencoat_runtime_core/weaving/__init__.py b/packages/opencoat-runtime/opencoat_runtime_core/weaving/__init__.py index 5fdded0..0fe6780 100644 --- a/packages/opencoat-runtime/opencoat_runtime_core/weaving/__init__.py +++ b/packages/opencoat-runtime/opencoat_runtime_core/weaving/__init__.py @@ -30,8 +30,8 @@ "HARD_ADVICE_TYPES", "HARD_OPERATIONS", "INHIBITORY_ADVICE_TYPES", - "OPERATION_ENFORCEMENT", "OPERATIONS", + "OPERATION_ENFORCEMENT", "WEAVING_TARGETS", "AdviceTypeMeta", "ConcernWeaver", diff --git a/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py b/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py index 69914eb..7341959 100644 --- a/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py +++ b/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py @@ -62,8 +62,8 @@ from opencoat_runtime_core import OpenCOATRuntime from opencoat_runtime_core.concern import ConcernBuilder, ConcernExtractor -from opencoat_runtime_core.concern.reflex_policy_export import export_reflex_policies from opencoat_runtime_core.concern.chat_extract import chat_text_for_extraction +from opencoat_runtime_core.concern.reflex_policy_export import export_reflex_policies from opencoat_runtime_protocol import Concern, ConcernInjection, JoinpointEvent from pydantic import ValidationError diff --git a/packages/opencoat-runtime/tests/core/test_weaving_enforcement.py b/packages/opencoat-runtime/tests/core/test_weaving_enforcement.py index a77fd58..4e72610 100644 --- a/packages/opencoat-runtime/tests/core/test_weaving_enforcement.py +++ b/packages/opencoat-runtime/tests/core/test_weaving_enforcement.py @@ -16,8 +16,6 @@ from datetime import UTC, datetime -import pytest - from opencoat_runtime_core.dcn.evolution import DCNEvolver from opencoat_runtime_core.weaving.enforcement import ( ADVICE_TYPE_ENFORCEMENT, @@ -107,7 +105,7 @@ def test_operation_meta_lookup(self) -> None: assert meta.fail_mode in ("deny", "allow") def test_hard_operations_are_block_and_verify(self) -> None: - assert HARD_OPERATIONS == {WeavingOperation.BLOCK, WeavingOperation.VERIFY} + assert {WeavingOperation.BLOCK, WeavingOperation.VERIFY} == HARD_OPERATIONS def test_soft_operations_fail_open(self) -> None: for op in WeavingOperation: @@ -150,11 +148,11 @@ def test_advice_type_meta_lookup(self) -> None: assert meta.neuron_type in ("inhibitory", "excitatory") def test_hard_advice_types_are_guards_and_verification(self) -> None: - assert HARD_ADVICE_TYPES == { + assert { AdviceType.TOOL_GUARD, AdviceType.MEMORY_WRITE_GUARD, AdviceType.VERIFICATION_RULE, - } + } == HARD_ADVICE_TYPES def test_inhibitory_equals_hard_advice_types(self) -> None: """All inhibitory advice types must be hard, and vice-versa.""" @@ -217,9 +215,7 @@ def test_serialisation_roundtrip(self) -> None: class TestReflexExcludedFromEvolution: - def _make_evolver( - self, store: MemoryConcernStore, dcn: MemoryDCNStore - ) -> DCNEvolver: + def _make_evolver(self, store: MemoryConcernStore, dcn: MemoryDCNStore) -> DCNEvolver: return DCNEvolver( concern_store=store, dcn_store=dcn, @@ -230,7 +226,9 @@ def test_reflex_concern_not_merged(self) -> None: """A reflex concern must survive even when it overlaps keywords with another.""" store = MemoryConcernStore() dcn = MemoryDCNStore() - store.upsert(_concern("reflex-a", keywords=["tool", "guard"], reflex=True, neuron_type="inhibitory")) + store.upsert( + _concern("reflex-a", keywords=["tool", "guard"], reflex=True, neuron_type="inhibitory") + ) store.upsert(_concern("soft-b", keywords=["tool", "guard"], score=0.3)) result = self._make_evolver(store, dcn).run(_NOW) assert result.merged == 0, "reflex concern must not be merged" @@ -290,8 +288,12 @@ def test_two_reflex_concerns_both_survive(self) -> None: """Two reflex concerns with identical keywords must both survive.""" store = MemoryConcernStore() dcn = MemoryDCNStore() - store.upsert(_concern("r1", keywords=["tool", "safety"], reflex=True, neuron_type="inhibitory")) - store.upsert(_concern("r2", keywords=["tool", "safety"], reflex=True, neuron_type="inhibitory")) + store.upsert( + _concern("r1", keywords=["tool", "safety"], reflex=True, neuron_type="inhibitory") + ) + store.upsert( + _concern("r2", keywords=["tool", "safety"], reflex=True, neuron_type="inhibitory") + ) result = self._make_evolver(store, dcn).run(_NOW) assert result.merged == 0 assert store.get("r1") is not None