From dc2d4ec42595a1ff4296aa3bc7f64bae12319781 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Sat, 27 Jun 2026 00:15:32 -0400 Subject: [PATCH] =?UTF-8?q?P4c:=20remaining=20batteries=20=E2=80=94=20chec?= =?UTF-8?q?kpoint=20+=20schedule=20+=20critic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes the P4 battery set (squashed onto main from phase-4c-batteries). - checkpoint/: run.Checkpointer durable-resume (CheckpointStore + throttled handle + Memory). - schedule/: generic cron Runner (Tick/Loop; no cron grammar of its own). - critic/: two-tier timeout watchdog (run.Critic) + Escalator policy seam + ExtendOnce default. Includes the verified gadfly #6 fixes (ExtendOnce per-run, Kill-sticky, watch panic-recovery; checkpoint throttle-after-success; schedule Next-before-Run + nil-guard + Loop recovery). P4 battery set complete: audit, budget, persona, skill, checkpoint, schedule, critic — each nil-safe, each with a default, each core-import-clean. Executor wiring for Critic/Checkpointer remains a P2 follow-up. Co-Authored-By: Claude Opus 4.8 (1M context) --- checkpoint/checkpoint.go | 50 +++++++ checkpoint/checkpoint_test.go | 64 ++++++++ checkpoint/handle.go | 83 +++++++++++ checkpoint/memory.go | 55 +++++++ critic/critic.go | 266 ++++++++++++++++++++++++++++++++++ critic/critic_test.go | 100 +++++++++++++ schedule/runner.go | 132 +++++++++++++++++ schedule/runner_test.go | 111 ++++++++++++++ 8 files changed, 861 insertions(+) create mode 100644 checkpoint/checkpoint.go create mode 100644 checkpoint/checkpoint_test.go create mode 100644 checkpoint/handle.go create mode 100644 checkpoint/memory.go create mode 100644 critic/critic.go create mode 100644 critic/critic_test.go create mode 100644 schedule/runner.go create mode 100644 schedule/runner_test.go diff --git a/checkpoint/checkpoint.go b/checkpoint/checkpoint.go new file mode 100644 index 0000000..64d17cf --- /dev/null +++ b/checkpoint/checkpoint.go @@ -0,0 +1,50 @@ +// Package checkpoint is the durable-resume battery: it persists a run's +// resumable progress so a run interrupted by a shutdown can be recovered and +// continued on the next boot, rather than silently lost. It plugs into +// run.Ports.Checkpointer. +// +// Mort backs CheckpointStore with its durable-job table; Memory() is the +// zero-dependency default; contrib/store can add a SQLite one. NOTE: the +// executor's call into run.Ports.Checkpointer is a P2 follow-up — this battery +// provides the seam + impls ahead of that wiring. +package checkpoint + +import ( + "context" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +// RunCheckpointMeta is the run attribution needed to resume a run from scratch +// (mirrors mort's agentexec.RunCheckpointMeta). +type RunCheckpointMeta struct { + RunID string + AgentID string + AgentName string + CallerID string + ChannelID string + GuildID string + Prompt string + ModelTier string + ParentRunID string +} + +// RunCheckpoint is one persisted snapshot of a run's resumable progress. +type RunCheckpoint struct { + Meta RunCheckpointMeta + Messages []llm.Message // conversation so far + Iteration int // completed agent-loop iterations + ActivePhase string // current phase name (multi-phase agents); "" otherwise + UpdatedAt time.Time +} + +// CheckpointStore persists run checkpoints keyed by run id. A live checkpoint +// means "this run was in flight and not cleanly finished"; Complete/Fail delete +// it. ListInterrupted returns every surviving checkpoint at boot for recovery. +type CheckpointStore interface { + Save(ctx context.Context, cp RunCheckpoint) error + Load(ctx context.Context, runID string) (*RunCheckpoint, error) + Delete(ctx context.Context, runID string) error + ListInterrupted(ctx context.Context) ([]RunCheckpoint, error) +} diff --git a/checkpoint/checkpoint_test.go b/checkpoint/checkpoint_test.go new file mode 100644 index 0000000..6eb747d --- /dev/null +++ b/checkpoint/checkpoint_test.go @@ -0,0 +1,64 @@ +package checkpoint + +import ( + "context" + "testing" + "time" + + "gitea.stevedudenhoeffer.com/steve/executus/run" + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +func TestHandleSaveCompleteDelete(t *testing.T) { + ctx := context.Background() + mem := NewMemory() + meta := RunCheckpointMeta{RunID: "r1", AgentID: "a1", CallerID: "c1"} + cp := New(mem, meta, 0, nil) // throttle 0 = save every call + + if err := cp.Save(ctx, run.RunCheckpointState{Messages: []llm.Message{{Role: "user"}}, Iteration: 2}); err != nil { + t.Fatal(err) + } + got, _ := mem.Load(ctx, "r1") + if got == nil || got.Iteration != 2 || got.Meta.AgentID != "a1" { + t.Fatalf("checkpoint not persisted: %+v", got) + } + if il, _ := mem.ListInterrupted(ctx); len(il) != 1 { + t.Errorf("ListInterrupted = %d, want 1 (in-flight)", len(il)) + } + // Complete clears it (no longer a recovery candidate). + if err := cp.Complete(ctx); err != nil { + t.Fatal(err) + } + if il, _ := mem.ListInterrupted(ctx); len(il) != 0 { + t.Errorf("after Complete, ListInterrupted = %d, want 0", len(il)) + } +} + +func TestHandleThrottle(t *testing.T) { + ctx := context.Background() + mem := NewMemory() + now := time.Now() + cp := New(mem, RunCheckpointMeta{RunID: "r"}, time.Minute, func() time.Time { return now }) + + cp.Save(ctx, run.RunCheckpointState{Iteration: 1}) + now = now.Add(10 * time.Second) // within throttle window + cp.Save(ctx, run.RunCheckpointState{Iteration: 2}) + if got, _ := mem.Load(ctx, "r"); got.Iteration != 1 { + t.Errorf("throttled save should keep iteration 1, got %d", got.Iteration) + } + now = now.Add(time.Minute) // past throttle + cp.Save(ctx, run.RunCheckpointState{Iteration: 3}) + if got, _ := mem.Load(ctx, "r"); got.Iteration != 3 { + t.Errorf("post-throttle save should land iteration 3, got %d", got.Iteration) + } +} + +func TestNilStoreNoop(t *testing.T) { + cp := New(nil, RunCheckpointMeta{RunID: "r"}, 0, nil) + if err := cp.Save(context.Background(), run.RunCheckpointState{}); err != nil { + t.Errorf("nil-store Save should be a no-op, got %v", err) + } + if err := cp.Complete(context.Background()); err != nil { + t.Error(err) + } +} diff --git a/checkpoint/handle.go b/checkpoint/handle.go new file mode 100644 index 0000000..db2d8ba --- /dev/null +++ b/checkpoint/handle.go @@ -0,0 +1,83 @@ +package checkpoint + +import ( + "context" + "sync" + "time" + + "gitea.stevedudenhoeffer.com/steve/executus/run" +) + +// handle is a per-run run.Checkpointer bound to one run's id + meta. Save writes +// a fresh snapshot (throttled), Complete/Fail delete the checkpoint (a cleanly +// finished or terminally failed run is NOT a recovery candidate). A run +// interrupted by shutdown never calls Complete/Fail, so its checkpoint survives +// for ListInterrupted at boot. +type handle struct { + store CheckpointStore + meta RunCheckpointMeta + throttle time.Duration + now func() time.Time + + mu sync.Mutex + lastSave time.Time +} + +var _ run.Checkpointer = (*handle)(nil) + +// New returns a run.Checkpointer that persists snapshots of the run identified +// by meta.RunID to store, no more often than throttle (Save calls inside the +// window are skipped). A nil store yields a no-op Checkpointer. throttle <= 0 +// saves every call; now defaults to time.Now. +func New(store CheckpointStore, meta RunCheckpointMeta, throttle time.Duration, now func() time.Time) run.Checkpointer { + if store == nil { + return noop{} + } + if now == nil { + now = time.Now + } + return &handle{store: store, meta: meta, throttle: throttle, now: now} +} + +func (h *handle) Save(ctx context.Context, st run.RunCheckpointState) error { + h.mu.Lock() + now := h.now() + if h.throttle > 0 && !h.lastSave.IsZero() && now.Sub(h.lastSave) < h.throttle { + h.mu.Unlock() + return nil // throttled — a more recent snapshot will land shortly + } + h.mu.Unlock() + + // Advance the throttle clock only AFTER a successful persist. If the store + // write fails, lastSave stays put so the next Save isn't throttled away — + // otherwise a transient store error would silently drop the snapshot the + // caller believes was saved. (A run drives one Save goroutine, so the brief + // unguarded window here can't double-write.) + if err := h.store.Save(ctx, RunCheckpoint{ + Meta: h.meta, + Messages: st.Messages, + Iteration: st.Iteration, + UpdatedAt: now, + }); err != nil { + return err + } + h.mu.Lock() + if now.After(h.lastSave) { + h.lastSave = now + } + h.mu.Unlock() + return nil +} + +func (h *handle) Complete(ctx context.Context) error { return h.store.Delete(ctx, h.meta.RunID) } + +func (h *handle) Fail(ctx context.Context, _ error) error { return h.store.Delete(ctx, h.meta.RunID) } + +// noop is the nil-store Checkpointer: every method is a successful no-op. +type noop struct{} + +var _ run.Checkpointer = noop{} + +func (noop) Save(context.Context, run.RunCheckpointState) error { return nil } +func (noop) Complete(context.Context) error { return nil } +func (noop) Fail(context.Context, error) error { return nil } diff --git a/checkpoint/memory.go b/checkpoint/memory.go new file mode 100644 index 0000000..3572e35 --- /dev/null +++ b/checkpoint/memory.go @@ -0,0 +1,55 @@ +package checkpoint + +import ( + "context" + "sync" +) + +// Memory is a zero-dependency in-process CheckpointStore. NOTE: an in-memory +// checkpoint store does NOT survive the process restart it exists to recover +// from — it is the test/light-host default and makes ListInterrupted meaningful +// only within a single process lifetime. A host that wants real +// crash-recovery wires a durable CheckpointStore (mort's durable-job table). +type Memory struct { + mu sync.RWMutex + cps map[string]RunCheckpoint // by run id +} + +// NewMemory returns an empty in-memory CheckpointStore. +func NewMemory() *Memory { return &Memory{cps: map[string]RunCheckpoint{}} } + +var _ CheckpointStore = (*Memory)(nil) + +func (m *Memory) Save(_ context.Context, cp RunCheckpoint) error { + m.mu.Lock() + defer m.mu.Unlock() + m.cps[cp.Meta.RunID] = cp + return nil +} + +func (m *Memory) Load(_ context.Context, runID string) (*RunCheckpoint, error) { + m.mu.RLock() + defer m.mu.RUnlock() + cp, ok := m.cps[runID] + if !ok { + return nil, nil // no checkpoint (not an error — the run finished cleanly or never started) + } + return &cp, nil +} + +func (m *Memory) Delete(_ context.Context, runID string) error { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.cps, runID) + return nil +} + +func (m *Memory) ListInterrupted(_ context.Context) ([]RunCheckpoint, error) { + m.mu.RLock() + defer m.mu.RUnlock() + out := make([]RunCheckpoint, 0, len(m.cps)) + for _, cp := range m.cps { + out = append(out, cp) + } + return out, nil +} diff --git a/critic/critic.go b/critic/critic.go new file mode 100644 index 0000000..ceb5445 --- /dev/null +++ b/critic/critic.go @@ -0,0 +1,266 @@ +// Package critic is the run-watchdog battery: a two-tier timeout monitor that +// catches a run that has stopped making progress. It plugs into +// run.Ports.Critic. +// +// The split of concerns is deliberate. executus owns the deterministic +// MECHANICS — track activity, fire on a soft timeout, enforce a hard-kill +// backstop, carry steer messages and the extendable deadline back to the +// executor. The POLICY — what to actually do when a run stalls (nudge it, +// extend its deadline, kill it, escalate to a human) — is the Escalator seam. +// Mort plugs its LLM critic-agent in as an Escalator; ExtendOnce is the +// zero-dependency default. +// +// NOTE: the executor's call into run.Ports.Critic is a P2 follow-up; this +// battery provides the seam + impl ahead of that wiring. +package critic + +import ( + "context" + "log/slog" + "sync" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + + "gitea.stevedudenhoeffer.com/steve/executus/run" +) + +// Progress is the snapshot the critic hands an Escalator when a run stalls. +type Progress struct { + Iterations int // completed agent-loop iterations so far + LastActivity time.Time // wall-clock of the last step/tool event + Idle time.Duration // now - LastActivity + LastTool string // name of the most recently started tool ("" if none) +} + +// Decision is the Escalator's verdict for a stalled run. Zero value = do +// nothing (let the hard backstop eventually kill a truly hung run). +type Decision struct { + Nudge []llm.Message // injected before the agent's next turn (a steer) + ExtendBy time.Duration // push the hard deadline out by this much + Kill bool // cancel the run now + KillReason string +} + +// Escalator decides what to do when a run crosses its soft timeout. It is +// called at most once per idle period (a fresh step/tool event re-arms it). +type Escalator interface { + OnSoftTimeout(ctx context.Context, info run.RunInfo, p Progress) Decision +} + +// ExtendOnce is the default Escalator: the first time a given run stalls it +// extends that run's deadline by By (giving a slow-but-healthy run room), then +// takes no further action for it — so a genuinely hung run is later killed by +// the hard backstop. A nil/zero By falls back to one soft-timeout's worth. +// +// The one-shot is keyed PER RUN (by RunInfo.RunID): a single System shares one +// ExtendOnce across every run it monitors, so a global flag would let only the +// first run to stall ever get its extension. The fired set grows with the +// number of distinct runs that stall — fine for a process's run volume; a host +// running unboundedly long can construct a fresh System periodically. +type ExtendOnce struct { + By time.Duration + + mu sync.Mutex + fired map[string]bool // run ids that have already had their one extension +} + +// OnSoftTimeout implements Escalator. +func (e *ExtendOnce) OnSoftTimeout(_ context.Context, info run.RunInfo, p Progress) Decision { + e.mu.Lock() + defer e.mu.Unlock() + if e.fired[info.RunID] { + return Decision{} + } + if e.fired == nil { + e.fired = map[string]bool{} + } + e.fired[info.RunID] = true + by := e.By + if by <= 0 { + by = p.Idle // ~one soft timeout + } + return Decision{ExtendBy: by} +} + +// System implements run.Critic. Construct with New; one System monitors many +// runs concurrently (each Monitor returns an independent handle). +type System struct { + esc Escalator + backstopMul float64 // hard deadline = softTimeout * backstopMul from start + checkInterval time.Duration + now func() time.Time + logger *slog.Logger +} + +func (s *System) log() *slog.Logger { + if s.logger != nil { + return s.logger + } + return slog.Default() +} + +// New builds a run.Critic. esc is the policy (nil → ExtendOnce). backstopMul is +// the hard-kill backstop as a multiple of each run's soft timeout (<=1 → 3). A +// nil esc + the default backstop gives a safe "extend once, then hard-kill" +// watchdog with no host wiring. +func New(esc Escalator, backstopMul float64) *System { + if esc == nil { + esc = &ExtendOnce{} + } + if backstopMul <= 1 { + backstopMul = 3 + } + return &System{esc: esc, backstopMul: backstopMul, now: time.Now} +} + +var _ run.Critic = (*System)(nil) + +// Monitor starts watching a run and returns its handle. Implements run.Critic. +func (s *System) Monitor(ctx context.Context, info run.RunInfo, softTimeout time.Duration) run.CriticHandle { + if softTimeout <= 0 { + return run.CriticHandle(nil) // no soft timeout → not monitored + } + now := s.now() + check := s.checkInterval + if check <= 0 { + check = softTimeout / 2 + if check < time.Second { + check = time.Second + } + } + h := &handle{ + sys: s, + info: info, + softTimeout: softTimeout, + now: s.now, + lastActivity: now, + deadline: now.Add(time.Duration(float64(softTimeout) * s.backstopMul)), + stopCh: make(chan struct{}), + } + go h.watch(ctx, check) + return h +} + +// handle is one run's live critic link. Implements run.CriticHandle. +type handle struct { + sys *System + info run.RunInfo + softTimeout time.Duration + now func() time.Time + + mu sync.Mutex + lastActivity time.Time + escalatedAt time.Time // lastActivity value we last escalated for (de-dupes per idle period) + deadline time.Time + steer []llm.Message + iterations int + lastTool string + killed bool // sticky: once an Escalator kills, no later decision un-kills it + stopped bool + stopCh chan struct{} +} + +func (h *handle) RecordStep(iter int) { + h.mu.Lock() + h.iterations = iter + h.lastActivity = h.now() + h.mu.Unlock() +} + +func (h *handle) RecordToolStart(name, _ string) { + h.mu.Lock() + h.lastTool = name + h.lastActivity = h.now() + h.mu.Unlock() +} + +func (h *handle) Steer() []llm.Message { + h.mu.Lock() + defer h.mu.Unlock() + if len(h.steer) == 0 { + return nil + } + out := h.steer + h.steer = nil + return out +} + +func (h *handle) Deadline() time.Time { + h.mu.Lock() + defer h.mu.Unlock() + return h.deadline +} + +func (h *handle) Stop() { + h.mu.Lock() + if !h.stopped { + h.stopped = true + close(h.stopCh) + } + h.mu.Unlock() +} + +// watch fires the Escalator once per idle period the run crosses its soft +// timeout, and applies the returned Decision. +func (h *handle) watch(ctx context.Context, interval time.Duration) { + // A misbehaving Escalator that panics must not silently kill the watch + // goroutine (which would leave the run unmonitored for its lifetime). Log + // and exit cleanly — the run falls back to the deadline already set. + defer func() { + if r := recover(); r != nil { + h.sys.log().Error("critic watch panicked; run is now unmonitored", "run", h.info.RunID, "panic", r) + } + }() + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-h.stopCh: + return + case <-ctx.Done(): + return + case <-t.C: + h.tick(ctx) + } + } +} + +func (h *handle) tick(ctx context.Context) { + h.mu.Lock() + // Kill is sticky: once an Escalator has killed this run, no later tick (and + // no later Decision) un-collapses the deadline. + if h.killed { + h.mu.Unlock() + return + } + idle := h.now().Sub(h.lastActivity) + // Only escalate once per idle period: skip if we already escalated for this + // exact lastActivity (a fresh step/tool updates lastActivity and re-arms). + if idle < h.softTimeout || h.escalatedAt.Equal(h.lastActivity) { + h.mu.Unlock() + return + } + h.escalatedAt = h.lastActivity + snap := Progress{Iterations: h.iterations, LastActivity: h.lastActivity, Idle: idle, LastTool: h.lastTool} + h.mu.Unlock() + + d := h.sys.esc.OnSoftTimeout(ctx, h.info, snap) + + h.mu.Lock() + defer h.mu.Unlock() + if h.killed { // a concurrent tick may have killed while OnSoftTimeout ran + return + } + if d.Kill { + h.killed = true + h.deadline = h.now() // immediate hard deadline → executor cancels + return // ignore any Nudge/ExtendBy paired with a Kill + } + if len(d.Nudge) > 0 { + h.steer = append(h.steer, d.Nudge...) + } + if d.ExtendBy > 0 { + h.deadline = h.deadline.Add(d.ExtendBy) + } +} diff --git a/critic/critic_test.go b/critic/critic_test.go new file mode 100644 index 0000000..e9b4b2a --- /dev/null +++ b/critic/critic_test.go @@ -0,0 +1,100 @@ +package critic + +import ( + "context" + "sync" + "testing" + "time" + + "gitea.stevedudenhoeffer.com/steve/executus/run" + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +// escFunc adapts a func to an Escalator. +type escFunc func(context.Context, run.RunInfo, Progress) Decision + +func (f escFunc) OnSoftTimeout(ctx context.Context, i run.RunInfo, p Progress) Decision { + return f(ctx, i, p) +} + +func TestMonitorEscalatesOncePerIdlePeriodAndExtends(t *testing.T) { + var mu sync.Mutex + var calls int + esc := escFunc(func(_ context.Context, _ run.RunInfo, p Progress) Decision { + mu.Lock() + calls++ + mu.Unlock() + return Decision{ExtendBy: 50 * time.Millisecond, Nudge: []llm.Message{{Role: llm.RoleUser}}} + }) + s := New(esc, 3) + s.checkInterval = 5 * time.Millisecond + h := s.Monitor(context.Background(), run.RunInfo{RunID: "r"}, 20*time.Millisecond) + defer h.Stop() + + d0 := h.Deadline() + time.Sleep(60 * time.Millisecond) // cross the soft timeout with no activity + mu.Lock() + c := calls + mu.Unlock() + if c < 1 { + t.Fatalf("expected at least one escalation, got %d", c) + } + // Nudge was queued and is drained once. + if msgs := h.Steer(); len(msgs) == 0 { + t.Error("expected a queued steer nudge") + } + if msgs := h.Steer(); len(msgs) != 0 { + t.Error("steer should drain (be empty on second read)") + } + // Deadline was extended. + if !h.Deadline().After(d0) { + t.Error("deadline should have been extended past the original") + } + // A fresh step re-arms; another idle period escalates again. + h.RecordStep(1) + time.Sleep(60 * time.Millisecond) + mu.Lock() + c2 := calls + mu.Unlock() + if c2 <= c { + t.Errorf("a re-armed idle period should escalate again (%d -> %d)", c, c2) + } +} + +func TestKillCollapsesDeadline(t *testing.T) { + esc := escFunc(func(context.Context, run.RunInfo, Progress) Decision { + return Decision{Kill: true, KillReason: "hung"} + }) + s := New(esc, 10) // big backstop so only Kill collapses it + s.checkInterval = 5 * time.Millisecond + h := s.Monitor(context.Background(), run.RunInfo{RunID: "r"}, 20*time.Millisecond) + defer h.Stop() + time.Sleep(60 * time.Millisecond) + if h.Deadline().After(time.Now().Add(time.Second)) { + t.Error("Kill should collapse the deadline to ~now") + } +} + +func TestExtendOnceOnlyFiresOnce(t *testing.T) { + e := &ExtendOnce{By: time.Minute} + // Same run id: only the first call extends. + d1 := e.OnSoftTimeout(context.Background(), run.RunInfo{RunID: "r1"}, Progress{}) + d2 := e.OnSoftTimeout(context.Background(), run.RunInfo{RunID: "r1"}, Progress{}) + if d1.ExtendBy != time.Minute { + t.Errorf("first decision should extend, got %+v", d1) + } + if d2.ExtendBy != 0 || d2.Kill { + t.Errorf("second call for the same run should be a no-op, got %+v", d2) + } + // A DIFFERENT run still gets its own one extension (per-run, not global). + if d3 := e.OnSoftTimeout(context.Background(), run.RunInfo{RunID: "r2"}, Progress{}); d3.ExtendBy != time.Minute { + t.Errorf("a different run should get its own extension, got %+v", d3) + } +} + +func TestZeroSoftTimeoutNotMonitored(t *testing.T) { + s := New(nil, 3) + if h := s.Monitor(context.Background(), run.RunInfo{}, 0); h != nil { + t.Error("zero soft timeout should return a nil handle (not monitored)") + } +} diff --git a/schedule/runner.go b/schedule/runner.go new file mode 100644 index 0000000..c21c94e --- /dev/null +++ b/schedule/runner.go @@ -0,0 +1,132 @@ +// Package schedule is the cron-runner battery: a generic ticker that, each +// interval, asks a store for the jobs whose next-run time has passed, runs each +// one, and stamps its next fire time. It is host-agnostic orchestration — the +// host wires the store (skill.SkillStore.ListDueScheduled / +// persona.Storage.ListScheduledAgents), the run (run.Executor), and the cron +// "next fire" function (a cron library, or skill's schedule parser). The +// battery owns no cron grammar of its own, so it never duplicates the parser. +package schedule + +import ( + "context" + "errors" + "log/slog" + "time" +) + +// Due is one schedulable job: its id and its cron expression. +type Due struct { + ID string + Cron string +} + +// Runner periodically fires due jobs. Every func field is required except Now +// (defaults to time.Now) and Logger (defaults to slog.Default). Construct the +// struct directly and call Loop (or Tick for a single pass / tests). +type Runner struct { + // Interval is how often Loop checks for due jobs. <= 0 defaults to 1m. + Interval time.Duration + // Due lists the jobs due at now. + Due func(ctx context.Context, now time.Time) ([]Due, error) + // Run executes one job by id. + Run func(ctx context.Context, id string) error + // Mark records that a job ran at ranAt and is next due at nextAt. + Mark func(ctx context.Context, id string, ranAt, nextAt time.Time) error + // Next computes a cron expression's next fire after a given time. + Next func(cron string, after time.Time) (time.Time, error) + + Now func() time.Time + Logger *slog.Logger +} + +func (r *Runner) now() time.Time { + if r.Now != nil { + return r.Now() + } + return time.Now() +} + +func (r *Runner) log() *slog.Logger { + if r.Logger != nil { + return r.Logger + } + return slog.Default() +} + +// Tick runs one pass: every currently-due job is run, then stamped with its +// next fire time. A job whose Run or Next errors is logged and skipped (its +// next-run time is left unchanged so it stays due and retries next tick) — one +// bad job never stalls the others. Returns the error from Due (the only +// pass-fatal step). +func (r *Runner) Tick(ctx context.Context) error { + if err := r.validate(); err != nil { + return err + } + now := r.now() + due, err := r.Due(ctx, now) + if err != nil { + return err + } + for _, j := range due { + // Compute the next fire BEFORE running. A permanently-unparseable cron + // then skips the job entirely (logged) rather than running it — an + // unstamped job stays due, so checking Next first avoids a hot-loop of + // real Run executions every tick. + next, err := r.Next(j.Cron, now) + if err != nil { + r.log().Warn("scheduled job has an unparseable cron; skipping (not run, not rescheduled)", "job", j.ID, "cron", j.Cron, "error", err) + continue + } + if err := r.Run(ctx, j.ID); err != nil { + r.log().Warn("scheduled job failed; stays due, will retry next tick", "job", j.ID, "error", err) + continue + } + // A Mark failure leaves the job due, so it re-runs next tick — Run must + // be idempotent (there is no atomic run+stamp across two host callbacks). + if err := r.Mark(ctx, j.ID, now, next); err != nil { + r.log().Warn("failed to stamp next run; job may re-execute next tick (Run must be idempotent)", "job", j.ID, "error", err) + } + } + return nil +} + +// validate reports a misconfigured Runner (a required callback left nil) as a +// clear error rather than a nil-deref panic on first tick. +func (r *Runner) validate() error { + if r.Due == nil || r.Run == nil || r.Mark == nil || r.Next == nil { + return errors.New("schedule: Runner requires non-nil Due, Run, Mark, and Next") + } + return nil +} + +// Loop ticks every Interval until ctx is cancelled. A Tick error (the Due +// lister failing) is logged and the loop continues — a transient store hiccup +// shouldn't kill the scheduler — and a panic from any host callback is +// recovered so one bad tick can't silently kill the scheduler goroutine. +func (r *Runner) Loop(ctx context.Context) { + interval := r.Interval + if interval <= 0 { + interval = time.Minute + } + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + r.safeTick(ctx) + } + } +} + +func (r *Runner) safeTick(ctx context.Context) { + defer func() { + if rec := recover(); rec != nil { + r.log().Error("schedule tick panicked; scheduler continues", "panic", rec) + } + }() + if err := r.Tick(ctx); err != nil { + r.log().Warn("schedule tick failed", "error", err) + } +} diff --git a/schedule/runner_test.go b/schedule/runner_test.go new file mode 100644 index 0000000..3d67a83 --- /dev/null +++ b/schedule/runner_test.go @@ -0,0 +1,111 @@ +package schedule + +import ( + "context" + "errors" + "testing" + "time" +) + +func TestTickRunsDueAndStampsNext(t *testing.T) { + ctx := context.Background() + now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC) + var ran []string + marked := map[string]time.Time{} + + r := &Runner{ + Now: func() time.Time { return now }, + Due: func(_ context.Context, _ time.Time) ([]Due, error) { + return []Due{{ID: "a", Cron: "hourly"}, {ID: "b", Cron: "bad"}}, nil + }, + Run: func(_ context.Context, id string) error { ran = append(ran, id); return nil }, + Mark: func(_ context.Context, id string, _, next time.Time) error { marked[id] = next; return nil }, + Next: func(cron string, after time.Time) (time.Time, error) { + if cron == "bad" { + return time.Time{}, errors.New("unparseable") + } + return after.Add(time.Hour), nil + }, + } + if err := r.Tick(ctx); err != nil { + t.Fatal(err) + } + // Next is checked first, so the bad-cron job is skipped BEFORE Run — only + // the parseable job runs and gets stamped (no hot-loop of a bad-cron Run). + if len(ran) != 1 || ran[0] != "a" { + t.Errorf("ran = %v, want only [a] (bad-cron b skipped before Run)", ran) + } + if marked["a"] != now.Add(time.Hour) { + t.Errorf("a next = %v, want +1h", marked["a"]) + } + if _, ok := marked["b"]; ok { + t.Errorf("b should not be stamped (bad cron), got %v", marked["b"]) + } +} + +func TestTickRunFailureDoesNotStampOrStall(t *testing.T) { + ctx := context.Background() + var ran []string + marked := map[string]bool{} + r := &Runner{ + Due: func(_ context.Context, _ time.Time) ([]Due, error) { + return []Due{{ID: "x", Cron: "h"}, {ID: "y", Cron: "h"}}, nil + }, + Run: func(_ context.Context, id string) error { + ran = append(ran, id) + if id == "x" { + return errors.New("boom") + } + return nil + }, + Mark: func(_ context.Context, id string, _, _ time.Time) error { marked[id] = true; return nil }, + Next: func(string, time.Time) (time.Time, error) { return time.Now(), nil }, + } + if err := r.Tick(ctx); err != nil { + t.Fatal(err) + } + if len(ran) != 2 { // y still runs despite x failing + t.Errorf("ran = %v, want both attempted", ran) + } + if marked["x"] { // failed job NOT stamped -> stays due, retries + t.Error("failed job x should not be stamped") + } + if !marked["y"] { + t.Error("y should be stamped") + } +} + +func TestTickDueErrorIsFatalToPass(t *testing.T) { + r := &Runner{ + Due: func(context.Context, time.Time) ([]Due, error) { return nil, errors.New("store down") }, + Run: func(context.Context, string) error { return nil }, + Mark: func(context.Context, string, time.Time, time.Time) error { return nil }, + Next: func(string, time.Time) (time.Time, error) { return time.Now(), nil }, + } + if err := r.Tick(context.Background()); err == nil { + t.Error("Tick should surface the Due lister error") + } +} + +func TestUnparseableCronSkipsRunEntirely(t *testing.T) { + var ran []string + r := &Runner{ + Due: func(context.Context, time.Time) ([]Due, error) { return []Due{{ID: "z", Cron: "bad"}}, nil }, + Run: func(_ context.Context, id string) error { ran = append(ran, id); return nil }, + Mark: func(context.Context, string, time.Time, time.Time) error { return nil }, + Next: func(string, time.Time) (time.Time, error) { return time.Time{}, errors.New("bad cron") }, + } + if err := r.Tick(context.Background()); err != nil { + t.Fatal(err) + } + if len(ran) != 0 { + t.Errorf("a job with an unparseable cron must NOT be run (avoids hot-loop), ran=%v", ran) + } +} + +func TestValidateRejectsNilCallbacks(t *testing.T) { + r := &Runner{Due: func(context.Context, time.Time) ([]Due, error) { return nil, nil }} // missing Run/Mark/Next + if err := r.Tick(context.Background()); err == nil { + t.Error("Tick should return a validation error for a partially-wired Runner, not panic") + } +}