ca243a2d50
executus CI / test (push) Failing after 24s
Batteries-included agent-harness base, extracted from mort's agent layer. This first cut establishes the module + the zero-coupling core primitives: - lane, dispatchguard, pendingattach, run/progress.go: moved verbatim from mort - config: host config Source seam + env-var default (nil-safe helpers) - deliver: output-egress seam + Discard/Stdout defaults - identity: AdminPolicy + MemberResolver seams (nil-safe) - fanout: programmatic N×M swarm (bounded global + per-key concurrency) - README/CLAUDE.md with the vibe-coded banner; CI with Go gates + the "core stays majordomo+stdlib only" invariant Core builds with stdlib only today; majordomo enters at P1 (model/structured). go build/vet/test -race all green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
203 lines
5.3 KiB
Go
203 lines
5.3 KiB
Go
package lane
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
// fakeConvars is a test stand-in for ConvarReader: integer values are kept
// in a map and every access goes through a mutex.
type fakeConvars struct {
	mu   sync.Mutex
	vals map[string]int
}

// newFakeConvars returns a fake whose value map is empty but ready to be
// written to.
func newFakeConvars() *fakeConvars {
	return &fakeConvars{vals: make(map[string]int)}
}

// set stores v under name, replacing any value stored there before.
func (c *fakeConvars) set(name string, v int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.vals[name] = v
}

// Int returns the value stored under name, or def when nothing has been set
// for that name. The context argument is ignored.
func (c *fakeConvars) Int(_ context.Context, name string, def int) int {
	c.mu.Lock()
	stored, found := c.vals[name]
	c.mu.Unlock()

	if !found {
		return def
	}
	return stored
}
|
|
|
|
// TestRegistry_GetOrCreate verifies GetOrCreate creates the lane on
|
|
// first call and returns the same instance on subsequent calls.
|
|
func TestRegistry_GetOrCreate(t *testing.T) {
|
|
r := NewRegistry(nil)
|
|
l1 := r.GetOrCreate(context.Background(), "ollama")
|
|
l2 := r.GetOrCreate(context.Background(), "ollama")
|
|
if l1 != l2 {
|
|
t.Fatalf("expected same lane instance on second GetOrCreate")
|
|
}
|
|
if got := l1.Name(); got != "ollama" {
|
|
t.Fatalf("expected name=ollama, got %s", got)
|
|
}
|
|
}
|
|
|
|
// TestRegistry_ConvarConcurrency verifies the convar value drives the
|
|
// lane's MaxConcurrent at creation time.
|
|
func TestRegistry_ConvarConcurrency(t *testing.T) {
|
|
c := newFakeConvars()
|
|
c.set("lanes.ollama.max_concurrent", 3)
|
|
r := NewRegistry(c)
|
|
l := r.GetOrCreate(context.Background(), "ollama")
|
|
if got := l.Stats().MaxConcurrent; got != 3 {
|
|
t.Fatalf("expected MaxConcurrent=3, got %d", got)
|
|
}
|
|
}
|
|
|
|
// TestRegistry_DefaultConcurrencyOne verifies that a missing convar
|
|
// falls back to 1.
|
|
func TestRegistry_DefaultConcurrencyOne(t *testing.T) {
|
|
r := NewRegistry(nil)
|
|
l := r.GetOrCreate(context.Background(), "default")
|
|
if got := l.Stats().MaxConcurrent; got != 1 {
|
|
t.Fatalf("expected default MaxConcurrent=1, got %d", got)
|
|
}
|
|
}
|
|
|
|
// TestRegistry_NegativeConvarClamped verifies that a negative or zero
|
|
// convar value is clamped to 1.
|
|
func TestRegistry_NegativeConvarClamped(t *testing.T) {
|
|
c := newFakeConvars()
|
|
c.set("lanes.bad.max_concurrent", -5)
|
|
r := NewRegistry(c)
|
|
l := r.GetOrCreate(context.Background(), "bad")
|
|
if got := l.Stats().MaxConcurrent; got != 1 {
|
|
t.Fatalf("expected clamped to 1, got %d", got)
|
|
}
|
|
}
|
|
|
|
// TestRegistry_Reload picks up convar changes for existing lanes.
|
|
func TestRegistry_Reload(t *testing.T) {
|
|
c := newFakeConvars()
|
|
c.set("lanes.x.max_concurrent", 2)
|
|
r := NewRegistry(c)
|
|
l := r.GetOrCreate(context.Background(), "x")
|
|
if got := l.Stats().MaxConcurrent; got != 2 {
|
|
t.Fatalf("expected 2 at create, got %d", got)
|
|
}
|
|
|
|
c.set("lanes.x.max_concurrent", 5)
|
|
r.Reload(context.Background())
|
|
|
|
if got := l.Stats().MaxConcurrent; got != 5 {
|
|
t.Fatalf("expected 5 after Reload, got %d", got)
|
|
}
|
|
}
|
|
|
|
// TestRegistry_List returns all created lanes.
|
|
func TestRegistry_List(t *testing.T) {
|
|
r := NewRegistry(nil)
|
|
r.GetOrCreate(context.Background(), "a")
|
|
r.GetOrCreate(context.Background(), "b")
|
|
r.GetOrCreate(context.Background(), "c")
|
|
if got := len(r.List()); got != 3 {
|
|
t.Fatalf("expected 3 lanes, got %d", got)
|
|
}
|
|
names := r.Names()
|
|
if len(names) != 3 {
|
|
t.Fatalf("expected 3 names, got %v", names)
|
|
}
|
|
}
|
|
|
|
// TestRegistry_Get returns nil for missing lane (no implicit create).
|
|
func TestRegistry_Get(t *testing.T) {
|
|
r := NewRegistry(nil)
|
|
if got := r.Get("nope"); got != nil {
|
|
t.Fatalf("expected nil for missing lane, got %v", got)
|
|
}
|
|
r.GetOrCreate(context.Background(), "yes")
|
|
if got := r.Get("yes"); got == nil {
|
|
t.Fatalf("expected non-nil for existing lane")
|
|
}
|
|
}
|
|
|
|
// TestRegistry_PolicyFactoryDefault verifies the default factory
|
|
// produces fair-share lanes (round-robins across users).
|
|
func TestRegistry_PolicyFactoryDefault(t *testing.T) {
|
|
c := newFakeConvars()
|
|
c.set("lanes.fair.max_concurrent", 1)
|
|
r := NewRegistry(c)
|
|
lane := r.GetOrCreate(context.Background(), "fair")
|
|
|
|
// Block lane with one job so subsequent submits queue.
|
|
blocker := newTestJob("blocker")
|
|
blocker.caller = "blocker-user"
|
|
if _, _, err := lane.Submit(context.Background(), blocker); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
<-blocker.started
|
|
|
|
startOrder := make(chan string, 3)
|
|
mkJob := func(id, caller string) *funcJob {
|
|
return &funcJob{
|
|
id: id, caller: caller,
|
|
run: func(ctx context.Context) error {
|
|
startOrder <- id
|
|
return nil
|
|
},
|
|
}
|
|
}
|
|
if _, _, err := lane.Submit(context.Background(), mkJob("a1", "userA")); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if _, _, err := lane.Submit(context.Background(), mkJob("a2", "userA")); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if _, _, err := lane.Submit(context.Background(), mkJob("b1", "userB")); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if got := lane.Stats().Queued; got != 3 {
|
|
t.Fatalf("expected queued=3, got %d", got)
|
|
}
|
|
close(blocker.release)
|
|
|
|
var order []string
|
|
for i := 0; i < 3; i++ {
|
|
select {
|
|
case id := <-startOrder:
|
|
order = append(order, id)
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("did not observe all dispatches; got %v", order)
|
|
}
|
|
}
|
|
// b1 must run at position 0 or 1 (after at most one A).
|
|
pos := -1
|
|
for i, id := range order {
|
|
if id == "b1" {
|
|
pos = i
|
|
break
|
|
}
|
|
}
|
|
if pos > 1 {
|
|
t.Fatalf("b1 ran at position %d among %v; expected 0 or 1",
|
|
pos, order)
|
|
}
|
|
}
|
|
|
|
// TestRegistry_SetPolicyFactory verifies tests can override the
|
|
// default factory.
|
|
func TestRegistry_SetPolicyFactory(t *testing.T) {
|
|
r := NewRegistry(nil)
|
|
called := false
|
|
r.SetPolicyFactory(func() queuePolicy {
|
|
called = true
|
|
return newFIFOPolicy()
|
|
})
|
|
r.GetOrCreate(context.Background(), "x")
|
|
if !called {
|
|
t.Fatal("custom policy factory was not called")
|
|
}
|
|
}
|