feat(backend): SQLite storage layer + CDC pipeline#37
Add the two real outbound adapters that replace the in-memory fakeStore: internal/storage/sqlite (persistence satisfying ports.LifecycleStore) and internal/cdc (transactional-outbox publisher, JSONL delivery, durable consumer). Wire them into main.go alongside the Lifecycle Manager and reaper so the write path is live end-to-end: LCM.Upsert -> store -> outbox -> JSONL -> broadcaster.

Storage (internal/storage/sqlite):
- modernc.org/sqlite (pure Go, no CGO) for clean cross-compile; goose embedded migrations; sqlc-generated typed queries under gen/.
- Atomic Upsert: session row + change_log + outbox written in one tx.
- revision is an optimistic-concurrency (CAS) check: insert requires revision 0 and persists 1; update requires loaded revision == stored and bumps +1; zero rows affected returns a revision-mismatch error.
- Metadata is an opaque map in session_metadata, off the CDC path.
- Durable reaction_trackers (fixes the in-memory-only escalation budget that re-fired human pages on restart).

CDC (internal/cdc):
- Publisher drains the outbox to a JSONL log; size-based rotation with a reset marker.
- Consumer tails via byte cursor, detects rotation (os.SameFile), resyncs from a full-state snapshot on gaps, and tracks a durable consumer_offsets cursor.
- Janitor reclaims acknowledged outbox rows.
- Broadcaster is the in-process fan-out port the FE transport will subscribe to (WS/SSE wiring deferred).

Composition root (main.go + *_wiring.go):
- startCDC stands up publisher/consumer/janitor + broadcaster.
- startLifecycle constructs the LCM, makes escalation budgets durable via WithReactionStore, teaches it to enumerate sessions via WithSessionLister, and starts the reaper.
- Notifier, AgentMessenger, and the reaper's runtime registry are TEMPORARY no-op/empty stubs (lifecycle_wiring.go) with TODO markers; see the PR description for how to fill them in.
Tests: contract-parity, revision CAS, outbox atomicity, CDC ordering and idempotency, rotation/resync, janitor vacuum, reaction durability across a simulated restart, and composition-root adapters. gofmt/build/vet clean and go test -race ./... green.
Migration 0002 adds two tables off the canonical CDC path:
- projects: durable registry of managed repos (the twin of the old YAML config). Soft-deletable via archived_at so a session's project_id always resolves; ListProjects returns active rows only, GetProject resolves any.
- pr_enrichment: per-session cache of rich SCM facts (CI summary, review decision, mergeability, pending comments, CI log tail) that do not live in the canonical lifecycle. 1:1 with a session, cascades on session delete.

Both are written outside the LCM write path: no revision bump, no change_log/outbox event. Store methods mirror the reaction_trackers adapter pattern with storage-local row structs.
The first storage cut modelled two side tables as free-form blobs. This
replaces both with opinionated, statically-typed schema so what a session
can carry is fixed by the schema, not by convention.
session_metadata: was a (session_id, key, value) KV bag with six
convention-only keys. Now a 1:1 table of named, typed columns. The domain
currency is a typed domain.SessionMetadata struct (was map[string]string),
threaded through ports.LifecycleStore, the LCM, the Session Manager and the
reaper, so an unknown key is a compile error rather than a silently-dropped
write. PatchMetadata keeps its non-destructive merge ("empty = leave
unchanged"). The off-canonical invariant is now enforced at the type level
via json:"-" on SessionRecord.Metadata, removing the manual `Metadata = nil`
scrub the change_log/snapshot paths had to remember; the Meta* string-key
constants are deleted.
pr_enrichment -> pr (+ pr_check, pr_comment): the scalar facts are now
typed columns with CHECK-constrained enums (review_decision, mergeability,
ci_state) and integer CI counts instead of opaque TEXT. The two list facts
the old `pending_comments`/ci_summary strings smuggled are normalized into
child tables (pr_check, pr_comment) that cascade from pr. The store exposes
UpsertPR/GetPR plus atomic ReplacePRChecks/ReplacePRComments + List.
Both tables remain off the canonical CDC path. sqlc regenerated; migrations
0001/0002 revised in place (nothing released). gofmt/vet clean; go test
-race green; daemon smoke-boots and creates the new schema.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Code review from the LCM/SM lane (aa-27). Compat is green: port shape, schema columns, aa-7 v3 blockers, pragmas, the durable
This is the code-quality pass on the adapter itself. Verdict: REQUEST-CHANGES on two correctness gaps; everything else below is "should fix" or nit. Re-reviewed after the two follow-up commits (
Blockers
B1: Consumer drops live events across restart-with-rotation
File:
On
Repro:
Fix: on
B2:
… source

The cdc integration test covers the synchronous Drain/Poll happy path but (1) resyncs from a fake snapshot and (2) never runs the publisher/consumer as the concurrent goroutines the daemon actually uses. Add two E2E tests in the composition-root package that wire the real sqlite.Store, outboxAdapter, Publisher, JSONL log, Consumer, Broadcaster and the REAL snapshotSource (store.ListAll):
- RealSnapshotResyncThroughRotation: forces a rotation and asserts the consumer rebuilds from the sessions table, delivering the persisted record payload, with the offset landing at the change_log head.
- ConcurrentPublisherConsumer: runs both as goroutines on their tickers and asserts every write is delivered exactly once, in order, offset at head (also exercises the broadcaster hand-off under -race).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
SetMaxOpenConns(1) forced every read (List/Get/GetPR/...) to queue behind the single connection, so the dashboard's reads contended with the LCM's writes. WAL already supports many concurrent readers, so raise the pool to 8 and instead serialize *writes* with a Store.writeMu. That keeps WAL's single-writer rule and the revision-CAS read-then-write atomic regardless of pool size, while reads now run in parallel across the pool.

Every write method takes writeMu (Upsert, PatchMetadata, UpsertPR/DeletePR, the pr_check/pr_comment Replace* via inTx, the CDC outbox/offset writes, project writes, reaction-tracker writes); reads take nothing. Added TestConcurrentReadsAndWrites (16 writers + 16 readers) which passes under -race.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ge layer)
Reworks the storage + CDC layer to the simplified design agreed in review:
Schema (one clean migration, 0001): projects, sessions, pr, pr_checks,
pr_comment, change_log. sessions.id is a single string key "{project}-{num}"
(mer-1); operational metadata folded into sessions; is_alive replaces the
runtime axis; no revision (the per-session write mutex serializes, change_log.seq
orders). pr keyed by URL (1 session : many PRs). pr_checks is CI run history
(one row per check per commit) — the CI-fix-loop brake is a LIMIT 3 query, no
counter stored. change_log carries a required project_id FK + nullable session_id.
CDC is DB-native: AFTER INSERT/UPDATE triggers on sessions/pr/pr_checks append
to change_log atomically with the change (json_object payloads). The old durable
outbox/JSONL/janitor pipeline is gone; the cdc package is now a Poller that reads
change_log and fans events out through the in-memory Broadcaster (hardened with
recover()). Clients catch up via the log from their own offset (SSE Last-Event-ID).
Storage uses a single writer connection + a reader pool (read-your-writes for the
triggers' subqueries; concurrent reads). sqlc-generated typed queries.
Tests (-race): CRUD, per-project id assignment, the loop-brake query, concurrent
creates, triggers populating change_log; CDC end-to-end through the real store,
concurrent goroutine delivery, broadcaster panic-isolation.
NOTE: scoped to storage + CDC. The lifecycle-engine consumers (decide, lifecycle,
session, reaper, main wiring) still reference the old domain axes and need a
follow-up integration pass to compile against the new model.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
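As a rough illustration of the poller and the recover()-hardened broadcaster described in this commit, here is an in-memory sketch. The `changeLog` slice stands in for the change_log table, and `ReadAfter`, `pollOnce`, and the `Event` fields are hypothetical names rather than the package's actual API.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is an illustrative change_log row: a monotonic seq plus a kind.
type Event struct {
	Seq  int64
	Kind string
}

// Broadcaster fans events out to in-process subscribers.
type Broadcaster struct {
	mu   sync.Mutex
	subs []func(Event)
}

func (b *Broadcaster) Subscribe(fn func(Event)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs = append(b.subs, fn)
}

// Publish calls every subscriber. A panic in one subscriber is recovered so
// it cannot take down the poller or starve the remaining subscribers.
func (b *Broadcaster) Publish(ev Event) {
	b.mu.Lock()
	subs := append([]func(Event){}, b.subs...)
	b.mu.Unlock()

	for _, fn := range subs {
		func() {
			defer func() { _ = recover() }()
			fn(ev)
		}()
	}
}

// changeLog stands in for the change_log table, already ordered by seq.
type changeLog []Event

// ReadAfter returns up to limit events with Seq greater than after.
func (c changeLog) ReadAfter(after int64, limit int) []Event {
	var out []Event
	for _, ev := range c {
		if len(out) == limit {
			break
		}
		if ev.Seq > after {
			out = append(out, ev)
		}
	}
	return out
}

// pollOnce reads one batch past lastSeq, publishes it in order, and returns
// the advanced cursor. A client reconnecting with its own offset would call
// the same read with that offset.
func pollOnce(entries changeLog, b *Broadcaster, lastSeq int64, limit int) int64 {
	for _, ev := range entries.ReadAfter(lastSeq, limit) {
		b.Publish(ev)
		lastSeq = ev.Seq
	}
	return lastSeq
}

func main() {
	entries := changeLog{{1, "session_created"}, {2, "session_updated"}, {3, "session_updated"}}
	b := &Broadcaster{}
	b.Subscribe(func(Event) { panic("misbehaving subscriber") })
	b.Subscribe(func(ev Event) { fmt.Println("delivered", ev.Seq, ev.Kind) })

	cursor := pollOnce(entries, b, 0, 2)
	cursor = pollOnce(entries, b, cursor, 2)
	fmt.Println("cursor", cursor)
}
```

Because the cursor is just the last delivered seq, the log itself is the catch-up mechanism, which is why no separate outbox or janitor is needed in this design.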
Greptile Summary
This PR replaces the in-memory
Confidence Score: 5/5
Safe to merge; the write path, CDC pipeline, and shutdown sequencing are all structurally sound with no data-loss or correctness issues introduced.
Both new findings are quality nits: a stale timestamp in one CDC trigger (easily fixed in a follow-up migration) and a global mutex replacing a per-session one (a deliberate simplification with acceptable throughput impact at current scale). The previously-flagged build-breaking issues, the shutdown deadlock, and the CI-tracker-clearing bug have all been addressed. The core persistence and event-delivery plumbing looks correct.
Files to look at: backend/internal/storage/sqlite/migrations/0001_init.sql (the pr_checks_cdc_update trigger's created_at) and backend/internal/lifecycle/manager.go (the global mutex trade-off worth a comment).
Important Files Changed
Sequence Diagram
sequenceDiagram
participant SM as SessionManager
participant LCM as lifecycle.Manager
participant Store as sqlite.Store
participant DB as SQLite (triggers)
participant CL as change_log
participant Poller as cdc.Poller
participant Bcast as cdc.Broadcaster
participant FE as SSE/WS (future)
SM->>Store: CreateSession(rec)
Store->>DB: INSERT sessions
DB-->>CL: sessions_cdc_insert trigger → session_created
LCM->>Store: UpdateSession(rec)
Store->>DB: UPDATE sessions
DB-->>CL: sessions_cdc_update trigger → session_updated
LCM->>Store: WritePRObservation(pr, checks, comments)
Store->>DB: UPSERT pr / pr_checks
DB-->>CL: pr_cdc_insert/update, pr_checks_cdc_insert/update
Poller->>Store: ReadChangeLogAfter(lastSeq, 512)
Store-->>Poller: []ChangeLogRow
Poller->>Bcast: Publish(Event)
Bcast->>FE: subscriber fn(Event)
Reworks the LCM, reactions, session manager, reaper, and boot wiring onto
the redesigned domain model — collapsing the runtime axis to is_alive,
moving PR facts to the pr table (read back as PRFacts), replacing the
free-form SessionReason with a typed terminal-only TerminationReason, and
dropping Revision/EventType/durable reaction-trackers (CDC is trigger-driven,
escalation budgets are in-memory).
- ports: SessionStore + PRWriter interfaces; PRObservation/RuntimeFacts/
ActivitySignal DTOs; drop LifecycleStore/EventType/ReactionStore.
- lifecycle: single-writer reducer over is_alive; ApplyPRObservation writes
the pr tables and reacts; CI-fix-loop brake derived from pr_checks history;
review comments injected into the agent regardless of author (no bot
detection); merge auto-terminates with pr_merged.
- session: store-assigned "{project}-{n}" ids; folded metadata; status
derived from PRFacts on read.
- reaper: reports the four-valued probe vocabulary unchanged.
- boot: trigger -> poller -> broadcaster; storeAdapter bridges *sqlite.Store.
Lane shrinks 6218 -> 2803 LOC. go build/vet/test -race green.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
CDC is trigger-driven in the SQLite DB now; there is no JSONL log. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@greptile please re-review — the branch has been substantially reworked since the last pass (force-pushed onto the trigger-driven storage/CDC redesign, then the whole lifecycle lane ported onto it). How the previous review was addressed:
The whole backend now builds and
illegalcall
left a comment
We have 2 PRs that recommend the backend architecture and library choices:
Let’s try to follow those.
If you disagree with any part of those recommendations, let’s comment/contribute there and keep those PRs as the source of truth.
Based on that, I recommend these changes here:
- Use the recommended SQLite driver path: github.com/ncruces/go-sqlite3/driver first, unless the stack PR changes.
- Keep this PR focused on internal/storage/sqlite and CDC persistence.
- Correlate the storage/CDC implementation with the architecture PR’s lifecycle/session boundaries.
- If we want to change the lifecycle architecture, let’s discuss that on the architecture PR first.
- Keep lifecycle writes routed through the LCM contract unless the architecture PR changes.
- Preserve the documented per-session LCM write model unless we agree to change it in the architecture PR.
- Align CDC with the recommended storage/change-log/outbox direction, or update/discuss that in the architecture PR first.
- Update stale comments in main.go that still mention the old JSONL/outbox flow.
if err != nil {
	return fmt.Errorf("start lifecycle: %w", err)
}
defer lcStack.Stop()
This can hang on early srv.Run failures. The CDC/reaper goroutines stop only when ctx is cancelled, but stop() is deferred before these Stop() waits, so LIFO defer order waits first and cancels last. If srv.Run(ctx) returns without a signal, for example runfile.Write fails, lcStack.Stop()/cdcPipe.Stop() can block forever. Please cancel the context before waiting on those goroutines.
Fixed in 0dbd304 — same root cause you and Greptile both flagged. The goroutines are now drained explicitly after srv.Run returns: stop() cancels the context first, then lcStack.Stop()/cdcPipe.Stop() wait. Verified by booting + SIGTERM (drains and exits cleanly in <1s).
-- +goose StatementEnd

-- +goose StatementBegin
CREATE TRIGGER pr_checks_cdc_insert
UpsertPRCheck updates an existing (pr_url, name, commit_hash) row on conflict, but CDC only has an AFTER INSERT trigger for pr_checks. A check moving from in_progress to failed/passed on the same commit updates SQLite without emitting change_log, so live clients and offset catch-up miss the status change. Please add an AFTER UPDATE trigger and a regression test.
Great catch — this was the sharpest comment on the PR. Fixed in 70aab5e: added a pr_checks_cdc_update trigger guarded on OLD.status <> NEW.status, so a status flip on the same commit emits change_log while a no-op re-poll stays silent. Added a regression test asserting both (emit on change, suppress no-op).
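The behaviour the new trigger is meant to have (emit on a status change, stay silent on a no-op re-poll) can be modelled in plain Go. This is a behavioural model only, not the SQL trigger: `checkStore`, `UpsertCheck`, and the event names `pr_check_created` / `pr_check_updated` are hypothetical.

```go
package main

import "fmt"

// checkKey mirrors the (pr_url, name, commit_hash) conflict key mentioned in
// the review comment.
type checkKey struct {
	PRURL      string
	Name       string
	CommitHash string
}

// checkStore models pr_checks plus the change_log entries its triggers append.
type checkStore struct {
	checks    map[checkKey]string // key -> status
	changeLog []string            // appended event kinds, in order
}

func newCheckStore() *checkStore {
	return &checkStore{checks: map[checkKey]string{}}
}

// UpsertCheck writes the row and appends to changeLog the way the two
// triggers would: always on insert, and on update only when the status
// actually changed (the OLD.status <> NEW.status guard).
func (s *checkStore) UpsertCheck(k checkKey, status string) {
	old, exists := s.checks[k]
	s.checks[k] = status
	switch {
	case !exists:
		s.changeLog = append(s.changeLog, "pr_check_created")
	case old != status:
		s.changeLog = append(s.changeLog, "pr_check_updated")
	}
}

func main() {
	s := newCheckStore()
	k := checkKey{PRURL: "https://example.invalid/pr/1", Name: "build", CommitHash: "abc123"}

	s.UpsertCheck(k, "in_progress") // insert: emits
	s.UpsertCheck(k, "in_progress") // no-op re-poll: silent
	s.UpsertCheck(k, "failed")      // status flip on the same commit: emits

	fmt.Println(s.changeLog) // [pr_check_created pr_check_updated]
}
```

Without the guard, every re-poll of an unchanged check would append an event, so live clients would see noise rather than state changes.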
return cur, false, nil
// writePR upserts the scalar facts, records each check run, and replaces the
// comment set. PR-table CDC is emitted by the DB triggers.
func (m *Manager) writePR(ctx context.Context, id domain.SessionID, o ports.PRObservation) error {
This writes one PR observation as multiple independent store calls: scalar PR upsert, each check, then comment replacement. If a later step fails or the context is cancelled, the PR row and its CDC event can already be committed while checks/comments are stale or partial. The observation should be persisted in one storage transaction so the DB state and emitted CDC stay consistent.
Fixed in 70aab5e. writePR now persists the scalar facts, checks, and comments in a single transaction via Store.WritePRObservation (scalar upsert first so the check/comment CDC triggers can resolve the session id within the same tx). Collapsed the three PRWriter write methods into one WritePR since the engine only ever writes an observation as a whole. Round-trip test added.
"path/filepath"

"github.com/pressly/goose/v3"
_ "modernc.org/sqlite"
The stack PR recommends trying github.com/ncruces/go-sqlite3/driver first behind database/sql. If we are choosing modernc.org/sqlite instead, let’s capture the reason in the stack PR and keep that as the source of truth.
Recorded the rationale at the import site in 70aab5e: modernc.org/sqlite is the pure-Go / CGO-free driver, chosen so the daemon cross-compiles and ships as a static binary with no libsqlite/CGO toolchain dependency (tradeoff: some raw throughput vs a C-backed driver). Happy to mirror that into the stack PR as the source of truth if you point me at it.
	return err
}

// Open the durable store and bring up the CDC substrate (outbox publisher,
This comment still describes the old outbox/JSONL/janitor flow, while the current implementation uses trigger-driven change_log polling. Please update this and the related startup comment below to match the implementation.
Updated in 0dbd304 — both the store-open and lifecycle startup comments now describe the trigger-driven flow (DB trigger → change_log → poller → broadcaster), with no mention of the removed outbox/JSONL/janitor. Thanks!
…on-signal exit

lcStack.Stop()/cdcPipe.Stop() block on done channels that close only when ctx is cancelled, but the deferred stop() that cancels ctx ran last (LIFO), so any non-signal exit (e.g. a listener bind error) hung the daemon forever. Cancel ctx first, then drain, explicitly after srv.Run instead of via defer.

Also refresh the startup comments that still described the removed outbox/JSONL/janitor flow.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Addresses review on PR-observation persistence:
- pr_checks now has an AFTER UPDATE CDC trigger (guarded on status change), so a check flipping in_progress->failed on the same commit emits change_log instead of updating silently. Restores symmetry with the sessions/pr triggers.
- writePR persists scalar facts + checks + comments in ONE transaction via Store.WritePRObservation, so a mid-write failure can't leave the pr row (and its CDC event) committed while checks/comments are partial. Collapses the PRWriter port's three write methods into one WritePR.
- db.go: record why modernc.org/sqlite (pure-Go, CGO-free static binary) at the import site.

Regression tests for both the update-trigger (emit on change, suppress no-op re-poll) and the transactional write. go test -race ./... green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Summary
Replaces the in-memory fakeStore with the two real outbound adapters the Go backend needs to persist and deliver session state:
- internal/storage/sqlite - persistence satisfying the existing ports.LifecycleStore contract.
- internal/cdc - transactional-outbox publisher, JSONL delivery, durable consumer, and an in-process broadcaster.

Both are wired into main.go alongside the Lifecycle Manager and reaper, so the write path is live end-to-end: LCM.Upsert -> store -> outbox -> JSONL -> broadcaster.

What was introduced

Storage (internal/storage/sqlite)
- modernc.org/sqlite (pure Go, no CGO) so the 3-OS binary cross-compiles cleanly.
- goose embedded migrations (0001_init.sql); sqlc-generated typed queries checked in under gen/.
- Upsert: the session row, change_log entry, and outbox row are written in a single *sql.Tx (all-or-nothing).
- revision is an optimistic-concurrency (CAS) check, not a debug counter: insert requires incoming revision 0 and persists 1; update requires the loaded revision to equal the stored one and bumps it +1; zero rows affected returns a revision-mismatch error. This mirrors the original fakeStore semantics exactly.
- Metadata (branch, workspacePath, etc.) lives in a separate session_metadata table written by PatchMetadata, deliberately off the canonical CDC path (no revision bump, no event).
- reaction_trackers: escalation budgets now survive a restart, fixing the in-memory-only behaviour that re-fired human pages after a daemon bounce.

CDC (internal/cdc)
- Consumer tails via byte cursor, detects rotation via os.SameFile, resyncs from a full-state snapshot on a gap, and persists its position in consumer_offsets.

Composition root (main.go, cdc_wiring.go, lifecycle_wiring.go)
- startCDC stands up publisher / consumer / janitor + broadcaster.
- startLifecycle constructs the LCM, makes escalation budgets durable via WithReactionStore, teaches it to enumerate sessions via WithSessionLister (store.ListAll), and starts the reaper.

Migration 0002: projects + pr_enrichment
Two further tables, both written outside the LCM write path (no revision bump, no change_log/outbox event), so neither rides the CDC pipeline. Store methods follow the existing reaction_trackers adapter pattern with storage-local row structs (ProjectRow, PREnrichmentRow).

Table: projects

The durable registry of repos AO manages, the SQLite twin of the old YAML config (global config.yaml plus per-repo agent-orchestrator.yaml). Keyed by the same {basename}_{sha256(path:originUrl)[:10]} id that sessions.project_id references.
- archived_at (NULL = active). Archiving keeps the row so a session's project_id always resolves; there is no FK from sessions.project_id to projects.id (SQLite cannot ALTER ADD a FK, and a backfill can land sessions before their project row), so the soft-delete row is what prevents dangling references.
- ListProjects returns active rows only (WHERE archived_at IS NULL); GetProject resolves any row regardless of state and reports ArchivedAt.
- UpsertProject, GetProject (returns ok), ListProjects, ArchiveProject, DeleteProject (hard purge).

Table: pr_enrichment

A per-session cache of the rich SCM facts that do not belong in the canonical lifecycle (which keeps only pr_state/reason/number/url).
- session_id PRIMARY KEY REFERENCES sessions(id) ON DELETE CASCADE, so enrichment dies with its session (same cascade as session_metadata).
- UpsertPREnrichment, GetPREnrichment (returns ok), DeletePREnrichment.

Both tables are currently dormant: the SCM observer that writes pr_enrichment and the project-registration/archive handler that writes projects are downstream daemon-lane work and do not exist yet. The schema, queries, and store methods are in place so that wiring is purely additive.

Temporary stubs and how to fill them in
The write path is live, but three collaborators owned by the daemon/coding-agents lane do not have production implementations yet. They are stubbed with TODO(daemon-lane) markers in lifecycle_wiring.go. A no-op here is safe only because it merely drops an outbound side-effect; none of them fake success.
- noopNotifier: lifecycle_wiring.go, passed to lifecycle.New(store, noopNotifier{}, ...). … noopNotifier{}. Until then human-facing notifications are silently dropped.
- noopMessenger: lifecycle_wiring.go, passed to lifecycle.New(store, _, noopMessenger{}).
- reaper.MapRegistry{} (empty): lifecycle_wiring.go, reaper.New(lcm, reaper.MapRegistry{}, ...). … reaper.MapRegistry{"tmux": tmuxRuntime, "process": procRuntime}. With an empty registry the reaper ticks escalations but probes nothing, which is correct until runtimes exist.

Deliberately NOT stubbed (see comment block in main.go)
- session.New needs real Runtime/Agent/Workspace plugins. No-op versions would make Spawn appear to succeed while creating no live pane (a silent-failure footgun), so it is deferred rather than faked. The LCM already exposes the read surface (RunningSessions) the SM would wrap.
- httpd.New takes no SM/LCM today; surfacing the store over HTTP needs a constructor signature change plus handlers that call into the SM, so it is tracked with the SM work.

The distinction: stub when the no-op is semantically honest (no notifier => no notification). Do not stub when the no-op would lie about success (no runtime => ghost session).
gitignore

Added *.db, *.db-shm, *.db-wal, session-events.jsonl[.*] under the Go section. The default AO_DATA_DIR lives outside the repo, so nothing leaks today; this guards against a data dir pointed at the tree during local dev.

Test plan
- gofmt -l . clean
- go build ./...
- go vet ./...
- go test -race ./... green
- ao.db + WAL + session-events.jsonl, drains cleanly on SIGTERM