ca243a2d50
executus CI / test (push) Failing after 24s
Batteries-included agent-harness base, extracted from mort's agent layer. This first cut establishes the module + the zero-coupling core primitives: - lane, dispatchguard, pendingattach, run/progress.go: moved verbatim from mort - config: host config Source seam + env-var default (nil-safe helpers) - deliver: output-egress seam + Discard/Stdout defaults - identity: AdminPolicy + MemberResolver seams (nil-safe) - fanout: programmatic N×M swarm (bounded global + per-key concurrency) - README/CLAUDE.md with the vibe-coded banner; CI with Go gates + the "core stays majordomo+stdlib only" invariant Core builds with stdlib only today; majordomo enters at P1 (model/structured). go build/vet/test -race all green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
381 lines
10 KiB
Go
381 lines
10 KiB
Go
package lane
|
|
|
|
import (
	"context"
	"errors"
	"sort"
	"sync"
	"testing"
	"time"
)
|
|
|
|
// fakeStore is an in-memory PersistenceStore used by the persistence
// tests. It keeps one storeRow per job ID, guarded by mu. Failures can
// be injected through enqErr/updErr, and purging can be overridden via
// purgeFn. It does not keep a log of method calls; tests observe its
// effects by inspecting row state (the state helper) or the rows map.
type fakeStore struct {
	mu   sync.Mutex
	rows map[string]*storeRow // keyed by job ID

	// enqErr, when non-nil, is returned by EnqueueJob before any row is written.
	enqErr error
	// updErr, when non-nil, is returned by UpdateJobState before any row is touched.
	updErr error
	// purgeFn, when non-nil, replaces the built-in PurgeFinishedJobs logic.
	purgeFn func(time.Time) (int64, error)
}

// storeRow is one persisted job record held by fakeStore.
type storeRow struct {
	jobID, lane, callerID string
	priority              int
	metadata              []byte

	// state is the last state written, e.g. "queued" (EnqueueJob) or
	// "running"/"finished"/"cancelled"/"failed" (UpdateJobState).
	state      string
	enqueuedAt time.Time
	startedAt  *time.Time // set by UpdateJobState on "running"
	finishedAt *time.Time // set by UpdateJobState on a terminal state
}
|
|
|
|
// newFakeStore returns a fakeStore whose row map is allocated and empty,
// with no injected errors or purge override.
func newFakeStore() *fakeStore {
	s := new(fakeStore)
	s.rows = make(map[string]*storeRow)
	return s
}
|
|
|
|
func (f *fakeStore) EnqueueJob(_ context.Context, jobID, lane, callerID string, priority int, metadata []byte) error {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
if f.enqErr != nil {
|
|
return f.enqErr
|
|
}
|
|
if _, exists := f.rows[jobID]; exists {
|
|
return errors.New("duplicate enqueue")
|
|
}
|
|
f.rows[jobID] = &storeRow{
|
|
jobID: jobID, lane: lane, callerID: callerID,
|
|
priority: priority, metadata: metadata,
|
|
state: "queued", enqueuedAt: time.Now(),
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *fakeStore) UpdateJobState(_ context.Context, jobID, state string, at time.Time) error {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
if f.updErr != nil {
|
|
return f.updErr
|
|
}
|
|
r, ok := f.rows[jobID]
|
|
if !ok {
|
|
return errors.New("not found")
|
|
}
|
|
r.state = state
|
|
t := at
|
|
switch state {
|
|
case "running":
|
|
r.startedAt = &t
|
|
case "finished", "cancelled", "failed":
|
|
r.finishedAt = &t
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *fakeStore) ListQueuedJobs(_ context.Context, lane string) ([]QueuedJobRef, error) {
|
|
return f.list(lane, "queued"), nil
|
|
}
|
|
|
|
func (f *fakeStore) ListRunningJobs(_ context.Context, lane string) ([]QueuedJobRef, error) {
|
|
return f.list(lane, "running"), nil
|
|
}
|
|
|
|
func (f *fakeStore) list(lane, state string) []QueuedJobRef {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
var out []QueuedJobRef
|
|
for _, r := range f.rows {
|
|
if r.lane == lane && r.state == state {
|
|
out = append(out, QueuedJobRef{
|
|
JobID: r.jobID, Lane: r.lane,
|
|
CallerID: r.callerID, Priority: r.priority,
|
|
Metadata: r.metadata, EnqueuedAt: r.enqueuedAt,
|
|
})
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (f *fakeStore) PurgeFinishedJobs(_ context.Context, olderThan time.Time) (int64, error) {
|
|
if f.purgeFn != nil {
|
|
return f.purgeFn(olderThan)
|
|
}
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
var deleted int64
|
|
for id, r := range f.rows {
|
|
if (r.state == "finished" || r.state == "cancelled" || r.state == "failed") &&
|
|
r.finishedAt != nil && r.finishedAt.Before(olderThan) {
|
|
delete(f.rows, id)
|
|
deleted++
|
|
}
|
|
}
|
|
return deleted, nil
|
|
}
|
|
|
|
func (f *fakeStore) state(jobID string) string {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
if r, ok := f.rows[jobID]; ok {
|
|
return r.state
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// metaJob is a test Job whose Metadata method returns a caller-supplied
// payload. Persistence tests use it to check that metadata survives the
// round trip through the store.
type metaJob struct {
	id       string
	caller   string
	priority int
	meta     []byte
	run      func(ctx context.Context) error
}

// ID returns the job identifier.
func (j *metaJob) ID() string {
	return j.id
}

// CallerID returns the identity the job was submitted on behalf of.
func (j *metaJob) CallerID() string {
	return j.caller
}

// Priority returns the configured priority value.
func (j *metaJob) Priority() int {
	return j.priority
}

// Metadata returns the opaque payload exactly as supplied (no copy).
func (j *metaJob) Metadata() []byte {
	return j.meta
}

// Run delegates to the run callback.
func (j *metaJob) Run(ctx context.Context) error {
	return j.run(ctx)
}
|
|
|
|
// TestPersistedLane_Submit_WritesRow verifies Submit writes a queued
|
|
// row with the right fields, then on Run completes transitions to
|
|
// finished.
|
|
func TestPersistedLane_Submit_WritesRow(t *testing.T) {
|
|
store := newFakeStore()
|
|
inner := New("ollama", 1)
|
|
pl := NewPersistedLane(inner, store)
|
|
|
|
done := make(chan struct{})
|
|
job := &metaJob{
|
|
id: "j1", caller: "alice", priority: 3,
|
|
meta: []byte(`{"prompt":"hi"}`),
|
|
run: func(ctx context.Context) error {
|
|
close(done)
|
|
return nil
|
|
},
|
|
}
|
|
if _, _, err := pl.Submit(context.Background(), job); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
<-done
|
|
|
|
// Wait for state-update goroutine to land "finished".
|
|
waitFor(t, func() bool { return store.state("j1") == "finished" })
|
|
|
|
r := store.rows["j1"]
|
|
if r.lane != "ollama" || r.callerID != "alice" || r.priority != 3 {
|
|
t.Fatalf("row identity mismatch: %+v", r)
|
|
}
|
|
if string(r.metadata) != `{"prompt":"hi"}` {
|
|
t.Fatalf("metadata mismatch: %s", r.metadata)
|
|
}
|
|
if r.startedAt == nil || r.finishedAt == nil {
|
|
t.Fatalf("expected started_at + finished_at set; row=%+v", r)
|
|
}
|
|
}
|
|
|
|
// TestPersistedLane_Submit_RunErrorMarksFailed verifies a failing Run
|
|
// transitions to state=failed.
|
|
func TestPersistedLane_Submit_RunErrorMarksFailed(t *testing.T) {
|
|
store := newFakeStore()
|
|
inner := New("test", 1)
|
|
pl := NewPersistedLane(inner, store)
|
|
|
|
job := &metaJob{
|
|
id: "j1", caller: "alice",
|
|
run: func(ctx context.Context) error {
|
|
return errors.New("boom")
|
|
},
|
|
}
|
|
if err := pl.SubmitWait(context.Background(), job); err == nil {
|
|
t.Fatal("expected error from Run")
|
|
}
|
|
if got := store.state("j1"); got != "failed" {
|
|
t.Fatalf("expected state=failed, got %s", got)
|
|
}
|
|
}
|
|
|
|
// TestPersistedLane_EnqueueErrorAborts verifies that if EnqueueJob
|
|
// errors, the inner lane never sees the job.
|
|
func TestPersistedLane_EnqueueErrorAborts(t *testing.T) {
|
|
store := newFakeStore()
|
|
store.enqErr = errors.New("disk full")
|
|
inner := New("test", 1).(*pool)
|
|
pl := NewPersistedLane(inner, store)
|
|
|
|
job := &funcJob{
|
|
id: "j1", caller: "alice",
|
|
run: func(ctx context.Context) error {
|
|
t.Fatal("Run should not be called when persist enqueue fails")
|
|
return nil
|
|
},
|
|
}
|
|
_, _, err := pl.Submit(context.Background(), job)
|
|
if err == nil {
|
|
t.Fatal("expected Submit to fail")
|
|
}
|
|
// Inner lane should be empty.
|
|
if got := inner.Stats().Running + inner.Stats().Queued; got != 0 {
|
|
t.Fatalf("expected inner lane empty, got running+queued=%d", got)
|
|
}
|
|
}
|
|
|
|
// TestPersistedLane_Cancel_QueuedWritesCancelled verifies cancelling a
|
|
// queued job writes state=cancelled.
|
|
func TestPersistedLane_Cancel_QueuedWritesCancelled(t *testing.T) {
|
|
store := newFakeStore()
|
|
inner := New("test", 1).(*pool)
|
|
pl := NewPersistedLane(inner, store)
|
|
|
|
blocker := newTestJob("blocker")
|
|
if _, _, err := pl.Submit(context.Background(), blocker); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
<-blocker.started
|
|
|
|
target := newTestJob("target")
|
|
if _, _, err := pl.Submit(context.Background(), target); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if got := inner.Stats().Queued; got != 1 {
|
|
t.Fatalf("expected queued=1, got %d", got)
|
|
}
|
|
|
|
if err := pl.Cancel("target"); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
waitFor(t, func() bool { return store.state("target") == "cancelled" })
|
|
|
|
close(blocker.release)
|
|
}
|
|
|
|
// TestRecover_RunningMarkedFailed verifies that running rows at
|
|
// recovery time are marked failed (lost-on-restart).
|
|
func TestRecover_RunningMarkedFailed(t *testing.T) {
|
|
store := newFakeStore()
|
|
now := time.Now()
|
|
store.rows["r1"] = &storeRow{
|
|
jobID: "r1", lane: "ollama", callerID: "alice",
|
|
state: "running", enqueuedAt: now.Add(-1 * time.Hour),
|
|
startedAt: &now,
|
|
}
|
|
|
|
inner := New("ollama", 1)
|
|
pl := NewPersistedLane(inner, store)
|
|
|
|
// reconstructFn never called for running rows.
|
|
if err := pl.Recover(context.Background(), nil); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if got := store.state("r1"); got != "failed" {
|
|
t.Fatalf("expected r1 → failed, got %s", got)
|
|
}
|
|
}
|
|
|
|
// TestRecover_QueuedReSubmitted verifies queued rows are re-submitted
|
|
// to the inner lane via reconstructFn.
|
|
func TestRecover_QueuedReSubmitted(t *testing.T) {
|
|
store := newFakeStore()
|
|
store.rows["q1"] = &storeRow{
|
|
jobID: "q1", lane: "ollama", callerID: "alice",
|
|
state: "queued", enqueuedAt: time.Now(),
|
|
metadata: []byte("opaque"),
|
|
}
|
|
store.rows["q2"] = &storeRow{
|
|
jobID: "q2", lane: "ollama", callerID: "bob",
|
|
state: "queued", enqueuedAt: time.Now(),
|
|
}
|
|
|
|
inner := New("ollama", 2)
|
|
pl := NewPersistedLane(inner, store)
|
|
|
|
calls := make(chan string, 2)
|
|
reconstruct := func(ref QueuedJobRef) Job {
|
|
return &funcJob{
|
|
id: ref.JobID, caller: ref.CallerID,
|
|
run: func(ctx context.Context) error {
|
|
calls <- ref.JobID
|
|
return nil
|
|
},
|
|
}
|
|
}
|
|
if err := pl.Recover(context.Background(), reconstruct); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
got := map[string]bool{}
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case id := <-calls:
|
|
got[id] = true
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("expected 2 reconstructed runs; only got %v", got)
|
|
}
|
|
}
|
|
if !got["q1"] || !got["q2"] {
|
|
t.Fatalf("expected both q1 and q2 reconstructed; got %v", got)
|
|
}
|
|
|
|
// After Run completes, both rows should be state=finished.
|
|
waitFor(t, func() bool {
|
|
return store.state("q1") == "finished" && store.state("q2") == "finished"
|
|
})
|
|
}
|
|
|
|
// TestRecover_NilReconstructMarksFailed verifies that when
|
|
// reconstructFn returns nil for a queued row, the row is marked
|
|
// failed.
|
|
func TestRecover_NilReconstructMarksFailed(t *testing.T) {
|
|
store := newFakeStore()
|
|
store.rows["q1"] = &storeRow{
|
|
jobID: "q1", lane: "ollama", callerID: "alice",
|
|
state: "queued", enqueuedAt: time.Now(),
|
|
}
|
|
inner := New("ollama", 1)
|
|
pl := NewPersistedLane(inner, store)
|
|
|
|
if err := pl.Recover(context.Background(), func(QueuedJobRef) Job { return nil }); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if got := store.state("q1"); got != "failed" {
|
|
t.Fatalf("expected q1 → failed, got %s", got)
|
|
}
|
|
}
|
|
|
|
// TestSweeper_PurgesFinishedRows verifies Sweep calls
|
|
// PurgeFinishedJobs with the right cutoff.
|
|
func TestSweeper_PurgesFinishedRows(t *testing.T) {
|
|
store := newFakeStore()
|
|
old := time.Now().Add(-25 * time.Hour)
|
|
finished := time.Now()
|
|
store.rows["old"] = &storeRow{
|
|
jobID: "old", lane: "x", state: "finished",
|
|
enqueuedAt: old, finishedAt: &old,
|
|
}
|
|
store.rows["recent"] = &storeRow{
|
|
jobID: "recent", lane: "x", state: "finished",
|
|
enqueuedAt: finished, finishedAt: &finished,
|
|
}
|
|
sw := NewSweeper(store, func() time.Duration { return 24 * time.Hour }, nil)
|
|
sw.Sweep(context.Background())
|
|
if _, ok := store.rows["old"]; ok {
|
|
t.Fatal("old row should have been purged")
|
|
}
|
|
if _, ok := store.rows["recent"]; !ok {
|
|
t.Fatal("recent row should remain")
|
|
}
|
|
}
|
|
|
|
// TestSweeper_RetentionIsDynamic verifies the retention function is
|
|
// called per Sweep, so a runtime convar change takes effect.
|
|
func TestSweeper_RetentionIsDynamic(t *testing.T) {
|
|
store := newFakeStore()
|
|
called := 0
|
|
retention := func() time.Duration {
|
|
called++
|
|
return time.Hour
|
|
}
|
|
sw := NewSweeper(store, retention, nil)
|
|
sw.Sweep(context.Background())
|
|
sw.Sweep(context.Background())
|
|
if called != 2 {
|
|
t.Fatalf("expected retention called twice, got %d", called)
|
|
}
|
|
}
|