-
Notifications
You must be signed in to change notification settings - Fork 1
feat(backend): SQLite storage layer + CDC pipeline #37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
f5bc4c7
feat(backend): SQLite storage layer + CDC pipeline, LCM/reaper wiring
Pritom14 23b8fe4
feat(backend): add projects and pr_enrichment tables to SQLite store
Pritom14 4ce9044
refactor(storage): make session metadata + PR facts typed and structured
AgentWrapper b0e4fff
test(cdc): add full-stack E2E tests through the real store + snapshot…
AgentWrapper ba47212
perf(storage): allow concurrent reads; serialize writes via a mutex
AgentWrapper e5c4fd6
feat(storage,cdc): minimal 6-table schema + trigger-driven CDC (stora…
AgentWrapper cdf55eb
feat(backend): port lifecycle lane onto the new storage+CDC model
AgentWrapper 0a69b84
docs(config): drop stale CDC-JSONL mention in resolveDataDir
AgentWrapper 0dbd304
fix(backend): drain CDC/lifecycle goroutines without deadlocking on n…
AgentWrapper 70aab5e
feat(backend): atomic PR-observation write + CDC on check status updates
AgentWrapper File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "log/slog" | ||
|
|
||
| "github.com/aoagents/agent-orchestrator/backend/internal/cdc" | ||
| "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" | ||
| ) | ||
|
|
||
| // cdcPipeline owns the running CDC poller and the broadcaster the SSE transport | ||
| // subscribes to. The DB triggers write change_log; the poller tails it and fans | ||
| // each new event out through the broadcaster. Durable catch-up is the client's | ||
| // job (it reads change_log from its own Last-Event-ID), so the poller only | ||
| // pushes live events and re-seeks to head on restart. | ||
| type cdcPipeline struct { | ||
| Broadcaster *cdc.Broadcaster | ||
| done <-chan struct{} | ||
| } | ||
|
|
||
| // startCDC seeks the poller to the current head and starts its loop. It stops | ||
| // when ctx is cancelled; Stop waits for it to drain. | ||
| func startCDC(ctx context.Context, store *sqlite.Store, logger *slog.Logger) (*cdcPipeline, error) { | ||
| bcast := cdc.NewBroadcaster() | ||
| poller := cdc.NewPoller(cdcSource{store}, bcast, cdc.PollerConfig{Logger: logger}) | ||
| if err := poller.SeekToHead(ctx); err != nil { | ||
| return nil, err | ||
| } | ||
| return &cdcPipeline{Broadcaster: bcast, done: poller.Start(ctx)}, nil | ||
| } | ||
|
|
||
| // Stop waits for the poller goroutine to exit (the caller must have cancelled the | ||
| // ctx passed to startCDC). | ||
| func (p *cdcPipeline) Stop() error { | ||
| <-p.done | ||
| return nil | ||
| } | ||
|
|
||
| // cdcSource adapts *sqlite.Store's change_log reads to cdc.Source. | ||
| type cdcSource struct{ store *sqlite.Store } | ||
|
|
||
| func (s cdcSource) EventsAfter(ctx context.Context, after int64, limit int) ([]cdc.Event, error) { | ||
| rows, err := s.store.ReadChangeLogAfter(ctx, after, limit) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| out := make([]cdc.Event, len(rows)) | ||
| for i, r := range rows { | ||
| out[i] = cdc.Event{ | ||
| Seq: r.Seq, | ||
| ProjectID: r.ProjectID, | ||
| SessionID: r.SessionID, | ||
| Type: cdc.EventType(r.EventType), | ||
| Payload: json.RawMessage(r.Payload), | ||
| CreatedAt: r.CreatedAt, | ||
| } | ||
| } | ||
| return out, nil | ||
| } | ||
|
|
||
| func (s cdcSource) LatestSeq(ctx context.Context) (int64, error) { | ||
| return s.store.MaxChangeLogSeq(ctx) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,25 @@ | ||
| module github.com/aoagents/agent-orchestrator/backend | ||
|
|
||
| go 1.22 | ||
| go 1.25.7 | ||
|
|
||
| require github.com/go-chi/chi/v5 v5.1.0 | ||
| require ( | ||
| github.com/go-chi/chi/v5 v5.1.0 | ||
| github.com/pressly/goose/v3 v3.27.1 | ||
| modernc.org/sqlite v1.51.0 | ||
| ) | ||
|
|
||
| require ( | ||
| github.com/dustin/go-humanize v1.0.1 // indirect | ||
| github.com/google/uuid v1.6.0 // indirect | ||
| github.com/mattn/go-isatty v0.0.21 // indirect | ||
| github.com/mfridman/interpolate v0.0.2 // indirect | ||
| github.com/ncruces/go-strftime v1.0.0 // indirect | ||
| github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect | ||
| github.com/sethvargo/go-retry v0.3.0 // indirect | ||
| go.uber.org/multierr v1.11.0 // indirect | ||
| golang.org/x/sync v0.20.0 // indirect | ||
| golang.org/x/sys v0.43.0 // indirect | ||
| modernc.org/libc v1.72.3 // indirect | ||
| modernc.org/mathutil v1.7.1 // indirect | ||
| modernc.org/memory v1.11.0 // indirect | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,68 @@ | ||
| github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | ||
| github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
| github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= | ||
| github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= | ||
| github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= | ||
| github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= | ||
| github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= | ||
| github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= | ||
| github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= | ||
| github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= | ||
| github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= | ||
| github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= | ||
| github.com/mattn/go-isatty v0.0.21 h1:xYae+lCNBP7QuW4PUnNG61ffM4hVIfm+zUzDuSzYLGs= | ||
| github.com/mattn/go-isatty v0.0.21/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4= | ||
| github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6BbAxPY= | ||
| github.com/mfridman/interpolate v0.0.2/go.mod h1:p+7uk6oE07mpE/Ik1b8EckO0O4ZXiGAfshKBWLUM9Xg= | ||
| github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= | ||
| github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= | ||
| github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||
| github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||
| github.com/pressly/goose/v3 v3.27.1 h1:6uEvcprBybDmW4hcz3gYujhARhye+GoWKhEWyzD5sh4= | ||
| github.com/pressly/goose/v3 v3.27.1/go.mod h1:maruOxsPnIG2yHHyo8UqKWXYKFcH7Q76csUV7+7KYoM= | ||
| github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= | ||
| github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= | ||
| github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah2SE= | ||
| github.com/sethvargo/go-retry v0.3.0/go.mod h1:mNX17F0C/HguQMyMyJxcnU471gOZGxCLyYaFyAZraas= | ||
| github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= | ||
| github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= | ||
| go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= | ||
| go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= | ||
| golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= | ||
| golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= | ||
| golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= | ||
| golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= | ||
| golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= | ||
| golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= | ||
| golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= | ||
| golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= | ||
| gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||
| gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | ||
| modernc.org/cc/v4 v4.28.2 h1:3tQ0lf2ADtoby2EtSP+J7IE2SHwEJdP8ioR59wx7XpY= | ||
| modernc.org/cc/v4 v4.28.2/go.mod h1:OnovgIhbbMXMu1aISnJ0wvVD1KnW+cAUJkIrAWh+kVI= | ||
| modernc.org/ccgo/v4 v4.34.0 h1:yRLPFZieg532OT4rp4JFNIVcquwalMX26G95WQDqwCQ= | ||
| modernc.org/ccgo/v4 v4.34.0/go.mod h1:AS5WYMyBakQ+fhsHhtP8mWB82KTGPkNNJDGfGQCe0/A= | ||
| modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM= | ||
| modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU= | ||
| modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= | ||
| modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= | ||
| modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo= | ||
| modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= | ||
| modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= | ||
| modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= | ||
| modernc.org/libc v1.72.3 h1:ZnDF4tXn4NBXFutMMQC4vtbTFSXhhKzR73fv0beZEAU= | ||
| modernc.org/libc v1.72.3/go.mod h1:dn0dZNnnn1clLyvRxLxYExxiKRZIRENOfqQ8XEeg4Qs= | ||
| modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= | ||
| modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= | ||
| modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= | ||
| modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= | ||
| modernc.org/opt v0.2.0 h1:tGyef5ApycA7FSEOMraay9SaTk5zmbx7Tu+cJs4QKZg= | ||
| modernc.org/opt v0.2.0/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= | ||
| modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= | ||
| modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= | ||
| modernc.org/sqlite v1.51.0 h1:aH/MMSoayAIhozZ7uJbVTT9QO/VhzBf0J9tymmmuC/U= | ||
| modernc.org/sqlite v1.51.0/go.mod h1:tcNzv5p84E0skkmJn038y+hWJbLQXQqEnQfeh5r2JLM= | ||
| modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= | ||
| modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= | ||
| modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= | ||
| modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| package cdc | ||
|
|
||
| import ( | ||
| "log/slog" | ||
| "sync" | ||
| ) | ||
|
|
||
| // Broadcaster is the in-process fan-out the poller feeds. Subscribers (the | ||
| // WS/SSE transport, wired in the frontend task) register a callback; every | ||
| // polled Event is delivered to all current subscribers. It is the single seam | ||
| // between the CDC poller and live delivery, so the transport can be built and | ||
| // swapped without touching the poller. | ||
| type Broadcaster struct { | ||
| mu sync.RWMutex | ||
| nextID int | ||
| subs map[int]func(Event) | ||
| logger *slog.Logger | ||
| } | ||
|
|
||
| // NewBroadcaster returns an empty Broadcaster ready for subscriptions. | ||
| func NewBroadcaster() *Broadcaster { | ||
| return &Broadcaster{subs: map[int]func(Event){}, logger: slog.Default()} | ||
| } | ||
|
|
||
| // Subscribe registers fn and returns an unsubscribe function. fn is called | ||
| // synchronously from the poller loop, so it must not block; a transport that | ||
| // needs buffering should push onto its own channel inside fn. | ||
| func (b *Broadcaster) Subscribe(fn func(Event)) (unsubscribe func()) { | ||
| b.mu.Lock() | ||
| id := b.nextID | ||
| b.nextID++ | ||
| b.subs[id] = fn | ||
| b.mu.Unlock() | ||
| return func() { | ||
| b.mu.Lock() | ||
| delete(b.subs, id) | ||
| b.mu.Unlock() | ||
| } | ||
| } | ||
|
|
||
| // SubscriberCount reports the number of current subscribers. | ||
| func (b *Broadcaster) SubscriberCount() int { | ||
| b.mu.RLock() | ||
| defer b.mu.RUnlock() | ||
| return len(b.subs) | ||
| } | ||
|
|
||
| // Publish delivers e to every current subscriber. A panicking subscriber is | ||
| // recovered and logged so one bad callback can't kill the poller goroutine or | ||
| // starve the other subscribers. | ||
| func (b *Broadcaster) Publish(e Event) { | ||
| b.mu.RLock() | ||
| defer b.mu.RUnlock() | ||
| for _, fn := range b.subs { | ||
| b.deliver(fn, e) | ||
| } | ||
| } | ||
|
|
||
| func (b *Broadcaster) deliver(fn func(Event), e Event) { | ||
| defer func() { | ||
| if r := recover(); r != nil { | ||
| b.logger.Error("cdc broadcaster: subscriber panicked", "seq", e.Seq, "panic", r) | ||
| } | ||
| }() | ||
| fn(e) | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.