diff --git a/checkpoint/handle.go b/checkpoint/handle.go index 6a59f3d..db2d8ba 100644 --- a/checkpoint/handle.go +++ b/checkpoint/handle.go @@ -46,15 +46,27 @@ func (h *handle) Save(ctx context.Context, st run.RunCheckpointState) error { h.mu.Unlock() return nil // throttled — a more recent snapshot will land shortly } - h.lastSave = now h.mu.Unlock() - return h.store.Save(ctx, RunCheckpoint{ + // Advance the throttle clock only AFTER a successful persist. If the store + // write fails, lastSave stays put so the next Save isn't throttled away — + // otherwise a transient store error would silently drop the snapshot the + // caller believes was saved. (A run drives one Save goroutine, so the brief + // unguarded window here can't double-write.) + if err := h.store.Save(ctx, RunCheckpoint{ Meta: h.meta, Messages: st.Messages, Iteration: st.Iteration, UpdatedAt: now, - }) + }); err != nil { + return err + } + h.mu.Lock() + if now.After(h.lastSave) { + h.lastSave = now + } + h.mu.Unlock() + return nil } func (h *handle) Complete(ctx context.Context) error { return h.store.Delete(ctx, h.meta.RunID) } diff --git a/critic/critic.go b/critic/critic.go index f8dbcf8..ceb5445 100644 --- a/critic/critic.go +++ b/critic/critic.go @@ -16,6 +16,7 @@ package critic import ( "context" + "log/slog" "sync" "time" @@ -47,25 +48,34 @@ type Escalator interface { OnSoftTimeout(ctx context.Context, info run.RunInfo, p Progress) Decision } -// ExtendOnce is the default Escalator: the first time a run stalls it extends -// the deadline by By (giving a slow-but-healthy run room), then takes no -// further action — so a genuinely hung run is later killed by the hard -// backstop. A nil/zero By falls back to one soft-timeout's worth. +// ExtendOnce is the default Escalator: the first time a given run stalls it +// extends that run's deadline by By (giving a slow-but-healthy run room), then +// takes no further action for it — so a genuinely hung run is later killed by +// the hard backstop. A nil/zero By falls back to one soft-timeout's worth. +// +// The one-shot is keyed PER RUN (by RunInfo.RunID): a single System shares one +// ExtendOnce across every run it monitors, so a global flag would let only the +// first run to stall ever get its extension. The fired set grows with the +// number of distinct runs that stall — fine for a process's run volume; a host +// running unboundedly long can construct a fresh System periodically. type ExtendOnce struct { By time.Duration mu sync.Mutex - fired bool + fired map[string]bool // run ids that have already had their one extension } // OnSoftTimeout implements Escalator. -func (e *ExtendOnce) OnSoftTimeout(_ context.Context, _ run.RunInfo, p Progress) Decision { +func (e *ExtendOnce) OnSoftTimeout(_ context.Context, info run.RunInfo, p Progress) Decision { e.mu.Lock() defer e.mu.Unlock() - if e.fired { + if e.fired[info.RunID] { return Decision{} } - e.fired = true + if e.fired == nil { + e.fired = map[string]bool{} + } + e.fired[info.RunID] = true by := e.By if by <= 0 { by = p.Idle // ~one soft timeout @@ -80,6 +90,14 @@ type System struct { backstopMul float64 // hard deadline = softTimeout * backstopMul from start checkInterval time.Duration now func() time.Time + logger *slog.Logger +} + +func (s *System) log() *slog.Logger { + if s.logger != nil { + return s.logger + } + return slog.Default() } // New builds a run.Critic. esc is the policy (nil → ExtendOnce). backstopMul is @@ -138,6 +156,7 @@ type handle struct { steer []llm.Message iterations int lastTool string + killed bool // sticky: once an Escalator kills, no later decision un-kills it stopped bool stopCh chan struct{} } @@ -185,6 +204,14 @@ func (h *handle) Stop() { // watch fires the Escalator once per idle period the run crosses its soft // timeout, and applies the returned Decision. func (h *handle) watch(ctx context.Context, interval time.Duration) { + // A misbehaving Escalator that panics must not silently kill the watch + // goroutine (which would leave the run unmonitored for its lifetime). Log + // and exit cleanly — the run falls back to the deadline already set. + defer func() { + if r := recover(); r != nil { + h.sys.log().Error("critic watch panicked; run is now unmonitored", "run", h.info.RunID, "panic", r) + } + }() t := time.NewTicker(interval) defer t.Stop() for { @@ -201,6 +228,12 @@ func (h *handle) watch(ctx context.Context, interval time.Duration) { func (h *handle) tick(ctx context.Context) { h.mu.Lock() + // Kill is sticky: once an Escalator has killed this run, no later tick (and + // no later Decision) un-collapses the deadline. + if h.killed { + h.mu.Unlock() + return + } idle := h.now().Sub(h.lastActivity) // Only escalate once per idle period: skip if we already escalated for this // exact lastActivity (a fresh step/tool updates lastActivity and re-arms). @@ -216,13 +249,18 @@ func (h *handle) tick(ctx context.Context) { h.mu.Lock() defer h.mu.Unlock() + if h.killed { // a concurrent tick may have killed while OnSoftTimeout ran + return + } + if d.Kill { + h.killed = true + h.deadline = h.now() // immediate hard deadline → executor cancels + return // ignore any Nudge/ExtendBy paired with a Kill + } if len(d.Nudge) > 0 { h.steer = append(h.steer, d.Nudge...) } if d.ExtendBy > 0 { h.deadline = h.deadline.Add(d.ExtendBy) } - if d.Kill { - h.deadline = h.now() // immediate hard deadline → executor cancels - } } diff --git a/critic/critic_test.go b/critic/critic_test.go index 645e5c0..e9b4b2a 100644 --- a/critic/critic_test.go +++ b/critic/critic_test.go @@ -77,13 +77,18 @@ func TestKillCollapsesDeadline(t *testing.T) { func TestExtendOnceOnlyFiresOnce(t *testing.T) { e := &ExtendOnce{By: time.Minute} - d1 := e.OnSoftTimeout(context.Background(), run.RunInfo{}, Progress{}) - d2 := e.OnSoftTimeout(context.Background(), run.RunInfo{}, Progress{}) + // Same run id: only the first call extends. + d1 := e.OnSoftTimeout(context.Background(), run.RunInfo{RunID: "r1"}, Progress{}) + d2 := e.OnSoftTimeout(context.Background(), run.RunInfo{RunID: "r1"}, Progress{}) if d1.ExtendBy != time.Minute { t.Errorf("first decision should extend, got %+v", d1) } if d2.ExtendBy != 0 || d2.Kill { - t.Errorf("second decision should be a no-op, got %+v", d2) + t.Errorf("second call for the same run should be a no-op, got %+v", d2) + } + // A DIFFERENT run still gets its own one extension (per-run, not global). + if d3 := e.OnSoftTimeout(context.Background(), run.RunInfo{RunID: "r2"}, Progress{}); d3.ExtendBy != time.Minute { + t.Errorf("a different run should get its own extension, got %+v", d3) } } diff --git a/schedule/runner.go b/schedule/runner.go index 2a7d8b4..c21c94e 100644 --- a/schedule/runner.go +++ b/schedule/runner.go @@ -9,6 +9,7 @@ package schedule import ( "context" + "errors" "log/slog" "time" ) @@ -58,31 +59,50 @@ func (r *Runner) log() *slog.Logger { // bad job never stalls the others. Returns the error from Due (the only // pass-fatal step). func (r *Runner) Tick(ctx context.Context) error { + if err := r.validate(); err != nil { + return err + } now := r.now() due, err := r.Due(ctx, now) if err != nil { return err } for _, j := range due { - if err := r.Run(ctx, j.ID); err != nil { - r.log().Warn("scheduled job failed; will retry next tick", "job", j.ID, "error", err) - continue - } + // Compute the next fire BEFORE running. A permanently-unparseable cron + // then skips the job entirely (logged) rather than running it — an + // unstamped job stays due, so checking Next first avoids a hot-loop of + // real Run executions every tick. next, err := r.Next(j.Cron, now) if err != nil { - r.log().Warn("scheduled job has an unparseable cron; not rescheduling", "job", j.ID, "cron", j.Cron, "error", err) + r.log().Warn("scheduled job has an unparseable cron; skipping (not run, not rescheduled)", "job", j.ID, "cron", j.Cron, "error", err) continue } - if err := r.Mark(ctx, j.ID, now, next); err != nil { - r.log().Warn("failed to stamp scheduled job's next run", "job", j.ID, "error", err) + if err := r.Run(ctx, j.ID); err != nil { + r.log().Warn("scheduled job failed; stays due, will retry next tick", "job", j.ID, "error", err) + continue } + // A Mark failure leaves the job due, so it re-runs next tick — Run must + // be idempotent (there is no atomic run+stamp across two host callbacks). + if err := r.Mark(ctx, j.ID, now, next); err != nil { + r.log().Warn("failed to stamp next run; job may re-execute next tick (Run must be idempotent)", "job", j.ID, "error", err) + } + } + return nil +} + +// validate reports a misconfigured Runner (a required callback left nil) as a +// clear error rather than a nil-deref panic on first tick. +func (r *Runner) validate() error { + if r.Due == nil || r.Run == nil || r.Mark == nil || r.Next == nil { + return errors.New("schedule: Runner requires non-nil Due, Run, Mark, and Next") } return nil } // Loop ticks every Interval until ctx is cancelled. A Tick error (the Due // lister failing) is logged and the loop continues — a transient store hiccup -// shouldn't kill the scheduler. +// shouldn't kill the scheduler — and a panic from any host callback is +// recovered so one bad tick can't silently kill the scheduler goroutine. func (r *Runner) Loop(ctx context.Context) { interval := r.Interval if interval <= 0 { @@ -95,9 +115,18 @@ func (r *Runner) Loop(ctx context.Context) { case <-ctx.Done(): return case <-t.C: - if err := r.Tick(ctx); err != nil { - r.log().Warn("schedule tick failed", "error", err) - } + r.safeTick(ctx) } } } + +func (r *Runner) safeTick(ctx context.Context) { + defer func() { + if rec := recover(); rec != nil { + r.log().Error("schedule tick panicked; scheduler continues", "panic", rec) + } + }() + if err := r.Tick(ctx); err != nil { + r.log().Warn("schedule tick failed", "error", err) + } +} diff --git a/schedule/runner_test.go b/schedule/runner_test.go index bbde4fc..3d67a83 100644 --- a/schedule/runner_test.go +++ b/schedule/runner_test.go @@ -30,9 +30,10 @@ func TestTickRunsDueAndStampsNext(t *testing.T) { if err := r.Tick(ctx); err != nil { t.Fatal(err) } - // Both ran; only the parseable one got a next stamp. - if len(ran) != 2 { - t.Errorf("ran = %v, want both", ran) + // Next is checked first, so the bad-cron job is skipped BEFORE Run — only + // the parseable job runs and gets stamped (no hot-loop of a bad-cron Run). + if len(ran) != 1 || ran[0] != "a" { + t.Errorf("ran = %v, want only [a] (bad-cron b skipped before Run)", ran) } if marked["a"] != now.Add(time.Hour) { t.Errorf("a next = %v, want +1h", marked["a"]) @@ -75,8 +76,36 @@ func TestTickRunFailureDoesNotStampOrStall(t *testing.T) { } func TestTickDueErrorIsFatalToPass(t *testing.T) { - r := &Runner{Due: func(context.Context, time.Time) ([]Due, error) { return nil, errors.New("store down") }} + r := &Runner{ + Due: func(context.Context, time.Time) ([]Due, error) { return nil, errors.New("store down") }, + Run: func(context.Context, string) error { return nil }, + Mark: func(context.Context, string, time.Time, time.Time) error { return nil }, + Next: func(string, time.Time) (time.Time, error) { return time.Now(), nil }, + } if err := r.Tick(context.Background()); err == nil { t.Error("Tick should surface the Due lister error") } } + +func TestUnparseableCronSkipsRunEntirely(t *testing.T) { + var ran []string + r := &Runner{ + Due: func(context.Context, time.Time) ([]Due, error) { return []Due{{ID: "z", Cron: "bad"}}, nil }, + Run: func(_ context.Context, id string) error { ran = append(ran, id); return nil }, + Mark: func(context.Context, string, time.Time, time.Time) error { return nil }, + Next: func(string, time.Time) (time.Time, error) { return time.Time{}, errors.New("bad cron") }, + } + if err := r.Tick(context.Background()); err != nil { + t.Fatal(err) + } + if len(ran) != 0 { + t.Errorf("a job with an unparseable cron must NOT be run (avoids hot-loop), ran=%v", ran) + } +} + +func TestValidateRejectsNilCallbacks(t *testing.T) { + r := &Runner{Due: func(context.Context, time.Time) ([]Due, error) { return nil, nil }} // missing Run/Mark/Next + if err := r.Tick(context.Background()); err == nil { + t.Error("Tick should return a validation error for a partially-wired Runner, not panic") + } +}