Files
executus/lane/persistence_test.go
steve ca243a2d50
executus CI / test (push) Failing after 24s
P0: stand up executus harness module above majordomo
Batteries-included agent-harness base, extracted from mort's agent layer.
This first cut establishes the module + the zero-coupling core primitives:

- lane, dispatchguard, pendingattach, run/progress.go: moved verbatim from mort
- config: host config Source seam + env-var default (nil-safe helpers)
- deliver: output-egress seam + Discard/Stdout defaults
- identity: AdminPolicy + MemberResolver seams (nil-safe)
- fanout: programmatic N×M swarm (bounded global + per-key concurrency)
- README/CLAUDE.md with the vibe-coded banner; CI with Go gates +
  the "core stays majordomo+stdlib only" invariant

Core builds with stdlib only today; majordomo enters at P1 (model/structured).
go build/vet/test -race all green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 19:18:37 -04:00

381 lines
10 KiB
Go

package lane
import (
"context"
"errors"
"sync"
"testing"
"time"
)
// fakeStore is an in-memory PersistenceStore used by the persistence
// tests. It keeps one storeRow per job ID in rows, guarded by mu.
//
// It lets tests inject failures:
//   - enqErr and updErr make EnqueueJob and UpdateJobState return that error.
//   - purgeFn, when non-nil, replaces the built-in PurgeFinishedJobs logic.
//
// It does not keep a log of method calls. Tests assert on the resulting
// row state (via the state helper), not on call ordering.
type fakeStore struct {
	// mu serializes access to rows.
	mu   sync.Mutex
	rows map[string]*storeRow // keyed by job ID

	// enqErr, when non-nil, is returned by every EnqueueJob call.
	enqErr error
	// updErr, when non-nil, is returned by every UpdateJobState call.
	updErr error
	// purgeFn, when non-nil, is called by PurgeFinishedJobs in place of
	// the default in-memory purge.
	purgeFn func(time.Time) (int64, error)
}
// storeRow is one persisted job as held by fakeStore. It holds the
// identity and scheduling fields captured at enqueue time, plus the
// job's lifecycle state and its timestamps.
type storeRow struct {
	jobID    string
	lane     string
	callerID string
	priority int
	metadata []byte

	// state is the lifecycle label last written for the job, e.g.
	// "queued", "running", "finished", "cancelled" or "failed".
	state string

	enqueuedAt time.Time
	startedAt  *time.Time // stamped by fakeStore.UpdateJobState on "running"
	finishedAt *time.Time // stamped by fakeStore.UpdateJobState on a terminal state
}
// newFakeStore returns an empty fakeStore with its row map allocated.
func newFakeStore() *fakeStore {
	s := &fakeStore{}
	s.rows = make(map[string]*storeRow)
	return s
}
// EnqueueJob stores a new row in state "queued", stamped with the
// current time. It returns enqErr when that is set, and an error when
// jobID already has a row.
func (f *fakeStore) EnqueueJob(_ context.Context, jobID, lane, callerID string, priority int, metadata []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	if err := f.enqErr; err != nil {
		return err
	}
	if _, dup := f.rows[jobID]; dup {
		return errors.New("duplicate enqueue")
	}

	row := &storeRow{
		jobID:      jobID,
		lane:       lane,
		callerID:   callerID,
		priority:   priority,
		metadata:   metadata,
		state:      "queued",
		enqueuedAt: time.Now(),
	}
	f.rows[jobID] = row
	return nil
}
// UpdateJobState sets the row's state label to state. It also stamps
// at into startedAt (for "running") or finishedAt (for "finished",
// "cancelled" or "failed"); any other state changes only the label.
// It returns updErr when that is set, and an error for an unknown jobID.
func (f *fakeStore) UpdateJobState(_ context.Context, jobID, state string, at time.Time) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	if err := f.updErr; err != nil {
		return err
	}
	row, found := f.rows[jobID]
	if !found {
		return errors.New("not found")
	}

	row.state = state
	stamp := at
	if state == "running" {
		row.startedAt = &stamp
	} else if state == "finished" || state == "cancelled" || state == "failed" {
		row.finishedAt = &stamp
	}
	return nil
}
// ListQueuedJobs returns the rows in lane whose state is "queued", in
// unspecified order. The fake never returns an error.
func (f *fakeStore) ListQueuedJobs(_ context.Context, lane string) ([]QueuedJobRef, error) {
	refs := f.list(lane, "queued")
	return refs, nil
}
// ListRunningJobs returns the rows in lane whose state is "running", in
// unspecified order. The fake never returns an error.
func (f *fakeStore) ListRunningJobs(_ context.Context, lane string) ([]QueuedJobRef, error) {
	refs := f.list(lane, "running")
	return refs, nil
}
// list takes mu and returns, as QueuedJobRefs, every row in lane whose
// state equals state. Order follows map iteration and is therefore
// unspecified. The result is nil when nothing matches.
func (f *fakeStore) list(lane, state string) []QueuedJobRef {
	f.mu.Lock()
	defer f.mu.Unlock()

	var refs []QueuedJobRef
	for _, row := range f.rows {
		if row.lane != lane || row.state != state {
			continue
		}
		ref := QueuedJobRef{
			JobID:      row.jobID,
			Lane:       row.lane,
			CallerID:   row.callerID,
			Priority:   row.priority,
			Metadata:   row.metadata,
			EnqueuedAt: row.enqueuedAt,
		}
		refs = append(refs, ref)
	}
	return refs
}
// PurgeFinishedJobs deletes every row in a terminal state ("finished",
// "cancelled" or "failed") whose finishedAt is strictly before
// olderThan, and reports how many rows it removed. When purgeFn is set,
// it is called instead (without taking mu) and its result is returned.
func (f *fakeStore) PurgeFinishedJobs(_ context.Context, olderThan time.Time) (int64, error) {
	if fn := f.purgeFn; fn != nil {
		return fn(olderThan)
	}

	f.mu.Lock()
	defer f.mu.Unlock()

	var n int64
	for id, row := range f.rows {
		switch row.state {
		case "finished", "cancelled", "failed":
		default:
			continue
		}
		if row.finishedAt == nil || !row.finishedAt.Before(olderThan) {
			continue
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(f.rows, id)
		n++
	}
	return n, nil
}
// state returns the current state label of jobID, read under mu. It
// returns "" when no such row exists.
func (f *fakeStore) state(jobID string) string {
	f.mu.Lock()
	defer f.mu.Unlock()

	row, ok := f.rows[jobID]
	if !ok {
		return ""
	}
	return row.state
}
// metaJob is a Job implementation that also exposes Metadata. It is
// used by the persistence tests that check metadata is carried through
// to the store. Each accessor returns the matching field; Run delegates
// to run.
type metaJob struct {
	id       string
	caller   string
	priority int
	meta     []byte
	run      func(ctx context.Context) error
}

// ID returns the job identifier.
func (m *metaJob) ID() string {
	return m.id
}

// CallerID returns the submitting caller's identifier.
func (m *metaJob) CallerID() string {
	return m.caller
}

// Priority returns the job's priority value.
func (m *metaJob) Priority() int {
	return m.priority
}

// Metadata returns the opaque metadata bytes, without copying them.
func (m *metaJob) Metadata() []byte {
	return m.meta
}

// Run invokes the configured run function with ctx.
func (m *metaJob) Run(ctx context.Context) error {
	return m.run(ctx)
}
// TestPersistedLane_Submit_WritesRow verifies two things:
//   - Submit persists a row carrying the job's lane, caller, priority
//     and metadata.
//   - Once Run completes, the row reaches state "finished" with both
//     startedAt and finishedAt stamped.
func TestPersistedLane_Submit_WritesRow(t *testing.T) {
	store := newFakeStore()
	inner := New("ollama", 1)
	pl := NewPersistedLane(inner, store)

	done := make(chan struct{})
	job := &metaJob{
		id: "j1", caller: "alice", priority: 3,
		meta: []byte(`{"prompt":"hi"}`),
		run: func(ctx context.Context) error {
			close(done)
			return nil
		},
	}
	if _, _, err := pl.Submit(context.Background(), job); err != nil {
		t.Fatal(err)
	}

	// Bound the wait. If the lane never runs the job, this test fails
	// instead of hanging the whole test binary until the go test timeout.
	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatal("job was never run")
	}

	// Run closes done before it returns, so the "finished" update may not
	// have landed yet; poll for it.
	waitFor(t, func() bool { return store.state("j1") == "finished" })

	// Snapshot the row under the store mutex instead of reading the map
	// that the lane's goroutines write to.
	store.mu.Lock()
	r := *store.rows["j1"]
	store.mu.Unlock()

	if r.lane != "ollama" || r.callerID != "alice" || r.priority != 3 {
		t.Fatalf("row identity mismatch: %+v", r)
	}
	if string(r.metadata) != `{"prompt":"hi"}` {
		t.Fatalf("metadata mismatch: %s", r.metadata)
	}
	if r.startedAt == nil || r.finishedAt == nil {
		t.Fatalf("expected started_at + finished_at set; row=%+v", r)
	}
}
// TestPersistedLane_Submit_RunErrorMarksFailed checks two things for a
// job whose Run returns an error: SubmitWait surfaces that error, and
// the persisted row ends in state "failed".
func TestPersistedLane_Submit_RunErrorMarksFailed(t *testing.T) {
	store := newFakeStore()
	pl := NewPersistedLane(New("test", 1), store)

	job := &metaJob{
		id:     "j1",
		caller: "alice",
		run: func(context.Context) error {
			return errors.New("boom")
		},
	}

	err := pl.SubmitWait(context.Background(), job)
	if err == nil {
		t.Fatal("expected error from Run")
	}
	got := store.state("j1")
	if got != "failed" {
		t.Fatalf("expected state=failed, got %s", got)
	}
}
// TestPersistedLane_EnqueueErrorAborts verifies that when the store's
// EnqueueJob fails, Submit returns an error and the job never reaches
// the inner lane: it neither runs nor sits in the queue.
func TestPersistedLane_EnqueueErrorAborts(t *testing.T) {
	store := newFakeStore()
	store.enqErr = errors.New("disk full")
	inner := New("test", 1).(*pool)
	pl := NewPersistedLane(inner, store)

	// If Run were called, it would execute on a lane worker goroutine.
	// t.Fatal is not allowed there, because FailNow must be called from
	// the test goroutine. So record the call here and report it below.
	ran := make(chan struct{}, 1)
	job := &funcJob{
		id: "j1", caller: "alice",
		run: func(ctx context.Context) error {
			select {
			case ran <- struct{}{}:
			default:
			}
			return nil
		},
	}
	if _, _, err := pl.Submit(context.Background(), job); err == nil {
		t.Fatal("expected Submit to fail")
	}

	// Take a single Stats snapshot so Running and Queued are read together.
	st := inner.Stats()
	if got := st.Running + st.Queued; got != 0 {
		t.Fatalf("expected inner lane empty, got running+queued=%d", got)
	}
	select {
	case <-ran:
		t.Fatal("Run should not be called when persist enqueue fails")
	default:
	}
}
// TestPersistedLane_Cancel_QueuedWritesCancelled verifies that
// cancelling a job still waiting in the inner lane's queue records
// state "cancelled" in the store.
func TestPersistedLane_Cancel_QueuedWritesCancelled(t *testing.T) {
	store := newFakeStore()
	inner := New("test", 1).(*pool)
	pl := NewPersistedLane(inner, store)

	// Occupy the lane's only slot so that the next submission stays queued.
	blocker := newTestJob("blocker")
	if _, _, err := pl.Submit(context.Background(), blocker); err != nil {
		t.Fatal(err)
	}
	// Release the blocker on every exit path. Otherwise a failed assertion
	// below leaves the lane worker blocked for the rest of the test binary.
	defer close(blocker.release)

	select {
	case <-blocker.started:
	case <-time.After(5 * time.Second):
		t.Fatal("blocker never started")
	}

	target := newTestJob("target")
	if _, _, err := pl.Submit(context.Background(), target); err != nil {
		t.Fatal(err)
	}
	if got := inner.Stats().Queued; got != 1 {
		t.Fatalf("expected queued=1, got %d", got)
	}
	if err := pl.Cancel("target"); err != nil {
		t.Fatal(err)
	}
	waitFor(t, func() bool { return store.state("target") == "cancelled" })
}
// TestRecover_RunningMarkedFailed checks that Recover treats a row
// still marked "running" as lost across a restart and moves it to
// "failed". A nil reconstruct function is passed, on the expectation
// that Recover does not call it for running rows.
func TestRecover_RunningMarkedFailed(t *testing.T) {
	store := newFakeStore()
	startedAt := time.Now()
	store.rows["r1"] = &storeRow{
		jobID:      "r1",
		lane:       "ollama",
		callerID:   "alice",
		state:      "running",
		enqueuedAt: startedAt.Add(-time.Hour),
		startedAt:  &startedAt,
	}

	pl := NewPersistedLane(New("ollama", 1), store)
	if err := pl.Recover(context.Background(), nil); err != nil {
		t.Fatal(err)
	}

	got := store.state("r1")
	if got != "failed" {
		t.Fatalf("expected r1 → failed, got %s", got)
	}
}
// TestRecover_QueuedReSubmitted checks that Recover feeds every queued
// row through reconstructFn and submits the rebuilt jobs to the inner
// lane. Each job must run once, and its row must end up "finished".
func TestRecover_QueuedReSubmitted(t *testing.T) {
	store := newFakeStore()
	seeds := []struct {
		id, caller string
		meta       []byte
	}{
		{id: "q1", caller: "alice", meta: []byte("opaque")},
		{id: "q2", caller: "bob"},
	}
	for _, s := range seeds {
		store.rows[s.id] = &storeRow{
			jobID:      s.id,
			lane:       "ollama",
			callerID:   s.caller,
			state:      "queued",
			enqueuedAt: time.Now(),
			metadata:   s.meta,
		}
	}

	pl := NewPersistedLane(New("ollama", 2), store)

	// Buffered for both jobs so a reconstructed Run never blocks on send.
	calls := make(chan string, 2)
	reconstruct := func(ref QueuedJobRef) Job {
		jobID := ref.JobID
		return &funcJob{
			id:     jobID,
			caller: ref.CallerID,
			run: func(context.Context) error {
				calls <- jobID
				return nil
			},
		}
	}
	if err := pl.Recover(context.Background(), reconstruct); err != nil {
		t.Fatal(err)
	}

	seen := map[string]bool{}
	for n := 0; n < 2; n++ {
		select {
		case id := <-calls:
			seen[id] = true
		case <-time.After(time.Second):
			t.Fatalf("expected 2 reconstructed runs; only got %v", seen)
		}
	}
	if !(seen["q1"] && seen["q2"]) {
		t.Fatalf("expected both q1 and q2 reconstructed; got %v", seen)
	}

	// Once both runs have returned, each row should be marked finished.
	waitFor(t, func() bool {
		return store.state("q1") == "finished" && store.state("q2") == "finished"
	})
}
// TestRecover_NilReconstructMarksFailed checks that a queued row ends
// up "failed" when reconstructFn returns nil for it.
func TestRecover_NilReconstructMarksFailed(t *testing.T) {
	store := newFakeStore()
	store.rows["q1"] = &storeRow{
		jobID:      "q1",
		lane:       "ollama",
		callerID:   "alice",
		state:      "queued",
		enqueuedAt: time.Now(),
	}

	pl := NewPersistedLane(New("ollama", 1), store)
	giveUp := func(QueuedJobRef) Job { return nil }
	if err := pl.Recover(context.Background(), giveUp); err != nil {
		t.Fatal(err)
	}

	got := store.state("q1")
	if got != "failed" {
		t.Fatalf("expected q1 → failed, got %s", got)
	}
}
// TestSweeper_PurgesFinishedRows checks that one Sweep with a 24h
// retention removes a row that finished 25h ago and keeps one that
// just finished.
func TestSweeper_PurgesFinishedRows(t *testing.T) {
	store := newFakeStore()
	stale := time.Now().Add(-25 * time.Hour)
	fresh := time.Now()
	store.rows["old"] = &storeRow{
		jobID:      "old",
		lane:       "x",
		state:      "finished",
		enqueuedAt: stale,
		finishedAt: &stale,
	}
	store.rows["recent"] = &storeRow{
		jobID:      "recent",
		lane:       "x",
		state:      "finished",
		enqueuedAt: fresh,
		finishedAt: &fresh,
	}

	retention := func() time.Duration { return 24 * time.Hour }
	sw := NewSweeper(store, retention, nil)
	sw.Sweep(context.Background())

	_, oldLeft := store.rows["old"]
	if oldLeft {
		t.Fatal("old row should have been purged")
	}
	_, recentLeft := store.rows["recent"]
	if !recentLeft {
		t.Fatal("recent row should remain")
	}
}
// TestSweeper_RetentionIsDynamic checks that the retention function is
// called on every Sweep rather than once up front, so a runtime change
// to the configured value takes effect.
func TestSweeper_RetentionIsDynamic(t *testing.T) {
	store := newFakeStore()
	var invocations int
	sw := NewSweeper(store, func() time.Duration {
		invocations++
		return time.Hour
	}, nil)

	for i := 0; i < 2; i++ {
		sw.Sweep(context.Background())
	}

	if invocations != 2 {
		t.Fatalf("expected retention called twice, got %d", invocations)
	}
}