package lane

import (
	"context"
	"fmt"
	"testing"
	"time"
)

// fakeJob is a minimal Job implementation: its accessors return the
// stored fields and Run returns nil without blocking. Policy tests use
// it to enumerate dequeue order.
type fakeJob struct {
	id       string
	caller   string
	priority int
}

func (f *fakeJob) ID() string       { return f.id }
func (f *fakeJob) CallerID() string { return f.caller }
func (f *fakeJob) Priority() int    { return f.priority }

// Run is a no-op that always succeeds.
func (f *fakeJob) Run(context.Context) error { return nil }

// enq is a test helper: it wraps a fakeJob built from the given fields
// in a queuedJob stamped with the current time and a 1-slot done
// channel, enqueues it on p, and returns the queuedJob.
func enq(p queuePolicy, id, user string, priority int) *queuedJob {
	entry := &queuedJob{
		job:        &fakeJob{id: id, caller: user, priority: priority},
		enqueuedAt: time.Now(),
		done:       make(chan jobResult, 1),
	}
	p.Enqueue(entry)
	return entry
}

// drainOrder calls Dequeue until it yields nil and returns the job IDs
// in the order they came out.
func drainOrder(p queuePolicy) []string {
	var ids []string
	for next := p.Dequeue(); next != nil; next = p.Dequeue() {
		ids = append(ids, next.job.ID())
	}
	return ids
}

// TestFairShare_RoundRobinAcrossUsers covers the spec's headline
// guarantee: A submits 10, B submits 1, B's job runs after at most 1
// of A's.
func TestFairShare_RoundRobinAcrossUsers(t *testing.T) {
	p := NewFairSharePolicy()
	for n := 0; n < 10; n++ {
		enq(p, fmt.Sprintf("a%d", n), "userA", 0)
	}
	enq(p, "b1", "userB", 0)

	order := drainOrder(p)

	// Locate b1 in the drain. Depending on where rotation starts it is
	// either first or right after a single A job.
	pos := -1
	for idx := range order {
		if order[idx] == "b1" {
			pos = idx
			break
		}
	}

	switch {
	case pos == -1:
		t.Fatalf("b1 was never dequeued; order=%v", order)
	case pos > 1:
		t.Fatalf("b1 dequeued at position %d; expected 0 or 1; order=%v", pos, order)
	}

	if got := len(order); got != 11 {
		t.Fatalf("expected 11 dequeues, got %d (%v)", got, order)
	}
}

// TestFairShare_PriorityWithinUser covers per-user priority ordering.
// Within one user, priority 5 > 1 > 0, FIFO ties.
func TestFairShare_PriorityWithinUser(t *testing.T) { p := NewFairSharePolicy() enq(p, "lo1", "u1", 0) enq(p, "hi", "u1", 5) enq(p, "mid", "u1", 1) enq(p, "lo2", "u1", 0) order := drainOrder(p) if got := order[0]; got != "hi" { t.Fatalf("expected hi first, got %v", order) } if got := order[1]; got != "mid" { t.Fatalf("expected mid second, got %v", order) } // lo1 was enqueued before lo2 — FIFO preserves order. if order[2] != "lo1" || order[3] != "lo2" { t.Fatalf("expected lo1 then lo2 (FIFO ties), got %v", order) } } // TestFairShare_PrioritySortStable covers a regression-prone case: // when an existing job at priority N is in the queue, a new job at // priority N appended afterward must come AFTER (FIFO ties), not // before. func TestFairShare_PrioritySortStable(t *testing.T) { p := NewFairSharePolicy() enq(p, "a", "u1", 1) enq(p, "b", "u1", 1) enq(p, "c", "u1", 1) order := drainOrder(p) want := []string{"a", "b", "c"} for i, id := range want { if order[i] != id { t.Fatalf("expected FIFO order %v, got %v", want, order) } } } // TestFairShare_CancelRemovesFromSubQueue verifies Cancel removes a // queued job and rotation continues correctly. func TestFairShare_CancelRemovesFromSubQueue(t *testing.T) { p := NewFairSharePolicy() a := enq(p, "a1", "userA", 0) enq(p, "b1", "userB", 0) enq(p, "a2", "userA", 0) if !p.Cancel("a1") { t.Fatal("expected Cancel(a1) to return true") } // Verify a's done channel got cancelled signal. select { case res := <-a.done: if res.err != ErrCancelled { t.Fatalf("expected ErrCancelled, got %v", res.err) } default: t.Fatal("expected a1.done to have a cancellation signal") } if p.Len() != 2 { t.Fatalf("expected len=2 after cancel, got %d", p.Len()) } // Drain — should be one of (b1, a2) or (a2, b1). 
order := drainOrder(p) if len(order) != 2 { t.Fatalf("expected 2 dequeues, got %v", order) } } // TestFairShare_CancelLastInUserRemovesFromRotation verifies that // cancelling the last queued job in a user's sub-queue removes the // user from the rotation (no empty-user spinning on next Dequeue). func TestFairShare_CancelLastInUserRemovesFromRotation(t *testing.T) { p := NewFairSharePolicy().(*fairSharePolicy) enq(p, "a1", "userA", 0) enq(p, "b1", "userB", 0) if !p.Cancel("a1") { t.Fatal("cancel a1 failed") } if _, ok := p.perUser["userA"]; ok { t.Fatal("userA should have been removed from perUser map") } for _, u := range p.users { if u == "userA" { t.Fatal("userA should have been removed from rotation") } } } // TestFairShare_OldestEnqueueTime verifies the earliest enqueue time // across all sub-queues is reported. func TestFairShare_OldestEnqueueTime(t *testing.T) { p := NewFairSharePolicy() t1 := time.Now().Add(-10 * time.Second) t2 := time.Now().Add(-5 * time.Second) p.Enqueue(&queuedJob{ job: &fakeJob{id: "a", caller: "uA"}, enqueuedAt: t1, done: make(chan jobResult, 1), }) p.Enqueue(&queuedJob{ job: &fakeJob{id: "b", caller: "uB"}, enqueuedAt: t2, done: make(chan jobResult, 1), }) got := p.OldestEnqueueTime() if got == nil { t.Fatal("expected non-nil oldest") } if !got.Equal(t1) { t.Fatalf("expected %v, got %v", t1, *got) } } // TestFairShare_EmptyDequeue verifies Dequeue returns nil on empty // queue. func TestFairShare_EmptyDequeue(t *testing.T) { p := NewFairSharePolicy() if j := p.Dequeue(); j != nil { t.Fatalf("expected nil dequeue, got %v", j) } } // TestFairShare_LaneIntegration verifies NewWithFairShare wires a // fair-share lane that respects the same scheduling guarantees. // // Two users, A submits 4, B submits 1 — with maxConcurrent=1, B's // job must dispatch within the first two queued positions (after at // most one of A's jobs). 
// // We capture dispatch order by recording the run order via a shared // channel; each Run sends its id then waits for release. func TestFairShare_LaneIntegration(t *testing.T) { lane := NewWithFairShare("test", 1) // Block dispatch with a single running job so subsequent submits // queue. blocker := newTestJob("blocker") blocker.caller = "blocker-user" if _, _, err := lane.Submit(context.Background(), blocker); err != nil { t.Fatal(err) } <-blocker.started startOrder := make(chan string, 5) mkJob := func(id, caller string) *funcJob { return &funcJob{ id: id, caller: caller, run: func(ctx context.Context) error { startOrder <- id return nil }, } } for _, id := range []string{"a1", "a2", "a3", "a4"} { if _, _, err := lane.Submit(context.Background(), mkJob(id, "userA")); err != nil { t.Fatal(err) } } if _, _, err := lane.Submit(context.Background(), mkJob("b1", "userB")); err != nil { t.Fatal(err) } // Release blocker; queued jobs dispatch one at a time as each // previous one finishes (Run returns immediately after sending // to startOrder). close(blocker.release) var observed []string deadline := time.After(2 * time.Second) for i := 0; i < 5; i++ { select { case id := <-startOrder: observed = append(observed, id) case <-deadline: t.Fatalf("never observed all dispatches; got %v", observed) } } // b1 must run at position 0 or 1 (after at most one A). foundB := -1 for i, id := range observed { if id == "b1" { foundB = i break } } if foundB == -1 { t.Fatalf("b1 was never dispatched; order=%v", observed) } if foundB > 1 { t.Fatalf("b1 ran at position %d among %v; expected 0 or 1", foundB, observed) } }