Skip to content

feat(core): reactor subscribes to source.synced and runs per-unit extract (#29)#62

Open
mroops0111 wants to merge 4 commits into
masterfrom
feat/reactor-source-synced
Open

feat(core): reactor subscribes to source.synced and runs per-unit extract (#29)#62
mroops0111 wants to merge 4 commits into
masterfrom
feat/reactor-source-synced

Conversation

@mroops0111

Copy link
Copy Markdown
Owner

Closes #29.

Summary

When a sync brings new or changed intent units into a workspace, ReactorService dispatches the active ontology's per-unit skill against each, runs one checkpoint pass once they settle, and writes observations to the SourceUnitState ledger so the diff stays consistent. HITL is preserved: the per-unit skill still emits Proposals into the pending inbox.

Studio gains two surfaces consuming the same ledger: a top-of-app banner showing the active reactor pass progress, and per-option freshness badges (Nm ago green / stale amber / nothing) on the source-intent picker. The diff endpoint that drives the badges is the same one the reactor consumes internally via a shared computeSourceDiff helper, so the picker and the reactor can never disagree about what is fresh.

Flowchart

flowchart TD
  A["source.synced event"] --> B{role === 'intent'?}
  B -- no --> X1["drop (no event emitted)"]
  B -- yes --> C{ontology has<br/>batch.perUnit.skillId?}
  C -- no --> X2["drop"]
  C -- yes --> D["computeSourceDiff(workspace, sourceId)<br/>= intentLister + digest + diffAgainst(ledger)"]
  D --> E{new + changed<br/>units empty?}
  E -- yes --> Y0["emit reactor.completed<br/>{ totalUnits: 0, checkpointRan: false }"]
  E -- no --> F{exceeds<br/>maxRunsPerHour<br/>in last 1h?}
  F -- yes --> Y1["emit reactor.throttled<br/>{ limit }"]
  F -- no --> G["record dispatch timestamp<br/>emit reactor.dispatched { totalUnits }"]
  G --> H["sequential loop:<br/>for each unit in (new ∪ changed):<br/>skillRunner.start(perUnit.skillId, args)<br/>+ waitForCompletion<br/>+ recordObservation(ledger)"]
  H --> I{any per-unit<br/>succeeded?}
  I -- no --> Y2["emit reactor.completed<br/>{ checkpointRan: false }"]
  I -- yes --> J{ontology has<br/>checkpoint.skillId?}
  J -- no --> Y2
  J -- yes --> K["skillRunner.start(checkpoint.skillId)<br/>+ waitForCompletion"]
  K --> Y3["emit reactor.completed<br/>{ checkpointRan: true }"]
Loading

Key invariants visible in the chart: per-unit failures do NOT block subsequent units (they keep running); the checkpoint runs at most once per pass and only when at least one per-unit succeeded; the throttle check happens after the diff so a sync that ends up empty does not consume a dispatch slot.

Locked decisions

These were discussed and answered on the issue before implementation started. Captured here so the PR review does not relitigate them.

Per-unit, not batched. A sync that brings in ten changed issues dispatches ten separate braid-extract subprocesses, not one batched extract. Batching would undo the per-document isolation #43 just shipped.

Sequential, no concurrency. Per-unit dispatches run in strict order so a sprint of ten unit changes never spikes LLM concurrency past one worker. Combined with maxRunsPerHour: 5 this gives two layers of cost containment.

First-ingest does NOT fire reactor. The operator drives the initial Run Batch explicitly, where the cost preview is visible. Once a source is connected, subsequent cmd.syncSource deliveries flow through the reactor normally.

Reactor only fires for role: 'intent' sources. Code-only diffs do not trigger re-extract because the per-unit skill consumes intent units; re-running it because code beneath an unchanged intent unit changed would produce non-deterministic LLM noise without semantic gain. Stale cross-references between intent and code are handled by the checkpoint pass that fires after per-unit dispatches settle. Workspaces whose intent stream is sparse should lean on an intent-shaped loader (see #61 for the planned PR-descriptions-as-intent source) rather than asking the reactor to scan code.

No gate assumption. The reactor emits reactor.completed once the per-unit and checkpoint passes settle and leaves the question of who applies the resulting proposals to upstream layers. A future generative axis can attach an apply(gate: auto) consumer to the same event.

Test plan

  • pnpm --filter @braidhq/core test — 217 pass, including 7 new ReactorService tests covering 3-new-units → 3 sequential dispatches + 1 checkpoint, no-checkpoint-when-no-success, strict sequential ordering proof (unit N+1 does not start until unit N finishes), role-filter rejection for role: 'code', throttle after maxRunsPerHour in a rolling 1h window, no-op completed{0} when the diff is empty, dispose-detaches-listener.
  • pnpm --filter @braidhq/server test — 290 pass.
  • pnpm --filter @braidhq/studio typecheck — clean.
  • pnpm lint — clean.
  • Dogfood: enable reactor on this repo's own workspace by adding reactor: { enabled: true } to PRODUCT.md, restart server, close a roadmap issue, and confirm: (a) the new top-of-app banner shows the reactor pass while it runs; (b) a new pending proposal appears in the inbox without any CLI invocation; (c) the source-intent picker option for the closed issue shows Nm ago (green) right after; (d) editing an issue and re-syncing turns the badge amber (stale) before the next reactor pass clears it.

Notes for review

The reactor and the diff endpoint share a single computeSourceDiff helper so the two surfaces always partition the same way. Studio's banner uses its own EventSource subscription rather than threading state through useWorkspaceEvents; that hook continues to be a pure side-effect cache invalidator. The picker badge join happens client-side via two useQueries (diff + ledger states) deduped by react-query, so multiple options for the same source share a single network round trip.

🤖 Generated with Claude Code

mroops0111 and others added 2 commits June 22, 2026 15:11
…ract (#29)

When a sync brings new or changed intent units into a workspace,
`ReactorService` dispatches the active ontology's per-unit skill
against each, runs one checkpoint pass once they settle, and writes
observations to the SourceUnitState ledger. HITL stays intact: the
per-unit skill still emits Proposals into the pending inbox.

Locked decisions:
- per-unit dispatch (not batched), sequential — preserves the
  per-document isolation #43 shipped
- intent-role only; `role: 'code'` source.synced events fall through
- first-ingest does NOT fire reactor; the operator drives the initial
  Run Batch where the cost preview is visible
- throttle: rolling 1h window per workspace; the (N+1)th dispatch
  emits `reactor.throttled` and drops
- no gate assumption: emits `reactor.completed`, leaves apply to
  upstream layers (future generative axis can attach an auto-apply
  consumer to the same event)

Schema:
- `ProductManifest.reactor.{enabled, maxRunsPerHour}` (opt-in per
  workspace; off by default)
- new WorkspaceEvent variants: `reactor.dispatched` / `reactor.completed`
  / `reactor.throttled`

Server:
- `ReactorService` in `@braidhq/core/application/` + `Reactor` port
- shared `computeSourceDiff` helper consumed by both the reactor and
  the new diff endpoint so the two surfaces always agree
- new REST `GET /workspaces/:ws/source-unit-states/:sourceId/diff`
- `composeFsApp` subscribes the reactor at boot for workspaces whose
  `reactor.enabled` is true

Studio:
- new `<ReactorBanner>` above the existing in-flight-run banner that
  reads the SSE stream and shows "processing N units from <source>"
  while the pass runs (plus a brief amber notice on throttle)
- source-intent picker options carry per-unit freshness badges (a
  green "Nm ago" chip when the on-disk sha matches the ledger; an
  amber "stale" chip when it differs); join the diff + ledger
  endpoints client-side under the existing useQueries pattern
- `useWorkspaceEvents` invalidates source-unit-diff + proposals +
  graph queries on `reactor.completed`

Tests:
- `packages/core/test/application/ReactorService.test.ts` covers
  3-new-units → 3 sequential per-unit dispatches + 1 checkpoint,
  no-checkpoint-when-no-success, strict sequential ordering proof,
  role-filter rejection, throttle, no-op completed{0} when diff is
  empty, dispose-detaches-listener

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
mroops0111 and others added 2 commits June 22, 2026 17:05
Self-review of #62 surfaced seven real issues. Each is addressed below
without changing the locked behaviour from #29.

Concurrency:
- The pass now runs under `PerWorkspaceLock`, so two `source.synced`
  events arriving for the same workspace in the same tick cannot both
  pass the throttle check (the previous Map<workspace, ts[]> code had
  a TOCTOU window of ~1ms between `exceedsThrottle` and the dispatch
  record). Other workspaces' passes remain independent.
- The throttle counter moves into a small `ThrottleWindow` value
  object that encapsulates the rolling-window pruning. Service code
  no longer mixes a predicate (isOverLimit) with a side effect.

Clean code:
- The 80-line `onSourceSynced` splits into `resolveContext` (filters
  and loads ontology binding), `changedPathsForPass` (diff), and
  `runDispatchLoop` (per-unit + checkpoint). Each substep takes a
  small `PassContext` instead of a long parameter list. The orchestrator
  reads top-to-bottom as five guarded steps.
- The Reactor port collapses to `start(workspaceId)` + `stop(workspaceId)`
  with `Promise<void>` returns. The redundant `ReactorSubscription`
  handle is removed; callers always knew the workspace id anyway.
- A new `argsForPath(binding, path)` helper centralises the one
  `as unknown as` cast needed to call `argsFor` (typed against
  PlanUnit) from a path-only context. The rest of the service stays
  cast-free.

Server:
- The diff endpoint no longer returns 503 at request time when
  intentLister or digest aren't wired. It is now mounted conditionally
  on a `diffSupport: { workspaceService, intentLister, digest }` tuple,
  matching how BatchService / SkillsRouter are guarded in `app.ts`. An
  unconfigured server simply does not expose the route (404 from
  Hono's router), which is consistent with the rest of the codebase.

Throttle config:
- The throttle now reads `productManifest.reactor.maxRunsPerHour` at
  `start()` time and stores the limit on the per-workspace window.
  Previously the service hardcoded the default and ignored the manifest.

Tests:
- New test: "mixed success/failure in one pass" — first per-unit fails,
  second succeeds, third fails; the loop must not abort on the first
  failure, and the checkpoint must still run because at least one
  per-unit succeeded.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…+ ontology source roles

Closes the remaining feedback-loop scope on PR #62. Five things land
together because dogfood reactor without them is misleading or
visually broken.

GithubLoader — realized-intent only (A1+A2):
- New always-on filter: an issue is written to `issues/<n>.md` only if
  its `closedByPullRequestsReferences` carries at least one merged PR.
  Open issues with no PR yet, abandoned issues, and docs-only closes
  are skipped. Matches the project's "intent ⊕ code convergence"
  contract — pure speculative intent does not pollute the ledger.
- Implementation: per-issue GraphQL query alongside the existing REST
  fetch. Frontmatter gains `closedByMergedPRs: [{number, mergeCommit}]`
  so downstream readers see provenance.
- `includePullRequests` config field is now deprecated (silently
  ignored + one-time warn) so existing PRODUCT.md configs don't break.
- Mock fetch grows a GraphQL handler; new tests cover merged / closed-
  unmerged / no-PR / multi-PR cases.

Reactor pass persistence + per-unit events (B1+B2+B3):
- New `ReactorPass` schema + `ReactorPassRepository` port +
  InMemory/Fs impls. One JSON file per pass at
  `artifacts/reactor-passes/<passId>.json`; atomic rename on every
  state transition so the Activity page can read a partially-finished
  pass without races.
- Four new `WorkspaceEvent` variants: `reactor.unit.started`,
  `reactor.unit.completed`, `reactor.checkpoint.started`,
  `reactor.checkpoint.completed`. The existing dispatched / completed /
  throttled events gain a `passId` so subscribers can join.
- `ReactorService` writes pass state + emits the matching event at
  every transition. Failures stay localised (per-unit) and the
  checkpoint runs iff at least one per-unit succeeded.

REST + Studio Activity surface (C1+C2+C3+C4):
- New `GET /workspaces/:ws/reactor-passes` list + `/:passId` detail.
- New `<TopBanner>` component unifies Reactor / Run / Batch banners so
  they stack with consistent height + padding. `InFlightRunBanner`
  and `BatchInFlightBanner` retrofit to use it.
- `ReactorBanner` now reads `reactor.unit.completed` events to show
  live `processed/total` progress, plus a link to the Activity page.
- New Studio surface "Activity" (`G B` shortcut) lives between
  Proposals and History in the sidebar. Layout mirrors Proposals:
  pass list on the left, selected pass detail on the right with the
  per-unit timeline + checkpoint row. Live-updates via SSE
  invalidation.

OntologyPlugin source-role contract (D):
- `OntologyPlugin.requiredSourceRoles?: readonly ('code' | 'intent')[]`
  declares which source roles a workspace must carry for the ontology
  to make sense. DDD declares `['intent', 'code']` (intent⊕code is
  Braid's value prop). A future generative ontology with no code
  dimension declares `['intent']` only.
- Scaffold endpoint validates the draft manifest against the chosen
  ontology's `requiredSourceRoles` BEFORE any filesystem writes,
  returning 400 with the missing-role list so the Wizard can prompt.
- The previous "POST /batch returns 400 when no sources" test is now
  blocked upstream by this scaffold validation; replaced with a NOTE
  pointing to the new enforcement site.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(core): add reactor that runs incremental extract + model on source.synced

1 participant