Compare commits

..

6 Commits

Author SHA1 Message Date
Benson Wong 1921e570d7 Add Event Bus (#184)
Major internal refactor to use an event bus to pass event/messages along. These changes are largely invisible user facing but sets up internal design for real time stats and information.

- `--watch-config` logic refactored for events
- remove multiple SSE api endpoints, replaced with /api/events
- keep all functionality essentially the same
- UI/backend sync is in near real time now
2025-07-01 22:17:35 -07:00
Benson Wong c867a6c9a2 Add name and description to v1/models list (#179)
* Add support for name and description in v1/models list
* add configuration example for name and description
2025-06-30 23:02:44 -07:00
Leoyzen 3bd1b23ce0 fix config hot-reload on k8s (#181)
Co-authored-by: Leoyzen <leoyzen@gmial.com>
2025-06-27 11:49:31 -07:00
srevn 10606abf89 fix config hot-reload on macos (#180)
Co-authored-by: srevn <srevn@github>
2025-06-26 09:20:50 -07:00
Benson Wong fefd14903d improve log display and add a small stats table in ui (#178) 2025-06-25 12:27:49 -07:00
Benson Wong 717d64e336 update GUI image in README [skip ci] 2025-06-24 10:38:28 -07:00
20 changed files with 522 additions and 409 deletions
+1 -3
View File
@@ -72,9 +72,7 @@ See the [configuration documentation](https://github.com/mostlygeek/llama-swap/w
llama-swap ships with a web based interface to make it easier to monitor logs and check the status of models.
<img width="1854" alt="image" src="https://github.com/user-attachments/assets/ee0025f0-f031-4158-9b5d-cd98b2b9fe4d" />
<img width="1758" alt="image" src="https://github.com/user-attachments/assets/31ae5bcd-5efd-46b0-b64b-6db9e60196d3" />
## Docker Install ([download images](https://github.com/mostlygeek/llama-swap/pkgs/container/llama-swap))
+13 -1
View File
@@ -49,7 +49,19 @@ models:
cmd: |
# ${latest-llama} is a macro that is defined above
${latest-llama}
--model path/to/Qwen2.5-1.5B-Instruct-Q4_K_M.gguf
--model path/to/llama-8B-Q4_K_M.gguf
# name: a display name for the model
# - optional, default: empty string
# - if set, it will be used in the v1/models API response
# - if not set, it will be omitted in the JSON model record
name: "llama 3.1 8B"
# description: a description for the model
# - optional, default: empty string
# - if set, it will be used in the v1/models API response
# - if not set, it will be omitted in the JSON model record
description: "A small but capable model used for quick testing"
# env: define an array of environment variables to inject into cmd's environment
# - optional, default: empty array
+1
View File
@@ -9,6 +9,7 @@ require (
github.com/tidwall/gjson v1.18.0
github.com/tidwall/sjson v1.2.5
gopkg.in/yaml.v3 v3.0.1
github.com/kelindar/event v1.5.2
)
require (
+2
View File
@@ -36,6 +36,8 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kelindar/event v1.5.2 h1:qtgssZqMh/QQMCIxlbx4wU3DoMHOrJXKdiZhphJ4YbY=
github.com/kelindar/event v1.5.2/go.mod h1:UxWPQjWK8u0o9Z3ponm2mgREimM95hm26/M9z8F488Q=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
+104 -111
View File
@@ -14,6 +14,7 @@ import (
"github.com/fsnotify/fsnotify"
"github.com/gin-gonic/gin"
"github.com/kelindar/event"
"github.com/mostlygeek/llama-swap/proxy"
)
@@ -53,137 +54,129 @@ func main() {
gin.SetMode(gin.ReleaseMode)
}
proxyManager := proxy.New(config)
// Setup channels for server management
reloadChan := make(chan *proxy.ProxyManager)
exitChan := make(chan struct{})
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Create server with initial handler
srv := &http.Server{
Addr: *listenStr,
Handler: proxyManager,
Addr: *listenStr,
}
// Support for watching config and reloading when it changes
reloadProxyManager := func() {
if currentPM, ok := srv.Handler.(*proxy.ProxyManager); ok {
config, err = proxy.LoadConfig(*configPath)
if err != nil {
fmt.Printf("Warning, unable to reload configuration: %v\n", err)
return
}
fmt.Println("Configuration Changed")
currentPM.Shutdown()
srv.Handler = proxy.New(config)
fmt.Println("Configuration Reloaded")
// wait a few seconds and tell any UI to reload
time.AfterFunc(3*time.Second, func() {
event.Emit(proxy.ConfigFileChangedEvent{
ReloadingState: proxy.ReloadingStateEnd,
})
})
} else {
config, err = proxy.LoadConfig(*configPath)
if err != nil {
fmt.Printf("Error, unable to load configuration: %v\n", err)
os.Exit(1)
}
srv.Handler = proxy.New(config)
}
}
// load the initial proxy manager
reloadProxyManager()
debouncedReload := debounce(time.Second, reloadProxyManager)
if *watchConfig {
defer event.On(func(e proxy.ConfigFileChangedEvent) {
if e.ReloadingState == proxy.ReloadingStateStart {
debouncedReload()
}
})()
fmt.Println("Watching Configuration for changes")
go func() {
absConfigPath, err := filepath.Abs(*configPath)
if err != nil {
fmt.Printf("Error getting absolute path for watching config file: %v\n", err)
return
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
fmt.Printf("Error creating file watcher: %v. File watching disabled.\n", err)
return
}
err = watcher.Add(absConfigPath)
if err != nil {
fmt.Printf("Error adding config path (%s) to watcher: %v. File watching disabled.", absConfigPath, err)
return
}
defer watcher.Close()
for {
select {
case changeEvent := <-watcher.Events:
if changeEvent.Name == absConfigPath && (changeEvent.Has(fsnotify.Write) || changeEvent.Has(fsnotify.Create) || changeEvent.Has(fsnotify.Remove)) {
event.Emit(proxy.ConfigFileChangedEvent{
ReloadingState: proxy.ReloadingStateStart,
})
}
case err := <-watcher.Errors:
log.Printf("File watcher error: %v", err)
}
}
}()
}
// shutdown on signal
go func() {
sig := <-sigChan
fmt.Printf("Received signal %v, shutting down...\n", sig)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
if pm, ok := srv.Handler.(*proxy.ProxyManager); ok {
pm.Shutdown()
} else {
fmt.Println("srv.Handler is not of type *proxy.ProxyManager")
}
if err := srv.Shutdown(ctx); err != nil {
fmt.Printf("Server shutdown error: %v\n", err)
}
close(exitChan)
}()
// Start server
fmt.Printf("llama-swap listening on %s\n", *listenStr)
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Printf("Fatal server error: %v\n", err)
close(exitChan)
log.Fatalf("Fatal server error: %v\n", err)
}
}()
// Handle config reloads and signals
go func() {
currentManager := proxyManager
for {
select {
case newManager := <-reloadChan:
log.Println("Config change detected, waiting for in-flight requests to complete...")
// Stop old manager processes gracefully (this waits for in-flight requests)
currentManager.StopProcesses(proxy.StopWaitForInflightRequest)
// Now do a full shutdown to clear the process map
currentManager.Shutdown()
currentManager = newManager
srv.Handler = newManager
log.Println("Server handler updated with new config")
case sig := <-sigChan:
fmt.Printf("Received signal %v, shutting down...\n", sig)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
currentManager.Shutdown()
if err := srv.Shutdown(ctx); err != nil {
fmt.Printf("Server shutdown error: %v\n", err)
}
close(exitChan)
return
}
}
}()
// Start file watcher if requested
if *watchConfig {
absConfigPath, err := filepath.Abs(*configPath)
if err != nil {
log.Printf("Error getting absolute path for config: %v. File watching disabled.", err)
} else {
go watchConfigFileWithReload(absConfigPath, reloadChan)
}
}
// Wait for exit signal
<-exitChan
}
// watchConfigFileWithReload monitors the configuration file and sends new ProxyManager instances through reloadChan.
func watchConfigFileWithReload(configPath string, reloadChan chan<- *proxy.ProxyManager) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Printf("Error creating file watcher: %v. File watching disabled.", err)
return
}
defer watcher.Close()
err = watcher.Add(configPath)
if err != nil {
log.Printf("Error adding config path (%s) to watcher: %v. File watching disabled.", configPath, err)
return
}
log.Printf("Watching config file for changes: %s", configPath)
var debounceTimer *time.Timer
debounceDuration := 2 * time.Second
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
// We only care about writes to the specific config file
if event.Name == configPath && event.Has(fsnotify.Write) {
// Reset or start the debounce timer
if debounceTimer != nil {
debounceTimer.Stop()
}
debounceTimer = time.AfterFunc(debounceDuration, func() {
log.Printf("Config file modified: %s, reloading...", event.Name)
// Try up to 3 times with exponential backoff
var newConfig proxy.Config
var err error
for retries := 0; retries < 3; retries++ {
// Load new configuration
newConfig, err = proxy.LoadConfig(configPath)
if err == nil {
break
}
log.Printf("Error loading new config (attempt %d/3): %v", retries+1, err)
if retries < 2 {
time.Sleep(time.Duration(1<<retries) * time.Second)
}
}
if err != nil {
log.Printf("Failed to load new config after retries: %v", err)
return
}
// Create new ProxyManager with new config
newPM := proxy.New(newConfig)
reloadChan <- newPM
log.Println("Config reloaded successfully")
})
}
case err, ok := <-watcher.Errors:
if !ok {
log.Println("File watcher error channel closed.")
return
}
log.Printf("File watcher error: %v", err)
func debounce(interval time.Duration, f func()) func() {
var timer *time.Timer
return func() {
if timer != nil {
timer.Stop()
}
timer = time.AfterFunc(interval, f)
}
}
+6
View File
@@ -28,6 +28,10 @@ type ModelConfig struct {
Unlisted bool `yaml:"unlisted"`
UseModelName string `yaml:"useModelName"`
// #179 for /v1/models
Name string `yaml:"name"`
Description string `yaml:"description"`
// Limit concurrency of HTTP requests to process
ConcurrencyLimit int `yaml:"concurrencyLimit"`
@@ -48,6 +52,8 @@ func (m *ModelConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
Unlisted: false,
UseModelName: "",
ConcurrencyLimit: 0,
Name: "",
Description: "",
}
// the default cmdStop to taskkill /f /t /pid ${PID}
+4
View File
@@ -104,6 +104,8 @@ models:
model1:
cmd: path/to/cmd --arg1 one
proxy: "http://localhost:8080"
name: "Model 1"
description: "This is model 1"
aliases:
- "m1"
- "model-one"
@@ -168,6 +170,8 @@ groups:
Aliases: []string{"m1", "model-one"},
Env: []string{"VAR1=value1", "VAR2=value2"},
CheckEndpoint: "/health",
Name: "Model 1",
Description: "This is model 1",
},
"model2": {
Cmd: "path/to/server --arg1 one",
+49
View File
@@ -0,0 +1,49 @@
package proxy
// package level registry of the different event types
const ProcessStateChangeEventID = 0x01
const ChatCompletionStatsEventID = 0x02
const ConfigFileChangedEventID = 0x03
const LogDataEventID = 0x04
type ProcessStateChangeEvent struct {
ProcessName string
NewState ProcessState
OldState ProcessState
}
func (e ProcessStateChangeEvent) Type() uint32 {
return ProcessStateChangeEventID
}
type ChatCompletionStats struct {
TokensGenerated int
}
func (e ChatCompletionStats) Type() uint32 {
return ChatCompletionStatsEventID
}
type ReloadingState int
const (
ReloadingStateStart ReloadingState = iota
ReloadingStateEnd
)
type ConfigFileChangedEvent struct {
ReloadingState ReloadingState
}
func (e ConfigFileChangedEvent) Type() uint32 {
return ConfigFileChangedEventID
}
type LogDataEvent struct {
Data []byte
}
func (e LogDataEvent) Type() uint32 {
return LogDataEventID
}
+14 -31
View File
@@ -2,10 +2,13 @@ package proxy
import (
"container/ring"
"context"
"fmt"
"io"
"os"
"sync"
"github.com/kelindar/event"
)
type LogLevel int
@@ -18,7 +21,7 @@ const (
)
type LogMonitor struct {
clients map[chan []byte]bool
eventbus *event.Dispatcher
mu sync.RWMutex
buffer *ring.Ring
bufferMu sync.RWMutex
@@ -37,11 +40,11 @@ func NewLogMonitor() *LogMonitor {
func NewLogMonitorWriter(stdout io.Writer) *LogMonitor {
return &LogMonitor{
clients: make(map[chan []byte]bool),
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
stdout: stdout,
level: LevelInfo,
prefix: "",
eventbus: event.NewDispatcher(),
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
stdout: stdout,
level: LevelInfo,
prefix: "",
}
}
@@ -81,34 +84,14 @@ func (w *LogMonitor) GetHistory() []byte {
return history
}
func (w *LogMonitor) Subscribe() chan []byte {
w.mu.Lock()
defer w.mu.Unlock()
ch := make(chan []byte, 100)
w.clients[ch] = true
return ch
}
func (w *LogMonitor) Unsubscribe(ch chan []byte) {
w.mu.Lock()
defer w.mu.Unlock()
delete(w.clients, ch)
close(ch)
func (w *LogMonitor) OnLogData(callback func(data []byte)) context.CancelFunc {
return event.Subscribe(w.eventbus, func(e LogDataEvent) {
callback(e.Data)
})
}
func (w *LogMonitor) broadcast(msg []byte) {
w.mu.RLock()
defer w.mu.RUnlock()
for client := range w.clients {
select {
case client <- msg:
default:
// If client buffer is full, skip
}
}
event.Publish(w.eventbus, LogDataEvent{Data: msg})
}
func (w *LogMonitor) SetPrefix(prefix string) {
+13 -22
View File
@@ -10,38 +10,29 @@ import (
func TestLogMonitor(t *testing.T) {
logMonitor := NewLogMonitorWriter(io.Discard)
// Test subscription
client1 := logMonitor.Subscribe()
client2 := logMonitor.Subscribe()
defer logMonitor.Unsubscribe(client1)
defer logMonitor.Unsubscribe(client2)
// A WaitGroup is used to wait for all the expected writes to complete
var wg sync.WaitGroup
client1Messages := make([]byte, 0)
client2Messages := make([]byte, 0)
var wg sync.WaitGroup
wg.Add(1)
defer logMonitor.OnLogData(func(data []byte) {
client1Messages = append(client1Messages, data...)
wg.Done()
})()
go func() {
defer wg.Done()
for {
select {
case data := <-client1:
client1Messages = append(client1Messages, data...)
case data := <-client2:
client2Messages = append(client2Messages, data...)
default:
return
}
}
}()
defer logMonitor.OnLogData(func(data []byte) {
client2Messages = append(client2Messages, data...)
wg.Done()
})()
wg.Add(6) // 2 x 3 writes
logMonitor.Write([]byte("1"))
logMonitor.Write([]byte("2"))
logMonitor.Write([]byte("3"))
// Wait for the goroutine to finish
// wait for all writes to complete
wg.Wait()
// Check the buffer
+3
View File
@@ -13,6 +13,8 @@ import (
"sync"
"syscall"
"time"
"github.com/kelindar/event"
)
type ProcessState string
@@ -127,6 +129,7 @@ func (p *Process) swapState(expectedState, newState ProcessState) (ProcessState,
p.state = newState
p.proxyLogger.Debugf("<%s> swapState() State transitioned from %s to %s", p.ID, expectedState, newState)
event.Emit(ProcessStateChangeEvent{ProcessName: p.ID, NewState: newState, OldState: expectedState})
return p.state, nil
}
+33 -16
View File
@@ -2,7 +2,7 @@ package proxy
import (
"bytes"
"encoding/json"
"context"
"fmt"
"io"
"mime/multipart"
@@ -34,6 +34,10 @@ type ProxyManager struct {
muxLogger *LogMonitor
processGroups map[string]*ProcessGroup
// shutdown signaling
shutdownCtx context.Context
shutdownCancel context.CancelFunc
}
func New(config Config) *ProxyManager {
@@ -64,6 +68,8 @@ func New(config Config) *ProxyManager {
upstreamLogger.SetLogLevel(LevelInfo)
}
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
pm := &ProxyManager{
config: config,
ginEngine: gin.New(),
@@ -73,6 +79,9 @@ func New(config Config) *ProxyManager {
upstreamLogger: upstreamLogger,
processGroups: make(map[string]*ProcessGroup),
shutdownCtx: shutdownCtx,
shutdownCancel: shutdownCancel,
}
// create the process groups
@@ -158,9 +167,7 @@ func (pm *ProxyManager) setupGinEngine() {
// in proxymanager_loghandlers.go
pm.ginEngine.GET("/logs", pm.sendLogsHandlers)
pm.ginEngine.GET("/logs/stream", pm.streamLogsHandler)
pm.ginEngine.GET("/logs/streamSSE", pm.streamLogsHandlerSSE)
pm.ginEngine.GET("/logs/stream/:logMonitorID", pm.streamLogsHandler)
pm.ginEngine.GET("/logs/streamSSE/:logMonitorID", pm.streamLogsHandlerSSE)
/**
* User Interface Endpoints
@@ -262,6 +269,7 @@ func (pm *ProxyManager) Shutdown() {
}(processGroup)
}
wg.Wait()
pm.shutdownCancel()
}
func (pm *ProxyManager) swapProcessGroup(requestedModel string) (*ProcessGroup, string, error) {
@@ -289,32 +297,41 @@ func (pm *ProxyManager) swapProcessGroup(requestedModel string) (*ProcessGroup,
}
func (pm *ProxyManager) listModelsHandler(c *gin.Context) {
data := []interface{}{}
data := make([]gin.H, 0, len(pm.config.Models))
createdTime := time.Now().Unix()
for id, modelConfig := range pm.config.Models {
if modelConfig.Unlisted {
continue
}
data = append(data, map[string]interface{}{
record := gin.H{
"id": id,
"object": "model",
"created": time.Now().Unix(),
"created": createdTime,
"owned_by": "llama-swap",
})
}
if name := strings.TrimSpace(modelConfig.Name); name != "" {
record["name"] = name
}
if desc := strings.TrimSpace(modelConfig.Description); desc != "" {
record["description"] = desc
}
data = append(data, record)
}
// Set the Content-Type header to application/json
c.Header("Content-Type", "application/json")
if origin := c.Request.Header.Get("Origin"); origin != "" {
// Set CORS headers if origin exists
if origin := c.GetHeader("Origin"); origin != "" {
c.Header("Access-Control-Allow-Origin", origin)
}
// Encode the data as JSON and write it to the response writer
if err := json.NewEncoder(c.Writer).Encode(map[string]interface{}{"object": "list", "data": data}); err != nil {
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error encoding JSON %s", err.Error()))
return
}
// Use gin's JSON method which handles content-type and encoding
c.JSON(http.StatusOK, gin.H{
"object": "list",
"data": data,
})
}
func (pm *ProxyManager) proxyToUpstream(c *gin.Context) {
+86 -18
View File
@@ -1,25 +1,28 @@
package proxy
import (
"context"
"encoding/json"
"net/http"
"sort"
"time"
"github.com/gin-gonic/gin"
"github.com/kelindar/event"
)
type Model struct {
Id string `json:"id"`
State string `json:"state"`
Id string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
State string `json:"state"`
}
func addApiHandlers(pm *ProxyManager) {
// Add API endpoints for React to consume
apiGroup := pm.ginEngine.Group("/api")
{
apiGroup.GET("/models", pm.apiListModels)
apiGroup.GET("/modelsSSE", pm.apiListModelsSSE)
apiGroup.POST("/models/unload", pm.apiUnloadAllModels)
apiGroup.GET("/events", pm.apiSendEvents)
}
}
@@ -65,37 +68,102 @@ func (pm *ProxyManager) getModelStatus() []Model {
}
}
models = append(models, Model{
Id: modelID,
State: state,
Id: modelID,
Name: pm.config.Models[modelID].Name,
Description: pm.config.Models[modelID].Description,
State: state,
})
}
return models
}
func (pm *ProxyManager) apiListModels(c *gin.Context) {
c.JSON(http.StatusOK, pm.getModelStatus())
type messageType string
const (
msgTypeModelStatus messageType = "modelStatus"
msgTypeLogData messageType = "logData"
)
type messageEnvelope struct {
Type messageType `json:"type"`
Data string `json:"data"`
}
// stream the models as a SSE
func (pm *ProxyManager) apiListModelsSSE(c *gin.Context) {
// sends a stream of different message types that happen on the server
func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("X-Content-Type-Options", "nosniff")
notify := c.Request.Context().Done()
sendBuffer := make(chan messageEnvelope, 25)
ctx, cancel := context.WithCancel(c.Request.Context())
sendModels := func() {
data, err := json.Marshal(pm.getModelStatus())
if err == nil {
msg := messageEnvelope{Type: msgTypeModelStatus, Data: string(data)}
select {
case sendBuffer <- msg:
case <-ctx.Done():
return
default:
}
}
}
sendLogData := func(source string, data []byte) {
data, err := json.Marshal(gin.H{
"source": source,
"data": string(data),
})
if err == nil {
select {
case sendBuffer <- messageEnvelope{Type: msgTypeLogData, Data: string(data)}:
case <-ctx.Done():
return
default:
}
}
}
/**
* Send updated models list
*/
defer event.On(func(e ProcessStateChangeEvent) {
sendModels()
})()
defer event.On(func(e ConfigFileChangedEvent) {
sendModels()
})()
/**
* Send Log data
*/
defer pm.proxyLogger.OnLogData(func(data []byte) {
sendLogData("proxy", data)
})()
defer pm.upstreamLogger.OnLogData(func(data []byte) {
sendLogData("upstream", data)
})()
// send initial batch of data
sendLogData("proxy", pm.proxyLogger.GetHistory())
sendLogData("upstream", pm.upstreamLogger.GetHistory())
sendModels()
// Stream new events
for {
select {
case <-notify:
case <-c.Request.Context().Done():
cancel()
return
default:
models := pm.getModelStatus()
c.SSEvent("message", models)
case <-pm.shutdownCtx.Done():
cancel()
return
case msg := <-sendBuffer:
c.SSEvent("message", msg)
c.Writer.Flush()
<-time.After(1000 * time.Millisecond)
}
}
}
+20 -51
View File
@@ -1,6 +1,7 @@
package proxy
import (
"context"
"fmt"
"net/http"
"strings"
@@ -34,10 +35,7 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
c.String(http.StatusBadRequest, err.Error())
return
}
ch := logger.Subscribe()
defer logger.Unsubscribe(ch)
notify := c.Request.Context().Done()
flusher, ok := c.Writer.(http.Flusher)
if !ok {
c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("streaming unsupported"))
@@ -55,57 +53,28 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
}
}
// Stream new logs
sendChan := make(chan []byte, 10)
ctx, cancel := context.WithCancel(c.Request.Context())
defer logger.OnLogData(func(data []byte) {
select {
case sendChan <- data:
case <-ctx.Done():
return
default:
}
})()
for {
select {
case msg := <-ch:
_, err := c.Writer.Write(msg)
if err != nil {
// just break the loop if we can't write for some reason
return
}
case <-c.Request.Context().Done():
cancel()
return
case <-pm.shutdownCtx.Done():
cancel()
return
case data := <-sendChan:
c.Writer.Write(data)
flusher.Flush()
case <-notify:
return
}
}
}
func (pm *ProxyManager) streamLogsHandlerSSE(c *gin.Context) {
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("X-Content-Type-Options", "nosniff")
logMonitorId := c.Param("logMonitorID")
logger, err := pm.getLogger(logMonitorId)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
ch := logger.Subscribe()
defer logger.Unsubscribe(ch)
notify := c.Request.Context().Done()
// Send history first if not skipped
_, skipHistory := c.GetQuery("no-history")
if !skipHistory {
history := logger.GetHistory()
if len(history) != 0 {
c.SSEvent("message", string(history))
c.Writer.Flush()
}
}
// Stream new logs
for {
select {
case msg := <-ch:
c.SSEvent("message", string(msg))
c.Writer.Flush()
case <-notify:
return
}
}
}
+28 -2
View File
@@ -183,11 +183,20 @@ func TestProxyManager_SwapMultiProcessParallelRequests(t *testing.T) {
}
func TestProxyManager_ListModelsHandler(t *testing.T) {
model1Config := getTestSimpleResponderConfig("model1")
model1Config.Name = "Model 1"
model1Config.Description = "Model 1 description is used for testing"
model2Config := getTestSimpleResponderConfig("model2")
model2Config.Name = " " // empty whitespace only strings will get ignored
model2Config.Description = " "
config := Config{
HealthCheckTimeout: 15,
Models: map[string]ModelConfig{
"model1": getTestSimpleResponderConfig("model1"),
"model2": getTestSimpleResponderConfig("model2"),
"model1": model1Config,
"model2": model2Config,
"model3": getTestSimpleResponderConfig("model3"),
},
LogLevel: "error",
@@ -213,6 +222,7 @@ func TestProxyManager_ListModelsHandler(t *testing.T) {
var response struct {
Data []map[string]interface{} `json:"data"`
}
if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil {
t.Fatalf("Failed to parse JSON response: %v", err)
}
@@ -227,6 +237,7 @@ func TestProxyManager_ListModelsHandler(t *testing.T) {
"model3": {},
}
// make all models
for _, model := range response.Data {
modelID, ok := model["id"].(string)
assert.True(t, ok, "model ID should be a string")
@@ -245,6 +256,21 @@ func TestProxyManager_ListModelsHandler(t *testing.T) {
ownedBy, ok := model["owned_by"].(string)
assert.True(t, ok, "owned_by should be a string")
assert.Equal(t, "llama-swap", ownedBy)
// check for optional name and description
if modelID == "model1" {
name, ok := model["name"].(string)
assert.True(t, ok, "name should be a string")
assert.Equal(t, "Model 1", name)
description, ok := model["description"].(string)
assert.True(t, ok, "description should be a string")
assert.Equal(t, "Model 1 description is used for testing", description)
} else {
_, exists := model["name"]
assert.False(t, exists, "unexpected name field for model: %s", modelID)
_, exists = model["description"]
assert.False(t, exists, "unexpected description field for model: %s", modelID)
}
}
// Ensure all expected models were returned
+4 -4
View File
@@ -10,10 +10,10 @@ function App() {
<Router basename="/ui/">
<APIProvider>
<div>
<nav className="bg-surface border-b border-border p-4">
<div className="flex items-center justify-between mx-auto px-4">
<h1>llama-swap</h1>
<div className="flex space-x-4">
<nav className="bg-surface border-b border-border p-2 h-[75px]">
<div className="flex items-center justify-between mx-auto px-4 h-full">
<h1 className="flex items-center p-0">llama-swap</h1>
<div className="flex items-center space-x-4">
<NavLink to="/" className={({ isActive }) => (isActive ? "navlink active" : "navlink")}>
Logs
</NavLink>
+63 -109
View File
@@ -6,6 +6,8 @@ const LOG_LENGTH_LIMIT = 1024 * 100; /* 100KB of log data */
export interface Model {
id: string;
state: ModelStatus;
name: string;
description: string;
}
interface APIProviderType {
@@ -13,12 +15,18 @@ interface APIProviderType {
listModels: () => Promise<Model[]>;
unloadAllModels: () => Promise<void>;
loadModel: (model: string) => Promise<void>;
enableProxyLogs: (enabled: boolean) => void;
enableUpstreamLogs: (enabled: boolean) => void;
enableModelUpdates: (enabled: boolean) => void;
enableAPIEvents: (enabled: boolean) => void;
proxyLogs: string;
upstreamLogs: string;
}
interface LogData {
source: "upstream" | "proxy";
data: string;
}
interface APIEventEnvelope {
type: "modelStatus" | "logData";
data: string;
}
const APIContext = createContext<APIProviderType | undefined>(undefined);
type APIProviderProps = {
@@ -30,6 +38,7 @@ export function APIProvider({ children }: APIProviderProps) {
const [upstreamLogs, setUpstreamLogs] = useState("");
const proxyEventSource = useRef<EventSource | null>(null);
const upstreamEventSource = useRef<EventSource | null>(null);
const apiEventSource = useRef<EventSource | null>(null);
const [models, setModels] = useState<Model[]>([]);
const modelStatusEventSource = useRef<EventSource | null>(null);
@@ -41,104 +50,61 @@ export function APIProvider({ children }: APIProviderProps) {
});
}, []);
const handleProxyMessage = useCallback(
(e: MessageEvent) => {
appendLog(e.data, setProxyLogs);
},
[proxyLogs, appendLog]
);
const enableAPIEvents = useCallback((enabled: boolean) => {
if (!enabled) {
apiEventSource.current?.close();
apiEventSource.current = null;
return;
}
const handleUpstreamMessage = useCallback(
(e: MessageEvent) => {
appendLog(e.data, setUpstreamLogs);
},
[appendLog]
);
let retryCount = 0;
const maxRetries = 3;
const initialDelay = 1000; // 1 second
const enableProxyLogs = useCallback(
(enabled: boolean) => {
if (enabled) {
let retryCount = 0;
const maxRetries = 3;
const initialDelay = 1000; // 1 second
const connect = () => {
const eventSource = new EventSource("/api/events");
const connect = () => {
const eventSource = new EventSource("/logs/streamSSE/proxy");
eventSource.onmessage = (e: MessageEvent) => {
try {
const message = JSON.parse(e.data) as APIEventEnvelope;
switch (message.type) {
case "modelStatus":
{
const models = JSON.parse(message.data) as Model[];
setModels(models);
}
break;
eventSource.onmessage = handleProxyMessage;
eventSource.onerror = () => {
eventSource.close();
if (retryCount < maxRetries) {
retryCount++;
const delay = initialDelay * Math.pow(2, retryCount - 1);
setTimeout(connect, delay);
case "logData": {
const logData = JSON.parse(message.data) as LogData;
switch (logData.source) {
case "proxy":
appendLog(logData.data, setProxyLogs);
break;
case "upstream":
appendLog(logData.data, setUpstreamLogs);
break;
}
}
};
proxyEventSource.current = eventSource;
};
connect();
} else {
proxyEventSource.current?.close();
proxyEventSource.current = null;
}
},
[handleProxyMessage]
);
const enableUpstreamLogs = useCallback(
(enabled: boolean) => {
if (enabled) {
let retryCount = 0;
const maxRetries = 3;
const initialDelay = 1000; // 1 second
const connect = () => {
const eventSource = new EventSource("/logs/streamSSE/upstream");
eventSource.onmessage = handleUpstreamMessage;
eventSource.onerror = () => {
eventSource.close();
if (retryCount < maxRetries) {
retryCount++;
const delay = initialDelay * Math.pow(2, retryCount - 1);
setTimeout(connect, delay);
}
};
upstreamEventSource.current = eventSource;
};
connect();
} else {
upstreamEventSource.current?.close();
upstreamEventSource.current = null;
}
},
[handleUpstreamMessage]
);
const enableModelUpdates = useCallback(
(enabled: boolean) => {
if (enabled) {
const eventSource = new EventSource("/api/modelsSSE");
eventSource.onmessage = (e: MessageEvent) => {
try {
const models = JSON.parse(e.data) as Model[];
setModels(models);
} catch (e) {
console.error(e);
}
};
modelStatusEventSource.current = eventSource;
} else {
modelStatusEventSource.current?.close();
modelStatusEventSource.current = null;
}
},
[setModels]
);
} catch (err) {
console.error(e.data, err);
}
};
eventSource.onerror = () => {
eventSource.close();
if (retryCount < maxRetries) {
retryCount++;
const delay = initialDelay * Math.pow(2, retryCount - 1);
setTimeout(connect, delay);
}
};
apiEventSource.current = eventSource;
};
connect();
}, []);
useEffect(() => {
return () => {
@@ -196,23 +162,11 @@ export function APIProvider({ children }: APIProviderProps) {
listModels,
unloadAllModels,
loadModel,
enableProxyLogs,
enableUpstreamLogs,
enableModelUpdates,
enableAPIEvents,
proxyLogs,
upstreamLogs,
}),
[
models,
listModels,
unloadAllModels,
loadModel,
enableProxyLogs,
enableUpstreamLogs,
enableModelUpdates,
proxyLogs,
upstreamLogs,
]
[models, listModels, unloadAllModels, loadModel, enableAPIEvents, proxyLogs, upstreamLogs]
);
return <APIContext.Provider value={value}>{children}</APIContext.Provider>;
+18
View File
@@ -0,0 +1,18 @@
export function processEvalTimes(text: string) {
const lines = text.match(/^ *eval time.*$/gm) || [];
let totalTokens = 0;
let totalTime = 0;
lines.forEach((line) => {
const tokensMatch = line.match(/\/\s*(\d+)\s*tokens/);
const timeMatch = line.match(/=\s*(\d+\.\d+)\s*ms/);
if (tokensMatch) totalTokens += parseFloat(tokensMatch[1]);
if (timeMatch) totalTime += parseFloat(timeMatch[1]);
});
const avgTokensPerSecond = totalTime > 0 ? totalTokens / (totalTime / 1000) : 0;
return [lines.length, totalTokens, Math.round(avgTokensPerSecond * 100) / 100];
}
+14 -30
View File
@@ -3,19 +3,17 @@ import { useAPI } from "../contexts/APIProvider";
import { usePersistentState } from "../hooks/usePersistentState";
const LogViewer = () => {
const { proxyLogs, upstreamLogs, enableProxyLogs, enableUpstreamLogs } = useAPI();
const { proxyLogs, upstreamLogs, enableAPIEvents } = useAPI();
useEffect(() => {
enableProxyLogs(true);
enableUpstreamLogs(true);
enableAPIEvents(true);
return () => {
enableProxyLogs(false);
enableUpstreamLogs(false);
enableAPIEvents(false);
};
}, []);
return (
<div className="flex flex-col gap-5">
<div className="flex flex-col gap-5" style={{ height: "calc(100vh - 125px)" }}>
<LogPanel id="proxy" title="Proxy Logs" logData={proxyLogs} />
<LogPanel id="upstream" title="Upstream Logs" logData={upstreamLogs} />
</div>
@@ -30,11 +28,8 @@ interface LogPanelProps {
}
export const LogPanel = ({ id, title, logData, className }: LogPanelProps) => {
const [isCollapsed, setIsCollapsed] = usePersistentState(`logPanel-${id}-isCollapsed`, false);
const [filterRegex, setFilterRegex] = useState("");
const [panelState, setPanelState] = usePersistentState<"hide" | "small" | "max">(
`logPanel-${id}-panelState`,
"small"
);
const [fontSize, setFontSize] = usePersistentState<"xxs" | "xs" | "small" | "normal">(
`logPanel-${id}-fontSize`,
"normal"
@@ -60,14 +55,6 @@ export const LogPanel = ({ id, title, logData, className }: LogPanelProps) => {
});
}, []);
const togglePanelState = useCallback(() => {
setPanelState((prev) => {
if (prev === "small") return "max";
if (prev === "hide") return "small";
return "hide";
});
}, []);
const fontSizeClass = useMemo(() => {
switch (fontSize) {
case "xxs":
@@ -101,20 +88,21 @@ export const LogPanel = ({ id, title, logData, className }: LogPanelProps) => {
}, [filteredLogs]);
return (
<div className={`bg-surface border border-border rounded-lg overflow-hidden flex flex-col ${className || ""}`}>
<div
className={`bg-surface border border-border rounded-lg overflow-hidden flex flex-col ${
!isCollapsed && "h-full"
} ${className || ""}`}
>
<div className="p-4 border-b border-border bg-secondary">
<div className="flex flex-col md:flex-row md:items-center md:justify-between gap-4">
{/* Title - Always full width on mobile, normal on desktop */}
<div className="w-full md:w-auto" onClick={togglePanelState}>
<div className="w-full md:w-auto" onClick={() => setIsCollapsed(!isCollapsed)}>
<h3 className="m-0 text-lg">{title}</h3>
</div>
<div className="flex flex-col sm:flex-row gap-4 w-full md:w-auto">
{/* Sizing Buttons - Stacks vertically on mobile */}
<div className="flex flex-wrap gap-2">
<button className="btn" onClick={togglePanelState}>
size: {panelState}
</button>
<button className="btn" onClick={toggleFontSize}>
font: {fontSize}
</button>
@@ -140,14 +128,11 @@ export const LogPanel = ({ id, title, logData, className }: LogPanelProps) => {
</div>
</div>
{panelState !== "hide" && (
<div className="flex-1 bg-background font-mono text-sm leading-[1.4] p-3">
{!isCollapsed && (
<div className="flex-1 bg-background font-mono text-sm p-3 overflow-hidden">
<pre
ref={preTagRef}
className={`flex-1 p-4 overflow-y-auto whitespace-pre min-h-0 ${textWrapClass} ${fontSizeClass}`}
style={{
maxHeight: panelState === "max" ? "1500px" : "500px",
}}
className={`h-full p-4 overflow-y-auto whitespace-pre min-h-0 ${textWrapClass} ${fontSizeClass}`}
>
{filteredLogs}
</pre>
@@ -156,5 +141,4 @@ export const LogPanel = ({ id, title, logData, className }: LogPanelProps) => {
</div>
);
};
export default LogViewer;
+46 -11
View File
@@ -1,17 +1,16 @@
import { useState, useEffect, useCallback } from "react";
import { useState, useEffect, useCallback, useMemo } from "react";
import { useAPI } from "../contexts/APIProvider";
import { LogPanel } from "./LogViewer";
import { processEvalTimes } from "../lib/Utils";
export default function ModelsPage() {
const { models, enableModelUpdates, unloadAllModels, loadModel, upstreamLogs, enableUpstreamLogs } = useAPI();
const { models, unloadAllModels, loadModel, upstreamLogs, enableAPIEvents } = useAPI();
const [isUnloading, setIsUnloading] = useState(false);
useEffect(() => {
enableModelUpdates(true);
enableUpstreamLogs(true);
enableAPIEvents(true);
return () => {
enableModelUpdates(false);
enableUpstreamLogs(false);
enableAPIEvents(false);
};
}, []);
@@ -29,8 +28,12 @@ export default function ModelsPage() {
}
}, []);
const [totalLines, totalTokens, avgTokensPerSecond] = useMemo(() => {
return processEvalTimes(upstreamLogs);
}, [upstreamLogs]);
return (
<div className="h-screen">
<div>
<div className="flex flex-col md:flex-row gap-4">
{/* Left Column */}
<div className="w-full md:w-1/2 flex items-top">
@@ -52,11 +55,22 @@ export default function ModelsPage() {
<tr key={model.id} className="border-b hover:bg-secondary-hover border-border">
<td className="p-2">
<a href={`/upstream/${model.id}/`} className="underline" target="_blank">
{model.id}
{model.name !== "" ? model.name : model.id}
</a>
{model.description != "" && (
<p>
<em>{model.description}</em>
</p>
)}
</td>
<td className="p-2">
<button className="btn btn--sm" disabled={model.state !== "stopped"} onClick={() => loadModel(model.id)}>Load</button>
<button
className="btn btn--sm"
disabled={model.state !== "stopped"}
onClick={() => loadModel(model.id)}
>
Load
</button>
</td>
<td className="p-2">
<span className={`status status--${model.state}`}>{model.state}</span>
@@ -69,8 +83,29 @@ export default function ModelsPage() {
</div>
{/* Right Column */}
<div className="w-full md:w-1/2 flex items-top">
<LogPanel id="modelsupstream" title="Upstream Logs" logData={upstreamLogs} className="h-full" />
<div className="w-full md:w-1/2 flex flex-col" style={{ height: "calc(100vh - 125px)" }}>
<div className="card mb-4 min-h-[250px]">
<h2>Log Stats</h2>
<p className="italic my-2">note: eval logs from llama-server</p>
<table className="w-full border border-gray-200">
<tbody>
<tr className="border-b border-gray-200">
<td className="py-2 px-4 font-medium border-r border-gray-200">Requests</td>
<td className="py-2 px-4 text-right">{totalLines}</td>
</tr>
<tr className="border-b border-gray-200">
<td className="py-2 px-4 font-medium border-r border-gray-200">Total Tokens Generated</td>
<td className="py-2 px-4 text-right">{totalTokens}</td>
</tr>
<tr>
<td className="py-2 px-4 font-medium border-r border-gray-200">Average Tokens/Second</td>
<td className="py-2 px-4 text-right">{avgTokensPerSecond}</td>
</tr>
</tbody>
</table>
</div>
<LogPanel id="modelsupstream" title="Upstream Logs" logData={upstreamLogs} />
</div>
</div>
</div>