diff --git a/backend/internal/cdc/event.go b/backend/internal/cdc/event.go index 4f43d8f..13283c2 100644 --- a/backend/internal/cdc/event.go +++ b/backend/internal/cdc/event.go @@ -28,6 +28,8 @@ const ( EventPRSessionChanged EventType = "pr_session_changed" EventPRReviewThreadAdded EventType = "pr_review_thread_added" EventPRReviewThreadResolved EventType = "pr_review_thread_resolved" + EventNotificationCreated EventType = "notification_created" + EventNotificationUpdated EventType = "notification_updated" ) // Event is one CDC change read from change_log. Seq is the monotonic ordering + diff --git a/backend/internal/daemon/daemon.go b/backend/internal/daemon/daemon.go index 983b95a..2322a3b 100644 --- a/backend/internal/daemon/daemon.go +++ b/backend/internal/daemon/daemon.go @@ -10,11 +10,13 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/aoagents/agent-orchestrator/backend/internal/adapters/runtime/zellij" "github.com/aoagents/agent-orchestrator/backend/internal/config" "github.com/aoagents/agent-orchestrator/backend/internal/httpd" "github.com/aoagents/agent-orchestrator/backend/internal/runfile" + notificationsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/notification" projectsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/project" "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" "github.com/aoagents/agent-orchestrator/backend/internal/terminal" @@ -82,10 +84,20 @@ func Run() error { // agent nudges (CI failure, review feedback, merge conflict). messenger := newSessionMessenger(store, runtimeAdapter, log) + // The notification service owns enrichment, concise canonical copy, semantic + // action descriptors, durable dedupe, and SQLite persistence. It is wired into + // LCM independently from the messenger/nudge path. + notificationSvc := notificationsvc.New(notificationsvc.Deps{ + Store: store, + Maker: notificationsvc.DefaultMaker{}, + Clock: time.Now, + Logger: log, + }) + // Bring up the Lifecycle Manager and the reaper first: it makes the session // lifecycle write path live (reducer write -> store -> DB trigger -> // change_log -> poller -> broadcaster) and gives startSession the shared LCM. - lcStack := startLifecycle(ctx, store, runtimeAdapter, messenger, log) + lcStack := startLifecycle(ctx, store, runtimeAdapter, messenger, notificationSvc, log) lcStack.scmDone = startSCMObserver(ctx, store, lcStack.LCM, log) // Wire the controller-facing session service over the same store + LCM, the diff --git a/backend/internal/daemon/lifecycle_wiring.go b/backend/internal/daemon/lifecycle_wiring.go index 47251f2..e9248e5 100644 --- a/backend/internal/daemon/lifecycle_wiring.go +++ b/backend/internal/daemon/lifecycle_wiring.go @@ -34,8 +34,10 @@ type lifecycleStack struct { // reaper. The goroutine stops when ctx is cancelled; Stop waits for it to drain. // The messenger is the per-daemon agent messenger the LCM uses to nudge agents // in response to SCM observations (CI failure, review feedback, merge conflict). -func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runtime, messenger ports.AgentMessenger, logger *slog.Logger) *lifecycleStack { - lcm := lifecycle.New(store, messenger) +func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runtime, messenger ports.AgentMessenger, notifications interface { + Notify(context.Context, domain.NotificationIntent) error +}, logger *slog.Logger) *lifecycleStack { + lcm := lifecycle.NewWithDeps(lifecycle.Deps{Store: store, Messenger: messenger, Notifications: notifications, Logger: logger}) rp := reaper.New(lcm, store, runtime, reaper.Config{Logger: logger}) return &lifecycleStack{LCM: lcm, reaperDone: rp.Start(ctx)} } diff --git a/backend/internal/daemon/wiring_test.go b/backend/internal/daemon/wiring_test.go index 0e4815d..54e511d 100644 --- a/backend/internal/daemon/wiring_test.go +++ b/backend/internal/daemon/wiring_test.go @@ -16,6 +16,7 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" "github.com/aoagents/agent-orchestrator/backend/internal/ports" + notificationsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/notification" sessionmanager "github.com/aoagents/agent-orchestrator/backend/internal/session_manager" "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" ) @@ -315,7 +316,7 @@ func TestWiring_StartLifecycleThreadsMessengerIntoLCM(t *testing.T) { log := slog.New(slog.NewTextHandler(io.Discard, nil)) messenger := &captureMessenger{} - stack := startLifecycle(ctx, store, zellij.New(zellij.Options{}), messenger, log) + stack := startLifecycle(ctx, store, zellij.New(zellij.Options{}), messenger, nil, log) t.Cleanup(stack.Stop) t.Cleanup(cancel) @@ -339,6 +340,39 @@ func TestWiring_StartLifecycleThreadsMessengerIntoLCM(t *testing.T) { } } +func TestWiring_StartLifecycleThreadsNotificationSinkIntoLCM(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = store.Close() }) + if err := store.UpsertProject(ctx, domain.ProjectRecord{ID: "p", Path: "/repo/p", RegisteredAt: time.Now()}); err != nil { + t.Fatal(err) + } + rec, err := store.CreateSession(ctx, domain.SessionRecord{ProjectID: "p", Kind: domain.KindWorker, Activity: domain.Activity{State: domain.ActivityIdle, LastActivityAt: time.Now()}}) + if err != nil { + t.Fatal(err) + } + + log := slog.New(slog.NewTextHandler(io.Discard, nil)) + notificationSvc := notificationsvc.New(notificationsvc.Deps{Store: store, Clock: time.Now, Logger: log}) + stack := startLifecycle(ctx, store, zellij.New(zellij.Options{}), nil, notificationSvc, log) + t.Cleanup(stack.Stop) + t.Cleanup(cancel) + + if err := stack.LCM.ApplyActivitySignal(ctx, rec.ID, ports.ActivitySignal{Valid: true, State: domain.ActivityWaitingInput, Timestamp: time.Now()}); err != nil { + t.Fatal(err) + } + notifications, err := store.ListNotificationsBySession(ctx, rec.ID, 10) + if err != nil { + t.Fatal(err) + } + if len(notifications) != 1 || notifications[0].Type != domain.NotificationSessionInput { + t.Fatalf("notifications = %+v", notifications) + } +} + // TestProjectRepoResolver_ResolvesRegisteredProject asserts the DB-backed repo // resolver turns a registered project into its on-disk repo path (so spawns // materialise a worktree), and fails loudly for an unregistered project. diff --git a/backend/internal/domain/notification.go b/backend/internal/domain/notification.go new file mode 100644 index 0000000..fb03ef6 --- /dev/null +++ b/backend/internal/domain/notification.go @@ -0,0 +1,299 @@ +package domain + +import ( + "encoding/json" + "errors" + "fmt" + "time" +) + +// NotificationID identifies one persisted canonical notification. +type NotificationID string + +// NotificationType identifies the product-level event a notification represents. +type NotificationType string + +// NotificationPriority controls the user-visible urgency of a notification. +type NotificationPriority string + +// NotificationStatus is the persisted lifecycle state of a notification. +type NotificationStatus string + +// Notification types emitted by lifecycle and persisted by storage. +const ( + NotificationCIFailing NotificationType = "ci.failing" + NotificationReviewChanges NotificationType = "review.changes_requested" + NotificationMergeConflicts NotificationType = "merge.conflicts" + NotificationMergeReady NotificationType = "merge.ready" + NotificationMergeCompleted NotificationType = "merge.completed" + NotificationSessionInput NotificationType = "session.needs_input" + NotificationSessionExited NotificationType = "session.exited" +) + +// Notification priorities persisted by storage. +const ( + NotificationUrgent NotificationPriority = "urgent" + NotificationPriorityAction NotificationPriority = "action" + NotificationWarning NotificationPriority = "warning" + NotificationInfo NotificationPriority = "info" +) + +// Notification statuses persisted by storage. +const ( + NotificationUnread NotificationStatus = "unread" + NotificationRead NotificationStatus = "read" + NotificationDismissed NotificationStatus = "dismissed" + NotificationResolved NotificationStatus = "resolved" +) + +// NotificationIntent is the lifecycle-to-notification contract. Lifecycle owns +// the relevance decision; the notification service owns enrichment, copy, +// semantic actions, dedupe, and persistence. +type NotificationIntent struct { + Type NotificationType + Priority NotificationPriority + ProjectID ProjectID + SessionID SessionID + Source string + DedupeKey string + OccurredAt time.Time + Context NotificationIntentContext +} + +// NotificationIntentContext carries small, stable fragments lifecycle already +// has at the decision point. It intentionally excludes raw provider payloads and +// channel/dashboard rendering details. +type NotificationIntentContext struct { + PRURL string `json:"prUrl,omitempty"` + CheckName string `json:"checkName,omitempty"` + CheckURL string `json:"checkUrl,omitempty"` + CommitHash string `json:"commitHash,omitempty"` + ReviewIDs []string `json:"reviewIds,omitempty"` + ThreadIDs []string `json:"threadIds,omitempty"` + MergeState string `json:"mergeState,omitempty"` + Reason string `json:"reason,omitempty"` + Facts map[string]any `json:"facts,omitempty"` +} + +// Validate rejects malformed notification intents before the service performs +// any enrichment or persistence work. +func (i NotificationIntent) Validate() error { + if i.Type == "" { + return errors.New("notification intent: missing type") + } + if !validNotificationType(i.Type) { + return fmt.Errorf("notification intent: unsupported type %q", i.Type) + } + if i.Priority == "" { + return errors.New("notification intent: missing priority") + } + if !validNotificationPriority(i.Priority) { + return fmt.Errorf("notification intent: unsupported priority %q", i.Priority) + } + if i.ProjectID == "" { + return errors.New("notification intent: missing project id") + } + if i.SessionID == "" { + return errors.New("notification intent: missing session id") + } + if i.Source == "" { + return errors.New("notification intent: missing source") + } + if i.DedupeKey == "" { + return errors.New("notification intent: missing dedupe key") + } + if err := ensureJSONSafe(i.Context); err != nil { + return fmt.Errorf("notification intent: context is not JSON-safe: %w", err) + } + return nil +} + +func validNotificationType(v NotificationType) bool { + switch v { + case NotificationCIFailing, NotificationReviewChanges, NotificationMergeConflicts, NotificationMergeReady, + NotificationMergeCompleted, NotificationSessionInput, NotificationSessionExited: + return true + default: + return false + } +} + +func validNotificationPriority(v NotificationPriority) bool { + switch v { + case NotificationUrgent, NotificationPriorityAction, NotificationWarning, NotificationInfo: + return true + default: + return false + } +} + +func validNotificationStatus(v NotificationStatus) bool { + switch v { + case NotificationUnread, NotificationRead, NotificationDismissed, NotificationResolved: + return true + default: + return false + } +} + +// Notification is the canonical persisted notification row. Every notification +// is session-scoped; project-level views should group these session-owned rows. +// It stores concise visible copy plus structured evidence and semantic action +// descriptors. +type Notification struct { + ID NotificationID + Type NotificationType + Priority NotificationPriority + Status NotificationStatus + ProjectID ProjectID + SessionID SessionID + Source string + DedupeKey string + Fingerprint string + Title string + Summary string + Body string + Subject NotificationSubject + Data map[string]any + Actions []NotificationAction + OccurredAt time.Time + ReadAt *time.Time + DismissedAt *time.Time + ResolvedAt *time.Time + CreatedAt time.Time + UpdatedAt time.Time +} + +// NotificationSubject describes what the notification is about without tying +// consumers to an API route or dashboard URL shape. +type NotificationSubject struct { + Kind string `json:"kind,omitempty"` + Label string `json:"label,omitempty"` + ProjectID ProjectID `json:"projectId,omitempty"` + SessionID SessionID `json:"sessionId,omitempty"` + PRURL string `json:"prUrl,omitempty"` + PRNumber int `json:"prNumber,omitempty"` + PRTitle string `json:"prTitle,omitempty"` + ProjectName string `json:"projectName,omitempty"` +} + +// NotificationAction is a semantic descriptor. Execution of actions is a future +// API/dashboard concern; this backend scope only stores the descriptors. +type NotificationAction struct { + ID string `json:"id"` + Label string `json:"label"` + Kind string `json:"kind"` // route, link, command, callback + URL string `json:"url,omitempty"` + Route string `json:"route,omitempty"` + Payload map[string]any `json:"payload,omitempty"` + Primary bool `json:"primary,omitempty"` +} + +// Validate checks that an action descriptor can be safely persisted as JSON. +func (a NotificationAction) Validate() error { + if a.ID == "" { + return errors.New("missing id") + } + if a.Label == "" { + return errors.New("missing label") + } + if a.Kind == "" { + return errors.New("missing kind") + } + return ensureJSONSafe(a) +} + +// NotificationContent is channel-neutral canonical copy produced by the central +// notification maker. +type NotificationContent struct { + Title string + Summary string + Body string +} + +// NotificationResolveFilter selects earlier notifications that are no longer +// relevant after a later lifecycle fact, such as a merged PR. +type NotificationResolveFilter struct { + ProjectID ProjectID + SessionID *SessionID + PRURL string + Types []NotificationType + DedupeKeyPrefixes []string + Statuses []NotificationStatus +} + +// Normalize fills storage defaults that are meaningful at the domain boundary. +func (n Notification) Normalize() Notification { + if n.Status == "" { + n.Status = NotificationUnread + } + if n.Data == nil { + n.Data = map[string]any{} + } + if n.Actions == nil { + n.Actions = []NotificationAction{} + } + return n +} + +// Validate rejects invalid persisted notification rows before storage mapping. +func (n Notification) Validate() error { + n = n.Normalize() + if n.ID == "" { + return errors.New("notification: missing id") + } + if n.Type == "" || !validNotificationType(n.Type) { + return fmt.Errorf("notification: invalid type %q", n.Type) + } + if n.Priority == "" || !validNotificationPriority(n.Priority) { + return fmt.Errorf("notification: invalid priority %q", n.Priority) + } + if n.Status == "" || !validNotificationStatus(n.Status) { + return fmt.Errorf("notification: invalid status %q", n.Status) + } + if n.ProjectID == "" { + return errors.New("notification: missing project id") + } + if n.SessionID == "" { + return errors.New("notification: missing session id") + } + if n.Source == "" { + return errors.New("notification: missing source") + } + if n.DedupeKey == "" { + return errors.New("notification: missing dedupe key") + } + if n.Fingerprint == "" { + return errors.New("notification: missing fingerprint") + } + if n.Title == "" { + return errors.New("notification: missing title") + } + if n.Summary == "" { + return errors.New("notification: missing summary") + } + if n.OccurredAt.IsZero() || n.CreatedAt.IsZero() || n.UpdatedAt.IsZero() { + return errors.New("notification: missing timestamps") + } + for _, a := range n.Actions { + if err := a.Validate(); err != nil { + return fmt.Errorf("notification: action %q: %w", a.ID, err) + } + } + if err := ensureJSONSafe(n.Subject); err != nil { + return fmt.Errorf("notification: subject is not JSON-safe: %w", err) + } + if err := ensureJSONSafe(n.Data); err != nil { + return fmt.Errorf("notification: data is not JSON-safe: %w", err) + } + return nil +} + +func ensureJSONSafe(v any) error { + b, err := json.Marshal(v) + if err != nil { + return err + } + var decoded any + return json.Unmarshal(b, &decoded) +} diff --git a/backend/internal/domain/notification_test.go b/backend/internal/domain/notification_test.go new file mode 100644 index 0000000..304a264 --- /dev/null +++ b/backend/internal/domain/notification_test.go @@ -0,0 +1,82 @@ +package domain + +import ( + "strings" + "testing" + "time" +) + +func validIntent() NotificationIntent { + return NotificationIntent{ + Type: NotificationCIFailing, + Priority: NotificationWarning, + ProjectID: "mer", + SessionID: "mer-1", + Source: "test", + DedupeKey: "ci:pr:build:c1", + OccurredAt: time.Now(), + } +} + +func TestNotificationIntentValidateRejectsMissingRequiredFields(t *testing.T) { + for _, tc := range []struct { + name string + mut func(*NotificationIntent) + want string + }{ + {"type", func(i *NotificationIntent) { i.Type = "" }, "type"}, + {"priority", func(i *NotificationIntent) { i.Priority = "" }, "priority"}, + {"project", func(i *NotificationIntent) { i.ProjectID = "" }, "project"}, + {"session", func(i *NotificationIntent) { i.SessionID = "" }, "session"}, + {"source", func(i *NotificationIntent) { i.Source = "" }, "source"}, + {"dedupe", func(i *NotificationIntent) { i.DedupeKey = "" }, "dedupe"}, + } { + t.Run(tc.name, func(t *testing.T) { + in := validIntent() + tc.mut(&in) + err := in.Validate() + if err == nil || !strings.Contains(err.Error(), tc.want) { + t.Fatalf("Validate err = %v, want mention %q", err, tc.want) + } + }) + } +} + +func TestNotificationConstantsMatchStorageValues(t *testing.T) { + if NotificationUrgent != "urgent" || NotificationPriorityAction != "action" || NotificationWarning != "warning" || NotificationInfo != "info" { + t.Fatalf("priority constants changed") + } + if NotificationUnread != "unread" || NotificationRead != "read" || NotificationDismissed != "dismissed" || NotificationResolved != "resolved" { + t.Fatalf("status constants changed") + } +} + +func TestNotificationValidateRequiresSessionID(t *testing.T) { + now := time.Now() + n := Notification{ + ID: "n1", + Type: NotificationCIFailing, + Priority: NotificationWarning, + Status: NotificationUnread, + ProjectID: "mer", + Source: "test", + DedupeKey: "ci:pr:build:c1", + Fingerprint: "fp", + Title: "CI failed", + Summary: "mer-1 has 1 failing check.", + OccurredAt: now, + CreatedAt: now, + UpdatedAt: now, + } + err := n.Validate() + if err == nil || !strings.Contains(err.Error(), "session") { + t.Fatalf("Validate err = %v, want missing session", err) + } +} + +func TestNotificationActionPayloadIsJSONSafe(t *testing.T) { + a := NotificationAction{ID: "open_session", Label: "Open", Kind: "route", Route: "session", Payload: map[string]any{"sessionId": SessionID("mer-1")}} + if err := a.Validate(); err != nil { + t.Fatalf("action should validate: %v", err) + } +} diff --git a/backend/internal/integration/lifecycle_sqlite_test.go b/backend/internal/integration/lifecycle_sqlite_test.go index 9a0a996..c921c87 100644 --- a/backend/internal/integration/lifecycle_sqlite_test.go +++ b/backend/internal/integration/lifecycle_sqlite_test.go @@ -9,6 +9,7 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" "github.com/aoagents/agent-orchestrator/backend/internal/ports" + notificationsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/notification" prsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/pr" sessionsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/session" sessionmanager "github.com/aoagents/agent-orchestrator/backend/internal/session_manager" @@ -93,7 +94,8 @@ func newStack(t *testing.T) *stack { t.Fatal(err) } msg := &captureMessenger{} - lcm := lifecycle.New(store, msg) + notif := notificationsvc.New(notificationsvc.Deps{Store: store, Clock: time.Now}) + lcm := lifecycle.NewWithDeps(lifecycle.Deps{Store: store, Messenger: msg, Notifications: notif}) prm := prsvc.New(prsvc.Deps{Writer: store, Lifecycle: lcm}) rt := &stubRuntime{} ws := &stubWorkspace{} @@ -181,3 +183,50 @@ func TestCDCPollerReceivesSessionAndPREvents(t *testing.T) { t.Fatalf("want CDC events, got %d", len(got)) } } + +func TestPRObservationPersistsNotificationAndBroadcastsCDC(t *testing.T) { + ctx := context.Background() + st := newStack(t) + sess, err := st.sm.Spawn(ctx, ports.SpawnConfig{ProjectID: "mer", Kind: domain.KindWorker}) + if err != nil { + t.Fatal(err) + } + + b := cdc.NewBroadcaster() + var got []cdc.Event + b.Subscribe(func(e cdc.Event) { got = append(got, e) }) + poller := cdc.NewPoller(st.store, b, cdc.PollerConfig{}) + if err := poller.SeekToHead(ctx); err != nil { + t.Fatal(err) + } + + obs := ports.PRObservation{ + Fetched: true, + URL: "https://github.com/o/r/pull/1", + Number: 1, + CI: domain.CIFailing, + Checks: []ports.PRCheckObservation{{Name: "build", CommitHash: "c1", Status: domain.PRCheckFailed, URL: "https://ci/build", LogTail: "boom"}}, + } + if err := st.prm.ApplyObservation(ctx, sess.ID, obs); err != nil { + t.Fatal(err) + } + notifications, err := st.store.ListNotificationsBySession(ctx, sess.ID, 10) + if err != nil { + t.Fatal(err) + } + if len(notifications) != 1 || notifications[0].Type != domain.NotificationCIFailing { + t.Fatalf("notifications = %+v", notifications) + } + if err := poller.Poll(ctx); err != nil { + t.Fatal(err) + } + var sawNotification bool + for _, ev := range got { + if ev.Type == cdc.EventNotificationCreated { + sawNotification = true + } + } + if !sawNotification { + t.Fatalf("no notification CDC event broadcast; got %+v", got) + } +} diff --git a/backend/internal/lifecycle/manager.go b/backend/internal/lifecycle/manager.go index d0bd9c2..f453e68 100644 --- a/backend/internal/lifecycle/manager.go +++ b/backend/internal/lifecycle/manager.go @@ -7,6 +7,7 @@ package lifecycle import ( "context" "fmt" + "log/slog" "sync" "time" @@ -23,11 +24,26 @@ type sessionStore interface { UpdatePRLastNudgeSignature(ctx context.Context, prURL, payload string) error } +type notificationSink interface { + Notify(ctx context.Context, intent domain.NotificationIntent) error +} + +// Deps are the explicit collaborators used by Manager. Notifications is +// optional so tests and transitional wiring can keep using New. +type Deps struct { + Store sessionStore + Messenger ports.AgentMessenger + Notifications notificationSink + Logger *slog.Logger +} + // Manager reduces runtime, activity, spawn, and termination observations into durable session facts. // It also owns agent nudges caused by PR observations, including merge-conflict, CI-failure, and review-feedback prompts. type Manager struct { - store sessionStore - messenger ports.AgentMessenger + store sessionStore + messenger ports.AgentMessenger + notifications notificationSink + logger *slog.Logger mu sync.Mutex window time.Duration @@ -37,33 +53,47 @@ type Manager struct { // New builds a Lifecycle Manager over the session store it writes and the messenger it uses for agent nudges. func New(store sessionStore, messenger ports.AgentMessenger) *Manager { - return &Manager{store: store, messenger: messenger, window: defaultRecentActivityWindow, clock: time.Now, react: newReactionState()} + return NewWithDeps(Deps{Store: store, Messenger: messenger}) +} + +// NewWithDeps builds a Lifecycle Manager from explicit dependencies. +func NewWithDeps(deps Deps) *Manager { + logger := deps.Logger + if logger == nil { + logger = slog.Default() + } + return &Manager{store: deps.Store, messenger: deps.Messenger, notifications: deps.Notifications, logger: logger, window: defaultRecentActivityWindow, clock: time.Now, react: newReactionState()} } func (m *Manager) mutate(ctx context.Context, id domain.SessionID, fn func(domain.SessionRecord, time.Time) (domain.SessionRecord, bool)) error { + _, _, err := m.mutateRecord(ctx, id, fn) + return err +} + +func (m *Manager) mutateRecord(ctx context.Context, id domain.SessionID, fn func(domain.SessionRecord, time.Time) (domain.SessionRecord, bool)) (domain.SessionRecord, bool, error) { m.mu.Lock() defer m.mu.Unlock() rec, ok, err := m.store.GetSession(ctx, id) if err != nil || !ok { - return err + return domain.SessionRecord{}, false, err } now := m.clock() next, changed := fn(rec, now) if !changed { - return nil + return next, false, nil } next.UpdatedAt = now if err := m.store.UpdateSession(ctx, next); err != nil { - return err + return domain.SessionRecord{}, false, err } - return nil + return next, true, nil } // ApplyRuntimeObservation only writes when runtime liveness is unambiguous. A // failed probe or liveness disagreement is ignored; no transient lifecycle state is stored. func (m *Manager) ApplyRuntimeObservation(ctx context.Context, id domain.SessionID, f ports.RuntimeFacts) error { - return m.mutate(ctx, id, func(cur domain.SessionRecord, now time.Time) (domain.SessionRecord, bool) { + next, changed, err := m.mutateRecord(ctx, id, func(cur domain.SessionRecord, now time.Time) (domain.SessionRecord, bool) { if cur.IsTerminated || !runtimeClearlyDead(f, cur.Activity, now, m.window) { return cur, false } @@ -72,6 +102,23 @@ func (m *Manager) ApplyRuntimeObservation(ctx context.Context, id domain.Session next.Activity = domain.Activity{State: domain.ActivityExited, LastActivityAt: timeOr(f.ObservedAt, now)} return next, true }) + if err != nil || !changed { + return err + } + m.notifyBestEffort(ctx, domain.NotificationIntent{ + Type: domain.NotificationSessionExited, + Priority: domain.NotificationWarning, + ProjectID: next.ProjectID, + SessionID: next.ID, + Source: "lifecycle.runtime_observation", + DedupeKey: "session-exited:" + string(next.ID) + ":" + next.Activity.LastActivityAt.UTC().Format(time.RFC3339Nano), + OccurredAt: next.Activity.LastActivityAt, + Context: domain.NotificationIntentContext{ + Reason: "runtime_dead", + Facts: map[string]any{"probe": f.Probe}, + }, + }) + return nil } // ApplyActivitySignal records an authoritative agent activity signal. @@ -79,7 +126,7 @@ func (m *Manager) ApplyActivitySignal(ctx context.Context, id domain.SessionID, if !s.Valid { return nil } - return m.mutate(ctx, id, func(cur domain.SessionRecord, now time.Time) (domain.SessionRecord, bool) { + next, changed, err := m.mutateRecord(ctx, id, func(cur domain.SessionRecord, now time.Time) (domain.SessionRecord, bool) { if cur.IsTerminated { return cur, false } @@ -94,6 +141,37 @@ func (m *Manager) ApplyActivitySignal(ctx context.Context, id domain.SessionID, } return next, true }) + if err != nil || !changed { + return err + } + switch next.Activity.State { + case domain.ActivityWaitingInput: + m.notifyBestEffort(ctx, domain.NotificationIntent{ + Type: domain.NotificationSessionInput, + Priority: domain.NotificationUrgent, + ProjectID: next.ProjectID, + SessionID: next.ID, + Source: "lifecycle.activity_signal", + DedupeKey: "session-input:" + string(next.ID) + ":" + next.Activity.LastActivityAt.UTC().Format(time.RFC3339Nano), + OccurredAt: next.Activity.LastActivityAt, + Context: domain.NotificationIntentContext{Reason: "waiting_input"}, + }) + return nil + case domain.ActivityExited: + m.notifyBestEffort(ctx, domain.NotificationIntent{ + Type: domain.NotificationSessionExited, + Priority: domain.NotificationWarning, + ProjectID: next.ProjectID, + SessionID: next.ID, + Source: "lifecycle.activity_signal", + DedupeKey: "session-exited:" + string(next.ID) + ":" + next.Activity.LastActivityAt.UTC().Format(time.RFC3339Nano), + OccurredAt: next.Activity.LastActivityAt, + Context: domain.NotificationIntentContext{Reason: "activity_exited"}, + }) + return nil + default: + return nil + } } // MarkSpawned marks a newly spawned or restored session live and stores runtime/workspace handles. @@ -127,6 +205,15 @@ func (m *Manager) MarkTerminated(ctx context.Context, id domain.SessionID) error }) } +func (m *Manager) notifyBestEffort(ctx context.Context, intent domain.NotificationIntent) { + if m.notifications == nil { + return + } + if err := m.notifications.Notify(ctx, intent); err != nil { + m.logger.Warn("notification intent failed", "type", intent.Type, "project", intent.ProjectID, "session", intent.SessionID, "source", intent.Source, "dedupeKey", intent.DedupeKey, "error", err) + } +} + // sameActivity reports whether two activity signals describe the same state. // LastActivityAt is intentionally ignored: same-state repeats (e.g. a stream // of idle notifications) must not rewrite UpdatedAt or fan out a CDC event. diff --git a/backend/internal/lifecycle/manager_test.go b/backend/internal/lifecycle/manager_test.go index 5a87298..bd87e7d 100644 --- a/backend/internal/lifecycle/manager_test.go +++ b/backend/internal/lifecycle/manager_test.go @@ -497,3 +497,186 @@ func TestPRObservation_RetriesAfterMessengerFailure(t *testing.T) { t.Fatalf("want retry to send once, got %v", msg.msgs) } } + +type fakeNotifications struct { + intents []domain.NotificationIntent + err error +} + +func (f *fakeNotifications) Notify(_ context.Context, intent domain.NotificationIntent) error { + if f.err != nil { + return f.err + } + f.intents = append(f.intents, intent) + return nil +} + +func newManagerWithNotifications() (*Manager, *fakeStore, *fakeMessenger, *fakeNotifications) { + st := newFakeStore() + msg := &fakeMessenger{} + n := &fakeNotifications{} + return NewWithDeps(Deps{Store: st, Messenger: msg, Notifications: n}), st, msg, n +} + +func TestPRObservation_CIFailingEmitsNotificationIntent(t *testing.T) { + m, st, _, notifications := newManagerWithNotifications() + st.sessions["mer-1"] = working("mer-1") + o := ports.PRObservation{Fetched: true, URL: "pr1", CI: domain.CIFailing, Checks: []ports.PRCheckObservation{{Name: "build", CommitHash: "c1", Status: domain.PRCheckFailed, URL: "ci", LogTail: "boom"}}} + if err := m.ApplyPRObservation(ctx, "mer-1", o); err != nil { + t.Fatal(err) + } + if len(notifications.intents) != 1 { + t.Fatalf("intents = %d", len(notifications.intents)) + } + got := notifications.intents[0] + if got.Type != domain.NotificationCIFailing || got.Priority != domain.NotificationWarning || got.DedupeKey != "ci:pr1:build:c1" || got.Context.CheckURL != "ci" { + t.Fatalf("intent = %+v", got) + } +} + +func TestPRObservation_CICancelledEmitsNotificationIntent(t *testing.T) { + m, st, msg, notifications := newManagerWithNotifications() + st.sessions["mer-1"] = working("mer-1") + o := ports.PRObservation{Fetched: true, URL: "pr1", CI: domain.CIFailing, Checks: []ports.PRCheckObservation{{Name: "build", CommitHash: "c1", Status: domain.PRCheckCancelled, URL: "ci"}}} + if err := m.ApplyPRObservation(ctx, "mer-1", o); err != nil { + t.Fatal(err) + } + if len(notifications.intents) != 1 { + t.Fatalf("intents = %d", len(notifications.intents)) + } + got := notifications.intents[0] + if got.Type != domain.NotificationCIFailing || got.Context.Facts["status"] != domain.PRCheckCancelled { + t.Fatalf("intent = %+v", got) + } + if len(msg.msgs) != 1 { + t.Fatalf("cancelled failing CI should still nudge once, got %v", msg.msgs) + } +} + +func TestSCMObservationReviewHashSortsIDs(t *testing.T) { + mk := func(threads []ports.SCMReviewThreadObservation) ports.SCMObservation { + return ports.SCMObservation{ + Fetched: true, + Review: ports.SCMReviewObservation{ + Decision: string(domain.ReviewChangesRequest), + Threads: threads, + }, + } + } + first := scmToPRObservation(mk([]ports.SCMReviewThreadObservation{ + {ID: "t2", Comments: []ports.SCMReviewCommentObservation{{ID: "c2"}}}, + {ID: "t1", Comments: []ports.SCMReviewCommentObservation{{ID: "c1"}}}, + })) + second := scmToPRObservation(mk([]ports.SCMReviewThreadObservation{ + {ID: "t1", Comments: []ports.SCMReviewCommentObservation{{ID: "c1"}}}, + {ID: "t2", Comments: []ports.SCMReviewCommentObservation{{ID: "c2"}}}, + })) + if first.ReviewHash == "" || first.ReviewHash != second.ReviewHash { + t.Fatalf("review hash should be stable across provider ordering: first=%q second=%q", first.ReviewHash, second.ReviewHash) + } +} + +func TestPRObservation_ReviewMergeConflictReadyAndMergedIntents(t *testing.T) { + for _, tc := range []struct { + name string + obs ports.PRObservation + want domain.NotificationType + }{ + {"review", ports.PRObservation{Fetched: true, URL: "pr1", Review: domain.ReviewChangesRequest, Comments: []ports.PRCommentObservation{{ID: "c1", Body: "fix"}}}, domain.NotificationReviewChanges}, + {"conflict", ports.PRObservation{Fetched: true, URL: "pr1", Mergeability: domain.MergeConflicting, BaseSHA: "b1", HeadSHA: "h1"}, domain.NotificationMergeConflicts}, + {"ready", ports.PRObservation{Fetched: true, URL: "pr1", CI: domain.CIPassing, Review: domain.ReviewApproved, Mergeability: domain.MergeMergeable, HeadSHA: "h1"}, domain.NotificationMergeReady}, + {"merged", ports.PRObservation{Fetched: true, URL: "pr1", Merged: true, MergeCommitSHA: "m1"}, domain.NotificationMergeCompleted}, + } { + t.Run(tc.name, func(t *testing.T) { + m, st, _, notifications := newManagerWithNotifications() + st.sessions["mer-1"] = working("mer-1") + if err := m.ApplyPRObservation(ctx, "mer-1", tc.obs); err != nil { + t.Fatal(err) + } + if len(notifications.intents) != 1 || notifications.intents[0].Type != tc.want { + t.Fatalf("intents = %+v, want %s", notifications.intents, tc.want) + } + if tc.want == domain.NotificationMergeCompleted && !st.sessions["mer-1"].IsTerminated { + t.Fatal("merged PR should terminate session") + } + }) + } +} + +func TestActivitySignalWaitingInputAndExitedEmitNotifications(t *testing.T) { + m, st, _, notifications := newManagerWithNotifications() + st.sessions["mer-1"] = working("mer-1") + ts := time.Date(2026, 6, 6, 12, 0, 0, 0, time.UTC) + if err := m.ApplyActivitySignal(ctx, "mer-1", ports.ActivitySignal{Valid: true, State: domain.ActivityWaitingInput, Timestamp: ts}); err != nil { + t.Fatal(err) + } + if len(notifications.intents) != 1 || notifications.intents[0].Type != domain.NotificationSessionInput || !strings.Contains(notifications.intents[0].DedupeKey, ts.Format(time.RFC3339Nano)) { + t.Fatalf("waiting input intents = %+v", notifications.intents) + } + st.sessions["mer-1"] = working("mer-1") + notifications.intents = nil + if err := m.ApplyActivitySignal(ctx, "mer-1", ports.ActivitySignal{Valid: true, State: domain.ActivityExited, Timestamp: ts}); err != nil { + t.Fatal(err) + } + if len(notifications.intents) != 1 || notifications.intents[0].Type != domain.NotificationSessionExited { + t.Fatalf("exited intents = %+v", notifications.intents) + } +} + +func TestWaitingInputSuppressesAgentNudgeButNotNotification(t *testing.T) { + m, st, msg, notifications := newManagerWithNotifications() + rec := working("mer-1") + rec.Activity.State = domain.ActivityWaitingInput + st.sessions["mer-1"] = rec + o := ports.PRObservation{Fetched: true, URL: "pr1", CI: domain.CIFailing, Checks: []ports.PRCheckObservation{{Name: "build", CommitHash: "c1", Status: domain.PRCheckFailed}}} + if err := m.ApplyPRObservation(ctx, "mer-1", o); err != nil { + t.Fatal(err) + } + if len(msg.msgs) != 0 { + t.Fatalf("waiting_input should suppress agent nudge, got %v", msg.msgs) + } + if len(notifications.intents) != 1 || notifications.intents[0].Type != domain.NotificationCIFailing { + t.Fatalf("waiting_input should still notify user, got %+v", notifications.intents) + } +} + +func TestNotificationSinkFailureIsBestEffort(t *testing.T) { + m, st, msg, notifications := newManagerWithNotifications() + st.sessions["mer-1"] = working("mer-1") + notifications.err = errors.New("notify failed") + err := m.ApplyPRObservation(ctx, "mer-1", ports.PRObservation{Fetched: true, URL: "pr1", Mergeability: domain.MergeConflicting}) + if err != nil { + t.Fatalf("notification failure should not fail lifecycle reaction: %v", err) + } + if len(msg.msgs) != 1 { + t.Fatalf("notification failure should not block agent nudge, got %v", msg.msgs) + } +} + +func TestNotificationSinkFailureDoesNotBlockLifecycleFacts(t *testing.T) { + m, st, _, notifications := newManagerWithNotifications() + st.sessions["mer-1"] = working("mer-1") + notifications.err = errors.New("notify failed") + if err := m.ApplyActivitySignal(ctx, "mer-1", ports.ActivitySignal{Valid: true, State: domain.ActivityWaitingInput, Timestamp: time.Now()}); err != nil { + t.Fatalf("waiting-input notification failure should not fail activity update: %v", err) + } + if got := st.sessions["mer-1"]; got.Activity.State != domain.ActivityWaitingInput { + t.Fatalf("activity update not persisted: %+v", got) + } + + st.sessions["mer-1"] = working("mer-1") + if err := m.ApplyPRObservation(ctx, "mer-1", ports.PRObservation{Fetched: true, URL: "pr1", Merged: true}); err != nil { + t.Fatalf("merge-completed notification failure should not fail termination: %v", err) + } + if got := st.sessions["mer-1"]; !got.IsTerminated || got.Activity.State != domain.ActivityExited { + t.Fatalf("merged PR should still terminate session: %+v", got) + } +} + +func TestNilNotificationSinkIsNoop(t *testing.T) { + m, st, _ := newManager() + st.sessions["mer-1"] = working("mer-1") + if err := m.ApplyActivitySignal(ctx, "mer-1", ports.ActivitySignal{Valid: true, State: domain.ActivityWaitingInput, Timestamp: time.Now()}); err != nil { + t.Fatal(err) + } +} diff --git a/backend/internal/lifecycle/reactions.go b/backend/internal/lifecycle/reactions.go index 614d3db..e22bda6 100644 --- a/backend/internal/lifecycle/reactions.go +++ b/backend/internal/lifecycle/reactions.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "log/slog" + "sort" "strings" "sync" @@ -36,50 +37,167 @@ type reactionPayload struct { } // ApplyPRObservation reacts to a fetched PR observation after the PR service has -// persisted it. It does not write PR rows; it owns PR-driven lifecycle effects -// and sends actionable agent nudges such as rebase, fix-CI, and -// address-review-feedback prompts. +// persisted it. It does not write PR rows; it owns PR-driven lifecycle effects, +// emits user notification intent, and sends actionable agent nudges such as +// rebase, fix-CI, and address-review-feedback prompts. func (m *Manager) ApplyPRObservation(ctx context.Context, id domain.SessionID, o ports.PRObservation) error { if !o.Fetched { return nil } - if o.Merged { - return m.MarkTerminated(ctx, id) - } - if o.Closed { - return nil - } rec, ok, err := m.store.GetSession(ctx, id) if err != nil || !ok { return err } - if rec.IsTerminated || rec.Activity.State == domain.ActivityWaitingInput { + occurredAt := timeOr(o.ObservedAt, m.clock()) + prURL := firstNonEmptyString(o.URL, o.HTMLURL) + if o.Merged { + if !rec.IsTerminated { + m.notifyBestEffort(ctx, domain.NotificationIntent{ + Type: domain.NotificationMergeCompleted, + Priority: domain.NotificationInfo, + ProjectID: rec.ProjectID, + SessionID: rec.ID, + Source: "lifecycle.pr_observation", + DedupeKey: mergeCompletedDedupeKey(prURL, o.MergeCommitSHA), + OccurredAt: occurredAt, + Context: domain.NotificationIntentContext{ + PRURL: prURL, + CommitHash: firstNonEmptyString(o.MergeCommitSHA, o.HeadSHA), + MergeState: string(domain.PRStateMerged), + Facts: map[string]any{ + "headSha": o.HeadSHA, + "baseSha": o.BaseSHA, + "mergeCommitSha": o.MergeCommitSHA, + }, + }, + }) + } + return m.MarkTerminated(ctx, id) + } + if o.Closed || rec.IsTerminated { return nil } + suppressAgentNudge := rec.Activity.State == domain.ActivityWaitingInput if o.CI == domain.CIFailing { for _, ch := range o.Checks { - if ch.Status == domain.PRCheckFailed { - msg := "CI is failing on your PR. Review the output below and push a fix." - if ch.LogTail != "" { - msg += "\n\nFailing output:\n" + ch.LogTail - } - return m.sendOnce(ctx, id, o.URL, "ci:"+o.URL+":"+ch.Name, ch.CommitHash+":"+ch.LogTail, msg, 0) + if !ciCheckNeedsAttention(ch.Status) { + continue } + commit := firstNonEmptyString(ch.CommitHash, o.HeadSHA, "unknown") + m.notifyBestEffort(ctx, domain.NotificationIntent{ + Type: domain.NotificationCIFailing, + Priority: domain.NotificationWarning, + ProjectID: rec.ProjectID, + SessionID: rec.ID, + Source: "lifecycle.pr_observation", + DedupeKey: "ci:" + prURL + ":" + ch.Name + ":" + commit, + OccurredAt: occurredAt, + Context: domain.NotificationIntentContext{ + PRURL: prURL, + CheckName: ch.Name, + CheckURL: ch.URL, + CommitHash: commit, + Reason: "failed_check", + Facts: map[string]any{ + "status": ch.Status, + "logTail": boundedString(ch.LogTail, 4000), + }, + }, + }) + if suppressAgentNudge { + return nil + } + msg := "CI is failing on your PR. Review the output below and push a fix." + if ch.LogTail != "" { + msg += "\n\nFailing output:\n" + ch.LogTail + } + return m.sendOnce(ctx, id, prURL, "ci:"+prURL+":"+ch.Name, commit+":"+ch.LogTail, msg, 0) } } if o.Review == domain.ReviewChangesRequest || hasUnresolvedComments(o.Comments) { comments, sig := reviewContent(o.Comments) - msg := "A reviewer left feedback on your PR. Address it and push." - if comments != "" { - msg += "\n\n" + comments + if o.ReviewHash != "" { + sig = o.ReviewHash + } + if sig == "" && len(o.ThreadIDs) > 0 { + sig = strings.Join(sortedStrings(o.ThreadIDs), ",") } if sig == "" { sig = string(o.Review) } - return m.sendOnce(ctx, id, o.URL, "review:"+o.URL, sig, msg, reviewMaxNudge) + reviewIDs := reviewIDs(o.Comments) + m.notifyBestEffort(ctx, domain.NotificationIntent{ + Type: domain.NotificationReviewChanges, + Priority: domain.NotificationPriorityAction, + ProjectID: rec.ProjectID, + SessionID: rec.ID, + Source: "lifecycle.pr_observation", + DedupeKey: "review:" + prURL + ":" + sig, + OccurredAt: occurredAt, + Context: domain.NotificationIntentContext{ + PRURL: prURL, + ReviewIDs: reviewIDs, + ThreadIDs: append([]string(nil), o.ThreadIDs...), + Reason: "review_feedback", + Facts: map[string]any{ + "commentCount": len(reviewIDs), + "review": o.Review, + }, + }, + }) + if suppressAgentNudge { + return nil + } + msg := "A reviewer left feedback on your PR. Address it and push." + if comments != "" { + msg += "\n\n" + comments + } + return m.sendOnce(ctx, id, prURL, "review:"+prURL, sig, msg, reviewMaxNudge) } if o.Mergeability == domain.MergeConflicting { - return m.sendOnce(ctx, id, o.URL, "merge-conflict:"+o.URL, string(o.Mergeability), "Your PR has merge conflicts. Rebase onto the base branch and resolve them.", 0) + m.notifyBestEffort(ctx, domain.NotificationIntent{ + Type: domain.NotificationMergeConflicts, + Priority: domain.NotificationPriorityAction, + ProjectID: rec.ProjectID, + SessionID: rec.ID, + Source: "lifecycle.pr_observation", + DedupeKey: mergeConflictDedupeKey(prURL, o.BaseSHA, o.HeadSHA), + OccurredAt: occurredAt, + Context: domain.NotificationIntentContext{ + PRURL: prURL, + CommitHash: o.HeadSHA, + MergeState: string(o.Mergeability), + Reason: "merge_conflicts", + Facts: map[string]any{"baseSha": o.BaseSHA, "headSha": o.HeadSHA}, + }, + }) + if suppressAgentNudge { + return nil + } + return m.sendOnce(ctx, id, prURL, "merge-conflict:"+prURL, string(o.Mergeability), "Your PR has merge conflicts. Rebase onto the base branch and resolve them.", 0) + } + if prReadyToMerge(o) { + m.notifyBestEffort(ctx, domain.NotificationIntent{ + Type: domain.NotificationMergeReady, + Priority: domain.NotificationPriorityAction, + ProjectID: rec.ProjectID, + SessionID: rec.ID, + Source: "lifecycle.pr_observation", + DedupeKey: mergeReadyDedupeKey(prURL, o.HeadSHA), + OccurredAt: occurredAt, + Context: domain.NotificationIntentContext{ + PRURL: prURL, + CommitHash: o.HeadSHA, + MergeState: string(o.Mergeability), + Reason: "merge_ready", + Facts: map[string]any{ + "ci": o.CI, + "review": o.Review, + "mergeability": o.Mergeability, + }, + }, + }) + return nil } return nil } @@ -97,15 +215,20 @@ func (m *Manager) ApplySCMObservation(ctx context.Context, id domain.SessionID, func scmToPRObservation(o ports.SCMObservation) ports.PRObservation { pr := ports.PRObservation{ - Fetched: o.Fetched, - URL: firstSCMNonEmpty(o.PR.URL, o.PR.HTMLURL), - Number: o.PR.Number, - Draft: o.PR.Draft, - Merged: o.PR.Merged, - Closed: o.PR.Closed, - CI: domain.CIState(o.CI.Summary), - Review: domain.ReviewDecision(o.Review.Decision), - Mergeability: domain.Mergeability(o.Mergeability.State), + Fetched: o.Fetched, + URL: firstSCMNonEmpty(o.PR.URL, o.PR.HTMLURL), + Number: o.PR.Number, + Draft: o.PR.Draft, + Merged: o.PR.Merged, + Closed: o.PR.Closed, + CI: domain.CIState(o.CI.Summary), + Review: domain.ReviewDecision(o.Review.Decision), + Mergeability: domain.Mergeability(o.Mergeability.State), + ObservedAt: o.ObservedAt, + HeadSHA: firstSCMNonEmpty(o.CI.HeadSHA, o.PR.HeadSHA), + BaseSHA: o.PR.BaseSHA, + MergeCommitSHA: o.PR.MergeCommitSHA, + HTMLURL: o.PR.HTMLURL, } if pr.CI == "" { pr.CI = domain.CIUnknown @@ -134,14 +257,22 @@ func scmToPRObservation(o ports.SCMObservation) ports.PRObservation { LogTail: logTail, }) } + var reviewSigParts []string for _, th := range o.Review.Threads { if th.Resolved || th.IsBot { continue } + if th.ID != "" { + pr.ThreadIDs = append(pr.ThreadIDs, th.ID) + reviewSigParts = append(reviewSigParts, th.ID) + } for _, c := range th.Comments { if c.IsBot { continue } + if c.ID != "" { + reviewSigParts = append(reviewSigParts, c.ID) + } pr.Comments = append(pr.Comments, ports.PRCommentObservation{ ID: c.ID, Author: c.Author, @@ -152,6 +283,7 @@ func scmToPRObservation(o ports.SCMObservation) ports.PRObservation { }) } } + pr.ReviewHash = strings.Join(sortedStrings(reviewSigParts), ",") return pr } @@ -257,7 +389,72 @@ func reviewContent(comments []ports.PRCommentObservation) (string, string) { bodies = append(bodies, c.Body) ids = append(ids, c.ID) } - return strings.Join(bodies, "\n\n"), strings.Join(ids, ",") + return strings.Join(bodies, "\n\n"), strings.Join(sortedStrings(ids), ",") +} + +func ciCheckNeedsAttention(status domain.PRCheckStatus) bool { + return status == domain.PRCheckFailed || status == domain.PRCheckCancelled +} + +func reviewIDs(comments []ports.PRCommentObservation) []string { + ids := make([]string, 0, len(comments)) + for _, c := range comments { + if c.Resolved || c.ID == "" { + continue + } + ids = append(ids, c.ID) + } + return ids +} + +func prReadyToMerge(o ports.PRObservation) bool { + return !o.Draft && !o.Merged && !o.Closed && + o.CI == domain.CIPassing && + o.Review == domain.ReviewApproved && + o.Mergeability == domain.MergeMergeable +} + +func mergeConflictDedupeKey(prURL, baseSHA, headSHA string) string { + if baseSHA != "" && headSHA != "" { + return "merge-conflict:" + prURL + ":" + baseSHA + ":" + headSHA + } + return "merge-conflict:" + prURL +} + +func mergeReadyDedupeKey(prURL, headSHA string) string { + if headSHA != "" { + return "merge-ready:" + prURL + ":" + headSHA + } + return "merge-ready:" + prURL +} + +func mergeCompletedDedupeKey(prURL, mergeCommitSHA string) string { + if mergeCommitSHA != "" { + return "merge-completed:" + prURL + ":" + mergeCommitSHA + } + return "merge-completed:" + prURL +} + +func firstNonEmptyString(values ...string) string { + for _, v := range values { + if strings.TrimSpace(v) != "" { + return v + } + } + return "" +} + +func boundedString(s string, limit int) string { + if limit <= 0 || len(s) <= limit { + return s + } + return s[:limit] +} + +func sortedStrings(in []string) []string { + out := append([]string(nil), in...) + sort.Strings(out) + return out } func (m *Manager) sendOnce(ctx context.Context, id domain.SessionID, prURL, key, sig, msg string, maxAttempts int) error { diff --git a/backend/internal/ports/pr_observations.go b/backend/internal/ports/pr_observations.go index eb29615..22a0fca 100644 --- a/backend/internal/ports/pr_observations.go +++ b/backend/internal/ports/pr_observations.go @@ -2,6 +2,7 @@ package ports import ( "context" + "time" "github.com/aoagents/agent-orchestrator/backend/internal/domain" ) @@ -21,17 +22,24 @@ type PRObserver interface { // read it as "PR closed". Checks/Comments are observation DTOs, not persistence // rows; the PR Manager owns mapping them into stored domain.PullRequest rows. type PRObservation struct { - Fetched bool - URL string - Number int - Draft bool - Merged bool - Closed bool - CI domain.CIState - Review domain.ReviewDecision - Mergeability domain.Mergeability - Checks []PRCheckObservation - Comments []PRCommentObservation + Fetched bool + URL string + Number int + Draft bool + Merged bool + Closed bool + CI domain.CIState + Review domain.ReviewDecision + Mergeability domain.Mergeability + ObservedAt time.Time + HeadSHA string + BaseSHA string + MergeCommitSHA string + HTMLURL string + ReviewHash string + ThreadIDs []string + Checks []PRCheckObservation + Comments []PRCommentObservation } // PRCheckObservation is one SCM check result on the observed PR. diff --git a/backend/internal/service/notification/actions.go b/backend/internal/service/notification/actions.go new file mode 100644 index 0000000..b86f4f6 --- /dev/null +++ b/backend/internal/service/notification/actions.go @@ -0,0 +1,88 @@ +package notification + +import ( + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +func buildActions(intent domain.NotificationIntent, facts EnrichedFacts) []domain.NotificationAction { + openSession := domain.NotificationAction{ + ID: "open_session", + Label: "Open session", + Kind: "route", + Route: "session", + Payload: map[string]any{"projectId": intent.ProjectID, "sessionId": intent.SessionID}, + } + viewPR := func(primary bool) (domain.NotificationAction, bool) { + url := prURL(facts, intent) + if url == "" { + return domain.NotificationAction{}, false + } + return domain.NotificationAction{ID: "view_pr", Label: "View PR", Kind: "link", URL: url, Primary: primary}, true + } + viewCI := func() domain.NotificationAction { + url := intent.Context.CheckURL + if url == "" && len(facts.FailedChecks) > 0 { + url = facts.FailedChecks[0].URL + } + if url == "" { + return domain.NotificationAction{ID: "view_ci", Label: "View CI", Kind: "callback", Payload: map[string]any{"checkName": intent.Context.CheckName, "prUrl": prURL(facts, intent)}} + } + return domain.NotificationAction{ID: "view_ci", Label: "View CI", Kind: "link", URL: url} + } + viewReview := func() domain.NotificationAction { + for _, c := range facts.Comments { + if c.URL != "" { + return domain.NotificationAction{ID: "view_review", Label: "View review", Kind: "link", URL: c.URL} + } + } + url := prURL(facts, intent) + if url == "" { + return domain.NotificationAction{ID: "view_review", Label: "View review", Kind: "callback", Payload: map[string]any{"reviewIds": intent.Context.ReviewIDs, "threadIds": intent.Context.ThreadIDs}} + } + return domain.NotificationAction{ID: "view_review", Label: "View review", Kind: "link", URL: url} + } + + var actions []domain.NotificationAction + add := func(a domain.NotificationAction, ok bool) { + if ok { + actions = append(actions, a) + } + } + switch intent.Type { + case domain.NotificationCIFailing: + openSession.Primary = true + actions = append(actions, openSession, viewCI()) + add(viewPR(false)) + case domain.NotificationReviewChanges: + openSession.Primary = true + actions = append(actions, openSession, viewReview()) + add(viewPR(false)) + case domain.NotificationMergeConflicts: + openSession.Primary = true + actions = append(actions, openSession) + add(viewPR(false)) + case domain.NotificationMergeReady, domain.NotificationMergeCompleted: + add(viewPR(true)) + openSession.Primary = len(actions) == 0 + actions = append(actions, openSession) + case domain.NotificationSessionInput, domain.NotificationSessionExited: + openSession.Primary = true + actions = append(actions, openSession) + } + return actions +} + +func prURL(facts EnrichedFacts, intent domain.NotificationIntent) string { + if facts.PR != nil { + if facts.PR.HTMLURL != "" { + return facts.PR.HTMLURL + } + if facts.PR.URL != "" { + return facts.PR.URL + } + } + if facts.PRURL != "" { + return facts.PRURL + } + return intent.Context.PRURL +} diff --git a/backend/internal/service/notification/dedupe.go b/backend/internal/service/notification/dedupe.go new file mode 100644 index 0000000..577c536 --- /dev/null +++ b/backend/internal/service/notification/dedupe.go @@ -0,0 +1,72 @@ +package notification + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "sort" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +func fingerprint(intent domain.NotificationIntent, facts EnrichedFacts, actions []domain.NotificationAction, content domain.NotificationContent) (string, error) { + type actionFingerprint struct { + ID string `json:"id"` + URL string `json:"url,omitempty"` + Route string `json:"route,omitempty"` + Primary bool `json:"primary,omitempty"` + } + actionFP := make([]actionFingerprint, 0, len(actions)) + for _, a := range actions { + actionFP = append(actionFP, actionFingerprint{ID: a.ID, URL: a.URL, Route: a.Route, Primary: a.Primary}) + } + sort.Slice(actionFP, func(i, j int) bool { return actionFP[i].ID < actionFP[j].ID }) + + failed := make([]map[string]string, 0, len(facts.FailedChecks)) + for _, c := range facts.FailedChecks { + failed = append(failed, map[string]string{"name": c.Name, "commit": c.CommitHash, "status": string(c.Status), "url": c.URL}) + } + sort.Slice(failed, func(i, j int) bool { + if failed[i]["name"] == failed[j]["name"] { + return failed[i]["commit"] < failed[j]["commit"] + } + return failed[i]["name"] < failed[j]["name"] + }) + + var pr map[string]any + if facts.PR != nil { + pr = map[string]any{ + "url": facts.PR.URL, + "headSha": facts.PR.HeadSHA, + "baseSha": facts.PR.BaseSHA, + "mergeCommitSha": facts.PR.MergeCommitSHA, + "ci": facts.PR.CI, + "review": facts.PR.Review, + "mergeability": facts.PR.Mergeability, + } + } else if facts.PRURL != "" { + pr = map[string]any{"url": facts.PRURL} + } + input := map[string]any{ + "type": intent.Type, + "priority": intent.Priority, + "title": content.Title, + "summary": content.Summary, + "actions": actionFP, + "pr": pr, + "checkName": intent.Context.CheckName, + "checkURL": intent.Context.CheckURL, + "commitHash": intent.Context.CommitHash, + "reviewIDs": sortedCopy(intent.Context.ReviewIDs), + "threadIDs": sortedCopy(intent.Context.ThreadIDs), + "mergeState": intent.Context.MergeState, + "reason": intent.Context.Reason, + "failed": failed, + } + b, err := json.Marshal(input) + if err != nil { + return "", err + } + sum := sha256.Sum256(b) + return hex.EncodeToString(sum[:]), nil +} diff --git a/backend/internal/service/notification/enrich.go b/backend/internal/service/notification/enrich.go new file mode 100644 index 0000000..c36bc75 --- /dev/null +++ b/backend/internal/service/notification/enrich.go @@ -0,0 +1,300 @@ +package notification + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +// EnrichedFacts are local durable facts used by actions, copy, data, and +// fingerprinting. They intentionally contain no network-fetched data. +type EnrichedFacts struct { + Session domain.SessionRecord + Project domain.ProjectRecord + + PR *domain.PullRequest + PRURL string + Checks []domain.PullRequestCheck + FailedChecks []domain.PullRequestCheck + Comments []domain.PullRequestComment + ReviewThreads []domain.PullRequestReviewThread + + SessionLabel string + ProjectLabel string +} + +func (s *Service) enrich(ctx context.Context, intent domain.NotificationIntent) (EnrichedFacts, error) { + session, ok, err := s.store.GetSession(ctx, intent.SessionID) + if err != nil { + return EnrichedFacts{}, fmt.Errorf("notification: get session %s: %w", intent.SessionID, err) + } + if !ok { + return EnrichedFacts{}, fmt.Errorf("notification: unknown session %s", intent.SessionID) + } + if session.ProjectID != intent.ProjectID { + return EnrichedFacts{}, fmt.Errorf("notification: session %s belongs to project %s, not %s", intent.SessionID, session.ProjectID, intent.ProjectID) + } + project, ok, err := s.store.GetProject(ctx, string(intent.ProjectID)) + if err != nil { + return EnrichedFacts{}, fmt.Errorf("notification: get project %s: %w", intent.ProjectID, err) + } + if !ok { + return EnrichedFacts{}, fmt.Errorf("notification: unknown project %s", intent.ProjectID) + } + + facts := EnrichedFacts{ + Session: session, + Project: project, + SessionLabel: sessionLabel(session), + ProjectLabel: projectLabel(project), + } + facts.PRURL = strings.TrimSpace(intent.Context.PRURL) + + prs, err := s.store.ListPRsBySession(ctx, intent.SessionID) + if err != nil { + return EnrichedFacts{}, fmt.Errorf("notification: list PRs for %s: %w", intent.SessionID, err) + } + facts.PR = choosePR(prs, facts.PRURL) + if facts.PR != nil { + facts.PRURL = facts.PR.URL + } else if facts.PRURL != "" { + facts.PR = &domain.PullRequest{URL: facts.PRURL, HTMLURL: facts.PRURL, SessionID: intent.SessionID} + } + + if facts.PRURL != "" { + checks, err := s.store.ListChecks(ctx, facts.PRURL) + if err != nil { + return EnrichedFacts{}, fmt.Errorf("notification: list checks for %s: %w", facts.PRURL, err) + } + facts.Checks = checks + facts.FailedChecks = failedChecks(checks, intent) + if len(facts.FailedChecks) == 0 && intent.Context.CheckName != "" { + facts.FailedChecks = []domain.PullRequestCheck{{ + Name: intent.Context.CheckName, + CommitHash: intent.Context.CommitHash, + Status: domain.PRCheckFailed, + URL: intent.Context.CheckURL, + CreatedAt: intent.OccurredAt, + }} + } + comments, err := s.store.ListPRComments(ctx, facts.PRURL) + if err != nil { + return EnrichedFacts{}, fmt.Errorf("notification: list PR comments for %s: %w", facts.PRURL, err) + } + facts.Comments = comments + threads, err := s.store.ListPRReviewThreads(ctx, facts.PRURL) + if err != nil { + return EnrichedFacts{}, fmt.Errorf("notification: list PR review threads for %s: %w", facts.PRURL, err) + } + facts.ReviewThreads = threads + } + return facts, nil +} + +func choosePR(prs []domain.PullRequest, wantURL string) *domain.PullRequest { + if len(prs) == 0 { + return nil + } + if wantURL != "" { + for i := range prs { + if prs[i].URL == wantURL || prs[i].HTMLURL == wantURL { + pr := prs[i] + return &pr + } + } + return nil + } + pr := prs[0] + return &pr +} + +func failedChecks(checks []domain.PullRequestCheck, intent domain.NotificationIntent) []domain.PullRequestCheck { + out := make([]domain.PullRequestCheck, 0, len(checks)) + for _, c := range checks { + if c.Status != domain.PRCheckFailed && c.Status != domain.PRCheckCancelled { + continue + } + if intent.Context.CheckName != "" && c.Name != intent.Context.CheckName { + continue + } + if intent.Context.CommitHash != "" && c.CommitHash != intent.Context.CommitHash { + continue + } + out = append(out, c) + } + return out +} + +func sessionLabel(s domain.SessionRecord) string { + if strings.TrimSpace(s.DisplayName) != "" { + return strings.TrimSpace(s.DisplayName) + } + if s.IssueID != "" { + return fmt.Sprintf("%s (%s)", s.ID, s.IssueID) + } + return string(s.ID) +} + +func projectLabel(p domain.ProjectRecord) string { + if strings.TrimSpace(p.DisplayName) != "" { + return strings.TrimSpace(p.DisplayName) + } + if p.ID != "" { + return p.ID + } + return p.Path +} + +func subjectForFacts(f EnrichedFacts) domain.NotificationSubject { + s := domain.NotificationSubject{ + Kind: "session", + Label: f.SessionLabel, + ProjectID: f.Session.ProjectID, + SessionID: f.Session.ID, + ProjectName: f.ProjectLabel, + } + if f.PR != nil { + s.Kind = "pull_request" + s.PRURL = f.PR.URL + s.PRNumber = f.PR.Number + s.PRTitle = f.PR.Title + if f.PR.Title != "" { + s.Label = f.PR.Title + } + } else if f.PRURL != "" { + s.Kind = "pull_request" + s.PRURL = f.PRURL + } + return s +} + +func dataForIntent(intent domain.NotificationIntent, f EnrichedFacts) map[string]any { + data := map[string]any{ + "intent": map[string]any{ + "type": intent.Type, + "priority": intent.Priority, + "source": intent.Source, + "dedupeKey": intent.DedupeKey, + "occurredAt": intent.OccurredAt.UTC().Format(time.RFC3339Nano), + "context": intent.Context, + }, + "project": map[string]any{ + "id": f.Project.ID, + "displayName": f.Project.DisplayName, + "path": f.Project.Path, + "repoOriginUrl": f.Project.RepoOriginURL, + }, + "session": map[string]any{ + "id": f.Session.ID, + "displayName": f.Session.DisplayName, + "label": f.SessionLabel, + "kind": f.Session.Kind, + "issueId": f.Session.IssueID, + "activityState": f.Session.Activity.State, + "activityLastAt": f.Session.Activity.LastActivityAt.UTC().Format(time.RFC3339Nano), + "isTerminated": f.Session.IsTerminated, + }, + } + if f.PR != nil || f.PRURL != "" { + pr := f.PR + if pr == nil { + pr = &domain.PullRequest{URL: f.PRURL} + } + data["pr"] = map[string]any{ + "url": pr.URL, + "htmlUrl": firstNonEmpty(pr.HTMLURL, pr.URL), + "number": pr.Number, + "title": pr.Title, + "headSha": pr.HeadSHA, + "baseSha": pr.BaseSHA, + "mergeCommitSha": pr.MergeCommitSHA, + "ci": pr.CI, + "review": pr.Review, + "mergeability": pr.Mergeability, + } + } + if len(f.Checks) > 0 || intent.Context.CheckName != "" { + data["ci"] = map[string]any{ + "checkName": intent.Context.CheckName, + "checkUrl": intent.Context.CheckURL, + "commitHash": intent.Context.CommitHash, + "failedCount": len(f.FailedChecks), + "failedChecks": checkData(f.FailedChecks), + } + } + if len(f.Comments) > 0 || len(f.ReviewThreads) > 0 || len(intent.Context.ReviewIDs) > 0 || len(intent.Context.ThreadIDs) > 0 { + data["review"] = map[string]any{ + "reviewIds": sortedCopy(intent.Context.ReviewIDs), + "threadIds": sortedCopy(intent.Context.ThreadIDs), + "commentCount": len(f.Comments), + "unresolvedCommentCount": unresolvedCommentCount(f.Comments), + "threadCount": len(f.ReviewThreads), + "unresolvedThreadCount": unresolvedThreadCount(f.ReviewThreads), + } + } + if intent.Context.MergeState != "" { + data["merge"] = map[string]any{"state": intent.Context.MergeState} + } + return data +} + +func checkData(checks []domain.PullRequestCheck) []map[string]any { + out := make([]map[string]any, 0, len(checks)) + for _, c := range checks { + out = append(out, map[string]any{ + "name": c.Name, + "commitHash": c.CommitHash, + "status": c.Status, + "url": c.URL, + "details": c.Details, + "logTail": bounded(c.LogTail, 4000), + }) + } + return out +} + +func unresolvedCommentCount(comments []domain.PullRequestComment) int { + var n int + for _, c := range comments { + if !c.Resolved && !c.IsBot { + n++ + } + } + return n +} + +func unresolvedThreadCount(threads []domain.PullRequestReviewThread) int { + var n int + for _, th := range threads { + if !th.Resolved && !th.IsBot { + n++ + } + } + return n +} + +func sortedCopy(in []string) []string { + out := append([]string(nil), in...) + sort.Strings(out) + return out +} + +func firstNonEmpty(vs ...string) string { + for _, v := range vs { + if strings.TrimSpace(v) != "" { + return v + } + } + return "" +} + +func bounded(s string, limit int) string { + if len(s) <= limit { + return s + } + return s[:limit] +} diff --git a/backend/internal/service/notification/maker.go b/backend/internal/service/notification/maker.go new file mode 100644 index 0000000..44c862f --- /dev/null +++ b/backend/internal/service/notification/maker.go @@ -0,0 +1,84 @@ +package notification + +import ( + "context" + "fmt" + "unicode/utf8" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +// Maker turns intent plus enriched facts and actions into channel-neutral copy. +type Maker interface { + Make(ctx context.Context, input MakeInput) (domain.NotificationContent, error) +} + +// MakeInput is the central maker input. +type MakeInput struct { + Intent domain.NotificationIntent + Facts EnrichedFacts + Actions []domain.NotificationAction +} + +// DefaultMaker produces concise, canonical fallback copy for all V1 +// notification types. It is intentionally not Slack/email/desktop/dashboard +// formatting. +type DefaultMaker struct{} + +// Make returns concise channel-neutral fallback copy for a notification. +func (DefaultMaker) Make(_ context.Context, input MakeInput) (domain.NotificationContent, error) { + session := input.Facts.SessionLabel + if session == "" { + session = string(input.Intent.SessionID) + } + var title, summary string + switch input.Intent.Type { + case domain.NotificationCIFailing: + n := len(input.Facts.FailedChecks) + if n == 0 && input.Intent.Context.CheckName != "" { + n = 1 + } + title = "CI failed" + if n == 1 { + summary = fmt.Sprintf("%s has 1 failing check.", session) + } else if n > 1 { + summary = fmt.Sprintf("%s has %d failing checks.", session, n) + } else { + summary = fmt.Sprintf("%s has failing CI.", session) + } + case domain.NotificationReviewChanges: + title = "Changes requested" + summary = fmt.Sprintf("Review feedback is waiting on %s.", session) + case domain.NotificationMergeConflicts: + title = "Merge conflicts" + summary = fmt.Sprintf("%s needs a rebase before it can merge.", session) + case domain.NotificationMergeReady: + title = "Ready to merge" + summary = fmt.Sprintf("%s is approved and green.", session) + case domain.NotificationMergeCompleted: + title = "Merged" + summary = fmt.Sprintf("%s was merged.", session) + case domain.NotificationSessionInput: + title = "Input needed" + summary = fmt.Sprintf("%s is waiting for you.", session) + case domain.NotificationSessionExited: + title = "Session exited" + summary = fmt.Sprintf("%s stopped unexpectedly.", session) + default: + return domain.NotificationContent{}, fmt.Errorf("unsupported notification type %q", input.Intent.Type) + } + // Body intentionally mirrors Summary for V1. Richer channel-specific or + // long-form content can be produced later by a custom Maker implementation. + return domain.NotificationContent{Title: truncateRunes(title, 40), Summary: truncateRunes(summary, 120), Body: truncateRunes(summary, 500)}, nil +} + +func truncateRunes(s string, limit int) string { + if limit <= 0 || utf8.RuneCountInString(s) <= limit { + return s + } + runes := []rune(s) + if limit <= 1 { + return string(runes[:limit]) + } + return string(runes[:limit-1]) + "…" +} diff --git a/backend/internal/service/notification/maker_test.go b/backend/internal/service/notification/maker_test.go new file mode 100644 index 0000000..67241fa --- /dev/null +++ b/backend/internal/service/notification/maker_test.go @@ -0,0 +1,56 @@ +package notification + +import ( + "context" + "strings" + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +func TestDefaultMakerCopyForV1Types(t *testing.T) { + maker := DefaultMaker{} + for _, tc := range []struct { + typ domain.NotificationType + wantT string + wantSub string + }{ + {domain.NotificationCIFailing, "CI failed", "failing check"}, + {domain.NotificationReviewChanges, "Changes requested", "Review feedback"}, + {domain.NotificationMergeConflicts, "Merge conflicts", "rebase"}, + {domain.NotificationMergeReady, "Ready to merge", "approved and green"}, + {domain.NotificationMergeCompleted, "Merged", "was merged"}, + {domain.NotificationSessionInput, "Input needed", "waiting for you"}, + {domain.NotificationSessionExited, "Session exited", "stopped unexpectedly"}, + } { + t.Run(string(tc.typ), func(t *testing.T) { + got, err := maker.Make(context.Background(), MakeInput{ + Intent: domain.NotificationIntent{Type: tc.typ, SessionID: "mer-1"}, + Facts: EnrichedFacts{SessionLabel: "mer-1", FailedChecks: []domain.PullRequestCheck{{Name: "build"}}}, + }) + if err != nil { + t.Fatal(err) + } + if got.Title != tc.wantT || !strings.Contains(got.Summary, tc.wantSub) { + t.Fatalf("content = %+v", got) + } + if len([]rune(got.Title)) > 40 || len([]rune(got.Summary)) > 120 { + t.Fatalf("copy too long: %+v", got) + } + }) + } +} + +func TestDefaultMakerKeepsEvidenceOutOfSummary(t *testing.T) { + longLog := strings.Repeat("boom ", 100) + got, err := (DefaultMaker{}).Make(context.Background(), MakeInput{ + Intent: domain.NotificationIntent{Type: domain.NotificationCIFailing, SessionID: "mer-1", Context: domain.NotificationIntentContext{Facts: map[string]any{"logTail": longLog}}}, + Facts: EnrichedFacts{SessionLabel: "mer-1", FailedChecks: []domain.PullRequestCheck{{Name: "build", LogTail: longLog}}}, + }) + if err != nil { + t.Fatal(err) + } + if strings.Contains(got.Summary, "boom") { + t.Fatalf("summary leaked log evidence: %q", got.Summary) + } +} diff --git a/backend/internal/service/notification/service.go b/backend/internal/service/notification/service.go new file mode 100644 index 0000000..14f72e2 --- /dev/null +++ b/backend/internal/service/notification/service.go @@ -0,0 +1,151 @@ +package notification + +import ( + "context" + "crypto/rand" + "encoding/hex" + "fmt" + "log/slog" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +// Service validates notification intent, enriches it from local durable facts, +// builds semantic actions and canonical copy, and persists a deduped +// notification row. +type Service struct { + store Store + maker Maker + clock func() time.Time + logger *slog.Logger +} + +// Deps are the collaborators needed by Service. +type Deps struct { + Store Store + Maker Maker + Clock func() time.Time + Logger *slog.Logger +} + +// New builds a notification service with sensible defaults. +func New(deps Deps) *Service { + s := &Service{store: deps.Store, maker: deps.Maker, clock: deps.Clock, logger: deps.Logger} + if s.maker == nil { + s.maker = DefaultMaker{} + } + if s.clock == nil { + s.clock = time.Now + } + if s.logger == nil { + s.logger = slog.Default() + } + return s +} + +// Notify persists or updates the logical notification represented by intent. +// The bool returned by Store.UpsertNotification is deliberately swallowed here: +// callers care that the intent has been durably handled, not whether it inserted, +// updated, or no-op deduped. +func (s *Service) Notify(ctx context.Context, intent domain.NotificationIntent) error { + if s.store == nil { + return fmt.Errorf("notification: nil store") + } + if intent.OccurredAt.IsZero() { + intent.OccurredAt = s.clock().UTC() + } + if err := intent.Validate(); err != nil { + return err + } + + facts, err := s.enrich(ctx, intent) + if err != nil { + return err + } + actions := buildActions(intent, facts) + content, err := s.maker.Make(ctx, MakeInput{Intent: intent, Facts: facts, Actions: actions}) + if err != nil { + return fmt.Errorf("notification: make content: %w", err) + } + if content.Title == "" || content.Summary == "" { + return fmt.Errorf("notification: maker returned empty title or summary") + } + fp, err := fingerprint(intent, facts, actions, content) + if err != nil { + return fmt.Errorf("notification: fingerprint: %w", err) + } + + now := s.clock().UTC() + n := domain.Notification{ + ID: newNotificationID(), + Type: intent.Type, + Priority: intent.Priority, + Status: domain.NotificationUnread, + ProjectID: intent.ProjectID, + SessionID: intent.SessionID, + Source: intent.Source, + DedupeKey: intent.DedupeKey, + Fingerprint: fp, + Title: content.Title, + Summary: content.Summary, + Body: content.Body, + Subject: subjectForFacts(facts), + Data: dataForIntent(intent, facts), + Actions: actions, + OccurredAt: intent.OccurredAt.UTC(), + CreatedAt: now, + UpdatedAt: now, + }.Normalize() + if err := n.Validate(); err != nil { + return err + } + stored, _, err := s.store.UpsertNotification(ctx, n) + if err != nil { + return fmt.Errorf("notification: persist %s: %w", intent.DedupeKey, err) + } + if intent.Type == domain.NotificationMergeCompleted { + if err := s.resolveSupersededPRNotifications(ctx, stored, facts, now); err != nil { + return err + } + } + return nil +} + +func (s *Service) resolveSupersededPRNotifications(ctx context.Context, n domain.Notification, facts EnrichedFacts, resolvedAt time.Time) error { + prURL := facts.PRURL + if prURL == "" { + prURL = n.Subject.PRURL + } + if prURL == "" { + return nil + } + sessionID := n.SessionID + count, err := s.store.ResolveNotifications(ctx, domain.NotificationResolveFilter{ + ProjectID: n.ProjectID, + SessionID: &sessionID, + PRURL: prURL, + Types: []domain.NotificationType{ + domain.NotificationCIFailing, + domain.NotificationReviewChanges, + domain.NotificationMergeConflicts, + domain.NotificationMergeReady, + }, + Statuses: []domain.NotificationStatus{domain.NotificationUnread, domain.NotificationRead}, + }, resolvedAt) + if err != nil { + return fmt.Errorf("notification: resolve superseded PR notifications: %w", err) + } + if count > 0 { + s.logger.Debug("resolved superseded notification rows", "count", count, "pr", prURL) + } + return nil +} + +func newNotificationID() domain.NotificationID { + var b [16]byte + if _, err := rand.Read(b[:]); err != nil { + return domain.NotificationID(fmt.Sprintf("ntf_%d", time.Now().UnixNano())) + } + return domain.NotificationID("ntf_" + hex.EncodeToString(b[:])) +} diff --git a/backend/internal/service/notification/service_test.go b/backend/internal/service/notification/service_test.go new file mode 100644 index 0000000..5ade823 --- /dev/null +++ b/backend/internal/service/notification/service_test.go @@ -0,0 +1,214 @@ +package notification + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +type fakeStore struct { + sessions map[domain.SessionID]domain.SessionRecord + projects map[string]domain.ProjectRecord + prs map[domain.SessionID][]domain.PullRequest + checks map[string][]domain.PullRequestCheck + comments map[string][]domain.PullRequestComment + threads map[string][]domain.PullRequestReviewThread + + checkLookups []string + commentLookups []string + threadLookups []string + + notifications map[string]domain.Notification + upsertChanged int + resolveCount int + upsertErr error +} + +func newFakeStore() *fakeStore { + return &fakeStore{ + sessions: map[domain.SessionID]domain.SessionRecord{}, + projects: map[string]domain.ProjectRecord{}, + prs: map[domain.SessionID][]domain.PullRequest{}, + checks: map[string][]domain.PullRequestCheck{}, + comments: map[string][]domain.PullRequestComment{}, + threads: map[string][]domain.PullRequestReviewThread{}, + notifications: map[string]domain.Notification{}, + } +} + +func (f *fakeStore) UpsertNotification(_ context.Context, n domain.Notification) (domain.Notification, bool, error) { + if f.upsertErr != nil { + return domain.Notification{}, false, f.upsertErr + } + key := string(n.ProjectID) + "\x00" + n.DedupeKey + if cur, ok := f.notifications[key]; ok { + if cur.Fingerprint == n.Fingerprint { + return cur, false, nil + } + n.ID = cur.ID + n.CreatedAt = cur.CreatedAt + f.notifications[key] = n + f.upsertChanged++ + return n, true, nil + } + f.notifications[key] = n + f.upsertChanged++ + return n, true, nil +} +func (f *fakeStore) ResolveNotifications(_ context.Context, filter domain.NotificationResolveFilter, _ time.Time) (int, error) { + f.resolveCount++ + return 1, nil +} +func (f *fakeStore) GetSession(_ context.Context, id domain.SessionID) (domain.SessionRecord, bool, error) { + r, ok := f.sessions[id] + return r, ok, nil +} +func (f *fakeStore) GetProject(_ context.Context, id string) (domain.ProjectRecord, bool, error) { + r, ok := f.projects[id] + return r, ok, nil +} +func (f *fakeStore) ListPRsBySession(_ context.Context, id domain.SessionID) ([]domain.PullRequest, error) { + return f.prs[id], nil +} +func (f *fakeStore) ListChecks(_ context.Context, prURL string) ([]domain.PullRequestCheck, error) { + f.checkLookups = append(f.checkLookups, prURL) + return f.checks[prURL], nil +} +func (f *fakeStore) ListPRComments(_ context.Context, prURL string) ([]domain.PullRequestComment, error) { + f.commentLookups = append(f.commentLookups, prURL) + return f.comments[prURL], nil +} +func (f *fakeStore) ListPRReviewThreads(_ context.Context, prURL string) ([]domain.PullRequestReviewThread, error) { + f.threadLookups = append(f.threadLookups, prURL) + return f.threads[prURL], nil +} + +func seededServiceStore(now time.Time) *fakeStore { + st := newFakeStore() + st.projects["mer"] = domain.ProjectRecord{ID: "mer", Path: "/repo/mer", DisplayName: "Mer"} + st.sessions["mer-1"] = domain.SessionRecord{ID: "mer-1", ProjectID: "mer", DisplayName: "Fix CI", Activity: domain.Activity{State: domain.ActivityActive, LastActivityAt: now}} + st.prs["mer-1"] = []domain.PullRequest{{URL: "https://github.com/o/r/pull/1", HTMLURL: "https://github.com/o/r/pull/1", SessionID: "mer-1", Number: 1, Title: "PR title", HeadSHA: "c1"}} + st.checks["https://github.com/o/r/pull/1"] = []domain.PullRequestCheck{{Name: "build", CommitHash: "c1", Status: domain.PRCheckFailed, URL: "https://ci/build", LogTail: "boom", CreatedAt: now}} + st.comments["https://github.com/o/r/pull/1"] = []domain.PullRequestComment{{ID: "c1", ThreadID: "t1", Body: "fix", CreatedAt: now}} + st.threads["https://github.com/o/r/pull/1"] = []domain.PullRequestReviewThread{{ThreadID: "t1", UpdatedAt: now}} + return st +} + +func TestNotifyEnrichesMakesActionsAndDedupes(t *testing.T) { + now := time.Date(2026, 6, 6, 12, 0, 0, 0, time.UTC) + st := seededServiceStore(now) + svc := New(Deps{Store: st, Clock: func() time.Time { return now }}) + intent := domain.NotificationIntent{Type: domain.NotificationCIFailing, Priority: domain.NotificationWarning, ProjectID: "mer", SessionID: "mer-1", Source: "test", DedupeKey: "ci:pr:build:c1", OccurredAt: now, Context: domain.NotificationIntentContext{PRURL: "https://github.com/o/r/pull/1", CheckName: "build", CheckURL: "https://ci/build", CommitHash: "c1"}} + if err := svc.Notify(context.Background(), intent); err != nil { + t.Fatal(err) + } + if st.upsertChanged != 1 { + t.Fatalf("writes = %d, want 1", st.upsertChanged) + } + var got domain.Notification + for _, n := range st.notifications { + got = n + } + if got.Title != "CI failed" || got.Subject.PRURL == "" || len(got.Actions) != 3 || !got.Actions[0].Primary { + t.Fatalf("notification not enriched: %+v", got) + } + if err := svc.Notify(context.Background(), intent); err != nil { + t.Fatal(err) + } + if st.upsertChanged != 1 { + t.Fatalf("same fingerprint should no-op, writes = %d", st.upsertChanged) + } + intent.Context.CheckURL = "https://ci/build/2" + if err := svc.Notify(context.Background(), intent); err != nil { + t.Fatal(err) + } + if st.upsertChanged != 2 { + t.Fatalf("changed fingerprint should update, writes = %d", st.upsertChanged) + } +} + +func TestNotifyMissingOptionalFactsStillProducesFallback(t *testing.T) { + now := time.Date(2026, 6, 6, 12, 0, 0, 0, time.UTC) + st := newFakeStore() + st.projects["mer"] = domain.ProjectRecord{ID: "mer"} + st.sessions["mer-1"] = domain.SessionRecord{ID: "mer-1", ProjectID: "mer"} + svc := New(Deps{Store: st, Clock: func() time.Time { return now }}) + err := svc.Notify(context.Background(), domain.NotificationIntent{Type: domain.NotificationSessionInput, Priority: domain.NotificationUrgent, ProjectID: "mer", SessionID: "mer-1", Source: "test", DedupeKey: "session-input:mer-1:t", OccurredAt: now}) + if err != nil { + t.Fatal(err) + } + for _, n := range st.notifications { + if n.Summary == "" || n.Actions[0].ID != "open_session" { + t.Fatalf("fallback notification = %+v", n) + } + } +} + +func TestNotifyPreservesRequestedPRURLWhenStoredPRDoesNotMatch(t *testing.T) { + now := time.Date(2026, 6, 6, 12, 0, 0, 0, time.UTC) + st := newFakeStore() + st.projects["mer"] = domain.ProjectRecord{ID: "mer"} + st.sessions["mer-1"] = domain.SessionRecord{ID: "mer-1", ProjectID: "mer"} + newerPRURL := "https://github.com/o/r/pull/2" + requestedPRURL := "https://github.com/o/r/pull/1" + st.prs["mer-1"] = []domain.PullRequest{{URL: newerPRURL, HTMLURL: newerPRURL, SessionID: "mer-1", Number: 2, Title: "newer"}} + st.checks[newerPRURL] = []domain.PullRequestCheck{{Name: "build", CommitHash: "c2", Status: domain.PRCheckFailed, URL: "https://ci/newer"}} + svc := New(Deps{Store: st, Clock: func() time.Time { return now }}) + + err := svc.Notify(context.Background(), domain.NotificationIntent{ + Type: domain.NotificationCIFailing, + Priority: domain.NotificationWarning, + ProjectID: "mer", + SessionID: "mer-1", + Source: "test", + DedupeKey: "ci:older:build:c1", + Context: domain.NotificationIntentContext{ + PRURL: requestedPRURL, + CheckName: "build", + CheckURL: "https://ci/older", + CommitHash: "c1", + }, + }) + if err != nil { + t.Fatal(err) + } + if len(st.checkLookups) != 1 || st.checkLookups[0] != requestedPRURL { + t.Fatalf("checks looked up for %v, want only requested PR %q", st.checkLookups, requestedPRURL) + } + for _, n := range st.notifications { + if n.Subject.PRURL != requestedPRURL { + t.Fatalf("notification subject PR URL = %q, want %q (notification=%+v)", n.Subject.PRURL, requestedPRURL, n) + } + } +} + +func TestNotifyRequiredFactAndStoreErrors(t *testing.T) { + now := time.Date(2026, 6, 6, 12, 0, 0, 0, time.UTC) + st := seededServiceStore(now) + svc := New(Deps{Store: st, Clock: func() time.Time { return now }}) + base := domain.NotificationIntent{Type: domain.NotificationSessionInput, Priority: domain.NotificationUrgent, ProjectID: "mer", SessionID: "missing", Source: "test", DedupeKey: "k", OccurredAt: now} + if err := svc.Notify(context.Background(), base); err == nil { + t.Fatal("unknown session should error") + } + base.SessionID = "mer-1" + st.upsertErr = errors.New("disk full") + if err := svc.Notify(context.Background(), base); err == nil { + t.Fatal("store failure should error") + } +} + +func TestMergeCompletedResolvesSupersededPRNotifications(t *testing.T) { + now := time.Date(2026, 6, 6, 12, 0, 0, 0, time.UTC) + st := seededServiceStore(now) + svc := New(Deps{Store: st, Clock: func() time.Time { return now }}) + err := svc.Notify(context.Background(), domain.NotificationIntent{Type: domain.NotificationMergeCompleted, Priority: domain.NotificationInfo, ProjectID: "mer", SessionID: "mer-1", Source: "test", DedupeKey: "merge-completed:pr:m1", OccurredAt: now, Context: domain.NotificationIntentContext{PRURL: "https://github.com/o/r/pull/1"}}) + if err != nil { + t.Fatal(err) + } + if st.resolveCount != 1 { + t.Fatalf("resolve count = %d, want 1", st.resolveCount) + } +} diff --git a/backend/internal/service/notification/store.go b/backend/internal/service/notification/store.go new file mode 100644 index 0000000..ad74c2f --- /dev/null +++ b/backend/internal/service/notification/store.go @@ -0,0 +1,23 @@ +package notification + +import ( + "context" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +// Store is the local durable-facts boundary used by the notification service. +// Implementations must not perform network calls and must rely on SQLite +// triggers for notification CDC. +type Store interface { + UpsertNotification(ctx context.Context, n domain.Notification) (domain.Notification, bool, error) + ResolveNotifications(ctx context.Context, filter domain.NotificationResolveFilter, resolvedAt time.Time) (int, error) + + GetSession(ctx context.Context, id domain.SessionID) (domain.SessionRecord, bool, error) + GetProject(ctx context.Context, id string) (domain.ProjectRecord, bool, error) + ListPRsBySession(ctx context.Context, id domain.SessionID) ([]domain.PullRequest, error) + ListChecks(ctx context.Context, prURL string) ([]domain.PullRequestCheck, error) + ListPRComments(ctx context.Context, prURL string) ([]domain.PullRequestComment, error) + ListPRReviewThreads(ctx context.Context, prURL string) ([]domain.PullRequestReviewThread, error) +} diff --git a/backend/internal/service/pr/manager.go b/backend/internal/service/pr/manager.go index 86696ca..b3dacd9 100644 --- a/backend/internal/service/pr/manager.go +++ b/backend/internal/service/pr/manager.go @@ -54,7 +54,7 @@ func (m *Manager) ApplyObservation(ctx context.Context, id domain.SessionID, o p func (m *Manager) write(ctx context.Context, id domain.SessionID, o ports.PRObservation) error { now := m.clock() - row := domain.PullRequest{URL: o.URL, SessionID: id, Number: o.Number, Draft: o.Draft, Merged: o.Merged, Closed: o.Closed, CI: o.CI, Review: o.Review, Mergeability: o.Mergeability, UpdatedAt: now} + row := domain.PullRequest{URL: o.URL, SessionID: id, Number: o.Number, Draft: o.Draft, Merged: o.Merged, Closed: o.Closed, CI: o.CI, Review: o.Review, Mergeability: o.Mergeability, UpdatedAt: now, HeadSHA: o.HeadSHA, BaseSHA: o.BaseSHA, MergeCommitSHA: o.MergeCommitSHA, HTMLURL: o.HTMLURL, ReviewHash: o.ReviewHash} checks := make([]domain.PullRequestCheck, len(o.Checks)) for i, c := range o.Checks { checks[i] = domain.PullRequestCheck{Name: c.Name, CommitHash: c.CommitHash, Status: c.Status, URL: c.URL, LogTail: c.LogTail, CreatedAt: now} diff --git a/backend/internal/storage/sqlite/gen/models.go b/backend/internal/storage/sqlite/gen/models.go index 824825a..245ad63 100644 --- a/backend/internal/storage/sqlite/gen/models.go +++ b/backend/internal/storage/sqlite/gen/models.go @@ -21,6 +21,30 @@ type ChangeLog struct { CreatedAt time.Time } +type Notification struct { + ID domain.NotificationID + ProjectID domain.ProjectID + SessionID domain.SessionID + Type domain.NotificationType + Priority domain.NotificationPriority + Status domain.NotificationStatus + Source string + DedupeKey string + Fingerprint string + Title string + Summary string + Body string + SubjectJson string + DataJson string + ActionsJson string + ReadAt sql.NullTime + DismissedAt sql.NullTime + ResolvedAt sql.NullTime + OccurredAt time.Time + CreatedAt time.Time + UpdatedAt time.Time +} + type PR struct { URL string SessionID domain.SessionID diff --git a/backend/internal/storage/sqlite/gen/notifications.sql.go b/backend/internal/storage/sqlite/gen/notifications.sql.go new file mode 100644 index 0000000..0edc832 --- /dev/null +++ b/backend/internal/storage/sqlite/gen/notifications.sql.go @@ -0,0 +1,362 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.31.1 +// source: notifications.sql + +package gen + +import ( + "context" + "database/sql" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +const getNotification = `-- name: GetNotification :one +SELECT + id, project_id, session_id, type, priority, status, source, dedupe_key, fingerprint, + title, summary, body, subject_json, data_json, actions_json, + read_at, dismissed_at, resolved_at, occurred_at, created_at, updated_at +FROM notifications +WHERE id = ? +` + +func (q *Queries) GetNotification(ctx context.Context, id domain.NotificationID) (Notification, error) { + row := q.db.QueryRowContext(ctx, getNotification, id) + var i Notification + err := row.Scan( + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Type, + &i.Priority, + &i.Status, + &i.Source, + &i.DedupeKey, + &i.Fingerprint, + &i.Title, + &i.Summary, + &i.Body, + &i.SubjectJson, + &i.DataJson, + &i.ActionsJson, + &i.ReadAt, + &i.DismissedAt, + &i.ResolvedAt, + &i.OccurredAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const getNotificationByDedupeKey = `-- name: GetNotificationByDedupeKey :one +SELECT + id, project_id, session_id, type, priority, status, source, dedupe_key, fingerprint, + title, summary, body, subject_json, data_json, actions_json, + read_at, dismissed_at, resolved_at, occurred_at, created_at, updated_at +FROM notifications +WHERE project_id = ? AND dedupe_key = ? +` + +type GetNotificationByDedupeKeyParams struct { + ProjectID domain.ProjectID + DedupeKey string +} + +func (q *Queries) GetNotificationByDedupeKey(ctx context.Context, arg GetNotificationByDedupeKeyParams) (Notification, error) { + row := q.db.QueryRowContext(ctx, getNotificationByDedupeKey, arg.ProjectID, arg.DedupeKey) + var i Notification + err := row.Scan( + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Type, + &i.Priority, + &i.Status, + &i.Source, + &i.DedupeKey, + &i.Fingerprint, + &i.Title, + &i.Summary, + &i.Body, + &i.SubjectJson, + &i.DataJson, + &i.ActionsJson, + &i.ReadAt, + &i.DismissedAt, + &i.ResolvedAt, + &i.OccurredAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const insertNotification = `-- name: InsertNotification :exec +INSERT INTO notifications ( + id, project_id, session_id, type, priority, status, source, dedupe_key, fingerprint, + title, summary, body, subject_json, data_json, actions_json, + read_at, dismissed_at, resolved_at, occurred_at, created_at, updated_at +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +` + +type InsertNotificationParams struct { + ID domain.NotificationID + ProjectID domain.ProjectID + SessionID domain.SessionID + Type domain.NotificationType + Priority domain.NotificationPriority + Status domain.NotificationStatus + Source string + DedupeKey string + Fingerprint string + Title string + Summary string + Body string + SubjectJson string + DataJson string + ActionsJson string + ReadAt sql.NullTime + DismissedAt sql.NullTime + ResolvedAt sql.NullTime + OccurredAt time.Time + CreatedAt time.Time + UpdatedAt time.Time +} + +func (q *Queries) InsertNotification(ctx context.Context, arg InsertNotificationParams) error { + _, err := q.db.ExecContext(ctx, insertNotification, + arg.ID, + arg.ProjectID, + arg.SessionID, + arg.Type, + arg.Priority, + arg.Status, + arg.Source, + arg.DedupeKey, + arg.Fingerprint, + arg.Title, + arg.Summary, + arg.Body, + arg.SubjectJson, + arg.DataJson, + arg.ActionsJson, + arg.ReadAt, + arg.DismissedAt, + arg.ResolvedAt, + arg.OccurredAt, + arg.CreatedAt, + arg.UpdatedAt, + ) + return err +} + +const listNotificationsByProject = `-- name: ListNotificationsByProject :many +SELECT + id, project_id, session_id, type, priority, status, source, dedupe_key, fingerprint, + title, summary, body, subject_json, data_json, actions_json, + read_at, dismissed_at, resolved_at, occurred_at, created_at, updated_at +FROM notifications +WHERE project_id = ? +ORDER BY created_at DESC +LIMIT ? +` + +type ListNotificationsByProjectParams struct { + ProjectID domain.ProjectID + Limit int64 +} + +func (q *Queries) ListNotificationsByProject(ctx context.Context, arg ListNotificationsByProjectParams) ([]Notification, error) { + rows, err := q.db.QueryContext(ctx, listNotificationsByProject, arg.ProjectID, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Notification{} + for rows.Next() { + var i Notification + if err := rows.Scan( + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Type, + &i.Priority, + &i.Status, + &i.Source, + &i.DedupeKey, + &i.Fingerprint, + &i.Title, + &i.Summary, + &i.Body, + &i.SubjectJson, + &i.DataJson, + &i.ActionsJson, + &i.ReadAt, + &i.DismissedAt, + &i.ResolvedAt, + &i.OccurredAt, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listNotificationsBySession = `-- name: ListNotificationsBySession :many +SELECT + id, project_id, session_id, type, priority, status, source, dedupe_key, fingerprint, + title, summary, body, subject_json, data_json, actions_json, + read_at, dismissed_at, resolved_at, occurred_at, created_at, updated_at +FROM notifications +WHERE session_id = ? +ORDER BY created_at DESC +LIMIT ? +` + +type ListNotificationsBySessionParams struct { + SessionID domain.SessionID + Limit int64 +} + +func (q *Queries) ListNotificationsBySession(ctx context.Context, arg ListNotificationsBySessionParams) ([]Notification, error) { + rows, err := q.db.QueryContext(ctx, listNotificationsBySession, arg.SessionID, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Notification{} + for rows.Next() { + var i Notification + if err := rows.Scan( + &i.ID, + &i.ProjectID, + &i.SessionID, + &i.Type, + &i.Priority, + &i.Status, + &i.Source, + &i.DedupeKey, + &i.Fingerprint, + &i.Title, + &i.Summary, + &i.Body, + &i.SubjectJson, + &i.DataJson, + &i.ActionsJson, + &i.ReadAt, + &i.DismissedAt, + &i.ResolvedAt, + &i.OccurredAt, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const resolveNotification = `-- name: ResolveNotification :execrows +UPDATE notifications +SET status = 'resolved', resolved_at = ?, updated_at = ? +WHERE id = ? AND status IN ('unread', 'read') +` + +type ResolveNotificationParams struct { + ResolvedAt sql.NullTime + UpdatedAt time.Time + ID domain.NotificationID +} + +func (q *Queries) ResolveNotification(ctx context.Context, arg ResolveNotificationParams) (int64, error) { + result, err := q.db.ExecContext(ctx, resolveNotification, arg.ResolvedAt, arg.UpdatedAt, arg.ID) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const updateNotificationContent = `-- name: UpdateNotificationContent :exec +UPDATE notifications +SET + type = ?, + priority = ?, + status = ?, + source = ?, + fingerprint = ?, + title = ?, + summary = ?, + body = ?, + subject_json = ?, + data_json = ?, + actions_json = ?, + read_at = ?, + dismissed_at = ?, + resolved_at = ?, + occurred_at = ?, + updated_at = ? +WHERE id = ? +` + +type UpdateNotificationContentParams struct { + Type domain.NotificationType + Priority domain.NotificationPriority + Status domain.NotificationStatus + Source string + Fingerprint string + Title string + Summary string + Body string + SubjectJson string + DataJson string + ActionsJson string + ReadAt sql.NullTime + DismissedAt sql.NullTime + ResolvedAt sql.NullTime + OccurredAt time.Time + UpdatedAt time.Time + ID domain.NotificationID +} + +func (q *Queries) UpdateNotificationContent(ctx context.Context, arg UpdateNotificationContentParams) error { + _, err := q.db.ExecContext(ctx, updateNotificationContent, + arg.Type, + arg.Priority, + arg.Status, + arg.Source, + arg.Fingerprint, + arg.Title, + arg.Summary, + arg.Body, + arg.SubjectJson, + arg.DataJson, + arg.ActionsJson, + arg.ReadAt, + arg.DismissedAt, + arg.ResolvedAt, + arg.OccurredAt, + arg.UpdatedAt, + arg.ID, + ) + return err +} diff --git a/backend/internal/storage/sqlite/migrations/0008_notifications.sql b/backend/internal/storage/sqlite/migrations/0008_notifications.sql new file mode 100644 index 0000000..4427e28 --- /dev/null +++ b/backend/internal/storage/sqlite/migrations/0008_notifications.sql @@ -0,0 +1,469 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE notifications ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL REFERENCES projects (id), + session_id TEXT NOT NULL REFERENCES sessions (id), + type TEXT NOT NULL, + priority TEXT NOT NULL CHECK (priority IN ('urgent', 'action', 'warning', 'info')), + status TEXT NOT NULL DEFAULT 'unread' + CHECK (status IN ('unread', 'read', 'dismissed', 'resolved')), + source TEXT NOT NULL, + dedupe_key TEXT NOT NULL, + fingerprint TEXT NOT NULL, + title TEXT NOT NULL, + summary TEXT NOT NULL, + body TEXT NOT NULL DEFAULT '', + subject_json TEXT NOT NULL CHECK (json_valid(subject_json)), + data_json TEXT NOT NULL CHECK (json_valid(data_json)), + actions_json TEXT NOT NULL CHECK (json_valid(actions_json)), + read_at TIMESTAMP, + dismissed_at TIMESTAMP, + resolved_at TIMESTAMP, + occurred_at TIMESTAMP NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + UNIQUE (project_id, dedupe_key) +); + +CREATE INDEX idx_notifications_project_created + ON notifications (project_id, created_at DESC); +CREATE INDEX idx_notifications_project_status_created + ON notifications (project_id, status, created_at DESC); +CREATE INDEX idx_notifications_session_created + ON notifications (session_id, created_at DESC); +CREATE INDEX idx_notifications_project_type_created + ON notifications (project_id, type, created_at DESC); + +DROP TRIGGER IF EXISTS pr_review_threads_cdc_update; +DROP TRIGGER IF EXISTS pr_review_threads_cdc_insert; +DROP TRIGGER IF EXISTS sessions_cdc_insert; +DROP TRIGGER IF EXISTS sessions_cdc_update; +DROP TRIGGER IF EXISTS pr_cdc_insert; +DROP TRIGGER IF EXISTS pr_cdc_update; +DROP TRIGGER IF EXISTS pr_session_cdc_update; +DROP TRIGGER IF EXISTS pr_checks_cdc_insert; +DROP TRIGGER IF EXISTS pr_checks_cdc_update; + +CREATE TABLE change_log_new ( + seq INTEGER PRIMARY KEY AUTOINCREMENT, + project_id TEXT NOT NULL REFERENCES projects (id), + session_id TEXT REFERENCES sessions (id), + event_type TEXT NOT NULL + CHECK (event_type IN ( + 'session_created', + 'session_updated', + 'pr_created', + 'pr_updated', + 'pr_check_recorded', + 'pr_session_changed', + 'pr_review_thread_added', + 'pr_review_thread_resolved', + 'notification_created', + 'notification_updated' + )), + payload TEXT NOT NULL CHECK (json_valid(payload)), + created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')) +); + +INSERT INTO change_log_new (seq, project_id, session_id, event_type, payload, created_at) +SELECT seq, project_id, session_id, event_type, payload, created_at +FROM change_log; + +DROP INDEX IF EXISTS idx_change_log_project; +DROP TABLE change_log; +ALTER TABLE change_log_new RENAME TO change_log; +CREATE INDEX idx_change_log_project ON change_log (project_id, seq); +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_review_threads_cdc_insert +AFTER INSERT ON pr_review_threads +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + (SELECT s.project_id FROM pr p JOIN sessions s ON s.id = p.session_id WHERE p.url = NEW.pr_url), + (SELECT session_id FROM pr WHERE url = NEW.pr_url), + 'pr_review_thread_added', + json_object( + 'pr', NEW.pr_url, + 'thread', NEW.thread_id, + 'path', NEW.path, + 'line', NEW.line, + 'resolved', json(CASE WHEN NEW.resolved THEN 'true' ELSE 'false' END), + 'isBot', json(CASE WHEN NEW.is_bot THEN 'true' ELSE 'false' END) + ), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_review_threads_cdc_update +AFTER UPDATE ON pr_review_threads +WHEN OLD.resolved <> NEW.resolved +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + (SELECT s.project_id FROM pr p JOIN sessions s ON s.id = p.session_id WHERE p.url = NEW.pr_url), + (SELECT session_id FROM pr WHERE url = NEW.pr_url), + 'pr_review_thread_resolved', + json_object( + 'pr', NEW.pr_url, + 'thread', NEW.thread_id, + 'path', NEW.path, + 'line', NEW.line, + 'resolved', json(CASE WHEN NEW.resolved THEN 'true' ELSE 'false' END) + ), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER sessions_cdc_insert +AFTER INSERT ON sessions +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES (NEW.project_id, NEW.id, 'session_created', + json_object('id', NEW.id, 'activity', NEW.activity_state, 'isTerminated', json(CASE WHEN NEW.is_terminated THEN 'true' ELSE 'false' END)), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER sessions_cdc_update +AFTER UPDATE ON sessions +WHEN OLD.activity_state <> NEW.activity_state + OR OLD.is_terminated <> NEW.is_terminated +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES (NEW.project_id, NEW.id, 'session_updated', + json_object('id', NEW.id, 'activity', NEW.activity_state, 'isTerminated', json(CASE WHEN NEW.is_terminated THEN 'true' ELSE 'false' END)), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_cdc_insert +AFTER INSERT ON pr +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ((SELECT project_id FROM sessions WHERE id = NEW.session_id), NEW.session_id, 'pr_created', + json_object('url', NEW.url, 'session', NEW.session_id, 'state', NEW.pr_state, + 'ci', NEW.ci_state, 'review', NEW.review_decision, 'mergeability', NEW.mergeability), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_cdc_update +AFTER UPDATE ON pr +WHEN OLD.pr_state <> NEW.pr_state + OR OLD.ci_state <> NEW.ci_state + OR OLD.review_decision <> NEW.review_decision + OR OLD.mergeability <> NEW.mergeability +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ((SELECT project_id FROM sessions WHERE id = NEW.session_id), NEW.session_id, 'pr_updated', + json_object('url', NEW.url, 'session', NEW.session_id, 'state', NEW.pr_state, + 'ci', NEW.ci_state, 'review', NEW.review_decision, 'mergeability', NEW.mergeability), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_session_cdc_update +AFTER UPDATE ON pr +WHEN OLD.session_id <> NEW.session_id +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + (SELECT project_id FROM sessions WHERE id = NEW.session_id), + NEW.session_id, + 'pr_session_changed', + json_object( + 'url', NEW.url, + 'fromSession', OLD.session_id, + 'toSession', NEW.session_id), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_checks_cdc_insert +AFTER INSERT ON pr_checks +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + (SELECT s.project_id FROM pr p JOIN sessions s ON s.id = p.session_id WHERE p.url = NEW.pr_url), + (SELECT session_id FROM pr WHERE url = NEW.pr_url), + 'pr_check_recorded', + json_object('pr', NEW.pr_url, 'name', NEW.name, 'commit', NEW.commit_hash, 'status', NEW.status), + NEW.created_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_checks_cdc_update +AFTER UPDATE ON pr_checks +WHEN OLD.status <> NEW.status +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + (SELECT s.project_id FROM pr p JOIN sessions s ON s.id = p.session_id WHERE p.url = NEW.pr_url), + (SELECT session_id FROM pr WHERE url = NEW.pr_url), + 'pr_check_recorded', + json_object('pr', NEW.pr_url, 'name', NEW.name, 'commit', NEW.commit_hash, 'status', NEW.status), + datetime('now')); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER notifications_cdc_insert +AFTER INSERT ON notifications +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + NEW.project_id, + NEW.session_id, + 'notification_created', + json_object( + 'id', NEW.id, + 'projectId', NEW.project_id, + 'sessionId', NEW.session_id, + 'type', NEW.type, + 'priority', NEW.priority, + 'status', NEW.status, + 'title', NEW.title, + 'summary', NEW.summary, + 'actionCount', COALESCE(json_array_length(NEW.actions_json), 0), + 'subject', json(NEW.subject_json), + 'actions', json(NEW.actions_json) + ), + NEW.created_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER notifications_cdc_update +AFTER UPDATE ON notifications +WHEN OLD.fingerprint <> NEW.fingerprint + OR OLD.priority <> NEW.priority + OR OLD.status <> NEW.status + OR OLD.title <> NEW.title + OR OLD.summary <> NEW.summary + OR OLD.body <> NEW.body + OR OLD.subject_json <> NEW.subject_json + OR OLD.data_json <> NEW.data_json + OR OLD.actions_json <> NEW.actions_json + OR OLD.read_at IS NOT NEW.read_at + OR OLD.dismissed_at IS NOT NEW.dismissed_at + OR OLD.resolved_at IS NOT NEW.resolved_at +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + NEW.project_id, + NEW.session_id, + 'notification_updated', + json_object( + 'id', NEW.id, + 'projectId', NEW.project_id, + 'sessionId', NEW.session_id, + 'type', NEW.type, + 'priority', NEW.priority, + 'status', NEW.status, + 'title', NEW.title, + 'summary', NEW.summary, + 'actionCount', COALESCE(json_array_length(NEW.actions_json), 0), + 'subject', json(NEW.subject_json), + 'actions', json(NEW.actions_json) + ), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TRIGGER IF EXISTS notifications_cdc_update; +DROP TRIGGER IF EXISTS notifications_cdc_insert; +DROP TRIGGER IF EXISTS pr_review_threads_cdc_update; +DROP TRIGGER IF EXISTS pr_review_threads_cdc_insert; +DROP TRIGGER IF EXISTS sessions_cdc_insert; +DROP TRIGGER IF EXISTS sessions_cdc_update; +DROP TRIGGER IF EXISTS pr_cdc_insert; +DROP TRIGGER IF EXISTS pr_cdc_update; +DROP TRIGGER IF EXISTS pr_session_cdc_update; +DROP TRIGGER IF EXISTS pr_checks_cdc_insert; +DROP TRIGGER IF EXISTS pr_checks_cdc_update; + +CREATE TABLE change_log_old ( + seq INTEGER PRIMARY KEY AUTOINCREMENT, + project_id TEXT NOT NULL REFERENCES projects (id), + session_id TEXT REFERENCES sessions (id), + event_type TEXT NOT NULL + CHECK (event_type IN ( + 'session_created', + 'session_updated', + 'pr_created', + 'pr_updated', + 'pr_check_recorded', + 'pr_session_changed', + 'pr_review_thread_added', + 'pr_review_thread_resolved' + )), + payload TEXT NOT NULL CHECK (json_valid(payload)), + created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')) +); + +INSERT INTO change_log_old (seq, project_id, session_id, event_type, payload, created_at) +SELECT seq, project_id, session_id, event_type, payload, created_at +FROM change_log +WHERE event_type NOT IN ('notification_created', 'notification_updated'); + +DROP INDEX IF EXISTS idx_change_log_project; +DROP TABLE change_log; +ALTER TABLE change_log_old RENAME TO change_log; +CREATE INDEX idx_change_log_project ON change_log (project_id, seq); + +DROP TABLE notifications; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_review_threads_cdc_insert +AFTER INSERT ON pr_review_threads +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + (SELECT s.project_id FROM pr p JOIN sessions s ON s.id = p.session_id WHERE p.url = NEW.pr_url), + (SELECT session_id FROM pr WHERE url = NEW.pr_url), + 'pr_review_thread_added', + json_object( + 'pr', NEW.pr_url, + 'thread', NEW.thread_id, + 'path', NEW.path, + 'line', NEW.line, + 'resolved', json(CASE WHEN NEW.resolved THEN 'true' ELSE 'false' END), + 'isBot', json(CASE WHEN NEW.is_bot THEN 'true' ELSE 'false' END) + ), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_review_threads_cdc_update +AFTER UPDATE ON pr_review_threads +WHEN OLD.resolved <> NEW.resolved +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + (SELECT s.project_id FROM pr p JOIN sessions s ON s.id = p.session_id WHERE p.url = NEW.pr_url), + (SELECT session_id FROM pr WHERE url = NEW.pr_url), + 'pr_review_thread_resolved', + json_object( + 'pr', NEW.pr_url, + 'thread', NEW.thread_id, + 'path', NEW.path, + 'line', NEW.line, + 'resolved', json(CASE WHEN NEW.resolved THEN 'true' ELSE 'false' END) + ), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER sessions_cdc_insert +AFTER INSERT ON sessions +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES (NEW.project_id, NEW.id, 'session_created', + json_object('id', NEW.id, 'activity', NEW.activity_state, 'isTerminated', json(CASE WHEN NEW.is_terminated THEN 'true' ELSE 'false' END)), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER sessions_cdc_update +AFTER UPDATE ON sessions +WHEN OLD.activity_state <> NEW.activity_state + OR OLD.is_terminated <> NEW.is_terminated +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES (NEW.project_id, NEW.id, 'session_updated', + json_object('id', NEW.id, 'activity', NEW.activity_state, 'isTerminated', json(CASE WHEN NEW.is_terminated THEN 'true' ELSE 'false' END)), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_cdc_insert +AFTER INSERT ON pr +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ((SELECT project_id FROM sessions WHERE id = NEW.session_id), NEW.session_id, 'pr_created', + json_object('url', NEW.url, 'session', NEW.session_id, 'state', NEW.pr_state, + 'ci', NEW.ci_state, 'review', NEW.review_decision, 'mergeability', NEW.mergeability), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_cdc_update +AFTER UPDATE ON pr +WHEN OLD.pr_state <> NEW.pr_state + OR OLD.ci_state <> NEW.ci_state + OR OLD.review_decision <> NEW.review_decision + OR OLD.mergeability <> NEW.mergeability +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ((SELECT project_id FROM sessions WHERE id = NEW.session_id), NEW.session_id, 'pr_updated', + json_object('url', NEW.url, 'session', NEW.session_id, 'state', NEW.pr_state, + 'ci', NEW.ci_state, 'review', NEW.review_decision, 'mergeability', NEW.mergeability), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_session_cdc_update +AFTER UPDATE ON pr +WHEN OLD.session_id <> NEW.session_id +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + (SELECT project_id FROM sessions WHERE id = NEW.session_id), + NEW.session_id, + 'pr_session_changed', + json_object( + 'url', NEW.url, + 'fromSession', OLD.session_id, + 'toSession', NEW.session_id), + NEW.updated_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_checks_cdc_insert +AFTER INSERT ON pr_checks +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + (SELECT s.project_id FROM pr p JOIN sessions s ON s.id = p.session_id WHERE p.url = NEW.pr_url), + (SELECT session_id FROM pr WHERE url = NEW.pr_url), + 'pr_check_recorded', + json_object('pr', NEW.pr_url, 'name', NEW.name, 'commit', NEW.commit_hash, 'status', NEW.status), + NEW.created_at); +END; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TRIGGER pr_checks_cdc_update +AFTER UPDATE ON pr_checks +WHEN OLD.status <> NEW.status +BEGIN + INSERT INTO change_log (project_id, session_id, event_type, payload, created_at) + VALUES ( + (SELECT s.project_id FROM pr p JOIN sessions s ON s.id = p.session_id WHERE p.url = NEW.pr_url), + (SELECT session_id FROM pr WHERE url = NEW.pr_url), + 'pr_check_recorded', + json_object('pr', NEW.pr_url, 'name', NEW.name, 'commit', NEW.commit_hash, 'status', NEW.status), + datetime('now')); +END; +-- +goose StatementEnd diff --git a/backend/internal/storage/sqlite/queries/notifications.sql b/backend/internal/storage/sqlite/queries/notifications.sql new file mode 100644 index 0000000..4e049af --- /dev/null +++ b/backend/internal/storage/sqlite/queries/notifications.sql @@ -0,0 +1,69 @@ +-- name: InsertNotification :exec +INSERT INTO notifications ( + id, project_id, session_id, type, priority, status, source, dedupe_key, fingerprint, + title, summary, body, subject_json, data_json, actions_json, + read_at, dismissed_at, resolved_at, occurred_at, created_at, updated_at +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); + +-- name: UpdateNotificationContent :exec +UPDATE notifications +SET + type = ?, + priority = ?, + status = ?, + source = ?, + fingerprint = ?, + title = ?, + summary = ?, + body = ?, + subject_json = ?, + data_json = ?, + actions_json = ?, + read_at = ?, + dismissed_at = ?, + resolved_at = ?, + occurred_at = ?, + updated_at = ? +WHERE id = ?; + +-- name: ResolveNotification :execrows +UPDATE notifications +SET status = 'resolved', resolved_at = ?, updated_at = ? +WHERE id = ? AND status IN ('unread', 'read'); + +-- name: GetNotification :one +SELECT + id, project_id, session_id, type, priority, status, source, dedupe_key, fingerprint, + title, summary, body, subject_json, data_json, actions_json, + read_at, dismissed_at, resolved_at, occurred_at, created_at, updated_at +FROM notifications +WHERE id = ?; + +-- name: GetNotificationByDedupeKey :one +SELECT + id, project_id, session_id, type, priority, status, source, dedupe_key, fingerprint, + title, summary, body, subject_json, data_json, actions_json, + read_at, dismissed_at, resolved_at, occurred_at, created_at, updated_at +FROM notifications +WHERE project_id = ? AND dedupe_key = ?; + +-- name: ListNotificationsByProject :many +SELECT + id, project_id, session_id, type, priority, status, source, dedupe_key, fingerprint, + title, summary, body, subject_json, data_json, actions_json, + read_at, dismissed_at, resolved_at, occurred_at, created_at, updated_at +FROM notifications +WHERE project_id = ? +ORDER BY created_at DESC +LIMIT ?; + +-- name: ListNotificationsBySession :many +SELECT + id, project_id, session_id, type, priority, status, source, dedupe_key, fingerprint, + title, summary, body, subject_json, data_json, actions_json, + read_at, dismissed_at, resolved_at, occurred_at, created_at, updated_at +FROM notifications +WHERE session_id = ? +ORDER BY created_at DESC +LIMIT ?; diff --git a/backend/internal/storage/sqlite/store/notification_cdc_test.go b/backend/internal/storage/sqlite/store/notification_cdc_test.go new file mode 100644 index 0000000..6b80aba --- /dev/null +++ b/backend/internal/storage/sqlite/store/notification_cdc_test.go @@ -0,0 +1,92 @@ +package store_test + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/cdc" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +func notificationEvents(ctx context.Context, t *testing.T, s interface { + EventsAfter(context.Context, int64, int) ([]cdc.Event, error) +}) []cdc.Event { + t.Helper() + events, err := s.EventsAfter(ctx, 0, 100) + if err != nil { + t.Fatal(err) + } + var out []cdc.Event + for _, ev := range events { + if ev.Type == cdc.EventNotificationCreated || ev.Type == cdc.EventNotificationUpdated { + out = append(out, ev) + } + } + return out +} + +func TestNotificationCDCInsertUpdateNoopAndResolve(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + rec := seedNotificationSession(ctx, t, s) + now := time.Now().UTC().Truncate(time.Second) + n := sampleNotification("n1", rec.ProjectID, rec.ID, "ci:1", "fp1", now) + if _, _, err := s.UpsertNotification(ctx, n); err != nil { + t.Fatal(err) + } + if events := notificationEvents(ctx, t, s); len(events) != 1 || events[0].Type != cdc.EventNotificationCreated { + t.Fatalf("after insert events = %+v", events) + } + // Same fingerprint is a store no-op and must not append change_log rows. + if _, _, err := s.UpsertNotification(ctx, n); err != nil { + t.Fatal(err) + } + if events := notificationEvents(ctx, t, s); len(events) != 1 { + t.Fatalf("same fingerprint emitted extra event: %+v", events) + } + updated := n + updated.Fingerprint = "fp2" + updated.Summary = "updated" + updated.UpdatedAt = now.Add(time.Minute) + if _, _, err := s.UpsertNotification(ctx, updated); err != nil { + t.Fatal(err) + } + if events := notificationEvents(ctx, t, s); len(events) != 2 || events[1].Type != cdc.EventNotificationUpdated { + t.Fatalf("after update events = %+v", events) + } + if _, err := s.ResolveNotifications(ctx, domain.NotificationResolveFilter{ProjectID: rec.ProjectID, SessionID: &rec.ID, PRURL: n.Subject.PRURL}, now.Add(time.Hour)); err != nil { + t.Fatal(err) + } + if events := notificationEvents(ctx, t, s); len(events) != 3 || events[2].Type != cdc.EventNotificationUpdated { + t.Fatalf("after resolve events = %+v", events) + } +} + +func TestNotificationCDCPayloadEmbedsRealJSON(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + rec := seedNotificationSession(ctx, t, s) + now := time.Now().UTC().Truncate(time.Second) + if _, _, err := s.UpsertNotification(ctx, sampleNotification("n1", rec.ProjectID, rec.ID, "ci:1", "fp1", now)); err != nil { + t.Fatal(err) + } + events := notificationEvents(ctx, t, s) + if len(events) != 1 { + t.Fatalf("events = %+v", events) + } + var payload map[string]any + if err := json.Unmarshal(events[0].Payload, &payload); err != nil { + t.Fatal(err) + } + if payload["actionCount"] != float64(1) { + t.Fatalf("payload action count = %#v", payload["actionCount"]) + } + if _, ok := payload["actions"].([]any); !ok { + t.Fatalf("actions should be a JSON array, got %#v", payload["actions"]) + } + if _, ok := payload["subject"].(map[string]any); !ok { + t.Fatalf("subject should be a JSON object, got %#v", payload["subject"]) + } +} diff --git a/backend/internal/storage/sqlite/store/notification_store.go b/backend/internal/storage/sqlite/store/notification_store.go new file mode 100644 index 0000000..857158e --- /dev/null +++ b/backend/internal/storage/sqlite/store/notification_store.go @@ -0,0 +1,307 @@ +package store + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/gen" +) + +// UpsertNotification inserts a new logical notification or updates the existing +// row for (project_id, dedupe_key) when the fingerprint changed. It returns +// changed=false when the stored fingerprint already matches, so callers can +// rely on triggers to suppress duplicate CDC events on daemon restarts. +func (s *Store) UpsertNotification(ctx context.Context, n domain.Notification) (domain.Notification, bool, error) { + n = n.Normalize() + if err := n.Validate(); err != nil { + return domain.Notification{}, false, err + } + s.writeMu.Lock() + defer s.writeMu.Unlock() + + var out domain.Notification + var changed bool + err := s.inTx(ctx, "upsert notification", func(q *gen.Queries) error { + existing, err := q.GetNotificationByDedupeKey(ctx, gen.GetNotificationByDedupeKeyParams{ProjectID: n.ProjectID, DedupeKey: n.DedupeKey}) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return err + } + if errors.Is(err, sql.ErrNoRows) { + params, err := notificationInsertParams(n) + if err != nil { + return err + } + if err := q.InsertNotification(ctx, params); err != nil { + return err + } + row, err := q.GetNotification(ctx, n.ID) + if err != nil { + return err + } + out, err = notificationFromGen(row) + if err != nil { + return err + } + changed = true + return nil + } + cur, err := notificationFromGen(existing) + if err != nil { + return err + } + if cur.Fingerprint == n.Fingerprint { + out = cur + changed = false + return nil + } + n.ID = cur.ID + n.CreatedAt = cur.CreatedAt + params, err := notificationUpdateParams(n) + if err != nil { + return err + } + if err := q.UpdateNotificationContent(ctx, params); err != nil { + return err + } + row, err := q.GetNotification(ctx, cur.ID) + if err != nil { + return err + } + out, err = notificationFromGen(row) + if err != nil { + return err + } + changed = true + return nil + }) + if err != nil { + return domain.Notification{}, false, err + } + return out, changed, nil +} + +// ResolveNotifications marks matching unread/read notifications resolved. The +// UPDATE itself is the only write; notification_updated CDC is emitted by the DB +// trigger when rows actually change. +func (s *Store) ResolveNotifications(ctx context.Context, filter domain.NotificationResolveFilter, resolvedAt time.Time) (int, error) { + if filter.ProjectID == "" { + return 0, errors.New("resolve notifications: missing project id") + } + if resolvedAt.IsZero() { + resolvedAt = time.Now().UTC() + } + statuses := filter.Statuses + if len(statuses) == 0 { + statuses = []domain.NotificationStatus{domain.NotificationUnread, domain.NotificationRead} + } + + clauses := []string{"project_id = ?"} + args := []any{filter.ProjectID} + if filter.SessionID != nil { + clauses = append(clauses, "session_id = ?") + args = append(args, *filter.SessionID) + } + if len(statuses) > 0 { + ph := placeholders(len(statuses)) + clauses = append(clauses, "status IN ("+ph+")") + for _, st := range statuses { + args = append(args, st) + } + } + if len(filter.Types) > 0 { + ph := placeholders(len(filter.Types)) + clauses = append(clauses, "type IN ("+ph+")") + for _, typ := range filter.Types { + args = append(args, typ) + } + } + if filter.PRURL != "" { + clauses = append(clauses, "(json_extract(subject_json, '$.prUrl') = ? OR json_extract(data_json, '$.pr.url') = ? OR json_extract(data_json, '$.intent.context.prUrl') = ?)") + args = append(args, filter.PRURL, filter.PRURL, filter.PRURL) + } + if len(filter.DedupeKeyPrefixes) > 0 { + ors := make([]string, 0, len(filter.DedupeKeyPrefixes)) + for _, prefix := range filter.DedupeKeyPrefixes { + ors = append(ors, "dedupe_key LIKE ? ESCAPE '\\'") + args = append(args, escapeLikePrefix(prefix)+"%") + } + clauses = append(clauses, "("+strings.Join(ors, " OR ")+")") + } + + // #nosec G202 -- clauses are selected from fixed strings above and all values are bound placeholders. + query := "UPDATE notifications SET status = 'resolved', resolved_at = ?, updated_at = ? WHERE " + strings.Join(clauses, " AND ") + args = append([]any{resolvedAt, resolvedAt}, args...) + + s.writeMu.Lock() + defer s.writeMu.Unlock() + res, err := s.writeDB.ExecContext(ctx, query, args...) + if err != nil { + return 0, fmt.Errorf("resolve notifications: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return 0, fmt.Errorf("resolve notifications rows affected: %w", err) + } + return int(n), nil +} + +// GetNotification returns one notification by id, or ok=false when absent. +func (s *Store) GetNotification(ctx context.Context, id domain.NotificationID) (domain.Notification, bool, error) { + row, err := s.qr.GetNotification(ctx, id) + if errors.Is(err, sql.ErrNoRows) { + return domain.Notification{}, false, nil + } + if err != nil { + return domain.Notification{}, false, fmt.Errorf("get notification %s: %w", id, err) + } + n, err := notificationFromGen(row) + if err != nil { + return domain.Notification{}, false, fmt.Errorf("get notification %s: %w", id, err) + } + return n, true, nil +} + +// ListNotificationsByProject returns newest notifications for a project. +func (s *Store) ListNotificationsByProject(ctx context.Context, projectID domain.ProjectID, limit int) ([]domain.Notification, error) { + if limit <= 0 { + limit = 50 + } + rows, err := s.qr.ListNotificationsByProject(ctx, gen.ListNotificationsByProjectParams{ProjectID: projectID, Limit: int64(limit)}) + if err != nil { + return nil, fmt.Errorf("list notifications for project %s: %w", projectID, err) + } + return notificationsFromGen(rows) +} + +// ListNotificationsBySession returns newest notifications for a session. +func (s *Store) ListNotificationsBySession(ctx context.Context, sessionID domain.SessionID, limit int) ([]domain.Notification, error) { + if limit <= 0 { + limit = 50 + } + rows, err := s.qr.ListNotificationsBySession(ctx, gen.ListNotificationsBySessionParams{SessionID: sessionID, Limit: int64(limit)}) + if err != nil { + return nil, fmt.Errorf("list notifications for session %s: %w", sessionID, err) + } + return notificationsFromGen(rows) +} + +func notificationInsertParams(n domain.Notification) (gen.InsertNotificationParams, error) { + subject, data, actions, err := notificationJSON(n) + if err != nil { + return gen.InsertNotificationParams{}, err + } + return gen.InsertNotificationParams{ + ID: n.ID, ProjectID: n.ProjectID, SessionID: n.SessionID, + Type: n.Type, Priority: n.Priority, Status: n.Status, Source: n.Source, + DedupeKey: n.DedupeKey, Fingerprint: n.Fingerprint, + Title: n.Title, Summary: n.Summary, Body: n.Body, + SubjectJson: subject, DataJson: data, ActionsJson: actions, + ReadAt: nullPtrTime(n.ReadAt), DismissedAt: nullPtrTime(n.DismissedAt), ResolvedAt: nullPtrTime(n.ResolvedAt), + OccurredAt: n.OccurredAt, CreatedAt: n.CreatedAt, UpdatedAt: n.UpdatedAt, + }, nil +} + +func notificationUpdateParams(n domain.Notification) (gen.UpdateNotificationContentParams, error) { + subject, data, actions, err := notificationJSON(n) + if err != nil { + return gen.UpdateNotificationContentParams{}, err + } + return gen.UpdateNotificationContentParams{ + ID: n.ID, + Type: n.Type, Priority: n.Priority, Status: n.Status, Source: n.Source, + Fingerprint: n.Fingerprint, + Title: n.Title, Summary: n.Summary, Body: n.Body, + SubjectJson: subject, DataJson: data, ActionsJson: actions, + ReadAt: nullPtrTime(n.ReadAt), DismissedAt: nullPtrTime(n.DismissedAt), ResolvedAt: nullPtrTime(n.ResolvedAt), + OccurredAt: n.OccurredAt, UpdatedAt: n.UpdatedAt, + }, nil +} + +func notificationJSON(n domain.Notification) (subject, data, actions string, err error) { + n = n.Normalize() + subjectBytes, err := json.Marshal(n.Subject) + if err != nil { + return "", "", "", fmt.Errorf("marshal notification subject: %w", err) + } + dataBytes, err := json.Marshal(n.Data) + if err != nil { + return "", "", "", fmt.Errorf("marshal notification data: %w", err) + } + actionBytes, err := json.Marshal(n.Actions) + if err != nil { + return "", "", "", fmt.Errorf("marshal notification actions: %w", err) + } + return string(subjectBytes), string(dataBytes), string(actionBytes), nil +} + +func notificationsFromGen(rows []gen.Notification) ([]domain.Notification, error) { + out := make([]domain.Notification, 0, len(rows)) + for _, row := range rows { + n, err := notificationFromGen(row) + if err != nil { + return nil, err + } + out = append(out, n) + } + return out, nil +} + +func notificationFromGen(row gen.Notification) (domain.Notification, error) { + var subject domain.NotificationSubject + if err := json.Unmarshal([]byte(row.SubjectJson), &subject); err != nil { + return domain.Notification{}, fmt.Errorf("unmarshal notification subject: %w", err) + } + var data map[string]any + if err := json.Unmarshal([]byte(row.DataJson), &data); err != nil { + return domain.Notification{}, fmt.Errorf("unmarshal notification data: %w", err) + } + var actions []domain.NotificationAction + if err := json.Unmarshal([]byte(row.ActionsJson), &actions); err != nil { + return domain.Notification{}, fmt.Errorf("unmarshal notification actions: %w", err) + } + return domain.Notification{ + ID: row.ID, Type: row.Type, Priority: row.Priority, Status: row.Status, + ProjectID: row.ProjectID, SessionID: row.SessionID, + Source: row.Source, DedupeKey: row.DedupeKey, Fingerprint: row.Fingerprint, + Title: row.Title, Summary: row.Summary, Body: row.Body, + Subject: subject, Data: data, Actions: actions, + OccurredAt: row.OccurredAt, CreatedAt: row.CreatedAt, UpdatedAt: row.UpdatedAt, + ReadAt: ptrTime(row.ReadAt), DismissedAt: ptrTime(row.DismissedAt), ResolvedAt: ptrTime(row.ResolvedAt), + }.Normalize(), nil +} + +func nullPtrTime(t *time.Time) sql.NullTime { + if t == nil || t.IsZero() { + return sql.NullTime{} + } + return sql.NullTime{Time: *t, Valid: true} +} + +func ptrTime(t sql.NullTime) *time.Time { + if !t.Valid { + return nil + } + v := t.Time + return &v +} + +func placeholders(n int) string { + if n <= 0 { + return "" + } + parts := make([]string, n) + for i := range parts { + parts[i] = "?" + } + return strings.Join(parts, ",") +} + +func escapeLikePrefix(prefix string) string { + return strings.NewReplacer(`\`, `\\`, `%`, `\%`, `_`, `\_`).Replace(prefix) +} diff --git a/backend/internal/storage/sqlite/store/notification_store_test.go b/backend/internal/storage/sqlite/store/notification_store_test.go new file mode 100644 index 0000000..cbeff7b --- /dev/null +++ b/backend/internal/storage/sqlite/store/notification_store_test.go @@ -0,0 +1,156 @@ +package store_test + +import ( + "context" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +func seedNotificationSession(ctx context.Context, t *testing.T, s interface { + UpsertProject(context.Context, domain.ProjectRecord) error + CreateSession(context.Context, domain.SessionRecord) (domain.SessionRecord, error) +}) domain.SessionRecord { + t.Helper() + if err := s.UpsertProject(ctx, domain.ProjectRecord{ID: "mer", Path: "/repo/mer", RegisteredAt: time.Now().UTC()}); err != nil { + t.Fatal(err) + } + rec, err := s.CreateSession(ctx, sampleRecord("mer")) + if err != nil { + t.Fatal(err) + } + return rec +} + +func sampleNotification(id domain.NotificationID, project domain.ProjectID, session domain.SessionID, dedupe, fp string, at time.Time) domain.Notification { + return domain.Notification{ + ID: id, Type: domain.NotificationCIFailing, Priority: domain.NotificationWarning, Status: domain.NotificationUnread, + ProjectID: project, SessionID: session, Source: "test", DedupeKey: dedupe, Fingerprint: fp, + Title: "CI failed", Summary: "mer-1 has 1 failing check.", Body: "body", + Subject: domain.NotificationSubject{Kind: "pull_request", ProjectID: project, SessionID: session, PRURL: "https://github.com/o/r/pull/1", Label: "PR"}, + Data: map[string]any{"pr": map[string]any{"url": "https://github.com/o/r/pull/1"}, "ci": map[string]any{"failedCount": 1}}, + Actions: []domain.NotificationAction{{ID: "open_session", Label: "Open session", Kind: "route", Route: "session", Primary: true}}, + OccurredAt: at, CreatedAt: at, UpdatedAt: at, + }.Normalize() +} + +func TestNotificationStoreInsertReadListAndJSONRoundTrip(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + rec := seedNotificationSession(ctx, t, s) + now := time.Now().UTC().Truncate(time.Second) + n := sampleNotification("n1", rec.ProjectID, rec.ID, "ci:1", "fp1", now) + stored, changed, err := s.UpsertNotification(ctx, n) + if err != nil || !changed { + t.Fatalf("upsert changed=%v err=%v", changed, err) + } + got, ok, err := s.GetNotification(ctx, stored.ID) + if err != nil || !ok { + t.Fatalf("get ok=%v err=%v", ok, err) + } + if got.Subject.PRURL != n.Subject.PRURL || got.Actions[0].ID != "open_session" || got.Data["pr"] == nil { + t.Fatalf("json round trip mismatch: %+v", got) + } + second := sampleNotification("n2", rec.ProjectID, rec.ID, "ci:2", "fp2", now.Add(time.Minute)) + if _, _, err := s.UpsertNotification(ctx, second); err != nil { + t.Fatal(err) + } + byProject, err := s.ListNotificationsByProject(ctx, rec.ProjectID, 10) + if err != nil || len(byProject) != 2 || byProject[0].ID != "n2" { + t.Fatalf("list by project = %+v err=%v", byProject, err) + } + bySession, err := s.ListNotificationsBySession(ctx, rec.ID, 10) + if err != nil || len(bySession) != 2 || bySession[0].ID != "n2" { + t.Fatalf("list by session = %+v err=%v", bySession, err) + } +} + +func TestNotificationStoreDedupeAndUpdateSameRow(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + rec := seedNotificationSession(ctx, t, s) + now := time.Now().UTC().Truncate(time.Second) + n := sampleNotification("n1", rec.ProjectID, rec.ID, "ci:1", "fp1", now) + first, changed, err := s.UpsertNotification(ctx, n) + if err != nil || !changed { + t.Fatalf("first changed=%v err=%v", changed, err) + } + same := n + same.ID = "other" + again, changed, err := s.UpsertNotification(ctx, same) + if err != nil || changed || again.ID != first.ID { + t.Fatalf("same fingerprint got changed=%v id=%s err=%v", changed, again.ID, err) + } + updated := n + updated.ID = "other2" + updated.Fingerprint = "fp2" + updated.Summary = "mer-1 has 2 failing checks." + updated.UpdatedAt = now.Add(time.Minute) + got, changed, err := s.UpsertNotification(ctx, updated) + if err != nil || !changed || got.ID != first.ID || got.Summary != updated.Summary { + t.Fatalf("changed fingerprint got changed=%v notification=%+v err=%v", changed, got, err) + } +} + +func TestNotificationStoreResolve(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + rec := seedNotificationSession(ctx, t, s) + now := time.Now().UTC().Truncate(time.Second) + n := sampleNotification("n1", rec.ProjectID, rec.ID, "ci:1", "fp1", now) + if _, _, err := s.UpsertNotification(ctx, n); err != nil { + t.Fatal(err) + } + resolvedAt := now.Add(time.Hour) + count, err := s.ResolveNotifications(ctx, domain.NotificationResolveFilter{ProjectID: rec.ProjectID, SessionID: &rec.ID, PRURL: n.Subject.PRURL, Types: []domain.NotificationType{domain.NotificationCIFailing}}, resolvedAt) + if err != nil || count != 1 { + t.Fatalf("resolve count=%d err=%v", count, err) + } + got, _, err := s.GetNotification(ctx, "n1") + if err != nil { + t.Fatal(err) + } + if got.Status != domain.NotificationResolved || got.ResolvedAt == nil || !got.ResolvedAt.Equal(resolvedAt) { + t.Fatalf("resolved notification = %+v", got) + } +} + +func TestNotificationStoreResolveEscapesDedupeKeyPrefixWildcards(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + rec := seedNotificationSession(ctx, t, s) + now := time.Now().UTC().Truncate(time.Second) + + literalPercent := sampleNotification("n1", rec.ProjectID, rec.ID, "ci:%:build:c1", "fp1", now) + if _, _, err := s.UpsertNotification(ctx, literalPercent); err != nil { + t.Fatal(err) + } + plainCI := sampleNotification("n2", rec.ProjectID, rec.ID, "ci:abc:build:c1", "fp2", now) + if _, _, err := s.UpsertNotification(ctx, plainCI); err != nil { + t.Fatal(err) + } + + resolvedAt := now.Add(time.Hour) + count, err := s.ResolveNotifications(ctx, domain.NotificationResolveFilter{ + ProjectID: rec.ProjectID, + DedupeKeyPrefixes: []string{"ci:%:"}, + }, resolvedAt) + if err != nil || count != 1 { + t.Fatalf("resolve count=%d err=%v", count, err) + } + gotLiteral, _, err := s.GetNotification(ctx, "n1") + if err != nil { + t.Fatal(err) + } + gotPlain, _, err := s.GetNotification(ctx, "n2") + if err != nil { + t.Fatal(err) + } + if gotLiteral.Status != domain.NotificationResolved { + t.Fatalf("literal-percent notification should resolve, got %+v", gotLiteral) + } + if gotPlain.Status != domain.NotificationUnread { + t.Fatalf("plain CI notification should remain unread; prefix wildcard was not escaped: %+v", gotPlain) + } +} diff --git a/backend/sqlc.yaml b/backend/sqlc.yaml index 81707b2..5fc1bf5 100644 --- a/backend/sqlc.yaml +++ b/backend/sqlc.yaml @@ -30,6 +30,31 @@ sql: go_type: import: "github.com/aoagents/agent-orchestrator/backend/internal/cdc" type: "EventType" + + - column: "notifications.id" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "NotificationID" + - column: "notifications.project_id" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "ProjectID" + - column: "notifications.session_id" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "SessionID" + - column: "notifications.type" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "NotificationType" + - column: "notifications.priority" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "NotificationPriority" + - column: "notifications.status" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "NotificationStatus" - column: "pr.session_id" go_type: import: "github.com/aoagents/agent-orchestrator/backend/internal/domain"