From e5cab5525e0bc89d7593acc6024714148842fe93 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 23:04:29 -0400 Subject: [PATCH] =?UTF-8?q?P4:=20critic=20battery=20=E2=80=94=20two-tier?= =?UTF-8?q?=20timeout=20watchdog=20+=20Escalator=20seam?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The last Tier-2 battery, plugging into run.Ports.Critic (executor call site is a P2 follow-up). Clean split of concerns: - executus owns the deterministic MECHANICS: System.Monitor returns a run.CriticHandle that tracks activity (RecordStep/RecordToolStart), and a watcher goroutine fires once per idle period a run crosses its soft timeout, applies the decision (queue Steer nudges / extend the Deadline / collapse it to now on Kill), and enforces a hard-kill backstop (softTimeout * mul). - the POLICY is the Escalator seam (nudge/extend/kill/escalate). Mort plugs its LLM critic-agent in here; ExtendOnce is the zero-dependency default (extend once, then let the backstop kill a truly hung run). Race-tested: escalate-once-per-idle-period with re-arm on fresh activity, Kill collapses the deadline, ExtendOnce fires once, zero soft-timeout => nil handle. Core imports ZERO from critic. This completes the P4 battery set: audit, budget, persona, skill, checkpoint, schedule, critic — each nil-safe, each with a default, each core-import-clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- critic/critic.go | 228 ++++++++++++++++++++++++++++++++++++++++++ critic/critic_test.go | 95 ++++++++++++++++++ 2 files changed, 323 insertions(+) create mode 100644 critic/critic.go create mode 100644 critic/critic_test.go diff --git a/critic/critic.go b/critic/critic.go new file mode 100644 index 0000000..f8dbcf8 --- /dev/null +++ b/critic/critic.go @@ -0,0 +1,228 @@ +// Package critic is the run-watchdog battery: a two-tier timeout monitor that +// catches a run that has stopped making progress. It plugs into +// run.Ports.Critic. +// +// The split of concerns is deliberate. executus owns the deterministic +// MECHANICS — track activity, fire on a soft timeout, enforce a hard-kill +// backstop, carry steer messages and the extendable deadline back to the +// executor. The POLICY — what to actually do when a run stalls (nudge it, +// extend its deadline, kill it, escalate to a human) — is the Escalator seam. +// Mort plugs its LLM critic-agent in as an Escalator; ExtendOnce is the +// zero-dependency default. +// +// NOTE: the executor's call into run.Ports.Critic is a P2 follow-up; this +// battery provides the seam + impl ahead of that wiring. +package critic + +import ( + "context" + "sync" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + + "gitea.stevedudenhoeffer.com/steve/executus/run" +) + +// Progress is the snapshot the critic hands an Escalator when a run stalls. +type Progress struct { + Iterations int // completed agent-loop iterations so far + LastActivity time.Time // wall-clock of the last step/tool event + Idle time.Duration // now - LastActivity + LastTool string // name of the most recently started tool ("" if none) +} + +// Decision is the Escalator's verdict for a stalled run. Zero value = do +// nothing (let the hard backstop eventually kill a truly hung run). +type Decision struct { + Nudge []llm.Message // injected before the agent's next turn (a steer) + ExtendBy time.Duration // push the hard deadline out by this much + Kill bool // cancel the run now + KillReason string +} + +// Escalator decides what to do when a run crosses its soft timeout. It is +// called at most once per idle period (a fresh step/tool event re-arms it). +type Escalator interface { + OnSoftTimeout(ctx context.Context, info run.RunInfo, p Progress) Decision +} + +// ExtendOnce is the default Escalator: the first time a run stalls it extends +// the deadline by By (giving a slow-but-healthy run room), then takes no +// further action — so a genuinely hung run is later killed by the hard +// backstop. A nil/zero By falls back to one soft-timeout's worth. +type ExtendOnce struct { + By time.Duration + + mu sync.Mutex + fired bool +} + +// OnSoftTimeout implements Escalator. +func (e *ExtendOnce) OnSoftTimeout(_ context.Context, _ run.RunInfo, p Progress) Decision { + e.mu.Lock() + defer e.mu.Unlock() + if e.fired { + return Decision{} + } + e.fired = true + by := e.By + if by <= 0 { + by = p.Idle // ~one soft timeout + } + return Decision{ExtendBy: by} +} + +// System implements run.Critic. Construct with New; one System monitors many +// runs concurrently (each Monitor returns an independent handle). +type System struct { + esc Escalator + backstopMul float64 // hard deadline = softTimeout * backstopMul from start + checkInterval time.Duration + now func() time.Time +} + +// New builds a run.Critic. esc is the policy (nil → ExtendOnce). backstopMul is +// the hard-kill backstop as a multiple of each run's soft timeout (<=1 → 3). A +// nil esc + the default backstop gives a safe "extend once, then hard-kill" +// watchdog with no host wiring. +func New(esc Escalator, backstopMul float64) *System { + if esc == nil { + esc = &ExtendOnce{} + } + if backstopMul <= 1 { + backstopMul = 3 + } + return &System{esc: esc, backstopMul: backstopMul, now: time.Now} +} + +var _ run.Critic = (*System)(nil) + +// Monitor starts watching a run and returns its handle. Implements run.Critic. +func (s *System) Monitor(ctx context.Context, info run.RunInfo, softTimeout time.Duration) run.CriticHandle { + if softTimeout <= 0 { + return run.CriticHandle(nil) // no soft timeout → not monitored + } + now := s.now() + check := s.checkInterval + if check <= 0 { + check = softTimeout / 2 + if check < time.Second { + check = time.Second + } + } + h := &handle{ + sys: s, + info: info, + softTimeout: softTimeout, + now: s.now, + lastActivity: now, + deadline: now.Add(time.Duration(float64(softTimeout) * s.backstopMul)), + stopCh: make(chan struct{}), + } + go h.watch(ctx, check) + return h +} + +// handle is one run's live critic link. Implements run.CriticHandle. +type handle struct { + sys *System + info run.RunInfo + softTimeout time.Duration + now func() time.Time + + mu sync.Mutex + lastActivity time.Time + escalatedAt time.Time // lastActivity value we last escalated for (de-dupes per idle period) + deadline time.Time + steer []llm.Message + iterations int + lastTool string + stopped bool + stopCh chan struct{} +} + +func (h *handle) RecordStep(iter int) { + h.mu.Lock() + h.iterations = iter + h.lastActivity = h.now() + h.mu.Unlock() +} + +func (h *handle) RecordToolStart(name, _ string) { + h.mu.Lock() + h.lastTool = name + h.lastActivity = h.now() + h.mu.Unlock() +} + +func (h *handle) Steer() []llm.Message { + h.mu.Lock() + defer h.mu.Unlock() + if len(h.steer) == 0 { + return nil + } + out := h.steer + h.steer = nil + return out +} + +func (h *handle) Deadline() time.Time { + h.mu.Lock() + defer h.mu.Unlock() + return h.deadline +} + +func (h *handle) Stop() { + h.mu.Lock() + if !h.stopped { + h.stopped = true + close(h.stopCh) + } + h.mu.Unlock() +} + +// watch fires the Escalator once per idle period the run crosses its soft +// timeout, and applies the returned Decision. +func (h *handle) watch(ctx context.Context, interval time.Duration) { + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-h.stopCh: + return + case <-ctx.Done(): + return + case <-t.C: + h.tick(ctx) + } + } +} + +func (h *handle) tick(ctx context.Context) { + h.mu.Lock() + idle := h.now().Sub(h.lastActivity) + // Only escalate once per idle period: skip if we already escalated for this + // exact lastActivity (a fresh step/tool updates lastActivity and re-arms). + if idle < h.softTimeout || h.escalatedAt.Equal(h.lastActivity) { + h.mu.Unlock() + return + } + h.escalatedAt = h.lastActivity + snap := Progress{Iterations: h.iterations, LastActivity: h.lastActivity, Idle: idle, LastTool: h.lastTool} + h.mu.Unlock() + + d := h.sys.esc.OnSoftTimeout(ctx, h.info, snap) + + h.mu.Lock() + defer h.mu.Unlock() + if len(d.Nudge) > 0 { + h.steer = append(h.steer, d.Nudge...) + } + if d.ExtendBy > 0 { + h.deadline = h.deadline.Add(d.ExtendBy) + } + if d.Kill { + h.deadline = h.now() // immediate hard deadline → executor cancels + } +} diff --git a/critic/critic_test.go b/critic/critic_test.go new file mode 100644 index 0000000..645e5c0 --- /dev/null +++ b/critic/critic_test.go @@ -0,0 +1,95 @@ +package critic + +import ( + "context" + "sync" + "testing" + "time" + + "gitea.stevedudenhoeffer.com/steve/executus/run" + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +// escFunc adapts a func to an Escalator. +type escFunc func(context.Context, run.RunInfo, Progress) Decision + +func (f escFunc) OnSoftTimeout(ctx context.Context, i run.RunInfo, p Progress) Decision { + return f(ctx, i, p) +} + +func TestMonitorEscalatesOncePerIdlePeriodAndExtends(t *testing.T) { + var mu sync.Mutex + var calls int + esc := escFunc(func(_ context.Context, _ run.RunInfo, p Progress) Decision { + mu.Lock() + calls++ + mu.Unlock() + return Decision{ExtendBy: 50 * time.Millisecond, Nudge: []llm.Message{{Role: llm.RoleUser}}} + }) + s := New(esc, 3) + s.checkInterval = 5 * time.Millisecond + h := s.Monitor(context.Background(), run.RunInfo{RunID: "r"}, 20*time.Millisecond) + defer h.Stop() + + d0 := h.Deadline() + time.Sleep(60 * time.Millisecond) // cross the soft timeout with no activity + mu.Lock() + c := calls + mu.Unlock() + if c < 1 { + t.Fatalf("expected at least one escalation, got %d", c) + } + // Nudge was queued and is drained once. + if msgs := h.Steer(); len(msgs) == 0 { + t.Error("expected a queued steer nudge") + } + if msgs := h.Steer(); len(msgs) != 0 { + t.Error("steer should drain (be empty on second read)") + } + // Deadline was extended. + if !h.Deadline().After(d0) { + t.Error("deadline should have been extended past the original") + } + // A fresh step re-arms; another idle period escalates again. + h.RecordStep(1) + time.Sleep(60 * time.Millisecond) + mu.Lock() + c2 := calls + mu.Unlock() + if c2 <= c { + t.Errorf("a re-armed idle period should escalate again (%d -> %d)", c, c2) + } +} + +func TestKillCollapsesDeadline(t *testing.T) { + esc := escFunc(func(context.Context, run.RunInfo, Progress) Decision { + return Decision{Kill: true, KillReason: "hung"} + }) + s := New(esc, 10) // big backstop so only Kill collapses it + s.checkInterval = 5 * time.Millisecond + h := s.Monitor(context.Background(), run.RunInfo{RunID: "r"}, 20*time.Millisecond) + defer h.Stop() + time.Sleep(60 * time.Millisecond) + if h.Deadline().After(time.Now().Add(time.Second)) { + t.Error("Kill should collapse the deadline to ~now") + } +} + +func TestExtendOnceOnlyFiresOnce(t *testing.T) { + e := &ExtendOnce{By: time.Minute} + d1 := e.OnSoftTimeout(context.Background(), run.RunInfo{}, Progress{}) + d2 := e.OnSoftTimeout(context.Background(), run.RunInfo{}, Progress{}) + if d1.ExtendBy != time.Minute { + t.Errorf("first decision should extend, got %+v", d1) + } + if d2.ExtendBy != 0 || d2.Kill { + t.Errorf("second decision should be a no-op, got %+v", d2) + } +} + +func TestZeroSoftTimeoutNotMonitored(t *testing.T) { + s := New(nil, 3) + if h := s.Monitor(context.Background(), run.RunInfo{}, 0); h != nil { + t.Error("zero soft timeout should return a nil handle (not monitored)") + } +}