Implement new scheduler (#823)

- introduce internal/router/scheduler to decouple routing, swapping and
queuing into interface contracts.
- introduce a new `routing` configuration section that supersedes
`matrix` and `group` while maintaining backwards compatibility
- add FIFO scheduler with prioritized queuing 
- add internal/router/design.md as developer documentation on
implementing new schedulers and routers

Fixes #797
This commit is contained in:
Benson Wong
2026-06-10 20:34:25 -07:00
committed by GitHub
parent 0cfe5a6639
commit 9b3a33d7b9
26 changed files with 2398 additions and 1330 deletions
+15 -614
View File
@@ -5,35 +5,34 @@ import (
"io"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/mostlygeek/llama-swap/internal/config"
"github.com/mostlygeek/llama-swap/internal/logmon"
"github.com/mostlygeek/llama-swap/internal/process"
"github.com/mostlygeek/llama-swap/internal/router/scheduler"
)
// stubPlanner is a swapPlanner that returns a fixed eviction list per target
// and never logs. It lets the base-router tests cover shared run-loop
// behaviour without dragging in either real router's eviction rules.
type stubPlanner struct {
evict map[string][]string
}
// These tests cover baseRouter's own machinery — the run loop, process
// lifecycle (doSwap), grant/ServeHTTP plumbing, Unload, and Shutdown. The
// scheduling decision logic (queueing, collation, eviction collisions) lives in
// the scheduler package and is tested directly there; see fifo_test.go.
func (s *stubPlanner) EvictionFor(target string, _ []string) []string {
if s.evict == nil {
return nil
}
return s.evict[target]
}
// stubPlanner evicts nothing. baseRouter tests drive the run loop through the
// default FIFO scheduler without exercising any particular eviction policy.
type stubPlanner struct{}
func (s *stubPlanner) OnSwapStart(string) {}
func (s *stubPlanner) EvictionFor(string, []string) []string { return nil }
func (s *stubPlanner) OnSwapStart(string, []string) {}
func newTestBase(t *testing.T, processes map[string]process.Process, planner swapPlanner) *baseRouter {
func newTestBase(t *testing.T, processes map[string]process.Process, planner scheduler.Swapper) *baseRouter {
t.Helper()
conf := config.Config{HealthCheckTimeout: 5}
b := newBaseRouter("test", conf, processes, planner, logmon.NewWriter(io.Discard))
b := newBaseRouter("test", conf, processes, logmon.NewWriter(io.Discard),
func(name string, logger *logmon.Monitor, eff scheduler.Effects) scheduler.Scheduler {
return scheduler.NewFIFO(name, logger, planner, conf.Routing.Scheduler.Settings.Fifo, eff)
})
b.testProcessed = make(chan struct{}, 64)
go b.run()
t.Cleanup(func() {
@@ -157,114 +156,6 @@ func TestBaseRouter_Unload_StopsInParallel(t *testing.T) {
}
}
// TestBaseRouter_Unload_ReleasesActiveSwapWaiters verifies that Unload
// rejoins router state: a request whose swap to the unloaded model is
// still in progress receives an error, instead of being abandoned
// against a process that's about to vanish.
func TestBaseRouter_Unload_ReleasesActiveSwapWaiters(t *testing.T) {
a := newFakeProcess("a")
// autoReady=false: the swap parks on WaitReady so we can interrupt
// it with Unload before it completes.
b := newTestBase(t, map[string]process.Process{"a": a}, &stubPlanner{})
w := httptest.NewRecorder()
done := make(chan struct{})
go func() {
b.ServeHTTP(w, newRequest("a"))
close(done)
}()
waitProcessed(t, b.testProcessed, 1) // handlerReq absorbed; swap started
<-a.runStarted
b.Unload(time.Second, "a")
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("ServeHTTP did not return after Unload")
}
if w.Code == http.StatusOK {
t.Errorf("expected non-OK status after Unload, got %d body=%q", w.Code, w.Body.String())
}
if a.State() != process.StateStopped {
t.Errorf("a state=%q want stopped", a.State())
}
}
// TestBaseRouter_Unload_DropsQueuedRequests verifies that queued requests
// for an unloaded model receive an error rather than sitting forever in
// the queue against state the router no longer maintains.
func TestBaseRouter_Unload_DropsQueuedRequests(t *testing.T) {
a := newFakeProcess("a")
pb := newFakeProcess("b")
// Loading B evicts A — so a request for B while A is loading queues.
planner := &stubPlanner{evict: map[string][]string{"b": {"a"}}}
b := newTestBase(t, map[string]process.Process{"a": a, "b": pb}, planner)
// r1 starts the swap to A and parks on WaitReady (autoReady=false).
w1 := httptest.NewRecorder()
done1 := make(chan struct{})
go func() {
b.ServeHTTP(w1, newRequest("a"))
close(done1)
}()
waitProcessed(t, b.testProcessed, 1)
<-a.runStarted
// r2 for B collides with A's in-flight swap and queues.
w2 := httptest.NewRecorder()
done2 := make(chan struct{})
go func() {
b.ServeHTTP(w2, newRequest("b"))
close(done2)
}()
waitProcessed(t, b.testProcessed, 1)
// Unload B — r2 (queued, targeting B) must be released with an error.
b.Unload(time.Second, "b")
select {
case <-done2:
case <-time.After(2 * time.Second):
t.Fatal("queued B request did not return after Unload(b)")
}
if w2.Code == http.StatusOK {
t.Errorf("queued B request: expected non-OK status, got %d", w2.Code)
}
if got := pb.runCalls.Load(); got != 0 {
t.Errorf("b.runCalls=%d want 0 (B should never have been started)", got)
}
// Release r1 so the test cleans up cleanly.
a.markReady()
select {
case <-done1:
case <-time.After(2 * time.Second):
t.Fatal("r1 did not complete after a.markReady")
}
}
func TestBaseRouter_FastPath(t *testing.T) {
a := newFakeProcess("a")
a.markReady()
b := newTestBase(t, map[string]process.Process{"a": a}, &stubPlanner{})
w := httptest.NewRecorder()
b.ServeHTTP(w, newRequest("a"))
if w.Code != http.StatusOK {
t.Fatalf("status=%d body=%q", w.Code, w.Body.String())
}
if got := a.serveCalls.Load(); got != 1 {
t.Errorf("serveCalls=%d want 1", got)
}
if got := a.runCalls.Load(); got != 0 {
t.Errorf("runCalls=%d want 0 (fast path should not start)", got)
}
}
func TestBaseRouter_OnDemandStart(t *testing.T) {
a := newFakeProcess("a")
a.autoReady = true
@@ -285,43 +176,6 @@ func TestBaseRouter_OnDemandStart(t *testing.T) {
}
}
func TestBaseRouter_ConcurrentSameModel(t *testing.T) {
a := newFakeProcess("a")
// autoReady=false so the swap parks on WaitReady until we release it.
b := newTestBase(t, map[string]process.Process{"a": a}, &stubPlanner{})
const N = 5
var wg sync.WaitGroup
codes := make([]int, N)
for i := 0; i < N; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
w := httptest.NewRecorder()
b.ServeHTTP(w, newRequest("a"))
codes[i] = w.Code
}(i)
}
waitProcessed(t, b.testProcessed, N) // all N handlerReqs absorbed by run()
<-a.runStarted // swap goroutine reached Run()
a.markReady()
wg.Wait()
for i, c := range codes {
if c != http.StatusOK {
t.Errorf("request %d: status=%d", i, c)
}
}
if got := a.runCalls.Load(); got != 1 {
t.Errorf("runCalls=%d want 1 (single swap should issue one Run)", got)
}
if got := a.serveCalls.Load(); got != N {
t.Errorf("serveCalls=%d want %d", got, N)
}
}
func TestBaseRouter_ContextCancel(t *testing.T) {
a := newFakeProcess("a")
// autoReady=false so swap parks forever until we mark ready.
@@ -364,459 +218,6 @@ func TestBaseRouter_ContextCancel(t *testing.T) {
}
}
func TestBaseRouter_QueuedDifferentModel(t *testing.T) {
a := newFakeProcess("a")
pa := newFakeProcess("b")
// Loading b must stop a.
planner := &stubPlanner{evict: map[string][]string{"b": {"a"}}}
b := newTestBase(t, map[string]process.Process{"a": a, "b": pa}, planner)
// First request starts a swap to A; A's autoReady=false so it parks.
w1 := httptest.NewRecorder()
done1 := make(chan struct{})
go func() {
b.ServeHTTP(w1, newRequest("a"))
close(done1)
}()
waitProcessed(t, b.testProcessed, 1)
<-a.runStarted
// Second request for B should queue while A's swap is in flight.
w2 := httptest.NewRecorder()
done2 := make(chan struct{})
go func() {
b.ServeHTTP(w2, newRequest("b"))
close(done2)
}()
waitProcessed(t, b.testProcessed, 1)
if got := pa.runCalls.Load(); got != 0 {
t.Errorf("b started early: runCalls=%d want 0 while A's swap is pending", got)
}
// Release A's swap. B's swap should then run.
a.markReady()
waitProcessed(t, b.testProcessed, 1) // swapDone for A → B's swap kicked off
<-pa.runStarted
select {
case <-done1:
case <-time.After(time.Second):
t.Fatal("A request did not complete")
}
pa.markReady()
select {
case <-done2:
case <-time.After(time.Second):
t.Fatal("queued B request did not complete after A's swap")
}
if w2.Code != http.StatusOK {
t.Errorf("B status=%d body=%q", w2.Code, w2.Body.String())
}
if got := a.stopCalls.Load(); got != 1 {
t.Errorf("a.stopCalls=%d want 1 (B's swap must stop A)", got)
}
}
// TestBaseRouter_QueueCollation verifies that incoming requests of the form
// a, b, c, a, b, c collapse into three swaps (one per model) and that the
// second request for each model rides the fast path — either joining the
// active swap, or being pulled out of the queue when handleSwapDone promotes
// the next model.
func TestBaseRouter_QueueCollation(t *testing.T) {
a := newFakeProcess("a")
pb := newFakeProcess("b")
pc := newFakeProcess("c")
// Each model evicts the other two so all swaps are mutually exclusive.
planner := &stubPlanner{evict: map[string][]string{
"a": {"b", "c"},
"b": {"a", "c"},
"c": {"a", "b"},
}}
b := newTestBase(t, map[string]process.Process{"a": a, "b": pb, "c": pc}, planner)
var (
completedMu sync.Mutex
completed []string
)
record := func(id string) {
completedMu.Lock()
defer completedMu.Unlock()
completed = append(completed, id)
}
ids := []string{"a", "b", "c", "a", "b", "c"}
var wg sync.WaitGroup
for _, id := range ids {
id := id
wg.Add(1)
go func() {
defer wg.Done()
w := httptest.NewRecorder()
b.ServeHTTP(w, newRequest(id))
if w.Code != http.StatusOK {
t.Errorf("%s: status=%d body=%q", id, w.Code, w.Body.String())
return
}
record(id)
}()
// Wait for run() to absorb this request before launching the next,
// so handlerCh receives them in launch order.
waitProcessed(t, b.testProcessed, 1)
}
// All 6 are now parked in run()'s waiters/queue. Release each swap in
// sequence, waiting deterministically for each promotion to fire.
<-a.runStarted
a.markReady()
waitProcessed(t, b.testProcessed, 1) // swapDone(a) → b swap kicked off
<-pb.runStarted
pb.markReady()
waitProcessed(t, b.testProcessed, 1) // swapDone(b) → c swap kicked off
<-pc.runStarted
pc.markReady()
wg.Wait()
if got := len(completed); got != 6 {
t.Fatalf("completed=%v want 6", completed)
}
// run() fans out responses in model-grouped order (a1,a2 → b1,b2 → c1,c2)
// but waiter goroutines may be scheduled in any order after their respond
// channel fires, so completion order isn't deterministic. Per-model counts
// (combined with the runCalls checks below) are sufficient to prove queue
// collation collapsed each pair into a single swap.
aDone, bDone, cDone := 0, 0, 0
for _, id := range completed {
switch id {
case "a":
aDone++
case "b":
bDone++
case "c":
cDone++
}
}
if aDone != 2 || bDone != 2 || cDone != 2 {
t.Errorf("per-model counts: a=%d b=%d c=%d, want 2 each (order=%v)", aDone, bDone, cDone, completed)
}
// Single swap per model — the second request for each must have ridden
// the fast path (joined active swap or joined a queued sibling), not
// triggered an extra Run.
if got := a.runCalls.Load(); got != 1 {
t.Errorf("a.runCalls=%d want 1", got)
}
if got := pb.runCalls.Load(); got != 1 {
t.Errorf("b.runCalls=%d want 1", got)
}
if got := pc.runCalls.Load(); got != 1 {
t.Errorf("c.runCalls=%d want 1", got)
}
}
// TestBaseRouter_ConcurrentDisjointSwaps verifies that two requests with
// non-conflicting evict sets are loaded in parallel: both Run() calls happen
// before either process is marked ready.
func TestBaseRouter_ConcurrentDisjointSwaps(t *testing.T) {
a := newFakeProcess("a")
pb := newFakeProcess("b")
// Empty evict sets for both: they can load in parallel.
b := newTestBase(t, map[string]process.Process{"a": a, "b": pb}, &stubPlanner{})
w1 := httptest.NewRecorder()
done1 := make(chan struct{})
go func() {
b.ServeHTTP(w1, newRequest("a"))
close(done1)
}()
waitProcessed(t, b.testProcessed, 1)
w2 := httptest.NewRecorder()
done2 := make(chan struct{})
go func() {
b.ServeHTTP(w2, newRequest("b"))
close(done2)
}()
waitProcessed(t, b.testProcessed, 1)
// Both swaps must have reached Run() before either is marked ready —
// proves they ran in parallel rather than serializing.
<-a.runStarted
<-pb.runStarted
a.markReady()
pb.markReady()
select {
case <-done1:
case <-time.After(time.Second):
t.Fatal("request A did not complete")
}
select {
case <-done2:
case <-time.After(time.Second):
t.Fatal("request B did not complete")
}
if w1.Code != http.StatusOK {
t.Errorf("A status=%d body=%q", w1.Code, w1.Body.String())
}
if w2.Code != http.StatusOK {
t.Errorf("B status=%d body=%q", w2.Code, w2.Body.String())
}
if got := a.stopCalls.Load(); got != 0 {
t.Errorf("a.stopCalls=%d want 0 (parallel swap, no eviction)", got)
}
if got := pb.stopCalls.Load(); got != 0 {
t.Errorf("b.stopCalls=%d want 0 (parallel swap, no eviction)", got)
}
}
// TestBaseRouter_QueueDrainPromotesMultiple verifies that completing one swap
// unblocks every queued request that no longer collides — they all start in
// parallel rather than one-per-completion.
func TestBaseRouter_QueueDrainPromotesMultiple(t *testing.T) {
a := newFakeProcess("a")
pb := newFakeProcess("b")
pc := newFakeProcess("c")
// A's swap evicts both B and C, so B and C must queue. Once A finishes
// B and C themselves have empty evict sets, so they can start together.
planner := &stubPlanner{evict: map[string][]string{
"a": {"b", "c"},
}}
b := newTestBase(t, map[string]process.Process{"a": a, "b": pb, "c": pc}, planner)
w1 := httptest.NewRecorder()
done1 := make(chan struct{})
go func() {
b.ServeHTTP(w1, newRequest("a"))
close(done1)
}()
waitProcessed(t, b.testProcessed, 1)
<-a.runStarted
// B and C arrive while A is loading. evict_b and evict_c are empty,
// but collidesWith returns true because they appear in A's evict set.
w2 := httptest.NewRecorder()
done2 := make(chan struct{})
go func() {
b.ServeHTTP(w2, newRequest("b"))
close(done2)
}()
waitProcessed(t, b.testProcessed, 1)
w3 := httptest.NewRecorder()
done3 := make(chan struct{})
go func() {
b.ServeHTTP(w3, newRequest("c"))
close(done3)
}()
waitProcessed(t, b.testProcessed, 1)
if got := pb.runCalls.Load(); got != 0 {
t.Errorf("b started early: runCalls=%d", got)
}
if got := pc.runCalls.Load(); got != 0 {
t.Errorf("c started early: runCalls=%d", got)
}
// Release A. The swapDone handler should drain the queue and start
// both B and C in parallel.
a.markReady()
waitProcessed(t, b.testProcessed, 1) // swapDone(A) → drainQueue starts B and C
<-pb.runStarted
<-pc.runStarted
pb.markReady()
pc.markReady()
for i, ch := range []chan struct{}{done1, done2, done3} {
select {
case <-ch:
case <-time.After(time.Second):
t.Fatalf("request %d did not complete", i)
}
}
}
// TestBaseRouter_Shutdown_FailsAllInFlight verifies that shutdown returns
// the shutdown error to every waiter on every active swap AND to every
// queued request.
func TestBaseRouter_Shutdown_FailsAllInFlight(t *testing.T) {
a := newFakeProcess("a")
pb := newFakeProcess("b")
pc := newFakeProcess("c")
// a and b load in parallel (empty evicts). c collides with both.
planner := &stubPlanner{evict: map[string][]string{
"c": {"a", "b"},
}}
b := newTestBase(t, map[string]process.Process{"a": a, "b": pb, "c": pc}, planner)
const waitersPer = 2
var wg sync.WaitGroup
codes := make([]int, 0, 2*waitersPer+1)
var codesMu sync.Mutex
record := func(code int) {
codesMu.Lock()
codes = append(codes, code)
codesMu.Unlock()
}
launch := func(model string) {
wg.Add(1)
go func() {
defer wg.Done()
w := httptest.NewRecorder()
b.ServeHTTP(w, newRequest(model))
record(w.Code)
}()
}
// Active swaps for a and b, each with 2 waiters.
for i := 0; i < waitersPer; i++ {
launch("a")
waitProcessed(t, b.testProcessed, 1)
}
for i := 0; i < waitersPer; i++ {
launch("b")
waitProcessed(t, b.testProcessed, 1)
}
// c collides with both → queues.
launch("c")
waitProcessed(t, b.testProcessed, 1)
<-a.runStarted
<-pb.runStarted
if err := b.Shutdown(time.Second); err != nil {
t.Fatalf("Shutdown: %v", err)
}
wg.Wait()
codesMu.Lock()
defer codesMu.Unlock()
if len(codes) != 2*waitersPer+1 {
t.Fatalf("got %d responses, want %d", len(codes), 2*waitersPer+1)
}
for i, c := range codes {
if c == http.StatusOK {
t.Errorf("response %d: status=%d, want non-200 (shutdown)", i, c)
}
}
}
// TestBaseRouter_NoSwapWhileServing verifies that an already-loaded model
// is not stopped to satisfy another model's swap while it is still handling
// a request.
//
// Sequence:
// 1. r1 (A) — A loads; ServeHTTP enters and is pinned via serveBlock.
// 2. r2 (B, planner: B evicts A) — must NOT cause A.Stop while r1 is live.
// 3. r3 (A) — arrives next; the existing code queues it because B's swap
// intent collides with A.
// 4. r1 released — A finishes r1, then r3 is served by A.
// 5. B's swap then proceeds; r2 is served by B.
//
// fakeProcess.stoppedWhileServing flips true if Stop is ever called while
// a ServeHTTP is in flight — a direct, race-free signal of the violation.
func TestBaseRouter_NoSwapWhileServing(t *testing.T) {
a := newFakeProcess("a")
// autoReady left false: we markReady manually after observing runStarted,
// so autoReady's setState(Ready) cannot race with a later Stop and leave
// A in Ready, masking the bug.
a.serveBlock = make(chan struct{})
pb := newFakeProcess("b")
// Same reasoning for B: park its swap on WaitReady until we choose.
planner := &stubPlanner{evict: map[string][]string{"b": {"a"}}}
b := newTestBase(t, map[string]process.Process{"a": a, "b": pb}, planner)
// r1 — load A and enter its ServeHTTP (which blocks on serveBlock).
w1 := httptest.NewRecorder()
done1 := make(chan struct{})
go func() {
b.ServeHTTP(w1, newRequest("a"))
close(done1)
}()
waitProcessed(t, b.testProcessed, 1) // handlerReq for r1
<-a.runStarted
a.markReady()
waitProcessed(t, b.testProcessed, 1) // swapDone for A
<-a.serveStarted
// r2 — would evict A. A must not be stopped while r1 is in flight.
w2 := httptest.NewRecorder()
done2 := make(chan struct{})
go func() {
b.ServeHTTP(w2, newRequest("b"))
close(done2)
}()
waitProcessed(t, b.testProcessed, 1)
// r3 — another request for A, arrives behind r2 and queues because
// B's swap intent (which evicts A) is recorded as active.
w3 := httptest.NewRecorder()
done3 := make(chan struct{})
go func() {
b.ServeHTTP(w3, newRequest("a"))
close(done3)
}()
waitProcessed(t, b.testProcessed, 1)
// Release r1 (and r3 if it is fast-pathed onto the still-loaded A).
// The router must hold off B's swap until A has drained.
close(a.serveBlock)
select {
case <-done1:
case <-time.After(2 * time.Second):
t.Fatal("r1 did not complete after serveBlock release")
}
// Wait for B.Run before marking it ready: markReady before Run would
// skip the Run path entirely and leave pb.runCalls at 0. In a correct
// implementation B's swap only starts after A has drained; in the
// current implementation it has already started — either way runStarted
// fires.
<-pb.runStarted
pb.markReady()
select {
case <-done2:
case <-time.After(2 * time.Second):
t.Fatal("r2 did not complete after B marked ready")
}
select {
case <-done3:
case <-time.After(2 * time.Second):
t.Fatal("r3 did not complete")
}
if w1.Code != http.StatusOK || w2.Code != http.StatusOK || w3.Code != http.StatusOK {
t.Fatalf("statuses: w1=%d w2=%d w3=%d", w1.Code, w2.Code, w3.Code)
}
if w1.Body.String() != "ok:a" {
t.Errorf("r1 body=%q want ok:a", w1.Body.String())
}
if w3.Body.String() != "ok:a" {
t.Errorf("r3 body=%q want ok:a (r3 must be served by A)", w3.Body.String())
}
if w2.Body.String() != "ok:b" {
t.Errorf("r2 body=%q want ok:b", w2.Body.String())
}
if a.stoppedWhileServing.Load() {
t.Errorf("A.Stop was called while A was still handling a request — the router swapped out a busy process")
}
}
func TestBaseRouter_ModelNotFound(t *testing.T) {
a := newFakeProcess("a")
b := newTestBase(t, map[string]process.Process{"a": a}, &stubPlanner{})