Compare commits

..

5 Commits

Author SHA1 Message Date
Benson Wong 85cd74a51c Improve process start and stop reliability (#38)
Refactor Process.start()/Stop() logic (#38)
- Remove the cmd.Wait() call in start(). It seems to conflict with the one
  in Stop(); removing it eliminated the "wait: no child processes" errors.
- Eliminate the goroutines in start(), as they are no longer required.
2025-02-03 11:50:38 -08:00
Benson Wong 314d2f2212 remove cmd_stop configuration and functionality from PR #40 (#44)
* remove cmd_stop functionality from #40
2025-01-31 12:42:44 -08:00
Benson Wong fad25f3e11 Use client request context in proxy request (#43)
Canceled or closed HTTP requests from clients will also stop the proxied
HTTP requests to upstream servers.
2025-01-31 10:21:49 -08:00
Benson Wong 2c3e3e27f7 Support OPTIONS requests (#42)
Add middleware that responds with permissive OPTIONS headers
for all request paths.
2025-01-31 10:09:07 -08:00
Benson Wong baeb0c4e7f Add cmd_stop configuration to better support docker (#35)
Add `cmd_stop` to the model configuration to run a command, instead of sending a SIGTERM, to shut down a process before swapping.
2025-01-30 16:59:57 -08:00
5 changed files with 65 additions and 54 deletions
+10
View File
@@ -30,6 +30,7 @@ Any OpenAI compatible server would work. llama-swap was originally designed for
- `v1/rerank` - `v1/rerank`
- `v1/audio/speech` ([#36](https://github.com/mostlygeek/llama-swap/issues/36)) - `v1/audio/speech` ([#36](https://github.com/mostlygeek/llama-swap/issues/36))
- ✅ Multiple GPU support - ✅ Multiple GPU support
- ✅ Docker and Podman support
- ✅ Run multiple models at once with `profiles` - ✅ Run multiple models at once with `profiles`
- ✅ Remote log monitoring at `/log` - ✅ Remote log monitoring at `/log`
- ✅ Automatic unloading of models from GPUs after timeout - ✅ Automatic unloading of models from GPUs after timeout
@@ -89,6 +90,15 @@ models:
cmd: llama-server --port 9999 -m Llama-3.2-1B-Instruct-Q4_K_M.gguf -ngl 0 cmd: llama-server --port 9999 -m Llama-3.2-1B-Instruct-Q4_K_M.gguf -ngl 0
unlisted: true unlisted: true
# Docker Support (v26.1.4+ required!)
"docker-llama":
proxy: "http://127.0.0.1:9790"
cmd: >
docker run --name dockertest
--init --rm -p 9790:8080 -v /mnt/nvme/models:/models
ghcr.io/ggerganov/llama.cpp:server
--model '/models/Qwen2.5-Coder-0.5B-Instruct-Q4_K_M.gguf'
# profiles make it easy to manage multi model (and gpu) configurations. # profiles make it easy to manage multi model (and gpu) configurations.
# #
# Tips: # Tips:
+8
View File
@@ -53,6 +53,14 @@ models:
--ctx-size 8192 --ctx-size 8192
--reranking --reranking
# Docker Support (v26.1.4+ required!)
"dockertest":
proxy: "http://127.0.0.1:9790"
cmd: >
docker run --name dockertest
--init --rm -p 9790:8080 -v /mnt/nvme/models:/models
ghcr.io/ggerganov/llama.cpp:server
--model '/models/Qwen2.5-Coder-0.5B-Instruct-Q4_K_M.gguf'
"simple": "simple":
# example of setting environment variables # example of setting environment variables
+12
View File
@@ -4,6 +4,8 @@ import (
"flag" "flag"
"fmt" "fmt"
"os" "os"
"os/signal"
"syscall"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/mostlygeek/llama-swap/proxy" "github.com/mostlygeek/llama-swap/proxy"
@@ -39,6 +41,16 @@ func main() {
} }
proxyManager := proxy.New(config) proxyManager := proxy.New(config)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
fmt.Println("Shutting down llama-swap")
proxyManager.StopProcesses()
os.Exit(0)
}()
fmt.Println("llama-swap listening on " + *listenStr) fmt.Println("llama-swap listening on " + *listenStr)
if err := proxyManager.Run(*listenStr); err != nil { if err := proxyManager.Run(*listenStr); err != nil {
fmt.Printf("Server error: %v\n", err) fmt.Printf("Server error: %v\n", err)
+20 -52
View File
@@ -2,7 +2,6 @@ package proxy
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@@ -86,37 +85,12 @@ func (p *Process) start() error {
// 3. The health check passes // 3. The health check passes
// //
// only in the third case will the process be considered Ready to accept // only in the third case will the process be considered Ready to accept
healthCheckContext, cancelHealthCheck := context.WithCancelCause(context.Background())
defer cancelHealthCheck(nil) // clean up
cmdWaitChan := make(chan error, 1)
healthCheckChan := make(chan error, 1)
go func() {
// possible cmd exits early
cmdWaitChan <- p.cmd.Wait()
}()
go func() {
<-time.After(250 * time.Millisecond) // give process a bit of time to start <-time.After(250 * time.Millisecond) // give process a bit of time to start
healthCheckChan <- p.checkHealthEndpoint(healthCheckContext)
}()
select { if err := p.checkHealthEndpoint(); err != nil {
case err := <-cmdWaitChan:
p.state = StateFailed
if err != nil {
err = fmt.Errorf("command [%s] %s", strings.Join(p.cmd.Args, " "), err.Error())
} else {
err = fmt.Errorf("command [%s] exited unexpected", strings.Join(p.cmd.Args, " "))
}
cancelHealthCheck(err)
return err
case err := <-healthCheckChan:
if err != nil {
p.state = StateFailed p.state = StateFailed
return err return err
} }
}
if p.config.UnloadAfter > 0 { if p.config.UnloadAfter > 0 {
// start a goroutine to check every second if // start a goroutine to check every second if
@@ -153,19 +127,17 @@ func (p *Process) Stop() {
defer p.stateMutex.Unlock() defer p.stateMutex.Unlock()
if p.state != StateReady { if p.state != StateReady {
fmt.Fprintf(p.logMonitor, "!!! Info - Stop() called but Process State is not READY\n")
return return
} }
if p.cmd == nil || p.cmd.Process == nil { if p.cmd == nil || p.cmd.Process == nil {
// this situation should never happen... but if it does just update the state // this situation should never happen... but if it does just update the state
fmt.Fprintf(p.logMonitor, "!!! State is Ready but Command is nil.") fmt.Fprintf(p.logMonitor, "!!! State is Ready but Command is nil.\n")
p.state = StateStopped p.state = StateStopped
return return
} }
// Pretty sure this stopping code needs some work for windows and
// will be a source of pain in the future.
sigtermTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) sigtermTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
@@ -178,18 +150,27 @@ func (p *Process) Stop() {
select { select {
case <-sigtermTimeout.Done(): case <-sigtermTimeout.Done():
fmt.Fprintf(p.logMonitor, "XXX Process for %s timed out waiting to stop, sending SIGKILL to PID: %d\n", p.ID, p.cmd.Process.Pid) fmt.Fprintf(p.logMonitor, "!!! process [%s] timed out waiting to stop, sending KILL signal\n", p.ID)
p.cmd.Process.Kill() p.cmd.Process.Kill()
p.cmd.Wait()
case err := <-sigtermNormal: case err := <-sigtermNormal:
if err != nil { if err != nil {
if err.Error() != "wait: no child processes" { if errno, ok := err.(syscall.Errno); ok {
// possible that simple-responder for testing is just not fmt.Fprintf(p.logMonitor, "!!! process [%s] errno >> %v\n", p.ID, errno)
// existing right, so suppress those errors. } else if exitError, ok := err.(*exec.ExitError); ok {
fmt.Fprintf(p.logMonitor, "!!! process for %s stopped with error > %v\n", p.ID, err) if strings.Contains(exitError.String(), "signal: terminated") {
fmt.Fprintf(p.logMonitor, "!!! process [%s] stopped OK\n", p.ID)
} else if strings.Contains(exitError.String(), "signal: interrupt") {
fmt.Fprintf(p.logMonitor, "!!! process [%s] interrupted OK\n", p.ID)
} else {
fmt.Fprintf(p.logMonitor, "!!! process [%s] ExitError >> %v, exit code: %d\n", p.ID, exitError, exitError.ExitCode())
}
} else {
fmt.Fprintf(p.logMonitor, "!!! process [%s] exited >> %v\n", p.ID, err)
} }
} }
} }
p.state = StateStopped p.state = StateStopped
} }
@@ -199,7 +180,7 @@ func (p *Process) CurrentState() ProcessState {
return p.state return p.state
} }
func (p *Process) checkHealthEndpoint(ctxFromStart context.Context) error { func (p *Process) checkHealthEndpoint() error {
if p.config.Proxy == "" { if p.config.Proxy == "" {
return fmt.Errorf("no upstream available to check /health") return fmt.Errorf("no upstream available to check /health")
} }
@@ -231,24 +212,11 @@ func (p *Process) checkHealthEndpoint(ctxFromStart context.Context) error {
return err return err
} }
ctx, cancel := context.WithTimeout(ctxFromStart, time.Second)
defer cancel()
req = req.WithContext(ctx)
resp, err := client.Do(req) resp, err := client.Do(req)
ttl := (maxDuration - time.Since(startTime)).Seconds() ttl := (maxDuration - time.Since(startTime)).Seconds()
if err != nil { if err != nil {
// check if the context was cancelled
select {
case <-ctx.Done():
err := context.Cause(ctx)
if !errors.Is(err, context.DeadlineExceeded) {
return err
}
default:
}
// wait a bit longer for TCP connection issues // wait a bit longer for TCP connection issues
if strings.Contains(err.Error(), "connection refused") { if strings.Contains(err.Error(), "connection refused") {
fmt.Fprintf(p.logMonitor, "Connection refused on %s, ttl %.0fs\n", healthURL, ttl) fmt.Fprintf(p.logMonitor, "Connection refused on %s, ttl %.0fs\n", healthURL, ttl)
@@ -296,7 +264,7 @@ func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
proxyTo := p.config.Proxy proxyTo := p.config.Proxy
client := &http.Client{} client := &http.Client{}
req, err := http.NewRequest(r.Method, proxyTo+r.URL.String(), r.Body) req, err := http.NewRequestWithContext(r.Context(), r.Method, proxyTo+r.URL.String(), r.Body)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
+13
View File
@@ -69,6 +69,19 @@ func New(config *Config) *ProxyManager {
}) })
} }
// see: https://github.com/mostlygeek/llama-swap/issues/42
// respond with permissive OPTIONS for any endpoint
pm.ginEngine.Use(func(c *gin.Context) {
if c.Request.Method == "OPTIONS" {
c.Header("Access-Control-Allow-Origin", "*")
c.Header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
c.Header("Access-Control-Allow-Headers", "Content-Type, Authorization")
c.AbortWithStatus(204)
return
}
c.Next()
})
// Set up routes using the Gin engine // Set up routes using the Gin engine
pm.ginEngine.POST("/v1/chat/completions", pm.proxyOAIHandler) pm.ginEngine.POST("/v1/chat/completions", pm.proxyOAIHandler)
// Support legacy /v1/completions api, see issue #12 // Support legacy /v1/completions api, see issue #12