Files
executus/run/checkpoint_test.go
steve 899059a791
executus CI / test (pull_request) Successful in 46s
Adversarial Review (Gadfly) / review (pull_request) Successful in 17m25s
feat(run): durable checkpoint + resume (wire Ports.Checkpointer)
The kernel defined run.Ports.Checkpointer + the checkpoint battery but never
drove them (the documented "P2 follow-up"). This wires durable recovery into
the run loop so a run interrupted by shutdown can resume on the next boot
instead of being lost — the executus-side half of mort's durable-agent-recovery
parity (mort #1355).

Kernel (run/):
- Ports.Checkpointer is now a CheckpointerFactory (Begin per run → a per-run
  Checkpointer, or nil for a non-durable run). The single per-instance
  Checkpointer couldn't distinguish runs; a factory mints one per run, matching
  mort's agentexec.CheckpointerFactory.
- RunInfo gains GuildID + ModelTier (so the factory can build resume meta);
  RunCheckpointState gains CompletedPhases + ActivePhase (+ PhaseOutput).
- run/checkpoint.go: ResumeState + WithResumeState / WithExistingCheckpointer
  context carriers, classifyCheckpointOutcome (success→Complete, shutdown→leave
  for boot recovery, else→Fail using run.ErrShutdown), and finalizeCheckpoint.
- run/executor.go: resolve the per-run checkpointer (existing-from-ctx on a
  recovery re-run, else factory.Begin); single-loop wraps the step observer to
  accumulate the transcript + Save each step (host throttles), and a recovered
  run seeds the saved transcript via WithHistory and continues with no new
  input; finalize on exit.
- run/phases.go: phase-boundary checkpointing — record completed phases after
  each phase; a resumed run skips already-completed phases (the interrupted
  phase re-runs from its start — boundary-granular, documented; only the
  single-loop path resumes mid-loop).

Battery (checkpoint/): NewFactory wires the battery into the factory port
(per-run handle, meta derived from RunInfo); RunCheckpoint + handle.Save carry
the phase fields.

Tests (run/checkpoint_test.go): the finalize decision matrix; single-loop
Save+Complete; terminal-error Fail; resume seeds history; phase-boundary Saves
completed phases; resume skips completed phases. Full ./... green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 16:04:06 -04:00

201 lines
7.4 KiB
Go

package run
import (
"context"
"errors"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// fakeCheckpointer records every Save state + whether Complete/Fail fired.
type fakeCheckpointer struct {
saves []RunCheckpointState
completed bool
failed bool
failErr error
}
func (c *fakeCheckpointer) Save(_ context.Context, st RunCheckpointState) error {
c.saves = append(c.saves, st)
return nil
}
func (c *fakeCheckpointer) Complete(context.Context) error { c.completed = true; return nil }
func (c *fakeCheckpointer) Fail(_ context.Context, err error) error {
c.failed = true
c.failErr = err
return nil
}
// fakeCheckpointFactory hands out one fakeCheckpointer and records the RunInfo.
type fakeCheckpointFactory struct {
cp *fakeCheckpointer
info RunInfo
}
func (f *fakeCheckpointFactory) Begin(_ context.Context, info RunInfo) (Checkpointer, error) {
f.info = info
return f.cp, nil
}
// TestClassifyCheckpointOutcome covers the finalize decision matrix.
func TestClassifyCheckpointOutcome(t *testing.T) {
cases := []struct {
name string
err error
cause error
want checkpointOutcome
}{
{"success", nil, nil, checkpointComplete},
{"shutdown", context.Canceled, ErrShutdown, checkpointLeaveRunning},
{"critic-kill", context.Canceled, ErrCriticKill, checkpointFail},
{"deadline", context.DeadlineExceeded, context.DeadlineExceeded, checkpointFail},
{"model-error", errors.New("boom"), nil, checkpointFail},
{"caller-cancel", context.Canceled, context.Canceled, checkpointFail},
}
for _, tc := range cases {
if got := classifyCheckpointOutcome(tc.err, tc.cause); got != tc.want {
t.Errorf("%s: classifyCheckpointOutcome = %v, want %v", tc.name, got, tc.want)
}
}
}
// TestCheckpoint_SingleLoopSaveAndComplete: a durable single-loop run gets a
// per-run checkpointer (Begin), Saves its transcript each step, and Completes on
// success (clearing the checkpoint). The RunInfo carries the resume meta.
func TestCheckpoint_SingleLoopSaveAndComplete(t *testing.T) {
models, _ := phaseProvider(t, fake.Reply("done"))
cp := &fakeCheckpointer{}
f := &fakeCheckpointFactory{cp: cp}
ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: f}})
res := ex.Run(context.Background(),
RunnableAgent{ID: "a1", Name: "boss", ModelTier: "test-model"},
tool.Invocation{RunID: "run-x", CallerID: "steve", ChannelID: "chan", GuildID: "g", SkillInputs: map[string]any{"prompt": "go"}},
"go")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if f.info.RunID != "run-x" || f.info.SubjectID != "a1" || f.info.ModelTier != "test-model" || f.info.GuildID != "g" {
t.Errorf("Begin RunInfo missing resume meta: %+v", f.info)
}
if len(cp.saves) == 0 {
t.Error("expected at least one checkpoint Save during the run")
} else if len(cp.saves[len(cp.saves)-1].Messages) == 0 {
t.Error("checkpoint Save should carry the running transcript")
}
if !cp.completed {
t.Error("a successful run must Complete (clear) its checkpoint")
}
if cp.failed {
t.Error("a successful run must NOT Fail its checkpoint")
}
}
// TestCheckpoint_TerminalErrorFails: a run that errors (not shutdown) Fails its
// checkpoint (clears it — not a recovery candidate).
func TestCheckpoint_TerminalErrorFails(t *testing.T) {
models, _ := phaseProvider(t, fake.Fail(errors.New("model down")))
cp := &fakeCheckpointer{}
ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: &fakeCheckpointFactory{cp: cp}}})
res := ex.Run(context.Background(),
RunnableAgent{ID: "a1", ModelTier: "test-model"},
tool.Invocation{RunID: "r", CallerID: "c", SkillInputs: map[string]any{"prompt": "go"}}, "go")
if res.Err == nil {
t.Fatal("expected a run error")
}
if !cp.failed {
t.Error("a terminal (non-shutdown) error must Fail the checkpoint")
}
if cp.completed {
t.Error("a failed run must NOT Complete its checkpoint")
}
}
// TestCheckpoint_ResumeSeedsHistory: a run carrying a ResumeState seeds the saved
// transcript as the model's opening messages (continues) instead of the input.
func TestCheckpoint_ResumeSeedsHistory(t *testing.T) {
models, fp := phaseProvider(t, fake.Reply("continued"))
history := []llm.Message{llm.UserText("prior turn 1"), llm.AssistantText("prior answer 1")}
ctx := WithResumeState(context.Background(), &ResumeState{History: history})
ex := New(Config{Registry: tool.NewRegistry(), Models: models})
res := ex.Run(ctx,
RunnableAgent{ID: "a1", ModelTier: "test-model"},
tool.Invocation{RunID: "r", CallerID: "c", SkillInputs: map[string]any{"prompt": "ignored-on-resume"}}, "ignored-on-resume")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
got := fp.Calls()[0].Request.Messages
if len(got) != len(history) {
t.Fatalf("resume should seed the saved %d-message transcript, got %d messages", len(history), len(got))
}
}
// TestCheckpoint_PhaseBoundarySavesCompleted: a durable multi-phase run records
// the completed phases at each boundary, growing the list, and Completes on
// success.
func TestCheckpoint_PhaseBoundarySavesCompleted(t *testing.T) {
models, _ := phaseProvider(t, fake.Reply("out-a"), fake.Reply("out-b"))
cp := &fakeCheckpointer{}
ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: &fakeCheckpointFactory{cp: cp}}})
ra := RunnableAgent{
ID: "p", ModelTier: "test-model",
Phases: []Phase{{Name: "a", SystemPrompt: "A"}, {Name: "b", SystemPrompt: "B"}},
}
if res := ex.Run(context.Background(), ra, tool.Invocation{RunID: "r", CallerID: "c"}, "Q"); res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
// The final phase-boundary Save must list both completed phases.
var lastPhaseSave *RunCheckpointState
for i := range cp.saves {
if len(cp.saves[i].CompletedPhases) > 0 {
lastPhaseSave = &cp.saves[i]
}
}
if lastPhaseSave == nil || len(lastPhaseSave.CompletedPhases) != 2 {
t.Fatalf("expected a phase-boundary Save listing 2 completed phases; saves=%+v", cp.saves)
}
if !cp.completed {
t.Error("a successful phased run must Complete its checkpoint")
}
}
// TestCheckpoint_ResumeSkipsCompletedPhases: a resumed multi-phase run skips
// phases already in ResumeState.CompletedPhases (only the remaining phase calls
// the model) and threads their outputs into the remaining phase's template.
func TestCheckpoint_ResumeSkipsCompletedPhases(t *testing.T) {
models, fp := phaseProvider(t, fake.Reply("out-b")) // ONLY phase b should call the model
ctx := WithResumeState(context.Background(), &ResumeState{
CompletedPhases: []PhaseOutput{{Name: "a", Output: "saved-a"}},
})
ex := New(Config{Registry: tool.NewRegistry(), Models: models})
ra := RunnableAgent{
ID: "p", ModelTier: "test-model",
Phases: []Phase{
{Name: "a", SystemPrompt: "A"},
{Name: "b", SystemPrompt: "B saw {{.a}}"},
},
}
res := ex.Run(ctx, ra, tool.Invocation{RunID: "r", CallerID: "c"}, "Q")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if res.Output != "out-b" {
t.Fatalf("output = %q, want out-b", res.Output)
}
calls := fp.Calls()
if len(calls) != 1 {
t.Fatalf("only the un-completed phase b should call the model; got %d calls", len(calls))
}
if calls[0].Request.System != "B saw saved-a" {
t.Errorf("resumed phase b should see the completed phase a's saved output; system = %q", calls[0].Request.System)
}
}