Compare commits

...

3 Commits

Author SHA1 Message Date
coderabbitai[bot] 090bb4623c CodeRabbit Generated Unit Tests: Generate unit tests for PR changes 2026-06-16 12:47:43 +00:00
Benson Wong 6cf1317341 schedule,shared: move concurrency 429 limits into scheduler code (#849)
- make concurrency limiting the scheduler.Scheduler's responsibility
- eliminate the separate concurrency limit middleware 
- move concurrencyLimit logic into scheduler.FIFO to maintain backwards compatibility
- add HTTPError from #834 

Updates #834
2026-06-15 22:35:12 -07:00
Wojciech 8e84b2ec4f README.md: add macports install option to README (#848) 2026-06-15 15:58:24 -07:00
18 changed files with 814 additions and 177 deletions
+15 -4
View File
@@ -88,10 +88,11 @@ Real time log streaming:
llama-swap can be installed in multiple ways
1. Docker
2. Homebrew (OSX and Linux)
3. WinGet
4. From release binaries
5. From source
2. Homebrew (macOS and Linux)
3. MacPorts (macOS)
4. WinGet
5. From release binaries
6. From source
### Docker Install ([download images](https://github.com/mostlygeek/llama-swap/pkgs/container/llama-swap))
@@ -155,6 +156,16 @@ brew install llama-swap
llama-swap --config path/to/config.yaml --listen localhost:8080
```
### MacPorts (macOS)
> [!NOTE]
> Maintained by the MacPorts community - [llama-swap port](https://ports.macports.org/port/llama-swap). It is not an official part of llama-swap.
```shell
sudo port install llama-swap
llama-swap --config path/to/config.yaml --listen localhost:8080
```
### WinGet Install (Windows)
> [!NOTE]
+9 -6
View File
@@ -28,8 +28,7 @@ type unloadReq struct {
// baseRouter owns the channels, run-loop, and process machinery shared by every
// concrete router. Concrete routers embed *baseRouter and supply a
// scheduler.Factory (which captures their scheduler.Swapper) describing how
// requests are scheduled and how their eviction set is decided. baseRouter
// scheduler.Swapper describing how eviction sets are decided. baseRouter
// implements scheduler.Effects so the scheduler can call back for side-effects.
type baseRouter struct {
name string
@@ -75,8 +74,8 @@ func newBaseRouter(
conf config.Config,
processes map[string]process.Process,
logger *logmon.Monitor,
newSched scheduler.Factory,
) *baseRouter {
planner scheduler.Swapper,
) (*baseRouter, error) {
shutdownCtx, shutdownFn := context.WithCancel(context.Background())
procCtx, procCancel := context.WithCancel(context.Background())
b := &baseRouter{
@@ -96,8 +95,12 @@ func newBaseRouter(
serveDoneCh: make(chan scheduler.ServeDoneEvent),
runDone: make(chan struct{}),
}
b.schedule = newSched(name, logger, b)
return b
sched, err := scheduler.New(conf, name, logger, planner, b)
if err != nil {
return nil, err
}
b.schedule = sched
return b, nil
}
func (b *baseRouter) notifyProcessed() {
+4 -4
View File
@@ -29,10 +29,10 @@ func (s *stubPlanner) OnSwapStart(string, []string) {}
func newTestBase(t *testing.T, processes map[string]process.Process, planner scheduler.Swapper) *baseRouter {
t.Helper()
conf := config.Config{HealthCheckTimeout: 5}
b := newBaseRouter("test", conf, processes, logmon.NewWriter(io.Discard),
func(name string, logger *logmon.Monitor, eff scheduler.Effects) scheduler.Scheduler {
return scheduler.NewFIFO(name, logger, planner, conf.Routing.Scheduler.Settings.Fifo, eff)
})
b, err := newBaseRouter("test", conf, processes, logmon.NewWriter(io.Discard), planner)
if err != nil {
t.Fatalf("newBaseRouter: %v", err)
}
b.testProcessed = make(chan struct{}, 64)
go b.run()
t.Cleanup(func() {
+4 -5
View File
@@ -6,7 +6,6 @@ import (
"github.com/mostlygeek/llama-swap/internal/config"
"github.com/mostlygeek/llama-swap/internal/logmon"
"github.com/mostlygeek/llama-swap/internal/process"
"github.com/mostlygeek/llama-swap/internal/router/scheduler"
)
type Group struct {
@@ -30,10 +29,10 @@ func NewGroup(conf config.Config, proxylog, upstreamlog *logmon.Monitor) (*Group
}
processes := make(map[string]process.Process, len(modelToGroup))
base := newBaseRouter("group", conf, processes, proxylog,
func(name string, logger *logmon.Monitor, eff scheduler.Effects) scheduler.Scheduler {
return scheduler.NewFIFO(name, logger, swapper, conf.Routing.Scheduler.Settings.Fifo, eff)
})
base, err := newBaseRouter("group", conf, processes, proxylog, swapper)
if err != nil {
return nil, fmt.Errorf("creating base router: %w", err)
}
for mid := range modelToGroup {
modelCfg, _, ok := conf.FindConfig(mid)
+4 -5
View File
@@ -10,7 +10,6 @@ import (
"github.com/mostlygeek/llama-swap/internal/config"
"github.com/mostlygeek/llama-swap/internal/logmon"
"github.com/mostlygeek/llama-swap/internal/process"
"github.com/mostlygeek/llama-swap/internal/router/scheduler"
)
// newTestGroup builds a Group directly from the supplied processes and config,
@@ -27,10 +26,10 @@ func newTestGroup(t *testing.T, conf config.Config, processes map[string]process
config: conf,
modelToGroup: modelToGroup,
}
base := newBaseRouter("group", conf, processes, logmon.NewWriter(io.Discard),
func(name string, logger *logmon.Monitor, eff scheduler.Effects) scheduler.Scheduler {
return scheduler.NewFIFO(name, logger, swapper, conf.Routing.Scheduler.Settings.Fifo, eff)
})
base, err := newBaseRouter("group", conf, processes, logmon.NewWriter(io.Discard), swapper)
if err != nil {
t.Fatalf("newBaseRouter: %v", err)
}
base.testProcessed = make(chan struct{}, 64)
g := &Group{baseRouter: base}
go base.run()
+4 -5
View File
@@ -6,7 +6,6 @@ import (
"github.com/mostlygeek/llama-swap/internal/config"
"github.com/mostlygeek/llama-swap/internal/logmon"
"github.com/mostlygeek/llama-swap/internal/process"
"github.com/mostlygeek/llama-swap/internal/router/scheduler"
)
type Matrix struct {
@@ -27,10 +26,10 @@ func NewMatrix(conf config.Config, proxylog, upstreamlog *logmon.Monitor) (*Matr
// Build a process for every model in the config. Any model can run alone
// even if it is not part of a set; this mirrors proxy.NewMatrix.
processes := make(map[string]process.Process, len(conf.Models))
base := newBaseRouter("matrix", conf, processes, proxylog,
func(name string, logger *logmon.Monitor, eff scheduler.Effects) scheduler.Scheduler {
return scheduler.NewFIFO(name, logger, swapper, conf.Routing.Scheduler.Settings.Fifo, eff)
})
base, err := newBaseRouter("matrix", conf, processes, proxylog, swapper)
if err != nil {
return nil, fmt.Errorf("creating base router: %w", err)
}
for mid, modelCfg := range conf.Models {
procLog := logmon.NewWriter(upstreamlog)
+4 -5
View File
@@ -10,7 +10,6 @@ import (
"github.com/mostlygeek/llama-swap/internal/config"
"github.com/mostlygeek/llama-swap/internal/logmon"
"github.com/mostlygeek/llama-swap/internal/process"
"github.com/mostlygeek/llama-swap/internal/router/scheduler"
)
// newTestMatrix builds a Matrix router from supplied processes, bypassing
@@ -22,10 +21,10 @@ func newTestMatrix(t *testing.T, conf config.Config, expanded []config.ExpandedS
solver: newMatrixSolver(expanded, evictCosts),
logger: logger,
}
base := newBaseRouter("matrix", conf, processes, logger,
func(name string, l *logmon.Monitor, eff scheduler.Effects) scheduler.Scheduler {
return scheduler.NewFIFO(name, l, swapper, conf.Routing.Scheduler.Settings.Fifo, eff)
})
base, err := newBaseRouter("matrix", conf, processes, logger, swapper)
if err != nil {
t.Fatalf("newBaseRouter: %v", err)
}
base.testProcessed = make(chan struct{}, 64)
r := &Matrix{baseRouter: base}
go base.run()
+35 -3
View File
@@ -8,8 +8,13 @@ import (
"github.com/mostlygeek/llama-swap/internal/config"
"github.com/mostlygeek/llama-swap/internal/logmon"
"github.com/mostlygeek/llama-swap/internal/process"
"github.com/mostlygeek/llama-swap/internal/shared"
)
// defaultConcurrencyLimit caps simultaneous in-flight requests per model when
// the model config leaves concurrencyLimit unset.
const defaultConcurrencyLimit = 10
// activeSwap tracks one in-flight swap and the callers waiting on it.
type activeSwap struct {
modelID string
@@ -33,20 +38,32 @@ type FIFO struct {
cfg config.FifoConfig
effects Effects
limits map[string]int
active map[string]*activeSwap
inFlight map[string]int
queued []HandlerReq
}
// NewFIFO builds a FIFO scheduler. It matches scheduler.Factory once a planner
// is captured in a closure.
func NewFIFO(name string, logger *logmon.Monitor, planner Swapper, cfg config.FifoConfig, eff Effects) *FIFO {
// NewFIFO builds a FIFO scheduler. Per-model concurrency limits are derived
// from models: each model's ConcurrencyLimit overrides defaultConcurrencyLimit
// when set to a value greater than zero.
func NewFIFO(name string, logger *logmon.Monitor, planner Swapper, cfg config.FifoConfig, models map[string]config.ModelConfig, eff Effects) *FIFO {
limits := make(map[string]int, len(models))
for id, mc := range models {
limit := defaultConcurrencyLimit
if mc.ConcurrencyLimit > 0 {
limit = mc.ConcurrencyLimit
}
limits[id] = limit
}
return &FIFO{
name: name,
logger: logger,
planner: planner,
cfg: cfg,
effects: eff,
limits: limits,
active: make(map[string]*activeSwap),
inFlight: make(map[string]int),
}
@@ -254,12 +271,27 @@ func (s *FIFO) OnShutdown(err error) {
// grantHandler hands the caller a tracked handler for modelID and, only if the
// caller was still there to receive it, bumps the in-flight count. Incrementing
// when the grant failed would strand the counter and block future evictions.
// Requests that would exceed the model's concurrency limit are rejected with a
// zero-value shared.ConcurrencyLimitError (HTTP 429 with Retry-After) and never
// reach GrantServe, so they do not touch the in-flight count.
func (s *FIFO) grantHandler(req HandlerReq, modelID string) {
	// Shed the request up front when the model is already at its cap.
	if s.inFlight[modelID] >= s.limit(modelID) {
		s.effects.GrantError(req, shared.ConcurrencyLimitError{})
		return
	}
	// Count the request only when the grant was actually delivered.
	if s.effects.GrantServe(req, modelID) {
		s.inFlight[modelID]++
	}
}
// limit reports how many requests modelID may have in flight at once. Models
// with no entry in s.limits fall back to defaultConcurrencyLimit.
func (s *FIFO) limit(modelID string) int {
	configured, found := s.limits[modelID]
	if !found {
		return defaultConcurrencyLimit
	}
	return configured
}
// startSwap records the swap as active and launches it via Effects. running is
// the set EvictionFor saw, forwarded to OnSwapStart so the planner logs against
// the same picture it decided on.
+220 -2
View File
@@ -1,14 +1,17 @@
package scheduler
import (
"context"
"errors"
"io"
"net/http"
"testing"
"time"
"github.com/mostlygeek/llama-swap/internal/config"
"github.com/mostlygeek/llama-swap/internal/logmon"
"github.com/mostlygeek/llama-swap/internal/process"
"github.com/mostlygeek/llama-swap/internal/shared"
)
// FIFO methods all run on the router's single run-loop goroutine, so these
@@ -54,6 +57,7 @@ type stopRec struct {
type fakeEffects struct {
states map[string]process.ProcessState // model -> state; missing => not handled
serveResult map[string]bool // GrantServe return per model (default true)
lastServeReq HandlerReq
starts []startRec
grants []grantRec
@@ -96,6 +100,7 @@ func (f *fakeEffects) GrantServe(req HandlerReq, modelID string) bool {
if v, set := f.serveResult[modelID]; set {
ok = v
}
f.lastServeReq = req
f.grants = append(f.grants, grantRec{model: modelID, serve: ok})
return ok
}
@@ -138,7 +143,7 @@ func (f *fakeEffects) startsFor(modelID string) int {
}
func newFIFO(planner Swapper, eff Effects) *FIFO {
return NewFIFO("test", logmon.NewWriter(io.Discard), planner, config.FifoConfig{}, eff)
return NewFIFO("test", logmon.NewWriter(io.Discard), planner, config.FifoConfig{}, nil, eff)
}
// req builds a HandlerReq for model with every other field left at its zero
// value.
func req(model string) HandlerReq {
	return HandlerReq{
		Model: model,
	}
}
@@ -167,6 +172,99 @@ func TestFIFO_FastPath(t *testing.T) {
}
}
// TestFIFO_GrantSetsPriorityMetadata checks that granting a request for a
// ready model records the model's configured priority under the
// "fifo_priority" key of the request's context metadata.
func TestFIFO_GrantSetsPriorityMetadata(t *testing.T) {
	effects := newFakeEffects()
	effects.states["a"] = process.StateReady

	fifoCfg := config.FifoConfig{Priority: map[string]int{"a": 7}}
	sched := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, fifoCfg, nil, effects)

	reqData := shared.ReqContextData{ModelID: "a", Metadata: make(map[string]string)}
	sched.OnRequest(HandlerReq{
		Model: "a",
		Ctx:   shared.SetContext(context.Background(), reqData),
	})

	if served := effects.served("a"); served != 1 {
		t.Fatalf("served(a)=%d want 1", served)
	}

	granted, found := shared.ReadContext(effects.lastServeReq.Ctx)
	if !found {
		t.Fatal("context data missing from granted request")
	}
	if priority := granted.Metadata["fifo_priority"]; priority != "7" {
		t.Errorf("fifo_priority = %q, want 7", priority)
	}
}
// TestFIFO_GrantSetsPriorityMetadata_DefaultZero checks that a model absent
// from the Priority map is granted with fifo_priority="0".
func TestFIFO_GrantSetsPriorityMetadata_DefaultZero(t *testing.T) {
	effects := newFakeEffects()
	effects.states["unlisted"] = process.StateReady

	// Only "other" has a priority; "unlisted" is deliberately left out.
	fifoCfg := config.FifoConfig{Priority: map[string]int{"other": 5}}
	sched := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, fifoCfg, nil, effects)

	reqData := shared.ReqContextData{ModelID: "unlisted", Metadata: make(map[string]string)}
	sched.OnRequest(HandlerReq{
		Model: "unlisted",
		Ctx:   shared.SetContext(context.Background(), reqData),
	})

	if served := effects.served("unlisted"); served != 1 {
		t.Fatalf("served(unlisted)=%d want 1", served)
	}

	granted, found := shared.ReadContext(effects.lastServeReq.Ctx)
	if !found {
		t.Fatal("context data missing from granted request")
	}
	if priority := granted.Metadata["fifo_priority"]; priority != "0" {
		t.Errorf("fifo_priority = %q, want %q", priority, "0")
	}
}
// TestFIFO_GrantSetsPriorityMetadata_NoMetadataMap checks that a request whose
// context data carries no Metadata map is still granted: failing to record the
// priority must neither crash nor block the grant.
func TestFIFO_GrantSetsPriorityMetadata_NoMetadataMap(t *testing.T) {
	effects := newFakeEffects()
	effects.states["a"] = process.StateReady

	fifoCfg := config.FifoConfig{Priority: map[string]int{"a": 3}}
	sched := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, fifoCfg, nil, effects)

	// Metadata is left nil on purpose, so there is nowhere to store the priority.
	bare := shared.ReqContextData{ModelID: "a"}
	sched.OnRequest(HandlerReq{
		Model: "a",
		Ctx:   shared.SetContext(context.Background(), bare),
	})

	if served := effects.served("a"); served != 1 {
		t.Fatalf("served(a)=%d want 1 (metadata error must not prevent grant)", served)
	}
}
// TestFIFO_GrantSetsPriorityMetadata_AfterSwapCompletion checks that waiters
// granted through OnSwapDone get fifo_priority metadata too, not only requests
// that find the model already ready.
func TestFIFO_GrantSetsPriorityMetadata_AfterSwapCompletion(t *testing.T) {
	effects := newFakeEffects()
	effects.states["a"] = process.StateStopped // a stopped model forces a swap

	fifoCfg := config.FifoConfig{Priority: map[string]int{"a": 9}}
	sched := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, fifoCfg, nil, effects)

	reqData := shared.ReqContextData{ModelID: "a", Metadata: make(map[string]string)}
	sched.OnRequest(HandlerReq{
		Model: "a",
		Ctx:   shared.SetContext(context.Background(), reqData),
	})

	// While the swap is still running nothing may have been granted.
	if served := effects.served("a"); served != 0 {
		t.Fatalf("served(a)=%d want 0 before swap done", served)
	}

	// Finish the swap and let the scheduler release its waiters.
	effects.states["a"] = process.StateReady
	sched.OnSwapDone(SwapDone{ModelID: "a"})

	if served := effects.served("a"); served != 1 {
		t.Fatalf("served(a)=%d want 1 after swap done", served)
	}

	granted, found := shared.ReadContext(effects.lastServeReq.Ctx)
	if !found {
		t.Fatal("context data missing from granted request after swap")
	}
	if priority := granted.Metadata["fifo_priority"]; priority != "9" {
		t.Errorf("fifo_priority = %q, want %q", priority, "9")
	}
}
func TestFIFO_ModelNotFound(t *testing.T) {
eff := newFakeEffects() // no states => model unknown
s := newFIFO(&stubPlanner{}, eff)
@@ -521,7 +619,7 @@ func TestFIFO_PriorityQueueOrder(t *testing.T) {
// loading collides with z's in-flight swap and parks in the queue.
planner := &stubPlanner{evict: map[string][]string{"z": {"A", "B", "C", "D"}}}
cfg := config.FifoConfig{Priority: map[string]int{"A": 10, "B": 5, "C": 5, "D": 1}}
s := NewFIFO("test", logmon.NewWriter(io.Discard), planner, cfg, eff)
s := NewFIFO("test", logmon.NewWriter(io.Discard), planner, cfg, nil, eff)
s.OnRequest(req("z")) // StartSwap(z, [A,B,C,D])
@@ -631,3 +729,123 @@ func TestFIFO_OnCancel_NotPresent(t *testing.T) {
t.Errorf("queue should be empty, len=%d", len(s.queued))
}
}
// newFIFOWithLimit returns a FIFO that knows a single model capped at limit
// concurrent requests, together with the fake effects backing it. The model is
// reported as StateReady so every request exercises the fast path.
func newFIFOWithLimit(t *testing.T, model string, limit int) (*FIFO, *fakeEffects) {
	t.Helper()

	effects := newFakeEffects()
	effects.states[model] = process.StateReady

	perModel := map[string]config.ModelConfig{model: {ConcurrencyLimit: limit}}
	sched := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, config.FifoConfig{}, perModel, effects)
	return sched, effects
}
// TestFIFO_ConcurrencyLimit_RejectsOverLimit checks that a request arriving
// while the model is at capacity receives an error grant (an HTTPError with
// status 429 and a Retry-After header) instead of being served, and that the
// slot becomes usable again once the in-flight request completes.
func TestFIFO_ConcurrencyLimit_RejectsOverLimit(t *testing.T) {
	sched, effects := newFIFOWithLimit(t, "a", 1)

	// The only slot is free, so the first request is served.
	sched.OnRequest(req("a"))
	if served := effects.served("a"); served != 1 {
		t.Fatalf("served(a)=%d want 1", served)
	}

	// The slot is now taken; the next request must be shed.
	sched.OnRequest(req("a"))
	if rejected := effects.errored("a"); rejected != 1 {
		t.Fatalf("errored(a)=%d want 1 (over-limit)", rejected)
	}

	lastErr := effects.grants[len(effects.grants)-1].err
	var httpErr shared.HTTPError
	if !errors.As(lastErr, &httpErr) {
		t.Fatalf("err=%v want HTTPError", lastErr)
	}
	if code := httpErr.StatusCode(); code != http.StatusTooManyRequests {
		t.Fatalf("StatusCode()=%d want 429", code)
	}
	if retryAfter := httpErr.Header().Get("Retry-After"); retryAfter == "" {
		t.Fatal("missing Retry-After header")
	}

	// Draining the in-flight request frees the slot for a new one.
	sched.OnServeDone(ServeDoneEvent{ModelID: "a"})
	sched.OnRequest(req("a"))
	if served := effects.served("a"); served != 2 {
		t.Fatalf("served(a)=%d want 2 after drain", served)
	}
}
// TestFIFO_ConcurrencyLimit_DefaultIsTen checks that a model with no explicit
// ConcurrencyLimit is capped at the default of 10 concurrent requests.
func TestFIFO_ConcurrencyLimit_DefaultIsTen(t *testing.T) {
	effects := newFakeEffects()
	effects.states["a"] = process.StateReady

	// newFIFO passes a nil models map, so no model has an explicit limit.
	sched := newFIFO(&stubPlanner{}, effects)

	for remaining := 10; remaining > 0; remaining-- {
		sched.OnRequest(req("a"))
	}
	if served := effects.served("a"); served != 10 {
		t.Fatalf("served(a)=%d want 10 (default limit)", served)
	}

	// One more request than the default allows must be shed.
	sched.OnRequest(req("a"))
	if rejected := effects.errored("a"); rejected != 1 {
		t.Fatalf("errored(a)=%d want 1 (over default limit)", rejected)
	}
}
// TestFIFO_ConcurrencyLimit_CustomLimit checks that a positive
// ConcurrencyLimit replaces the default cap: with a limit of 2, the third
// simultaneous request is rejected.
func TestFIFO_ConcurrencyLimit_CustomLimit(t *testing.T) {
	sched, effects := newFIFOWithLimit(t, "a", 2)

	for attempt := 0; attempt < 3; attempt++ {
		sched.OnRequest(req("a"))
	}

	if served := effects.served("a"); served != 2 {
		t.Fatalf("served(a)=%d want 2 (custom limit)", served)
	}
	if rejected := effects.errored("a"); rejected != 1 {
		t.Fatalf("errored(a)=%d want 1 (over custom limit)", rejected)
	}
}
// TestFIFO_ConcurrencyLimit_SwapWaiters checks that when more callers wait on
// a swap than the model's concurrency limit allows, the surplus waiters are
// rejected when the swap completes instead of pushing the model over its limit.
func TestFIFO_ConcurrencyLimit_SwapWaiters(t *testing.T) {
	effects := newFakeEffects()
	effects.states["a"] = process.StateStopped

	perModel := map[string]config.ModelConfig{"a": {ConcurrencyLimit: 2}}
	sched := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, config.FifoConfig{}, perModel, effects)

	// Three callers arrive while the model is loading; only the first may
	// trigger a swap, the other two wait on it.
	for caller := 0; caller < 3; caller++ {
		sched.OnRequest(req("a"))
	}
	if swaps := effects.startsFor("a"); swaps != 1 {
		t.Fatalf("StartSwap(a)=%d want 1", swaps)
	}

	// On completion the limit admits two waiters and sheds the third.
	effects.states["a"] = process.StateReady
	sched.OnSwapDone(SwapDone{ModelID: "a"})

	if served := effects.served("a"); served != 2 {
		t.Fatalf("served(a)=%d want 2 (limit on swap completion)", served)
	}
	if rejected := effects.errored("a"); rejected != 1 {
		t.Fatalf("errored(a)=%d want 1 (excess waiter rejected)", rejected)
	}
}
+17 -3
View File
@@ -11,9 +11,11 @@ package scheduler
import (
"context"
"fmt"
"net/http"
"time"
"github.com/mostlygeek/llama-swap/internal/config"
"github.com/mostlygeek/llama-swap/internal/logmon"
"github.com/mostlygeek/llama-swap/internal/process"
"github.com/mostlygeek/llama-swap/internal/shared"
@@ -90,9 +92,21 @@ type Effects interface {
StopProcesses(timeout time.Duration, ids []string)
}
// Factory builds a Scheduler bound to a baseRouter's Effects. The concrete
// router captures its Swapper in the closure it passes as a Factory.
type Factory func(name string, logger *logmon.Monitor, eff Effects) Scheduler
// New builds the Scheduler named by conf.Routing.Scheduler.Use, wiring it to
// the given planner and effects. An empty name selects "fifo", which is
// currently the only supported scheduler; any other name yields an error.
func New(conf config.Config, name string, logger *logmon.Monitor, planner Swapper, eff Effects) (Scheduler, error) {
	switch kind := conf.Routing.Scheduler.Use; kind {
	case "", "fifo":
		// The FIFO scheduler takes its own settings plus the model table, from
		// which it derives per-model concurrency limits.
		return NewFIFO(name, logger, planner, conf.Routing.Scheduler.Settings.Fifo, conf.Models, eff), nil
	default:
		return nil, fmt.Errorf("unsupported scheduler type: %q", kind)
	}
}
// HandlerReq is one in-flight ServeHTTP request waiting for a routing decision.
type HandlerReq struct {
-57
View File
@@ -1,57 +0,0 @@
package server
import (
"net/http"
"golang.org/x/sync/semaphore"
"github.com/mostlygeek/llama-swap/internal/chain"
"github.com/mostlygeek/llama-swap/internal/config"
"github.com/mostlygeek/llama-swap/internal/shared"
)
// defaultConcurrencyLimit caps simultaneous in-flight requests per model when
// the model config leaves concurrencyLimit unset. Matches the legacy
// proxy.Process default.
const defaultConcurrencyLimit = 10
// CreateConcurrencyMiddleware builds middleware that caps the number of
// simultaneous requests dispatched to each configured model. Every model in
// cfg.Models gets a semaphore sized to its ConcurrencyLimit, or to
// defaultConcurrencyLimit when that is not positive. A request that cannot
// take a slot immediately is answered with 429. Models with no entry in
// cfg.Models (e.g. peer-routed models) pass through unlimited.
func CreateConcurrencyMiddleware(cfg config.Config) chain.Middleware {
	slots := make(map[string]*semaphore.Weighted, len(cfg.Models))
	for modelID, modelCfg := range cfg.Models {
		capacity := int64(defaultConcurrencyLimit)
		if modelCfg.ConcurrencyLimit > 0 {
			capacity = int64(modelCfg.ConcurrencyLimit)
		}
		slots[modelID] = semaphore.NewWeighted(capacity)
	}

	return func(next http.Handler) http.Handler {
		limited := func(w http.ResponseWriter, r *http.Request) {
			data, err := shared.FetchContext(r, cfg)
			if err != nil {
				shared.SendError(w, r, shared.ErrNoModelInContext)
				return
			}

			// Only models with a semaphore are limited; everything else
			// (peer models) goes straight to the next handler.
			if sem, tracked := slots[data.ModelID]; tracked {
				if !sem.TryAcquire(1) {
					w.Header().Set("Content-Type", "application/json")
					w.WriteHeader(http.StatusTooManyRequests)
					w.Write([]byte(`{"error":"Too many requests"}`))
					return
				}
				// Deferred calls run at function exit, so the slot is held
				// for the whole downstream call.
				defer sem.Release(1)
			}

			next.ServeHTTP(w, r)
		}
		return http.HandlerFunc(limited)
	}
}
-75
View File
@@ -1,75 +0,0 @@
package server
import (
"net/http"
"net/http/httptest"
"sync"
"testing"
"github.com/mostlygeek/llama-swap/internal/config"
"github.com/mostlygeek/llama-swap/internal/shared"
)
// concurrencyTestReq returns a GET request for /v1/chat/completions whose
// context already carries request data naming model as both Model and ModelID.
func concurrencyTestReq(model string) *http.Request {
	base := httptest.NewRequest("GET", "/v1/chat/completions", nil)
	ctx := shared.SetContext(base.Context(), shared.ReqContextData{Model: model, ModelID: model})
	return base.WithContext(ctx)
}
// TestServer_ConcurrencyMiddleware_RejectsOverLimit checks that, with a limit
// of one, a second concurrent request gets 429 while the first is in flight,
// and that a later request succeeds once the first has finished.
func TestServer_ConcurrencyMiddleware_RejectsOverLimit(t *testing.T) {
	cfg := config.Config{
		Models: map[string]config.ModelConfig{"m1": {ConcurrencyLimit: 1}},
	}

	var (
		entered       = make(chan struct{}) // closed once a request reaches the handler
		release       = make(chan struct{}) // closed to let blocked handlers finish
		signalEntered sync.Once
	)
	blocking := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		signalEntered.Do(func() { close(entered) })
		<-release
		w.WriteHeader(http.StatusOK)
	})
	handler := CreateConcurrencyMiddleware(cfg)(blocking)

	// Park one request inside the handler so it holds the only slot.
	firstDone := make(chan struct{})
	go func() {
		defer close(firstDone)
		handler.ServeHTTP(httptest.NewRecorder(), concurrencyTestReq("m1"))
	}()
	<-entered

	// With the slot taken, a concurrent request must be turned away.
	overLimit := httptest.NewRecorder()
	handler.ServeHTTP(overLimit, concurrencyTestReq("m1"))
	if overLimit.Code != http.StatusTooManyRequests {
		t.Fatalf("over-limit status = %d, want 429", overLimit.Code)
	}

	// Let the first request finish; the freed slot admits a new request.
	close(release)
	<-firstDone
	afterRelease := httptest.NewRecorder()
	handler.ServeHTTP(afterRelease, concurrencyTestReq("m1"))
	if afterRelease.Code != http.StatusOK {
		t.Fatalf("post-release status = %d, want 200", afterRelease.Code)
	}
}
// TestServer_ConcurrencyMiddleware_UnconfiguredModelPassesThrough checks that
// a model with no entry in the config reaches the wrapped handler exactly once
// and is not rate limited.
func TestServer_ConcurrencyMiddleware_UnconfiguredModelPassesThrough(t *testing.T) {
	emptyCfg := config.Config{Models: map[string]config.ModelConfig{}}

	var called int
	counting := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		called++
		w.WriteHeader(http.StatusOK)
	})
	handler := CreateConcurrencyMiddleware(emptyCfg)(counting)

	rec := httptest.NewRecorder()
	handler.ServeHTTP(rec, concurrencyTestReq("peer-model"))

	if passedThrough := rec.Code == http.StatusOK && called == 1; !passedThrough {
		t.Fatalf("unconfigured model: status=%d called=%d, want 200/1", rec.Code, called)
	}
}
+107
View File
@@ -1,9 +1,13 @@
package server
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/mostlygeek/llama-swap/internal/shared"
"github.com/tidwall/gjson"
)
@@ -56,6 +60,109 @@ func TestServer_ProcessStreamingResponse_NoData(t *testing.T) {
}
}
// TestMetricsMonitor_RecordMetadata checks that metadata stored in the request
// context appears on the recorded metrics entry.
func TestMetricsMonitor_RecordMetadata(t *testing.T) {
	monitor := newMetricsMonitor(nil, 10, 0)

	request := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(`{"usage":{}}`))
	reqData := shared.ReqContextData{
		ModelID:  "m",
		Metadata: map[string]string{"client": "web", "trace": "abc"},
	}
	request = request.WithContext(shared.SetContext(request.Context(), reqData))

	// Fake an upstream response that went through the body copier.
	copier := newBodyCopier(httptest.NewRecorder())
	copier.WriteHeader(http.StatusOK)
	copier.Write([]byte(`{"usage":{"prompt_tokens":1,"completion_tokens":2}}`))

	monitor.record("m", request, copier, 0, nil, nil)

	entries := monitor.getMetrics()
	if len(entries) != 1 {
		t.Fatalf("want 1 entry, got %d", len(entries))
	}
	meta := entries[0].Metadata
	if meta["client"] != "web" {
		t.Errorf("client = %q, want web", meta["client"])
	}
	if meta["trace"] != "abc" {
		t.Errorf("trace = %q, want abc", meta["trace"])
	}
}
// TestMetricsMonitor_RecordMetadata_EmptyMap checks that an empty (non-nil)
// Metadata map in the request context leaves the recorded entry's Metadata
// nil, matching omitempty semantics.
func TestMetricsMonitor_RecordMetadata_EmptyMap(t *testing.T) {
	monitor := newMetricsMonitor(nil, 10, 0)

	request := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(`{}`))
	reqData := shared.ReqContextData{
		ModelID:  "m",
		Metadata: map[string]string{}, // present but empty
	}
	request = request.WithContext(shared.SetContext(request.Context(), reqData))

	copier := newBodyCopier(httptest.NewRecorder())
	copier.WriteHeader(http.StatusOK)
	copier.Write([]byte(`{"usage":{"prompt_tokens":1,"completion_tokens":2}}`))

	monitor.record("m", request, copier, 0, nil, nil)

	entries := monitor.getMetrics()
	if len(entries) != 1 {
		t.Fatalf("want 1 entry, got %d", len(entries))
	}
	if meta := entries[0].Metadata; meta != nil {
		t.Errorf("Metadata should be nil for empty context metadata, got %v", meta)
	}
}
// TestMetricsMonitor_RecordMetadata_NoContextData checks that recording a
// request whose context holds no ReqContextData yields an entry with nil
// Metadata.
func TestMetricsMonitor_RecordMetadata_NoContextData(t *testing.T) {
	monitor := newMetricsMonitor(nil, 10, 0)

	// shared.SetContext is intentionally never called for this request.
	request := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(`{}`))

	copier := newBodyCopier(httptest.NewRecorder())
	copier.WriteHeader(http.StatusOK)
	copier.Write([]byte(`{"usage":{"prompt_tokens":3,"completion_tokens":4}}`))

	monitor.record("m", request, copier, 0, nil, nil)

	entries := monitor.getMetrics()
	if len(entries) != 1 {
		t.Fatalf("want 1 entry, got %d", len(entries))
	}
	if meta := entries[0].Metadata; meta != nil {
		t.Errorf("Metadata should be nil when no context data, got %v", meta)
	}
}
// TestMetricsMonitor_RecordMetadata_DeepCopy checks that the recorded entry
// owns its own copy of the metadata: changing the caller's map after record()
// must not change what was stored.
func TestMetricsMonitor_RecordMetadata_DeepCopy(t *testing.T) {
	monitor := newMetricsMonitor(nil, 10, 0)

	callerMeta := map[string]string{"key": "before"}
	request := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(`{}`))
	reqData := shared.ReqContextData{
		ModelID:  "m",
		Metadata: callerMeta,
	}
	request = request.WithContext(shared.SetContext(request.Context(), reqData))

	copier := newBodyCopier(httptest.NewRecorder())
	copier.WriteHeader(http.StatusOK)
	copier.Write([]byte(`{"usage":{"prompt_tokens":1,"completion_tokens":2}}`))

	monitor.record("m", request, copier, 0, nil, nil)

	// Change the caller's map only after the entry has been recorded.
	callerMeta["key"] = "after"

	entries := monitor.getMetrics()
	if len(entries) != 1 {
		t.Fatalf("want 1 entry, got %d", len(entries))
	}
	if stored := entries[0].Metadata["key"]; stored != "before" {
		t.Errorf("Metadata[key] = %q, want %q (deep copy expected)", stored, "before")
	}
}
func TestServer_ParseMetrics_Infill(t *testing.T) {
// /infill responses are arrays; timings live in the last element.
body := `[{"content":"a"},{"content":"b","timings":{"prompt_n":5,"predicted_n":9,"prompt_ms":10,"predicted_ms":20}}]`
-1
View File
@@ -177,7 +177,6 @@ func (s *Server) routes() {
modelChain := chain.New(
authMW,
CreateRequestContextMiddleware(s.cfg),
CreateConcurrencyMiddleware(s.cfg),
CreateFilterMiddleware(s.cfg),
CreateFormFilterMiddleware(s.cfg),
CreateInflightMiddleware(s.inflight),
+10
View File
@@ -37,6 +37,16 @@ var (
)
func SendError(w http.ResponseWriter, r *http.Request, err error) {
var httpErr HTTPError
if errors.As(err, &httpErr) {
for k, v := range httpErr.Header() {
w.Header()[k] = v
}
w.WriteHeader(httpErr.StatusCode())
w.Write(httpErr.Body())
return
}
switch {
case errors.Is(err, ErrNoModelInContext):
SendResponse(w, r, http.StatusNotFound, "no model id could be identified")
+99
View File
@@ -387,6 +387,105 @@ func TestExtractContext_ApiKey(t *testing.T) {
}
}
func TestSetReqData(t *testing.T) {
ctx := SetContext(context.Background(), ReqContextData{Model: "llama3", ModelID: "llama3", Metadata: make(map[string]string)})
if err := SetReqData(ctx, "client", "web"); err != nil {
t.Fatalf("SetReqData: %v", err)
}
if err := SetReqData(ctx, "trace", "abc123"); err != nil {
t.Fatalf("SetReqData: %v", err)
}
data, ok := ReadContext(ctx)
if !ok {
t.Fatal("context data missing")
}
if data.Metadata["client"] != "web" {
t.Errorf("client = %q, want %q", data.Metadata["client"], "web")
}
if data.Metadata["trace"] != "abc123" {
t.Errorf("trace = %q, want %q", data.Metadata["trace"], "abc123")
}
}
// TestSetReqData_Errors checks the two failure modes of SetReqData: a context
// without any request data, and request data whose Metadata map is nil.
func TestSetReqData_Errors(t *testing.T) {
	noData := context.Background()
	if SetReqData(noData, "k", "v") == nil {
		t.Error("expected error when no request context data exists")
	}

	noMetadata := SetContext(context.Background(), ReqContextData{Model: "llama3", ModelID: "llama3"})
	if SetReqData(noMetadata, "k", "v") == nil {
		t.Error("expected error when metadata map is missing")
	}
}
// TestSetReqData_NilContext checks that SetReqData reports an error, rather
// than panicking, when handed a nil context.
func TestSetReqData_NilContext(t *testing.T) {
	// Passing nil is deliberate here: it is the misuse under test.
	if err := SetReqData(nil, "k", "v"); err == nil {
		t.Error("expected error for nil context, got nil")
	}
}
// TestSetReqData_OverwritesExistingKey checks that SetReqData replaces the
// value of a key that is already present in the Metadata map.
func TestSetReqData_OverwritesExistingKey(t *testing.T) {
	ctx := SetContext(context.Background(), ReqContextData{
		Model:    "m",
		Metadata: map[string]string{"key": "old"},
	})
	if err := SetReqData(ctx, "key", "new"); err != nil {
		t.Fatalf("SetReqData: %v", err)
	}

	// Fail clearly if the context data is gone, instead of reporting a
	// misleading value mismatch against a zero-value struct.
	data, ok := ReadContext(ctx)
	if !ok {
		t.Fatal("context data missing")
	}
	if data.Metadata["key"] != "new" {
		t.Errorf("key = %q, want %q", data.Metadata["key"], "new")
	}
}
// TestExtractContext_MetadataInitialized_GET checks that extractContext
// returns a non-nil Metadata map for a GET request that names the model in
// its query string.
func TestExtractContext_MetadataInitialized_GET(t *testing.T) {
	request, _ := http.NewRequest(http.MethodGet, "/?model=llama3", nil)

	extracted, err := extractContext(request)
	switch {
	case err != nil:
		t.Fatalf("extractContext: %v", err)
	case extracted.Metadata == nil:
		t.Error("Metadata should be initialized (not nil) for GET requests")
	}
}
// TestExtractContext_MetadataInitialized_JSON checks that extractContext
// returns a non-nil Metadata map for a JSON POST request.
func TestExtractContext_MetadataInitialized_JSON(t *testing.T) {
	payload := strings.NewReader(`{"model":"llama3"}`)
	request, _ := http.NewRequest(http.MethodPost, "/v1/chat/completions", payload)
	request.Header.Set("Content-Type", "application/json")

	extracted, err := extractContext(request)
	switch {
	case err != nil:
		t.Fatalf("extractContext: %v", err)
	case extracted.Metadata == nil:
		t.Error("Metadata should be initialized (not nil) for JSON POST requests")
	}
}
// TestExtractContext_MetadataInitialized_Form checks that extractContext returns
// a non-nil Metadata map for a POST request with a URL-encoded form body.
func TestExtractContext_MetadataInitialized_Form(t *testing.T) {
	form := strings.NewReader("model=whisper-1")
	// The method and URL are fixed, valid literals, so the NewRequest error
	// is deliberately ignored.
	req, _ := http.NewRequest(http.MethodPost, "/v1/audio/transcriptions", form)
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

	data, err := extractContext(req)
	switch {
	case err != nil:
		t.Fatalf("extractContext: %v", err)
	case data.Metadata == nil:
		t.Error("Metadata should be initialized (not nil) for form POST requests")
	}
}
// TestExtractContext_MetadataIsWritable verifies that the Metadata map produced
// by extractContext can be written to, i.e. that SetReqData succeeds on a
// context built from the extracted data.
func TestExtractContext_MetadataIsWritable(t *testing.T) {
	r, err := http.NewRequest(http.MethodGet, "/?model=llama3", nil)
	if err != nil {
		t.Fatalf("NewRequest: %v", err)
	}

	// Check the extraction error explicitly so a failure here is not
	// misreported as a SetReqData failure below.
	got, err := extractContext(r)
	if err != nil {
		t.Fatalf("extractContext: %v", err)
	}

	ctx := SetContext(context.Background(), got)
	if err := SetReqData(ctx, "x", "y"); err != nil {
		t.Fatalf("SetReqData on extractContext Metadata: %v", err)
	}
}
func TestServer_ExtractAPIKey(t *testing.T) {
basicHeader := func(user, pass string) string {
return "Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))
+63
View File
@@ -0,0 +1,63 @@
package shared
import (
"encoding/json"
"net/http"
"strconv"
)
// HTTPError is an error that carries a complete HTTP response. A producer (e.g.
// a scheduler shedding a request) returns one of these; a renderer (e.g.
// router.SendError) writes the status, headers, and body verbatim instead of
// mapping the error to a generic status. It is the seam that lets a component
// shed a request with a rich response (e.g. a 429 with rate-limit headers and a
// JSON hint body) without the renderer knowing the producer's internals.
type HTTPError interface {
// error makes every HTTPError usable wherever a plain error is expected.
error
// StatusCode returns the HTTP status code for the response.
StatusCode() int
// Header returns the headers for the response.
Header() http.Header
// Body returns the raw bytes of the response body.
Body() []byte
}
// ConcurrencyLimitError describes a 429 response for a request rejected by a
// concurrency limit. It provides the Error, StatusCode, Header, and Body
// methods of HTTPError. The zero value is usable: it yields a Retry-After of
// one second and a JSON body with a generic message.
type ConcurrencyLimitError struct {
	// RetryAfter is the number of seconds advertised in the Retry-After
	// header. Values <= 0 fall back to 1.
	RetryAfter int

	// Message is the text placed in the "error" field of the JSON body. An
	// empty string falls back to "Too many requests".
	Message string
}

// Error returns a fixed description; it does not include Message.
func (e ConcurrencyLimitError) Error() string {
	return "concurrency limit reached"
}

// StatusCode always reports 429 Too Many Requests.
func (e ConcurrencyLimitError) StatusCode() int {
	return http.StatusTooManyRequests
}

// Header returns a freshly built header set holding the JSON content type and
// the Retry-After value.
func (e ConcurrencyLimitError) Header() http.Header {
	// Both keys are already in canonical MIME header form.
	return http.Header{
		"Content-Type": {"application/json"},
		"Retry-After":  {e.retryAfter()},
	}
}

// Body returns the JSON document {"error": <message>}.
func (e ConcurrencyLimitError) Body() []byte {
	payload := struct {
		Error string `json:"error"`
	}{Error: e.message()}
	// A struct with a single string field always marshals successfully, so
	// the error is intentionally discarded.
	encoded, _ := json.Marshal(payload)
	return encoded
}

// retryAfter renders the Retry-After header value, defaulting to "1".
func (e ConcurrencyLimitError) retryAfter() string {
	seconds := 1
	if e.RetryAfter > 0 {
		seconds = e.RetryAfter
	}
	return strconv.Itoa(seconds)
}

// message returns Message, or the default text when Message is empty.
func (e ConcurrencyLimitError) message() string {
	if e.Message == "" {
		return "Too many requests"
	}
	return e.Message
}
+217
View File
@@ -0,0 +1,217 @@
import { describe, it, expect } from "vitest";
import type { ActivityLogEntry, TokenMetrics } from "./types";
// Token metrics fixture shared by every entry that makeEntry produces.
const baseTokens: TokenMetrics = {
  cache_tokens: 0,
  input_tokens: 10,
  output_tokens: 5,
  prompt_per_second: 100,
  tokens_per_second: 50,
};

// Builds an ActivityLogEntry fixture. Any field present in `overrides`
// replaces the corresponding default; the defaults leave `metadata` unset.
function makeEntry(overrides: Partial<ActivityLogEntry> = {}): ActivityLogEntry {
  const defaults: ActivityLogEntry = {
    id: 0,
    timestamp: "2024-01-01T00:00:00Z",
    model: "llama3",
    req_path: "/v1/chat/completions",
    resp_content_type: "application/json",
    resp_status_code: 200,
    tokens: baseTokens,
    duration_ms: 100,
    has_capture: false,
  };
  return { ...defaults, ...overrides };
}
// Shape checks for ActivityLogEntry, focused on the optional `metadata` field
// and on how it behaves across a JSON round trip.
describe("ActivityLogEntry", () => {
  describe("metadata field", () => {
    it("accepts an entry without metadata (undefined)", () => {
      const entry = makeEntry();
      expect(entry.metadata).toBeUndefined();
    });

    it("accepts an entry with metadata populated", () => {
      const entry = makeEntry({ metadata: { client: "web", trace: "abc123" } });
      expect(entry.metadata).toEqual({ client: "web", trace: "abc123" });
    });

    it("accepts an empty metadata object", () => {
      const entry = makeEntry({ metadata: {} });
      expect(entry.metadata).toEqual({});
    });

    it("allows reading a key from metadata when present", () => {
      const entry = makeEntry({ metadata: { fifo_priority: "7" } });
      expect(entry.metadata?.["fifo_priority"]).toBe("7");
    });

    it("returns undefined when accessing a missing key via optional chaining", () => {
      const entry = makeEntry();
      expect(entry.metadata?.["fifo_priority"]).toBeUndefined();
    });

    it("returns undefined for a missing key when metadata is an empty object", () => {
      const entry = makeEntry({ metadata: {} });
      expect(entry.metadata?.["missing"]).toBeUndefined();
    });

    it("supports metadata with multiple entries", () => {
      const meta: Record<string, string> = { a: "1", b: "2", c: "3" };
      const entry = makeEntry({ metadata: meta });
      expect(Object.keys(entry.metadata!)).toHaveLength(3);
      expect(entry.metadata!["b"]).toBe("2");
    });
  });

  describe("ActivityLogEntry structure", () => {
    it("round-trips through JSON with metadata", () => {
      const entry = makeEntry({ metadata: { client: "web" } });
      const json = JSON.stringify(entry);
      const parsed: ActivityLogEntry = JSON.parse(json);
      expect(parsed.metadata).toEqual({ client: "web" });
    });

    it("round-trips through JSON without metadata (field omitted)", () => {
      const entry = makeEntry();
      const json = JSON.stringify(entry);
      const parsed: ActivityLogEntry = JSON.parse(json);
      // When metadata is undefined it is dropped by JSON.stringify.
      expect(parsed.metadata).toBeUndefined();
    });

    it("round-trips through JSON with null metadata preserved as-is", () => {
      // Simulates a server that sends an explicit `"metadata": null`
      // instead of omitting the field.
      const raw = { ...makeEntry(), metadata: null };
      const json = JSON.stringify(raw);
      const parsed = JSON.parse(json) as ActivityLogEntry;
      // Unlike undefined, null survives JSON serialization unchanged.
      expect(parsed.metadata).toBeNull();
      // Consumers stay safe: both nullish coalescing and optional chaining
      // treat null the same way as a missing field.
      expect(parsed.metadata ?? undefined).toBeUndefined();
      expect(parsed.metadata?.["client"]).toBeUndefined();
    });
  });
});
// Local copies of the META_PREFIX helpers from Activity.svelte, duplicated here
// so they can be tested without importing the Svelte component.
const META_PREFIX = "meta:";

// Reports whether `key` begins with META_PREFIX (case-sensitive).
function isMetaKey(key: string): boolean {
  return key.slice(0, META_PREFIX.length) === META_PREFIX;
}

// Turns a bare metadata name into its prefixed column key.
function metaKey(name: string): string {
  return `${META_PREFIX}${name}`;
}

// Drops the first META_PREFIX.length characters of `key`; the prefix itself is
// not checked.
function metaLabel(key: string): string {
  return key.substring(META_PREFIX.length);
}
// Exercises the META_PREFIX helpers and the metadata-column logic they support.
describe("Activity.svelte META_PREFIX helpers", () => {
  describe("isMetaKey", () => {
    it("returns true for keys with meta: prefix", () => {
      // The bare prefix counts too: an empty suffix is still prefixed.
      for (const key of ["meta:fifo_priority", "meta:client", "meta:"]) {
        expect(isMetaKey(key)).toBe(true);
      }
    });

    it("returns false for standard column keys", () => {
      for (const key of ["id", "model", "capture", "duration"]) {
        expect(isMetaKey(key)).toBe(false);
      }
    });

    it("returns false for partial or incorrect prefixes", () => {
      // "Meta:key" fails because the comparison is case-sensitive.
      for (const key of ["meta", "Meta:key", ""]) {
        expect(isMetaKey(key)).toBe(false);
      }
    });
  });

  describe("metaKey", () => {
    it("prepends META_PREFIX to the name", () => {
      const expected: Array<[string, string]> = [
        ["fifo_priority", "meta:fifo_priority"],
        ["client", "meta:client"],
      ];
      for (const [name, want] of expected) {
        expect(metaKey(name)).toBe(want);
      }
    });

    it("handles empty name", () => {
      expect(metaKey("")).toBe("meta:");
    });

    it("is the inverse of metaLabel", () => {
      const name = "some_key";
      const prefixed = metaKey(name);
      expect(metaLabel(prefixed)).toBe(name);
    });
  });

  describe("metaLabel", () => {
    it("strips META_PREFIX and returns the bare name", () => {
      const expected: Array<[string, string]> = [
        ["meta:fifo_priority", "fifo_priority"],
        ["meta:client", "client"],
      ];
      for (const [key, want] of expected) {
        expect(metaLabel(key)).toBe(want);
      }
    });

    it("handles empty suffix", () => {
      expect(metaLabel("meta:")).toBe("");
    });

    it("is the inverse of metaKey", () => {
      const key = "meta:trace_id";
      const bare = metaLabel(key);
      expect(metaKey(bare)).toBe(key);
    });
  });

  describe("metadata column derivation", () => {
    // Collects the distinct metadata keys across all entries, in default
    // sort order. Entries without metadata contribute nothing.
    const uniqueMetadataKeys = (entries: ActivityLogEntry[]): string[] => {
      const seen = new Set<string>();
      for (const entry of entries) {
        for (const name of Object.keys(entry.metadata || {})) {
          seen.add(name);
        }
      }
      return Array.from(seen).sort();
    };

    // Looks up the cell value for a meta:-prefixed column key, using "-" as
    // the placeholder when the entry has no value for it.
    const columnValue = (entry: ActivityLogEntry, columnKey: string) =>
      entry.metadata?.[metaLabel(columnKey)] ?? "-";

    it("derives unique metadata keys from a list of entries", () => {
      const entries: ActivityLogEntry[] = [
        makeEntry({ metadata: { client: "web", trace: "a" } }),
        makeEntry({ metadata: { client: "mobile" } }),
        makeEntry({ metadata: { fifo_priority: "3" } }),
        makeEntry({}), // no metadata
      ];
      expect(uniqueMetadataKeys(entries)).toEqual(["client", "fifo_priority", "trace"]);
    });

    it("returns empty array when no entries have metadata", () => {
      const entries: ActivityLogEntry[] = [makeEntry(), makeEntry()];
      expect(uniqueMetadataKeys(entries)).toHaveLength(0);
    });

    it("maps metadata keys to meta:-prefixed column keys", () => {
      const columnKeys = ["client", "fifo_priority"].map((name) => metaKey(name));
      expect(columnKeys).toEqual(["meta:client", "meta:fifo_priority"]);
    });

    it("resolves metadata value for a column key", () => {
      const entry = makeEntry({ metadata: { fifo_priority: "7", client: "web" } });
      expect(columnValue(entry, "meta:fifo_priority")).toBe("7");
    });

    it("falls back to '-' for a column key not present in entry metadata", () => {
      const entry = makeEntry({ metadata: { client: "web" } });
      expect(columnValue(entry, "meta:fifo_priority")).toBe("-");
    });

    it("falls back to '-' when entry has no metadata at all", () => {
      const entry = makeEntry(); // metadata is undefined
      expect(columnValue(entry, "meta:anything")).toBe("-");
    });
  });
});