// Package critic is the run-watchdog battery: a two-tier timeout monitor that
// catches a run that has stopped making progress. It plugs into
// run.Ports.Critic.
//
// The two tiers are a soft timeout, after which an Escalator is consulted
// about the stalled run, and a hard deadline (a multiple of the soft timeout,
// optionally moved by the Escalator) that the executor enforces by reading
// the run handle's Deadline.
//
// The split of concerns is deliberate. executus owns the deterministic
// MECHANICS — track activity, fire on a soft timeout, enforce a hard-kill
// backstop, carry steer messages and the extendable deadline back to the
// executor. The POLICY — what to actually do when a run stalls (nudge it,
// extend its deadline, kill it, escalate to a human) — is the Escalator seam.
// Mort plugs its LLM critic-agent in as an Escalator; ExtendOnce is the
// zero-dependency default.
//
// NOTE: the executor's call into run.Ports.Critic is a P2 follow-up; this
// battery provides the seam + impl ahead of that wiring.
package critic

import (
	"context"
	"log/slog"
	"sync"
	"time"

	"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
	"gitea.stevedudenhoeffer.com/steve/executus/run"
)

// Progress is the snapshot the critic hands an Escalator when a run stalls.
// All times come from the System's clock (time.Now for a System built by New).
type Progress struct {
	Iterations   int           // value from the most recent RecordStep (presumably completed agent-loop iterations); 0 if none yet
	LastActivity time.Time     // clock reading at the last RecordStep/RecordToolStart, or when monitoring began
	Idle         time.Duration // clock reading at the check minus LastActivity; always >= the run's soft timeout
	LastTool     string        // name passed to the most recent RecordToolStart ("" if none)
}

// Decision is the Escalator's verdict for a stalled run. The zero value does
// nothing: the run keeps its current deadline, so a truly hung run is
// eventually killed by the hard backstop.
//
// Kill takes precedence: when Kill is set, Nudge and ExtendBy are ignored.
// Otherwise Nudge and ExtendBy are both applied.
type Decision struct {
	Nudge      []llm.Message // appended to the run's steer queue; the holder of the handle drains it via Steer (a steer before the agent's next turn)
	ExtendBy   time.Duration // pushes the hard deadline out by this much; ignored when <= 0
	Kill       bool          // collapses the hard deadline to "now" (sticky) so the executor cancels the run
	KillReason string        // NOTE(review): not read by this package's watchdog — presumably for host/escalator logging; confirm the consumer
}

// Escalator decides what to do when a run crosses its soft timeout. It is
// called at most once per idle period (a fresh step/tool event re-arms it),
// never again for a run after it has returned a Kill for that run, and
// without the run handle's lock held. It may therefore block (e.g. on an LLM
// call) without stalling RecordStep/Steer/Deadline, though that run's
// watchdog checks pause until it returns.
//
// One System shares its Escalator across every run it monitors, each watched
// by its own goroutine, so implementations must be safe for concurrent use.
// A panic in OnSoftTimeout is logged and ends monitoring of that run.
type Escalator interface { OnSoftTimeout(ctx context.Context, info run.RunInfo, p Progress) Decision } // ExtendOnce is the default Escalator: the first time a given run stalls it // extends that run's deadline by By (giving a slow-but-healthy run room), then // takes no further action for it — so a genuinely hung run is later killed by // the hard backstop. A nil/zero By falls back to one soft-timeout's worth. // // The one-shot is keyed PER RUN (by RunInfo.RunID): a single System shares one // ExtendOnce across every run it monitors, so a global flag would let only the // first run to stall ever get its extension. The fired set grows with the // number of distinct runs that stall — fine for a process's run volume; a host // running unboundedly long can construct a fresh System periodically. type ExtendOnce struct { By time.Duration mu sync.Mutex fired map[string]bool // run ids that have already had their one extension } // OnSoftTimeout implements Escalator. func (e *ExtendOnce) OnSoftTimeout(_ context.Context, info run.RunInfo, p Progress) Decision { e.mu.Lock() defer e.mu.Unlock() if e.fired[info.RunID] { return Decision{} } if e.fired == nil { e.fired = map[string]bool{} } e.fired[info.RunID] = true by := e.By if by <= 0 { by = p.Idle // ~one soft timeout } return Decision{ExtendBy: by} } // System implements run.Critic. Construct with New; one System monitors many // runs concurrently (each Monitor returns an independent handle). type System struct { esc Escalator backstopMul float64 // hard deadline = softTimeout * backstopMul from start checkInterval time.Duration now func() time.Time logger *slog.Logger } func (s *System) log() *slog.Logger { if s.logger != nil { return s.logger } return slog.Default() } // New builds a run.Critic. esc is the policy (nil → ExtendOnce). backstopMul is // the hard-kill backstop as a multiple of each run's soft timeout (<=1 → 3). 
A // nil esc + the default backstop gives a safe "extend once, then hard-kill" // watchdog with no host wiring. func New(esc Escalator, backstopMul float64) *System { if esc == nil { esc = &ExtendOnce{} } if backstopMul <= 1 { backstopMul = 3 } return &System{esc: esc, backstopMul: backstopMul, now: time.Now} } var _ run.Critic = (*System)(nil) // Monitor starts watching a run and returns its handle. Implements run.Critic. func (s *System) Monitor(ctx context.Context, info run.RunInfo, softTimeout time.Duration) run.CriticHandle { if softTimeout <= 0 { return run.CriticHandle(nil) // no soft timeout → not monitored } now := s.now() check := s.checkInterval if check <= 0 { check = softTimeout / 2 if check < time.Second { check = time.Second } } h := &handle{ sys: s, info: info, softTimeout: softTimeout, now: s.now, lastActivity: now, deadline: now.Add(time.Duration(float64(softTimeout) * s.backstopMul)), stopCh: make(chan struct{}), } go h.watch(ctx, check) return h } // handle is one run's live critic link. Implements run.CriticHandle. 
type handle struct { sys *System info run.RunInfo softTimeout time.Duration now func() time.Time mu sync.Mutex lastActivity time.Time escalatedAt time.Time // lastActivity value we last escalated for (de-dupes per idle period) deadline time.Time steer []llm.Message iterations int lastTool string killed bool // sticky: once an Escalator kills, no later decision un-kills it stopped bool stopCh chan struct{} } func (h *handle) RecordStep(iter int) { h.mu.Lock() h.iterations = iter h.lastActivity = h.now() h.mu.Unlock() } func (h *handle) RecordToolStart(name, _ string) { h.mu.Lock() h.lastTool = name h.lastActivity = h.now() h.mu.Unlock() } func (h *handle) Steer() []llm.Message { h.mu.Lock() defer h.mu.Unlock() if len(h.steer) == 0 { return nil } out := h.steer h.steer = nil return out } func (h *handle) Deadline() time.Time { h.mu.Lock() defer h.mu.Unlock() return h.deadline } func (h *handle) Stop() { h.mu.Lock() if !h.stopped { h.stopped = true close(h.stopCh) } h.mu.Unlock() } // watch fires the Escalator once per idle period the run crosses its soft // timeout, and applies the returned Decision. func (h *handle) watch(ctx context.Context, interval time.Duration) { // A misbehaving Escalator that panics must not silently kill the watch // goroutine (which would leave the run unmonitored for its lifetime). Log // and exit cleanly — the run falls back to the deadline already set. defer func() { if r := recover(); r != nil { h.sys.log().Error("critic watch panicked; run is now unmonitored", "run", h.info.RunID, "panic", r) } }() t := time.NewTicker(interval) defer t.Stop() for { select { case <-h.stopCh: return case <-ctx.Done(): return case <-t.C: h.tick(ctx) } } } func (h *handle) tick(ctx context.Context) { h.mu.Lock() // Kill is sticky: once an Escalator has killed this run, no later tick (and // no later Decision) un-collapses the deadline. 
if h.killed { h.mu.Unlock() return } idle := h.now().Sub(h.lastActivity) // Only escalate once per idle period: skip if we already escalated for this // exact lastActivity (a fresh step/tool updates lastActivity and re-arms). if idle < h.softTimeout || h.escalatedAt.Equal(h.lastActivity) { h.mu.Unlock() return } h.escalatedAt = h.lastActivity snap := Progress{Iterations: h.iterations, LastActivity: h.lastActivity, Idle: idle, LastTool: h.lastTool} h.mu.Unlock() d := h.sys.esc.OnSoftTimeout(ctx, h.info, snap) h.mu.Lock() defer h.mu.Unlock() if h.killed { // a concurrent tick may have killed while OnSoftTimeout ran return } if d.Kill { h.killed = true h.deadline = h.now() // immediate hard deadline → executor cancels return // ignore any Nudge/ExtendBy paired with a Kill } if len(d.Nudge) > 0 { h.steer = append(h.steer, d.Nudge...) } if d.ExtendBy > 0 { h.deadline = h.deadline.Add(d.ExtendBy) } }