diff --git a/internal/router/base.go b/internal/router/base.go index a14d2e09..4b34cba5 100644 --- a/internal/router/base.go +++ b/internal/router/base.go @@ -54,6 +54,7 @@ type baseRouter struct { procCancel context.CancelFunc handlerCh chan scheduler.HandlerReq + cancelCh chan scheduler.HandlerReq shutdownCh chan shutdownReq unloadCh chan unloadReq swapDoneCh chan scheduler.SwapDone @@ -88,6 +89,7 @@ func newBaseRouter( procCtx: procCtx, procCancel: procCancel, handlerCh: make(chan scheduler.HandlerReq), + cancelCh: make(chan scheduler.HandlerReq), shutdownCh: make(chan shutdownReq), unloadCh: make(chan unloadReq), swapDoneCh: make(chan scheduler.SwapDone), @@ -117,6 +119,10 @@ func (b *baseRouter) run() { b.schedule.OnRequest(req) b.notifyProcessed() + case req := <-b.cancelCh: + b.schedule.OnCancel(req) + b.notifyProcessed() + case req := <-b.unloadCh: b.schedule.OnUnload(req.targets, req.timeout) close(req.respond) @@ -473,6 +479,14 @@ func (b *baseRouter) ServeHTTP(w http.ResponseWriter, req *http.Request) { finishLoading() case <-req.Context().Done(): finishLoading() + // Notify the scheduler so it can prune this request from its queue + // and swap waiters. Without this, a queued request whose client left + // would sit in the scheduler until drainQueue eventually starts a + // wasted model load for it. + select { + case b.cancelCh <- hr: + case <-b.shutdownCtx.Done(): + } return case <-b.shutdownCtx.Done(): finishLoading() diff --git a/internal/router/scheduler/fifo.go b/internal/router/scheduler/fifo.go index d5159e7f..49e050b5 100644 --- a/internal/router/scheduler/fifo.go +++ b/internal/router/scheduler/fifo.go @@ -116,6 +116,46 @@ func (s *FIFO) OnRequest(req HandlerReq) { s.startSwap(req, evict, running) } +// OnCancel removes a request whose client has disconnected from the queue and +// from every in-flight swap's waiters. 
If the request was the sole waiter of an +// active swap, the swap goroutine is left to complete on its own — OnSwapDone +// will find no waiters and simply clean up. This prevents drainQueue from ever +// starting a model load for a caller that is no longer there. +func (s *FIFO) OnCancel(req HandlerReq) { + removed := false + + // Prune from the queue. + if len(s.queued) > 0 { + kept := s.queued[:0] + for _, q := range s.queued { + if q.Respond == req.Respond { + removed = true + continue + } + kept = append(kept, q) + } + s.queued = kept + } + + // Prune from any active swap's waiters. + for _, sw := range s.active { + filtered := sw.waiters[:0] + for _, w := range sw.waiters { + if w.Respond == req.Respond { + removed = true + continue + } + filtered = append(filtered, w) + } + sw.waiters = filtered + } + + if removed { + s.logger.Debugf("%s: cancelled request for model %s pruned from scheduler", s.name, req.Model) + broadcastQueuePositions(s.queued) + } +} + // OnSwapDone fans the result out to every waiter that joined this swap, removes // the swap from the active map, then walks the queue once, promoting any items // that no longer collide with the remaining active set. FIFO order is preserved: diff --git a/internal/router/scheduler/fifo_test.go b/internal/router/scheduler/fifo_test.go index 54f71987..f0a6e00d 100644 --- a/internal/router/scheduler/fifo_test.go +++ b/internal/router/scheduler/fifo_test.go @@ -143,6 +143,15 @@ func newFIFO(planner Swapper, eff Effects) *FIFO { func req(model string) HandlerReq { return HandlerReq{Model: model} } +// reqCh creates a HandlerReq with a unique Respond channel so OnCancel can +// identify it among queued requests and swap waiters. 
+func reqCh(model string) HandlerReq { + return HandlerReq{ + Model: model, + Respond: make(chan HandlerResp, 1), + } +} + func TestFIFO_FastPath(t *testing.T) { eff := newFakeEffects() eff.states["a"] = process.StateReady @@ -535,3 +544,90 @@ func TestFIFO_PriorityQueueOrder(t *testing.T) { } } } + +// TestFIFO_OnCancel_QueuedRequest verifies that cancelling a queued request +// prevents drainQueue from ever starting a model load for it. Without OnCancel +// the dead request would sit in the queue until a drain triggers a wasted swap. +func TestFIFO_OnCancel_QueuedRequest(t *testing.T) { + eff := newFakeEffects() + eff.states["a"] = process.StateStopped + eff.states["b"] = process.StateStopped + // b evicts a, so a request for b queues while a is loading. + s := newFIFO(&stubPlanner{evict: map[string][]string{"b": {"a"}}}, eff) + + s.OnRequest(req("a")) // StartSwap(a) + + cancelledReq := reqCh("b") + s.OnRequest(cancelledReq) // queued (collides with a's in-flight swap) + if len(s.queued) != 1 { + t.Fatalf("queue len=%d want 1 before cancel", len(s.queued)) + } + + // Client disconnects. + s.OnCancel(cancelledReq) + + if len(s.queued) != 0 { + t.Fatalf("queue len=%d want 0 after cancel", len(s.queued)) + } + + // a's swap finishes; drainQueue runs but b is gone — no swap for b. + eff.states["a"] = process.StateReady + s.OnSwapDone(SwapDone{ModelID: "a"}) + + if got := eff.startsFor("b"); got != 0 { + t.Errorf("StartSwap(b)=%d want 0 (cancelled request should not trigger a load)", got) + } +} + +// TestFIFO_OnCancel_SwapWaiter verifies that cancelling a request that joined an +// in-flight swap removes it from the waiter list. When the swap completes, the +// cancelled waiter receives no grant and does not bump the in-flight count. 
+func TestFIFO_OnCancel_SwapWaiter(t *testing.T) { + eff := newFakeEffects() + eff.states["a"] = process.StateStopped + s := newFIFO(&stubPlanner{}, eff) + + liveReq := reqCh("a") + cancelledReq := reqCh("a") + s.OnRequest(liveReq) // starts swap + s.OnRequest(cancelledReq) // joins + + if sw := s.active["a"]; len(sw.waiters) != 2 { + t.Fatalf("waiters=%d want 2", len(sw.waiters)) + } + + s.OnCancel(cancelledReq) + + if sw := s.active["a"]; len(sw.waiters) != 1 { + t.Fatalf("waiters=%d want 1 after cancel", len(sw.waiters)) + } + + // Swap finishes: only the live waiter is granted. + eff.states["a"] = process.StateReady + s.OnSwapDone(SwapDone{ModelID: "a"}) + + if got := eff.served("a"); got != 1 { + t.Errorf("served(a)=%d want 1 (only the non-cancelled waiter)", got) + } +} + +// TestFIFO_OnCancel_NotPresent is a no-op: cancelling a request that was already +// granted (and is no longer queued or waiting) must not affect anything. +func TestFIFO_OnCancel_NotPresent(t *testing.T) { + eff := newFakeEffects() + eff.states["a"] = process.StateReady + s := newFIFO(&stubPlanner{}, eff) + + r := reqCh("a") + s.OnRequest(r) // fast-path served immediately + + // Cancel after grant — should be a harmless no-op. + s.OnCancel(r) + + if got := eff.served("a"); got != 1 { + t.Errorf("served(a)=%d want 1 (cancel of granted request is a no-op)", got) + } + if len(s.queued) != 0 { + t.Errorf("queue should be empty, len=%d", len(s.queued)) + } +} diff --git a/internal/router/scheduler/scheduler.go b/internal/router/scheduler/scheduler.go index 87ed6ad2..eda36574 100644 --- a/internal/router/scheduler/scheduler.go +++ b/internal/router/scheduler/scheduler.go @@ -47,6 +47,11 @@ type Swapper interface { type Scheduler interface { // OnRequest handles one incoming ServeHTTP request. OnRequest(req HandlerReq) + // OnCancel handles a request whose client has disconnected before it was + // granted. 
/**
 * Starts a load for `modelId` and tracks it as pending until the request
 * settles. A second call for a model that is already pending is ignored.
 * The AbortController is stored in `loadControllers` so `cancelLoad` can
 * abort the in-flight request. Errors from `loadModel` are logged, not
 * rethrown.
 */
async function handleLoadModel(modelId: string): Promise<void> {
  if (pendingLoads[modelId]) return;

  const controller = new AbortController();
  loadControllers.set(modelId, controller);
  pendingLoads[modelId] = true;

  try {
    await loadModel(modelId, controller.signal);
  } catch (e) {
    console.error(e);
  } finally {
    // Always clear the bookkeeping, whether the load succeeded, failed or
    // was aborted.
    loadControllers.delete(modelId);
    delete pendingLoads[modelId];
  }
}

/**
 * Aborts the pending load for `modelId`, if one is being tracked; otherwise
 * does nothing.
 */
function cancelLoad(modelId: string): void {
  loadControllers.get(modelId)?.abort();
}
"id" : "name")); } @@ -170,14 +191,20 @@ {/if} - {#if model.state === "stopped"} - + {#if model.state === "stopped" && pendingLoads[model.id]} + + {:else if model.state === "stopped"} + {:else} {/if} - {model.state} + {#if model.state === "stopped" && pendingLoads[model.id]} + queued + {:else} + {model.state} + {/if} {/each} diff --git a/ui-svelte/src/index.css b/ui-svelte/src/index.css index 057e180c..1f833519 100644 --- a/ui-svelte/src/index.css +++ b/ui-svelte/src/index.css @@ -139,7 +139,8 @@ } .status--starting, - .status--stopping { + .status--stopping, + .status--queued { @apply bg-warning/10 text-warning; } diff --git a/ui-svelte/src/stores/api.ts b/ui-svelte/src/stores/api.ts index e77d5b76..653cd1a8 100644 --- a/ui-svelte/src/stores/api.ts +++ b/ui-svelte/src/stores/api.ts @@ -176,15 +176,19 @@ export async function unloadSingleModel(model: string): Promise { } } -export async function loadModel(model: string): Promise { +export async function loadModel(model: string, signal?: AbortSignal): Promise { try { - const response = await fetch(`/upstream/${model}/`, { + const response = await fetch(`/upstream/${model}/?_=${Date.now()}`, { method: "GET", + signal, }); if (!response.ok) { throw new Error(`Failed to load model: ${response.status}`); } } catch (error) { + if (error instanceof DOMException && error.name === "AbortError") { + return; + } console.error("Failed to load model:", error); throw error; }