Files
executus/lane/pool_test.go
T
steve ca243a2d50
executus CI / test (push) Failing after 24s
P0: stand up executus harness module above majordomo
Batteries-included agent-harness base, extracted from mort's agent layer.
This first cut establishes the module + the zero-coupling core primitives:

- lane, dispatchguard, pendingattach, run/progress.go: moved verbatim from mort
- config: host config Source seam + env-var default (nil-safe helpers)
- deliver: output-egress seam + Discard/Stdout defaults
- identity: AdminPolicy + MemberResolver seams (nil-safe)
- fanout: programmatic N×M swarm (bounded global + per-key concurrency)
- README/CLAUDE.md with the vibe-coded banner; CI with Go gates +
  the "core stays majordomo+stdlib only" invariant

Core builds with stdlib only today; majordomo enters at P1 (model/structured).
go build/vet/test -race all green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 19:18:37 -04:00

486 lines
12 KiB
Go

package lane
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
// testJob is a Job whose Run announces that it has begun (via started)
// and then parks until the test closes release. Tests use it to control
// exactly when each job occupies and frees a lane slot.
type testJob struct {
	id       string
	caller   string
	priority int

	// started receives one token when Run begins. It has capacity 1 and
	// Run sends without blocking, so an undrained channel never wedges Run.
	started chan struct{}
	// release gates Run: Run returns only after this channel is closed.
	release chan struct{}
	// err is the value Run returns once released.
	err error

	// runCount is incremented atomically on every Run so tests can assert
	// "exactly once" or "never".
	runCount int32
}

// newTestJob returns a testJob with the given ID and fresh signalling
// channels; caller, priority and err are left at their zero values.
func newTestJob(id string) *testJob {
	j := &testJob{id: id}
	j.started = make(chan struct{}, 1)
	j.release = make(chan struct{})
	return j
}

// ID reports the job's identifier.
func (j *testJob) ID() string { return j.id }

// CallerID reports the submitting caller, or "anon" when none was set.
func (j *testJob) CallerID() string {
	if j.caller != "" {
		return j.caller
	}
	return "anon"
}

// Priority reports the priority assigned by the test.
func (j *testJob) Priority() int { return j.priority }

// Run records the invocation, signals started without blocking, waits for
// release to be closed and then returns err. ctx is not consulted, so only
// closing release ends Run.
func (j *testJob) Run(ctx context.Context) error {
	atomic.AddInt32(&j.runCount, 1)
	select {
	case j.started <- struct{}{}:
	default:
		// A token is already pending; dropping this one keeps Run from
		// deadlocking when the test never drains started.
	}
	<-j.release
	return j.err
}
// TestPool_Submit_SlotAvailable checks that Submit on an idle lane hands
// the job straight to a slot: position 0, zero ETA and no error.
func TestPool_Submit_SlotAvailable(t *testing.T) {
	pl := New("test", 1).(*pool)
	job := newTestJob("j1")

	pos, eta, err := pl.Submit(context.Background(), job)
	switch {
	case err != nil:
		t.Fatalf("submit err: %v", err)
	case pos != 0:
		t.Fatalf("expected pos=0 (dispatched), got %d", pos)
	case eta != 0:
		t.Fatalf("expected eta=0, got %v", eta)
	}

	// A slot was free, so Run should begin promptly.
	timeout := time.After(time.Second)
	select {
	case <-job.started:
	case <-timeout:
		t.Fatalf("job did not start within 1s")
	}

	// Let the job finish, then confirm the slot was handed back.
	close(job.release)
	waitForRunning(t, pl, 0)
}
// TestPool_Submit_QueuedWhenFull verifies queue position reporting. With
// the lane's single slot occupied, each further Submit is queued and
// reports its 1-based queue position, and Stats shows one running job plus
// the queued backlog.
func TestPool_Submit_QueuedWhenFull(t *testing.T) {
	p := New("test", 1).(*pool)
	j1 := newTestJob("j1")
	if _, _, err := p.Submit(context.Background(), j1); err != nil {
		t.Fatal(err)
	}
	// Bounded wait: a pool that never dispatches fails here with a clear
	// message instead of hanging until the go test timeout.
	select {
	case <-j1.started:
	case <-time.After(time.Second):
		t.Fatal("j1 did not start within 1s")
	}

	j2 := newTestJob("j2")
	pos, _, err := p.Submit(context.Background(), j2)
	if err != nil {
		t.Fatal(err)
	}
	if pos != 1 {
		t.Fatalf("expected pos=1 for first queued, got %d", pos)
	}
	j3 := newTestJob("j3")
	pos, _, err = p.Submit(context.Background(), j3)
	if err != nil {
		t.Fatal(err)
	}
	if pos != 2 {
		t.Fatalf("expected pos=2 for second queued, got %d", pos)
	}

	stats := p.Stats()
	if stats.Running != 1 || stats.Queued != 2 {
		t.Fatalf("expected running=1 queued=2, got %+v", stats)
	}

	// Release every job and wait for the lane to go fully idle, so none of
	// their Run goroutines outlive this test.
	close(j1.release)
	close(j2.release)
	close(j3.release)
	waitFor(t, func() bool {
		s := p.Stats()
		return s.Running == 0 && s.Queued == 0
	})
}
// TestPool_SubmitWait_Blocks checks that SubmitWait does not return until
// the job's Run has finished, and that it surfaces Run's error.
func TestPool_SubmitWait_Blocks(t *testing.T) {
	lp := New("test", 2)
	want := errors.New("boom")

	job := newTestJob("j1")
	job.err = want

	result := make(chan error, 1)
	go func() { result <- lp.SubmitWait(context.Background(), job) }()

	<-job.started
	close(job.release)

	var got error
	select {
	case got = <-result:
	case <-time.After(time.Second):
		t.Fatal("SubmitWait did not return within 1s")
	}
	if !errors.Is(got, want) {
		t.Fatalf("expected %v, got %v", want, got)
	}
}
// TestPool_SubmitWait_CtxCancelledWhileQueued verifies that cancelling the
// ctx while the job is still queued makes SubmitWait return
// context.Canceled and that the job's Run is never invoked.
func TestPool_SubmitWait_CtxCancelledWhileQueued(t *testing.T) {
	p := New("test", 1).(*pool)
	blocker := newTestJob("blocker")
	if _, _, err := p.Submit(context.Background(), blocker); err != nil {
		t.Fatal(err)
	}
	// Free the slot on every exit path, including the t.Fatal calls below.
	defer close(blocker.release)
	select {
	case <-blocker.started:
	case <-time.After(time.Second):
		t.Fatal("blocker did not start within 1s")
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // calling cancel twice is harmless
	target := newTestJob("target")
	done := make(chan error, 1)
	go func() {
		done <- p.SubmitWait(ctx, target)
	}()

	// Wait until target is enqueued behind blocker.
	waitFor(t, func() bool { return p.Stats().Queued == 1 })
	cancel()

	select {
	case err := <-done:
		if !errors.Is(err, context.Canceled) {
			t.Fatalf("expected context.Canceled, got %v", err)
		}
	case <-time.After(time.Second):
		t.Fatal("SubmitWait did not return after cancel")
	}

	// target.Run must never have been called. Report the value loaded
	// atomically instead of re-reading the field non-atomically.
	if n := atomic.LoadInt32(&target.runCount); n != 0 {
		t.Fatalf("target.Run was called %d times, want 0", n)
	}
}
// TestPool_Cancel_RemovesQueued verifies that Cancel removes a queued job
// and shrinks the queue. The job is submitted fire-and-forget via Submit,
// so only Cancel's return value and Stats are observable here. Delivery of
// ErrCancelled to a caller blocked in SubmitWait is covered by
// TestPool_Cancel_PropagatesToSubmitWait.
func TestPool_Cancel_RemovesQueued(t *testing.T) {
	p := New("test", 1).(*pool)
	blocker := newTestJob("blocker")
	if _, _, err := p.Submit(context.Background(), blocker); err != nil {
		t.Fatal(err)
	}
	// Free the slot on every exit path, including the t.Fatal calls below.
	defer close(blocker.release)
	select {
	case <-blocker.started:
	case <-time.After(time.Second):
		t.Fatal("blocker did not start within 1s")
	}

	target := newTestJob("target")
	if _, _, err := p.Submit(context.Background(), target); err != nil {
		t.Fatal(err)
	}
	if err := p.Cancel("target"); err != nil {
		t.Fatalf("cancel: %v", err)
	}
	if q := p.Stats().Queued; q != 0 {
		t.Fatalf("expected queued=0 after cancel, got %d", q)
	}

	// Cancelling an already-cancelled job, or one that was never
	// submitted, reports ErrNotQueued.
	for _, id := range []string{"target", "never-submitted"} {
		if err := p.Cancel(id); !errors.Is(err, ErrNotQueued) {
			t.Fatalf("Cancel(%q): expected ErrNotQueued, got %v", id, err)
		}
	}
}
// TestPool_Cancel_PropagatesToSubmitWait checks that cancelling a queued
// job with Cancel wakes the caller blocked in SubmitWait for that job,
// and that this caller sees ErrCancelled.
func TestPool_Cancel_PropagatesToSubmitWait(t *testing.T) {
	pl := New("test", 1).(*pool)

	// Occupy the only slot so the next job has to queue.
	hog := newTestJob("blocker")
	if _, _, err := pl.Submit(context.Background(), hog); err != nil {
		t.Fatal(err)
	}
	<-hog.started

	queued := newTestJob("target")
	waitErr := make(chan error, 1)
	go func() { waitErr <- pl.SubmitWait(context.Background(), queued) }()

	waitFor(t, func() bool { return pl.Stats().Queued == 1 })
	if err := pl.Cancel("target"); err != nil {
		t.Fatalf("cancel: %v", err)
	}

	select {
	case <-time.After(time.Second):
		t.Fatal("SubmitWait did not return after cancel")
	case err := <-waitErr:
		if !errors.Is(err, ErrCancelled) {
			t.Fatalf("expected ErrCancelled, got %v", err)
		}
	}
	close(hog.release)
}
// TestPool_Stats_Accurate covers Running, Queued and OldestQueuedAt while
// one job runs and two more wait behind it.
func TestPool_Stats_Accurate(t *testing.T) {
	p := New("test", 1).(*pool)
	j1 := newTestJob("j1")
	if _, _, err := p.Submit(context.Background(), j1); err != nil {
		t.Fatal(err)
	}
	select {
	case <-j1.started:
	case <-time.After(time.Second):
		t.Fatal("j1 did not start within 1s")
	}

	beforeQueue := time.Now()
	j2 := newTestJob("j2")
	if _, _, err := p.Submit(context.Background(), j2); err != nil {
		t.Fatal(err)
	}
	j3 := newTestJob("j3")
	if _, _, err := p.Submit(context.Background(), j3); err != nil {
		t.Fatal(err)
	}
	afterQueue := time.Now()

	stats := p.Stats()
	if stats.Running != 1 {
		t.Errorf("running=%d, want 1", stats.Running)
	}
	if stats.Queued != 2 {
		t.Errorf("queued=%d, want 2", stats.Queued)
	}
	// OldestQueuedAt should fall between the two Submit calls. The second
	// of slack on each side tolerates wall-clock adjustments in case the
	// pool's timestamp carries no monotonic reading.
	switch {
	case stats.OldestQueuedAt == nil:
		t.Errorf("OldestQueuedAt is nil")
	case stats.OldestQueuedAt.Before(beforeQueue.Add(-time.Second)):
		t.Errorf("OldestQueuedAt seems too old: %v vs %v",
			*stats.OldestQueuedAt, beforeQueue)
	case stats.OldestQueuedAt.After(afterQueue.Add(time.Second)):
		t.Errorf("OldestQueuedAt seems too new: %v vs %v",
			*stats.OldestQueuedAt, afterQueue)
	}

	// Release every job and wait for the lane to go idle, so no Run
	// goroutine outlives this test.
	close(j1.release)
	close(j2.release)
	close(j3.release)
	waitFor(t, func() bool {
		s := p.Stats()
		return s.Running == 0 && s.Queued == 0
	})
}
// TestPool_Throughput1m checks that Throughput1m counts only completions
// recorded within the last 60s. Finishing real jobs and sleeping past the
// window would be slow, so the test seeds the completions slice directly
// with timestamps on both sides of the cutoff.
func TestPool_Throughput1m(t *testing.T) {
	pl := New("test", 1).(*pool)

	now := time.Now()
	ago := func(d time.Duration) time.Time { return now.Add(-d) }
	pl.completions = []time.Time{
		ago(90 * time.Second), // outside the 60s window
		ago(30 * time.Second),
		ago(10 * time.Second),
		ago(1 * time.Second),
		now,
	}

	if got := pl.Stats().Throughput1m; got != 4 {
		t.Fatalf("expected 4 (only the last 4 are within 60s), got %d", got)
	}
}
// TestPool_SetMaxConcurrent checks that raising the concurrency cap
// promotes queued jobs onto the newly available slots.
func TestPool_SetMaxConcurrent(t *testing.T) {
	pl := New("test", 1).(*pool)
	jobs := []*testJob{newTestJob("j1"), newTestJob("j2"), newTestJob("j3")}

	// j1 takes the single slot; wait until it is actually running.
	if _, _, err := pl.Submit(context.Background(), jobs[0]); err != nil {
		t.Fatal(err)
	}
	<-jobs[0].started

	// j2 and j3 must queue behind it.
	for _, j := range jobs[1:] {
		if _, _, err := pl.Submit(context.Background(), j); err != nil {
			t.Fatal(err)
		}
	}
	if got := pl.Stats().Queued; got != 2 {
		t.Fatalf("expected queued=2, got %d", got)
	}

	// With a cap of 3, both queued jobs should be dispatched right away.
	pl.SetMaxConcurrent(3)
	waitFor(t, func() bool { return pl.Stats().Running == 3 })
	if got := pl.Stats().Queued; got != 0 {
		t.Fatalf("expected queued=0 after raise, got %d", got)
	}

	for _, j := range jobs {
		close(j.release)
	}
	waitForRunning(t, pl, 0)
}
// TestPool_SetMaxConcurrent_NoOpZeroOrNegative checks that non-positive
// caps are ignored and MaxConcurrent keeps its original value.
func TestPool_SetMaxConcurrent_NoOpZeroOrNegative(t *testing.T) {
	pl := New("test", 2).(*pool)

	// expectUnchanged fails the test if the cap moved away from 2.
	expectUnchanged := func(label string) {
		if got := pl.Stats().MaxConcurrent; got != 2 {
			t.Fatalf("%s set should be no-op, got %d", label, got)
		}
	}

	pl.SetMaxConcurrent(0)
	expectUnchanged("zero")

	pl.SetMaxConcurrent(-1)
	expectUnchanged("negative")
}
// TestPool_DispatchOnComplete checks that when a running job finishes, the
// lane promotes the next queued job into the freed slot.
func TestPool_DispatchOnComplete(t *testing.T) {
	pl := New("test", 1).(*pool)
	first, second := newTestJob("j1"), newTestJob("j2")

	if _, _, err := pl.Submit(context.Background(), first); err != nil {
		t.Fatal(err)
	}
	<-first.started

	if _, _, err := pl.Submit(context.Background(), second); err != nil {
		t.Fatal(err)
	}
	if got := pl.Stats().Queued; got != 1 {
		t.Fatalf("expected queued=1, got %d", got)
	}

	// Finishing j1 should hand its slot to j2 with no further action.
	close(first.release)
	select {
	case <-second.started:
	case <-time.After(time.Second):
		t.Fatal("j2 did not dispatch after j1 finished")
	}
	if got := pl.Stats().Queued; got != 0 {
		t.Errorf("expected queued=0 after dispatch, got %d", got)
	}

	close(second.release)
	waitForRunning(t, pl, 0)
}
// TestPool_ConcurrencyLimitRespected fires N jobs at a lane with
// maxConcurrent=2 and verifies that at most 2 are ever inside Run at once.
func TestPool_ConcurrencyLimitRespected(t *testing.T) {
	const (
		limit = 2
		N     = 8
	)
	p := New("test", limit)
	var inflight, peak int32
	// Buffered to N so a finishing job never blocks on the send, even after
	// the test has failed and stopped receiving.
	done := make(chan struct{}, N)
	for i := 0; i < N; i++ {
		// i is only used synchronously in Sprintf, so no per-iteration copy
		// is needed for the closure.
		j := &funcJob{
			id:     fmt.Sprintf("j%d", i),
			caller: "u1",
			run: func(ctx context.Context) error {
				cur := atomic.AddInt32(&inflight, 1)
				// Raise peak to cur unless another job has already recorded
				// a value at least as high.
				for {
					old := atomic.LoadInt32(&peak)
					if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
						break
					}
				}
				time.Sleep(20 * time.Millisecond)
				atomic.AddInt32(&inflight, -1)
				done <- struct{}{}
				return nil
			},
		}
		if _, _, err := p.Submit(context.Background(), j); err != nil {
			t.Fatal(err)
		}
	}

	// One overall deadline: a wedged pool fails with a progress count
	// instead of hanging until the go test timeout.
	timeout := time.After(5 * time.Second)
	for i := 0; i < N; i++ {
		select {
		case <-done:
		case <-timeout:
			t.Fatalf("only %d of %d jobs finished within 5s", i, N)
		}
	}
	if got := atomic.LoadInt32(&peak); got > limit {
		t.Fatalf("expected max in-flight <= %d, observed %d", limit, got)
	}
}
// funcJob adapts a closure to the Job interface. It serves tests that need
// custom Run behaviour but not testJob's started/release handshake.
type funcJob struct {
	id       string
	caller   string
	priority int
	// run is called by Run with the same ctx; its result is returned as-is.
	run func(ctx context.Context) error
}

// ID reports the configured job ID.
func (fj *funcJob) ID() string { return fj.id }

// CallerID reports caller verbatim. Unlike testJob, an empty value gets no
// "anon" fallback.
func (fj *funcJob) CallerID() string { return fj.caller }

// Priority reports the configured priority.
func (fj *funcJob) Priority() int { return fj.priority }

// Run delegates to the run closure, forwarding ctx and its result.
func (fj *funcJob) Run(ctx context.Context) error { return fj.run(ctx) }
// waitForRunning polls p.Stats().Running every 2ms until it equals n, and
// fails the test if that has not happened within 1s. Failure is declared
// only right after a fresh sample, so an oversleeping poll (for example,
// under -race on a loaded CI machine) cannot skip the final check. The
// failure message reports the same sample that was compared.
func waitForRunning(t *testing.T, p *pool, n int) {
	t.Helper()
	deadline := time.Now().Add(time.Second)
	for {
		got := p.Stats().Running
		if got == n {
			return
		}
		if !time.Now().Before(deadline) {
			t.Fatalf("running != %d after 1s; have %d", n, got)
		}
		time.Sleep(2 * time.Millisecond)
	}
}
// waitFor polls cond every 2ms until it returns true, and fails the test if
// it is still false once 1s has elapsed. Failure is declared only right
// after a fresh evaluation of cond, never after a sleep. An oversleeping
// poll therefore cannot turn an already-satisfied condition into a
// spurious failure.
func waitFor(t *testing.T, cond func() bool) {
	t.Helper()
	deadline := time.Now().Add(time.Second)
	for !cond() {
		if !time.Now().Before(deadline) {
			t.Fatalf("condition did not become true within 1s")
		}
		time.Sleep(2 * time.Millisecond)
	}
}
var (
	// Compile-time check: funcJob must satisfy Job.
	_ Job = (*funcJob)(nil)

	// Keeps the "sync" import referenced; nothing else visible in this
	// file uses it. NOTE(review): if that is still true, drop this line and
	// the import together.
	_ sync.Mutex
)