fix: address verified gadfly P4c findings (3-cloud fleet)
executus CI / test (pull_request) Successful in 1m39s

critic (all 3 models — HIGH):
- ExtendOnce was a single global one-shot shared across every run a System
  monitors, so only the FIRST run to stall got its extension and all others
  were killed by the backstop. Key the fired-state per run (RunInfo.RunID).
- Kill is now sticky: a `killed` flag short-circuits later ticks so a wavering
  Escalator returning ExtendBy after a Kill can't un-collapse the deadline; a
  Kill paired with Nudge/ExtendBy ignores the latter.
- watch() recovers panics from a misbehaving Escalator (logs; the run falls
  back to its existing deadline) instead of silently killing the watch goroutine.

checkpoint (deepseek — HIGH): handle.Save advanced the throttle clock BEFORE
the store write, so a failed save was silently throttled away (caller believes
it persisted). Advance lastSave only after a successful persist.

schedule (all 3): compute Next BEFORE Run — a permanently-unparseable cron now
skips the job entirely instead of re-running it every tick forever; nil required
callbacks return a validate() error instead of a first-tick nil panic; Loop
recovers tick panics; the Mark-failure => possible-re-run trade-off is documented
(Run must be idempotent). + tests for each.

Triaged-but-kept: critic backstopMul<=1 floor (it's a total-runtime multiple, so
a floor >1 is intentional, not the reported footgun); checkpoint Load (nil,nil)
on miss (documented convention).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-26 23:32:27 -04:00
parent e5cab5525e
commit eea84e6e2c
5 changed files with 145 additions and 32 deletions
+15 -3
View File
@@ -46,15 +46,27 @@ func (h *handle) Save(ctx context.Context, st run.RunCheckpointState) error {
h.mu.Unlock() h.mu.Unlock()
return nil // throttled — a more recent snapshot will land shortly return nil // throttled — a more recent snapshot will land shortly
} }
h.lastSave = now
h.mu.Unlock() h.mu.Unlock()
return h.store.Save(ctx, RunCheckpoint{ // Advance the throttle clock only AFTER a successful persist. If the store
// write fails, lastSave stays put so the next Save isn't throttled away —
// otherwise a transient store error would silently drop the snapshot the
// caller believes was saved. (A run drives one Save goroutine, so the brief
// unguarded window here can't double-write.)
if err := h.store.Save(ctx, RunCheckpoint{
Meta: h.meta, Meta: h.meta,
Messages: st.Messages, Messages: st.Messages,
Iteration: st.Iteration, Iteration: st.Iteration,
UpdatedAt: now, UpdatedAt: now,
}) }); err != nil {
return err
}
h.mu.Lock()
if now.After(h.lastSave) {
h.lastSave = now
}
h.mu.Unlock()
return nil
} }
func (h *handle) Complete(ctx context.Context) error { return h.store.Delete(ctx, h.meta.RunID) } func (h *handle) Complete(ctx context.Context) error { return h.store.Delete(ctx, h.meta.RunID) }
+49 -11
View File
@@ -16,6 +16,7 @@ package critic
import ( import (
"context" "context"
"log/slog"
"sync" "sync"
"time" "time"
@@ -47,25 +48,34 @@ type Escalator interface {
OnSoftTimeout(ctx context.Context, info run.RunInfo, p Progress) Decision OnSoftTimeout(ctx context.Context, info run.RunInfo, p Progress) Decision
} }
// ExtendOnce is the default Escalator: the first time a run stalls it extends // ExtendOnce is the default Escalator: the first time a given run stalls it
// the deadline by By (giving a slow-but-healthy run room), then takes no // extends that run's deadline by By (giving a slow-but-healthy run room), then
// further action — so a genuinely hung run is later killed by the hard // takes no further action for it — so a genuinely hung run is later killed by
// backstop. A nil/zero By falls back to one soft-timeout's worth. // the hard backstop. A nil/zero By falls back to one soft-timeout's worth.
//
// The one-shot is keyed PER RUN (by RunInfo.RunID): a single System shares one
// ExtendOnce across every run it monitors, so a global flag would let only the
// first run to stall ever get its extension. The fired set grows with the
// number of distinct runs that stall — fine for a process's run volume; a host
// running unboundedly long can construct a fresh System periodically.
type ExtendOnce struct { type ExtendOnce struct {
By time.Duration By time.Duration
mu sync.Mutex mu sync.Mutex
fired bool fired map[string]bool // run ids that have already had their one extension
} }
// OnSoftTimeout implements Escalator. // OnSoftTimeout implements Escalator.
func (e *ExtendOnce) OnSoftTimeout(_ context.Context, _ run.RunInfo, p Progress) Decision { func (e *ExtendOnce) OnSoftTimeout(_ context.Context, info run.RunInfo, p Progress) Decision {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
if e.fired { if e.fired[info.RunID] {
return Decision{} return Decision{}
} }
e.fired = true if e.fired == nil {
e.fired = map[string]bool{}
}
e.fired[info.RunID] = true
by := e.By by := e.By
if by <= 0 { if by <= 0 {
by = p.Idle // ~one soft timeout by = p.Idle // ~one soft timeout
@@ -80,6 +90,14 @@ type System struct {
backstopMul float64 // hard deadline = softTimeout * backstopMul from start backstopMul float64 // hard deadline = softTimeout * backstopMul from start
checkInterval time.Duration checkInterval time.Duration
now func() time.Time now func() time.Time
logger *slog.Logger
}
func (s *System) log() *slog.Logger {
if s.logger != nil {
return s.logger
}
return slog.Default()
} }
// New builds a run.Critic. esc is the policy (nil → ExtendOnce). backstopMul is // New builds a run.Critic. esc is the policy (nil → ExtendOnce). backstopMul is
@@ -138,6 +156,7 @@ type handle struct {
steer []llm.Message steer []llm.Message
iterations int iterations int
lastTool string lastTool string
killed bool // sticky: once an Escalator kills, no later decision un-kills it
stopped bool stopped bool
stopCh chan struct{} stopCh chan struct{}
} }
@@ -185,6 +204,14 @@ func (h *handle) Stop() {
// watch fires the Escalator once per idle period the run crosses its soft // watch fires the Escalator once per idle period the run crosses its soft
// timeout, and applies the returned Decision. // timeout, and applies the returned Decision.
func (h *handle) watch(ctx context.Context, interval time.Duration) { func (h *handle) watch(ctx context.Context, interval time.Duration) {
// A misbehaving Escalator that panics must not silently kill the watch
// goroutine (which would leave the run unmonitored for its lifetime). Log
// and exit cleanly — the run falls back to the deadline already set.
defer func() {
if r := recover(); r != nil {
h.sys.log().Error("critic watch panicked; run is now unmonitored", "run", h.info.RunID, "panic", r)
}
}()
t := time.NewTicker(interval) t := time.NewTicker(interval)
defer t.Stop() defer t.Stop()
for { for {
@@ -201,6 +228,12 @@ func (h *handle) watch(ctx context.Context, interval time.Duration) {
func (h *handle) tick(ctx context.Context) { func (h *handle) tick(ctx context.Context) {
h.mu.Lock() h.mu.Lock()
// Kill is sticky: once an Escalator has killed this run, no later tick (and
// no later Decision) un-collapses the deadline.
if h.killed {
h.mu.Unlock()
return
}
idle := h.now().Sub(h.lastActivity) idle := h.now().Sub(h.lastActivity)
// Only escalate once per idle period: skip if we already escalated for this // Only escalate once per idle period: skip if we already escalated for this
// exact lastActivity (a fresh step/tool updates lastActivity and re-arms). // exact lastActivity (a fresh step/tool updates lastActivity and re-arms).
@@ -216,13 +249,18 @@ func (h *handle) tick(ctx context.Context) {
h.mu.Lock() h.mu.Lock()
defer h.mu.Unlock() defer h.mu.Unlock()
if h.killed { // a concurrent tick may have killed while OnSoftTimeout ran
return
}
if d.Kill {
h.killed = true
h.deadline = h.now() // immediate hard deadline → executor cancels
return // ignore any Nudge/ExtendBy paired with a Kill
}
if len(d.Nudge) > 0 { if len(d.Nudge) > 0 {
h.steer = append(h.steer, d.Nudge...) h.steer = append(h.steer, d.Nudge...)
} }
if d.ExtendBy > 0 { if d.ExtendBy > 0 {
h.deadline = h.deadline.Add(d.ExtendBy) h.deadline = h.deadline.Add(d.ExtendBy)
} }
if d.Kill {
h.deadline = h.now() // immediate hard deadline → executor cancels
}
} }
+8 -3
View File
@@ -77,13 +77,18 @@ func TestKillCollapsesDeadline(t *testing.T) {
func TestExtendOnceOnlyFiresOnce(t *testing.T) { func TestExtendOnceOnlyFiresOnce(t *testing.T) {
e := &ExtendOnce{By: time.Minute} e := &ExtendOnce{By: time.Minute}
d1 := e.OnSoftTimeout(context.Background(), run.RunInfo{}, Progress{}) // Same run id: only the first call extends.
d2 := e.OnSoftTimeout(context.Background(), run.RunInfo{}, Progress{}) d1 := e.OnSoftTimeout(context.Background(), run.RunInfo{RunID: "r1"}, Progress{})
d2 := e.OnSoftTimeout(context.Background(), run.RunInfo{RunID: "r1"}, Progress{})
if d1.ExtendBy != time.Minute { if d1.ExtendBy != time.Minute {
t.Errorf("first decision should extend, got %+v", d1) t.Errorf("first decision should extend, got %+v", d1)
} }
if d2.ExtendBy != 0 || d2.Kill { if d2.ExtendBy != 0 || d2.Kill {
t.Errorf("second decision should be a no-op, got %+v", d2) t.Errorf("second call for the same run should be a no-op, got %+v", d2)
}
// A DIFFERENT run still gets its own one extension (per-run, not global).
if d3 := e.OnSoftTimeout(context.Background(), run.RunInfo{RunID: "r2"}, Progress{}); d3.ExtendBy != time.Minute {
t.Errorf("a different run should get its own extension, got %+v", d3)
} }
} }
+40 -11
View File
@@ -9,6 +9,7 @@ package schedule
import ( import (
"context" "context"
"errors"
"log/slog" "log/slog"
"time" "time"
) )
@@ -58,31 +59,50 @@ func (r *Runner) log() *slog.Logger {
// bad job never stalls the others. Returns the error from Due (the only // bad job never stalls the others. Returns the error from Due (the only
// pass-fatal step). // pass-fatal step).
func (r *Runner) Tick(ctx context.Context) error { func (r *Runner) Tick(ctx context.Context) error {
if err := r.validate(); err != nil {
return err
}
now := r.now() now := r.now()
due, err := r.Due(ctx, now) due, err := r.Due(ctx, now)
if err != nil { if err != nil {
return err return err
} }
for _, j := range due { for _, j := range due {
if err := r.Run(ctx, j.ID); err != nil { // Compute the next fire BEFORE running. A permanently-unparseable cron
r.log().Warn("scheduled job failed; will retry next tick", "job", j.ID, "error", err) // then skips the job entirely (logged) rather than running it — an
continue // unstamped job stays due, so checking Next first avoids a hot-loop of
} // real Run executions every tick.
next, err := r.Next(j.Cron, now) next, err := r.Next(j.Cron, now)
if err != nil { if err != nil {
r.log().Warn("scheduled job has an unparseable cron; not rescheduling", "job", j.ID, "cron", j.Cron, "error", err) r.log().Warn("scheduled job has an unparseable cron; skipping (not run, not rescheduled)", "job", j.ID, "cron", j.Cron, "error", err)
continue continue
} }
if err := r.Mark(ctx, j.ID, now, next); err != nil { if err := r.Run(ctx, j.ID); err != nil {
r.log().Warn("failed to stamp scheduled job's next run", "job", j.ID, "error", err) r.log().Warn("scheduled job failed; stays due, will retry next tick", "job", j.ID, "error", err)
continue
} }
// A Mark failure leaves the job due, so it re-runs next tick — Run must
// be idempotent (there is no atomic run+stamp across two host callbacks).
if err := r.Mark(ctx, j.ID, now, next); err != nil {
r.log().Warn("failed to stamp next run; job may re-execute next tick (Run must be idempotent)", "job", j.ID, "error", err)
}
}
return nil
}
// validate reports a misconfigured Runner (a required callback left nil) as a
// clear error rather than a nil-deref panic on first tick.
func (r *Runner) validate() error {
if r.Due == nil || r.Run == nil || r.Mark == nil || r.Next == nil {
return errors.New("schedule: Runner requires non-nil Due, Run, Mark, and Next")
} }
return nil return nil
} }
// Loop ticks every Interval until ctx is cancelled. A Tick error (the Due // Loop ticks every Interval until ctx is cancelled. A Tick error (the Due
// lister failing) is logged and the loop continues — a transient store hiccup // lister failing) is logged and the loop continues — a transient store hiccup
// shouldn't kill the scheduler. // shouldn't kill the scheduler — and a panic from any host callback is
// recovered so one bad tick can't silently kill the scheduler goroutine.
func (r *Runner) Loop(ctx context.Context) { func (r *Runner) Loop(ctx context.Context) {
interval := r.Interval interval := r.Interval
if interval <= 0 { if interval <= 0 {
@@ -95,9 +115,18 @@ func (r *Runner) Loop(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-t.C: case <-t.C:
if err := r.Tick(ctx); err != nil { r.safeTick(ctx)
r.log().Warn("schedule tick failed", "error", err)
}
} }
} }
} }
func (r *Runner) safeTick(ctx context.Context) {
defer func() {
if rec := recover(); rec != nil {
r.log().Error("schedule tick panicked; scheduler continues", "panic", rec)
}
}()
if err := r.Tick(ctx); err != nil {
r.log().Warn("schedule tick failed", "error", err)
}
}
+33 -4
View File
@@ -30,9 +30,10 @@ func TestTickRunsDueAndStampsNext(t *testing.T) {
if err := r.Tick(ctx); err != nil { if err := r.Tick(ctx); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Both ran; only the parseable one got a next stamp. // Next is checked first, so the bad-cron job is skipped BEFORE Run — only
if len(ran) != 2 { // the parseable job runs and gets stamped (no hot-loop of a bad-cron Run).
t.Errorf("ran = %v, want both", ran) if len(ran) != 1 || ran[0] != "a" {
t.Errorf("ran = %v, want only [a] (bad-cron b skipped before Run)", ran)
} }
if marked["a"] != now.Add(time.Hour) { if marked["a"] != now.Add(time.Hour) {
t.Errorf("a next = %v, want +1h", marked["a"]) t.Errorf("a next = %v, want +1h", marked["a"])
@@ -75,8 +76,36 @@ func TestTickRunFailureDoesNotStampOrStall(t *testing.T) {
} }
func TestTickDueErrorIsFatalToPass(t *testing.T) { func TestTickDueErrorIsFatalToPass(t *testing.T) {
r := &Runner{Due: func(context.Context, time.Time) ([]Due, error) { return nil, errors.New("store down") }} r := &Runner{
Due: func(context.Context, time.Time) ([]Due, error) { return nil, errors.New("store down") },
Run: func(context.Context, string) error { return nil },
Mark: func(context.Context, string, time.Time, time.Time) error { return nil },
Next: func(string, time.Time) (time.Time, error) { return time.Now(), nil },
}
if err := r.Tick(context.Background()); err == nil { if err := r.Tick(context.Background()); err == nil {
t.Error("Tick should surface the Due lister error") t.Error("Tick should surface the Due lister error")
} }
} }
func TestUnparseableCronSkipsRunEntirely(t *testing.T) {
var ran []string
r := &Runner{
Due: func(context.Context, time.Time) ([]Due, error) { return []Due{{ID: "z", Cron: "bad"}}, nil },
Run: func(_ context.Context, id string) error { ran = append(ran, id); return nil },
Mark: func(context.Context, string, time.Time, time.Time) error { return nil },
Next: func(string, time.Time) (time.Time, error) { return time.Time{}, errors.New("bad cron") },
}
if err := r.Tick(context.Background()); err != nil {
t.Fatal(err)
}
if len(ran) != 0 {
t.Errorf("a job with an unparseable cron must NOT be run (avoids hot-loop), ran=%v", ran)
}
}
func TestValidateRejectsNilCallbacks(t *testing.T) {
r := &Runner{Due: func(context.Context, time.Time) ([]Due, error) { return nil, nil }} // missing Run/Mark/Next
if err := r.Tick(context.Background()); err == nil {
t.Error("Tick should return a validation error for a partially-wired Runner, not panic")
}
}