Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions examples/09_queue_hook_dogfood/scripts/live-queue-block-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,21 @@ openclaw gateway call chat.send --json --timeout 60000 \
--params "$(cat "$PARAMS_DIR/msg1.json")" \
| tee "$PARAMS_DIR/msg1-response.json"

echo "== wait ${WAIT_ACTIVE_SEC}s then message 2 (block trigger) =="
wait_for_active_run() {
local deadline=$((SECONDS + ${WAIT_ACTIVE_MAX_SEC:-90}))
echo "== wait for active embedded run (max ${WAIT_ACTIVE_MAX_SEC:-90}s) =="
while ((SECONDS < deadline)); do
if tail -n +"$((log_start + 1))" "$LOG" 2>/dev/null | grep -qE 'before_tool_call→|before_agent_run→'; then
echo "active run detected in gateway log"
return 0
fi
sleep 1
done
fail "timed out waiting for active embedded run (before_tool_call / before_agent_run in log)"
}

wait_for_active_run
echo "== hold ${WAIT_ACTIVE_SEC}s with run active, then message 2 (block trigger) =="
sleep "$WAIT_ACTIVE_SEC"

openclaw gateway call chat.send --json --timeout 60000 \
Expand All @@ -142,12 +156,14 @@ sleep 2
echo "== gateway log (queue / opencoat-bridge) =="
log_slice="$(mktemp)"
tail -n +"$((log_start + 1))" "$LOG" >"$log_slice" || true
if grep -E 'queue_before_enqueue→queue\.before_enqueue: oc\.dogfood\.queue-block|queue_before_enqueue.*oc\.dogfood\.queue-block' "$log_slice" | tail -5; then
if grep -E \
'queue_before_enqueue→queue\.before_enqueue: oc\.dogfood\.queue-block|queue_before_enqueue→queue\.before_enqueue: oc\.dogfood\.queue-block \(in-proc deny\)|queue_before_enqueue.*oc\.dogfood\.queue-block|follow-up enqueue blocked by plugin hook.*queue|ReflexMonitor \(queue_guard\)' \
"$log_slice" | tail -5; then
hook_ok=true
else
hook_ok=false
echo "(no queue_before_enqueue activation line for oc.dogfood.queue-block)"
grep -iE 'queue_before_enqueue|oc\.dogfood\.queue-block' "$log_slice" | tail -10 || true
echo "(no queue_before_enqueue / in-proc deny / follow-up block line for oc.dogfood.queue-block)"
grep -iE 'queue_before_enqueue|oc\.dogfood\.queue-block|queue_guard|follow-up enqueue blocked' "$log_slice" | tail -15 || true
fi
rm -f "$log_slice"

Expand Down
7 changes: 4 additions & 3 deletions integrations/openclaw-opencoat-bridge/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,15 +337,16 @@ guard decision). Enable in OpenClaw plugin config:

**Behavior**

- Policies load from daemon `reflex.policies.export` (hard `TOOL_GUARD` + BLOCK concerns).
- Falls back to built-in `demo-tool-block` needles when export is empty.
- Policies load from daemon `reflex.policies.export` with `action_kind: all` (tool, spawn, message, queue hard BLOCK concerns).
- In-proc guards: `before_tool_call`, `subagent_spawning`, `message_sending`, `queue_before_enqueue`.
- Falls back to built-in `demo-tool-block` + `oc.dogfood.queue-block` when export is empty.
- **Fail-closed** on monitor errors (contrast: collaborative path fail-open).
- Optional async `joinpoint.submit` audit (`reflexAuditToDaemon`) for DCN without blocking the hook.

Gateway log when enabled:

```text
[opencoat-bridge] in-proc ReflexMonitor tool_guard policies: demo-tool-block, …
[opencoat-bridge] in-proc ReflexMonitor policies: demo-tool-block, oc.dogfood.queue-block, …
```

See [v0.3 §10.5](../../docs/design/v0.3-morphogenetic-architecture.md#105-实现分期-2026-05).
Expand Down
6 changes: 5 additions & 1 deletion integrations/openclaw-opencoat-bridge/openclaw.plugin.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
},
"inProcReflexToolGuard": {
"type": "boolean",
"description": "Run before_tool_call through in-proc ReflexMonitor (v0.3 TCB prototype; fail-closed)"
"description": "Run hot-path hooks through in-proc ReflexMonitor (tool/spawn/message/queue; fail-closed)"
},
"inProcReflexGuards": {
"type": "boolean",
"description": "Alias for inProcReflexToolGuard — enables all in-proc reflex guards (default true when tool guard on)"
},
"reflexSyncFromDaemon": {
"type": "boolean",
Expand Down
2 changes: 2 additions & 0 deletions integrations/openclaw-opencoat-bridge/src/daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export function resolveConfig(raw: Record<string, unknown> | undefined): BridgeC
runtimeObservers: raw?.runtimeObservers !== false,
observerPollMs,
inProcReflexToolGuard: raw?.inProcReflexToolGuard === true,
inProcReflexGuards:
raw?.inProcReflexGuards === true || raw?.inProcReflexToolGuard === true,
reflexSyncFromDaemon: raw?.reflexSyncFromDaemon !== false,
reflexAuditToDaemon: raw?.reflexAuditToDaemon !== false,
reflexPolicies: parseInlineReflexPolicies(raw?.reflexPolicies),
Expand Down
127 changes: 114 additions & 13 deletions integrations/openclaw-opencoat-bridge/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import {
toolResultPayload,
} from "./payloads.js";
import { createObserveEmitter } from "./emit-joinpoint.js";
import { loadReflexRuntime, buildReflexRuntime } from "./reflex-policy-sync.js";
import { loadReflexRuntime, buildReflexRuntime, inProcReflexEnabled } from "./reflex-policy-sync.js";
import type { ReflexRuntime } from "./reflex-policy-sync.js";
import type { DecisionRecord } from "./reflex-monitor.js";
import {
Expand All @@ -45,10 +45,15 @@ import {
buildToolOutcomeRt,
buildTurnCompleteRt,
} from "./r-t-emit.js";
import { buildPayloadAction } from "./reflex-policies.js";
import {
buildReflexState,
buildToolCallAction,
failClosedMessageGuard,
failClosedQueueGuard,
failClosedSpawnGuard,
failClosedToolGuard,
reflexDenyDecision,
reflexToolGuardDecision,
} from "./reflex-tool-guard.js";
import {
Expand Down Expand Up @@ -258,7 +263,7 @@ async function handleHook(
: {};
const toolName = typeof e.toolName === "string" ? e.toolName : "tool";

if (cfg.inProcReflexToolGuard) {
if (inProcReflexEnabled(cfg)) {
const runtime = reflexState.runtime;
if (!runtime) {
return failClosedToolGuard(
Expand Down Expand Up @@ -330,18 +335,99 @@ async function handleHook(
}

case "message_out": {
if (inProcReflexEnabled(cfg)) {
const runtime = reflexState.runtime;
if (!runtime) {
return {
cancel: true,
content: "OpenCOAT ReflexMonitor fail-closed: not initialized",
};
}
const e = asRecord(event);
const content = typeof e.content === "string" ? e.content : "";
const action = buildPayloadAction("message_out", { content, ...payload });
const decision = reflexDenyDecision(
runtime.monitor,
action,
buildReflexState(c),
);
if (decision.block) {
return {
cancel: true,
content:
decision.blockReason ??
"Blocked by OpenCOAT ReflexMonitor (message_out).",
};
}
return {};
}
const inj = await emit(cfg, api, binding.hook, binding.joinpoint, payload, c);
return messageSendingDecision(inj);
}

case "subagent_spawn": {
if (inProcReflexEnabled(cfg)) {
const runtime = reflexState.runtime;
if (!runtime) {
return {
status: "error",
error: "OpenCOAT ReflexMonitor fail-closed: not initialized",
};
}
const action = buildPayloadAction("spawn", payload);
const decision = reflexDenyDecision(
runtime.monitor,
action,
buildReflexState(c),
);
if (decision.block) {
return {
status: "error",
error:
decision.blockReason ??
"Blocked by OpenCOAT ReflexMonitor (subagent_spawn).",
};
}
return { status: "ok" };
}
const inj = await emit(cfg, api, binding.hook, binding.joinpoint, payload, c);
const decision = subagentSpawnDecision(inj);
if (decision.status === "error") return decision;
return { status: "ok" };
}

case "queue_guard": {
if (inProcReflexEnabled(cfg)) {
const runtime = reflexState.runtime;
if (!runtime) {
return {
block: true,
blockReason: "OpenCOAT ReflexMonitor fail-closed: not initialized",
};
}
const action = buildPayloadAction("queue_enqueue", payload);
const decision = reflexDenyDecision(
runtime.monitor,
action,
buildReflexState(c),
);
if (decision.block) {
const policyId =
decision.record?.policy_id?.trim() || "ReflexMonitor";
if (cfg.logActivations) {
api.logger?.info?.(
`[opencoat-bridge] ${binding.hook}→${binding.joinpoint}: ${policyId} (in-proc deny)`,
);
}
return {
block: true,
blockReason:
decision.blockReason ??
"Blocked by OpenCOAT ReflexMonitor (queue_guard).",
};
}
return {};
}
const inj = await emit(cfg, api, binding.hook, binding.joinpoint, payload, c);
return queueBeforeEnqueueDecision(inj);
}
Expand Down Expand Up @@ -400,27 +486,40 @@ async function handleHook(
err instanceof Error ? err.message : String(err)
}`,
);
return binding.kind === "tool_guard"
? cfg.inProcReflexToolGuard
? failClosedToolGuard(
{},
err instanceof Error ? err : new Error(String(err)),
)
: {}
: undefined;
if (inProcReflexEnabled(cfg)) {
switch (binding.kind) {
case "tool_guard": {
const e = asRecord(event);
const params =
e.params && typeof e.params === "object"
? { ...(e.params as Record<string, unknown>) }
: {};
return failClosedToolGuard(params, err);
}
case "message_out":
return failClosedMessageGuard(err);
case "subagent_spawn":
return failClosedSpawnGuard(err);
case "queue_guard":
return failClosedQueueGuard(err);
default:
break;
}
}
return binding.kind === "tool_guard" ? {} : undefined;
}
}

export default function register(api: BridgePluginApi): void {
const cfg = resolveConfig(api.pluginConfig);

if (cfg.inProcReflexToolGuard) {
if (inProcReflexEnabled(cfg)) {
reflexState.runtime = buildReflexRuntime(cfg, null);
void loadReflexRuntime(cfg)
.then((runtime) => {
reflexState.runtime = runtime;
api.logger?.info?.(
`[opencoat-bridge] in-proc ReflexMonitor tool_guard policies: ${
`[opencoat-bridge] in-proc ReflexMonitor policies: ${
runtime.policyIds.join(", ") || "(none)"
}`,
);
Expand Down Expand Up @@ -452,7 +551,9 @@ export default function register(api: BridgePluginApi): void {
`(skipped: ${SKIPPED_HOOKS.join(", ")}; daemon=${
cfg.enabled ? cfg.daemonUrl : "disabled"
}${observerNote}${
cfg.inProcReflexToolGuard ? "; in-proc ReflexMonitor tool_guard" : ""
inProcReflexEnabled(cfg)
? "; in-proc ReflexMonitor (tool/spawn/message/queue)"
: ""
}${cfg.emitRtJsonl ? "; r_t JSONL emit" : ""})`,
);
}
15 changes: 14 additions & 1 deletion integrations/openclaw-opencoat-bridge/src/reflex-monitor.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, it } from "node:test";
import assert from "node:assert/strict";
import { ReflexMonitor } from "./reflex-monitor.js";
import { compileReflexPolicies, DEMO_TOOL_BLOCK_SPEC } from "./reflex-policies.js";
import { compileReflexPolicies, DEMO_TOOL_BLOCK_SPEC, DEMO_QUEUE_BLOCK_SPEC } from "./reflex-policies.js";
import type { Action, ReflexPolicy, State } from "./reflex-monitor.js";

const state: State = {
Expand Down Expand Up @@ -96,4 +96,17 @@ describe("ReflexMonitor", () => {
assert.equal(decision.kind, "allow");
assert.equal(record.policy_id, undefined);
});

it("denies queue enqueue with QUEUE_DOGFOOD_BLOCK keyword", () => {
const monitor = new ReflexMonitor(
compileReflexPolicies([DEMO_QUEUE_BLOCK_SPEC]),
);
const action: Action = {
kind: "queue_enqueue",
name: "queue_enqueue",
args: { prompt: "Please QUEUE_DOGFOOD_BLOCK this follow-up" },
};
const { decision } = monitor.mediate(action, state);
assert.equal(decision.kind, "deny");
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import type { ReflexCriticality } from "./reflex-policy-spec.js";

export type ActionKind = "tool_call";
export type ActionKind = "tool_call" | "spawn" | "message_out" | "queue_enqueue";

export type Action = {
kind: ActionKind;
Expand Down
29 changes: 27 additions & 2 deletions integrations/openclaw-opencoat-bridge/src/reflex-policies.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { ReflexPolicySpec } from "./reflex-policy-spec.js";
import type { Action, Decision, ReflexPolicy, State } from "./reflex-monitor.js";
import type { Action, ActionKind, Decision, ReflexPolicy, State } from "./reflex-monitor.js";

function serializeArgs(args: Record<string, unknown>): string {
try {
Expand Down Expand Up @@ -35,7 +35,7 @@ function compileOne(spec: ReflexPolicySpec): ReflexPolicy {
id: spec.id,
criticality: spec.criticality,
applies(action: Action, _state: State): boolean {
if (action.kind !== "tool_call") return false;
if (action.kind !== spec.action_kind) return false;
if (spec.predicate.kind === "tool_name") {
return spec.predicate.names.includes(action.name);
}
Expand Down Expand Up @@ -68,3 +68,28 @@ export const DEMO_TOOL_BLOCK_SPEC: ReflexPolicySpec = {
deny_reason:
"Refusing destructive shell command — `rm -rf` is blocked by demo-tool-block.",
};

/** Queue dogfood block when daemon export is unavailable. */
export const DEMO_QUEUE_BLOCK_SPEC: ReflexPolicySpec = {
id: "oc.dogfood.queue-block",
criticality: "safety_critical",
action_kind: "queue_enqueue",
predicate: {
kind: "text_contains",
needles: ["QUEUE_DOGFOOD_BLOCK"],
},
deny_reason:
"Follow-up queue blocked by OpenCOAT dogfood concern (oc.dogfood.queue-block).",
};

export function buildPayloadAction(
kind: ActionKind,
payload: Record<string, unknown>,
): Action {
return {
kind,
name: kind,
args: { ...payload, text: JSON.stringify(payload) },
raw: payload,
};
}
Loading
Loading