P4c: remaining batteries — checkpoint + schedule + critic
executus CI / test (push) Failing after 1m6s
executus CI / test (push) Failing after 1m6s
Completes the P4 battery set (squashed onto main from phase-4c-batteries). - checkpoint/: run.Checkpointer durable-resume (CheckpointStore + throttled handle + Memory). - schedule/: generic cron Runner (Tick/Loop; no cron grammar of its own). - critic/: two-tier timeout watchdog (run.Critic) + Escalator policy seam + ExtendOnce default. Includes the verified gadfly #6 fixes (ExtendOnce per-run, Kill-sticky, watch panic-recovery; checkpoint throttle-after-success; schedule Next-before-Run + nil-guard + Loop recovery). P4 battery set complete: audit, budget, persona, skill, checkpoint, schedule, critic — each nil-safe, each with a default, each core-import-clean. Executor wiring for Critic/Checkpointer remains a P2 follow-up. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,50 @@
|
|||||||
|
// Package checkpoint is the durable-resume battery: it persists a run's
|
||||||
|
// resumable progress so a run interrupted by a shutdown can be recovered and
|
||||||
|
// continued on the next boot, rather than silently lost. It plugs into
|
||||||
|
// run.Ports.Checkpointer.
|
||||||
|
//
|
||||||
|
// Mort backs CheckpointStore with its durable-job table; Memory() is the
|
||||||
|
// zero-dependency default; contrib/store can add a SQLite one. NOTE: the
|
||||||
|
// executor's call into run.Ports.Checkpointer is a P2 follow-up — this battery
|
||||||
|
// provides the seam + impls ahead of that wiring.
|
||||||
|
package checkpoint
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RunCheckpointMeta is the run attribution needed to resume a run from scratch
|
||||||
|
// (mirrors mort's agentexec.RunCheckpointMeta).
|
||||||
|
type RunCheckpointMeta struct {
|
||||||
|
RunID string
|
||||||
|
AgentID string
|
||||||
|
AgentName string
|
||||||
|
CallerID string
|
||||||
|
ChannelID string
|
||||||
|
GuildID string
|
||||||
|
Prompt string
|
||||||
|
ModelTier string
|
||||||
|
ParentRunID string
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunCheckpoint is one persisted snapshot of a run's resumable progress.
|
||||||
|
type RunCheckpoint struct {
|
||||||
|
Meta RunCheckpointMeta
|
||||||
|
Messages []llm.Message // conversation so far
|
||||||
|
Iteration int // completed agent-loop iterations
|
||||||
|
ActivePhase string // current phase name (multi-phase agents); "" otherwise
|
||||||
|
UpdatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckpointStore persists run checkpoints keyed by run id. A live checkpoint
|
||||||
|
// means "this run was in flight and not cleanly finished"; Complete/Fail delete
|
||||||
|
// it. ListInterrupted returns every surviving checkpoint at boot for recovery.
|
||||||
|
type CheckpointStore interface {
|
||||||
|
Save(ctx context.Context, cp RunCheckpoint) error
|
||||||
|
Load(ctx context.Context, runID string) (*RunCheckpoint, error)
|
||||||
|
Delete(ctx context.Context, runID string) error
|
||||||
|
ListInterrupted(ctx context.Context) ([]RunCheckpoint, error)
|
||||||
|
}
|
||||||
@@ -0,0 +1,64 @@
|
|||||||
|
package checkpoint
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHandleSaveCompleteDelete(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
mem := NewMemory()
|
||||||
|
meta := RunCheckpointMeta{RunID: "r1", AgentID: "a1", CallerID: "c1"}
|
||||||
|
cp := New(mem, meta, 0, nil) // throttle 0 = save every call
|
||||||
|
|
||||||
|
if err := cp.Save(ctx, run.RunCheckpointState{Messages: []llm.Message{{Role: "user"}}, Iteration: 2}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
got, _ := mem.Load(ctx, "r1")
|
||||||
|
if got == nil || got.Iteration != 2 || got.Meta.AgentID != "a1" {
|
||||||
|
t.Fatalf("checkpoint not persisted: %+v", got)
|
||||||
|
}
|
||||||
|
if il, _ := mem.ListInterrupted(ctx); len(il) != 1 {
|
||||||
|
t.Errorf("ListInterrupted = %d, want 1 (in-flight)", len(il))
|
||||||
|
}
|
||||||
|
// Complete clears it (no longer a recovery candidate).
|
||||||
|
if err := cp.Complete(ctx); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if il, _ := mem.ListInterrupted(ctx); len(il) != 0 {
|
||||||
|
t.Errorf("after Complete, ListInterrupted = %d, want 0", len(il))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHandleThrottle(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
mem := NewMemory()
|
||||||
|
now := time.Now()
|
||||||
|
cp := New(mem, RunCheckpointMeta{RunID: "r"}, time.Minute, func() time.Time { return now })
|
||||||
|
|
||||||
|
cp.Save(ctx, run.RunCheckpointState{Iteration: 1})
|
||||||
|
now = now.Add(10 * time.Second) // within throttle window
|
||||||
|
cp.Save(ctx, run.RunCheckpointState{Iteration: 2})
|
||||||
|
if got, _ := mem.Load(ctx, "r"); got.Iteration != 1 {
|
||||||
|
t.Errorf("throttled save should keep iteration 1, got %d", got.Iteration)
|
||||||
|
}
|
||||||
|
now = now.Add(time.Minute) // past throttle
|
||||||
|
cp.Save(ctx, run.RunCheckpointState{Iteration: 3})
|
||||||
|
if got, _ := mem.Load(ctx, "r"); got.Iteration != 3 {
|
||||||
|
t.Errorf("post-throttle save should land iteration 3, got %d", got.Iteration)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNilStoreNoop(t *testing.T) {
|
||||||
|
cp := New(nil, RunCheckpointMeta{RunID: "r"}, 0, nil)
|
||||||
|
if err := cp.Save(context.Background(), run.RunCheckpointState{}); err != nil {
|
||||||
|
t.Errorf("nil-store Save should be a no-op, got %v", err)
|
||||||
|
}
|
||||||
|
if err := cp.Complete(context.Background()); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,83 @@
|
|||||||
|
package checkpoint
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
)
|
||||||
|
|
||||||
|
// handle is a per-run run.Checkpointer bound to one run's id + meta. Save writes
|
||||||
|
// a fresh snapshot (throttled), Complete/Fail delete the checkpoint (a cleanly
|
||||||
|
// finished or terminally failed run is NOT a recovery candidate). A run
|
||||||
|
// interrupted by shutdown never calls Complete/Fail, so its checkpoint survives
|
||||||
|
// for ListInterrupted at boot.
|
||||||
|
type handle struct {
|
||||||
|
store CheckpointStore
|
||||||
|
meta RunCheckpointMeta
|
||||||
|
throttle time.Duration
|
||||||
|
now func() time.Time
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
lastSave time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ run.Checkpointer = (*handle)(nil)
|
||||||
|
|
||||||
|
// New returns a run.Checkpointer that persists snapshots of the run identified
|
||||||
|
// by meta.RunID to store, no more often than throttle (Save calls inside the
|
||||||
|
// window are skipped). A nil store yields a no-op Checkpointer. throttle <= 0
|
||||||
|
// saves every call; now defaults to time.Now.
|
||||||
|
func New(store CheckpointStore, meta RunCheckpointMeta, throttle time.Duration, now func() time.Time) run.Checkpointer {
|
||||||
|
if store == nil {
|
||||||
|
return noop{}
|
||||||
|
}
|
||||||
|
if now == nil {
|
||||||
|
now = time.Now
|
||||||
|
}
|
||||||
|
return &handle{store: store, meta: meta, throttle: throttle, now: now}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) Save(ctx context.Context, st run.RunCheckpointState) error {
|
||||||
|
h.mu.Lock()
|
||||||
|
now := h.now()
|
||||||
|
if h.throttle > 0 && !h.lastSave.IsZero() && now.Sub(h.lastSave) < h.throttle {
|
||||||
|
h.mu.Unlock()
|
||||||
|
return nil // throttled — a more recent snapshot will land shortly
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
|
||||||
|
// Advance the throttle clock only AFTER a successful persist. If the store
|
||||||
|
// write fails, lastSave stays put so the next Save isn't throttled away —
|
||||||
|
// otherwise a transient store error would silently drop the snapshot the
|
||||||
|
// caller believes was saved. (A run drives one Save goroutine, so the brief
|
||||||
|
// unguarded window here can't double-write.)
|
||||||
|
if err := h.store.Save(ctx, RunCheckpoint{
|
||||||
|
Meta: h.meta,
|
||||||
|
Messages: st.Messages,
|
||||||
|
Iteration: st.Iteration,
|
||||||
|
UpdatedAt: now,
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
h.mu.Lock()
|
||||||
|
if now.After(h.lastSave) {
|
||||||
|
h.lastSave = now
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) Complete(ctx context.Context) error { return h.store.Delete(ctx, h.meta.RunID) }
|
||||||
|
|
||||||
|
func (h *handle) Fail(ctx context.Context, _ error) error { return h.store.Delete(ctx, h.meta.RunID) }
|
||||||
|
|
||||||
|
// noop is the nil-store Checkpointer: every method is a successful no-op.
|
||||||
|
type noop struct{}
|
||||||
|
|
||||||
|
var _ run.Checkpointer = noop{}
|
||||||
|
|
||||||
|
func (noop) Save(context.Context, run.RunCheckpointState) error { return nil }
|
||||||
|
func (noop) Complete(context.Context) error { return nil }
|
||||||
|
func (noop) Fail(context.Context, error) error { return nil }
|
||||||
@@ -0,0 +1,55 @@
|
|||||||
|
package checkpoint
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Memory is a zero-dependency in-process CheckpointStore. NOTE: an in-memory
|
||||||
|
// checkpoint store does NOT survive the process restart it exists to recover
|
||||||
|
// from — it is the test/light-host default and makes ListInterrupted meaningful
|
||||||
|
// only within a single process lifetime. A host that wants real
|
||||||
|
// crash-recovery wires a durable CheckpointStore (mort's durable-job table).
|
||||||
|
type Memory struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
cps map[string]RunCheckpoint // by run id
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMemory returns an empty in-memory CheckpointStore.
|
||||||
|
func NewMemory() *Memory { return &Memory{cps: map[string]RunCheckpoint{}} }
|
||||||
|
|
||||||
|
var _ CheckpointStore = (*Memory)(nil)
|
||||||
|
|
||||||
|
func (m *Memory) Save(_ context.Context, cp RunCheckpoint) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
m.cps[cp.Meta.RunID] = cp
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) Load(_ context.Context, runID string) (*RunCheckpoint, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
cp, ok := m.cps[runID]
|
||||||
|
if !ok {
|
||||||
|
return nil, nil // no checkpoint (not an error — the run finished cleanly or never started)
|
||||||
|
}
|
||||||
|
return &cp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) Delete(_ context.Context, runID string) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
delete(m.cps, runID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) ListInterrupted(_ context.Context) ([]RunCheckpoint, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
out := make([]RunCheckpoint, 0, len(m.cps))
|
||||||
|
for _, cp := range m.cps {
|
||||||
|
out = append(out, cp)
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,266 @@
|
|||||||
|
// Package critic is the run-watchdog battery: a two-tier timeout monitor that
|
||||||
|
// catches a run that has stopped making progress. It plugs into
|
||||||
|
// run.Ports.Critic.
|
||||||
|
//
|
||||||
|
// The split of concerns is deliberate. executus owns the deterministic
|
||||||
|
// MECHANICS — track activity, fire on a soft timeout, enforce a hard-kill
|
||||||
|
// backstop, carry steer messages and the extendable deadline back to the
|
||||||
|
// executor. The POLICY — what to actually do when a run stalls (nudge it,
|
||||||
|
// extend its deadline, kill it, escalate to a human) — is the Escalator seam.
|
||||||
|
// Mort plugs its LLM critic-agent in as an Escalator; ExtendOnce is the
|
||||||
|
// zero-dependency default.
|
||||||
|
//
|
||||||
|
// NOTE: the executor's call into run.Ports.Critic is a P2 follow-up; this
|
||||||
|
// battery provides the seam + impl ahead of that wiring.
|
||||||
|
package critic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Progress is the snapshot the critic hands an Escalator when a run stalls.
|
||||||
|
type Progress struct {
|
||||||
|
Iterations int // completed agent-loop iterations so far
|
||||||
|
LastActivity time.Time // wall-clock of the last step/tool event
|
||||||
|
Idle time.Duration // now - LastActivity
|
||||||
|
LastTool string // name of the most recently started tool ("" if none)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decision is the Escalator's verdict for a stalled run. Zero value = do
|
||||||
|
// nothing (let the hard backstop eventually kill a truly hung run).
|
||||||
|
type Decision struct {
|
||||||
|
Nudge []llm.Message // injected before the agent's next turn (a steer)
|
||||||
|
ExtendBy time.Duration // push the hard deadline out by this much
|
||||||
|
Kill bool // cancel the run now
|
||||||
|
KillReason string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Escalator decides what to do when a run crosses its soft timeout. It is
|
||||||
|
// called at most once per idle period (a fresh step/tool event re-arms it).
|
||||||
|
type Escalator interface {
|
||||||
|
OnSoftTimeout(ctx context.Context, info run.RunInfo, p Progress) Decision
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExtendOnce is the default Escalator: the first time a given run stalls it
|
||||||
|
// extends that run's deadline by By (giving a slow-but-healthy run room), then
|
||||||
|
// takes no further action for it — so a genuinely hung run is later killed by
|
||||||
|
// the hard backstop. A nil/zero By falls back to one soft-timeout's worth.
|
||||||
|
//
|
||||||
|
// The one-shot is keyed PER RUN (by RunInfo.RunID): a single System shares one
|
||||||
|
// ExtendOnce across every run it monitors, so a global flag would let only the
|
||||||
|
// first run to stall ever get its extension. The fired set grows with the
|
||||||
|
// number of distinct runs that stall — fine for a process's run volume; a host
|
||||||
|
// running unboundedly long can construct a fresh System periodically.
|
||||||
|
type ExtendOnce struct {
|
||||||
|
By time.Duration
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
fired map[string]bool // run ids that have already had their one extension
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnSoftTimeout implements Escalator.
|
||||||
|
func (e *ExtendOnce) OnSoftTimeout(_ context.Context, info run.RunInfo, p Progress) Decision {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
if e.fired[info.RunID] {
|
||||||
|
return Decision{}
|
||||||
|
}
|
||||||
|
if e.fired == nil {
|
||||||
|
e.fired = map[string]bool{}
|
||||||
|
}
|
||||||
|
e.fired[info.RunID] = true
|
||||||
|
by := e.By
|
||||||
|
if by <= 0 {
|
||||||
|
by = p.Idle // ~one soft timeout
|
||||||
|
}
|
||||||
|
return Decision{ExtendBy: by}
|
||||||
|
}
|
||||||
|
|
||||||
|
// System implements run.Critic. Construct with New; one System monitors many
|
||||||
|
// runs concurrently (each Monitor returns an independent handle).
|
||||||
|
type System struct {
|
||||||
|
esc Escalator
|
||||||
|
backstopMul float64 // hard deadline = softTimeout * backstopMul from start
|
||||||
|
checkInterval time.Duration
|
||||||
|
now func() time.Time
|
||||||
|
logger *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *System) log() *slog.Logger {
|
||||||
|
if s.logger != nil {
|
||||||
|
return s.logger
|
||||||
|
}
|
||||||
|
return slog.Default()
|
||||||
|
}
|
||||||
|
|
||||||
|
// New builds a run.Critic. esc is the policy (nil → ExtendOnce). backstopMul is
|
||||||
|
// the hard-kill backstop as a multiple of each run's soft timeout (<=1 → 3). A
|
||||||
|
// nil esc + the default backstop gives a safe "extend once, then hard-kill"
|
||||||
|
// watchdog with no host wiring.
|
||||||
|
func New(esc Escalator, backstopMul float64) *System {
|
||||||
|
if esc == nil {
|
||||||
|
esc = &ExtendOnce{}
|
||||||
|
}
|
||||||
|
if backstopMul <= 1 {
|
||||||
|
backstopMul = 3
|
||||||
|
}
|
||||||
|
return &System{esc: esc, backstopMul: backstopMul, now: time.Now}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ run.Critic = (*System)(nil)
|
||||||
|
|
||||||
|
// Monitor starts watching a run and returns its handle. Implements run.Critic.
|
||||||
|
func (s *System) Monitor(ctx context.Context, info run.RunInfo, softTimeout time.Duration) run.CriticHandle {
|
||||||
|
if softTimeout <= 0 {
|
||||||
|
return run.CriticHandle(nil) // no soft timeout → not monitored
|
||||||
|
}
|
||||||
|
now := s.now()
|
||||||
|
check := s.checkInterval
|
||||||
|
if check <= 0 {
|
||||||
|
check = softTimeout / 2
|
||||||
|
if check < time.Second {
|
||||||
|
check = time.Second
|
||||||
|
}
|
||||||
|
}
|
||||||
|
h := &handle{
|
||||||
|
sys: s,
|
||||||
|
info: info,
|
||||||
|
softTimeout: softTimeout,
|
||||||
|
now: s.now,
|
||||||
|
lastActivity: now,
|
||||||
|
deadline: now.Add(time.Duration(float64(softTimeout) * s.backstopMul)),
|
||||||
|
stopCh: make(chan struct{}),
|
||||||
|
}
|
||||||
|
go h.watch(ctx, check)
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle is one run's live critic link. Implements run.CriticHandle.
|
||||||
|
type handle struct {
|
||||||
|
sys *System
|
||||||
|
info run.RunInfo
|
||||||
|
softTimeout time.Duration
|
||||||
|
now func() time.Time
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
lastActivity time.Time
|
||||||
|
escalatedAt time.Time // lastActivity value we last escalated for (de-dupes per idle period)
|
||||||
|
deadline time.Time
|
||||||
|
steer []llm.Message
|
||||||
|
iterations int
|
||||||
|
lastTool string
|
||||||
|
killed bool // sticky: once an Escalator kills, no later decision un-kills it
|
||||||
|
stopped bool
|
||||||
|
stopCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) RecordStep(iter int) {
|
||||||
|
h.mu.Lock()
|
||||||
|
h.iterations = iter
|
||||||
|
h.lastActivity = h.now()
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) RecordToolStart(name, _ string) {
|
||||||
|
h.mu.Lock()
|
||||||
|
h.lastTool = name
|
||||||
|
h.lastActivity = h.now()
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) Steer() []llm.Message {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
if len(h.steer) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
out := h.steer
|
||||||
|
h.steer = nil
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) Deadline() time.Time {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
return h.deadline
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) Stop() {
|
||||||
|
h.mu.Lock()
|
||||||
|
if !h.stopped {
|
||||||
|
h.stopped = true
|
||||||
|
close(h.stopCh)
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// watch fires the Escalator once per idle period the run crosses its soft
|
||||||
|
// timeout, and applies the returned Decision.
|
||||||
|
func (h *handle) watch(ctx context.Context, interval time.Duration) {
|
||||||
|
// A misbehaving Escalator that panics must not silently kill the watch
|
||||||
|
// goroutine (which would leave the run unmonitored for its lifetime). Log
|
||||||
|
// and exit cleanly — the run falls back to the deadline already set.
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
h.sys.log().Error("critic watch panicked; run is now unmonitored", "run", h.info.RunID, "panic", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
t := time.NewTicker(interval)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-h.stopCh:
|
||||||
|
return
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
h.tick(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) tick(ctx context.Context) {
|
||||||
|
h.mu.Lock()
|
||||||
|
// Kill is sticky: once an Escalator has killed this run, no later tick (and
|
||||||
|
// no later Decision) un-collapses the deadline.
|
||||||
|
if h.killed {
|
||||||
|
h.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
idle := h.now().Sub(h.lastActivity)
|
||||||
|
// Only escalate once per idle period: skip if we already escalated for this
|
||||||
|
// exact lastActivity (a fresh step/tool updates lastActivity and re-arms).
|
||||||
|
if idle < h.softTimeout || h.escalatedAt.Equal(h.lastActivity) {
|
||||||
|
h.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
h.escalatedAt = h.lastActivity
|
||||||
|
snap := Progress{Iterations: h.iterations, LastActivity: h.lastActivity, Idle: idle, LastTool: h.lastTool}
|
||||||
|
h.mu.Unlock()
|
||||||
|
|
||||||
|
d := h.sys.esc.OnSoftTimeout(ctx, h.info, snap)
|
||||||
|
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
if h.killed { // a concurrent tick may have killed while OnSoftTimeout ran
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if d.Kill {
|
||||||
|
h.killed = true
|
||||||
|
h.deadline = h.now() // immediate hard deadline → executor cancels
|
||||||
|
return // ignore any Nudge/ExtendBy paired with a Kill
|
||||||
|
}
|
||||||
|
if len(d.Nudge) > 0 {
|
||||||
|
h.steer = append(h.steer, d.Nudge...)
|
||||||
|
}
|
||||||
|
if d.ExtendBy > 0 {
|
||||||
|
h.deadline = h.deadline.Add(d.ExtendBy)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,100 @@
|
|||||||
|
package critic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// escFunc adapts a func to an Escalator.
|
||||||
|
type escFunc func(context.Context, run.RunInfo, Progress) Decision
|
||||||
|
|
||||||
|
func (f escFunc) OnSoftTimeout(ctx context.Context, i run.RunInfo, p Progress) Decision {
|
||||||
|
return f(ctx, i, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMonitorEscalatesOncePerIdlePeriodAndExtends(t *testing.T) {
|
||||||
|
var mu sync.Mutex
|
||||||
|
var calls int
|
||||||
|
esc := escFunc(func(_ context.Context, _ run.RunInfo, p Progress) Decision {
|
||||||
|
mu.Lock()
|
||||||
|
calls++
|
||||||
|
mu.Unlock()
|
||||||
|
return Decision{ExtendBy: 50 * time.Millisecond, Nudge: []llm.Message{{Role: llm.RoleUser}}}
|
||||||
|
})
|
||||||
|
s := New(esc, 3)
|
||||||
|
s.checkInterval = 5 * time.Millisecond
|
||||||
|
h := s.Monitor(context.Background(), run.RunInfo{RunID: "r"}, 20*time.Millisecond)
|
||||||
|
defer h.Stop()
|
||||||
|
|
||||||
|
d0 := h.Deadline()
|
||||||
|
time.Sleep(60 * time.Millisecond) // cross the soft timeout with no activity
|
||||||
|
mu.Lock()
|
||||||
|
c := calls
|
||||||
|
mu.Unlock()
|
||||||
|
if c < 1 {
|
||||||
|
t.Fatalf("expected at least one escalation, got %d", c)
|
||||||
|
}
|
||||||
|
// Nudge was queued and is drained once.
|
||||||
|
if msgs := h.Steer(); len(msgs) == 0 {
|
||||||
|
t.Error("expected a queued steer nudge")
|
||||||
|
}
|
||||||
|
if msgs := h.Steer(); len(msgs) != 0 {
|
||||||
|
t.Error("steer should drain (be empty on second read)")
|
||||||
|
}
|
||||||
|
// Deadline was extended.
|
||||||
|
if !h.Deadline().After(d0) {
|
||||||
|
t.Error("deadline should have been extended past the original")
|
||||||
|
}
|
||||||
|
// A fresh step re-arms; another idle period escalates again.
|
||||||
|
h.RecordStep(1)
|
||||||
|
time.Sleep(60 * time.Millisecond)
|
||||||
|
mu.Lock()
|
||||||
|
c2 := calls
|
||||||
|
mu.Unlock()
|
||||||
|
if c2 <= c {
|
||||||
|
t.Errorf("a re-armed idle period should escalate again (%d -> %d)", c, c2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKillCollapsesDeadline(t *testing.T) {
|
||||||
|
esc := escFunc(func(context.Context, run.RunInfo, Progress) Decision {
|
||||||
|
return Decision{Kill: true, KillReason: "hung"}
|
||||||
|
})
|
||||||
|
s := New(esc, 10) // big backstop so only Kill collapses it
|
||||||
|
s.checkInterval = 5 * time.Millisecond
|
||||||
|
h := s.Monitor(context.Background(), run.RunInfo{RunID: "r"}, 20*time.Millisecond)
|
||||||
|
defer h.Stop()
|
||||||
|
time.Sleep(60 * time.Millisecond)
|
||||||
|
if h.Deadline().After(time.Now().Add(time.Second)) {
|
||||||
|
t.Error("Kill should collapse the deadline to ~now")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExtendOnceOnlyFiresOnce(t *testing.T) {
|
||||||
|
e := &ExtendOnce{By: time.Minute}
|
||||||
|
// Same run id: only the first call extends.
|
||||||
|
d1 := e.OnSoftTimeout(context.Background(), run.RunInfo{RunID: "r1"}, Progress{})
|
||||||
|
d2 := e.OnSoftTimeout(context.Background(), run.RunInfo{RunID: "r1"}, Progress{})
|
||||||
|
if d1.ExtendBy != time.Minute {
|
||||||
|
t.Errorf("first decision should extend, got %+v", d1)
|
||||||
|
}
|
||||||
|
if d2.ExtendBy != 0 || d2.Kill {
|
||||||
|
t.Errorf("second call for the same run should be a no-op, got %+v", d2)
|
||||||
|
}
|
||||||
|
// A DIFFERENT run still gets its own one extension (per-run, not global).
|
||||||
|
if d3 := e.OnSoftTimeout(context.Background(), run.RunInfo{RunID: "r2"}, Progress{}); d3.ExtendBy != time.Minute {
|
||||||
|
t.Errorf("a different run should get its own extension, got %+v", d3)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestZeroSoftTimeoutNotMonitored(t *testing.T) {
|
||||||
|
s := New(nil, 3)
|
||||||
|
if h := s.Monitor(context.Background(), run.RunInfo{}, 0); h != nil {
|
||||||
|
t.Error("zero soft timeout should return a nil handle (not monitored)")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,132 @@
|
|||||||
|
// Package schedule is the cron-runner battery: a generic ticker that, each
|
||||||
|
// interval, asks a store for the jobs whose next-run time has passed, runs each
|
||||||
|
// one, and stamps its next fire time. It is host-agnostic orchestration — the
|
||||||
|
// host wires the store (skill.SkillStore.ListDueScheduled /
|
||||||
|
// persona.Storage.ListScheduledAgents), the run (run.Executor), and the cron
|
||||||
|
// "next fire" function (a cron library, or skill's schedule parser). The
|
||||||
|
// battery owns no cron grammar of its own, so it never duplicates the parser.
|
||||||
|
package schedule
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Due is one schedulable job: its id and its cron expression.
|
||||||
|
type Due struct {
|
||||||
|
ID string
|
||||||
|
Cron string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Runner periodically fires due jobs. Every func field is required except Now
|
||||||
|
// (defaults to time.Now) and Logger (defaults to slog.Default). Construct the
|
||||||
|
// struct directly and call Loop (or Tick for a single pass / tests).
|
||||||
|
type Runner struct {
|
||||||
|
// Interval is how often Loop checks for due jobs. <= 0 defaults to 1m.
|
||||||
|
Interval time.Duration
|
||||||
|
// Due lists the jobs due at now.
|
||||||
|
Due func(ctx context.Context, now time.Time) ([]Due, error)
|
||||||
|
// Run executes one job by id.
|
||||||
|
Run func(ctx context.Context, id string) error
|
||||||
|
// Mark records that a job ran at ranAt and is next due at nextAt.
|
||||||
|
Mark func(ctx context.Context, id string, ranAt, nextAt time.Time) error
|
||||||
|
// Next computes a cron expression's next fire after a given time.
|
||||||
|
Next func(cron string, after time.Time) (time.Time, error)
|
||||||
|
|
||||||
|
Now func() time.Time
|
||||||
|
Logger *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Runner) now() time.Time {
|
||||||
|
if r.Now != nil {
|
||||||
|
return r.Now()
|
||||||
|
}
|
||||||
|
return time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Runner) log() *slog.Logger {
|
||||||
|
if r.Logger != nil {
|
||||||
|
return r.Logger
|
||||||
|
}
|
||||||
|
return slog.Default()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tick runs one pass: every currently-due job is run, then stamped with its
|
||||||
|
// next fire time. A job whose Run or Next errors is logged and skipped (its
|
||||||
|
// next-run time is left unchanged so it stays due and retries next tick) — one
|
||||||
|
// bad job never stalls the others. Returns the error from Due (the only
|
||||||
|
// pass-fatal step).
|
||||||
|
func (r *Runner) Tick(ctx context.Context) error {
|
||||||
|
if err := r.validate(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
now := r.now()
|
||||||
|
due, err := r.Due(ctx, now)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, j := range due {
|
||||||
|
// Compute the next fire BEFORE running. A permanently-unparseable cron
|
||||||
|
// then skips the job entirely (logged) rather than running it — an
|
||||||
|
// unstamped job stays due, so checking Next first avoids a hot-loop of
|
||||||
|
// real Run executions every tick.
|
||||||
|
next, err := r.Next(j.Cron, now)
|
||||||
|
if err != nil {
|
||||||
|
r.log().Warn("scheduled job has an unparseable cron; skipping (not run, not rescheduled)", "job", j.ID, "cron", j.Cron, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := r.Run(ctx, j.ID); err != nil {
|
||||||
|
r.log().Warn("scheduled job failed; stays due, will retry next tick", "job", j.ID, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// A Mark failure leaves the job due, so it re-runs next tick — Run must
|
||||||
|
// be idempotent (there is no atomic run+stamp across two host callbacks).
|
||||||
|
if err := r.Mark(ctx, j.ID, now, next); err != nil {
|
||||||
|
r.log().Warn("failed to stamp next run; job may re-execute next tick (Run must be idempotent)", "job", j.ID, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// validate reports a misconfigured Runner (a required callback left nil) as a
|
||||||
|
// clear error rather than a nil-deref panic on first tick.
|
||||||
|
func (r *Runner) validate() error {
|
||||||
|
if r.Due == nil || r.Run == nil || r.Mark == nil || r.Next == nil {
|
||||||
|
return errors.New("schedule: Runner requires non-nil Due, Run, Mark, and Next")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Loop ticks every Interval until ctx is cancelled. A Tick error (the Due
|
||||||
|
// lister failing) is logged and the loop continues — a transient store hiccup
|
||||||
|
// shouldn't kill the scheduler — and a panic from any host callback is
|
||||||
|
// recovered so one bad tick can't silently kill the scheduler goroutine.
|
||||||
|
func (r *Runner) Loop(ctx context.Context) {
|
||||||
|
interval := r.Interval
|
||||||
|
if interval <= 0 {
|
||||||
|
interval = time.Minute
|
||||||
|
}
|
||||||
|
t := time.NewTicker(interval)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
r.safeTick(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Runner) safeTick(ctx context.Context) {
|
||||||
|
defer func() {
|
||||||
|
if rec := recover(); rec != nil {
|
||||||
|
r.log().Error("schedule tick panicked; scheduler continues", "panic", rec)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if err := r.Tick(ctx); err != nil {
|
||||||
|
r.log().Warn("schedule tick failed", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,111 @@
|
|||||||
|
package schedule
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTickRunsDueAndStampsNext(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
|
||||||
|
var ran []string
|
||||||
|
marked := map[string]time.Time{}
|
||||||
|
|
||||||
|
r := &Runner{
|
||||||
|
Now: func() time.Time { return now },
|
||||||
|
Due: func(_ context.Context, _ time.Time) ([]Due, error) {
|
||||||
|
return []Due{{ID: "a", Cron: "hourly"}, {ID: "b", Cron: "bad"}}, nil
|
||||||
|
},
|
||||||
|
Run: func(_ context.Context, id string) error { ran = append(ran, id); return nil },
|
||||||
|
Mark: func(_ context.Context, id string, _, next time.Time) error { marked[id] = next; return nil },
|
||||||
|
Next: func(cron string, after time.Time) (time.Time, error) {
|
||||||
|
if cron == "bad" {
|
||||||
|
return time.Time{}, errors.New("unparseable")
|
||||||
|
}
|
||||||
|
return after.Add(time.Hour), nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := r.Tick(ctx); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// Next is checked first, so the bad-cron job is skipped BEFORE Run — only
|
||||||
|
// the parseable job runs and gets stamped (no hot-loop of a bad-cron Run).
|
||||||
|
if len(ran) != 1 || ran[0] != "a" {
|
||||||
|
t.Errorf("ran = %v, want only [a] (bad-cron b skipped before Run)", ran)
|
||||||
|
}
|
||||||
|
if marked["a"] != now.Add(time.Hour) {
|
||||||
|
t.Errorf("a next = %v, want +1h", marked["a"])
|
||||||
|
}
|
||||||
|
if _, ok := marked["b"]; ok {
|
||||||
|
t.Errorf("b should not be stamped (bad cron), got %v", marked["b"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTickRunFailureDoesNotStampOrStall(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
var ran []string
|
||||||
|
marked := map[string]bool{}
|
||||||
|
r := &Runner{
|
||||||
|
Due: func(_ context.Context, _ time.Time) ([]Due, error) {
|
||||||
|
return []Due{{ID: "x", Cron: "h"}, {ID: "y", Cron: "h"}}, nil
|
||||||
|
},
|
||||||
|
Run: func(_ context.Context, id string) error {
|
||||||
|
ran = append(ran, id)
|
||||||
|
if id == "x" {
|
||||||
|
return errors.New("boom")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
Mark: func(_ context.Context, id string, _, _ time.Time) error { marked[id] = true; return nil },
|
||||||
|
Next: func(string, time.Time) (time.Time, error) { return time.Now(), nil },
|
||||||
|
}
|
||||||
|
if err := r.Tick(ctx); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(ran) != 2 { // y still runs despite x failing
|
||||||
|
t.Errorf("ran = %v, want both attempted", ran)
|
||||||
|
}
|
||||||
|
if marked["x"] { // failed job NOT stamped -> stays due, retries
|
||||||
|
t.Error("failed job x should not be stamped")
|
||||||
|
}
|
||||||
|
if !marked["y"] {
|
||||||
|
t.Error("y should be stamped")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTickDueErrorIsFatalToPass(t *testing.T) {
|
||||||
|
r := &Runner{
|
||||||
|
Due: func(context.Context, time.Time) ([]Due, error) { return nil, errors.New("store down") },
|
||||||
|
Run: func(context.Context, string) error { return nil },
|
||||||
|
Mark: func(context.Context, string, time.Time, time.Time) error { return nil },
|
||||||
|
Next: func(string, time.Time) (time.Time, error) { return time.Now(), nil },
|
||||||
|
}
|
||||||
|
if err := r.Tick(context.Background()); err == nil {
|
||||||
|
t.Error("Tick should surface the Due lister error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnparseableCronSkipsRunEntirely(t *testing.T) {
|
||||||
|
var ran []string
|
||||||
|
r := &Runner{
|
||||||
|
Due: func(context.Context, time.Time) ([]Due, error) { return []Due{{ID: "z", Cron: "bad"}}, nil },
|
||||||
|
Run: func(_ context.Context, id string) error { ran = append(ran, id); return nil },
|
||||||
|
Mark: func(context.Context, string, time.Time, time.Time) error { return nil },
|
||||||
|
Next: func(string, time.Time) (time.Time, error) { return time.Time{}, errors.New("bad cron") },
|
||||||
|
}
|
||||||
|
if err := r.Tick(context.Background()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(ran) != 0 {
|
||||||
|
t.Errorf("a job with an unparseable cron must NOT be run (avoids hot-loop), ran=%v", ran)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidateRejectsNilCallbacks(t *testing.T) {
|
||||||
|
r := &Runner{Due: func(context.Context, time.Time) ([]Due, error) { return nil, nil }} // missing Run/Mark/Next
|
||||||
|
if err := r.Tick(context.Background()); err == nil {
|
||||||
|
t.Error("Tick should return a validation error for a partially-wired Runner, not panic")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user