feat(run): durable checkpoint + resume (wire Ports.Checkpointer) #20

Merged
steve merged 2 commits from feat/kernel-checkpoint into main 2026-06-29 20:44:17 +00:00
8 changed files with 540 additions and 31 deletions
+7 -5
View File
@@ -4,9 +4,9 @@
// run.Ports.Checkpointer. // run.Ports.Checkpointer.
// //
// Mort backs CheckpointStore with its durable-job table; Memory() is the // Mort backs CheckpointStore with its durable-job table; Memory() is the
// zero-dependency default; contrib/store can add a SQLite one. NOTE: the // zero-dependency default; contrib/store can add a SQLite one. The executor calls
// executor's call into run.Ports.Checkpointer is a P2 follow-up — this battery // run.Ports.Checkpointer (a CheckpointerFactory) during the run loop; NewFactory
// provides the seam + impls ahead of that wiring. // wires this battery into that seam.
package checkpoint package checkpoint
import ( import (
@@ -14,6 +14,8 @@ import (
"time" "time"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm" "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/run"
) )
// RunCheckpointMeta is the run attribution needed to resume a run from scratch // RunCheckpointMeta is the run attribution needed to resume a run from scratch
@@ -33,9 +35,9 @@ type RunCheckpointMeta struct {
// RunCheckpoint is one persisted snapshot of a run's resumable progress. // RunCheckpoint is one persisted snapshot of a run's resumable progress.
type RunCheckpoint struct { type RunCheckpoint struct {
Meta RunCheckpointMeta Meta RunCheckpointMeta
Messages []llm.Message // conversation so far Messages []llm.Message // conversation so far (single-loop runs)
Iteration int // completed agent-loop iterations Iteration int // completed agent-loop iterations
ActivePhase string // current phase name (multi-phase agents); "" otherwise CompletedPhases []run.PhaseOutput // finished phases, in order (multi-phase agents)
UpdatedAt time.Time UpdatedAt time.Time
} }
+38
View File
@@ -57,6 +57,7 @@ func (h *handle) Save(ctx context.Context, st run.RunCheckpointState) error {
Meta: h.meta, Meta: h.meta,
Messages: st.Messages, Messages: st.Messages,
Iteration: st.Iteration, Iteration: st.Iteration,
CompletedPhases: st.CompletedPhases,
UpdatedAt: now, UpdatedAt: now,
}); err != nil { }); err != nil {
return err return err
@@ -81,3 +82,40 @@ var _ run.Checkpointer = noop{}
func (noop) Save(context.Context, run.RunCheckpointState) error { return nil } func (noop) Save(context.Context, run.RunCheckpointState) error { return nil }
func (noop) Complete(context.Context) error { return nil } func (noop) Complete(context.Context) error { return nil }
func (noop) Fail(context.Context, error) error { return nil } func (noop) Fail(context.Context, error) error { return nil }
// factory is a run.CheckpointerFactory that mints a per-run handle over store,
// deriving the per-run meta from the kernel's RunInfo. It is the battery's glue
// for the Ports.Checkpointer (factory) seam: every run becomes durable (the
// store persists snapshots; a host wanting lazy/short-run skipping uses its own
// factory, as mort does over its durable-job table).
Review

factory.now field is never set by NewFactory (always nil); dead/untestable field

maintainability · flagged by 1 model

🪰 Gadfly · advisory

⚪ **factory.now field is never set by NewFactory (always nil); dead/untestable field** _maintainability · flagged by 1 model_ <sub>🪰 Gadfly · advisory</sub>
type factory struct {
store CheckpointStore
throttle time.Duration
}
Review

factory.now field is never assigned (NewFactory omits it) — dead/always-nil clock field

maintainability · flagged by 1 model

  • checkpoint/handle.go:95factory.now is dead. The factory struct carries a now func() time.Time field, but NewFactory never sets it (handle.go:103-105), so f.now is always nil and Begin always passes nil to New (handle.go:122), which then defaults to time.Now. Unlike handle, there's no constructor/option that injects a clock, so the field can never be anything but nil. Either drop it, or add the injection path it implies (mirroring handle's testable clock).

🪰 Gadfly · advisory

⚪ **factory.now field is never assigned (NewFactory omits it) — dead/always-nil clock field** _maintainability · flagged by 1 model_ - **`checkpoint/handle.go:95` — `factory.now` is dead.** The `factory` struct carries a `now func() time.Time` field, but `NewFactory` never sets it (handle.go:103-105), so `f.now` is always nil and `Begin` always passes nil to `New` (handle.go:122), which then defaults to `time.Now`. Unlike `handle`, there's no constructor/option that injects a clock, so the field can never be anything but nil. Either drop it, or add the injection path it implies (mirroring `handle`'s testable clock). <sub>🪰 Gadfly · advisory</sub>
var _ run.CheckpointerFactory = (*factory)(nil)
// NewFactory returns a run.CheckpointerFactory backed by store: each run gets a
// per-run Checkpointer (throttled to at most once per throttle). A nil store
// yields factory.Begin returning a no-op Checkpointer.
func NewFactory(store CheckpointStore, throttle time.Duration) run.CheckpointerFactory {
return &factory{store: store, throttle: throttle}
}
Review

🟡 NewFactory never initializes factory.now, leaving a configurable-looking but always-nil field

maintainability · flagged by 1 model

  1. NewFactory never initializes factory.now — confirmed at checkpoint/handle.go:103-104; f.now stays nil, passed to New which falls back to time.Now. Confirmed.

🪰 Gadfly · advisory

🟡 **NewFactory never initializes factory.now, leaving a configurable-looking but always-nil field** _maintainability · flagged by 1 model_ 4. **NewFactory never initializes factory.now** — confirmed at checkpoint/handle.go:103-104; `f.now` stays nil, passed to `New` which falls back to `time.Now`. Confirmed. <sub>🪰 Gadfly · advisory</sub>
// Begin mints the per-run Checkpointer. The prompt is read from
// info.Inputs["prompt"] when present so a recovered run can re-dispatch.
func (f *factory) Begin(_ context.Context, info run.RunInfo) (run.Checkpointer, error) {
Review

🟡 Potential information leakage in checkpoint metadata

maintainability, security · flagged by 2 models

  • Potential information leakage in checkpoint metadata (checkpoint/handle.go:109-121): The Begin method extracts the prompt from info.Inputs["prompt"] without validation or sanitization. If the prompt contains sensitive information (e.g., API keys, PII), it will be stored in the checkpoint metadata. This could lead to unintended exposure if the checkpoint store is compromised or accessed by unauthorized parties. Consider redacting sensitive data or providing an allowlist/denylist mecha…

🪰 Gadfly · advisory

🟡 **Potential information leakage in checkpoint metadata** _maintainability, security · flagged by 2 models_ - **Potential information leakage in checkpoint metadata** (`checkpoint/handle.go:109-121`): The `Begin` method extracts the prompt from `info.Inputs["prompt"]` without validation or sanitization. If the prompt contains sensitive information (e.g., API keys, PII), it will be stored in the checkpoint metadata. This could lead to unintended exposure if the checkpoint store is compromised or accessed by unauthorized parties. Consider redacting sensitive data or providing an allowlist/denylist mecha… <sub>🪰 Gadfly · advisory</sub>
prompt, _ := info.Inputs["prompt"].(string)
meta := RunCheckpointMeta{
RunID: info.RunID,
AgentID: info.SubjectID,
AgentName: info.Name,
CallerID: info.CallerID,
ChannelID: info.ChannelID,
GuildID: info.GuildID,
Prompt: prompt,
ModelTier: info.ModelTier,
ParentRunID: info.ParentRunID,
}
return New(f.store, meta, f.throttle, nil /* now defaults to time.Now */), nil
}
+103
View File
@@ -0,0 +1,103 @@
package run
import (
"context"
"errors"
"log/slog"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)
// Durable-recovery plumbing for the executor. The Checkpointer port (set via
// Ports.Checkpointer, a CheckpointerFactory) persists a run's resumable progress
// during the loop; on boot a host re-dispatches an interrupted run through the
// executor with a ResumeState (the saved transcript / completed phases) so it
// CONTINUES rather than restarting, reusing the SAME durable record via an
// existing Checkpointer. Both are carried into Run via the context (mirrors
// mort's agentexec.WithResumeState / WithExistingCheckpointer).
// ResumeState carries a recovered run's prior progress into Run so the run
// continues instead of restarting. The host's recovery path sets it via
// WithResumeState; the executor reads it:
// - single-loop: History seeds the saved transcript (the run continues).
// - multi-phase: CompletedPhases are skipped; the interrupted phase re-runs
// from its start (boundary-granular — there is no mid-phase transcript
// resume, so History is unused for multi-phase runs).
type ResumeState struct {
History []llm.Message // single-loop transcript (unused for multi-phase)
CompletedPhases []PhaseOutput // multi-phase: outputs of finished phases, in order
}
type resumeStateKey struct{}
// WithResumeState carries a recovered run's prior progress into Run.
func WithResumeState(ctx context.Context, rs *ResumeState) context.Context {
return context.WithValue(ctx, resumeStateKey{}, rs)
Review

🟡 Context value type safety

security · flagged by 1 model

  • Context value type safety (run/checkpoint.go:35,49): The resumeStateFromContext and existingCheckpointerFromContext functions use type assertions without error handling. If the context contains a value with the wrong type for these keys, the functions will silently return nil, which could lead to unexpected behavior. While this is not a direct security vulnerability, it could mask configuration errors that might have security implications.

🪰 Gadfly · advisory

🟡 **Context value type safety** _security · flagged by 1 model_ - **Context value type safety** (`run/checkpoint.go:35,49`): The `resumeStateFromContext` and `existingCheckpointerFromContext` functions use type assertions without error handling. If the context contains a value with the wrong type for these keys, the functions will silently return `nil`, which could lead to unexpected behavior. While this is not a direct security vulnerability, it could mask configuration errors that might have security implications. <sub>🪰 Gadfly · advisory</sub>
}
func resumeStateFromContext(ctx context.Context) *ResumeState {
rs, _ := ctx.Value(resumeStateKey{}).(*ResumeState)
return rs
}
type existingCheckpointerKey struct{}
// WithExistingCheckpointer carries a pre-existing Checkpointer into Run so a
// recovery re-run reuses the SAME durable record (the executor uses it instead of
// calling Ports.Checkpointer.Begin).
func WithExistingCheckpointer(ctx context.Context, cp Checkpointer) context.Context {
return context.WithValue(ctx, existingCheckpointerKey{}, cp)
}
func existingCheckpointerFromContext(ctx context.Context) Checkpointer {
cp, _ := ctx.Value(existingCheckpointerKey{}).(Checkpointer)
return cp
Review

🟡 checkpointOutcome type and constants are not exported, limiting usability outside the package

maintainability · flagged by 1 model

  • run/checkpoint.go:54-61: The checkpointOutcome type and its constants are not exported. This limits their usability outside the package and could be confusing for external consumers who might need to interact with these outcomes. Consider exporting them if they are part of the public API. - run/phases.go:67-74: The phaseDeps struct includes a checkpointer field and a resume field, but the comments and logic around these fields are not entirely clear. The comments could be i…

🪰 Gadfly · advisory

🟡 **checkpointOutcome type and constants are not exported, limiting usability outside the package** _maintainability · flagged by 1 model_ - **`run/checkpoint.go:54-61`**: The `checkpointOutcome` type and its constants are not exported. This limits their usability outside the package and could be confusing for external consumers who might need to interact with these outcomes. Consider exporting them if they are part of the public API. - **`run/phases.go:67-74`**: The `phaseDeps` struct includes a `checkpointer` field and a `resume` field, but the comments and logic around these fields are not entirely clear. The comments could be i… <sub>🪰 Gadfly · advisory</sub>
}
// checkpointOutcome is the finalize decision for a durable run.
type checkpointOutcome int
const (
checkpointComplete checkpointOutcome = iota
checkpointLeaveRunning
checkpointFail
)
// classifyCheckpointOutcome maps (run error, cancellation cause) to the durable
// finalize action: success clears the checkpoint (Complete); a shutdown-caused
// cancellation leaves the record so boot recovery picks it up (neither
// Complete nor Fail); anything else (model error, tool loop, the run's own
// deadline, a critic kill, a caller cancel) is terminal (Fail). Mirrors mort's
// agentexec.classifyCheckpointOutcome.
func classifyCheckpointOutcome(runErr, cause error) checkpointOutcome {
switch {
Review

🔴 ErrShutdown sentinel mismatch: shutdowns classified as Fail until mort wiring PR lands

correctness, error-handling · flagged by 2 models

  • run/checkpoint.go:73classifyCheckpointOutcome keys shutdown detection on run.ErrShutdown, but mort currently stamps its own runengine.ErrShutdown (a distinct sentinel) on the base context. Until the mort wiring PR aliases them, errors.Is(cause, ErrShutdown) will be false for every mort shutdown, causing checkpointFail (deletes the checkpoint) instead of checkpointLeaveRunning. The PR description acknowledges this gap; from the error-handling lens, it means the shutdown→r…

🪰 Gadfly · advisory

🔴 **ErrShutdown sentinel mismatch: shutdowns classified as Fail until mort wiring PR lands** _correctness, error-handling · flagged by 2 models_ - **`run/checkpoint.go:73`** — `classifyCheckpointOutcome` keys shutdown detection on `run.ErrShutdown`, but mort currently stamps its own `runengine.ErrShutdown` (a distinct sentinel) on the base context. Until the mort wiring PR aliases them, `errors.Is(cause, ErrShutdown)` will be `false` for every mort shutdown, causing `checkpointFail` (deletes the checkpoint) instead of `checkpointLeaveRunning`. The PR description acknowledges this gap; from the error-handling lens, it means the shutdown→r… <sub>🪰 Gadfly · advisory</sub>
case runErr == nil:
return checkpointComplete
case errors.Is(cause, ErrShutdown):
return checkpointLeaveRunning
default:
return checkpointFail
}
}
// finalizeCheckpoint applies the outcome to the per-run checkpointer (nil-safe).
// Runs on a detached context so a cancelled run still records its terminal state.
// Complete/Fail errors are best-effort but logged (a stale record would only
// cause a wasteful boot-recovery retry, not data loss).
func finalizeCheckpoint(ctx context.Context, cp Checkpointer, runErr error, cause error) {
if cp == nil {
return
}
switch classifyCheckpointOutcome(runErr, cause) {
case checkpointComplete:
if err := cp.Complete(detach(ctx)); err != nil {
slog.Warn("run: checkpoint Complete failed", "error", err)
}
case checkpointFail:
if err := cp.Fail(detach(ctx), runErr); err != nil {
slog.Warn("run: checkpoint Fail failed", "error", err)
}
case checkpointLeaveRunning:
// Interrupted by shutdown: leave the record for boot recovery.
}
}
+200
View File
@@ -0,0 +1,200 @@
package run
import (
"context"
"errors"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// fakeCheckpointer records every Save state + whether Complete/Fail fired.
type fakeCheckpointer struct {
saves []RunCheckpointState
completed bool
failed bool
failErr error
}
func (c *fakeCheckpointer) Save(_ context.Context, st RunCheckpointState) error {
c.saves = append(c.saves, st)
return nil
}
func (c *fakeCheckpointer) Complete(context.Context) error { c.completed = true; return nil }
func (c *fakeCheckpointer) Fail(_ context.Context, err error) error {
c.failed = true
c.failErr = err
return nil
}
// fakeCheckpointFactory hands out one fakeCheckpointer and records the RunInfo.
type fakeCheckpointFactory struct {
cp *fakeCheckpointer
info RunInfo
}
func (f *fakeCheckpointFactory) Begin(_ context.Context, info RunInfo) (Checkpointer, error) {
f.info = info
return f.cp, nil
}
// TestClassifyCheckpointOutcome covers the finalize decision matrix.
func TestClassifyCheckpointOutcome(t *testing.T) {
cases := []struct {
name string
err error
cause error
want checkpointOutcome
}{
{"success", nil, nil, checkpointComplete},
{"shutdown", context.Canceled, ErrShutdown, checkpointLeaveRunning},
{"critic-kill", context.Canceled, ErrCriticKill, checkpointFail},
{"deadline", context.DeadlineExceeded, context.DeadlineExceeded, checkpointFail},
{"model-error", errors.New("boom"), nil, checkpointFail},
{"caller-cancel", context.Canceled, context.Canceled, checkpointFail},
}
for _, tc := range cases {
if got := classifyCheckpointOutcome(tc.err, tc.cause); got != tc.want {
t.Errorf("%s: classifyCheckpointOutcome = %v, want %v", tc.name, got, tc.want)
}
}
}
// TestCheckpoint_SingleLoopSaveAndComplete: a durable single-loop run gets a
// per-run checkpointer (Begin), Saves its transcript each step, and Completes on
// success (clearing the checkpoint). The RunInfo carries the resume meta.
func TestCheckpoint_SingleLoopSaveAndComplete(t *testing.T) {
models, _ := phaseProvider(t, fake.Reply("done"))
cp := &fakeCheckpointer{}
f := &fakeCheckpointFactory{cp: cp}
ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: f}})
res := ex.Run(context.Background(),
RunnableAgent{ID: "a1", Name: "boss", ModelTier: "test-model"},
tool.Invocation{RunID: "run-x", CallerID: "steve", ChannelID: "chan", GuildID: "g", SkillInputs: map[string]any{"prompt": "go"}},
"go")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if f.info.RunID != "run-x" || f.info.SubjectID != "a1" || f.info.ModelTier != "test-model" || f.info.GuildID != "g" {
t.Errorf("Begin RunInfo missing resume meta: %+v", f.info)
}
if len(cp.saves) == 0 {
t.Error("expected at least one checkpoint Save during the run")
} else if len(cp.saves[len(cp.saves)-1].Messages) == 0 {
t.Error("checkpoint Save should carry the running transcript")
}
if !cp.completed {
t.Error("a successful run must Complete (clear) its checkpoint")
}
if cp.failed {
t.Error("a successful run must NOT Fail its checkpoint")
}
}
// TestCheckpoint_TerminalErrorFails: a run that errors (not shutdown) Fails its
// checkpoint (clears it — not a recovery candidate).
func TestCheckpoint_TerminalErrorFails(t *testing.T) {
models, _ := phaseProvider(t, fake.Fail(errors.New("model down")))
cp := &fakeCheckpointer{}
ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: &fakeCheckpointFactory{cp: cp}}})
res := ex.Run(context.Background(),
RunnableAgent{ID: "a1", ModelTier: "test-model"},
tool.Invocation{RunID: "r", CallerID: "c", SkillInputs: map[string]any{"prompt": "go"}}, "go")
if res.Err == nil {
t.Fatal("expected a run error")
}
if !cp.failed {
t.Error("a terminal (non-shutdown) error must Fail the checkpoint")
}
if cp.completed {
t.Error("a failed run must NOT Complete its checkpoint")
}
}
// TestCheckpoint_ResumeSeedsHistory: a run carrying a ResumeState seeds the saved
// transcript as the model's opening messages (continues) instead of the input.
func TestCheckpoint_ResumeSeedsHistory(t *testing.T) {
models, fp := phaseProvider(t, fake.Reply("continued"))
history := []llm.Message{llm.UserText("prior turn 1"), llm.AssistantText("prior answer 1")}
ctx := WithResumeState(context.Background(), &ResumeState{History: history})
ex := New(Config{Registry: tool.NewRegistry(), Models: models})
res := ex.Run(ctx,
RunnableAgent{ID: "a1", ModelTier: "test-model"},
tool.Invocation{RunID: "r", CallerID: "c", SkillInputs: map[string]any{"prompt": "ignored-on-resume"}}, "ignored-on-resume")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
got := fp.Calls()[0].Request.Messages
if len(got) != len(history) {
t.Fatalf("resume should seed the saved %d-message transcript, got %d messages", len(history), len(got))
}
}
// TestCheckpoint_PhaseBoundarySavesCompleted: a durable multi-phase run records
// the completed phases at each boundary, growing the list, and Completes on
// success.
func TestCheckpoint_PhaseBoundarySavesCompleted(t *testing.T) {
models, _ := phaseProvider(t, fake.Reply("out-a"), fake.Reply("out-b"))
cp := &fakeCheckpointer{}
ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: &fakeCheckpointFactory{cp: cp}}})
ra := RunnableAgent{
ID: "p", ModelTier: "test-model",
Phases: []Phase{{Name: "a", SystemPrompt: "A"}, {Name: "b", SystemPrompt: "B"}},
}
if res := ex.Run(context.Background(), ra, tool.Invocation{RunID: "r", CallerID: "c"}, "Q"); res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
// The final phase-boundary Save must list both completed phases.
var lastPhaseSave *RunCheckpointState
for i := range cp.saves {
if len(cp.saves[i].CompletedPhases) > 0 {
lastPhaseSave = &cp.saves[i]
}
}
if lastPhaseSave == nil || len(lastPhaseSave.CompletedPhases) != 2 {
t.Fatalf("expected a phase-boundary Save listing 2 completed phases; saves=%+v", cp.saves)
}
if !cp.completed {
t.Error("a successful phased run must Complete its checkpoint")
}
}
// TestCheckpoint_ResumeSkipsCompletedPhases: a resumed multi-phase run skips
// phases already in ResumeState.CompletedPhases (only the remaining phase calls
// the model) and threads their outputs into the remaining phase's template.
func TestCheckpoint_ResumeSkipsCompletedPhases(t *testing.T) {
models, fp := phaseProvider(t, fake.Reply("out-b")) // ONLY phase b should call the model
ctx := WithResumeState(context.Background(), &ResumeState{
CompletedPhases: []PhaseOutput{{Name: "a", Output: "saved-a"}},
})
ex := New(Config{Registry: tool.NewRegistry(), Models: models})
ra := RunnableAgent{
ID: "p", ModelTier: "test-model",
Phases: []Phase{
{Name: "a", SystemPrompt: "A"},
{Name: "b", SystemPrompt: "B saw {{.a}}"},
},
}
res := ex.Run(ctx, ra, tool.Invocation{RunID: "r", CallerID: "c"}, "Q")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if res.Output != "out-b" {
t.Fatalf("output = %q, want out-b", res.Output)
}
calls := fp.Calls()
if len(calls) != 1 {
t.Fatalf("only the un-completed phase b should call the model; got %d calls", len(calls))
}
if calls[0].Request.System != "B saw saved-a" {
t.Errorf("resumed phase b should see the completed phase a's saved output; system = %q", calls[0].Request.System)
}
}
+87 -8
View File
@@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"log/slog"
"time" "time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent" "gitea.stevedudenhoeffer.com/steve/majordomo/agent"
@@ -113,13 +114,26 @@ type Result struct {
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) { func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) {
started := time.Now() started := time.Now()
res = Result{RunID: inv.RunID} res = Result{RunID: inv.RunID}
// ckpt is the per-run durable checkpointer (resolved below; nil = non-durable).
// checkpointCause yields the run context's cancellation cause once the run
// context exists; nil before then (an early build-error return).
var ckpt Checkpointer
var checkpointCause func() error
// Enforce the no-panic contract: a panic anywhere in the run (incl. a host // Enforce the no-panic contract: a panic anywhere in the run (incl. a host
// Critic/Audit/Palette callback on the main goroutine) becomes Result.Err // Critic/Audit/Palette callback on the main goroutine) becomes Result.Err
// rather than unwinding into the caller. // rather than unwinding into the caller. This defer ALSO finalizes the
// checkpoint on EVERY exit path — panic, an early build-error return (before
// the run loop), or normal completion — so a recovered run's durable record is
// never left dangling (which would loop boot-recovery on a persistent error).
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
res.Err = fmt.Errorf("run.Executor: recovered panic: %v", r) res.Err = fmt.Errorf("run.Executor: recovered panic: %v", r)
} }
var cause error
if checkpointCause != nil {
cause = checkpointCause()
}
finalizeCheckpoint(ctx, ckpt, res.Err, cause)
}() }()
tier := ra.ModelTier tier := ra.ModelTier
@@ -165,7 +179,9 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
Name: ra.Name, Name: ra.Name,
CallerID: inv.CallerID, CallerID: inv.CallerID,
ChannelID: inv.ChannelID, ChannelID: inv.ChannelID,
GuildID: inv.GuildID,
ParentRunID: inv.ParentRunID, ParentRunID: inv.ParentRunID,
ModelTier: tier,
Inputs: inv.SkillInputs, Inputs: inv.SkillInputs,
StartedAt: started, StartedAt: started,
MaxIterations: maxIter, MaxIterations: maxIter,
@@ -180,6 +196,25 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
inv.RunState = stateAcc inv.RunState = stateAcc
} }
// Durable recovery (optional): a recovered run carries a ResumeState (prior
// transcript / completed phases) + an existing Checkpointer in ctx so it
// continues on the SAME durable record; a fresh run mints a per-run
// Checkpointer via the factory (which decides durability — nil = non-durable).
// nil-safe throughout.
resume := resumeStateFromContext(ctx)
ckpt = existingCheckpointerFromContext(ctx)
if ckpt == nil && e.cfg.Ports.Checkpointer != nil {
c, cerr := e.cfg.Ports.Checkpointer.Begin(ctx, info)
if cerr != nil {
// Degrade to non-durable (the documented contract) but log it — a
// failing checkpoint store must not fail the run, yet shouldn't be silent.
slog.Warn("run: checkpointer Begin failed; running non-durable",
"run_id", inv.RunID, "error", cerr)
} else {
ckpt = c
}
}
// Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal // Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal
// messages into the running conversation before its next step. Created BEFORE // messages into the running conversation before its next step. Created BEFORE
// the toolbox build so any tool's handler captures the live AttachImages seam. // the toolbox build so any tool's handler captures the live AttachImages seam.
@@ -247,6 +282,9 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
defer cancelCause(nil) defer cancelCause(nil)
runCtx, mergeCancel := MergeCancellation(runCtx, ctx) runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
defer mergeCancel() defer mergeCancel()
// The finalize defer (top of Run) now has a run context to read the
// cancellation cause from (shutdown vs critic-kill vs deadline vs cancel).
checkpointCause = func() error { return context.Cause(runCtx) }
// Critic (optional): monitors the run for a stall, can nudge/extend/kill via // Critic (optional): monitors the run for a stall, can nudge/extend/kill via
// its host Escalator. Its hard deadline is bound to runCtx (cancel on pass). // its host Escalator. Its hard deadline is bound to runCtx (cancel on pass).
@@ -289,11 +327,11 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
} }
// Shared agent options used by BOTH the single-loop path and every phase: the // Shared agent options used by BOTH the single-loop path and every phase: the
// tool-error guards, the step observer, and optional compaction. The toolbox + // tool-error guards and optional compaction. The toolbox, step ceiling, AND
// step ceiling are NOT shared (they vary per phase), so they're added per path. // step observer are added per path (the observer is wrapped for checkpointing,
// which differs single-loop vs per-phase).
sharedOpts := []agent.Option{ sharedOpts := []agent.Option{
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats), agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
agent.WithStepObserver(stepObserver),
} }
if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil { if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
if threshold := e.compactionThreshold(tier); threshold > 0 { if threshold := e.compactionThreshold(tier); threshold > 0 {
@@ -324,24 +362,60 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
// the critic's nudges before each step. // the critic's nudges before each step.
steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) } steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) }
resuming := resume != nil && len(resume.History) > 0
var runRes *agent.Result var runRes *agent.Result
var runErr error var runErr error
if len(ra.Phases) == 0 { if len(ra.Phases) == 0 {
// Single-loop run: the agent's base prompt + full toolbox, with the // Single-loop run: the agent's base prompt + full toolbox, with the
// critic's DYNAMIC step ceiling (WithMaxStepsFunc, so it can raise a // critic's DYNAMIC step ceiling (WithMaxStepsFunc, so it can raise a
// healthy-but-long run's budget mid-flight; falls back to maxIter). // healthy-but-long run's budget mid-flight; falls back to maxIter).
//
// Checkpointing: wrap the step observer to accumulate the running transcript
// and Save it each step. Save is called every step; THROTTLING is the
// Checkpointer's responsibility (the battery + mort's durable-job adapter
// both throttle + size-cap), so the kernel doesn't gate the hot path. The
// accumulated transcript is the pre-compaction one (the observer sees raw
// step responses, not the loop's compacted history) — a host that caps size
// bounds it. A recovered run seeds the saved transcript and continues.
obs := stepObserver
if ckpt != nil {
var acc []llm.Message
if resuming {
acc = append([]llm.Message(nil), resume.History...)
} else {
acc = []llm.Message{multimodalUserMessage(input, inv.Images)}
}
obs = func(s agent.Step) {
stepObserver(s)
if s.Response != nil {
acc = append(acc, s.Response.Message())
}
if len(s.Results) > 0 {
acc = append(acc, llm.ToolResultsMessage(s.Results...))
}
_ = ckpt.Save(runCtx, RunCheckpointState{Messages: acc, Iteration: s.Index + 1})
}
}
opts := append([]agent.Option{ opts := append([]agent.Option{
agent.WithToolbox(toolbox), agent.WithToolbox(toolbox),
critic.maxStepsOption(maxIter), critic.maxStepsOption(maxIter),
agent.WithStepObserver(obs),
}, sharedOpts...) }, sharedOpts...)
ag := agent.New(model, e.systemPrompt(ra), opts...) ag := agent.New(model, e.systemPrompt(ra), opts...)
if resuming {
// Resume: seed the saved transcript and continue (no new input — the
// completed tool calls in the transcript are NOT re-run).
runRes, runErr = ag.Run(runCtx, "", agent.WithSteer(steer), agent.WithHistory(resume.History))
} else {
runRes, runErr = runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer)) runRes, runErr = runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer))
}
} else { } else {
// Multi-phase pipeline: each phase runs its own prompt/tier/tools/step-cap // Multi-phase pipeline: each phase runs its own prompt/tier/tools/step-cap
// sequentially, threading outputs through {{.<PhaseName>}} templates. Reuses // sequentially, threading outputs through {{.<PhaseName>}} templates. The
// the shared opts so audit/steps/critic-steer accumulate across every phase. // shared step observer (audit/steps/critic) is wired per phase by the phase
// (Per-phase step caps are fixed — the critic's dynamic ceiling is not // runner; checkpointing is phase-boundary granular (completed phases are
// propagated to phases — but its steer + hard deadline still apply.) // recorded so a resumed run skips them).
runRes, runErr = e.runPhases(runCtx, ra, phaseDeps{ runRes, runErr = e.runPhases(runCtx, ra, phaseDeps{
baseModel: model, baseModel: model,
baseToolbox: toolbox, baseToolbox: toolbox,
@@ -350,9 +424,14 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
stepObserver: stepObserver, stepObserver: stepObserver,
steer: steer, steer: steer,
rec: rec, rec: rec,
checkpointer: ckpt,
resume: resume,
}, input, inv.Images) }, input, inv.Images)
} }
// Durable-recovery finalize (Complete/Fail/leave-running) happens in the
// top-of-Run defer so it covers panics + early build-error returns too.
status := statusFor(runCtx, runErr) status := statusFor(runCtx, runErr)
if runRes != nil { if runRes != nil {
res.Output = runRes.Output res.Output = runRes.Output
+46 -5
View File
@@ -53,9 +53,10 @@ import (
// phaseDeps carries the per-run state the phase runner shares with Run: the base // phaseDeps carries the per-run state the phase runner shares with Run: the base
// model, the full decorated toolbox (filtered per phase), the base step cap, the // model, the full decorated toolbox (filtered per phase), the base step cap, the
// shared agent options (tool-error limits + step observer + compactor), the // shared agent options (tool-error limits + compactor — the step observer is
// shared step observer (also fed by IsRunFunc bare calls), the critic/session // added per phase, NOT in sharedOpts, so checkpointing can vary per path), the
// steer, and the audit recorder (phase events). // shared step observer (wired into each phase's loop AND invoked for IsRunFunc
// bare calls), the critic/session steer, and the audit recorder (phase events).
type phaseDeps struct { type phaseDeps struct {
baseModel llm.Model baseModel llm.Model
baseToolbox *llm.Toolbox baseToolbox *llm.Toolbox
@@ -64,6 +65,13 @@ type phaseDeps struct {
stepObserver func(agent.Step) stepObserver func(agent.Step)
steer func() []llm.Message steer func() []llm.Message
rec RunRecorder rec RunRecorder
// checkpointer records phase-boundary progress (completed phases) for durable
// recovery; nil = non-durable. resume carries a recovered run's completed
// phases so they are skipped on re-run. Phase recovery is boundary-granular:
// the interrupted (active) phase re-runs from its start (its mid-phase
// transcript is NOT resumed — only the single-loop path resumes mid-loop).
checkpointer Checkpointer
resume *ResumeState
} }
// runPhases executes ra.Phases sequentially and returns a synthetic agent.Result // runPhases executes ra.Phases sequentially and returns a synthetic agent.Result
@@ -73,10 +81,28 @@ type phaseDeps struct {
// deadline/critic-kill — returns the error. // deadline/critic-kill — returns the error.
func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phaseDeps, query string, images []llm.ImagePart) (*agent.Result, error) { func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phaseDeps, query string, images []llm.ImagePart) (*agent.Result, error) {
outputs := make(map[string]string, len(ra.Phases)) outputs := make(map[string]string, len(ra.Phases))
var completed []PhaseOutput
var lastResult *agent.Result var lastResult *agent.Result
var lastOutput string var lastOutput string
var totalUsage llm.Usage var totalUsage llm.Usage
// resumeSkip is the set of phases already finished on a RECOVERED run — kept
// SEPARATE from the live `outputs` map (which fills as phases run this time) so
// the skip guard only skips RESUME-completed phases, never a fresh run's own
// phases. (Reusing `outputs` would make a second phase with a duplicate name
// skip itself.) Pre-populate outputs + completed so a resumed run threads the
// saved outputs into later phases. The interrupted (active) phase is NOT
// pre-populated, so it re-runs from its start (boundary-granular recovery).
resumeSkip := map[string]bool{}
if deps.resume != nil {
for _, pc := range deps.resume.CompletedPhases {
outputs[pc.Name] = pc.Output
resumeSkip[pc.Name] = true
completed = append(completed, pc)
lastOutput = pc.Output
}
}
// finish stamps the aggregated usage + final output onto the synthetic result. // finish stamps the aggregated usage + final output onto the synthetic result.
finish := func(err error) (*agent.Result, error) { finish := func(err error) (*agent.Result, error) {
if lastResult == nil { if lastResult == nil {
@@ -90,6 +116,10 @@ func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phas
} }
for i, phase := range ra.Phases { for i, phase := range ra.Phases {
// Skip phases already completed on a resumed run.
if resumeSkip[phase.Name] {
continue
}
// A killed/timed-out/cancelled run must not start its next phase. // A killed/timed-out/cancelled run must not start its next phase.
if err := runCtx.Err(); err != nil { if err := runCtx.Err(); err != nil {
return finish(err) return finish(err)
@@ -151,6 +181,15 @@ func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phas
outputs[phase.Name] = output outputs[phase.Name] = output
lastOutput = output lastOutput = output
// Checkpoint the phase boundary: this phase is done, so a resumed run skips
Review

🔴 Phase-boundary checkpoint Save error silently swallowed

correctness, error-handling, maintainability, performance · flagged by 5 models

  • run/phases.go:184 — Phase-boundary checkpoint Save error silently swallowed: The phase-boundary save discards the error with _ =. If the checkpoint store fails mid-run (disk full, DB locked, network blip), the run continues but checkpoints aren't persisted. On shutdown/recovery, the host will resume from a stale checkpoint (or none), potentially re-executing work already done or losing progress. Verified by reading run/phases.go:183-188. Suggested fix: Log the error (at minim…

🪰 Gadfly · advisory

🔴 **Phase-boundary checkpoint Save error silently swallowed** _correctness, error-handling, maintainability, performance · flagged by 5 models_ - **`run/phases.go:184` — Phase-boundary checkpoint `Save` error silently swallowed**: The phase-boundary save discards the error with `_ =`. If the checkpoint store fails mid-run (disk full, DB locked, network blip), the run continues but checkpoints aren't persisted. On shutdown/recovery, the host will resume from a stale checkpoint (or none), potentially re-executing work already done or losing progress. **Verified** by reading `run/phases.go:183-188`. *Suggested fix*: Log the error (at minim… <sub>🪰 Gadfly · advisory</sub>
// it and continues from the next. (Copy the slice — the checkpointer may
// hold/serialize it asynchronously.)
completed = append(completed, PhaseOutput{Name: phase.Name, Output: output})
if deps.checkpointer != nil {
_ = deps.checkpointer.Save(runCtx, RunCheckpointState{
CompletedPhases: append([]PhaseOutput(nil), completed...),
})
}
} }
return finish(nil) return finish(nil)
@@ -192,11 +231,13 @@ func (e *Executor) runOnePhase(runCtx context.Context, ra RunnableAgent, deps ph
maxIter = deps.baseMaxIter maxIter = deps.baseMaxIter
} }
// Per-phase opts: a fixed step ceiling for this phase (the critic's dynamic // Per-phase opts: a fixed step ceiling for this phase (the critic's dynamic
// ceiling is intentionally not propagated to phases) + the phase toolbox, on // ceiling is intentionally not propagated to phases) + the phase toolbox + the
// top of the shared opts (tool-error limits, step observer, compactor). // shared step observer (audit/steps/critic), on top of the shared opts
// (tool-error limits, compactor).
opts := append([]agent.Option{ opts := append([]agent.Option{
agent.WithToolbox(toolbox), agent.WithToolbox(toolbox),
agent.WithMaxSteps(maxIter), agent.WithMaxSteps(maxIter),
agent.WithStepObserver(deps.stepObserver),
}, deps.sharedOpts...) }, deps.sharedOpts...)
ag := agent.New(model, system, opts...) ag := agent.New(model, system, opts...)
+20
View File
@@ -130,6 +130,26 @@ func TestPhases_OptionalDoesNotSwallowCancellation(t *testing.T) {
} }
} }
// TestPhases_DuplicateNamesBothRun: a fresh (non-resume) run with two phases
// sharing a name must run BOTH — the resume-skip guard keys off a separate
// resume set, not the live outputs map (which fills as phases run), so a phase
// never skips a same-named sibling on a fresh run.
func TestPhases_DuplicateNamesBothRun(t *testing.T) {
models, fp := phaseProvider(t, fake.Reply("first"), fake.Reply("second"))
ex := New(Config{Registry: tool.NewRegistry(), Models: models})
ra := RunnableAgent{
Name: "p", ModelTier: "test-model",
Phases: []Phase{{Name: "x", SystemPrompt: "P1"}, {Name: "x", SystemPrompt: "P2"}},
}
res := ex.Run(context.Background(), ra, tool.Invocation{RunID: "r"}, "Q")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if n := len(fp.Calls()); n != 2 {
t.Fatalf("both same-named phases must run on a fresh run; got %d model calls", n)
}
}
// TestPhases_HardErrorAborts: a NON-optional phase that hits a hard error (not a // TestPhases_HardErrorAborts: a NON-optional phase that hits a hard error (not a
// budget/step exhaustion) aborts the pipeline; later phases do not run. // budget/step exhaustion) aborts the pipeline; later phases do not run.
func TestPhases_HardErrorAborts(t *testing.T) { func TestPhases_HardErrorAborts(t *testing.T) {
+31 -5
View File
@@ -33,9 +33,10 @@ type Ports struct {
Budget Budget Budget Budget
// Critic optionally monitors a long run for hangs/runaways. nil = none. // Critic optionally monitors a long run for hangs/runaways. nil = none.
Critic Critic Critic Critic
// Checkpointer persists resumable progress for durable recovery. nil = no // Checkpointer mints a per-run Checkpointer for durable recovery (it decides
// checkpointing (a run interrupted by shutdown is simply lost). // per run whether the run is durable). nil = no checkpointing (a run
Checkpointer Checkpointer // interrupted by shutdown is simply lost).
Checkpointer CheckpointerFactory
// Palette resolves SkillPalette / SubAgentPalette entries into delegation // Palette resolves SkillPalette / SubAgentPalette entries into delegation
// tools (skill__<name> / agent__<name>). nil = those entries are inert. // tools (skill__<name> / agent__<name>). nil = those entries are inert.
Palette PaletteSource Palette PaletteSource
@@ -66,7 +67,9 @@ type RunInfo struct {
Name string Name string
CallerID string CallerID string
ChannelID string ChannelID string
GuildID string // the originating guild/server id (empty for DMs/triggers)
ParentRunID string ParentRunID string
ModelTier string // the run's resolved base tier (for checkpoint re-dispatch)
Inputs map[string]any Inputs map[string]any
StartedAt time.Time StartedAt time.Time
// MaxIterations is the run's base tool-dispatch step ceiling, so a critic can // MaxIterations is the run's base tool-dispatch step ceiling, so a critic can
@@ -172,6 +175,16 @@ type CriticHandle interface {
// --- Checkpointer --- // --- Checkpointer ---
// CheckpointerFactory decides, per run, whether the run is durable and (if so)
// mints the per-run Checkpointer that records its progress. It returns (nil, nil)
// for a non-durable run (the common short-run case — no checkpointing overhead).
// A storage error should be logged and degraded to (nil, nil) so a failing
// checkpoint store never fails the run. Mirrors mort's
// agentexec.CheckpointerFactory.
type CheckpointerFactory interface {
Begin(ctx context.Context, info RunInfo) (Checkpointer, error)
}
// Checkpointer persists a run's resumable progress for durable recovery. // Checkpointer persists a run's resumable progress for durable recovery.
// Mirrors mort's agentexec.RunCheckpointer. // Mirrors mort's agentexec.RunCheckpointer.
type Checkpointer interface { type Checkpointer interface {
@@ -184,11 +197,24 @@ type Checkpointer interface {
Fail(ctx context.Context, err error) error Fail(ctx context.Context, err error) error
} }
// RunCheckpointState is the resumable snapshot a Checkpointer persists. Kept // RunCheckpointState is the resumable snapshot a Checkpointer persists.
// minimal here; the executor extends what it records during the merge.
type RunCheckpointState struct { type RunCheckpointState struct {
// Messages is the running transcript of a SINGLE-LOOP run (grows each step;
// resumed via WithHistory). nil for multi-phase runs — phase recovery is
// boundary-granular (see CompletedPhases), not mid-phase transcript.
Messages []llm.Message Messages []llm.Message
Iteration int Iteration int
// CompletedPhases is set only for multi-phase runs: the outputs of phases
// already finished, in phase order, so a resumed run skips them and re-runs
// the interrupted phase from its start. nil for single-loop runs.
CompletedPhases []PhaseOutput
Review

🟡 RunCheckpointState.ActivePhase is always empty string — field exists but is never populated with meaningful data

maintainability · flagged by 1 model

  1. run/ports.go:210 / run/phases.go:186RunCheckpointState.ActivePhase is defined and persisted, but the only write in the entire codebase is ActivePhase: "" at run/phases.go:186. It is never set to a non-empty value.

🪰 Gadfly · advisory

🟡 **RunCheckpointState.ActivePhase is always empty string — field exists but is never populated with meaningful data** _maintainability · flagged by 1 model_ 2. **`run/ports.go:210` / `run/phases.go:186`** — `RunCheckpointState.ActivePhase` is defined and persisted, but the only write in the entire codebase is `ActivePhase: ""` at `run/phases.go:186`. It is never set to a non-empty value. <sub>🪰 Gadfly · advisory</sub>
}
// PhaseOutput is one completed pipeline phase's name and output text, recorded in
// a checkpoint so a resumed multi-phase run can skip already-finished phases.
type PhaseOutput struct {
Name string
Output string
} }
// --- PaletteSource --- // --- PaletteSource ---