390e6cf905
Completes the run-critic seam so a host adapter (mort's agentcritic) has full fidelity, closing the two limitations gadfly surfaced on mort #1334. - RecordStep(iter int, resp *llm.Response): the completed step's model response is now passed to the critic (was index-only), so a host that records a trace (mort's ProgressRecorder) can show what the agent actually produced, not just an iteration count. The executor forwards s.Response; the battery ignores it (its Progress is count-based). - CriticHandle.KillCause() error + ErrCriticKill: the executor now distinguishes an explicit critic KILL from a natural backstop expiry. runCtx uses a cause-carrying cancel (WithCancelCause + a MaxRuntime timer cancelling with DeadlineExceeded); the deadline-watch cancels with ErrCriticKill when KillCause()!=nil, else DeadlineExceeded. statusFor reads context.Cause → killed / timeout / cancelled are now distinct (were all "cancelled"). The battery sets killCause from Decision.KillReason on a Kill. Tests: statusFor "killed" case (cause=ErrCriticKill, err=Canceled); fake handle + battery RecordStep/KillCause signatures. Core stays battery-free. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
303 lines
9.1 KiB
Go
303 lines
9.1 KiB
Go
// Package critic is the run-watchdog battery: a two-tier timeout monitor that
// catches a run that has stopped making progress. It plugs into
// run.Ports.Critic.
//
// The split of concerns is deliberate. executus owns the deterministic
// MECHANICS — track activity, fire on a soft timeout, enforce a hard-kill
// backstop, carry steer messages and the extendable deadline back to the
// executor. The POLICY — what to actually do when a run stalls (nudge it,
// extend its deadline, kill it, escalate to a human) — is the Escalator seam.
// Mort plugs its LLM critic-agent in as an Escalator; ExtendOnce is the
// zero-dependency default.
//
// The executor wires run.Ports.Critic (C0b): it feeds the handle activity,
// binds the run context to its extendable Deadline, drains its Steer, and polls
// MaxSteps each step so an Escalator can also raise a long run's step ceiling
// (Decision.RaiseStepsBy).
|
|
package critic
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log/slog"
|
|
"math"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
|
)
|
|
|
|
// Progress describes a stalled run at the moment the critic escalates; it is
// the snapshot handed to Escalator.OnSoftTimeout.
type Progress struct {
	// Iterations is the number of agent-loop iterations completed so far.
	Iterations int
	// LastActivity is the wall-clock time of the last step/tool event.
	LastActivity time.Time
	// Idle is how long the run has been quiet: now - LastActivity.
	Idle time.Duration
	// LastTool names the most recently started tool ("" if none).
	LastTool string
}
|
|
|
|
// Decision is the Escalator's verdict for a stalled run. Zero value = do
|
|
// nothing (let the hard backstop eventually kill a truly hung run).
|
|
type Decision struct {
|
|
Nudge []llm.Message // injected before the agent's next turn (a steer)
|
|
ExtendBy time.Duration // push the hard deadline out by this much
|
|
RaiseStepsBy int // raise the run's tool-dispatch step ceiling by this
|
|
Kill bool // cancel the run now
|
|
KillReason string
|
|
}
|
|
|
|
// Escalator decides what to do when a run crosses its soft timeout. It is
|
|
// called at most once per idle period (a fresh step/tool event re-arms it).
|
|
type Escalator interface {
|
|
OnSoftTimeout(ctx context.Context, info run.RunInfo, p Progress) Decision
|
|
}
|
|
|
|
// ExtendOnce is the default Escalator. The first time a given run stalls, it
// extends that run's deadline by By, giving a slow-but-healthy run some room.
// After that it takes no further action for the run, so a genuinely hung run
// is later killed by the hard backstop.
//
// The one-shot is tracked PER RUN, keyed by RunInfo.RunID. A single System
// shares one ExtendOnce across every run it monitors, so a global flag would
// grant the extension only to the first run that ever stalled. The fired set
// grows with the number of distinct runs that stall, which is fine for a
// process's run volume; a host that runs unboundedly long can construct a
// fresh System periodically.
type ExtendOnce struct {
	// By is the extension granted on a run's first stall. A zero or negative
	// By falls back to the run's idle time at escalation, which is roughly one
	// soft timeout.
	By time.Duration

	// mu guards fired.
	mu sync.Mutex
	// fired holds the run ids that have already had their one extension.
	fired map[string]bool
}
|
|
|
|
// OnSoftTimeout implements Escalator.
|
|
func (e *ExtendOnce) OnSoftTimeout(_ context.Context, info run.RunInfo, p Progress) Decision {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
if e.fired[info.RunID] {
|
|
return Decision{}
|
|
}
|
|
if e.fired == nil {
|
|
e.fired = map[string]bool{}
|
|
}
|
|
e.fired[info.RunID] = true
|
|
by := e.By
|
|
if by <= 0 {
|
|
by = p.Idle // ~one soft timeout
|
|
}
|
|
return Decision{ExtendBy: by}
|
|
}
|
|
|
|
// System implements run.Critic. Construct with New; one System monitors many
|
|
// runs concurrently (each Monitor returns an independent handle).
|
|
type System struct {
|
|
esc Escalator
|
|
backstopMul float64 // hard deadline = softTimeout * backstopMul from start
|
|
checkInterval time.Duration
|
|
now func() time.Time
|
|
logger *slog.Logger
|
|
}
|
|
|
|
func (s *System) log() *slog.Logger {
|
|
if s.logger != nil {
|
|
return s.logger
|
|
}
|
|
return slog.Default()
|
|
}
|
|
|
|
// New builds a run.Critic. esc is the policy (nil → ExtendOnce). backstopMul is
|
|
// the hard-kill backstop as a multiple of each run's soft timeout (<=1 → 3). A
|
|
// nil esc + the default backstop gives a safe "extend once, then hard-kill"
|
|
// watchdog with no host wiring.
|
|
func New(esc Escalator, backstopMul float64) *System {
|
|
if esc == nil {
|
|
esc = &ExtendOnce{}
|
|
}
|
|
if backstopMul <= 1 {
|
|
backstopMul = 3
|
|
}
|
|
return &System{esc: esc, backstopMul: backstopMul, now: time.Now}
|
|
}
|
|
|
|
// Compile-time assertion that *System satisfies run.Critic.
var _ run.Critic = (*System)(nil)
|
|
|
|
// Monitor starts watching a run and returns its handle. Implements run.Critic.
|
|
func (s *System) Monitor(ctx context.Context, info run.RunInfo, softTimeout time.Duration) run.CriticHandle {
|
|
if softTimeout <= 0 {
|
|
return run.CriticHandle(nil) // no soft timeout → not monitored
|
|
}
|
|
now := s.now()
|
|
check := s.checkInterval
|
|
if check <= 0 {
|
|
check = softTimeout / 2
|
|
if check < time.Second {
|
|
check = time.Second
|
|
}
|
|
}
|
|
h := &handle{
|
|
sys: s,
|
|
info: info,
|
|
softTimeout: softTimeout,
|
|
now: s.now,
|
|
lastActivity: now,
|
|
deadline: now.Add(time.Duration(float64(softTimeout) * s.backstopMul)),
|
|
maxSteps: info.MaxIterations, // base ceiling; an Escalator may RaiseStepsBy
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
go h.watch(ctx, check)
|
|
return h
|
|
}
|
|
|
|
// handle is one run's live critic link. Implements run.CriticHandle.
|
|
type handle struct {
|
|
sys *System
|
|
info run.RunInfo
|
|
softTimeout time.Duration
|
|
now func() time.Time
|
|
|
|
mu sync.Mutex
|
|
lastActivity time.Time
|
|
escalatedAt time.Time // lastActivity value we last escalated for (de-dupes per idle period)
|
|
deadline time.Time
|
|
steer []llm.Message
|
|
iterations int
|
|
maxSteps int // current tool-dispatch ceiling (base MaxIterations, raised by RaiseStepsBy)
|
|
lastTool string
|
|
killed bool // sticky: once an Escalator kills, no later decision un-kills it
|
|
killCause error // non-nil once killed; surfaced via KillCause for "killed" status
|
|
stopped bool
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
func (h *handle) RecordStep(iter int, _ *llm.Response) {
|
|
// This battery's Progress tracks iteration count + activity, not per-step
|
|
// payload, so the response is unused here; a richer Escalator could record it.
|
|
h.mu.Lock()
|
|
h.iterations = iter
|
|
h.lastActivity = h.now()
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
func (h *handle) RecordToolStart(name, _ string) {
|
|
h.mu.Lock()
|
|
h.lastTool = name
|
|
h.lastActivity = h.now()
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
func (h *handle) Steer() []llm.Message {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if len(h.steer) == 0 {
|
|
return nil
|
|
}
|
|
out := h.steer
|
|
h.steer = nil
|
|
return out
|
|
}
|
|
|
|
func (h *handle) Deadline() time.Time {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
return h.deadline
|
|
}
|
|
|
|
func (h *handle) MaxSteps() int {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
return h.maxSteps
|
|
}
|
|
|
|
func (h *handle) KillCause() error {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
return h.killCause
|
|
}
|
|
|
|
func (h *handle) Stop() {
|
|
h.mu.Lock()
|
|
if !h.stopped {
|
|
h.stopped = true
|
|
close(h.stopCh)
|
|
}
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
// watch fires the Escalator once per idle period the run crosses its soft
|
|
// timeout, and applies the returned Decision.
|
|
func (h *handle) watch(ctx context.Context, interval time.Duration) {
|
|
// A misbehaving Escalator that panics must not silently kill the watch
|
|
// goroutine (which would leave the run unmonitored for its lifetime). Log
|
|
// and exit cleanly — the run falls back to the deadline already set.
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
h.sys.log().Error("critic watch panicked; run is now unmonitored", "run", h.info.RunID, "panic", r)
|
|
}
|
|
}()
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-h.stopCh:
|
|
return
|
|
case <-ctx.Done():
|
|
return
|
|
case <-t.C:
|
|
h.tick(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *handle) tick(ctx context.Context) {
|
|
h.mu.Lock()
|
|
// Kill is sticky: once an Escalator has killed this run, no later tick (and
|
|
// no later Decision) un-collapses the deadline.
|
|
if h.killed {
|
|
h.mu.Unlock()
|
|
return
|
|
}
|
|
idle := h.now().Sub(h.lastActivity)
|
|
// Only escalate once per idle period: skip if we already escalated for this
|
|
// exact lastActivity (a fresh step/tool updates lastActivity and re-arms).
|
|
if idle < h.softTimeout || h.escalatedAt.Equal(h.lastActivity) {
|
|
h.mu.Unlock()
|
|
return
|
|
}
|
|
h.escalatedAt = h.lastActivity
|
|
snap := Progress{Iterations: h.iterations, LastActivity: h.lastActivity, Idle: idle, LastTool: h.lastTool}
|
|
h.mu.Unlock()
|
|
|
|
d := h.sys.esc.OnSoftTimeout(ctx, h.info, snap)
|
|
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if h.killed { // a concurrent tick may have killed while OnSoftTimeout ran
|
|
return
|
|
}
|
|
if d.Kill {
|
|
h.killed = true
|
|
reason := d.KillReason
|
|
if reason == "" {
|
|
reason = "critic killed the run"
|
|
}
|
|
h.killCause = errors.New(reason) // surfaced via KillCause → "killed" status
|
|
h.deadline = h.now() // immediate hard deadline → executor cancels
|
|
return // ignore any Nudge/ExtendBy paired with a Kill
|
|
}
|
|
if len(d.Nudge) > 0 {
|
|
h.steer = append(h.steer, d.Nudge...)
|
|
}
|
|
if d.ExtendBy > 0 {
|
|
h.deadline = h.deadline.Add(d.ExtendBy)
|
|
}
|
|
if d.RaiseStepsBy > 0 {
|
|
// Overflow-safe: a buggy Escalator returning a huge delta must not wrap
|
|
// maxSteps negative (which the executor would read as "defer to base").
|
|
if d.RaiseStepsBy > math.MaxInt-h.maxSteps {
|
|
h.maxSteps = math.MaxInt
|
|
} else {
|
|
h.maxSteps += d.RaiseStepsBy
|
|
}
|
|
}
|
|
}
|