From fe71e8a6ea4cbf8618777d07a6e4fd423799d8c1 Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Thu, 14 May 2026 21:53:57 -0700 Subject: [PATCH] proxy,ui-svelte: improve support for v1/messages and v1/responses (#758) This improves the support for activity logging from the v1/responses and v1/messages endpoints. - add chat endpoint selection to Playground > Chat > Settings - improve metrics extraction for streaming v1/messages and v1/responses endpoints (tested with llama-server) Fixes #742 --- proxy/metrics_monitor.go | 199 +++++++----- proxy/metrics_monitor_test.go | 118 +++++++ .../playground/ChatInterface.svelte | 33 +- ui-svelte/src/lib/chatApi.ts | 306 +++++++++++++++--- 4 files changed, 537 insertions(+), 119 deletions(-) diff --git a/proxy/metrics_monitor.go b/proxy/metrics_monitor.go index 62b9bbf2..1a99ef21 100644 --- a/proxy/metrics_monitor.go +++ b/proxy/metrics_monitor.go @@ -424,110 +424,159 @@ func (mp *metricsMonitor) wrapHandler( return nil } +// usagePaths lists the JSON paths where a per-event usage object can live. +// v1/chat/completions puts it at top-level "usage"; v1/responses nests under +// "response.usage"; v1/messages emits it at "message.usage" on message_start +// and at "usage" on message_delta. +var usagePaths = []string{"usage", "response.usage", "message.usage"} + +// extractUsageTokens reads input/output/cached token counts from a usage +// gjson.Result, handling the field-name differences across endpoints. +// cached returns -1 when the field is absent. ok is true when at least one +// field was present. +func extractUsageTokens(usage gjson.Result) (input, output, cached int64, ok bool) { + cached = -1 + if !usage.Exists() { + return + } + + if v := usage.Get("prompt_tokens"); v.Exists() { + // v1/chat/completions + input = v.Int() + ok = true + } else if v := usage.Get("input_tokens"); v.Exists() { + // v1/messages, v1/responses + input = v.Int() + ok = true + } + + if v := usage.Get("completion_tokens"); v.Exists() { + // v1/chat/completions + output = v.Int() + ok = true + } else if v := usage.Get("output_tokens"); v.Exists() { + // v1/messages, v1/responses + output = v.Int() + ok = true + } + + if v := usage.Get("cache_read_input_tokens"); v.Exists() { + // v1/messages (Anthropic) + cached = v.Int() + ok = true + } else if v := usage.Get("input_tokens_details.cached_tokens"); v.Exists() { + // v1/responses (OpenAI Responses API) + cached = v.Int() + ok = true + } else if v := usage.Get("prompt_tokens_details.cached_tokens"); v.Exists() { + // v1/chat/completions (OpenAI cache hits) + cached = v.Int() + ok = true + } + return +} + func processStreamingResponse(modelID string, start time.Time, body []byte) (ActivityLogEntry, error) { - // Iterate **backwards** through the body looking for the data payload with - // usage data. This avoids allocating a slice of all lines via bytes.Split. + // Walk SSE "data:" lines forward, merging usage info from every event. + // Different endpoints split usage across events: + // - v1/chat/completions: usage on the final chunk before [DONE] + // - v1/responses: usage on response.completed (response.usage) + // - v1/messages: input + cache on message_start (message.usage), + // output_tokens on message_delta (usage) + // We take the latest informative value per field so all three are covered. - // Start from the end of the body and scan backwards for newlines - pos := len(body) - for pos > 0 { - // Find the previous newline (or start of body) - lineStart := bytes.LastIndexByte(body[:pos], '\n') - if lineStart == -1 { - lineStart = 0 + var ( + inputTokens, outputTokens int64 + cachedTokens int64 = -1 + hasAny bool + timings gjson.Result + ) + + prefix := []byte("data:") + for offset := 0; offset < len(body); { + nl := bytes.IndexByte(body[offset:], '\n') + var line []byte + if nl == -1 { + line = body[offset:] + offset = len(body) } else { - lineStart++ // Move past the newline + line = body[offset : offset+nl] + offset += nl + 1 } - line := bytes.TrimSpace(body[lineStart:pos]) - pos = lineStart - 1 // Move position before the newline for next iteration - - if len(line) == 0 { - continue - } - - // SSE payload always follows "data:" - prefix := []byte("data:") - if !bytes.HasPrefix(line, prefix) { + line = bytes.TrimSpace(line) + if len(line) == 0 || !bytes.HasPrefix(line, prefix) { continue } data := bytes.TrimSpace(line[len(prefix):]) - - if len(data) == 0 { + if len(data) == 0 || bytes.Equal(data, []byte("[DONE]")) { continue } - - if bytes.Equal(data, []byte("[DONE]")) { - // [DONE] line itself contains nothing of interest. + if !gjson.ValidBytes(data) { continue } + parsed := gjson.ParseBytes(data) - if gjson.ValidBytes(data) { - parsed := gjson.ParseBytes(data) - usage := parsed.Get("usage") - timings := parsed.Get("timings") - - // v1/responses format nests usage under response.usage - if !usage.Exists() { - usage = parsed.Get("response.usage") + for _, path := range usagePaths { + u := parsed.Get(path) + if !u.Exists() { + continue } - - if usage.Exists() || timings.Exists() { - return parseMetrics(modelID, start, usage, timings) + i, o, c, ok := extractUsageTokens(u) + if !ok { + continue } + hasAny = true + // Take the latest non-zero value so message_start's input_tokens + // is preserved when message_delta's usage omits it, and vice versa + // for output_tokens. + if i > 0 { + inputTokens = i + } + if o > 0 { + outputTokens = o + } + if c >= 0 { + cachedTokens = c + } + } + if t := parsed.Get("timings"); t.Exists() { + timings = t + hasAny = true } } - return ActivityLogEntry{}, fmt.Errorf("no valid JSON data found in stream") + if !hasAny { + return ActivityLogEntry{}, fmt.Errorf("no valid JSON data found in stream") + } + + return buildMetrics(modelID, start, inputTokens, outputTokens, cachedTokens, timings), nil } func parseMetrics(modelID string, start time.Time, usage, timings gjson.Result) (ActivityLogEntry, error) { + input, output, cached, _ := extractUsageTokens(usage) + return buildMetrics(modelID, start, input, output, cached, timings), nil +} + +// buildMetrics composes an ActivityLogEntry from accumulated token counts and +// optional llama-server timings (which override input/output and provide rates). +func buildMetrics(modelID string, start time.Time, inputTokens, outputTokens, cachedTokens int64, timings gjson.Result) ActivityLogEntry { wallDurationMs := int(time.Since(start).Milliseconds()) - - // default values - cachedTokens := -1 // unknown or missing data - outputTokens := 0 - inputTokens := 0 - - // timings data + durationMs := wallDurationMs tokensPerSecond := -1.0 promptPerSecond := -1.0 - durationMs := wallDurationMs - if usage.Exists() { - if pt := usage.Get("prompt_tokens"); pt.Exists() { - // v1/chat/completions - inputTokens = int(pt.Int()) - } else if it := usage.Get("input_tokens"); it.Exists() { - // v1/messages - inputTokens = int(it.Int()) - } - - if ct := usage.Get("completion_tokens"); ct.Exists() { - // v1/chat/completions - outputTokens = int(ct.Int()) - } else if ot := usage.Get("output_tokens"); ot.Exists() { - outputTokens = int(ot.Int()) - } - - if ct := usage.Get("cache_read_input_tokens"); ct.Exists() { - cachedTokens = int(ct.Int()) - } - } - - // use llama-server's timing data for tok/sec and duration as it is more accurate if timings.Exists() { - inputTokens = int(timings.Get("prompt_n").Int()) - outputTokens = int(timings.Get("predicted_n").Int()) + inputTokens = timings.Get("prompt_n").Int() + outputTokens = timings.Get("predicted_n").Int() promptPerSecond = timings.Get("prompt_per_second").Float() tokensPerSecond = timings.Get("predicted_per_second").Float() timingsDurationMs := int(timings.Get("prompt_ms").Float() + timings.Get("predicted_ms").Float()) if timingsDurationMs > durationMs { durationMs = timingsDurationMs } - if cachedValue := timings.Get("cache_n"); cachedValue.Exists() { - cachedTokens = int(cachedValue.Int()) + cachedTokens = cachedValue.Int() } } @@ -535,14 +584,14 @@ func parseMetrics(modelID string, start time.Time, usage, timings gjson.Result) Timestamp: time.Now(), Model: modelID, Tokens: TokenMetrics{ - CachedTokens: cachedTokens, - InputTokens: inputTokens, - OutputTokens: outputTokens, + CachedTokens: int(cachedTokens), + InputTokens: int(inputTokens), + OutputTokens: int(outputTokens), PromptPerSecond: promptPerSecond, TokensPerSecond: tokensPerSecond, }, DurationMs: durationMs, - }, nil + } } // decompressBody decompresses the body based on Content-Encoding header diff --git a/proxy/metrics_monitor_test.go b/proxy/metrics_monitor_test.go index 92a59d7e..a5699225 100644 --- a/proxy/metrics_monitor_test.go +++ b/proxy/metrics_monitor_test.go @@ -777,6 +777,124 @@ data: [DONE] assert.Equal(t, 23, metrics[0].Tokens.OutputTokens) }) + t.Run("v1/responses full stream with deltas, output, and cached tokens", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 0) + + // Realistic v1/responses stream: multiple delta events followed by + // done/completed events. Usage lives on response.completed and includes + // the OpenAI Responses cached-token shape (input_tokens_details.cached_tokens). + responseBody := "event: response.created\n" + + `data: {"type":"response.created","response":{"id":"resp_1","status":"in_progress"}}` + "\n\n" + + "event: response.output_item.added\n" + + `data: {"type":"response.output_item.added","item":{"id":"msg_1","role":"assistant","status":"in_progress","type":"message"}}` + "\n\n" + + "event: response.content_part.added\n" + + `data: {"type":"response.content_part.added","item_id":"msg_1","part":{"type":"output_text","text":""}}` + "\n\n" + + "event: response.output_text.delta\n" + + `data: {"type":"response.output_text.delta","item_id":"msg_1","delta":"Hello"}` + "\n\n" + + "event: response.output_text.delta\n" + + `data: {"type":"response.output_text.delta","item_id":"msg_1","delta":" world"}` + "\n\n" + + "event: response.output_text.done\n" + + `data: {"type":"response.output_text.done","item_id":"msg_1","text":"Hello world"}` + "\n\n" + + "event: response.content_part.done\n" + + `data: {"type":"response.content_part.done","item_id":"msg_1","part":{"type":"output_text","text":"Hello world"}}` + "\n\n" + + "event: response.output_item.done\n" + + `data: {"type":"response.output_item.done","item":{"type":"message","status":"completed","id":"msg_1","content":[{"type":"output_text","text":"Hello world"}],"role":"assistant"}}` + "\n\n" + + "event: response.completed\n" + + `data: {"type":"response.completed","response":{"id":"resp_1","object":"response","status":"completed","model":"test-model","output":[{"type":"message","status":"completed","id":"msg_1","content":[{"type":"output_text","text":"Hello world"}],"role":"assistant"}],"usage":{"input_tokens":14,"output_tokens":24,"total_tokens":38,"input_tokens_details":{"cached_tokens":13}}}}` + "\n\n" + + nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + w.Write([]byte(responseBody)) + return nil + } + + req := httptest.NewRequest("POST", "/v1/responses", nil) + rec := httptest.NewRecorder() + ginCtx, _ := gin.CreateTestContext(rec) + + err := mm.wrapHandler("test-model", ginCtx.Writer, req, captureAll, nextHandler) + assert.NoError(t, err) + + metrics := mm.getMetrics() + assert.Equal(t, 1, len(metrics)) + assert.Equal(t, "test-model", metrics[0].Model) + assert.Equal(t, 14, metrics[0].Tokens.InputTokens) + assert.Equal(t, 24, metrics[0].Tokens.OutputTokens) + assert.Equal(t, 13, metrics[0].Tokens.CachedTokens) + }) + + t.Run("v1/messages merges usage from message_start and message_delta", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 0) + + // v1/messages splits usage across two events: + // message_start.message.usage has input_tokens + cache_read_input_tokens + // message_delta.usage has the final output_tokens + // Without merging, output_tokens (last seen) would clobber the input fields. + responseBody := "event: message_start\n" + + `data: {"type":"message_start","message":{"id":"m1","type":"message","role":"assistant","content":[],"model":"test-model","usage":{"cache_read_input_tokens":5,"input_tokens":9,"output_tokens":0}}}` + "\n\n" + + "event: content_block_start\n" + + `data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}` + "\n\n" + + "event: content_block_delta\n" + + `data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hi"}}` + "\n\n" + + "event: content_block_delta\n" + + `data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" there"}}` + "\n\n" + + "event: content_block_stop\n" + + `data: {"type":"content_block_stop","index":0}` + "\n\n" + + "event: message_delta\n" + + `data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":24}}` + "\n\n" + + "event: message_stop\n" + + `data: {"type":"message_stop"}` + "\n\n" + + nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + w.Write([]byte(responseBody)) + return nil + } + + req := httptest.NewRequest("POST", "/v1/messages", nil) + rec := httptest.NewRecorder() + ginCtx, _ := gin.CreateTestContext(rec) + + err := mm.wrapHandler("test-model", ginCtx.Writer, req, captureAll, nextHandler) + assert.NoError(t, err) + + metrics := mm.getMetrics() + assert.Equal(t, 1, len(metrics)) + assert.Equal(t, 9, metrics[0].Tokens.InputTokens) + assert.Equal(t, 24, metrics[0].Tokens.OutputTokens) + assert.Equal(t, 5, metrics[0].Tokens.CachedTokens) + }) + + t.Run("v1/chat/completions OpenAI prompt_tokens_details.cached_tokens", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 0) + + responseBody := `data: {"choices":[{"delta":{"content":"hi"}}]}` + "\n\n" + + `data: {"choices":[{"delta":{}}],"usage":{"prompt_tokens":50,"completion_tokens":12,"prompt_tokens_details":{"cached_tokens":42}}}` + "\n\n" + + "data: [DONE]\n\n" + + nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + w.Write([]byte(responseBody)) + return nil + } + + req := httptest.NewRequest("POST", "/v1/chat/completions", nil) + rec := httptest.NewRecorder() + ginCtx, _ := gin.CreateTestContext(rec) + + err := mm.wrapHandler("test-model", ginCtx.Writer, req, captureAll, nextHandler) + assert.NoError(t, err) + + metrics := mm.getMetrics() + assert.Equal(t, 1, len(metrics)) + assert.Equal(t, 50, metrics[0].Tokens.InputTokens) + assert.Equal(t, 12, metrics[0].Tokens.OutputTokens) + assert.Equal(t, 42, metrics[0].Tokens.CachedTokens) + }) + t.Run("handles empty streaming response records minimal metrics", func(t *testing.T) { mm := newMetricsMonitor(testLogger, 10, 0) diff --git a/ui-svelte/src/components/playground/ChatInterface.svelte b/ui-svelte/src/components/playground/ChatInterface.svelte index 270e31a7..2e50358f 100644 --- a/ui-svelte/src/components/playground/ChatInterface.svelte +++ b/ui-svelte/src/components/playground/ChatInterface.svelte @@ -1,7 +1,7 @@