From e856dacc12af74eab577f6f689ba8b56dfded600 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 23:00:02 -0400 Subject: [PATCH] =?UTF-8?q?P4:=20checkpoint=20battery=20=E2=80=94=20durabl?= =?UTF-8?q?e-resume=20seam=20+=20run.Checkpointer=20handle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Plugs into run.Ports.Checkpointer (the executor's call site is a P2 follow-up; this provides the seam + impls ahead of it): - checkpoint.go: CheckpointStore seam + RunCheckpoint{Meta, Messages, Iteration, ActivePhase} + RunCheckpointMeta (mirrors mort's agentexec types). - handle.go: New(store, meta, throttle, now) -> run.Checkpointer. Save writes a throttled snapshot; Complete/Fail delete it (a cleanly finished or terminally failed run is NOT a recovery candidate; a shutdown-interrupted run never calls them, so its checkpoint survives ListInterrupted at boot). nil store -> no-op. - memory.go: NewMemory() default (with the honest caveat that in-memory does not survive the restart it exists to recover from — a durable store is mort's). Tests: save+complete clears the recovery candidate; throttle skips in-window saves; nil-store is a clean no-op. Core imports ZERO from checkpoint. Co-Authored-By: Claude Opus 4.8 (1M context) --- CLAUDE.md | 3 +- checkpoint/checkpoint.go | 50 ++++++++++++++++++++++++ checkpoint/checkpoint_test.go | 64 +++++++++++++++++++++++++++++++ checkpoint/handle.go | 71 +++++++++++++++++++++++++++++++++++ checkpoint/memory.go | 55 +++++++++++++++++++++++++++ 5 files changed, 242 insertions(+), 1 deletion(-) create mode 100644 checkpoint/checkpoint.go create mode 100644 checkpoint/checkpoint_test.go create mode 100644 checkpoint/handle.go create mode 100644 checkpoint/memory.go diff --git a/CLAUDE.md b/CLAUDE.md index e602bb4..cf4efff 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -71,7 +71,8 @@ BATTERIES (opt-in siblings, each nil-safe + a default): default (skillaudit Storage iface; GORM stays in mort) critic/ two-tier timeout state machine + Escalator [P4] schedule/ cron runner cores [P4] - checkpoint/ durable resume seam [P4] + checkpoint/ CheckpointStore + run.Checkpointer handle [P4 ✓] + (throttled Save/Complete/Fail) + Memory (exec wiring=P2 follow-up) budget/ DBBudget rolling-7d + NoOp (run.Budget); [P4 ✓] BudgetStorage iface + Memory default diff --git a/checkpoint/checkpoint.go b/checkpoint/checkpoint.go new file mode 100644 index 0000000..64d17cf --- /dev/null +++ b/checkpoint/checkpoint.go @@ -0,0 +1,50 @@ +// Package checkpoint is the durable-resume battery: it persists a run's +// resumable progress so a run interrupted by a shutdown can be recovered and +// continued on the next boot, rather than silently lost. It plugs into +// run.Ports.Checkpointer. +// +// Mort backs CheckpointStore with its durable-job table; Memory() is the +// zero-dependency default; contrib/store can add a SQLite one. NOTE: the +// executor's call into run.Ports.Checkpointer is a P2 follow-up — this battery +// provides the seam + impls ahead of that wiring. +package checkpoint + +import ( + "context" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +// RunCheckpointMeta is the run attribution needed to resume a run from scratch +// (mirrors mort's agentexec.RunCheckpointMeta). +type RunCheckpointMeta struct { + RunID string + AgentID string + AgentName string + CallerID string + ChannelID string + GuildID string + Prompt string + ModelTier string + ParentRunID string +} + +// RunCheckpoint is one persisted snapshot of a run's resumable progress. +type RunCheckpoint struct { + Meta RunCheckpointMeta + Messages []llm.Message // conversation so far + Iteration int // completed agent-loop iterations + ActivePhase string // current phase name (multi-phase agents); "" otherwise + UpdatedAt time.Time +} + +// CheckpointStore persists run checkpoints keyed by run id. A live checkpoint +// means "this run was in flight and not cleanly finished"; Complete/Fail delete +// it. ListInterrupted returns every surviving checkpoint at boot for recovery. +type CheckpointStore interface { + Save(ctx context.Context, cp RunCheckpoint) error + Load(ctx context.Context, runID string) (*RunCheckpoint, error) + Delete(ctx context.Context, runID string) error + ListInterrupted(ctx context.Context) ([]RunCheckpoint, error) +} diff --git a/checkpoint/checkpoint_test.go b/checkpoint/checkpoint_test.go new file mode 100644 index 0000000..6eb747d --- /dev/null +++ b/checkpoint/checkpoint_test.go @@ -0,0 +1,64 @@ +package checkpoint + +import ( + "context" + "testing" + "time" + + "gitea.stevedudenhoeffer.com/steve/executus/run" + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +func TestHandleSaveCompleteDelete(t *testing.T) { + ctx := context.Background() + mem := NewMemory() + meta := RunCheckpointMeta{RunID: "r1", AgentID: "a1", CallerID: "c1"} + cp := New(mem, meta, 0, nil) // throttle 0 = save every call + + if err := cp.Save(ctx, run.RunCheckpointState{Messages: []llm.Message{{Role: "user"}}, Iteration: 2}); err != nil { + t.Fatal(err) + } + got, _ := mem.Load(ctx, "r1") + if got == nil || got.Iteration != 2 || got.Meta.AgentID != "a1" { + t.Fatalf("checkpoint not persisted: %+v", got) + } + if il, _ := mem.ListInterrupted(ctx); len(il) != 1 { + t.Errorf("ListInterrupted = %d, want 1 (in-flight)", len(il)) + } + // Complete clears it (no longer a recovery candidate). + if err := cp.Complete(ctx); err != nil { + t.Fatal(err) + } + if il, _ := mem.ListInterrupted(ctx); len(il) != 0 { + t.Errorf("after Complete, ListInterrupted = %d, want 0", len(il)) + } +} + +func TestHandleThrottle(t *testing.T) { + ctx := context.Background() + mem := NewMemory() + now := time.Now() + cp := New(mem, RunCheckpointMeta{RunID: "r"}, time.Minute, func() time.Time { return now }) + + cp.Save(ctx, run.RunCheckpointState{Iteration: 1}) + now = now.Add(10 * time.Second) // within throttle window + cp.Save(ctx, run.RunCheckpointState{Iteration: 2}) + if got, _ := mem.Load(ctx, "r"); got.Iteration != 1 { + t.Errorf("throttled save should keep iteration 1, got %d", got.Iteration) + } + now = now.Add(time.Minute) // past throttle + cp.Save(ctx, run.RunCheckpointState{Iteration: 3}) + if got, _ := mem.Load(ctx, "r"); got.Iteration != 3 { + t.Errorf("post-throttle save should land iteration 3, got %d", got.Iteration) + } +} + +func TestNilStoreNoop(t *testing.T) { + cp := New(nil, RunCheckpointMeta{RunID: "r"}, 0, nil) + if err := cp.Save(context.Background(), run.RunCheckpointState{}); err != nil { + t.Errorf("nil-store Save should be a no-op, got %v", err) + } + if err := cp.Complete(context.Background()); err != nil { + t.Error(err) + } +} diff --git a/checkpoint/handle.go b/checkpoint/handle.go new file mode 100644 index 0000000..6a59f3d --- /dev/null +++ b/checkpoint/handle.go @@ -0,0 +1,71 @@ +package checkpoint + +import ( + "context" + "sync" + "time" + + "gitea.stevedudenhoeffer.com/steve/executus/run" +) + +// handle is a per-run run.Checkpointer bound to one run's id + meta. Save writes +// a fresh snapshot (throttled), Complete/Fail delete the checkpoint (a cleanly +// finished or terminally failed run is NOT a recovery candidate). A run +// interrupted by shutdown never calls Complete/Fail, so its checkpoint survives +// for ListInterrupted at boot. +type handle struct { + store CheckpointStore + meta RunCheckpointMeta + throttle time.Duration + now func() time.Time + + mu sync.Mutex + lastSave time.Time +} + +var _ run.Checkpointer = (*handle)(nil) + +// New returns a run.Checkpointer that persists snapshots of the run identified +// by meta.RunID to store, no more often than throttle (Save calls inside the +// window are skipped). A nil store yields a no-op Checkpointer. throttle <= 0 +// saves every call; now defaults to time.Now. +func New(store CheckpointStore, meta RunCheckpointMeta, throttle time.Duration, now func() time.Time) run.Checkpointer { + if store == nil { + return noop{} + } + if now == nil { + now = time.Now + } + return &handle{store: store, meta: meta, throttle: throttle, now: now} +} + +func (h *handle) Save(ctx context.Context, st run.RunCheckpointState) error { + h.mu.Lock() + now := h.now() + if h.throttle > 0 && !h.lastSave.IsZero() && now.Sub(h.lastSave) < h.throttle { + h.mu.Unlock() + return nil // throttled — a more recent snapshot will land shortly + } + h.lastSave = now + h.mu.Unlock() + + return h.store.Save(ctx, RunCheckpoint{ + Meta: h.meta, + Messages: st.Messages, + Iteration: st.Iteration, + UpdatedAt: now, + }) +} + +func (h *handle) Complete(ctx context.Context) error { return h.store.Delete(ctx, h.meta.RunID) } + +func (h *handle) Fail(ctx context.Context, _ error) error { return h.store.Delete(ctx, h.meta.RunID) } + +// noop is the nil-store Checkpointer: every method is a successful no-op. +type noop struct{} + +var _ run.Checkpointer = noop{} + +func (noop) Save(context.Context, run.RunCheckpointState) error { return nil } +func (noop) Complete(context.Context) error { return nil } +func (noop) Fail(context.Context, error) error { return nil } diff --git a/checkpoint/memory.go b/checkpoint/memory.go new file mode 100644 index 0000000..3572e35 --- /dev/null +++ b/checkpoint/memory.go @@ -0,0 +1,55 @@ +package checkpoint + +import ( + "context" + "sync" +) + +// Memory is a zero-dependency in-process CheckpointStore. NOTE: an in-memory +// checkpoint store does NOT survive the process restart it exists to recover +// from — it is the test/light-host default and makes ListInterrupted meaningful +// only within a single process lifetime. A host that wants real +// crash-recovery wires a durable CheckpointStore (mort's durable-job table). +type Memory struct { + mu sync.RWMutex + cps map[string]RunCheckpoint // by run id +} + +// NewMemory returns an empty in-memory CheckpointStore. +func NewMemory() *Memory { return &Memory{cps: map[string]RunCheckpoint{}} } + +var _ CheckpointStore = (*Memory)(nil) + +func (m *Memory) Save(_ context.Context, cp RunCheckpoint) error { + m.mu.Lock() + defer m.mu.Unlock() + m.cps[cp.Meta.RunID] = cp + return nil +} + +func (m *Memory) Load(_ context.Context, runID string) (*RunCheckpoint, error) { + m.mu.RLock() + defer m.mu.RUnlock() + cp, ok := m.cps[runID] + if !ok { + return nil, nil // no checkpoint (not an error — the run finished cleanly or never started) + } + return &cp, nil +} + +func (m *Memory) Delete(_ context.Context, runID string) error { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.cps, runID) + return nil +} + +func (m *Memory) ListInterrupted(_ context.Context) ([]RunCheckpoint, error) { + m.mu.RLock() + defer m.mu.RUnlock() + out := make([]RunCheckpoint, 0, len(m.cps)) + for _, cp := range m.cps { + out = append(out, cp) + } + return out, nil +}