proxy: fix metrics capture for v1/responses (#586)
Properly parse Anthropic-compatible usage data from streaming responses. Closes: #577
This commit is contained in:
@@ -350,6 +350,11 @@ func processStreamingResponse(modelID string, start time.Time, body []byte) (Tok
|
|||||||
usage := parsed.Get("usage")
|
usage := parsed.Get("usage")
|
||||||
timings := parsed.Get("timings")
|
timings := parsed.Get("timings")
|
||||||
|
|
||||||
|
// v1/responses format nests usage under response.usage
|
||||||
|
if !usage.Exists() {
|
||||||
|
usage = parsed.Get("response.usage")
|
||||||
|
}
|
||||||
|
|
||||||
if usage.Exists() || timings.Exists() {
|
if usage.Exists() || timings.Exists() {
|
||||||
return parseMetrics(modelID, start, usage, timings)
|
return parseMetrics(modelID, start, usage, timings)
|
||||||
}
|
}
|
||||||
@@ -503,9 +508,9 @@ func filterAcceptEncoding(acceptEncoding string) string {
|
|||||||
supported := map[string]bool{"gzip": true, "deflate": true}
|
supported := map[string]bool{"gzip": true, "deflate": true}
|
||||||
var filtered []string
|
var filtered []string
|
||||||
|
|
||||||
for _, part := range strings.Split(acceptEncoding, ",") {
|
for part := range strings.SplitSeq(acceptEncoding, ",") {
|
||||||
// Parse encoding and optional quality value (e.g., "gzip;q=1.0")
|
// Parse encoding and optional quality value (e.g., "gzip;q=1.0")
|
||||||
encoding := strings.TrimSpace(strings.Split(part, ";")[0])
|
encoding, _, _ := strings.Cut(strings.TrimSpace(part), ";")
|
||||||
if supported[strings.ToLower(encoding)] {
|
if supported[strings.ToLower(encoding)] {
|
||||||
filtered = append(filtered, strings.TrimSpace(part))
|
filtered = append(filtered, strings.TrimSpace(part))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -709,6 +709,35 @@ data: [DONE]
|
|||||||
assert.Equal(t, 0, metrics[0].OutputTokens)
|
assert.Equal(t, 0, metrics[0].OutputTokens)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("v1/responses format with nested response.usage", func(t *testing.T) {
|
||||||
|
mm := newMetricsMonitor(testLogger, 10, 0)
|
||||||
|
|
||||||
|
// v1/responses SSE format: usage is nested under response.usage
|
||||||
|
responseBody := "event: response.completed\n" +
|
||||||
|
`data: {"type":"response.completed","response":{"id":"resp_abc","object":"response","created_at":1773416985,"status":"completed","model":"test-model","output":[],"usage":{"input_tokens":17,"output_tokens":23,"total_tokens":40}}}` +
|
||||||
|
"\n\n"
|
||||||
|
|
||||||
|
nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error {
|
||||||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write([]byte(responseBody))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
req := httptest.NewRequest("POST", "/v1/responses", nil)
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
ginCtx, _ := gin.CreateTestContext(rec)
|
||||||
|
|
||||||
|
err := mm.wrapHandler("test-model", ginCtx.Writer, req, nextHandler)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
metrics := mm.getMetrics()
|
||||||
|
assert.Equal(t, 1, len(metrics))
|
||||||
|
assert.Equal(t, "test-model", metrics[0].Model)
|
||||||
|
assert.Equal(t, 17, metrics[0].InputTokens)
|
||||||
|
assert.Equal(t, 23, metrics[0].OutputTokens)
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("handles empty streaming response records minimal metrics", func(t *testing.T) {
|
t.Run("handles empty streaming response records minimal metrics", func(t *testing.T) {
|
||||||
mm := newMetricsMonitor(testLogger, 10, 0)
|
mm := newMetricsMonitor(testLogger, 10, 0)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user