Compare commits

...

9 Commits

Author SHA1 Message Date
Benson Wong 565c44766d config,proxy: add new configuration logToStdout (#432)
The new logToStdout option controls what is logged to stdout. The
default has been changed to just the proxy logs, which contain swap and
http request logs.

There are four supported settings: none, proxy, upstream, both. The
"both" setting is the legacy setting where everything was spewed to
stdout.
2025-12-21 22:23:31 -08:00
Benson Wong e6a9e210ba proxy: fix path bug in /logs/stream/{model_id} (#431)
A {model_id} containing a forward slash trips up gin's path param
parsing. This updates /logs/stream to work like /upstream where the
model_id is built up in parts and searched for in the configuration.

Updates #421
2025-12-21 21:47:14 -08:00
Benson Wong d3f329f924 proxy: Improve logging performance and allow separate log streaming (#421)
Replace container/ring.Ring with a custom circularBuffer that uses a
single contiguous []byte slice. This fixes the original implementation
which created 10,240 ring elements instead of 10KB of storage.

GetHistory is now 139x faster (145μs → 1μs) and uses 117x less memory
(1.2MB → 10KB). Allocations reduced from 2 to 1 per write operation.

Create a LogMonitor per proxy.Process, replacing the usage
of a shared one. The buffer in LogMonitor is lazy allocated on the first
call to Write and freed when the Process is stopped. This reduces
unnecessary memory usage when a model is not active.

The /logs/stream/{model_id} endpoint was added to stream logs from a
specific process.
2025-12-18 21:49:25 -08:00
Benson Wong 98879b38c1 docker: add /app to $PATH (#424)
Make it so llama-server can be called directly instead of with the full
path at /app/llama-server.

Fixes #423
Ref: #233
2025-12-06 22:58:29 -08:00
Benson Wong 7b3b0f5eae move header images around [skip ci] 2025-12-02 19:40:42 -08:00
Benson Wong 021ccceef1 README: update hero image 2025-12-02 19:37:03 -08:00
Benson Wong f03871c50a Update README.md
- add supported anthropic API 
- add example for docker hot reload support
2025-12-02 19:03:01 -08:00
Ryan Steed dc00d17abe docs: add documentation for non-root container images and security considerations (#416)
* docs: add documentation for non-root container images and security considerations
* docs: move container security section to dedicated file and update README links
2025-12-02 08:52:26 -08:00
Benson Wong dea98733c3 proxy: extract metrics for v1/messages (#419) 2025-11-29 23:51:20 -08:00
20 changed files with 647 additions and 115 deletions
+27 -12
View File
@@ -1,4 +1,4 @@
![llama-swap header image](header2.png)
![llama-swap header image](docs/assets/hero3.webp)
![GitHub Downloads (all assets, all releases)](https://img.shields.io/github/downloads/mostlygeek/llama-swap/total)
![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/mostlygeek/llama-swap/go-ci.yml)
![GitHub Repo stars](https://img.shields.io/github/stars/mostlygeek/llama-swap)
@@ -13,7 +13,7 @@ Built in Go for performance and simplicity, llama-swap has zero dependencies and
- ✅ Easy to deploy and configure: one binary, one configuration file. no external dependencies
- ✅ On-demand model switching
- ✅ Use any local OpenAI compatible server (llama.cpp, vllm, tabbyAPI, etc)
- ✅ Use any local OpenAI compatible server (llama.cpp, vllm, tabbyAPI, etc.)
- future proof, upgrade your inference servers at any time.
- ✅ OpenAI API supported endpoints:
- `v1/completions`
@@ -21,6 +21,8 @@ Built in Go for performance and simplicity, llama-swap has zero dependencies and
- `v1/embeddings`
- `v1/audio/speech` ([#36](https://github.com/mostlygeek/llama-swap/issues/36))
- `v1/audio/transcriptions` ([docs](https://github.com/mostlygeek/llama-swap/issues/41#issuecomment-2722637867))
- ✅ Anthropic API supported endpoints:
- `v1/messages`
- ✅ llama-server (llama.cpp) supported endpoints
- `v1/rerank`, `v1/reranking`, `/rerank`
- `/infill` - for code infilling
@@ -44,7 +46,6 @@ llama-swap includes a real time web interface for monitoring logs and controllin
<img width="1164" height="745" alt="image" src="https://github.com/user-attachments/assets/bacf3f9d-819f-430b-9ed2-1bfaa8d54579" />
The Activity Page shows recent requests:
<img width="1360" height="963" alt="image" src="https://github.com/user-attachments/assets/5f3edee6-d03a-4ae5-ae06-b20ac1f135bd" />
@@ -61,7 +62,7 @@ llama-swap can be installed in multiple ways
### Docker Install ([download images](https://github.com/mostlygeek/llama-swap/pkgs/container/llama-swap))
Nightly container images with llama-swap and llama-server are built for multiple platforms (cuda, vulkan, intel, etc).
Nightly container images with llama-swap and llama-server are built for multiple platforms (cuda, vulkan, intel, etc.) including [non-root variants with improved security](docs/container-security.md).
```shell
$ docker pull ghcr.io/mostlygeek/llama-swap:cuda
@@ -71,6 +72,14 @@ $ docker run -it --rm --runtime nvidia -p 9292:8080 \
-v /path/to/models:/models \
-v /path/to/custom/config.yaml:/app/config.yaml \
ghcr.io/mostlygeek/llama-swap:cuda
# configuration hot reload supported with a
# directory volume mount
$ docker run -it --rm --runtime nvidia -p 9292:8080 \
-v /path/to/models:/models \
-v /path/to/custom/config.yaml:/app/config.yaml \
-v /path/to/config:/config \
ghcr.io/mostlygeek/llama-swap:cuda -config /config/config.yaml -watch-config
```
<details>
@@ -89,6 +98,9 @@ docker pull ghcr.io/mostlygeek/llama-swap:musa
# tagged llama-swap, platform and llama-server version images
docker pull ghcr.io/mostlygeek/llama-swap:v166-cuda-b6795
# non-root cuda
docker pull ghcr.io/mostlygeek/llama-swap:cuda-non-root
```
</details>
@@ -191,23 +203,26 @@ As a safeguard, llama-swap also sets `X-Accel-Buffering: no` on SSE responses. H
## Monitoring Logs on the CLI
```shell
```sh
# sends up to the last 10KB of logs
curl http://host/logs'
$ curl http://host/logs
# streams combined logs
curl -Ns 'http://host/logs/stream'
curl -Ns http://host/logs/stream
# just llama-swap's logs
curl -Ns 'http://host/logs/stream/proxy'
# stream llama-swap's proxy status logs
curl -Ns http://host/logs/stream/proxy
# just upstream's logs
curl -Ns 'http://host/logs/stream/upstream'
# stream logs from upstream processes that llama-swap loads
curl -Ns http://host/logs/stream/upstream
# stream logs only from a specific model
curl -Ns http://host/logs/stream/{model_id}
# stream and filter logs with linux pipes
curl -Ns http://host/logs/stream | grep 'eval time'
# skips history and just streams new log entries
# appending ?no-history will disable sending buffered history first
curl -Ns 'http://host/logs/stream?no-history'
```
@@ -0,0 +1,85 @@
# Replace ring.Ring with Efficient Circular Byte Buffer
## Overview
Replace the inefficient `container/ring.Ring` implementation in `logMonitor.go` with a simple circular byte buffer that uses a single contiguous `[]byte` slice. This eliminates per-write allocations, improves cache locality, and correctly implements a 10KB buffer.
## Current Issues
1. `ring.New(10 * 1024)` creates 10,240 ring **elements**, not 10KB of storage
2. Every `Write()` call allocates a new `[]byte` slice inside the lock
3. `GetHistory()` iterates all 10,240 elements and appends repeatedly (geometric reallocs)
4. Linked list structure has poor cache locality and pointer overhead
## Design Requirements
### New CircularBuffer Type
Create a simple circular byte buffer with:
- Single pre-allocated `[]byte` of fixed capacity (10KB)
- `head` and `size` integers to track write position and data length
- No per-write allocations
### API Requirements
The new buffer must support:
1. **Write(p []byte)** - Append bytes, overwriting oldest data when full
2. **GetHistory() []byte** - Return all buffered data in correct order (oldest to newest)
### Implementation Details
```go
type circularBuffer struct {
data []byte // pre-allocated capacity
head int // next write position
size int // current number of bytes stored (0 to cap)
}
```
**Write logic:**
- If `len(p) >= capacity`: just keep the last `capacity` bytes
- Otherwise: write bytes at `head`, wrapping around if needed
- Update `head` and `size` accordingly
- Data is copied into the internal buffer (not stored by reference)
**GetHistory logic:**
- Calculate start position: `(head - size + cap) % cap`
- If not wrapped: single slice copy
- If wrapped: two copies (end of buffer + beginning)
- Returns a **new slice** (copy), not a view into internal buffer
### Immutability Guarantees (must preserve)
Per existing tests:
1. Modifying input `[]byte` after `Write()` must not affect stored data
2. `GetHistory()` returns independent copy - modifications don't affect buffer
## Files to Modify
- `proxy/logMonitor.go` - Replace `buffer *ring.Ring` with new circular buffer
## Testing Plan
Existing tests in `logMonitor_test.go` should continue to pass:
- `TestLogMonitor` - Basic write/read and subscriber notification
- `TestWrite_ImmutableBuffer` - Verify writes don't affect returned history
- `TestWrite_LogTimeFormat` - Timestamp formatting
Add new tests:
- Test buffer wrap-around behavior
- Test large writes that exceed buffer capacity
- Test exact capacity boundary conditions
## Checklist
- [ ] Create `circularBuffer` struct in `logMonitor.go`
- [ ] Implement `Write()` method for circular buffer
- [ ] Implement `GetHistory()` method for circular buffer
- [ ] Update `LogMonitor` struct to use new buffer
- [ ] Update `NewLogMonitorWriter()` to initialize new buffer
- [ ] Update `LogMonitor.Write()` to use new buffer
- [ ] Update `LogMonitor.GetHistory()` to use new buffer
- [ ] Remove `"container/ring"` import
- [ ] Run `make test-dev` to verify existing tests pass
- [ ] Add wrap-around test case
- [ ] Run `make test-all` for final validation
+10
View File
@@ -34,6 +34,16 @@ logLevel: info
# - For more info, read: https://pkg.go.dev/time#pkg-constants
logTimeFormat: ""
# logToStdout: controls what is logged to stdout
# - optional, default: "proxy"
# - valid values:
# - "proxy": logs generated by llama-swap when swapping models,
# handling requests, etc.
# - "upstream": a copy of an upstream processes stdout logs
# - "both": both the proxy and upstream logs interleaved together
# - "none": no logs are ever written to stdout
logToStdout: "proxy"
# metricsMaxInMemory: maximum number of metrics to keep in memory
# - optional, default: 1000
# - controls how many metrics are stored in memory before older ones are discarded
+4
View File
@@ -29,6 +29,10 @@ RUN chown --recursive $UID:$GID $HOME /app
USER $UID:$GID
WORKDIR /app
# Add /app to PATH
ENV PATH="/app:${PATH}"
RUN \
curl -LO "https://github.com/${LS_REPO}/releases/download/v${LS_VER}/llama-swap_${LS_VER}_linux_amd64.tar.gz" && \
tar -zxf "llama-swap_${LS_VER}_linux_amd64.tar.gz" && \

Before

Width:  |  Height:  |  Size: 261 KiB

After

Width:  |  Height:  |  Size: 261 KiB

Before

Width:  |  Height:  |  Size: 351 KiB

After

Width:  |  Height:  |  Size: 351 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 198 KiB

+39
View File
@@ -89,6 +89,9 @@ llama-swap supports many more features to customize how you want to manage your
> This is a copy of `config.example.yaml`. Always check that for the most up to date examples.
```yaml
# add this modeline for validation in vscode
# yaml-language-server: $schema=https://raw.githubusercontent.com/mostlygeek/llama-swap/refs/heads/main/config-schema.json
#
# llama-swap YAML configuration example
# -------------------------------------
#
@@ -114,6 +117,24 @@ healthCheckTimeout: 500
# - Valid log levels: debug, info, warn, error
logLevel: info
# logTimeFormat: enables and sets the logging timestamp format
# - optional, default (disabled): ""
# - Valid values: "", "ansic", "unixdate", "rubydate", "rfc822", "rfc822z",
# "rfc850", "rfc1123", "rfc1123z", "rfc3339", "rfc3339nano", "kitchen",
# "stamp", "stampmilli", "stampmicro", and "stampnano".
# - For more info, read: https://pkg.go.dev/time#pkg-constants
logTimeFormat: ""
# logToStdout: controls what is logged to stdout
# - optional, default: "proxy"
# - valid values:
# - "proxy": logs generated by llama-swap when swapping models,
# handling requests, etc.
# - "upstream": a copy of an upstream processes stdout logs
# - "both": both the proxy and upstream logs interleaved together
# - "none": no logs are ever written to stdout
logToStdout: "proxy"
# metricsMaxInMemory: maximum number of metrics to keep in memory
# - optional, default: 1000
# - controls how many metrics are stored in memory before older ones are discarded
@@ -126,6 +147,20 @@ metricsMaxInMemory: 1000
# - it is automatically incremented for every model that uses it
startPort: 10001
# sendLoadingState: inject loading status updates into the reasoning (thinking)
# field
# - optional, default: false
# - when true, a stream of loading messages will be sent to the client in the
# reasoning field so chat UIs can show that loading is in progress.
# - see #366 for more details
sendLoadingState: true
# includeAliasesInList: present aliases within the /v1/models OpenAI API listing
# - optional, default: false
# - when true, model aliases will be output to the API model listing duplicating
# all fields except for Id so chat UIs can use the alias equivalent to the original.
includeAliasesInList: false
# macros: a dictionary of string substitutions
# - optional, default: empty dictionary
# - macros are reusable snippets
@@ -274,6 +309,10 @@ models:
# - recommended to be omitted and the default used
concurrencyLimit: 0
# sendLoadingState: overrides the global sendLoadingState setting for this model
# - optional, default: undefined (use global setting)
sendLoadingState: false
# Unlisted model example:
"qwen-unlisted":
# unlisted: boolean, true or false
+9
View File
@@ -0,0 +1,9 @@
## Container Security
For convenience, the default container images use the **root** user within the container. This permits simplified access to host resources including volume mounts and hardware devices under `/dev/dri` (_for Vulkan support_). But this can widen the attack surface to privilege escalation exploits.
Alternative images, tagged as `non-root`, are also available. For example, `llama-swap:cpu-non-root` uses the unprivileged **app** user by default. Depending on deployment requirements, additional configuration may be necessary to ensure that the container retains access to required hosts resources. This might entail customizing host filesystem permissions/ownership appropriately or injecting host group membership into the container.
Docker offers a [system-wide option enabling user namespace remapping](https://docs.docker.com/engine/security/userns-remap/) to accommodate situations were a **root** container user is required but also mentions that _"The best way to prevent privilege-escalation attacks from within a container is to configure your container's applications to run as unprivileged users."_ Podman offers similar capability, per-container, to [set UID/GID mapping in a new user namespace](https://docs.podman.io/en/latest/markdown/podman-run.1.html#set-uid-gid-mapping-in-a-new-user-namespace).
The Large Language Model (_LLM/AI_) ecosystem is rapidly evolving and [serious security vulnerabilities have surfaced in the past](https://huggingface.co/docs/hub/security-pickle). These alternative _non-root_ images could reduce the impact of future unknown problems. However, proper planning and configuration is recommended to utilize them.
+14
View File
@@ -15,6 +15,12 @@ import (
)
const DEFAULT_GROUP_ID = "(default)"
const (
LogToStdoutProxy = "proxy"
LogToStdoutUpstream = "upstream"
LogToStdoutBoth = "both"
LogToStdoutNone = "none"
)
type MacroEntry struct {
Name string
@@ -114,6 +120,7 @@ type Config struct {
LogRequests bool `yaml:"logRequests"`
LogLevel string `yaml:"logLevel"`
LogTimeFormat string `yaml:"logTimeFormat"`
LogToStdout string `yaml:"logToStdout"`
MetricsMaxInMemory int `yaml:"metricsMaxInMemory"`
Models map[string]ModelConfig `yaml:"models"` /* key is model ID */
Profiles map[string][]string `yaml:"profiles"`
@@ -177,6 +184,7 @@ func LoadConfigFromReader(r io.Reader) (Config, error) {
StartPort: 5800,
LogLevel: "info",
LogTimeFormat: "",
LogToStdout: LogToStdoutProxy,
MetricsMaxInMemory: 1000,
}
err = yaml.Unmarshal(data, &config)
@@ -193,6 +201,12 @@ func LoadConfigFromReader(r io.Reader) (Config, error) {
return Config{}, fmt.Errorf("startPort must be greater than 1")
}
switch config.LogToStdout {
case LogToStdoutProxy, LogToStdoutUpstream, LogToStdoutBoth, LogToStdoutNone:
default:
return Config{}, fmt.Errorf("logToStdout must be one of: proxy, upstream, both, none")
}
// Populate the aliases map
config.aliases = make(map[string]string)
for modelName, modelConfig := range config.Models {
+1
View File
@@ -166,6 +166,7 @@ groups:
expected := Config{
LogLevel: "info",
LogTimeFormat: "",
LogToStdout: LogToStdoutProxy,
StartPort: 5800,
Macros: MacroList{
{"svr-path", "path/to/server"},
+1
View File
@@ -158,6 +158,7 @@ groups:
expected := Config{
LogLevel: "info",
LogTimeFormat: "",
LogToStdout: LogToStdoutProxy,
StartPort: 5800,
Macros: MacroList{
{"svr-path", "path/to/server"},
+101 -16
View File
@@ -1,7 +1,6 @@
package proxy
import (
"container/ring"
"context"
"fmt"
"io"
@@ -12,6 +11,85 @@ import (
"github.com/mostlygeek/llama-swap/event"
)
// circularBuffer is a fixed-size circular byte buffer that overwrites
// oldest data when full. It provides O(1) writes and O(n) reads.
type circularBuffer struct {
data []byte // pre-allocated capacity
head int // next write position
size int // current number of bytes stored (0 to cap)
}
func newCircularBuffer(capacity int) *circularBuffer {
return &circularBuffer{
data: make([]byte, capacity),
head: 0,
size: 0,
}
}
// Write appends bytes to the buffer, overwriting oldest data when full.
// Data is copied into the internal buffer (not stored by reference).
func (cb *circularBuffer) Write(p []byte) {
if len(p) == 0 {
return
}
cap := len(cb.data)
// If input is larger than capacity, only keep the last cap bytes
if len(p) >= cap {
copy(cb.data, p[len(p)-cap:])
cb.head = 0
cb.size = cap
return
}
// Calculate how much space is available from head to end of buffer
firstPart := cap - cb.head
if firstPart >= len(p) {
// All data fits without wrapping
copy(cb.data[cb.head:], p)
cb.head = (cb.head + len(p)) % cap
} else {
// Data wraps around
copy(cb.data[cb.head:], p[:firstPart])
copy(cb.data[:len(p)-firstPart], p[firstPart:])
cb.head = len(p) - firstPart
}
// Update size
cb.size += len(p)
if cb.size > cap {
cb.size = cap
}
}
// GetHistory returns all buffered data in correct order (oldest to newest).
// Returns a new slice (copy), not a view into internal buffer.
func (cb *circularBuffer) GetHistory() []byte {
if cb.size == 0 {
return nil
}
result := make([]byte, cb.size)
cap := len(cb.data)
// Calculate start position (oldest data)
start := (cb.head - cb.size + cap) % cap
if start+cb.size <= cap {
// Data is contiguous, single copy
copy(result, cb.data[start:start+cb.size])
} else {
// Data wraps around, two copies
firstPart := cap - start
copy(result[:firstPart], cb.data[start:])
copy(result[firstPart:], cb.data[:cb.size-firstPart])
}
return result
}
type LogLevel int
const (
@@ -19,12 +97,14 @@ const (
LevelInfo
LevelWarn
LevelError
LogBufferSize = 100 * 1024
)
type LogMonitor struct {
eventbus *event.Dispatcher
mu sync.RWMutex
buffer *ring.Ring
buffer *circularBuffer
bufferMu sync.RWMutex
// typically this can be os.Stdout
@@ -45,7 +125,7 @@ func NewLogMonitor() *LogMonitor {
func NewLogMonitorWriter(stdout io.Writer) *LogMonitor {
return &LogMonitor{
eventbus: event.NewDispatcherConfig(1000),
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
buffer: nil, // lazy initialized on first Write
stdout: stdout,
level: LevelInfo,
prefix: "",
@@ -64,12 +144,15 @@ func (w *LogMonitor) Write(p []byte) (n int, err error) {
}
w.bufferMu.Lock()
bufferCopy := make([]byte, len(p))
copy(bufferCopy, p)
w.buffer.Value = bufferCopy
w.buffer = w.buffer.Next()
if w.buffer == nil {
w.buffer = newCircularBuffer(LogBufferSize)
}
w.buffer.Write(p)
w.bufferMu.Unlock()
// Make a copy for broadcast to preserve immutability
bufferCopy := make([]byte, len(p))
copy(bufferCopy, p)
w.broadcast(bufferCopy)
return n, nil
}
@@ -77,16 +160,18 @@ func (w *LogMonitor) Write(p []byte) (n int, err error) {
func (w *LogMonitor) GetHistory() []byte {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()
if w.buffer == nil {
return nil
}
return w.buffer.GetHistory()
}
var history []byte
w.buffer.Do(func(p any) {
if p != nil {
if content, ok := p.([]byte); ok {
history = append(history, content...)
}
}
})
return history
// Clear releases the buffer memory, making it eligible for GC.
// The buffer will be lazily re-allocated on the next Write.
func (w *LogMonitor) Clear() {
w.bufferMu.Lock()
w.buffer = nil
w.bufferMu.Unlock()
}
func (w *LogMonitor) OnLogData(callback func(data []byte)) context.CancelFunc {
+201
View File
@@ -113,3 +113,204 @@ func TestWrite_LogTimeFormat(t *testing.T) {
t.Fatalf("Cannot find timestamp: %v", err)
}
}
func TestCircularBuffer_WrapAround(t *testing.T) {
// Create a small buffer to test wrap-around
cb := newCircularBuffer(10)
// Write "hello" (5 bytes)
cb.Write([]byte("hello"))
if got := string(cb.GetHistory()); got != "hello" {
t.Errorf("Expected 'hello', got %q", got)
}
// Write "world" (5 bytes) - buffer now full
cb.Write([]byte("world"))
if got := string(cb.GetHistory()); got != "helloworld" {
t.Errorf("Expected 'helloworld', got %q", got)
}
// Write "12345" (5 bytes) - should overwrite "hello"
cb.Write([]byte("12345"))
if got := string(cb.GetHistory()); got != "world12345" {
t.Errorf("Expected 'world12345', got %q", got)
}
// Write data larger than buffer capacity
cb.Write([]byte("abcdefghijklmnop")) // 16 bytes, only last 10 kept
if got := string(cb.GetHistory()); got != "ghijklmnop" {
t.Errorf("Expected 'ghijklmnop', got %q", got)
}
}
func TestCircularBuffer_BoundaryConditions(t *testing.T) {
// Test empty buffer
cb := newCircularBuffer(10)
if got := cb.GetHistory(); got != nil {
t.Errorf("Expected nil for empty buffer, got %q", got)
}
// Test exact capacity
cb.Write([]byte("1234567890"))
if got := string(cb.GetHistory()); got != "1234567890" {
t.Errorf("Expected '1234567890', got %q", got)
}
// Test write exactly at capacity boundary
cb = newCircularBuffer(10)
cb.Write([]byte("12345"))
cb.Write([]byte("67890"))
if got := string(cb.GetHistory()); got != "1234567890" {
t.Errorf("Expected '1234567890', got %q", got)
}
}
func TestLogMonitor_LazyInit(t *testing.T) {
lm := NewLogMonitorWriter(io.Discard)
// Buffer should be nil before any writes
if lm.buffer != nil {
t.Error("Expected buffer to be nil before first write")
}
// GetHistory should return nil when buffer is nil
if got := lm.GetHistory(); got != nil {
t.Errorf("Expected nil history before first write, got %q", got)
}
// Write should lazily initialize the buffer
lm.Write([]byte("test"))
if lm.buffer == nil {
t.Error("Expected buffer to be initialized after write")
}
if got := string(lm.GetHistory()); got != "test" {
t.Errorf("Expected 'test', got %q", got)
}
}
func TestLogMonitor_Clear(t *testing.T) {
lm := NewLogMonitorWriter(io.Discard)
// Write some data
lm.Write([]byte("hello"))
if got := string(lm.GetHistory()); got != "hello" {
t.Errorf("Expected 'hello', got %q", got)
}
// Clear should release the buffer
lm.Clear()
if lm.buffer != nil {
t.Error("Expected buffer to be nil after Clear")
}
if got := lm.GetHistory(); got != nil {
t.Errorf("Expected nil history after Clear, got %q", got)
}
}
func TestLogMonitor_ClearAndReuse(t *testing.T) {
lm := NewLogMonitorWriter(io.Discard)
// Write, clear, then write again
lm.Write([]byte("first"))
lm.Clear()
lm.Write([]byte("second"))
if got := string(lm.GetHistory()); got != "second" {
t.Errorf("Expected 'second' after clear and reuse, got %q", got)
}
}
func BenchmarkLogMonitorWrite(b *testing.B) {
// Test data of varying sizes
smallMsg := []byte("small message\n")
mediumMsg := []byte(strings.Repeat("medium message content ", 10) + "\n")
largeMsg := []byte(strings.Repeat("large message content for benchmarking ", 100) + "\n")
b.Run("SmallWrite", func(b *testing.B) {
lm := NewLogMonitorWriter(io.Discard)
b.ResetTimer()
for i := 0; i < b.N; i++ {
lm.Write(smallMsg)
}
})
b.Run("MediumWrite", func(b *testing.B) {
lm := NewLogMonitorWriter(io.Discard)
b.ResetTimer()
for i := 0; i < b.N; i++ {
lm.Write(mediumMsg)
}
})
b.Run("LargeWrite", func(b *testing.B) {
lm := NewLogMonitorWriter(io.Discard)
b.ResetTimer()
for i := 0; i < b.N; i++ {
lm.Write(largeMsg)
}
})
b.Run("WithSubscribers", func(b *testing.B) {
lm := NewLogMonitorWriter(io.Discard)
// Add some subscribers
for i := 0; i < 5; i++ {
lm.OnLogData(func(data []byte) {})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
lm.Write(mediumMsg)
}
})
b.Run("GetHistory", func(b *testing.B) {
lm := NewLogMonitorWriter(io.Discard)
// Pre-populate with data
for i := 0; i < 1000; i++ {
lm.Write(mediumMsg)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
lm.GetHistory()
}
})
}
/*
Benchmark Results - MBP M1 Pro
Before (ring.Ring):
| Benchmark | ns/op | bytes/op | allocs/op |
|---------------------------------|------------|----------|-----------|
| SmallWrite (14B) | 43 ns | 40 B | 2 |
| MediumWrite (241B) | 76 ns | 264 B | 2 |
| LargeWrite (4KB) | 504 ns | 4,120 B | 2 |
| WithSubscribers (5 subs) | 355 ns | 264 B | 2 |
| GetHistory (after 1000 writes) | 145,000 ns | 1.2 MB | 22 |
After (circularBuffer 10KB):
| Benchmark | ns/op | bytes/op | allocs/op |
|---------------------------------|------------|----------|-----------|
| SmallWrite (14B) | 26 ns | 16 B | 1 |
| MediumWrite (241B) | 67 ns | 240 B | 1 |
| LargeWrite (4KB) | 774 ns | 4,096 B | 1 |
| WithSubscribers (5 subs) | 325 ns | 240 B | 1 |
| GetHistory (after 1000 writes) | 1,042 ns | 10,240 B | 1 |
After (circularBuffer 100KB):
| Benchmark | ns/op | bytes/op | allocs/op |
|---------------------------------|------------|-----------|-----------|
| SmallWrite (14B) | 26 ns | 16 B | 1 |
| MediumWrite (241B) | 66 ns | 240 B | 1 |
| LargeWrite (4KB) | 753 ns | 4,096 B | 1 |
| WithSubscribers (5 subs) | 309 ns | 240 B | 1 |
| GetHistory (after 1000 writes) | 7,788 ns | 106,496 B | 1 |
Summary:
- GetHistory: 139x faster (10KB), 18x faster (100KB)
- Allocations: reduced from 2 to 1 across all operations
- Small/medium writes: ~1.1-1.6x faster
*/
+43 -19
View File
@@ -122,11 +122,18 @@ func (mp *metricsMonitor) wrapHandler(
}
} else {
if gjson.ValidBytes(body) {
if tm, err := parseMetrics(modelID, recorder.StartTime(), gjson.ParseBytes(body)); err != nil {
mp.logger.Warnf("error parsing metrics: %v, path=%s", err, request.URL.Path)
} else {
mp.addMetrics(tm)
parsed := gjson.ParseBytes(body)
usage := parsed.Get("usage")
timings := parsed.Get("timings")
if usage.Exists() || timings.Exists() {
if tm, err := parseMetrics(modelID, recorder.StartTime(), usage, timings); err != nil {
mp.logger.Warnf("error parsing metrics: %v, path=%s", err, request.URL.Path)
} else {
mp.addMetrics(tm)
}
}
} else {
mp.logger.Warnf("metrics skipped, invalid JSON in response body path=%s", request.URL.Path)
}
@@ -174,19 +181,20 @@ func processStreamingResponse(modelID string, start time.Time, body []byte) (Tok
}
if gjson.ValidBytes(data) {
return parseMetrics(modelID, start, gjson.ParseBytes(data))
parsed := gjson.ParseBytes(data)
usage := parsed.Get("usage")
timings := parsed.Get("timings")
if usage.Exists() || timings.Exists() {
return parseMetrics(modelID, start, usage, timings)
}
}
}
return TokenMetrics{}, fmt.Errorf("no valid JSON data found in stream")
}
func parseMetrics(modelID string, start time.Time, jsonData gjson.Result) (TokenMetrics, error) {
usage := jsonData.Get("usage")
timings := jsonData.Get("timings")
if !usage.Exists() && !timings.Exists() {
return TokenMetrics{}, fmt.Errorf("no usage or timings data found")
}
func parseMetrics(modelID string, start time.Time, usage, timings gjson.Result) (TokenMetrics, error) {
// default values
cachedTokens := -1 // unknown or missing data
outputTokens := 0
@@ -198,19 +206,35 @@ func parseMetrics(modelID string, start time.Time, jsonData gjson.Result) (Token
durationMs := int(time.Since(start).Milliseconds())
if usage.Exists() {
outputTokens = int(jsonData.Get("usage.completion_tokens").Int())
inputTokens = int(jsonData.Get("usage.prompt_tokens").Int())
if pt := usage.Get("prompt_tokens"); pt.Exists() {
// v1/chat/completions
inputTokens = int(pt.Int())
} else if it := usage.Get("input_tokens"); it.Exists() {
// v1/messages
inputTokens = int(it.Int())
}
if ct := usage.Get("completion_tokens"); ct.Exists() {
// v1/chat/completions
outputTokens = int(ct.Int())
} else if ot := usage.Get("output_tokens"); ot.Exists() {
outputTokens = int(ot.Int())
}
if ct := usage.Get("cache_read_input_tokens"); ct.Exists() {
cachedTokens = int(ct.Int())
}
}
// use llama-server's timing data for tok/sec and duration as it is more accurate
if timings.Exists() {
inputTokens = int(jsonData.Get("timings.prompt_n").Int())
outputTokens = int(jsonData.Get("timings.predicted_n").Int())
promptPerSecond = jsonData.Get("timings.prompt_per_second").Float()
tokensPerSecond = jsonData.Get("timings.predicted_per_second").Float()
durationMs = int(jsonData.Get("timings.prompt_ms").Float() + jsonData.Get("timings.predicted_ms").Float())
inputTokens = int(timings.Get("prompt_n").Int())
outputTokens = int(timings.Get("predicted_n").Int())
promptPerSecond = timings.Get("prompt_per_second").Float()
tokensPerSecond = timings.Get("predicted_per_second").Float()
durationMs = int(timings.Get("prompt_ms").Float() + timings.Get("predicted_ms").Float())
if cachedValue := jsonData.Get("timings.cache_n"); cachedValue.Exists() {
if cachedValue := timings.Get("cache_n"); cachedValue.Exists() {
cachedTokens = int(cachedValue.Int())
}
}
+8 -1
View File
@@ -414,6 +414,9 @@ func (p *Process) stopCommand() {
stopStartTime := time.Now()
defer func() {
p.proxyLogger.Debugf("<%s> stopCommand took %v", p.ID, time.Since(stopStartTime))
// free the buffer in processLogger so the memory can be recovered
p.processLogger.Clear()
}()
p.cmdMutex.RLock()
@@ -646,6 +649,11 @@ func (p *Process) cmdStopUpstreamProcess() error {
return nil
}
// Logger returns the logger for this process.
func (p *Process) Logger() *LogMonitor {
return p.processLogger
}
var loadingRemarks = []string{
"Still faster than your last standup meeting...",
"Reticulating splines...",
@@ -864,7 +872,6 @@ func (s *statusResponseWriter) WriteHeader(statusCode int) {
s.Flush()
}
// Add Flush method
func (s *statusResponseWriter) Flush() {
if flusher, ok := s.writer.(http.Flusher); ok {
flusher.Flush()
+9 -1
View File
@@ -46,7 +46,8 @@ func NewProcessGroup(id string, config config.Config, proxyLogger *LogMonitor, u
// Create a Process for each member in the group
for _, modelID := range groupConfig.Members {
modelConfig, modelID, _ := pg.config.FindConfig(modelID)
process := NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, pg.upstreamLogger, pg.proxyLogger)
processLogger := NewLogMonitorWriter(upstreamLogger)
process := NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, processLogger, pg.proxyLogger)
pg.processes[modelID] = process
}
@@ -88,6 +89,13 @@ func (pg *ProcessGroup) HasMember(modelName string) bool {
return slices.Contains(pg.config.Groups[pg.id].Members, modelName)
}
func (pg *ProcessGroup) GetMember(modelName string) (*Process, bool) {
if pg.HasMember(modelName) {
return pg.processes[modelName], true
}
return nil, false
}
func (pg *ProcessGroup) StopProcess(modelID string, strategy StopStrategy) error {
pg.Lock()
+72 -52
View File
@@ -52,17 +52,37 @@ type ProxyManager struct {
version string
}
func New(config config.Config) *ProxyManager {
func New(proxyConfig config.Config) *ProxyManager {
// set up loggers
stdoutLogger := NewLogMonitorWriter(os.Stdout)
upstreamLogger := NewLogMonitorWriter(stdoutLogger)
proxyLogger := NewLogMonitorWriter(stdoutLogger)
if config.LogRequests {
var muxLogger, upstreamLogger, proxyLogger *LogMonitor
switch proxyConfig.LogToStdout {
case config.LogToStdoutNone:
muxLogger = NewLogMonitorWriter(io.Discard)
upstreamLogger = NewLogMonitorWriter(io.Discard)
proxyLogger = NewLogMonitorWriter(io.Discard)
case config.LogToStdoutBoth:
muxLogger = NewLogMonitorWriter(os.Stdout)
upstreamLogger = NewLogMonitorWriter(muxLogger)
proxyLogger = NewLogMonitorWriter(muxLogger)
case config.LogToStdoutUpstream:
muxLogger = NewLogMonitorWriter(os.Stdout)
upstreamLogger = NewLogMonitorWriter(muxLogger)
proxyLogger = NewLogMonitorWriter(io.Discard)
default:
// same as config.LogToStdoutProxy
// helpful because some old tests create a config.Config directly and it
// may not have LogToStdout set explicitly
muxLogger = NewLogMonitorWriter(os.Stdout)
upstreamLogger = NewLogMonitorWriter(io.Discard)
proxyLogger = NewLogMonitorWriter(muxLogger)
}
if proxyConfig.LogRequests {
proxyLogger.Warn("LogRequests configuration is deprecated. Use logLevel instead.")
}
switch strings.ToLower(strings.TrimSpace(config.LogLevel)) {
switch strings.ToLower(strings.TrimSpace(proxyConfig.LogLevel)) {
case "debug":
proxyLogger.SetLogLevel(LevelDebug)
upstreamLogger.SetLogLevel(LevelDebug)
@@ -99,7 +119,7 @@ func New(config config.Config) *ProxyManager {
"stampnano": time.StampNano,
}
if timeFormat, ok := timeFormats[strings.ToLower(strings.TrimSpace(config.LogTimeFormat))]; ok {
if timeFormat, ok := timeFormats[strings.ToLower(strings.TrimSpace(proxyConfig.LogTimeFormat))]; ok {
proxyLogger.SetLogTimeFormat(timeFormat)
upstreamLogger.SetLogTimeFormat(timeFormat)
}
@@ -107,18 +127,18 @@ func New(config config.Config) *ProxyManager {
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
var maxMetrics int
if config.MetricsMaxInMemory <= 0 {
if proxyConfig.MetricsMaxInMemory <= 0 {
maxMetrics = 1000 // Default fallback
} else {
maxMetrics = config.MetricsMaxInMemory
maxMetrics = proxyConfig.MetricsMaxInMemory
}
pm := &ProxyManager{
config: config,
config: proxyConfig,
ginEngine: gin.New(),
proxyLogger: proxyLogger,
muxLogger: stdoutLogger,
muxLogger: muxLogger,
upstreamLogger: upstreamLogger,
metricsMonitor: newMetricsMonitor(proxyLogger, maxMetrics),
@@ -134,19 +154,19 @@ func New(config config.Config) *ProxyManager {
}
// create the process groups
for groupID := range config.Groups {
processGroup := NewProcessGroup(groupID, config, proxyLogger, upstreamLogger)
for groupID := range proxyConfig.Groups {
processGroup := NewProcessGroup(groupID, proxyConfig, proxyLogger, upstreamLogger)
pm.processGroups[groupID] = processGroup
}
pm.setupGinEngine()
// run any startup hooks
if len(config.Hooks.OnStartup.Preload) > 0 {
if len(proxyConfig.Hooks.OnStartup.Preload) > 0 {
// do it in the background, don't block startup -- not sure if good idea yet
go func() {
discardWriter := &DiscardWriter{}
for _, realModelName := range config.Hooks.OnStartup.Preload {
for _, realModelName := range proxyConfig.Hooks.OnStartup.Preload {
proxyLogger.Infof("Preloading model: %s", realModelName)
processGroup, _, err := pm.swapProcessGroup(realModelName)
@@ -266,7 +286,7 @@ func (pm *ProxyManager) setupGinEngine() {
// in proxymanager_loghandlers.go
pm.ginEngine.GET("/logs", pm.sendLogsHandlers)
pm.ginEngine.GET("/logs/stream", pm.streamLogsHandler)
pm.ginEngine.GET("/logs/stream/:logMonitorID", pm.streamLogsHandler)
pm.ginEngine.GET("/logs/stream/*logMonitorID", pm.streamLogsHandler)
/**
* User Interface Endpoints
@@ -466,61 +486,61 @@ func (pm *ProxyManager) listModelsHandler(c *gin.Context) {
})
}
func (pm *ProxyManager) proxyToUpstream(c *gin.Context) {
upstreamPath := c.Param("upstreamPath")
// split the upstream path by / and search for the model name
parts := strings.Split(strings.TrimSpace(upstreamPath), "/")
if len(parts) == 0 {
pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path")
return
}
modelFound := false
// findModelInPath searches for a valid model name in a path with slashes.
// It iteratively builds up path segments until it finds a matching model.
// Returns: (searchModelName, realModelName, remainingPath, found)
// Example: "/author/model/endpoint" with model "author/model" -> ("author/model", "author/model", "/endpoint", true)
func (pm *ProxyManager) findModelInPath(path string) (searchName string, realName string, remainingPath string, found bool) {
parts := strings.Split(strings.TrimSpace(path), "/")
searchModelName := ""
var modelName, remainingPath string
for i, part := range parts {
if parts[i] == "" {
if part == "" {
continue
}
if searchModelName == "" {
searchModelName = part
} else {
searchModelName = searchModelName + "/" + parts[i]
searchModelName = searchModelName + "/" + part
}
if real, ok := pm.config.RealModelName(searchModelName); ok {
modelName = real
remainingPath = "/" + strings.Join(parts[i+1:], "/")
modelFound = true
// Check if this is exactly a model name with no additional path
// and doesn't end with a trailing slash
if remainingPath == "/" && !strings.HasSuffix(upstreamPath, "/") {
// Build new URL with query parameters preserved
newPath := "/upstream/" + searchModelName + "/"
if c.Request.URL.RawQuery != "" {
newPath += "?" + c.Request.URL.RawQuery
}
// Use 308 for non-GET/HEAD requests to preserve method
if c.Request.Method == http.MethodGet || c.Request.Method == http.MethodHead {
c.Redirect(http.StatusMovedPermanently, newPath)
} else {
c.Redirect(http.StatusPermanentRedirect, newPath)
}
return
}
break
return searchModelName, real, "/" + strings.Join(parts[i+1:], "/"), true
}
}
return "", "", "", false
}
func (pm *ProxyManager) proxyToUpstream(c *gin.Context) {
upstreamPath := c.Param("upstreamPath")
searchModelName, modelName, remainingPath, modelFound := pm.findModelInPath(upstreamPath)
if !modelFound {
pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path")
return
}
// Check if this is exactly a model name with no additional path
// and doesn't end with a trailing slash
if remainingPath == "/" && !strings.HasSuffix(upstreamPath, "/") {
// Build new URL with query parameters preserved
newPath := "/upstream/" + searchModelName + "/"
if c.Request.URL.RawQuery != "" {
newPath += "?" + c.Request.URL.RawQuery
}
// Use 308 for non-GET/HEAD requests to preserve method
if c.Request.Method == http.MethodGet || c.Request.Method == http.MethodHead {
c.Redirect(http.StatusMovedPermanently, newPath)
} else {
c.Redirect(http.StatusPermanentRedirect, newPath)
}
return
}
processGroup, realModelName, err := pm.swapProcessGroup(modelName)
if err != nil {
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error swapping process group: %s", err.Error()))
+20 -13
View File
@@ -31,7 +31,7 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
// prevent nginx from buffering streamed logs
c.Header("X-Accel-Buffering", "no")
logMonitorId := c.Param("logMonitorID")
logMonitorId := strings.TrimPrefix(c.Param("logMonitorID"), "/")
logger, err := pm.getLogger(logMonitorId)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
@@ -83,18 +83,25 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
// getLogger searches for the appropriate logger based on the logMonitorId
func (pm *ProxyManager) getLogger(logMonitorId string) (*LogMonitor, error) {
var logger *LogMonitor
if logMonitorId == "" {
switch logMonitorId {
case "":
// maintain the default
logger = pm.muxLogger
} else if logMonitorId == "proxy" {
logger = pm.proxyLogger
} else if logMonitorId == "upstream" {
logger = pm.upstreamLogger
} else {
return nil, fmt.Errorf("invalid logger. Use 'proxy' or 'upstream'")
}
return pm.muxLogger, nil
case "proxy":
return pm.proxyLogger, nil
case "upstream":
return pm.upstreamLogger, nil
default:
// search for a models specific logger using findModelInPath
// to handle model names with slashes (e.g., "author/model")
if _, name, _, found := pm.findModelInPath("/" + logMonitorId); found {
for _, group := range pm.processGroups {
if process, found := group.GetMember(name); found {
return process.Logger(), nil
}
}
}
return logger, nil
return nil, fmt.Errorf("invalid logger. Use 'proxy', 'upstream' or a model's ID")
}
}
+3 -1
View File
@@ -1078,7 +1078,8 @@ func TestProxyManager_StreamingEndpointsReturnNoBufferingHeader(t *testing.T) {
config := config.AddDefaultGroupToConfig(config.Config{
HealthCheckTimeout: 15,
Models: map[string]config.ModelConfig{
"model1": getTestSimpleResponderConfig("model1"),
"model1": getTestSimpleResponderConfig("model1"),
"author/model": getTestSimpleResponderConfig("author/model"),
},
LogLevel: "error",
})
@@ -1091,6 +1092,7 @@ func TestProxyManager_StreamingEndpointsReturnNoBufferingHeader(t *testing.T) {
"/logs/stream",
"/logs/stream/proxy",
"/logs/stream/upstream",
"/logs/stream/author/model",
}
for _, endpoint := range endpoints {