Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c36986fef6 | |||
| 558801db1a | |||
| b21dee27c1 |
@@ -73,6 +73,30 @@ However, there are many more capabilities that llama-swap supports:
|
|||||||
|
|
||||||
See the [configuration documentation](https://github.com/mostlygeek/llama-swap/wiki/Configuration) in the wiki all options and examples.
|
See the [configuration documentation](https://github.com/mostlygeek/llama-swap/wiki/Configuration) in the wiki all options and examples.
|
||||||
|
|
||||||
|
## Reverse Proxy Configuration (nginx)
|
||||||
|
|
||||||
|
If you deploy llama-swap behind nginx, disable response buffering for streaming endpoints. By default, nginx buffers responses which breaks Server‑Sent Events (SSE) and streaming chat completion. ([#236](https://github.com/mostlygeek/llama-swap/issues/236))
|
||||||
|
|
||||||
|
Recommended nginx configuration snippets:
|
||||||
|
|
||||||
|
```nginx
|
||||||
|
# SSE for UI events/logs
|
||||||
|
location /api/events {
|
||||||
|
proxy_pass http://your-llama-swap-backend;
|
||||||
|
proxy_buffering off;
|
||||||
|
proxy_cache off;
|
||||||
|
}
|
||||||
|
|
||||||
|
# Streaming chat completions (stream=true)
|
||||||
|
location /v1/chat/completions {
|
||||||
|
proxy_pass http://your-llama-swap-backend;
|
||||||
|
proxy_buffering off;
|
||||||
|
proxy_cache off;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
As a safeguard, llama-swap also sets `X-Accel-Buffering: no` on SSE responses. However, explicitly disabling `proxy_buffering` at your reverse proxy is still recommended for reliable streaming behavior.
|
||||||
|
|
||||||
## Web UI
|
## Web UI
|
||||||
|
|
||||||
llama-swap includes a real time web interface for monitoring logs and models:
|
llama-swap includes a real time web interface for monitoring logs and models:
|
||||||
|
|||||||
@@ -458,6 +458,10 @@ func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
|
|||||||
w.Header().Add(k, v)
|
w.Header().Add(k, v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// prevent nginx from buffering streaming responses (e.g., SSE)
|
||||||
|
if strings.Contains(strings.ToLower(resp.Header.Get("Content-Type")), "text/event-stream") {
|
||||||
|
w.Header().Set("X-Accel-Buffering", "no")
|
||||||
|
}
|
||||||
w.WriteHeader(resp.StatusCode)
|
w.WriteHeader(resp.StatusCode)
|
||||||
|
|
||||||
// faster than io.Copy when streaming
|
// faster than io.Copy when streaming
|
||||||
|
|||||||
+34
-6
@@ -227,7 +227,7 @@ func (pm *ProxyManager) setupGinEngine() {
|
|||||||
pm.ginEngine.GET("/upstream", func(c *gin.Context) {
|
pm.ginEngine.GET("/upstream", func(c *gin.Context) {
|
||||||
c.Redirect(http.StatusFound, "/ui/models")
|
c.Redirect(http.StatusFound, "/ui/models")
|
||||||
})
|
})
|
||||||
pm.ginEngine.Any("/upstream/:model_id/*upstreamPath", pm.proxyToUpstream)
|
pm.ginEngine.Any("/upstream/*upstreamPath", pm.proxyToUpstream)
|
||||||
|
|
||||||
pm.ginEngine.GET("/unload", pm.unloadAllModelsHandler)
|
pm.ginEngine.GET("/unload", pm.unloadAllModelsHandler)
|
||||||
pm.ginEngine.GET("/running", pm.listRunningProcessesHandler)
|
pm.ginEngine.GET("/running", pm.listRunningProcessesHandler)
|
||||||
@@ -393,24 +393,52 @@ func (pm *ProxyManager) listModelsHandler(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pm *ProxyManager) proxyToUpstream(c *gin.Context) {
|
func (pm *ProxyManager) proxyToUpstream(c *gin.Context) {
|
||||||
requestedModel := c.Param("model_id")
|
upstreamPath := c.Param("upstreamPath")
|
||||||
|
|
||||||
if requestedModel == "" {
|
// split the upstream path by / and search for the model name
|
||||||
|
parts := strings.Split(strings.TrimSpace(upstreamPath), "/")
|
||||||
|
if len(parts) == 0 {
|
||||||
pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path")
|
pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
processGroup, realModelName, err := pm.swapProcessGroup(requestedModel)
|
modelFound := false
|
||||||
|
searchModelName := ""
|
||||||
|
var modelName, remainingPath string
|
||||||
|
for i, part := range parts {
|
||||||
|
if parts[i] == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if searchModelName == "" {
|
||||||
|
searchModelName = part
|
||||||
|
} else {
|
||||||
|
searchModelName = searchModelName + "/" + parts[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
if real, ok := pm.config.RealModelName(searchModelName); ok {
|
||||||
|
modelName = real
|
||||||
|
remainingPath = "/" + strings.Join(parts[i+1:], "/")
|
||||||
|
modelFound = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !modelFound {
|
||||||
|
pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
processGroup, realModelName, err := pm.swapProcessGroup(modelName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error swapping process group: %s", err.Error()))
|
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error swapping process group: %s", err.Error()))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// rewrite the path
|
// rewrite the path
|
||||||
c.Request.URL.Path = c.Param("upstreamPath")
|
c.Request.URL.Path = remainingPath
|
||||||
processGroup.ProxyRequest(realModelName, c.Writer, c.Request)
|
processGroup.ProxyRequest(realModelName, c.Writer, c.Request)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *ProxyManager) proxyOAIHandler(c *gin.Context) {
|
func (pm *ProxyManager) proxyOAIHandler(c *gin.Context) {
|
||||||
bodyBytes, err := io.ReadAll(c.Request.Body)
|
bodyBytes, err := io.ReadAll(c.Request.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -100,6 +100,8 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
|
|||||||
c.Header("Cache-Control", "no-cache")
|
c.Header("Cache-Control", "no-cache")
|
||||||
c.Header("Connection", "keep-alive")
|
c.Header("Connection", "keep-alive")
|
||||||
c.Header("X-Content-Type-Options", "nosniff")
|
c.Header("X-Content-Type-Options", "nosniff")
|
||||||
|
// prevent nginx from buffering SSE
|
||||||
|
c.Header("X-Accel-Buffering", "no")
|
||||||
|
|
||||||
sendBuffer := make(chan messageEnvelope, 25)
|
sendBuffer := make(chan messageEnvelope, 25)
|
||||||
ctx, cancel := context.WithCancel(c.Request.Context())
|
ctx, cancel := context.WithCancel(c.Request.Context())
|
||||||
|
|||||||
@@ -28,6 +28,8 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
|
|||||||
c.Header("Content-Type", "text/plain")
|
c.Header("Content-Type", "text/plain")
|
||||||
c.Header("Transfer-Encoding", "chunked")
|
c.Header("Transfer-Encoding", "chunked")
|
||||||
c.Header("X-Content-Type-Options", "nosniff")
|
c.Header("X-Content-Type-Options", "nosniff")
|
||||||
|
// prevent nginx from buffering streamed logs
|
||||||
|
c.Header("X-Accel-Buffering", "no")
|
||||||
|
|
||||||
logMonitorId := c.Param("logMonitorID")
|
logMonitorId := c.Param("logMonitorID")
|
||||||
logger, err := pm.getLogger(logMonitorId)
|
logger, err := pm.getLogger(logMonitorId)
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package proxy
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
@@ -913,3 +914,67 @@ models:
|
|||||||
assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model1"].CurrentState())
|
assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model1"].CurrentState())
|
||||||
assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model2"].CurrentState())
|
assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model2"].CurrentState())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProxyManager_StreamingEndpointsReturnNoBufferingHeader(t *testing.T) {
|
||||||
|
config := AddDefaultGroupToConfig(Config{
|
||||||
|
HealthCheckTimeout: 15,
|
||||||
|
Models: map[string]ModelConfig{
|
||||||
|
"model1": getTestSimpleResponderConfig("model1"),
|
||||||
|
},
|
||||||
|
LogLevel: "error",
|
||||||
|
})
|
||||||
|
|
||||||
|
proxy := New(config)
|
||||||
|
defer proxy.StopProcesses(StopWaitForInflightRequest)
|
||||||
|
|
||||||
|
endpoints := []string{
|
||||||
|
"/api/events",
|
||||||
|
"/logs/stream",
|
||||||
|
"/logs/stream/proxy",
|
||||||
|
"/logs/stream/upstream",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, endpoint := range endpoints {
|
||||||
|
t.Run(endpoint, func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
req := httptest.NewRequest("GET", endpoint, nil)
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
|
||||||
|
// We don't need the handler to fully complete, just to set the headers
|
||||||
|
// so run it in a goroutine and check the headers after a short delay
|
||||||
|
go proxy.ServeHTTP(rec, req)
|
||||||
|
time.Sleep(10 * time.Millisecond) // give it time to start and write headers
|
||||||
|
|
||||||
|
assert.Equal(t, http.StatusOK, rec.Code)
|
||||||
|
assert.Equal(t, "no", rec.Header().Get("X-Accel-Buffering"))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProxyManager_ProxiedStreamingEndpointReturnsNoBufferingHeader(t *testing.T) {
|
||||||
|
config := AddDefaultGroupToConfig(Config{
|
||||||
|
HealthCheckTimeout: 15,
|
||||||
|
Models: map[string]ModelConfig{
|
||||||
|
"streaming-model": getTestSimpleResponderConfig("streaming-model"),
|
||||||
|
},
|
||||||
|
LogLevel: "error",
|
||||||
|
})
|
||||||
|
|
||||||
|
proxy := New(config)
|
||||||
|
defer proxy.StopProcesses(StopWaitForInflightRequest)
|
||||||
|
|
||||||
|
// Make a streaming request
|
||||||
|
reqBody := `{"model":"streaming-model"}`
|
||||||
|
// simple-responder will return text/event-stream when stream=true is in the query
|
||||||
|
req := httptest.NewRequest("POST", "/v1/chat/completions?stream=true", bytes.NewBufferString(reqBody))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
|
||||||
|
proxy.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
assert.Equal(t, http.StatusOK, rec.Code)
|
||||||
|
assert.Equal(t, "no", rec.Header().Get("X-Accel-Buffering"))
|
||||||
|
assert.Contains(t, rec.Header().Get("Content-Type"), "text/event-stream")
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { useRef, createContext, useState, useContext, useEffect, useCallback, useMemo, type ReactNode } from "react";
|
import { createContext, useState, useContext, useEffect, useCallback, useMemo, type ReactNode } from "react";
|
||||||
import type { ConnectionState } from "../lib/types";
|
import type { ConnectionState } from "../lib/types";
|
||||||
|
|
||||||
type ModelStatus = "ready" | "starting" | "stopping" | "stopped" | "shutdown" | "unknown";
|
type ModelStatus = "ready" | "starting" | "stopping" | "stopped" | "shutdown" | "unknown";
|
||||||
@@ -51,12 +51,14 @@ type APIProviderProps = {
|
|||||||
autoStartAPIEvents?: boolean;
|
autoStartAPIEvents?: boolean;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let apiEventSource: EventSource | null = null;
|
||||||
|
|
||||||
export function APIProvider({ children, autoStartAPIEvents = true }: APIProviderProps) {
|
export function APIProvider({ children, autoStartAPIEvents = true }: APIProviderProps) {
|
||||||
const [proxyLogs, setProxyLogs] = useState("");
|
const [proxyLogs, setProxyLogs] = useState("");
|
||||||
const [upstreamLogs, setUpstreamLogs] = useState("");
|
const [upstreamLogs, setUpstreamLogs] = useState("");
|
||||||
const [metrics, setMetrics] = useState<Metrics[]>([]);
|
const [metrics, setMetrics] = useState<Metrics[]>([]);
|
||||||
const [connectionStatus, setConnectionState] = useState<ConnectionState>("disconnected");
|
const [connectionStatus, setConnectionState] = useState<ConnectionState>("disconnected");
|
||||||
const apiEventSource = useRef<EventSource | null>(null);
|
//const apiEventSource = useRef<EventSource | null>(null);
|
||||||
|
|
||||||
const [models, setModels] = useState<Model[]>([]);
|
const [models, setModels] = useState<Model[]>([]);
|
||||||
|
|
||||||
@@ -69,8 +71,8 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
|
|||||||
|
|
||||||
const enableAPIEvents = useCallback((enabled: boolean) => {
|
const enableAPIEvents = useCallback((enabled: boolean) => {
|
||||||
if (!enabled) {
|
if (!enabled) {
|
||||||
apiEventSource.current?.close();
|
apiEventSource?.close();
|
||||||
apiEventSource.current = null;
|
apiEventSource = null;
|
||||||
setMetrics([]);
|
setMetrics([]);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -79,22 +81,22 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
|
|||||||
const initialDelay = 1000; // 1 second
|
const initialDelay = 1000; // 1 second
|
||||||
|
|
||||||
const connect = () => {
|
const connect = () => {
|
||||||
apiEventSource.current = null;
|
apiEventSource?.close();
|
||||||
const eventSource = new EventSource("/api/events");
|
apiEventSource = new EventSource("/api/events");
|
||||||
|
|
||||||
setConnectionState("connecting");
|
setConnectionState("connecting");
|
||||||
|
|
||||||
eventSource.onopen = () => {
|
apiEventSource.onopen = () => {
|
||||||
// clear everything out on connect to keep things in sync
|
// clear everything out on connect to keep things in sync
|
||||||
setProxyLogs("");
|
setProxyLogs("");
|
||||||
setUpstreamLogs("");
|
setUpstreamLogs("");
|
||||||
setMetrics([]); // clear metrics on reconnect
|
setMetrics([]); // clear metrics on reconnect
|
||||||
setModels([]); // clear models on reconnect
|
setModels([]); // clear models on reconnect
|
||||||
apiEventSource.current = eventSource;
|
|
||||||
retryCount = 0;
|
retryCount = 0;
|
||||||
setConnectionState("connected");
|
setConnectionState("connected");
|
||||||
};
|
};
|
||||||
|
|
||||||
eventSource.onmessage = (e: MessageEvent) => {
|
apiEventSource.onmessage = (e: MessageEvent) => {
|
||||||
try {
|
try {
|
||||||
const message = JSON.parse(e.data) as APIEventEnvelope;
|
const message = JSON.parse(e.data) as APIEventEnvelope;
|
||||||
switch (message.type) {
|
switch (message.type) {
|
||||||
@@ -137,8 +139,8 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
eventSource.onerror = () => {
|
apiEventSource.onerror = () => {
|
||||||
eventSource.close();
|
apiEventSource?.close();
|
||||||
retryCount++;
|
retryCount++;
|
||||||
const delay = Math.min(initialDelay * Math.pow(2, retryCount - 1), 5000);
|
const delay = Math.min(initialDelay * Math.pow(2, retryCount - 1), 5000);
|
||||||
setConnectionState("disconnected");
|
setConnectionState("disconnected");
|
||||||
|
|||||||
Reference in New Issue
Block a user