diff --git a/config-schema.json b/config-schema.json index 8baa0cc4..3de95fc2 100644 --- a/config-schema.json +++ b/config-schema.json @@ -87,6 +87,12 @@ "default": 1000, "description": "Maximum number of metrics to keep in memory. Controls how many metrics are stored before older ones are discarded." }, + "captureBuffer": { + "type": "integer", + "minimum": 0, + "default": 5, + "description": "Size in megabytes of the buffer for storing request/response captures. Set to 0 to disable captures." + }, "startPort": { "type": "integer", "default": 5800, diff --git a/config.example.yaml b/config.example.yaml index d8282fc1..fc1fee88 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -50,6 +50,11 @@ logToStdout: "proxy" # - useful for limiting memory usage when processing large volumes of metrics metricsMaxInMemory: 1000 +# captureBuffer: how many MBs to allocate for storing request/response captures +# - optional, default: 5 +# - set to 0 to disable +captureBuffer: 15 + # startPort: sets the starting port number for the automatic ${PORT} macro. 
# - optional, default: 5800 # - the ${PORT} macro can be used in model.cmd and model.proxy settings diff --git a/proxy/config/config.go b/proxy/config/config.go index 945a9d4b..ed74110c 100644 --- a/proxy/config/config.go +++ b/proxy/config/config.go @@ -123,6 +123,7 @@ type Config struct { LogTimeFormat string `yaml:"logTimeFormat"` LogToStdout string `yaml:"logToStdout"` MetricsMaxInMemory int `yaml:"metricsMaxInMemory"` + CaptureBuffer int `yaml:"captureBuffer"` Models map[string]ModelConfig `yaml:"models"` /* key is model ID */ Profiles map[string][]string `yaml:"profiles"` Groups map[string]GroupConfig `yaml:"groups"` /* key is group ID */ @@ -201,6 +202,7 @@ func LoadConfigFromReader(r io.Reader) (Config, error) { LogTimeFormat: "", LogToStdout: LogToStdoutProxy, MetricsMaxInMemory: 1000, + CaptureBuffer: 5, } if err = yaml.Unmarshal([]byte(yamlStr), &config); err != nil { return Config{}, err diff --git a/proxy/config/config_posix_test.go b/proxy/config/config_posix_test.go index 6a2b02f3..b177aa91 100644 --- a/proxy/config/config_posix_test.go +++ b/proxy/config/config_posix_test.go @@ -215,6 +215,7 @@ groups: }, HealthCheckTimeout: 15, MetricsMaxInMemory: 1000, + CaptureBuffer: 5, Profiles: map[string][]string{ "test": {"model1", "model2"}, }, diff --git a/proxy/config/config_windows_test.go b/proxy/config/config_windows_test.go index 1b674f61..d0a2e848 100644 --- a/proxy/config/config_windows_test.go +++ b/proxy/config/config_windows_test.go @@ -204,6 +204,7 @@ groups: }, HealthCheckTimeout: 15, MetricsMaxInMemory: 1000, + CaptureBuffer: 5, Profiles: map[string][]string{ "test": {"model1", "model2"}, }, diff --git a/proxy/metrics_monitor.go b/proxy/metrics_monitor.go index a3b07de2..655d8b1a 100644 --- a/proxy/metrics_monitor.go +++ b/proxy/metrics_monitor.go @@ -28,6 +28,28 @@ type TokenMetrics struct { PromptPerSecond float64 `json:"prompt_per_second"` TokensPerSecond float64 `json:"tokens_per_second"` DurationMs int `json:"duration_ms"` + HasCapture 
bool `json:"has_capture"` +} + +type ReqRespCapture struct { + ID int `json:"id"` + ReqPath string `json:"req_path"` + ReqHeaders map[string]string `json:"req_headers"` + ReqBody []byte `json:"req_body"` + RespHeaders map[string]string `json:"resp_headers"` + RespBody []byte `json:"resp_body"` +} + +// Size returns the approximate memory usage of this capture in bytes +func (c *ReqRespCapture) Size() int { + size := len(c.ReqPath) + len(c.ReqBody) + len(c.RespBody) + for k, v := range c.ReqHeaders { + size += len(k) + len(v) + } + for k, v := range c.RespHeaders { + size += len(k) + len(v) + } + return size } // TokenMetricsEvent represents a token metrics event @@ -46,19 +68,32 @@ type metricsMonitor struct { maxMetrics int nextID int logger *LogMonitor + + // capture fields + enableCaptures bool + captures map[int]ReqRespCapture // map for O(1) lookup by ID + captureOrder []int // track insertion order for FIFO eviction + captureSize int // current total size in bytes + maxCaptureSize int // max bytes for captures } -func newMetricsMonitor(logger *LogMonitor, maxMetrics int) *metricsMonitor { - mp := &metricsMonitor{ - logger: logger, - maxMetrics: maxMetrics, +// newMetricsMonitor creates a new metricsMonitor. captureBufferMB is the +// capture buffer size in megabytes; 0 disables captures. +func newMetricsMonitor(logger *LogMonitor, maxMetrics int, captureBufferMB int) *metricsMonitor { + return &metricsMonitor{ + logger: logger, + maxMetrics: maxMetrics, + enableCaptures: captureBufferMB > 0, + captures: make(map[int]ReqRespCapture), + captureOrder: make([]int, 0), + captureSize: 0, + maxCaptureSize: captureBufferMB * 1024 * 1024, } - - return mp } -// addMetrics adds a new metric to the collection and publishes an event -func (mp *metricsMonitor) addMetrics(metric TokenMetrics) { +// addMetrics adds a new metric to the collection and publishes an event. +// Returns the assigned metric ID. 
+func (mp *metricsMonitor) addMetrics(metric TokenMetrics) int { mp.mu.Lock() defer mp.mu.Unlock() @@ -69,6 +104,49 @@ func (mp *metricsMonitor) addMetrics(metric TokenMetrics) { mp.metrics = mp.metrics[len(mp.metrics)-mp.maxMetrics:] } event.Emit(TokenMetricsEvent{Metrics: metric}) + return metric.ID +} + +// addCapture adds a new capture to the buffer with size-based eviction. +// Captures are skipped if enableCaptures is false or if capture exceeds maxCaptureSize. +func (mp *metricsMonitor) addCapture(capture ReqRespCapture) { + if !mp.enableCaptures { + return + } + + mp.mu.Lock() + defer mp.mu.Unlock() + + captureSize := capture.Size() + if captureSize > mp.maxCaptureSize { + mp.logger.Warnf("capture size %d exceeds max %d, skipping", captureSize, mp.maxCaptureSize) + return + } + + // Evict oldest (FIFO) until room available + for mp.captureSize+captureSize > mp.maxCaptureSize && len(mp.captureOrder) > 0 { + oldestID := mp.captureOrder[0] + mp.captureOrder = mp.captureOrder[1:] + if evicted, exists := mp.captures[oldestID]; exists { + mp.captureSize -= evicted.Size() + delete(mp.captures, oldestID) + } + } + + mp.captures[capture.ID] = capture + mp.captureOrder = append(mp.captureOrder, capture.ID) + mp.captureSize += captureSize +} + +// getCaptureByID returns a capture by its ID, or nil if not found. 
+func (mp *metricsMonitor) getCaptureByID(id int) *ReqRespCapture { + mp.mu.RLock() + defer mp.mu.RUnlock() + + if capture, exists := mp.captures[id]; exists { + return &capture + } + return nil } // getMetrics returns a copy of the current metrics @@ -97,6 +175,28 @@ func (mp *metricsMonitor) wrapHandler( request *http.Request, next func(modelID string, w http.ResponseWriter, r *http.Request) error, ) error { + // Capture request body and headers if captures enabled + var reqBody []byte + var reqHeaders map[string]string + if mp.enableCaptures { + if request.Body != nil { + var err error + reqBody, err = io.ReadAll(request.Body) + if err != nil { + return fmt.Errorf("failed to read request body for capture: %w", err) + } + request.Body.Close() + request.Body = io.NopCloser(bytes.NewBuffer(reqBody)) + } + reqHeaders = make(map[string]string) + for key, values := range request.Header { + if len(values) > 0 { + reqHeaders[key] = values[0] + } + } + redactHeaders(reqHeaders) + } + recorder := newBodyCopier(writer) // Filter Accept-Encoding to only include encodings we can decompress for metrics @@ -165,7 +265,38 @@ func (mp *metricsMonitor) wrapHandler( } } - mp.addMetrics(tm) + // Build capture if enabled and determine if it will be stored + var capture *ReqRespCapture + if mp.enableCaptures { + respHeaders := make(map[string]string) + for key, values := range recorder.Header() { + if len(values) > 0 { + respHeaders[key] = values[0] + } + } + redactHeaders(respHeaders) + delete(respHeaders, "Content-Encoding") + capture = &ReqRespCapture{ + ReqPath: request.URL.Path, + ReqHeaders: reqHeaders, + ReqBody: reqBody, + RespHeaders: respHeaders, + RespBody: body, + } + // Only set HasCapture if the capture will actually be stored (not too large) + if capture.Size() <= mp.maxCaptureSize { + tm.HasCapture = true + } + } + + metricID := mp.addMetrics(tm) + + // Store capture if enabled + if capture != nil { + capture.ID = metricID + mp.addCapture(*capture) + } + return nil } 
@@ -336,6 +467,24 @@ func (w *responseBodyCopier) StartTime() time.Time { return w.start } +// sensitiveHeaders lists headers that should be redacted in captures +var sensitiveHeaders = map[string]bool{ + "authorization": true, + "proxy-authorization": true, + "cookie": true, + "set-cookie": true, + "x-api-key": true, +} + +// redactHeaders replaces sensitive header values in-place with "[REDACTED]" +func redactHeaders(headers map[string]string) { + for key := range headers { + if sensitiveHeaders[strings.ToLower(key)] { + headers[key] = "[REDACTED]" + } + } +} + // filterAcceptEncoding filters the Accept-Encoding header to only include // encodings we can decompress (gzip, deflate). This respects the client's // preferences while ensuring we can parse response bodies for metrics. diff --git a/proxy/metrics_monitor_test.go b/proxy/metrics_monitor_test.go index b68cf191..079dba90 100644 --- a/proxy/metrics_monitor_test.go +++ b/proxy/metrics_monitor_test.go @@ -18,7 +18,7 @@ import ( func TestMetricsMonitor_AddMetrics(t *testing.T) { t.Run("adds metrics and assigns ID", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) metric := TokenMetrics{ Model: "test-model", @@ -37,7 +37,7 @@ func TestMetricsMonitor_AddMetrics(t *testing.T) { }) t.Run("increments ID for each metric", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) for i := 0; i < 5; i++ { mm.addMetrics(TokenMetrics{Model: "model"}) @@ -51,7 +51,7 @@ func TestMetricsMonitor_AddMetrics(t *testing.T) { }) t.Run("respects max metrics limit", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 3) + mm := newMetricsMonitor(testLogger, 3, 0) // Add 5 metrics for i := 0; i < 5; i++ { @@ -71,7 +71,7 @@ func TestMetricsMonitor_AddMetrics(t *testing.T) { }) t.Run("emits TokenMetricsEvent", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) 
receivedEvent := make(chan TokenMetricsEvent, 1) cancel := event.On(func(e TokenMetricsEvent) { @@ -101,14 +101,14 @@ func TestMetricsMonitor_AddMetrics(t *testing.T) { func TestMetricsMonitor_GetMetrics(t *testing.T) { t.Run("returns empty slice when no metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) metrics := mm.getMetrics() assert.NotNil(t, metrics) assert.Equal(t, 0, len(metrics)) }) t.Run("returns copy of metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) mm.addMetrics(TokenMetrics{Model: "model1"}) mm.addMetrics(TokenMetrics{Model: "model2"}) @@ -128,7 +128,7 @@ func TestMetricsMonitor_GetMetrics(t *testing.T) { func TestMetricsMonitor_GetMetricsJSON(t *testing.T) { t.Run("returns valid JSON for empty metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) jsonData, err := mm.getMetricsJSON() assert.NoError(t, err) assert.NotNil(t, jsonData) @@ -140,7 +140,7 @@ func TestMetricsMonitor_GetMetricsJSON(t *testing.T) { }) t.Run("returns valid JSON with metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) mm.addMetrics(TokenMetrics{ Model: "model1", InputTokens: 100, @@ -168,7 +168,7 @@ func TestMetricsMonitor_GetMetricsJSON(t *testing.T) { func TestMetricsMonitor_WrapHandler(t *testing.T) { t.Run("successful non-streaming request with usage data", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) responseBody := `{ "usage": { @@ -199,7 +199,7 @@ func TestMetricsMonitor_WrapHandler(t *testing.T) { }) t.Run("successful request with timings data", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) responseBody := `{ "timings": { @@ -239,7 +239,7 @@ func TestMetricsMonitor_WrapHandler(t 
*testing.T) { }) t.Run("streaming request with SSE format", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) // Note: SSE format requires proper line breaks - each data line followed by blank line responseBody := `data: {"choices":[{"text":"Hello"}]} @@ -275,7 +275,7 @@ data: [DONE] }) t.Run("non-OK status code does not record metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { w.WriteHeader(http.StatusBadRequest) @@ -295,7 +295,7 @@ data: [DONE] }) t.Run("empty response body records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { w.WriteHeader(http.StatusOK) @@ -317,7 +317,7 @@ data: [DONE] }) t.Run("invalid JSON records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { w.Header().Set("Content-Type", "application/json") @@ -341,7 +341,7 @@ data: [DONE] }) t.Run("next handler error is propagated", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) expectedErr := assert.AnError nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { @@ -360,7 +360,7 @@ data: [DONE] }) t.Run("response without usage or timings records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) responseBody := `{"result": "ok"}` @@ -437,7 +437,7 @@ func TestMetricsMonitor_ResponseBodyCopier(t *testing.T) { func TestMetricsMonitor_Concurrent(t *testing.T) { t.Run("concurrent addMetrics is safe", func(t *testing.T) { - 
mm := newMetricsMonitor(testLogger, 1000) + mm := newMetricsMonitor(testLogger, 1000, 0) var wg sync.WaitGroup numGoroutines := 10 @@ -464,7 +464,7 @@ func TestMetricsMonitor_Concurrent(t *testing.T) { }) t.Run("concurrent reads and writes are safe", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 100) + mm := newMetricsMonitor(testLogger, 100, 0) done := make(chan bool) @@ -502,7 +502,7 @@ func TestMetricsMonitor_Concurrent(t *testing.T) { func TestMetricsMonitor_ParseMetrics(t *testing.T) { t.Run("prefers timings over usage data", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) // Timings should take precedence over usage responseBody := `{ @@ -542,7 +542,7 @@ func TestMetricsMonitor_ParseMetrics(t *testing.T) { }) t.Run("handles missing cache_n in timings", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) responseBody := `{ "timings": { @@ -577,7 +577,7 @@ func TestMetricsMonitor_ParseMetrics(t *testing.T) { func TestMetricsMonitor_StreamingResponse(t *testing.T) { t.Run("finds metrics in last valid SSE data", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) // Metrics should be found in the last data line before [DONE] responseBody := `data: {"choices":[{"text":"First"}]} @@ -611,7 +611,7 @@ data: [DONE] }) t.Run("handles streaming with no valid JSON records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) responseBody := `data: not json @@ -641,7 +641,7 @@ data: [DONE] }) t.Run("handles empty streaming response records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) responseBody := `` @@ -669,7 +669,7 @@ data: [DONE] // Benchmark tests func BenchmarkMetricsMonitor_AddMetrics(b *testing.B) { - mm := 
newMetricsMonitor(testLogger, 1000) + mm := newMetricsMonitor(testLogger, 1000, 0) metric := TokenMetrics{ Model: "test-model", @@ -690,7 +690,7 @@ func BenchmarkMetricsMonitor_AddMetrics(b *testing.B) { func BenchmarkMetricsMonitor_AddMetrics_SmallBuffer(b *testing.B) { // Test performance with a smaller buffer where wrapping occurs more frequently - mm := newMetricsMonitor(testLogger, 100) + mm := newMetricsMonitor(testLogger, 100, 0) metric := TokenMetrics{ Model: "test-model", @@ -711,7 +711,7 @@ func BenchmarkMetricsMonitor_AddMetrics_SmallBuffer(b *testing.B) { func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { t.Run("gzip encoded response", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) responseBody := `{"usage": {"prompt_tokens": 100, "completion_tokens": 50}}` @@ -745,7 +745,7 @@ func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { }) t.Run("deflate encoded response", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) responseBody := `{"usage": {"prompt_tokens": 200, "completion_tokens": 75}}` @@ -779,7 +779,7 @@ func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { }) t.Run("invalid gzip data records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) // Invalid compressed data invalidData := []byte("this is not gzip data") @@ -807,7 +807,7 @@ func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { }) t.Run("unknown encoding treated as uncompressed", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10) + mm := newMetricsMonitor(testLogger, 10, 0) responseBody := `{"usage": {"prompt_tokens": 300, "completion_tokens": 100}}` @@ -832,3 +832,228 @@ func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { assert.Equal(t, 100, metrics[0].OutputTokens) }) } + +func TestReqRespCapture_Size(t *testing.T) { + 
t.Run("calculates size correctly", func(t *testing.T) { + capture := ReqRespCapture{ + ID: 1, + ReqPath: "/v1/chat/completions", // 20 bytes + ReqHeaders: map[string]string{ + "Content-Type": "application/json", // 12 + 16 = 28 + }, + ReqBody: []byte("request body"), // 12 bytes + RespHeaders: map[string]string{ + "X-Test": "value", // 6 + 5 = 11 + }, + RespBody: []byte("response body"), // 13 bytes + } + + // Expected: 20 + 12 + 13 + 28 + 11 = 84 + assert.Equal(t, 84, capture.Size()) + }) + + t.Run("handles empty capture", func(t *testing.T) { + capture := ReqRespCapture{} + assert.Equal(t, 0, capture.Size()) + }) +} + +func TestMetricsMonitor_AddCapture(t *testing.T) { + t.Run("does nothing when captures disabled", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 0) + + capture := ReqRespCapture{ + ID: 0, + ReqBody: []byte("test"), + } + mm.addCapture(capture) + + // Should not store capture + assert.Nil(t, mm.getCaptureByID(0)) + }) + + t.Run("adds capture when enabled", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 5) + + capture := ReqRespCapture{ + ID: 0, + ReqBody: []byte("test request"), + RespBody: []byte("test response"), + } + mm.addCapture(capture) + + retrieved := mm.getCaptureByID(0) + assert.NotNil(t, retrieved) + assert.Equal(t, 0, retrieved.ID) + assert.Equal(t, []byte("test request"), retrieved.ReqBody) + assert.Equal(t, []byte("test response"), retrieved.RespBody) + }) + + t.Run("evicts oldest when exceeding max size", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 5) + mm.maxCaptureSize = 100 // Set small limit for test + + // Add captures that will exceed the limit + capture1 := ReqRespCapture{ID: 0, ReqBody: make([]byte, 40)} + capture2 := ReqRespCapture{ID: 1, ReqBody: make([]byte, 40)} + capture3 := ReqRespCapture{ID: 2, ReqBody: make([]byte, 40)} + + mm.addCapture(capture1) + mm.addCapture(capture2) + // Adding capture3 should evict capture1 + mm.addCapture(capture3) + + assert.Nil(t, 
mm.getCaptureByID(0), "capture 0 should be evicted") + assert.NotNil(t, mm.getCaptureByID(1), "capture 1 should exist") + assert.NotNil(t, mm.getCaptureByID(2), "capture 2 should exist") + }) + + t.Run("skips capture larger than max size", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 5) + mm.maxCaptureSize = 100 + + // Add a capture larger than max + largeCapture := ReqRespCapture{ID: 0, ReqBody: make([]byte, 200)} + mm.addCapture(largeCapture) + + assert.Nil(t, mm.getCaptureByID(0), "oversized capture should not be stored") + }) +} + +func TestMetricsMonitor_GetCaptureByID(t *testing.T) { + t.Run("returns nil for non-existent ID", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 5) + + assert.Nil(t, mm.getCaptureByID(999)) + }) + + t.Run("returns capture by ID", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 5) + + capture := ReqRespCapture{ + ID: 42, + ReqBody: []byte("test"), + } + mm.addCapture(capture) + + retrieved := mm.getCaptureByID(42) + assert.NotNil(t, retrieved) + assert.Equal(t, 42, retrieved.ID) + }) +} + +func TestRedactHeaders(t *testing.T) { + t.Run("redacts sensitive headers", func(t *testing.T) { + headers := map[string]string{ + "Authorization": "Bearer secret-token", + "Proxy-Authorization": "Basic creds", + "Cookie": "session=abc123", + "Set-Cookie": "session=xyz789", + "X-Api-Key": "sk-12345", + "Content-Type": "application/json", + "X-Custom": "safe-value", + } + + redactHeaders(headers) + + assert.Equal(t, "[REDACTED]", headers["Authorization"]) + assert.Equal(t, "[REDACTED]", headers["Proxy-Authorization"]) + assert.Equal(t, "[REDACTED]", headers["Cookie"]) + assert.Equal(t, "[REDACTED]", headers["Set-Cookie"]) + assert.Equal(t, "[REDACTED]", headers["X-Api-Key"]) + assert.Equal(t, "application/json", headers["Content-Type"]) + assert.Equal(t, "safe-value", headers["X-Custom"]) + }) + + t.Run("handles mixed case header names", func(t *testing.T) { + headers := map[string]string{ + 
"authorization": "Bearer token", + "COOKIE": "session=abc", + "x-api-key": "key123", + } + + redactHeaders(headers) + + assert.Equal(t, "[REDACTED]", headers["authorization"]) + assert.Equal(t, "[REDACTED]", headers["COOKIE"]) + assert.Equal(t, "[REDACTED]", headers["x-api-key"]) + }) + + t.Run("handles empty headers", func(t *testing.T) { + headers := map[string]string{} + redactHeaders(headers) + assert.Empty(t, headers) + }) +} + +func TestMetricsMonitor_WrapHandler_Capture(t *testing.T) { + t.Run("captures request and response when enabled", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 5) + + requestBody := `{"model": "test", "prompt": "hello"}` + responseBody := `{"usage": {"prompt_tokens": 100, "completion_tokens": 50}}` + + nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Custom", "header-value") + w.WriteHeader(http.StatusOK) + w.Write([]byte(responseBody)) + return nil + } + + req := httptest.NewRequest("POST", "/test", bytes.NewBufferString(requestBody)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer secret") + rec := httptest.NewRecorder() + ginCtx, _ := gin.CreateTestContext(rec) + + err := mm.wrapHandler("test-model", ginCtx.Writer, req, nextHandler) + assert.NoError(t, err) + + // Check metric was recorded + metrics := mm.getMetrics() + assert.Equal(t, 1, len(metrics)) + metricID := metrics[0].ID + + // Check capture was stored with same ID + capture := mm.getCaptureByID(metricID) + assert.NotNil(t, capture) + assert.Equal(t, metricID, capture.ID) + assert.Equal(t, []byte(requestBody), capture.ReqBody) + assert.Equal(t, []byte(responseBody), capture.RespBody) + assert.Equal(t, "/test", capture.ReqPath) + assert.Equal(t, "application/json", capture.ReqHeaders["Content-Type"]) + assert.Equal(t, "[REDACTED]", capture.ReqHeaders["Authorization"]) + assert.Equal(t, "application/json", 
capture.RespHeaders["Content-Type"]) + assert.Equal(t, "header-value", capture.RespHeaders["X-Custom"]) + }) + + t.Run("does not capture when disabled", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 0) + + requestBody := `{"model": "test"}` + responseBody := `{"usage": {"prompt_tokens": 100, "completion_tokens": 50}}` + + nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(responseBody)) + return nil + } + + req := httptest.NewRequest("POST", "/test", bytes.NewBufferString(requestBody)) + rec := httptest.NewRecorder() + ginCtx, _ := gin.CreateTestContext(rec) + + err := mm.wrapHandler("test-model", ginCtx.Writer, req, nextHandler) + assert.NoError(t, err) + + // Metrics should still be recorded + metrics := mm.getMetrics() + assert.Equal(t, 1, len(metrics)) + + // But no capture + capture := mm.getCaptureByID(metrics[0].ID) + assert.Nil(t, capture) + }) +} diff --git a/proxy/proxymanager.go b/proxy/proxymanager.go index e0201424..20b14602 100644 --- a/proxy/proxymanager.go +++ b/proxy/proxymanager.go @@ -151,7 +151,7 @@ func New(proxyConfig config.Config) *ProxyManager { muxLogger: muxLogger, upstreamLogger: upstreamLogger, - metricsMonitor: newMetricsMonitor(proxyLogger, maxMetrics), + metricsMonitor: newMetricsMonitor(proxyLogger, maxMetrics, proxyConfig.CaptureBuffer), processGroups: make(map[string]*ProcessGroup), diff --git a/proxy/proxymanager_api.go b/proxy/proxymanager_api.go index fe4326d0..4354ea4a 100644 --- a/proxy/proxymanager_api.go +++ b/proxy/proxymanager_api.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" "sort" + "strconv" "strings" "github.com/gin-gonic/gin" @@ -31,6 +32,7 @@ func addApiHandlers(pm *ProxyManager) { apiGroup.GET("/events", pm.apiSendEvents) apiGroup.GET("/metrics", pm.apiGetMetrics) apiGroup.GET("/version", pm.apiGetVersion) + apiGroup.GET("/captures/:id", pm.apiGetCapture) } } @@ 
-250,3 +252,20 @@ func (pm *ProxyManager) apiGetVersion(c *gin.Context) { "build_date": pm.buildDate, }) } + +func (pm *ProxyManager) apiGetCapture(c *gin.Context) { + idStr := c.Param("id") + id, err := strconv.Atoi(idStr) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid capture ID"}) + return + } + + capture := pm.metricsMonitor.getCaptureByID(id) + if capture == nil { + c.JSON(http.StatusNotFound, gin.H{"error": "capture not found"}) + return + } + + c.JSON(http.StatusOK, capture) +} diff --git a/ui-svelte/package-lock.json b/ui-svelte/package-lock.json index 8c86b603..1958d64d 100644 --- a/ui-svelte/package-lock.json +++ b/ui-svelte/package-lock.json @@ -925,6 +925,7 @@ "integrity": "sha512-Y1Cs7hhTc+a5E9Va/xwKlAJoariQyHY+5zBgCZg4PFWNYQ1nMN9sjK1zhw1gK69DuqVP++sht/1GZg1aRwmAXQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@sveltejs/vite-plugin-svelte-inspector": "^4.0.1", "debug": "^4.4.1", @@ -1307,6 +1308,7 @@ "integrity": "sha512-t7frlewr6+cbx+9Ohpl0NOTKXZNV9xHRmNOvql47BFJKcEG1CxtxlPEEe+gR9uhVWM4DwhnvTF110mIL4yP9RA==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "undici-types": "~7.16.0" } @@ -1439,6 +1441,7 @@ "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.15.0.tgz", "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "license": "MIT", + "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -3449,6 +3452,7 @@ "integrity": "sha512-e5lPJi/aui4TO1LpAXIRLySmwXSE8k3b9zoGfd42p67wzxog4WHjiZF3M2uheQih4DGyc25QEV4yRBbpueNiUA==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@types/estree": "1.0.8" }, @@ -3561,6 +3565,7 @@ "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.48.5.tgz", "integrity": "sha512-NB3o70OxfmnE5UPyLr8uH3IV02Q43qJVAuWigYmsSOYsS0s/rHxP0TF81blG0onF/xkhNvZw4G8NfzIX+By5ZQ==", "license": "MIT", + "peer": true, "dependencies": { "@jridgewell/remapping": "^2.3.4", "@jridgewell/sourcemap-codec": 
"^1.5.0", @@ -3716,6 +3721,7 @@ "integrity": "sha512-p1diW6TqL9L07nNxvRMM7hMMw4c5XOo/1ibL4aAIGmSAt9slTE1Xgw5KWuof2uTOvCg9BY7ZRi+GaF+7sfgPeQ==", "dev": true, "license": "Apache-2.0", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -3894,6 +3900,7 @@ "integrity": "sha512-+Oxm7q9hDoLMyJOYfUYBuHQo+dkAloi33apOPP56pzj+vsdJDzr+j1NISE5pyaAuKL4A3UD34qd0lx5+kfKp2g==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.4.4", diff --git a/ui-svelte/src/components/CaptureDialog.svelte b/ui-svelte/src/components/CaptureDialog.svelte new file mode 100644 index 00000000..088bbd8e --- /dev/null +++ b/ui-svelte/src/components/CaptureDialog.svelte @@ -0,0 +1,452 @@ + + + + {#if capture} +
+
+

Capture #{capture.id + 1}{#if capture.req_path} {capture.req_path}{/if}

+ +
+ +
+ +
+ + Request Headers + +
+ + + {#each Object.entries(capture.req_headers || {}) as [key, value]} + + + + + {/each} + +
{key}{value}
+
+
+ + +
+ + Request Body + + {#if requestBodyRaw} +
+
+ {#if isRequestJson} + + + {/if} +
+ +
+
+
{displayedRequestBody}
+
+ {:else} +
+
(empty)
+
+ {/if} +
+ + +
+ + Response Headers + +
+ + + {#each Object.entries(capture.resp_headers || {}) as [key, value]} + + + + + {/each} + +
{key}{value}
+
+
+ + +
+ + Response Body + + {#if isResponseImage && capture.resp_body} +
+
+ Response +
+
+ {:else if isSSE || isResponseText} +
+
+ {#if isSSE} + + {/if} + {#if isResponseJson} + + {/if} + {#if isSSE || isResponseJson} + + {/if} +
+ +
+
+ {#if respBodyTab === "chat"} +
+ {#if sseChat.reasoning} +
+
+ Reasoning +
+
{sseChat.reasoning}
+
+ {/if} + {#if sseChat.content} +
+ {#if sseChat.reasoning} +
+ Response +
+ {/if} +
{sseChat.content}
+
+ {/if} + {#if !sseChat.reasoning && !sseChat.content} +
(empty)
+ {/if} +
+ {:else} +
{displayedResponseBody || "(empty)"}
+ {/if} +
+ {:else if responseBodyRaw} +
+
+ (binary data - {responseContentType || "unknown content type"}) +
+
+ {:else} +
+
(empty)
+
+ {/if} +
+
+ +
+ +
+
+ {/if} +
+ + diff --git a/ui-svelte/src/lib/types.ts b/ui-svelte/src/lib/types.ts index cc81e986..e7d1a658 100644 --- a/ui-svelte/src/lib/types.ts +++ b/ui-svelte/src/lib/types.ts @@ -21,6 +21,16 @@ export interface Metrics { prompt_per_second: number; tokens_per_second: number; duration_ms: number; + has_capture: boolean; +} + +export interface ReqRespCapture { + id: number; + req_path: string; + req_headers: Record<string, string>; + req_body: string; // base64 encoded bytes + resp_headers: Record<string, string>; + resp_body: string; // base64 encoded bytes } export interface LogData { diff --git a/ui-svelte/src/routes/Activity.svelte b/ui-svelte/src/routes/Activity.svelte index 338e85cf..05363093 100644 --- a/ui-svelte/src/routes/Activity.svelte +++ b/ui-svelte/src/routes/Activity.svelte @@ -1,6 +1,8 @@
@@ -65,6 +86,7 @@ Prompt Processing Generation Speed Duration + Capture @@ -79,6 +101,19 @@ {formatSpeed(metric.prompt_per_second)} {formatSpeed(metric.tokens_per_second)} {formatDuration(metric.duration_ms)} + + {#if metric.has_capture} + + {:else} + - + {/if} + {/each} @@ -86,3 +121,5 @@
{/if} + + diff --git a/ui-svelte/src/stores/api.ts b/ui-svelte/src/stores/api.ts index cbeef16f..2a9babe0 100644 --- a/ui-svelte/src/stores/api.ts +++ b/ui-svelte/src/stores/api.ts @@ -1,5 +1,5 @@ import { writable } from "svelte/store"; -import type { Model, Metrics, VersionInfo, LogData, APIEventEnvelope } from "../lib/types"; +import type { Model, Metrics, VersionInfo, LogData, APIEventEnvelope, ReqRespCapture } from "../lib/types"; import { connectionState } from "./theme"; const LOG_LENGTH_LIMIT = 1024 * 100; /* 100KB of log data */ @@ -172,3 +172,19 @@ export async function loadModel(model: string): Promise { throw error; } } + +export async function getCapture(id: number): Promise<ReqRespCapture | null> { + try { + const response = await fetch(`/api/captures/${id}`); + if (response.status === 404) { + return null; + } + if (!response.ok) { + throw new Error(`Failed to fetch capture: ${response.status}`); + } + return await response.json(); + } catch (error) { + console.error("Failed to fetch capture:", error); + return null; + } +}