P4: schedule battery — generic cron Runner
A host-agnostic ticker (Tick = one pass; Loop = run on an interval until ctx done) that fires due jobs. Every dependency is wired by the host: - Due lists due jobs (skill.ListDueScheduled / persona.ListScheduledAgents), - Run executes one (run.Executor), - Mark stamps the next fire (store.MarkScheduledRun), - Next computes the cron next-fire (a cron lib / skill's parser). The battery owns NO cron grammar, so it never duplicates the parser. A job whose Run or Next errors is logged and left un-stamped (stays due, retries next tick) — one bad job can't stall the others; only a failing Due lister is pass-fatal. Tests: due jobs run + stamped, bad-cron job runs but isn't stamped, a failing Run doesn't stamp or stall siblings, Due error surfaces. Core imports ZERO. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -70,7 +70,8 @@ BATTERIES (opt-in siblings, each nil-safe + a default):
|
||||
audit/ run.Audit Sink + Writer + queryable Memory [P4 ✓]
|
||||
default (skillaudit Storage iface; GORM stays in mort)
|
||||
critic/ two-tier timeout state machine + Escalator [P4]
|
||||
schedule/ cron runner cores [P4]
|
||||
schedule/ generic cron Runner (Tick/Loop over a wired [P4 ✓]
|
||||
Due/Run/Mark/Next; no cron grammar of its own)
|
||||
checkpoint/ CheckpointStore + run.Checkpointer handle [P4 ✓]
|
||||
(throttled Save/Complete/Fail) + Memory (exec wiring=P2 follow-up)
|
||||
budget/ DBBudget rolling-7d + NoOp (run.Budget); [P4 ✓]
|
||||
|
||||
@@ -0,0 +1,103 @@
|
||||
// Package schedule is the cron-runner battery: a generic ticker that, each
|
||||
// interval, asks a store for the jobs whose next-run time has passed, runs each
|
||||
// one, and stamps its next fire time. It is host-agnostic orchestration — the
|
||||
// host wires the store (skill.SkillStore.ListDueScheduled /
|
||||
// persona.Storage.ListScheduledAgents), the run (run.Executor), and the cron
|
||||
// "next fire" function (a cron library, or skill's schedule parser). The
|
||||
// battery owns no cron grammar of its own, so it never duplicates the parser.
|
||||
package schedule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Due is one schedulable job: its id and its cron expression.
|
||||
type Due struct {
|
||||
ID string
|
||||
Cron string
|
||||
}
|
||||
|
||||
// Runner periodically fires due jobs. Every func field is required except Now
|
||||
// (defaults to time.Now) and Logger (defaults to slog.Default). Construct the
|
||||
// struct directly and call Loop (or Tick for a single pass / tests).
|
||||
type Runner struct {
|
||||
// Interval is how often Loop checks for due jobs. <= 0 defaults to 1m.
|
||||
Interval time.Duration
|
||||
// Due lists the jobs due at now.
|
||||
Due func(ctx context.Context, now time.Time) ([]Due, error)
|
||||
// Run executes one job by id.
|
||||
Run func(ctx context.Context, id string) error
|
||||
// Mark records that a job ran at ranAt and is next due at nextAt.
|
||||
Mark func(ctx context.Context, id string, ranAt, nextAt time.Time) error
|
||||
// Next computes a cron expression's next fire after a given time.
|
||||
Next func(cron string, after time.Time) (time.Time, error)
|
||||
|
||||
Now func() time.Time
|
||||
Logger *slog.Logger
|
||||
}
|
||||
|
||||
func (r *Runner) now() time.Time {
|
||||
if r.Now != nil {
|
||||
return r.Now()
|
||||
}
|
||||
return time.Now()
|
||||
}
|
||||
|
||||
func (r *Runner) log() *slog.Logger {
|
||||
if r.Logger != nil {
|
||||
return r.Logger
|
||||
}
|
||||
return slog.Default()
|
||||
}
|
||||
|
||||
// Tick runs one pass: every currently-due job is run, then stamped with its
|
||||
// next fire time. A job whose Run or Next errors is logged and skipped (its
|
||||
// next-run time is left unchanged so it stays due and retries next tick) — one
|
||||
// bad job never stalls the others. Returns the error from Due (the only
|
||||
// pass-fatal step).
|
||||
func (r *Runner) Tick(ctx context.Context) error {
|
||||
now := r.now()
|
||||
due, err := r.Due(ctx, now)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, j := range due {
|
||||
if err := r.Run(ctx, j.ID); err != nil {
|
||||
r.log().Warn("scheduled job failed; will retry next tick", "job", j.ID, "error", err)
|
||||
continue
|
||||
}
|
||||
next, err := r.Next(j.Cron, now)
|
||||
if err != nil {
|
||||
r.log().Warn("scheduled job has an unparseable cron; not rescheduling", "job", j.ID, "cron", j.Cron, "error", err)
|
||||
continue
|
||||
}
|
||||
if err := r.Mark(ctx, j.ID, now, next); err != nil {
|
||||
r.log().Warn("failed to stamp scheduled job's next run", "job", j.ID, "error", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Loop ticks every Interval until ctx is cancelled. A Tick error (the Due
|
||||
// lister failing) is logged and the loop continues — a transient store hiccup
|
||||
// shouldn't kill the scheduler.
|
||||
func (r *Runner) Loop(ctx context.Context) {
|
||||
interval := r.Interval
|
||||
if interval <= 0 {
|
||||
interval = time.Minute
|
||||
}
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
if err := r.Tick(ctx); err != nil {
|
||||
r.log().Warn("schedule tick failed", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,82 @@
|
||||
package schedule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestTickRunsDueAndStampsNext(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
|
||||
var ran []string
|
||||
marked := map[string]time.Time{}
|
||||
|
||||
r := &Runner{
|
||||
Now: func() time.Time { return now },
|
||||
Due: func(_ context.Context, _ time.Time) ([]Due, error) {
|
||||
return []Due{{ID: "a", Cron: "hourly"}, {ID: "b", Cron: "bad"}}, nil
|
||||
},
|
||||
Run: func(_ context.Context, id string) error { ran = append(ran, id); return nil },
|
||||
Mark: func(_ context.Context, id string, _, next time.Time) error { marked[id] = next; return nil },
|
||||
Next: func(cron string, after time.Time) (time.Time, error) {
|
||||
if cron == "bad" {
|
||||
return time.Time{}, errors.New("unparseable")
|
||||
}
|
||||
return after.Add(time.Hour), nil
|
||||
},
|
||||
}
|
||||
if err := r.Tick(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Both ran; only the parseable one got a next stamp.
|
||||
if len(ran) != 2 {
|
||||
t.Errorf("ran = %v, want both", ran)
|
||||
}
|
||||
if marked["a"] != now.Add(time.Hour) {
|
||||
t.Errorf("a next = %v, want +1h", marked["a"])
|
||||
}
|
||||
if _, ok := marked["b"]; ok {
|
||||
t.Errorf("b should not be stamped (bad cron), got %v", marked["b"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestTickRunFailureDoesNotStampOrStall(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
var ran []string
|
||||
marked := map[string]bool{}
|
||||
r := &Runner{
|
||||
Due: func(_ context.Context, _ time.Time) ([]Due, error) {
|
||||
return []Due{{ID: "x", Cron: "h"}, {ID: "y", Cron: "h"}}, nil
|
||||
},
|
||||
Run: func(_ context.Context, id string) error {
|
||||
ran = append(ran, id)
|
||||
if id == "x" {
|
||||
return errors.New("boom")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
Mark: func(_ context.Context, id string, _, _ time.Time) error { marked[id] = true; return nil },
|
||||
Next: func(string, time.Time) (time.Time, error) { return time.Now(), nil },
|
||||
}
|
||||
if err := r.Tick(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(ran) != 2 { // y still runs despite x failing
|
||||
t.Errorf("ran = %v, want both attempted", ran)
|
||||
}
|
||||
if marked["x"] { // failed job NOT stamped -> stays due, retries
|
||||
t.Error("failed job x should not be stamped")
|
||||
}
|
||||
if !marked["y"] {
|
||||
t.Error("y should be stamped")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTickDueErrorIsFatalToPass(t *testing.T) {
|
||||
r := &Runner{Due: func(context.Context, time.Time) ([]Due, error) { return nil, errors.New("store down") }}
|
||||
if err := r.Tick(context.Background()); err == nil {
|
||||
t.Error("Tick should surface the Due lister error")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user