Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/internal/cdc/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
EventPRSessionChanged EventType = "pr_session_changed"
EventPRReviewThreadAdded EventType = "pr_review_thread_added"
EventPRReviewThreadResolved EventType = "pr_review_thread_resolved"
EventNotificationCreated EventType = "notification_created"
EventNotificationUpdated EventType = "notification_updated"
)

// Event is one CDC change read from change_log. Seq is the monotonic ordering +
Expand Down
14 changes: 13 additions & 1 deletion backend/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/aoagents/agent-orchestrator/backend/internal/adapters/runtime/zellij"
"github.com/aoagents/agent-orchestrator/backend/internal/config"
"github.com/aoagents/agent-orchestrator/backend/internal/httpd"
"github.com/aoagents/agent-orchestrator/backend/internal/runfile"
notificationsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/notification"
projectsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/project"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
"github.com/aoagents/agent-orchestrator/backend/internal/terminal"
Expand Down Expand Up @@ -82,10 +84,20 @@ func Run() error {
// agent nudges (CI failure, review feedback, merge conflict).
messenger := newSessionMessenger(store, runtimeAdapter, log)

// The notification service owns enrichment, concise canonical copy, semantic
// action descriptors, durable dedupe, and SQLite persistence. It is wired into
// LCM independently from the messenger/nudge path.
notificationSvc := notificationsvc.New(notificationsvc.Deps{
Store: store,
Maker: notificationsvc.DefaultMaker{},
Clock: time.Now,
Logger: log,
})

// Bring up the Lifecycle Manager and the reaper first: it makes the session
// lifecycle write path live (reducer write -> store -> DB trigger ->
// change_log -> poller -> broadcaster) and gives startSession the shared LCM.
lcStack := startLifecycle(ctx, store, runtimeAdapter, messenger, log)
lcStack := startLifecycle(ctx, store, runtimeAdapter, messenger, notificationSvc, log)
lcStack.scmDone = startSCMObserver(ctx, store, lcStack.LCM, log)

// Wire the controller-facing session service over the same store + LCM, the
Expand Down
6 changes: 4 additions & 2 deletions backend/internal/daemon/lifecycle_wiring.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ type lifecycleStack struct {
// reaper. The goroutine stops when ctx is cancelled; Stop waits for it to drain.
// The messenger is the per-daemon agent messenger the LCM uses to nudge agents
// in response to SCM observations (CI failure, review feedback, merge conflict).
func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runtime, messenger ports.AgentMessenger, logger *slog.Logger) *lifecycleStack {
lcm := lifecycle.New(store, messenger)
func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runtime, messenger ports.AgentMessenger, notifications interface {
Notify(context.Context, domain.NotificationIntent) error
}, logger *slog.Logger) *lifecycleStack {
lcm := lifecycle.NewWithDeps(lifecycle.Deps{Store: store, Messenger: messenger, Notifications: notifications, Logger: logger})
rp := reaper.New(lcm, store, runtime, reaper.Config{Logger: logger})
return &lifecycleStack{LCM: lcm, reaperDone: rp.Start(ctx)}
}
Expand Down
36 changes: 35 additions & 1 deletion backend/internal/daemon/wiring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/aoagents/agent-orchestrator/backend/internal/domain"
"github.com/aoagents/agent-orchestrator/backend/internal/lifecycle"
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
notificationsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/notification"
sessionmanager "github.com/aoagents/agent-orchestrator/backend/internal/session_manager"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
)
Expand Down Expand Up @@ -315,7 +316,7 @@ func TestWiring_StartLifecycleThreadsMessengerIntoLCM(t *testing.T) {

log := slog.New(slog.NewTextHandler(io.Discard, nil))
messenger := &captureMessenger{}
stack := startLifecycle(ctx, store, zellij.New(zellij.Options{}), messenger, log)
stack := startLifecycle(ctx, store, zellij.New(zellij.Options{}), messenger, nil, log)
t.Cleanup(stack.Stop)
t.Cleanup(cancel)

Expand All @@ -339,6 +340,39 @@ func TestWiring_StartLifecycleThreadsMessengerIntoLCM(t *testing.T) {
}
}

func TestWiring_StartLifecycleThreadsNotificationSinkIntoLCM(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
store, err := sqlite.Open(t.TempDir())
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = store.Close() })
if err := store.UpsertProject(ctx, domain.ProjectRecord{ID: "p", Path: "/repo/p", RegisteredAt: time.Now()}); err != nil {
t.Fatal(err)
}
rec, err := store.CreateSession(ctx, domain.SessionRecord{ProjectID: "p", Kind: domain.KindWorker, Activity: domain.Activity{State: domain.ActivityIdle, LastActivityAt: time.Now()}})
if err != nil {
t.Fatal(err)
}

log := slog.New(slog.NewTextHandler(io.Discard, nil))
notificationSvc := notificationsvc.New(notificationsvc.Deps{Store: store, Clock: time.Now, Logger: log})
stack := startLifecycle(ctx, store, zellij.New(zellij.Options{}), nil, notificationSvc, log)
t.Cleanup(stack.Stop)
t.Cleanup(cancel)

if err := stack.LCM.ApplyActivitySignal(ctx, rec.ID, ports.ActivitySignal{Valid: true, State: domain.ActivityWaitingInput, Timestamp: time.Now()}); err != nil {
t.Fatal(err)
}
notifications, err := store.ListNotificationsBySession(ctx, rec.ID, 10)
if err != nil {
t.Fatal(err)
}
if len(notifications) != 1 || notifications[0].Type != domain.NotificationSessionInput {
t.Fatalf("notifications = %+v", notifications)
}
}

// TestProjectRepoResolver_ResolvesRegisteredProject asserts the DB-backed repo
// resolver turns a registered project into its on-disk repo path (so spawns
// materialise a worktree), and fails loudly for an unregistered project.
Expand Down
Loading
Loading