Compare commits

...

3 Commits

Author SHA1 Message Date
Benson Wong c36986fef6 upstream handler support for model names with forward slash (#298)
The upstream handler would break on model IDs that contained a forward
slash. Model IDs like "aaa/bbb" called at upstream/aaa/bbb would result
in an error. This commit adds support for model IDs with a forward slash
by iteratively searching the path for a match.

Fixes: #229
2025-09-13 13:37:03 -07:00
Artur Podsiadły 558801db1a Fix nginx proxy buffering for streaming endpoints (#295)
* Fix nginx proxy buffering for streaming endpoints

- Add X-Accel-Buffering: no header to SSE endpoints (/api/events, /logs/stream)
- Add X-Accel-Buffering: no header to proxied text/event-stream responses
- Add nginx reverse proxy configuration section to README
- Add tests for X-Accel-Buffering header on streaming endpoints

Fixes #236

* Fix goroutine cleanup in streaming endpoints test

Add context cancellation to TestProxyManager_StreamingEndpointsReturnNoBufferingHeader
to ensure the goroutine is properly cleaned up when the test completes.
2025-09-09 16:07:46 -07:00
Benson Wong b21dee27c1 Fix #288 Vite hot module reloading creating multiple SSE connections (#290)
- move SSE (EventSource) connection to module level
- manage EventSource as a singleton, closing open connection before
  reopening a new one
2025-09-07 21:48:58 -07:00
7 changed files with 144 additions and 17 deletions
+24
View File
@@ -73,6 +73,30 @@ However, there are many more capabilities that llama-swap supports:
See the [configuration documentation](https://github.com/mostlygeek/llama-swap/wiki/Configuration) in the wiki all options and examples.
## Reverse Proxy Configuration (nginx)
If you deploy llama-swap behind nginx, disable response buffering for streaming endpoints. By default, nginx buffers responses which breaks ServerSent Events (SSE) and streaming chat completion. ([#236](https://github.com/mostlygeek/llama-swap/issues/236))
Recommended nginx configuration snippets:
```nginx
# SSE for UI events/logs
location /api/events {
proxy_pass http://your-llama-swap-backend;
proxy_buffering off;
proxy_cache off;
}
# Streaming chat completions (stream=true)
location /v1/chat/completions {
proxy_pass http://your-llama-swap-backend;
proxy_buffering off;
proxy_cache off;
}
```
As a safeguard, llama-swap also sets `X-Accel-Buffering: no` on SSE responses. However, explicitly disabling `proxy_buffering` at your reverse proxy is still recommended for reliable streaming behavior.
## Web UI
llama-swap includes a real time web interface for monitoring logs and models:
+4
View File
@@ -458,6 +458,10 @@ func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
w.Header().Add(k, v)
}
}
// prevent nginx from buffering streaming responses (e.g., SSE)
if strings.Contains(strings.ToLower(resp.Header.Get("Content-Type")), "text/event-stream") {
w.Header().Set("X-Accel-Buffering", "no")
}
w.WriteHeader(resp.StatusCode)
// faster than io.Copy when streaming
+34 -6
View File
@@ -227,7 +227,7 @@ func (pm *ProxyManager) setupGinEngine() {
pm.ginEngine.GET("/upstream", func(c *gin.Context) {
c.Redirect(http.StatusFound, "/ui/models")
})
pm.ginEngine.Any("/upstream/:model_id/*upstreamPath", pm.proxyToUpstream)
pm.ginEngine.Any("/upstream/*upstreamPath", pm.proxyToUpstream)
pm.ginEngine.GET("/unload", pm.unloadAllModelsHandler)
pm.ginEngine.GET("/running", pm.listRunningProcessesHandler)
@@ -393,24 +393,52 @@ func (pm *ProxyManager) listModelsHandler(c *gin.Context) {
}
func (pm *ProxyManager) proxyToUpstream(c *gin.Context) {
requestedModel := c.Param("model_id")
upstreamPath := c.Param("upstreamPath")
if requestedModel == "" {
// split the upstream path by / and search for the model name
parts := strings.Split(strings.TrimSpace(upstreamPath), "/")
if len(parts) == 0 {
pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path")
return
}
processGroup, realModelName, err := pm.swapProcessGroup(requestedModel)
modelFound := false
searchModelName := ""
var modelName, remainingPath string
for i, part := range parts {
if parts[i] == "" {
continue
}
if searchModelName == "" {
searchModelName = part
} else {
searchModelName = searchModelName + "/" + parts[i]
}
if real, ok := pm.config.RealModelName(searchModelName); ok {
modelName = real
remainingPath = "/" + strings.Join(parts[i+1:], "/")
modelFound = true
break
}
}
if !modelFound {
pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path")
return
}
processGroup, realModelName, err := pm.swapProcessGroup(modelName)
if err != nil {
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error swapping process group: %s", err.Error()))
return
}
// rewrite the path
c.Request.URL.Path = c.Param("upstreamPath")
c.Request.URL.Path = remainingPath
processGroup.ProxyRequest(realModelName, c.Writer, c.Request)
}
func (pm *ProxyManager) proxyOAIHandler(c *gin.Context) {
bodyBytes, err := io.ReadAll(c.Request.Body)
if err != nil {
+2
View File
@@ -100,6 +100,8 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("X-Content-Type-Options", "nosniff")
// prevent nginx from buffering SSE
c.Header("X-Accel-Buffering", "no")
sendBuffer := make(chan messageEnvelope, 25)
ctx, cancel := context.WithCancel(c.Request.Context())
+2
View File
@@ -28,6 +28,8 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
c.Header("Content-Type", "text/plain")
c.Header("Transfer-Encoding", "chunked")
c.Header("X-Content-Type-Options", "nosniff")
// prevent nginx from buffering streamed logs
c.Header("X-Accel-Buffering", "no")
logMonitorId := c.Param("logMonitorID")
logger, err := pm.getLogger(logMonitorId)
+65
View File
@@ -2,6 +2,7 @@ package proxy
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/rand"
@@ -913,3 +914,67 @@ models:
assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model1"].CurrentState())
assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model2"].CurrentState())
}
func TestProxyManager_StreamingEndpointsReturnNoBufferingHeader(t *testing.T) {
config := AddDefaultGroupToConfig(Config{
HealthCheckTimeout: 15,
Models: map[string]ModelConfig{
"model1": getTestSimpleResponderConfig("model1"),
},
LogLevel: "error",
})
proxy := New(config)
defer proxy.StopProcesses(StopWaitForInflightRequest)
endpoints := []string{
"/api/events",
"/logs/stream",
"/logs/stream/proxy",
"/logs/stream/upstream",
}
for _, endpoint := range endpoints {
t.Run(endpoint, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := httptest.NewRequest("GET", endpoint, nil)
req = req.WithContext(ctx)
rec := httptest.NewRecorder()
// We don't need the handler to fully complete, just to set the headers
// so run it in a goroutine and check the headers after a short delay
go proxy.ServeHTTP(rec, req)
time.Sleep(10 * time.Millisecond) // give it time to start and write headers
assert.Equal(t, http.StatusOK, rec.Code)
assert.Equal(t, "no", rec.Header().Get("X-Accel-Buffering"))
})
}
}
func TestProxyManager_ProxiedStreamingEndpointReturnsNoBufferingHeader(t *testing.T) {
config := AddDefaultGroupToConfig(Config{
HealthCheckTimeout: 15,
Models: map[string]ModelConfig{
"streaming-model": getTestSimpleResponderConfig("streaming-model"),
},
LogLevel: "error",
})
proxy := New(config)
defer proxy.StopProcesses(StopWaitForInflightRequest)
// Make a streaming request
reqBody := `{"model":"streaming-model"}`
// simple-responder will return text/event-stream when stream=true is in the query
req := httptest.NewRequest("POST", "/v1/chat/completions?stream=true", bytes.NewBufferString(reqBody))
rec := httptest.NewRecorder()
proxy.ServeHTTP(rec, req)
assert.Equal(t, http.StatusOK, rec.Code)
assert.Equal(t, "no", rec.Header().Get("X-Accel-Buffering"))
assert.Contains(t, rec.Header().Get("Content-Type"), "text/event-stream")
}
+13 -11
View File
@@ -1,4 +1,4 @@
import { useRef, createContext, useState, useContext, useEffect, useCallback, useMemo, type ReactNode } from "react";
import { createContext, useState, useContext, useEffect, useCallback, useMemo, type ReactNode } from "react";
import type { ConnectionState } from "../lib/types";
type ModelStatus = "ready" | "starting" | "stopping" | "stopped" | "shutdown" | "unknown";
@@ -51,12 +51,14 @@ type APIProviderProps = {
autoStartAPIEvents?: boolean;
};
let apiEventSource: EventSource | null = null;
export function APIProvider({ children, autoStartAPIEvents = true }: APIProviderProps) {
const [proxyLogs, setProxyLogs] = useState("");
const [upstreamLogs, setUpstreamLogs] = useState("");
const [metrics, setMetrics] = useState<Metrics[]>([]);
const [connectionStatus, setConnectionState] = useState<ConnectionState>("disconnected");
const apiEventSource = useRef<EventSource | null>(null);
//const apiEventSource = useRef<EventSource | null>(null);
const [models, setModels] = useState<Model[]>([]);
@@ -69,8 +71,8 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
const enableAPIEvents = useCallback((enabled: boolean) => {
if (!enabled) {
apiEventSource.current?.close();
apiEventSource.current = null;
apiEventSource?.close();
apiEventSource = null;
setMetrics([]);
return;
}
@@ -79,22 +81,22 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
const initialDelay = 1000; // 1 second
const connect = () => {
apiEventSource.current = null;
const eventSource = new EventSource("/api/events");
apiEventSource?.close();
apiEventSource = new EventSource("/api/events");
setConnectionState("connecting");
eventSource.onopen = () => {
apiEventSource.onopen = () => {
// clear everything out on connect to keep things in sync
setProxyLogs("");
setUpstreamLogs("");
setMetrics([]); // clear metrics on reconnect
setModels([]); // clear models on reconnect
apiEventSource.current = eventSource;
retryCount = 0;
setConnectionState("connected");
};
eventSource.onmessage = (e: MessageEvent) => {
apiEventSource.onmessage = (e: MessageEvent) => {
try {
const message = JSON.parse(e.data) as APIEventEnvelope;
switch (message.type) {
@@ -137,8 +139,8 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
}
};
eventSource.onerror = () => {
eventSource.close();
apiEventSource.onerror = () => {
apiEventSource?.close();
retryCount++;
const delay = Math.min(initialDelay * Math.pow(2, retryCount - 1), 5000);
setConnectionState("disconnected");