Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [2.39.1] - 2026-06-24

A small reliability fix for `c3_delegate`.

### Fixed

- **A broken delegate backend re-spawned a subprocess on every call.** When a CLI
backend (`gemini`/`codex`/`claude`) was installed but failing at runtime (expired
auth, model pulled away, repeated timeouts), `_handle_*_delegate` returned the error
without demoting the backend — so every subsequent `c3_delegate` call paid the full
90–120s subprocess spawn + timeout again. Each backend now has a thread-safe circuit
breaker (`services/circuit_breaker.py`): after N consecutive failures (default 3) it
short-circuits for a cooldown (default 60s) with a single half-open probe on recovery,
and surfaces a notification when it trips. The `auto` router skips tripped backends and
falls back to Ollama. Thresholds are configurable via `delegate_config`
(`breaker_failure_threshold`, `breaker_cooldown_seconds`).

## [2.39.0] - 2026-06-22

A correctness & security hardening release. A multi-agent audit of C3 surfaced a
Expand Down
82 changes: 80 additions & 2 deletions cli/tools/delegate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import shutil
import subprocess
import sys
import threading
import time
from pathlib import Path

from core import count_tokens
from services.circuit_breaker import CircuitBreaker

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -252,11 +254,20 @@ def _handle_claude_delegate(task: str, task_type: str, context: str,
file_path: str, svc, dcfg: dict, finalize) -> str:
"""Handle delegation via Claude Code CLI."""
timeout = int(dcfg.get("claude_timeout", 90))
breaker = _backend_breaker("claude", dcfg)
if not breaker.allow():
return finalize("c3_delegate", {"task_type": task_type, "backend": "claude"},
"[delegate:degraded] Claude skipped after repeated failures; retrying in "
f"~{breaker.cooldown_remaining()}s. Run 'claude --version' to diagnose.",
"degraded")
_log_progress(svc, f"[delegate] Routing {task_type} → Claude CLI...")
output, ok = _run_claude(task, context, cwd=str(svc.project_path), timeout=timeout)
if not ok:
if breaker.record_failure():
_notify_backend_degraded(svc, "claude", breaker)
return finalize("c3_delegate", {"task_type": task_type, "backend": "claude"},
output, "error")
breaker.record_success()
return finalize("c3_delegate", {"task_type": task_type, "backend": "claude"},
output, "ok")

Expand Down Expand Up @@ -681,6 +692,51 @@ def _run_codex_resume(follow_up: str, timeout: int = 120,
_delegate_cache: dict[str, tuple[str, int]] = {}
_delegate_metrics = {"total_calls": 0, "tokens_saved": 0}

# Per-backend runtime circuit breakers. Distinct from the install-status flags
# (_gemini_available etc., which only answer "is the CLI on PATH"): these track
# *runtime* health so a broken-but-installed backend (expired auth, repeated
# timeouts) stops re-spawning a 90-120s subprocess on every call. Keyed by
# backend name and intentionally process-global — backend health (auth, CLI
# version) is a property of the host, not of any single project.
_backend_breakers: dict[str, CircuitBreaker] = {}
_backend_breakers_lock = threading.Lock()


def _backend_breaker(name: str, dcfg: dict | None = None) -> CircuitBreaker:
"""Return (creating on first use) the runtime circuit breaker for a backend."""
with _backend_breakers_lock:
breaker = _backend_breakers.get(name)
if breaker is None:
cfg = dcfg or {}
breaker = CircuitBreaker(
name,
failure_threshold=int(cfg.get("breaker_failure_threshold", 3) or 3),
cooldown_seconds=float(cfg.get("breaker_cooldown_seconds", 60) or 60),
)
_backend_breakers[name] = breaker
return breaker


def _notify_backend_degraded(svc, name: str, breaker: CircuitBreaker) -> None:
"""Surface a backend trip via the NotificationStore (best-effort, never raises)."""
notifications = getattr(svc, "notifications", None)
if notifications is None:
return
try:
notifications.add(
agent="c3",
severity="warning",
title=f"Delegate backend degraded: {name}",
message=(
f"{name} failed {breaker.failure_threshold}x consecutively; c3_delegate "
f"will skip it for ~{int(breaker.cooldown_seconds)}s instead of re-spawning "
f"the CLI. Run '{name} --version' to diagnose."
),
replace_if_unacked=True,
)
except Exception:
pass


def get_delegate_metrics() -> dict:
return dict(_delegate_metrics)
Expand Down Expand Up @@ -765,6 +821,13 @@ def _handle_codex_delegate(task: str, task_type: str, context: str,
"[delegate:error] Codex CLI not available. Run 'codex --version' to diagnose.",
"unavailable")

breaker = _backend_breaker("codex", dcfg)
if not breaker.allow():
return finalize("c3_delegate", {"task_type": task_type, "backend": "codex"},
"[delegate:degraded] Codex skipped after repeated failures; retrying in "
f"~{breaker.cooldown_remaining()}s. Run 'codex --version' to diagnose.",
"degraded")

# Resolve model/sandbox/reasoning from config or defaults
cdef = CODEX_MODELS.get(task_type, CODEX_MODELS.get("ask", {}))
model = dcfg.get("codex_default_model") or cdef.get("model", "gpt-5.3-codex-spark")
Expand Down Expand Up @@ -807,10 +870,13 @@ def _handle_codex_delegate(task: str, task_type: str, context: str,
elapsed = round(time.monotonic() - t0, 1)

if not ok:
if breaker.record_failure():
_notify_backend_degraded(svc, "codex", breaker)
return finalize("c3_delegate",
{"task_type": task_type, "backend": "codex", "model": model, "elapsed": f"{elapsed}s"},
output, "error")

breaker.record_success()
_delegate_metrics["total_calls"] += 1
_delegate_cache[ckey] = (output, count_tokens(output))

Expand Down Expand Up @@ -880,6 +946,13 @@ def _handle_gemini_delegate(task: str, task_type: str, context: str,
"[delegate:error] Gemini CLI not available. Run 'gemini --version' to diagnose.",
"unavailable")

breaker = _backend_breaker("gemini", dcfg)
if not breaker.allow():
return finalize("c3_delegate", {"task_type": task_type, "backend": "gemini"},
"[delegate:degraded] Gemini skipped after repeated failures; retrying in "
f"~{breaker.cooldown_remaining()}s. Run 'gemini --version' to diagnose.",
"degraded")

# Resolve model from config or defaults
gdef = GEMINI_MODELS.get(task_type, GEMINI_MODELS.get("ask", {}))
model = dcfg.get("gemini_default_model") or gdef.get("model", "gemini-2.5-flash")
Expand Down Expand Up @@ -919,10 +992,13 @@ def _handle_gemini_delegate(task: str, task_type: str, context: str,
elapsed = round(time.monotonic() - t0, 1)

if not ok:
if breaker.record_failure():
_notify_backend_degraded(svc, "gemini", breaker)
return finalize("c3_delegate",
{"task_type": task_type, "backend": "gemini", "model": model, "elapsed": f"{elapsed}s"},
output, "error")

breaker.record_success()
_delegate_metrics["total_calls"] += 1
_delegate_cache[ckey] = (output, count_tokens(output))

Expand Down Expand Up @@ -1068,9 +1144,11 @@ def _check_claude():
_gemini_avail = (_gemini_available is True) or (
_gemini_available is None and task_type not in _light_tasks and _is_gemini_on_path()
)
if task_type in heavy_codex and _codex_avail and _codex_available is not False:
if (task_type in heavy_codex and _codex_avail and _codex_available is not False
and _backend_breaker("codex", dcfg).allow()):
backend = "codex"
elif task_type in heavy_gemini and _gemini_avail and _gemini_available is not False:
elif (task_type in heavy_gemini and _gemini_avail and _gemini_available is not False
and _backend_breaker("gemini", dcfg).allow()):
backend = "gemini"
else:
backend = "ollama"
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "code-context-control"
version = "2.39.0"
version = "2.39.1"
description = "Local code-intelligence layer for AI coding tools (Claude Code, Codex, Gemini, Copilot). Retrieve less, read less, edit safer."
readme = "README.md"
requires-python = ">=3.10"
Expand Down
86 changes: 86 additions & 0 deletions services/circuit_breaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Lightweight thread-safe circuit breaker for flapping subsystems.

Borrowed in spirit from Headroom's TransformPipeline breaker: after N
consecutive failures a subsystem is treated as unhealthy and calls are
short-circuited for a cooldown window instead of re-running (and re-failing)
the expensive operation every time. A single success closes the breaker.

First consumer: c3_delegate, to stop re-spawning a broken-but-installed CLI
backend (a 90-120s subprocess timeout) on every call. Deliberately
dependency-free so any call-time subsystem (e.g. the Ollama embed/generate
path) can reuse it later.
"""
from __future__ import annotations

import threading
import time


class CircuitBreaker:
"""Consecutive-failure breaker: closed -> open (after N fails) -> half-open (after cooldown)."""

def __init__(
self,
name: str = "",
*,
failure_threshold: int = 3,
cooldown_seconds: float = 60.0,
) -> None:
self.name = name
self.failure_threshold = max(1, int(failure_threshold))
self.cooldown_seconds = max(0.0, float(cooldown_seconds))
self._lock = threading.Lock()
self._failures = 0
self._open = False
self._opened_at = 0.0

def allow(self) -> bool:
"""Return True if a call may proceed.

An open breaker permits a single probe once the cooldown has elapsed
(half-open); the next ``record_success``/``record_failure`` resolves it.
"""
with self._lock:
if not self._open:
return True
return (time.monotonic() - self._opened_at) >= self.cooldown_seconds

def record_success(self) -> None:
"""Reset the breaker after a healthy call."""
with self._lock:
self._failures = 0
self._open = False
self._opened_at = 0.0

def record_failure(self) -> bool:
"""Count a failed call.

Returns True iff this failure *just* tripped the breaker open (callers
can use that edge to emit a one-shot notification). A failed half-open
probe restarts the cooldown but does not re-trip.
"""
with self._lock:
self._failures += 1
if not self._open and self._failures >= self.failure_threshold:
self._open = True
self._opened_at = time.monotonic()
return True
if self._open:
self._opened_at = time.monotonic()
return False

def cooldown_remaining(self) -> int:
"""Whole seconds left before the next probe is allowed (0 if closed/elapsed)."""
with self._lock:
if not self._open:
return 0
remaining = self.cooldown_seconds - (time.monotonic() - self._opened_at)
return max(0, int(round(remaining)))

@property
def is_open(self) -> bool:
"""True while calls are actively being short-circuited (open and within cooldown)."""
with self._lock:
if not self._open:
return False
return (time.monotonic() - self._opened_at) < self.cooldown_seconds
103 changes: 103 additions & 0 deletions tests/test_circuit_breaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""Tests for the CircuitBreaker primitive and the c3_delegate demote-on-failure wiring."""
import time

from services.circuit_breaker import CircuitBreaker


def test_closed_until_threshold_then_opens():
br = CircuitBreaker("x", failure_threshold=3, cooldown_seconds=60)
assert br.allow()
assert br.record_failure() is False # 1
assert br.record_failure() is False # 2
assert br.allow() # still closed below threshold
assert br.record_failure() is True # 3 -> trips open (edge)
assert not br.allow() # open, within cooldown
assert br.is_open


def test_success_closes_breaker():
br = CircuitBreaker("x", failure_threshold=2, cooldown_seconds=60)
br.record_failure()
assert br.record_failure() is True
assert not br.allow()
br.record_success()
assert br.allow()
assert br.cooldown_remaining() == 0
assert not br.is_open


def test_half_open_probe_after_cooldown():
br = CircuitBreaker("x", failure_threshold=1, cooldown_seconds=0.05)
assert br.record_failure() is True
assert not br.allow() # blocked within cooldown
time.sleep(0.08)
assert br.allow() # half-open: one probe allowed
assert br.record_failure() is False # failed probe re-arms, not a new trip
assert not br.allow()


def test_cooldown_remaining_within_bounds():
br = CircuitBreaker("x", failure_threshold=1, cooldown_seconds=30)
br.record_failure()
assert 0 < br.cooldown_remaining() <= 30


class _FakeNotifications:
def __init__(self):
self.entries = []

def add(self, **kwargs):
self.entries.append(kwargs)
return kwargs


class _FakeSvc:
def __init__(self):
self.project_path = "."
self.delegate_config = {
"gemini_enabled": True,
"auto_compress": False,
"breaker_failure_threshold": 3,
"breaker_cooldown_seconds": 60,
}
self.notifications = _FakeNotifications()
self.compressor = None
self._agent_progress_cb = None


def _finalize(_tool, _meta, _output, status):
return status


def test_gemini_demotes_after_repeated_failures(monkeypatch):
"""The core bug fix: a broken backend must stop re-spawning the CLI every call."""
from cli.tools import delegate

# Isolate module-global breaker + cache + availability state.
delegate._backend_breakers.clear()
delegate._delegate_cache.clear()
monkeypatch.setattr(delegate, "_gemini_available", True, raising=False)

calls = {"run": 0}

def fake_run_gemini(*_a, **_k):
calls["run"] += 1
return ("boom", False, {})

monkeypatch.setattr(delegate, "_run_gemini", fake_run_gemini)

svc = _FakeSvc()
dcfg = svc.delegate_config

# Three real failures trip the breaker (threshold=3).
for _ in range(3):
assert delegate._handle_gemini_delegate("t", "ask", "", "", svc, dcfg, _finalize) == "error"
assert calls["run"] == 3

# Fourth call: breaker open -> short-circuit, NO subprocess re-spawn.
assert delegate._handle_gemini_delegate("t", "ask", "", "", svc, dcfg, _finalize) == "degraded"
assert calls["run"] == 3

# The trip emitted exactly one degradation notification.
degraded = [e for e in svc.notifications.entries if "degraded" in e.get("title", "").lower()]
assert len(degraded) == 1
Loading