Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 63d4a7d0eb | |||
| f45469f7ff | |||
| 34f9fd7340 | |||
| 8448efa7fc |
+5
-2
@@ -1,6 +1,6 @@
|
|||||||
# Seconds to wait for llama.cpp to be available to serve requests
|
# Seconds to wait for llama.cpp to be available to serve requests
|
||||||
# Default (and minimum): 15 seconds
|
# Default (and minimum): 15 seconds
|
||||||
healthCheckTimeout: 60
|
healthCheckTimeout: 15
|
||||||
|
|
||||||
models:
|
models:
|
||||||
"llama":
|
"llama":
|
||||||
@@ -35,7 +35,10 @@ models:
|
|||||||
# until the upstream server is ready for traffic
|
# until the upstream server is ready for traffic
|
||||||
checkEndpoint: none
|
checkEndpoint: none
|
||||||
|
|
||||||
# don't use this, just for testing if things are broken
|
# don't use these, just for testing if things are broken
|
||||||
"broken":
|
"broken":
|
||||||
cmd: models/llama-server-osx --port 8999 -m models/doesnotexist.gguf
|
cmd: models/llama-server-osx --port 8999 -m models/doesnotexist.gguf
|
||||||
proxy: http://127.0.0.1:8999
|
proxy: http://127.0.0.1:8999
|
||||||
|
"broken_timeout":
|
||||||
|
cmd: models/llama-server-osx --port 8999 -m models/Qwen2.5-1.5B-Instruct-Q4_K_M.gguf
|
||||||
|
proxy: http://127.0.0.1:9000
|
||||||
+8
-2
@@ -30,13 +30,19 @@ func NewLogMonitorWriter(stdout io.Writer) *LogMonitor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *LogMonitor) Write(p []byte) (n int, err error) {
|
func (w *LogMonitor) Write(p []byte) (n int, err error) {
|
||||||
|
if len(p) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
n, err = w.stdout.Write(p)
|
n, err = w.stdout.Write(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
w.bufferMu.Lock()
|
w.bufferMu.Lock()
|
||||||
w.buffer.Value = p
|
bufferCopy := make([]byte, len(p))
|
||||||
|
copy(bufferCopy, p)
|
||||||
|
w.buffer.Value = bufferCopy
|
||||||
w.buffer = w.buffer.Next()
|
w.buffer = w.buffer.Next()
|
||||||
w.bufferMu.Unlock()
|
w.bufferMu.Unlock()
|
||||||
|
|
||||||
@@ -49,7 +55,7 @@ func (w *LogMonitor) GetHistory() []byte {
|
|||||||
defer w.bufferMu.RUnlock()
|
defer w.bufferMu.RUnlock()
|
||||||
|
|
||||||
var history []byte
|
var history []byte
|
||||||
w.buffer.Do(func(p interface{}) {
|
w.buffer.Do(func(p any) {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
if content, ok := p.([]byte); ok {
|
if content, ok := p.([]byte); ok {
|
||||||
history = append(history, content...)
|
history = append(history, content...)
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package proxy
|
package proxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -61,3 +62,34 @@ func TestLogMonitor(t *testing.T) {
|
|||||||
t.Errorf("Client2 expected %s, got: %s", expectedHistory, c2Data)
|
t.Errorf("Client2 expected %s, got: %s", expectedHistory, c2Data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWrite_ImmutableBuffer(t *testing.T) {
|
||||||
|
// Create a new LogMonitor instance
|
||||||
|
lm := NewLogMonitorWriter(io.Discard)
|
||||||
|
|
||||||
|
// Prepare a message to write
|
||||||
|
msg := []byte("Hello, World!")
|
||||||
|
lenmsg := len(msg)
|
||||||
|
|
||||||
|
// Write the message to the LogMonitor
|
||||||
|
n, err := lm.Write(msg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Write failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n != lenmsg {
|
||||||
|
t.Errorf("Expected %d bytes written but got %d", lenmsg, n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Change the original message
|
||||||
|
msg[0] = 'B' // This should not affect the buffer
|
||||||
|
|
||||||
|
// Get the history from the LogMonitor
|
||||||
|
history := lm.GetHistory()
|
||||||
|
|
||||||
|
// Check that the history contains the original message, not the modified one
|
||||||
|
expected := []byte("Hello, World!")
|
||||||
|
if !bytes.Equal(history, expected) {
|
||||||
|
t.Errorf("Expected history to be %q, got %q", expected, history)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
+41
-12
@@ -149,14 +149,28 @@ func (pm *ProxyManager) swapModel(requestedModel string) error {
|
|||||||
}
|
}
|
||||||
pm.currentCmd = cmd
|
pm.currentCmd = cmd
|
||||||
|
|
||||||
if err := pm.checkHealthEndpoint(); err != nil {
|
// watch for the command to exist
|
||||||
|
cmdCtx, cancel := context.WithCancelCause(context.Background())
|
||||||
|
|
||||||
|
// monitor the command's exist status
|
||||||
|
go func() {
|
||||||
|
err := cmd.Wait()
|
||||||
|
if err != nil {
|
||||||
|
cancel(fmt.Errorf("command [%s] %s", strings.Join(cmd.Args, " "), err.Error()))
|
||||||
|
} else {
|
||||||
|
cancel(nil)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// wait for checkHealthEndpoint
|
||||||
|
if err := pm.checkHealthEndpoint(cmdCtx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *ProxyManager) checkHealthEndpoint() error {
|
func (pm *ProxyManager) checkHealthEndpoint(cmdCtx context.Context) error {
|
||||||
|
|
||||||
if pm.currentConfig.Proxy == "" {
|
if pm.currentConfig.Proxy == "" {
|
||||||
return fmt.Errorf("no upstream available to check /health")
|
return fmt.Errorf("no upstream available to check /health")
|
||||||
@@ -179,6 +193,7 @@ func (pm *ProxyManager) checkHealthEndpoint() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create health url with with %s and path %s", proxyTo, checkEndpoint)
|
return fmt.Errorf("failed to create health url with with %s and path %s", proxyTo, checkEndpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
client := &http.Client{}
|
client := &http.Client{}
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
|
||||||
@@ -187,33 +202,47 @@ func (pm *ProxyManager) checkHealthEndpoint() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
ctx, cancel := context.WithTimeout(req.Context(), 250*time.Millisecond)
|
|
||||||
|
ctx, cancel := context.WithTimeout(cmdCtx, 250*time.Millisecond)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
req = req.WithContext(ctx)
|
req = req.WithContext(ctx)
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
|
|
||||||
|
ttl := (maxDuration - time.Since(startTime)).Seconds()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// check if the context was cancelled
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return context.Cause(ctx)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait a bit longer for TCP connection issues
|
||||||
if strings.Contains(err.Error(), "connection refused") {
|
if strings.Contains(err.Error(), "connection refused") {
|
||||||
|
fmt.Fprintf(pm.logMonitor, "Connection refused on %s, ttl %.0fs\n", healthURL, ttl)
|
||||||
|
|
||||||
// if TCP dial can't connect any HTTP response after 5 seconds
|
time.Sleep(5 * time.Second)
|
||||||
// exit quickly.
|
} else {
|
||||||
if time.Since(startTime) > 5*time.Second {
|
time.Sleep(time.Second)
|
||||||
return fmt.Errorf("health check endpoint took more than 5 seconds to respond")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if time.Since(startTime) >= maxDuration {
|
if ttl < 0 {
|
||||||
return fmt.Errorf("failed to check health from: %s", healthURL)
|
return fmt.Errorf("failed to check health from: %s", healthURL)
|
||||||
}
|
}
|
||||||
time.Sleep(time.Second)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
if resp.StatusCode == http.StatusOK {
|
if resp.StatusCode == http.StatusOK {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if time.Since(startTime) >= maxDuration {
|
|
||||||
|
if ttl < 0 {
|
||||||
return fmt.Errorf("failed to check health from: %s", healthURL)
|
return fmt.Errorf("failed to check health from: %s", healthURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -236,7 +265,7 @@ func (pm *ProxyManager) proxyChatRequest(w http.ResponseWriter, r *http.Request)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := pm.swapModel(model); err != nil {
|
if err := pm.swapModel(model); err != nil {
|
||||||
http.Error(w, fmt.Sprintf("unable to swap to model: %s", err.Error()), http.StatusNotFound)
|
http.Error(w, fmt.Sprintf("unable to swap to model, %s", err.Error()), http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user