diff --git a/internal/router/scheduler/fifo.go b/internal/router/scheduler/fifo.go index addd3b94..44cd4d7d 100644 --- a/internal/router/scheduler/fifo.go +++ b/internal/router/scheduler/fifo.go @@ -3,6 +3,7 @@ package scheduler import ( "fmt" "sort" + "strconv" "time" "github.com/mostlygeek/llama-swap/internal/config" @@ -278,6 +279,11 @@ func (s *FIFO) grantHandler(req HandlerReq, modelID string) { s.effects.GrantError(req, shared.ConcurrencyLimitError{}) return } + + if err := shared.SetReqData(req.Ctx, "fifo_priority", strconv.Itoa(s.cfg.Priority[req.Model])); err != nil { + s.logger.Debugf("failed to set fifo_priority metadata: %v", err) + } + if s.effects.GrantServe(req, modelID) { s.inFlight[modelID]++ } diff --git a/internal/router/scheduler/fifo_test.go b/internal/router/scheduler/fifo_test.go index 6d177bb2..34b39749 100644 --- a/internal/router/scheduler/fifo_test.go +++ b/internal/router/scheduler/fifo_test.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "errors" "io" "net/http" @@ -54,8 +55,9 @@ type stopRec struct { // fakeEffects is an in-memory scheduler.Effects. Tests program process states // and GrantServe outcomes, then assert on the recorded calls. type fakeEffects struct { - states map[string]process.ProcessState // model -> state; missing => not handled - serveResult map[string]bool // GrantServe return per model (default true) + states map[string]process.ProcessState // model -> state; missing => not handled + serveResult map[string]bool // GrantServe return per model (default true) + lastServeReq HandlerReq starts []startRec grants []grantRec @@ -98,6 +100,7 @@ func (f *fakeEffects) GrantServe(req HandlerReq, modelID string) bool { if v, set := f.serveResult[modelID]; set { ok = v } + f.lastServeReq = req f.grants = append(f.grants, grantRec{model: modelID, serve: ok}) return ok } @@ -169,6 +172,27 @@ func TestFIFO_FastPath(t *testing.T) { } } +func TestFIFO_GrantSetsPriorityMetadata(t *testing.T) { + eff := newFakeEffects() + eff.states["a"] = process.StateReady + cfg := config.FifoConfig{Priority: map[string]int{"a": 7}} + s := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, cfg, nil, eff) + + ctx := shared.SetContext(context.Background(), shared.ReqContextData{ModelID: "a", Metadata: make(map[string]string)}) + s.OnRequest(HandlerReq{Model: "a", Ctx: ctx}) + + if got := eff.served("a"); got != 1 { + t.Fatalf("served(a)=%d want 1", got) + } + data, ok := shared.ReadContext(eff.lastServeReq.Ctx) + if !ok { + t.Fatal("context data missing from granted request") + } + if data.Metadata["fifo_priority"] != "7" { + t.Errorf("fifo_priority = %q, want 7", data.Metadata["fifo_priority"]) + } +} + func TestFIFO_ModelNotFound(t *testing.T) { eff := newFakeEffects() // no states => model unknown s := newFIFO(&stubPlanner{}, eff) diff --git a/internal/server/api.go b/internal/server/api.go index ef782033..b9ed8ba3 100644 --- a/internal/server/api.go +++ b/internal/server/api.go @@ -271,7 +271,7 @@ func (s *Server) startPreload() { if err != nil { continue } - req = req.WithContext(shared.SetContext(req.Context(), shared.ReqContextData{Model: modelID, ModelID: modelID})) + req = req.WithContext(shared.SetContext(req.Context(), shared.ReqContextData{Model: modelID, ModelID: modelID, Metadata: make(map[string]string)})) dw := &discardResponseWriter{status: http.StatusOK} s.local.ServeHTTP(dw, req) @@ -338,7 +338,7 @@ func (s *Server) handleUpstream(w http.ResponseWriter, r *http.Request) { // Strip the /upstream/ prefix before forwarding. r.URL.Path = remainingPath // Pin the resolved model so the router skips body/query extraction. - *r = *r.WithContext(shared.SetContext(r.Context(), shared.ReqContextData{Model: searchName, ModelID: modelID})) + *r = *r.WithContext(shared.SetContext(r.Context(), shared.ReqContextData{Model: searchName, ModelID: modelID, Metadata: make(map[string]string)})) switch { case s.local.Handles(modelID): diff --git a/internal/server/metrics.go b/internal/server/metrics.go index 733f9c1f..8157104b 100644 --- a/internal/server/metrics.go +++ b/internal/server/metrics.go @@ -33,15 +33,16 @@ type TokenMetrics struct { // ActivityLogEntry represents parsed token statistics from llama-server logs. type ActivityLogEntry struct { - ID int `json:"id"` - Timestamp time.Time `json:"timestamp"` - Model string `json:"model"` - ReqPath string `json:"req_path"` - RespContentType string `json:"resp_content_type"` - RespStatusCode int `json:"resp_status_code"` - Tokens TokenMetrics `json:"tokens"` - DurationMs int `json:"duration_ms"` - HasCapture bool `json:"has_capture"` + ID int `json:"id"` + Timestamp time.Time `json:"timestamp"` + Model string `json:"model"` + ReqPath string `json:"req_path"` + RespContentType string `json:"resp_content_type"` + RespStatusCode int `json:"resp_status_code"` + Tokens TokenMetrics `json:"tokens"` + DurationMs int `json:"duration_ms"` + HasCapture bool `json:"has_capture"` + Metadata map[string]string `json:"metadata,omitempty"` } // ActivityLogEvent carries a single activity log entry to event subscribers. @@ -135,6 +136,13 @@ func (mp *metricsMonitor) record(modelID string, r *http.Request, recorder *resp DurationMs: int(time.Since(recorder.StartTime()).Milliseconds()), } + if ctxData, ok := shared.ReadContext(r.Context()); ok && len(ctxData.Metadata) > 0 { + tm.Metadata = make(map[string]string, len(ctxData.Metadata)) + for k, v := range ctxData.Metadata { + tm.Metadata[k] = v + } + } + queueAndEmit := func() { tm.ID = mp.queueMetrics(tm) mp.emitMetric(tm) diff --git a/internal/server/metrics_test.go b/internal/server/metrics_test.go index 04412cd5..8f061710 100644 --- a/internal/server/metrics_test.go +++ b/internal/server/metrics_test.go @@ -1,9 +1,13 @@ package server import ( + "net/http" + "net/http/httptest" + "strings" "testing" "time" + "github.com/mostlygeek/llama-swap/internal/shared" "github.com/tidwall/gjson" ) @@ -56,6 +60,33 @@ func TestServer_ProcessStreamingResponse_NoData(t *testing.T) { } } +func TestMetricsMonitor_RecordMetadata(t *testing.T) { + mm := newMetricsMonitor(nil, 10, 0) + r := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(`{"usage":{}}`)) + r = r.WithContext(shared.SetContext(r.Context(), shared.ReqContextData{ + ModelID: "m", + Metadata: map[string]string{"client": "web", "trace": "abc"}, + })) + + w := httptest.NewRecorder() + copier := newBodyCopier(w) + copier.WriteHeader(http.StatusOK) + copier.Write([]byte(`{"usage":{"prompt_tokens":1,"completion_tokens":2}}`)) + + mm.record("m", r, copier, 0, nil, nil) + + entries := mm.getMetrics() + if len(entries) != 1 { + t.Fatalf("want 1 entry, got %d", len(entries)) + } + if entries[0].Metadata["client"] != "web" { + t.Errorf("client = %q, want web", entries[0].Metadata["client"]) + } + if entries[0].Metadata["trace"] != "abc" { + t.Errorf("trace = %q, want abc", entries[0].Metadata["trace"]) + } +} + func TestServer_ParseMetrics_Infill(t *testing.T) { // /infill responses are arrays; timings live in the last element. body := `[{"content":"a"},{"content":"b","timings":{"prompt_n":5,"predicted_n":9,"prompt_ms":10,"predicted_ms":20}}]` diff --git a/internal/shared/http.go b/internal/shared/http.go index a0cb063b..30609b7c 100644 --- a/internal/shared/http.go +++ b/internal/shared/http.go @@ -26,6 +26,9 @@ type ReqContextData struct { ModelID string Streaming bool SendLoadingState bool + // Metadata is a request-scoped key/value bag that handlers may mutate + // while processing. The metrics middleware copies it into ActivityLogEntry. + Metadata map[string]string } var ( @@ -123,6 +126,25 @@ func ReadContext(ctx context.Context) (ReqContextData, bool) { return data, ok } +// SetReqData attaches a key/value pair to the request context's metadata map. +// The metadata map must already exist in the context's ReqContextData; callers +// should ensure FetchContext has run or initialize the map themselves. +// It returns an error for nil contexts or contexts without request data. +func SetReqData(ctx context.Context, key, value string) error { + if ctx == nil { + return fmt.Errorf("cannot set request metadata on nil context") + } + data, ok := ReadContext(ctx) + if !ok { + return fmt.Errorf("no request context data found") + } + if data.Metadata == nil { + return fmt.Errorf("no metadata map in request context") + } + data.Metadata[key] = value + return nil +} + // extractContext pulls fields from an HTTP request into a ReqContextData, // returning whatever is available. For GET requests it reads query parameters. // For POST requests it inspects Content-Type and parses JSON, @@ -139,6 +161,7 @@ func extractContext(r *http.Request) (ReqContextData, error) { Model: q.Get("model"), Streaming: q.Get("stream") == "true", ApiKey: apiKey, + Metadata: make(map[string]string), }, nil } @@ -157,6 +180,7 @@ func extractContext(r *http.Request) (ReqContextData, error) { Model: gjson.GetBytes(bodyBytes, "model").String(), Streaming: gjson.GetBytes(bodyBytes, "stream").Bool(), ApiKey: apiKey, + Metadata: make(map[string]string), }, nil } @@ -178,6 +202,7 @@ func extractContext(r *http.Request) (ReqContextData, error) { Model: r.FormValue("model"), Streaming: r.FormValue("stream") == "true", ApiKey: apiKey, + Metadata: make(map[string]string), }, nil } diff --git a/internal/shared/http_test.go b/internal/shared/http_test.go index 8bbdd69c..c6a88072 100644 --- a/internal/shared/http_test.go +++ b/internal/shared/http_test.go @@ -387,6 +387,38 @@ func TestExtractContext_ApiKey(t *testing.T) { } } +func TestSetReqData(t *testing.T) { + ctx := SetContext(context.Background(), ReqContextData{Model: "llama3", ModelID: "llama3", Metadata: make(map[string]string)}) + + if err := SetReqData(ctx, "client", "web"); err != nil { + t.Fatalf("SetReqData: %v", err) + } + if err := SetReqData(ctx, "trace", "abc123"); err != nil { + t.Fatalf("SetReqData: %v", err) + } + + data, ok := ReadContext(ctx) + if !ok { + t.Fatal("context data missing") + } + if data.Metadata["client"] != "web" { + t.Errorf("client = %q, want %q", data.Metadata["client"], "web") + } + if data.Metadata["trace"] != "abc123" { + t.Errorf("trace = %q, want %q", data.Metadata["trace"], "abc123") + } +} + +func TestSetReqData_Errors(t *testing.T) { + if err := SetReqData(context.Background(), "k", "v"); err == nil { + t.Error("expected error when no request context data exists") + } + ctx := SetContext(context.Background(), ReqContextData{Model: "llama3", ModelID: "llama3"}) + if err := SetReqData(ctx, "k", "v"); err == nil { + t.Error("expected error when metadata map is missing") + } +} + func TestServer_ExtractAPIKey(t *testing.T) { basicHeader := func(user, pass string) string { return "Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass)) diff --git a/ui-svelte/src/components/CaptureDialog.svelte b/ui-svelte/src/components/CaptureDialog.svelte index e4795a1e..17eece53 100644 --- a/ui-svelte/src/components/CaptureDialog.svelte +++ b/ui-svelte/src/components/CaptureDialog.svelte @@ -193,7 +193,7 @@ {#if capture}
diff --git a/ui-svelte/src/components/MetadataTooltip.svelte b/ui-svelte/src/components/MetadataTooltip.svelte new file mode 100644 index 00000000..dc8f0c33 --- /dev/null +++ b/ui-svelte/src/components/MetadataTooltip.svelte @@ -0,0 +1,85 @@ + + + + {@render children()} + + +{#if show && entries.length > 0} +
+ + + {#each entries as [key, value]} + + + + + {/each} + +
{key}{value}
+
+{/if} diff --git a/ui-svelte/src/lib/types.ts b/ui-svelte/src/lib/types.ts index a6156e6c..1fced1cb 100644 --- a/ui-svelte/src/lib/types.ts +++ b/ui-svelte/src/lib/types.ts @@ -41,6 +41,7 @@ export interface ActivityLogEntry { tokens: TokenMetrics; duration_ms: number; has_capture: boolean; + metadata?: Record; } export interface ReqRespCapture { diff --git a/ui-svelte/src/routes/Activity.svelte b/ui-svelte/src/routes/Activity.svelte index 749a3c52..c0974e22 100644 --- a/ui-svelte/src/routes/Activity.svelte +++ b/ui-svelte/src/routes/Activity.svelte @@ -2,25 +2,13 @@ import { metrics, getCapture } from "../stores/api"; import ActivityStats from "../components/ActivityStats.svelte"; import Tooltip from "../components/Tooltip.svelte"; + import MetadataTooltip from "../components/MetadataTooltip.svelte"; import CaptureDialog from "../components/CaptureDialog.svelte"; import { persistentStore } from "../stores/persistent"; import { onMount } from "svelte"; import type { ReqRespCapture } from "../lib/types"; - type ColumnKey = - | "id" - | "time" - | "model" - | "req_path" - | "resp_status_code" - | "resp_content_type" - | "cached" - | "prompt" - | "generated" - | "prompt_speed" - | "gen_speed" - | "duration" - | "capture"; + type ColumnKey = string; interface ColumnDef { key: ColumnKey; @@ -42,17 +30,21 @@ { key: "gen_speed", label: "Gen Speed", defaultVisible: true }, { key: "duration", label: "Duration", defaultVisible: true }, { key: "capture", label: "Capture", defaultVisible: true }, + { key: "meta", label: "Meta", defaultVisible: false }, ]; const defaultVisibleKeys = columns.filter((c) => c.defaultVisible).map((c) => c.key); - const visibleColumns = persistentStore( - "activity-columns", - defaultVisibleKeys + const visibleColumns = persistentStore("activity-columns", defaultVisibleKeys); + const columnOrder = persistentStore( + "activity-column-order", + columns.map((c) => c.key) ); let columnsMenuOpen = $state(false); let dropdownContainer: HTMLDivElement | null = null; + let dragKey: ColumnKey | null = $state(null); + let dragOverKey: ColumnKey | null = $state(null); onMount(() => { function handleKeydown(e: KeyboardEvent) { @@ -84,6 +76,84 @@ } } + function isColumnVisible(key: ColumnKey): boolean { + return $visibleColumns.includes(key); + } + + function handleDragStart(e: DragEvent, key: ColumnKey) { + dragKey = key; + e.dataTransfer?.setData("text/plain", key); + if (e.dataTransfer) { + e.dataTransfer.effectAllowed = "move"; + } + } + + function handleDragOver(e: DragEvent, key: ColumnKey) { + e.preventDefault(); + if (e.dataTransfer) { + e.dataTransfer.dropEffect = "move"; + } + dragOverKey = key; + } + + function handleDrop(e: DragEvent, targetKey: ColumnKey) { + e.preventDefault(); + if (!dragKey || dragKey === targetKey) return; + const order = [...$columnOrder]; + const fromIndex = order.indexOf(dragKey); + let toIndex = order.indexOf(targetKey); + if (fromIndex === -1 || toIndex === -1) return; + order.splice(fromIndex, 1); + if (fromIndex < toIndex) { + toIndex -= 1; + } + order.splice(toIndex, 0, dragKey); + columnOrder.set(order); + } + + function handleDragEnd() { + dragKey = null; + dragOverKey = null; + } + + let orderedColumns = $derived( + columns.slice().sort((a, b) => { + const aIndex = $columnOrder.indexOf(a.key); + const bIndex = $columnOrder.indexOf(b.key); + if (aIndex === -1 && bIndex === -1) return 0; + if (aIndex === -1) return 1; + if (bIndex === -1) return -1; + return aIndex - bIndex; + }) + ); + + let activeVisibleColumns = $derived( + columns + .filter((c) => isColumnVisible(c.key)) + .sort((a, b) => { + const aIndex = $columnOrder.indexOf(a.key); + const bIndex = $columnOrder.indexOf(b.key); + if (aIndex === -1 && bIndex === -1) return 0; + if (aIndex === -1) return 1; + if (bIndex === -1) return -1; + return aIndex - bIndex; + }) + .map((c) => c.key) + ); + + let columnLabelMap = $derived(Object.fromEntries(columns.map((c) => [c.key, c.label]))); + + $effect(() => { + const staticKeys = new Set(columns.map((c) => c.key)); + const order = $columnOrder; + const hasStale = order.some((k) => !staticKeys.has(k)); + const missing = columns.filter((c) => !order.includes(c.key)).map((c) => c.key); + if (hasStale || missing.length > 0) { + const cleaned = order.filter((k) => staticKeys.has(k)); + columnOrder.set([...cleaned, ...missing]); + } + }); + function formatSpeed(speed: number): string { return speed < 0 ? "unknown" : speed.toFixed(2) + " t/s"; } @@ -157,22 +227,37 @@ {#if columnsMenuOpen} -
-
+
+ - {#each columns as col (col.key)} - + handleDragStart(e, key)} + ondragend={handleDragEnd} + >⋮⋮ + +
{/each}
{/if} @@ -182,112 +267,80 @@ - {#if $visibleColumns.includes("id")} - - {/if} - {#if $visibleColumns.includes("time")} - - {/if} - {#if $visibleColumns.includes("model")} - - {/if} - {#if $visibleColumns.includes("req_path")} - - {/if} - {#if $visibleColumns.includes("resp_status_code")} - - {/if} - {#if $visibleColumns.includes("resp_content_type")} - - {/if} - {#if $visibleColumns.includes("cached")} + {#each activeVisibleColumns as key (key)} - {/if} - {#if $visibleColumns.includes("prompt")} - - {/if} - {#if $visibleColumns.includes("generated")} - - {/if} - {#if $visibleColumns.includes("prompt_speed")} - - {/if} - {#if $visibleColumns.includes("gen_speed")} - - {/if} - {#if $visibleColumns.includes("duration")} - - {/if} - {#if $visibleColumns.includes("capture")} - - {/if} + {/each} {#if sortedMetrics.length === 0} - {:else} {#each sortedMetrics as metric (metric.id)} - {#if $visibleColumns.includes("id")} - - {/if} - {#if $visibleColumns.includes("time")} - - {/if} - {#if $visibleColumns.includes("model")} - - {/if} - {#if $visibleColumns.includes("req_path")} - - {/if} - {#if $visibleColumns.includes("resp_status_code")} - - {/if} - {#if $visibleColumns.includes("resp_content_type")} - - {/if} - {#if $visibleColumns.includes("cached")} - - {/if} - {#if $visibleColumns.includes("prompt")} - - {/if} - {#if $visibleColumns.includes("generated")} - - {/if} - {#if $visibleColumns.includes("prompt_speed")} - - {/if} - {#if $visibleColumns.includes("gen_speed")} - - {/if} - {#if $visibleColumns.includes("duration")} - - {/if} - {#if $visibleColumns.includes("capture")} + {#each activeVisibleColumns as key (key)} - {/if} + {/each} {/each} {/if}
IDTimeModelPathStatusContent-Type - Cached + {#if key === "cached"} + Cached + {:else if key === "prompt"} + Prompt + {:else} + {columnLabelMap[key] ?? key} + {/if} - Prompt - GeneratedPrompt SpeedGen SpeedDurationCapture
+ No activity recorded
{metric.id + 1}{formatRelativeTime(metric.timestamp)}{metric.model}{metric.req_path || "-"}{metric.resp_status_code || "-"}{metric.resp_content_type || "-"}{metric.tokens.cache_tokens > 0 ? metric.tokens.cache_tokens.toLocaleString() : "-"}{metric.tokens.input_tokens.toLocaleString()}{metric.tokens.output_tokens.toLocaleString()}{formatSpeed(metric.tokens.prompt_per_second)}{formatSpeed(metric.tokens.tokens_per_second)}{formatDuration(metric.duration_ms)} - {#if metric.has_capture} - + {#if key === "id"} + {metric.id + 1} + {:else if key === "time"} + {formatRelativeTime(metric.timestamp)} + {:else if key === "model"} + {metric.model} + {:else if key === "req_path"} + {metric.req_path || "-"} + {:else if key === "resp_status_code"} + {metric.resp_status_code || "-"} + {:else if key === "resp_content_type"} + {metric.resp_content_type || "-"} + {:else if key === "cached"} + {metric.tokens.cache_tokens > 0 ? metric.tokens.cache_tokens.toLocaleString() : "-"} + {:else if key === "prompt"} + {metric.tokens.input_tokens.toLocaleString()} + {:else if key === "generated"} + {metric.tokens.output_tokens.toLocaleString()} + {:else if key === "prompt_speed"} + {formatSpeed(metric.tokens.prompt_per_second)} + {:else if key === "gen_speed"} + {formatSpeed(metric.tokens.tokens_per_second)} + {:else if key === "duration"} + {formatDuration(metric.duration_ms)} + {:else if key === "capture"} + {#if metric.has_capture} + + {:else} + - + {/if} + {:else if key === "meta"} + {#if Object.keys(metric.metadata || {}).length > 0} + + ... + + {:else} + - + {/if} {:else} - - + - {/if}