feat(run): durable checkpoint + resume (wire Ports.Checkpointer)
The kernel defined run.Ports.Checkpointer + the checkpoint battery but never drove them (the documented "P2 follow-up"). This wires durable recovery into the run loop so a run interrupted by shutdown can resume on the next boot instead of being lost — the executus-side half of mort's durable-agent-recovery parity (mort #1355). Kernel (run/): - Ports.Checkpointer is now a CheckpointerFactory (Begin per run → a per-run Checkpointer, or nil for a non-durable run). The single per-instance Checkpointer couldn't distinguish runs; a factory mints one per run, matching mort's agentexec.CheckpointerFactory. - RunInfo gains GuildID + ModelTier (so the factory can build resume meta); RunCheckpointState gains CompletedPhases + ActivePhase (+ PhaseOutput). - run/checkpoint.go: ResumeState + WithResumeState / WithExistingCheckpointer context carriers, classifyCheckpointOutcome (success→Complete, shutdown→leave for boot recovery, else→Fail using run.ErrShutdown), and finalizeCheckpoint. - run/executor.go: resolve the per-run checkpointer (existing-from-ctx on a recovery re-run, else factory.Begin); single-loop wraps the step observer to accumulate the transcript + Save each step (host throttles), and a recovered run seeds the saved transcript via WithHistory and continues with no new input; finalize on exit. - run/phases.go: phase-boundary checkpointing — record completed phases after each phase; a resumed run skips already-completed phases (the interrupted phase re-runs from its start — boundary-granular, documented; only the single-loop path resumes mid-loop). Battery (checkpoint/): NewFactory wires the battery into the factory port (per-run handle, meta derived from RunInfo); RunCheckpoint + handle.Save carry the phase fields. Tests (run/checkpoint_test.go): the finalize decision matrix; single-loop Save+Complete; terminal-error Fail; resume seeds history; phase-boundary Saves completed phases; resume skips completed phases. Full ./... green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+58
-8
@@ -165,7 +165,9 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
Name: ra.Name,
|
||||
CallerID: inv.CallerID,
|
||||
ChannelID: inv.ChannelID,
|
||||
GuildID: inv.GuildID,
|
||||
ParentRunID: inv.ParentRunID,
|
||||
ModelTier: tier,
|
||||
Inputs: inv.SkillInputs,
|
||||
StartedAt: started,
|
||||
MaxIterations: maxIter,
|
||||
@@ -180,6 +182,19 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
inv.RunState = stateAcc
|
||||
}
|
||||
|
||||
// Durable recovery (optional): a recovered run carries a ResumeState (prior
|
||||
// transcript / completed phases) + an existing Checkpointer in ctx so it
|
||||
// continues on the SAME durable record; a fresh run mints a per-run
|
||||
// Checkpointer via the factory (which decides durability — nil = non-durable).
|
||||
// nil-safe throughout.
|
||||
resume := resumeStateFromContext(ctx)
|
||||
ckpt := existingCheckpointerFromContext(ctx)
|
||||
if ckpt == nil && e.cfg.Ports.Checkpointer != nil {
|
||||
if c, cerr := e.cfg.Ports.Checkpointer.Begin(ctx, info); cerr == nil {
|
||||
ckpt = c
|
||||
}
|
||||
}
|
||||
|
||||
// Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal
|
||||
// messages into the running conversation before its next step. Created BEFORE
|
||||
// the toolbox build so any tool's handler captures the live AttachImages seam.
|
||||
@@ -289,11 +304,11 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
}
|
||||
|
||||
// Shared agent options used by BOTH the single-loop path and every phase: the
|
||||
// tool-error guards, the step observer, and optional compaction. The toolbox +
|
||||
// step ceiling are NOT shared (they vary per phase), so they're added per path.
|
||||
// tool-error guards and optional compaction. The toolbox, step ceiling, AND
|
||||
// step observer are added per path (the observer is wrapped for checkpointing,
|
||||
// which differs single-loop vs per-phase).
|
||||
sharedOpts := []agent.Option{
|
||||
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
|
||||
agent.WithStepObserver(stepObserver),
|
||||
}
|
||||
if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
|
||||
if threshold := e.compactionThreshold(tier); threshold > 0 {
|
||||
@@ -330,18 +345,47 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
// Single-loop run: the agent's base prompt + full toolbox, with the
|
||||
// critic's DYNAMIC step ceiling (WithMaxStepsFunc, so it can raise a
|
||||
// healthy-but-long run's budget mid-flight; falls back to maxIter).
|
||||
//
|
||||
// Checkpointing: wrap the step observer to accumulate the running transcript
|
||||
// and Save it each step (the host throttles). A recovered run seeds the saved
|
||||
// transcript as history and continues with no new input. acc starts from the
|
||||
// resume history (or the opening user message) and grows as steps complete.
|
||||
obs := stepObserver
|
||||
if ckpt != nil {
|
||||
acc := []llm.Message{multimodalUserMessage(input, inv.Images)}
|
||||
if resume != nil && len(resume.History) > 0 {
|
||||
acc = append([]llm.Message(nil), resume.History...)
|
||||
}
|
||||
obs = func(s agent.Step) {
|
||||
stepObserver(s)
|
||||
if s.Response != nil {
|
||||
acc = append(acc, s.Response.Message())
|
||||
}
|
||||
if len(s.Results) > 0 {
|
||||
acc = append(acc, llm.ToolResultsMessage(s.Results...))
|
||||
}
|
||||
_ = ckpt.Save(runCtx, RunCheckpointState{Messages: acc, Iteration: s.Index + 1})
|
||||
}
|
||||
}
|
||||
opts := append([]agent.Option{
|
||||
agent.WithToolbox(toolbox),
|
||||
critic.maxStepsOption(maxIter),
|
||||
agent.WithStepObserver(obs),
|
||||
}, sharedOpts...)
|
||||
ag := agent.New(model, e.systemPrompt(ra), opts...)
|
||||
runRes, runErr = runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer))
|
||||
if resume != nil && len(resume.History) > 0 {
|
||||
// Resume: seed the saved transcript and continue (no new input — the
|
||||
// completed tool calls in the transcript are NOT re-run).
|
||||
runRes, runErr = ag.Run(runCtx, "", agent.WithSteer(steer), agent.WithHistory(resume.History))
|
||||
} else {
|
||||
runRes, runErr = runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer))
|
||||
}
|
||||
} else {
|
||||
// Multi-phase pipeline: each phase runs its own prompt/tier/tools/step-cap
|
||||
// sequentially, threading outputs through {{.<PhaseName>}} templates. Reuses
|
||||
// the shared opts so audit/steps/critic-steer accumulate across every phase.
|
||||
// (Per-phase step caps are fixed — the critic's dynamic ceiling is not
|
||||
// propagated to phases — but its steer + hard deadline still apply.)
|
||||
// sequentially, threading outputs through {{.<PhaseName>}} templates. The
|
||||
// shared step observer (audit/steps/critic) is wired per phase by the phase
|
||||
// runner; checkpointing is phase-boundary granular (completed phases are
|
||||
// recorded so a resumed run skips them).
|
||||
runRes, runErr = e.runPhases(runCtx, ra, phaseDeps{
|
||||
baseModel: model,
|
||||
baseToolbox: toolbox,
|
||||
@@ -350,9 +394,15 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
stepObserver: stepObserver,
|
||||
steer: steer,
|
||||
rec: rec,
|
||||
checkpointer: ckpt,
|
||||
resume: resume,
|
||||
}, input, inv.Images)
|
||||
}
|
||||
|
||||
// Finalize durable recovery: clear the checkpoint on success/terminal failure,
|
||||
// or leave it for boot recovery when the run was interrupted by shutdown.
|
||||
finalizeCheckpoint(ctx, ckpt, runErr, context.Cause(runCtx))
|
||||
|
||||
status := statusFor(runCtx, runErr)
|
||||
if runRes != nil {
|
||||
res.Output = runRes.Output
|
||||
|
||||
Reference in New Issue
Block a user