From b72e8d5b0d86c0ec30705c8b6718b48a56ffebd4 Mon Sep 17 00:00:00 2001 From: drknowhow Date: Wed, 24 Jun 2026 17:07:47 -0400 Subject: [PATCH 1/2] fix: circuit breaker for c3_delegate CLI backends A broken-but-installed delegate backend (gemini/codex/claude) re-spawned a 90-120s subprocess on every call: the handlers returned the error on runtime failure (if not ok:) but never demoted the backend. Add a reusable thread-safe CircuitBreaker (services/circuit_breaker.py) and wire it into all three CLI handlers: gate before spawn, record_failure() opens after N consecutive fails (default 3 / 60s cooldown, configurable via delegate_config breaker_failure_threshold/breaker_cooldown_seconds), record_success() resets, half-open probe after cooldown. Trips emit a NotificationStore warning; the auto router skips tripped backends and falls back to ollama. Breaker state is process-global on purpose: backend health (auth, CLI version) is a host fact, not per-project. Tests: tests/test_circuit_breaker.py (5 pass). Co-Authored-By: Claude Opus 4.8 (1M context) --- cli/tools/delegate.py | 82 ++++++++++++++++++++++++++- services/circuit_breaker.py | 86 ++++++++++++++++++++++++++++ tests/test_circuit_breaker.py | 103 ++++++++++++++++++++++++++++++++++ 3 files changed, 269 insertions(+), 2 deletions(-) create mode 100644 services/circuit_breaker.py create mode 100644 tests/test_circuit_breaker.py diff --git a/cli/tools/delegate.py b/cli/tools/delegate.py index 223e9e0..dc747e3 100644 --- a/cli/tools/delegate.py +++ b/cli/tools/delegate.py @@ -11,10 +11,12 @@ import shutil import subprocess import sys +import threading import time from pathlib import Path from core import count_tokens +from services.circuit_breaker import CircuitBreaker log = logging.getLogger(__name__) @@ -252,11 +254,20 @@ def _handle_claude_delegate(task: str, task_type: str, context: str, file_path: str, svc, dcfg: dict, finalize) -> str: """Handle delegation via Claude Code CLI.""" timeout = int(dcfg.get("claude_timeout", 90)) + breaker = _backend_breaker("claude", dcfg) + if not breaker.allow(): + return finalize("c3_delegate", {"task_type": task_type, "backend": "claude"}, + "[delegate:degraded] Claude skipped after repeated failures; retrying in " + f"~{breaker.cooldown_remaining()}s. Run 'claude --version' to diagnose.", + "degraded") _log_progress(svc, f"[delegate] Routing {task_type} → Claude CLI...") output, ok = _run_claude(task, context, cwd=str(svc.project_path), timeout=timeout) if not ok: + if breaker.record_failure(): + _notify_backend_degraded(svc, "claude", breaker) return finalize("c3_delegate", {"task_type": task_type, "backend": "claude"}, output, "error") + breaker.record_success() return finalize("c3_delegate", {"task_type": task_type, "backend": "claude"}, output, "ok") @@ -681,6 +692,51 @@ def _run_codex_resume(follow_up: str, timeout: int = 120, _delegate_cache: dict[str, tuple[str, int]] = {} _delegate_metrics = {"total_calls": 0, "tokens_saved": 0} +# Per-backend runtime circuit breakers. Distinct from the install-status flags +# (_gemini_available etc., which only answer "is the CLI on PATH"): these track +# *runtime* health so a broken-but-installed backend (expired auth, repeated +# timeouts) stops re-spawning a 90-120s subprocess on every call. Keyed by +# backend name and intentionally process-global — backend health (auth, CLI +# version) is a property of the host, not of any single project. +_backend_breakers: dict[str, CircuitBreaker] = {} +_backend_breakers_lock = threading.Lock() + + +def _backend_breaker(name: str, dcfg: dict | None = None) -> CircuitBreaker: + """Return (creating on first use) the runtime circuit breaker for a backend.""" + with _backend_breakers_lock: + breaker = _backend_breakers.get(name) + if breaker is None: + cfg = dcfg or {} + breaker = CircuitBreaker( + name, + failure_threshold=int(cfg.get("breaker_failure_threshold", 3) or 3), + cooldown_seconds=float(cfg.get("breaker_cooldown_seconds", 60) or 60), + ) + _backend_breakers[name] = breaker + return breaker + + +def _notify_backend_degraded(svc, name: str, breaker: CircuitBreaker) -> None: + """Surface a backend trip via the NotificationStore (best-effort, never raises).""" + notifications = getattr(svc, "notifications", None) + if notifications is None: + return + try: + notifications.add( + agent="c3", + severity="warning", + title=f"Delegate backend degraded: {name}", + message=( + f"{name} failed {breaker.failure_threshold}x consecutively; c3_delegate " + f"will skip it for ~{int(breaker.cooldown_seconds)}s instead of re-spawning " + f"the CLI. Run '{name} --version' to diagnose." + ), + replace_if_unacked=True, + ) + except Exception: + pass + def get_delegate_metrics() -> dict: return dict(_delegate_metrics) @@ -765,6 +821,13 @@ def _handle_codex_delegate(task: str, task_type: str, context: str, "[delegate:error] Codex CLI not available. Run 'codex --version' to diagnose.", "unavailable") + breaker = _backend_breaker("codex", dcfg) + if not breaker.allow(): + return finalize("c3_delegate", {"task_type": task_type, "backend": "codex"}, + "[delegate:degraded] Codex skipped after repeated failures; retrying in " + f"~{breaker.cooldown_remaining()}s. Run 'codex --version' to diagnose.", + "degraded") + # Resolve model/sandbox/reasoning from config or defaults cdef = CODEX_MODELS.get(task_type, CODEX_MODELS.get("ask", {})) model = dcfg.get("codex_default_model") or cdef.get("model", "gpt-5.3-codex-spark") @@ -807,10 +870,13 @@ def _handle_codex_delegate(task: str, task_type: str, context: str, elapsed = round(time.monotonic() - t0, 1) if not ok: + if breaker.record_failure(): + _notify_backend_degraded(svc, "codex", breaker) return finalize("c3_delegate", {"task_type": task_type, "backend": "codex", "model": model, "elapsed": f"{elapsed}s"}, output, "error") + breaker.record_success() _delegate_metrics["total_calls"] += 1 _delegate_cache[ckey] = (output, count_tokens(output)) @@ -880,6 +946,13 @@ def _handle_gemini_delegate(task: str, task_type: str, context: str, "[delegate:error] Gemini CLI not available. Run 'gemini --version' to diagnose.", "unavailable") + breaker = _backend_breaker("gemini", dcfg) + if not breaker.allow(): + return finalize("c3_delegate", {"task_type": task_type, "backend": "gemini"}, + "[delegate:degraded] Gemini skipped after repeated failures; retrying in " + f"~{breaker.cooldown_remaining()}s. Run 'gemini --version' to diagnose.", + "degraded") + # Resolve model from config or defaults gdef = GEMINI_MODELS.get(task_type, GEMINI_MODELS.get("ask", {})) model = dcfg.get("gemini_default_model") or gdef.get("model", "gemini-2.5-flash") @@ -919,10 +992,13 @@ def _handle_gemini_delegate(task: str, task_type: str, context: str, elapsed = round(time.monotonic() - t0, 1) if not ok: + if breaker.record_failure(): + _notify_backend_degraded(svc, "gemini", breaker) return finalize("c3_delegate", {"task_type": task_type, "backend": "gemini", "model": model, "elapsed": f"{elapsed}s"}, output, "error") + breaker.record_success() _delegate_metrics["total_calls"] += 1 _delegate_cache[ckey] = (output, count_tokens(output)) @@ -1068,9 +1144,11 @@ def _check_claude(): _gemini_avail = (_gemini_available is True) or ( _gemini_available is None and task_type not in _light_tasks and _is_gemini_on_path() ) - if task_type in heavy_codex and _codex_avail and _codex_available is not False: + if (task_type in heavy_codex and _codex_avail and _codex_available is not False + and _backend_breaker("codex", dcfg).allow()): backend = "codex" - elif task_type in heavy_gemini and _gemini_avail and _gemini_available is not False: + elif (task_type in heavy_gemini and _gemini_avail and _gemini_available is not False + and _backend_breaker("gemini", dcfg).allow()): backend = "gemini" else: backend = "ollama" diff --git a/services/circuit_breaker.py b/services/circuit_breaker.py new file mode 100644 index 0000000..0d05530 --- /dev/null +++ b/services/circuit_breaker.py @@ -0,0 +1,86 @@ +"""Lightweight thread-safe circuit breaker for flapping subsystems. + +Borrowed in spirit from Headroom's TransformPipeline breaker: after N +consecutive failures a subsystem is treated as unhealthy and calls are +short-circuited for a cooldown window instead of re-running (and re-failing) +the expensive operation every time. A single success closes the breaker. + +First consumer: c3_delegate, to stop re-spawning a broken-but-installed CLI +backend (a 90-120s subprocess timeout) on every call. Deliberately +dependency-free so any call-time subsystem (e.g. the Ollama embed/generate +path) can reuse it later. +""" +from __future__ import annotations + +import threading +import time + + +class CircuitBreaker: + """Consecutive-failure breaker: closed -> open (after N fails) -> half-open (after cooldown).""" + + def __init__( + self, + name: str = "", + *, + failure_threshold: int = 3, + cooldown_seconds: float = 60.0, + ) -> None: + self.name = name + self.failure_threshold = max(1, int(failure_threshold)) + self.cooldown_seconds = max(0.0, float(cooldown_seconds)) + self._lock = threading.Lock() + self._failures = 0 + self._open = False + self._opened_at = 0.0 + + def allow(self) -> bool: + """Return True if a call may proceed. + + An open breaker permits a single probe once the cooldown has elapsed + (half-open); the next ``record_success``/``record_failure`` resolves it. + """ + with self._lock: + if not self._open: + return True + return (time.monotonic() - self._opened_at) >= self.cooldown_seconds + + def record_success(self) -> None: + """Reset the breaker after a healthy call.""" + with self._lock: + self._failures = 0 + self._open = False + self._opened_at = 0.0 + + def record_failure(self) -> bool: + """Count a failed call. + + Returns True iff this failure *just* tripped the breaker open (callers + can use that edge to emit a one-shot notification). A failed half-open + probe restarts the cooldown but does not re-trip. + """ + with self._lock: + self._failures += 1 + if not self._open and self._failures >= self.failure_threshold: + self._open = True + self._opened_at = time.monotonic() + return True + if self._open: + self._opened_at = time.monotonic() + return False + + def cooldown_remaining(self) -> int: + """Whole seconds left before the next probe is allowed (0 if closed/elapsed).""" + with self._lock: + if not self._open: + return 0 + remaining = self.cooldown_seconds - (time.monotonic() - self._opened_at) + return max(0, int(round(remaining))) + + @property + def is_open(self) -> bool: + """True while calls are actively being short-circuited (open and within cooldown).""" + with self._lock: + if not self._open: + return False + return (time.monotonic() - self._opened_at) < self.cooldown_seconds diff --git a/tests/test_circuit_breaker.py b/tests/test_circuit_breaker.py new file mode 100644 index 0000000..781dd66 --- /dev/null +++ b/tests/test_circuit_breaker.py @@ -0,0 +1,103 @@ +"""Tests for the CircuitBreaker primitive and the c3_delegate demote-on-failure wiring.""" +import time + +from services.circuit_breaker import CircuitBreaker + + +def test_closed_until_threshold_then_opens(): + br = CircuitBreaker("x", failure_threshold=3, cooldown_seconds=60) + assert br.allow() + assert br.record_failure() is False # 1 + assert br.record_failure() is False # 2 + assert br.allow() # still closed below threshold + assert br.record_failure() is True # 3 -> trips open (edge) + assert not br.allow() # open, within cooldown + assert br.is_open + + +def test_success_closes_breaker(): + br = CircuitBreaker("x", failure_threshold=2, cooldown_seconds=60) + br.record_failure() + assert br.record_failure() is True + assert not br.allow() + br.record_success() + assert br.allow() + assert br.cooldown_remaining() == 0 + assert not br.is_open + + +def test_half_open_probe_after_cooldown(): + br = CircuitBreaker("x", failure_threshold=1, cooldown_seconds=0.05) + assert br.record_failure() is True + assert not br.allow() # blocked within cooldown + time.sleep(0.08) + assert br.allow() # half-open: one probe allowed + assert br.record_failure() is False # failed probe re-arms, not a new trip + assert not br.allow() + + +def test_cooldown_remaining_within_bounds(): + br = CircuitBreaker("x", failure_threshold=1, cooldown_seconds=30) + br.record_failure() + assert 0 < br.cooldown_remaining() <= 30 + + +class _FakeNotifications: + def __init__(self): + self.entries = [] + + def add(self, **kwargs): + self.entries.append(kwargs) + return kwargs + + +class _FakeSvc: + def __init__(self): + self.project_path = "." + self.delegate_config = { + "gemini_enabled": True, + "auto_compress": False, + "breaker_failure_threshold": 3, + "breaker_cooldown_seconds": 60, + } + self.notifications = _FakeNotifications() + self.compressor = None + self._agent_progress_cb = None + + +def _finalize(_tool, _meta, _output, status): + return status + + +def test_gemini_demotes_after_repeated_failures(monkeypatch): + """The core bug fix: a broken backend must stop re-spawning the CLI every call.""" + from cli.tools import delegate + + # Isolate module-global breaker + cache + availability state. + delegate._backend_breakers.clear() + delegate._delegate_cache.clear() + monkeypatch.setattr(delegate, "_gemini_available", True, raising=False) + + calls = {"run": 0} + + def fake_run_gemini(*_a, **_k): + calls["run"] += 1 + return ("boom", False, {}) + + monkeypatch.setattr(delegate, "_run_gemini", fake_run_gemini) + + svc = _FakeSvc() + dcfg = svc.delegate_config + + # Three real failures trip the breaker (threshold=3). + for _ in range(3): + assert delegate._handle_gemini_delegate("t", "ask", "", "", svc, dcfg, _finalize) == "error" + assert calls["run"] == 3 + + # Fourth call: breaker open -> short-circuit, NO subprocess re-spawn. + assert delegate._handle_gemini_delegate("t", "ask", "", "", svc, dcfg, _finalize) == "degraded" + assert calls["run"] == 3 + + # The trip emitted exactly one degradation notification. + degraded = [e for e in svc.notifications.entries if "degraded" in e.get("title", "").lower()] + assert len(degraded) == 1 From 9c479a7b5dcaab178e22f17e75d5daad548f2d00 Mon Sep 17 00:00:00 2001 From: drknowhow Date: Wed, 24 Jun 2026 17:14:36 -0400 Subject: [PATCH 2/2] chore(release): 2.39.1 - c3_delegate circuit breaker Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 17 +++++++++++++++++ pyproject.toml | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 95114ab..e4b4813 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [2.39.1] - 2026-06-24 + +A small reliability fix for `c3_delegate`. + +### Fixed + +- **A broken delegate backend re-spawned a subprocess on every call.** When a CLI + backend (`gemini`/`codex`/`claude`) was installed but failing at runtime (expired + auth, model pulled away, repeated timeouts), `_handle_*_delegate` returned the error + without demoting the backend — so every subsequent `c3_delegate` call paid the full + 90–120s subprocess spawn + timeout again. Each backend now has a thread-safe circuit + breaker (`services/circuit_breaker.py`): after N consecutive failures (default 3) it + short-circuits for a cooldown (default 60s) with a single half-open probe on recovery, + and surfaces a notification when it trips. The `auto` router skips tripped backends and + falls back to Ollama. Thresholds are configurable via `delegate_config` + (`breaker_failure_threshold`, `breaker_cooldown_seconds`). + ## [2.39.0] - 2026-06-22 A correctness & security hardening release. A multi-agent audit of C3 surfaced a diff --git a/pyproject.toml b/pyproject.toml index 61fdffe..5f9c60b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "code-context-control" -version = "2.39.0" +version = "2.39.1" description = "Local code-intelligence layer for AI coding tools (Claude Code, Codex, Gemini, Copilot). Retrieve less, read less, edit safer." readme = "README.md" requires-python = ">=3.10"