From 899059a7911757c633fa48c79a7f8a5b5fbfce49 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Mon, 29 Jun 2026 16:04:06 -0400 Subject: [PATCH] feat(run): durable checkpoint + resume (wire Ports.Checkpointer) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The kernel defined run.Ports.Checkpointer + the checkpoint battery but never drove them (the documented "P2 follow-up"). This wires durable recovery into the run loop so a run interrupted by shutdown can resume on the next boot instead of being lost — the executus-side half of mort's durable-agent-recovery parity (mort #1355). Kernel (run/): - Ports.Checkpointer is now a CheckpointerFactory (Begin per run → a per-run Checkpointer, or nil for a non-durable run). The single per-instance Checkpointer couldn't distinguish runs; a factory mints one per run, matching mort's agentexec.CheckpointerFactory. - RunInfo gains GuildID + ModelTier (so the factory can build resume meta); RunCheckpointState gains CompletedPhases + ActivePhase (+ PhaseOutput). - run/checkpoint.go: ResumeState + WithResumeState / WithExistingCheckpointer context carriers, classifyCheckpointOutcome (success→Complete, shutdown→leave for boot recovery, else→Fail using run.ErrShutdown), and finalizeCheckpoint. - run/executor.go: resolve the per-run checkpointer (existing-from-ctx on a recovery re-run, else factory.Begin); single-loop wraps the step observer to accumulate the transcript + Save each step (host throttles), and a recovered run seeds the saved transcript via WithHistory and continues with no new input; finalize on exit. - run/phases.go: phase-boundary checkpointing — record completed phases after each phase; a resumed run skips already-completed phases (the interrupted phase re-runs from its start — boundary-granular, documented; only the single-loop path resumes mid-loop). Battery (checkpoint/): NewFactory wires the battery into the factory port (per-run handle, meta derived from RunInfo); RunCheckpoint + handle.Save carry the phase fields. Tests (run/checkpoint_test.go): the finalize decision matrix; single-loop Save+Complete; terminal-error Fail; resume seeds history; phase-boundary Saves completed phases; resume skips completed phases. Full ./... green. Co-Authored-By: Claude Opus 4.8 (1M context) --- checkpoint/checkpoint.go | 19 ++-- checkpoint/handle.go | 48 +++++++++- run/checkpoint.go | 94 ++++++++++++++++++ run/checkpoint_test.go | 200 +++++++++++++++++++++++++++++++++++++++ run/executor.go | 66 +++++++++++-- run/phases.go | 41 +++++++- run/ports.go | 36 ++++++- 7 files changed, 477 insertions(+), 27 deletions(-) create mode 100644 run/checkpoint.go create mode 100644 run/checkpoint_test.go diff --git a/checkpoint/checkpoint.go b/checkpoint/checkpoint.go index 64d17cf..52411e2 100644 --- a/checkpoint/checkpoint.go +++ b/checkpoint/checkpoint.go @@ -4,9 +4,9 @@ // run.Ports.Checkpointer. // // Mort backs CheckpointStore with its durable-job table; Memory() is the -// zero-dependency default; contrib/store can add a SQLite one. NOTE: the -// executor's call into run.Ports.Checkpointer is a P2 follow-up — this battery -// provides the seam + impls ahead of that wiring. +// zero-dependency default; contrib/store can add a SQLite one. The executor calls +// run.Ports.Checkpointer (a CheckpointerFactory) during the run loop; NewFactory +// wires this battery into that seam. package checkpoint import ( @@ -14,6 +14,8 @@ import ( "time" "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + + "gitea.stevedudenhoeffer.com/steve/executus/run" ) // RunCheckpointMeta is the run attribution needed to resume a run from scratch @@ -32,11 +34,12 @@ type RunCheckpointMeta struct { // RunCheckpoint is one persisted snapshot of a run's resumable progress. type RunCheckpoint struct { - Meta RunCheckpointMeta - Messages []llm.Message // conversation so far - Iteration int // completed agent-loop iterations - ActivePhase string // current phase name (multi-phase agents); "" otherwise - UpdatedAt time.Time + Meta RunCheckpointMeta + Messages []llm.Message // conversation so far (single-loop or active phase) + Iteration int // completed agent-loop iterations + CompletedPhases []run.PhaseOutput // finished phases, in order (multi-phase agents) + ActivePhase string // current phase name (multi-phase agents); "" otherwise + UpdatedAt time.Time } // CheckpointStore persists run checkpoints keyed by run id. A live checkpoint diff --git a/checkpoint/handle.go b/checkpoint/handle.go index db2d8ba..eba2dfe 100644 --- a/checkpoint/handle.go +++ b/checkpoint/handle.go @@ -54,10 +54,12 @@ func (h *handle) Save(ctx context.Context, st run.RunCheckpointState) error { // caller believes was saved. (A run drives one Save goroutine, so the brief // unguarded window here can't double-write.) if err := h.store.Save(ctx, RunCheckpoint{ - Meta: h.meta, - Messages: st.Messages, - Iteration: st.Iteration, - UpdatedAt: now, + Meta: h.meta, + Messages: st.Messages, + Iteration: st.Iteration, + CompletedPhases: st.CompletedPhases, + ActivePhase: st.ActivePhase, + UpdatedAt: now, }); err != nil { return err } @@ -81,3 +83,41 @@ var _ run.Checkpointer = noop{} func (noop) Save(context.Context, run.RunCheckpointState) error { return nil } func (noop) Complete(context.Context) error { return nil } func (noop) Fail(context.Context, error) error { return nil } + +// factory is a run.CheckpointerFactory that mints a per-run handle over store, +// deriving the per-run meta from the kernel's RunInfo. It is the battery's glue +// for the Ports.Checkpointer (factory) seam: every run becomes durable (the +// store persists snapshots; a host wanting lazy/short-run skipping uses its own +// factory, as mort does over its durable-job table). +type factory struct { + store CheckpointStore + throttle time.Duration + now func() time.Time +} + +var _ run.CheckpointerFactory = (*factory)(nil) + +// NewFactory returns a run.CheckpointerFactory backed by store: each run gets a +// per-run Checkpointer (throttled to at most once per throttle). A nil store +// yields factory.Begin returning a no-op Checkpointer. +func NewFactory(store CheckpointStore, throttle time.Duration) run.CheckpointerFactory { + return &factory{store: store, throttle: throttle} +} + +// Begin mints the per-run Checkpointer. The prompt is read from +// info.Inputs["prompt"] when present so a recovered run can re-dispatch. +func (f *factory) Begin(_ context.Context, info run.RunInfo) (run.Checkpointer, error) { + prompt, _ := info.Inputs["prompt"].(string) + meta := RunCheckpointMeta{ + RunID: info.RunID, + AgentID: info.SubjectID, + AgentName: info.Name, + CallerID: info.CallerID, + ChannelID: info.ChannelID, + GuildID: info.GuildID, + Prompt: prompt, + ModelTier: info.ModelTier, + ParentRunID: info.ParentRunID, + } + return New(f.store, meta, f.throttle, f.now), nil +} diff --git a/run/checkpoint.go b/run/checkpoint.go new file mode 100644 index 0000000..0aeb079 --- /dev/null +++ b/run/checkpoint.go @@ -0,0 +1,94 @@ +package run + +import ( + "context" + "errors" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +// Durable-recovery plumbing for the executor. The Checkpointer port (set via +// Ports.Checkpointer, a CheckpointerFactory) persists a run's resumable progress +// during the loop; on boot a host re-dispatches an interrupted run through the +// executor with a ResumeState (the saved transcript / completed phases) so it +// CONTINUES rather than restarting, reusing the SAME durable record via an +// existing Checkpointer. Both are carried into Run via the context (mirrors +// mort's agentexec.WithResumeState / WithExistingCheckpointer). + +// ResumeState carries a recovered run's prior progress into Run so the run +// continues instead of restarting. The host's recovery path sets it via +// WithResumeState; the executor reads it (single-loop seeds the saved transcript +// as history; multi-phase skips completed phases and seeds the active phase). +type ResumeState struct { + History []llm.Message // single-loop transcript OR active-phase transcript + CompletedPhases []PhaseOutput // multi-phase: outputs of finished phases, in order + ActivePhase string // multi-phase: the phase that was in flight +} + +type resumeStateKey struct{} + +// WithResumeState carries a recovered run's prior progress into Run. +func WithResumeState(ctx context.Context, rs *ResumeState) context.Context { + return context.WithValue(ctx, resumeStateKey{}, rs) +} + +func resumeStateFromContext(ctx context.Context) *ResumeState { + rs, _ := ctx.Value(resumeStateKey{}).(*ResumeState) + return rs +} + +type existingCheckpointerKey struct{} + +// WithExistingCheckpointer carries a pre-existing Checkpointer into Run so a +// recovery re-run reuses the SAME durable record (the executor uses it instead of +// calling Ports.Checkpointer.Begin). +func WithExistingCheckpointer(ctx context.Context, cp Checkpointer) context.Context { + return context.WithValue(ctx, existingCheckpointerKey{}, cp) +} + +func existingCheckpointerFromContext(ctx context.Context) Checkpointer { + cp, _ := ctx.Value(existingCheckpointerKey{}).(Checkpointer) + return cp +} + +// checkpointOutcome is the finalize decision for a durable run. +type checkpointOutcome int + +const ( + checkpointComplete checkpointOutcome = iota + checkpointLeaveRunning + checkpointFail +) + +// classifyCheckpointOutcome maps (run error, cancellation cause) to the durable +// finalize action: success clears the checkpoint (Complete); a shutdown-caused +// cancellation leaves the record so boot recovery picks it up (neither +// Complete nor Fail); anything else (model error, tool loop, the run's own +// deadline, a critic kill, a caller cancel) is terminal (Fail). Mirrors mort's +// agentexec.classifyCheckpointOutcome. +func classifyCheckpointOutcome(runErr, cause error) checkpointOutcome { + switch { + case runErr == nil: + return checkpointComplete + case errors.Is(cause, ErrShutdown): + return checkpointLeaveRunning + default: + return checkpointFail + } +} + +// finalizeCheckpoint applies the outcome to the per-run checkpointer (nil-safe). +// Runs on a detached context so a cancelled run still records its terminal state. +func finalizeCheckpoint(ctx context.Context, cp Checkpointer, runErr error, cause error) { + if cp == nil { + return + } + switch classifyCheckpointOutcome(runErr, cause) { + case checkpointComplete: + _ = cp.Complete(detach(ctx)) + case checkpointFail: + _ = cp.Fail(detach(ctx), runErr) + case checkpointLeaveRunning: + // Interrupted by shutdown: leave the record for boot recovery. + } +} diff --git a/run/checkpoint_test.go b/run/checkpoint_test.go new file mode 100644 index 0000000..d3d423a --- /dev/null +++ b/run/checkpoint_test.go @@ -0,0 +1,200 @@ +package run + +import ( + "context" + "errors" + "testing" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + "gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake" + + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// fakeCheckpointer records every Save state + whether Complete/Fail fired. +type fakeCheckpointer struct { + saves []RunCheckpointState + completed bool + failed bool + failErr error +} + +func (c *fakeCheckpointer) Save(_ context.Context, st RunCheckpointState) error { + c.saves = append(c.saves, st) + return nil +} +func (c *fakeCheckpointer) Complete(context.Context) error { c.completed = true; return nil } +func (c *fakeCheckpointer) Fail(_ context.Context, err error) error { + c.failed = true + c.failErr = err + return nil +} + +// fakeCheckpointFactory hands out one fakeCheckpointer and records the RunInfo. +type fakeCheckpointFactory struct { + cp *fakeCheckpointer + info RunInfo +} + +func (f *fakeCheckpointFactory) Begin(_ context.Context, info RunInfo) (Checkpointer, error) { + f.info = info + return f.cp, nil +} + +// TestClassifyCheckpointOutcome covers the finalize decision matrix. +func TestClassifyCheckpointOutcome(t *testing.T) { + cases := []struct { + name string + err error + cause error + want checkpointOutcome + }{ + {"success", nil, nil, checkpointComplete}, + {"shutdown", context.Canceled, ErrShutdown, checkpointLeaveRunning}, + {"critic-kill", context.Canceled, ErrCriticKill, checkpointFail}, + {"deadline", context.DeadlineExceeded, context.DeadlineExceeded, checkpointFail}, + {"model-error", errors.New("boom"), nil, checkpointFail}, + {"caller-cancel", context.Canceled, context.Canceled, checkpointFail}, + } + for _, tc := range cases { + if got := classifyCheckpointOutcome(tc.err, tc.cause); got != tc.want { + t.Errorf("%s: classifyCheckpointOutcome = %v, want %v", tc.name, got, tc.want) + } + } +} + +// TestCheckpoint_SingleLoopSaveAndComplete: a durable single-loop run gets a +// per-run checkpointer (Begin), Saves its transcript each step, and Completes on +// success (clearing the checkpoint). The RunInfo carries the resume meta. +func TestCheckpoint_SingleLoopSaveAndComplete(t *testing.T) { + models, _ := phaseProvider(t, fake.Reply("done")) + cp := &fakeCheckpointer{} + f := &fakeCheckpointFactory{cp: cp} + ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: f}}) + + res := ex.Run(context.Background(), + RunnableAgent{ID: "a1", Name: "boss", ModelTier: "test-model"}, + tool.Invocation{RunID: "run-x", CallerID: "steve", ChannelID: "chan", GuildID: "g", SkillInputs: map[string]any{"prompt": "go"}}, + "go") + if res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + if f.info.RunID != "run-x" || f.info.SubjectID != "a1" || f.info.ModelTier != "test-model" || f.info.GuildID != "g" { + t.Errorf("Begin RunInfo missing resume meta: %+v", f.info) + } + if len(cp.saves) == 0 { + t.Error("expected at least one checkpoint Save during the run") + } else if len(cp.saves[len(cp.saves)-1].Messages) == 0 { + t.Error("checkpoint Save should carry the running transcript") + } + if !cp.completed { + t.Error("a successful run must Complete (clear) its checkpoint") + } + if cp.failed { + t.Error("a successful run must NOT Fail its checkpoint") + } +} + +// TestCheckpoint_TerminalErrorFails: a run that errors (not shutdown) Fails its +// checkpoint (clears it — not a recovery candidate). +func TestCheckpoint_TerminalErrorFails(t *testing.T) { + models, _ := phaseProvider(t, fake.Fail(errors.New("model down"))) + cp := &fakeCheckpointer{} + ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: &fakeCheckpointFactory{cp: cp}}}) + + res := ex.Run(context.Background(), + RunnableAgent{ID: "a1", ModelTier: "test-model"}, + tool.Invocation{RunID: "r", CallerID: "c", SkillInputs: map[string]any{"prompt": "go"}}, "go") + if res.Err == nil { + t.Fatal("expected a run error") + } + if !cp.failed { + t.Error("a terminal (non-shutdown) error must Fail the checkpoint") + } + if cp.completed { + t.Error("a failed run must NOT Complete its checkpoint") + } +} + +// TestCheckpoint_ResumeSeedsHistory: a run carrying a ResumeState seeds the saved +// transcript as the model's opening messages (continues) instead of the input. +func TestCheckpoint_ResumeSeedsHistory(t *testing.T) { + models, fp := phaseProvider(t, fake.Reply("continued")) + history := []llm.Message{llm.UserText("prior turn 1"), llm.AssistantText("prior answer 1")} + ctx := WithResumeState(context.Background(), &ResumeState{History: history}) + + ex := New(Config{Registry: tool.NewRegistry(), Models: models}) + res := ex.Run(ctx, + RunnableAgent{ID: "a1", ModelTier: "test-model"}, + tool.Invocation{RunID: "r", CallerID: "c", SkillInputs: map[string]any{"prompt": "ignored-on-resume"}}, "ignored-on-resume") + if res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + got := fp.Calls()[0].Request.Messages + if len(got) != len(history) { + t.Fatalf("resume should seed the saved %d-message transcript, got %d messages", len(history), len(got)) + } +} + +// TestCheckpoint_PhaseBoundarySavesCompleted: a durable multi-phase run records +// the completed phases at each boundary, growing the list, and Completes on +// success. +func TestCheckpoint_PhaseBoundarySavesCompleted(t *testing.T) { + models, _ := phaseProvider(t, fake.Reply("out-a"), fake.Reply("out-b")) + cp := &fakeCheckpointer{} + ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: &fakeCheckpointFactory{cp: cp}}}) + + ra := RunnableAgent{ + ID: "p", ModelTier: "test-model", + Phases: []Phase{{Name: "a", SystemPrompt: "A"}, {Name: "b", SystemPrompt: "B"}}, + } + if res := ex.Run(context.Background(), ra, tool.Invocation{RunID: "r", CallerID: "c"}, "Q"); res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + // The final phase-boundary Save must list both completed phases. + var lastPhaseSave *RunCheckpointState + for i := range cp.saves { + if len(cp.saves[i].CompletedPhases) > 0 { + lastPhaseSave = &cp.saves[i] + } + } + if lastPhaseSave == nil || len(lastPhaseSave.CompletedPhases) != 2 { + t.Fatalf("expected a phase-boundary Save listing 2 completed phases; saves=%+v", cp.saves) + } + if !cp.completed { + t.Error("a successful phased run must Complete its checkpoint") + } +} + +// TestCheckpoint_ResumeSkipsCompletedPhases: a resumed multi-phase run skips +// phases already in ResumeState.CompletedPhases (only the remaining phase calls +// the model) and threads their outputs into the remaining phase's template. +func TestCheckpoint_ResumeSkipsCompletedPhases(t *testing.T) { + models, fp := phaseProvider(t, fake.Reply("out-b")) // ONLY phase b should call the model + ctx := WithResumeState(context.Background(), &ResumeState{ + CompletedPhases: []PhaseOutput{{Name: "a", Output: "saved-a"}}, + }) + ex := New(Config{Registry: tool.NewRegistry(), Models: models}) + + ra := RunnableAgent{ + ID: "p", ModelTier: "test-model", + Phases: []Phase{ + {Name: "a", SystemPrompt: "A"}, + {Name: "b", SystemPrompt: "B saw {{.a}}"}, + }, + } + res := ex.Run(ctx, ra, tool.Invocation{RunID: "r", CallerID: "c"}, "Q") + if res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + if res.Output != "out-b" { + t.Fatalf("output = %q, want out-b", res.Output) + } + calls := fp.Calls() + if len(calls) != 1 { + t.Fatalf("only the un-completed phase b should call the model; got %d calls", len(calls)) + } + if calls[0].Request.System != "B saw saved-a" { + t.Errorf("resumed phase b should see the completed phase a's saved output; system = %q", calls[0].Request.System) + } +} diff --git a/run/executor.go b/run/executor.go index 41546dc..055957c 100644 --- a/run/executor.go +++ b/run/executor.go @@ -165,7 +165,9 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio Name: ra.Name, CallerID: inv.CallerID, ChannelID: inv.ChannelID, + GuildID: inv.GuildID, ParentRunID: inv.ParentRunID, + ModelTier: tier, Inputs: inv.SkillInputs, StartedAt: started, MaxIterations: maxIter, @@ -180,6 +182,19 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio inv.RunState = stateAcc } + // Durable recovery (optional): a recovered run carries a ResumeState (prior + // transcript / completed phases) + an existing Checkpointer in ctx so it + // continues on the SAME durable record; a fresh run mints a per-run + // Checkpointer via the factory (which decides durability — nil = non-durable). + // nil-safe throughout. + resume := resumeStateFromContext(ctx) + ckpt := existingCheckpointerFromContext(ctx) + if ckpt == nil && e.cfg.Ports.Checkpointer != nil { + if c, cerr := e.cfg.Ports.Checkpointer.Begin(ctx, info); cerr == nil { + ckpt = c + } + } + // Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal // messages into the running conversation before its next step. Created BEFORE // the toolbox build so any tool's handler captures the live AttachImages seam. @@ -289,11 +304,11 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio } // Shared agent options used by BOTH the single-loop path and every phase: the - // tool-error guards, the step observer, and optional compaction. The toolbox + - // step ceiling are NOT shared (they vary per phase), so they're added per path. + // tool-error guards and optional compaction. The toolbox, step ceiling, AND + // step observer are added per path (the observer is wrapped for checkpointing, + // which differs single-loop vs per-phase). sharedOpts := []agent.Option{ agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats), - agent.WithStepObserver(stepObserver), } if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil { if threshold := e.compactionThreshold(tier); threshold > 0 { @@ -330,18 +345,47 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio // Single-loop run: the agent's base prompt + full toolbox, with the // critic's DYNAMIC step ceiling (WithMaxStepsFunc, so it can raise a // healthy-but-long run's budget mid-flight; falls back to maxIter). + // + // Checkpointing: wrap the step observer to accumulate the running transcript + // and Save it each step (the host throttles). A recovered run seeds the saved + // transcript as history and continues with no new input. acc starts from the + // resume history (or the opening user message) and grows as steps complete. + obs := stepObserver + if ckpt != nil { + acc := []llm.Message{multimodalUserMessage(input, inv.Images)} + if resume != nil && len(resume.History) > 0 { + acc = append([]llm.Message(nil), resume.History...) + } + obs = func(s agent.Step) { + stepObserver(s) + if s.Response != nil { + acc = append(acc, s.Response.Message()) + } + if len(s.Results) > 0 { + acc = append(acc, llm.ToolResultsMessage(s.Results...)) + } + _ = ckpt.Save(runCtx, RunCheckpointState{Messages: acc, Iteration: s.Index + 1}) + } + } opts := append([]agent.Option{ agent.WithToolbox(toolbox), critic.maxStepsOption(maxIter), + agent.WithStepObserver(obs), }, sharedOpts...) ag := agent.New(model, e.systemPrompt(ra), opts...) - runRes, runErr = runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer)) + if resume != nil && len(resume.History) > 0 { + // Resume: seed the saved transcript and continue (no new input — the + // completed tool calls in the transcript are NOT re-run). + runRes, runErr = ag.Run(runCtx, "", agent.WithSteer(steer), agent.WithHistory(resume.History)) + } else { + runRes, runErr = runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer)) + } } else { // Multi-phase pipeline: each phase runs its own prompt/tier/tools/step-cap - // sequentially, threading outputs through {{.}} templates. Reuses - // the shared opts so audit/steps/critic-steer accumulate across every phase. - // (Per-phase step caps are fixed — the critic's dynamic ceiling is not - // propagated to phases — but its steer + hard deadline still apply.) + // sequentially, threading outputs through {{.}} templates. The + // shared step observer (audit/steps/critic) is wired per phase by the phase + // runner; checkpointing is phase-boundary granular (completed phases are + // recorded so a resumed run skips them). runRes, runErr = e.runPhases(runCtx, ra, phaseDeps{ baseModel: model, baseToolbox: toolbox, @@ -350,9 +394,15 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio stepObserver: stepObserver, steer: steer, rec: rec, + checkpointer: ckpt, + resume: resume, }, input, inv.Images) } + // Finalize durable recovery: clear the checkpoint on success/terminal failure, + // or leave it for boot recovery when the run was interrupted by shutdown. + finalizeCheckpoint(ctx, ckpt, runErr, context.Cause(runCtx)) + status := statusFor(runCtx, runErr) if runRes != nil { res.Output = runRes.Output diff --git a/run/phases.go b/run/phases.go index 7db85f4..4a25d9c 100644 --- a/run/phases.go +++ b/run/phases.go @@ -64,6 +64,13 @@ type phaseDeps struct { stepObserver func(agent.Step) steer func() []llm.Message rec RunRecorder + // checkpointer records phase-boundary progress (completed phases) for durable + // recovery; nil = non-durable. resume carries a recovered run's completed + // phases so they are skipped on re-run. Phase recovery is boundary-granular: + // the interrupted (active) phase re-runs from its start (its mid-phase + // transcript is NOT resumed — only the single-loop path resumes mid-loop). + checkpointer Checkpointer + resume *ResumeState } // runPhases executes ra.Phases sequentially and returns a synthetic agent.Result @@ -73,10 +80,22 @@ type phaseDeps struct { // deadline/critic-kill — returns the error. func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phaseDeps, query string, images []llm.ImagePart) (*agent.Result, error) { outputs := make(map[string]string, len(ra.Phases)) + var completed []PhaseOutput var lastResult *agent.Result var lastOutput string var totalUsage llm.Usage + // Resume: pre-populate from the saved checkpoint so already-finished phases are + // skipped. The interrupted (active) phase is NOT pre-populated, so it re-runs + // from its start (boundary-granular recovery). + if deps.resume != nil { + for _, pc := range deps.resume.CompletedPhases { + outputs[pc.Name] = pc.Output + completed = append(completed, pc) + lastOutput = pc.Output + } + } + // finish stamps the aggregated usage + final output onto the synthetic result. finish := func(err error) (*agent.Result, error) { if lastResult == nil { @@ -90,6 +109,12 @@ func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phas } for i, phase := range ra.Phases { + // Skip phases already completed on a resumed run (key presence, not output + // emptiness — a legitimately-empty phase output still counts as done). + if _, done := outputs[phase.Name]; done { + lastOutput = outputs[phase.Name] + continue + } // A killed/timed-out/cancelled run must not start its next phase. if err := runCtx.Err(); err != nil { return finish(err) @@ -151,6 +176,16 @@ func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phas outputs[phase.Name] = output lastOutput = output + // Checkpoint the phase boundary: this phase is done, so a resumed run skips + // it and continues from the next. (Copy the slice — the checkpointer may + // hold/serialize it asynchronously.) + completed = append(completed, PhaseOutput{Name: phase.Name, Output: output}) + if deps.checkpointer != nil { + _ = deps.checkpointer.Save(runCtx, RunCheckpointState{ + CompletedPhases: append([]PhaseOutput(nil), completed...), + ActivePhase: "", + }) + } } return finish(nil) @@ -192,11 +227,13 @@ func (e *Executor) runOnePhase(runCtx context.Context, ra RunnableAgent, deps ph maxIter = deps.baseMaxIter } // Per-phase opts: a fixed step ceiling for this phase (the critic's dynamic - // ceiling is intentionally not propagated to phases) + the phase toolbox, on - // top of the shared opts (tool-error limits, step observer, compactor). + // ceiling is intentionally not propagated to phases) + the phase toolbox + the + // shared step observer (audit/steps/critic), on top of the shared opts + // (tool-error limits, compactor). opts := append([]agent.Option{ agent.WithToolbox(toolbox), agent.WithMaxSteps(maxIter), + agent.WithStepObserver(deps.stepObserver), }, deps.sharedOpts...) ag := agent.New(model, system, opts...) diff --git a/run/ports.go b/run/ports.go index c130dcf..6b59a74 100644 --- a/run/ports.go +++ b/run/ports.go @@ -33,9 +33,10 @@ type Ports struct { Budget Budget // Critic optionally monitors a long run for hangs/runaways. nil = none. Critic Critic - // Checkpointer persists resumable progress for durable recovery. nil = no - // checkpointing (a run interrupted by shutdown is simply lost). - Checkpointer Checkpointer + // Checkpointer mints a per-run Checkpointer for durable recovery (it decides + // per run whether the run is durable). nil = no checkpointing (a run + // interrupted by shutdown is simply lost). + Checkpointer CheckpointerFactory // Palette resolves SkillPalette / SubAgentPalette entries into delegation // tools (skill__ / agent__). nil = those entries are inert. Palette PaletteSource @@ -66,7 +67,9 @@ type RunInfo struct { Name string CallerID string ChannelID string + GuildID string // the originating guild/server id (empty for DMs/triggers) ParentRunID string + ModelTier string // the run's resolved base tier (for checkpoint re-dispatch) Inputs map[string]any StartedAt time.Time // MaxIterations is the run's base tool-dispatch step ceiling, so a critic can @@ -172,6 +175,16 @@ type CriticHandle interface { // --- Checkpointer --- +// CheckpointerFactory decides, per run, whether the run is durable and (if so) +// mints the per-run Checkpointer that records its progress. It returns (nil, nil) +// for a non-durable run (the common short-run case — no checkpointing overhead). +// A storage error should be logged and degraded to (nil, nil) so a failing +// checkpoint store never fails the run. Mirrors mort's +// agentexec.CheckpointerFactory. +type CheckpointerFactory interface { + Begin(ctx context.Context, info RunInfo) (Checkpointer, error) +} + // Checkpointer persists a run's resumable progress for durable recovery. // Mirrors mort's agentexec.RunCheckpointer. type Checkpointer interface { @@ -184,11 +197,24 @@ type Checkpointer interface { Fail(ctx context.Context, err error) error } -// RunCheckpointState is the resumable snapshot a Checkpointer persists. Kept -// minimal here; the executor extends what it records during the merge. +// RunCheckpointState is the resumable snapshot a Checkpointer persists. type RunCheckpointState struct { + // Messages is the running transcript (single-loop run) OR the active phase's + // transcript (multi-phase run). May be nil. Messages []llm.Message Iteration int + // CompletedPhases is set only for multi-phase runs: the outputs of phases + // already finished, in phase order. nil for single-loop runs. + CompletedPhases []PhaseOutput + // ActivePhase is the name of the in-progress phase (multi-phase only). + ActivePhase string +} + +// PhaseOutput is one completed pipeline phase's name and output text, recorded in +// a checkpoint so a resumed multi-phase run can skip already-finished phases. +type PhaseOutput struct { + Name string + Output string } // --- PaletteSource ---