Compare commits

..

4 Commits

Author SHA1 Message Date
Benson Wong 85cd74a51c Improve process start and stop reliability (#38)
Refactor Process.start()/Stop() logic (#38)
- remove cmd.Wait() call in start(). This seems to conflict with the one
  in .Stop(). Removing it eliminated no child errors
- eliminate goroutines in .start() as it no longer required
2025-02-03 11:50:38 -08:00
Benson Wong 314d2f2212 remove cmd_stop configuration and functionality from PR #40 (#44)
* remove cmd_stop functionality from #40
2025-01-31 12:42:44 -08:00
Benson Wong fad25f3e11 Use client request context in proxy request (#43)
Canceled or closed HTTP requests from clients will also stop the proxied
HTTP requests to upstreamed servers.
2025-01-31 10:21:49 -08:00
Benson Wong 2c3e3e27f7 Support OPTIONS requests (#42)
Add middleware that responds with permissive OPTIONS headers
for all request paths.
2025-01-31 10:09:07 -08:00
6 changed files with 48 additions and 140 deletions
+3 -9
View File
@@ -30,13 +30,12 @@ Any OpenAI compatible server would work. llama-swap was originally designed for
- `v1/rerank`
- `v1/audio/speech` ([#36](https://github.com/mostlygeek/llama-swap/issues/36))
- ✅ Multiple GPU support
- ✅ Docker Support ([#40](https://github.com/mostlygeek/llama-swap/pull/40))
- ✅ Docker and Podman support
- ✅ Run multiple models at once with `profiles`
- ✅ Remote log monitoring at `/log`
- ✅ Automatic unloading of models from GPUs after timeout
- ✅ Use any local OpenAI compatible server (llama.cpp, vllm, tabbyAPI, etc)
- ✅ Direct access to upstream HTTP server via `/upstream/:model_id` ([demo](https://github.com/mostlygeek/llama-swap/pull/31))
-
## config.yaml
@@ -91,14 +90,9 @@ models:
cmd: llama-server --port 9999 -m Llama-3.2-1B-Instruct-Q4_K_M.gguf -ngl 0
unlisted: true
# Docker Support (Experimental)
# see: https://github.com/mostlygeek/llama-swap/pull/40
"dockertest":
# Docker Support (v26.1.4+ required!)
"docker-llama":
proxy: "http://127.0.0.1:9790"
# introduced to reliably stop containers
cmd_stop: docker stop -t 2 dockertest
cmd: >
docker run --name dockertest
--init --rm -p 9790:8080 -v /mnt/nvme/models:/models
+1 -8
View File
@@ -53,16 +53,9 @@ models:
--ctx-size 8192
--reranking
# EXPERIMENTAL! Docker Support
# see:
# - https://github.com/mostlygeek/llama-swap/pull/40
# - https://github.com/mostlygeek/llama-swap/issues/35
# Docker Support (v26.1.4+ required!)
"dockertest":
proxy: "http://127.0.0.1:9790"
# use this to reliably stop named containers
cmd_stop: docker stop -t 2 dockertest
cmd: >
docker run --name dockertest
--init --rm -p 9790:8080 -v /mnt/nvme/models:/models
-4
View File
@@ -11,7 +11,6 @@ import (
type ModelConfig struct {
Cmd string `yaml:"cmd"`
CmdStop string `yaml:"cmd_stop"`
Proxy string `yaml:"proxy"`
Aliases []string `yaml:"aliases"`
Env []string `yaml:"env"`
@@ -23,9 +22,6 @@ type ModelConfig struct {
func (m *ModelConfig) SanitizedCommand() ([]string, error) {
return SanitizeCommand(m.Cmd)
}
func (m *ModelConfig) SanitizeCommandStop() ([]string, error) {
return SanitizeCommand(m.CmdStop)
}
type Config struct {
HealthCheckTimeout int `yaml:"healthCheckTimeout"`
-26
View File
@@ -35,11 +35,6 @@ models:
aliases:
- "m2"
checkEndpoint: "/"
docker:
cmd: docker run -p 9999:8080 --name "my_container"
cmd_stop: docker stop my_container
proxy: "http://localhost:9999"
checkEndpoint: "/health"
healthCheckTimeout: 15
profiles:
test:
@@ -61,7 +56,6 @@ profiles:
Models: map[string]ModelConfig{
"model1": {
Cmd: "path/to/cmd --arg1 one",
CmdStop: "",
Proxy: "http://localhost:8080",
Aliases: []string{"m1", "model-one"},
Env: []string{"VAR1=value1", "VAR2=value2"},
@@ -69,19 +63,11 @@ profiles:
},
"model2": {
Cmd: "path/to/cmd --arg1 one",
CmdStop: "",
Proxy: "http://localhost:8081",
Aliases: []string{"m2"},
Env: nil,
CheckEndpoint: "/",
},
"docker": {
Cmd: `docker run -p 9999:8080 --name "my_container"`,
CmdStop: "docker stop my_container",
Proxy: "http://localhost:9999",
Env: nil,
CheckEndpoint: "/health",
},
},
HealthCheckTimeout: 15,
Profiles: map[string][]string{
@@ -113,18 +99,6 @@ func TestConfig_ModelConfigSanitizedCommand(t *testing.T) {
assert.Equal(t, []string{"python", "model1.py", "--arg1", "value1", "--arg2", "value2"}, args)
}
func TestConfig_ModelConfigSanitizedCommandStop(t *testing.T) {
config := &ModelConfig{
CmdStop: `docker stop my_container \
--arg1 1
--arg2 2`,
}
args, err := config.SanitizeCommandStop()
assert.NoError(t, err)
assert.Equal(t, []string{"docker", "stop", "my_container", "--arg1", "1", "--arg2", "2"}, args)
}
func TestConfig_FindConfig(t *testing.T) {
// TODO?
+31 -93
View File
@@ -2,7 +2,6 @@ package proxy
import (
"context"
"errors"
"fmt"
"io"
"net/http"
@@ -86,36 +85,11 @@ func (p *Process) start() error {
// 3. The health check passes
//
// only in the third case will the process be considered Ready to accept
healthCheckContext, cancelHealthCheck := context.WithCancelCause(context.Background())
defer cancelHealthCheck(nil) // clean up
cmdWaitChan := make(chan error, 1)
healthCheckChan := make(chan error, 1)
<-time.After(250 * time.Millisecond) // give process a bit of time to start
go func() {
// possible cmd exits early
cmdWaitChan <- p.cmd.Wait()
}()
go func() {
<-time.After(250 * time.Millisecond) // give process a bit of time to start
healthCheckChan <- p.checkHealthEndpoint(healthCheckContext)
}()
select {
case err := <-cmdWaitChan:
if err := p.checkHealthEndpoint(); err != nil {
p.state = StateFailed
if err != nil {
err = fmt.Errorf("command [%s] %s", strings.Join(p.cmd.Args, " "), err.Error())
} else {
err = fmt.Errorf("command [%s] exited unexpected", strings.Join(p.cmd.Args, " "))
}
cancelHealthCheck(err)
return err
case err := <-healthCheckChan:
if err != nil {
p.state = StateFailed
return err
}
}
if p.config.UnloadAfter > 0 {
@@ -153,7 +127,7 @@ func (p *Process) Stop() {
defer p.stateMutex.Unlock()
if p.state != StateReady {
fmt.Fprintf(p.logMonitor, "!!! Stop() called but Process State is not READY\n")
fmt.Fprintf(p.logMonitor, "!!! Info - Stop() called but Process State is not READY\n")
return
}
@@ -164,58 +138,35 @@ func (p *Process) Stop() {
return
}
// Pretty sure this stopping code needs some work for windows and
// will be a source of pain in the future.
sigtermTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if p.config.CmdStop != "" {
// for issue #35 to do things like `docker stop`
args, err := p.config.SanitizeCommandStop()
sigtermNormal := make(chan error, 1)
go func() {
sigtermNormal <- p.cmd.Wait()
}()
p.cmd.Process.Signal(syscall.SIGTERM)
select {
case <-sigtermTimeout.Done():
fmt.Fprintf(p.logMonitor, "!!! process [%s] timed out waiting to stop, sending KILL signal\n", p.ID)
p.cmd.Process.Kill()
case err := <-sigtermNormal:
if err != nil {
fmt.Fprintf(p.logMonitor, "!!! Error sanitizing stop command: %v\n", err)
// leave the state as it is?
return
}
fmt.Fprintf(p.logMonitor, "!!! Running stop command: %s\n", strings.Join(args, " "))
cmd := exec.Command(args[0], args[1:]...)
cmd.Stdout = p.logMonitor
cmd.Stderr = p.logMonitor
err = cmd.Start()
if err != nil {
fmt.Fprintf(p.logMonitor, "!!! Error running stop command: %v\n", err)
// leave the state as it is?
return
}
err = cmd.Wait()
if err != nil {
fmt.Fprintf(p.logMonitor, "!!! WARNING error waiting for stop command to complete: %v\n", err)
}
} else {
sigtermTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
sigtermNormal := make(chan error, 1)
go func() {
sigtermNormal <- p.cmd.Wait()
}()
p.cmd.Process.Signal(syscall.SIGTERM)
select {
case <-sigtermTimeout.Done():
fmt.Fprintf(p.logMonitor, "XXX Process for %s timed out waiting to stop, sending SIGKILL to PID: %d\n", p.ID, p.cmd.Process.Pid)
p.cmd.Process.Kill()
p.cmd.Wait()
case err := <-sigtermNormal:
if err != nil {
if err.Error() != "wait: no child processes" {
// possible that simple-responder for testing is just not
// existing right, so suppress those errors.
fmt.Fprintf(p.logMonitor, "!!! process for %s stopped with error > %v\n", p.ID, err)
if errno, ok := err.(syscall.Errno); ok {
fmt.Fprintf(p.logMonitor, "!!! process [%s] errno >> %v\n", p.ID, errno)
} else if exitError, ok := err.(*exec.ExitError); ok {
if strings.Contains(exitError.String(), "signal: terminated") {
fmt.Fprintf(p.logMonitor, "!!! process [%s] stopped OK\n", p.ID)
} else if strings.Contains(exitError.String(), "signal: interrupt") {
fmt.Fprintf(p.logMonitor, "!!! process [%s] interrupted OK\n", p.ID)
} else {
fmt.Fprintf(p.logMonitor, "!!! process [%s] ExitError >> %v, exit code: %d\n", p.ID, exitError, exitError.ExitCode())
}
} else {
fmt.Fprintf(p.logMonitor, "!!! process [%s] exited >> %v\n", p.ID, err)
}
}
}
@@ -229,7 +180,7 @@ func (p *Process) CurrentState() ProcessState {
return p.state
}
func (p *Process) checkHealthEndpoint(ctxFromStart context.Context) error {
func (p *Process) checkHealthEndpoint() error {
if p.config.Proxy == "" {
return fmt.Errorf("no upstream available to check /health")
}
@@ -261,24 +212,11 @@ func (p *Process) checkHealthEndpoint(ctxFromStart context.Context) error {
return err
}
ctx, cancel := context.WithTimeout(ctxFromStart, time.Second)
defer cancel()
req = req.WithContext(ctx)
resp, err := client.Do(req)
ttl := (maxDuration - time.Since(startTime)).Seconds()
if err != nil {
// check if the context was cancelled
select {
case <-ctx.Done():
err := context.Cause(ctx)
if !errors.Is(err, context.DeadlineExceeded) {
return err
}
default:
}
// wait a bit longer for TCP connection issues
if strings.Contains(err.Error(), "connection refused") {
fmt.Fprintf(p.logMonitor, "Connection refused on %s, ttl %.0fs\n", healthURL, ttl)
@@ -326,7 +264,7 @@ func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
proxyTo := p.config.Proxy
client := &http.Client{}
req, err := http.NewRequest(r.Method, proxyTo+r.URL.String(), r.Body)
req, err := http.NewRequestWithContext(r.Context(), r.Method, proxyTo+r.URL.String(), r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
+13
View File
@@ -69,6 +69,19 @@ func New(config *Config) *ProxyManager {
})
}
// see: https://github.com/mostlygeek/llama-swap/issues/42
// respond with permissive OPTIONS for any endpoint
pm.ginEngine.Use(func(c *gin.Context) {
if c.Request.Method == "OPTIONS" {
c.Header("Access-Control-Allow-Origin", "*")
c.Header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
c.Header("Access-Control-Allow-Headers", "Content-Type, Authorization")
c.AbortWithStatus(204)
return
}
c.Next()
})
// Set up routes using the Gin engine
pm.ginEngine.POST("/v1/chat/completions", pm.proxyOAIHandler)
// Support legacy /v1/completions api, see issue #12