diff --git a/CHANGELOG.md b/CHANGELOG.md index de2ae59..a7cf5bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,24 @@ All notable changes to this project are documented in this file. - Dashboard empty state with "Get Started" prompt when no providers are configured. - Dynamic version display in sidebar (uses build-time `version` variable instead of hardcoded value). +## 0.3.6 - 2026-03-04 + +### Added + +- Recurring job scheduler in `serve` mode (15s poll interval) with SQLite-backed `jobs` execution for `sync` and `validate`. +- Jobs management surfaces: + - CLI: `jobs list|add|pause|resume|run-now|delete` + - UI: `/jobs` + - API: `/api/jobs` CRUD plus pause/resume/run-now routes. +- Shared `internal/jobsvc` package for cron parsing/validation, next-run calculation, and reusable job execution logic. +- Store job APIs for `GetJob`, `DeleteJob`, and `ListDueJobs`, plus index migration `idx_jobs_status_next_run`. + +### Changed + +- `config set KEY VALUE` now persists typed YAML values to disk and reloads config in-process. +- Validation jobs now persist invalid files into `failed_files` for follow-up retry/triage workflows. +- Documentation refreshed for scheduler/job behavior, new routes/commands, and persistent config editing. + ## 0.3.5 - 2026-02-24 ### Changed diff --git a/README.md b/README.md index 74c9b81..10f3653 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,8 @@ Top-level sections: For full details, see [docs/configuration.md](docs/configuration.md). +When `schedule.enabled` is `true`, recurring jobs are executed by the scheduler while `airgap serve` is running. + ## Provider Types Provider configs are stored in SQLite (`provider_configs`). YAML provider entries are used for first-run seeding when the table is empty. @@ -95,8 +97,9 @@ Supported as config/target types: - `serve`: web UI + API server - `providers list`: list provider configs from SQLite - `registry push`: push mirrored container images to a registry target +- `jobs list|add|pause|resume|run-now|delete`: manage scheduled sync/validate jobs - `config show`: print loaded config -- `config set`: currently a stub (prints intended change; does not persist) +- `config set`: persist a typed config value to YAML (`KEY` dot-path + YAML `VALUE`) ## Web UI and API @@ -106,6 +109,7 @@ Main pages: - `/providers/{name}` - `/transfer` - `/ocp/clients` +- `/jobs` API routes are documented in [docs/http-api.md](docs/http-api.md). diff --git a/cmd/airgap/config_cmd.go b/cmd/airgap/config_cmd.go index 338d765..8d784ae 100644 --- a/cmd/airgap/config_cmd.go +++ b/cmd/airgap/config_cmd.go @@ -3,7 +3,11 @@ package main import ( "fmt" "log/slog" + "os" + "path/filepath" + "strings" + "github.com/BadgerOps/airgap/internal/config" "github.com/spf13/cobra" "gopkg.in/yaml.v3" ) @@ -95,7 +99,125 @@ func configSetRun(cmd *cobra.Command, args []string) error { log.Info("set configuration", "key", key, "value", value) - fmt.Printf("STUB: Would set config key '%s' to '%s'\n", key, value) + targetPath, err := resolveConfigWritePath() + if err != nil { + return err + } + + rawCfg, err := loadRawConfig(targetPath) + if err != nil { + return err + } + + parsedValue, err := parseConfigValue(value) + if err != nil { + return err + } + + if err := setDotKey(rawCfg, key, parsedValue); err != nil { + return err + } + + data, err := yaml.Marshal(rawCfg) + if err != nil { + return fmt.Errorf("failed to marshal updated config: %w", err) + } + + if err := os.MkdirAll(filepath.Dir(targetPath), 0o755); err != nil { + return fmt.Errorf("failed to create config directory: %w", err) + } + if err := os.WriteFile(targetPath, data, 0o644); err != nil { + return fmt.Errorf("failed to write config file: %w", err) + } + + loaded, err := config.Load(targetPath) + if err != nil { + return fmt.Errorf("failed to reload updated config: %w", err) + } + globalCfg = loaded + + fmt.Printf("Updated %s in %s\n", key, targetPath) + + return nil +} +func resolveConfigWritePath() (string, error) { + if strings.TrimSpace(cfgPath) != "" { + return cfgPath, nil + } + + found, err := config.FindConfigFile() + if err == nil { + return found, nil + } + + // Fallback when no discovered config exists yet. + return "airgap.yaml", nil +} + +func loadRawConfig(path string) (map[string]interface{}, error) { + raw := make(map[string]interface{}) + + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return raw, nil + } + return nil, fmt.Errorf("failed to read config file: %w", err) + } + if len(strings.TrimSpace(string(data))) == 0 { + return raw, nil + } + + if err := yaml.Unmarshal(data, &raw); err != nil { + return nil, fmt.Errorf("failed to parse existing config: %w", err) + } + if raw == nil { + raw = make(map[string]interface{}) + } + return raw, nil +} + +func parseConfigValue(raw string) (interface{}, error) { + var value interface{} + if err := yaml.Unmarshal([]byte(raw), &value); err != nil { + return nil, fmt.Errorf("invalid value %q: %w", raw, err) + } + return value, nil +} + +func setDotKey(cfg map[string]interface{}, key string, value interface{}) error { + parts := strings.Split(key, ".") + if len(parts) == 0 { + return fmt.Errorf("config key is required") + } + + current := cfg + for i := 0; i < len(parts)-1; i++ { + part := strings.TrimSpace(parts[i]) + if part == "" { + return fmt.Errorf("invalid key %q: empty segment", key) + } + + existing, ok := current[part] + if !ok { + next := make(map[string]interface{}) + current[part] = next + current = next + continue + } + + next, ok := existing.(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid key %q: segment %q is not a map", key, part) + } + current = next + } + + last := strings.TrimSpace(parts[len(parts)-1]) + if last == "" { + return fmt.Errorf("invalid key %q: empty segment", key) + } + current[last] = value return nil } diff --git a/cmd/airgap/config_cmd_test.go b/cmd/airgap/config_cmd_test.go new file mode 100644 index 0000000..f6baa08 --- /dev/null +++ b/cmd/airgap/config_cmd_test.go @@ -0,0 +1,125 @@ +package main + +import ( + "os" + "path/filepath" + "testing" + + "github.com/BadgerOps/airgap/internal/config" + "gopkg.in/yaml.v3" +) + +func TestConfigSetRunPersistsTypedValues(t *testing.T) { + tmp := t.TempDir() + prevWD, err := os.Getwd() + if err != nil { + t.Fatalf("Getwd failed: %v", err) + } + if err := os.Chdir(tmp); err != nil { + t.Fatalf("Chdir failed: %v", err) + } + t.Cleanup(func() { + _ = os.Chdir(prevWD) + }) + + prevCfgPath := cfgPath + prevGlobalCfg := globalCfg + t.Cleanup(func() { + cfgPath = prevCfgPath + globalCfg = prevGlobalCfg + }) + + cfgPath = filepath.Join(tmp, "airgap.yaml") + globalCfg = config.DefaultConfig() + + if err := configSetRun(nil, []string{"server.listen", "127.0.0.1:9000"}); err != nil { + t.Fatalf("configSetRun(server.listen) failed: %v", err) + } + if err := configSetRun(nil, []string{"providers.custom.sources", `["https://example.com/a","https://example.com/b"]`}); err != nil { + t.Fatalf("configSetRun(providers.custom.sources) failed: %v", err) + } + if err := configSetRun(nil, []string{"schedule.enabled", "false"}); err != nil { + t.Fatalf("configSetRun(schedule.enabled) failed: %v", err) + } + + data, err := os.ReadFile(cfgPath) + if err != nil { + t.Fatalf("reading config file failed: %v", err) + } + raw := map[string]interface{}{} + if err := yaml.Unmarshal(data, &raw); err != nil { + t.Fatalf("unmarshal updated config failed: %v", err) + } + + serverRaw, ok := raw["server"].(map[string]interface{}) + if !ok { + t.Fatalf("server map missing or wrong type: %#v", raw["server"]) + } + if got := serverRaw["listen"]; got != "127.0.0.1:9000" { + t.Fatalf("server.listen mismatch: got %#v", got) + } + + scheduleRaw, ok := raw["schedule"].(map[string]interface{}) + if !ok { + t.Fatalf("schedule map missing or wrong type: %#v", raw["schedule"]) + } + if got, ok := scheduleRaw["enabled"].(bool); !ok || got { + t.Fatalf("schedule.enabled mismatch: got %#v", scheduleRaw["enabled"]) + } + + if globalCfg.Server.Listen != "127.0.0.1:9000" { + t.Fatalf("globalCfg not reloaded: got %q", globalCfg.Server.Listen) + } +} + +func TestConfigSetRunCreatesDefaultConfigFile(t *testing.T) { + tmp := t.TempDir() + prevWD, err := os.Getwd() + if err != nil { + t.Fatalf("Getwd failed: %v", err) + } + if err := os.Chdir(tmp); err != nil { + t.Fatalf("Chdir failed: %v", err) + } + t.Cleanup(func() { + _ = os.Chdir(prevWD) + }) + + prevCfgPath := cfgPath + prevGlobalCfg := globalCfg + t.Cleanup(func() { + cfgPath = prevCfgPath + globalCfg = prevGlobalCfg + }) + + cfgPath = "" + globalCfg = config.DefaultConfig() + + if err := configSetRun(nil, []string{"server.listen", "127.0.0.1:8081"}); err != nil { + t.Fatalf("configSetRun failed: %v", err) + } + + target := filepath.Join(tmp, "airgap.yaml") + if _, err := os.Stat(target); err != nil { + t.Fatalf("expected fallback config file at %s: %v", target, err) + } +} + +func TestConfigSetRunValidationErrors(t *testing.T) { + prevCfgPath := cfgPath + prevGlobalCfg := globalCfg + t.Cleanup(func() { + cfgPath = prevCfgPath + globalCfg = prevGlobalCfg + }) + + cfgPath = filepath.Join(t.TempDir(), "airgap.yaml") + globalCfg = config.DefaultConfig() + + if err := configSetRun(nil, []string{"server..listen", "127.0.0.1:9000"}); err == nil { + t.Fatal("expected error for invalid key with empty segment") + } + if err := configSetRun(nil, []string{"server.listen", "["}); err == nil { + t.Fatal("expected error for invalid YAML value") + } +} diff --git a/cmd/airgap/jobs.go b/cmd/airgap/jobs.go new file mode 100644 index 0000000..7f26d16 --- /dev/null +++ b/cmd/airgap/jobs.go @@ -0,0 +1,337 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "strconv" + "strings" + "time" + + "github.com/BadgerOps/airgap/internal/jobsvc" + "github.com/BadgerOps/airgap/internal/store" + "github.com/spf13/cobra" +) + +var ( + jobsListStatus string + + jobsAddType string + jobsAddProvider string + jobsAddCron string +) + +func newJobsCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "jobs", + Short: "Manage scheduled jobs", + Long: `Manage scheduled sync and validation jobs. +Jobs are stored in SQLite and automatically executed when the server scheduler is enabled.`, + } + + cmd.AddCommand( + newJobsListCmd(), + newJobsAddCmd(), + newJobsPauseCmd(), + newJobsResumeCmd(), + newJobsRunNowCmd(), + newJobsDeleteCmd(), + ) + return cmd +} + +func newJobsListCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List configured jobs", + RunE: jobsListRun, + } + cmd.Flags().StringVar(&jobsListStatus, "status", "", "optional status filter") + return cmd +} + +func jobsListRun(cmd *cobra.Command, args []string) error { + if globalStore == nil { + return fmt.Errorf("store not initialized") + } + + jobs, err := globalStore.ListJobs(strings.TrimSpace(jobsListStatus), 0) + if err != nil { + return fmt.Errorf("listing jobs: %w", err) + } + if len(jobs) == 0 { + fmt.Println("No jobs configured.") + return nil + } + + fmt.Println("Configured Jobs") + fmt.Println("===============") + fmt.Println("") + fmt.Printf("%-5s %-10s %-20s %-17s %-10s %-19s %-19s\n", "ID", "Type", "Provider", "Cron", "Status", "Last Run", "Next Run") + fmt.Println(strings.Repeat("-", 110)) + for _, job := range jobs { + fmt.Printf( + "%-5d %-10s %-20s %-17s %-10s %-19s %-19s\n", + job.ID, + job.Type, + jobsvc.DisplayProviderName(job.Provider), + job.CronExpr, + job.Status, + formatJobTime(job.LastRun), + formatJobTime(job.NextRun), + ) + } + fmt.Println("") + return nil +} + +func newJobsAddCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "add", + Short: "Add a scheduled job", + RunE: jobsAddRun, + } + cmd.Flags().StringVar(&jobsAddType, "type", "sync", "job type: sync|validate") + cmd.Flags().StringVar(&jobsAddProvider, "provider", "all", `provider name or "all"`) + cmd.Flags().StringVar(&jobsAddCron, "cron", "", "cron expression (5 fields)") + return cmd +} + +func jobsAddRun(cmd *cobra.Command, args []string) error { + if globalStore == nil { + return fmt.Errorf("store not initialized") + } + if globalCfg == nil { + return fmt.Errorf("config not loaded") + } + + if err := jobsvc.ValidateJobType(jobsAddType); err != nil { + return err + } + + providerName := jobsvc.NormalizeProviderName(jobsAddProvider) + if err := jobsvc.ValidateProviderName(globalStore, providerName); err != nil { + return err + } + + cronExpr := strings.TrimSpace(jobsAddCron) + if cronExpr == "" { + cronExpr = strings.TrimSpace(globalCfg.Schedule.DefaultCron) + } + if cronExpr == "" { + return fmt.Errorf("cron expression is required") + } + if err := jobsvc.ValidateCronExpr(cronExpr); err != nil { + return err + } + + now := time.Now() + nextRun, err := jobsvc.NextRun(cronExpr, now) + if err != nil { + return err + } + + job := &store.Job{ + Type: jobsvc.NormalizeJobType(jobsAddType), + Provider: providerName, + CronExpr: cronExpr, + Status: "scheduled", + NextRun: nextRun, + CreatedAt: now, + UpdatedAt: now, + } + if err := globalStore.CreateJob(job); err != nil { + return fmt.Errorf("creating job: %w", err) + } + + fmt.Printf( + "Created job %d (%s, provider=%s, cron=%s, next=%s)\n", + job.ID, + job.Type, + jobsvc.DisplayProviderName(job.Provider), + job.CronExpr, + formatJobTime(job.NextRun), + ) + return nil +} + +func newJobsPauseCmd() *cobra.Command { + return &cobra.Command{ + Use: "pause JOB_ID", + Short: "Pause a scheduled job", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + jobID, err := parseJobIDArg(args[0]) + if err != nil { + return err + } + job, err := globalStore.GetJob(jobID) + if err != nil { + return err + } + if job.Status == "running" { + return fmt.Errorf("cannot pause a running job") + } + job.Status = "paused" + job.UpdatedAt = time.Now() + if err := globalStore.UpdateJob(job); err != nil { + return err + } + fmt.Printf("Paused job %d\n", job.ID) + return nil + }, + } +} + +func newJobsResumeCmd() *cobra.Command { + return &cobra.Command{ + Use: "resume JOB_ID", + Short: "Resume a paused job", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + jobID, err := parseJobIDArg(args[0]) + if err != nil { + return err + } + job, err := globalStore.GetJob(jobID) + if err != nil { + return err + } + if job.Status == "running" { + return fmt.Errorf("cannot resume a running job") + } + nextRun, err := jobsvc.NextRun(job.CronExpr, time.Now()) + if err != nil { + return err + } + job.Status = "scheduled" + job.NextRun = nextRun + job.UpdatedAt = time.Now() + if err := globalStore.UpdateJob(job); err != nil { + return err + } + fmt.Printf("Resumed job %d (next run %s)\n", job.ID, formatJobTime(job.NextRun)) + return nil + }, + } +} + +func newJobsRunNowCmd() *cobra.Command { + return &cobra.Command{ + Use: "run-now JOB_ID", + Short: "Run a job immediately", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + jobID, err := parseJobIDArg(args[0]) + if err != nil { + return err + } + job, err := globalStore.GetJob(jobID) + if err != nil { + return err + } + if job.Status == "running" { + return fmt.Errorf("job is already running") + } + + if err := executeJobNow(context.Background(), job); err != nil { + return err + } + + job, _ = globalStore.GetJob(jobID) + if job != nil { + fmt.Printf("Executed job %d: status=%s next=%s\n", job.ID, job.Status, formatJobTime(job.NextRun)) + } + return nil + }, + } +} + +func newJobsDeleteCmd() *cobra.Command { + return &cobra.Command{ + Use: "delete JOB_ID", + Short: "Delete a job", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + jobID, err := parseJobIDArg(args[0]) + if err != nil { + return err + } + job, err := globalStore.GetJob(jobID) + if err != nil { + return err + } + if job.Status == "running" { + return fmt.Errorf("cannot delete a running job") + } + if err := globalStore.DeleteJob(jobID); err != nil { + return err + } + fmt.Printf("Deleted job %d\n", jobID) + return nil + }, + } +} + +func executeJobNow(ctx context.Context, job *store.Job) error { + if globalStore == nil { + return fmt.Errorf("store not initialized") + } + if globalEngine == nil { + return fmt.Errorf("engine not initialized") + } + if job == nil { + return fmt.Errorf("job is required") + } + + prevStatus := job.Status + job.Status = "running" + job.UpdatedAt = time.Now() + if err := globalStore.UpdateJob(job); err != nil { + return err + } + + log := logger + if log == nil { + log = slog.Default() + } + _, runErr := jobsvc.ExecuteJob(ctx, globalEngine, globalStore, *job, log) + finishedAt := time.Now() + + if prevStatus == "paused" { + job.Status = "paused" + } else if runErr != nil { + job.Status = "failed" + } else { + job.Status = "completed" + } + job.LastRun = finishedAt + + nextRun, nextErr := jobsvc.NextRun(job.CronExpr, finishedAt) + if nextErr != nil { + job.NextRun = finishedAt.Add(24 * time.Hour) + } else { + job.NextRun = nextRun + } + job.UpdatedAt = finishedAt + + if err := globalStore.UpdateJob(job); err != nil { + return err + } + return runErr +} + +func parseJobIDArg(raw string) (int64, error) { + id, err := strconv.ParseInt(strings.TrimSpace(raw), 10, 64) + if err != nil || id <= 0 { + return 0, fmt.Errorf("invalid job id %q", raw) + } + return id, nil +} + +func formatJobTime(t time.Time) string { + if t.IsZero() { + return "-" + } + return t.Format("2006-01-02 15:04:05") +} diff --git a/cmd/airgap/jobs_test.go b/cmd/airgap/jobs_test.go new file mode 100644 index 0000000..524c4ac --- /dev/null +++ b/cmd/airgap/jobs_test.go @@ -0,0 +1,200 @@ +package main + +import ( + "context" + "io" + "log/slog" + "strconv" + "testing" + "time" + + "github.com/BadgerOps/airgap/internal/config" + "github.com/BadgerOps/airgap/internal/download" + "github.com/BadgerOps/airgap/internal/engine" + "github.com/BadgerOps/airgap/internal/provider" +) + +func TestJobsLifecycleCommands(t *testing.T) { + st := newTestStore(t) + mustCreateProviderConfig(t, st, "provider-a", "epel", true) + + log := slog.New(slog.NewTextHandler(io.Discard, nil)) + cfg := config.DefaultConfig() + cfg.Server.DataDir = t.TempDir() + cfg.Providers = map[string]config.ProviderConfig{ + "provider-a": {"enabled": true}, + } + + reg := provider.NewRegistry() + reg.RegisterAs("provider-a", &jobsTestProvider{name: "provider-a"}) + eng := engine.NewSyncManager(reg, st, download.NewClient(log), cfg, log) + + origStore := globalStore + origRegistry := globalRegistry + origEngine := globalEngine + origCfg := globalCfg + origLogger := logger + t.Cleanup(func() { + globalStore = origStore + globalRegistry = origRegistry + globalEngine = origEngine + globalCfg = origCfg + logger = origLogger + }) + + globalStore = st + globalRegistry = reg + globalEngine = eng + globalCfg = cfg + logger = log + + jobsAddType = "sync" + jobsAddProvider = "provider-a" + jobsAddCron = "*/5 * * * *" + if err := jobsAddRun(nil, nil); err != nil { + t.Fatalf("jobsAddRun failed: %v", err) + } + + jobs, err := st.ListJobs("", 0) + if err != nil { + t.Fatalf("ListJobs failed: %v", err) + } + if len(jobs) != 1 { + t.Fatalf("expected one job, got %d", len(jobs)) + } + jobID := jobs[0].ID + + pauseCmd := newJobsPauseCmd() + if err := pauseCmd.RunE(pauseCmd, []string{strconv.FormatInt(jobID, 10)}); err != nil { + t.Fatalf("pause failed: %v", err) + } + job, err := st.GetJob(jobID) + if err != nil { + t.Fatalf("GetJob failed after pause: %v", err) + } + if job.Status != "paused" { + t.Fatalf("expected paused status, got %q", job.Status) + } + + resumeCmd := newJobsResumeCmd() + if err := resumeCmd.RunE(resumeCmd, []string{strconv.FormatInt(jobID, 10)}); err != nil { + t.Fatalf("resume failed: %v", err) + } + job, err = st.GetJob(jobID) + if err != nil { + t.Fatalf("GetJob failed after resume: %v", err) + } + if job.Status != "scheduled" { + t.Fatalf("expected scheduled status, got %q", job.Status) + } + + runNowCmd := newJobsRunNowCmd() + if err := runNowCmd.RunE(runNowCmd, []string{strconv.FormatInt(jobID, 10)}); err != nil { + t.Fatalf("run-now failed: %v", err) + } + job, err = st.GetJob(jobID) + if err != nil { + t.Fatalf("GetJob failed after run-now: %v", err) + } + if job.Status != "completed" { + t.Fatalf("expected completed status, got %q", job.Status) + } + if job.LastRun.IsZero() { + t.Fatal("expected last_run to be set") + } + + deleteCmd := newJobsDeleteCmd() + if err := deleteCmd.RunE(deleteCmd, []string{strconv.FormatInt(jobID, 10)}); err != nil { + t.Fatalf("delete failed: %v", err) + } + if _, err := st.GetJob(jobID); err == nil { + t.Fatal("expected deleted job to be missing") + } +} + +func TestJobsCommandsInvalidID(t *testing.T) { + pauseCmd := newJobsPauseCmd() + if err := pauseCmd.RunE(pauseCmd, []string{"abc"}); err == nil { + t.Fatal("expected invalid id error") + } +} + +func TestJobsAddValidationErrors(t *testing.T) { + st := newTestStore(t) + log := slog.New(slog.NewTextHandler(io.Discard, nil)) + cfg := config.DefaultConfig() + cfg.Server.DataDir = t.TempDir() + cfg.Providers = map[string]config.ProviderConfig{} + + reg := provider.NewRegistry() + eng := engine.NewSyncManager(reg, st, download.NewClient(log), cfg, log) + + origStore := globalStore + origRegistry := globalRegistry + origEngine := globalEngine + origCfg := globalCfg + origLogger := logger + t.Cleanup(func() { + globalStore = origStore + globalRegistry = origRegistry + globalEngine = origEngine + globalCfg = origCfg + logger = origLogger + }) + + globalStore = st + globalRegistry = reg + globalEngine = eng + globalCfg = cfg + logger = log + + jobsAddType = "sync" + jobsAddProvider = "missing-provider" + jobsAddCron = "*/5 * * * *" + if err := jobsAddRun(nil, nil); err == nil { + t.Fatal("expected unknown provider error") + } + + mustCreateProviderConfig(t, st, "provider-a", "epel", true) + jobsAddProvider = "provider-a" + jobsAddCron = "invalid cron" + if err := jobsAddRun(nil, nil); err == nil { + t.Fatal("expected invalid cron error") + } +} + +type jobsTestProvider struct { + name string +} + +func (p *jobsTestProvider) Name() string { return p.name } + +func (p *jobsTestProvider) SetName(name string) { p.name = name } + +func (p *jobsTestProvider) Type() string { return "test" } + +func (p *jobsTestProvider) Configure(cfg provider.ProviderConfig) error { return nil } + +func (p *jobsTestProvider) Plan(ctx context.Context) (*provider.SyncPlan, error) { + return &provider.SyncPlan{ + Provider: p.name, + Actions: nil, + TotalSize: 0, + TotalFiles: 0, + Timestamp: time.Now(), + }, nil +} + +func (p *jobsTestProvider) Sync(ctx context.Context, plan *provider.SyncPlan, opts provider.SyncOptions) (*provider.SyncReport, error) { + return &provider.SyncReport{Provider: p.name, StartTime: time.Now(), EndTime: time.Now()}, nil +} + +func (p *jobsTestProvider) Validate(ctx context.Context) (*provider.ValidationReport, error) { + return &provider.ValidationReport{ + Provider: p.name, + TotalFiles: 0, + ValidFiles: 0, + InvalidFiles: nil, + Timestamp: time.Now(), + }, nil +} diff --git a/cmd/airgap/root.go b/cmd/airgap/root.go index 02b25e8..7b62d60 100644 --- a/cmd/airgap/root.go +++ b/cmd/airgap/root.go @@ -251,6 +251,7 @@ registries, and custom file sources.`, newStatusCmd(), newRegistryCmd(), newProvidersCmd(), + newJobsCmd(), newExportCmd(), newImportCmd(), newConfigCmd(), diff --git a/docs/architecture.md b/docs/architecture.md index aff9683..7e1fae5 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -10,6 +10,7 @@ Core packages: - `cmd/airgap`: CLI command wiring - `internal/engine`: sync/export/import orchestration +- `internal/jobsvc`: cron parsing and shared job execution logic - `internal/provider/*`: provider implementations - `internal/store`: SQLite models, migrations, CRUD - `internal/server`: web UI and API handlers @@ -23,6 +24,7 @@ Core packages: 4. Load provider configs from DB. 5. Instantiate enabled providers and register them. 6. Start CLI command execution (or HTTP server for `serve`). +7. In `serve` mode, start scheduler loop if `schedule.enabled=true`. ## Sync Flow @@ -36,6 +38,15 @@ Notes: - Sync/push operations are serialized at server level (`syncRunning` guard). - Progress is tracked through an in-memory `SyncTracker` used by UI polling/SSE paths. +## Job Scheduler Flow + +1. Server loop polls due jobs every 15 seconds from `jobs`. +2. Due jobs are selected by `next_run <= now` and non-paused/non-running status. +3. Scheduler acquires the same operation lock used by sync/push/validate API actions. +4. Job is marked `running`, then executed via `internal/jobsvc`. +5. Job transitions to `completed` or `failed`, updates `last_run`, and advances `next_run`. +6. Paused jobs never auto-run; run-now can execute them once and keep paused state. + ## Transfer Flow ### Export @@ -72,6 +83,7 @@ SQLite tables include: - `sync_runs` - `file_records` - `failed_files` +- `jobs` - `transfers` - `transfer_archives` - `provider_configs` @@ -81,8 +93,9 @@ Migrations are managed in `internal/store/migrations.go`. ## HTTP Surface Server routes include: -- UI pages (`/dashboard`, `/providers`, `/transfer`, `/ocp/clients`) +- UI pages (`/dashboard`, `/providers`, `/jobs`, `/transfer`, `/ocp/clients`) - Sync/status/provider APIs +- Job CRUD and run-control APIs - Provider config CRUD APIs - Transfer APIs - Mirror discovery/speed-test APIs diff --git a/docs/configuration.md b/docs/configuration.md index 649e017..638afef 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -30,6 +30,15 @@ schedule: providers: {} ``` +## Schedule Settings + +- `schedule.enabled`: enables the recurring job scheduler while `airgap serve` is running. +- `schedule.default_cron`: default cron used by `airgap jobs add --cron` when `--cron` is omitted. + +Notes: +- Scheduler execution is serve-only; it does not run from non-server CLI commands. +- Cron format is standard 5-field (`minute hour day month day-of-week`). + ## Provider Config Storage Model At runtime, provider configs are read from SQLite (`provider_configs`), not directly from YAML. @@ -62,7 +71,14 @@ See [configs/airgap.example.yaml](../configs/airgap.example.yaml). ## CLI Config Commands - `airgap config show`: prints effective loaded config -- `airgap config set KEY VALUE`: currently stubbed (does not persist changes) +- `airgap config set KEY VALUE`: persists updates to YAML (typed value parsing) + +`config set` write target resolution: +- explicit `--config` path (if provided) +- discovered config path from file discovery order +- fallback `./airgap.yaml` when no config file exists + +`VALUE` is parsed as YAML, so booleans/numbers/lists/maps are stored as typed values. ## Global CLI Flags diff --git a/docs/http-api.md b/docs/http-api.md index 799b2c3..e17a199 100644 --- a/docs/http-api.md +++ b/docs/http-api.md @@ -9,6 +9,7 @@ Routes are defined in `internal/server/server.go`. - `GET /providers` - `GET /providers/{name}` - `GET /sync` +- `GET /jobs` - `GET /transfer` - `GET /ocp/clients` - `GET /static/*` (embedded static assets) @@ -31,6 +32,16 @@ Routes are defined in `internal/server/server.go`. - `POST /api/sync/failures/resolve` - bulk resolve failures - `POST /api/sync/retry` - retry failed downloads +## Jobs API + +- `GET /api/jobs` - list scheduled jobs +- `POST /api/jobs` - create a job (`type`, `provider`, `cron_expr`) +- `PUT /api/jobs/{id}` - update a job +- `DELETE /api/jobs/{id}` - delete a job +- `POST /api/jobs/{id}/pause` - pause job execution +- `POST /api/jobs/{id}/resume` - resume paused job +- `POST /api/jobs/{id}/run` - run a job immediately + ## Provider Config Management - `GET /api/providers/config` @@ -67,3 +78,4 @@ Routes are defined in `internal/server/server.go`. - Several endpoints support HTMX form requests in addition to JSON. - Long-running sync/push operations are asynchronous and update shared progress state. +- Job scheduler runs in `serve` mode when `schedule.enabled=true`; scheduler poll interval is 15 seconds. diff --git a/internal/jobsvc/cron.go b/internal/jobsvc/cron.go new file mode 100644 index 0000000..6f073c8 --- /dev/null +++ b/internal/jobsvc/cron.go @@ -0,0 +1,211 @@ +package jobsvc + +import ( + "fmt" + "strconv" + "strings" + "time" +) + +const maxNextRunLookaheadMinutes = 60 * 24 * 366 * 5 + +type cronField struct { + any bool + values map[int]struct{} +} + +func (f cronField) matches(v int) bool { + if f.any { + return true + } + _, ok := f.values[v] + return ok +} + +type cronSchedule struct { + minute cronField + hour cronField + dayOfMonth cronField + month cronField + dayOfWeek cronField +} + +func (s *cronSchedule) matches(t time.Time) bool { + minuteMatch := s.minute.matches(t.Minute()) + hourMatch := s.hour.matches(t.Hour()) + monthMatch := s.month.matches(int(t.Month())) + domMatch := s.dayOfMonth.matches(t.Day()) + dowMatch := s.dayOfWeek.matches(int(t.Weekday())) + + dayMatch := false + if !s.dayOfMonth.any && !s.dayOfWeek.any { + // Standard cron semantics: when both DOM and DOW are restricted, + // either field matching is enough. + dayMatch = domMatch || dowMatch + } else { + // If one is wildcard, the other controls matching. + dayMatch = domMatch && dowMatch + } + + return minuteMatch && hourMatch && monthMatch && dayMatch +} + +// ValidateCronExpr validates a 5-field cron expression. +func ValidateCronExpr(expr string) error { + _, err := parseCron(expr) + return err +} + +// NextRun computes the next run time after from, rounded to minute granularity. +func NextRun(expr string, from time.Time) (time.Time, error) { + s, err := parseCron(expr) + if err != nil { + return time.Time{}, err + } + + candidate := from.In(from.Location()).Truncate(time.Minute).Add(time.Minute) + for i := 0; i < maxNextRunLookaheadMinutes; i++ { + if s.matches(candidate) { + return candidate, nil + } + candidate = candidate.Add(time.Minute) + } + + return time.Time{}, fmt.Errorf("unable to find next run for %q within lookahead window", expr) +} + +func parseCron(expr string) (*cronSchedule, error) { + fields := strings.Fields(strings.TrimSpace(expr)) + if len(fields) != 5 { + return nil, fmt.Errorf("cron expression must have 5 fields, got %d", len(fields)) + } + + minute, err := parseCronField(fields[0], 0, 59, false) + if err != nil { + return nil, fmt.Errorf("invalid minute field: %w", err) + } + hour, err := parseCronField(fields[1], 0, 23, false) + if err != nil { + return nil, fmt.Errorf("invalid hour field: %w", err) + } + dayOfMonth, err := parseCronField(fields[2], 1, 31, false) + if err != nil { + return nil, fmt.Errorf("invalid day-of-month field: %w", err) + } + month, err := parseCronField(fields[3], 1, 12, false) + if err != nil { + return nil, fmt.Errorf("invalid month field: %w", err) + } + dayOfWeek, err := parseCronField(fields[4], 0, 6, true) + if err != nil { + return nil, fmt.Errorf("invalid day-of-week field: %w", err) + } + + return &cronSchedule{ + minute: minute, + hour: hour, + dayOfMonth: dayOfMonth, + month: month, + dayOfWeek: dayOfWeek, + }, nil +} + +func parseCronField(raw string, minVal, maxVal int, allowSevenAsSunday bool) (cronField, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return cronField{}, fmt.Errorf("empty field") + } + if raw == "*" { + return cronField{any: true}, nil + } + + field := cronField{values: make(map[int]struct{})} + parts := strings.Split(raw, ",") + for _, p := range parts { + p = strings.TrimSpace(p) + if p == "" { + return cronField{}, fmt.Errorf("empty list item") + } + if err := addCronPart(field.values, p, minVal, maxVal, allowSevenAsSunday); err != nil { + return cronField{}, err + } + } + if len(field.values) == 0 { + return cronField{}, fmt.Errorf("no values parsed") + } + return field, nil +} + +func addCronPart(out map[int]struct{}, part string, minVal, maxVal int, allowSevenAsSunday bool) error { + base := part + step := 1 + + if strings.Contains(part, "/") { + parts := strings.Split(part, "/") + if len(parts) != 2 { + return fmt.Errorf("invalid step expression %q", part) + } + base = strings.TrimSpace(parts[0]) + stepValue, err := strconv.Atoi(strings.TrimSpace(parts[1])) + if err != nil || stepValue <= 0 { + return fmt.Errorf("invalid step value in %q", part) + } + step = stepValue + } + + var start, end int + switch { + case base == "*": + start, end = minVal, maxVal + case strings.Contains(base, "-"): + rangeParts := strings.Split(base, "-") + if len(rangeParts) != 2 { + return fmt.Errorf("invalid range %q", base) + } + var err error + start, err = parseCronValue(strings.TrimSpace(rangeParts[0]), minVal, maxVal, allowSevenAsSunday) + if err != nil { + return err + } + end, err = parseCronValue(strings.TrimSpace(rangeParts[1]), minVal, maxVal, allowSevenAsSunday) + if err != nil { + return err + } + if end < start { + return fmt.Errorf("range end before start in %q", base) + } + default: + value, err := parseCronValue(base, minVal, maxVal, allowSevenAsSunday) + if err != nil { + return err + } + start, end = value, value + } + + for v := start; v <= end; v += step { + normalized := v + if allowSevenAsSunday && normalized == 7 { + normalized = 0 + } + if normalized < minVal || normalized > maxVal { + return fmt.Errorf("value %d out of range [%d,%d]", normalized, minVal, maxVal) + } + out[normalized] = struct{}{} + } + + return nil +} + +func parseCronValue(raw string, minVal, maxVal int, allowSevenAsSunday bool) (int, error) { + value, err := strconv.Atoi(raw) + if err != nil { + return 0, fmt.Errorf("invalid value %q", raw) + } + if allowSevenAsSunday && value == 7 { + value = 0 + } + if value < minVal || value > maxVal { + return 0, fmt.Errorf("value %d out of range [%d,%d]", value, minVal, maxVal) + } + return value, nil +} diff --git a/internal/jobsvc/cron_test.go b/internal/jobsvc/cron_test.go new file mode 100644 index 0000000..060b6b0 --- /dev/null +++ b/internal/jobsvc/cron_test.go @@ -0,0 +1,56 @@ +package jobsvc + +import ( + "testing" + "time" +) + +func TestValidateCronExpr(t *testing.T) { + tests := []struct { + expr string + wantErr bool + }{ + {expr: "*/15 * * * *", wantErr: false}, + {expr: "0 2 * * 0", wantErr: false}, + {expr: "5,35 8-18/2 * * 1-5", wantErr: false}, + {expr: "", wantErr: true}, + {expr: "* * * *", wantErr: true}, + {expr: "61 * * * *", wantErr: true}, + {expr: "* 24 * * *", wantErr: true}, + } + + for _, tt := range tests { + err := ValidateCronExpr(tt.expr) + if tt.wantErr && err == nil { + t.Fatalf("expected error for %q", tt.expr) + } + if !tt.wantErr && err != nil { + t.Fatalf("unexpected error for %q: %v", tt.expr, err) + } + } +} + +func TestNextRun(t *testing.T) { + base := time.Date(2026, 3, 4, 10, 7, 45, 0, time.UTC) + + next, err := NextRun("*/15 * * * *", base) + if err != nil { + t.Fatalf("NextRun failed: %v", err) + } + want := time.Date(2026, 3, 4, 10, 15, 0, 0, time.UTC) + if !next.Equal(want) { + t.Fatalf("unexpected next run: got %s want %s", next, want) + } +} + +func TestProviderNormalization(t *testing.T) { + if got := NormalizeProviderName("all"); got != "" { + t.Fatalf("expected all => empty, got %q", got) + } + if got := NormalizeProviderName(""); got != "" { + t.Fatalf("expected empty => empty, got %q", got) + } + if got := NormalizeProviderName("epel-main"); got != "epel-main" { + t.Fatalf("unexpected provider normalization: %q", got) + } +} diff --git a/internal/jobsvc/service.go b/internal/jobsvc/service.go new file mode 100644 index 0000000..c12888b --- /dev/null +++ b/internal/jobsvc/service.go @@ -0,0 +1,196 @@ +package jobsvc + +import ( + "context" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/BadgerOps/airgap/internal/engine" + "github.com/BadgerOps/airgap/internal/provider" + "github.com/BadgerOps/airgap/internal/store" +) + +const ( + JobTypeSync = "sync" + JobTypeValidate = "validate" +) + +// JobExecutionResult summarizes an executed job. +type JobExecutionResult struct { + InvalidFiles int + Message string +} + +// NormalizeJobType normalizes job type input. +func NormalizeJobType(t string) string { + return strings.ToLower(strings.TrimSpace(t)) +} + +// ValidateJobType validates allowed job types. +func ValidateJobType(t string) error { + switch NormalizeJobType(t) { + case JobTypeSync, JobTypeValidate: + return nil + default: + return fmt.Errorf("invalid job type %q: must be one of %q, %q", t, JobTypeSync, JobTypeValidate) + } +} + +// NormalizeProviderName normalizes provider input; "all" is represented as empty string in DB. +func NormalizeProviderName(name string) string { + name = strings.TrimSpace(name) + if name == "" || strings.EqualFold(name, "all") { + return "" + } + return name +} + +// DisplayProviderName converts internal provider representation to display text. +func DisplayProviderName(name string) string { + if strings.TrimSpace(name) == "" { + return "all" + } + return name +} + +// ValidateProviderName ensures provider is either "all" (empty) or an existing configured provider. +func ValidateProviderName(st *store.Store, providerName string) error { + providerName = NormalizeProviderName(providerName) + if providerName == "" { + return nil + } + if st == nil { + return fmt.Errorf("store is required to validate provider") + } + if _, err := st.GetProviderConfig(providerName); err != nil { + return fmt.Errorf("unknown provider %q", providerName) + } + return nil +} + +// ExecuteJob executes one sync/validate job. +func ExecuteJob( + ctx context.Context, + eng *engine.SyncManager, + st *store.Store, + job store.Job, + logger *slog.Logger, +) (JobExecutionResult, error) { + if logger == nil { + logger = slog.Default() + } + if eng == nil { + return JobExecutionResult{}, fmt.Errorf("sync engine is required") + } + + jobType := NormalizeJobType(job.Type) + if err := ValidateJobType(jobType); err != nil { + return JobExecutionResult{}, err + } + + providerName := NormalizeProviderName(job.Provider) + result := JobExecutionResult{} + + switch jobType { + case JobTypeSync: + opts := provider.SyncOptions{MaxWorkers: 4} + if providerName == "" { + reports, err := eng.SyncAll(ctx, opts) + result.Message = fmt.Sprintf("sync completed for %d provider(s)", len(reports)) + if err != nil { + return result, fmt.Errorf("sync all failed: %w", err) + } + return result, nil + } + + report, err := eng.SyncProvider(ctx, providerName, opts) + if err != nil { + return result, fmt.Errorf("sync failed for provider %q: %w", providerName, err) + } + result.Message = fmt.Sprintf( + "sync completed for %s (downloaded=%d skipped=%d deleted=%d failed=%d)", + providerName, report.Downloaded, report.Skipped, report.Deleted, len(report.Failed), + ) + return result, nil + + case JobTypeValidate: + if providerName == "" { + reports, err := eng.ValidateAll(ctx) + invalidCount := persistValidationReports(st, reports, logger) + result.InvalidFiles = invalidCount + if err != nil { + return result, fmt.Errorf("validate all failed: %w", err) + } + if invalidCount > 0 { + return result, fmt.Errorf("validation failed: %d invalid file(s)", invalidCount) + } + result.Message = fmt.Sprintf("validation passed for %d provider(s)", len(reports)) + return result, nil + } + + report, err := eng.ValidateProvider(ctx, providerName) + if err != nil { + return result, fmt.Errorf("validation failed for provider %q: %w", providerName, err) + } + invalidCount := persistValidationReport(st, providerName, report, logger) + result.InvalidFiles = invalidCount + if invalidCount > 0 { + return result, fmt.Errorf("validation failed for %q: %d invalid file(s)", providerName, invalidCount) + } + result.Message = fmt.Sprintf("validation passed for %s (%d files)", providerName, report.TotalFiles) + return result, nil + } + + return result, fmt.Errorf("unsupported job type %q", jobType) +} + +func persistValidationReports(st *store.Store, reports map[string]*provider.ValidationReport, logger *slog.Logger) int { + if st == nil { + return 0 + } + total := 0 + for providerName, report := range reports { + if report == nil { + continue + } + total += persistValidationReport(st, providerName, report, logger) + } + return total +} + +func persistValidationReport(st *store.Store, providerName string, report *provider.ValidationReport, logger *slog.Logger) int { + if st == nil || report == nil || len(report.InvalidFiles) == 0 { + return 0 + } + + now := time.Now() + count := 0 + for _, inv := range report.InvalidFiles { + errText := "validation: checksum mismatch" + if strings.EqualFold(inv.Actual, "missing") { + errText = "validation: file missing" + } + rec := &store.FailedFileRecord{ + Provider: providerName, + FilePath: inv.Path, + URL: inv.URL, + DestPath: inv.LocalPath, + ExpectedChecksum: inv.Expected, + ExpectedSize: inv.Size, + Error: errText, + RetryCount: 0, + FirstFailure: now, + LastFailure: now, + Resolved: false, + } + + if err := st.AddFailedFile(rec); err != nil { + logger.Warn("failed to persist validation failure", "provider", providerName, "path", inv.Path, "error", err) + continue + } + count++ + } + return count +} diff --git a/internal/server/job_handlers.go b/internal/server/job_handlers.go new file mode 100644 index 0000000..042062f --- /dev/null +++ b/internal/server/job_handlers.go @@ -0,0 +1,350 @@ +package server + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/BadgerOps/airgap/internal/jobsvc" + "github.com/BadgerOps/airgap/internal/store" +) + +type jobJSON struct { + ID int64 `json:"id"` + Type string `json:"type"` + Provider string `json:"provider"` + CronExpr string `json:"cron_expr"` + Status string `json:"status"` + LastRun time.Time `json:"last_run"` + NextRun time.Time `json:"next_run"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +type jobRequest struct { + Type string `json:"type"` + Provider string `json:"provider"` + CronExpr string `json:"cron_expr"` + Status string `json:"status"` +} + +func (s *Server) handleJobs(w http.ResponseWriter, r *http.Request) { + providers := []string{} + if s.store != nil { + configs, err := s.store.ListProviderConfigs() + if err != nil { + s.logger.Warn("failed to list providers for jobs page", "error", err) + } else { + for _, cfg := range configs { + providers = append(providers, cfg.Name) + } + sort.Strings(providers) + } + } + + defaultCron := "" + if s.config != nil { + defaultCron = s.config.Schedule.DefaultCron + } + + data := map[string]interface{}{ + "Title": "Jobs", + "Providers": providers, + "DefaultCron": defaultCron, + } + + s.renderTemplate(w, "templates/jobs.html", data) +} + +func (s *Server) handleAPIJobsList(w http.ResponseWriter, r *http.Request) { + jobs, err := s.store.ListJobs("", 0) + if err != nil { + jsonError(w, http.StatusInternalServerError, err.Error()) + return + } + + resp := make([]jobJSON, 0, len(jobs)) + for _, j := range jobs { + resp = append(resp, toJobJSON(j)) + } + + w.Header().Set("Content-Type", "application/json") + s.writeJSON(w, resp) +} + +func (s *Server) handleAPIJobsCreate(w http.ResponseWriter, r *http.Request) { + var req jobRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + jsonError(w, http.StatusBadRequest, "invalid JSON: "+err.Error()) + return + } + + job, err := s.buildJobFromRequest(nil, req) + if err != nil { + jsonError(w, http.StatusBadRequest, err.Error()) + return + } + + if err := s.store.CreateJob(job); err != nil { + jsonError(w, http.StatusInternalServerError, err.Error()) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + s.writeJSON(w, toJobJSON(*job)) +} + +func (s *Server) handleAPIJobsUpdate(w http.ResponseWriter, r *http.Request) { + id, err := parseJobID(r.PathValue("id")) + if err != nil { + jsonError(w, http.StatusBadRequest, err.Error()) + return + } + + existing, err := s.store.GetJob(id) + if err != nil { + jsonError(w, http.StatusNotFound, "job not found") + return + } + if existing.Status == "running" { + jsonError(w, http.StatusConflict, "cannot update a running job") + return + } + + var req jobRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + jsonError(w, http.StatusBadRequest, "invalid JSON: "+err.Error()) + return + } + + job, err := s.buildJobFromRequest(existing, req) + if err != nil { + jsonError(w, http.StatusBadRequest, err.Error()) + return + } + job.ID = existing.ID + job.CreatedAt = existing.CreatedAt + job.LastRun = existing.LastRun + + if err := s.store.UpdateJob(job); err != nil { + jsonError(w, http.StatusInternalServerError, err.Error()) + return + } + + w.Header().Set("Content-Type", "application/json") + s.writeJSON(w, toJobJSON(*job)) +} + +func (s *Server) handleAPIJobsDelete(w http.ResponseWriter, r *http.Request) { + id, err := parseJobID(r.PathValue("id")) + if err != nil { + jsonError(w, http.StatusBadRequest, err.Error()) + return + } + + job, err := s.store.GetJob(id) + if err != nil { + jsonError(w, http.StatusNotFound, "job not found") + return + } + if job.Status == "running" { + jsonError(w, http.StatusConflict, "cannot delete a running job") + return + } + + if err := s.store.DeleteJob(id); err != nil { + jsonError(w, http.StatusInternalServerError, err.Error()) + return + } + w.WriteHeader(http.StatusNoContent) +} + +func (s *Server) handleAPIJobsPause(w http.ResponseWriter, r *http.Request) { + id, err := parseJobID(r.PathValue("id")) + if err != nil { + jsonError(w, http.StatusBadRequest, err.Error()) + return + } + + job, err := s.store.GetJob(id) + if err != nil { + jsonError(w, http.StatusNotFound, "job not found") + return + } + if job.Status == "running" { + jsonError(w, http.StatusConflict, "cannot pause a running job") + return + } + + job.Status = "paused" + job.UpdatedAt = time.Now() + if err := s.store.UpdateJob(job); err != nil { + jsonError(w, http.StatusInternalServerError, err.Error()) + return + } + + w.Header().Set("Content-Type", "application/json") + s.writeJSON(w, toJobJSON(*job)) +} + +func (s *Server) handleAPIJobsResume(w http.ResponseWriter, r *http.Request) { + id, err := parseJobID(r.PathValue("id")) + if err != nil { + jsonError(w, http.StatusBadRequest, err.Error()) + return + } + + job, err := s.store.GetJob(id) + if err != nil { + jsonError(w, http.StatusNotFound, "job not found") + return + } + if job.Status == "running" { + jsonError(w, http.StatusConflict, "cannot resume a running job") + return + } + + nextRun, err := jobsvc.NextRun(job.CronExpr, time.Now()) + if err != nil { + jsonError(w, http.StatusBadRequest, "invalid cron expression: "+err.Error()) + return + } + + job.Status = "scheduled" + job.NextRun = nextRun + job.UpdatedAt = time.Now() + if err := s.store.UpdateJob(job); err != nil { + jsonError(w, http.StatusInternalServerError, err.Error()) + return + } + + w.Header().Set("Content-Type", "application/json") + s.writeJSON(w, toJobJSON(*job)) +} + +func (s *Server) handleAPIJobsRunNow(w http.ResponseWriter, r *http.Request) { + id, err := parseJobID(r.PathValue("id")) + if err != nil { + jsonError(w, http.StatusBadRequest, err.Error()) + return + } + + job, err := s.store.GetJob(id) + if err != nil { + jsonError(w, http.StatusNotFound, "job not found") + return + } + if job.Status == "running" { + jsonError(w, http.StatusConflict, "job is already running") + return + } + + runCtx, release, err := s.beginOperation(context.Background()) + if err != nil { + if err == errOperationRunning { + jsonError(w, http.StatusConflict, "another operation is already running") + return + } + jsonError(w, http.StatusInternalServerError, err.Error()) + return + } + + preservePaused := job.Status == "paused" + go func(j store.Job) { + defer release() + if runErr := s.executeJobAndPersist(runCtx, &j, preservePaused); runErr != nil { + s.logger.Warn("run-now job failed", "job_id", j.ID, "error", runErr) + } + }(*job) + + w.Header().Set("Content-Type", "application/json") + s.writeJSON(w, map[string]interface{}{ + "status": "started", + "job": toJobJSON(*job), + }) +} + +func (s *Server) buildJobFromRequest(existing *store.Job, req jobRequest) (*store.Job, error) { + job := &store.Job{} + if existing != nil { + *job = *existing + } + + if strings.TrimSpace(req.Type) != "" || existing == nil { + job.Type = jobsvc.NormalizeJobType(req.Type) + } + if err := jobsvc.ValidateJobType(job.Type); err != nil { + return nil, err + } + + if strings.TrimSpace(req.Provider) != "" || existing == nil { + job.Provider = jobsvc.NormalizeProviderName(req.Provider) + } + if err := jobsvc.ValidateProviderName(s.store, job.Provider); err != nil { + return nil, err + } + + if strings.TrimSpace(req.CronExpr) != "" || existing == nil { + job.CronExpr = strings.TrimSpace(req.CronExpr) + } + if err := jobsvc.ValidateCronExpr(job.CronExpr); err != nil { + return nil, err + } + + status := strings.ToLower(strings.TrimSpace(req.Status)) + if status == "" && existing == nil { + status = "scheduled" + } + if status != "" { + switch status { + case "scheduled", "paused", "completed", "failed": + job.Status = status + default: + return nil, fmt.Errorf("invalid status %q", req.Status) + } + } + if strings.TrimSpace(job.Status) == "" { + job.Status = "scheduled" + } + + nextRun, err := jobsvc.NextRun(job.CronExpr, time.Now()) + if err != nil { + return nil, err + } + job.NextRun = nextRun + now := time.Now() + if existing == nil { + job.CreatedAt = now + } + job.UpdatedAt = now + + return job, nil +} + +func parseJobID(raw string) (int64, error) { + id, err := strconv.ParseInt(raw, 10, 64) + if err != nil || id <= 0 { + return 0, fmt.Errorf("invalid job id") + } + return id, nil +} + +func toJobJSON(job store.Job) jobJSON { + return jobJSON{ + ID: job.ID, + Type: job.Type, + Provider: jobsvc.DisplayProviderName(job.Provider), + CronExpr: job.CronExpr, + Status: job.Status, + LastRun: job.LastRun, + NextRun: job.NextRun, + CreatedAt: job.CreatedAt, + UpdatedAt: job.UpdatedAt, + } +} diff --git a/internal/server/job_handlers_test.go b/internal/server/job_handlers_test.go new file mode 100644 index 0000000..8ba1c06 --- /dev/null +++ b/internal/server/job_handlers_test.go @@ -0,0 +1,152 @@ +package server + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "strconv" + "testing" + "time" + + "github.com/BadgerOps/airgap/internal/store" +) + +func TestHandleAPIJobsCreateRejectsInvalidType(t *testing.T) { + srv := setupTestServer(t) + + reqBody := `{"type":"export","provider":"all","cron_expr":"*/5 * * * *"}` + req := httptest.NewRequest(http.MethodPost, "/api/jobs", bytes.NewBufferString(reqBody)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + srv.handleAPIJobsCreate(w, req) + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestHandleAPIJobsCreateAndListNormalizesAllProvider(t *testing.T) { + srv := setupTestServer(t) + + reqBody := `{"type":"sync","provider":"all","cron_expr":"*/5 * * * *"}` + req := httptest.NewRequest(http.MethodPost, "/api/jobs", bytes.NewBufferString(reqBody)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + srv.handleAPIJobsCreate(w, req) + + if w.Code != http.StatusCreated { + t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) + } + var created jobJSON + if err := json.NewDecoder(w.Body).Decode(&created); err != nil { + t.Fatalf("decode create response failed: %v", err) + } + if created.Provider != "all" { + t.Fatalf("expected provider=all, got %q", created.Provider) + } + + listReq := httptest.NewRequest(http.MethodGet, "/api/jobs", nil) + listW := httptest.NewRecorder() + srv.handleAPIJobsList(listW, listReq) + if listW.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", listW.Code) + } + var jobs []jobJSON + if err := json.NewDecoder(listW.Body).Decode(&jobs); err != nil { + t.Fatalf("decode list response failed: %v", err) + } + if len(jobs) != 1 { + t.Fatalf("expected 1 job, got %d", len(jobs)) + } + if jobs[0].Provider != "all" { + t.Fatalf("expected list provider=all, got %q", jobs[0].Provider) + } +} + +func TestHandleAPIJobsPauseResumeTransitions(t *testing.T) { + srv := setupTestServer(t) + jobID := mustCreateJob(t, srv, "sync", "", "*/5 * * * *", "scheduled", time.Now().Add(time.Minute)) + + pauseReq := httptest.NewRequest(http.MethodPost, "/api/jobs/"+strconv.FormatInt(jobID, 10)+"/pause", nil) + pauseReq.SetPathValue("id", strconv.FormatInt(jobID, 10)) + pauseW := httptest.NewRecorder() + srv.handleAPIJobsPause(pauseW, pauseReq) + if pauseW.Code != http.StatusOK { + t.Fatalf("pause expected 200, got %d: %s", pauseW.Code, pauseW.Body.String()) + } + + job, err := srv.store.GetJob(jobID) + if err != nil { + t.Fatalf("GetJob failed: %v", err) + } + if job.Status != "paused" { + t.Fatalf("expected paused status, got %q", job.Status) + } + + resumeReq := httptest.NewRequest(http.MethodPost, "/api/jobs/"+strconv.FormatInt(jobID, 10)+"/resume", nil) + resumeReq.SetPathValue("id", strconv.FormatInt(jobID, 10)) + resumeW := httptest.NewRecorder() + srv.handleAPIJobsResume(resumeW, resumeReq) + if resumeW.Code != http.StatusOK { + t.Fatalf("resume expected 200, got %d: %s", resumeW.Code, resumeW.Body.String()) + } + + job, err = srv.store.GetJob(jobID) + if err != nil { + t.Fatalf("GetJob failed after resume: %v", err) + } + if job.Status != "scheduled" { + t.Fatalf("expected scheduled status, got %q", job.Status) + } +} + +func TestHandleAPIJobsRunNowConflictWhenBusy(t *testing.T) { + srv := setupTestServer(t) + jobID := mustCreateJob(t, srv, "sync", "", "*/5 * * * *", "scheduled", time.Now().Add(time.Minute)) + + srv.syncMu.Lock() + srv.syncRunning = true + srv.syncMu.Unlock() + + req := httptest.NewRequest(http.MethodPost, "/api/jobs/"+strconv.FormatInt(jobID, 10)+"/run", nil) + req.SetPathValue("id", strconv.FormatInt(jobID, 10)) + w := httptest.NewRecorder() + srv.handleAPIJobsRunNow(w, req) + + if w.Code != http.StatusConflict { + t.Fatalf("expected 409, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestHandleAPIJobsDeleteRejectsRunning(t *testing.T) { + srv := setupTestServer(t) + jobID := mustCreateJob(t, srv, "sync", "", "*/5 * * * *", "running", time.Now().Add(time.Minute)) + + req := httptest.NewRequest(http.MethodDelete, "/api/jobs/"+strconv.FormatInt(jobID, 10), nil) + req.SetPathValue("id", strconv.FormatInt(jobID, 10)) + w := httptest.NewRecorder() + srv.handleAPIJobsDelete(w, req) + + if w.Code != http.StatusConflict { + t.Fatalf("expected 409, got %d: %s", w.Code, w.Body.String()) + } +} + +func mustCreateJob(t *testing.T, srv *Server, jobType, provider, cronExpr, status string, nextRun time.Time) int64 { + t.Helper() + now := time.Now() + job := &store.Job{ + Type: jobType, + Provider: provider, + CronExpr: cronExpr, + Status: status, + NextRun: nextRun, + CreatedAt: now, + UpdatedAt: now, + } + if err := srv.store.CreateJob(job); err != nil { + t.Fatalf("CreateJob failed: %v", err) + } + return job.ID +} diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go new file mode 100644 index 0000000..949a1a0 --- /dev/null +++ b/internal/server/scheduler.go @@ -0,0 +1,151 @@ +package server + +import ( + "context" + "errors" + "time" + + "github.com/BadgerOps/airgap/internal/jobsvc" + "github.com/BadgerOps/airgap/internal/store" +) + +const schedulerPollInterval = 15 * time.Second + +var errOperationRunning = errors.New("operation already running") + +func (s *Server) startScheduler() { + if s.store == nil || s.engine == nil { + return + } + if s.schedulerCancel != nil { + return + } + + ctx, cancel := context.WithCancel(context.Background()) + s.schedulerCancel = cancel + s.schedulerWG.Add(1) + go func() { + defer s.schedulerWG.Done() + s.schedulerLoop(ctx) + }() + s.logger.Info("job scheduler started", "interval", schedulerPollInterval.String()) +} + +func (s *Server) stopScheduler() { + if s.schedulerCancel == nil { + return + } + s.schedulerCancel() + s.schedulerWG.Wait() + s.schedulerCancel = nil + s.logger.Info("job scheduler stopped") +} + +func (s *Server) schedulerLoop(ctx context.Context) { + ticker := time.NewTicker(schedulerPollInterval) + defer ticker.Stop() + + s.runDueJobs(ctx) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.runDueJobs(ctx) + } + } +} + +func (s *Server) runDueJobs(ctx context.Context) { + jobs, err := s.store.ListDueJobs(time.Now(), 20) + if err != nil { + s.logger.Warn("failed to list due jobs", "error", err) + return + } + + for _, job := range jobs { + select { + case <-ctx.Done(): + return + default: + } + + runCtx, release, err := s.beginOperation(ctx) + if err != nil { + if errors.Is(err, errOperationRunning) { + // A user-triggered sync/validate is in progress. + return + } + s.logger.Warn("failed to begin scheduled job operation", "job_id", job.ID, "error", err) + continue + } + + if runErr := s.executeJobAndPersist(runCtx, &job, false); runErr != nil { + s.logger.Warn("scheduled job execution failed", "job_id", job.ID, "type", job.Type, "provider", jobsvc.DisplayProviderName(job.Provider), "error", runErr) + } + release() + } +} + +// beginOperation acquires the global operation lock. +func (s *Server) beginOperation(parent context.Context) (context.Context, func(), error) { + s.syncMu.Lock() + if s.syncRunning { + s.syncMu.Unlock() + return nil, nil, errOperationRunning + } + + runCtx, cancel := context.WithCancel(parent) + s.syncCancel = cancel + s.syncRunning = true + s.syncMu.Unlock() + + release := func() { + s.syncMu.Lock() + s.syncRunning = false + s.syncCancel = nil + s.syncMu.Unlock() + } + + return runCtx, release, nil +} + +func (s *Server) executeJobAndPersist(ctx context.Context, job *store.Job, preservePaused bool) error { + if job == nil { + return errors.New("job is required") + } + + previousStatus := job.Status + job.Status = "running" + job.UpdatedAt = time.Now() + if err := s.store.UpdateJob(job); err != nil { + return err + } + + _, runErr := jobsvc.ExecuteJob(ctx, s.engine, s.store, *job, s.logger) + finishedAt := time.Now() + + if preservePaused && previousStatus == "paused" { + job.Status = "paused" + } else if runErr != nil { + job.Status = "failed" + } else { + job.Status = "completed" + } + + job.LastRun = finishedAt + nextRun, nextErr := jobsvc.NextRun(job.CronExpr, finishedAt) + if nextErr != nil { + // Cron is validated on write paths; fallback avoids tight-loop retries. + s.logger.Warn("failed to compute next run for job", "job_id", job.ID, "cron_expr", job.CronExpr, "error", nextErr) + job.NextRun = finishedAt.Add(24 * time.Hour) + } else { + job.NextRun = nextRun + } + job.UpdatedAt = finishedAt + if err := s.store.UpdateJob(job); err != nil { + return err + } + + return runErr +} diff --git a/internal/server/scheduler_test.go b/internal/server/scheduler_test.go new file mode 100644 index 0000000..c098188 --- /dev/null +++ b/internal/server/scheduler_test.go @@ -0,0 +1,135 @@ +package server + +import ( + "context" + "testing" + "time" + + "github.com/BadgerOps/airgap/internal/store" +) + +func TestRunDueJobsExecutesDueJobAndAdvancesSchedule(t *testing.T) { + srv := setupTestServer(t) + now := time.Now() + jobID := mustCreateScheduledJob(t, srv, &store.Job{ + Type: "validate", + Provider: "", + CronExpr: "*/5 * * * *", + Status: "scheduled", + NextRun: now.Add(-time.Minute), + CreatedAt: now, + UpdatedAt: now, + }) + + srv.runDueJobs(context.Background()) + + job, err := srv.store.GetJob(jobID) + if err != nil { + t.Fatalf("GetJob failed: %v", err) + } + if job.Status != "completed" { + t.Fatalf("expected completed status, got %q", job.Status) + } + if job.LastRun.IsZero() { + t.Fatal("expected last_run to be set") + } + if !job.NextRun.After(now) { + t.Fatalf("expected next_run to be advanced, got %s", job.NextRun) + } +} + +func TestRunDueJobsSkipsPausedJobs(t *testing.T) { + srv := setupTestServer(t) + now := time.Now() + jobID := mustCreateScheduledJob(t, srv, &store.Job{ + Type: "validate", + Provider: "", + CronExpr: "*/5 * * * *", + Status: "paused", + NextRun: now.Add(-time.Minute), + CreatedAt: now, + UpdatedAt: now, + }) + + srv.runDueJobs(context.Background()) + + job, err := srv.store.GetJob(jobID) + if err != nil { + t.Fatalf("GetJob failed: %v", err) + } + if job.Status != "paused" { + t.Fatalf("expected paused status, got %q", job.Status) + } + if !job.LastRun.IsZero() { + t.Fatal("expected paused job not to run") + } +} + +func TestRunDueJobsBusyOperationDefersExecution(t *testing.T) { + srv := setupTestServer(t) + now := time.Now() + jobID := mustCreateScheduledJob(t, srv, &store.Job{ + Type: "validate", + Provider: "", + CronExpr: "*/5 * * * *", + Status: "scheduled", + NextRun: now.Add(-time.Minute), + CreatedAt: now, + UpdatedAt: now, + }) + + srv.syncMu.Lock() + srv.syncRunning = true + srv.syncMu.Unlock() + + srv.runDueJobs(context.Background()) + + job, err := srv.store.GetJob(jobID) + if err != nil { + t.Fatalf("GetJob failed: %v", err) + } + if job.Status != "scheduled" { + t.Fatalf("expected scheduled status, got %q", job.Status) + } + if !job.LastRun.IsZero() { + t.Fatal("expected job not to run while busy") + } +} + +func TestRunDueJobsFailureSetsFailedAndAdvancesNextRun(t *testing.T) { + srv := setupTestServer(t) + now := time.Now() + jobID := mustCreateScheduledJob(t, srv, &store.Job{ + Type: "validate", + Provider: "missing-provider", + CronExpr: "*/5 * * * *", + Status: "scheduled", + NextRun: now.Add(-time.Minute), + CreatedAt: now, + UpdatedAt: now, + }) + + srv.runDueJobs(context.Background()) + + job, err := srv.store.GetJob(jobID) + if err != nil { + t.Fatalf("GetJob failed: %v", err) + } + if job.Status != "failed" { + t.Fatalf("expected failed status, got %q", job.Status) + } + if job.LastRun.IsZero() { + t.Fatal("expected failed job to update last_run") + } + if !job.NextRun.After(now) { + t.Fatalf("expected next_run to advance even on failure, got %s", job.NextRun) + } +} + +func mustCreateScheduledJob(t *testing.T, srv *Server, job *store.Job) int64 { + t.Helper() + if err := srv.store.CreateJob(job); err != nil { + t.Fatalf("CreateJob failed: %v", err) + } + return job.ID +} diff --git a/internal/server/server.go b/internal/server/server.go index 072ff5c..7b43138 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -41,6 +41,10 @@ type Server struct { syncMu sync.Mutex syncCancel context.CancelFunc syncRunning bool + + // Scheduler lifecycle + schedulerCancel context.CancelFunc + schedulerWG sync.WaitGroup } // SetVersion sets the version string displayed in the UI. @@ -81,6 +85,11 @@ func (s *Server) Start(listenAddr string) error { // Setup routes mux := s.setupRoutes() + // Start scheduler loop (serve mode only) + if s.config != nil && s.config.Schedule.Enabled { + s.startScheduler() + } + // Create and start HTTP server s.httpServer = &http.Server{ Addr: listenAddr, @@ -101,6 +110,8 @@ func (s *Server) Start(listenAddr string) error { // Shutdown gracefully shuts down the HTTP server. func (s *Server) Shutdown(ctx context.Context) error { + s.stopScheduler() + if s.httpServer == nil { return nil } @@ -121,6 +132,7 @@ func (s *Server) parseTemplates() error { "templates/provider_detail.html", "templates/transfer.html", "templates/ocp_clients.html", + "templates/jobs.html", } for _, page := range pages { @@ -165,6 +177,7 @@ func (s *Server) setupRoutes() *http.ServeMux { mux.HandleFunc("GET /providers/{name}", s.handleProviderDetail) mux.HandleFunc("GET /providers", s.handleProviders) mux.HandleFunc("GET /sync", s.handleSync) + mux.HandleFunc("GET /jobs", s.handleJobs) // API routes mux.HandleFunc("GET /api/status", s.handleAPIStatus) @@ -180,6 +193,13 @@ func (s *Server) setupRoutes() *http.ServeMux { mux.HandleFunc("POST /api/sync/failures/resolve", s.handleAPISyncFailuresResolve) mux.HandleFunc("POST /api/sync/retry", s.handleAPISyncRetry) mux.HandleFunc("POST /api/registry/push", s.handleAPIRegistryPush) + mux.HandleFunc("GET /api/jobs", s.handleAPIJobsList) + mux.HandleFunc("POST /api/jobs", s.handleAPIJobsCreate) + mux.HandleFunc("PUT /api/jobs/{id}", s.handleAPIJobsUpdate) + mux.HandleFunc("DELETE /api/jobs/{id}", s.handleAPIJobsDelete) + mux.HandleFunc("POST /api/jobs/{id}/pause", s.handleAPIJobsPause) + mux.HandleFunc("POST /api/jobs/{id}/resume", s.handleAPIJobsResume) + mux.HandleFunc("POST /api/jobs/{id}/run", s.handleAPIJobsRunNow) // Provider config CRUD routes mux.HandleFunc("GET /api/providers/config", s.handleListProviderConfigs) diff --git a/internal/server/template_isolation_test.go b/internal/server/template_isolation_test.go index 33d4477..9c283a6 100644 --- a/internal/server/template_isolation_test.go +++ b/internal/server/template_isolation_test.go @@ -39,6 +39,11 @@ func TestTemplateIsolationEndToEnd(t *testing.T) { mustContain: "providerManager", mustNotContain: "Start Export", }, + { + path: "/jobs", + mustContain: "Create Job", + mustNotContain: "Start Export", + }, } for _, tt := range tests { diff --git a/internal/server/templates/jobs.html b/internal/server/templates/jobs.html new file mode 100644 index 0000000..03d3064 --- /dev/null +++ b/internal/server/templates/jobs.html @@ -0,0 +1,218 @@ +{{define "content"}} +
+
+ +
+

Create Job

+

Schedule recurring sync or validation jobs.

+ +
+
+
+ + +
+
+ + +
+
+ + +
+
+
+ +
+
+
+ +
+
+
+

Jobs

+

Auto-refreshes every 10 seconds.

+
+ +
+ +
No jobs configured.
+ +
+ + + + + + + + + + + + + + + + +
IDTypeProviderCronStatusLast RunNext RunActions
+
+
+
+ + +{{end}} diff --git a/internal/server/templates/layout.html b/internal/server/templates/layout.html index 107ab45..12cc502 100644 --- a/internal/server/templates/layout.html +++ b/internal/server/templates/layout.html @@ -647,6 +647,10 @@

AIRGAP

Sync Status + + + Jobs + Transfer diff --git a/internal/store/migrations.go b/internal/store/migrations.go index 8bdd317..5de117f 100644 --- a/internal/store/migrations.go +++ b/internal/store/migrations.go @@ -175,6 +175,12 @@ func (s *Store) migrate() error { ALTER TABLE failed_files ADD COLUMN dest_path TEXT DEFAULT ''; `, }, + { + version: 7, + sql: ` + CREATE INDEX IF NOT EXISTS idx_jobs_status_next_run ON jobs(status, next_run); + `, + }, } // Run pending migrations diff --git a/internal/store/models.go b/internal/store/models.go index 7d1a038..95bfaa9 100644 --- a/internal/store/models.go +++ b/internal/store/models.go @@ -32,10 +32,10 @@ type FileRecord struct { // Job represents a scheduled or completed job type Job struct { ID int64 - Type string // "sync", "validate", "export", "import" - Provider string // empty for "all providers" jobs + Type string // "sync", "validate" + Provider string // empty means "all providers" CronExpr string // for scheduled jobs - Status string // "scheduled", "running", "completed", "failed" + Status string // "scheduled", "paused", "running", "completed", "failed" LastRun time.Time NextRun time.Time CreatedAt time.Time diff --git a/internal/store/sqlite.go b/internal/store/sqlite.go index e6963b8..6417f5f 100644 --- a/internal/store/sqlite.go +++ b/internal/store/sqlite.go @@ -3,8 +3,10 @@ package store import ( "database/sql" "encoding/json" + "errors" "fmt" "log/slog" + "time" _ "modernc.org/sqlite" ) @@ -580,6 +582,96 @@ func (s *Store) ListJobs(status string, limit int) ([]Job, error) { return jobs, nil } +// GetJob retrieves a single Job by ID. +func (s *Store) GetJob(id int64) (*Job, error) { + const query = ` + SELECT id, type, provider, cron_expr, status, last_run, next_run, created_at, updated_at + FROM jobs + WHERE id = ? + ` + + job := &Job{} + err := s.db.QueryRow(query, id).Scan( + &job.ID, &job.Type, &job.Provider, &job.CronExpr, &job.Status, + &job.LastRun, &job.NextRun, &job.CreatedAt, &job.UpdatedAt, + ) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("job not found: %d", id) + } + return nil, fmt.Errorf("failed to query job: %w", err) + } + + return job, nil +} + +// DeleteJob deletes a Job by ID. +func (s *Store) DeleteJob(id int64) error { + const query = `DELETE FROM jobs WHERE id = ?` + + result, err := s.db.Exec(query, id) + if err != nil { + return fmt.Errorf("failed to delete job: %w", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rowsAffected == 0 { + return fmt.Errorf("job not found: %d", id) + } + + return nil +} + +// ListDueJobs retrieves jobs that are due for execution. +// It excludes paused and running jobs. +func (s *Store) ListDueJobs(now time.Time, limit int) ([]Job, error) { + query := ` + SELECT id, type, provider, cron_expr, status, last_run, next_run, created_at, updated_at + FROM jobs + WHERE status IN ('scheduled', 'completed', 'failed') + AND next_run IS NOT NULL + AND next_run <= ? + ORDER BY next_run ASC, id ASC + ` + args := []interface{}{now} + + if limit > 0 { + query += " LIMIT ?" + args = append(args, limit) + } + + rows, err := s.db.Query(query, args...) + if err != nil { + return nil, fmt.Errorf("failed to query due jobs: %w", err) + } + defer func() { + _ = rows.Close() + }() + + var jobs []Job + for rows.Next() { + job := Job{} + err := rows.Scan( + &job.ID, &job.Type, &job.Provider, &job.CronExpr, &job.Status, + &job.LastRun, &job.NextRun, &job.CreatedAt, &job.UpdatedAt, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan due job: %w", err) + } + jobs = append(jobs, job) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating due jobs: %w", err) + } + + return jobs, nil +} + // ============================================================================ // Transfer Operations // ============================================================================ diff --git a/internal/store/sqlite_test.go b/internal/store/sqlite_test.go index aa680ae..9680909 100644 --- a/internal/store/sqlite_test.go +++ b/internal/store/sqlite_test.go @@ -1260,6 +1260,145 @@ func TestListJobsWithLimit(t *testing.T) { } } +func TestGetJob(t *testing.T) { + store := newTestStore(t) + now := time.Now() + + job := &Job{ + Type: "sync", + Provider: "provider-a", + CronExpr: "*/15 * * * *", + Status: "scheduled", + LastRun: now.Add(-time.Hour), + NextRun: now.Add(time.Hour), + CreatedAt: now, + UpdatedAt: now, + } + if err := store.CreateJob(job); err != nil { + t.Fatalf("CreateJob() failed: %v", err) + } + + got, err := store.GetJob(job.ID) + if err != nil { + t.Fatalf("GetJob() failed: %v", err) + } + if got.ID != job.ID { + t.Fatalf("ID mismatch: got %d want %d", got.ID, job.ID) + } + if got.Type != job.Type || got.Provider != job.Provider || got.CronExpr != job.CronExpr { + t.Fatalf("job mismatch: got %+v want %+v", got, job) + } +} + +func TestGetJobNotFound(t *testing.T) { + store := newTestStore(t) + if _, err := store.GetJob(123456); err == nil { + t.Fatal("expected error for missing job") + } +} + +func TestDeleteJob(t *testing.T) { + store := newTestStore(t) + now := time.Now() + + job := &Job{ + Type: "validate", + Status: "scheduled", + CronExpr: "0 2 * * 0", + NextRun: now.Add(time.Hour), + CreatedAt: now, + UpdatedAt: now, + } + if err := store.CreateJob(job); err != nil { + t.Fatalf("CreateJob() failed: %v", err) + } + + if err := store.DeleteJob(job.ID); err != nil { + t.Fatalf("DeleteJob() failed: %v", err) + } + if _, err := store.GetJob(job.ID); err == nil { + t.Fatal("expected deleted job to be missing") + } +} + +func TestDeleteJobNotFound(t *testing.T) { + store := newTestStore(t) + if err := store.DeleteJob(999999); err == nil { + t.Fatal("expected error for missing job") + } +} + +func TestListDueJobs(t *testing.T) { + store := newTestStore(t) + now := time.Now() + + cases := []Job{ + { + Type: "sync", + Status: "scheduled", + CronExpr: "*/5 * * * *", + NextRun: now.Add(-2 * time.Minute), + CreatedAt: now, + UpdatedAt: now, + }, + { + Type: "validate", + Status: "completed", + CronExpr: "0 * * * *", + NextRun: now.Add(-1 * time.Minute), + CreatedAt: now, + UpdatedAt: now, + }, + { + Type: "sync", + Status: "paused", + CronExpr: "* * * * *", + NextRun: now.Add(-5 * time.Minute), + CreatedAt: now, + UpdatedAt: now, + }, + { + Type: "sync", + Status: "running", + CronExpr: "* * * * *", + NextRun: now.Add(-5 * time.Minute), + CreatedAt: now, + UpdatedAt: now, + }, + { + Type: "sync", + Status: "failed", + CronExpr: "*/10 * * * *", + NextRun: now.Add(time.Minute), + CreatedAt: now, + UpdatedAt: now, + }, + } + + for i := range cases { + j := cases[i] + if err := store.CreateJob(&j); err != nil { + t.Fatalf("CreateJob(%d) failed: %v", i, err) + } + } + + due, err := store.ListDueJobs(now, 0) + if err != nil { + t.Fatalf("ListDueJobs() failed: %v", err) + } + if len(due) != 2 { + t.Fatalf("expected 2 due jobs, got %d", len(due)) + } + if due[0].NextRun.After(due[1].NextRun) { + t.Fatal("due jobs not ordered by next_run ASC") + } + for _, job := range due { + if job.Status == "paused" || job.Status == "running" { + t.Fatalf("unexpected due job status: %s", job.Status) + } + } +} + // ============================================================================ // Transfer Operations Tests // ============================================================================