P4c: remaining batteries — checkpoint + schedule + critic #6
@@ -0,0 +1,228 @@
|
||||
// Package critic is the run-watchdog battery: a two-tier timeout monitor that
|
||||
// catches a run that has stopped making progress. It plugs into
|
||||
// run.Ports.Critic.
|
||||
//
|
||||
// The split of concerns is deliberate. executus owns the deterministic
|
||||
// MECHANICS — track activity, fire on a soft timeout, enforce a hard-kill
|
||||
// backstop, carry steer messages and the extendable deadline back to the
|
||||
// executor. The POLICY — what to actually do when a run stalls (nudge it,
|
||||
// extend its deadline, kill it, escalate to a human) — is the Escalator seam.
|
||||
// Mort plugs its LLM critic-agent in as an Escalator; ExtendOnce is the
|
||||
// zero-dependency default.
|
||||
//
|
||||
// NOTE: the executor's call into run.Ports.Critic is a P2 follow-up; this
|
||||
// battery provides the seam + impl ahead of that wiring.
|
||||
package critic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||
)
|
||||
|
||||
// Progress is the snapshot the critic hands an Escalator when a run stalls.
|
||||
type Progress struct {
|
||||
Iterations int // completed agent-loop iterations so far
|
||||
LastActivity time.Time // wall-clock of the last step/tool event
|
||||
Idle time.Duration // now - LastActivity
|
||||
LastTool string // name of the most recently started tool ("" if none)
|
||||
}
|
||||
|
||||
// Decision is the Escalator's verdict for a stalled run. Zero value = do
|
||||
// nothing (let the hard backstop eventually kill a truly hung run).
|
||||
type Decision struct {
|
||||
Nudge []llm.Message // injected before the agent's next turn (a steer)
|
||||
ExtendBy time.Duration // push the hard deadline out by this much
|
||||
Kill bool // cancel the run now
|
||||
KillReason string
|
||||
}
|
||||
|
||||
// Escalator decides what to do when a run crosses its soft timeout. It is
|
||||
// called at most once per idle period (a fresh step/tool event re-arms it).
|
||||
type Escalator interface {
|
||||
OnSoftTimeout(ctx context.Context, info run.RunInfo, p Progress) Decision
|
||||
}
|
||||
|
||||
// ExtendOnce is the default Escalator: the first time a run stalls it extends
|
||||
// the deadline by By (giving a slow-but-healthy run room), then takes no
|
||||
// further action — so a genuinely hung run is later killed by the hard
|
||||
// backstop. A nil/zero By falls back to one soft-timeout's worth.
|
||||
type ExtendOnce struct {
|
||||
By time.Duration
|
||||
|
||||
mu sync.Mutex
|
||||
fired bool
|
||||
}
|
||||
|
||||
// OnSoftTimeout implements Escalator.
|
||||
func (e *ExtendOnce) OnSoftTimeout(_ context.Context, _ run.RunInfo, p Progress) Decision {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
if e.fired {
|
||||
return Decision{}
|
||||
}
|
||||
e.fired = true
|
||||
by := e.By
|
||||
if by <= 0 {
|
||||
by = p.Idle // ~one soft timeout
|
||||
}
|
||||
return Decision{ExtendBy: by}
|
||||
}
|
||||
|
||||
// System implements run.Critic. Construct with New; one System monitors many
|
||||
// runs concurrently (each Monitor returns an independent handle).
|
||||
type System struct {
|
||||
esc Escalator
|
||||
backstopMul float64 // hard deadline = softTimeout * backstopMul from start
|
||||
checkInterval time.Duration
|
||||
now func() time.Time
|
||||
}
|
||||
|
||||
// New builds a run.Critic. esc is the policy (nil → ExtendOnce). backstopMul is
|
||||
// the hard-kill backstop as a multiple of each run's soft timeout (<=1 → 3). A
|
||||
// nil esc + the default backstop gives a safe "extend once, then hard-kill"
|
||||
// watchdog with no host wiring.
|
||||
func New(esc Escalator, backstopMul float64) *System {
|
||||
if esc == nil {
|
||||
esc = &ExtendOnce{}
|
||||
}
|
||||
if backstopMul <= 1 {
|
||||
backstopMul = 3
|
||||
}
|
||||
return &System{esc: esc, backstopMul: backstopMul, now: time.Now}
|
||||
}
|
||||
|
||||
var _ run.Critic = (*System)(nil)
|
||||
|
||||
// Monitor starts watching a run and returns its handle. Implements run.Critic.
|
||||
func (s *System) Monitor(ctx context.Context, info run.RunInfo, softTimeout time.Duration) run.CriticHandle {
|
||||
if softTimeout <= 0 {
|
||||
return run.CriticHandle(nil) // no soft timeout → not monitored
|
||||
}
|
||||
now := s.now()
|
||||
check := s.checkInterval
|
||||
if check <= 0 {
|
||||
check = softTimeout / 2
|
||||
if check < time.Second {
|
||||
check = time.Second
|
||||
}
|
||||
}
|
||||
h := &handle{
|
||||
sys: s,
|
||||
info: info,
|
||||
softTimeout: softTimeout,
|
||||
now: s.now,
|
||||
lastActivity: now,
|
||||
deadline: now.Add(time.Duration(float64(softTimeout) * s.backstopMul)),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
go h.watch(ctx, check)
|
||||
return h
|
||||
}
|
||||
|
||||
// handle is one run's live critic link. Implements run.CriticHandle.
|
||||
type handle struct {
|
||||
sys *System
|
||||
info run.RunInfo
|
||||
softTimeout time.Duration
|
||||
now func() time.Time
|
||||
|
||||
mu sync.Mutex
|
||||
lastActivity time.Time
|
||||
escalatedAt time.Time // lastActivity value we last escalated for (de-dupes per idle period)
|
||||
deadline time.Time
|
||||
steer []llm.Message
|
||||
iterations int
|
||||
lastTool string
|
||||
stopped bool
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func (h *handle) RecordStep(iter int) {
|
||||
h.mu.Lock()
|
||||
h.iterations = iter
|
||||
h.lastActivity = h.now()
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *handle) RecordToolStart(name, _ string) {
|
||||
h.mu.Lock()
|
||||
h.lastTool = name
|
||||
h.lastActivity = h.now()
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *handle) Steer() []llm.Message {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
if len(h.steer) == 0 {
|
||||
return nil
|
||||
}
|
||||
out := h.steer
|
||||
h.steer = nil
|
||||
return out
|
||||
}
|
||||
|
||||
func (h *handle) Deadline() time.Time {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.deadline
|
||||
}
|
||||
|
||||
func (h *handle) Stop() {
|
||||
h.mu.Lock()
|
||||
if !h.stopped {
|
||||
h.stopped = true
|
||||
close(h.stopCh)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
// watch fires the Escalator once per idle period the run crosses its soft
|
||||
// timeout, and applies the returned Decision.
|
||||
func (h *handle) watch(ctx context.Context, interval time.Duration) {
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-h.stopCh:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
h.tick(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handle) tick(ctx context.Context) {
|
||||
h.mu.Lock()
|
||||
idle := h.now().Sub(h.lastActivity)
|
||||
// Only escalate once per idle period: skip if we already escalated for this
|
||||
// exact lastActivity (a fresh step/tool updates lastActivity and re-arms).
|
||||
if idle < h.softTimeout || h.escalatedAt.Equal(h.lastActivity) {
|
||||
h.mu.Unlock()
|
||||
return
|
||||
}
|
||||
h.escalatedAt = h.lastActivity
|
||||
snap := Progress{Iterations: h.iterations, LastActivity: h.lastActivity, Idle: idle, LastTool: h.lastTool}
|
||||
h.mu.Unlock()
|
||||
|
||||
d := h.sys.esc.OnSoftTimeout(ctx, h.info, snap)
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
if len(d.Nudge) > 0 {
|
||||
h.steer = append(h.steer, d.Nudge...)
|
||||
}
|
||||
if d.ExtendBy > 0 {
|
||||
h.deadline = h.deadline.Add(d.ExtendBy)
|
||||
}
|
||||
if d.Kill {
|
||||
h.deadline = h.now() // immediate hard deadline → executor cancels
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,95 @@
|
||||
package critic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
)
|
||||
|
||||
// escFunc adapts a func to an Escalator.
|
||||
type escFunc func(context.Context, run.RunInfo, Progress) Decision
|
||||
|
||||
func (f escFunc) OnSoftTimeout(ctx context.Context, i run.RunInfo, p Progress) Decision {
|
||||
return f(ctx, i, p)
|
||||
}
|
||||
|
||||
func TestMonitorEscalatesOncePerIdlePeriodAndExtends(t *testing.T) {
|
||||
var mu sync.Mutex
|
||||
var calls int
|
||||
esc := escFunc(func(_ context.Context, _ run.RunInfo, p Progress) Decision {
|
||||
mu.Lock()
|
||||
calls++
|
||||
mu.Unlock()
|
||||
return Decision{ExtendBy: 50 * time.Millisecond, Nudge: []llm.Message{{Role: llm.RoleUser}}}
|
||||
})
|
||||
s := New(esc, 3)
|
||||
s.checkInterval = 5 * time.Millisecond
|
||||
h := s.Monitor(context.Background(), run.RunInfo{RunID: "r"}, 20*time.Millisecond)
|
||||
defer h.Stop()
|
||||
|
||||
d0 := h.Deadline()
|
||||
time.Sleep(60 * time.Millisecond) // cross the soft timeout with no activity
|
||||
mu.Lock()
|
||||
c := calls
|
||||
mu.Unlock()
|
||||
if c < 1 {
|
||||
t.Fatalf("expected at least one escalation, got %d", c)
|
||||
}
|
||||
// Nudge was queued and is drained once.
|
||||
if msgs := h.Steer(); len(msgs) == 0 {
|
||||
t.Error("expected a queued steer nudge")
|
||||
}
|
||||
if msgs := h.Steer(); len(msgs) != 0 {
|
||||
t.Error("steer should drain (be empty on second read)")
|
||||
}
|
||||
// Deadline was extended.
|
||||
if !h.Deadline().After(d0) {
|
||||
t.Error("deadline should have been extended past the original")
|
||||
}
|
||||
// A fresh step re-arms; another idle period escalates again.
|
||||
h.RecordStep(1)
|
||||
time.Sleep(60 * time.Millisecond)
|
||||
mu.Lock()
|
||||
c2 := calls
|
||||
mu.Unlock()
|
||||
if c2 <= c {
|
||||
t.Errorf("a re-armed idle period should escalate again (%d -> %d)", c, c2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestKillCollapsesDeadline(t *testing.T) {
|
||||
esc := escFunc(func(context.Context, run.RunInfo, Progress) Decision {
|
||||
return Decision{Kill: true, KillReason: "hung"}
|
||||
})
|
||||
s := New(esc, 10) // big backstop so only Kill collapses it
|
||||
s.checkInterval = 5 * time.Millisecond
|
||||
h := s.Monitor(context.Background(), run.RunInfo{RunID: "r"}, 20*time.Millisecond)
|
||||
defer h.Stop()
|
||||
time.Sleep(60 * time.Millisecond)
|
||||
if h.Deadline().After(time.Now().Add(time.Second)) {
|
||||
t.Error("Kill should collapse the deadline to ~now")
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtendOnceOnlyFiresOnce(t *testing.T) {
|
||||
e := &ExtendOnce{By: time.Minute}
|
||||
d1 := e.OnSoftTimeout(context.Background(), run.RunInfo{}, Progress{})
|
||||
d2 := e.OnSoftTimeout(context.Background(), run.RunInfo{}, Progress{})
|
||||
if d1.ExtendBy != time.Minute {
|
||||
t.Errorf("first decision should extend, got %+v", d1)
|
||||
}
|
||||
if d2.ExtendBy != 0 || d2.Kill {
|
||||
t.Errorf("second decision should be a no-op, got %+v", d2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestZeroSoftTimeoutNotMonitored(t *testing.T) {
|
||||
s := New(nil, 3)
|
||||
if h := s.Monitor(context.Background(), run.RunInfo{}, 0); h != nil {
|
||||
t.Error("zero soft timeout should return a nil handle (not monitored)")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user