diff --git a/CLAUDE.md b/CLAUDE.md
index cf4efff..ff80e46 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -70,7 +70,8 @@ BATTERIES (opt-in siblings, each nil-safe + a default):
  audit/ run.Audit Sink + Writer + queryable Memory [P4 ✓]
  default (skillaudit Storage iface; GORM stays in mort)
  critic/ two-tier timeout state machine + Escalator [P4]
- schedule/ cron runner cores [P4]
+ schedule/ generic cron Runner (Tick/Loop over a wired [P4 ✓]
+ Due/Run/Mark/Next; no cron grammar of its own)
  checkpoint/ CheckpointStore + run.Checkpointer handle [P4 ✓]
  (throttled Save/Complete/Fail) + Memory (exec wiring=P2 follow-up)
  budget/ DBBudget rolling-7d + NoOp (run.Budget); [P4 ✓]
diff --git a/schedule/runner.go b/schedule/runner.go
new file mode 100644
index 0000000..2a7d8b4
--- /dev/null
+++ b/schedule/runner.go
@@ -0,0 +1,121 @@
+// Package schedule is the cron-runner battery: a generic ticker that, each
+// interval, asks a store for the jobs whose next-run time has passed, runs each
+// one, and stamps its next fire time. It is host-agnostic orchestration — the
+// host wires the store (skill.SkillStore.ListDueScheduled /
+// persona.Storage.ListScheduledAgents), the run (run.Executor), and the cron
+// "next fire" function (a cron library, or skill's schedule parser). The
+// battery owns no cron grammar of its own, so it never duplicates the parser.
+package schedule
+
+import (
+	"context"
+	"log/slog"
+	"time"
+)
+
+// Due is one schedulable job: its id and its cron expression.
+type Due struct {
+	ID   string
+	Cron string
+}
+
+// Runner periodically fires due jobs. Every func field is required except Now
+// (defaults to time.Now) and Logger (defaults to slog.Default). Construct the
+// struct directly and call Loop (or Tick for a single pass / tests).
+type Runner struct {
+	// Interval is how often Loop checks for due jobs. <= 0 defaults to 1m.
+	Interval time.Duration
+	// Due lists the jobs due at now.
+	Due func(ctx context.Context, now time.Time) ([]Due, error)
+	// Run executes one job by id.
+	Run func(ctx context.Context, id string) error
+	// Mark records that a job ran at ranAt and is next due at nextAt.
+	Mark func(ctx context.Context, id string, ranAt, nextAt time.Time) error
+	// Next computes a cron expression's next fire after a given time.
+	Next func(cron string, after time.Time) (time.Time, error)
+
+	// Now is the clock used to timestamp a pass; nil defaults to time.Now.
+	Now func() time.Time
+	// Logger receives the warnings for failed jobs and ticks; nil defaults to
+	// slog.Default().
+	Logger *slog.Logger
+}
+
+// now returns the current time from r.Now, falling back to time.Now.
+func (r *Runner) now() time.Time {
+	if r.Now != nil {
+		return r.Now()
+	}
+	return time.Now()
+}
+
+// log returns r.Logger, falling back to slog.Default.
+func (r *Runner) log() *slog.Logger {
+	if r.Logger != nil {
+		return r.Logger
+	}
+	return slog.Default()
+}
+
+// Tick runs one pass: every currently-due job is run, then stamped with its
+// next fire time. Every job in the pass shares the timestamp taken at the start
+// of the pass, both as its ranAt and as the base passed to Next.
+//
+// A job whose Run errors is logged and skipped; its next-run time is left
+// unchanged so it stays due and retries next tick. A job whose Next errors has
+// already run but is likewise left unstamped, so it stays due and runs again
+// next tick until its cron expression is fixed. A Mark error is only logged.
+// One bad job never stalls the others.
+//
+// Tick returns the error from Due, or ctx.Err() if ctx is cancelled between
+// jobs; in that case the jobs not yet started are left untouched for a later
+// pass.
+func (r *Runner) Tick(ctx context.Context) error {
+	now := r.now()
+	due, err := r.Due(ctx, now)
+	if err != nil {
+		return err
+	}
+	for _, j := range due {
+		// Stop between jobs once ctx is cancelled (e.g. on shutdown) instead
+		// of launching every remaining job against a dead context.
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+		if err := r.Run(ctx, j.ID); err != nil {
+			r.log().Warn("scheduled job failed; will retry next tick", "job", j.ID, "error", err)
+			continue
+		}
+		next, err := r.Next(j.Cron, now)
+		if err != nil {
+			r.log().Warn("scheduled job has an unparseable cron; not rescheduling", "job", j.ID, "cron", j.Cron, "error", err)
+			continue
+		}
+		if err := r.Mark(ctx, j.ID, now, next); err != nil {
+			r.log().Warn("failed to stamp scheduled job's next run", "job", j.ID, "error", err)
+		}
+	}
+	return nil
+}
+
+// Loop ticks every Interval until ctx is cancelled. A Tick error (the Due
+// lister failing) is logged and the loop continues — a transient store hiccup
+// shouldn't kill the scheduler.
+func (r *Runner) Loop(ctx context.Context) {
+	interval := r.Interval
+	if interval <= 0 {
+		interval = time.Minute // default for an unset or non-positive Interval
+	}
+	t := time.NewTicker(interval)
+	defer t.Stop() // release the ticker when the loop exits
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			if err := r.Tick(ctx); err != nil {
+				r.log().Warn("schedule tick failed", "error", err)
+			}
+		}
+	}
+}
diff --git a/schedule/runner_test.go b/schedule/runner_test.go
new file mode 100644
index 0000000..bbde4fc
--- /dev/null
+++ b/schedule/runner_test.go
@@ -0,0 +1,88 @@
+package schedule
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+)
+
+// TestTickRunsDueAndStampsNext checks that Tick runs every due job and stamps
+// only the job whose cron expression Next can parse.
+func TestTickRunsDueAndStampsNext(t *testing.T) {
+	ctx := context.Background()
+	now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
+	var ran []string
+	marked := map[string]time.Time{}
+
+	r := &Runner{
+		Now: func() time.Time { return now },
+		Due: func(_ context.Context, _ time.Time) ([]Due, error) {
+			return []Due{{ID: "a", Cron: "hourly"}, {ID: "b", Cron: "bad"}}, nil
+		},
+		Run:  func(_ context.Context, id string) error { ran = append(ran, id); return nil },
+		Mark: func(_ context.Context, id string, _, next time.Time) error { marked[id] = next; return nil },
+		Next: func(cron string, after time.Time) (time.Time, error) {
+			if cron == "bad" {
+				return time.Time{}, errors.New("unparseable")
+			}
+			return after.Add(time.Hour), nil
+		},
+	}
+	if err := r.Tick(ctx); err != nil {
+		t.Fatal(err)
+	}
+	// Both ran; only the parseable one got a next stamp.
+	if len(ran) != 2 {
+		t.Errorf("ran = %v, want both", ran)
+	}
+	// Equal compares instants; != would also compare Location and monotonic data.
+	if !marked["a"].Equal(now.Add(time.Hour)) {
+		t.Errorf("a next = %v, want +1h", marked["a"])
+	}
+	if _, ok := marked["b"]; ok {
+		t.Errorf("b should not be stamped (bad cron), got %v", marked["b"])
+	}
+}
+
+// TestTickRunFailureDoesNotStampOrStall checks that a job whose Run fails is
+// left unstamped while the jobs after it still run and get stamped.
+func TestTickRunFailureDoesNotStampOrStall(t *testing.T) {
+	ctx := context.Background()
+	var ran []string
+	marked := map[string]bool{}
+	r := &Runner{
+		Due: func(_ context.Context, _ time.Time) ([]Due, error) {
+			return []Due{{ID: "x", Cron: "h"}, {ID: "y", Cron: "h"}}, nil
+		},
+		Run: func(_ context.Context, id string) error {
+			ran = append(ran, id)
+			if id == "x" {
+				return errors.New("boom")
+			}
+			return nil
+		},
+		Mark: func(_ context.Context, id string, _, _ time.Time) error { marked[id] = true; return nil },
+		Next: func(string, time.Time) (time.Time, error) { return time.Now(), nil },
+	}
+	if err := r.Tick(ctx); err != nil {
+		t.Fatal(err)
+	}
+	if len(ran) != 2 { // y still runs despite x failing
+		t.Errorf("ran = %v, want both attempted", ran)
+	}
+	if marked["x"] { // failed job NOT stamped -> stays due, retries
+		t.Error("failed job x should not be stamped")
+	}
+	if !marked["y"] {
+		t.Error("y should be stamped")
+	}
+}
+
+// TestTickDueErrorIsFatalToPass checks that Tick returns the Due lister's error.
+func TestTickDueErrorIsFatalToPass(t *testing.T) {
+	r := &Runner{Due: func(context.Context, time.Time) ([]Due, error) { return nil, errors.New("store down") }}
+	if err := r.Tick(context.Background()); err == nil {
+		t.Error("Tick should surface the Due lister error")
+	}
+}