proxy: Refactor tests (#660)
- use YAML for test configurations - remove most uses of simple-responder, opting to use process.testHandler Fixes #655
This commit is contained in:
@@ -0,0 +1,183 @@
|
|||||||
|
# Improve Testability (#655)
|
||||||
|
|
||||||
|
## Current Pain Points
|
||||||
|
|
||||||
|
1. **Tests bypass config loading** - ~80% of tests build `config.Config` structs directly, skipping YAML parsing, env var substitution, macro expansion, and `${PORT}` assignment. Config bugs in those paths go untested.
|
||||||
|
|
||||||
|
2. **simple-responder is everywhere** - Every proxy/routing test launches a real subprocess, waits for health checks (~healthCheckTimeout: 15), and manages process lifecycle just to test HTTP routing. Most of that overhead is wasted.
|
||||||
|
|
||||||
|
3. **Port counter is fragile** - A global `nextTestPort` counter starting at 12000 with a mutex. Parallel tests or leftover processes can collide.
|
||||||
|
|
||||||
|
## Stages
|
||||||
|
|
||||||
|
### Stage 1: YAML-based test config helper
|
||||||
|
|
||||||
|
**Goal:** Tests go through the real `LoadConfigFromReader` path instead of hand-building structs.
|
||||||
|
|
||||||
|
**Effort:** Low | **Impact:** Config bugs caught earlier | **Risk:** None
|
||||||
|
|
||||||
|
Create a test helper in `proxy/helpers_test.go`:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// testConfigFromYAML substitutes simple-responder paths and loads through
|
||||||
|
// the real config pipeline (env vars, macros, port assignment, etc.)
|
||||||
|
func testConfigFromYAML(t *testing.T, yamlTmpl string) config.Config {
|
||||||
|
t.Helper()
|
||||||
|
yamlStr := strings.ReplaceAll(yamlTmpl, "{{RESPONDER}}", filepath.ToSlash(simpleResponderPath))
|
||||||
|
cfg, err := config.LoadConfigFromReader(strings.NewReader(yamlStr))
|
||||||
|
require.NoError(t, err)
|
||||||
|
return cfg
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Tests would then look like:
|
||||||
|
|
||||||
|
```go
|
||||||
|
func TestProxyManager_SwapProcessCorrectly(t *testing.T) {
|
||||||
|
config := testConfigFromYAML(t, `
|
||||||
|
healthCheckTimeout: 15
|
||||||
|
logLevel: error
|
||||||
|
models:
|
||||||
|
model1:
|
||||||
|
cmd: {{RESPONDER}} --port ${PORT} -silent -respond model1
|
||||||
|
model2:
|
||||||
|
cmd: {{RESPONDER}} --port ${PORT} -silent -respond model2
|
||||||
|
`)
|
||||||
|
proxy := New(config)
|
||||||
|
// ... same assertions
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Why this stage first:** Zero production code changes. Pure test-side refactoring. Can be done incrementally - migrate tests one at a time. Each migrated test now validates the full config pipeline.
|
||||||
|
|
||||||
|
**Scope:** ~20-30 tests in `proxymanager_test.go`, `processgroup_test.go`, `peerproxy_test.go`.
|
||||||
|
|
||||||
|
### Stage 2: Injected test handler (eliminate simple-responder for routing tests)
|
||||||
|
|
||||||
|
**Goal:** Replace simple-responder subprocess launches with an injected `http.Handler` for tests that don't specifically test process lifecycle.
|
||||||
|
|
||||||
|
**Effort:** Medium | **Impact:** 10-100x faster routing tests | **Risk:** Low (additive, no existing code broken)
|
||||||
|
|
||||||
|
Add a `testHandler http.Handler` field to `Process`. When set, `ProxyRequest` delegates directly to this handler instead of going through the reverse proxy. No subprocess, no health checks, no TCP roundtrip.
|
||||||
|
|
||||||
|
**2a. Add testHandler to Process:**
|
||||||
|
|
||||||
|
```go
|
||||||
|
// In Process struct (process.go):
|
||||||
|
testHandler http.Handler // set only in tests; bypasses subprocess and reverse proxy
|
||||||
|
```
|
||||||
|
|
||||||
|
In `Process.Start()`, skip subprocess + health check when handler is set:
|
||||||
|
|
||||||
|
```go
|
||||||
|
func (p *Process) start() error {
|
||||||
|
if p.testHandler != nil {
|
||||||
|
p.setState(StateReady)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// existing subprocess logic...
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
In `Process.ProxyRequest()`, delegate directly to the handler:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Before the reverseProxy.ServeHTTP call:
|
||||||
|
if p.testHandler != nil {
|
||||||
|
p.testHandler.ServeHTTP(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**2b. Test helper to create the handler:**
|
||||||
|
|
||||||
|
```go
|
||||||
|
// newTestHandler returns an http.Handler that mimics llama.cpp's API
|
||||||
|
// (same endpoints as simple-responder).
|
||||||
|
func newTestHandler(respond string) http.Handler {
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.HandleFunc("/v1/chat/completions", func(w http.ResponseWriter, r *http.Request) { ... })
|
||||||
|
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { ... })
|
||||||
|
// ... other endpoints
|
||||||
|
return mux
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Tests for routing/auth/CORS/streaming then become:
|
||||||
|
|
||||||
|
```go
|
||||||
|
func TestProxyManager_AuthRequired(t *testing.T) {
|
||||||
|
handler := newTestHandler("model1")
|
||||||
|
|
||||||
|
config := testConfigFromYAML(t, `
|
||||||
|
healthCheckTimeout: 15
|
||||||
|
logLevel: error
|
||||||
|
requiredAPIKeys: [test-key]
|
||||||
|
models:
|
||||||
|
model1:
|
||||||
|
cmd: {{RESPONDER}} --port ${PORT} -silent -respond model1
|
||||||
|
`)
|
||||||
|
pm := NewProxyManager(config)
|
||||||
|
// inject handler — skips subprocess, health check, port allocation
|
||||||
|
pm.processGroups["model1"].process.testHandler = handler
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Why this matters:** The handler is called directly in-process. No subprocess spawn, no health check timeout, no port allocation, no TCP roundtrip, no reverse proxy overhead. Routing tests go from ~100ms each (process startup + health check) to ~1ms. Unlike an `httptest.Server` approach, there are zero network hops.
|
||||||
|
|
||||||
|
**Why not blank-cmd + proxy URL:** A blank `cmd` with a `proxy` field pointing at `httptest.Server` still requires a real TCP roundtrip through the reverse proxy and introduces "external process" semantics to the config schema. Injecting the handler directly keeps it purely a test concern with no config changes.
|
||||||
|
|
||||||
|
**Scope:** Most tests in `proxymanager_test.go` (auth, CORS, model listing, streaming, peer proxy), `peerproxy_test.go`, `metrics_monitor_test.go`.
|
||||||
|
|
||||||
|
### Stage 3: Migrate tests incrementally
|
||||||
|
|
||||||
|
**Goal:** Convert existing tests to use the Stage 1 + Stage 2 helpers.
|
||||||
|
|
||||||
|
**Effort:** Medium | **Impact:** Cleaner, more reliable tests | **Risk:** None
|
||||||
|
|
||||||
|
Priority order:
|
||||||
|
1. `proxymanager_test.go` routing tests (highest count, most repetition)
|
||||||
|
2. `peerproxy_test.go` (straightforward, all HTTP routing)
|
||||||
|
3. `metrics_monitor_test.go` (capture logic doesn't need real processes)
|
||||||
|
4. `processgroup_test.go` swap tests (keep simple-responder for actual swap lifecycle tests)
|
||||||
|
|
||||||
|
Tests that **must keep simple-responder:**
|
||||||
|
- Process lifecycle: start/stop, SIGKILL, SIGTERM, TTL expiry, health check failures, failed start counting
|
||||||
|
- ProcessGroup swap concurrency (the port-collision test in `TestProcessGroup_ProxyRequestSwapIsTrueParallel`)
|
||||||
|
|
||||||
|
**Scope:** ~60-70% of tests can drop simple-responder.
|
||||||
|
|
||||||
|
### Stage 4 (optional): Process interface for ProcessGroup
|
||||||
|
|
||||||
|
**Goal:** Enable pure unit tests of ProcessGroup's swap/exclusive/concurrency logic without any HTTP server at all.
|
||||||
|
|
||||||
|
**Effort:** High | **Impact:** Pure unit tests possible | **Risk:** Medium (refactor core code)
|
||||||
|
|
||||||
|
```go
|
||||||
|
type ProcessController interface {
|
||||||
|
Start() error
|
||||||
|
Stop(StopStrategy)
|
||||||
|
ProxyRequest(http.ResponseWriter, *http.Request) error
|
||||||
|
CurrentState() ProcessState
|
||||||
|
ID() string
|
||||||
|
SetState(ProcessState) // for test setup
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
This requires:
|
||||||
|
- Extracting the interface
|
||||||
|
- A `MockProcess` implementation
|
||||||
|
- Refactoring `ProcessGroup` to use the interface instead of `*Process`
|
||||||
|
|
||||||
|
**Recommendation:** Only do this if ProcessGroup grows significantly more complex. Stages 1-3 give 80% of the benefit for 20% of the effort.
|
||||||
|
|
||||||
|
## Effort/Impact Summary
|
||||||
|
|
||||||
|
| Stage | Effort | Impact | Risk |
|
||||||
|
|-------|--------|--------|------|
|
||||||
|
| 1. YAML config helper | Low | Config bugs caught earlier | None |
|
||||||
|
| 2. Injected test handler | Medium | 10-100x faster routing tests | Low |
|
||||||
|
| 3. Migrate tests | Medium | Cleaner, more reliable tests | None |
|
||||||
|
| 4. Process interface | High | Pure unit tests possible | Medium |
|
||||||
|
|
||||||
|
**Recommended approach:** Do stages 1-3 in order. Each stage is independently valuable and can ship on its own. Stage 4 is deferred unless there's a specific need.
|
||||||
@@ -1,15 +1,22 @@
|
|||||||
package proxy
|
package proxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/mostlygeek/llama-swap/proxy/config"
|
"github.com/mostlygeek/llama-swap/proxy/config"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -66,6 +73,16 @@ func getTestPort() int {
|
|||||||
return port
|
return port
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// testConfigFromYAML substitutes {{RESPONDER}} with the simple-responder path and
|
||||||
|
// loads through the real config pipeline (env vars, macros, port assignment, etc.)
|
||||||
|
func testConfigFromYAML(t *testing.T, yamlTmpl string) config.Config {
|
||||||
|
t.Helper()
|
||||||
|
yamlStr := strings.ReplaceAll(yamlTmpl, "{{RESPONDER}}", filepath.ToSlash(simpleResponderPath))
|
||||||
|
cfg, err := config.LoadConfigFromReader(strings.NewReader(yamlStr))
|
||||||
|
require.NoError(t, err)
|
||||||
|
return cfg
|
||||||
|
}
|
||||||
|
|
||||||
func getTestSimpleResponderConfig(expectedMessage string) config.ModelConfig {
|
func getTestSimpleResponderConfig(expectedMessage string) config.ModelConfig {
|
||||||
return getTestSimpleResponderConfigPort(expectedMessage, getTestPort())
|
return getTestSimpleResponderConfigPort(expectedMessage, getTestPort())
|
||||||
}
|
}
|
||||||
@@ -88,3 +105,188 @@ proxy: "http://127.0.0.1:%d"
|
|||||||
|
|
||||||
return cfg
|
return cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// injectTestHandlers sets a testHandler on every Process in every ProcessGroup
|
||||||
|
// of the given ProxyManager, bypassing subprocess launches. modelResponses maps
|
||||||
|
// model IDs to their respond strings; if a model ID is not in the map, the model
|
||||||
|
// ID itself is used.
|
||||||
|
func injectTestHandlers(pm *ProxyManager, modelResponses map[string]string) {
|
||||||
|
for _, pg := range pm.processGroups {
|
||||||
|
for modelID, process := range pg.processes {
|
||||||
|
respond := modelID
|
||||||
|
if r, ok := modelResponses[modelID]; ok {
|
||||||
|
respond = r
|
||||||
|
}
|
||||||
|
process.testHandler = newTestHandler(respond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// newTestHandler returns an http.Handler that mimics simple-responder's API.
|
||||||
|
// It supports the endpoints that routing tests depend on, without launching
|
||||||
|
// any subprocess or binding any port.
|
||||||
|
func newTestHandler(respond string) http.Handler {
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
|
||||||
|
mux.HandleFunc("/v1/chat/completions", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
bodyBytes, _ := io.ReadAll(r.Body)
|
||||||
|
isStreaming := r.URL.Query().Get("stream") == "true"
|
||||||
|
|
||||||
|
if wait := r.URL.Query().Get("wait"); wait != "" {
|
||||||
|
if d, err := time.ParseDuration(wait); err == nil {
|
||||||
|
time.Sleep(d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if isStreaming {
|
||||||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
w.Header().Set("Cache-Control", "no-cache")
|
||||||
|
w.Header().Set("Connection", "keep-alive")
|
||||||
|
flusher := w.(http.Flusher)
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
data, _ := json.Marshal(map[string]any{
|
||||||
|
"created": time.Now().Unix(),
|
||||||
|
"choices": []map[string]any{
|
||||||
|
{"index": 0, "delta": map[string]any{"content": "asdf"}, "finish_reason": nil},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
fmt.Fprintf(w, "event: message\ndata: %s\n\n", data)
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
finalData, _ := json.Marshal(map[string]any{
|
||||||
|
"usage": map[string]any{
|
||||||
|
"completion_tokens": 10, "prompt_tokens": 25, "total_tokens": 35,
|
||||||
|
},
|
||||||
|
"timings": map[string]any{
|
||||||
|
"prompt_n": 25, "prompt_ms": 13, "predicted_n": 10,
|
||||||
|
"predicted_ms": 17, "predicted_per_second": 10,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
fmt.Fprintf(w, "event: message\ndata: %s\n\n", finalData)
|
||||||
|
flusher.Flush()
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "event: message\ndata: [DONE]\n\n")
|
||||||
|
flusher.Flush()
|
||||||
|
} else {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"responseMessage": respond,
|
||||||
|
"h_content_length": r.Header.Get("Content-Length"),
|
||||||
|
"request_body": string(bodyBytes),
|
||||||
|
"usage": map[string]any{
|
||||||
|
"completion_tokens": 10, "prompt_tokens": 25, "total_tokens": 35,
|
||||||
|
},
|
||||||
|
"timings": map[string]any{
|
||||||
|
"prompt_n": 25, "prompt_ms": 13, "predicted_n": 10,
|
||||||
|
"predicted_ms": 17, "predicted_per_second": 10,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
mux.HandleFunc("/v1/audio/speech", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
body, _ := io.ReadAll(r.Body)
|
||||||
|
modelName := gjson.GetBytes(body, "model").String()
|
||||||
|
if modelName != respond {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("Invalid model: %s, expected: %s", modelName, respond)})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
json.NewEncoder(w).Encode(map[string]string{"message": "ok"})
|
||||||
|
})
|
||||||
|
|
||||||
|
mux.HandleFunc("/v1/completions", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"responseMessage": respond,
|
||||||
|
"usage": map[string]any{
|
||||||
|
"completion_tokens": 10, "prompt_tokens": 25, "total_tokens": 35,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
mux.HandleFunc("/completion", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"responseMessage": respond,
|
||||||
|
"usage": map[string]any{
|
||||||
|
"completion_tokens": 10, "prompt_tokens": 25, "total_tokens": 35,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
mux.HandleFunc("/v1/audio/transcriptions", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if err := r.ParseMultipartForm(10 << 20); err != nil {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("Error parsing multipart form: %s", err)})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
model := r.FormValue("model")
|
||||||
|
if model == "" {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
json.NewEncoder(w).Encode(map[string]string{"error": "Missing model parameter"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
file, _, err := r.FormFile("file")
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("Error getting file: %s", err)})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fileBytes, _ := io.ReadAll(file)
|
||||||
|
file.Close()
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"text": fmt.Sprintf("The length of the file is %d bytes", len(fileBytes)),
|
||||||
|
"model": model,
|
||||||
|
"h_content_type": r.Header.Get("Content-Type"),
|
||||||
|
"h_content_length": r.Header.Get("Content-Length"),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
mux.HandleFunc("/v1/audio/voices", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
model := r.URL.Query().Get("model")
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"voices": []string{"voice1"}, "model": model,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
mux.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "text/plain")
|
||||||
|
fmt.Fprint(w, respond)
|
||||||
|
})
|
||||||
|
|
||||||
|
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.URL.Path != "/" {
|
||||||
|
http.NotFound(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "text/plain")
|
||||||
|
fmt.Fprintf(w, "%s %s", r.Method, r.URL.Path)
|
||||||
|
})
|
||||||
|
|
||||||
|
mux.HandleFunc("/sdapi/v1/txt2img", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
body, _ := io.ReadAll(r.Body)
|
||||||
|
modelName := gjson.GetBytes(body, "model").String()
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"model": modelName, "images": []string{},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
mux.HandleFunc("/sdapi/v1/img2img", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
body, _ := io.ReadAll(r.Body)
|
||||||
|
modelName := gjson.GetBytes(body, "model").String()
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"model": modelName, "images": []string{},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
mux.HandleFunc("/sdapi/v1/loras", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"loras": []string{},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
return mux
|
||||||
|
}
|
||||||
|
|||||||
@@ -77,6 +77,9 @@ type Process struct {
|
|||||||
// used for testing to override the default value
|
// used for testing to override the default value
|
||||||
gracefulStopTimeout time.Duration
|
gracefulStopTimeout time.Duration
|
||||||
|
|
||||||
|
// used for testing to bypass subprocess and reverse proxy
|
||||||
|
testHandler http.Handler
|
||||||
|
|
||||||
// track the number of failed starts
|
// track the number of failed starts
|
||||||
failedStartCount int
|
failedStartCount int
|
||||||
}
|
}
|
||||||
@@ -236,6 +239,49 @@ func (p *Process) forceState(newState ProcessState) {
|
|||||||
// at any time.
|
// at any time.
|
||||||
func (p *Process) start() error {
|
func (p *Process) start() error {
|
||||||
|
|
||||||
|
// test-only fast path: skip subprocess, health check, and TTL goroutine
|
||||||
|
if p.testHandler != nil {
|
||||||
|
if curState, err := p.swapState(StateStopped, StateStarting); err != nil {
|
||||||
|
if err == ErrExpectedStateMismatch {
|
||||||
|
if curState == StateStarting {
|
||||||
|
p.waitStarting.Wait()
|
||||||
|
curState = p.CurrentState()
|
||||||
|
if curState == StateReady {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("process was already starting but wound up in state %v", curState)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("process was in state %v when start() was called", curState)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("failed to set Process state to starting: current state: %v, error: %v", curState, err)
|
||||||
|
}
|
||||||
|
defer p.waitStarting.Done()
|
||||||
|
|
||||||
|
// Mimic the real stop path: cancelUpstream transitions
|
||||||
|
// StateStopping -> StateStopped and closes cmdWaitChan,
|
||||||
|
// matching what waitForCmd does for real subprocesses.
|
||||||
|
ch := make(chan struct{})
|
||||||
|
p.cmdMutex.Lock()
|
||||||
|
p.cancelUpstream = func() {
|
||||||
|
if curState := p.CurrentState(); curState == StateStopping {
|
||||||
|
if _, err := p.swapState(StateStopping, StateStopped); err != nil {
|
||||||
|
p.forceState(StateStopped)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
p.forceState(StateStopped)
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
}
|
||||||
|
p.cmdWaitChan = ch
|
||||||
|
p.cmdMutex.Unlock()
|
||||||
|
|
||||||
|
if curState, err := p.swapState(StateStarting, StateReady); err != nil {
|
||||||
|
return fmt.Errorf("failed to set Process state to ready: current state: %v, error: %v", curState, err)
|
||||||
|
}
|
||||||
|
p.failedStartCount = 0
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if p.config.Proxy == "" {
|
if p.config.Proxy == "" {
|
||||||
return fmt.Errorf("can not start(), upstream proxy missing")
|
return fmt.Errorf("can not start(), upstream proxy missing")
|
||||||
}
|
}
|
||||||
@@ -577,6 +623,11 @@ func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
|
|||||||
if !srw.waitForCompletion(completionTimeout) {
|
if !srw.waitForCompletion(completionTimeout) {
|
||||||
p.proxyLogger.Warnf("<%s> status updates goroutine did not complete within %v, proceeding with proxy request", p.ID, completionTimeout)
|
p.proxyLogger.Warnf("<%s> status updates goroutine did not complete within %v, proceeding with proxy request", p.ID, completionTimeout)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.testHandler != nil {
|
||||||
|
p.testHandler.ServeHTTP(w, r)
|
||||||
|
} else if srw != nil {
|
||||||
p.reverseProxy.ServeHTTP(srw, r)
|
p.reverseProxy.ServeHTTP(srw, r)
|
||||||
} else {
|
} else {
|
||||||
p.reverseProxy.ServeHTTP(w, r)
|
p.reverseProxy.ServeHTTP(w, r)
|
||||||
|
|||||||
+322
-339
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user