From d3f329f92461f3edfd69d747a8700974454bbde5 Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Thu, 18 Dec 2025 21:49:25 -0800 Subject: [PATCH] proxy: Improve logging performance and allow separate log streaming (#421) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace container/ring.Ring with a custom circularBuffer that uses a single contiguous []byte slice. This fixes the original implementation which created 10,240 ring elements instead of 10KB of storage. GetHistory is now 139x faster (145μs → 1μs) and uses 117x less memory (1.2MB → 10KB). Allocations reduced from 2 to 1 per write operation. Create a LogMonitor per proxy.Process, replacing the usage of a shared one. The buffer in LogMonitor is lazy allocated on the first call to Write and freed when the Process is stopped. This reduces unnecessary memory usage when a model is not active. The /logs/stream/{model_id} endpoint was added to stream logs from a specific process. --- README.md | 19 +- ai-plans/2025-12-14-efficient-ring-buffer.md | 85 ++++++++ proxy/logMonitor.go | 117 +++++++++-- proxy/logMonitor_test.go | 201 +++++++++++++++++++ proxy/process.go | 9 +- proxy/processgroup.go | 10 +- proxy/proxymanager_loghandlers.go | 28 ++- 7 files changed, 432 insertions(+), 37 deletions(-) create mode 100644 ai-plans/2025-12-14-efficient-ring-buffer.md diff --git a/README.md b/README.md index 1a729f97..09b80b9e 100644 --- a/README.md +++ b/README.md @@ -203,23 +203,26 @@ As a safeguard, llama-swap also sets `X-Accel-Buffering: no` on SSE responses. H ## Monitoring Logs on the CLI -```shell +```sh # sends up to the last 10KB of logs -curl http://host/logs' +$ curl http://host/logs # streams combined logs -curl -Ns 'http://host/logs/stream' +curl -Ns http://host/logs/stream -# just llama-swap's logs -curl -Ns 'http://host/logs/stream/proxy' +# stream llama-swap's proxy status logs +curl -Ns http://host/logs/stream/proxy -# just upstream's logs -curl -Ns 'http://host/logs/stream/upstream' +# stream logs from upstream processes that llama-swap loads +curl -Ns http://host/logs/stream/upstream + +# stream logs only from a specific model +curl -Ns http://host/logs/stream/{model_id} # stream and filter logs with linux pipes curl -Ns http://host/logs/stream | grep 'eval time' -# skips history and just streams new log entries +# appending ?no-history will disable sending buffered history first curl -Ns 'http://host/logs/stream?no-history' ``` diff --git a/ai-plans/2025-12-14-efficient-ring-buffer.md b/ai-plans/2025-12-14-efficient-ring-buffer.md new file mode 100644 index 00000000..dd66abdf --- /dev/null +++ b/ai-plans/2025-12-14-efficient-ring-buffer.md @@ -0,0 +1,85 @@ +# Replace ring.Ring with Efficient Circular Byte Buffer + +## Overview + +Replace the inefficient `container/ring.Ring` implementation in `logMonitor.go` with a simple circular byte buffer that uses a single contiguous `[]byte` slice. This eliminates per-write allocations, improves cache locality, and correctly implements a 10KB buffer. + +## Current Issues + +1. `ring.New(10 * 1024)` creates 10,240 ring **elements**, not 10KB of storage +2. Every `Write()` call allocates a new `[]byte` slice inside the lock +3. `GetHistory()` iterates all 10,240 elements and appends repeatedly (geometric reallocs) +4. Linked list structure has poor cache locality and pointer overhead + +## Design Requirements + +### New CircularBuffer Type + +Create a simple circular byte buffer with: +- Single pre-allocated `[]byte` of fixed capacity (10KB) +- `head` and `size` integers to track write position and data length +- No per-write allocations + +### API Requirements + +The new buffer must support: +1. **Write(p []byte)** - Append bytes, overwriting oldest data when full +2. **GetHistory() []byte** - Return all buffered data in correct order (oldest to newest) + +### Implementation Details + +```go +type circularBuffer struct { + data []byte // pre-allocated capacity + head int // next write position + size int // current number of bytes stored (0 to cap) +} +``` + +**Write logic:** +- If `len(p) >= capacity`: just keep the last `capacity` bytes +- Otherwise: write bytes at `head`, wrapping around if needed +- Update `head` and `size` accordingly +- Data is copied into the internal buffer (not stored by reference) + +**GetHistory logic:** +- Calculate start position: `(head - size + cap) % cap` +- If not wrapped: single slice copy +- If wrapped: two copies (end of buffer + beginning) +- Returns a **new slice** (copy), not a view into internal buffer + +### Immutability Guarantees (must preserve) + +Per existing tests: +1. Modifying input `[]byte` after `Write()` must not affect stored data +2. `GetHistory()` returns independent copy - modifications don't affect buffer + +## Files to Modify + +- `proxy/logMonitor.go` - Replace `buffer *ring.Ring` with new circular buffer + +## Testing Plan + +Existing tests in `logMonitor_test.go` should continue to pass: +- `TestLogMonitor` - Basic write/read and subscriber notification +- `TestWrite_ImmutableBuffer` - Verify writes don't affect returned history +- `TestWrite_LogTimeFormat` - Timestamp formatting + +Add new tests: +- Test buffer wrap-around behavior +- Test large writes that exceed buffer capacity +- Test exact capacity boundary conditions + +## Checklist + +- [ ] Create `circularBuffer` struct in `logMonitor.go` +- [ ] Implement `Write()` method for circular buffer +- [ ] Implement `GetHistory()` method for circular buffer +- [ ] Update `LogMonitor` struct to use new buffer +- [ ] Update `NewLogMonitorWriter()` to initialize new buffer +- [ ] Update `LogMonitor.Write()` to use new buffer +- [ ] Update `LogMonitor.GetHistory()` to use new buffer +- [ ] Remove `"container/ring"` import +- [ ] Run `make test-dev` to verify existing tests pass +- [ ] Add wrap-around test case +- [ ] Run `make test-all` for final validation diff --git a/proxy/logMonitor.go b/proxy/logMonitor.go index 9597c8a6..e440a6f1 100644 --- a/proxy/logMonitor.go +++ b/proxy/logMonitor.go @@ -1,7 +1,6 @@ package proxy import ( - "container/ring" "context" "fmt" "io" @@ -12,6 +11,85 @@ import ( "github.com/mostlygeek/llama-swap/event" ) +// circularBuffer is a fixed-size circular byte buffer that overwrites +// oldest data when full. It provides O(1) writes and O(n) reads. +type circularBuffer struct { + data []byte // pre-allocated capacity + head int // next write position + size int // current number of bytes stored (0 to cap) +} + +func newCircularBuffer(capacity int) *circularBuffer { + return &circularBuffer{ + data: make([]byte, capacity), + head: 0, + size: 0, + } +} + +// Write appends bytes to the buffer, overwriting oldest data when full. +// Data is copied into the internal buffer (not stored by reference). +func (cb *circularBuffer) Write(p []byte) { + if len(p) == 0 { + return + } + + cap := len(cb.data) + + // If input is larger than capacity, only keep the last cap bytes + if len(p) >= cap { + copy(cb.data, p[len(p)-cap:]) + cb.head = 0 + cb.size = cap + return + } + + // Calculate how much space is available from head to end of buffer + firstPart := cap - cb.head + if firstPart >= len(p) { + // All data fits without wrapping + copy(cb.data[cb.head:], p) + cb.head = (cb.head + len(p)) % cap + } else { + // Data wraps around + copy(cb.data[cb.head:], p[:firstPart]) + copy(cb.data[:len(p)-firstPart], p[firstPart:]) + cb.head = len(p) - firstPart + } + + // Update size + cb.size += len(p) + if cb.size > cap { + cb.size = cap + } +} + +// GetHistory returns all buffered data in correct order (oldest to newest). +// Returns a new slice (copy), not a view into internal buffer. +func (cb *circularBuffer) GetHistory() []byte { + if cb.size == 0 { + return nil + } + + result := make([]byte, cb.size) + cap := len(cb.data) + + // Calculate start position (oldest data) + start := (cb.head - cb.size + cap) % cap + + if start+cb.size <= cap { + // Data is contiguous, single copy + copy(result, cb.data[start:start+cb.size]) + } else { + // Data wraps around, two copies + firstPart := cap - start + copy(result[:firstPart], cb.data[start:]) + copy(result[firstPart:], cb.data[:cb.size-firstPart]) + } + + return result +} + type LogLevel int const ( @@ -19,12 +97,14 @@ const ( LevelInfo LevelWarn LevelError + + LogBufferSize = 100 * 1024 ) type LogMonitor struct { eventbus *event.Dispatcher mu sync.RWMutex - buffer *ring.Ring + buffer *circularBuffer bufferMu sync.RWMutex // typically this can be os.Stdout @@ -45,7 +125,7 @@ func NewLogMonitor() *LogMonitor { func NewLogMonitorWriter(stdout io.Writer) *LogMonitor { return &LogMonitor{ eventbus: event.NewDispatcherConfig(1000), - buffer: ring.New(10 * 1024), // keep 10KB of buffered logs + buffer: nil, // lazy initialized on first Write stdout: stdout, level: LevelInfo, prefix: "", @@ -64,12 +144,15 @@ func (w *LogMonitor) Write(p []byte) (n int, err error) { } w.bufferMu.Lock() - bufferCopy := make([]byte, len(p)) - copy(bufferCopy, p) - w.buffer.Value = bufferCopy - w.buffer = w.buffer.Next() + if w.buffer == nil { + w.buffer = newCircularBuffer(LogBufferSize) + } + w.buffer.Write(p) w.bufferMu.Unlock() + // Make a copy for broadcast to preserve immutability + bufferCopy := make([]byte, len(p)) + copy(bufferCopy, p) w.broadcast(bufferCopy) return n, nil } @@ -77,16 +160,18 @@ func (w *LogMonitor) Write(p []byte) (n int, err error) { func (w *LogMonitor) GetHistory() []byte { w.bufferMu.RLock() defer w.bufferMu.RUnlock() + if w.buffer == nil { + return nil + } + return w.buffer.GetHistory() +} - var history []byte - w.buffer.Do(func(p any) { - if p != nil { - if content, ok := p.([]byte); ok { - history = append(history, content...) - } - } - }) - return history +// Clear releases the buffer memory, making it eligible for GC. +// The buffer will be lazily re-allocated on the next Write. +func (w *LogMonitor) Clear() { + w.bufferMu.Lock() + w.buffer = nil + w.bufferMu.Unlock() } func (w *LogMonitor) OnLogData(callback func(data []byte)) context.CancelFunc { diff --git a/proxy/logMonitor_test.go b/proxy/logMonitor_test.go index a25b637b..aff0d3e3 100644 --- a/proxy/logMonitor_test.go +++ b/proxy/logMonitor_test.go @@ -113,3 +113,204 @@ func TestWrite_LogTimeFormat(t *testing.T) { t.Fatalf("Cannot find timestamp: %v", err) } } + +func TestCircularBuffer_WrapAround(t *testing.T) { + // Create a small buffer to test wrap-around + cb := newCircularBuffer(10) + + // Write "hello" (5 bytes) + cb.Write([]byte("hello")) + if got := string(cb.GetHistory()); got != "hello" { + t.Errorf("Expected 'hello', got %q", got) + } + + // Write "world" (5 bytes) - buffer now full + cb.Write([]byte("world")) + if got := string(cb.GetHistory()); got != "helloworld" { + t.Errorf("Expected 'helloworld', got %q", got) + } + + // Write "12345" (5 bytes) - should overwrite "hello" + cb.Write([]byte("12345")) + if got := string(cb.GetHistory()); got != "world12345" { + t.Errorf("Expected 'world12345', got %q", got) + } + + // Write data larger than buffer capacity + cb.Write([]byte("abcdefghijklmnop")) // 16 bytes, only last 10 kept + if got := string(cb.GetHistory()); got != "ghijklmnop" { + t.Errorf("Expected 'ghijklmnop', got %q", got) + } +} + +func TestCircularBuffer_BoundaryConditions(t *testing.T) { + // Test empty buffer + cb := newCircularBuffer(10) + if got := cb.GetHistory(); got != nil { + t.Errorf("Expected nil for empty buffer, got %q", got) + } + + // Test exact capacity + cb.Write([]byte("1234567890")) + if got := string(cb.GetHistory()); got != "1234567890" { + t.Errorf("Expected '1234567890', got %q", got) + } + + // Test write exactly at capacity boundary + cb = newCircularBuffer(10) + cb.Write([]byte("12345")) + cb.Write([]byte("67890")) + if got := string(cb.GetHistory()); got != "1234567890" { + t.Errorf("Expected '1234567890', got %q", got) + } +} + +func TestLogMonitor_LazyInit(t *testing.T) { + lm := NewLogMonitorWriter(io.Discard) + + // Buffer should be nil before any writes + if lm.buffer != nil { + t.Error("Expected buffer to be nil before first write") + } + + // GetHistory should return nil when buffer is nil + if got := lm.GetHistory(); got != nil { + t.Errorf("Expected nil history before first write, got %q", got) + } + + // Write should lazily initialize the buffer + lm.Write([]byte("test")) + + if lm.buffer == nil { + t.Error("Expected buffer to be initialized after write") + } + + if got := string(lm.GetHistory()); got != "test" { + t.Errorf("Expected 'test', got %q", got) + } +} + +func TestLogMonitor_Clear(t *testing.T) { + lm := NewLogMonitorWriter(io.Discard) + + // Write some data + lm.Write([]byte("hello")) + if got := string(lm.GetHistory()); got != "hello" { + t.Errorf("Expected 'hello', got %q", got) + } + + // Clear should release the buffer + lm.Clear() + + if lm.buffer != nil { + t.Error("Expected buffer to be nil after Clear") + } + + if got := lm.GetHistory(); got != nil { + t.Errorf("Expected nil history after Clear, got %q", got) + } +} + +func TestLogMonitor_ClearAndReuse(t *testing.T) { + lm := NewLogMonitorWriter(io.Discard) + + // Write, clear, then write again + lm.Write([]byte("first")) + lm.Clear() + lm.Write([]byte("second")) + + if got := string(lm.GetHistory()); got != "second" { + t.Errorf("Expected 'second' after clear and reuse, got %q", got) + } +} + +func BenchmarkLogMonitorWrite(b *testing.B) { + // Test data of varying sizes + smallMsg := []byte("small message\n") + mediumMsg := []byte(strings.Repeat("medium message content ", 10) + "\n") + largeMsg := []byte(strings.Repeat("large message content for benchmarking ", 100) + "\n") + + b.Run("SmallWrite", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.Write(smallMsg) + } + }) + + b.Run("MediumWrite", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.Write(mediumMsg) + } + }) + + b.Run("LargeWrite", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.Write(largeMsg) + } + }) + + b.Run("WithSubscribers", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + // Add some subscribers + for i := 0; i < 5; i++ { + lm.OnLogData(func(data []byte) {}) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.Write(mediumMsg) + } + }) + + b.Run("GetHistory", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + // Pre-populate with data + for i := 0; i < 1000; i++ { + lm.Write(mediumMsg) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.GetHistory() + } + }) +} + +/* +Benchmark Results - MBP M1 Pro + +Before (ring.Ring): +| Benchmark | ns/op | bytes/op | allocs/op | +|---------------------------------|------------|----------|-----------| +| SmallWrite (14B) | 43 ns | 40 B | 2 | +| MediumWrite (241B) | 76 ns | 264 B | 2 | +| LargeWrite (4KB) | 504 ns | 4,120 B | 2 | +| WithSubscribers (5 subs) | 355 ns | 264 B | 2 | +| GetHistory (after 1000 writes) | 145,000 ns | 1.2 MB | 22 | + +After (circularBuffer 10KB): +| Benchmark | ns/op | bytes/op | allocs/op | +|---------------------------------|------------|----------|-----------| +| SmallWrite (14B) | 26 ns | 16 B | 1 | +| MediumWrite (241B) | 67 ns | 240 B | 1 | +| LargeWrite (4KB) | 774 ns | 4,096 B | 1 | +| WithSubscribers (5 subs) | 325 ns | 240 B | 1 | +| GetHistory (after 1000 writes) | 1,042 ns | 10,240 B | 1 | + +After (circularBuffer 100KB): +| Benchmark | ns/op | bytes/op | allocs/op | +|---------------------------------|------------|-----------|-----------| +| SmallWrite (14B) | 26 ns | 16 B | 1 | +| MediumWrite (241B) | 66 ns | 240 B | 1 | +| LargeWrite (4KB) | 753 ns | 4,096 B | 1 | +| WithSubscribers (5 subs) | 309 ns | 240 B | 1 | +| GetHistory (after 1000 writes) | 7,788 ns | 106,496 B | 1 | + +Summary: +- GetHistory: 139x faster (10KB), 18x faster (100KB) +- Allocations: reduced from 2 to 1 across all operations +- Small/medium writes: ~1.1-1.6x faster +*/ diff --git a/proxy/process.go b/proxy/process.go index 640ba34a..41427059 100644 --- a/proxy/process.go +++ b/proxy/process.go @@ -414,6 +414,9 @@ func (p *Process) stopCommand() { stopStartTime := time.Now() defer func() { p.proxyLogger.Debugf("<%s> stopCommand took %v", p.ID, time.Since(stopStartTime)) + + // free the buffer in processLogger so the memory can be recovered + p.processLogger.Clear() }() p.cmdMutex.RLock() @@ -646,6 +649,11 @@ func (p *Process) cmdStopUpstreamProcess() error { return nil } +// Logger returns the logger for this process. +func (p *Process) Logger() *LogMonitor { + return p.processLogger +} + var loadingRemarks = []string{ "Still faster than your last standup meeting...", "Reticulating splines...", @@ -864,7 +872,6 @@ func (s *statusResponseWriter) WriteHeader(statusCode int) { s.Flush() } -// Add Flush method func (s *statusResponseWriter) Flush() { if flusher, ok := s.writer.(http.Flusher); ok { flusher.Flush() diff --git a/proxy/processgroup.go b/proxy/processgroup.go index e0b06008..b401d8a6 100644 --- a/proxy/processgroup.go +++ b/proxy/processgroup.go @@ -46,7 +46,8 @@ func NewProcessGroup(id string, config config.Config, proxyLogger *LogMonitor, u // Create a Process for each member in the group for _, modelID := range groupConfig.Members { modelConfig, modelID, _ := pg.config.FindConfig(modelID) - process := NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, pg.upstreamLogger, pg.proxyLogger) + processLogger := NewLogMonitorWriter(upstreamLogger) + process := NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, processLogger, pg.proxyLogger) pg.processes[modelID] = process } @@ -88,6 +89,13 @@ func (pg *ProcessGroup) HasMember(modelName string) bool { return slices.Contains(pg.config.Groups[pg.id].Members, modelName) } +func (pg *ProcessGroup) GetMember(modelName string) (*Process, bool) { + if pg.HasMember(modelName) { + return pg.processes[modelName], true + } + return nil, false +} + func (pg *ProcessGroup) StopProcess(modelID string, strategy StopStrategy) error { pg.Lock() diff --git a/proxy/proxymanager_loghandlers.go b/proxy/proxymanager_loghandlers.go index a3de806a..d4a59e88 100644 --- a/proxy/proxymanager_loghandlers.go +++ b/proxy/proxymanager_loghandlers.go @@ -83,18 +83,24 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) { // getLogger searches for the appropriate logger based on the logMonitorId func (pm *ProxyManager) getLogger(logMonitorId string) (*LogMonitor, error) { - var logger *LogMonitor - - if logMonitorId == "" { + switch logMonitorId { + case "": // maintain the default - logger = pm.muxLogger - } else if logMonitorId == "proxy" { - logger = pm.proxyLogger - } else if logMonitorId == "upstream" { - logger = pm.upstreamLogger - } else { + return pm.muxLogger, nil + case "proxy": + return pm.proxyLogger, nil + case "upstream": + return pm.upstreamLogger, nil + default: + // search for a models specific logger + if name, found := pm.config.RealModelName(logMonitorId); found { + for _, group := range pm.processGroups { + if process, found := group.GetMember(name); found { + return process.Logger(), nil + } + } + } + return nil, fmt.Errorf("invalid logger. Use 'proxy' or 'upstream'") } - - return logger, nil }