Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c79114d40a | |||
| 430166d5eb | |||
| 5b4beaceef |
@@ -20,6 +20,7 @@ Built in Go for performance and simplicity, llama-swap has zero dependencies and
|
||||
- `v1/chat/completions`
|
||||
- `v1/responses`
|
||||
- `v1/embeddings`
|
||||
- `v1/models` - list available models
|
||||
- `v1/audio/speech` ([#36](https://github.com/mostlygeek/llama-swap/issues/36))
|
||||
- `v1/audio/transcriptions` ([docs](https://github.com/mostlygeek/llama-swap/issues/41#issuecomment-2722637867))
|
||||
- `v1/audio/voices`
|
||||
@@ -39,9 +40,17 @@ Built in Go for performance and simplicity, llama-swap has zero dependencies and
|
||||
- ✅ llama-swap API
|
||||
- `/ui` - web UI
|
||||
- `/upstream/:model_id` - direct access to upstream server ([demo](https://github.com/mostlygeek/llama-swap/pull/31))
|
||||
- `/models/unload` - manually unload running models ([#58](https://github.com/mostlygeek/llama-swap/issues/58))
|
||||
- `/running` - list currently running models ([#61](https://github.com/mostlygeek/llama-swap/issues/61))
|
||||
- `/log` - remote log monitoring
|
||||
- `POST /api/models/unload` - manually unload all running models ([#58](https://github.com/mostlygeek/llama-swap/issues/58))
|
||||
- `POST /api/models/unload/:model_id` - unload a specific model
|
||||
- `/logs` - remote log monitoring
|
||||
- `GET /logs` returns buffered plain text logs.
|
||||
- If `Accept: text/html` is sent, `/logs` redirects to `/ui/`.
|
||||
- `GET /logs/stream` keeps the connection open for live log streaming.
|
||||
- Stream endpoints send buffered history first by default; add `?no-history` to stream only new lines.
|
||||
- `GET /logs/stream/proxy` streams proxy logs only.
|
||||
- `GET /logs/stream/upstream` streams upstream process logs only.
|
||||
- `GET /logs/stream/{model_id}` streams logs for one model (including IDs with slashes, like `author/model`).
|
||||
- `/health` - just returns "OK"
|
||||
- ✅ API Key support - define keys to restrict access to API endpoints
|
||||
- ✅ Customizable
|
||||
|
||||
@@ -578,15 +578,11 @@ func newBodyCopier(w gin.ResponseWriter) *responseBodyCopier {
|
||||
ResponseWriter: w,
|
||||
body: bodyBuffer,
|
||||
tee: io.MultiWriter(w, bodyBuffer),
|
||||
start: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *responseBodyCopier) Write(b []byte) (int, error) {
|
||||
if w.start.IsZero() {
|
||||
w.start = time.Now()
|
||||
}
|
||||
|
||||
// Single write operation that writes to both the response and buffer
|
||||
return w.tee.Write(b)
|
||||
}
|
||||
|
||||
|
||||
@@ -490,15 +490,11 @@ func TestMetricsMonitor_ResponseBodyCopier(t *testing.T) {
|
||||
assert.Equal(t, string(testData), rec.Body.String())
|
||||
})
|
||||
|
||||
t.Run("sets start time on first write", func(t *testing.T) {
|
||||
t.Run("sets start time on creation", func(t *testing.T) {
|
||||
rec := httptest.NewRecorder()
|
||||
ginCtx, _ := gin.CreateTestContext(rec)
|
||||
copier := newBodyCopier(ginCtx.Writer)
|
||||
|
||||
assert.True(t, copier.StartTime().IsZero())
|
||||
|
||||
copier.Write([]byte("test"))
|
||||
|
||||
assert.False(t, copier.StartTime().IsZero())
|
||||
})
|
||||
|
||||
|
||||
@@ -683,7 +683,7 @@ func (pm *ProxyManager) proxyToUpstream(c *gin.Context) {
|
||||
searchModelName, modelID, remainingPath, modelFound := pm.findModelInPath(upstreamPath)
|
||||
|
||||
if !modelFound {
|
||||
pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path")
|
||||
pm.sendErrorResponse(c, http.StatusNotFound, "model not found")
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,13 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
|
||||
c.Header("X-Accel-Buffering", "no")
|
||||
|
||||
logMonitorId := strings.TrimPrefix(c.Param("logMonitorID"), "/")
|
||||
|
||||
// Handle case where query string might be included in the parameter
|
||||
// (can happen with catch-all routes on some versions/setups)
|
||||
if idx := strings.Index(logMonitorId, "?"); idx != -1 {
|
||||
logMonitorId = logMonitorId[:idx]
|
||||
}
|
||||
|
||||
logger, err := pm.getLogger(logMonitorId)
|
||||
if err != nil {
|
||||
c.String(http.StatusBadRequest, err.Error())
|
||||
@@ -100,6 +107,12 @@ func (pm *ProxyManager) getLogger(logMonitorId string) (*LogMonitor, error) {
|
||||
return process.Logger(), nil
|
||||
}
|
||||
}
|
||||
// also check the matrix when processGroups doesn't contain the model
|
||||
if pm.matrix != nil {
|
||||
if process, found := pm.matrix.GetProcess(name); found {
|
||||
return process.Logger(), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("invalid logger. Use 'proxy', 'upstream' or a model's ID")
|
||||
|
||||
@@ -0,0 +1,173 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/mostlygeek/llama-swap/proxy/config"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLogMonitorIdQueryParameterStripping(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input string
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "upstream without query param",
|
||||
input: "upstream",
|
||||
expected: "upstream",
|
||||
},
|
||||
{
|
||||
name: "upstream with query param",
|
||||
input: "upstream?no-history",
|
||||
expected: "upstream",
|
||||
},
|
||||
{
|
||||
name: "proxy with multiple query params",
|
||||
input: "proxy?no-history&foo=bar",
|
||||
expected: "proxy",
|
||||
},
|
||||
{
|
||||
name: "model with slash and query param",
|
||||
input: "author/model?no-history",
|
||||
expected: "author/model",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Simulate the query parameter stripping logic
|
||||
logMonitorId := tt.input
|
||||
if idx := strings.Index(logMonitorId, "?"); idx != -1 {
|
||||
logMonitorId = logMonitorId[:idx]
|
||||
}
|
||||
|
||||
if logMonitorId != tt.expected {
|
||||
t.Errorf("Query parameter stripping failed: got %q, want %q", logMonitorId, tt.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestProxyManager_GetLogger_ProcessGroups verifies getLogger resolves the
|
||||
// well-known "proxy"/"upstream" loggers and a model ID managed by processGroups.
|
||||
func TestProxyManager_GetLogger_ProcessGroups(t *testing.T) {
|
||||
cfg := testConfigFromYAML(t, `
|
||||
healthCheckTimeout: 15
|
||||
logLevel: error
|
||||
models:
|
||||
model1:
|
||||
cmd: {{RESPONDER}} --port ${PORT} --silent --respond model1
|
||||
`)
|
||||
pm := New(cfg)
|
||||
defer pm.StopProcesses(StopImmediately)
|
||||
|
||||
tests := []struct {
|
||||
id string
|
||||
wantErr bool
|
||||
}{
|
||||
{"proxy", false},
|
||||
{"upstream", false},
|
||||
{"model1", false},
|
||||
{"does-not-exist", true},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.id, func(t *testing.T) {
|
||||
logger, err := pm.getLogger(tt.id)
|
||||
if tt.wantErr {
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "invalid logger")
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, logger)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestProxyManager_GetLogger_Matrix verifies that getLogger can resolve a model
|
||||
// ID when the proxy is configured with a swap matrix (pm.processGroups is empty
|
||||
// for matrix-managed models).
|
||||
func TestProxyManager_GetLogger_Matrix(t *testing.T) {
|
||||
cfg := config.Config{
|
||||
HealthCheckTimeout: 15,
|
||||
Models: map[string]config.ModelConfig{
|
||||
"model1": getTestSimpleResponderConfig("model1"),
|
||||
"model2": getTestSimpleResponderConfig("model2"),
|
||||
},
|
||||
ExpandedSets: []config.ExpandedSet{
|
||||
{SetName: "s1", Models: []string{"model1", "model2"}},
|
||||
},
|
||||
Matrix: &config.MatrixConfig{},
|
||||
}
|
||||
|
||||
pm := New(cfg)
|
||||
defer pm.StopProcesses(StopImmediately)
|
||||
|
||||
tests := []struct {
|
||||
id string
|
||||
wantErr bool
|
||||
}{
|
||||
{"proxy", false},
|
||||
{"upstream", false},
|
||||
{"model1", false},
|
||||
{"model2", false},
|
||||
{"does-not-exist", true},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.id, func(t *testing.T) {
|
||||
logger, err := pm.getLogger(tt.id)
|
||||
if tt.wantErr {
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "invalid logger")
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, logger)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestProxyManager_StreamLogs_Matrix verifies that /logs/stream/<modelID>
|
||||
// returns 200 (not 400) for a model managed by the swap matrix.
|
||||
func TestProxyManager_StreamLogs_Matrix(t *testing.T) {
|
||||
cfg := config.Config{
|
||||
HealthCheckTimeout: 15,
|
||||
Models: map[string]config.ModelConfig{
|
||||
"matrix-model": getTestSimpleResponderConfig("matrix-model"),
|
||||
},
|
||||
ExpandedSets: []config.ExpandedSet{
|
||||
{SetName: "s1", Models: []string{"matrix-model"}},
|
||||
},
|
||||
Matrix: &config.MatrixConfig{},
|
||||
}
|
||||
|
||||
pm := New(cfg)
|
||||
defer pm.StopProcesses(StopImmediately)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
req := httptest.NewRequest("GET", "/logs/stream/matrix-model", nil)
|
||||
req = req.WithContext(ctx)
|
||||
rec := CreateTestResponseRecorder()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
pm.ServeHTTP(rec, req)
|
||||
}()
|
||||
|
||||
<-ctx.Done()
|
||||
<-done
|
||||
|
||||
assert.Equal(t, 200, rec.Code)
|
||||
}
|
||||
Reference in New Issue
Block a user