Compare commits

...

4 Commits

Author SHA1 Message Date
Benson Wong 63d4a7d0eb Improve LogMonitor to handle empty writes and ensure buffer immutability
- Add a check to return immediately if the write buffer is empty
- Create a copy of new history data to ensure it is immutable
- Update the `GetHistory` method to use the `any` type for the buffer interface
- Add a test case to verify that the buffer remains unchanged
  even if the original message is modified after writing
2024-11-02 10:41:23 -07:00
Benson Wong f45469f7ff Merge pull request #8 from mostlygeek/improve-upstream-monitoring-issue5
Improvements to handling of the upstream process so errors happen whenever one of these is first:

    the health check timeout is reached waiting for the upstream process to be ready
    the upstream process exits unexpectedly

With this change llama-swap is more compatible with use cases like containerized upstream services (#5) which pull the container before HTTP endpoints are ready.
2024-11-01 15:28:06 -07:00
Benson Wong 34f9fd7340 Improve timeout and exit handling of child processes. fix #3 and #5
llama-swap only waited a maximum of 5 seconds for an upstream
HTTP server to be available. If it took longer than that it will error
out the request. Now it will wait up to the configured healthCheckTimeout
or the upstream process unexpectedly exits.
2024-11-01 14:32:39 -07:00
Benson Wong 8448efa7fc revise health check logic to not error on 5 second timeout 2024-11-01 09:42:37 -07:00
4 changed files with 89 additions and 19 deletions
+6 -3
View File
@@ -1,6 +1,6 @@
# Seconds to wait for llama.cpp to be available to serve requests
# Default (and minimum): 15 seconds
healthCheckTimeout: 60
healthCheckTimeout: 15
models:
"llama":
@@ -35,7 +35,10 @@ models:
# until the upstream server is ready for traffic
checkEndpoint: none
# don't use this, just for testing if things are broken
# don't use these, just for testing if things are broken
"broken":
cmd: models/llama-server-osx --port 8999 -m models/doesnotexist.gguf
proxy: http://127.0.0.1:8999
proxy: http://127.0.0.1:8999
"broken_timeout":
cmd: models/llama-server-osx --port 8999 -m models/Qwen2.5-1.5B-Instruct-Q4_K_M.gguf
proxy: http://127.0.0.1:9000
+8 -2
View File
@@ -30,13 +30,19 @@ func NewLogMonitorWriter(stdout io.Writer) *LogMonitor {
}
func (w *LogMonitor) Write(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
n, err = w.stdout.Write(p)
if err != nil {
return n, err
}
w.bufferMu.Lock()
w.buffer.Value = p
bufferCopy := make([]byte, len(p))
copy(bufferCopy, p)
w.buffer.Value = bufferCopy
w.buffer = w.buffer.Next()
w.bufferMu.Unlock()
@@ -49,7 +55,7 @@ func (w *LogMonitor) GetHistory() []byte {
defer w.bufferMu.RUnlock()
var history []byte
w.buffer.Do(func(p interface{}) {
w.buffer.Do(func(p any) {
if p != nil {
if content, ok := p.([]byte); ok {
history = append(history, content...)
+32
View File
@@ -1,6 +1,7 @@
package proxy
import (
"bytes"
"io"
"sync"
"testing"
@@ -61,3 +62,34 @@ func TestLogMonitor(t *testing.T) {
t.Errorf("Client2 expected %s, got: %s", expectedHistory, c2Data)
}
}
func TestWrite_ImmutableBuffer(t *testing.T) {
// Create a new LogMonitor instance
lm := NewLogMonitorWriter(io.Discard)
// Prepare a message to write
msg := []byte("Hello, World!")
lenmsg := len(msg)
// Write the message to the LogMonitor
n, err := lm.Write(msg)
if err != nil {
t.Fatalf("Write failed: %v", err)
}
if n != lenmsg {
t.Errorf("Expected %d bytes written but got %d", lenmsg, n)
}
// Change the original message
msg[0] = 'B' // This should not affect the buffer
// Get the history from the LogMonitor
history := lm.GetHistory()
// Check that the history contains the original message, not the modified one
expected := []byte("Hello, World!")
if !bytes.Equal(history, expected) {
t.Errorf("Expected history to be %q, got %q", expected, history)
}
}
+43 -14
View File
@@ -149,14 +149,28 @@ func (pm *ProxyManager) swapModel(requestedModel string) error {
}
pm.currentCmd = cmd
if err := pm.checkHealthEndpoint(); err != nil {
// watch for the command to exist
cmdCtx, cancel := context.WithCancelCause(context.Background())
// monitor the command's exist status
go func() {
err := cmd.Wait()
if err != nil {
cancel(fmt.Errorf("command [%s] %s", strings.Join(cmd.Args, " "), err.Error()))
} else {
cancel(nil)
}
}()
// wait for checkHealthEndpoint
if err := pm.checkHealthEndpoint(cmdCtx); err != nil {
return err
}
return nil
}
func (pm *ProxyManager) checkHealthEndpoint() error {
func (pm *ProxyManager) checkHealthEndpoint(cmdCtx context.Context) error {
if pm.currentConfig.Proxy == "" {
return fmt.Errorf("no upstream available to check /health")
@@ -179,6 +193,7 @@ func (pm *ProxyManager) checkHealthEndpoint() error {
if err != nil {
return fmt.Errorf("failed to create health url with with %s and path %s", proxyTo, checkEndpoint)
}
client := &http.Client{}
startTime := time.Now()
@@ -187,33 +202,47 @@ func (pm *ProxyManager) checkHealthEndpoint() error {
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(req.Context(), 250*time.Millisecond)
ctx, cancel := context.WithTimeout(cmdCtx, 250*time.Millisecond)
defer cancel()
req = req.WithContext(ctx)
resp, err := client.Do(req)
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
// if TCP dial can't connect any HTTP response after 5 seconds
// exit quickly.
if time.Since(startTime) > 5*time.Second {
return fmt.Errorf("health check endpoint took more than 5 seconds to respond")
}
ttl := (maxDuration - time.Since(startTime)).Seconds()
if err != nil {
// check if the context was cancelled
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
if time.Since(startTime) >= maxDuration {
// wait a bit longer for TCP connection issues
if strings.Contains(err.Error(), "connection refused") {
fmt.Fprintf(pm.logMonitor, "Connection refused on %s, ttl %.0fs\n", healthURL, ttl)
time.Sleep(5 * time.Second)
} else {
time.Sleep(time.Second)
}
if ttl < 0 {
return fmt.Errorf("failed to check health from: %s", healthURL)
}
time.Sleep(time.Second)
continue
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return nil
}
if time.Since(startTime) >= maxDuration {
if ttl < 0 {
return fmt.Errorf("failed to check health from: %s", healthURL)
}
time.Sleep(time.Second)
}
}
@@ -236,7 +265,7 @@ func (pm *ProxyManager) proxyChatRequest(w http.ResponseWriter, r *http.Request)
}
if err := pm.swapModel(model); err != nil {
http.Error(w, fmt.Sprintf("unable to swap to model: %s", err.Error()), http.StatusNotFound)
http.Error(w, fmt.Sprintf("unable to swap to model, %s", err.Error()), http.StatusNotFound)
return
}