P4: checkpoint battery — durable-resume seam + run.Checkpointer handle

Plugs into run.Ports.Checkpointer (the executor's call site is a P2 follow-up;
this provides the seam + impls ahead of it):
- checkpoint.go: CheckpointStore seam + RunCheckpoint{Meta, Messages, Iteration,
  ActivePhase} + RunCheckpointMeta (mirrors mort's agentexec types).
- handle.go: New(store, meta, throttle, now) -> run.Checkpointer. Save writes a
  throttled snapshot; Complete/Fail delete it (a cleanly finished or terminally
  failed run is NOT a recovery candidate; a shutdown-interrupted run never calls
  them, so its checkpoint survives ListInterrupted at boot). nil store -> no-op.
- memory.go: NewMemory() default (with the honest caveat that in-memory does
  not survive the restart it exists to recover from — a durable store is mort's).

Tests: save+complete clears the recovery candidate; throttle skips in-window
saves; nil-store is a clean no-op. Core imports ZERO from checkpoint.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-26 23:00:02 -04:00
parent 5b5e130cee
commit e856dacc12
5 changed files with 242 additions and 1 deletions
+2 -1
View File
@@ -71,7 +71,8 @@ BATTERIES (opt-in siblings, each nil-safe + a default):
default (skillaudit Storage iface; GORM stays in mort)
critic/ two-tier timeout state machine + Escalator [P4]
schedule/ cron runner cores [P4]
checkpoint/ durable resume seam [P4]
checkpoint/ CheckpointStore + run.Checkpointer handle [P4]
(throttled Save/Complete/Fail) + Memory (exec wiring=P2 follow-up)
budget/ DBBudget rolling-7d + NoOp (run.Budget); [P4 ✓]
BudgetStorage iface + Memory default
+50
View File
@@ -0,0 +1,50 @@
// Package checkpoint is the durable-resume battery: it persists a run's
// resumable progress so a run interrupted by a shutdown can be recovered and
// continued on the next boot, rather than silently lost. It plugs into
// run.Ports.Checkpointer.
//
// Mort backs CheckpointStore with its durable-job table; Memory() is the
// zero-dependency default; contrib/store can add a SQLite one. NOTE: the
// executor's call into run.Ports.Checkpointer is a P2 follow-up — this battery
// provides the seam + impls ahead of that wiring.
package checkpoint
import (
"context"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)
// RunCheckpointMeta is the run attribution needed to resume a run from scratch
// (mirrors mort's agentexec.RunCheckpointMeta).
type RunCheckpointMeta struct {
RunID string
AgentID string
AgentName string
CallerID string
ChannelID string
GuildID string
Prompt string
ModelTier string
ParentRunID string
}
// RunCheckpoint is one persisted snapshot of a run's resumable progress.
type RunCheckpoint struct {
Meta RunCheckpointMeta
Messages []llm.Message // conversation so far
Iteration int // completed agent-loop iterations
ActivePhase string // current phase name (multi-phase agents); "" otherwise
UpdatedAt time.Time
}
// CheckpointStore persists run checkpoints keyed by run id. A live checkpoint
// means "this run was in flight and not cleanly finished"; Complete/Fail delete
// it. ListInterrupted returns every surviving checkpoint at boot for recovery.
type CheckpointStore interface {
Save(ctx context.Context, cp RunCheckpoint) error
Load(ctx context.Context, runID string) (*RunCheckpoint, error)
Delete(ctx context.Context, runID string) error
ListInterrupted(ctx context.Context) ([]RunCheckpoint, error)
}
+64
View File
@@ -0,0 +1,64 @@
package checkpoint
import (
"context"
"testing"
"time"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)
func TestHandleSaveCompleteDelete(t *testing.T) {
ctx := context.Background()
mem := NewMemory()
meta := RunCheckpointMeta{RunID: "r1", AgentID: "a1", CallerID: "c1"}
cp := New(mem, meta, 0, nil) // throttle 0 = save every call
if err := cp.Save(ctx, run.RunCheckpointState{Messages: []llm.Message{{Role: "user"}}, Iteration: 2}); err != nil {
t.Fatal(err)
}
got, _ := mem.Load(ctx, "r1")
if got == nil || got.Iteration != 2 || got.Meta.AgentID != "a1" {
t.Fatalf("checkpoint not persisted: %+v", got)
}
if il, _ := mem.ListInterrupted(ctx); len(il) != 1 {
t.Errorf("ListInterrupted = %d, want 1 (in-flight)", len(il))
}
// Complete clears it (no longer a recovery candidate).
if err := cp.Complete(ctx); err != nil {
t.Fatal(err)
}
if il, _ := mem.ListInterrupted(ctx); len(il) != 0 {
t.Errorf("after Complete, ListInterrupted = %d, want 0", len(il))
}
}
func TestHandleThrottle(t *testing.T) {
ctx := context.Background()
mem := NewMemory()
now := time.Now()
cp := New(mem, RunCheckpointMeta{RunID: "r"}, time.Minute, func() time.Time { return now })
cp.Save(ctx, run.RunCheckpointState{Iteration: 1})
now = now.Add(10 * time.Second) // within throttle window
cp.Save(ctx, run.RunCheckpointState{Iteration: 2})
if got, _ := mem.Load(ctx, "r"); got.Iteration != 1 {
t.Errorf("throttled save should keep iteration 1, got %d", got.Iteration)
}
now = now.Add(time.Minute) // past throttle
cp.Save(ctx, run.RunCheckpointState{Iteration: 3})
if got, _ := mem.Load(ctx, "r"); got.Iteration != 3 {
t.Errorf("post-throttle save should land iteration 3, got %d", got.Iteration)
}
}
func TestNilStoreNoop(t *testing.T) {
cp := New(nil, RunCheckpointMeta{RunID: "r"}, 0, nil)
if err := cp.Save(context.Background(), run.RunCheckpointState{}); err != nil {
t.Errorf("nil-store Save should be a no-op, got %v", err)
}
if err := cp.Complete(context.Background()); err != nil {
t.Error(err)
}
}
+71
View File
@@ -0,0 +1,71 @@
package checkpoint
import (
"context"
"sync"
"time"
"gitea.stevedudenhoeffer.com/steve/executus/run"
)
// handle is a per-run run.Checkpointer bound to one run's id + meta. Save writes
// a fresh snapshot (throttled), Complete/Fail delete the checkpoint (a cleanly
// finished or terminally failed run is NOT a recovery candidate). A run
// interrupted by shutdown never calls Complete/Fail, so its checkpoint survives
// for ListInterrupted at boot.
type handle struct {
store CheckpointStore
meta RunCheckpointMeta
throttle time.Duration
now func() time.Time
mu sync.Mutex
lastSave time.Time
}
var _ run.Checkpointer = (*handle)(nil)
// New returns a run.Checkpointer that persists snapshots of the run identified
// by meta.RunID to store, no more often than throttle (Save calls inside the
// window are skipped). A nil store yields a no-op Checkpointer. throttle <= 0
// saves every call; now defaults to time.Now.
func New(store CheckpointStore, meta RunCheckpointMeta, throttle time.Duration, now func() time.Time) run.Checkpointer {
if store == nil {
return noop{}
}
if now == nil {
now = time.Now
}
return &handle{store: store, meta: meta, throttle: throttle, now: now}
}
func (h *handle) Save(ctx context.Context, st run.RunCheckpointState) error {
h.mu.Lock()
now := h.now()
if h.throttle > 0 && !h.lastSave.IsZero() && now.Sub(h.lastSave) < h.throttle {
h.mu.Unlock()
return nil // throttled — a more recent snapshot will land shortly
}
h.lastSave = now
h.mu.Unlock()
return h.store.Save(ctx, RunCheckpoint{
Meta: h.meta,
Messages: st.Messages,
Iteration: st.Iteration,
UpdatedAt: now,
})
}
func (h *handle) Complete(ctx context.Context) error { return h.store.Delete(ctx, h.meta.RunID) }
func (h *handle) Fail(ctx context.Context, _ error) error { return h.store.Delete(ctx, h.meta.RunID) }
// noop is the nil-store Checkpointer: every method is a successful no-op.
type noop struct{}
var _ run.Checkpointer = noop{}
func (noop) Save(context.Context, run.RunCheckpointState) error { return nil }
func (noop) Complete(context.Context) error { return nil }
func (noop) Fail(context.Context, error) error { return nil }
+55
View File
@@ -0,0 +1,55 @@
package checkpoint
import (
"context"
"sync"
)
// Memory is a zero-dependency in-process CheckpointStore. NOTE: an in-memory
// checkpoint store does NOT survive the process restart it exists to recover
// from — it is the test/light-host default and makes ListInterrupted meaningful
// only within a single process lifetime. A host that wants real
// crash-recovery wires a durable CheckpointStore (mort's durable-job table).
type Memory struct {
mu sync.RWMutex
cps map[string]RunCheckpoint // by run id
}
// NewMemory returns an empty in-memory CheckpointStore.
func NewMemory() *Memory { return &Memory{cps: map[string]RunCheckpoint{}} }
var _ CheckpointStore = (*Memory)(nil)
func (m *Memory) Save(_ context.Context, cp RunCheckpoint) error {
m.mu.Lock()
defer m.mu.Unlock()
m.cps[cp.Meta.RunID] = cp
return nil
}
func (m *Memory) Load(_ context.Context, runID string) (*RunCheckpoint, error) {
m.mu.RLock()
defer m.mu.RUnlock()
cp, ok := m.cps[runID]
if !ok {
return nil, nil // no checkpoint (not an error — the run finished cleanly or never started)
}
return &cp, nil
}
func (m *Memory) Delete(_ context.Context, runID string) error {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.cps, runID)
return nil
}
func (m *Memory) ListInterrupted(_ context.Context) ([]RunCheckpoint, error) {
m.mu.RLock()
defer m.mu.RUnlock()
out := make([]RunCheckpoint, 0, len(m.cps))
for _, cp := range m.cps {
out = append(out, cp)
}
return out, nil
}