server: capture failed (non-200) LLM requests (#862)
Store a request/response capture for non-200 responses so failed requests can be inspected in the activity log's Capture dialog, matching the existing behavior for successful requests. - extract storeCapture/decodeResponseBody helpers to share capture logic between the success and non-200 paths - record non-200 bodies (decompressed) so error details are viewable - the activity UI already gates the View button on has_capture, so it now appears for failed requests with no UI changes - add tests for capturing failed requests and the disabled-captures case closes #766
This commit is contained in:
@@ -44,6 +44,7 @@ type ActivityLogEntry struct {
|
|||||||
Tokens TokenMetrics `json:"tokens"`
|
Tokens TokenMetrics `json:"tokens"`
|
||||||
DurationMs int `json:"duration_ms"`
|
DurationMs int `json:"duration_ms"`
|
||||||
HasCapture bool `json:"has_capture"`
|
HasCapture bool `json:"has_capture"`
|
||||||
|
ErrorMsg string `json:"error_msg,omitempty"`
|
||||||
Metadata map[string]string `json:"metadata,omitempty"`
|
Metadata map[string]string `json:"metadata,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,9 +126,11 @@ func (mp *metricsMonitor) getMetricsJSON() ([]byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// record parses a completed response body and stores/emits an activity entry.
|
// record parses a completed response body and stores/emits an activity entry.
|
||||||
// When captures are enabled, a zstd+CBOR capture is stored for successful
|
// Successful requests store a zstd+CBOR capture (when enabled) with cf
|
||||||
// requests, with cf controlling which request/response parts are retained.
|
// controlling which parts are retained. Failed (non-200) requests capture the
|
||||||
// reqBody and reqHeaders are the request data buffered before dispatch.
|
// request only and set ErrorMsg to a description of the failure, so the error
|
||||||
|
// can be inspected without storing unreadable raw response bytes. reqBody and
|
||||||
|
// reqHeaders are the request data buffered before dispatch.
|
||||||
func (mp *metricsMonitor) record(modelID string, r *http.Request, recorder *responseBodyCopier, cf captureFields, reqBody []byte, reqHeaders map[string]string) {
|
func (mp *metricsMonitor) record(modelID string, r *http.Request, recorder *responseBodyCopier, cf captureFields, reqBody []byte, reqHeaders map[string]string) {
|
||||||
tm := ActivityLogEntry{
|
tm := ActivityLogEntry{
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
@@ -152,7 +155,13 @@ func (mp *metricsMonitor) record(modelID string, r *http.Request, recorder *resp
|
|||||||
|
|
||||||
if recorder.Status() != http.StatusOK {
|
if recorder.Status() != http.StatusOK {
|
||||||
mp.logger.Warnf("non-200 response, recording partial metrics: status=%d, path=%s", recorder.Status(), r.URL.Path)
|
mp.logger.Warnf("non-200 response, recording partial metrics: status=%d, path=%s", recorder.Status(), r.URL.Path)
|
||||||
queueAndEmit()
|
decoded, decErr := mp.decodeResponseBody(recorder, r.URL.Path)
|
||||||
|
tm.ErrorMsg = failedErrorMessage(recorder.Status(), decoded, decErr)
|
||||||
|
tm.ID = mp.queueMetrics(tm)
|
||||||
|
// Capture the request only; the failure is surfaced via ErrorMsg
|
||||||
|
// rather than storing the (possibly undisplayable) response body.
|
||||||
|
tm.HasCapture = mp.storeCapture(tm.ID, r, recorder, cf&^captureRespBody, reqBody, reqHeaders, nil)
|
||||||
|
mp.emitMetric(tm)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -167,6 +176,7 @@ func (mp *metricsMonitor) record(modelID string, r *http.Request, recorder *resp
|
|||||||
decoded, err := decompressBody(body, encoding)
|
decoded, err := decompressBody(body, encoding)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mp.logger.Warnf("metrics: decompression failed: %v, path=%s, recording minimal metrics", err, r.URL.Path)
|
mp.logger.Warnf("metrics: decompression failed: %v, path=%s, recording minimal metrics", err, r.URL.Path)
|
||||||
|
tm.ErrorMsg = fmt.Sprintf("response decompression failed: %v", err)
|
||||||
queueAndEmit()
|
queueAndEmit()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -205,9 +215,20 @@ func (mp *metricsMonitor) record(modelID string, r *http.Request, recorder *resp
|
|||||||
}
|
}
|
||||||
|
|
||||||
tm.ID = mp.queueMetrics(tm)
|
tm.ID = mp.queueMetrics(tm)
|
||||||
if mp.enableCaptures {
|
tm.HasCapture = mp.storeCapture(tm.ID, r, recorder, cf, reqBody, reqHeaders, body)
|
||||||
|
mp.emitMetric(tm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// storeCapture assembles a ReqRespCapture for id, honoring the captureFields
|
||||||
|
// mask, and stores it when captures are enabled. body is the response body to
|
||||||
|
// capture (already decompressed by the caller); pass nil to omit it. Returns
|
||||||
|
// true if a capture was stored.
|
||||||
|
func (mp *metricsMonitor) storeCapture(id int, r *http.Request, recorder *responseBodyCopier, cf captureFields, reqBody []byte, reqHeaders map[string]string, body []byte) bool {
|
||||||
|
if !mp.enableCaptures {
|
||||||
|
return false
|
||||||
|
}
|
||||||
capture := ReqRespCapture{
|
capture := ReqRespCapture{
|
||||||
ID: tm.ID,
|
ID: id,
|
||||||
ReqPath: r.URL.Path,
|
ReqPath: r.URL.Path,
|
||||||
ReqHeaders: reqHeaders,
|
ReqHeaders: reqHeaders,
|
||||||
}
|
}
|
||||||
@@ -222,11 +243,71 @@ func (mp *metricsMonitor) record(modelID string, r *http.Request, recorder *resp
|
|||||||
if cf&captureRespBody != 0 {
|
if cf&captureRespBody != 0 {
|
||||||
capture.RespBody = body
|
capture.RespBody = body
|
||||||
}
|
}
|
||||||
if mp.addCapture(capture) {
|
return mp.addCapture(capture)
|
||||||
tm.HasCapture = true
|
}
|
||||||
|
|
||||||
|
// decodeResponseBody returns the buffered response body, decompressing it when
|
||||||
|
// the upstream set a Content-Encoding we recognize. On decompression failure it
|
||||||
|
// logs a warning and returns an error so the caller can record a description
|
||||||
|
// (via ErrorMsg) instead of storing unreadable raw bytes.
|
||||||
|
func (mp *metricsMonitor) decodeResponseBody(recorder *responseBodyCopier, path string) ([]byte, error) {
|
||||||
|
body := recorder.body.Bytes()
|
||||||
|
if len(body) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
encoding := recorder.Header().Get("Content-Encoding")
|
||||||
|
if encoding == "" {
|
||||||
|
return body, nil
|
||||||
|
}
|
||||||
|
decoded, err := decompressBody(body, encoding)
|
||||||
|
if err != nil {
|
||||||
|
mp.logger.Warnf("metrics: response decompression failed: %v, path=%s", err, path)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return decoded, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// errorMessagePaths lists JSON paths where a human-readable error message can
|
||||||
|
// live across OpenAI- and llama.cpp-style error responses.
|
||||||
|
var errorMessagePaths = []string{"error.message", "error", "message", "detail"}
|
||||||
|
|
||||||
|
// extractErrorMessage pulls a human-readable error string from a JSON error
|
||||||
|
// response. Returns "" if no message is found or the body is not valid JSON.
|
||||||
|
func extractErrorMessage(body []byte) string {
|
||||||
|
if !gjson.ValidBytes(body) {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
parsed := gjson.ParseBytes(body)
|
||||||
|
for _, path := range errorMessagePaths {
|
||||||
|
v := parsed.Get(path)
|
||||||
|
if v.Exists() && v.Type == gjson.String {
|
||||||
|
if s := strings.TrimSpace(v.String()); s != "" {
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mp.emitMetric(tm)
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// failedErrorMessage builds a human-readable description for a non-200 response.
|
||||||
|
// It prefers an error message parsed from the (decompressed) body and falls back
|
||||||
|
// to the HTTP status text. A non-nil decErr indicates the body could not be
|
||||||
|
// decoded, in which case the decode error is described instead.
|
||||||
|
func failedErrorMessage(status int, body []byte, decErr error) string {
|
||||||
|
const maxLen = 500
|
||||||
|
if decErr != nil {
|
||||||
|
return fmt.Sprintf("response decode failed: %v", decErr)
|
||||||
|
}
|
||||||
|
if msg := extractErrorMessage(body); msg != "" {
|
||||||
|
if len(msg) > maxLen {
|
||||||
|
msg = msg[:maxLen] + "..."
|
||||||
|
}
|
||||||
|
return msg
|
||||||
|
}
|
||||||
|
if text := http.StatusText(status); text != "" {
|
||||||
|
return fmt.Sprintf("%d %s", status, text)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("HTTP %d", status)
|
||||||
}
|
}
|
||||||
|
|
||||||
// usagePaths lists the JSON paths where a per-event usage object can live.
|
// usagePaths lists the JSON paths where a per-event usage object can live.
|
||||||
|
|||||||
@@ -90,6 +90,172 @@ func TestMetricsMonitor_RecordMetadata(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMetricsMonitor_RecordFailedRequestCapture(t *testing.T) {
|
||||||
|
mm := newMetricsMonitor(logmon.NewWriter(io.Discard), 10, 5)
|
||||||
|
r := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
|
||||||
|
reqHeaders := map[string]string{"content-type": "application/json"}
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
copier := newBodyCopier(w)
|
||||||
|
copier.Header().Set("Content-Type", "application/json")
|
||||||
|
copier.WriteHeader(http.StatusBadGateway)
|
||||||
|
copier.Write([]byte(`{"error":{"message":"model unavailable"}}`))
|
||||||
|
|
||||||
|
reqBody := []byte(`{"model":"m","messages":[]}`)
|
||||||
|
mm.record("m", r, copier, captureAll, reqBody, reqHeaders)
|
||||||
|
|
||||||
|
entries := mm.getMetrics()
|
||||||
|
if len(entries) != 1 {
|
||||||
|
t.Fatalf("want 1 entry, got %d", len(entries))
|
||||||
|
}
|
||||||
|
entry := entries[0]
|
||||||
|
if entry.RespStatusCode != http.StatusBadGateway {
|
||||||
|
t.Errorf("status = %d, want %d", entry.RespStatusCode, http.StatusBadGateway)
|
||||||
|
}
|
||||||
|
if entry.ErrorMsg != "model unavailable" {
|
||||||
|
t.Errorf("error_msg = %q, want extracted message", entry.ErrorMsg)
|
||||||
|
}
|
||||||
|
if !entry.HasCapture {
|
||||||
|
t.Fatal("failed request should capture the request so it can be inspected")
|
||||||
|
}
|
||||||
|
|
||||||
|
got := mm.getCaptureByID(entry.ID)
|
||||||
|
if got == nil {
|
||||||
|
t.Fatal("capture not found")
|
||||||
|
}
|
||||||
|
if string(got.ReqBody) != `{"model":"m","messages":[]}` {
|
||||||
|
t.Errorf("req body = %q", got.ReqBody)
|
||||||
|
}
|
||||||
|
if len(got.RespBody) != 0 {
|
||||||
|
t.Errorf("resp body stored for failed request (len=%d); want none", len(got.RespBody))
|
||||||
|
}
|
||||||
|
if got.RespHeaders["Content-Type"] != "application/json" {
|
||||||
|
t.Errorf("resp Content-Type = %q", got.RespHeaders["Content-Type"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsMonitor_RecordFailedRequestStatusFallback(t *testing.T) {
|
||||||
|
// Non-JSON error body: ErrorMsg falls back to the HTTP status text.
|
||||||
|
mm := newMetricsMonitor(logmon.NewWriter(io.Discard), 10, 5)
|
||||||
|
r := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
copier := newBodyCopier(w)
|
||||||
|
copier.WriteHeader(http.StatusBadGateway)
|
||||||
|
copier.Write([]byte("<html>upstream down</html>"))
|
||||||
|
|
||||||
|
mm.record("m", r, copier, captureAll, nil, nil)
|
||||||
|
|
||||||
|
entries := mm.getMetrics()
|
||||||
|
if len(entries) != 1 {
|
||||||
|
t.Fatalf("want 1 entry, got %d", len(entries))
|
||||||
|
}
|
||||||
|
if entries[0].ErrorMsg != "502 Bad Gateway" {
|
||||||
|
t.Errorf("error_msg = %q, want status text", entries[0].ErrorMsg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsMonitor_RecordFailedRequestCaptureDisabled(t *testing.T) {
|
||||||
|
mm := newMetricsMonitor(logmon.NewWriter(io.Discard), 10, 0) // captures disabled
|
||||||
|
r := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
copier := newBodyCopier(w)
|
||||||
|
copier.WriteHeader(http.StatusInternalServerError)
|
||||||
|
copier.Write([]byte(`{"error":"boom"}`))
|
||||||
|
|
||||||
|
mm.record("m", r, copier, captureAll, []byte("req"), nil)
|
||||||
|
|
||||||
|
entries := mm.getMetrics()
|
||||||
|
if len(entries) != 1 {
|
||||||
|
t.Fatalf("want 1 entry, got %d", len(entries))
|
||||||
|
}
|
||||||
|
if entries[0].HasCapture {
|
||||||
|
t.Fatal("captures disabled, HasCapture should be false")
|
||||||
|
}
|
||||||
|
// ErrorMsg is independent of whether captures are enabled.
|
||||||
|
if entries[0].ErrorMsg != "boom" {
|
||||||
|
t.Errorf("error_msg = %q, want boom", entries[0].ErrorMsg)
|
||||||
|
}
|
||||||
|
if mm.getCaptureByID(entries[0].ID) != nil {
|
||||||
|
t.Fatal("no capture should be stored when disabled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsMonitor_RecordDecompressionFailureSetsErrorMsg(t *testing.T) {
|
||||||
|
mm := newMetricsMonitor(logmon.NewWriter(io.Discard), 10, 5)
|
||||||
|
r := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
copier := newBodyCopier(w)
|
||||||
|
copier.Header().Set("Content-Encoding", "gzip")
|
||||||
|
copier.WriteHeader(http.StatusOK)
|
||||||
|
copier.Write([]byte("not-really-gzip"))
|
||||||
|
|
||||||
|
mm.record("m", r, copier, captureAll, []byte("req"), nil)
|
||||||
|
|
||||||
|
entries := mm.getMetrics()
|
||||||
|
if len(entries) != 1 {
|
||||||
|
t.Fatalf("want 1 entry, got %d", len(entries))
|
||||||
|
}
|
||||||
|
if entries[0].ErrorMsg == "" {
|
||||||
|
t.Fatal("expected ErrorMsg for decompression failure")
|
||||||
|
}
|
||||||
|
// Raw bytes must not be stored when the body could not be decoded.
|
||||||
|
if entries[0].HasCapture {
|
||||||
|
t.Fatal("decompression failure should not store a capture")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsMonitor_DecodeResponseBody(t *testing.T) {
|
||||||
|
mm := newMetricsMonitor(logmon.NewWriter(io.Discard), 10, 5)
|
||||||
|
|
||||||
|
// No Content-Encoding: body returned unchanged.
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
copier := newBodyCopier(w)
|
||||||
|
copier.Write([]byte("plain"))
|
||||||
|
got, err := mm.decodeResponseBody(copier, "/p")
|
||||||
|
if err != nil || string(got) != "plain" {
|
||||||
|
t.Fatalf("plain body = %q, err = %v", got, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bogus gzip payload: returns an error and no body (no raw bytes kept).
|
||||||
|
w2 := httptest.NewRecorder()
|
||||||
|
copier2 := newBodyCopier(w2)
|
||||||
|
copier2.Header().Set("Content-Encoding", "gzip")
|
||||||
|
copier2.Write([]byte("not-really-gzip"))
|
||||||
|
got, err = mm.decodeResponseBody(copier2, "/p")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected decompression error")
|
||||||
|
}
|
||||||
|
if got != nil {
|
||||||
|
t.Errorf("expected nil body on failure, got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServer_ExtractErrorMessage(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
body string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{"openai object", `{"error":{"message":"rate limited"}}`, "rate limited"},
|
||||||
|
{"string error", `{"error":"bad request"}`, "bad request"},
|
||||||
|
{"message field", `{"message":"nope"}`, "nope"},
|
||||||
|
{"detail field", `{"detail":"oops"}`, "oops"},
|
||||||
|
{"object error ignored", `{"error":{"code":42}}`, ""},
|
||||||
|
{"no error", `{"usage":{}}`, ""},
|
||||||
|
{"invalid json", `not-json`, ""},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
if got := extractErrorMessage([]byte(tc.body)); got != tc.want {
|
||||||
|
t.Errorf("extractErrorMessage = %q, want %q", got, tc.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestServer_ParseMetrics_Infill(t *testing.T) {
|
func TestServer_ParseMetrics_Infill(t *testing.T) {
|
||||||
// /infill responses are arrays; timings live in the last element.
|
// /infill responses are arrays; timings live in the last element.
|
||||||
body := `[{"content":"a"},{"content":"b","timings":{"prompt_n":5,"predicted_n":9,"prompt_ms":10,"predicted_ms":20}}]`
|
body := `[{"content":"a"},{"content":"b","timings":{"prompt_n":5,"predicted_n":9,"prompt_ms":10,"predicted_ms":20}}]`
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ export interface ActivityLogEntry {
|
|||||||
tokens: TokenMetrics;
|
tokens: TokenMetrics;
|
||||||
duration_ms: number;
|
duration_ms: number;
|
||||||
has_capture: boolean;
|
has_capture: boolean;
|
||||||
|
error_msg?: string;
|
||||||
metadata?: Record<string, string>;
|
metadata?: Record<string, string>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -21,7 +21,7 @@
|
|||||||
{ key: "time", label: "Time", defaultVisible: true },
|
{ key: "time", label: "Time", defaultVisible: true },
|
||||||
{ key: "model", label: "Model", defaultVisible: true },
|
{ key: "model", label: "Model", defaultVisible: true },
|
||||||
{ key: "req_path", label: "Path", defaultVisible: false },
|
{ key: "req_path", label: "Path", defaultVisible: false },
|
||||||
{ key: "resp_status_code", label: "Status", defaultVisible: false },
|
{ key: "resp_status_code", label: "Status", defaultVisible: true },
|
||||||
{ key: "resp_content_type", label: "Content-Type", defaultVisible: false },
|
{ key: "resp_content_type", label: "Content-Type", defaultVisible: false },
|
||||||
{ key: "cached", label: "Cached", defaultVisible: true },
|
{ key: "cached", label: "Cached", defaultVisible: true },
|
||||||
{ key: "prompt", label: "Prompt", defaultVisible: true },
|
{ key: "prompt", label: "Prompt", defaultVisible: true },
|
||||||
@@ -308,7 +308,13 @@
|
|||||||
{:else if key === "req_path"}
|
{:else if key === "req_path"}
|
||||||
{metric.req_path || "-"}
|
{metric.req_path || "-"}
|
||||||
{:else if key === "resp_status_code"}
|
{:else if key === "resp_status_code"}
|
||||||
|
{#if metric.error_msg}
|
||||||
|
<span class="text-red-500 dark:text-red-400 cursor-help" title={metric.error_msg}>
|
||||||
{metric.resp_status_code || "-"}
|
{metric.resp_status_code || "-"}
|
||||||
|
</span>
|
||||||
|
{:else}
|
||||||
|
{metric.resp_status_code || "-"}
|
||||||
|
{/if}
|
||||||
{:else if key === "resp_content_type"}
|
{:else if key === "resp_content_type"}
|
||||||
{metric.resp_content_type || "-"}
|
{metric.resp_content_type || "-"}
|
||||||
{:else if key === "cached"}
|
{:else if key === "cached"}
|
||||||
|
|||||||
Reference in New Issue
Block a user