899059a791
The kernel defined run.Ports.Checkpointer + the checkpoint battery but never drove them (the documented "P2 follow-up"). This wires durable recovery into the run loop so a run interrupted by shutdown can resume on the next boot instead of being lost — the executus-side half of mort's durable-agent-recovery parity (mort #1355). Kernel (run/): - Ports.Checkpointer is now a CheckpointerFactory (Begin per run → a per-run Checkpointer, or nil for a non-durable run). The single per-instance Checkpointer couldn't distinguish runs; a factory mints one per run, matching mort's agentexec.CheckpointerFactory. - RunInfo gains GuildID + ModelTier (so the factory can build resume meta); RunCheckpointState gains CompletedPhases + ActivePhase (+ PhaseOutput). - run/checkpoint.go: ResumeState + WithResumeState / WithExistingCheckpointer context carriers, classifyCheckpointOutcome (success→Complete; shutdown, detected by run.ErrShutdown as the cancel cause→leave for boot recovery; anything else→Fail), and finalizeCheckpoint. - run/executor.go: resolve the per-run checkpointer (existing-from-ctx on a recovery re-run, else factory.Begin); single-loop wraps the step observer to accumulate the transcript + Save each step (host throttles), and a recovered run seeds the saved transcript via WithHistory and continues with no new input; finalize on exit. - run/phases.go: phase-boundary checkpointing — record completed phases after each phase; a resumed run skips already-completed phases (the interrupted phase re-runs from its start — boundary-granular, documented; only the single-loop path resumes mid-loop). Battery (checkpoint/): NewFactory wires the battery into the factory port (per-run handle, meta derived from RunInfo); RunCheckpoint + handle.Save carry the phase fields. Tests (run/checkpoint_test.go): the finalize decision matrix; single-loop Save+Complete; terminal-error Fail; resume seeds history; phase-boundary Saves completed phases; resume skips completed phases. Full ./... test suite green.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
201 lines
7.4 KiB
Go
201 lines
7.4 KiB
Go
package run
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"testing"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
|
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
|
)
|
|
|
|
// fakeCheckpointer records every Save state + whether Complete/Fail fired.
|
|
type fakeCheckpointer struct {
|
|
saves []RunCheckpointState
|
|
completed bool
|
|
failed bool
|
|
failErr error
|
|
}
|
|
|
|
func (c *fakeCheckpointer) Save(_ context.Context, st RunCheckpointState) error {
|
|
c.saves = append(c.saves, st)
|
|
return nil
|
|
}
|
|
func (c *fakeCheckpointer) Complete(context.Context) error { c.completed = true; return nil }
|
|
func (c *fakeCheckpointer) Fail(_ context.Context, err error) error {
|
|
c.failed = true
|
|
c.failErr = err
|
|
return nil
|
|
}
|
|
|
|
// fakeCheckpointFactory hands out one fakeCheckpointer and records the RunInfo.
|
|
type fakeCheckpointFactory struct {
|
|
cp *fakeCheckpointer
|
|
info RunInfo
|
|
}
|
|
|
|
func (f *fakeCheckpointFactory) Begin(_ context.Context, info RunInfo) (Checkpointer, error) {
|
|
f.info = info
|
|
return f.cp, nil
|
|
}
|
|
|
|
// TestClassifyCheckpointOutcome covers the finalize decision matrix.
|
|
func TestClassifyCheckpointOutcome(t *testing.T) {
|
|
cases := []struct {
|
|
name string
|
|
err error
|
|
cause error
|
|
want checkpointOutcome
|
|
}{
|
|
{"success", nil, nil, checkpointComplete},
|
|
{"shutdown", context.Canceled, ErrShutdown, checkpointLeaveRunning},
|
|
{"critic-kill", context.Canceled, ErrCriticKill, checkpointFail},
|
|
{"deadline", context.DeadlineExceeded, context.DeadlineExceeded, checkpointFail},
|
|
{"model-error", errors.New("boom"), nil, checkpointFail},
|
|
{"caller-cancel", context.Canceled, context.Canceled, checkpointFail},
|
|
}
|
|
for _, tc := range cases {
|
|
if got := classifyCheckpointOutcome(tc.err, tc.cause); got != tc.want {
|
|
t.Errorf("%s: classifyCheckpointOutcome = %v, want %v", tc.name, got, tc.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestCheckpoint_SingleLoopSaveAndComplete: a durable single-loop run gets a
|
|
// per-run checkpointer (Begin), Saves its transcript each step, and Completes on
|
|
// success (clearing the checkpoint). The RunInfo carries the resume meta.
|
|
func TestCheckpoint_SingleLoopSaveAndComplete(t *testing.T) {
|
|
models, _ := phaseProvider(t, fake.Reply("done"))
|
|
cp := &fakeCheckpointer{}
|
|
f := &fakeCheckpointFactory{cp: cp}
|
|
ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: f}})
|
|
|
|
res := ex.Run(context.Background(),
|
|
RunnableAgent{ID: "a1", Name: "boss", ModelTier: "test-model"},
|
|
tool.Invocation{RunID: "run-x", CallerID: "steve", ChannelID: "chan", GuildID: "g", SkillInputs: map[string]any{"prompt": "go"}},
|
|
"go")
|
|
if res.Err != nil {
|
|
t.Fatalf("run error: %v", res.Err)
|
|
}
|
|
if f.info.RunID != "run-x" || f.info.SubjectID != "a1" || f.info.ModelTier != "test-model" || f.info.GuildID != "g" {
|
|
t.Errorf("Begin RunInfo missing resume meta: %+v", f.info)
|
|
}
|
|
if len(cp.saves) == 0 {
|
|
t.Error("expected at least one checkpoint Save during the run")
|
|
} else if len(cp.saves[len(cp.saves)-1].Messages) == 0 {
|
|
t.Error("checkpoint Save should carry the running transcript")
|
|
}
|
|
if !cp.completed {
|
|
t.Error("a successful run must Complete (clear) its checkpoint")
|
|
}
|
|
if cp.failed {
|
|
t.Error("a successful run must NOT Fail its checkpoint")
|
|
}
|
|
}
|
|
|
|
// TestCheckpoint_TerminalErrorFails: a run that errors (not shutdown) Fails its
|
|
// checkpoint (clears it — not a recovery candidate).
|
|
func TestCheckpoint_TerminalErrorFails(t *testing.T) {
|
|
models, _ := phaseProvider(t, fake.Fail(errors.New("model down")))
|
|
cp := &fakeCheckpointer{}
|
|
ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: &fakeCheckpointFactory{cp: cp}}})
|
|
|
|
res := ex.Run(context.Background(),
|
|
RunnableAgent{ID: "a1", ModelTier: "test-model"},
|
|
tool.Invocation{RunID: "r", CallerID: "c", SkillInputs: map[string]any{"prompt": "go"}}, "go")
|
|
if res.Err == nil {
|
|
t.Fatal("expected a run error")
|
|
}
|
|
if !cp.failed {
|
|
t.Error("a terminal (non-shutdown) error must Fail the checkpoint")
|
|
}
|
|
if cp.completed {
|
|
t.Error("a failed run must NOT Complete its checkpoint")
|
|
}
|
|
}
|
|
|
|
// TestCheckpoint_ResumeSeedsHistory: a run carrying a ResumeState seeds the saved
|
|
// transcript as the model's opening messages (continues) instead of the input.
|
|
func TestCheckpoint_ResumeSeedsHistory(t *testing.T) {
|
|
models, fp := phaseProvider(t, fake.Reply("continued"))
|
|
history := []llm.Message{llm.UserText("prior turn 1"), llm.AssistantText("prior answer 1")}
|
|
ctx := WithResumeState(context.Background(), &ResumeState{History: history})
|
|
|
|
ex := New(Config{Registry: tool.NewRegistry(), Models: models})
|
|
res := ex.Run(ctx,
|
|
RunnableAgent{ID: "a1", ModelTier: "test-model"},
|
|
tool.Invocation{RunID: "r", CallerID: "c", SkillInputs: map[string]any{"prompt": "ignored-on-resume"}}, "ignored-on-resume")
|
|
if res.Err != nil {
|
|
t.Fatalf("run error: %v", res.Err)
|
|
}
|
|
got := fp.Calls()[0].Request.Messages
|
|
if len(got) != len(history) {
|
|
t.Fatalf("resume should seed the saved %d-message transcript, got %d messages", len(history), len(got))
|
|
}
|
|
}
|
|
|
|
// TestCheckpoint_PhaseBoundarySavesCompleted: a durable multi-phase run records
|
|
// the completed phases at each boundary, growing the list, and Completes on
|
|
// success.
|
|
func TestCheckpoint_PhaseBoundarySavesCompleted(t *testing.T) {
|
|
models, _ := phaseProvider(t, fake.Reply("out-a"), fake.Reply("out-b"))
|
|
cp := &fakeCheckpointer{}
|
|
ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: &fakeCheckpointFactory{cp: cp}}})
|
|
|
|
ra := RunnableAgent{
|
|
ID: "p", ModelTier: "test-model",
|
|
Phases: []Phase{{Name: "a", SystemPrompt: "A"}, {Name: "b", SystemPrompt: "B"}},
|
|
}
|
|
if res := ex.Run(context.Background(), ra, tool.Invocation{RunID: "r", CallerID: "c"}, "Q"); res.Err != nil {
|
|
t.Fatalf("run error: %v", res.Err)
|
|
}
|
|
// The final phase-boundary Save must list both completed phases.
|
|
var lastPhaseSave *RunCheckpointState
|
|
for i := range cp.saves {
|
|
if len(cp.saves[i].CompletedPhases) > 0 {
|
|
lastPhaseSave = &cp.saves[i]
|
|
}
|
|
}
|
|
if lastPhaseSave == nil || len(lastPhaseSave.CompletedPhases) != 2 {
|
|
t.Fatalf("expected a phase-boundary Save listing 2 completed phases; saves=%+v", cp.saves)
|
|
}
|
|
if !cp.completed {
|
|
t.Error("a successful phased run must Complete its checkpoint")
|
|
}
|
|
}
|
|
|
|
// TestCheckpoint_ResumeSkipsCompletedPhases: a resumed multi-phase run skips
|
|
// phases already in ResumeState.CompletedPhases (only the remaining phase calls
|
|
// the model) and threads their outputs into the remaining phase's template.
|
|
func TestCheckpoint_ResumeSkipsCompletedPhases(t *testing.T) {
|
|
models, fp := phaseProvider(t, fake.Reply("out-b")) // ONLY phase b should call the model
|
|
ctx := WithResumeState(context.Background(), &ResumeState{
|
|
CompletedPhases: []PhaseOutput{{Name: "a", Output: "saved-a"}},
|
|
})
|
|
ex := New(Config{Registry: tool.NewRegistry(), Models: models})
|
|
|
|
ra := RunnableAgent{
|
|
ID: "p", ModelTier: "test-model",
|
|
Phases: []Phase{
|
|
{Name: "a", SystemPrompt: "A"},
|
|
{Name: "b", SystemPrompt: "B saw {{.a}}"},
|
|
},
|
|
}
|
|
res := ex.Run(ctx, ra, tool.Invocation{RunID: "r", CallerID: "c"}, "Q")
|
|
if res.Err != nil {
|
|
t.Fatalf("run error: %v", res.Err)
|
|
}
|
|
if res.Output != "out-b" {
|
|
t.Fatalf("output = %q, want out-b", res.Output)
|
|
}
|
|
calls := fp.Calls()
|
|
if len(calls) != 1 {
|
|
t.Fatalf("only the un-completed phase b should call the model; got %d calls", len(calls))
|
|
}
|
|
if calls[0].Request.System != "B saw saved-a" {
|
|
t.Errorf("resumed phase b should see the completed phase a's saved output; system = %q", calls[0].Request.System)
|
|
}
|
|
}
|