proxy: meter /upstream requests via metrics middleware (#858)

Wrap /upstream/{upstreamPath...} in the metrics middleware so activity
log entries are recorded for model-dispatched endpoints accessed through
the upstream passthrough.

- Move findModelInPath to shared.FindModelInPath and reuse it in
handleUpstream, the log monitor lookup, and FetchContext.
- Extend FetchContext to resolve the model from /upstream/<model>/...
paths without consuming the request body.
- Add isMetricsRecordPath to limit recording to the model-dispatched
endpoints that produce token usage/timings.
- Add tests for upstream metrics recording and FetchContext upstream
path resolution.

Fixes #855
This commit is contained in:
Benson Wong
2026-06-17 17:38:52 -07:00
committed by GitHub
parent 0ab214d1c8
commit a15e47922c
8 changed files with 303 additions and 38 deletions
+82 -2
View File
@@ -2,11 +2,15 @@ package server
import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/mostlygeek/llama-swap/internal/config"
"github.com/mostlygeek/llama-swap/internal/logmon"
"github.com/mostlygeek/llama-swap/internal/shared"
)
func TestServer_HandleListModels(t *testing.T) {
@@ -78,6 +82,7 @@ func TestServer_HandleListModels_Aliases(t *testing.T) {
func TestServer_FindModelInPath(t *testing.T) {
cfg := config.Config{Models: map[string]config.ModelConfig{
"author": {},
"author/model": {},
"simple": {},
}}
@@ -91,13 +96,14 @@ func TestServer_FindModelInPath(t *testing.T) {
{"/simple/v1/chat", "simple", "/v1/chat", true},
{"/author/model/v1/chat", "author/model", "/v1/chat", true},
{"/author/model", "author/model", "/", true},
{"/author/v1/chat", "author", "/v1/chat", true},
{"/missing/v1", "", "", false},
{"/", "", "", false},
}
for _, c := range cases {
name, _, rem, found := findModelInPath(cfg, c.path)
name, _, rem, found := shared.FindModelInPath(cfg, c.path)
if found != c.wantFound || name != c.wantName || (found && rem != c.wantRem) {
t.Errorf("findModelInPath(%q) = (%q,%q,%v), want (%q,%q,%v)",
t.Errorf("FindModelInPath(%q) = (%q,%q,%v), want (%q,%q,%v)",
c.path, name, rem, found, c.wantName, c.wantRem, c.wantFound)
}
}
@@ -133,6 +139,80 @@ func TestServer_HandleUpstream(t *testing.T) {
})
}
func upstreamMetricsServer(response string) *Server {
cfg := config.Config{Models: map[string]config.ModelConfig{"m1": {}}}
proxylog := logmon.NewWriter(io.Discard)
s := &Server{
cfg: cfg,
muxlog: logmon.NewWriter(io.Discard),
proxylog: proxylog,
upstreamlog: logmon.NewWriter(io.Discard),
inflight: &inflightCounter{},
metrics: newMetricsMonitor(proxylog, 10, 0),
local: newStubRouter([]string{"m1"}, response),
peer: newStubRouter(nil, ""),
}
s.routes()
return s
}
func TestServer_HandleUpstream_MetricsRecordsSupportedPath(t *testing.T) {
resp := `{"usage":{"prompt_tokens":3,"completion_tokens":5}}`
s := upstreamMetricsServer(resp)
w := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/upstream/m1/v1/chat/completions", strings.NewReader(`{}`))
req.Header.Set("Content-Type", "application/json")
s.ServeHTTP(w, req)
if w.Code != http.StatusOK || w.Body.String() != resp {
t.Fatalf("status=%d body=%q", w.Code, w.Body.String())
}
entries := s.metrics.getMetrics()
if len(entries) != 1 {
t.Fatalf("want 1 metrics entry, got %d", len(entries))
}
if entries[0].Model != "m1" {
t.Errorf("model = %q, want m1", entries[0].Model)
}
if entries[0].ReqPath != "/v1/chat/completions" {
t.Errorf("req_path = %q, want /v1/chat/completions", entries[0].ReqPath)
}
if entries[0].Tokens.InputTokens != 3 || entries[0].Tokens.OutputTokens != 5 {
t.Errorf("tokens = %+v, want input=3 output=5", entries[0].Tokens)
}
}
func TestServer_HandleUpstream_MetricsSkipsUnsupportedPath(t *testing.T) {
s := upstreamMetricsServer("ok")
w := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/upstream/m1/probe", strings.NewReader(`{}`))
req.Header.Set("Content-Type", "application/json")
s.ServeHTTP(w, req)
if w.Code != http.StatusOK || w.Body.String() != "ok" {
t.Fatalf("status=%d body=%q", w.Code, w.Body.String())
}
if len(s.metrics.getMetrics()) != 0 {
t.Errorf("want no metrics entries for unsupported path, got %d", len(s.metrics.getMetrics()))
}
}
func TestServer_HandleUpstream_MetricsSkipsGET(t *testing.T) {
s := upstreamMetricsServer(`{"usage":{}}`)
w := httptest.NewRecorder()
s.ServeHTTP(w, httptest.NewRequest(http.MethodGet, "/upstream/m1/v1/chat/completions", nil))
if w.Code != http.StatusOK {
t.Fatalf("status=%d", w.Code)
}
if len(s.metrics.getMetrics()) != 0 {
t.Errorf("want no metrics entries for GET upstream, got %d", len(s.metrics.getMetrics()))
}
}
func TestServer_HandleMetrics_Unavailable(t *testing.T) {
s := newTestServer(newStubRouter(nil, ""), newStubRouter(nil, ""))