Compare commits

...

11 Commits

Author SHA1 Message Date
Cr4xy 636b53e70f Improve rocm-smi performance monitoring (#775)
Fix hardcoded indices for rocm-smi.
2026-05-20 17:59:49 -07:00
gatkisson 59cd3b690d Added Windows performance monitoring using nvidia-smi (#773)
updates: #596, #771
2026-05-18 11:02:03 -07:00
Benson Wong 5d1e62d224 Disable auto review feature in coderabbit config 2026-05-18 10:40:21 -07:00
Benson Wong dbb869d019 Increase inactivity thresholds for stale issues
Updated stale issue and close messages to reflect new inactivity thresholds.
2026-05-17 22:52:58 -07:00
Benson Wong 26bb17e57e config.example.yaml: Improve matrix vs groups info
For some use cases groups are simpler to use. Note this in the
documentation that it is still fully supported.
2026-05-17 15:59:25 -07:00
Benson Wong 2982dd3d40 ui-svelte: update link to performance discussion thread 2026-05-17 11:45:56 -07:00
knguyen298 79dc87f881 Add ROCm stats via rocm-smi (#767)
Add ROCm GPU stats support using `rocm-smi`.
2026-05-17 07:58:26 -07:00
krzychdre b2fcc2daa1 ui-svelte: fix cached tokens total counting -1 sentinel (#760)
The backend uses cache_tokens=-1 as a sentinel for endpoints that don't
report cache stats (embeddings, vLLM). The activity table correctly
renders these as "-", but the totals widget summed the sentinels
directly, so each such request subtracted 1 from the displayed total.

- clamp cache_tokens with Math.max(0, ...) when reducing
2026-05-15 14:42:44 -07:00
cdwaage 6a9c4efc8f fix: use --loop instead of -loop for nvidia-smi (driver 540+ compat) (#759) 2026-05-15 13:20:29 -07:00
Benson Wong 0c813e44d1 ui-svelte: package updates 2026-05-14 21:56:04 -07:00
Benson Wong fe71e8a6ea proxy,ui-svelte: improve support for v1/messages and v1/responses (#758)
This improves the support for activity logging from the v1/responses and
v1/messages endpoints.

- add chat endpoint selection to Playground > Chat > Settings
- improve metrics extraction for streaming v1/messages and v1/responses
endpoints (tested with llama-server)

Fixes #742
2026-05-14 21:53:57 -07:00
14 changed files with 880 additions and 169 deletions
+1 -1
View File
@@ -13,7 +13,7 @@ reviews:
docstrings:
enabled: false
auto_review:
enabled: true
enabled: false
drafts: false
chat:
auto_reply: true
+4 -4
View File
@@ -13,11 +13,11 @@ jobs:
steps:
- uses: actions/stale@b5d41d4e1d5dceea10e7104786b73624c18a190f #v10.2.0
with:
days-before-issue-stale: 14
days-before-issue-close: 14
days-before-issue-stale: 30
days-before-issue-close: 30
stale-issue-label: "stale"
stale-issue-message: "This issue is stale because it has been open for 2 weeks with no activity."
close-issue-message: "This issue was closed because it has been inactive for 2 weeks since being marked as stale."
stale-issue-message: "This issue is stale because it has been open without activity for 30 days. Please remove the stale label if this was an error."
close-issue-message: "This issue was closed because it has been inactive for 30 days since being marked as stale."
days-before-pr-stale: -1
days-before-pr-close: -1
repo-token: ${{ secrets.GITHUB_TOKEN }}
+13 -4
View File
@@ -281,7 +281,7 @@ models:
b: 2
# objects can contain complex types with macro substitution
# becomes: c: [0.7, false, "model: llama"]
c: [ "${temp}", false, "model: ${MODEL_ID}" ]
c: ["${temp}", false, "model: ${MODEL_ID}"]
# concurrencyLimit: overrides the allowed number of active parallel requests to a model
# - optional, default: 0
@@ -347,11 +347,20 @@ models:
# matrix: run concurrent models with a solver-based swap DSL
# =============================================================================
#
# Note:
# A config must use either a matrix or legacy groups, not both. A configuration error
# will occur if both are defined. Configuration examples for legacy Groups can be found:
# Matrix or Groups?
#
# Groups are available and fully supported. The syntax may be easier to use
# for simple use cases.
#
# Documentation can be found here:
# https://github.com/mostlygeek/llama-swap/blob/40e39f7/config.example.yaml#L334-L396
#
# A config can only use a matrix (recommended) or groups. A configuration error
# will occur if both are defined. Groups is legacy but is fully supported with
# no plans to deprecate it.
#
# ~~~~~
#
# The matrix declares valid combinations of models that can run concurrently.
# When a model is requested, the solver finds the cheapest way to make it
# available by evicting as few (and least costly) running models as possible.
+45
View File
@@ -0,0 +1,45 @@
package perf
import (
"strconv"
"strings"
"time"
)
// ParseNvidiaSmiLine parses a single line from nvidia-smi CSV output.
// Format: index,name,uuid,temperature.gpu,utilization.gpu,memory.used,memory.total,fan.speed,power.draw
func ParseNvidiaSmiLine(line string) *GpuStat {
fields := strings.Split(line, ",")
if len(fields) < 9 {
return nil
}
id, _ := strconv.Atoi(strings.TrimSpace(fields[0]))
name := strings.TrimSpace(fields[1])
uuid := strings.TrimSpace(fields[2])
tempC, _ := strconv.Atoi(strings.TrimSpace(fields[3]))
gpuUtil, _ := strconv.ParseFloat(strings.TrimSpace(fields[4]), 64)
memUsed, _ := strconv.Atoi(strings.TrimSpace(fields[5]))
memTotal, _ := strconv.Atoi(strings.TrimSpace(fields[6]))
fanSpeed, _ := strconv.ParseFloat(strings.TrimSpace(fields[7]), 64)
powerDraw, _ := strconv.ParseFloat(strings.TrimSpace(fields[8]), 64)
var memUtil float64
if memTotal > 0 {
memUtil = float64(memUsed) / float64(memTotal) * 100
}
return &GpuStat{
Timestamp: time.Now(),
ID: id,
Name: name,
UUID: uuid,
TempC: tempC,
GpuUtilPct: gpuUtil,
MemUtilPct: memUtil,
MemUsedMB: memUsed,
MemTotalMB: memTotal,
FanSpeedPct: fanSpeed,
PowerDrawW: powerDraw,
}
}
+40
View File
@@ -224,3 +224,43 @@ func TestCurrent_ConcurrentAccess(t *testing.T) {
}
wg.Wait()
}
func TestParseNvidiaSmiLine_ValidLine(t *testing.T) {
line := "0, NVIDIA GeForce RTX 3080, GPU-12345678-1234-1234-1234-123456789abc, 65, 80, 8192, 10240, 75, 250"
stat := ParseNvidiaSmiLine(line)
require.NotNil(t, stat)
assert.Equal(t, 0, stat.ID)
assert.Equal(t, "NVIDIA GeForce RTX 3080", stat.Name)
assert.Equal(t, "GPU-12345678-1234-1234-1234-123456789abc", stat.UUID)
assert.Equal(t, 65, stat.TempC)
assert.Equal(t, 80.0, stat.GpuUtilPct)
assert.Equal(t, 8192, stat.MemUsedMB)
assert.Equal(t, 10240, stat.MemTotalMB)
assert.Equal(t, 75.0, stat.FanSpeedPct)
assert.Equal(t, 250.0, stat.PowerDrawW)
assert.InDelta(t, 80.0, stat.MemUtilPct, 0.01)
}
func TestParseNvidiaSmiLine_ShortLine(t *testing.T) {
line := "0, NVIDIA GPU, GPU-123"
stat := ParseNvidiaSmiLine(line)
assert.Nil(t, stat)
}
func TestParseNvidiaSmiLine_MissingFields(t *testing.T) {
line := "0, NVIDIA GPU, GPU-123, 65, 80, 8192, 10240, 75"
stat := ParseNvidiaSmiLine(line)
assert.Nil(t, stat)
}
func TestParseNvidiaSmiLine_ZeroMemoryTotal(t *testing.T) {
line := "0, NVIDIA GPU, GPU-123, 65, 80, 0, 0, 75, 250"
stat := ParseNvidiaSmiLine(line)
require.NotNil(t, stat)
assert.Equal(t, 0.0, stat.MemUtilPct)
}
+161 -30
View File
@@ -38,6 +38,13 @@ func getGpuStats(ctx context.Context, every time.Duration, logger *logmon.Monito
logger.Debugf("nvidia-smi: %s", err.Error())
}
if ch, err := tryRocmSmi(ctx, every, logger); err == nil {
logger.Info("using rocm-smi for GPU monitoring")
return ch, nil
} else {
logger.Debugf("rocm-smi: %s", err.Error())
}
if ch, err := trySysfs(ctx, every, logger); err == nil {
logger.Info("using sysfs for GPU monitoring")
return ch, nil
@@ -139,7 +146,7 @@ func tryNvidiaSmi(ctx context.Context, every time.Duration, logger *logmon.Monit
cmd := exec.CommandContext(ctx, "nvidia-smi",
"--query-gpu=index,name,uuid,temperature.gpu,utilization.gpu,memory.used,memory.total,fan.speed,power.draw",
"--format=csv,noheader,nounits",
"-loop", fmt.Sprintf("%d", sec),
"--loop", fmt.Sprintf("%d", sec),
)
stdout, err := cmd.StdoutPipe()
@@ -163,7 +170,7 @@ func tryNvidiaSmi(ctx context.Context, every time.Duration, logger *logmon.Monit
continue
}
stat := parseNvidiaSmiLine(line)
stat := ParseNvidiaSmiLine(line)
if stat != nil {
select {
case ch <- []GpuStat{*stat}:
@@ -177,40 +184,164 @@ func tryNvidiaSmi(ctx context.Context, every time.Duration, logger *logmon.Monit
return ch, nil
}
func parseNvidiaSmiLine(line string) *GpuStat {
fields := strings.Split(line, ", ")
if len(fields) < 9 {
func tryRocmSmi(ctx context.Context, every time.Duration, logger *logmon.Monitor) (chan []GpuStat, error) {
if _, err := exec.LookPath("rocm-smi"); err != nil {
return nil, ErrNoGpuTool
}
if every < time.Second {
every = time.Second
}
const pollTimeout = 5 * time.Second
ch := make(chan []GpuStat, 1)
go func() {
defer close(ch)
ticker := time.NewTicker(every)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
pollCtx, cancel := context.WithTimeout(ctx, pollTimeout)
cmd := exec.CommandContext(pollCtx, "rocm-smi", "-i", "-P", "-t", "-f", "-u", "--showmemuse", "--showmeminfo", "vram", "--showproductname", "--csv")
out, err := cmd.Output()
timedOut := pollCtx.Err() == context.DeadlineExceeded
cancel()
if err != nil {
if timedOut {
logger.Debug("rocm-smi timed out")
}
continue
}
stats := make([]GpuStat, 0)
scanner := bufio.NewScanner(strings.NewReader(string(out)))
var header string
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
if strings.HasPrefix(line, "device,") {
header = line
continue
}
stat := parseRocmSmiLine(header, line)
if stat != nil {
stats = append(stats, *stat)
}
}
if len(stats) > 0 {
select {
case ch <- stats:
default:
}
}
}
}
}()
return ch, nil
}
func parseRocmSmiLine(header string, line string) *GpuStat {
if header == "" || line == "" {
return nil
}
labels := strings.Split(header, ",")
fields := strings.Split(line, ",")
if len(labels) != len(fields) {
return nil
}
id, _ := strconv.Atoi(strings.TrimSpace(fields[0]))
name := strings.TrimSpace(fields[1])
uuid := strings.TrimSpace(fields[2])
tempC, _ := strconv.Atoi(strings.TrimSpace(fields[3]))
gpuUtil, _ := strconv.ParseFloat(strings.TrimSpace(fields[4]), 64)
memUsed, _ := strconv.Atoi(strings.TrimSpace(fields[5]))
memTotal, _ := strconv.Atoi(strings.TrimSpace(fields[6]))
fanSpeed, _ := strconv.ParseFloat(strings.TrimSpace(fields[7]), 64)
powerDraw, _ := strconv.ParseFloat(strings.TrimSpace(fields[8]), 64)
var memUtil float64
if memTotal > 0 {
memUtil = float64(memUsed) / float64(memTotal) * 100
result := &GpuStat{
Timestamp: time.Now(),
ID: -1,
}
return &GpuStat{
Timestamp: time.Now(),
ID: id,
Name: name,
UUID: uuid,
TempC: tempC,
GpuUtilPct: gpuUtil,
MemUtilPct: memUtil,
MemUsedMB: memUsed,
MemTotalMB: memTotal,
FanSpeedPct: fanSpeed,
PowerDrawW: powerDraw,
var device string
var deviceName string
var cardSeries string
var gfxVersion string
const toMB = 1024 * 1024
for i, col := range labels {
val := strings.TrimSpace(fields[i])
switch col {
case "device":
device = val
id, err := strconv.Atoi(strings.TrimPrefix(val, "card"))
if err != nil {
return nil
}
result.ID = id
break
case "Device Name":
deviceName = val
break
case "GUID":
result.UUID = val
break
case "Temperature (Sensor edge) (C)":
tempC, _ := strconv.ParseFloat(val, 64)
result.TempC = int(tempC)
break
case "Temperature (Sensor memory) (C)":
vramTempC, _ := strconv.ParseFloat(val, 64)
result.VramTempC = int(vramTempC)
break
case "Fan speed (%)":
fanSpeed, _ := strconv.ParseFloat(val, 64)
result.FanSpeedPct = fanSpeed
break
case "Current Socket Graphics Package Power (W)":
powerDraw, _ := strconv.ParseFloat(val, 64)
result.PowerDrawW = powerDraw
break
case "GPU use (%)":
gpuUtil, _ := strconv.ParseFloat(val, 64)
result.GpuUtilPct = gpuUtil
break
case "GPU Memory Allocated (VRAM%)":
memUtil, _ := strconv.ParseFloat(val, 64)
result.MemUtilPct = memUtil
break
case "VRAM Total Memory (B)":
memTotal, _ := strconv.ParseUint(val, 10, 64)
result.MemTotalMB = int(memTotal / toMB)
break
case "VRAM Total Used Memory (B)":
memUsed, _ := strconv.ParseUint(val, 10, 64)
result.MemUsedMB = int(memUsed / toMB)
break
case "Card Series":
cardSeries = val
break
case "GFX Version":
gfxVersion = val
break
}
}
if result.ID == -1 {
return nil
}
name := device
if cardSeries != "" && cardSeries != "N/A" {
name = cardSeries + " " + device + " (" + gfxVersion + ")"
} else if deviceName != "" && deviceName != "N/A" {
name = deviceName + " " + device + " (" + gfxVersion + ")"
}
result.Name = name
return result
}
func trySysfs(ctx context.Context, every time.Duration, logger *logmon.Monitor) (chan []GpuStat, error) {
+66 -1
View File
@@ -1,7 +1,11 @@
package perf
import (
"bufio"
"context"
"fmt"
"os/exec"
"strings"
"time"
"github.com/mostlygeek/llama-swap/internal/logmon"
@@ -11,7 +15,68 @@ import (
)
func getGpuStats(ctx context.Context, every time.Duration, logger *logmon.Monitor) (chan []GpuStat, error) {
return nil, ErrNotImplemented
if ch, err := tryNvidiaSmiWindows(ctx, every, logger); err == nil {
logger.Info("using nvidia-smi for GPU monitoring")
return ch, nil
} else {
logger.Debugf("nvidia-smi: %s", err.Error())
}
return nil, ErrNoGpuTool
}
// tryNvidiaSmiWindows starts nvidia-smi in loop mode on Windows and returns
// a channel receiving GPU stat snapshots. Returns ErrNoGpuTool if nvidia-smi
// is not available.
func tryNvidiaSmiWindows(ctx context.Context, every time.Duration, logger *logmon.Monitor) (chan []GpuStat, error) {
if _, err := exec.LookPath("nvidia-smi"); err != nil {
return nil, ErrNoGpuTool
}
sec := int(every.Seconds())
if sec < 1 {
sec = 1
}
cmd := exec.CommandContext(ctx, "nvidia-smi",
"--query-gpu=index,name,uuid,temperature.gpu,utilization.gpu,memory.used,memory.total,fan.speed,power.draw",
"--format=csv,noheader,nounits",
"--loop", fmt.Sprintf("%d", sec),
)
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("nvidia-smi stdout pipe failed: %w", err)
}
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("nvidia-smi start failed: %w", err)
}
ch := make(chan []GpuStat, 1)
go func() {
defer close(ch)
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
stat := ParseNvidiaSmiLine(line)
if stat != nil {
select {
case ch <- []GpuStat{*stat}:
default:
}
}
}
cmd.Wait()
}()
return ch, nil
}
func readSysStats() (SysStat, error) {
+124 -75
View File
@@ -424,110 +424,159 @@ func (mp *metricsMonitor) wrapHandler(
return nil
}
// usagePaths lists the JSON paths where a per-event usage object can live.
// v1/chat/completions puts it at top-level "usage"; v1/responses nests under
// "response.usage"; v1/messages emits it at "message.usage" on message_start
// and at "usage" on message_delta.
var usagePaths = []string{"usage", "response.usage", "message.usage"}
// extractUsageTokens reads input/output/cached token counts from a usage
// gjson.Result, handling the field-name differences across endpoints.
// cached returns -1 when the field is absent. ok is true when at least one
// field was present.
func extractUsageTokens(usage gjson.Result) (input, output, cached int64, ok bool) {
cached = -1
if !usage.Exists() {
return
}
if v := usage.Get("prompt_tokens"); v.Exists() {
// v1/chat/completions
input = v.Int()
ok = true
} else if v := usage.Get("input_tokens"); v.Exists() {
// v1/messages, v1/responses
input = v.Int()
ok = true
}
if v := usage.Get("completion_tokens"); v.Exists() {
// v1/chat/completions
output = v.Int()
ok = true
} else if v := usage.Get("output_tokens"); v.Exists() {
// v1/messages, v1/responses
output = v.Int()
ok = true
}
if v := usage.Get("cache_read_input_tokens"); v.Exists() {
// v1/messages (Anthropic)
cached = v.Int()
ok = true
} else if v := usage.Get("input_tokens_details.cached_tokens"); v.Exists() {
// v1/responses (OpenAI Responses API)
cached = v.Int()
ok = true
} else if v := usage.Get("prompt_tokens_details.cached_tokens"); v.Exists() {
// v1/chat/completions (OpenAI cache hits)
cached = v.Int()
ok = true
}
return
}
func processStreamingResponse(modelID string, start time.Time, body []byte) (ActivityLogEntry, error) {
// Iterate **backwards** through the body looking for the data payload with
// usage data. This avoids allocating a slice of all lines via bytes.Split.
// Walk SSE "data:" lines forward, merging usage info from every event.
// Different endpoints split usage across events:
// - v1/chat/completions: usage on the final chunk before [DONE]
// - v1/responses: usage on response.completed (response.usage)
// - v1/messages: input + cache on message_start (message.usage),
// output_tokens on message_delta (usage)
// We take the latest informative value per field so all three are covered.
// Start from the end of the body and scan backwards for newlines
pos := len(body)
for pos > 0 {
// Find the previous newline (or start of body)
lineStart := bytes.LastIndexByte(body[:pos], '\n')
if lineStart == -1 {
lineStart = 0
var (
inputTokens, outputTokens int64
cachedTokens int64 = -1
hasAny bool
timings gjson.Result
)
prefix := []byte("data:")
for offset := 0; offset < len(body); {
nl := bytes.IndexByte(body[offset:], '\n')
var line []byte
if nl == -1 {
line = body[offset:]
offset = len(body)
} else {
lineStart++ // Move past the newline
line = body[offset : offset+nl]
offset += nl + 1
}
line := bytes.TrimSpace(body[lineStart:pos])
pos = lineStart - 1 // Move position before the newline for next iteration
if len(line) == 0 {
continue
}
// SSE payload always follows "data:"
prefix := []byte("data:")
if !bytes.HasPrefix(line, prefix) {
line = bytes.TrimSpace(line)
if len(line) == 0 || !bytes.HasPrefix(line, prefix) {
continue
}
data := bytes.TrimSpace(line[len(prefix):])
if len(data) == 0 {
if len(data) == 0 || bytes.Equal(data, []byte("[DONE]")) {
continue
}
if bytes.Equal(data, []byte("[DONE]")) {
// [DONE] line itself contains nothing of interest.
if !gjson.ValidBytes(data) {
continue
}
parsed := gjson.ParseBytes(data)
if gjson.ValidBytes(data) {
parsed := gjson.ParseBytes(data)
usage := parsed.Get("usage")
timings := parsed.Get("timings")
// v1/responses format nests usage under response.usage
if !usage.Exists() {
usage = parsed.Get("response.usage")
for _, path := range usagePaths {
u := parsed.Get(path)
if !u.Exists() {
continue
}
if usage.Exists() || timings.Exists() {
return parseMetrics(modelID, start, usage, timings)
i, o, c, ok := extractUsageTokens(u)
if !ok {
continue
}
hasAny = true
// Take the latest non-zero value so message_start's input_tokens
// is preserved when message_delta's usage omits it, and vice versa
// for output_tokens.
if i > 0 {
inputTokens = i
}
if o > 0 {
outputTokens = o
}
if c >= 0 {
cachedTokens = c
}
}
if t := parsed.Get("timings"); t.Exists() {
timings = t
hasAny = true
}
}
return ActivityLogEntry{}, fmt.Errorf("no valid JSON data found in stream")
if !hasAny {
return ActivityLogEntry{}, fmt.Errorf("no valid JSON data found in stream")
}
return buildMetrics(modelID, start, inputTokens, outputTokens, cachedTokens, timings), nil
}
func parseMetrics(modelID string, start time.Time, usage, timings gjson.Result) (ActivityLogEntry, error) {
input, output, cached, _ := extractUsageTokens(usage)
return buildMetrics(modelID, start, input, output, cached, timings), nil
}
// buildMetrics composes an ActivityLogEntry from accumulated token counts and
// optional llama-server timings (which override input/output and provide rates).
func buildMetrics(modelID string, start time.Time, inputTokens, outputTokens, cachedTokens int64, timings gjson.Result) ActivityLogEntry {
wallDurationMs := int(time.Since(start).Milliseconds())
// default values
cachedTokens := -1 // unknown or missing data
outputTokens := 0
inputTokens := 0
// timings data
durationMs := wallDurationMs
tokensPerSecond := -1.0
promptPerSecond := -1.0
durationMs := wallDurationMs
if usage.Exists() {
if pt := usage.Get("prompt_tokens"); pt.Exists() {
// v1/chat/completions
inputTokens = int(pt.Int())
} else if it := usage.Get("input_tokens"); it.Exists() {
// v1/messages
inputTokens = int(it.Int())
}
if ct := usage.Get("completion_tokens"); ct.Exists() {
// v1/chat/completions
outputTokens = int(ct.Int())
} else if ot := usage.Get("output_tokens"); ot.Exists() {
outputTokens = int(ot.Int())
}
if ct := usage.Get("cache_read_input_tokens"); ct.Exists() {
cachedTokens = int(ct.Int())
}
}
// use llama-server's timing data for tok/sec and duration as it is more accurate
if timings.Exists() {
inputTokens = int(timings.Get("prompt_n").Int())
outputTokens = int(timings.Get("predicted_n").Int())
inputTokens = timings.Get("prompt_n").Int()
outputTokens = timings.Get("predicted_n").Int()
promptPerSecond = timings.Get("prompt_per_second").Float()
tokensPerSecond = timings.Get("predicted_per_second").Float()
timingsDurationMs := int(timings.Get("prompt_ms").Float() + timings.Get("predicted_ms").Float())
if timingsDurationMs > durationMs {
durationMs = timingsDurationMs
}
if cachedValue := timings.Get("cache_n"); cachedValue.Exists() {
cachedTokens = int(cachedValue.Int())
cachedTokens = cachedValue.Int()
}
}
@@ -535,14 +584,14 @@ func parseMetrics(modelID string, start time.Time, usage, timings gjson.Result)
Timestamp: time.Now(),
Model: modelID,
Tokens: TokenMetrics{
CachedTokens: cachedTokens,
InputTokens: inputTokens,
OutputTokens: outputTokens,
CachedTokens: int(cachedTokens),
InputTokens: int(inputTokens),
OutputTokens: int(outputTokens),
PromptPerSecond: promptPerSecond,
TokensPerSecond: tokensPerSecond,
},
DurationMs: durationMs,
}, nil
}
}
// decompressBody decompresses the body based on Content-Encoding header
+118
View File
@@ -777,6 +777,124 @@ data: [DONE]
assert.Equal(t, 23, metrics[0].Tokens.OutputTokens)
})
t.Run("v1/responses full stream with deltas, output, and cached tokens", func(t *testing.T) {
mm := newMetricsMonitor(testLogger, 10, 0)
// Realistic v1/responses stream: multiple delta events followed by
// done/completed events. Usage lives on response.completed and includes
// the OpenAI Responses cached-token shape (input_tokens_details.cached_tokens).
responseBody := "event: response.created\n" +
`data: {"type":"response.created","response":{"id":"resp_1","status":"in_progress"}}` + "\n\n" +
"event: response.output_item.added\n" +
`data: {"type":"response.output_item.added","item":{"id":"msg_1","role":"assistant","status":"in_progress","type":"message"}}` + "\n\n" +
"event: response.content_part.added\n" +
`data: {"type":"response.content_part.added","item_id":"msg_1","part":{"type":"output_text","text":""}}` + "\n\n" +
"event: response.output_text.delta\n" +
`data: {"type":"response.output_text.delta","item_id":"msg_1","delta":"Hello"}` + "\n\n" +
"event: response.output_text.delta\n" +
`data: {"type":"response.output_text.delta","item_id":"msg_1","delta":" world"}` + "\n\n" +
"event: response.output_text.done\n" +
`data: {"type":"response.output_text.done","item_id":"msg_1","text":"Hello world"}` + "\n\n" +
"event: response.content_part.done\n" +
`data: {"type":"response.content_part.done","item_id":"msg_1","part":{"type":"output_text","text":"Hello world"}}` + "\n\n" +
"event: response.output_item.done\n" +
`data: {"type":"response.output_item.done","item":{"type":"message","status":"completed","id":"msg_1","content":[{"type":"output_text","text":"Hello world"}],"role":"assistant"}}` + "\n\n" +
"event: response.completed\n" +
`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","status":"completed","model":"test-model","output":[{"type":"message","status":"completed","id":"msg_1","content":[{"type":"output_text","text":"Hello world"}],"role":"assistant"}],"usage":{"input_tokens":14,"output_tokens":24,"total_tokens":38,"input_tokens_details":{"cached_tokens":13}}}}` + "\n\n"
nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error {
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
w.Write([]byte(responseBody))
return nil
}
req := httptest.NewRequest("POST", "/v1/responses", nil)
rec := httptest.NewRecorder()
ginCtx, _ := gin.CreateTestContext(rec)
err := mm.wrapHandler("test-model", ginCtx.Writer, req, captureAll, nextHandler)
assert.NoError(t, err)
metrics := mm.getMetrics()
assert.Equal(t, 1, len(metrics))
assert.Equal(t, "test-model", metrics[0].Model)
assert.Equal(t, 14, metrics[0].Tokens.InputTokens)
assert.Equal(t, 24, metrics[0].Tokens.OutputTokens)
assert.Equal(t, 13, metrics[0].Tokens.CachedTokens)
})
t.Run("v1/messages merges usage from message_start and message_delta", func(t *testing.T) {
mm := newMetricsMonitor(testLogger, 10, 0)
// v1/messages splits usage across two events:
// message_start.message.usage has input_tokens + cache_read_input_tokens
// message_delta.usage has the final output_tokens
// Without merging, output_tokens (last seen) would clobber the input fields.
responseBody := "event: message_start\n" +
`data: {"type":"message_start","message":{"id":"m1","type":"message","role":"assistant","content":[],"model":"test-model","usage":{"cache_read_input_tokens":5,"input_tokens":9,"output_tokens":0}}}` + "\n\n" +
"event: content_block_start\n" +
`data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}` + "\n\n" +
"event: content_block_delta\n" +
`data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hi"}}` + "\n\n" +
"event: content_block_delta\n" +
`data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" there"}}` + "\n\n" +
"event: content_block_stop\n" +
`data: {"type":"content_block_stop","index":0}` + "\n\n" +
"event: message_delta\n" +
`data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":24}}` + "\n\n" +
"event: message_stop\n" +
`data: {"type":"message_stop"}` + "\n\n"
nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error {
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
w.Write([]byte(responseBody))
return nil
}
req := httptest.NewRequest("POST", "/v1/messages", nil)
rec := httptest.NewRecorder()
ginCtx, _ := gin.CreateTestContext(rec)
err := mm.wrapHandler("test-model", ginCtx.Writer, req, captureAll, nextHandler)
assert.NoError(t, err)
metrics := mm.getMetrics()
assert.Equal(t, 1, len(metrics))
assert.Equal(t, 9, metrics[0].Tokens.InputTokens)
assert.Equal(t, 24, metrics[0].Tokens.OutputTokens)
assert.Equal(t, 5, metrics[0].Tokens.CachedTokens)
})
t.Run("v1/chat/completions OpenAI prompt_tokens_details.cached_tokens", func(t *testing.T) {
mm := newMetricsMonitor(testLogger, 10, 0)
responseBody := `data: {"choices":[{"delta":{"content":"hi"}}]}` + "\n\n" +
`data: {"choices":[{"delta":{}}],"usage":{"prompt_tokens":50,"completion_tokens":12,"prompt_tokens_details":{"cached_tokens":42}}}` + "\n\n" +
"data: [DONE]\n\n"
nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error {
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
w.Write([]byte(responseBody))
return nil
}
req := httptest.NewRequest("POST", "/v1/chat/completions", nil)
rec := httptest.NewRecorder()
ginCtx, _ := gin.CreateTestContext(rec)
err := mm.wrapHandler("test-model", ginCtx.Writer, req, captureAll, nextHandler)
assert.NoError(t, err)
metrics := mm.getMetrics()
assert.Equal(t, 1, len(metrics))
assert.Equal(t, 50, metrics[0].Tokens.InputTokens)
assert.Equal(t, 12, metrics[0].Tokens.OutputTokens)
assert.Equal(t, 42, metrics[0].Tokens.CachedTokens)
})
t.Run("handles empty streaming response records minimal metrics", func(t *testing.T) {
mm := newMetricsMonitor(testLogger, 10, 0)
+8 -8
View File
@@ -1227,9 +1227,9 @@
}
},
"node_modules/devalue": {
"version": "5.6.4",
"resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.4.tgz",
"integrity": "sha512-Gp6rDldRsFh/7XuouDbxMH3Mx8GMCcgzIb1pDTvNyn8pZGQ22u+Wa+lGV9dQCltFQ7uVw0MhRyb8XDskNFOReA==",
"version": "5.8.1",
"resolved": "https://registry.npmjs.org/devalue/-/devalue-5.8.1.tgz",
"integrity": "sha512-4CXDYRBGqN+57wVJkuXBYmpAVUSg3L6JAQa/DFqm238G73E1wuyc/JhGQJzN7vUf/CMphYau2zXbfWzDR5aTEw==",
"dev": true,
"license": "MIT"
},
@@ -3087,9 +3087,9 @@
}
},
"node_modules/svelte": {
"version": "5.53.11",
"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.53.11.tgz",
"integrity": "sha512-GYmqRjRhJYLQBonfdfGAt28gkfWEShrtXKGXcFGneXi502aBE+I1dJcs/YQriByvP6xqXRz/OdBGC6tfvUQHyQ==",
"version": "5.55.7",
"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.55.7.tgz",
"integrity": "sha512-ymI5ykLPwIHW839E053FQbI1G+jnRFJEw3Kv5Y4njixVWywQBx+NUFpkkKyk5LIb36Fg9DVXSYpqiGekLD0hyw==",
"dev": true,
"license": "MIT",
"dependencies": {
@@ -3102,9 +3102,9 @@
"aria-query": "5.3.1",
"axobject-query": "^4.1.0",
"clsx": "^2.1.1",
"devalue": "^5.6.3",
"devalue": "^5.8.1",
"esm-env": "^1.2.1",
"esrap": "^2.2.2",
"esrap": "^2.2.4",
"is-reference": "^3.0.3",
"locate-character": "^3.0.0",
"magic-string": "^0.30.11",
@@ -11,7 +11,7 @@
const totalRequests = $metrics.length;
const totalInputTokens = $metrics.reduce((sum, m) => sum + m.tokens.input_tokens, 0);
const totalOutputTokens = $metrics.reduce((sum, m) => sum + m.tokens.output_tokens, 0);
const totalCacheTokens = $metrics.reduce((sum, m) => sum + m.tokens.cache_tokens, 0);
const totalCacheTokens = $metrics.reduce((sum, m) => sum + Math.max(0, m.tokens.cache_tokens), 0);
const promptPerSecond = $metrics.filter((m) => m.tokens.prompt_per_second > 0).map((m) => m.tokens.prompt_per_second);
@@ -1,7 +1,7 @@
<script lang="ts">
import { models } from "../../stores/api";
import { persistentStore } from "../../stores/persistent";
import { streamChatCompletion } from "../../lib/chatApi";
import { streamChatCompletion, type Endpoint } from "../../lib/chatApi";
import { playgroundStores } from "../../stores/playgroundActivity";
import type { ChatMessage, ContentPart } from "../../lib/types";
import ChatMessageComponent from "./ChatMessage.svelte";
@@ -11,6 +11,8 @@
const selectedModelStore = persistentStore<string>("playground-selected-model", "");
const systemPromptStore = persistentStore<string>("playground-system-prompt", "");
const temperatureStore = persistentStore<number>("playground-temperature", 0.7);
const endpointStore = persistentStore<Endpoint>("playground-endpoint", "v1/chat/completions");
const maxTokensStore = persistentStore<number>("playground-max-tokens", 4096);
function loadMessages(): ChatMessage[] {
try {
@@ -142,7 +144,7 @@
$selectedModelStore,
apiMessages,
abortController.signal,
{ temperature: $temperatureStore }
{ temperature: $temperatureStore, endpoint: $endpointStore, max_tokens: $maxTokensStore }
);
for await (const chunk of stream) {
@@ -319,6 +321,19 @@
<!-- Settings panel -->
{#if showSettings}
<div class="shrink-0 mb-4 p-4 bg-surface border border-gray-200 dark:border-white/10 rounded">
<div class="mb-4">
<label class="block text-sm font-medium mb-1" for="endpoint">Endpoint</label>
<select
id="endpoint"
class="w-full px-3 py-2 rounded border border-gray-200 dark:border-white/10 bg-card focus:outline-none focus:ring-2 focus:ring-primary"
bind:value={$endpointStore}
disabled={isStreaming}
>
<option value="v1/chat/completions">/v1/chat/completions</option>
<option value="v1/messages">/v1/messages</option>
<option value="v1/responses">/v1/responses</option>
</select>
</div>
<div class="mb-4">
<label class="block text-sm font-medium mb-1" for="system-prompt">System Prompt</label>
<textarea
@@ -330,7 +345,7 @@
disabled={isStreaming}
></textarea>
</div>
<div>
<div class="mb-4">
<label class="block text-sm font-medium mb-1" for="temperature">
Temperature: {$temperatureStore.toFixed(2)}
</label>
@@ -349,6 +364,18 @@
<span>Creative (2)</span>
</div>
</div>
<div>
<label class="block text-sm font-medium mb-1" for="max-tokens">Max Tokens</label>
<input
id="max-tokens"
type="number"
min="1"
class="w-full px-3 py-2 rounded border border-gray-200 dark:border-white/10 bg-card focus:outline-none focus:ring-2 focus:ring-primary"
bind:value={$maxTokensStore}
disabled={isStreaming}
/>
<p class="text-xs text-txtsecondary mt-1">Required for /v1/messages.</p>
</div>
</div>
{/if}
+265 -41
View File
@@ -1,4 +1,6 @@
import type { ChatMessage, ChatCompletionRequest } from "./types";
import type { ChatMessage, ContentPart } from "./types";
export type Endpoint = "v1/chat/completions" | "v1/messages" | "v1/responses";
export interface StreamChunk {
content: string;
@@ -8,9 +10,126 @@ export interface StreamChunk {
export interface ChatOptions {
temperature?: number;
endpoint?: Endpoint;
max_tokens?: number;
}
function parseSSELine(line: string): StreamChunk | null {
function parseDataUrl(url: string): { media_type: string; data: string } {
const match = /^data:([^;]+);base64,(.*)$/i.exec(url);
if (!match) {
throw new Error("Image is not a base64 data URL");
}
return { media_type: match[1], data: match[2] };
}
function splitSystemMessages(messages: ChatMessage[]): { system: string; rest: ChatMessage[] } {
const systemParts: string[] = [];
const rest: ChatMessage[] = [];
for (const msg of messages) {
if (msg.role === "system") {
if (typeof msg.content === "string") {
systemParts.push(msg.content);
} else {
for (const part of msg.content) {
if (part.type === "text") systemParts.push(part.text);
}
}
} else {
rest.push(msg);
}
}
return { system: systemParts.join("\n\n"), rest };
}
function buildChatCompletionsBody(model: string, messages: ChatMessage[], options?: ChatOptions): object {
return {
model,
messages: messages.map((m) => ({
role: m.role,
content: m.content,
})),
stream: true,
temperature: options?.temperature,
...(options?.max_tokens ? { max_tokens: options.max_tokens } : {}),
};
}
function buildMessagesBody(model: string, messages: ChatMessage[], options?: ChatOptions): object {
const { system, rest } = splitSystemMessages(messages);
const mapped = rest.map((m) => {
if (typeof m.content === "string") {
return { role: m.role, content: m.content };
}
const blocks: object[] = [];
for (const part of m.content as ContentPart[]) {
if (part.type === "text") {
blocks.push({ type: "text", text: part.text });
} else if (m.role !== "assistant") {
const { media_type, data } = parseDataUrl(part.image_url.url);
blocks.push({ type: "image", source: { type: "base64", media_type, data } });
}
}
return { role: m.role, content: blocks };
});
const body: Record<string, unknown> = {
model,
messages: mapped,
stream: true,
max_tokens: options?.max_tokens ?? 4096,
};
if (system) body.system = system;
if (options?.temperature !== undefined) body.temperature = options.temperature;
return body;
}
function buildResponsesBody(model: string, messages: ChatMessage[], options?: ChatOptions): object {
const { system, rest } = splitSystemMessages(messages);
const input = rest.map((m) => {
const isAssistant = m.role === "assistant";
if (typeof m.content === "string") {
const partType = isAssistant ? "output_text" : "input_text";
return { role: m.role, content: [{ type: partType, text: m.content }] };
}
const content = m.content.map((part: ContentPart) => {
if (part.type === "text") {
return { type: isAssistant ? "output_text" : "input_text", text: part.text };
}
return { type: "input_image", image_url: part.image_url.url };
});
return { role: m.role, content };
});
const body: Record<string, unknown> = {
model,
input,
stream: true,
};
if (system) body.instructions = system;
if (options?.temperature !== undefined) body.temperature = options.temperature;
if (options?.max_tokens) body.max_output_tokens = options.max_tokens;
return body;
}
function buildRequest(
endpoint: Endpoint,
model: string,
messages: ChatMessage[],
options?: ChatOptions
): { url: string; body: object } {
const url = "/" + endpoint;
switch (endpoint) {
case "v1/messages":
return { url, body: buildMessagesBody(model, messages, options) };
case "v1/responses":
return { url, body: buildResponsesBody(model, messages, options) };
case "v1/chat/completions":
default:
return { url, body: buildChatCompletionsBody(model, messages, options) };
}
}
function parseChatCompletionsLine(line: string): StreamChunk | null {
const trimmed = line.trim();
if (!trimmed || !trimmed.startsWith("data: ")) {
return null;
@@ -36,25 +155,158 @@ function parseSSELine(line: string): StreamChunk | null {
}
}
async function* parseChatCompletionsStream(
reader: ReadableStreamDefaultReader<Uint8Array>
): AsyncGenerator<StreamChunk> {
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
const result = parseChatCompletionsLine(line);
if (result?.done) {
yield result;
return;
}
if (result) {
yield result;
}
}
}
const result = parseChatCompletionsLine(buffer);
if (result && !result.done) {
yield result;
}
}
function parseSSEEventBlock(block: string): { event: string; data: string } | null {
let event = "";
const dataLines: string[] = [];
for (const rawLine of block.split("\n")) {
const line = rawLine.replace(/\r$/, "");
if (!line || line.startsWith(":")) continue;
if (line.startsWith("event:")) {
event = line.slice(6).trim();
} else if (line.startsWith("data:")) {
dataLines.push(line.slice(5).trim());
}
}
if (dataLines.length === 0 && !event) return null;
return { event, data: dataLines.join("\n") };
}
async function* parseMessagesStream(
reader: ReadableStreamDefaultReader<Uint8Array>
): AsyncGenerator<StreamChunk> {
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const blocks = buffer.split("\n\n");
buffer = blocks.pop() || "";
for (const block of blocks) {
const parsed = parseSSEEventBlock(block);
if (!parsed) continue;
if (parsed.event === "message_stop") {
yield { content: "", done: true };
return;
}
if (parsed.event !== "content_block_delta" || !parsed.data) continue;
try {
const json = JSON.parse(parsed.data);
const delta = json.delta;
if (!delta) continue;
if (delta.type === "text_delta" && delta.text) {
yield { content: delta.text, done: false };
} else if (delta.type === "thinking_delta" && delta.thinking) {
yield { content: "", reasoning_content: delta.thinking, done: false };
}
} catch {
// ignore malformed event
}
}
}
}
async function* parseResponsesStream(
reader: ReadableStreamDefaultReader<Uint8Array>
): AsyncGenerator<StreamChunk> {
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const blocks = buffer.split("\n\n");
buffer = blocks.pop() || "";
for (const block of blocks) {
const parsed = parseSSEEventBlock(block);
if (!parsed) continue;
if (parsed.event === "response.completed") {
yield { content: "", done: true };
return;
}
if (!parsed.data) continue;
try {
const json = JSON.parse(parsed.data);
if (parsed.event === "response.output_text.delta" && json.delta) {
yield { content: json.delta, done: false };
} else if (parsed.event === "response.reasoning_summary_text.delta" && json.delta) {
yield { content: "", reasoning_content: json.delta, done: false };
}
} catch {
// ignore malformed event
}
}
}
}
function parseStream(
endpoint: Endpoint,
reader: ReadableStreamDefaultReader<Uint8Array>
): AsyncGenerator<StreamChunk> {
switch (endpoint) {
case "v1/messages":
return parseMessagesStream(reader);
case "v1/responses":
return parseResponsesStream(reader);
case "v1/chat/completions":
default:
return parseChatCompletionsStream(reader);
}
}
export async function* streamChatCompletion(
model: string,
messages: ChatMessage[],
signal?: AbortSignal,
options?: ChatOptions
): AsyncGenerator<StreamChunk> {
const request: ChatCompletionRequest = {
model,
messages,
stream: true,
temperature: options?.temperature,
};
const endpoint = options?.endpoint ?? "v1/chat/completions";
const { url, body } = buildRequest(endpoint, model, messages, options);
const response = await fetch("/v1/chat/completions", {
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(request),
body: JSON.stringify(body),
signal,
});
@@ -68,39 +320,11 @@ export async function* streamChatCompletion(
throw new Error("Response body is not readable");
}
const decoder = new TextDecoder();
let buffer = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
const result = parseSSELine(line);
if (result?.done) {
yield result;
return;
}
if (result) {
yield result;
}
}
for await (const chunk of parseStream(endpoint, reader)) {
yield chunk;
if (chunk.done) return;
}
// Process any remaining buffer
const result = parseSSELine(buffer);
if (result && !result.done) {
yield result;
}
yield { content: "", done: true };
} finally {
reader.releaseLock();
+4 -1
View File
@@ -400,7 +400,10 @@
</div>
</div>
<p class="text-sm text-txtsecondary">
This is an experimental feature. Please see <a class="underline hover:text-txtmain" href="https://github.com/mostlygeek/llama-swap/issues/596">issue 596</a> for instructions.
This is an experimental feature. Please use <a
class="underline hover:text-txtmain"
href="https://github.com/mostlygeek/llama-swap/discussions/771">discussion #711</a
> for instructions and to share feedback.
</p>
<!-- GPU Section -->