Compare commits

..

3 Commits

Author SHA1 Message Date
Benson Wong c36986fef6 upstream handler support for model names with forward slash (#298)
The upstream handler would break on model IDs that contained a forward
slash. Model IDs like "aaa/bbb" called at upstream/aaa/bbb would result
in an error. This commit adds support for model IDs with a forward slash
by iteratively searching the path for a match.

Fixes: #229
2025-09-13 13:37:03 -07:00
Artur Podsiadły 558801db1a Fix nginx proxy buffering for streaming endpoints (#295)
* Fix nginx proxy buffering for streaming endpoints

- Add X-Accel-Buffering: no header to SSE endpoints (/api/events, /logs/stream)
- Add X-Accel-Buffering: no header to proxied text/event-stream responses
- Add nginx reverse proxy configuration section to README
- Add tests for X-Accel-Buffering header on streaming endpoints

Fixes #236

* Fix goroutine cleanup in streaming endpoints test

Add context cancellation to TestProxyManager_StreamingEndpointsReturnNoBufferingHeader
to ensure the goroutine is properly cleaned up when the test completes.
2025-09-09 16:07:46 -07:00
Benson Wong b21dee27c1 Fix #288 Vite hot module reloading creating multiple SSE connections (#290)
- move SSE (EventSource) connection to module level
- manage EventSource as a singleton, closing open connection before
  reopening a new one
2025-09-07 21:48:58 -07:00
7 changed files with 144 additions and 17 deletions
+24
View File
@@ -73,6 +73,30 @@ However, there are many more capabilities that llama-swap supports:
See the [configuration documentation](https://github.com/mostlygeek/llama-swap/wiki/Configuration) in the wiki all options and examples. See the [configuration documentation](https://github.com/mostlygeek/llama-swap/wiki/Configuration) in the wiki all options and examples.
## Reverse Proxy Configuration (nginx)
If you deploy llama-swap behind nginx, disable response buffering for streaming endpoints. By default, nginx buffers responses which breaks ServerSent Events (SSE) and streaming chat completion. ([#236](https://github.com/mostlygeek/llama-swap/issues/236))
Recommended nginx configuration snippets:
```nginx
# SSE for UI events/logs
location /api/events {
proxy_pass http://your-llama-swap-backend;
proxy_buffering off;
proxy_cache off;
}
# Streaming chat completions (stream=true)
location /v1/chat/completions {
proxy_pass http://your-llama-swap-backend;
proxy_buffering off;
proxy_cache off;
}
```
As a safeguard, llama-swap also sets `X-Accel-Buffering: no` on SSE responses. However, explicitly disabling `proxy_buffering` at your reverse proxy is still recommended for reliable streaming behavior.
## Web UI ## Web UI
llama-swap includes a real time web interface for monitoring logs and models: llama-swap includes a real time web interface for monitoring logs and models:
+4
View File
@@ -458,6 +458,10 @@ func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
w.Header().Add(k, v) w.Header().Add(k, v)
} }
} }
// prevent nginx from buffering streaming responses (e.g., SSE)
if strings.Contains(strings.ToLower(resp.Header.Get("Content-Type")), "text/event-stream") {
w.Header().Set("X-Accel-Buffering", "no")
}
w.WriteHeader(resp.StatusCode) w.WriteHeader(resp.StatusCode)
// faster than io.Copy when streaming // faster than io.Copy when streaming
+34 -6
View File
@@ -227,7 +227,7 @@ func (pm *ProxyManager) setupGinEngine() {
pm.ginEngine.GET("/upstream", func(c *gin.Context) { pm.ginEngine.GET("/upstream", func(c *gin.Context) {
c.Redirect(http.StatusFound, "/ui/models") c.Redirect(http.StatusFound, "/ui/models")
}) })
pm.ginEngine.Any("/upstream/:model_id/*upstreamPath", pm.proxyToUpstream) pm.ginEngine.Any("/upstream/*upstreamPath", pm.proxyToUpstream)
pm.ginEngine.GET("/unload", pm.unloadAllModelsHandler) pm.ginEngine.GET("/unload", pm.unloadAllModelsHandler)
pm.ginEngine.GET("/running", pm.listRunningProcessesHandler) pm.ginEngine.GET("/running", pm.listRunningProcessesHandler)
@@ -393,24 +393,52 @@ func (pm *ProxyManager) listModelsHandler(c *gin.Context) {
} }
func (pm *ProxyManager) proxyToUpstream(c *gin.Context) { func (pm *ProxyManager) proxyToUpstream(c *gin.Context) {
requestedModel := c.Param("model_id") upstreamPath := c.Param("upstreamPath")
if requestedModel == "" { // split the upstream path by / and search for the model name
parts := strings.Split(strings.TrimSpace(upstreamPath), "/")
if len(parts) == 0 {
pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path") pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path")
return return
} }
processGroup, realModelName, err := pm.swapProcessGroup(requestedModel) modelFound := false
searchModelName := ""
var modelName, remainingPath string
for i, part := range parts {
if parts[i] == "" {
continue
}
if searchModelName == "" {
searchModelName = part
} else {
searchModelName = searchModelName + "/" + parts[i]
}
if real, ok := pm.config.RealModelName(searchModelName); ok {
modelName = real
remainingPath = "/" + strings.Join(parts[i+1:], "/")
modelFound = true
break
}
}
if !modelFound {
pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path")
return
}
processGroup, realModelName, err := pm.swapProcessGroup(modelName)
if err != nil { if err != nil {
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error swapping process group: %s", err.Error())) pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error swapping process group: %s", err.Error()))
return return
} }
// rewrite the path // rewrite the path
c.Request.URL.Path = c.Param("upstreamPath") c.Request.URL.Path = remainingPath
processGroup.ProxyRequest(realModelName, c.Writer, c.Request) processGroup.ProxyRequest(realModelName, c.Writer, c.Request)
} }
func (pm *ProxyManager) proxyOAIHandler(c *gin.Context) { func (pm *ProxyManager) proxyOAIHandler(c *gin.Context) {
bodyBytes, err := io.ReadAll(c.Request.Body) bodyBytes, err := io.ReadAll(c.Request.Body)
if err != nil { if err != nil {
+2
View File
@@ -100,6 +100,8 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
c.Header("Cache-Control", "no-cache") c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive") c.Header("Connection", "keep-alive")
c.Header("X-Content-Type-Options", "nosniff") c.Header("X-Content-Type-Options", "nosniff")
// prevent nginx from buffering SSE
c.Header("X-Accel-Buffering", "no")
sendBuffer := make(chan messageEnvelope, 25) sendBuffer := make(chan messageEnvelope, 25)
ctx, cancel := context.WithCancel(c.Request.Context()) ctx, cancel := context.WithCancel(c.Request.Context())
+2
View File
@@ -28,6 +28,8 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
c.Header("Content-Type", "text/plain") c.Header("Content-Type", "text/plain")
c.Header("Transfer-Encoding", "chunked") c.Header("Transfer-Encoding", "chunked")
c.Header("X-Content-Type-Options", "nosniff") c.Header("X-Content-Type-Options", "nosniff")
// prevent nginx from buffering streamed logs
c.Header("X-Accel-Buffering", "no")
logMonitorId := c.Param("logMonitorID") logMonitorId := c.Param("logMonitorID")
logger, err := pm.getLogger(logMonitorId) logger, err := pm.getLogger(logMonitorId)
+65
View File
@@ -2,6 +2,7 @@ package proxy
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"math/rand" "math/rand"
@@ -913,3 +914,67 @@ models:
assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model1"].CurrentState()) assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model1"].CurrentState())
assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model2"].CurrentState()) assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model2"].CurrentState())
} }
func TestProxyManager_StreamingEndpointsReturnNoBufferingHeader(t *testing.T) {
config := AddDefaultGroupToConfig(Config{
HealthCheckTimeout: 15,
Models: map[string]ModelConfig{
"model1": getTestSimpleResponderConfig("model1"),
},
LogLevel: "error",
})
proxy := New(config)
defer proxy.StopProcesses(StopWaitForInflightRequest)
endpoints := []string{
"/api/events",
"/logs/stream",
"/logs/stream/proxy",
"/logs/stream/upstream",
}
for _, endpoint := range endpoints {
t.Run(endpoint, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := httptest.NewRequest("GET", endpoint, nil)
req = req.WithContext(ctx)
rec := httptest.NewRecorder()
// We don't need the handler to fully complete, just to set the headers
// so run it in a goroutine and check the headers after a short delay
go proxy.ServeHTTP(rec, req)
time.Sleep(10 * time.Millisecond) // give it time to start and write headers
assert.Equal(t, http.StatusOK, rec.Code)
assert.Equal(t, "no", rec.Header().Get("X-Accel-Buffering"))
})
}
}
func TestProxyManager_ProxiedStreamingEndpointReturnsNoBufferingHeader(t *testing.T) {
config := AddDefaultGroupToConfig(Config{
HealthCheckTimeout: 15,
Models: map[string]ModelConfig{
"streaming-model": getTestSimpleResponderConfig("streaming-model"),
},
LogLevel: "error",
})
proxy := New(config)
defer proxy.StopProcesses(StopWaitForInflightRequest)
// Make a streaming request
reqBody := `{"model":"streaming-model"}`
// simple-responder will return text/event-stream when stream=true is in the query
req := httptest.NewRequest("POST", "/v1/chat/completions?stream=true", bytes.NewBufferString(reqBody))
rec := httptest.NewRecorder()
proxy.ServeHTTP(rec, req)
assert.Equal(t, http.StatusOK, rec.Code)
assert.Equal(t, "no", rec.Header().Get("X-Accel-Buffering"))
assert.Contains(t, rec.Header().Get("Content-Type"), "text/event-stream")
}
+13 -11
View File
@@ -1,4 +1,4 @@
import { useRef, createContext, useState, useContext, useEffect, useCallback, useMemo, type ReactNode } from "react"; import { createContext, useState, useContext, useEffect, useCallback, useMemo, type ReactNode } from "react";
import type { ConnectionState } from "../lib/types"; import type { ConnectionState } from "../lib/types";
type ModelStatus = "ready" | "starting" | "stopping" | "stopped" | "shutdown" | "unknown"; type ModelStatus = "ready" | "starting" | "stopping" | "stopped" | "shutdown" | "unknown";
@@ -51,12 +51,14 @@ type APIProviderProps = {
autoStartAPIEvents?: boolean; autoStartAPIEvents?: boolean;
}; };
let apiEventSource: EventSource | null = null;
export function APIProvider({ children, autoStartAPIEvents = true }: APIProviderProps) { export function APIProvider({ children, autoStartAPIEvents = true }: APIProviderProps) {
const [proxyLogs, setProxyLogs] = useState(""); const [proxyLogs, setProxyLogs] = useState("");
const [upstreamLogs, setUpstreamLogs] = useState(""); const [upstreamLogs, setUpstreamLogs] = useState("");
const [metrics, setMetrics] = useState<Metrics[]>([]); const [metrics, setMetrics] = useState<Metrics[]>([]);
const [connectionStatus, setConnectionState] = useState<ConnectionState>("disconnected"); const [connectionStatus, setConnectionState] = useState<ConnectionState>("disconnected");
const apiEventSource = useRef<EventSource | null>(null); //const apiEventSource = useRef<EventSource | null>(null);
const [models, setModels] = useState<Model[]>([]); const [models, setModels] = useState<Model[]>([]);
@@ -69,8 +71,8 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
const enableAPIEvents = useCallback((enabled: boolean) => { const enableAPIEvents = useCallback((enabled: boolean) => {
if (!enabled) { if (!enabled) {
apiEventSource.current?.close(); apiEventSource?.close();
apiEventSource.current = null; apiEventSource = null;
setMetrics([]); setMetrics([]);
return; return;
} }
@@ -79,22 +81,22 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
const initialDelay = 1000; // 1 second const initialDelay = 1000; // 1 second
const connect = () => { const connect = () => {
apiEventSource.current = null; apiEventSource?.close();
const eventSource = new EventSource("/api/events"); apiEventSource = new EventSource("/api/events");
setConnectionState("connecting"); setConnectionState("connecting");
eventSource.onopen = () => { apiEventSource.onopen = () => {
// clear everything out on connect to keep things in sync // clear everything out on connect to keep things in sync
setProxyLogs(""); setProxyLogs("");
setUpstreamLogs(""); setUpstreamLogs("");
setMetrics([]); // clear metrics on reconnect setMetrics([]); // clear metrics on reconnect
setModels([]); // clear models on reconnect setModels([]); // clear models on reconnect
apiEventSource.current = eventSource;
retryCount = 0; retryCount = 0;
setConnectionState("connected"); setConnectionState("connected");
}; };
eventSource.onmessage = (e: MessageEvent) => { apiEventSource.onmessage = (e: MessageEvent) => {
try { try {
const message = JSON.parse(e.data) as APIEventEnvelope; const message = JSON.parse(e.data) as APIEventEnvelope;
switch (message.type) { switch (message.type) {
@@ -137,8 +139,8 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
} }
}; };
eventSource.onerror = () => { apiEventSource.onerror = () => {
eventSource.close(); apiEventSource?.close();
retryCount++; retryCount++;
const delay = Math.min(initialDelay * Math.pow(2, retryCount - 1), 5000); const delay = Math.min(initialDelay * Math.pow(2, retryCount - 1), 5000);
setConnectionState("disconnected"); setConnectionState("disconnected");