diff --git a/checkpoint/checkpoint.go b/checkpoint/checkpoint.go index 52411e2..576584e 100644 --- a/checkpoint/checkpoint.go +++ b/checkpoint/checkpoint.go @@ -35,10 +35,9 @@ type RunCheckpointMeta struct { // RunCheckpoint is one persisted snapshot of a run's resumable progress. type RunCheckpoint struct { Meta RunCheckpointMeta - Messages []llm.Message // conversation so far (single-loop or active phase) + Messages []llm.Message // conversation so far (single-loop runs) Iteration int // completed agent-loop iterations CompletedPhases []run.PhaseOutput // finished phases, in order (multi-phase agents) - ActivePhase string // current phase name (multi-phase agents); "" otherwise UpdatedAt time.Time } diff --git a/checkpoint/handle.go b/checkpoint/handle.go index eba2dfe..d231485 100644 --- a/checkpoint/handle.go +++ b/checkpoint/handle.go @@ -58,7 +58,6 @@ func (h *handle) Save(ctx context.Context, st run.RunCheckpointState) error { Messages: st.Messages, Iteration: st.Iteration, CompletedPhases: st.CompletedPhases, - ActivePhase: st.ActivePhase, UpdatedAt: now, }); err != nil { return err @@ -92,7 +91,6 @@ func (noop) Fail(context.Context, error) error { return nil } type factory struct { store CheckpointStore throttle time.Duration - now func() time.Time } var _ run.CheckpointerFactory = (*factory)(nil) @@ -119,5 +117,5 @@ func (f *factory) Begin(_ context.Context, info run.RunInfo) (run.Checkpointer, ModelTier: info.ModelTier, ParentRunID: info.ParentRunID, } - return New(f.store, meta, f.throttle, f.now), nil + return New(f.store, meta, f.throttle, nil /* now defaults to time.Now */), nil } diff --git a/run/checkpoint.go b/run/checkpoint.go index 0aeb079..8c266c5 100644 --- a/run/checkpoint.go +++ b/run/checkpoint.go @@ -3,6 +3,7 @@ package run import ( "context" "errors" + "log/slog" "gitea.stevedudenhoeffer.com/steve/majordomo/llm" ) @@ -17,12 +18,14 @@ import ( // ResumeState carries a recovered run's prior progress into Run so the run // continues instead of restarting. The host's recovery path sets it via -// WithResumeState; the executor reads it (single-loop seeds the saved transcript -// as history; multi-phase skips completed phases and seeds the active phase). +// WithResumeState; the executor reads it: +// - single-loop: History seeds the saved transcript (the run continues). +// - multi-phase: CompletedPhases are skipped; the interrupted phase re-runs +// from its start (boundary-granular — there is no mid-phase transcript +// resume, so History is unused for multi-phase runs). type ResumeState struct { - History []llm.Message // single-loop transcript OR active-phase transcript + History []llm.Message // single-loop transcript (unused for multi-phase) CompletedPhases []PhaseOutput // multi-phase: outputs of finished phases, in order - ActivePhase string // multi-phase: the phase that was in flight } type resumeStateKey struct{} @@ -79,15 +82,21 @@ func classifyCheckpointOutcome(runErr, cause error) checkpointOutcome { // finalizeCheckpoint applies the outcome to the per-run checkpointer (nil-safe). // Runs on a detached context so a cancelled run still records its terminal state. +// Complete/Fail errors are best-effort but logged (a stale record would only +// cause a wasteful boot-recovery retry, not data loss). func finalizeCheckpoint(ctx context.Context, cp Checkpointer, runErr error, cause error) { if cp == nil { return } switch classifyCheckpointOutcome(runErr, cause) { case checkpointComplete: - _ = cp.Complete(detach(ctx)) + if err := cp.Complete(detach(ctx)); err != nil { + slog.Warn("run: checkpoint Complete failed", "error", err) + } case checkpointFail: - _ = cp.Fail(detach(ctx), runErr) + if err := cp.Fail(detach(ctx), runErr); err != nil { + slog.Warn("run: checkpoint Fail failed", "error", err) + } case checkpointLeaveRunning: // Interrupted by shutdown: leave the record for boot recovery. } diff --git a/run/executor.go b/run/executor.go index 055957c..c950f98 100644 --- a/run/executor.go +++ b/run/executor.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "time" "gitea.stevedudenhoeffer.com/steve/majordomo/agent" @@ -113,13 +114,26 @@ type Result struct { func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) { started := time.Now() res = Result{RunID: inv.RunID} + // ckpt is the per-run durable checkpointer (resolved below; nil = non-durable). + // checkpointCause yields the run context's cancellation cause once the run + // context exists; nil before then (an early build-error return). + var ckpt Checkpointer + var checkpointCause func() error // Enforce the no-panic contract: a panic anywhere in the run (incl. a host // Critic/Audit/Palette callback on the main goroutine) becomes Result.Err - // rather than unwinding into the caller. + // rather than unwinding into the caller. This defer ALSO finalizes the + // checkpoint on EVERY exit path — panic, an early build-error return (before + // the run loop), or normal completion — so a recovered run's durable record is + // never left dangling (which would loop boot-recovery on a persistent error). defer func() { if r := recover(); r != nil { res.Err = fmt.Errorf("run.Executor: recovered panic: %v", r) } + var cause error + if checkpointCause != nil { + cause = checkpointCause() + } + finalizeCheckpoint(ctx, ckpt, res.Err, cause) }() tier := ra.ModelTier @@ -188,9 +202,15 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio // Checkpointer via the factory (which decides durability — nil = non-durable). // nil-safe throughout. resume := resumeStateFromContext(ctx) - ckpt := existingCheckpointerFromContext(ctx) + ckpt = existingCheckpointerFromContext(ctx) if ckpt == nil && e.cfg.Ports.Checkpointer != nil { - if c, cerr := e.cfg.Ports.Checkpointer.Begin(ctx, info); cerr == nil { + c, cerr := e.cfg.Ports.Checkpointer.Begin(ctx, info) + if cerr != nil { + // Degrade to non-durable (the documented contract) but log it — a + // failing checkpoint store must not fail the run, yet shouldn't be silent. + slog.Warn("run: checkpointer Begin failed; running non-durable", + "run_id", inv.RunID, "error", cerr) + } else { ckpt = c } } @@ -262,6 +282,9 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio defer cancelCause(nil) runCtx, mergeCancel := MergeCancellation(runCtx, ctx) defer mergeCancel() + // The finalize defer (top of Run) now has a run context to read the + // cancellation cause from (shutdown vs critic-kill vs deadline vs cancel). + checkpointCause = func() error { return context.Cause(runCtx) } // Critic (optional): monitors the run for a stall, can nudge/extend/kill via // its host Escalator. Its hard deadline is bound to runCtx (cancel on pass). @@ -339,6 +362,8 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio // the critic's nudges before each step. steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) } + resuming := resume != nil && len(resume.History) > 0 + var runRes *agent.Result var runErr error if len(ra.Phases) == 0 { @@ -347,14 +372,19 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio // healthy-but-long run's budget mid-flight; falls back to maxIter). // // Checkpointing: wrap the step observer to accumulate the running transcript - // and Save it each step (the host throttles). A recovered run seeds the saved - // transcript as history and continues with no new input. acc starts from the - // resume history (or the opening user message) and grows as steps complete. + // and Save it each step. Save is called every step; THROTTLING is the + // Checkpointer's responsibility (the battery + mort's durable-job adapter + // both throttle + size-cap), so the kernel doesn't gate the hot path. The + // accumulated transcript is the pre-compaction one (the observer sees raw + // step responses, not the loop's compacted history) — a host that caps size + // bounds it. A recovered run seeds the saved transcript and continues. obs := stepObserver if ckpt != nil { - acc := []llm.Message{multimodalUserMessage(input, inv.Images)} - if resume != nil && len(resume.History) > 0 { + var acc []llm.Message + if resuming { acc = append([]llm.Message(nil), resume.History...) + } else { + acc = []llm.Message{multimodalUserMessage(input, inv.Images)} } obs = func(s agent.Step) { stepObserver(s) @@ -373,7 +403,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio agent.WithStepObserver(obs), }, sharedOpts...) ag := agent.New(model, e.systemPrompt(ra), opts...) - if resume != nil && len(resume.History) > 0 { + if resuming { // Resume: seed the saved transcript and continue (no new input — the // completed tool calls in the transcript are NOT re-run). runRes, runErr = ag.Run(runCtx, "", agent.WithSteer(steer), agent.WithHistory(resume.History)) @@ -399,9 +429,8 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio }, input, inv.Images) } - // Finalize durable recovery: clear the checkpoint on success/terminal failure, - // or leave it for boot recovery when the run was interrupted by shutdown. - finalizeCheckpoint(ctx, ckpt, runErr, context.Cause(runCtx)) + // Durable-recovery finalize (Complete/Fail/leave-running) happens in the + // top-of-Run defer so it covers panics + early build-error returns too. status := statusFor(runCtx, runErr) if runRes != nil { diff --git a/run/phases.go b/run/phases.go index 4a25d9c..4a73d89 100644 --- a/run/phases.go +++ b/run/phases.go @@ -53,9 +53,10 @@ import ( // phaseDeps carries the per-run state the phase runner shares with Run: the base // model, the full decorated toolbox (filtered per phase), the base step cap, the -// shared agent options (tool-error limits + step observer + compactor), the -// shared step observer (also fed by IsRunFunc bare calls), the critic/session -// steer, and the audit recorder (phase events). +// shared agent options (tool-error limits + compactor — the step observer is +// added per phase, NOT in sharedOpts, so checkpointing can vary per path), the +// shared step observer (wired into each phase's loop AND invoked for IsRunFunc +// bare calls), the critic/session steer, and the audit recorder (phase events). type phaseDeps struct { baseModel llm.Model baseToolbox *llm.Toolbox @@ -85,12 +86,18 @@ func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phas var lastOutput string var totalUsage llm.Usage - // Resume: pre-populate from the saved checkpoint so already-finished phases are - // skipped. The interrupted (active) phase is NOT pre-populated, so it re-runs - // from its start (boundary-granular recovery). + // resumeSkip is the set of phases already finished on a RECOVERED run — kept + // SEPARATE from the live `outputs` map (which fills as phases run this time) so + // the skip guard only skips RESUME-completed phases, never a fresh run's own + // phases. (Reusing `outputs` would make a second phase with a duplicate name + // skip itself.) Pre-populate outputs + completed so a resumed run threads the + // saved outputs into later phases. The interrupted (active) phase is NOT + // pre-populated, so it re-runs from its start (boundary-granular recovery). + resumeSkip := map[string]bool{} if deps.resume != nil { for _, pc := range deps.resume.CompletedPhases { outputs[pc.Name] = pc.Output + resumeSkip[pc.Name] = true completed = append(completed, pc) lastOutput = pc.Output } @@ -109,10 +116,8 @@ func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phas } for i, phase := range ra.Phases { - // Skip phases already completed on a resumed run (key presence, not output - // emptiness — a legitimately-empty phase output still counts as done). - if _, done := outputs[phase.Name]; done { - lastOutput = outputs[phase.Name] + // Skip phases already completed on a resumed run. + if resumeSkip[phase.Name] { continue } // A killed/timed-out/cancelled run must not start its next phase. @@ -183,7 +188,6 @@ func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phas if deps.checkpointer != nil { _ = deps.checkpointer.Save(runCtx, RunCheckpointState{ CompletedPhases: append([]PhaseOutput(nil), completed...), - ActivePhase: "", }) } } diff --git a/run/phases_test.go b/run/phases_test.go index a8fe249..454c71c 100644 --- a/run/phases_test.go +++ b/run/phases_test.go @@ -130,6 +130,26 @@ func TestPhases_OptionalDoesNotSwallowCancellation(t *testing.T) { } } +// TestPhases_DuplicateNamesBothRun: a fresh (non-resume) run with two phases +// sharing a name must run BOTH — the resume-skip guard keys off a separate +// resume set, not the live outputs map (which fills as phases run), so a phase +// never skips a same-named sibling on a fresh run. +func TestPhases_DuplicateNamesBothRun(t *testing.T) { + models, fp := phaseProvider(t, fake.Reply("first"), fake.Reply("second")) + ex := New(Config{Registry: tool.NewRegistry(), Models: models}) + ra := RunnableAgent{ + Name: "p", ModelTier: "test-model", + Phases: []Phase{{Name: "x", SystemPrompt: "P1"}, {Name: "x", SystemPrompt: "P2"}}, + } + res := ex.Run(context.Background(), ra, tool.Invocation{RunID: "r"}, "Q") + if res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + if n := len(fp.Calls()); n != 2 { + t.Fatalf("both same-named phases must run on a fresh run; got %d model calls", n) + } +} + // TestPhases_HardErrorAborts: a NON-optional phase that hits a hard error (not a // budget/step exhaustion) aborts the pipeline; later phases do not run. func TestPhases_HardErrorAborts(t *testing.T) { diff --git a/run/ports.go b/run/ports.go index 6b59a74..9713cb6 100644 --- a/run/ports.go +++ b/run/ports.go @@ -199,15 +199,15 @@ type Checkpointer interface { // RunCheckpointState is the resumable snapshot a Checkpointer persists. type RunCheckpointState struct { - // Messages is the running transcript (single-loop run) OR the active phase's - // transcript (multi-phase run). May be nil. + // Messages is the running transcript of a SINGLE-LOOP run (grows each step; + // resumed via WithHistory). nil for multi-phase runs — phase recovery is + // boundary-granular (see CompletedPhases), not mid-phase transcript. Messages []llm.Message Iteration int // CompletedPhases is set only for multi-phase runs: the outputs of phases - // already finished, in phase order. nil for single-loop runs. + // already finished, in phase order, so a resumed run skips them and re-runs + // the interrupted phase from its start. nil for single-loop runs. CompletedPhases []PhaseOutput - // ActivePhase is the name of the in-progress phase (multi-phase only). - ActivePhase string } // PhaseOutput is one completed pipeline phase's name and output text, recorded in