Files
executus/lane/policy_fair_share_test.go
steve ca243a2d50
executus CI / test (push) Failing after 24s
P0: stand up executus harness module above majordomo
Batteries-included agent-harness base, extracted from mort's agent layer.
This first cut establishes the module + the zero-coupling core primitives:

- lane, dispatchguard, pendingattach, run/progress.go: moved verbatim from mort
- config: host config Source seam + env-var default (nil-safe helpers)
- deliver: output-egress seam + Discard/Stdout defaults
- identity: AdminPolicy + MemberResolver seams (nil-safe)
- fanout: programmatic N×M swarm (bounded global + per-key concurrency)
- README/CLAUDE.md with the vibe-coded banner; CI with Go gates +
  the "core stays majordomo+stdlib only" invariant

Core builds with stdlib only today; majordomo enters at P1 (model/structured).
go build/vet/test -race all green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 19:18:37 -04:00

279 lines
7.5 KiB
Go

package lane
import (
"context"
"fmt"
"testing"
"time"
)
// fakeJob is a minimal, non-blocking job stub for policy tests. It
// only carries the identity fields a policy inspects (id, caller,
// priority) so tests can observe the order in which jobs are dequeued.
type fakeJob struct {
	id       string
	caller   string
	priority int
}

// ID reports the identifier the stub was constructed with.
func (j *fakeJob) ID() string {
	return j.id
}

// CallerID reports the submitting caller the stub was constructed with.
func (j *fakeJob) CallerID() string {
	return j.caller
}

// Priority reports the priority the stub was constructed with.
func (j *fakeJob) Priority() int {
	return j.priority
}

// Run does no work and always succeeds; the context is ignored.
func (j *fakeJob) Run(_ context.Context) error {
	return nil
}
// enq is a test helper: it wraps a fakeJob built from id, user and
// priority in a queuedJob stamped with the current time, enqueues it
// on the supplied policy, and returns the queuedJob so the caller can
// inspect its done channel afterwards.
func enq(p queuePolicy, id, user string, priority int) *queuedJob {
	entry := new(queuedJob)
	entry.job = &fakeJob{id: id, caller: user, priority: priority}
	entry.enqueuedAt = time.Now()
	// Buffer of one so a result can be posted without a waiting receiver.
	entry.done = make(chan jobResult, 1)

	p.Enqueue(entry)
	return entry
}
// drainOrder empties the policy by calling Dequeue until it yields nil
// and returns the job IDs in the order they came out. The result is a
// nil slice when the policy was already empty.
func drainOrder(p queuePolicy) []string {
	var ids []string
	for next := p.Dequeue(); next != nil; next = p.Dequeue() {
		ids = append(ids, next.job.ID())
	}
	return ids
}
// TestFairShare_RoundRobinAcrossUsers checks the headline fairness
// guarantee: when userA has ten jobs queued and userB has one, userB's
// job is dequeued after at most one of userA's.
func TestFairShare_RoundRobinAcrossUsers(t *testing.T) {
	policy := NewFairSharePolicy()
	for n := 0; n < 10; n++ {
		enq(policy, fmt.Sprintf("a%d", n), "userA", 0)
	}
	enq(policy, "b1", "userB", 0)

	order := drainOrder(policy)

	// Locate b1 in the drain order. Rotation may start with either
	// user, so positions 0 and 1 are both acceptable.
	foundB := -1
	for pos := 0; pos < len(order); pos++ {
		if order[pos] == "b1" {
			foundB = pos
			break
		}
	}

	switch {
	case foundB == -1:
		t.Fatalf("b1 was never dequeued; order=%v", order)
	case foundB > 1:
		t.Fatalf("b1 dequeued at position %d; expected 0 or 1; order=%v",
			foundB, order)
	}

	// Every enqueued job must come back out exactly once.
	if got := len(order); got != 11 {
		t.Fatalf("expected 11 dequeues, got %d (%v)", got, order)
	}
}
// TestFairShare_PriorityWithinUser covers per-user priority ordering.
// Within one user, priority 5 > 1 > 0, and jobs with equal priority
// keep their enqueue (FIFO) order.
func TestFairShare_PriorityWithinUser(t *testing.T) {
	p := NewFairSharePolicy()
	enq(p, "lo1", "u1", 0)
	enq(p, "hi", "u1", 5)
	enq(p, "mid", "u1", 1)
	enq(p, "lo2", "u1", 0)

	order := drainOrder(p)

	// Check the length before indexing: a policy that drops a job would
	// otherwise panic with index-out-of-range instead of failing the
	// test, and one that yields extra jobs would go unnoticed.
	if len(order) != 4 {
		t.Fatalf("expected 4 dequeues, got %d (%v)", len(order), order)
	}

	if got := order[0]; got != "hi" {
		t.Fatalf("expected hi first, got %v", order)
	}
	if got := order[1]; got != "mid" {
		t.Fatalf("expected mid second, got %v", order)
	}
	// lo1 was enqueued before lo2 — FIFO preserves order.
	if order[2] != "lo1" || order[3] != "lo2" {
		t.Fatalf("expected lo1 then lo2 (FIFO ties), got %v", order)
	}
}
// TestFairShare_PrioritySortStable covers a regression-prone case:
// when an existing job at priority N is in the queue, a new job at
// priority N appended afterward must come AFTER (FIFO ties), not
// before.
func TestFairShare_PrioritySortStable(t *testing.T) {
	p := NewFairSharePolicy()
	enq(p, "a", "u1", 1)
	enq(p, "b", "u1", 1)
	enq(p, "c", "u1", 1)

	order := drainOrder(p)
	want := []string{"a", "b", "c"}

	// Check the length before indexing: the loop below walks want, so a
	// short result would panic on order[i] and a long one would pass
	// with surplus entries unchecked.
	if len(order) != len(want) {
		t.Fatalf("expected %d dequeues, got %d (%v)", len(want), len(order), order)
	}

	for i, id := range want {
		if order[i] != id {
			t.Fatalf("expected FIFO order %v, got %v", want, order)
		}
	}
}
// TestFairShare_CancelRemovesFromSubQueue verifies that Cancel removes
// a queued job, signals ErrCancelled on that job's done channel, and
// leaves exactly the other jobs available to Dequeue.
func TestFairShare_CancelRemovesFromSubQueue(t *testing.T) {
	p := NewFairSharePolicy()
	a := enq(p, "a1", "userA", 0)
	enq(p, "b1", "userB", 0)
	enq(p, "a2", "userA", 0)

	if !p.Cancel("a1") {
		t.Fatal("expected Cancel(a1) to return true")
	}

	// Non-blocking receive: the cancellation result is expected to be
	// sitting in a1's done channel already once Cancel has returned.
	select {
	case res := <-a.done:
		if res.err != ErrCancelled {
			t.Fatalf("expected ErrCancelled, got %v", res.err)
		}
	default:
		t.Fatal("expected a1.done to have a cancellation signal")
	}

	if p.Len() != 2 {
		t.Fatalf("expected len=2 after cancel, got %d", p.Len())
	}

	// Drain — should be one of (b1, a2) or (a2, b1).
	order := drainOrder(p)
	if len(order) != 2 {
		t.Fatalf("expected 2 dequeues, got %v", order)
	}

	// A count of two alone does not prove the right jobs survived:
	// require both b1 and a2, which (given the length check above) also
	// rules out the cancelled a1 and any duplicate.
	remaining := make(map[string]bool, len(order))
	for _, id := range order {
		remaining[id] = true
	}
	if !remaining["b1"] || !remaining["a2"] {
		t.Fatalf("expected b1 and a2 to remain after cancelling a1, got %v", order)
	}
}
// TestFairShare_CancelLastInUserRemovesFromRotation checks the
// bookkeeping after a user's only queued job is cancelled: the user
// must disappear from both the perUser map and the users rotation
// list, so a later Dequeue never visits an empty sub-queue.
func TestFairShare_CancelLastInUserRemovesFromRotation(t *testing.T) {
	// The concrete type is needed to inspect internal state.
	fs := NewFairSharePolicy().(*fairSharePolicy)
	enq(fs, "a1", "userA", 0)
	enq(fs, "b1", "userB", 0)

	if cancelled := fs.Cancel("a1"); !cancelled {
		t.Fatal("cancel a1 failed")
	}

	if _, present := fs.perUser["userA"]; present {
		t.Fatal("userA should have been removed from perUser map")
	}

	for i := range fs.users {
		if fs.users[i] == "userA" {
			t.Fatal("userA should have been removed from rotation")
		}
	}
}
// TestFairShare_OldestEnqueueTime enqueues two jobs for different
// users with explicit timestamps and checks that OldestEnqueueTime
// reports the earlier of the two.
func TestFairShare_OldestEnqueueTime(t *testing.T) {
	p := NewFairSharePolicy()

	older := time.Now().Add(-10 * time.Second)
	newer := time.Now().Add(-5 * time.Second)

	fixtures := []struct {
		id, caller string
		at         time.Time
	}{
		{id: "a", caller: "uA", at: older},
		{id: "b", caller: "uB", at: newer},
	}
	for _, fx := range fixtures {
		p.Enqueue(&queuedJob{
			job:        &fakeJob{id: fx.id, caller: fx.caller},
			enqueuedAt: fx.at,
			done:       make(chan jobResult, 1),
		})
	}

	oldest := p.OldestEnqueueTime()
	if oldest == nil {
		t.Fatal("expected non-nil oldest")
	}
	if !oldest.Equal(older) {
		t.Fatalf("expected %v, got %v", older, *oldest)
	}
}
// TestFairShare_EmptyDequeue checks that Dequeue on a freshly
// constructed policy, with nothing enqueued, returns nil.
func TestFairShare_EmptyDequeue(t *testing.T) {
	got := NewFairSharePolicy().Dequeue()
	if got == nil {
		return
	}
	t.Fatalf("expected nil dequeue, got %v", got)
}
// TestFairShare_LaneIntegration exercises a lane built with
// NewWithFairShare end to end and checks it gives the same fairness
// guarantee as the bare policy.
//
// A blocker job occupies the lane's single slot (maxConcurrent=1) so
// that later submissions queue up. userA then submits four jobs and
// userB one. Once the blocker is released, userB's job must start at
// position 0 or 1, i.e. after at most one of userA's jobs.
//
// Start order is captured through a shared buffered channel: each
// job's Run sends its id on the channel and returns.
func TestFairShare_LaneIntegration(t *testing.T) {
	ctx := context.Background()
	ln := NewWithFairShare("test", 1)

	// Occupy the only slot so everything submitted afterwards queues.
	blocker := newTestJob("blocker")
	blocker.caller = "blocker-user"
	if _, _, err := ln.Submit(ctx, blocker); err != nil {
		t.Fatal(err)
	}
	<-blocker.started

	// Buffered for all five jobs so Run never blocks on the send.
	startOrder := make(chan string, 5)
	mkJob := func(id, caller string) *funcJob {
		record := func(ctx context.Context) error {
			startOrder <- id
			return nil
		}
		return &funcJob{id: id, caller: caller, run: record}
	}

	submissions := []struct{ id, caller string }{
		{"a1", "userA"},
		{"a2", "userA"},
		{"a3", "userA"},
		{"a4", "userA"},
		{"b1", "userB"},
	}
	for _, s := range submissions {
		if _, _, err := ln.Submit(ctx, mkJob(s.id, s.caller)); err != nil {
			t.Fatal(err)
		}
	}

	// Releasing the blocker frees the slot; the queued jobs are then
	// expected to start one after another, since each Run returns
	// right after recording its id.
	close(blocker.release)

	var observed []string
	deadline := time.After(2 * time.Second)
	for len(observed) < 5 {
		select {
		case id := <-startOrder:
			observed = append(observed, id)
		case <-deadline:
			t.Fatalf("never observed all dispatches; got %v", observed)
		}
	}

	// b1 must run at position 0 or 1 (after at most one A).
	foundB := -1
	for pos := 0; pos < len(observed); pos++ {
		if observed[pos] == "b1" {
			foundB = pos
			break
		}
	}
	switch {
	case foundB == -1:
		t.Fatalf("b1 was never dispatched; order=%v", observed)
	case foundB > 1:
		t.Fatalf("b1 ran at position %d among %v; expected 0 or 1",
			foundB, observed)
	}
}