Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 090bb4623c | |||
| 6cf1317341 | |||
| 8e84b2ec4f |
@@ -88,10 +88,11 @@ Real time log streaming:
|
|||||||
llama-swap can be installed in multiple ways
|
llama-swap can be installed in multiple ways
|
||||||
|
|
||||||
1. Docker
|
1. Docker
|
||||||
2. Homebrew (OSX and Linux)
|
2. Homebrew (macOS and Linux)
|
||||||
3. WinGet
|
3. MacPorts (macOS)
|
||||||
4. From release binaries
|
4. WinGet
|
||||||
5. From source
|
5. From release binaries
|
||||||
|
6. From source
|
||||||
|
|
||||||
### Docker Install ([download images](https://github.com/mostlygeek/llama-swap/pkgs/container/llama-swap))
|
### Docker Install ([download images](https://github.com/mostlygeek/llama-swap/pkgs/container/llama-swap))
|
||||||
|
|
||||||
@@ -155,6 +156,16 @@ brew install llama-swap
|
|||||||
llama-swap --config path/to/config.yaml --listen localhost:8080
|
llama-swap --config path/to/config.yaml --listen localhost:8080
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### MacPorts (macOS)
|
||||||
|
|
||||||
|
> [!NOTE]
|
||||||
|
> Maintained by MacPorts community - [llama-swap port](https://ports.macports.org/port/llama-swap). It is not an official part of llama-swap.
|
||||||
|
|
||||||
|
```shell
|
||||||
|
sudo port install llama-swap
|
||||||
|
llama-swap --config path/to/config.yaml --listen localhost:8080
|
||||||
|
```
|
||||||
|
|
||||||
### WinGet Install (Windows)
|
### WinGet Install (Windows)
|
||||||
|
|
||||||
> [!NOTE]
|
> [!NOTE]
|
||||||
|
|||||||
@@ -28,8 +28,7 @@ type unloadReq struct {
|
|||||||
|
|
||||||
// baseRouter owns the channels, run-loop, and process machinery shared by every
|
// baseRouter owns the channels, run-loop, and process machinery shared by every
|
||||||
// concrete router. Concrete routers embed *baseRouter and supply a
|
// concrete router. Concrete routers embed *baseRouter and supply a
|
||||||
// scheduler.Factory (which captures their scheduler.Swapper) describing how
|
// scheduler.Swapper describing how eviction sets are decided. baseRouter
|
||||||
// requests are scheduled and how their eviction set is decided. baseRouter
|
|
||||||
// implements scheduler.Effects so the scheduler can call back for side-effects.
|
// implements scheduler.Effects so the scheduler can call back for side-effects.
|
||||||
type baseRouter struct {
|
type baseRouter struct {
|
||||||
name string
|
name string
|
||||||
@@ -75,8 +74,8 @@ func newBaseRouter(
|
|||||||
conf config.Config,
|
conf config.Config,
|
||||||
processes map[string]process.Process,
|
processes map[string]process.Process,
|
||||||
logger *logmon.Monitor,
|
logger *logmon.Monitor,
|
||||||
newSched scheduler.Factory,
|
planner scheduler.Swapper,
|
||||||
) *baseRouter {
|
) (*baseRouter, error) {
|
||||||
shutdownCtx, shutdownFn := context.WithCancel(context.Background())
|
shutdownCtx, shutdownFn := context.WithCancel(context.Background())
|
||||||
procCtx, procCancel := context.WithCancel(context.Background())
|
procCtx, procCancel := context.WithCancel(context.Background())
|
||||||
b := &baseRouter{
|
b := &baseRouter{
|
||||||
@@ -96,8 +95,12 @@ func newBaseRouter(
|
|||||||
serveDoneCh: make(chan scheduler.ServeDoneEvent),
|
serveDoneCh: make(chan scheduler.ServeDoneEvent),
|
||||||
runDone: make(chan struct{}),
|
runDone: make(chan struct{}),
|
||||||
}
|
}
|
||||||
b.schedule = newSched(name, logger, b)
|
sched, err := scheduler.New(conf, name, logger, planner, b)
|
||||||
return b
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
b.schedule = sched
|
||||||
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *baseRouter) notifyProcessed() {
|
func (b *baseRouter) notifyProcessed() {
|
||||||
|
|||||||
@@ -29,10 +29,10 @@ func (s *stubPlanner) OnSwapStart(string, []string) {}
|
|||||||
func newTestBase(t *testing.T, processes map[string]process.Process, planner scheduler.Swapper) *baseRouter {
|
func newTestBase(t *testing.T, processes map[string]process.Process, planner scheduler.Swapper) *baseRouter {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
conf := config.Config{HealthCheckTimeout: 5}
|
conf := config.Config{HealthCheckTimeout: 5}
|
||||||
b := newBaseRouter("test", conf, processes, logmon.NewWriter(io.Discard),
|
b, err := newBaseRouter("test", conf, processes, logmon.NewWriter(io.Discard), planner)
|
||||||
func(name string, logger *logmon.Monitor, eff scheduler.Effects) scheduler.Scheduler {
|
if err != nil {
|
||||||
return scheduler.NewFIFO(name, logger, planner, conf.Routing.Scheduler.Settings.Fifo, eff)
|
t.Fatalf("newBaseRouter: %v", err)
|
||||||
})
|
}
|
||||||
b.testProcessed = make(chan struct{}, 64)
|
b.testProcessed = make(chan struct{}, 64)
|
||||||
go b.run()
|
go b.run()
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import (
|
|||||||
"github.com/mostlygeek/llama-swap/internal/config"
|
"github.com/mostlygeek/llama-swap/internal/config"
|
||||||
"github.com/mostlygeek/llama-swap/internal/logmon"
|
"github.com/mostlygeek/llama-swap/internal/logmon"
|
||||||
"github.com/mostlygeek/llama-swap/internal/process"
|
"github.com/mostlygeek/llama-swap/internal/process"
|
||||||
"github.com/mostlygeek/llama-swap/internal/router/scheduler"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Group struct {
|
type Group struct {
|
||||||
@@ -30,10 +29,10 @@ func NewGroup(conf config.Config, proxylog, upstreamlog *logmon.Monitor) (*Group
|
|||||||
}
|
}
|
||||||
|
|
||||||
processes := make(map[string]process.Process, len(modelToGroup))
|
processes := make(map[string]process.Process, len(modelToGroup))
|
||||||
base := newBaseRouter("group", conf, processes, proxylog,
|
base, err := newBaseRouter("group", conf, processes, proxylog, swapper)
|
||||||
func(name string, logger *logmon.Monitor, eff scheduler.Effects) scheduler.Scheduler {
|
if err != nil {
|
||||||
return scheduler.NewFIFO(name, logger, swapper, conf.Routing.Scheduler.Settings.Fifo, eff)
|
return nil, fmt.Errorf("creating base router: %w", err)
|
||||||
})
|
}
|
||||||
|
|
||||||
for mid := range modelToGroup {
|
for mid := range modelToGroup {
|
||||||
modelCfg, _, ok := conf.FindConfig(mid)
|
modelCfg, _, ok := conf.FindConfig(mid)
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import (
|
|||||||
"github.com/mostlygeek/llama-swap/internal/config"
|
"github.com/mostlygeek/llama-swap/internal/config"
|
||||||
"github.com/mostlygeek/llama-swap/internal/logmon"
|
"github.com/mostlygeek/llama-swap/internal/logmon"
|
||||||
"github.com/mostlygeek/llama-swap/internal/process"
|
"github.com/mostlygeek/llama-swap/internal/process"
|
||||||
"github.com/mostlygeek/llama-swap/internal/router/scheduler"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// newTestGroup builds a Group directly from the supplied processes and config,
|
// newTestGroup builds a Group directly from the supplied processes and config,
|
||||||
@@ -27,10 +26,10 @@ func newTestGroup(t *testing.T, conf config.Config, processes map[string]process
|
|||||||
config: conf,
|
config: conf,
|
||||||
modelToGroup: modelToGroup,
|
modelToGroup: modelToGroup,
|
||||||
}
|
}
|
||||||
base := newBaseRouter("group", conf, processes, logmon.NewWriter(io.Discard),
|
base, err := newBaseRouter("group", conf, processes, logmon.NewWriter(io.Discard), swapper)
|
||||||
func(name string, logger *logmon.Monitor, eff scheduler.Effects) scheduler.Scheduler {
|
if err != nil {
|
||||||
return scheduler.NewFIFO(name, logger, swapper, conf.Routing.Scheduler.Settings.Fifo, eff)
|
t.Fatalf("newBaseRouter: %v", err)
|
||||||
})
|
}
|
||||||
base.testProcessed = make(chan struct{}, 64)
|
base.testProcessed = make(chan struct{}, 64)
|
||||||
g := &Group{baseRouter: base}
|
g := &Group{baseRouter: base}
|
||||||
go base.run()
|
go base.run()
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import (
|
|||||||
"github.com/mostlygeek/llama-swap/internal/config"
|
"github.com/mostlygeek/llama-swap/internal/config"
|
||||||
"github.com/mostlygeek/llama-swap/internal/logmon"
|
"github.com/mostlygeek/llama-swap/internal/logmon"
|
||||||
"github.com/mostlygeek/llama-swap/internal/process"
|
"github.com/mostlygeek/llama-swap/internal/process"
|
||||||
"github.com/mostlygeek/llama-swap/internal/router/scheduler"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Matrix struct {
|
type Matrix struct {
|
||||||
@@ -27,10 +26,10 @@ func NewMatrix(conf config.Config, proxylog, upstreamlog *logmon.Monitor) (*Matr
|
|||||||
// Build a process for every model in the config. Any model can run alone
|
// Build a process for every model in the config. Any model can run alone
|
||||||
// even if it is not part of a set; this mirrors proxy.NewMatrix.
|
// even if it is not part of a set; this mirrors proxy.NewMatrix.
|
||||||
processes := make(map[string]process.Process, len(conf.Models))
|
processes := make(map[string]process.Process, len(conf.Models))
|
||||||
base := newBaseRouter("matrix", conf, processes, proxylog,
|
base, err := newBaseRouter("matrix", conf, processes, proxylog, swapper)
|
||||||
func(name string, logger *logmon.Monitor, eff scheduler.Effects) scheduler.Scheduler {
|
if err != nil {
|
||||||
return scheduler.NewFIFO(name, logger, swapper, conf.Routing.Scheduler.Settings.Fifo, eff)
|
return nil, fmt.Errorf("creating base router: %w", err)
|
||||||
})
|
}
|
||||||
|
|
||||||
for mid, modelCfg := range conf.Models {
|
for mid, modelCfg := range conf.Models {
|
||||||
procLog := logmon.NewWriter(upstreamlog)
|
procLog := logmon.NewWriter(upstreamlog)
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import (
|
|||||||
"github.com/mostlygeek/llama-swap/internal/config"
|
"github.com/mostlygeek/llama-swap/internal/config"
|
||||||
"github.com/mostlygeek/llama-swap/internal/logmon"
|
"github.com/mostlygeek/llama-swap/internal/logmon"
|
||||||
"github.com/mostlygeek/llama-swap/internal/process"
|
"github.com/mostlygeek/llama-swap/internal/process"
|
||||||
"github.com/mostlygeek/llama-swap/internal/router/scheduler"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// newTestMatrix builds a Matrix router from supplied processes, bypassing
|
// newTestMatrix builds a Matrix router from supplied processes, bypassing
|
||||||
@@ -22,10 +21,10 @@ func newTestMatrix(t *testing.T, conf config.Config, expanded []config.ExpandedS
|
|||||||
solver: newMatrixSolver(expanded, evictCosts),
|
solver: newMatrixSolver(expanded, evictCosts),
|
||||||
logger: logger,
|
logger: logger,
|
||||||
}
|
}
|
||||||
base := newBaseRouter("matrix", conf, processes, logger,
|
base, err := newBaseRouter("matrix", conf, processes, logger, swapper)
|
||||||
func(name string, l *logmon.Monitor, eff scheduler.Effects) scheduler.Scheduler {
|
if err != nil {
|
||||||
return scheduler.NewFIFO(name, l, swapper, conf.Routing.Scheduler.Settings.Fifo, eff)
|
t.Fatalf("newBaseRouter: %v", err)
|
||||||
})
|
}
|
||||||
base.testProcessed = make(chan struct{}, 64)
|
base.testProcessed = make(chan struct{}, 64)
|
||||||
r := &Matrix{baseRouter: base}
|
r := &Matrix{baseRouter: base}
|
||||||
go base.run()
|
go base.run()
|
||||||
|
|||||||
@@ -8,8 +8,13 @@ import (
|
|||||||
"github.com/mostlygeek/llama-swap/internal/config"
|
"github.com/mostlygeek/llama-swap/internal/config"
|
||||||
"github.com/mostlygeek/llama-swap/internal/logmon"
|
"github.com/mostlygeek/llama-swap/internal/logmon"
|
||||||
"github.com/mostlygeek/llama-swap/internal/process"
|
"github.com/mostlygeek/llama-swap/internal/process"
|
||||||
|
"github.com/mostlygeek/llama-swap/internal/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// defaultConcurrencyLimit caps simultaneous in-flight requests per model when
|
||||||
|
// the model config leaves concurrencyLimit unset.
|
||||||
|
const defaultConcurrencyLimit = 10
|
||||||
|
|
||||||
// activeSwap tracks one in-flight swap and the callers waiting on it.
|
// activeSwap tracks one in-flight swap and the callers waiting on it.
|
||||||
type activeSwap struct {
|
type activeSwap struct {
|
||||||
modelID string
|
modelID string
|
||||||
@@ -33,20 +38,32 @@ type FIFO struct {
|
|||||||
cfg config.FifoConfig
|
cfg config.FifoConfig
|
||||||
effects Effects
|
effects Effects
|
||||||
|
|
||||||
|
limits map[string]int
|
||||||
active map[string]*activeSwap
|
active map[string]*activeSwap
|
||||||
inFlight map[string]int
|
inFlight map[string]int
|
||||||
queued []HandlerReq
|
queued []HandlerReq
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFIFO builds a FIFO scheduler. It matches scheduler.Factory once a planner
|
// NewFIFO builds a FIFO scheduler. Per-model concurrency limits are derived
|
||||||
// is captured in a closure.
|
// from models: each model's ConcurrencyLimit overrides defaultConcurrencyLimit
|
||||||
func NewFIFO(name string, logger *logmon.Monitor, planner Swapper, cfg config.FifoConfig, eff Effects) *FIFO {
|
// when set to a value greater than zero.
|
||||||
|
func NewFIFO(name string, logger *logmon.Monitor, planner Swapper, cfg config.FifoConfig, models map[string]config.ModelConfig, eff Effects) *FIFO {
|
||||||
|
limits := make(map[string]int, len(models))
|
||||||
|
for id, mc := range models {
|
||||||
|
limit := defaultConcurrencyLimit
|
||||||
|
if mc.ConcurrencyLimit > 0 {
|
||||||
|
limit = mc.ConcurrencyLimit
|
||||||
|
}
|
||||||
|
limits[id] = limit
|
||||||
|
}
|
||||||
|
|
||||||
return &FIFO{
|
return &FIFO{
|
||||||
name: name,
|
name: name,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
planner: planner,
|
planner: planner,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
effects: eff,
|
effects: eff,
|
||||||
|
limits: limits,
|
||||||
active: make(map[string]*activeSwap),
|
active: make(map[string]*activeSwap),
|
||||||
inFlight: make(map[string]int),
|
inFlight: make(map[string]int),
|
||||||
}
|
}
|
||||||
@@ -254,12 +271,27 @@ func (s *FIFO) OnShutdown(err error) {
|
|||||||
// grantHandler hands the caller a tracked handler for modelID and, only if the
|
// grantHandler hands the caller a tracked handler for modelID and, only if the
|
||||||
// caller was still there to receive it, bumps the in-flight count. Incrementing
|
// caller was still there to receive it, bumps the in-flight count. Incrementing
|
||||||
// when the grant failed would strand the counter and block future evictions.
|
// when the grant failed would strand the counter and block future evictions.
|
||||||
|
// Requests that would exceed the model's concurrency limit are rejected with a
|
||||||
|
// shared.NewConcurrencyLimitError (HTTP 429 with Retry-After).
|
||||||
func (s *FIFO) grantHandler(req HandlerReq, modelID string) {
|
func (s *FIFO) grantHandler(req HandlerReq, modelID string) {
|
||||||
|
if s.inFlight[modelID] >= s.limit(modelID) {
|
||||||
|
s.effects.GrantError(req, shared.ConcurrencyLimitError{})
|
||||||
|
return
|
||||||
|
}
|
||||||
if s.effects.GrantServe(req, modelID) {
|
if s.effects.GrantServe(req, modelID) {
|
||||||
s.inFlight[modelID]++
|
s.inFlight[modelID]++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// limit returns the per-model concurrency cap, defaulting to
|
||||||
|
// defaultConcurrencyLimit when the model has no explicit entry.
|
||||||
|
func (s *FIFO) limit(modelID string) int {
|
||||||
|
if l, ok := s.limits[modelID]; ok {
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
return defaultConcurrencyLimit
|
||||||
|
}
|
||||||
|
|
||||||
// startSwap records the swap as active and launches it via Effects. running is
|
// startSwap records the swap as active and launches it via Effects. running is
|
||||||
// the set EvictionFor saw, forwarded to OnSwapStart so the planner logs against
|
// the set EvictionFor saw, forwarded to OnSwapStart so the planner logs against
|
||||||
// the same picture it decided on.
|
// the same picture it decided on.
|
||||||
|
|||||||
@@ -1,14 +1,17 @@
|
|||||||
package scheduler
|
package scheduler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/mostlygeek/llama-swap/internal/config"
|
"github.com/mostlygeek/llama-swap/internal/config"
|
||||||
"github.com/mostlygeek/llama-swap/internal/logmon"
|
"github.com/mostlygeek/llama-swap/internal/logmon"
|
||||||
"github.com/mostlygeek/llama-swap/internal/process"
|
"github.com/mostlygeek/llama-swap/internal/process"
|
||||||
|
"github.com/mostlygeek/llama-swap/internal/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FIFO methods all run on the router's single run-loop goroutine, so these
|
// FIFO methods all run on the router's single run-loop goroutine, so these
|
||||||
@@ -52,8 +55,9 @@ type stopRec struct {
|
|||||||
// fakeEffects is an in-memory scheduler.Effects. Tests program process states
|
// fakeEffects is an in-memory scheduler.Effects. Tests program process states
|
||||||
// and GrantServe outcomes, then assert on the recorded calls.
|
// and GrantServe outcomes, then assert on the recorded calls.
|
||||||
type fakeEffects struct {
|
type fakeEffects struct {
|
||||||
states map[string]process.ProcessState // model -> state; missing => not handled
|
states map[string]process.ProcessState // model -> state; missing => not handled
|
||||||
serveResult map[string]bool // GrantServe return per model (default true)
|
serveResult map[string]bool // GrantServe return per model (default true)
|
||||||
|
lastServeReq HandlerReq
|
||||||
|
|
||||||
starts []startRec
|
starts []startRec
|
||||||
grants []grantRec
|
grants []grantRec
|
||||||
@@ -96,6 +100,7 @@ func (f *fakeEffects) GrantServe(req HandlerReq, modelID string) bool {
|
|||||||
if v, set := f.serveResult[modelID]; set {
|
if v, set := f.serveResult[modelID]; set {
|
||||||
ok = v
|
ok = v
|
||||||
}
|
}
|
||||||
|
f.lastServeReq = req
|
||||||
f.grants = append(f.grants, grantRec{model: modelID, serve: ok})
|
f.grants = append(f.grants, grantRec{model: modelID, serve: ok})
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
@@ -138,7 +143,7 @@ func (f *fakeEffects) startsFor(modelID string) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newFIFO(planner Swapper, eff Effects) *FIFO {
|
func newFIFO(planner Swapper, eff Effects) *FIFO {
|
||||||
return NewFIFO("test", logmon.NewWriter(io.Discard), planner, config.FifoConfig{}, eff)
|
return NewFIFO("test", logmon.NewWriter(io.Discard), planner, config.FifoConfig{}, nil, eff)
|
||||||
}
|
}
|
||||||
|
|
||||||
func req(model string) HandlerReq { return HandlerReq{Model: model} }
|
func req(model string) HandlerReq { return HandlerReq{Model: model} }
|
||||||
@@ -167,6 +172,99 @@ func TestFIFO_FastPath(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFIFO_GrantSetsPriorityMetadata(t *testing.T) {
|
||||||
|
eff := newFakeEffects()
|
||||||
|
eff.states["a"] = process.StateReady
|
||||||
|
cfg := config.FifoConfig{Priority: map[string]int{"a": 7}}
|
||||||
|
s := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, cfg, nil, eff)
|
||||||
|
|
||||||
|
ctx := shared.SetContext(context.Background(), shared.ReqContextData{ModelID: "a", Metadata: make(map[string]string)})
|
||||||
|
s.OnRequest(HandlerReq{Model: "a", Ctx: ctx})
|
||||||
|
|
||||||
|
if got := eff.served("a"); got != 1 {
|
||||||
|
t.Fatalf("served(a)=%d want 1", got)
|
||||||
|
}
|
||||||
|
data, ok := shared.ReadContext(eff.lastServeReq.Ctx)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("context data missing from granted request")
|
||||||
|
}
|
||||||
|
if data.Metadata["fifo_priority"] != "7" {
|
||||||
|
t.Errorf("fifo_priority = %q, want 7", data.Metadata["fifo_priority"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFIFO_GrantSetsPriorityMetadata_DefaultZero(t *testing.T) {
|
||||||
|
// A model that is not listed in the Priority map should get fifo_priority="0".
|
||||||
|
eff := newFakeEffects()
|
||||||
|
eff.states["unlisted"] = process.StateReady
|
||||||
|
cfg := config.FifoConfig{Priority: map[string]int{"other": 5}} // "unlisted" absent
|
||||||
|
s := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, cfg, nil, eff)
|
||||||
|
|
||||||
|
ctx := shared.SetContext(context.Background(), shared.ReqContextData{ModelID: "unlisted", Metadata: make(map[string]string)})
|
||||||
|
s.OnRequest(HandlerReq{Model: "unlisted", Ctx: ctx})
|
||||||
|
|
||||||
|
if got := eff.served("unlisted"); got != 1 {
|
||||||
|
t.Fatalf("served(unlisted)=%d want 1", got)
|
||||||
|
}
|
||||||
|
data, ok := shared.ReadContext(eff.lastServeReq.Ctx)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("context data missing from granted request")
|
||||||
|
}
|
||||||
|
if data.Metadata["fifo_priority"] != "0" {
|
||||||
|
t.Errorf("fifo_priority = %q, want %q", data.Metadata["fifo_priority"], "0")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFIFO_GrantSetsPriorityMetadata_NoMetadataMap(t *testing.T) {
|
||||||
|
// When the request context has no Metadata map, grantHandler must not crash.
|
||||||
|
// It should log a debug message and still grant the request.
|
||||||
|
eff := newFakeEffects()
|
||||||
|
eff.states["a"] = process.StateReady
|
||||||
|
cfg := config.FifoConfig{Priority: map[string]int{"a": 3}}
|
||||||
|
s := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, cfg, nil, eff)
|
||||||
|
|
||||||
|
// No Metadata map in the context data — SetReqData will return an error.
|
||||||
|
ctx := shared.SetContext(context.Background(), shared.ReqContextData{ModelID: "a"})
|
||||||
|
s.OnRequest(HandlerReq{Model: "a", Ctx: ctx})
|
||||||
|
|
||||||
|
// The grant must still succeed despite the missing metadata map.
|
||||||
|
if got := eff.served("a"); got != 1 {
|
||||||
|
t.Fatalf("served(a)=%d want 1 (metadata error must not prevent grant)", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFIFO_GrantSetsPriorityMetadata_AfterSwapCompletion(t *testing.T) {
|
||||||
|
// Priority metadata must be set for waiters granted via OnSwapDone, not just
|
||||||
|
// requests that hit the fast path.
|
||||||
|
eff := newFakeEffects()
|
||||||
|
eff.states["a"] = process.StateStopped // forces a swap
|
||||||
|
cfg := config.FifoConfig{Priority: map[string]int{"a": 9}}
|
||||||
|
s := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, cfg, nil, eff)
|
||||||
|
|
||||||
|
ctx := shared.SetContext(context.Background(), shared.ReqContextData{ModelID: "a", Metadata: make(map[string]string)})
|
||||||
|
s.OnRequest(HandlerReq{Model: "a", Ctx: ctx})
|
||||||
|
|
||||||
|
// Swap is in flight; no grant yet.
|
||||||
|
if got := eff.served("a"); got != 0 {
|
||||||
|
t.Fatalf("served(a)=%d want 0 before swap done", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Complete the swap.
|
||||||
|
eff.states["a"] = process.StateReady
|
||||||
|
s.OnSwapDone(SwapDone{ModelID: "a"})
|
||||||
|
|
||||||
|
if got := eff.served("a"); got != 1 {
|
||||||
|
t.Fatalf("served(a)=%d want 1 after swap done", got)
|
||||||
|
}
|
||||||
|
data, ok := shared.ReadContext(eff.lastServeReq.Ctx)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("context data missing from granted request after swap")
|
||||||
|
}
|
||||||
|
if data.Metadata["fifo_priority"] != "9" {
|
||||||
|
t.Errorf("fifo_priority = %q, want %q", data.Metadata["fifo_priority"], "9")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestFIFO_ModelNotFound(t *testing.T) {
|
func TestFIFO_ModelNotFound(t *testing.T) {
|
||||||
eff := newFakeEffects() // no states => model unknown
|
eff := newFakeEffects() // no states => model unknown
|
||||||
s := newFIFO(&stubPlanner{}, eff)
|
s := newFIFO(&stubPlanner{}, eff)
|
||||||
@@ -521,7 +619,7 @@ func TestFIFO_PriorityQueueOrder(t *testing.T) {
|
|||||||
// loading collides with z's in-flight swap and parks in the queue.
|
// loading collides with z's in-flight swap and parks in the queue.
|
||||||
planner := &stubPlanner{evict: map[string][]string{"z": {"A", "B", "C", "D"}}}
|
planner := &stubPlanner{evict: map[string][]string{"z": {"A", "B", "C", "D"}}}
|
||||||
cfg := config.FifoConfig{Priority: map[string]int{"A": 10, "B": 5, "C": 5, "D": 1}}
|
cfg := config.FifoConfig{Priority: map[string]int{"A": 10, "B": 5, "C": 5, "D": 1}}
|
||||||
s := NewFIFO("test", logmon.NewWriter(io.Discard), planner, cfg, eff)
|
s := NewFIFO("test", logmon.NewWriter(io.Discard), planner, cfg, nil, eff)
|
||||||
|
|
||||||
s.OnRequest(req("z")) // StartSwap(z, [A,B,C,D])
|
s.OnRequest(req("z")) // StartSwap(z, [A,B,C,D])
|
||||||
|
|
||||||
@@ -631,3 +729,123 @@ func TestFIFO_OnCancel_NotPresent(t *testing.T) {
|
|||||||
t.Errorf("queue should be empty, len=%d", len(s.queued))
|
t.Errorf("queue should be empty, len=%d", len(s.queued))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// newFIFOWithLimit builds a FIFO whose single model has the given concurrency
|
||||||
|
// limit, already in StateReady so every request exercises the fast path.
|
||||||
|
func newFIFOWithLimit(t *testing.T, model string, limit int) (*FIFO, *fakeEffects) {
|
||||||
|
t.Helper()
|
||||||
|
eff := newFakeEffects()
|
||||||
|
eff.states[model] = process.StateReady
|
||||||
|
models := map[string]config.ModelConfig{
|
||||||
|
model: {ConcurrencyLimit: limit},
|
||||||
|
}
|
||||||
|
s := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, config.FifoConfig{}, models, eff)
|
||||||
|
return s, eff
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestFIFO_ConcurrencyLimit_RejectsOverLimit verifies that a request arriving
|
||||||
|
// while the model is at capacity gets an error grant instead of being served,
|
||||||
|
// and that a new request succeeds once an in-flight one completes.
|
||||||
|
func TestFIFO_ConcurrencyLimit_RejectsOverLimit(t *testing.T) {
|
||||||
|
s, eff := newFIFOWithLimit(t, "a", 1)
|
||||||
|
|
||||||
|
// First request: served (inFlight 0 → 1).
|
||||||
|
s.OnRequest(req("a"))
|
||||||
|
if got := eff.served("a"); got != 1 {
|
||||||
|
t.Fatalf("served(a)=%d want 1", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second request while slot is occupied: rejected with HTTPError 429.
|
||||||
|
s.OnRequest(req("a"))
|
||||||
|
if got := eff.errored("a"); got != 1 {
|
||||||
|
t.Fatalf("errored(a)=%d want 1 (over-limit)", got)
|
||||||
|
}
|
||||||
|
var httpErr shared.HTTPError
|
||||||
|
if !errors.As(eff.grants[len(eff.grants)-1].err, &httpErr) {
|
||||||
|
t.Fatalf("err=%v want HTTPError", eff.grants[len(eff.grants)-1].err)
|
||||||
|
}
|
||||||
|
if httpErr.StatusCode() != http.StatusTooManyRequests {
|
||||||
|
t.Fatalf("StatusCode()=%d want 429", httpErr.StatusCode())
|
||||||
|
}
|
||||||
|
if httpErr.Header().Get("Retry-After") == "" {
|
||||||
|
t.Fatal("missing Retry-After header")
|
||||||
|
}
|
||||||
|
|
||||||
|
// After the in-flight request finishes, a new request succeeds.
|
||||||
|
s.OnServeDone(ServeDoneEvent{ModelID: "a"})
|
||||||
|
s.OnRequest(req("a"))
|
||||||
|
if got := eff.served("a"); got != 2 {
|
||||||
|
t.Fatalf("served(a)=%d want 2 after drain", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestFIFO_ConcurrencyLimit_DefaultIsTen verifies that a model without an
|
||||||
|
// explicit ConcurrencyLimit gets the default cap of 10.
|
||||||
|
func TestFIFO_ConcurrencyLimit_DefaultIsTen(t *testing.T) {
|
||||||
|
eff := newFakeEffects()
|
||||||
|
eff.states["a"] = process.StateReady
|
||||||
|
// nil models → every model gets defaultConcurrencyLimit (10).
|
||||||
|
s := newFIFO(&stubPlanner{}, eff)
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
s.OnRequest(req("a"))
|
||||||
|
}
|
||||||
|
if got := eff.served("a"); got != 10 {
|
||||||
|
t.Fatalf("served(a)=%d want 10 (default limit)", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 11th request is rejected.
|
||||||
|
s.OnRequest(req("a"))
|
||||||
|
if got := eff.errored("a"); got != 1 {
|
||||||
|
t.Fatalf("errored(a)=%d want 1 (over default limit)", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestFIFO_ConcurrencyLimit_CustomLimit verifies a ConcurrencyLimit greater
|
||||||
|
// than zero overrides the default.
|
||||||
|
func TestFIFO_ConcurrencyLimit_CustomLimit(t *testing.T) {
|
||||||
|
s, eff := newFIFOWithLimit(t, "a", 2)
|
||||||
|
|
||||||
|
s.OnRequest(req("a"))
|
||||||
|
s.OnRequest(req("a"))
|
||||||
|
s.OnRequest(req("a"))
|
||||||
|
|
||||||
|
if got := eff.served("a"); got != 2 {
|
||||||
|
t.Fatalf("served(a)=%d want 2 (custom limit)", got)
|
||||||
|
}
|
||||||
|
if got := eff.errored("a"); got != 1 {
|
||||||
|
t.Fatalf("errored(a)=%d want 1 (over custom limit)", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestFIFO_ConcurrencyLimit_SwapWaiters verifies that when more swap waiters
|
||||||
|
// exist than the concurrency limit, excess waiters are rejected on swap
|
||||||
|
// completion rather than exceeding the limit.
|
||||||
|
func TestFIFO_ConcurrencyLimit_SwapWaiters(t *testing.T) {
|
||||||
|
eff := newFakeEffects()
|
||||||
|
eff.states["a"] = process.StateStopped
|
||||||
|
models := map[string]config.ModelConfig{
|
||||||
|
"a": {ConcurrencyLimit: 2},
|
||||||
|
}
|
||||||
|
s := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, config.FifoConfig{}, models, eff)
|
||||||
|
|
||||||
|
// Three requests arrive while model is loading: one starts swap, two join.
|
||||||
|
s.OnRequest(req("a"))
|
||||||
|
s.OnRequest(req("a"))
|
||||||
|
s.OnRequest(req("a"))
|
||||||
|
|
||||||
|
if got := eff.startsFor("a"); got != 1 {
|
||||||
|
t.Fatalf("StartSwap(a)=%d want 1", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Swap completes: two served (limit), one rejected.
|
||||||
|
eff.states["a"] = process.StateReady
|
||||||
|
s.OnSwapDone(SwapDone{ModelID: "a"})
|
||||||
|
|
||||||
|
if got := eff.served("a"); got != 2 {
|
||||||
|
t.Fatalf("served(a)=%d want 2 (limit on swap completion)", got)
|
||||||
|
}
|
||||||
|
if got := eff.errored("a"); got != 1 {
|
||||||
|
t.Fatalf("errored(a)=%d want 1 (excess waiter rejected)", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -11,9 +11,11 @@ package scheduler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/mostlygeek/llama-swap/internal/config"
|
||||||
"github.com/mostlygeek/llama-swap/internal/logmon"
|
"github.com/mostlygeek/llama-swap/internal/logmon"
|
||||||
"github.com/mostlygeek/llama-swap/internal/process"
|
"github.com/mostlygeek/llama-swap/internal/process"
|
||||||
"github.com/mostlygeek/llama-swap/internal/shared"
|
"github.com/mostlygeek/llama-swap/internal/shared"
|
||||||
@@ -90,9 +92,21 @@ type Effects interface {
|
|||||||
StopProcesses(timeout time.Duration, ids []string)
|
StopProcesses(timeout time.Duration, ids []string)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Factory builds a Scheduler bound to a baseRouter's Effects. The concrete
|
// New returns a Scheduler selected by conf.Routing.Scheduler.Use, configured
|
||||||
// router captures its Swapper in the closure it passes as a Factory.
|
// from conf and bound to the given planner and effects. Currently only "fifo"
|
||||||
type Factory func(name string, logger *logmon.Monitor, eff Effects) Scheduler
|
// (the default) is supported.
|
||||||
|
func New(conf config.Config, name string, logger *logmon.Monitor, planner Swapper, eff Effects) (Scheduler, error) {
|
||||||
|
use := conf.Routing.Scheduler.Use
|
||||||
|
if use == "" {
|
||||||
|
use = "fifo"
|
||||||
|
}
|
||||||
|
switch use {
|
||||||
|
case "fifo":
|
||||||
|
return NewFIFO(name, logger, planner, conf.Routing.Scheduler.Settings.Fifo, conf.Models, eff), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unsupported scheduler type: %q", use)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// HandlerReq is one in-flight ServeHTTP request waiting for a routing decision.
|
// HandlerReq is one in-flight ServeHTTP request waiting for a routing decision.
|
||||||
type HandlerReq struct {
|
type HandlerReq struct {
|
||||||
|
|||||||
@@ -1,57 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net/http"
|
|
||||||
|
|
||||||
"golang.org/x/sync/semaphore"
|
|
||||||
|
|
||||||
"github.com/mostlygeek/llama-swap/internal/chain"
|
|
||||||
"github.com/mostlygeek/llama-swap/internal/config"
|
|
||||||
"github.com/mostlygeek/llama-swap/internal/shared"
|
|
||||||
)
|
|
||||||
|
|
||||||
// defaultConcurrencyLimit caps simultaneous in-flight requests per model when
|
|
||||||
// the model config leaves concurrencyLimit unset. Matches the legacy
|
|
||||||
// proxy.Process default.
|
|
||||||
const defaultConcurrencyLimit = 10
|
|
||||||
|
|
||||||
// CreateConcurrencyMiddleware returns middleware that limits simultaneous
|
|
||||||
// model-dispatched requests per model. Each model gets a semaphore sized to
|
|
||||||
// its concurrencyLimit (or defaultConcurrencyLimit). A request that cannot
|
|
||||||
// immediately acquire a slot is rejected with 429. Models without a local
|
|
||||||
// config entry (e.g. peer-routed models) are not limited.
|
|
||||||
func CreateConcurrencyMiddleware(cfg config.Config) chain.Middleware {
|
|
||||||
semaphores := make(map[string]*semaphore.Weighted, len(cfg.Models))
|
|
||||||
for id, mc := range cfg.Models {
|
|
||||||
limit := defaultConcurrencyLimit
|
|
||||||
if mc.ConcurrencyLimit > 0 {
|
|
||||||
limit = mc.ConcurrencyLimit
|
|
||||||
}
|
|
||||||
semaphores[id] = semaphore.NewWeighted(int64(limit))
|
|
||||||
}
|
|
||||||
|
|
||||||
return func(next http.Handler) http.Handler {
|
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
data, err := shared.FetchContext(r, cfg)
|
|
||||||
if err != nil {
|
|
||||||
shared.SendError(w, r, shared.ErrNoModelInContext)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// fall through for peer models
|
|
||||||
sem, ok := semaphores[data.ModelID]
|
|
||||||
if !ok {
|
|
||||||
next.ServeHTTP(w, r)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !sem.TryAcquire(1) {
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
w.WriteHeader(http.StatusTooManyRequests)
|
|
||||||
w.Write([]byte(`{"error":"Too many requests"}`))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer sem.Release(1)
|
|
||||||
next.ServeHTTP(w, r)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,75 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/mostlygeek/llama-swap/internal/config"
|
|
||||||
"github.com/mostlygeek/llama-swap/internal/shared"
|
|
||||||
)
|
|
||||||
|
|
||||||
func concurrencyTestReq(model string) *http.Request {
|
|
||||||
r := httptest.NewRequest("GET", "/v1/chat/completions", nil)
|
|
||||||
return r.WithContext(shared.SetContext(r.Context(), shared.ReqContextData{Model: model, ModelID: model}))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestServer_ConcurrencyMiddleware_RejectsOverLimit(t *testing.T) {
|
|
||||||
cfg := config.Config{
|
|
||||||
Models: map[string]config.ModelConfig{
|
|
||||||
"m1": {ConcurrencyLimit: 1},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
entered := make(chan struct{})
|
|
||||||
release := make(chan struct{})
|
|
||||||
var once sync.Once
|
|
||||||
final := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
once.Do(func() { close(entered) })
|
|
||||||
<-release
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
})
|
|
||||||
h := CreateConcurrencyMiddleware(cfg)(final)
|
|
||||||
|
|
||||||
// First request occupies the only slot.
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
defer close(done)
|
|
||||||
h.ServeHTTP(httptest.NewRecorder(), concurrencyTestReq("m1"))
|
|
||||||
}()
|
|
||||||
<-entered
|
|
||||||
|
|
||||||
// Second concurrent request is rejected with 429.
|
|
||||||
w := httptest.NewRecorder()
|
|
||||||
h.ServeHTTP(w, concurrencyTestReq("m1"))
|
|
||||||
if w.Code != http.StatusTooManyRequests {
|
|
||||||
t.Fatalf("over-limit status = %d, want 429", w.Code)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Once the slot frees, a new request succeeds.
|
|
||||||
close(release)
|
|
||||||
<-done
|
|
||||||
w = httptest.NewRecorder()
|
|
||||||
h.ServeHTTP(w, concurrencyTestReq("m1"))
|
|
||||||
if w.Code != http.StatusOK {
|
|
||||||
t.Fatalf("post-release status = %d, want 200", w.Code)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestServer_ConcurrencyMiddleware_UnconfiguredModelPassesThrough(t *testing.T) {
|
|
||||||
cfg := config.Config{Models: map[string]config.ModelConfig{}}
|
|
||||||
|
|
||||||
called := 0
|
|
||||||
final := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
called++
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
})
|
|
||||||
h := CreateConcurrencyMiddleware(cfg)(final)
|
|
||||||
|
|
||||||
w := httptest.NewRecorder()
|
|
||||||
h.ServeHTTP(w, concurrencyTestReq("peer-model"))
|
|
||||||
if w.Code != http.StatusOK || called != 1 {
|
|
||||||
t.Fatalf("unconfigured model: status=%d called=%d, want 200/1", w.Code, called)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,9 +1,13 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/mostlygeek/llama-swap/internal/shared"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -56,6 +60,109 @@ func TestServer_ProcessStreamingResponse_NoData(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMetricsMonitor_RecordMetadata(t *testing.T) {
|
||||||
|
mm := newMetricsMonitor(nil, 10, 0)
|
||||||
|
r := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(`{"usage":{}}`))
|
||||||
|
r = r.WithContext(shared.SetContext(r.Context(), shared.ReqContextData{
|
||||||
|
ModelID: "m",
|
||||||
|
Metadata: map[string]string{"client": "web", "trace": "abc"},
|
||||||
|
}))
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
copier := newBodyCopier(w)
|
||||||
|
copier.WriteHeader(http.StatusOK)
|
||||||
|
copier.Write([]byte(`{"usage":{"prompt_tokens":1,"completion_tokens":2}}`))
|
||||||
|
|
||||||
|
mm.record("m", r, copier, 0, nil, nil)
|
||||||
|
|
||||||
|
entries := mm.getMetrics()
|
||||||
|
if len(entries) != 1 {
|
||||||
|
t.Fatalf("want 1 entry, got %d", len(entries))
|
||||||
|
}
|
||||||
|
if entries[0].Metadata["client"] != "web" {
|
||||||
|
t.Errorf("client = %q, want web", entries[0].Metadata["client"])
|
||||||
|
}
|
||||||
|
if entries[0].Metadata["trace"] != "abc" {
|
||||||
|
t.Errorf("trace = %q, want abc", entries[0].Metadata["trace"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsMonitor_RecordMetadata_EmptyMap(t *testing.T) {
|
||||||
|
// An empty Metadata map in context must NOT set tm.Metadata (omitempty semantics).
|
||||||
|
mm := newMetricsMonitor(nil, 10, 0)
|
||||||
|
r := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(`{}`))
|
||||||
|
r = r.WithContext(shared.SetContext(r.Context(), shared.ReqContextData{
|
||||||
|
ModelID: "m",
|
||||||
|
Metadata: map[string]string{}, // empty, not nil
|
||||||
|
}))
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
copier := newBodyCopier(w)
|
||||||
|
copier.WriteHeader(http.StatusOK)
|
||||||
|
copier.Write([]byte(`{"usage":{"prompt_tokens":1,"completion_tokens":2}}`))
|
||||||
|
|
||||||
|
mm.record("m", r, copier, 0, nil, nil)
|
||||||
|
|
||||||
|
entries := mm.getMetrics()
|
||||||
|
if len(entries) != 1 {
|
||||||
|
t.Fatalf("want 1 entry, got %d", len(entries))
|
||||||
|
}
|
||||||
|
if entries[0].Metadata != nil {
|
||||||
|
t.Errorf("Metadata should be nil for empty context metadata, got %v", entries[0].Metadata)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsMonitor_RecordMetadata_NoContextData(t *testing.T) {
|
||||||
|
// A request with no ReqContextData in context should produce nil Metadata.
|
||||||
|
mm := newMetricsMonitor(nil, 10, 0)
|
||||||
|
r := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(`{}`))
|
||||||
|
// No shared.SetContext call — no ReqContextData in context.
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
copier := newBodyCopier(w)
|
||||||
|
copier.WriteHeader(http.StatusOK)
|
||||||
|
copier.Write([]byte(`{"usage":{"prompt_tokens":3,"completion_tokens":4}}`))
|
||||||
|
|
||||||
|
mm.record("m", r, copier, 0, nil, nil)
|
||||||
|
|
||||||
|
entries := mm.getMetrics()
|
||||||
|
if len(entries) != 1 {
|
||||||
|
t.Fatalf("want 1 entry, got %d", len(entries))
|
||||||
|
}
|
||||||
|
if entries[0].Metadata != nil {
|
||||||
|
t.Errorf("Metadata should be nil when no context data, got %v", entries[0].Metadata)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsMonitor_RecordMetadata_DeepCopy(t *testing.T) {
|
||||||
|
// Mutating the original context metadata after record() must not affect the stored entry.
|
||||||
|
mm := newMetricsMonitor(nil, 10, 0)
|
||||||
|
original := map[string]string{"key": "before"}
|
||||||
|
r := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(`{}`))
|
||||||
|
r = r.WithContext(shared.SetContext(r.Context(), shared.ReqContextData{
|
||||||
|
ModelID: "m",
|
||||||
|
Metadata: original,
|
||||||
|
}))
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
copier := newBodyCopier(w)
|
||||||
|
copier.WriteHeader(http.StatusOK)
|
||||||
|
copier.Write([]byte(`{"usage":{"prompt_tokens":1,"completion_tokens":2}}`))
|
||||||
|
|
||||||
|
mm.record("m", r, copier, 0, nil, nil)
|
||||||
|
|
||||||
|
// Mutate the original map after record.
|
||||||
|
original["key"] = "after"
|
||||||
|
|
||||||
|
entries := mm.getMetrics()
|
||||||
|
if len(entries) != 1 {
|
||||||
|
t.Fatalf("want 1 entry, got %d", len(entries))
|
||||||
|
}
|
||||||
|
if entries[0].Metadata["key"] != "before" {
|
||||||
|
t.Errorf("Metadata[key] = %q, want %q (deep copy expected)", entries[0].Metadata["key"], "before")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestServer_ParseMetrics_Infill(t *testing.T) {
|
func TestServer_ParseMetrics_Infill(t *testing.T) {
|
||||||
// /infill responses are arrays; timings live in the last element.
|
// /infill responses are arrays; timings live in the last element.
|
||||||
body := `[{"content":"a"},{"content":"b","timings":{"prompt_n":5,"predicted_n":9,"prompt_ms":10,"predicted_ms":20}}]`
|
body := `[{"content":"a"},{"content":"b","timings":{"prompt_n":5,"predicted_n":9,"prompt_ms":10,"predicted_ms":20}}]`
|
||||||
|
|||||||
@@ -177,7 +177,6 @@ func (s *Server) routes() {
|
|||||||
modelChain := chain.New(
|
modelChain := chain.New(
|
||||||
authMW,
|
authMW,
|
||||||
CreateRequestContextMiddleware(s.cfg),
|
CreateRequestContextMiddleware(s.cfg),
|
||||||
CreateConcurrencyMiddleware(s.cfg),
|
|
||||||
CreateFilterMiddleware(s.cfg),
|
CreateFilterMiddleware(s.cfg),
|
||||||
CreateFormFilterMiddleware(s.cfg),
|
CreateFormFilterMiddleware(s.cfg),
|
||||||
CreateInflightMiddleware(s.inflight),
|
CreateInflightMiddleware(s.inflight),
|
||||||
|
|||||||
@@ -37,6 +37,16 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func SendError(w http.ResponseWriter, r *http.Request, err error) {
|
func SendError(w http.ResponseWriter, r *http.Request, err error) {
|
||||||
|
var httpErr HTTPError
|
||||||
|
if errors.As(err, &httpErr) {
|
||||||
|
for k, v := range httpErr.Header() {
|
||||||
|
w.Header()[k] = v
|
||||||
|
}
|
||||||
|
w.WriteHeader(httpErr.StatusCode())
|
||||||
|
w.Write(httpErr.Body())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case errors.Is(err, ErrNoModelInContext):
|
case errors.Is(err, ErrNoModelInContext):
|
||||||
SendResponse(w, r, http.StatusNotFound, "no model id could be identified")
|
SendResponse(w, r, http.StatusNotFound, "no model id could be identified")
|
||||||
|
|||||||
@@ -387,6 +387,105 @@ func TestExtractContext_ApiKey(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSetReqData(t *testing.T) {
|
||||||
|
ctx := SetContext(context.Background(), ReqContextData{Model: "llama3", ModelID: "llama3", Metadata: make(map[string]string)})
|
||||||
|
|
||||||
|
if err := SetReqData(ctx, "client", "web"); err != nil {
|
||||||
|
t.Fatalf("SetReqData: %v", err)
|
||||||
|
}
|
||||||
|
if err := SetReqData(ctx, "trace", "abc123"); err != nil {
|
||||||
|
t.Fatalf("SetReqData: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
data, ok := ReadContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("context data missing")
|
||||||
|
}
|
||||||
|
if data.Metadata["client"] != "web" {
|
||||||
|
t.Errorf("client = %q, want %q", data.Metadata["client"], "web")
|
||||||
|
}
|
||||||
|
if data.Metadata["trace"] != "abc123" {
|
||||||
|
t.Errorf("trace = %q, want %q", data.Metadata["trace"], "abc123")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSetReqData_Errors(t *testing.T) {
|
||||||
|
if err := SetReqData(context.Background(), "k", "v"); err == nil {
|
||||||
|
t.Error("expected error when no request context data exists")
|
||||||
|
}
|
||||||
|
ctx := SetContext(context.Background(), ReqContextData{Model: "llama3", ModelID: "llama3"})
|
||||||
|
if err := SetReqData(ctx, "k", "v"); err == nil {
|
||||||
|
t.Error("expected error when metadata map is missing")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSetReqData_NilContext(t *testing.T) {
|
||||||
|
// nil context must return an error without panicking.
|
||||||
|
err := SetReqData(nil, "k", "v")
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected error for nil context, got nil")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSetReqData_OverwritesExistingKey(t *testing.T) {
|
||||||
|
ctx := SetContext(context.Background(), ReqContextData{
|
||||||
|
Model: "m",
|
||||||
|
Metadata: map[string]string{"key": "old"},
|
||||||
|
})
|
||||||
|
if err := SetReqData(ctx, "key", "new"); err != nil {
|
||||||
|
t.Fatalf("SetReqData: %v", err)
|
||||||
|
}
|
||||||
|
data, _ := ReadContext(ctx)
|
||||||
|
if data.Metadata["key"] != "new" {
|
||||||
|
t.Errorf("key = %q, want %q", data.Metadata["key"], "new")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExtractContext_MetadataInitialized_GET(t *testing.T) {
|
||||||
|
r, _ := http.NewRequest(http.MethodGet, "/?model=llama3", nil)
|
||||||
|
got, err := extractContext(r)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("extractContext: %v", err)
|
||||||
|
}
|
||||||
|
if got.Metadata == nil {
|
||||||
|
t.Error("Metadata should be initialized (not nil) for GET requests")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExtractContext_MetadataInitialized_JSON(t *testing.T) {
|
||||||
|
r, _ := http.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(`{"model":"llama3"}`))
|
||||||
|
r.Header.Set("Content-Type", "application/json")
|
||||||
|
got, err := extractContext(r)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("extractContext: %v", err)
|
||||||
|
}
|
||||||
|
if got.Metadata == nil {
|
||||||
|
t.Error("Metadata should be initialized (not nil) for JSON POST requests")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExtractContext_MetadataInitialized_Form(t *testing.T) {
|
||||||
|
r, _ := http.NewRequest(http.MethodPost, "/v1/audio/transcriptions", strings.NewReader("model=whisper-1"))
|
||||||
|
r.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||||
|
got, err := extractContext(r)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("extractContext: %v", err)
|
||||||
|
}
|
||||||
|
if got.Metadata == nil {
|
||||||
|
t.Error("Metadata should be initialized (not nil) for form POST requests")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExtractContext_MetadataIsWritable(t *testing.T) {
|
||||||
|
// Verify the initialized map is writable — i.e. SetReqData can use it.
|
||||||
|
r, _ := http.NewRequest(http.MethodGet, "/?model=llama3", nil)
|
||||||
|
got, _ := extractContext(r)
|
||||||
|
ctx := SetContext(context.Background(), got)
|
||||||
|
if err := SetReqData(ctx, "x", "y"); err != nil {
|
||||||
|
t.Fatalf("SetReqData on extractContext Metadata: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestServer_ExtractAPIKey(t *testing.T) {
|
func TestServer_ExtractAPIKey(t *testing.T) {
|
||||||
basicHeader := func(user, pass string) string {
|
basicHeader := func(user, pass string) string {
|
||||||
return "Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))
|
return "Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))
|
||||||
|
|||||||
@@ -0,0 +1,63 @@
|
|||||||
|
package shared
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HTTPError is an error that carries a complete HTTP response. A producer (e.g.
|
||||||
|
// a scheduler shedding a request) returns one of these; a renderer (e.g.
|
||||||
|
// router.SendError) writes the status, headers, and body verbatim instead of
|
||||||
|
// mapping the error to a generic status. It is the seam that lets a component
|
||||||
|
// shed a request with a rich response (e.g. a 429 with rate-limit headers and a
|
||||||
|
// JSON hint body) without the renderer knowing the producer's internals.
|
||||||
|
type HTTPError interface {
|
||||||
|
error
|
||||||
|
StatusCode() int
|
||||||
|
Header() http.Header
|
||||||
|
Body() []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConcurrencyLimitError is an HTTPError for a 429 concurrency-limit rejection.
|
||||||
|
// Zero-value fields fall back to sensible defaults: a 1-second Retry-After and a
|
||||||
|
// JSON hint body.
|
||||||
|
type ConcurrencyLimitError struct {
|
||||||
|
// RetryAfter, when > 0, is sent as the Retry-After header (in seconds).
|
||||||
|
// Defaults to 1.
|
||||||
|
RetryAfter int
|
||||||
|
|
||||||
|
// Message overrides the JSON body's "error" field. Defaults to
|
||||||
|
// "Too many requests".
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e ConcurrencyLimitError) Error() string { return "concurrency limit reached" }
|
||||||
|
|
||||||
|
func (e ConcurrencyLimitError) StatusCode() int { return http.StatusTooManyRequests }
|
||||||
|
|
||||||
|
func (e ConcurrencyLimitError) Header() http.Header {
|
||||||
|
h := http.Header{}
|
||||||
|
h.Set("Content-Type", "application/json")
|
||||||
|
h.Set("Retry-After", e.retryAfter())
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e ConcurrencyLimitError) Body() []byte {
|
||||||
|
b, _ := json.Marshal(map[string]string{"error": e.message()})
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e ConcurrencyLimitError) retryAfter() string {
|
||||||
|
if e.RetryAfter > 0 {
|
||||||
|
return strconv.Itoa(e.RetryAfter)
|
||||||
|
}
|
||||||
|
return "1"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e ConcurrencyLimitError) message() string {
|
||||||
|
if e.Message != "" {
|
||||||
|
return e.Message
|
||||||
|
}
|
||||||
|
return "Too many requests"
|
||||||
|
}
|
||||||
@@ -0,0 +1,217 @@
|
|||||||
|
import { describe, it, expect } from "vitest";
|
||||||
|
import type { ActivityLogEntry, TokenMetrics } from "./types";
|
||||||
|
|
||||||
|
// Baseline token metrics used across tests.
|
||||||
|
const baseTokens: TokenMetrics = {
|
||||||
|
cache_tokens: 0,
|
||||||
|
input_tokens: 10,
|
||||||
|
output_tokens: 5,
|
||||||
|
prompt_per_second: 100,
|
||||||
|
tokens_per_second: 50,
|
||||||
|
};
|
||||||
|
|
||||||
|
function makeEntry(overrides: Partial<ActivityLogEntry> = {}): ActivityLogEntry {
|
||||||
|
return {
|
||||||
|
id: 0,
|
||||||
|
timestamp: "2024-01-01T00:00:00Z",
|
||||||
|
model: "llama3",
|
||||||
|
req_path: "/v1/chat/completions",
|
||||||
|
resp_content_type: "application/json",
|
||||||
|
resp_status_code: 200,
|
||||||
|
tokens: baseTokens,
|
||||||
|
duration_ms: 100,
|
||||||
|
has_capture: false,
|
||||||
|
...overrides,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("ActivityLogEntry", () => {
|
||||||
|
describe("metadata field", () => {
|
||||||
|
it("accepts an entry without metadata (undefined)", () => {
|
||||||
|
const entry = makeEntry();
|
||||||
|
expect(entry.metadata).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("accepts an entry with metadata populated", () => {
|
||||||
|
const entry = makeEntry({ metadata: { client: "web", trace: "abc123" } });
|
||||||
|
expect(entry.metadata).toEqual({ client: "web", trace: "abc123" });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("accepts an empty metadata object", () => {
|
||||||
|
const entry = makeEntry({ metadata: {} });
|
||||||
|
expect(entry.metadata).toEqual({});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("allows reading a key from metadata when present", () => {
|
||||||
|
const entry = makeEntry({ metadata: { fifo_priority: "7" } });
|
||||||
|
expect(entry.metadata?.["fifo_priority"]).toBe("7");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns undefined when accessing a missing key via optional chaining", () => {
|
||||||
|
const entry = makeEntry();
|
||||||
|
expect(entry.metadata?.["fifo_priority"]).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns undefined for a missing key when metadata is an empty object", () => {
|
||||||
|
const entry = makeEntry({ metadata: {} });
|
||||||
|
expect(entry.metadata?.["missing"]).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("supports metadata with multiple entries", () => {
|
||||||
|
const meta: Record<string, string> = { a: "1", b: "2", c: "3" };
|
||||||
|
const entry = makeEntry({ metadata: meta });
|
||||||
|
expect(Object.keys(entry.metadata!)).toHaveLength(3);
|
||||||
|
expect(entry.metadata!["b"]).toBe("2");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("ActivityLogEntry structure", () => {
|
||||||
|
it("round-trips through JSON with metadata", () => {
|
||||||
|
const entry = makeEntry({ metadata: { client: "web" } });
|
||||||
|
const json = JSON.stringify(entry);
|
||||||
|
const parsed: ActivityLogEntry = JSON.parse(json);
|
||||||
|
expect(parsed.metadata).toEqual({ client: "web" });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("round-trips through JSON without metadata (field omitted)", () => {
|
||||||
|
const entry = makeEntry();
|
||||||
|
const json = JSON.stringify(entry);
|
||||||
|
const parsed: ActivityLogEntry = JSON.parse(json);
|
||||||
|
// When metadata is undefined it is dropped by JSON.stringify.
|
||||||
|
expect(parsed.metadata).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("round-trips through JSON with null metadata preserved as-is", () => {
|
||||||
|
// Explicit null from a server that omits the field still satisfies
|
||||||
|
// the optional type when accessed with optional chaining.
|
||||||
|
const raw = { ...makeEntry(), metadata: null };
|
||||||
|
const json = JSON.stringify(raw);
|
||||||
|
const parsed = JSON.parse(json) as ActivityLogEntry;
|
||||||
|
// null is falsy — optional chaining returns undefined for null too.
|
||||||
|
expect(parsed.metadata ?? undefined).toBeUndefined();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// META_PREFIX helpers — mirror the logic in Activity.svelte so we can test it
|
||||||
|
// without importing the Svelte component.
|
||||||
|
const META_PREFIX = "meta:";
|
||||||
|
|
||||||
|
function isMetaKey(key: string): boolean {
|
||||||
|
return key.startsWith(META_PREFIX);
|
||||||
|
}
|
||||||
|
|
||||||
|
function metaKey(name: string): string {
|
||||||
|
return META_PREFIX + name;
|
||||||
|
}
|
||||||
|
|
||||||
|
function metaLabel(key: string): string {
|
||||||
|
return key.slice(META_PREFIX.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("Activity.svelte META_PREFIX helpers", () => {
|
||||||
|
describe("isMetaKey", () => {
|
||||||
|
it("returns true for keys with meta: prefix", () => {
|
||||||
|
expect(isMetaKey("meta:fifo_priority")).toBe(true);
|
||||||
|
expect(isMetaKey("meta:client")).toBe(true);
|
||||||
|
expect(isMetaKey("meta:")).toBe(true); // empty suffix is still prefixed
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns false for standard column keys", () => {
|
||||||
|
expect(isMetaKey("id")).toBe(false);
|
||||||
|
expect(isMetaKey("model")).toBe(false);
|
||||||
|
expect(isMetaKey("capture")).toBe(false);
|
||||||
|
expect(isMetaKey("duration")).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns false for partial or incorrect prefixes", () => {
|
||||||
|
expect(isMetaKey("meta")).toBe(false);
|
||||||
|
expect(isMetaKey("Meta:key")).toBe(false); // case-sensitive
|
||||||
|
expect(isMetaKey("")).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("metaKey", () => {
|
||||||
|
it("prepends META_PREFIX to the name", () => {
|
||||||
|
expect(metaKey("fifo_priority")).toBe("meta:fifo_priority");
|
||||||
|
expect(metaKey("client")).toBe("meta:client");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("handles empty name", () => {
|
||||||
|
expect(metaKey("")).toBe("meta:");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("is the inverse of metaLabel", () => {
|
||||||
|
const name = "some_key";
|
||||||
|
expect(metaLabel(metaKey(name))).toBe(name);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("metaLabel", () => {
|
||||||
|
it("strips META_PREFIX and returns the bare name", () => {
|
||||||
|
expect(metaLabel("meta:fifo_priority")).toBe("fifo_priority");
|
||||||
|
expect(metaLabel("meta:client")).toBe("client");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("handles empty suffix", () => {
|
||||||
|
expect(metaLabel("meta:")).toBe("");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("is the inverse of metaKey", () => {
|
||||||
|
const key = "meta:trace_id";
|
||||||
|
expect(metaKey(metaLabel(key))).toBe(key);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("metadata column derivation", () => {
|
||||||
|
it("derives unique metadata keys from a list of entries", () => {
|
||||||
|
const entries: ActivityLogEntry[] = [
|
||||||
|
makeEntry({ metadata: { client: "web", trace: "a" } }),
|
||||||
|
makeEntry({ metadata: { client: "mobile" } }),
|
||||||
|
makeEntry({ metadata: { fifo_priority: "3" } }),
|
||||||
|
makeEntry({}), // no metadata
|
||||||
|
];
|
||||||
|
|
||||||
|
const keys = Array.from(
|
||||||
|
new Set(entries.flatMap((m) => Object.keys(m.metadata || {})))
|
||||||
|
).sort();
|
||||||
|
|
||||||
|
expect(keys).toEqual(["client", "fifo_priority", "trace"]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns empty array when no entries have metadata", () => {
|
||||||
|
const entries: ActivityLogEntry[] = [makeEntry(), makeEntry()];
|
||||||
|
const keys = Array.from(
|
||||||
|
new Set(entries.flatMap((m) => Object.keys(m.metadata || {})))
|
||||||
|
).sort();
|
||||||
|
expect(keys).toHaveLength(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("maps metadata keys to meta:-prefixed column keys", () => {
|
||||||
|
const metaKeys = ["client", "fifo_priority"];
|
||||||
|
const columnKeys = metaKeys.map(metaKey);
|
||||||
|
expect(columnKeys).toEqual(["meta:client", "meta:fifo_priority"]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("resolves metadata value for a column key", () => {
|
||||||
|
const entry = makeEntry({ metadata: { fifo_priority: "7", client: "web" } });
|
||||||
|
const key = "meta:fifo_priority";
|
||||||
|
const value = entry.metadata?.[metaLabel(key)] ?? "-";
|
||||||
|
expect(value).toBe("7");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("falls back to '-' for a column key not present in entry metadata", () => {
|
||||||
|
const entry = makeEntry({ metadata: { client: "web" } });
|
||||||
|
const key = "meta:fifo_priority";
|
||||||
|
const value = entry.metadata?.[metaLabel(key)] ?? "-";
|
||||||
|
expect(value).toBe("-");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("falls back to '-' when entry has no metadata at all", () => {
|
||||||
|
const entry = makeEntry(); // metadata is undefined
|
||||||
|
const key = "meta:anything";
|
||||||
|
const value = entry.metadata?.[metaLabel(key)] ?? "-";
|
||||||
|
expect(value).toBe("-");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user