From 809327d034059a747a3205f0bdb2f9372d3187d1 Mon Sep 17 00:00:00 2001 From: moss Date: Sun, 24 May 2026 23:50:27 +0700 Subject: [PATCH 1/2] =?UTF-8?q?feat(runtime,bridge):=20v0.3=20phase-2=20me?= =?UTF-8?q?ga=20=E2=80=94=20TCB=20guards,=20cold=20plasticity,=20replay?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extend in-proc ReflexMonitor to spawn/message/queue hooks, export all action kinds, add cold lift/archive on heartbeat, and deterministic r_t JSONL replay tests so users can enable one config flag and get full hot-path + plasticity loop. Co-authored-by: Cursor --- .../openclaw-opencoat-bridge/README.md | 7 +- .../openclaw.plugin.json | 6 +- .../openclaw-opencoat-bridge/src/daemon.ts | 2 + .../openclaw-opencoat-bridge/src/index.ts | 90 ++++++++++- .../src/reflex-monitor.test.ts | 15 +- .../src/reflex-monitor.ts | 2 +- .../src/reflex-policies.ts | 29 +++- .../src/reflex-policy-spec.ts | 31 +++- .../src/reflex-policy-sync.ts | 12 +- .../src/reflex-tool-guard.ts | 25 ++- .../openclaw-opencoat-bridge/src/types.ts | 4 +- .../concern/reflex_policy_export.py | 144 +++++++++++------- .../credit/plasticity_engine.py | 64 +++++++- .../opencoat_runtime_core/credit/rt_replay.py | 46 ++++++ .../ipc/jsonrpc_dispatch.py | 13 +- .../runtime_builder.py | 14 +- .../workers/__init__.py | 2 + .../workers/cold_plasticity_worker.py | 40 +++++ .../tests/core/test_plasticity_cold.py | 48 ++++++ .../tests/core/test_r_t_replay.py | 68 +++++++++ .../tests/core/test_reflex_policy_export.py | 94 ++++++++---- 21 files changed, 642 insertions(+), 114 deletions(-) create mode 100644 packages/opencoat-runtime/opencoat_runtime_core/credit/rt_replay.py create mode 100644 packages/opencoat-runtime/opencoat_runtime_daemon/workers/cold_plasticity_worker.py create mode 100644 packages/opencoat-runtime/tests/core/test_plasticity_cold.py create mode 100644 packages/opencoat-runtime/tests/core/test_r_t_replay.py diff --git a/integrations/openclaw-opencoat-bridge/README.md b/integrations/openclaw-opencoat-bridge/README.md index 5cad2db..ecb8865 100644 --- a/integrations/openclaw-opencoat-bridge/README.md +++ b/integrations/openclaw-opencoat-bridge/README.md @@ -337,15 +337,16 @@ guard decision). Enable in OpenClaw plugin config: **Behavior** -- Policies load from daemon `reflex.policies.export` (hard `TOOL_GUARD` + BLOCK concerns). -- Falls back to built-in `demo-tool-block` needles when export is empty. +- Policies load from daemon `reflex.policies.export` with `action_kind: all` (tool, spawn, message, queue hard BLOCK concerns). +- In-proc guards: `before_tool_call`, `subagent_spawning`, `message_sending`, `queue_before_enqueue`. +- Falls back to built-in `demo-tool-block` + `oc.dogfood.queue-block` when export is empty. - **Fail-closed** on monitor errors (contrast: collaborative path fail-open). - Optional async `joinpoint.submit` audit (`reflexAuditToDaemon`) for DCN without blocking the hook. Gateway log when enabled: ```text -[opencoat-bridge] in-proc ReflexMonitor tool_guard policies: demo-tool-block, … +[opencoat-bridge] in-proc ReflexMonitor policies: demo-tool-block, oc.dogfood.queue-block, … ``` See [v0.3 §10.5](../../docs/design/v0.3-morphogenetic-architecture.md#105-实现分期-2026-05). diff --git a/integrations/openclaw-opencoat-bridge/openclaw.plugin.json b/integrations/openclaw-opencoat-bridge/openclaw.plugin.json index 61e009d..c99d5f1 100644 --- a/integrations/openclaw-opencoat-bridge/openclaw.plugin.json +++ b/integrations/openclaw-opencoat-bridge/openclaw.plugin.json @@ -21,7 +21,11 @@ }, "inProcReflexToolGuard": { "type": "boolean", - "description": "Run before_tool_call through in-proc ReflexMonitor (v0.3 TCB prototype; fail-closed)" + "description": "Run hot-path hooks through in-proc ReflexMonitor (tool/spawn/message/queue; fail-closed)" + }, + "inProcReflexGuards": { + "type": "boolean", + "description": "Alias for inProcReflexToolGuard — enables all in-proc reflex guards (default true when tool guard on)" }, "reflexSyncFromDaemon": { "type": "boolean", diff --git a/integrations/openclaw-opencoat-bridge/src/daemon.ts b/integrations/openclaw-opencoat-bridge/src/daemon.ts index fe72d9b..b474712 100644 --- a/integrations/openclaw-opencoat-bridge/src/daemon.ts +++ b/integrations/openclaw-opencoat-bridge/src/daemon.ts @@ -29,6 +29,8 @@ export function resolveConfig(raw: Record | undefined): BridgeC runtimeObservers: raw?.runtimeObservers !== false, observerPollMs, inProcReflexToolGuard: raw?.inProcReflexToolGuard === true, + inProcReflexGuards: + raw?.inProcReflexGuards === true || raw?.inProcReflexToolGuard === true, reflexSyncFromDaemon: raw?.reflexSyncFromDaemon !== false, reflexAuditToDaemon: raw?.reflexAuditToDaemon !== false, reflexPolicies: parseInlineReflexPolicies(raw?.reflexPolicies), diff --git a/integrations/openclaw-opencoat-bridge/src/index.ts b/integrations/openclaw-opencoat-bridge/src/index.ts index 0402210..3b22814 100644 --- a/integrations/openclaw-opencoat-bridge/src/index.ts +++ b/integrations/openclaw-opencoat-bridge/src/index.ts @@ -35,7 +35,7 @@ import { toolResultPayload, } from "./payloads.js"; import { createObserveEmitter } from "./emit-joinpoint.js"; -import { loadReflexRuntime, buildReflexRuntime } from "./reflex-policy-sync.js"; +import { loadReflexRuntime, buildReflexRuntime, inProcReflexEnabled } from "./reflex-policy-sync.js"; import type { ReflexRuntime } from "./reflex-policy-sync.js"; import type { DecisionRecord } from "./reflex-monitor.js"; import { @@ -45,10 +45,12 @@ import { buildToolOutcomeRt, buildTurnCompleteRt, } from "./r-t-emit.js"; +import { buildPayloadAction } from "./reflex-policies.js"; import { buildReflexState, buildToolCallAction, failClosedToolGuard, + reflexDenyDecision, reflexToolGuardDecision, } from "./reflex-tool-guard.js"; import { @@ -258,7 +260,7 @@ async function handleHook( : {}; const toolName = typeof e.toolName === "string" ? e.toolName : "tool"; - if (cfg.inProcReflexToolGuard) { + if (inProcReflexEnabled(cfg)) { const runtime = reflexState.runtime; if (!runtime) { return failClosedToolGuard( @@ -330,11 +332,61 @@ async function handleHook( } case "message_out": { + if (inProcReflexEnabled(cfg)) { + const runtime = reflexState.runtime; + if (!runtime) { + return { + cancel: true, + content: "OpenCOAT ReflexMonitor fail-closed: not initialized", + }; + } + const e = asRecord(event); + const content = typeof e.content === "string" ? e.content : ""; + const action = buildPayloadAction("message_out", { content, ...payload }); + const decision = reflexDenyDecision( + runtime.monitor, + action, + buildReflexState(c), + ); + if (decision.block) { + return { + cancel: true, + content: + decision.blockReason ?? + "Blocked by OpenCOAT ReflexMonitor (message_out).", + }; + } + return {}; + } const inj = await emit(cfg, api, binding.hook, binding.joinpoint, payload, c); return messageSendingDecision(inj); } case "subagent_spawn": { + if (inProcReflexEnabled(cfg)) { + const runtime = reflexState.runtime; + if (!runtime) { + return { + status: "error", + error: "OpenCOAT ReflexMonitor fail-closed: not initialized", + }; + } + const action = buildPayloadAction("spawn", payload); + const decision = reflexDenyDecision( + runtime.monitor, + action, + buildReflexState(c), + ); + if (decision.block) { + return { + status: "error", + error: + decision.blockReason ?? + "Blocked by OpenCOAT ReflexMonitor (subagent_spawn).", + }; + } + return { status: "ok" }; + } const inj = await emit(cfg, api, binding.hook, binding.joinpoint, payload, c); const decision = subagentSpawnDecision(inj); if (decision.status === "error") return decision; @@ -342,6 +394,30 @@ async function handleHook( } case "queue_guard": { + if (inProcReflexEnabled(cfg)) { + const runtime = reflexState.runtime; + if (!runtime) { + return { + block: true, + blockReason: "OpenCOAT ReflexMonitor fail-closed: not initialized", + }; + } + const action = buildPayloadAction("queue_enqueue", payload); + const decision = reflexDenyDecision( + runtime.monitor, + action, + buildReflexState(c), + ); + if (decision.block) { + return { + block: true, + blockReason: + decision.blockReason ?? + "Blocked by OpenCOAT ReflexMonitor (queue_guard).", + }; + } + return {}; + } const inj = await emit(cfg, api, binding.hook, binding.joinpoint, payload, c); return queueBeforeEnqueueDecision(inj); } @@ -401,7 +477,7 @@ async function handleHook( }`, ); return binding.kind === "tool_guard" - ? cfg.inProcReflexToolGuard + ? inProcReflexEnabled(cfg) ? failClosedToolGuard( {}, err instanceof Error ? err : new Error(String(err)), @@ -414,13 +490,13 @@ async function handleHook( export default function register(api: BridgePluginApi): void { const cfg = resolveConfig(api.pluginConfig); - if (cfg.inProcReflexToolGuard) { + if (inProcReflexEnabled(cfg)) { reflexState.runtime = buildReflexRuntime(cfg, null); void loadReflexRuntime(cfg) .then((runtime) => { reflexState.runtime = runtime; api.logger?.info?.( - `[opencoat-bridge] in-proc ReflexMonitor tool_guard policies: ${ + `[opencoat-bridge] in-proc ReflexMonitor policies: ${ runtime.policyIds.join(", ") || "(none)" }`, ); @@ -452,7 +528,9 @@ export default function register(api: BridgePluginApi): void { `(skipped: ${SKIPPED_HOOKS.join(", ")}; daemon=${ cfg.enabled ? cfg.daemonUrl : "disabled" }${observerNote}${ - cfg.inProcReflexToolGuard ? "; in-proc ReflexMonitor tool_guard" : "" + inProcReflexEnabled(cfg) + ? "; in-proc ReflexMonitor (tool/spawn/message/queue)" + : "" }${cfg.emitRtJsonl ? "; r_t JSONL emit" : ""})`, ); } diff --git a/integrations/openclaw-opencoat-bridge/src/reflex-monitor.test.ts b/integrations/openclaw-opencoat-bridge/src/reflex-monitor.test.ts index 456497e..96b0f5f 100644 --- a/integrations/openclaw-opencoat-bridge/src/reflex-monitor.test.ts +++ b/integrations/openclaw-opencoat-bridge/src/reflex-monitor.test.ts @@ -1,7 +1,7 @@ import { describe, it } from "node:test"; import assert from "node:assert/strict"; import { ReflexMonitor } from "./reflex-monitor.js"; -import { compileReflexPolicies, DEMO_TOOL_BLOCK_SPEC } from "./reflex-policies.js"; +import { compileReflexPolicies, DEMO_TOOL_BLOCK_SPEC, DEMO_QUEUE_BLOCK_SPEC } from "./reflex-policies.js"; import type { Action, ReflexPolicy, State } from "./reflex-monitor.js"; const state: State = { @@ -96,4 +96,17 @@ describe("ReflexMonitor", () => { assert.equal(decision.kind, "allow"); assert.equal(record.policy_id, undefined); }); + + it("denies queue enqueue with QUEUE_DOGFOOD_BLOCK keyword", () => { + const monitor = new ReflexMonitor( + compileReflexPolicies([DEMO_QUEUE_BLOCK_SPEC]), + ); + const action: Action = { + kind: "queue_enqueue", + name: "queue_enqueue", + args: { prompt: "Please QUEUE_DOGFOOD_BLOCK this follow-up" }, + }; + const { decision } = monitor.mediate(action, state); + assert.equal(decision.kind, "deny"); + }); }); diff --git a/integrations/openclaw-opencoat-bridge/src/reflex-monitor.ts b/integrations/openclaw-opencoat-bridge/src/reflex-monitor.ts index 7e1695a..d62be62 100644 --- a/integrations/openclaw-opencoat-bridge/src/reflex-monitor.ts +++ b/integrations/openclaw-opencoat-bridge/src/reflex-monitor.ts @@ -6,7 +6,7 @@ import type { ReflexCriticality } from "./reflex-policy-spec.js"; -export type ActionKind = "tool_call"; +export type ActionKind = "tool_call" | "spawn" | "message_out" | "queue_enqueue"; export type Action = { kind: ActionKind; diff --git a/integrations/openclaw-opencoat-bridge/src/reflex-policies.ts b/integrations/openclaw-opencoat-bridge/src/reflex-policies.ts index dbdcc93..5fbfb86 100644 --- a/integrations/openclaw-opencoat-bridge/src/reflex-policies.ts +++ b/integrations/openclaw-opencoat-bridge/src/reflex-policies.ts @@ -1,5 +1,5 @@ import type { ReflexPolicySpec } from "./reflex-policy-spec.js"; -import type { Action, Decision, ReflexPolicy, State } from "./reflex-monitor.js"; +import type { Action, ActionKind, Decision, ReflexPolicy, State } from "./reflex-monitor.js"; function serializeArgs(args: Record): string { try { @@ -35,7 +35,7 @@ function compileOne(spec: ReflexPolicySpec): ReflexPolicy { id: spec.id, criticality: spec.criticality, applies(action: Action, _state: State): boolean { - if (action.kind !== "tool_call") return false; + if (action.kind !== spec.action_kind) return false; if (spec.predicate.kind === "tool_name") { return spec.predicate.names.includes(action.name); } @@ -68,3 +68,28 @@ export const DEMO_TOOL_BLOCK_SPEC: ReflexPolicySpec = { deny_reason: "Refusing destructive shell command — `rm -rf` is blocked by demo-tool-block.", }; + +/** Queue dogfood block when daemon export is unavailable. */ +export const DEMO_QUEUE_BLOCK_SPEC: ReflexPolicySpec = { + id: "oc.dogfood.queue-block", + criticality: "safety_critical", + action_kind: "queue_enqueue", + predicate: { + kind: "text_contains", + needles: ["QUEUE_DOGFOOD_BLOCK"], + }, + deny_reason: + "Follow-up queue blocked by OpenCOAT dogfood concern (oc.dogfood.queue-block).", +}; + +export function buildPayloadAction( + kind: ActionKind, + payload: Record, +): Action { + return { + kind, + name: kind, + args: { ...payload, text: JSON.stringify(payload) }, + raw: payload, + }; +} diff --git a/integrations/openclaw-opencoat-bridge/src/reflex-policy-spec.ts b/integrations/openclaw-opencoat-bridge/src/reflex-policy-spec.ts index c25399b..efaae1d 100644 --- a/integrations/openclaw-opencoat-bridge/src/reflex-policy-spec.ts +++ b/integrations/openclaw-opencoat-bridge/src/reflex-policy-spec.ts @@ -2,18 +2,23 @@ export type ReflexCriticality = "safety_critical" | "advisory"; +export type ReflexActionKind = + | "tool_call" + | "spawn" + | "message_out" + | "queue_enqueue"; + export type ReflexPolicySpec = { id: string; criticality: ReflexCriticality; - /** Only `tool_call` is implemented in the TCB prototype. */ - action_kind: "tool_call"; + action_kind: ReflexActionKind; predicate: ReflexPredicateSpec; deny_reason: string; }; export type ReflexPredicateSpec = | { - kind: "args_contains"; + kind: "args_contains" | "text_contains"; needles: string[]; case_insensitive?: boolean; } @@ -27,6 +32,13 @@ export type ReflexPolicyExport = { policies: ReflexPolicySpec[]; }; +const ACTION_KINDS = new Set([ + "tool_call", + "spawn", + "message_out", + "queue_enqueue", +]); + export function parseReflexPolicyExport(raw: unknown): ReflexPolicyExport | null { if (!raw || typeof raw !== "object") return null; const obj = raw as Record; @@ -37,7 +49,9 @@ export function parseReflexPolicyExport(raw: unknown): ReflexPolicyExport | null if (!row || typeof row !== "object") continue; const p = row as Record; if (typeof p.id !== "string" || !p.id.trim()) continue; - if (p.action_kind !== "tool_call") continue; + if (typeof p.action_kind !== "string" || !ACTION_KINDS.has(p.action_kind as ReflexActionKind)) { + continue; + } if (p.criticality !== "safety_critical" && p.criticality !== "advisory") continue; if (typeof p.deny_reason !== "string") continue; @@ -46,11 +60,14 @@ export function parseReflexPolicyExport(raw: unknown): ReflexPolicyExport | null const pr = pred as Record; let predicate: ReflexPredicateSpec | null = null; - if (pr.kind === "args_contains" && Array.isArray(pr.needles)) { + if ( + (pr.kind === "args_contains" || pr.kind === "text_contains") && + Array.isArray(pr.needles) + ) { const needles = pr.needles.filter((n): n is string => typeof n === "string" && n.length > 0); if (needles.length) { predicate = { - kind: "args_contains", + kind: pr.kind, needles, case_insensitive: pr.case_insensitive === true, }; @@ -64,7 +81,7 @@ export function parseReflexPolicyExport(raw: unknown): ReflexPolicyExport | null policies.push({ id: p.id, criticality: p.criticality, - action_kind: "tool_call", + action_kind: p.action_kind as ReflexActionKind, predicate, deny_reason: p.deny_reason, }); diff --git a/integrations/openclaw-opencoat-bridge/src/reflex-policy-sync.ts b/integrations/openclaw-opencoat-bridge/src/reflex-policy-sync.ts index d2c0292..dba5a05 100644 --- a/integrations/openclaw-opencoat-bridge/src/reflex-policy-sync.ts +++ b/integrations/openclaw-opencoat-bridge/src/reflex-policy-sync.ts @@ -2,7 +2,7 @@ import { parseReflexPolicyExport, type ReflexPolicyExport, } from "./reflex-policy-spec.js"; -import { compileReflexPolicies, DEMO_TOOL_BLOCK_SPEC } from "./reflex-policies.js"; +import { compileReflexPolicies, DEMO_TOOL_BLOCK_SPEC, DEMO_QUEUE_BLOCK_SPEC } from "./reflex-policies.js"; import { ReflexMonitor } from "./reflex-monitor.js"; import type { BridgeConfig } from "./types.js"; @@ -21,7 +21,7 @@ export async function fetchReflexPolicyExport( jsonrpc: "2.0", method: "reflex.policies.export", id: `reflex-export-${crypto.randomUUID()}`, - params: { action_kind: "tool_call" }, + params: { action_kind: "all" }, }; let res: Response; @@ -46,6 +46,10 @@ export async function fetchReflexPolicyExport( return parseReflexPolicyExport(json.result); } +export function inProcReflexEnabled(cfg: BridgeConfig): boolean { + return cfg.inProcReflexToolGuard || cfg.inProcReflexGuards; +} + export function buildReflexRuntime( cfg: BridgeConfig, exported: ReflexPolicyExport | null, @@ -54,7 +58,7 @@ export function buildReflexRuntime( ...(cfg.reflexPolicies ?? []), ...(exported?.policies ?? []), ...(cfg.reflexIncludeDemoPolicy && !exported?.policies?.length - ? [DEMO_TOOL_BLOCK_SPEC] + ? [DEMO_TOOL_BLOCK_SPEC, DEMO_QUEUE_BLOCK_SPEC] : []), ]; @@ -78,7 +82,7 @@ export function buildReflexRuntime( export async function loadReflexRuntime(cfg: BridgeConfig): Promise { const exported = - cfg.inProcReflexToolGuard && cfg.reflexSyncFromDaemon + inProcReflexEnabled(cfg) && cfg.reflexSyncFromDaemon ? await fetchReflexPolicyExport(cfg) : null; return buildReflexRuntime(cfg, exported); diff --git a/integrations/openclaw-opencoat-bridge/src/reflex-tool-guard.ts b/integrations/openclaw-opencoat-bridge/src/reflex-tool-guard.ts index 3a4a0d6..8595e31 100644 --- a/integrations/openclaw-opencoat-bridge/src/reflex-tool-guard.ts +++ b/integrations/openclaw-opencoat-bridge/src/reflex-tool-guard.ts @@ -1,5 +1,5 @@ import type { AgentHookCtx } from "./types.js"; -import type { Action, ReflexMonitor, State } from "./reflex-monitor.js"; +import type { Action, ActionKind, ReflexMonitor, State } from "./reflex-monitor.js"; import type { ToolGuardDecision } from "./injector.js"; export function buildToolCallAction(event: { @@ -28,6 +28,29 @@ export function buildReflexState(ctx: AgentHookCtx): State { }; } +export type ReflexDenyResult = { + block: boolean; + blockReason?: string; + record?: ReturnType["record"]; +}; + +/** Generic deny-only ReflexMonitor path for spawn/message/queue hooks. */ +export function reflexDenyDecision( + monitor: ReflexMonitor, + action: Action, + state: State, +): ReflexDenyResult { + const { decision, record } = monitor.mediate(action, state); + if (decision.kind === "deny") { + return { + block: true, + blockReason: decision.reason, + record, + }; + } + return { block: false, record }; +} + /** Map ReflexMonitor output to OpenClaw ``before_tool_call`` hook return shape. */ export function reflexToolGuardDecision( monitor: ReflexMonitor, diff --git a/integrations/openclaw-opencoat-bridge/src/types.ts b/integrations/openclaw-opencoat-bridge/src/types.ts index 5673a18..45a65b3 100644 --- a/integrations/openclaw-opencoat-bridge/src/types.ts +++ b/integrations/openclaw-opencoat-bridge/src/types.ts @@ -10,8 +10,10 @@ export type BridgeConfig = { runtimeObservers: boolean; /** Interval for queue/task poll service (ms). */ observerPollMs: number; - /** Run ``before_tool_call`` through in-proc ``ReflexMonitor`` (v0.3 TCB prototype). */ + /** Run hot-path hooks through in-proc ``ReflexMonitor`` (v0.3 TCB prototype). */ inProcReflexToolGuard: boolean; + /** Enable in-proc guards for spawn/message/queue (defaults to tool guard flag). */ + inProcReflexGuards: boolean; /** Pull ``reflex.policies.export`` from daemon on plugin load. */ reflexSyncFromDaemon: boolean; /** Emit joinpoint.submit after in-proc guard for DCN audit (async). */ diff --git a/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py b/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py index 553629c..23b60be 100644 --- a/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py +++ b/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py @@ -1,8 +1,8 @@ """Export portable in-proc reflex policy specs from the concern store (v0.3 §10.4). The bridge ``ReflexMonitor`` consumes the JSON returned by -``reflex.policies.export`` so hot-path tool guards can run synchronously -without ``joinpoint.submit`` on every ``before_tool_call``. +``reflex.policies.export`` so hot-path guards can run synchronously +without ``joinpoint.submit`` on every hook. """ from __future__ import annotations @@ -20,15 +20,36 @@ ) ReflexCriticality = Literal["safety_critical", "advisory"] -ActionKind = Literal["tool_call"] +ActionKind = Literal["tool_call", "spawn", "message_out", "queue_enqueue", "all"] -_TOOL_JOINPOINTS = frozenset( +_BLOCK_MODES = frozenset( { - "before_tool_call", - "tool.before_call", + WeavingOperation.BLOCK, + WeavingOperation.SUPPRESS, + WeavingOperation.ESCALATE, } ) +_TOOL_JOINPOINTS = frozenset({"before_tool_call", "tool.before_call"}) +_SPAWN_JOINPOINTS = frozenset( + {"task.before_create", "subagent_spawning", "subagent.before_spawn"} +) +_MESSAGE_JOINPOINTS = frozenset( + { + "before_response", + "response.before_final", + "message_sending", + } +) +_QUEUE_JOINPOINTS = frozenset({"queue.before_enqueue"}) + +_ACTION_PROFILES: dict[str, tuple[frozenset[str], tuple[str, ...], bool]] = { + "tool_call": (_TOOL_JOINPOINTS, ("tool_call",), True), + "spawn": (_SPAWN_JOINPOINTS, ("task", "subagent"), False), + "message_out": (_MESSAGE_JOINPOINTS, ("runtime_prompt", "response"), False), + "queue_enqueue": (_QUEUE_JOINPOINTS, ("queue",), False), +} + def _joinpoint_path(jp: str | JoinpointSelector) -> str: if isinstance(jp, str): @@ -40,36 +61,39 @@ def _joinpoint_path(jp: str | JoinpointSelector) -> str: return "" -def _expression_mentions_tool(expr: str) -> bool: - return "before_tool_call" in expr or "tool.before_call" in expr +def _expression_mentions_joinpoints(expr: str, joinpoints: frozenset[str]) -> bool: + return any(jp in expr for jp in joinpoints) -def _pointcut_def_is_tool(pc: PointcutDef) -> bool: +def _pointcut_def_matches(pc: PointcutDef, joinpoints: frozenset[str]) -> bool: jps = pc.joinpoints or [] expr = pc.expression or "" return ( not jps - or any(_joinpoint_path(j) in _TOOL_JOINPOINTS for j in jps) - or _expression_mentions_tool(expr) + or any(_joinpoint_path(j) in joinpoints for j in jps) + or _expression_mentions_joinpoints(expr, joinpoints) ) -def _legacy_pointcut_is_tool(pc: Pointcut) -> bool: - """Legacy ``concern.pointcut`` must explicitly target tool joinpoints.""" +def _legacy_pointcut_matches(pc: Pointcut, joinpoints: frozenset[str]) -> bool: jps = pc.joinpoints or [] - return any(_joinpoint_path(j) in _TOOL_JOINPOINTS for j in jps) + return any(_joinpoint_path(j) in joinpoints for j in jps) + +def _target_matches(target: str, prefixes: tuple[str, ...]) -> bool: + return any(target == p or target.startswith(f"{p}.") for p in prefixes) -def _pointcut_keywords(concern: Concern) -> list[str]: + +def _pointcut_keywords(concern: Concern, joinpoints: frozenset[str]) -> list[str]: keys: list[str] = [] for pc in concern.pointcuts: - if not _pointcut_def_is_tool(pc): + if not _pointcut_def_matches(pc, joinpoints): continue if pc.match and pc.match.any_keywords: keys.extend(pc.match.any_keywords) if ( concern.pointcut - and _legacy_pointcut_is_tool(concern.pointcut) + and _legacy_pointcut_matches(concern.pointcut, joinpoints) and concern.pointcut.match and concern.pointcut.match.any_keywords ): @@ -84,38 +108,33 @@ def _pointcut_keywords(concern: Concern) -> list[str]: return out -def _is_hard_tool_block(concern: Concern) -> tuple[str, list[str]] | None: - """Return (deny_reason, needles) when concern is a hard tool guard block.""" +def _is_hard_block( + concern: Concern, + *, + joinpoints: frozenset[str], + target_prefixes: tuple[str, ...], + require_tool_guard: bool, +) -> tuple[str, list[str]] | None: + """Return (deny_reason, needles) when concern is a hard block for the profile.""" for adv in concern.advices: - template = adv.template - if template is None or template != AdviceType.TOOL_GUARD: + if require_tool_guard and adv.template != AdviceType.TOOL_GUARD: continue effect = adv.effect or concern.weaving_policy - if effect is None: - continue - if effect.mode not in { - WeavingOperation.BLOCK, - WeavingOperation.SUPPRESS, - WeavingOperation.ESCALATE, - }: + if effect is None or effect.mode not in _BLOCK_MODES: continue target = effect.target or "" - if not (target == "tool_call" or target.startswith("tool_call.")): + if not _target_matches(target, target_prefixes): continue - needles = _pointcut_keywords(concern) + needles = _pointcut_keywords(concern, joinpoints) if not needles: continue reason = (adv.content or concern.description or f"Blocked by {concern.id}").strip() return reason, needles - if concern.advice and concern.advice.type == AdviceType.TOOL_GUARD: + if require_tool_guard and concern.advice and concern.advice.type == AdviceType.TOOL_GUARD: wp = concern.weaving_policy - if wp and wp.mode in { - WeavingOperation.BLOCK, - WeavingOperation.SUPPRESS, - WeavingOperation.ESCALATE, - }: - needles = _pointcut_keywords(concern) + if wp and wp.mode in _BLOCK_MODES: + needles = _pointcut_keywords(concern, joinpoints) if needles: reason = ( concern.advice.content or concern.description or f"Blocked by {concern.id}" @@ -124,39 +143,60 @@ def _is_hard_tool_block(concern: Concern) -> tuple[str, list[str]] | None: return None -def export_reflex_policies( - concerns: list[Concern], - *, - action_kind: ActionKind = "tool_call", -) -> dict[str, Any]: - """Build portable reflex policy export for the bridge TCB.""" - if action_kind != "tool_call": - return {"version": "0.1", "policies": []} +def _export_one_kind(concerns: list[Concern], action_kind: ActionKind) -> list[dict[str, Any]]: + if action_kind == "all": + return [] + + profile = _ACTION_PROFILES.get(action_kind) + if profile is None: + return [] + + joinpoints, target_prefixes, require_tool_guard = profile + predicate_kind = "args_contains" if action_kind == "tool_call" else "text_contains" policies: list[dict[str, Any]] = [] for concern in concerns: if concern.lifecycle_state in {LifecycleState.ARCHIVED, LifecycleState.MERGED}: continue - hit = _is_hard_tool_block(concern) + hit = _is_hard_block( + concern, + joinpoints=joinpoints, + target_prefixes=target_prefixes, + require_tool_guard=require_tool_guard, + ) if hit is None: continue reason, needles = hit - criticality: ReflexCriticality = "safety_critical" policies.append( { "id": concern.id, - "criticality": criticality, - "action_kind": "tool_call", + "criticality": "safety_critical", + "action_kind": action_kind, "predicate": { - "kind": "args_contains", + "kind": predicate_kind, "needles": needles, }, "deny_reason": reason, } ) - policies.sort(key=lambda p: p["id"]) - return {"version": "0.1", "policies": policies} + return policies + + +def export_reflex_policies( + concerns: list[Concern], + *, + action_kind: ActionKind = "tool_call", +) -> dict[str, Any]: + """Build portable reflex policy export for the bridge TCB.""" + if action_kind == "all": + merged: list[dict[str, Any]] = [] + for kind in ("tool_call", "spawn", "message_out", "queue_enqueue"): + merged.extend(_export_one_kind(concerns, kind)) + merged.sort(key=lambda p: (p["action_kind"], p["id"])) + return {"version": "0.1", "policies": merged} + + return {"version": "0.1", "policies": _export_one_kind(concerns, action_kind)} __all__ = ["export_reflex_policies"] diff --git a/packages/opencoat-runtime/opencoat_runtime_core/credit/plasticity_engine.py b/packages/opencoat-runtime/opencoat_runtime_core/credit/plasticity_engine.py index acc08d5..a152428 100644 --- a/packages/opencoat-runtime/opencoat_runtime_core/credit/plasticity_engine.py +++ b/packages/opencoat-runtime/opencoat_runtime_core/credit/plasticity_engine.py @@ -26,10 +26,26 @@ def as_dict(self) -> dict[str, int]: } +@dataclass(frozen=True) +class ColdStepStats: + lifted: int = 0 + archived: int = 0 + skipped: int = 0 + + def as_dict(self) -> dict[str, int]: + return { + "lifted": self.lifted, + "archived": self.archived, + "skipped": self.skipped, + } + + class PlasticityEngine: - """Prototype ``⇩_slow`` reweight only — no split/lift/connect/prune yet.""" + """Prototype ``⇩_slow`` reweight + cold lift/archive (v0.3 §11 subset).""" DEFAULT_DELTA = 0.05 + LIFT_SCORE = 0.75 + ARCHIVE_SCORE = 0.08 def __init__(self, *, step_delta: float = DEFAULT_DELTA) -> None: if not 0.0 < step_delta <= 1.0: @@ -123,6 +139,45 @@ def _attribute_policy( return concern_id, advantage return None, 0.0 + def cold_step( + self, + *, + concern_store: ConcernStore, + lifecycle: ConcernLifecycleManager, + ) -> ColdStepStats: + """Cold-path lift (reflex flag) and archive weak concerns.""" + lifted = 0 + archived = 0 + skipped = 0 + for concern in concern_store.list(): + if concern.lifecycle_state in {"archived", "merged", "deleted"}: + skipped += 1 + continue + score = ( + concern.activation_state.score + if concern.activation_state is not None + else None + ) + if score is None: + skipped += 1 + continue + if concern.reflex: + skipped += 1 + continue + try: + if score >= self.LIFT_SCORE and concern.lifecycle_state == "reinforced": + updated = concern.model_copy(update={"reflex": True}) + concern_store.upsert(updated) + lifted += 1 + elif score <= self.ARCHIVE_SCORE and concern.lifecycle_state == "weakened": + lifecycle.archive(concern, reason="cold plasticity: score below threshold") + archived += 1 + else: + skipped += 1 + except Exception: + skipped += 1 + return ColdStepStats(lifted=lifted, archived=archived, skipped=skipped) + def concern_ids_from_records(records: list[RtRecord]) -> list[str]: engine = PlasticityEngine() @@ -134,4 +189,9 @@ def concern_ids_from_records(records: list[RtRecord]) -> list[str]: return ids -__all__ = ["PlasticityEngine", "ReweightStats", "concern_ids_from_records"] +__all__ = [ + "ColdStepStats", + "PlasticityEngine", + "ReweightStats", + "concern_ids_from_records", +] diff --git a/packages/opencoat-runtime/opencoat_runtime_core/credit/rt_replay.py b/packages/opencoat-runtime/opencoat_runtime_core/credit/rt_replay.py new file mode 100644 index 0000000..4dce424 --- /dev/null +++ b/packages/opencoat-runtime/opencoat_runtime_core/credit/rt_replay.py @@ -0,0 +1,46 @@ +"""Deterministic replay of ``r_t.jsonl`` for plasticity tests (v0.3 §11 step 4).""" + +from __future__ import annotations + +import json +from pathlib import Path + +from opencoat_runtime_core.concern.lifecycle import ConcernLifecycleManager +from opencoat_runtime_core.credit.plasticity_engine import PlasticityEngine +from opencoat_runtime_core.credit.r_t_record import RtRecord +from opencoat_runtime_core.ports import ConcernStore, DCNStore + + +def read_rt_jsonl(path: Path | str) -> list[RtRecord]: + """Load all ``r_t`` rows from a JSONL file (ignores tail cursor).""" + records: list[RtRecord] = [] + with Path(path).open(encoding="utf-8") as fh: + for line in fh: + line = line.strip() + if not line: + continue + records.append(RtRecord.model_validate(json.loads(line))) + return records + + +def replay_rt_jsonl( + path: Path | str, + *, + concern_store: ConcernStore, + dcn_store: DCNStore, + engine: PlasticityEngine | None = None, +) -> dict[str, float]: + """Replay JSONL rows through reweight and return final concern scores.""" + records = read_rt_jsonl(path) + plasticity = engine or PlasticityEngine() + lifecycle = ConcernLifecycleManager(concern_store=concern_store, dcn_store=dcn_store) + plasticity.reweight(records, concern_store=concern_store, lifecycle=lifecycle) + scores: dict[str, float] = {} + for concern in concern_store.list(): + if concern.activation_state is None or concern.activation_state.score is None: + continue + scores[concern.id] = concern.activation_state.score + return scores + + +__all__ = ["read_rt_jsonl", "replay_rt_jsonl"] diff --git a/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py b/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py index dc43881..99f3f19 100644 --- a/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py +++ b/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py @@ -442,8 +442,17 @@ def _reflex_policies_export(self, params: dict[str, Any] | list[Any]) -> dict[st """Portable deterministic reflex specs for in-proc bridge ``ReflexMonitor``.""" p = _expect_params_dict(params) action_kind = p.get("action_kind", "tool_call") - if action_kind not in ("tool_call",): - raise JsonRpcParamsError("action_kind must be 'tool_call'") + if action_kind not in ( + "tool_call", + "spawn", + "message_out", + "queue_enqueue", + "all", + ): + raise JsonRpcParamsError( + "action_kind must be one of: tool_call, spawn, message_out, " + "queue_enqueue, all" + ) concerns = self._rt.concern_store.list() return export_reflex_policies(concerns, action_kind=action_kind) diff --git a/packages/opencoat-runtime/opencoat_runtime_daemon/runtime_builder.py b/packages/opencoat-runtime/opencoat_runtime_daemon/runtime_builder.py index 630c911..a56a617 100644 --- a/packages/opencoat-runtime/opencoat_runtime_daemon/runtime_builder.py +++ b/packages/opencoat-runtime/opencoat_runtime_daemon/runtime_builder.py @@ -53,7 +53,13 @@ from opencoat_runtime_storage.sqlite import SqliteConcernStore, SqliteDCNStore from .config.loader import DaemonConfig, LLMSettings, StorageBackend -from .workers import ConflictScannerWorker, DecayWorker, MergeArchiverWorker, RtPlasticityWorker +from .workers import ( + ColdPlasticityWorker, + ConflictScannerWorker, + DecayWorker, + MergeArchiverWorker, + RtPlasticityWorker, +) logger = logging.getLogger(__name__) @@ -77,20 +83,24 @@ def build_heartbeat_maintenance( ) conflict = ConflictScannerWorker(concern_store=concern_store, dcn_store=dcn_store) rt_worker = RtPlasticityWorker(rt_service=rt_plasticity) if rt_plasticity is not None else None + cold_worker = ColdPlasticityWorker(concern_store=concern_store, dcn_store=dcn_store) def maintenance(now: datetime) -> dict[str, int]: decay_stats = decay.run(now) merge_stats = merge_archiver.run(now) conflict_stats = conflict.run(now) rt_stats = rt_worker.run(now) if rt_worker is not None else {} + cold_stats = cold_worker.run(now) return { "decay_count": int(decay_stats.get("touched", 0)), "archive_count": int(decay_stats.get("archived", 0)) - + int(merge_stats.get("archived", 0)), + + int(merge_stats.get("archived", 0)) + + int(cold_stats.get("archived", 0)), "merge_count": int(merge_stats.get("merged", 0)), "conflict_count": int(conflict_stats.get("edges_added", 0)), "rt_reinforced": int(rt_stats.get("reinforced", 0)), "rt_weakened": int(rt_stats.get("weakened", 0)), + "cold_lifted": int(cold_stats.get("lifted", 0)), } return maintenance diff --git a/packages/opencoat-runtime/opencoat_runtime_daemon/workers/__init__.py b/packages/opencoat-runtime/opencoat_runtime_daemon/workers/__init__.py index 234cbdf..d85b67a 100644 --- a/packages/opencoat-runtime/opencoat_runtime_daemon/workers/__init__.py +++ b/packages/opencoat-runtime/opencoat_runtime_daemon/workers/__init__.py @@ -4,6 +4,7 @@ the worker classes; M6 wires them into the scheduler. """ +from .cold_plasticity_worker import ColdPlasticityWorker from .conflict_scanner import ConflictScannerWorker from .decay_worker import DecayWorker from .extraction_worker import ExtractionWorker @@ -13,6 +14,7 @@ from .verification_worker import VerificationWorker __all__ = [ + "ColdPlasticityWorker", "ConflictScannerWorker", "DecayWorker", "ExtractionWorker", diff --git a/packages/opencoat-runtime/opencoat_runtime_daemon/workers/cold_plasticity_worker.py b/packages/opencoat-runtime/opencoat_runtime_daemon/workers/cold_plasticity_worker.py new file mode 100644 index 0000000..05b0f79 --- /dev/null +++ b/packages/opencoat-runtime/opencoat_runtime_daemon/workers/cold_plasticity_worker.py @@ -0,0 +1,40 @@ +"""Cold-path plasticity (lift/archive) on heartbeat.""" + +from __future__ import annotations + +from datetime import datetime + +from opencoat_runtime_core.concern.lifecycle import ConcernLifecycleManager +from opencoat_runtime_core.credit.plasticity_engine import PlasticityEngine +from opencoat_runtime_core.ports import ConcernStore, DCNStore + +from ._base import Worker + + +class ColdPlasticityWorker(Worker): + """Run :class:`PlasticityEngine` cold_step over the concern store.""" + + def __init__( + self, + *, + concern_store: ConcernStore, + dcn_store: DCNStore, + engine: PlasticityEngine | None = None, + ) -> None: + self._concern_store = concern_store + self._lifecycle = ConcernLifecycleManager( + concern_store=concern_store, + dcn_store=dcn_store, + ) + self._engine = engine or PlasticityEngine() + + def run(self, now: datetime) -> dict: + del now + stats = self._engine.cold_step( + concern_store=self._concern_store, + lifecycle=self._lifecycle, + ) + return stats.as_dict() + + +__all__ = ["ColdPlasticityWorker"] diff --git a/packages/opencoat-runtime/tests/core/test_plasticity_cold.py b/packages/opencoat-runtime/tests/core/test_plasticity_cold.py new file mode 100644 index 0000000..60bfe34 --- /dev/null +++ b/packages/opencoat-runtime/tests/core/test_plasticity_cold.py @@ -0,0 +1,48 @@ +"""Tests for PlasticityEngine cold_step lift/archive.""" + +from __future__ import annotations + +from opencoat_runtime_core.concern.lifecycle import ConcernLifecycleManager +from opencoat_runtime_core.credit.plasticity_engine import PlasticityEngine +from opencoat_runtime_protocol import ActivationState, Concern +from opencoat_runtime_storage.memory import MemoryConcernStore, MemoryDCNStore + + +def test_cold_step_lifts_reinforced_high_score() -> None: + store = MemoryConcernStore() + dcn = MemoryDCNStore() + store.upsert( + Concern( + id="strong-guard", + name="guard", + lifecycle_state="reinforced", + activation_state=ActivationState(score=0.8, active=True, decay=0.0), + ) + ) + lifecycle = ConcernLifecycleManager(concern_store=store, dcn_store=dcn) + stats = PlasticityEngine().cold_step(concern_store=store, lifecycle=lifecycle) + + assert stats.lifted == 1 + updated = store.get("strong-guard") + assert updated is not None + assert updated.reflex is True + + +def test_cold_step_archives_weak_low_score() -> None: + store = MemoryConcernStore() + dcn = MemoryDCNStore() + store.upsert( + Concern( + id="weak-hint", + name="hint", + lifecycle_state="weakened", + activation_state=ActivationState(score=0.05, active=False, decay=0.0), + ) + ) + lifecycle = ConcernLifecycleManager(concern_store=store, dcn_store=dcn) + stats = PlasticityEngine().cold_step(concern_store=store, lifecycle=lifecycle) + + assert stats.archived == 1 + updated = store.get("weak-hint") + assert updated is not None + assert updated.lifecycle_state == "archived" diff --git a/packages/opencoat-runtime/tests/core/test_r_t_replay.py b/packages/opencoat-runtime/tests/core/test_r_t_replay.py new file mode 100644 index 0000000..6f0a3a2 --- /dev/null +++ b/packages/opencoat-runtime/tests/core/test_r_t_replay.py @@ -0,0 +1,68 @@ +"""Deterministic r_t JSONL replay harness tests.""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime +from pathlib import Path + +from opencoat_runtime_core.credit.plasticity_engine import PlasticityEngine +from opencoat_runtime_core.credit.r_t_record import RtRecord, RtSignal +from opencoat_runtime_core.credit.rt_replay import read_rt_jsonl, replay_rt_jsonl +from opencoat_runtime_protocol import Concern +from opencoat_runtime_storage.memory import MemoryConcernStore, MemoryDCNStore + + +def _write_jsonl(path: Path, records: list[RtRecord]) -> None: + with path.open("w", encoding="utf-8") as fh: + for rec in records: + fh.write(json.dumps(rec.model_dump(mode="json")) + "\n") + + +def test_replay_is_deterministic(tmp_path: Path) -> None: + records = [ + RtRecord( + ts=datetime(2026, 5, 24, tzinfo=UTC), + session_id="s1", + turn_id="run-1", + joinpoint="before_tool_call", + hook="before_tool_call", + signal=RtSignal( + kind="tool_blocked", + reflex={"policy_id": "demo-tool-block", "decision": "deny"}, + ), + r=1.0, + ), + RtRecord( + ts=datetime(2026, 5, 24, 1, tzinfo=UTC), + session_id="s1", + turn_id="run-1", + joinpoint="after_tool_call", + hook="after_tool_call", + signal=RtSignal( + kind="tool_outcome", + tool_name="shell.exec", + reflex={"policy_id": "demo-tool-block", "decision": "deny"}, + ), + r=1.0, + ), + ] + path = tmp_path / "r_t.jsonl" + _write_jsonl(path, records) + assert len(read_rt_jsonl(path)) == 2 + + def run_once() -> dict[str, float]: + store = MemoryConcernStore() + dcn = MemoryDCNStore() + store.upsert(Concern(id="demo-tool-block", name="block")) + return replay_rt_jsonl( + path, + concern_store=store, + dcn_store=dcn, + engine=PlasticityEngine(step_delta=0.1), + ) + + first = run_once() + second = run_once() + assert first == second + assert first["demo-tool-block"] > 0.0 diff --git a/packages/opencoat-runtime/tests/core/test_reflex_policy_export.py b/packages/opencoat-runtime/tests/core/test_reflex_policy_export.py index 0c9112c..f1fa153 100644 --- a/packages/opencoat-runtime/tests/core/test_reflex_policy_export.py +++ b/packages/opencoat-runtime/tests/core/test_reflex_policy_export.py @@ -13,7 +13,7 @@ WeavingOperation, WeavingPolicy, ) -from opencoat_runtime_protocol.envelopes import Pointcut, PointcutMatch +from opencoat_runtime_protocol.envelopes import PointcutMatch def _demo_tool_block() -> Concern: @@ -86,34 +86,6 @@ def test_skips_untemplated_aop_advice() -> None: assert out["policies"] == [] -def test_legacy_pointcut_requires_tool_joinpoint() -> None: - concern = Concern( - id="response-guard", - name="response only", - pointcut=Pointcut( - joinpoints=["before_response"], - match=PointcutMatch(any_keywords=["secret"]), - ), - advices=[ - AopAdvice( - id="a1", - kind=AdviceKind.BEFORE, - pointcut_ref="pc", - content="block", - template=AdviceType.TOOL_GUARD, - effect=WeavingPolicy( - mode=WeavingOperation.BLOCK, - level=WeavingLevel.TOOL_LEVEL, - target="tool_call.arguments", - ), - ), - ], - pointcuts=[PointcutDef(id="pc", expression="before_response()")], - ) - out = export_reflex_policies([concern]) - assert out["policies"] == [] - - def test_skips_soft_advice() -> None: soft = Concern( id="soft-hint", @@ -136,3 +108,67 @@ def test_skips_soft_advice() -> None: ) out = export_reflex_policies([soft]) assert out["policies"] == [] + + +def test_export_queue_block_concern() -> None: + concern = Concern( + id="oc.dogfood.queue-block", + name="queue block", + pointcuts=[ + PointcutDef( + id="pc-queue-block", + joinpoints=["queue.before_enqueue"], + match=PointcutMatch(any_keywords=["QUEUE_DOGFOOD_BLOCK"]), + ), + ], + advices=[ + AopAdvice( + id="adv-block", + kind=AdviceKind.BEFORE, + pointcut_ref="pc-queue-block", + content="Follow-up queue blocked.", + template=AdviceType.MEMORY_WRITE_GUARD, + effect=WeavingPolicy( + mode=WeavingOperation.BLOCK, + level=WeavingLevel.MEMORY_LEVEL, + target="queue.prompt", + ), + ), + ], + ) + out = export_reflex_policies([concern], action_kind="queue_enqueue") + assert len(out["policies"]) == 1 + row = out["policies"][0] + assert row["action_kind"] == "queue_enqueue" + assert row["predicate"]["kind"] == "text_contains" + assert "QUEUE_DOGFOOD_BLOCK" in row["predicate"]["needles"] + + +def test_export_all_merges_kinds() -> None: + queue = Concern( + id="oc.dogfood.queue-block", + name="queue block", + pointcuts=[ + PointcutDef( + id="pc-queue-block", + joinpoints=["queue.before_enqueue"], + match=PointcutMatch(any_keywords=["QUEUE_DOGFOOD_BLOCK"]), + ), + ], + advices=[ + AopAdvice( + id="adv-block", + kind=AdviceKind.BEFORE, + pointcut_ref="pc-queue-block", + content="blocked", + effect=WeavingPolicy( + mode=WeavingOperation.BLOCK, + level=WeavingLevel.MEMORY_LEVEL, + target="queue.prompt", + ), + ), + ], + ) + out = export_reflex_policies([_demo_tool_block(), queue], action_kind="all") + kinds = {p["action_kind"] for p in out["policies"]} + assert kinds == {"tool_call", "queue_enqueue"} From 5bc7c3fac048be9a4c41c12c32d1dd89d824d7d2 Mon Sep 17 00:00:00 2001 From: moss Date: Mon, 25 May 2026 01:13:09 +0700 Subject: [PATCH 2/2] fix(bridge,runtime): CI format, in-proc fail-closed guards, queue dogfood test Apply ruff format on phase-2 Python files so CI passes. Extend in-proc ReflexMonitor error handling to fail-closed for message, spawn, and queue hooks (not only tool_guard). Improve live queue block script to wait for an active run and recognize in-proc deny log lines. Co-authored-by: Cursor --- .../scripts/live-queue-block-test.sh | 24 ++++++++++-- .../openclaw-opencoat-bridge/src/index.ts | 39 +++++++++++++++---- .../src/reflex-tool-guard.test.ts | 32 +++++++++++++++ .../src/reflex-tool-guard.ts | 29 +++++++++++++- .../concern/reflex_policy_export.py | 4 +- .../credit/plasticity_engine.py | 6 +-- .../ipc/jsonrpc_dispatch.py | 3 +- 7 files changed, 113 insertions(+), 24 deletions(-) diff --git a/examples/09_queue_hook_dogfood/scripts/live-queue-block-test.sh b/examples/09_queue_hook_dogfood/scripts/live-queue-block-test.sh index 41a313e..6ce6c3a 100755 --- a/examples/09_queue_hook_dogfood/scripts/live-queue-block-test.sh +++ b/examples/09_queue_hook_dogfood/scripts/live-queue-block-test.sh @@ -130,7 +130,21 @@ openclaw gateway call chat.send --json --timeout 60000 \ --params "$(cat "$PARAMS_DIR/msg1.json")" \ | tee "$PARAMS_DIR/msg1-response.json" -echo "== wait ${WAIT_ACTIVE_SEC}s then message 2 (block trigger) ==" +wait_for_active_run() { + local deadline=$((SECONDS + ${WAIT_ACTIVE_MAX_SEC:-90})) + echo "== wait for active embedded run (max ${WAIT_ACTIVE_MAX_SEC:-90}s) ==" + while ((SECONDS < deadline)); do + if tail -n +"$((log_start + 1))" "$LOG" 2>/dev/null | grep -qE 'before_tool_call→|before_agent_run→'; then + echo "active run detected in gateway log" + return 0 + fi + sleep 1 + done + fail "timed out waiting for active embedded run (before_tool_call / before_agent_run in log)" +} + +wait_for_active_run +echo "== hold ${WAIT_ACTIVE_SEC}s with run active, then message 2 (block trigger) ==" sleep "$WAIT_ACTIVE_SEC" openclaw gateway call chat.send --json --timeout 60000 \ @@ -142,12 +156,14 @@ sleep 2 echo "== gateway log (queue / opencoat-bridge) ==" log_slice="$(mktemp)" tail -n +"$((log_start + 1))" "$LOG" >"$log_slice" || true -if grep -E 'queue_before_enqueue→queue\.before_enqueue: oc\.dogfood\.queue-block|queue_before_enqueue.*oc\.dogfood\.queue-block' "$log_slice" | tail -5; then +if grep -E \ + 'queue_before_enqueue→queue\.before_enqueue: oc\.dogfood\.queue-block|queue_before_enqueue→queue\.before_enqueue: oc\.dogfood\.queue-block \(in-proc deny\)|queue_before_enqueue.*oc\.dogfood\.queue-block|follow-up enqueue blocked by plugin hook.*queue|ReflexMonitor \(queue_guard\)' \ + "$log_slice" | tail -5; then hook_ok=true else hook_ok=false - echo "(no queue_before_enqueue activation line for oc.dogfood.queue-block)" - grep -iE 'queue_before_enqueue|oc\.dogfood\.queue-block' "$log_slice" | tail -10 || true + echo "(no queue_before_enqueue / in-proc deny / follow-up block line for oc.dogfood.queue-block)" + grep -iE 'queue_before_enqueue|oc\.dogfood\.queue-block|queue_guard|follow-up enqueue blocked' "$log_slice" | tail -15 || true fi rm -f "$log_slice" diff --git a/integrations/openclaw-opencoat-bridge/src/index.ts b/integrations/openclaw-opencoat-bridge/src/index.ts index 3b22814..5046b0c 100644 --- a/integrations/openclaw-opencoat-bridge/src/index.ts +++ b/integrations/openclaw-opencoat-bridge/src/index.ts @@ -49,6 +49,9 @@ import { buildPayloadAction } from "./reflex-policies.js"; import { buildReflexState, buildToolCallAction, + failClosedMessageGuard, + failClosedQueueGuard, + failClosedSpawnGuard, failClosedToolGuard, reflexDenyDecision, reflexToolGuardDecision, @@ -409,6 +412,13 @@ async function handleHook( buildReflexState(c), ); if (decision.block) { + const policyId = + decision.record?.policy_id?.trim() || "ReflexMonitor"; + if (cfg.logActivations) { + api.logger?.info?.( + `[opencoat-bridge] ${binding.hook}→${binding.joinpoint}: ${policyId} (in-proc deny)`, + ); + } return { block: true, blockReason: @@ -476,14 +486,27 @@ async function handleHook( err instanceof Error ? err.message : String(err) }`, ); - return binding.kind === "tool_guard" - ? inProcReflexEnabled(cfg) - ? failClosedToolGuard( - {}, - err instanceof Error ? err : new Error(String(err)), - ) - : {} - : undefined; + if (inProcReflexEnabled(cfg)) { + switch (binding.kind) { + case "tool_guard": { + const e = asRecord(event); + const params = + e.params && typeof e.params === "object" + ? { ...(e.params as Record) } + : {}; + return failClosedToolGuard(params, err); + } + case "message_out": + return failClosedMessageGuard(err); + case "subagent_spawn": + return failClosedSpawnGuard(err); + case "queue_guard": + return failClosedQueueGuard(err); + default: + break; + } + } + return binding.kind === "tool_guard" ? {} : undefined; } } diff --git a/integrations/openclaw-opencoat-bridge/src/reflex-tool-guard.test.ts b/integrations/openclaw-opencoat-bridge/src/reflex-tool-guard.test.ts index 13fb12a..443350a 100644 --- a/integrations/openclaw-opencoat-bridge/src/reflex-tool-guard.test.ts +++ b/integrations/openclaw-opencoat-bridge/src/reflex-tool-guard.test.ts @@ -5,6 +5,10 @@ import { compileReflexPolicies, DEMO_TOOL_BLOCK_SPEC } from "./reflex-policies.j import { buildReflexState, buildToolCallAction, + failClosedMessageGuard, + failClosedQueueGuard, + failClosedSpawnGuard, + failClosedToolGuard, reflexToolGuardDecision, } from "./reflex-tool-guard.js"; @@ -38,3 +42,31 @@ describe("reflexToolGuardDecision", () => { assert.equal(out.block, false); }); }); + +describe("in-proc fail-closed helpers", () => { + const err = new Error("monitor blew up"); + + it("failClosedToolGuard blocks with reason", () => { + const out = failClosedToolGuard({ command: "x" }, err); + assert.equal(out.block, true); + assert.match(out.blockReason ?? "", /monitor blew up/); + }); + + it("failClosedMessageGuard cancels send", () => { + const out = failClosedMessageGuard(err); + assert.equal(out.cancel, true); + assert.match(out.content, /monitor blew up/); + }); + + it("failClosedSpawnGuard returns error status", () => { + const out = failClosedSpawnGuard(err); + assert.equal(out.status, "error"); + assert.match(out.error, /monitor blew up/); + }); + + it("failClosedQueueGuard blocks enqueue", () => { + const out = failClosedQueueGuard(err); + assert.equal(out.block, true); + assert.match(out.blockReason, /monitor blew up/); + }); +}); diff --git a/integrations/openclaw-opencoat-bridge/src/reflex-tool-guard.ts b/integrations/openclaw-opencoat-bridge/src/reflex-tool-guard.ts index 8595e31..3090bba 100644 --- a/integrations/openclaw-opencoat-bridge/src/reflex-tool-guard.ts +++ b/integrations/openclaw-opencoat-bridge/src/reflex-tool-guard.ts @@ -80,15 +80,40 @@ export function reflexToolGuardDecision( return { block: false, params, record }; } +function failClosedReason(err: unknown): string { + const msg = err instanceof Error ? err.message : String(err); + return `OpenCOAT ReflexMonitor fail-closed: ${msg}`; +} + /** Fail-closed when the monitor itself throws (TCB unavailable). */ export function failClosedToolGuard( params: Record, err: unknown, ): ToolGuardDecision { - const msg = err instanceof Error ? err.message : String(err); return { block: true, - blockReason: `OpenCOAT ReflexMonitor fail-closed: ${msg}`, + blockReason: failClosedReason(err), params, }; } + +export function failClosedMessageGuard(err: unknown): { + cancel: true; + content: string; +} { + return { cancel: true, content: failClosedReason(err) }; +} + +export function failClosedSpawnGuard(err: unknown): { + status: "error"; + error: string; +} { + return { status: "error", error: failClosedReason(err) }; +} + +export function failClosedQueueGuard(err: unknown): { + block: true; + blockReason: string; +} { + return { block: true, blockReason: failClosedReason(err) }; +} diff --git a/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py b/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py index 23b60be..20eae71 100644 --- a/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py +++ b/packages/opencoat-runtime/opencoat_runtime_core/concern/reflex_policy_export.py @@ -31,9 +31,7 @@ ) _TOOL_JOINPOINTS = frozenset({"before_tool_call", "tool.before_call"}) -_SPAWN_JOINPOINTS = frozenset( - {"task.before_create", "subagent_spawning", "subagent.before_spawn"} -) +_SPAWN_JOINPOINTS = frozenset({"task.before_create", "subagent_spawning", "subagent.before_spawn"}) _MESSAGE_JOINPOINTS = frozenset( { "before_response", diff --git a/packages/opencoat-runtime/opencoat_runtime_core/credit/plasticity_engine.py b/packages/opencoat-runtime/opencoat_runtime_core/credit/plasticity_engine.py index a152428..c82d422 100644 --- a/packages/opencoat-runtime/opencoat_runtime_core/credit/plasticity_engine.py +++ b/packages/opencoat-runtime/opencoat_runtime_core/credit/plasticity_engine.py @@ -153,11 +153,7 @@ def cold_step( if concern.lifecycle_state in {"archived", "merged", "deleted"}: skipped += 1 continue - score = ( - concern.activation_state.score - if concern.activation_state is not None - else None - ) + score = concern.activation_state.score if concern.activation_state is not None else None if score is None: skipped += 1 continue diff --git a/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py b/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py index 99f3f19..2c72463 100644 --- a/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py +++ b/packages/opencoat-runtime/opencoat_runtime_daemon/ipc/jsonrpc_dispatch.py @@ -450,8 +450,7 @@ def _reflex_policies_export(self, params: dict[str, Any] | list[Any]) -> dict[st "all", ): raise JsonRpcParamsError( - "action_kind must be one of: tool_call, spawn, message_out, " - "queue_enqueue, all" + "action_kind must be one of: tool_call, spawn, message_out, queue_enqueue, all" ) concerns = self._rt.concern_store.list() return export_reflex_policies(concerns, action_kind=action_kind)