Files
steve 38d656ec71
executus CI / test (pull_request) Successful in 45s
fix(run): address gadfly review of the checkpoint PR
Real findings from the consensus review (44 raw; heavy devstral noise):

- finalizeCheckpoint is now fired from the top-of-Run defer, so it runs on
  EVERY exit: a panic, an early build-error return (before the run loop), AND
  normal completion. Previously an early return on a recovered run left its
  durable record unfinalized → boot recovery would retry it forever on a
  persistent build error. (opus + glm)
- Removed the dead ActivePhase field from run.RunCheckpointState +
  run.ResumeState (and the battery RunCheckpoint) — phase recovery is
  boundary-granular (skip completed phases; the interrupted phase re-runs from
  its start), so ActivePhase was never written nor read. Docs across
  ports/checkpoint/phases now state this plainly (5-model consensus that the
  field + docs over-promised mid-phase resume).
- CheckpointerFactory.Begin error is now logged (WARN) before degrading to
  non-durable, per the documented contract (was silently swallowed). (4 models)
- finalizeCheckpoint logs Complete/Fail errors (was silent).
- Resume phase-skip now keys off a SEPARATE resumeSkip set, not the live
  outputs map — a fresh run with two same-named phases no longer skips the
  second (the outputs map fills as phases run). (opus:max) + regression test.
- Removed the dead checkpoint.factory.now field (never set). (opus + glm)
- Fixed the stale phaseDeps doc (the step observer moved out of sharedOpts to
  per-path). Hoisted the resume guard to a local; dropped the wasted acc
  allocation on the resume path; documented that Save throttling is the
  Checkpointer's responsibility and the accumulated transcript is pre-compaction
  (host size-caps it).

Note (carried over from the PR): classifyCheckpointOutcome detects shutdown via
run.ErrShutdown, but mort stamps its own runengine.ErrShutdown — the mort wiring
PR aliases the two so errors.Is matches.

New test: both same-named phases run on a fresh run. Full `go test ./...` run is green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 16:34:42 -04:00

279 lines
9.9 KiB
Go

package run
import (
"context"
"encoding/json"
"errors"
"strings"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// phaseProvider wires up a fake provider for phase tests. All scripted steps
// are enqueued under a single "test-model" and are consumed in order by every
// model call the run makes, across all phases. It returns a ModelResolver that
// always yields that one model: the resolver ignores its string argument
// (presumably the requested tier). It also returns the provider itself, so a
// test can read back each call's request via fp.Calls().
func phaseProvider(t *testing.T, steps ...fake.Step) (ModelResolver, *fake.Provider) {
	t.Helper()
	const modelName = "test-model"

	provider := fake.New("fake")
	provider.Enqueue(modelName, steps...)

	model, err := provider.Model(modelName)
	if err != nil {
		t.Fatalf("fake model: %v", err)
	}

	var resolve ModelResolver = func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
		return ctx, model, nil
	}
	return resolve, provider
}
// TestPhases_SequentialThreadsOutputs checks four things:
//   - phases execute in declaration order;
//   - each phase's output is available to later phases as {{.<PhaseName>}};
//   - {{.Query}} expands to the run's query;
//   - the final phase's output becomes the run output.
func TestPhases_SequentialThreadsOutputs(t *testing.T) {
	models, fp := phaseProvider(t, fake.Reply("out-a"), fake.Reply("out-b"), fake.Reply("out-c"))
	ex := New(Config{Registry: tool.NewRegistry(), Models: models})
	agent := RunnableAgent{
		Name:      "pipeline",
		ModelTier: "test-model",
		Phases: []Phase{
			{Name: "a", SystemPrompt: "Phase A instructions"},
			{Name: "b", SystemPrompt: "B saw: {{.a}}"},
			{Name: "c", SystemPrompt: "C saw: {{.b}} and query {{.Query}}"},
		},
	}
	inv := tool.Invocation{RunID: "r", CallerID: "c"}

	res := ex.Run(context.Background(), agent, inv, "QUERY-TEXT")
	if res.Err != nil {
		t.Fatalf("run error: %v", res.Err)
	}
	if res.Output != "out-c" {
		t.Fatalf("final output = %q, want the LAST phase's output out-c", res.Output)
	}

	calls := fp.Calls()
	if len(calls) != 3 {
		t.Fatalf("want 3 model calls (one per phase), got %d", len(calls))
	}
	// One expectation per model call, in phase order.
	expectations := []struct{ format, want string }{
		{"phase a system = %q", "Phase A instructions"},
		{"phase b should see phase a's output threaded; system = %q", "B saw: out-a"},
		{"phase c should see phase b's output + {{.Query}}; system = %q", "C saw: out-b and query QUERY-TEXT"},
	}
	for i, e := range expectations {
		if got := calls[i].Request.System; got != e.want {
			t.Errorf(e.format, got)
		}
	}
}
// TestPhases_OptionalFailureSubstitutesFallback: when an Optional phase's model
// call fails with an ordinary error, the pipeline keeps going. The phase's
// FallbackMessage stands in as its output, later phases see that fallback via
// {{.<PhaseName>}}, and the run as a whole succeeds.
func TestPhases_OptionalFailureSubstitutesFallback(t *testing.T) {
	providerErr := errors.New("provider exploded")
	models, fp := phaseProvider(t,
		fake.Fail(providerErr), // consumed by phase a, which errors
		fake.Reply("out-b"),    // consumed by phase b, which still runs
	)
	ex := New(Config{Registry: tool.NewRegistry(), Models: models})
	phases := []Phase{
		{Name: "a", SystemPrompt: "Phase A", Optional: true, FallbackMessage: "FALLBACK-A"},
		{Name: "b", SystemPrompt: "B saw: {{.a}}"},
	}
	agent := RunnableAgent{Name: "pipeline", ModelTier: "test-model", Phases: phases}

	res := ex.Run(context.Background(), agent, tool.Invocation{RunID: "r", CallerID: "c"}, "Q")
	switch {
	case res.Err != nil:
		t.Fatalf("optional-phase failure must not fail the run: %v", res.Err)
	case res.Output != "out-b":
		t.Fatalf("final output = %q, want out-b", res.Output)
	}

	calls := fp.Calls()
	if n := len(calls); n != 2 {
		t.Fatalf("want 2 calls (failed phase a + phase b), got %d", n)
	}
	if sys := calls[1].Request.System; sys != "B saw: FALLBACK-A" {
		t.Errorf("phase b should see the fallback threaded; system = %q", sys)
	}
}
// TestPhases_OptionalDoesNotSwallowCancellation: Optional covers ordinary phase
// failures only. A phase that fails with a context cancellation means the run
// itself ended (cancel/deadline/critic-kill). The error must therefore reach
// res.Err, so the run is classified cancelled/timeout/killed rather than "ok",
// and the FallbackMessage must not be reported as the output.
func TestPhases_OptionalDoesNotSwallowCancellation(t *testing.T) {
	models, _ := phaseProvider(t, fake.Fail(context.Canceled))
	ex := New(Config{Registry: tool.NewRegistry(), Models: models})

	// IsRunFunc so the cancellation surfaces directly wrapped (%w).
	only := Phase{Name: "a", SystemPrompt: "Phase A", IsRunFunc: true, Optional: true, FallbackMessage: "FB"}
	agent := RunnableAgent{Name: "pipeline", ModelTier: "test-model", Phases: []Phase{only}}

	res := ex.Run(context.Background(), agent, tool.Invocation{RunID: "r", CallerID: "c"}, "Q")
	if !errors.Is(res.Err, context.Canceled) {
		t.Fatalf("Optional phase must NOT swallow a cancellation; res.Err = %v", res.Err)
	}
	if res.Output == "FB" {
		t.Error("a cancelled run must not report the fallback message as output")
	}
}
// TestPhases_DuplicateNamesBothRun: a fresh (non-resume) run with two phases
// sharing a name must run BOTH. The resume-skip guard keys off a separate
// resume set, not the live outputs map (which fills as phases run), so a phase
// never skips a same-named sibling on a fresh run.
//
// Beyond counting model calls, the test pins which system prompt each call
// carried and checks that the second phase's reply is the run output. This
// also catches a regression that re-issued the first phase's prompt or dropped
// the second phase's result.
func TestPhases_DuplicateNamesBothRun(t *testing.T) {
	models, fp := phaseProvider(t, fake.Reply("first"), fake.Reply("second"))
	ex := New(Config{Registry: tool.NewRegistry(), Models: models})
	ra := RunnableAgent{
		Name:      "p",
		ModelTier: "test-model",
		Phases:    []Phase{{Name: "x", SystemPrompt: "P1"}, {Name: "x", SystemPrompt: "P2"}},
	}
	res := ex.Run(context.Background(), ra, tool.Invocation{RunID: "r"}, "Q")
	if res.Err != nil {
		t.Fatalf("run error: %v", res.Err)
	}

	calls := fp.Calls()
	if n := len(calls); n != 2 {
		t.Fatalf("both same-named phases must run on a fresh run; got %d model calls", n)
	}
	// No SystemHeader is configured, so each call's system prompt is exactly
	// its phase's SystemPrompt.
	for i, want := range []string{"P1", "P2"} {
		if got := calls[i].Request.System; got != want {
			t.Errorf("call %d system = %q, want %q (each same-named phase must use its own prompt)", i, got, want)
		}
	}
	if res.Output != "second" {
		t.Errorf("final output = %q, want the second phase's output %q", res.Output, "second")
	}
}
// TestPhases_HardErrorAborts: a hard error (anything other than budget/step
// exhaustion) in a phase that is NOT Optional stops the pipeline. The run error
// must wrap the model's error, and no later phase may issue a model call.
func TestPhases_HardErrorAborts(t *testing.T) {
	modelDown := errors.New("model down")
	models, fp := phaseProvider(t,
		fake.Fail(modelDown), // phase a (non-optional) fails hard
		fake.Reply("out-b"),  // must stay unconsumed if the pipeline aborts
	)
	ex := New(Config{Registry: tool.NewRegistry(), Models: models})
	agent := RunnableAgent{
		Name:      "pipeline",
		ModelTier: "test-model",
		Phases:    []Phase{{Name: "a", SystemPrompt: "Phase A"}, {Name: "b", SystemPrompt: "Phase B"}},
	}

	res := ex.Run(context.Background(), agent, tool.Invocation{RunID: "r", CallerID: "c"}, "Q")
	if res.Err == nil {
		t.Fatal("a hard non-optional phase error must fail the run")
	}
	if !errors.Is(res.Err, modelDown) {
		t.Errorf("run error %v should wrap the phase's model error", res.Err)
	}
	if calls := fp.Calls(); len(calls) != 1 {
		t.Errorf("pipeline must abort after phase a; got %d calls (phase b should not run)", len(calls))
	}
}
// TestPhases_IsRunFuncBareCall: a phase flagged IsRunFunc produces its output
// through a bare LLM call, and that output is threaded into the loop phase
// that follows it.
func TestPhases_IsRunFuncBareCall(t *testing.T) {
	models, fp := phaseProvider(t,
		fake.Reply("plan-output"), // IsRunFunc phase "plan"
		fake.Reply("final"),       // loop phase "exec"
	)
	ex := New(Config{Registry: tool.NewRegistry(), Models: models})
	planPhase := Phase{Name: "plan", SystemPrompt: "Make a plan for {{.Query}}", IsRunFunc: true}
	execPhase := Phase{Name: "exec", SystemPrompt: "Execute: {{.plan}}"}
	agent := RunnableAgent{Name: "pipeline", ModelTier: "test-model", Phases: []Phase{planPhase, execPhase}}

	res := ex.Run(context.Background(), agent, tool.Invocation{RunID: "r", CallerID: "c"}, "do-thing")
	if res.Err != nil {
		t.Fatalf("run error: %v", res.Err)
	}
	if res.Output != "final" {
		t.Fatalf("output = %q, want final", res.Output)
	}

	calls := fp.Calls()
	if len(calls) != 2 {
		t.Fatalf("want 2 calls, got %d", len(calls))
	}
	// One expectation per model call, in phase order.
	expectations := []struct{ format, want string }{
		{"IsRunFunc phase system = %q", "Make a plan for do-thing"},
		{"exec phase should see the plan output threaded; system = %q", "Execute: plan-output"},
	}
	for i, e := range expectations {
		if got := calls[i].Request.System; got != e.want {
			t.Errorf(e.format, got)
		}
	}
}
// TestPhases_SystemHeaderAppliedPerPhase: the platform SystemHeader is
// prepended, followed by a blank line, to every phase's system prompt, not
// just the first phase's.
func TestPhases_SystemHeaderAppliedPerPhase(t *testing.T) {
	models, fp := phaseProvider(t, fake.Reply("a"), fake.Reply("b"))
	ex := New(Config{Registry: tool.NewRegistry(), Models: models, SystemHeader: "PLATFORM"})
	ra := RunnableAgent{
		Name:      "p",
		ModelTier: "test-model",
		Phases:    []Phase{{Name: "one", SystemPrompt: "P1"}, {Name: "two", SystemPrompt: "P2"}},
	}
	if res := ex.Run(context.Background(), ra, tool.Invocation{RunID: "r"}, "Q"); res.Err != nil {
		t.Fatalf("run error: %v", res.Err)
	}

	want := []string{"PLATFORM\n\nP1", "PLATFORM\n\nP2"}
	// Fetch the call log once and guard its length before indexing. A short
	// log would otherwise panic with index-out-of-range instead of reporting
	// which phase never made its model call.
	calls := fp.Calls()
	if len(calls) != len(want) {
		t.Fatalf("want %d model calls (one per phase), got %d", len(want), len(calls))
	}
	for i, w := range want {
		if got := calls[i].Request.System; got != w {
			t.Errorf("phase %d system = %q, want %q", i, got, w)
		}
	}
}
// TestFilterToolbox: with nil names, filterToolbox returns the whole palette.
// With names, it returns only the listed tools, in the order the names were
// given, and skips names the toolbox does not contain.
func TestFilterToolbox(t *testing.T) {
	noop := func(context.Context, json.RawMessage) (any, error) { return "", nil }
	box := llm.NewToolbox("base")
	for _, toolName := range []string{"alpha", "beta", "gamma"} {
		err := box.Add(llm.Tool{Name: toolName, Description: "d", Handler: noop})
		if err != nil {
			t.Fatalf("add %s: %v", toolName, err)
		}
	}

	if n := len(filterToolbox(box, nil).Tools()); n != 3 {
		t.Errorf("nil names = full palette; got %d tools", n)
	}

	picked := filterToolbox(box, []string{"gamma", "alpha", "nonexistent"}).Tools()
	names := make([]string, 0, len(picked))
	for _, tl := range picked {
		names = append(names, tl.Name)
	}
	if joined := strings.Join(names, ","); joined != "gamma,alpha" {
		t.Errorf("subset (order-preserving, unknown skipped) = %v, want [gamma alpha]", names)
	}
}
// TestExpandPhaseTemplate: expandPhaseTemplate substitutes {{.Query}} and
// prior phase outputs by name. A template that fails to parse is returned
// verbatim (best-effort expansion).
func TestExpandPhaseTemplate(t *testing.T) {
	prior := map[string]string{"a": "AA"}
	if got := expandPhaseTemplate("q={{.Query}} a={{.a}}", "QQ", prior); got != "q=QQ a=AA" {
		t.Errorf("expand = %q", got)
	}

	// A malformed template must come back untouched.
	const malformed = "{{.Unclosed"
	if got := expandPhaseTemplate(malformed, "QQ", nil); got != malformed {
		t.Errorf("malformed template should pass through unchanged")
	}
}