feat(core): reactor subscribes to source.synced and runs per-unit extract (#29)#62
Open
mroops0111 wants to merge 4 commits into
Open
feat(core): reactor subscribes to source.synced and runs per-unit extract (#29)#62mroops0111 wants to merge 4 commits into
mroops0111 wants to merge 4 commits into
Conversation
…ract (#29) When a sync brings new or changed intent units into a workspace, `ReactorService` dispatches the active ontology's per-unit skill against each, runs one checkpoint pass once they settle, and writes observations to the SourceUnitState ledger. HITL stays intact: the per-unit skill still emits Proposals into the pending inbox. Locked decisions: - per-unit dispatch (not batched), sequential — preserves the per-document isolation #43 shipped - intent-role only; `role: 'code'` source.synced events fall through - first-ingest does NOT fire reactor; the operator drives the initial Run Batch where the cost preview is visible - throttle: rolling 1h window per workspace; the (N+1)th dispatch emits `reactor.throttled` and drops - no gate assumption: emits `reactor.completed`, leaves apply to upstream layers (future generative axis can attach an auto-apply consumer to the same event) Schema: - `ProductManifest.reactor.{enabled, maxRunsPerHour}` (opt-in per workspace; off by default) - new WorkspaceEvent variants: `reactor.dispatched` / `reactor.completed` / `reactor.throttled` Server: - `ReactorService` in `@braidhq/core/application/` + `Reactor` port - shared `computeSourceDiff` helper consumed by both the reactor and the new diff endpoint so the two surfaces always agree - new REST `GET /workspaces/:ws/source-unit-states/:sourceId/diff` - `composeFsApp` subscribes the reactor at boot for workspaces whose `reactor.enabled` is true Studio: - new `<ReactorBanner>` above the existing in-flight-run banner that reads the SSE stream and shows "processing N units from <source>" while the pass runs (plus a brief amber notice on throttle) - source-intent picker options carry per-unit freshness badges (a green "Nm ago" chip when the on-disk sha matches the ledger; an amber "stale" chip when it differs); join the diff + ledger endpoints client-side under the existing useQueries pattern - `useWorkspaceEvents` invalidates source-unit-diff + proposals + graph queries on `reactor.completed` Tests: - `packages/core/test/application/ReactorService.test.ts` covers 3-new-units → 3 sequential per-unit dispatches + 1 checkpoint, no-checkpoint-when-no-success, strict sequential ordering proof, role-filter rejection, throttle, no-op completed{0} when diff is empty, dispose-detaches-listener Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Self-review of #62 surfaced seven real issues. Each is addressed below without changing the locked behaviour from #29. Concurrency: - The pass now runs under `PerWorkspaceLock`, so two `source.synced` events arriving for the same workspace in the same tick cannot both pass the throttle check (the previous Map<workspace, ts[]> code had a TOCTOU window of ~1ms between `exceedsThrottle` and the dispatch record). Other workspaces' passes remain independent. - The throttle counter moves into a small `ThrottleWindow` value object that encapsulates the rolling-window pruning. Service code no longer mixes a predicate (isOverLimit) with a side effect. Clean code: - The 80-line `onSourceSynced` splits into `resolveContext` (filters and loads ontology binding), `changedPathsForPass` (diff), and `runDispatchLoop` (per-unit + checkpoint). Each substep takes a small `PassContext` instead of a long parameter list. The orchestrator reads top-to-bottom as five guarded steps. - The Reactor port collapses to `start(workspaceId)` + `stop(workspaceId)` with `Promise<void>` returns. The redundant `ReactorSubscription` handle is removed; callers always knew the workspace id anyway. - A new `argsForPath(binding, path)` helper centralises the one `as unknown as` cast needed to call `argsFor` (typed against PlanUnit) from a path-only context. The rest of the service stays cast-free. Server: - The diff endpoint no longer returns 503 at request time when intentLister or digest aren't wired. It is now mounted conditionally on a `diffSupport: { workspaceService, intentLister, digest }` tuple, matching how BatchService / SkillsRouter are guarded in `app.ts`. An unconfigured server simply does not expose the route (404 from Hono's router), which is consistent with the rest of the codebase. Throttle config: - The throttle now reads `productManifest.reactor.maxRunsPerHour` at `start()` time and stores the limit on the per-workspace window. Previously the service hardcoded the default and ignored the manifest. Tests: - New test: "mixed success/failure in one pass" — first per-unit fails, second succeeds, third fails; the loop must not abort on the first failure, and the checkpoint must still run because at least one per-unit succeeded. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…+ ontology source roles Closes the remaining feedback-loop scope on PR #62. Five things land together because dogfood reactor without them is misleading or visually broken. GithubLoader — realized-intent only (A1+A2): - New always-on filter: an issue is written to `issues/<n>.md` only if its `closedByPullRequestsReferences` carries at least one merged PR. Open issues with no PR yet, abandoned issues, and docs-only closes are skipped. Matches the project's "intent ⊕ code convergence" contract — pure speculative intent does not pollute the ledger. - Implementation: per-issue GraphQL query alongside the existing REST fetch. Frontmatter gains `closedByMergedPRs: [{number, mergeCommit}]` so downstream readers see provenance. - `includePullRequests` config field is now deprecated (silently ignored + one-time warn) so existing PRODUCT.md configs don't break. - Mock fetch grows a GraphQL handler; new tests cover merged / closed- unmerged / no-PR / multi-PR cases. Reactor pass persistence + per-unit events (B1+B2+B3): - New `ReactorPass` schema + `ReactorPassRepository` port + InMemory/Fs impls. One JSON file per pass at `artifacts/reactor-passes/<passId>.json`; atomic rename on every state transition so the Activity page can read a partially-finished pass without races. - Four new `WorkspaceEvent` variants: `reactor.unit.started`, `reactor.unit.completed`, `reactor.checkpoint.started`, `reactor.checkpoint.completed`. The existing dispatched / completed / throttled events gain a `passId` so subscribers can join. - `ReactorService` writes pass state + emits the matching event at every transition. Failures stay localised (per-unit) and the checkpoint runs iff at least one per-unit succeeded. REST + Studio Activity surface (C1+C2+C3+C4): - New `GET /workspaces/:ws/reactor-passes` list + `/:passId` detail. - New `<TopBanner>` component unifies Reactor / Run / Batch banners so they stack with consistent height + padding. `InFlightRunBanner` and `BatchInFlightBanner` retrofit to use it. - `ReactorBanner` now reads `reactor.unit.completed` events to show live `processed/total` progress, plus a link to the Activity page. - New Studio surface "Activity" (`G B` shortcut) lives between Proposals and History in the sidebar. Layout mirrors Proposals: pass list on the left, selected pass detail on the right with the per-unit timeline + checkpoint row. Live-updates via SSE invalidation. OntologyPlugin source-role contract (D): - `OntologyPlugin.requiredSourceRoles?: readonly ('code' | 'intent')[]` declares which source roles a workspace must carry for the ontology to make sense. DDD declares `['intent', 'code']` (intent⊕code is Braid's value prop). A future generative ontology with no code dimension declares `['intent']` only. - Scaffold endpoint validates the draft manifest against the chosen ontology's `requiredSourceRoles` BEFORE any filesystem writes, returning 400 with the missing-role list so the Wizard can prompt. - The previous "POST /batch returns 400 when no sources" test is now blocked upstream by this scaffold validation; replaced with a NOTE pointing to the new enforcement site. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #29.
Summary
When a sync brings new or changed intent units into a workspace,
ReactorServicedispatches the active ontology's per-unit skill against each, runs one checkpoint pass once they settle, and writes observations to the SourceUnitState ledger so the diff stays consistent. HITL is preserved: the per-unit skill still emits Proposals into the pending inbox.Studio gains two surfaces consuming the same ledger: a top-of-app banner showing the active reactor pass progress, and per-option freshness badges (
Nm agogreen /staleamber / nothing) on the source-intent picker. The diff endpoint that drives the badges is the same one the reactor consumes internally via a sharedcomputeSourceDiffhelper, so the picker and the reactor can never disagree about what is fresh.Flowchart
flowchart TD A["source.synced event"] --> B{role === 'intent'?} B -- no --> X1["drop (no event emitted)"] B -- yes --> C{ontology has<br/>batch.perUnit.skillId?} C -- no --> X2["drop"] C -- yes --> D["computeSourceDiff(workspace, sourceId)<br/>= intentLister + digest + diffAgainst(ledger)"] D --> E{new + changed<br/>units empty?} E -- yes --> Y0["emit reactor.completed<br/>{ totalUnits: 0, checkpointRan: false }"] E -- no --> F{exceeds<br/>maxRunsPerHour<br/>in last 1h?} F -- yes --> Y1["emit reactor.throttled<br/>{ limit }"] F -- no --> G["record dispatch timestamp<br/>emit reactor.dispatched { totalUnits }"] G --> H["sequential loop:<br/>for each unit in (new ∪ changed):<br/>skillRunner.start(perUnit.skillId, args)<br/>+ waitForCompletion<br/>+ recordObservation(ledger)"] H --> I{any per-unit<br/>succeeded?} I -- no --> Y2["emit reactor.completed<br/>{ checkpointRan: false }"] I -- yes --> J{ontology has<br/>checkpoint.skillId?} J -- no --> Y2 J -- yes --> K["skillRunner.start(checkpoint.skillId)<br/>+ waitForCompletion"] K --> Y3["emit reactor.completed<br/>{ checkpointRan: true }"]Key invariants visible in the chart: per-unit failures do NOT block subsequent units (they keep running); the checkpoint runs at most once per pass and only when at least one per-unit succeeded; the throttle check happens after the diff so a sync that ends up empty does not consume a dispatch slot.
Locked decisions
These were discussed and answered on the issue before implementation started. Captured here so the PR review does not relitigate them.
Per-unit, not batched. A sync that brings in ten changed issues dispatches ten separate
braid-extractsubprocesses, not one batched extract. Batching would undo the per-document isolation #43 just shipped.Sequential, no concurrency. Per-unit dispatches run in strict order so a sprint of ten unit changes never spikes LLM concurrency past one worker. Combined with
maxRunsPerHour: 5this gives two layers of cost containment.First-ingest does NOT fire reactor. The operator drives the initial Run Batch explicitly, where the cost preview is visible. Once a source is connected, subsequent
cmd.syncSourcedeliveries flow through the reactor normally.Reactor only fires for
role: 'intent'sources. Code-only diffs do not trigger re-extract because the per-unit skill consumes intent units; re-running it because code beneath an unchanged intent unit changed would produce non-deterministic LLM noise without semantic gain. Stale cross-references between intent and code are handled by the checkpoint pass that fires after per-unit dispatches settle. Workspaces whose intent stream is sparse should lean on an intent-shaped loader (see #61 for the planned PR-descriptions-as-intent source) rather than asking the reactor to scan code.No gate assumption. The reactor emits
reactor.completedonce the per-unit and checkpoint passes settle and leaves the question of who applies the resulting proposals to upstream layers. A future generative axis can attach anapply(gate: auto)consumer to the same event.Test plan
pnpm --filter @braidhq/core test— 217 pass, including 7 newReactorServicetests covering 3-new-units → 3 sequential dispatches + 1 checkpoint, no-checkpoint-when-no-success, strict sequential ordering proof (unit N+1 does not start until unit N finishes), role-filter rejection forrole: 'code', throttle aftermaxRunsPerHourin a rolling 1h window, no-opcompleted{0}when the diff is empty, dispose-detaches-listener.pnpm --filter @braidhq/server test— 290 pass.pnpm --filter @braidhq/studio typecheck— clean.pnpm lint— clean.reactor: { enabled: true }toPRODUCT.md, restart server, close a roadmap issue, and confirm: (a) the new top-of-app banner shows the reactor pass while it runs; (b) a new pending proposal appears in the inbox without any CLI invocation; (c) the source-intent picker option for the closed issue showsNm ago(green) right after; (d) editing an issue and re-syncing turns the badge amber (stale) before the next reactor pass clears it.Notes for review
The reactor and the diff endpoint share a single
computeSourceDiffhelper so the two surfaces always partition the same way. Studio's banner uses its own EventSource subscription rather than threading state throughuseWorkspaceEvents; that hook continues to be a pure side-effect cache invalidator. The picker badge join happens client-side via twouseQueries(diff + ledger states) deduped by react-query, so multiple options for the same source share a single network round trip.🤖 Generated with Claude Code