internal/server,shared: support request metadata (#850)
- add support for http handlers in the request chain to append metadata to the request - metrics middleware will include metadata in the activity log - update Activity UI to support metadata, drag sort columns - update Activity UI capture dialog to use more screen space Updates #834
This commit is contained in:
@@ -3,6 +3,7 @@ package scheduler
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/mostlygeek/llama-swap/internal/config"
|
||||
@@ -278,6 +279,11 @@ func (s *FIFO) grantHandler(req HandlerReq, modelID string) {
|
||||
s.effects.GrantError(req, shared.ConcurrencyLimitError{})
|
||||
return
|
||||
}
|
||||
|
||||
if err := shared.SetReqData(req.Ctx, "fifo_priority", strconv.Itoa(s.cfg.Priority[req.Model])); err != nil {
|
||||
s.logger.Debugf("failed to set fifo_priority metadata: %v", err)
|
||||
}
|
||||
|
||||
if s.effects.GrantServe(req, modelID) {
|
||||
s.inFlight[modelID]++
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -54,8 +55,9 @@ type stopRec struct {
|
||||
// fakeEffects is an in-memory scheduler.Effects. Tests program process states
|
||||
// and GrantServe outcomes, then assert on the recorded calls.
|
||||
type fakeEffects struct {
|
||||
states map[string]process.ProcessState // model -> state; missing => not handled
|
||||
serveResult map[string]bool // GrantServe return per model (default true)
|
||||
states map[string]process.ProcessState // model -> state; missing => not handled
|
||||
serveResult map[string]bool // GrantServe return per model (default true)
|
||||
lastServeReq HandlerReq
|
||||
|
||||
starts []startRec
|
||||
grants []grantRec
|
||||
@@ -98,6 +100,7 @@ func (f *fakeEffects) GrantServe(req HandlerReq, modelID string) bool {
|
||||
if v, set := f.serveResult[modelID]; set {
|
||||
ok = v
|
||||
}
|
||||
f.lastServeReq = req
|
||||
f.grants = append(f.grants, grantRec{model: modelID, serve: ok})
|
||||
return ok
|
||||
}
|
||||
@@ -169,6 +172,27 @@ func TestFIFO_FastPath(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestFIFO_GrantSetsPriorityMetadata(t *testing.T) {
|
||||
eff := newFakeEffects()
|
||||
eff.states["a"] = process.StateReady
|
||||
cfg := config.FifoConfig{Priority: map[string]int{"a": 7}}
|
||||
s := NewFIFO("test", logmon.NewWriter(io.Discard), &stubPlanner{}, cfg, nil, eff)
|
||||
|
||||
ctx := shared.SetContext(context.Background(), shared.ReqContextData{ModelID: "a", Metadata: make(map[string]string)})
|
||||
s.OnRequest(HandlerReq{Model: "a", Ctx: ctx})
|
||||
|
||||
if got := eff.served("a"); got != 1 {
|
||||
t.Fatalf("served(a)=%d want 1", got)
|
||||
}
|
||||
data, ok := shared.ReadContext(eff.lastServeReq.Ctx)
|
||||
if !ok {
|
||||
t.Fatal("context data missing from granted request")
|
||||
}
|
||||
if data.Metadata["fifo_priority"] != "7" {
|
||||
t.Errorf("fifo_priority = %q, want 7", data.Metadata["fifo_priority"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestFIFO_ModelNotFound(t *testing.T) {
|
||||
eff := newFakeEffects() // no states => model unknown
|
||||
s := newFIFO(&stubPlanner{}, eff)
|
||||
|
||||
@@ -271,7 +271,7 @@ func (s *Server) startPreload() {
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
req = req.WithContext(shared.SetContext(req.Context(), shared.ReqContextData{Model: modelID, ModelID: modelID}))
|
||||
req = req.WithContext(shared.SetContext(req.Context(), shared.ReqContextData{Model: modelID, ModelID: modelID, Metadata: make(map[string]string)}))
|
||||
|
||||
dw := &discardResponseWriter{status: http.StatusOK}
|
||||
s.local.ServeHTTP(dw, req)
|
||||
@@ -338,7 +338,7 @@ func (s *Server) handleUpstream(w http.ResponseWriter, r *http.Request) {
|
||||
// Strip the /upstream/<model> prefix before forwarding.
|
||||
r.URL.Path = remainingPath
|
||||
// Pin the resolved model so the router skips body/query extraction.
|
||||
*r = *r.WithContext(shared.SetContext(r.Context(), shared.ReqContextData{Model: searchName, ModelID: modelID}))
|
||||
*r = *r.WithContext(shared.SetContext(r.Context(), shared.ReqContextData{Model: searchName, ModelID: modelID, Metadata: make(map[string]string)}))
|
||||
|
||||
switch {
|
||||
case s.local.Handles(modelID):
|
||||
|
||||
@@ -33,15 +33,16 @@ type TokenMetrics struct {
|
||||
|
||||
// ActivityLogEntry represents parsed token statistics from llama-server logs.
|
||||
type ActivityLogEntry struct {
|
||||
ID int `json:"id"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Model string `json:"model"`
|
||||
ReqPath string `json:"req_path"`
|
||||
RespContentType string `json:"resp_content_type"`
|
||||
RespStatusCode int `json:"resp_status_code"`
|
||||
Tokens TokenMetrics `json:"tokens"`
|
||||
DurationMs int `json:"duration_ms"`
|
||||
HasCapture bool `json:"has_capture"`
|
||||
ID int `json:"id"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Model string `json:"model"`
|
||||
ReqPath string `json:"req_path"`
|
||||
RespContentType string `json:"resp_content_type"`
|
||||
RespStatusCode int `json:"resp_status_code"`
|
||||
Tokens TokenMetrics `json:"tokens"`
|
||||
DurationMs int `json:"duration_ms"`
|
||||
HasCapture bool `json:"has_capture"`
|
||||
Metadata map[string]string `json:"metadata,omitempty"`
|
||||
}
|
||||
|
||||
// ActivityLogEvent carries a single activity log entry to event subscribers.
|
||||
@@ -135,6 +136,13 @@ func (mp *metricsMonitor) record(modelID string, r *http.Request, recorder *resp
|
||||
DurationMs: int(time.Since(recorder.StartTime()).Milliseconds()),
|
||||
}
|
||||
|
||||
if ctxData, ok := shared.ReadContext(r.Context()); ok && len(ctxData.Metadata) > 0 {
|
||||
tm.Metadata = make(map[string]string, len(ctxData.Metadata))
|
||||
for k, v := range ctxData.Metadata {
|
||||
tm.Metadata[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
queueAndEmit := func() {
|
||||
tm.ID = mp.queueMetrics(tm)
|
||||
mp.emitMetric(tm)
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/mostlygeek/llama-swap/internal/shared"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
@@ -56,6 +60,33 @@ func TestServer_ProcessStreamingResponse_NoData(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricsMonitor_RecordMetadata(t *testing.T) {
|
||||
mm := newMetricsMonitor(nil, 10, 0)
|
||||
r := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(`{"usage":{}}`))
|
||||
r = r.WithContext(shared.SetContext(r.Context(), shared.ReqContextData{
|
||||
ModelID: "m",
|
||||
Metadata: map[string]string{"client": "web", "trace": "abc"},
|
||||
}))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
copier := newBodyCopier(w)
|
||||
copier.WriteHeader(http.StatusOK)
|
||||
copier.Write([]byte(`{"usage":{"prompt_tokens":1,"completion_tokens":2}}`))
|
||||
|
||||
mm.record("m", r, copier, 0, nil, nil)
|
||||
|
||||
entries := mm.getMetrics()
|
||||
if len(entries) != 1 {
|
||||
t.Fatalf("want 1 entry, got %d", len(entries))
|
||||
}
|
||||
if entries[0].Metadata["client"] != "web" {
|
||||
t.Errorf("client = %q, want web", entries[0].Metadata["client"])
|
||||
}
|
||||
if entries[0].Metadata["trace"] != "abc" {
|
||||
t.Errorf("trace = %q, want abc", entries[0].Metadata["trace"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_ParseMetrics_Infill(t *testing.T) {
|
||||
// /infill responses are arrays; timings live in the last element.
|
||||
body := `[{"content":"a"},{"content":"b","timings":{"prompt_n":5,"predicted_n":9,"prompt_ms":10,"predicted_ms":20}}]`
|
||||
|
||||
@@ -26,6 +26,9 @@ type ReqContextData struct {
|
||||
ModelID string
|
||||
Streaming bool
|
||||
SendLoadingState bool
|
||||
// Metadata is a request-scoped key/value bag that handlers may mutate
|
||||
// while processing. The metrics middleware copies it into ActivityLogEntry.
|
||||
Metadata map[string]string
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -123,6 +126,25 @@ func ReadContext(ctx context.Context) (ReqContextData, bool) {
|
||||
return data, ok
|
||||
}
|
||||
|
||||
// SetReqData attaches a key/value pair to the request context's metadata map.
|
||||
// The metadata map must already exist in the context's ReqContextData; callers
|
||||
// should ensure FetchContext has run or initialize the map themselves.
|
||||
// It returns an error for nil contexts or contexts without request data.
|
||||
func SetReqData(ctx context.Context, key, value string) error {
|
||||
if ctx == nil {
|
||||
return fmt.Errorf("cannot set request metadata on nil context")
|
||||
}
|
||||
data, ok := ReadContext(ctx)
|
||||
if !ok {
|
||||
return fmt.Errorf("no request context data found")
|
||||
}
|
||||
if data.Metadata == nil {
|
||||
return fmt.Errorf("no metadata map in request context")
|
||||
}
|
||||
data.Metadata[key] = value
|
||||
return nil
|
||||
}
|
||||
|
||||
// extractContext pulls fields from an HTTP request into a ReqContextData,
|
||||
// returning whatever is available. For GET requests it reads query parameters.
|
||||
// For POST requests it inspects Content-Type and parses JSON,
|
||||
@@ -139,6 +161,7 @@ func extractContext(r *http.Request) (ReqContextData, error) {
|
||||
Model: q.Get("model"),
|
||||
Streaming: q.Get("stream") == "true",
|
||||
ApiKey: apiKey,
|
||||
Metadata: make(map[string]string),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -157,6 +180,7 @@ func extractContext(r *http.Request) (ReqContextData, error) {
|
||||
Model: gjson.GetBytes(bodyBytes, "model").String(),
|
||||
Streaming: gjson.GetBytes(bodyBytes, "stream").Bool(),
|
||||
ApiKey: apiKey,
|
||||
Metadata: make(map[string]string),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -178,6 +202,7 @@ func extractContext(r *http.Request) (ReqContextData, error) {
|
||||
Model: r.FormValue("model"),
|
||||
Streaming: r.FormValue("stream") == "true",
|
||||
ApiKey: apiKey,
|
||||
Metadata: make(map[string]string),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -387,6 +387,38 @@ func TestExtractContext_ApiKey(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetReqData(t *testing.T) {
|
||||
ctx := SetContext(context.Background(), ReqContextData{Model: "llama3", ModelID: "llama3", Metadata: make(map[string]string)})
|
||||
|
||||
if err := SetReqData(ctx, "client", "web"); err != nil {
|
||||
t.Fatalf("SetReqData: %v", err)
|
||||
}
|
||||
if err := SetReqData(ctx, "trace", "abc123"); err != nil {
|
||||
t.Fatalf("SetReqData: %v", err)
|
||||
}
|
||||
|
||||
data, ok := ReadContext(ctx)
|
||||
if !ok {
|
||||
t.Fatal("context data missing")
|
||||
}
|
||||
if data.Metadata["client"] != "web" {
|
||||
t.Errorf("client = %q, want %q", data.Metadata["client"], "web")
|
||||
}
|
||||
if data.Metadata["trace"] != "abc123" {
|
||||
t.Errorf("trace = %q, want %q", data.Metadata["trace"], "abc123")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetReqData_Errors(t *testing.T) {
|
||||
if err := SetReqData(context.Background(), "k", "v"); err == nil {
|
||||
t.Error("expected error when no request context data exists")
|
||||
}
|
||||
ctx := SetContext(context.Background(), ReqContextData{Model: "llama3", ModelID: "llama3"})
|
||||
if err := SetReqData(ctx, "k", "v"); err == nil {
|
||||
t.Error("expected error when metadata map is missing")
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_ExtractAPIKey(t *testing.T) {
|
||||
basicHeader := func(user, pass string) string {
|
||||
return "Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))
|
||||
|
||||
Reference in New Issue
Block a user