Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 85cd74a51c | |||
| 314d2f2212 |
@@ -30,13 +30,12 @@ Any OpenAI compatible server would work. llama-swap was originally designed for
|
||||
- `v1/rerank`
|
||||
- `v1/audio/speech` ([#36](https://github.com/mostlygeek/llama-swap/issues/36))
|
||||
- ✅ Multiple GPU support
|
||||
- ✅ Docker Support ([#40](https://github.com/mostlygeek/llama-swap/pull/40))
|
||||
- ✅ Docker and Podman support
|
||||
- ✅ Run multiple models at once with `profiles`
|
||||
- ✅ Remote log monitoring at `/log`
|
||||
- ✅ Automatic unloading of models from GPUs after timeout
|
||||
- ✅ Use any local OpenAI compatible server (llama.cpp, vllm, tabbyAPI, etc)
|
||||
- ✅ Direct access to upstream HTTP server via `/upstream/:model_id` ([demo](https://github.com/mostlygeek/llama-swap/pull/31))
|
||||
-
|
||||
|
||||
## config.yaml
|
||||
|
||||
@@ -91,14 +90,9 @@ models:
|
||||
cmd: llama-server --port 9999 -m Llama-3.2-1B-Instruct-Q4_K_M.gguf -ngl 0
|
||||
unlisted: true
|
||||
|
||||
# Docker Support (Experimental)
|
||||
# see: https://github.com/mostlygeek/llama-swap/pull/40
|
||||
"dockertest":
|
||||
# Docker Support (v26.1.4+ required!)
|
||||
"docker-llama":
|
||||
proxy: "http://127.0.0.1:9790"
|
||||
|
||||
# introduced to reliably stop containers
|
||||
cmd_stop: docker stop -t 2 dockertest
|
||||
|
||||
cmd: >
|
||||
docker run --name dockertest
|
||||
--init --rm -p 9790:8080 -v /mnt/nvme/models:/models
|
||||
|
||||
+1
-8
@@ -53,16 +53,9 @@ models:
|
||||
--ctx-size 8192
|
||||
--reranking
|
||||
|
||||
# EXPERIMENTAL! Docker Support
|
||||
# see:
|
||||
# - https://github.com/mostlygeek/llama-swap/pull/40
|
||||
# - https://github.com/mostlygeek/llama-swap/issues/35
|
||||
# Docker Support (v26.1.4+ required!)
|
||||
"dockertest":
|
||||
proxy: "http://127.0.0.1:9790"
|
||||
|
||||
# use this to reliably stop named containers
|
||||
cmd_stop: docker stop -t 2 dockertest
|
||||
|
||||
cmd: >
|
||||
docker run --name dockertest
|
||||
--init --rm -p 9790:8080 -v /mnt/nvme/models:/models
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
|
||||
type ModelConfig struct {
|
||||
Cmd string `yaml:"cmd"`
|
||||
CmdStop string `yaml:"cmd_stop"`
|
||||
Proxy string `yaml:"proxy"`
|
||||
Aliases []string `yaml:"aliases"`
|
||||
Env []string `yaml:"env"`
|
||||
@@ -23,9 +22,6 @@ type ModelConfig struct {
|
||||
func (m *ModelConfig) SanitizedCommand() ([]string, error) {
|
||||
return SanitizeCommand(m.Cmd)
|
||||
}
|
||||
func (m *ModelConfig) SanitizeCommandStop() ([]string, error) {
|
||||
return SanitizeCommand(m.CmdStop)
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
HealthCheckTimeout int `yaml:"healthCheckTimeout"`
|
||||
|
||||
@@ -35,11 +35,6 @@ models:
|
||||
aliases:
|
||||
- "m2"
|
||||
checkEndpoint: "/"
|
||||
docker:
|
||||
cmd: docker run -p 9999:8080 --name "my_container"
|
||||
cmd_stop: docker stop my_container
|
||||
proxy: "http://localhost:9999"
|
||||
checkEndpoint: "/health"
|
||||
healthCheckTimeout: 15
|
||||
profiles:
|
||||
test:
|
||||
@@ -61,7 +56,6 @@ profiles:
|
||||
Models: map[string]ModelConfig{
|
||||
"model1": {
|
||||
Cmd: "path/to/cmd --arg1 one",
|
||||
CmdStop: "",
|
||||
Proxy: "http://localhost:8080",
|
||||
Aliases: []string{"m1", "model-one"},
|
||||
Env: []string{"VAR1=value1", "VAR2=value2"},
|
||||
@@ -69,19 +63,11 @@ profiles:
|
||||
},
|
||||
"model2": {
|
||||
Cmd: "path/to/cmd --arg1 one",
|
||||
CmdStop: "",
|
||||
Proxy: "http://localhost:8081",
|
||||
Aliases: []string{"m2"},
|
||||
Env: nil,
|
||||
CheckEndpoint: "/",
|
||||
},
|
||||
"docker": {
|
||||
Cmd: `docker run -p 9999:8080 --name "my_container"`,
|
||||
CmdStop: "docker stop my_container",
|
||||
Proxy: "http://localhost:9999",
|
||||
Env: nil,
|
||||
CheckEndpoint: "/health",
|
||||
},
|
||||
},
|
||||
HealthCheckTimeout: 15,
|
||||
Profiles: map[string][]string{
|
||||
@@ -113,18 +99,6 @@ func TestConfig_ModelConfigSanitizedCommand(t *testing.T) {
|
||||
assert.Equal(t, []string{"python", "model1.py", "--arg1", "value1", "--arg2", "value2"}, args)
|
||||
}
|
||||
|
||||
func TestConfig_ModelConfigSanitizedCommandStop(t *testing.T) {
|
||||
config := &ModelConfig{
|
||||
CmdStop: `docker stop my_container \
|
||||
--arg1 1
|
||||
--arg2 2`,
|
||||
}
|
||||
|
||||
args, err := config.SanitizeCommandStop()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, []string{"docker", "stop", "my_container", "--arg1", "1", "--arg2", "2"}, args)
|
||||
}
|
||||
|
||||
func TestConfig_FindConfig(t *testing.T) {
|
||||
|
||||
// TODO?
|
||||
|
||||
+30
-92
@@ -2,7 +2,6 @@ package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -86,36 +85,11 @@ func (p *Process) start() error {
|
||||
// 3. The health check passes
|
||||
//
|
||||
// only in the third case will the process be considered Ready to accept
|
||||
healthCheckContext, cancelHealthCheck := context.WithCancelCause(context.Background())
|
||||
defer cancelHealthCheck(nil) // clean up
|
||||
cmdWaitChan := make(chan error, 1)
|
||||
healthCheckChan := make(chan error, 1)
|
||||
<-time.After(250 * time.Millisecond) // give process a bit of time to start
|
||||
|
||||
go func() {
|
||||
// possible cmd exits early
|
||||
cmdWaitChan <- p.cmd.Wait()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
<-time.After(250 * time.Millisecond) // give process a bit of time to start
|
||||
healthCheckChan <- p.checkHealthEndpoint(healthCheckContext)
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-cmdWaitChan:
|
||||
if err := p.checkHealthEndpoint(); err != nil {
|
||||
p.state = StateFailed
|
||||
if err != nil {
|
||||
err = fmt.Errorf("command [%s] %s", strings.Join(p.cmd.Args, " "), err.Error())
|
||||
} else {
|
||||
err = fmt.Errorf("command [%s] exited unexpected", strings.Join(p.cmd.Args, " "))
|
||||
}
|
||||
cancelHealthCheck(err)
|
||||
return err
|
||||
case err := <-healthCheckChan:
|
||||
if err != nil {
|
||||
p.state = StateFailed
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if p.config.UnloadAfter > 0 {
|
||||
@@ -153,7 +127,7 @@ func (p *Process) Stop() {
|
||||
defer p.stateMutex.Unlock()
|
||||
|
||||
if p.state != StateReady {
|
||||
fmt.Fprintf(p.logMonitor, "!!! Stop() called but Process State is not READY\n")
|
||||
fmt.Fprintf(p.logMonitor, "!!! Info - Stop() called but Process State is not READY\n")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -164,58 +138,35 @@ func (p *Process) Stop() {
|
||||
return
|
||||
}
|
||||
|
||||
// Pretty sure this stopping code needs some work for windows and
|
||||
// will be a source of pain in the future.
|
||||
sigtermTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if p.config.CmdStop != "" {
|
||||
// for issue #35 to do things like `docker stop`
|
||||
args, err := p.config.SanitizeCommandStop()
|
||||
sigtermNormal := make(chan error, 1)
|
||||
go func() {
|
||||
sigtermNormal <- p.cmd.Wait()
|
||||
}()
|
||||
|
||||
p.cmd.Process.Signal(syscall.SIGTERM)
|
||||
|
||||
select {
|
||||
case <-sigtermTimeout.Done():
|
||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] timed out waiting to stop, sending KILL signal\n", p.ID)
|
||||
p.cmd.Process.Kill()
|
||||
case err := <-sigtermNormal:
|
||||
if err != nil {
|
||||
fmt.Fprintf(p.logMonitor, "!!! Error sanitizing stop command: %v\n", err)
|
||||
|
||||
// leave the state as it is?
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Fprintf(p.logMonitor, "!!! Running stop command: %s\n", strings.Join(args, " "))
|
||||
cmd := exec.Command(args[0], args[1:]...)
|
||||
cmd.Stdout = p.logMonitor
|
||||
cmd.Stderr = p.logMonitor
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
fmt.Fprintf(p.logMonitor, "!!! Error running stop command: %v\n", err)
|
||||
|
||||
// leave the state as it is?
|
||||
return
|
||||
}
|
||||
|
||||
err = cmd.Wait()
|
||||
if err != nil {
|
||||
fmt.Fprintf(p.logMonitor, "!!! WARNING error waiting for stop command to complete: %v\n", err)
|
||||
}
|
||||
} else {
|
||||
sigtermTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
sigtermNormal := make(chan error, 1)
|
||||
go func() {
|
||||
sigtermNormal <- p.cmd.Wait()
|
||||
}()
|
||||
|
||||
p.cmd.Process.Signal(syscall.SIGTERM)
|
||||
|
||||
select {
|
||||
case <-sigtermTimeout.Done():
|
||||
fmt.Fprintf(p.logMonitor, "XXX Process for %s timed out waiting to stop, sending SIGKILL to PID: %d\n", p.ID, p.cmd.Process.Pid)
|
||||
p.cmd.Process.Kill()
|
||||
p.cmd.Wait()
|
||||
case err := <-sigtermNormal:
|
||||
if err != nil {
|
||||
if err.Error() != "wait: no child processes" {
|
||||
// possible that simple-responder for testing is just not
|
||||
// existing right, so suppress those errors.
|
||||
fmt.Fprintf(p.logMonitor, "!!! process for %s stopped with error > %v\n", p.ID, err)
|
||||
if errno, ok := err.(syscall.Errno); ok {
|
||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] errno >> %v\n", p.ID, errno)
|
||||
} else if exitError, ok := err.(*exec.ExitError); ok {
|
||||
if strings.Contains(exitError.String(), "signal: terminated") {
|
||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] stopped OK\n", p.ID)
|
||||
} else if strings.Contains(exitError.String(), "signal: interrupt") {
|
||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] interrupted OK\n", p.ID)
|
||||
} else {
|
||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] ExitError >> %v, exit code: %d\n", p.ID, exitError, exitError.ExitCode())
|
||||
}
|
||||
|
||||
} else {
|
||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] exited >> %v\n", p.ID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -229,7 +180,7 @@ func (p *Process) CurrentState() ProcessState {
|
||||
return p.state
|
||||
}
|
||||
|
||||
func (p *Process) checkHealthEndpoint(ctxFromStart context.Context) error {
|
||||
func (p *Process) checkHealthEndpoint() error {
|
||||
if p.config.Proxy == "" {
|
||||
return fmt.Errorf("no upstream available to check /health")
|
||||
}
|
||||
@@ -261,24 +212,11 @@ func (p *Process) checkHealthEndpoint(ctxFromStart context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctxFromStart, time.Second)
|
||||
defer cancel()
|
||||
req = req.WithContext(ctx)
|
||||
resp, err := client.Do(req)
|
||||
|
||||
ttl := (maxDuration - time.Since(startTime)).Seconds()
|
||||
|
||||
if err != nil {
|
||||
// check if the context was cancelled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err := context.Cause(ctx)
|
||||
if !errors.Is(err, context.DeadlineExceeded) {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
}
|
||||
|
||||
// wait a bit longer for TCP connection issues
|
||||
if strings.Contains(err.Error(), "connection refused") {
|
||||
fmt.Fprintf(p.logMonitor, "Connection refused on %s, ttl %.0fs\n", healthURL, ttl)
|
||||
|
||||
Reference in New Issue
Block a user