Compare commits

...

6 Commits

Author SHA1 Message Date
Benson Wong 06eda7f591 tag all process logs with its ID (#103)
Makes identifying Process of log messages easier
2025-04-25 12:58:25 -07:00
Benson Wong 5fad24c16f Make checkHealthTimeout Interruptable during startup (#102)
interrupt and exit Process.start() early if the upstream process exits prematurely or unexpectedly.
2025-04-24 14:39:33 -07:00
Benson Wong 8404244fab Moderate security update for golang/x/net -> v0.38.0 2025-04-24 09:58:40 -07:00
Benson Wong 712cd01081 fix confusing INFO message [no ci] 2025-04-24 09:56:20 -07:00
Benson Wong 1f7aa359b1 Update header image
AI has finally made my dreams of llamas in funny clothing and stuck in
a claw machine waiting to be picked come true!
2025-04-23 13:02:12 -07:00
Benson Wong b138d6cf25 fix starhistory in README 2025-04-15 20:23:46 -07:00
8 changed files with 114 additions and 44 deletions
+2 -6
View File
@@ -1,4 +1,4 @@
![llama-swap header image](header.jpeg)
![llama-swap header image](header2.png)
![GitHub Downloads (all assets, all releases)](https://img.shields.io/github/downloads/mostlygeek/llama-swap/total)
![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/mostlygeek/llama-swap/go-ci.yml)
![GitHub Repo stars](https://img.shields.io/github/stars/mostlygeek/llama-swap)
@@ -269,8 +269,4 @@ WantedBy=multi-user.target
## Star History
<picture>
<source media="(prefers-color-scheme: dark)" srcset="https://api.star-history.com/svg?repos=mostlygeek/llama-swap&type=Date&theme=dark" />
<source media="(prefers-color-scheme: light)" srcset="https://api.star-history.com/svg?repos=mostlygeek/llama-swap&type=Date" />
<img alt="Star History Chart" src="https://api.star-history.com/svg?repos=mostlygeek/llama-swap&type=Date" />
</picture>
[![Star History Chart](https://api.star-history.com/svg?repos=mostlygeek/llama-swap&type=Date)](https://www.star-history.com/#mostlygeek/llama-swap&Date)
+1 -1
View File
@@ -37,7 +37,7 @@ require (
github.com/ugorji/go/codec v1.2.12 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/net v0.37.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
+2
View File
@@ -86,6 +86,8 @@ golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c=
golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
BIN
View File
Binary file not shown.

After

Width:  |  Height:  |  Size: 351 KiB

+52 -26
View File
@@ -34,6 +34,9 @@ type Process struct {
config ModelConfig
cmd *exec.Cmd
// for p.cmd.Wait() select { ... }
cmdWaitChan chan error
processLogger *LogMonitor
proxyLogger *LogMonitor
@@ -61,6 +64,7 @@ func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLo
ID: ID,
config: config,
cmd: nil,
cmdWaitChan: make(chan error, 1),
processLogger: processLogger,
proxyLogger: proxyLogger,
healthCheckTimeout: healthCheckTimeout,
@@ -89,16 +93,17 @@ func (p *Process) swapState(expectedState, newState ProcessState) (ProcessState,
defer p.stateMutex.Unlock()
if p.state != expectedState {
p.proxyLogger.Warnf("<%s> swapState() Unexpected current state %s, expected %s", p.ID, p.state, expectedState)
return p.state, ErrExpectedStateMismatch
}
if !isValidTransition(p.state, newState) {
p.proxyLogger.Warnf("Invalid state transition from %s to %s", p.state, newState)
p.proxyLogger.Warnf("<%s> swapState() Invalid state transition from %s to %s", p.ID, p.state, newState)
return p.state, ErrInvalidStateTransition
}
p.proxyLogger.Debugf("State transition from %s to %s", expectedState, newState)
p.state = newState
p.proxyLogger.Debugf("<%s> swapState() State transitioned from %s to %s", p.ID, expectedState, newState)
return p.state, nil
}
@@ -179,6 +184,13 @@ func (p *Process) start() error {
return fmt.Errorf("start() failed: %v", err)
}
// Capture the exit error for later signaling
go func() {
exitErr := p.cmd.Wait()
p.proxyLogger.Debugf("<%s> cmd.Wait() returned error: %v", p.ID, exitErr)
p.cmdWaitChan <- exitErr
}()
// One of three things can happen at this stage:
// 1. The command exits unexpectedly
// 2. The health check fails
@@ -222,18 +234,34 @@ func (p *Process) start() error {
}
case <-p.shutdownCtx.Done():
return errors.New("health check interrupted due to shutdown")
case exitErr := <-p.cmdWaitChan:
if exitErr != nil {
p.proxyLogger.Warnf("<%s> upstream command exited prematurely with error: %v", p.ID, exitErr)
if curState, err := p.swapState(StateStarting, StateFailed); err != nil {
return fmt.Errorf("upstream command exited unexpectedly: %s AND state swap failed: %v, current state: %v", exitErr.Error(), err, curState)
} else {
return fmt.Errorf("upstream command exited unexpectedly: %s", exitErr.Error())
}
} else {
p.proxyLogger.Warnf("<%s> upstream command exited prematurely but successfully", p.ID)
if curState, err := p.swapState(StateStarting, StateFailed); err != nil {
return fmt.Errorf("upstream command exited prematurely but successfully AND state swap failed: %v, current state: %v", err, curState)
} else {
return fmt.Errorf("upstream command exited prematurely but successfully")
}
}
default:
if err := p.checkHealthEndpoint(healthURL); err == nil {
p.proxyLogger.Infof("Health check passed on %s", healthURL)
p.proxyLogger.Infof("<%s> Health check passed on %s", p.ID, healthURL)
cancelHealthCheck()
break loop
} else {
if strings.Contains(err.Error(), "connection refused") {
endTime, _ := checkDeadline.Deadline()
ttl := time.Until(endTime)
p.proxyLogger.Infof("Connection refused on %s, retrying in %.0fs", healthURL, ttl.Seconds())
p.proxyLogger.Infof("<%s> Connection refused on %s, giving up in %.0fs", p.ID, healthURL, ttl.Seconds())
} else {
p.proxyLogger.Infof("Health check error on %s, %v", healthURL, err)
p.proxyLogger.Infof("<%s> Health check error on %s, %v", p.ID, healthURL, err)
}
}
}
@@ -257,8 +285,7 @@ func (p *Process) start() error {
p.inFlightRequests.Wait()
if time.Since(p.lastRequestHandled) > maxDuration {
p.proxyLogger.Infof("Unloading model %s, TTL of %ds reached.", p.ID, p.config.UnloadAfter)
p.proxyLogger.Infof("<%s> Unloading model, TTL of %ds reached", p.ID, p.config.UnloadAfter)
p.Stop()
return
}
@@ -276,10 +303,11 @@ func (p *Process) start() error {
func (p *Process) Stop() {
// wait for any inflight requests before proceeding
p.inFlightRequests.Wait()
p.proxyLogger.Debugf("<%s> Stopping process", p.ID)
// calling Stop() when state is invalid is a no-op
if curState, err := p.swapState(StateReady, StateStopping); err != nil {
p.proxyLogger.Infof("Stop() Ready -> StateStopping err: %v, current state: %v", err, curState)
p.proxyLogger.Infof("<%s> Stop() Ready -> StateStopping err: %v, current state: %v", p.ID, err, curState)
return
}
@@ -287,7 +315,7 @@ func (p *Process) Stop() {
p.stopCommand(5 * time.Second)
if curState, err := p.swapState(StateStopping, StateStopped); err != nil {
p.proxyLogger.Infof("Stop() StateStopping -> StateStopped err: %v, current state: %v", err, curState)
p.proxyLogger.Infof("<%s> Stop() StateStopping -> StateStopped err: %v, current state: %v", p.ID, err, curState)
}
}
@@ -305,51 +333,49 @@ func (p *Process) Shutdown() {
func (p *Process) stopCommand(sigtermTTL time.Duration) {
stopStartTime := time.Now()
defer func() {
p.proxyLogger.Debugf("Process [%s] stopCommand took %v", p.ID, time.Since(stopStartTime))
p.proxyLogger.Debugf("<%s> stopCommand took %v", p.ID, time.Since(stopStartTime))
}()
sigtermTimeout, cancelTimeout := context.WithTimeout(context.Background(), sigtermTTL)
defer cancelTimeout()
sigtermNormal := make(chan error, 1)
go func() {
sigtermNormal <- p.cmd.Wait()
}()
if p.cmd == nil || p.cmd.Process == nil {
p.proxyLogger.Warnf("Process [%s] cmd or cmd.Process is nil", p.ID)
p.proxyLogger.Warnf("<%s> cmd or cmd.Process is nil", p.ID)
return
}
if err := p.terminateProcess(); err != nil {
p.proxyLogger.Infof("Failed to gracefully terminate process [%s]: %v", p.ID, err)
p.proxyLogger.Infof("<%s> Failed to gracefully terminate process: %v", p.ID, err)
}
select {
case <-sigtermTimeout.Done():
p.proxyLogger.Infof("Process [%s] timed out waiting to stop, sending KILL signal", p.ID)
p.proxyLogger.Infof("<%s> Process timed out waiting to stop, sending KILL signal", p.ID)
p.cmd.Process.Kill()
case err := <-sigtermNormal:
case err := <-p.cmdWaitChan:
// Note: in start(), p.cmdWaitChan also has a select { ... }. That should be OK
// because if we make it here then the cmd has been successfully running and made it
// through the health check. There is a possibility that ithe cmd crashed after the health check
// succeeded but that's not a case llama-swap is handling for now.
if err != nil {
if errno, ok := err.(syscall.Errno); ok {
p.proxyLogger.Errorf("Process [%s] errno >> %v", p.ID, errno)
p.proxyLogger.Errorf("<%s> errno >> %v", p.ID, errno)
} else if exitError, ok := err.(*exec.ExitError); ok {
if strings.Contains(exitError.String(), "signal: terminated") {
p.proxyLogger.Infof("Process [%s] stopped OK", p.ID)
p.proxyLogger.Infof("<%s> Process stopped OK", p.ID)
} else if strings.Contains(exitError.String(), "signal: interrupt") {
p.proxyLogger.Infof("Process [%s] interrupted OK", p.ID)
p.proxyLogger.Infof("<%s> Process interrupted OK", p.ID)
} else {
p.proxyLogger.Warnf("Process [%s] ExitError >> %v, exit code: %d", p.ID, exitError, exitError.ExitCode())
p.proxyLogger.Warnf("<%s> ExitError >> %v, exit code: %d", p.ID, exitError, exitError.ExitCode())
}
} else {
p.proxyLogger.Errorf("Process [%s] exited >> %v", p.ID, err)
p.proxyLogger.Errorf("<%s> Process exited >> %v", p.ID, err)
}
}
}
}
func (p *Process) checkHealthEndpoint(healthURL string) error {
client := &http.Client{
Timeout: 500 * time.Millisecond,
}
@@ -444,6 +470,6 @@ func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
}
totalTime := time.Since(requestBeginTime)
p.proxyLogger.Debugf("Process [%s] request %s - start: %v, total: %v",
p.proxyLogger.Debugf("<%s> request %s - start: %v, total: %v",
p.ID, r.RequestURI, startDuration, totalTime)
}
+39 -10
View File
@@ -2,9 +2,9 @@ package proxy
import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"sync"
"testing"
"time"
@@ -13,16 +13,25 @@ import (
)
var (
discardLogger = NewLogMonitorWriter(io.Discard)
debugLogger = NewLogMonitorWriter(os.Stdout)
)
func init() {
// flip to help with debugging tests
if false {
debugLogger.SetLogLevel(LevelDebug)
} else {
debugLogger.SetLogLevel(LevelError)
}
}
func TestProcess_AutomaticallyStartsUpstream(t *testing.T) {
expectedMessage := "testing91931"
config := getTestSimpleResponderConfig(expectedMessage)
// Create a process
process := NewProcess("test-process", 5, config, discardLogger, discardLogger)
process := NewProcess("test-process", 5, config, debugLogger, debugLogger)
defer process.Stop()
req := httptest.NewRequest("GET", "/test", nil)
@@ -58,7 +67,7 @@ func TestProcess_WaitOnMultipleStarts(t *testing.T) {
expectedMessage := "testing91931"
config := getTestSimpleResponderConfig(expectedMessage)
process := NewProcess("test-process", 5, config, discardLogger, discardLogger)
process := NewProcess("test-process", 5, config, debugLogger, debugLogger)
defer process.Stop()
var wg sync.WaitGroup
@@ -86,7 +95,7 @@ func TestProcess_BrokenModelConfig(t *testing.T) {
CheckEndpoint: "/health",
}
process := NewProcess("broken", 1, config, discardLogger, discardLogger)
process := NewProcess("broken", 1, config, debugLogger, debugLogger)
req := httptest.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
@@ -111,7 +120,7 @@ func TestProcess_UnloadAfterTTL(t *testing.T) {
config.UnloadAfter = 3 // seconds
assert.Equal(t, 3, config.UnloadAfter)
process := NewProcess("ttl_test", 2, config, discardLogger, discardLogger)
process := NewProcess("ttl_test", 2, config, debugLogger, debugLogger)
defer process.Stop()
// this should take 4 seconds
@@ -153,7 +162,7 @@ func TestProcess_LowTTLValue(t *testing.T) {
config.UnloadAfter = 1 // second
assert.Equal(t, 1, config.UnloadAfter)
process := NewProcess("ttl", 2, config, discardLogger, discardLogger)
process := NewProcess("ttl", 2, config, debugLogger, debugLogger)
defer process.Stop()
for i := 0; i < 100; i++ {
@@ -180,7 +189,7 @@ func TestProcess_HTTPRequestsHaveTimeToFinish(t *testing.T) {
expectedMessage := "12345"
config := getTestSimpleResponderConfig(expectedMessage)
process := NewProcess("t", 10, config, discardLogger, discardLogger)
process := NewProcess("t", 10, config, debugLogger, debugLogger)
defer process.Stop()
results := map[string]string{
@@ -257,7 +266,7 @@ func TestProcess_SwapState(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := NewProcess("test", 10, getTestSimpleResponderConfig("test"), discardLogger, discardLogger)
p := NewProcess("test", 10, getTestSimpleResponderConfig("test"), debugLogger, debugLogger)
p.state = test.currentState
resultState, err := p.swapState(test.expectedState, test.newState)
@@ -290,7 +299,7 @@ func TestProcess_ShutdownInterruptsHealthCheck(t *testing.T) {
config.Proxy = "http://localhost:9998/test"
healthCheckTTLSeconds := 30
process := NewProcess("test-process", healthCheckTTLSeconds, config, discardLogger, discardLogger)
process := NewProcess("test-process", healthCheckTTLSeconds, config, debugLogger, debugLogger)
// make it a lot faster
process.healthCheckLoopInterval = time.Second
@@ -311,3 +320,23 @@ func TestProcess_ShutdownInterruptsHealthCheck(t *testing.T) {
assert.ErrorContains(t, err, "health check interrupted due to shutdown")
assert.Equal(t, StateShutdown, process.CurrentState())
}
func TestProcess_ExitInterruptsHealthCheck(t *testing.T) {
if testing.Short() {
t.Skip("skipping Exit Interrupts Health Check test")
}
// should run and exit but interrupt the long checkHealthTimeout
checkHealthTimeout := 5
config := ModelConfig{
Cmd: "sleep 1",
Proxy: "http://127.0.0.1:9913",
CheckEndpoint: "/health",
}
process := NewProcess("sleepy", checkHealthTimeout, config, debugLogger, debugLogger)
process.healthCheckLoopInterval = time.Second // make it faster
err := process.start()
assert.Equal(t, "upstream command exited prematurely but successfully", err.Error())
assert.Equal(t, process.CurrentState(), StateFailed)
}
+5
View File
@@ -49,14 +49,19 @@ func New(config *Config) *ProxyManager {
switch strings.ToLower(strings.TrimSpace(config.LogLevel)) {
case "debug":
proxyLogger.SetLogLevel(LevelDebug)
upstreamLogger.SetLogLevel(LevelDebug)
case "info":
proxyLogger.SetLogLevel(LevelInfo)
upstreamLogger.SetLogLevel(LevelInfo)
case "warn":
proxyLogger.SetLogLevel(LevelWarn)
upstreamLogger.SetLogLevel(LevelWarn)
case "error":
proxyLogger.SetLogLevel(LevelError)
upstreamLogger.SetLogLevel(LevelError)
default:
proxyLogger.SetLogLevel(LevelInfo)
upstreamLogger.SetLogLevel(LevelInfo)
}
pm := &ProxyManager{
+13 -1
View File
@@ -22,6 +22,7 @@ func TestProxyManager_SwapProcessCorrectly(t *testing.T) {
"model1": getTestSimpleResponderConfig("model1"),
"model2": getTestSimpleResponderConfig("model2"),
},
LogLevel: "error",
}
proxy := New(config)
@@ -62,6 +63,7 @@ func TestProxyManager_SwapMultiProcess(t *testing.T) {
Profiles: map[string][]string{
"test": {model1, model2},
},
LogLevel: "error",
}
proxy := New(config)
@@ -103,6 +105,7 @@ func TestProxyManager_SwapMultiProcessParallelRequests(t *testing.T) {
"model2": getTestSimpleResponderConfig("model2"),
"model3": getTestSimpleResponderConfig("model3"),
},
LogLevel: "error",
}
proxy := New(config)
@@ -153,6 +156,7 @@ func TestProxyManager_ListModelsHandler(t *testing.T) {
"model2": getTestSimpleResponderConfig("model2"),
"model3": getTestSimpleResponderConfig("model3"),
},
LogLevel: "error",
}
proxy := New(config)
@@ -230,6 +234,7 @@ func TestProxyManager_ProfileNonMember(t *testing.T) {
Profiles: map[string][]string{
"test": {model1},
},
LogLevel: "error",
}
proxy := New(config)
@@ -278,6 +283,7 @@ func TestProxyManager_Shutdown(t *testing.T) {
"model2": model2Config,
"model3": model3Config,
},
LogLevel: "error",
}
proxy := New(config)
@@ -313,6 +319,7 @@ func TestProxyManager_Unload(t *testing.T) {
Models: map[string]ModelConfig{
"model1": getTestSimpleResponderConfig("model1"),
},
LogLevel: "error",
}
proxy := New(config)
@@ -339,6 +346,7 @@ func TestProxyManager_StripProfileSlug(t *testing.T) {
Models: map[string]ModelConfig{
"TheExpectedModel": getTestSimpleResponderConfig("TheExpectedModel"),
},
LogLevel: "error",
}
proxy := New(config)
@@ -365,6 +373,7 @@ func TestProxyManager_RunningEndpoint(t *testing.T) {
Profiles: map[string][]string{
"test": {"model1", "model2"},
},
LogLevel: "error",
}
// Define a helper struct to parse the JSON response.
@@ -472,6 +481,7 @@ func TestProxyManager_AudioTranscriptionHandler(t *testing.T) {
Models: map[string]ModelConfig{
"TheExpectedModel": getTestSimpleResponderConfig("TheExpectedModel"),
},
LogLevel: "error",
}
proxy := New(config)
@@ -580,6 +590,8 @@ func TestProxyManager_UseModelName(t *testing.T) {
Models: map[string]ModelConfig{
"model1": modelConfig,
},
LogLevel: "error",
}
proxy := New(config)
@@ -647,7 +659,7 @@ func TestProxyManager_CORSOptionsHandler(t *testing.T) {
Models: map[string]ModelConfig{
"model1": getTestSimpleResponderConfig("model1"),
},
LogRequests: true,
LogLevel: "error",
}
tests := []struct {