Compare commits

..

8 Commits

Author SHA1 Message Date
Benson Wong 6299c1b874 Fix High CPU (#189)
* vendor in kelindar/event lib and refactor to remove time.Ticker
2025-07-15 18:04:30 -07:00
Yathi a906cd459b Strip comments before macro expansion in config (#193)
A bug fix that ensures comments don't interfere with macro expansion by
removing them first. This prevents unwanted comment text from appearing
in the final expanded command.

Co-authored-by: Yathiraj Bollimbala G <yathi@yStudio.localdomain>
2025-07-15 10:14:16 -07:00
Benson Wong 78b2bc3dbc add toggle to hide/show unlisted models (#187) 2025-07-02 16:14:20 -07:00
Benson Wong 6a058e4191 Change fsnotify to watch config directory instead of file
The fsnotify library suggests watching a directory and checking that the
name matches the configuration file.
2025-07-02 10:23:52 -07:00
Benson Wong 1921e570d7 Add Event Bus (#184)
Major internal refactor to use an event bus to pass event/messages along. These changes are largely invisible user facing but sets up internal design for real time stats and information.

- `--watch-config` logic refactored for events
- remove multiple SSE api endpoints, replaced with /api/events
- keep all functionality essentially the same
- UI/backend sync is in near real time now
2025-07-01 22:17:35 -07:00
Benson Wong c867a6c9a2 Add name and description to v1/models list (#179)
* Add support for name and description in v1/models list
* add configuration example for name and description
2025-06-30 23:02:44 -07:00
Leoyzen 3bd1b23ce0 fix config hot-reload on k8s (#181)
Co-authored-by: Leoyzen <leoyzen@gmial.com>
2025-06-27 11:49:31 -07:00
srevn 10606abf89 fix config hot-reload on macos (#180)
Co-authored-by: srevn <srevn@github>
2025-06-26 09:20:50 -07:00
24 changed files with 1339 additions and 385 deletions
+13 -1
View File
@@ -49,7 +49,19 @@ models:
cmd: |
# ${latest-llama} is a macro that is defined above
${latest-llama}
--model path/to/Qwen2.5-1.5B-Instruct-Q4_K_M.gguf
--model path/to/llama-8B-Q4_K_M.gguf
# name: a display name for the model
# - optional, default: empty string
# - if set, it will be used in the v1/models API response
# - if not set, it will be omitted in the JSON model record
name: "llama 3.1 8B"
# description: a description for the model
# - optional, default: empty string
# - if set, it will be used in the v1/models API response
# - if not set, it will be omitted in the JSON model record
description: "A small but capable model used for quick testing"
# env: define an array of environment variables to inject into cmd's environment
# - optional, default: empty array
+3
View File
@@ -0,0 +1,3 @@
The code in `event` was originally a part of https://github.com/kelindar/event (v1.5.2)
The original code uses a `time.Ticker` to process the event queue which caused a large increase in CPU usage ([#189](https://github.com/mostlygeek/llama-swap/issues/189)). This code was ported to remove the ticker and instead be more event driven.
+30
View File
@@ -0,0 +1,30 @@
// Copyright (c) Roman Atachiants and contributore. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for detaile.
package event
import (
"context"
)
// Default initializes a default in-process dispatcher
var Default = NewDispatcherConfig(25000)
// On subscribes to an event, the type of the event will be automatically
// inferred from the provided type. Must be constant for this to work. This
// functions same way as Subscribe() but uses the default dispatcher instead.
func On[T Event](handler func(T)) context.CancelFunc {
return Subscribe(Default, handler)
}
// OnType subscribes to an event with the specified event type. This functions
// same way as SubscribeTo() but uses the default dispatcher instead.
func OnType[T Event](eventType uint32, handler func(T)) context.CancelFunc {
return SubscribeTo(Default, eventType, handler)
}
// Emit writes an event into the dispatcher. This functions same way as
// Publish() but uses the default dispatcher instead.
func Emit[T Event](ev T) {
Publish(Default, ev)
}
+54
View File
@@ -0,0 +1,54 @@
// Copyright (c) Roman Atachiants and contributore. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for detaile.
package event
import (
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
)
/*
cpu: 13th Gen Intel(R) Core(TM) i7-13700K
BenchmarkSubcribeConcurrent-24 1826686 606.3 ns/op 1648 B/op 5 allocs/op
*/
func BenchmarkSubscribeConcurrent(b *testing.B) {
d := NewDispatcher()
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
unsub := Subscribe(d, func(ev MyEvent1) {})
unsub()
}
})
}
func TestDefaultPublish(t *testing.T) {
var wg sync.WaitGroup
// Subscribe
var count int64
defer On(func(ev MyEvent1) {
atomic.AddInt64(&count, 1)
wg.Done()
})()
defer OnType(TypeEvent1, func(ev MyEvent1) {
atomic.AddInt64(&count, 1)
wg.Done()
})()
// Publish
wg.Add(4)
Emit(MyEvent1{})
Emit(MyEvent1{})
// Wait and check
wg.Wait()
assert.Equal(t, int64(4), count)
}
+324
View File
@@ -0,0 +1,324 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
package event
import (
"context"
"fmt"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
)
// Event represents an event contract
type Event interface {
Type() uint32
}
// registry holds an immutable sorted array of event mappings
type registry struct {
keys []uint32 // Event types (sorted)
grps []any // Corresponding subscribers
}
// ------------------------------------- Dispatcher -------------------------------------
// Dispatcher represents an event dispatcher.
type Dispatcher struct {
subs atomic.Pointer[registry] // Atomic pointer to immutable array
done chan struct{} // Cancellation
maxQueue int // Maximum queue size per consumer
mu sync.Mutex // Only for writes (subscribe/unsubscribe)
}
// NewDispatcher creates a new dispatcher of events.
func NewDispatcher() *Dispatcher {
return NewDispatcherConfig(50000)
}
// NewDispatcherConfig creates a new dispatcher with configurable max queue size
func NewDispatcherConfig(maxQueue int) *Dispatcher {
d := &Dispatcher{
done: make(chan struct{}),
maxQueue: maxQueue,
}
d.subs.Store(&registry{
keys: make([]uint32, 0, 16),
grps: make([]any, 0, 16),
})
return d
}
// Close closes the dispatcher
func (d *Dispatcher) Close() error {
close(d.done)
return nil
}
// isClosed returns whether the dispatcher is closed or not
func (d *Dispatcher) isClosed() bool {
select {
case <-d.done:
return true
default:
return false
}
}
// findGroup performs a lock-free binary search for the event type
func (d *Dispatcher) findGroup(eventType uint32) any {
reg := d.subs.Load()
keys := reg.keys
// Inlined binary search for better cache locality
left, right := 0, len(keys)
for left < right {
mid := left + (right-left)/2
if keys[mid] < eventType {
left = mid + 1
} else {
right = mid
}
}
if left < len(keys) && keys[left] == eventType {
return reg.grps[left]
}
return nil
}
// Subscribe subscribes to an event, the type of the event will be automatically
// inferred from the provided type. Must be constant for this to work.
func Subscribe[T Event](broker *Dispatcher, handler func(T)) context.CancelFunc {
var event T
return SubscribeTo(broker, event.Type(), handler)
}
// SubscribeTo subscribes to an event with the specified event type.
func SubscribeTo[T Event](broker *Dispatcher, eventType uint32, handler func(T)) context.CancelFunc {
if broker.isClosed() {
panic(errClosed)
}
broker.mu.Lock()
defer broker.mu.Unlock()
// Check if group already exists
if existing := broker.findGroup(eventType); existing != nil {
grp := groupOf[T](eventType, existing)
sub := grp.Add(handler)
return func() {
grp.Del(sub)
}
}
// Create new group
grp := &group[T]{cond: sync.NewCond(new(sync.Mutex)), maxQueue: broker.maxQueue}
sub := grp.Add(handler)
// Copy-on-write: insert new entry in sorted position
old := broker.subs.Load()
idx := sort.Search(len(old.keys), func(i int) bool {
return old.keys[i] >= eventType
})
// Create new arrays with space for one more element
newKeys := make([]uint32, len(old.keys)+1)
newGrps := make([]any, len(old.grps)+1)
// Copy elements before insertion point
copy(newKeys[:idx], old.keys[:idx])
copy(newGrps[:idx], old.grps[:idx])
// Insert new element
newKeys[idx] = eventType
newGrps[idx] = grp
// Copy elements after insertion point
copy(newKeys[idx+1:], old.keys[idx:])
copy(newGrps[idx+1:], old.grps[idx:])
// Atomically store the new registry (mutex ensures no concurrent writers)
newReg := &registry{keys: newKeys, grps: newGrps}
broker.subs.Store(newReg)
return func() {
grp.Del(sub)
}
}
// Publish writes an event into the dispatcher
func Publish[T Event](broker *Dispatcher, ev T) {
eventType := ev.Type()
if sub := broker.findGroup(eventType); sub != nil {
group := groupOf[T](eventType, sub)
group.Broadcast(ev)
}
}
// Count counts the number of subscribers, this is for testing only.
func (d *Dispatcher) count(eventType uint32) int {
if group := d.findGroup(eventType); group != nil {
return group.(interface{ Count() int }).Count()
}
return 0
}
// groupOf casts the subscriber group to the specified generic type
func groupOf[T Event](eventType uint32, subs any) *group[T] {
if group, ok := subs.(*group[T]); ok {
return group
}
panic(errConflict[T](eventType, subs))
}
// ------------------------------------- Subscriber -------------------------------------
// consumer represents a consumer with a message queue
type consumer[T Event] struct {
queue []T // Current work queue
stop bool // Stop signal
}
// Listen listens to the event queue and processes events
func (s *consumer[T]) Listen(c *sync.Cond, fn func(T)) {
pending := make([]T, 0, 128)
for {
c.L.Lock()
for len(s.queue) == 0 {
switch {
case s.stop:
c.L.Unlock()
return
default:
c.Wait()
}
}
// Swap buffers and reset the current queue
temp := s.queue
s.queue = pending[:0]
pending = temp
c.L.Unlock()
// Outside of the critical section, process the work
for _, event := range pending {
fn(event)
}
// Notify potential publishers waiting due to backpressure
c.Broadcast()
}
}
// ------------------------------------- Subscriber Group -------------------------------------
// group represents a consumer group
type group[T Event] struct {
cond *sync.Cond
subs []*consumer[T]
maxQueue int // Maximum queue size per consumer
maxLen int // Current maximum queue length across all consumers
}
// Broadcast sends an event to all consumers
func (s *group[T]) Broadcast(ev T) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
// Calculate current maximum queue length
s.maxLen = 0
for _, sub := range s.subs {
if len(sub.queue) > s.maxLen {
s.maxLen = len(sub.queue)
}
}
// Backpressure: wait if queues are full
for s.maxLen >= s.maxQueue {
s.cond.Wait()
// Recalculate after wakeup
s.maxLen = 0
for _, sub := range s.subs {
if len(sub.queue) > s.maxLen {
s.maxLen = len(sub.queue)
}
}
}
// Add event to all queues and track new maximum
newMax := 0
for _, sub := range s.subs {
sub.queue = append(sub.queue, ev)
if len(sub.queue) > newMax {
newMax = len(sub.queue)
}
}
s.maxLen = newMax
s.cond.Broadcast() // Wake consumers
}
// Add adds a subscriber to the list
func (s *group[T]) Add(handler func(T)) *consumer[T] {
sub := &consumer[T]{
queue: make([]T, 0, 64),
}
// Add the consumer to the list of active consumers
s.cond.L.Lock()
s.subs = append(s.subs, sub)
s.cond.L.Unlock()
// Start listening
go sub.Listen(s.cond, handler)
return sub
}
// Del removes a subscriber from the list
func (s *group[T]) Del(sub *consumer[T]) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
// Search and remove the subscriber
sub.stop = true
for i, v := range s.subs {
if v == sub {
copy(s.subs[i:], s.subs[i+1:])
s.subs = s.subs[:len(s.subs)-1]
break
}
}
}
// ------------------------------------- Debugging -------------------------------------
var errClosed = fmt.Errorf("event dispatcher is closed")
// Count returns the number of subscribers in this group
func (s *group[T]) Count() int {
return len(s.subs)
}
// String returns string representation of the type
func (s *group[T]) String() string {
typ := reflect.TypeOf(s).String()
idx := strings.LastIndex(typ, "/")
typ = typ[idx+1 : len(typ)-1]
return typ
}
// errConflict returns a conflict message
func errConflict[T any](eventType uint32, existing any) string {
var want T
return fmt.Sprintf(
"conflicting event type, want=<%T>, registered=<%s>, event=0x%v",
want, existing, eventType,
)
}
+324
View File
@@ -0,0 +1,324 @@
// Copyright (c) Roman Atachiants and contributore. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for detaile.
package event
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestPublish(t *testing.T) {
d := NewDispatcher()
var wg sync.WaitGroup
// Subscribe, must be received in order
var count int64
defer Subscribe(d, func(ev MyEvent1) {
assert.Equal(t, int(atomic.AddInt64(&count, 1)), ev.Number)
wg.Done()
})()
// Publish
wg.Add(3)
Publish(d, MyEvent1{Number: 1})
Publish(d, MyEvent1{Number: 2})
Publish(d, MyEvent1{Number: 3})
// Wait and check
wg.Wait()
assert.Equal(t, int64(3), count)
}
func TestUnsubscribe(t *testing.T) {
d := NewDispatcher()
assert.Equal(t, 0, d.count(TypeEvent1))
unsubscribe := Subscribe(d, func(ev MyEvent1) {
// Nothing
})
assert.Equal(t, 1, d.count(TypeEvent1))
unsubscribe()
assert.Equal(t, 0, d.count(TypeEvent1))
}
func TestConcurrent(t *testing.T) {
const max = 1000000
var count int64
var wg sync.WaitGroup
wg.Add(1)
d := NewDispatcher()
defer Subscribe(d, func(ev MyEvent1) {
if current := atomic.AddInt64(&count, 1); current == max {
wg.Done()
}
})()
// Asynchronously publish
go func() {
for i := 0; i < max; i++ {
Publish(d, MyEvent1{})
}
}()
defer Subscribe(d, func(ev MyEvent1) {
// Subscriber that does nothing
})()
wg.Wait()
assert.Equal(t, max, int(count))
}
func TestSubscribeDifferentType(t *testing.T) {
d := NewDispatcher()
assert.Panics(t, func() {
SubscribeTo(d, TypeEvent1, func(ev MyEvent1) {})
SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})
})
}
func TestPublishDifferentType(t *testing.T) {
d := NewDispatcher()
assert.Panics(t, func() {
SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})
Publish(d, MyEvent1{})
})
}
func TestCloseDispatcher(t *testing.T) {
d := NewDispatcher()
defer SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})()
assert.NoError(t, d.Close())
assert.Panics(t, func() {
SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})
})
}
func TestMatrix(t *testing.T) {
const amount = 1000
for _, subs := range []int{1, 10, 100} {
for _, topics := range []int{1, 10} {
expected := subs * topics * amount
t.Run(fmt.Sprintf("%dx%d", topics, subs), func(t *testing.T) {
var count atomic.Int64
var wg sync.WaitGroup
wg.Add(expected)
d := NewDispatcher()
for i := 0; i < subs; i++ {
for id := 0; id < topics; id++ {
defer SubscribeTo(d, uint32(id), func(ev MyEvent3) {
count.Add(1)
wg.Done()
})()
}
}
for n := 0; n < amount; n++ {
for id := 0; id < topics; id++ {
go Publish(d, MyEvent3{ID: id})
}
}
wg.Wait()
assert.Equal(t, expected, int(count.Load()))
})
}
}
}
func TestConcurrentSubscriptionRace(t *testing.T) {
// This test specifically targets the race condition that occurs when multiple
// goroutines try to subscribe to different event types simultaneously.
// Without the CAS loop, subscriptions could be lost due to registry corruption.
const numGoroutines = 100
const numEventTypes = 50
d := NewDispatcher()
defer d.Close()
var wg sync.WaitGroup
var receivedCount int64
var subscribedTypes sync.Map // Thread-safe map
wg.Add(numGoroutines)
// Start multiple goroutines that subscribe to different event types concurrently
for i := 0; i < numGoroutines; i++ {
go func(goroutineID int) {
defer wg.Done()
// Each goroutine subscribes to a unique event type
eventType := uint32(goroutineID%numEventTypes + 1000) // Offset to avoid collision with other tests
// Subscribe to the event type
SubscribeTo(d, eventType, func(ev MyEvent3) {
atomic.AddInt64(&receivedCount, 1)
})
// Record that this type was subscribed
subscribedTypes.Store(eventType, true)
}(i)
}
// Wait for all subscriptions to complete
wg.Wait()
// Count the number of unique event types subscribed
expectedTypes := 0
subscribedTypes.Range(func(key, value interface{}) bool {
expectedTypes++
return true
})
// Small delay to ensure all subscriptions are fully processed
time.Sleep(10 * time.Millisecond)
// Publish events to each subscribed type
subscribedTypes.Range(func(key, value interface{}) bool {
eventType := key.(uint32)
Publish(d, MyEvent3{ID: int(eventType)})
return true
})
// Wait for all events to be processed
time.Sleep(50 * time.Millisecond)
// Verify that we received at least the expected number of events
// (there might be more if multiple goroutines subscribed to the same event type)
received := atomic.LoadInt64(&receivedCount)
assert.GreaterOrEqual(t, int(received), expectedTypes,
"Should have received at least %d events, got %d", expectedTypes, received)
// Verify that we have the expected number of unique event types
assert.Equal(t, numEventTypes, expectedTypes,
"Should have exactly %d unique event types", numEventTypes)
}
func TestConcurrentHandlerRegistration(t *testing.T) {
const numGoroutines = 100
// Test concurrent subscriptions to the same event type
t.Run("SameEventType", func(t *testing.T) {
d := NewDispatcher()
var handlerCount int64
var wg sync.WaitGroup
// Start multiple goroutines subscribing to the same event type (0x1)
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
SubscribeTo(d, uint32(0x1), func(ev MyEvent1) {
atomic.AddInt64(&handlerCount, 1)
})
}()
}
wg.Wait()
// Verify all handlers were registered by publishing an event
atomic.StoreInt64(&handlerCount, 0)
Publish(d, MyEvent1{})
// Small delay to ensure all handlers have executed
time.Sleep(10 * time.Millisecond)
assert.Equal(t, int64(numGoroutines), atomic.LoadInt64(&handlerCount),
"Not all handlers were registered due to race condition")
})
// Test concurrent subscriptions to different event types
t.Run("DifferentEventTypes", func(t *testing.T) {
d := NewDispatcher()
var wg sync.WaitGroup
receivedEvents := make(map[uint32]*int64)
// Create multiple event types and subscribe concurrently
for i := 0; i < numGoroutines; i++ {
eventType := uint32(100 + i)
counter := new(int64)
receivedEvents[eventType] = counter
wg.Add(1)
go func(et uint32, cnt *int64) {
defer wg.Done()
SubscribeTo(d, et, func(ev MyEvent3) {
atomic.AddInt64(cnt, 1)
})
}(eventType, counter)
}
wg.Wait()
// Publish events to all types
for eventType := uint32(100); eventType < uint32(100+numGoroutines); eventType++ {
Publish(d, MyEvent3{ID: int(eventType)})
}
// Small delay to ensure all handlers have executed
time.Sleep(10 * time.Millisecond)
// Verify all event types received their events
for eventType, counter := range receivedEvents {
assert.Equal(t, int64(1), atomic.LoadInt64(counter),
"Event type %d did not receive its event", eventType)
}
})
}
func TestBackpressure(t *testing.T) {
d := NewDispatcher()
d.maxQueue = 10
var processedCount int64
unsub := SubscribeTo(d, uint32(0x200), func(ev MyEvent3) {
atomic.AddInt64(&processedCount, 1)
})
defer unsub()
const eventsToPublish = 1000
for i := 0; i < eventsToPublish; i++ {
Publish(d, MyEvent3{ID: 0x200})
}
time.Sleep(100 * time.Millisecond)
// Verify all events were eventually processed
finalProcessed := atomic.LoadInt64(&processedCount)
assert.Equal(t, int64(eventsToPublish), finalProcessed)
t.Logf("Events processed: %d/%d", finalProcessed, eventsToPublish)
}
// ------------------------------------- Test Events -------------------------------------
const (
TypeEvent1 = 0x1
TypeEvent2 = 0x2
)
type MyEvent1 struct {
Number int
}
func (t MyEvent1) Type() uint32 { return TypeEvent1 }
type MyEvent2 struct {
Text string
}
func (t MyEvent2) Type() uint32 { return TypeEvent2 }
type MyEvent3 struct {
ID int
}
func (t MyEvent3) Type() uint32 { return uint32(t.ID) }
+1 -1
View File
@@ -3,6 +3,7 @@ module github.com/mostlygeek/llama-swap
go 1.23.0
require (
github.com/billziss-gh/golib v0.2.0
github.com/fsnotify/fsnotify v1.9.0
github.com/gin-gonic/gin v1.10.0
github.com/stretchr/testify v1.9.0
@@ -12,7 +13,6 @@ require (
)
require (
github.com/billziss-gh/golib v0.2.0 // indirect
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
-2
View File
@@ -32,8 +32,6 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
+105 -111
View File
@@ -14,6 +14,7 @@ import (
"github.com/fsnotify/fsnotify"
"github.com/gin-gonic/gin"
"github.com/mostlygeek/llama-swap/event"
"github.com/mostlygeek/llama-swap/proxy"
)
@@ -53,137 +54,130 @@ func main() {
gin.SetMode(gin.ReleaseMode)
}
proxyManager := proxy.New(config)
// Setup channels for server management
reloadChan := make(chan *proxy.ProxyManager)
exitChan := make(chan struct{})
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Create server with initial handler
srv := &http.Server{
Addr: *listenStr,
Handler: proxyManager,
Addr: *listenStr,
}
// Support for watching config and reloading when it changes
reloadProxyManager := func() {
if currentPM, ok := srv.Handler.(*proxy.ProxyManager); ok {
config, err = proxy.LoadConfig(*configPath)
if err != nil {
fmt.Printf("Warning, unable to reload configuration: %v\n", err)
return
}
fmt.Println("Configuration Changed")
currentPM.Shutdown()
srv.Handler = proxy.New(config)
fmt.Println("Configuration Reloaded")
// wait a few seconds and tell any UI to reload
time.AfterFunc(3*time.Second, func() {
event.Emit(proxy.ConfigFileChangedEvent{
ReloadingState: proxy.ReloadingStateEnd,
})
})
} else {
config, err = proxy.LoadConfig(*configPath)
if err != nil {
fmt.Printf("Error, unable to load configuration: %v\n", err)
os.Exit(1)
}
srv.Handler = proxy.New(config)
}
}
// load the initial proxy manager
reloadProxyManager()
debouncedReload := debounce(time.Second, reloadProxyManager)
if *watchConfig {
defer event.On(func(e proxy.ConfigFileChangedEvent) {
if e.ReloadingState == proxy.ReloadingStateStart {
debouncedReload()
}
})()
fmt.Println("Watching Configuration for changes")
go func() {
absConfigPath, err := filepath.Abs(*configPath)
if err != nil {
fmt.Printf("Error getting absolute path for watching config file: %v\n", err)
return
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
fmt.Printf("Error creating file watcher: %v. File watching disabled.\n", err)
return
}
configDir := filepath.Dir(absConfigPath)
err = watcher.Add(configDir)
if err != nil {
fmt.Printf("Error adding config path directory (%s) to watcher: %v. File watching disabled.", configDir, err)
return
}
defer watcher.Close()
for {
select {
case changeEvent := <-watcher.Events:
if changeEvent.Name == absConfigPath && (changeEvent.Has(fsnotify.Write) || changeEvent.Has(fsnotify.Create) || changeEvent.Has(fsnotify.Remove)) {
event.Emit(proxy.ConfigFileChangedEvent{
ReloadingState: proxy.ReloadingStateStart,
})
}
case err := <-watcher.Errors:
log.Printf("File watcher error: %v", err)
}
}
}()
}
// shutdown on signal
go func() {
sig := <-sigChan
fmt.Printf("Received signal %v, shutting down...\n", sig)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
if pm, ok := srv.Handler.(*proxy.ProxyManager); ok {
pm.Shutdown()
} else {
fmt.Println("srv.Handler is not of type *proxy.ProxyManager")
}
if err := srv.Shutdown(ctx); err != nil {
fmt.Printf("Server shutdown error: %v\n", err)
}
close(exitChan)
}()
// Start server
fmt.Printf("llama-swap listening on %s\n", *listenStr)
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Printf("Fatal server error: %v\n", err)
close(exitChan)
log.Fatalf("Fatal server error: %v\n", err)
}
}()
// Handle config reloads and signals
go func() {
currentManager := proxyManager
for {
select {
case newManager := <-reloadChan:
log.Println("Config change detected, waiting for in-flight requests to complete...")
// Stop old manager processes gracefully (this waits for in-flight requests)
currentManager.StopProcesses(proxy.StopWaitForInflightRequest)
// Now do a full shutdown to clear the process map
currentManager.Shutdown()
currentManager = newManager
srv.Handler = newManager
log.Println("Server handler updated with new config")
case sig := <-sigChan:
fmt.Printf("Received signal %v, shutting down...\n", sig)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
currentManager.Shutdown()
if err := srv.Shutdown(ctx); err != nil {
fmt.Printf("Server shutdown error: %v\n", err)
}
close(exitChan)
return
}
}
}()
// Start file watcher if requested
if *watchConfig {
absConfigPath, err := filepath.Abs(*configPath)
if err != nil {
log.Printf("Error getting absolute path for config: %v. File watching disabled.", err)
} else {
go watchConfigFileWithReload(absConfigPath, reloadChan)
}
}
// Wait for exit signal
<-exitChan
}
// watchConfigFileWithReload monitors the configuration file and sends new ProxyManager instances through reloadChan.
func watchConfigFileWithReload(configPath string, reloadChan chan<- *proxy.ProxyManager) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Printf("Error creating file watcher: %v. File watching disabled.", err)
return
}
defer watcher.Close()
err = watcher.Add(configPath)
if err != nil {
log.Printf("Error adding config path (%s) to watcher: %v. File watching disabled.", configPath, err)
return
}
log.Printf("Watching config file for changes: %s", configPath)
var debounceTimer *time.Timer
debounceDuration := 2 * time.Second
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
// We only care about writes to the specific config file
if event.Name == configPath && event.Has(fsnotify.Write) {
// Reset or start the debounce timer
if debounceTimer != nil {
debounceTimer.Stop()
}
debounceTimer = time.AfterFunc(debounceDuration, func() {
log.Printf("Config file modified: %s, reloading...", event.Name)
// Try up to 3 times with exponential backoff
var newConfig proxy.Config
var err error
for retries := 0; retries < 3; retries++ {
// Load new configuration
newConfig, err = proxy.LoadConfig(configPath)
if err == nil {
break
}
log.Printf("Error loading new config (attempt %d/3): %v", retries+1, err)
if retries < 2 {
time.Sleep(time.Duration(1<<retries) * time.Second)
}
}
if err != nil {
log.Printf("Failed to load new config after retries: %v", err)
return
}
// Create new ProxyManager with new config
newPM := proxy.New(newConfig)
reloadChan <- newPM
log.Println("Config reloaded successfully")
})
}
case err, ok := <-watcher.Errors:
if !ok {
log.Println("File watcher error channel closed.")
return
}
log.Printf("File watcher error: %v", err)
func debounce(interval time.Duration, f func()) func() {
var timer *time.Timer
return func() {
if timer != nil {
timer.Stop()
}
timer = time.AfterFunc(interval, f)
}
}
+23
View File
@@ -28,6 +28,10 @@ type ModelConfig struct {
Unlisted bool `yaml:"unlisted"`
UseModelName string `yaml:"useModelName"`
// #179 for /v1/models
Name string `yaml:"name"`
Description string `yaml:"description"`
// Limit concurrency of HTTP requests to process
ConcurrencyLimit int `yaml:"concurrencyLimit"`
@@ -48,6 +52,8 @@ func (m *ModelConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
Unlisted: false,
UseModelName: "",
ConcurrencyLimit: 0,
Name: "",
Description: "",
}
// the default cmdStop to taskkill /f /t /pid ${PID}
@@ -249,6 +255,10 @@ func LoadConfigFromReader(r io.Reader) (Config, error) {
for _, modelId := range modelIds {
modelConfig := config.Models[modelId]
// Strip comments from command fields before macro expansion
modelConfig.Cmd = StripComments(modelConfig.Cmd)
modelConfig.CmdStop = StripComments(modelConfig.CmdStop)
// go through model config fields: cmd, cmdStop, proxy, checkEndPoint and replace macros with macro values
for macroName, macroValue := range config.Macros {
macroSlug := fmt.Sprintf("${%s}", macroName)
@@ -400,3 +410,16 @@ func SanitizeCommand(cmdStr string) ([]string, error) {
return args, nil
}
func StripComments(cmdStr string) string {
var cleanedLines []string
for _, line := range strings.Split(cmdStr, "\n") {
trimmed := strings.TrimSpace(line)
// Skip comment lines
if strings.HasPrefix(trimmed, "#") {
continue
}
cleanedLines = append(cleanedLines, line)
}
return strings.Join(cleanedLines, "\n")
}
+4
View File
@@ -104,6 +104,8 @@ models:
model1:
cmd: path/to/cmd --arg1 one
proxy: "http://localhost:8080"
name: "Model 1"
description: "This is model 1"
aliases:
- "m1"
- "model-one"
@@ -168,6 +170,8 @@ groups:
Aliases: []string{"m1", "model-one"},
Env: []string{"VAR1=value1", "VAR2=value2"},
CheckEndpoint: "/health",
Name: "Model 1",
Description: "This is model 1",
},
"model2": {
Cmd: "path/to/server --arg1 one",
+115
View File
@@ -1,6 +1,7 @@
package proxy
import (
"slices"
"strings"
"testing"
@@ -325,3 +326,117 @@ models:
assert.Equal(t, []string{"temperature", "top_k", "top_p"}, sanitized)
}
}
func TestStripComments(t *testing.T) {
tests := []struct {
name string
input string
expected string
}{
{
name: "no comments",
input: "echo hello\necho world",
expected: "echo hello\necho world",
},
{
name: "single comment line",
input: "# this is a comment\necho hello",
expected: "echo hello",
},
{
name: "multiple comment lines",
input: "# comment 1\necho hello\n# comment 2\necho world",
expected: "echo hello\necho world",
},
{
name: "comment with spaces",
input: " # indented comment\necho hello",
expected: "echo hello",
},
{
name: "empty lines preserved",
input: "echo hello\n\necho world",
expected: "echo hello\n\necho world",
},
{
name: "only comments",
input: "# comment 1\n# comment 2",
expected: "",
},
{
name: "empty string",
input: "",
expected: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := StripComments(tt.input)
if result != tt.expected {
t.Errorf("StripComments() = %q, expected %q", result, tt.expected)
}
})
}
}
func TestConfig_MacroInCommentStrippedBeforeExpansion(t *testing.T) {
// Test case that reproduces the original bug where a macro in a comment
// would get expanded and cause the comment text to be included in the command
content := `
startPort: 9990
macros:
"latest-llama": >
/user/llama.cpp/build/bin/llama-server
--port ${PORT}
models:
"test-model":
cmd: |
# ${latest-llama} is a macro that is defined above
${latest-llama}
--model /path/to/model.gguf
-ngl 99
`
config, err := LoadConfigFromReader(strings.NewReader(content))
assert.NoError(t, err)
// Get the sanitized command
sanitizedCmd, err := SanitizeCommand(config.Models["test-model"].Cmd)
assert.NoError(t, err)
// Join the command for easier inspection
cmdStr := strings.Join(sanitizedCmd, " ")
// Verify that comment text is NOT present in the final command as separate arguments
commentWords := []string{"is", "macro", "that", "defined", "above"}
for _, word := range commentWords {
found := slices.Contains(sanitizedCmd, word)
assert.False(t, found, "Comment text '%s' should not be present as a separate argument in final command", word)
}
// Verify that the actual command components ARE present
expectedParts := []string{
"/user/llama.cpp/build/bin/llama-server",
"--port",
"9990",
"--model",
"/path/to/model.gguf",
"-ngl",
"99",
}
for _, part := range expectedParts {
assert.Contains(t, cmdStr, part, "Expected command part '%s' not found in final command", part)
}
// Verify the server path appears exactly once (not duplicated due to macro expansion)
serverPath := "/user/llama.cpp/build/bin/llama-server"
count := strings.Count(cmdStr, serverPath)
assert.Equal(t, 1, count, "Expected exactly 1 occurrence of server path, found %d", count)
// Verify the expected final command structure
expectedCmd := "/user/llama.cpp/build/bin/llama-server --port 9990 --model /path/to/model.gguf -ngl 99"
assert.Equal(t, expectedCmd, cmdStr, "Final command does not match expected structure")
}
+49
View File
@@ -0,0 +1,49 @@
package proxy
// package level registry of the different event types
const ProcessStateChangeEventID = 0x01
const ChatCompletionStatsEventID = 0x02
const ConfigFileChangedEventID = 0x03
const LogDataEventID = 0x04
type ProcessStateChangeEvent struct {
ProcessName string
NewState ProcessState
OldState ProcessState
}
func (e ProcessStateChangeEvent) Type() uint32 {
return ProcessStateChangeEventID
}
type ChatCompletionStats struct {
TokensGenerated int
}
func (e ChatCompletionStats) Type() uint32 {
return ChatCompletionStatsEventID
}
type ReloadingState int
const (
ReloadingStateStart ReloadingState = iota
ReloadingStateEnd
)
type ConfigFileChangedEvent struct {
ReloadingState ReloadingState
}
func (e ConfigFileChangedEvent) Type() uint32 {
return ConfigFileChangedEventID
}
type LogDataEvent struct {
Data []byte
}
func (e LogDataEvent) Type() uint32 {
return LogDataEventID
}
+14 -31
View File
@@ -2,10 +2,13 @@ package proxy
import (
"container/ring"
"context"
"fmt"
"io"
"os"
"sync"
"github.com/mostlygeek/llama-swap/event"
)
type LogLevel int
@@ -18,7 +21,7 @@ const (
)
type LogMonitor struct {
clients map[chan []byte]bool
eventbus *event.Dispatcher
mu sync.RWMutex
buffer *ring.Ring
bufferMu sync.RWMutex
@@ -37,11 +40,11 @@ func NewLogMonitor() *LogMonitor {
func NewLogMonitorWriter(stdout io.Writer) *LogMonitor {
return &LogMonitor{
clients: make(map[chan []byte]bool),
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
stdout: stdout,
level: LevelInfo,
prefix: "",
eventbus: event.NewDispatcherConfig(1000),
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
stdout: stdout,
level: LevelInfo,
prefix: "",
}
}
@@ -81,34 +84,14 @@ func (w *LogMonitor) GetHistory() []byte {
return history
}
func (w *LogMonitor) Subscribe() chan []byte {
w.mu.Lock()
defer w.mu.Unlock()
ch := make(chan []byte, 100)
w.clients[ch] = true
return ch
}
func (w *LogMonitor) Unsubscribe(ch chan []byte) {
w.mu.Lock()
defer w.mu.Unlock()
delete(w.clients, ch)
close(ch)
func (w *LogMonitor) OnLogData(callback func(data []byte)) context.CancelFunc {
return event.Subscribe(w.eventbus, func(e LogDataEvent) {
callback(e.Data)
})
}
func (w *LogMonitor) broadcast(msg []byte) {
w.mu.RLock()
defer w.mu.RUnlock()
for client := range w.clients {
select {
case client <- msg:
default:
// If client buffer is full, skip
}
}
event.Publish(w.eventbus, LogDataEvent{Data: msg})
}
func (w *LogMonitor) SetPrefix(prefix string) {
+13 -22
View File
@@ -10,38 +10,29 @@ import (
func TestLogMonitor(t *testing.T) {
logMonitor := NewLogMonitorWriter(io.Discard)
// Test subscription
client1 := logMonitor.Subscribe()
client2 := logMonitor.Subscribe()
defer logMonitor.Unsubscribe(client1)
defer logMonitor.Unsubscribe(client2)
// A WaitGroup is used to wait for all the expected writes to complete
var wg sync.WaitGroup
client1Messages := make([]byte, 0)
client2Messages := make([]byte, 0)
var wg sync.WaitGroup
wg.Add(1)
defer logMonitor.OnLogData(func(data []byte) {
client1Messages = append(client1Messages, data...)
wg.Done()
})()
go func() {
defer wg.Done()
for {
select {
case data := <-client1:
client1Messages = append(client1Messages, data...)
case data := <-client2:
client2Messages = append(client2Messages, data...)
default:
return
}
}
}()
defer logMonitor.OnLogData(func(data []byte) {
client2Messages = append(client2Messages, data...)
wg.Done()
})()
wg.Add(6) // 2 x 3 writes
logMonitor.Write([]byte("1"))
logMonitor.Write([]byte("2"))
logMonitor.Write([]byte("3"))
// Wait for the goroutine to finish
// wait for all writes to complete
wg.Wait()
// Check the buffer
+6 -3
View File
@@ -13,6 +13,8 @@ import (
"sync"
"syscall"
"time"
"github.com/mostlygeek/llama-swap/event"
)
type ProcessState string
@@ -127,6 +129,7 @@ func (p *Process) swapState(expectedState, newState ProcessState) (ProcessState,
p.state = newState
p.proxyLogger.Debugf("<%s> swapState() State transitioned from %s to %s", p.ID, expectedState, newState)
event.Emit(ProcessStateChangeEvent{ProcessName: p.ID, NewState: newState, OldState: expectedState})
return p.state, nil
}
@@ -209,11 +212,11 @@ func (p *Process) start() error {
if curState, swapErr := p.swapState(StateStarting, StateStopped); swapErr != nil {
p.state = StateStopped // force it into a stopped state
return fmt.Errorf(
"failed to start command and state swap failed. command error: %v, current state: %v, state swap error: %v",
err, curState, swapErr,
"failed to start command '%s' and state swap failed. command error: %v, current state: %v, state swap error: %v",
strings.Join(args, " "), err, curState, swapErr,
)
}
return fmt.Errorf("start() failed: %v", err)
return fmt.Errorf("start() failed for command '%s': %v", strings.Join(args, " "), err)
}
// Capture the exit error for later signalling
+1 -1
View File
@@ -107,7 +107,7 @@ func TestProcess_BrokenModelConfig(t *testing.T) {
w = httptest.NewRecorder()
process.ProxyRequest(w, req)
assert.Equal(t, http.StatusBadGateway, w.Code)
assert.Contains(t, w.Body.String(), "start() failed: ")
assert.Contains(t, w.Body.String(), "start() failed for command 'nonexistent-command':")
}
func TestProcess_UnloadAfterTTL(t *testing.T) {
+33 -16
View File
@@ -2,7 +2,7 @@ package proxy
import (
"bytes"
"encoding/json"
"context"
"fmt"
"io"
"mime/multipart"
@@ -34,6 +34,10 @@ type ProxyManager struct {
muxLogger *LogMonitor
processGroups map[string]*ProcessGroup
// shutdown signaling
shutdownCtx context.Context
shutdownCancel context.CancelFunc
}
func New(config Config) *ProxyManager {
@@ -64,6 +68,8 @@ func New(config Config) *ProxyManager {
upstreamLogger.SetLogLevel(LevelInfo)
}
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
pm := &ProxyManager{
config: config,
ginEngine: gin.New(),
@@ -73,6 +79,9 @@ func New(config Config) *ProxyManager {
upstreamLogger: upstreamLogger,
processGroups: make(map[string]*ProcessGroup),
shutdownCtx: shutdownCtx,
shutdownCancel: shutdownCancel,
}
// create the process groups
@@ -158,9 +167,7 @@ func (pm *ProxyManager) setupGinEngine() {
// in proxymanager_loghandlers.go
pm.ginEngine.GET("/logs", pm.sendLogsHandlers)
pm.ginEngine.GET("/logs/stream", pm.streamLogsHandler)
pm.ginEngine.GET("/logs/streamSSE", pm.streamLogsHandlerSSE)
pm.ginEngine.GET("/logs/stream/:logMonitorID", pm.streamLogsHandler)
pm.ginEngine.GET("/logs/streamSSE/:logMonitorID", pm.streamLogsHandlerSSE)
/**
* User Interface Endpoints
@@ -262,6 +269,7 @@ func (pm *ProxyManager) Shutdown() {
}(processGroup)
}
wg.Wait()
pm.shutdownCancel()
}
func (pm *ProxyManager) swapProcessGroup(requestedModel string) (*ProcessGroup, string, error) {
@@ -289,32 +297,41 @@ func (pm *ProxyManager) swapProcessGroup(requestedModel string) (*ProcessGroup,
}
func (pm *ProxyManager) listModelsHandler(c *gin.Context) {
data := []interface{}{}
data := make([]gin.H, 0, len(pm.config.Models))
createdTime := time.Now().Unix()
for id, modelConfig := range pm.config.Models {
if modelConfig.Unlisted {
continue
}
data = append(data, map[string]interface{}{
record := gin.H{
"id": id,
"object": "model",
"created": time.Now().Unix(),
"created": createdTime,
"owned_by": "llama-swap",
})
}
if name := strings.TrimSpace(modelConfig.Name); name != "" {
record["name"] = name
}
if desc := strings.TrimSpace(modelConfig.Description); desc != "" {
record["description"] = desc
}
data = append(data, record)
}
// Set the Content-Type header to application/json
c.Header("Content-Type", "application/json")
if origin := c.Request.Header.Get("Origin"); origin != "" {
// Set CORS headers if origin exists
if origin := c.GetHeader("Origin"); origin != "" {
c.Header("Access-Control-Allow-Origin", origin)
}
// Encode the data as JSON and write it to the response writer
if err := json.NewEncoder(c.Writer).Encode(map[string]interface{}{"object": "list", "data": data}); err != nil {
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error encoding JSON %s", err.Error()))
return
}
// Use gin's JSON method which handles content-type and encoding
c.JSON(http.StatusOK, gin.H{
"object": "list",
"data": data,
})
}
func (pm *ProxyManager) proxyToUpstream(c *gin.Context) {
+88 -18
View File
@@ -1,25 +1,29 @@
package proxy
import (
"context"
"encoding/json"
"net/http"
"sort"
"time"
"github.com/gin-gonic/gin"
"github.com/mostlygeek/llama-swap/event"
)
type Model struct {
Id string `json:"id"`
State string `json:"state"`
Id string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
State string `json:"state"`
Unlisted bool `json:"unlisted"`
}
func addApiHandlers(pm *ProxyManager) {
// Add API endpoints for React to consume
apiGroup := pm.ginEngine.Group("/api")
{
apiGroup.GET("/models", pm.apiListModels)
apiGroup.GET("/modelsSSE", pm.apiListModelsSSE)
apiGroup.POST("/models/unload", pm.apiUnloadAllModels)
apiGroup.GET("/events", pm.apiSendEvents)
}
}
@@ -65,37 +69,103 @@ func (pm *ProxyManager) getModelStatus() []Model {
}
}
models = append(models, Model{
Id: modelID,
State: state,
Id: modelID,
Name: pm.config.Models[modelID].Name,
Description: pm.config.Models[modelID].Description,
State: state,
Unlisted: pm.config.Models[modelID].Unlisted,
})
}
return models
}
func (pm *ProxyManager) apiListModels(c *gin.Context) {
c.JSON(http.StatusOK, pm.getModelStatus())
type messageType string
const (
msgTypeModelStatus messageType = "modelStatus"
msgTypeLogData messageType = "logData"
)
type messageEnvelope struct {
Type messageType `json:"type"`
Data string `json:"data"`
}
// stream the models as a SSE
func (pm *ProxyManager) apiListModelsSSE(c *gin.Context) {
// sends a stream of different message types that happen on the server
func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("X-Content-Type-Options", "nosniff")
notify := c.Request.Context().Done()
sendBuffer := make(chan messageEnvelope, 25)
ctx, cancel := context.WithCancel(c.Request.Context())
sendModels := func() {
data, err := json.Marshal(pm.getModelStatus())
if err == nil {
msg := messageEnvelope{Type: msgTypeModelStatus, Data: string(data)}
select {
case sendBuffer <- msg:
case <-ctx.Done():
return
default:
}
}
}
sendLogData := func(source string, data []byte) {
data, err := json.Marshal(gin.H{
"source": source,
"data": string(data),
})
if err == nil {
select {
case sendBuffer <- messageEnvelope{Type: msgTypeLogData, Data: string(data)}:
case <-ctx.Done():
return
default:
}
}
}
/**
* Send updated models list
*/
defer event.On(func(e ProcessStateChangeEvent) {
sendModels()
})()
defer event.On(func(e ConfigFileChangedEvent) {
sendModels()
})()
/**
* Send Log data
*/
defer pm.proxyLogger.OnLogData(func(data []byte) {
sendLogData("proxy", data)
})()
defer pm.upstreamLogger.OnLogData(func(data []byte) {
sendLogData("upstream", data)
})()
// send initial batch of data
sendLogData("proxy", pm.proxyLogger.GetHistory())
sendLogData("upstream", pm.upstreamLogger.GetHistory())
sendModels()
// Stream new events
for {
select {
case <-notify:
case <-c.Request.Context().Done():
cancel()
return
default:
models := pm.getModelStatus()
c.SSEvent("message", models)
case <-pm.shutdownCtx.Done():
cancel()
return
case msg := <-sendBuffer:
c.SSEvent("message", msg)
c.Writer.Flush()
<-time.After(1000 * time.Millisecond)
}
}
}
+20 -51
View File
@@ -1,6 +1,7 @@
package proxy
import (
"context"
"fmt"
"net/http"
"strings"
@@ -34,10 +35,7 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
c.String(http.StatusBadRequest, err.Error())
return
}
ch := logger.Subscribe()
defer logger.Unsubscribe(ch)
notify := c.Request.Context().Done()
flusher, ok := c.Writer.(http.Flusher)
if !ok {
c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("streaming unsupported"))
@@ -55,57 +53,28 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
}
}
// Stream new logs
sendChan := make(chan []byte, 10)
ctx, cancel := context.WithCancel(c.Request.Context())
defer logger.OnLogData(func(data []byte) {
select {
case sendChan <- data:
case <-ctx.Done():
return
default:
}
})()
for {
select {
case msg := <-ch:
_, err := c.Writer.Write(msg)
if err != nil {
// just break the loop if we can't write for some reason
return
}
case <-c.Request.Context().Done():
cancel()
return
case <-pm.shutdownCtx.Done():
cancel()
return
case data := <-sendChan:
c.Writer.Write(data)
flusher.Flush()
case <-notify:
return
}
}
}
func (pm *ProxyManager) streamLogsHandlerSSE(c *gin.Context) {
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("X-Content-Type-Options", "nosniff")
logMonitorId := c.Param("logMonitorID")
logger, err := pm.getLogger(logMonitorId)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
ch := logger.Subscribe()
defer logger.Unsubscribe(ch)
notify := c.Request.Context().Done()
// Send history first if not skipped
_, skipHistory := c.GetQuery("no-history")
if !skipHistory {
history := logger.GetHistory()
if len(history) != 0 {
c.SSEvent("message", string(history))
c.Writer.Flush()
}
}
// Stream new logs
for {
select {
case msg := <-ch:
c.SSEvent("message", string(msg))
c.Writer.Flush()
case <-notify:
return
}
}
}
+28 -2
View File
@@ -183,11 +183,20 @@ func TestProxyManager_SwapMultiProcessParallelRequests(t *testing.T) {
}
func TestProxyManager_ListModelsHandler(t *testing.T) {
model1Config := getTestSimpleResponderConfig("model1")
model1Config.Name = "Model 1"
model1Config.Description = "Model 1 description is used for testing"
model2Config := getTestSimpleResponderConfig("model2")
model2Config.Name = " " // empty whitespace only strings will get ignored
model2Config.Description = " "
config := Config{
HealthCheckTimeout: 15,
Models: map[string]ModelConfig{
"model1": getTestSimpleResponderConfig("model1"),
"model2": getTestSimpleResponderConfig("model2"),
"model1": model1Config,
"model2": model2Config,
"model3": getTestSimpleResponderConfig("model3"),
},
LogLevel: "error",
@@ -213,6 +222,7 @@ func TestProxyManager_ListModelsHandler(t *testing.T) {
var response struct {
Data []map[string]interface{} `json:"data"`
}
if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil {
t.Fatalf("Failed to parse JSON response: %v", err)
}
@@ -227,6 +237,7 @@ func TestProxyManager_ListModelsHandler(t *testing.T) {
"model3": {},
}
// make all models
for _, model := range response.Data {
modelID, ok := model["id"].(string)
assert.True(t, ok, "model ID should be a string")
@@ -245,6 +256,21 @@ func TestProxyManager_ListModelsHandler(t *testing.T) {
ownedBy, ok := model["owned_by"].(string)
assert.True(t, ok, "owned_by should be a string")
assert.Equal(t, "llama-swap", ownedBy)
// check for optional name and description
if modelID == "model1" {
name, ok := model["name"].(string)
assert.True(t, ok, "name should be a string")
assert.Equal(t, "Model 1", name)
description, ok := model["description"].(string)
assert.True(t, ok, "description should be a string")
assert.Equal(t, "Model 1 description is used for testing", description)
} else {
_, exists := model["name"]
assert.False(t, exists, "unexpected name field for model: %s", modelID)
_, exists = model["description"]
assert.False(t, exists, "unexpected description field for model: %s", modelID)
}
}
// Ensure all expected models were returned
+61 -109
View File
@@ -6,6 +6,9 @@ const LOG_LENGTH_LIMIT = 1024 * 100; /* 100KB of log data */
export interface Model {
id: string;
state: ModelStatus;
name: string;
description: string;
unlisted: boolean;
}
interface APIProviderType {
@@ -13,12 +16,18 @@ interface APIProviderType {
listModels: () => Promise<Model[]>;
unloadAllModels: () => Promise<void>;
loadModel: (model: string) => Promise<void>;
enableProxyLogs: (enabled: boolean) => void;
enableUpstreamLogs: (enabled: boolean) => void;
enableModelUpdates: (enabled: boolean) => void;
enableAPIEvents: (enabled: boolean) => void;
proxyLogs: string;
upstreamLogs: string;
}
interface LogData {
source: "upstream" | "proxy";
data: string;
}
interface APIEventEnvelope {
type: "modelStatus" | "logData";
data: string;
}
const APIContext = createContext<APIProviderType | undefined>(undefined);
type APIProviderProps = {
@@ -30,6 +39,7 @@ export function APIProvider({ children }: APIProviderProps) {
const [upstreamLogs, setUpstreamLogs] = useState("");
const proxyEventSource = useRef<EventSource | null>(null);
const upstreamEventSource = useRef<EventSource | null>(null);
const apiEventSource = useRef<EventSource | null>(null);
const [models, setModels] = useState<Model[]>([]);
const modelStatusEventSource = useRef<EventSource | null>(null);
@@ -41,104 +51,58 @@ export function APIProvider({ children }: APIProviderProps) {
});
}, []);
const handleProxyMessage = useCallback(
(e: MessageEvent) => {
appendLog(e.data, setProxyLogs);
},
[proxyLogs, appendLog]
);
const enableAPIEvents = useCallback((enabled: boolean) => {
if (!enabled) {
apiEventSource.current?.close();
apiEventSource.current = null;
return;
}
const handleUpstreamMessage = useCallback(
(e: MessageEvent) => {
appendLog(e.data, setUpstreamLogs);
},
[appendLog]
);
let retryCount = 0;
const initialDelay = 1000; // 1 second
const enableProxyLogs = useCallback(
(enabled: boolean) => {
if (enabled) {
let retryCount = 0;
const maxRetries = 3;
const initialDelay = 1000; // 1 second
const connect = () => {
const eventSource = new EventSource("/api/events");
const connect = () => {
const eventSource = new EventSource("/logs/streamSSE/proxy");
eventSource.onmessage = (e: MessageEvent) => {
try {
const message = JSON.parse(e.data) as APIEventEnvelope;
switch (message.type) {
case "modelStatus":
{
const models = JSON.parse(message.data) as Model[];
setModels(models);
}
break;
eventSource.onmessage = handleProxyMessage;
eventSource.onerror = () => {
eventSource.close();
if (retryCount < maxRetries) {
retryCount++;
const delay = initialDelay * Math.pow(2, retryCount - 1);
setTimeout(connect, delay);
case "logData": {
const logData = JSON.parse(message.data) as LogData;
switch (logData.source) {
case "proxy":
appendLog(logData.data, setProxyLogs);
break;
case "upstream":
appendLog(logData.data, setUpstreamLogs);
break;
}
}
};
proxyEventSource.current = eventSource;
};
connect();
} else {
proxyEventSource.current?.close();
proxyEventSource.current = null;
}
},
[handleProxyMessage]
);
const enableUpstreamLogs = useCallback(
(enabled: boolean) => {
if (enabled) {
let retryCount = 0;
const maxRetries = 3;
const initialDelay = 1000; // 1 second
const connect = () => {
const eventSource = new EventSource("/logs/streamSSE/upstream");
eventSource.onmessage = handleUpstreamMessage;
eventSource.onerror = () => {
eventSource.close();
if (retryCount < maxRetries) {
retryCount++;
const delay = initialDelay * Math.pow(2, retryCount - 1);
setTimeout(connect, delay);
}
};
upstreamEventSource.current = eventSource;
};
connect();
} else {
upstreamEventSource.current?.close();
upstreamEventSource.current = null;
}
},
[handleUpstreamMessage]
);
const enableModelUpdates = useCallback(
(enabled: boolean) => {
if (enabled) {
const eventSource = new EventSource("/api/modelsSSE");
eventSource.onmessage = (e: MessageEvent) => {
try {
const models = JSON.parse(e.data) as Model[];
setModels(models);
} catch (e) {
console.error(e);
}
};
modelStatusEventSource.current = eventSource;
} else {
modelStatusEventSource.current?.close();
modelStatusEventSource.current = null;
}
},
[setModels]
);
} catch (err) {
console.error(e.data, err);
}
};
eventSource.onerror = () => {
eventSource.close();
retryCount++;
const delay = Math.min(initialDelay * Math.pow(2, retryCount - 1), 5000);
setTimeout(connect, delay);
};
apiEventSource.current = eventSource;
};
connect();
}, []);
useEffect(() => {
return () => {
@@ -196,23 +160,11 @@ export function APIProvider({ children }: APIProviderProps) {
listModels,
unloadAllModels,
loadModel,
enableProxyLogs,
enableUpstreamLogs,
enableModelUpdates,
enableAPIEvents,
proxyLogs,
upstreamLogs,
}),
[
models,
listModels,
unloadAllModels,
loadModel,
enableProxyLogs,
enableUpstreamLogs,
enableModelUpdates,
proxyLogs,
upstreamLogs,
]
[models, listModels, unloadAllModels, loadModel, enableAPIEvents, proxyLogs, upstreamLogs]
);
return <APIContext.Provider value={value}>{children}</APIContext.Provider>;
+3 -5
View File
@@ -3,14 +3,12 @@ import { useAPI } from "../contexts/APIProvider";
import { usePersistentState } from "../hooks/usePersistentState";
const LogViewer = () => {
const { proxyLogs, upstreamLogs, enableProxyLogs, enableUpstreamLogs } = useAPI();
const { proxyLogs, upstreamLogs, enableAPIEvents } = useAPI();
useEffect(() => {
enableProxyLogs(true);
enableUpstreamLogs(true);
enableAPIEvents(true);
return () => {
enableProxyLogs(false);
enableUpstreamLogs(false);
enableAPIEvents(false);
};
}, []);
+27 -12
View File
@@ -2,17 +2,21 @@ import { useState, useEffect, useCallback, useMemo } from "react";
import { useAPI } from "../contexts/APIProvider";
import { LogPanel } from "./LogViewer";
import { processEvalTimes } from "../lib/Utils";
import { usePersistentState } from "../hooks/usePersistentState";
export default function ModelsPage() {
const { models, enableModelUpdates, unloadAllModels, loadModel, upstreamLogs, enableUpstreamLogs } = useAPI();
const { models, unloadAllModels, loadModel, upstreamLogs, enableAPIEvents } = useAPI();
const [isUnloading, setIsUnloading] = useState(false);
const [showUnlisted, setShowUnlisted] = usePersistentState("showUnlisted", true);
const filteredModels = useMemo(() => {
return models.filter((model) => showUnlisted || !model.unlisted);
}, [models, showUnlisted]);
useEffect(() => {
enableModelUpdates(true);
enableUpstreamLogs(true);
enableAPIEvents(true);
return () => {
enableModelUpdates(false);
enableUpstreamLogs(false);
enableAPIEvents(false);
};
}, []);
@@ -41,9 +45,15 @@ export default function ModelsPage() {
<div className="w-full md:w-1/2 flex items-top">
<div className="card w-full">
<h2 className="">Models</h2>
<button className="btn" onClick={handleUnloadAllModels} disabled={isUnloading}>
{isUnloading ? "Unloading..." : "Unload All Models"}
</button>
<div className="flex justify-between">
<button className="btn" onClick={() => setShowUnlisted(!showUnlisted)} style={{ lineHeight: "1.2" }}>
{showUnlisted ? "🟢 unlisted" : "⚫️ unlisted"}
</button>
<button className="btn" onClick={handleUnloadAllModels} disabled={isUnloading}>
{isUnloading ? "Stopping ..." : "Stop All"}
</button>
</div>
<table className="w-full mt-4">
<thead>
<tr className="border-b border-primary">
@@ -53,14 +63,19 @@ export default function ModelsPage() {
</tr>
</thead>
<tbody>
{models.map((model) => (
{filteredModels.map((model) => (
<tr key={model.id} className="border-b hover:bg-secondary-hover border-border">
<td className="p-2">
<a href={`/upstream/${model.id}/`} className="underline" target="_blank">
{model.id}
{model.name !== "" ? model.name : model.id}
</a>
{model.description != "" && (
<p>
<em>{model.description}</em>
</p>
)}
</td>
<td className="p-2">
<td className="p-2 w-[50px]">
<button
className="btn btn--sm"
disabled={model.state !== "stopped"}
@@ -69,7 +84,7 @@ export default function ModelsPage() {
Load
</button>
</td>
<td className="p-2">
<td className="p-2 w-[75px]">
<span className={`status status--${model.state}`}>{model.state}</span>
</td>
</tr>