feat(run): durable checkpoint + resume (wire Ports.Checkpointer) #20
@@ -4,9 +4,9 @@
|
||||
// run.Ports.Checkpointer.
|
||||
//
|
||||
// Mort backs CheckpointStore with its durable-job table; Memory() is the
|
||||
// zero-dependency default; contrib/store can add a SQLite one. NOTE: the
|
||||
// executor's call into run.Ports.Checkpointer is a P2 follow-up — this battery
|
||||
// provides the seam + impls ahead of that wiring.
|
||||
// zero-dependency default; contrib/store can add a SQLite one. The executor calls
|
||||
// run.Ports.Checkpointer (a CheckpointerFactory) during the run loop; NewFactory
|
||||
// wires this battery into that seam.
|
||||
package checkpoint
|
||||
|
||||
import (
|
||||
@@ -14,6 +14,8 @@ import (
|
||||
"time"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||
)
|
||||
|
||||
// RunCheckpointMeta is the run attribution needed to resume a run from scratch
|
||||
@@ -33,9 +35,9 @@ type RunCheckpointMeta struct {
|
||||
// RunCheckpoint is one persisted snapshot of a run's resumable progress.
|
||||
type RunCheckpoint struct {
|
||||
Meta RunCheckpointMeta
|
||||
Messages []llm.Message // conversation so far
|
||||
Messages []llm.Message // conversation so far (single-loop runs)
|
||||
Iteration int // completed agent-loop iterations
|
||||
ActivePhase string // current phase name (multi-phase agents); "" otherwise
|
||||
CompletedPhases []run.PhaseOutput // finished phases, in order (multi-phase agents)
|
||||
UpdatedAt time.Time
|
||||
}
|
||||
|
||||
|
||||
@@ -57,6 +57,7 @@ func (h *handle) Save(ctx context.Context, st run.RunCheckpointState) error {
|
||||
Meta: h.meta,
|
||||
Messages: st.Messages,
|
||||
Iteration: st.Iteration,
|
||||
CompletedPhases: st.CompletedPhases,
|
||||
UpdatedAt: now,
|
||||
}); err != nil {
|
||||
return err
|
||||
@@ -81,3 +82,40 @@ var _ run.Checkpointer = noop{}
|
||||
func (noop) Save(context.Context, run.RunCheckpointState) error { return nil }
|
||||
func (noop) Complete(context.Context) error { return nil }
|
||||
func (noop) Fail(context.Context, error) error { return nil }
|
||||
|
||||
// factory is a run.CheckpointerFactory that mints a per-run handle over store,
|
||||
// deriving the per-run meta from the kernel's RunInfo. It is the battery's glue
|
||||
// for the Ports.Checkpointer (factory) seam: every run becomes durable (the
|
||||
// store persists snapshots; a host wanting lazy/short-run skipping uses its own
|
||||
// factory, as mort does over its durable-job table).
|
||||
|
|
||||
type factory struct {
|
||||
store CheckpointStore
|
||||
throttle time.Duration
|
||||
}
|
||||
|
||||
|
gitea-actions
commented
⚪ factory.now field is never assigned (NewFactory omits it) — dead/always-nil clock field maintainability · flagged by 1 model
🪰 Gadfly · advisory ⚪ **factory.now field is never assigned (NewFactory omits it) — dead/always-nil clock field**
_maintainability · flagged by 1 model_
- **`checkpoint/handle.go:95` — `factory.now` is dead.** The `factory` struct carries a `now func() time.Time` field, but `NewFactory` never sets it (handle.go:103-105), so `f.now` is always nil and `Begin` always passes nil to `New` (handle.go:122), which then defaults to `time.Now`. Unlike `handle`, there's no constructor/option that injects a clock, so the field can never be anything but nil. Either drop it, or add the injection path it implies (mirroring `handle`'s testable clock).
<sub>🪰 Gadfly · advisory</sub>
|
||||
var _ run.CheckpointerFactory = (*factory)(nil)
|
||||
|
||||
// NewFactory returns a run.CheckpointerFactory backed by store: each run gets a
|
||||
// per-run Checkpointer (throttled to at most once per throttle). A nil store
|
||||
// yields factory.Begin returning a no-op Checkpointer.
|
||||
func NewFactory(store CheckpointStore, throttle time.Duration) run.CheckpointerFactory {
|
||||
return &factory{store: store, throttle: throttle}
|
||||
}
|
||||
|
gitea-actions
commented
🟡 NewFactory never initializes factory.now, leaving a configurable-looking but always-nil field maintainability · flagged by 1 model
🪰 Gadfly · advisory 🟡 **NewFactory never initializes factory.now, leaving a configurable-looking but always-nil field**
_maintainability · flagged by 1 model_
4. **NewFactory never initializes factory.now** — confirmed at checkpoint/handle.go:103-104; `f.now` stays nil, passed to `New` which falls back to `time.Now`. Confirmed.
<sub>🪰 Gadfly · advisory</sub>
|
||||
|
||||
// Begin mints the per-run Checkpointer. The prompt is read from
|
||||
// info.Inputs["prompt"] when present so a recovered run can re-dispatch.
|
||||
func (f *factory) Begin(_ context.Context, info run.RunInfo) (run.Checkpointer, error) {
|
||||
|
gitea-actions
commented
🟡 Potential information leakage in checkpoint metadata maintainability, security · flagged by 2 models
🪰 Gadfly · advisory 🟡 **Potential information leakage in checkpoint metadata**
_maintainability, security · flagged by 2 models_
- **Potential information leakage in checkpoint metadata** (`checkpoint/handle.go:109-121`): The `Begin` method extracts the prompt from `info.Inputs["prompt"]` without validation or sanitization. If the prompt contains sensitive information (e.g., API keys, PII), it will be stored in the checkpoint metadata. This could lead to unintended exposure if the checkpoint store is compromised or accessed by unauthorized parties. Consider redacting sensitive data or providing an allowlist/denylist mecha…
<sub>🪰 Gadfly · advisory</sub>
|
||||
prompt, _ := info.Inputs["prompt"].(string)
|
||||
meta := RunCheckpointMeta{
|
||||
RunID: info.RunID,
|
||||
AgentID: info.SubjectID,
|
||||
AgentName: info.Name,
|
||||
CallerID: info.CallerID,
|
||||
ChannelID: info.ChannelID,
|
||||
GuildID: info.GuildID,
|
||||
Prompt: prompt,
|
||||
ModelTier: info.ModelTier,
|
||||
ParentRunID: info.ParentRunID,
|
||||
}
|
||||
return New(f.store, meta, f.throttle, nil /* now defaults to time.Now */), nil
|
||||
}
|
||||
|
||||
@@ -0,0 +1,103 @@
|
||||
package run
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
)
|
||||
|
||||
// Durable-recovery plumbing for the executor. The Checkpointer port (set via
|
||||
// Ports.Checkpointer, a CheckpointerFactory) persists a run's resumable progress
|
||||
// during the loop; on boot a host re-dispatches an interrupted run through the
|
||||
// executor with a ResumeState (the saved transcript / completed phases) so it
|
||||
// CONTINUES rather than restarting, reusing the SAME durable record via an
|
||||
// existing Checkpointer. Both are carried into Run via the context (mirrors
|
||||
// mort's agentexec.WithResumeState / WithExistingCheckpointer).
|
||||
|
||||
// ResumeState carries a recovered run's prior progress into Run so the run
|
||||
// continues instead of restarting. The host's recovery path sets it via
|
||||
// WithResumeState; the executor reads it:
|
||||
// - single-loop: History seeds the saved transcript (the run continues).
|
||||
// - multi-phase: CompletedPhases are skipped; the interrupted phase re-runs
|
||||
|
gitea-actions
commented
🟠 ActivePhase field plumbed through 3 structs but never read/set; doc comments claim active-phase seeding that isn't implemented correctness, maintainability · flagged by 5 models
🪰 Gadfly · advisory 🟠 **ActivePhase field plumbed through 3 structs but never read/set; doc comments claim active-phase seeding that isn't implemented**
_correctness, maintainability · flagged by 5 models_
- **`ActivePhase` is dead weight, and its docs overpromise — `run/checkpoint.go:25`, `run/ports.go:210`, `checkpoint/checkpoint.go:41`.** The field is plumbed through three structs (`ResumeState`, `RunCheckpointState`, `RunCheckpoint`) but the kernel never reads it and never writes a non-empty value to it. Every write site is either the literal `ActivePhase: ""` (`run/phases.go:186`) or a pass-through copy (`checkpoint/handle.go:61`), and there is no read site anywhere — the single-loop `Save` (…
<sub>🪰 Gadfly · advisory</sub>
|
||||
// from its start (boundary-granular — there is no mid-phase transcript
|
||||
// resume, so History is unused for multi-phase runs).
|
||||
type ResumeState struct {
|
||||
History []llm.Message // single-loop transcript (unused for multi-phase)
|
||||
CompletedPhases []PhaseOutput // multi-phase: outputs of finished phases, in order
|
||||
}
|
||||
|
||||
type resumeStateKey struct{}
|
||||
|
||||
// WithResumeState carries a recovered run's prior progress into Run.
|
||||
func WithResumeState(ctx context.Context, rs *ResumeState) context.Context {
|
||||
return context.WithValue(ctx, resumeStateKey{}, rs)
|
||||
|
gitea-actions
commented
🟡 Context value type safety security · flagged by 1 model
🪰 Gadfly · advisory 🟡 **Context value type safety**
_security · flagged by 1 model_
- **Context value type safety** (`run/checkpoint.go:35,49`): The `resumeStateFromContext` and `existingCheckpointerFromContext` functions use type assertions without error handling. If the context contains a value with the wrong type for these keys, the functions will silently return `nil`, which could lead to unexpected behavior. While this is not a direct security vulnerability, it could mask configuration errors that might have security implications.
<sub>🪰 Gadfly · advisory</sub>
|
||||
}
|
||||
|
||||
func resumeStateFromContext(ctx context.Context) *ResumeState {
|
||||
rs, _ := ctx.Value(resumeStateKey{}).(*ResumeState)
|
||||
return rs
|
||||
}
|
||||
|
||||
type existingCheckpointerKey struct{}
|
||||
|
||||
// WithExistingCheckpointer carries a pre-existing Checkpointer into Run so a
|
||||
// recovery re-run reuses the SAME durable record (the executor uses it instead of
|
||||
// calling Ports.Checkpointer.Begin).
|
||||
func WithExistingCheckpointer(ctx context.Context, cp Checkpointer) context.Context {
|
||||
return context.WithValue(ctx, existingCheckpointerKey{}, cp)
|
||||
}
|
||||
|
||||
func existingCheckpointerFromContext(ctx context.Context) Checkpointer {
|
||||
cp, _ := ctx.Value(existingCheckpointerKey{}).(Checkpointer)
|
||||
return cp
|
||||
|
gitea-actions
commented
🟡 checkpointOutcome type and constants are not exported, limiting usability outside the package maintainability · flagged by 1 model
🪰 Gadfly · advisory 🟡 **checkpointOutcome type and constants are not exported, limiting usability outside the package**
_maintainability · flagged by 1 model_
- **`run/checkpoint.go:54-61`**: The `checkpointOutcome` type and its constants are not exported. This limits their usability outside the package and could be confusing for external consumers who might need to interact with these outcomes. Consider exporting them if they are part of the public API. - **`run/phases.go:67-74`**: The `phaseDeps` struct includes a `checkpointer` field and a `resume` field, but the comments and logic around these fields are not entirely clear. The comments could be i…
<sub>🪰 Gadfly · advisory</sub>
|
||||
}
|
||||
|
||||
// checkpointOutcome is the finalize decision for a durable run.
|
||||
type checkpointOutcome int
|
||||
|
||||
const (
|
||||
checkpointComplete checkpointOutcome = iota
|
||||
checkpointLeaveRunning
|
||||
checkpointFail
|
||||
)
|
||||
|
||||
// classifyCheckpointOutcome maps (run error, cancellation cause) to the durable
|
||||
// finalize action: success clears the checkpoint (Complete); a shutdown-caused
|
||||
// cancellation leaves the record so boot recovery picks it up (neither
|
||||
// Complete nor Fail); anything else (model error, tool loop, the run's own
|
||||
// deadline, a critic kill, a caller cancel) is terminal (Fail). Mirrors mort's
|
||||
// agentexec.classifyCheckpointOutcome.
|
||||
func classifyCheckpointOutcome(runErr, cause error) checkpointOutcome {
|
||||
switch {
|
||||
|
gitea-actions
commented
🔴 ErrShutdown sentinel mismatch: shutdowns classified as Fail until mort wiring PR lands correctness, error-handling · flagged by 2 models
🪰 Gadfly · advisory 🔴 **ErrShutdown sentinel mismatch: shutdowns classified as Fail until mort wiring PR lands**
_correctness, error-handling · flagged by 2 models_
- **`run/checkpoint.go:73`** — `classifyCheckpointOutcome` keys shutdown detection on `run.ErrShutdown`, but mort currently stamps its own `runengine.ErrShutdown` (a distinct sentinel) on the base context. Until the mort wiring PR aliases them, `errors.Is(cause, ErrShutdown)` will be `false` for every mort shutdown, causing `checkpointFail` (deletes the checkpoint) instead of `checkpointLeaveRunning`. The PR description acknowledges this gap; from the error-handling lens, it means the shutdown→r…
<sub>🪰 Gadfly · advisory</sub>
|
||||
case runErr == nil:
|
||||
return checkpointComplete
|
||||
case errors.Is(cause, ErrShutdown):
|
||||
return checkpointLeaveRunning
|
||||
default:
|
||||
return checkpointFail
|
||||
}
|
||||
}
|
||||
|
||||
// finalizeCheckpoint applies the outcome to the per-run checkpointer (nil-safe).
|
||||
// Runs on a detached context so a cancelled run still records its terminal state.
|
||||
// Complete/Fail errors are best-effort but logged (a stale record would only
|
||||
// cause a wasteful boot-recovery retry, not data loss).
|
||||
|
gitea-actions
commented
🔴 finalizeCheckpoint silently ignores Complete/Fail errors leaving stale recovery records error-handling, security · flagged by 4 models
🪰 Gadfly · advisory 🔴 **finalizeCheckpoint silently ignores Complete/Fail errors leaving stale recovery records**
_error-handling, security · flagged by 4 models_
- **`run/checkpoint.go:88-90`** — `finalizeCheckpoint` swallows errors from `cp.Complete` and `cp.Fail`. If the store fails to delete a finished run's checkpoint (network blip, timeout), the record survives and the next boot will incorrectly attempt to resume a run that already terminated. This creates false recovery candidates.
<sub>🪰 Gadfly · advisory</sub>
|
||||
func finalizeCheckpoint(ctx context.Context, cp Checkpointer, runErr error, cause error) {
|
||||
if cp == nil {
|
||||
return
|
||||
}
|
||||
switch classifyCheckpointOutcome(runErr, cause) {
|
||||
case checkpointComplete:
|
||||
if err := cp.Complete(detach(ctx)); err != nil {
|
||||
slog.Warn("run: checkpoint Complete failed", "error", err)
|
||||
}
|
||||
case checkpointFail:
|
||||
if err := cp.Fail(detach(ctx), runErr); err != nil {
|
||||
slog.Warn("run: checkpoint Fail failed", "error", err)
|
||||
}
|
||||
case checkpointLeaveRunning:
|
||||
// Interrupted by shutdown: leave the record for boot recovery.
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,200 @@
|
||||
package run
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||
)
|
||||
|
||||
// fakeCheckpointer records every Save state + whether Complete/Fail fired.
|
||||
type fakeCheckpointer struct {
|
||||
saves []RunCheckpointState
|
||||
completed bool
|
||||
failed bool
|
||||
failErr error
|
||||
}
|
||||
|
||||
func (c *fakeCheckpointer) Save(_ context.Context, st RunCheckpointState) error {
|
||||
c.saves = append(c.saves, st)
|
||||
return nil
|
||||
}
|
||||
func (c *fakeCheckpointer) Complete(context.Context) error { c.completed = true; return nil }
|
||||
func (c *fakeCheckpointer) Fail(_ context.Context, err error) error {
|
||||
c.failed = true
|
||||
c.failErr = err
|
||||
return nil
|
||||
}
|
||||
|
||||
// fakeCheckpointFactory hands out one fakeCheckpointer and records the RunInfo.
|
||||
type fakeCheckpointFactory struct {
|
||||
cp *fakeCheckpointer
|
||||
info RunInfo
|
||||
}
|
||||
|
||||
func (f *fakeCheckpointFactory) Begin(_ context.Context, info RunInfo) (Checkpointer, error) {
|
||||
f.info = info
|
||||
return f.cp, nil
|
||||
}
|
||||
|
||||
// TestClassifyCheckpointOutcome covers the finalize decision matrix.
|
||||
func TestClassifyCheckpointOutcome(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
err error
|
||||
cause error
|
||||
want checkpointOutcome
|
||||
}{
|
||||
{"success", nil, nil, checkpointComplete},
|
||||
{"shutdown", context.Canceled, ErrShutdown, checkpointLeaveRunning},
|
||||
{"critic-kill", context.Canceled, ErrCriticKill, checkpointFail},
|
||||
{"deadline", context.DeadlineExceeded, context.DeadlineExceeded, checkpointFail},
|
||||
{"model-error", errors.New("boom"), nil, checkpointFail},
|
||||
{"caller-cancel", context.Canceled, context.Canceled, checkpointFail},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
if got := classifyCheckpointOutcome(tc.err, tc.cause); got != tc.want {
|
||||
t.Errorf("%s: classifyCheckpointOutcome = %v, want %v", tc.name, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestCheckpoint_SingleLoopSaveAndComplete: a durable single-loop run gets a
|
||||
// per-run checkpointer (Begin), Saves its transcript each step, and Completes on
|
||||
// success (clearing the checkpoint). The RunInfo carries the resume meta.
|
||||
func TestCheckpoint_SingleLoopSaveAndComplete(t *testing.T) {
|
||||
models, _ := phaseProvider(t, fake.Reply("done"))
|
||||
cp := &fakeCheckpointer{}
|
||||
f := &fakeCheckpointFactory{cp: cp}
|
||||
ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: f}})
|
||||
|
||||
res := ex.Run(context.Background(),
|
||||
RunnableAgent{ID: "a1", Name: "boss", ModelTier: "test-model"},
|
||||
tool.Invocation{RunID: "run-x", CallerID: "steve", ChannelID: "chan", GuildID: "g", SkillInputs: map[string]any{"prompt": "go"}},
|
||||
"go")
|
||||
if res.Err != nil {
|
||||
t.Fatalf("run error: %v", res.Err)
|
||||
}
|
||||
if f.info.RunID != "run-x" || f.info.SubjectID != "a1" || f.info.ModelTier != "test-model" || f.info.GuildID != "g" {
|
||||
t.Errorf("Begin RunInfo missing resume meta: %+v", f.info)
|
||||
}
|
||||
if len(cp.saves) == 0 {
|
||||
t.Error("expected at least one checkpoint Save during the run")
|
||||
} else if len(cp.saves[len(cp.saves)-1].Messages) == 0 {
|
||||
t.Error("checkpoint Save should carry the running transcript")
|
||||
}
|
||||
if !cp.completed {
|
||||
t.Error("a successful run must Complete (clear) its checkpoint")
|
||||
}
|
||||
if cp.failed {
|
||||
t.Error("a successful run must NOT Fail its checkpoint")
|
||||
}
|
||||
}
|
||||
|
||||
// TestCheckpoint_TerminalErrorFails: a run that errors (not shutdown) Fails its
|
||||
// checkpoint (clears it — not a recovery candidate).
|
||||
func TestCheckpoint_TerminalErrorFails(t *testing.T) {
|
||||
models, _ := phaseProvider(t, fake.Fail(errors.New("model down")))
|
||||
cp := &fakeCheckpointer{}
|
||||
ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: &fakeCheckpointFactory{cp: cp}}})
|
||||
|
||||
res := ex.Run(context.Background(),
|
||||
RunnableAgent{ID: "a1", ModelTier: "test-model"},
|
||||
tool.Invocation{RunID: "r", CallerID: "c", SkillInputs: map[string]any{"prompt": "go"}}, "go")
|
||||
if res.Err == nil {
|
||||
t.Fatal("expected a run error")
|
||||
}
|
||||
if !cp.failed {
|
||||
t.Error("a terminal (non-shutdown) error must Fail the checkpoint")
|
||||
}
|
||||
if cp.completed {
|
||||
t.Error("a failed run must NOT Complete its checkpoint")
|
||||
}
|
||||
}
|
||||
|
||||
// TestCheckpoint_ResumeSeedsHistory: a run carrying a ResumeState seeds the saved
|
||||
// transcript as the model's opening messages (continues) instead of the input.
|
||||
func TestCheckpoint_ResumeSeedsHistory(t *testing.T) {
|
||||
models, fp := phaseProvider(t, fake.Reply("continued"))
|
||||
history := []llm.Message{llm.UserText("prior turn 1"), llm.AssistantText("prior answer 1")}
|
||||
ctx := WithResumeState(context.Background(), &ResumeState{History: history})
|
||||
|
||||
ex := New(Config{Registry: tool.NewRegistry(), Models: models})
|
||||
res := ex.Run(ctx,
|
||||
RunnableAgent{ID: "a1", ModelTier: "test-model"},
|
||||
tool.Invocation{RunID: "r", CallerID: "c", SkillInputs: map[string]any{"prompt": "ignored-on-resume"}}, "ignored-on-resume")
|
||||
if res.Err != nil {
|
||||
t.Fatalf("run error: %v", res.Err)
|
||||
}
|
||||
got := fp.Calls()[0].Request.Messages
|
||||
if len(got) != len(history) {
|
||||
t.Fatalf("resume should seed the saved %d-message transcript, got %d messages", len(history), len(got))
|
||||
}
|
||||
}
|
||||
|
||||
// TestCheckpoint_PhaseBoundarySavesCompleted: a durable multi-phase run records
|
||||
// the completed phases at each boundary, growing the list, and Completes on
|
||||
// success.
|
||||
func TestCheckpoint_PhaseBoundarySavesCompleted(t *testing.T) {
|
||||
models, _ := phaseProvider(t, fake.Reply("out-a"), fake.Reply("out-b"))
|
||||
cp := &fakeCheckpointer{}
|
||||
ex := New(Config{Registry: tool.NewRegistry(), Models: models, Ports: Ports{Checkpointer: &fakeCheckpointFactory{cp: cp}}})
|
||||
|
||||
ra := RunnableAgent{
|
||||
ID: "p", ModelTier: "test-model",
|
||||
Phases: []Phase{{Name: "a", SystemPrompt: "A"}, {Name: "b", SystemPrompt: "B"}},
|
||||
}
|
||||
if res := ex.Run(context.Background(), ra, tool.Invocation{RunID: "r", CallerID: "c"}, "Q"); res.Err != nil {
|
||||
t.Fatalf("run error: %v", res.Err)
|
||||
}
|
||||
// The final phase-boundary Save must list both completed phases.
|
||||
var lastPhaseSave *RunCheckpointState
|
||||
for i := range cp.saves {
|
||||
if len(cp.saves[i].CompletedPhases) > 0 {
|
||||
lastPhaseSave = &cp.saves[i]
|
||||
}
|
||||
}
|
||||
if lastPhaseSave == nil || len(lastPhaseSave.CompletedPhases) != 2 {
|
||||
t.Fatalf("expected a phase-boundary Save listing 2 completed phases; saves=%+v", cp.saves)
|
||||
}
|
||||
if !cp.completed {
|
||||
t.Error("a successful phased run must Complete its checkpoint")
|
||||
}
|
||||
}
|
||||
|
||||
// TestCheckpoint_ResumeSkipsCompletedPhases: a resumed multi-phase run skips
|
||||
// phases already in ResumeState.CompletedPhases (only the remaining phase calls
|
||||
// the model) and threads their outputs into the remaining phase's template.
|
||||
func TestCheckpoint_ResumeSkipsCompletedPhases(t *testing.T) {
|
||||
models, fp := phaseProvider(t, fake.Reply("out-b")) // ONLY phase b should call the model
|
||||
ctx := WithResumeState(context.Background(), &ResumeState{
|
||||
CompletedPhases: []PhaseOutput{{Name: "a", Output: "saved-a"}},
|
||||
})
|
||||
ex := New(Config{Registry: tool.NewRegistry(), Models: models})
|
||||
|
||||
ra := RunnableAgent{
|
||||
ID: "p", ModelTier: "test-model",
|
||||
Phases: []Phase{
|
||||
{Name: "a", SystemPrompt: "A"},
|
||||
{Name: "b", SystemPrompt: "B saw {{.a}}"},
|
||||
},
|
||||
}
|
||||
res := ex.Run(ctx, ra, tool.Invocation{RunID: "r", CallerID: "c"}, "Q")
|
||||
if res.Err != nil {
|
||||
t.Fatalf("run error: %v", res.Err)
|
||||
}
|
||||
if res.Output != "out-b" {
|
||||
t.Fatalf("output = %q, want out-b", res.Output)
|
||||
}
|
||||
calls := fp.Calls()
|
||||
if len(calls) != 1 {
|
||||
t.Fatalf("only the un-completed phase b should call the model; got %d calls", len(calls))
|
||||
}
|
||||
if calls[0].Request.System != "B saw saved-a" {
|
||||
t.Errorf("resumed phase b should see the completed phase a's saved output; system = %q", calls[0].Request.System)
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
|
||||
@@ -113,13 +114,26 @@ type Result struct {
|
||||
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) {
|
||||
started := time.Now()
|
||||
res = Result{RunID: inv.RunID}
|
||||
// ckpt is the per-run durable checkpointer (resolved below; nil = non-durable).
|
||||
// checkpointCause yields the run context's cancellation cause once the run
|
||||
// context exists; nil before then (an early build-error return).
|
||||
var ckpt Checkpointer
|
||||
var checkpointCause func() error
|
||||
// Enforce the no-panic contract: a panic anywhere in the run (incl. a host
|
||||
// Critic/Audit/Palette callback on the main goroutine) becomes Result.Err
|
||||
// rather than unwinding into the caller.
|
||||
// rather than unwinding into the caller. This defer ALSO finalizes the
|
||||
// checkpoint on EVERY exit path — panic, an early build-error return (before
|
||||
// the run loop), or normal completion — so a recovered run's durable record is
|
||||
// never left dangling (which would loop boot-recovery on a persistent error).
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
res.Err = fmt.Errorf("run.Executor: recovered panic: %v", r)
|
||||
}
|
||||
var cause error
|
||||
if checkpointCause != nil {
|
||||
cause = checkpointCause()
|
||||
}
|
||||
finalizeCheckpoint(ctx, ckpt, res.Err, cause)
|
||||
}()
|
||||
|
||||
tier := ra.ModelTier
|
||||
@@ -165,7 +179,9 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
Name: ra.Name,
|
||||
CallerID: inv.CallerID,
|
||||
ChannelID: inv.ChannelID,
|
||||
GuildID: inv.GuildID,
|
||||
ParentRunID: inv.ParentRunID,
|
||||
ModelTier: tier,
|
||||
Inputs: inv.SkillInputs,
|
||||
StartedAt: started,
|
||||
MaxIterations: maxIter,
|
||||
@@ -180,6 +196,25 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
inv.RunState = stateAcc
|
||||
}
|
||||
|
||||
// Durable recovery (optional): a recovered run carries a ResumeState (prior
|
||||
// transcript / completed phases) + an existing Checkpointer in ctx so it
|
||||
// continues on the SAME durable record; a fresh run mints a per-run
|
||||
// Checkpointer via the factory (which decides durability — nil = non-durable).
|
||||
// nil-safe throughout.
|
||||
resume := resumeStateFromContext(ctx)
|
||||
ckpt = existingCheckpointerFromContext(ctx)
|
||||
if ckpt == nil && e.cfg.Ports.Checkpointer != nil {
|
||||
c, cerr := e.cfg.Ports.Checkpointer.Begin(ctx, info)
|
||||
if cerr != nil {
|
||||
// Degrade to non-durable (the documented contract) but log it — a
|
||||
// failing checkpoint store must not fail the run, yet shouldn't be silent.
|
||||
slog.Warn("run: checkpointer Begin failed; running non-durable",
|
||||
"run_id", inv.RunID, "error", cerr)
|
||||
} else {
|
||||
ckpt = c
|
||||
}
|
||||
}
|
||||
|
||||
// Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal
|
||||
// messages into the running conversation before its next step. Created BEFORE
|
||||
// the toolbox build so any tool's handler captures the live AttachImages seam.
|
||||
@@ -247,6 +282,9 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
defer cancelCause(nil)
|
||||
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
|
||||
defer mergeCancel()
|
||||
// The finalize defer (top of Run) now has a run context to read the
|
||||
// cancellation cause from (shutdown vs critic-kill vs deadline vs cancel).
|
||||
checkpointCause = func() error { return context.Cause(runCtx) }
|
||||
|
||||
// Critic (optional): monitors the run for a stall, can nudge/extend/kill via
|
||||
// its host Escalator. Its hard deadline is bound to runCtx (cancel on pass).
|
||||
@@ -289,11 +327,11 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
}
|
||||
|
||||
// Shared agent options used by BOTH the single-loop path and every phase: the
|
||||
// tool-error guards, the step observer, and optional compaction. The toolbox +
|
||||
// step ceiling are NOT shared (they vary per phase), so they're added per path.
|
||||
// tool-error guards and optional compaction. The toolbox, step ceiling, AND
|
||||
// step observer are added per path (the observer is wrapped for checkpointing,
|
||||
// which differs single-loop vs per-phase).
|
||||
sharedOpts := []agent.Option{
|
||||
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
|
||||
agent.WithStepObserver(stepObserver),
|
||||
}
|
||||
if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
|
||||
if threshold := e.compactionThreshold(tier); threshold > 0 {
|
||||
@@ -324,24 +362,60 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
// the critic's nudges before each step.
|
||||
steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) }
|
||||
|
||||
resuming := resume != nil && len(resume.History) > 0
|
||||
|
||||
var runRes *agent.Result
|
||||
|
gitea-actions
commented
🔴 Per-step checkpoint Save error silently swallowed correctness, error-handling, performance, security · flagged by 6 models
🪰 Gadfly · advisory 🔴 **Per-step checkpoint Save error silently swallowed**
_correctness, error-handling, performance, security · flagged by 6 models_
- **`run/executor.go:367` — Per-step checkpoint `Save` error silently swallowed**: The per-step save in the single-loop path discards the error with `_ =`. Same impact as above — checkpoint persistence failures go unnoticed, risking stale recovery state. **Verified** by reading `run/executor.go:359-368`. *Suggested fix*: Log the error; at minimum ensure the failure is recorded in the audit trail.
<sub>🪰 Gadfly · advisory</sub>
|
||||
var runErr error
|
||||
if len(ra.Phases) == 0 {
|
||||
// Single-loop run: the agent's base prompt + full toolbox, with the
|
||||
// critic's DYNAMIC step ceiling (WithMaxStepsFunc, so it can raise a
|
||||
// healthy-but-long run's budget mid-flight; falls back to maxIter).
|
||||
//
|
||||
// Checkpointing: wrap the step observer to accumulate the running transcript
|
||||
// and Save it each step. Save is called every step; THROTTLING is the
|
||||
// Checkpointer's responsibility (the battery + mort's durable-job adapter
|
||||
// both throttle + size-cap), so the kernel doesn't gate the hot path. The
|
||||
// accumulated transcript is the pre-compaction one (the observer sees raw
|
||||
// step responses, not the loop's compacted history) — a host that caps size
|
||||
// bounds it. A recovered run seeds the saved transcript and continues.
|
||||
obs := stepObserver
|
||||
if ckpt != nil {
|
||||
var acc []llm.Message
|
||||
if resuming {
|
||||
acc = append([]llm.Message(nil), resume.History...)
|
||||
} else {
|
||||
acc = []llm.Message{multimodalUserMessage(input, inv.Images)}
|
||||
}
|
||||
obs = func(s agent.Step) {
|
||||
stepObserver(s)
|
||||
if s.Response != nil {
|
||||
acc = append(acc, s.Response.Message())
|
||||
}
|
||||
if len(s.Results) > 0 {
|
||||
acc = append(acc, llm.ToolResultsMessage(s.Results...))
|
||||
}
|
||||
_ = ckpt.Save(runCtx, RunCheckpointState{Messages: acc, Iteration: s.Index + 1})
|
||||
}
|
||||
}
|
||||
opts := append([]agent.Option{
|
||||
agent.WithToolbox(toolbox),
|
||||
critic.maxStepsOption(maxIter),
|
||||
agent.WithStepObserver(obs),
|
||||
}, sharedOpts...)
|
||||
|
gitea-actions
commented
🟠 finalizeCheckpoint not deferred: a panic (or early setup-error return) bypasses it, leaving an orphaned recoverable checkpoint that should be Fail'd → boot recovery loop error-handling · flagged by 1 model
🪰 Gadfly · advisory 🟠 **finalizeCheckpoint not deferred: a panic (or early setup-error return) bypasses it, leaving an orphaned recoverable checkpoint that should be Fail'd → boot recovery loop**
_error-handling · flagged by 1 model_
- **`run/executor.go:404` — `finalizeCheckpoint` is a plain statement, not deferred, so panics and early-error returns bypass it; a panicked run leaves an orphaned recoverable checkpoint.** The checkpointer is begun at `:193`, the wrapped step observer `Save`s a snapshot each completed step (`:367`), but `finalizeCheckpoint(...)` only runs at `:404` after the dispatch. The top-level `recover()` defer (`:119-123`) converts a panic anywhere in the loop into `res.Err` and returns **without** finali…
<sub>🪰 Gadfly · advisory</sub>
|
||||
ag := agent.New(model, e.systemPrompt(ra), opts...)
|
||||
if resuming {
|
||||
// Resume: seed the saved transcript and continue (no new input — the
|
||||
// completed tool calls in the transcript are NOT re-run).
|
||||
runRes, runErr = ag.Run(runCtx, "", agent.WithSteer(steer), agent.WithHistory(resume.History))
|
||||
} else {
|
||||
runRes, runErr = runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer))
|
||||
}
|
||||
} else {
|
||||
// Multi-phase pipeline: each phase runs its own prompt/tier/tools/step-cap
|
||||
// sequentially, threading outputs through {{.<PhaseName>}} templates. Reuses
|
||||
// the shared opts so audit/steps/critic-steer accumulate across every phase.
|
||||
// (Per-phase step caps are fixed — the critic's dynamic ceiling is not
|
||||
// propagated to phases — but its steer + hard deadline still apply.)
|
||||
// sequentially, threading outputs through {{.<PhaseName>}} templates. The
|
||||
// shared step observer (audit/steps/critic) is wired per phase by the phase
|
||||
// runner; checkpointing is phase-boundary granular (completed phases are
|
||||
// recorded so a resumed run skips them).
|
||||
runRes, runErr = e.runPhases(runCtx, ra, phaseDeps{
|
||||
baseModel: model,
|
||||
baseToolbox: toolbox,
|
||||
@@ -350,9 +424,14 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
stepObserver: stepObserver,
|
||||
steer: steer,
|
||||
rec: rec,
|
||||
checkpointer: ckpt,
|
||||
resume: resume,
|
||||
}, input, inv.Images)
|
||||
}
|
||||
|
||||
// Durable-recovery finalize (Complete/Fail/leave-running) happens in the
|
||||
// top-of-Run defer so it covers panics + early build-error returns too.
|
||||
|
||||
status := statusFor(runCtx, runErr)
|
||||
if runRes != nil {
|
||||
res.Output = runRes.Output
|
||||
|
||||
@@ -53,9 +53,10 @@ import (
|
||||
|
||||
// phaseDeps carries the per-run state the phase runner shares with Run: the base
|
||||
// model, the full decorated toolbox (filtered per phase), the base step cap, the
|
||||
// shared agent options (tool-error limits + step observer + compactor), the
|
||||
// shared step observer (also fed by IsRunFunc bare calls), the critic/session
|
||||
// steer, and the audit recorder (phase events).
|
||||
// shared agent options (tool-error limits + compactor — the step observer is
|
||||
// added per phase, NOT in sharedOpts, so checkpointing can vary per path), the
|
||||
// shared step observer (wired into each phase's loop AND invoked for IsRunFunc
|
||||
// bare calls), the critic/session steer, and the audit recorder (phase events).
|
||||
type phaseDeps struct {
|
||||
baseModel llm.Model
|
||||
baseToolbox *llm.Toolbox
|
||||
@@ -64,6 +65,13 @@ type phaseDeps struct {
|
||||
stepObserver func(agent.Step)
|
||||
steer func() []llm.Message
|
||||
rec RunRecorder
|
||||
|
gitea-actions
commented
🟡 Comments and logic around phaseDeps fields could be improved for clarity maintainability · flagged by 1 model
🪰 Gadfly · advisory 🟡 **Comments and logic around phaseDeps fields could be improved for clarity**
_maintainability · flagged by 1 model_
- **`run/checkpoint.go:54-61`**: The `checkpointOutcome` type and its constants are not exported. This limits their usability outside the package and could be confusing for external consumers who might need to interact with these outcomes. Consider exporting them if they are part of the public API. - **`run/phases.go:67-74`**: The `phaseDeps` struct includes a `checkpointer` field and a `resume` field, but the comments and logic around these fields are not entirely clear. The comments could be i…
<sub>🪰 Gadfly · advisory</sub>
|
||||
// checkpointer records phase-boundary progress (completed phases) for durable
|
||||
// recovery; nil = non-durable. resume carries a recovered run's completed
|
||||
// phases so they are skipped on re-run. Phase recovery is boundary-granular:
|
||||
// the interrupted (active) phase re-runs from its start (its mid-phase
|
||||
// transcript is NOT resumed — only the single-loop path resumes mid-loop).
|
||||
checkpointer Checkpointer
|
||||
resume *ResumeState
|
||||
}
|
||||
|
||||
// runPhases executes ra.Phases sequentially and returns a synthetic agent.Result
|
||||
@@ -73,10 +81,28 @@ type phaseDeps struct {
|
||||
// deadline/critic-kill — returns the error.
|
||||
func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phaseDeps, query string, images []llm.ImagePart) (*agent.Result, error) {
|
||||
outputs := make(map[string]string, len(ra.Phases))
|
||||
|
gitea-actions
commented
🟡 Potential Unbounded Growth in performance · flagged by 1 model
🪰 Gadfly · advisory 🟡 **Potential Unbounded Growth in `completed` Slice in `runPhases`**
_performance · flagged by 1 model_
- **Potential Unbounded Growth in `completed` Slice in `runPhases`** In `run/phases.go:83`, the `completed` slice is appended to in each phase iteration without any bounds checking. This could lead to unbounded growth in memory usage for long-running or frequently resumed multi-phase runs. Consider adding a limit or periodic cleanup mechanism to prevent excessive memory consumption.
<sub>🪰 Gadfly · advisory</sub>
|
||||
var completed []PhaseOutput
|
||||
var lastResult *agent.Result
|
||||
var lastOutput string
|
||||
var totalUsage llm.Usage
|
||||
|
||||
// resumeSkip is the set of phases already finished on a RECOVERED run — kept
|
||||
// SEPARATE from the live `outputs` map (which fills as phases run this time) so
|
||||
// the skip guard only skips RESUME-completed phases, never a fresh run's own
|
||||
// phases. (Reusing `outputs` would make a second phase with a duplicate name
|
||||
// skip itself.) Pre-populate outputs + completed so a resumed run threads the
|
||||
// saved outputs into later phases. The interrupted (active) phase is NOT
|
||||
// pre-populated, so it re-runs from its start (boundary-granular recovery).
|
||||
resumeSkip := map[string]bool{}
|
||||
if deps.resume != nil {
|
||||
for _, pc := range deps.resume.CompletedPhases {
|
||||
outputs[pc.Name] = pc.Output
|
||||
resumeSkip[pc.Name] = true
|
||||
completed = append(completed, pc)
|
||||
lastOutput = pc.Output
|
||||
}
|
||||
}
|
||||
|
||||
// finish stamps the aggregated usage + final output onto the synthetic result.
|
||||
finish := func(err error) (*agent.Result, error) {
|
||||
if lastResult == nil {
|
||||
@@ -90,6 +116,10 @@ func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phas
|
||||
}
|
||||
|
||||
for i, phase := range ra.Phases {
|
||||
// Skip phases already completed on a resumed run.
|
||||
if resumeSkip[phase.Name] {
|
||||
continue
|
||||
}
|
||||
// A killed/timed-out/cancelled run must not start its next phase.
|
||||
if err := runCtx.Err(); err != nil {
|
||||
return finish(err)
|
||||
@@ -151,6 +181,15 @@ func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phas
|
||||
|
||||
outputs[phase.Name] = output
|
||||
lastOutput = output
|
||||
// Checkpoint the phase boundary: this phase is done, so a resumed run skips
|
||||
|
gitea-actions
commented
🔴 Phase-boundary checkpoint Save error silently swallowed correctness, error-handling, maintainability, performance · flagged by 5 models
🪰 Gadfly · advisory 🔴 **Phase-boundary checkpoint Save error silently swallowed**
_correctness, error-handling, maintainability, performance · flagged by 5 models_
- **`run/phases.go:184` — Phase-boundary checkpoint `Save` error silently swallowed**: The phase-boundary save discards the error with `_ =`. If the checkpoint store fails mid-run (disk full, DB locked, network blip), the run continues but checkpoints aren't persisted. On shutdown/recovery, the host will resume from a stale checkpoint (or none), potentially re-executing work already done or losing progress. **Verified** by reading `run/phases.go:183-188`. *Suggested fix*: Log the error (at minim…
<sub>🪰 Gadfly · advisory</sub>
|
||||
// it and continues from the next. (Copy the slice — the checkpointer may
|
||||
// hold/serialize it asynchronously.)
|
||||
completed = append(completed, PhaseOutput{Name: phase.Name, Output: output})
|
||||
if deps.checkpointer != nil {
|
||||
_ = deps.checkpointer.Save(runCtx, RunCheckpointState{
|
||||
CompletedPhases: append([]PhaseOutput(nil), completed...),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return finish(nil)
|
||||
@@ -192,11 +231,13 @@ func (e *Executor) runOnePhase(runCtx context.Context, ra RunnableAgent, deps ph
|
||||
maxIter = deps.baseMaxIter
|
||||
}
|
||||
// Per-phase opts: a fixed step ceiling for this phase (the critic's dynamic
|
||||
// ceiling is intentionally not propagated to phases) + the phase toolbox, on
|
||||
// top of the shared opts (tool-error limits, step observer, compactor).
|
||||
// ceiling is intentionally not propagated to phases) + the phase toolbox + the
|
||||
// shared step observer (audit/steps/critic), on top of the shared opts
|
||||
// (tool-error limits, compactor).
|
||||
opts := append([]agent.Option{
|
||||
agent.WithToolbox(toolbox),
|
||||
agent.WithMaxSteps(maxIter),
|
||||
agent.WithStepObserver(deps.stepObserver),
|
||||
}, deps.sharedOpts...)
|
||||
ag := agent.New(model, system, opts...)
|
||||
|
||||
|
||||
@@ -130,6 +130,26 @@ func TestPhases_OptionalDoesNotSwallowCancellation(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestPhases_DuplicateNamesBothRun: a fresh (non-resume) run with two phases
|
||||
// sharing a name must run BOTH — the resume-skip guard keys off a separate
|
||||
// resume set, not the live outputs map (which fills as phases run), so a phase
|
||||
// never skips a same-named sibling on a fresh run.
|
||||
func TestPhases_DuplicateNamesBothRun(t *testing.T) {
|
||||
models, fp := phaseProvider(t, fake.Reply("first"), fake.Reply("second"))
|
||||
ex := New(Config{Registry: tool.NewRegistry(), Models: models})
|
||||
ra := RunnableAgent{
|
||||
Name: "p", ModelTier: "test-model",
|
||||
Phases: []Phase{{Name: "x", SystemPrompt: "P1"}, {Name: "x", SystemPrompt: "P2"}},
|
||||
}
|
||||
res := ex.Run(context.Background(), ra, tool.Invocation{RunID: "r"}, "Q")
|
||||
if res.Err != nil {
|
||||
t.Fatalf("run error: %v", res.Err)
|
||||
}
|
||||
if n := len(fp.Calls()); n != 2 {
|
||||
t.Fatalf("both same-named phases must run on a fresh run; got %d model calls", n)
|
||||
}
|
||||
}
|
||||
|
||||
// TestPhases_HardErrorAborts: a NON-optional phase that hits a hard error (not a
|
||||
// budget/step exhaustion) aborts the pipeline; later phases do not run.
|
||||
func TestPhases_HardErrorAborts(t *testing.T) {
|
||||
|
||||
@@ -33,9 +33,10 @@ type Ports struct {
|
||||
Budget Budget
|
||||
// Critic optionally monitors a long run for hangs/runaways. nil = none.
|
||||
Critic Critic
|
||||
// Checkpointer persists resumable progress for durable recovery. nil = no
|
||||
// checkpointing (a run interrupted by shutdown is simply lost).
|
||||
Checkpointer Checkpointer
|
||||
// Checkpointer mints a per-run Checkpointer for durable recovery (it decides
|
||||
// per run whether the run is durable). nil = no checkpointing (a run
|
||||
// interrupted by shutdown is simply lost).
|
||||
Checkpointer CheckpointerFactory
|
||||
// Palette resolves SkillPalette / SubAgentPalette entries into delegation
|
||||
// tools (skill__<name> / agent__<name>). nil = those entries are inert.
|
||||
Palette PaletteSource
|
||||
@@ -66,7 +67,9 @@ type RunInfo struct {
|
||||
Name string
|
||||
CallerID string
|
||||
ChannelID string
|
||||
GuildID string // the originating guild/server id (empty for DMs/triggers)
|
||||
ParentRunID string
|
||||
ModelTier string // the run's resolved base tier (for checkpoint re-dispatch)
|
||||
Inputs map[string]any
|
||||
StartedAt time.Time
|
||||
// MaxIterations is the run's base tool-dispatch step ceiling, so a critic can
|
||||
@@ -172,6 +175,16 @@ type CriticHandle interface {
|
||||
|
||||
// --- Checkpointer ---
|
||||
|
||||
// CheckpointerFactory decides, per run, whether the run is durable and (if so)
|
||||
// mints the per-run Checkpointer that records its progress. It returns (nil, nil)
|
||||
// for a non-durable run (the common short-run case — no checkpointing overhead).
|
||||
// A storage error should be logged and degraded to (nil, nil) so a failing
|
||||
// checkpoint store never fails the run. Mirrors mort's
|
||||
// agentexec.CheckpointerFactory.
|
||||
type CheckpointerFactory interface {
|
||||
Begin(ctx context.Context, info RunInfo) (Checkpointer, error)
|
||||
}
|
||||
|
||||
// Checkpointer persists a run's resumable progress for durable recovery.
|
||||
// Mirrors mort's agentexec.RunCheckpointer.
|
||||
type Checkpointer interface {
|
||||
@@ -184,11 +197,24 @@ type Checkpointer interface {
|
||||
Fail(ctx context.Context, err error) error
|
||||
}
|
||||
|
||||
// RunCheckpointState is the resumable snapshot a Checkpointer persists. Kept
|
||||
// minimal here; the executor extends what it records during the merge.
|
||||
// RunCheckpointState is the resumable snapshot a Checkpointer persists.
|
||||
type RunCheckpointState struct {
|
||||
// Messages is the running transcript of a SINGLE-LOOP run (grows each step;
|
||||
|
gitea-actions
commented
🟡 Documentation claims multi-phase transcript resume not implemented correctness · flagged by 1 model
🪰 Gadfly · advisory 🟡 **Documentation claims multi-phase transcript resume not implemented**
_correctness · flagged by 1 model_
2. **`run/ports.go:202-203` — Documentation claims unsupported functionality** - The comment states `Messages` is "the running transcript (single-loop run) OR the active phase's transcript (multi-phase run)". However, the multi-phase checkpoint save at `run/phases.go:184-187` **never sets `Messages`** — it only saves `CompletedPhases` and `ActivePhase`. This means mid-phase transcript recovery for multi-phase runs is NOT supported (only boundary-granular). - **Impact**: Documentation promises fu…
<sub>🪰 Gadfly · advisory</sub>
|
||||
// resumed via WithHistory). nil for multi-phase runs — phase recovery is
|
||||
// boundary-granular (see CompletedPhases), not mid-phase transcript.
|
||||
Messages []llm.Message
|
||||
Iteration int
|
||||
// CompletedPhases is set only for multi-phase runs: the outputs of phases
|
||||
// already finished, in phase order, so a resumed run skips them and re-runs
|
||||
// the interrupted phase from its start. nil for single-loop runs.
|
||||
CompletedPhases []PhaseOutput
|
||||
|
gitea-actions
commented
🟡 RunCheckpointState.ActivePhase is always empty string — field exists but is never populated with meaningful data maintainability · flagged by 1 model
🪰 Gadfly · advisory 🟡 **RunCheckpointState.ActivePhase is always empty string — field exists but is never populated with meaningful data**
_maintainability · flagged by 1 model_
2. **`run/ports.go:210` / `run/phases.go:186`** — `RunCheckpointState.ActivePhase` is defined and persisted, but the only write in the entire codebase is `ActivePhase: ""` at `run/phases.go:186`. It is never set to a non-empty value.
<sub>🪰 Gadfly · advisory</sub>
|
||||
}
|
||||
|
||||
// PhaseOutput is one completed pipeline phase's name and output text, recorded in
|
||||
// a checkpoint so a resumed multi-phase run can skip already-finished phases.
|
||||
type PhaseOutput struct {
|
||||
Name string
|
||||
Output string
|
||||
}
|
||||
|
||||
// --- PaletteSource ---
|
||||
|
||||
⚪ factory.now field is never set by NewFactory (always nil); dead/untestable field
maintainability · flagged by 1 model
🪰 Gadfly · advisory