Files
executus/lane/persistence.go
T
steve ca243a2d50
executus CI / test (push) Failing after 24s
P0: stand up executus harness module above majordomo
Batteries-included agent-harness base, extracted from mort's agent layer.
This first cut establishes the module + the zero-coupling core primitives:

- lane, dispatchguard, pendingattach, run/progress.go: moved verbatim from mort
- config: host config Source seam + env-var default (nil-safe helpers)
- deliver: output-egress seam + Discard/Stdout defaults
- identity: AdminPolicy + MemberResolver seams (nil-safe)
- fanout: programmatic N×M swarm (bounded global + per-key concurrency)
- README/CLAUDE.md with the vibe-coded banner; CI with Go gates +
  the "core stays majordomo+stdlib only" invariant

Core builds with stdlib only today; majordomo enters at P1 (model/structured).
go build/vet/test -race all green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 19:18:37 -04:00

376 lines
13 KiB
Go

package lane
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
)
// PersistenceStore is the narrow surface PersistedLane needs to
// persist and recover lane jobs across process restarts.
//
// Why an interface here vs reaching into pkg/logic/skills directly:
// keeps the lane primitive generic — anyone with a job-row table that
// satisfies these five methods can plug in. pkg/logic/skills.Storage
// satisfies it via a thin adapter (PersistedSkillsStore).
//
// Test: persistence_test.go covers the round-trip + restart recovery
// flow using an in-memory fake store.
type PersistenceStore interface {
	// EnqueueJob writes a row in state=queued. lane is the lane
	// name; metadata is opaque payload preserved verbatim across
	// restart for reconstruct paths.
	EnqueueJob(ctx context.Context, jobID, lane, callerID string, priority int, metadata []byte) error

	// UpdateJobState transitions the row to a new state. The state
	// strings are the QueueJobState values from
	// pkg/logic/skills/skill_queue_job.go ("queued", "running",
	// "finished", "cancelled", "failed"). Stamps the matching
	// timestamp column.
	//
	// PersistedLane treats these writes as best-effort: a failure is
	// logged, not returned to callers of Cancel or a job's Run.
	UpdateJobState(ctx context.Context, jobID string, state string, at time.Time) error

	// ListQueuedJobs returns rows in state=queued for the given
	// lane. Used by Recover to re-submit pending work.
	ListQueuedJobs(ctx context.Context, lane string) ([]QueuedJobRef, error)

	// ListRunningJobs returns rows in state=running for the given
	// lane. After a process restart these are unrecoverable (the
	// worker goroutine is gone) and Recover marks them failed.
	ListRunningJobs(ctx context.Context, lane string) ([]QueuedJobRef, error)

	// PurgeFinishedJobs deletes terminal-state rows older than the
	// cutoff. Returns count deleted.
	PurgeFinishedJobs(ctx context.Context, olderThan time.Time) (int64, error)
}
// QueuedJobRef is the flat row view handed back by the List* methods of
// PersistenceStore. It carries just enough for Recover to rebuild a job
// or mark its row.
//
// It mirrors, rather than reuses, the skills.QueueJob domain type. The
// lane package doesn't import the skills package (and would create an
// import cycle if it did), so the contract stays a plain struct.
type QueuedJobRef struct {
	// JobID, Lane and CallerID identify the row, the lane it belongs
	// to, and the submitting caller.
	JobID, Lane, CallerID string
	// Priority — presumably the value passed to EnqueueJob; confirm
	// against the store adapter.
	Priority int
	// Metadata is the opaque payload EnqueueJob is contracted to
	// preserve verbatim across restarts.
	Metadata []byte
	// EnqueuedAt — presumably the enqueue timestamp recorded by the
	// store; TODO confirm.
	EnqueuedAt time.Time
}
// MetadataProvider is an optional extension of Job. When a submitted job
// implements it, PersistedLane passes the returned bytes to EnqueueJob so
// they are stored with the job's row for restart-time reconstruction.
//
// Jobs are not required to implement it. Raw LLM transport jobs are
// issued ad-hoc by callers, and a restart simply drops the in-flight
// ones. Skills supply Metadata so the executor can rehydrate the
// original Invocation.
type MetadataProvider interface {
	// Metadata returns the opaque restart-recovery payload.
	Metadata() []byte
}
// PersistedLane decorates a Lane with durable job rows. Submit and
// SubmitWait write a row before handing the job to the wrapped lane.
// The job is wrapped so that Run updates that same row as it moves
// through its states.
//
// Persistence lives in a wrapper rather than in the pool itself so the
// in-memory primitives stay test-friendly (pool_test.go runs without a
// DB). Production wraps only the named lanes that need restart recovery.
// Lanes that don't (e.g. transient LLM-transport lanes used by anonymous
// callers) can stay purely in-memory.
type PersistedLane struct {
	inner Lane             // performs all in-memory queueing and dispatch
	store PersistenceStore // receives the job-row writes
}
// NewPersistedLane returns a PersistedLane backed by inner and store.
// inner keeps doing all of the in-memory queueing. Each Submit through
// the returned lane also writes a row to store, and later state
// transitions update that row.
func NewPersistedLane(inner Lane, store PersistenceStore) *PersistedLane {
	pl := new(PersistedLane)
	pl.inner = inner
	pl.store = store
	return pl
}
// Inner exposes the wrapped, non-persisting lane. Submitting through it
// skips the enqueue-row write. That is useful when the row already
// exists, which is exactly why Recover re-submits on the inner lane.
func (p *PersistedLane) Inner() Lane {
	wrapped := p.inner
	return wrapped
}
// Name reports the wrapped lane's name. The same value is the lane key
// passed to EnqueueJob and to the List* calls in Recover.
func (p *PersistedLane) Name() string {
	laneName := p.inner.Name()
	return laneName
}
// Submit writes the queued row, then delegates to the inner lane.
// The job is wrapped so Run-time state transitions update the row.
// The inner lane's return values are passed through unchanged.
//
// On enqueue-row write failure: returns the error WITHOUT submitting
// to the inner lane. We don't want to dispatch a job that we couldn't
// persist — admin visibility (and restart recovery) would then be
// inconsistent with the running set.
//
// On inner-lane rejection: the just-written row is marked failed
// (best-effort, logged on error) before the inner error is returned.
// Left alone, the row would sit in state=queued and the next restart's
// Recover would resurrect a job the caller was told was never
// dispatched. Recover applies the same treatment when its own
// re-submission is rejected.
func (p *PersistedLane) Submit(ctx context.Context, job Job) (int, time.Duration, error) {
	var meta []byte
	if mp, ok := job.(MetadataProvider); ok {
		meta = mp.Metadata()
	}
	if err := p.store.EnqueueJob(ctx, job.ID(), p.inner.Name(),
		job.CallerID(), job.Priority(), meta); err != nil {
		return 0, 0, fmt.Errorf("persist enqueue: %w", err)
	}
	wrapped := &persistedJob{inner: job, store: p.store}
	pos, wait, err := p.inner.Submit(ctx, wrapped)
	if err != nil {
		// The rejection may itself be caused by ctx being cancelled, so
		// detach from its cancellation: the compensating write must
		// still reach the store.
		if uerr := p.store.UpdateJobState(context.WithoutCancel(ctx), job.ID(),
			stateFailed, time.Now()); uerr != nil {
			slog.Warn("lane persist: post-submit-failure mark errored",
				"lane", p.inner.Name(), "job", job.ID(), "error", uerr)
		}
	}
	return pos, wait, err
}
// SubmitWait is the blocking counterpart of Submit. It writes the queued
// row (with the job's MetadataProvider payload, if any), then hands a
// row-tracking wrapper to the inner lane's SubmitWait and returns its
// result. If the row cannot be written, nothing is submitted and the
// store error is returned wrapped as "persist enqueue: ...".
//
// NOTE(review): if the inner lane returns an error before the wrapped
// job's Run executes (e.g. it rejects the submission), nothing here
// updates the row. It stays in state=queued and a later Recover would
// re-submit it. Telling that case apart from a Run error depends on the
// Lane's SubmitWait contract — TODO confirm.
func (p *PersistedLane) SubmitWait(ctx context.Context, job Job) error {
	var payload []byte
	if provider, ok := job.(MetadataProvider); ok {
		payload = provider.Metadata()
	}
	err := p.store.EnqueueJob(ctx, job.ID(), p.inner.Name(), job.CallerID(), job.Priority(), payload)
	if err != nil {
		return fmt.Errorf("persist enqueue: %w", err)
	}
	tracked := &persistedJob{inner: job, store: p.store}
	return p.inner.SubmitWait(ctx, tracked)
}
// Cancel withdraws a job from the inner lane's queue and then records
// state=cancelled for its row.
//
// If the inner Cancel fails (the original contract cites ErrNotQueued
// for jobs already past the queue stage), that error is returned as-is
// and the row is not written. Once the in-memory cancel has succeeded,
// a failed row update is only logged and Cancel still reports success.
func (p *PersistedLane) Cancel(jobID string) error {
	cancelErr := p.inner.Cancel(jobID)
	if cancelErr != nil {
		return cancelErr
	}
	// There is no caller ctx on this path, so the best-effort row write
	// runs under a background context.
	writeErr := p.store.UpdateJobState(context.Background(), jobID, stateCancelled, time.Now())
	if writeErr != nil {
		slog.Warn("lane persist: cancel state update failed", "job", jobID, "error", writeErr)
	}
	return nil
}
// Stats reports the wrapped lane's statistics unchanged. The persistence
// layer adds nothing to them.
func (p *PersistedLane) Stats() LaneStats {
	stats := p.inner.Stats()
	return stats
}
// SetMaxConcurrent forwards the concurrency limit to the wrapped lane. No
// job row is touched.
func (p *PersistedLane) SetMaxConcurrent(n int) {
	p.inner.SetMaxConcurrent(n)
}
// Recover reconciles the persistence store with the in-memory lane
// after a process restart.
//
//   - Rows in state=running at restart correspond to jobs whose
//     worker goroutine was killed. They are marked failed (no
//     auto-retry — skills can have side effects, see v6 spec
//     "Restart amnesia").
//   - Rows in state=queued are re-submitted to the inner lane via
//     reconstructFn(ref) → Job. The row is marked failed when
//     reconstructFn is nil or returns nil (the caller could not
//     reconstruct the original work), or when the inner lane rejects
//     the re-submission.
//
// Recover bypasses the PersistedLane.Submit path (which would write a
// duplicate row). The row already exists in state=queued; we just
// re-submit to the in-memory queue and let normal Run-time
// transitions take over from there.
//
// Per-row store and lane failures are logged and skipped (best-effort).
// Recover returns an error only when listing rows fails or when ctx is
// cancelled mid-pass. In the latter case, rows not yet visited keep
// their stored state and are seen again by the next Recover.
func (p *PersistedLane) Recover(ctx context.Context, reconstructFn func(QueuedJobRef) Job) error {
	name := p.inner.Name()

	// 1. Mark running rows as failed: their worker died with the process.
	running, err := p.store.ListRunningJobs(ctx, name)
	if err != nil {
		return fmt.Errorf("list running: %w", err)
	}
	for _, ref := range running {
		// Once ctx is done every further store call would fail too;
		// stop instead of logging a warning per remaining row.
		if cerr := ctx.Err(); cerr != nil {
			return fmt.Errorf("recover: %w", cerr)
		}
		if uerr := p.store.UpdateJobState(ctx, ref.JobID,
			stateFailed, time.Now()); uerr != nil {
			slog.Warn("lane recover: failed to mark lost-on-restart",
				"lane", name, "job", ref.JobID, "error", uerr)
			continue
		}
		slog.Warn("lane recover: job lost on restart",
			"lane", name, "job", ref.JobID)
	}

	// 2. Re-submit queued rows.
	queued, err := p.store.ListQueuedJobs(ctx, name)
	if err != nil {
		return fmt.Errorf("list queued: %w", err)
	}
	for _, ref := range queued {
		if cerr := ctx.Err(); cerr != nil {
			return fmt.Errorf("recover: %w", cerr)
		}
		var job Job
		if reconstructFn != nil {
			job = reconstructFn(ref)
		}
		if job == nil {
			if uerr := p.store.UpdateJobState(ctx, ref.JobID,
				stateFailed, time.Now()); uerr != nil {
				slog.Warn("lane recover: cannot reconstruct, mark-failed errored",
					"lane", name, "job", ref.JobID, "error", uerr)
			} else {
				slog.Warn("lane recover: cannot reconstruct, marked failed",
					"lane", name, "job", ref.JobID)
			}
			continue
		}
		// Wrap the reconstructed job so Run-time state transitions
		// still update the existing row (no fresh enqueue).
		wrapped := &persistedJob{inner: job, store: p.store}
		if _, _, serr := p.inner.Submit(ctx, wrapped); serr != nil {
			slog.Warn("lane recover: re-submit failed",
				"lane", name, "job", ref.JobID, "error", serr)
			// Mark failed — job is in DB as queued but in-memory
			// dispatch never happened.
			if uerr := p.store.UpdateJobState(ctx, ref.JobID,
				stateFailed, time.Now()); uerr != nil {
				slog.Warn("lane recover: post-resubmit-failure mark errored",
					"lane", name, "job", ref.JobID, "error", uerr)
			}
		}
	}
	return nil
}
// persistedJob is the Job handed to the inner lane by PersistedLane. It
// forwards identity accessors to the wrapped job. Its Run records the
// row's state on entry and again once the wrapped Run returns.
type persistedJob struct {
	inner Job              // the caller's original job
	store PersistenceStore // where state transitions for inner.ID() are written
}
// ID forwards to the wrapped job; the job's row is keyed by this value.
func (p *persistedJob) ID() string {
	return p.inner.ID()
}
// CallerID forwards to the wrapped job.
func (p *persistedJob) CallerID() string {
	return p.inner.CallerID()
}
// Priority forwards to the wrapped job.
func (p *persistedJob) Priority() int {
	return p.inner.Priority()
}
// Metadata exposes the wrapped job's MetadataProvider payload. It
// returns nil when the wrapped job does not implement MetadataProvider,
// which keeps the wrapper transparent to code that type-asserts for it.
func (p *persistedJob) Metadata() []byte {
	provider, ok := p.inner.(MetadataProvider)
	if !ok {
		return nil
	}
	return provider.Metadata()
}
// Run marks the row running, executes the wrapped job, then records the
// terminal state and returns the wrapped job's error unchanged. The
// terminal state is finished on nil, cancelled on ErrCancelled, and
// failed otherwise.
//
// Store write failures are logged, never returned: the audit trail must
// not change the outcome of the work the caller asked for.
func (p *persistedJob) Run(ctx context.Context) error {
	id := p.inner.ID()

	if uerr := p.store.UpdateJobState(ctx, id, stateRunning, time.Now()); uerr != nil {
		// Don't abort the run if the audit write fails — the
		// inner work is what the caller asked for. Log and continue.
		slog.Warn("lane persist: state=running update failed",
			"job", id, "error", uerr)
	}

	err := p.inner.Run(ctx)

	var terminal string
	switch {
	case err == nil:
		terminal = stateFinished
	case errors.Is(err, ErrCancelled):
		// Cancellation surfaced as ErrCancelled (queued cancel) is
		// already written by PersistedLane.Cancel; if it bubbles up
		// here that means Run was called and Run returned with the
		// cancellation error — record as cancelled.
		terminal = stateCancelled
	default:
		terminal = stateFailed
	}

	// Detach from ctx's cancellation. A job that was cancelled or timed
	// out mid-run returns with ctx already done, and a store that honours
	// ctx would reject this write. That would strand the row in
	// state=running until a restart.
	if uerr := p.store.UpdateJobState(context.WithoutCancel(ctx), id,
		terminal, time.Now()); uerr != nil {
		slog.Warn("lane persist: terminal state update failed",
			"job", id, "state", terminal, "error", uerr)
	}
	return err
}
// Row-state strings written through PersistenceStore.UpdateJobState.
// They duplicate the QueueJobState values instead of importing skills:
// pkg/lane is generic and cannot import skills (that would create a
// cycle). Production callers wire PersistedLane through an adapter that
// satisfies PersistenceStore, so these exact strings are part of the
// contract.
const (
	stateRunning   = "running"   // written when Run starts
	stateFinished  = "finished"  // Run returned nil
	stateCancelled = "cancelled" // queued cancel, or Run returned ErrCancelled
	stateFailed    = "failed"    // Run errored, or the job could not survive a restart
)
// Sweeper deletes terminal (finished/cancelled/failed) job rows once they
// are older than a configurable retention window. It can run on its own
// periodic goroutine (Start) or be driven one pass at a time (Sweep).
//
// It lives in pkg/lane rather than reusing
// pkg/logic/skills.StorageSweeper because the queue rows belong to the
// lane primitive. Future lane users (LLM transport, GPU lanes) can share
// it without pulling in skills concerns.
//
// Test: persistence_test.go drives Sweep synchronously.
type Sweeper struct {
	store    PersistenceStore
	clock    func() time.Time
	interval time.Duration
	// retention is evaluated on every Sweep, so a runtime convar change
	// takes effect without a restart.
	retention func() time.Duration
}
// NewSweeper builds a Sweeper over store with a one-hour loop interval.
// A nil retention means a fixed 24h window; a nil clock means time.Now.
// store is used as-is: Sweep calls it unconditionally, so it must be
// non-nil before any sweep runs.
func NewSweeper(store PersistenceStore, retention func() time.Duration, clock func() time.Time) *Sweeper {
	s := &Sweeper{
		store:     store,
		clock:     clock,
		retention: retention,
		interval:  time.Hour,
	}
	if s.clock == nil {
		s.clock = time.Now
	}
	if s.retention == nil {
		s.retention = func() time.Duration { return 24 * time.Hour }
	}
	return s
}
// SetInterval changes the loop cadence; a non-positive d is ignored.
// Call it before Start: the loop reads the interval once, when it
// creates its ticker.
func (s *Sweeper) SetInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.interval = d
}
// Start runs the sweep loop on a new goroutine and returns immediately.
// The goroutine exits when ctx is done. There is no separate stop or
// wait handle, and each call starts another loop.
func (s *Sweeper) Start(ctx context.Context) {
	go func() {
		s.loop(ctx)
	}()
}
// Sweep performs one synchronous purge pass. It asks the store to delete
// terminal rows older than clock() - retention(). A store error is logged
// and swallowed; a non-zero deletion count is logged at info level.
// Exported so tests can drive a pass without the background loop.
func (s *Sweeper) Sweep(ctx context.Context) {
	now := s.clock()
	window := s.retention()
	deleted, err := s.store.PurgeFinishedJobs(ctx, now.Add(-window))
	switch {
	case err != nil:
		slog.Warn("lane sweeper: purge failed", "error", err)
	case deleted > 0:
		slog.Info("lane sweeper: purged finished jobs", "deleted", deleted)
	}
}
// loop is the body of the Start goroutine. It sweeps once when the 90s
// startup timer fires and again on every tick of s.interval, and returns
// when ctx is done.
func (s *Sweeper) loop(ctx context.Context) {
	// Startup delay so cold-start load doesn't stack everything in the
	// first second; 90s is a reasonable spread.
	const startupDelay = 90 * time.Second

	ticker := time.NewTicker(s.interval)
	defer ticker.Stop()
	warmup := time.NewTimer(startupDelay)
	defer warmup.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-warmup.C:
		case <-ticker.C:
		}
		s.Sweep(ctx)
	}
}