Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6ea551362e |
@@ -11,7 +11,6 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"syscall"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/mostlygeek/llama-swap/internal/config"
|
"github.com/mostlygeek/llama-swap/internal/config"
|
||||||
@@ -22,6 +21,15 @@ import (
|
|||||||
|
|
||||||
var ErrStartAborted = fmt.Errorf("aborted")
|
var ErrStartAborted = fmt.Errorf("aborted")
|
||||||
|
|
||||||
|
// cmdWaitDelay is the upper bound the runtime will wait for child I/O to
|
||||||
|
// drain after the process exits before force-closing the stdout/stderr
|
||||||
|
// pipes. Required so that cmd.Wait() returns even when a forked grandchild
|
||||||
|
// inherits and holds the pipes open (e.g. a shell wrapper that backgrounds
|
||||||
|
// the real binary). killProcess sends the stop signal directly (not via the
|
||||||
|
// cmd context), so this delay is measured from process exit rather than from
|
||||||
|
// the stop request, and stays independent of the caller's graceful timeout.
|
||||||
|
const cmdWaitDelay = 10 * time.Second
|
||||||
|
|
||||||
type runReq struct {
|
type runReq struct {
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
respond chan error
|
respond chan error
|
||||||
@@ -39,6 +47,7 @@ type waitReadyReq struct {
|
|||||||
type startResult struct {
|
type startResult struct {
|
||||||
cmd *exec.Cmd
|
cmd *exec.Cmd
|
||||||
cmdDone chan struct{}
|
cmdDone chan struct{}
|
||||||
|
cancel context.CancelFunc
|
||||||
handlerFn http.HandlerFunc
|
handlerFn http.HandlerFunc
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
@@ -51,6 +60,11 @@ type ProcessCommand struct {
|
|||||||
processLogger *logmon.Monitor
|
processLogger *logmon.Monitor
|
||||||
proxyLogger *logmon.Monitor
|
proxyLogger *logmon.Monitor
|
||||||
|
|
||||||
|
// waitDelay is assigned to cmd.WaitDelay when starting the upstream
|
||||||
|
// process. Defaults to cmdWaitDelay; tests override it to keep the
|
||||||
|
// pipe-close backstop from dominating their runtime.
|
||||||
|
waitDelay time.Duration
|
||||||
|
|
||||||
runCh chan runReq
|
runCh chan runReq
|
||||||
stopCh chan stopReq
|
stopCh chan stopReq
|
||||||
waitReadyCh chan waitReadyReq
|
waitReadyCh chan waitReadyReq
|
||||||
@@ -85,6 +99,7 @@ func New(
|
|||||||
runCh: make(chan runReq),
|
runCh: make(chan runReq),
|
||||||
stopCh: make(chan stopReq),
|
stopCh: make(chan stopReq),
|
||||||
waitReadyCh: make(chan waitReadyReq),
|
waitReadyCh: make(chan waitReadyReq),
|
||||||
|
waitDelay: cmdWaitDelay,
|
||||||
}
|
}
|
||||||
p.state.Store(StateStopped)
|
p.state.Store(StateStopped)
|
||||||
|
|
||||||
@@ -122,6 +137,7 @@ func (p *ProcessCommand) run() {
|
|||||||
var (
|
var (
|
||||||
cmd *exec.Cmd
|
cmd *exec.Cmd
|
||||||
cmdDone <-chan struct{}
|
cmdDone <-chan struct{}
|
||||||
|
cmdCancel context.CancelFunc
|
||||||
readyWaiters []waitReadyReq
|
readyWaiters []waitReadyReq
|
||||||
// runResp parks the in-flight Run caller's response channel. The
|
// runResp parks the in-flight Run caller's response channel. The
|
||||||
// interface contract is that Run blocks until the process is
|
// interface contract is that Run blocks until the process is
|
||||||
@@ -164,9 +180,10 @@ func (p *ProcessCommand) run() {
|
|||||||
setState(StateShutdown)
|
setState(StateShutdown)
|
||||||
if cmd != nil {
|
if cmd != nil {
|
||||||
p.handler.Store(nil)
|
p.handler.Store(nil)
|
||||||
p.killProcess(cmd, cmdDone, 100*time.Millisecond)
|
p.killProcess(cmd, cmdCancel, cmdDone, 100*time.Millisecond)
|
||||||
cmd = nil
|
cmd = nil
|
||||||
cmdDone = nil
|
cmdDone = nil
|
||||||
|
cmdCancel = nil
|
||||||
}
|
}
|
||||||
notifyWaiters(fmt.Errorf("[%s] shutdown", p.id))
|
notifyWaiters(fmt.Errorf("[%s] shutdown", p.id))
|
||||||
respondRun(fmt.Errorf("[%s] shutdown", p.id))
|
respondRun(fmt.Errorf("[%s] shutdown", p.id))
|
||||||
@@ -177,8 +194,12 @@ func (p *ProcessCommand) run() {
|
|||||||
// cmdDone is nil while no process is running, so this case is
|
// cmdDone is nil while no process is running, so this case is
|
||||||
// dormant outside of StateReady.
|
// dormant outside of StateReady.
|
||||||
case <-cmdDone:
|
case <-cmdDone:
|
||||||
|
if cmdCancel != nil {
|
||||||
|
cmdCancel()
|
||||||
|
}
|
||||||
cmd = nil
|
cmd = nil
|
||||||
cmdDone = nil
|
cmdDone = nil
|
||||||
|
cmdCancel = nil
|
||||||
p.handler.Store(nil)
|
p.handler.Store(nil)
|
||||||
setState(StateStopped)
|
setState(StateStopped)
|
||||||
respondRun(fmt.Errorf("[%s] upstream exited unexpectedly", p.id))
|
respondRun(fmt.Errorf("[%s] upstream exited unexpectedly", p.id))
|
||||||
@@ -226,6 +247,7 @@ func (p *ProcessCommand) run() {
|
|||||||
if res.err == nil {
|
if res.err == nil {
|
||||||
cmd = res.cmd
|
cmd = res.cmd
|
||||||
cmdDone = res.cmdDone
|
cmdDone = res.cmdDone
|
||||||
|
cmdCancel = res.cancel
|
||||||
fn := res.handlerFn
|
fn := res.handlerFn
|
||||||
p.handler.Store(&fn)
|
p.handler.Store(&fn)
|
||||||
setState(StateReady)
|
setState(StateReady)
|
||||||
@@ -273,7 +295,7 @@ func (p *ProcessCommand) run() {
|
|||||||
cancelStart()
|
cancelStart()
|
||||||
res := <-resultCh
|
res := <-resultCh
|
||||||
if res.cmd != nil {
|
if res.cmd != nil {
|
||||||
p.killProcess(res.cmd, res.cmdDone, stop.timeout)
|
p.killProcess(res.cmd, res.cancel, res.cmdDone, stop.timeout)
|
||||||
}
|
}
|
||||||
setState(StateStopped)
|
setState(StateStopped)
|
||||||
notifyWaiters(ErrStartAborted)
|
notifyWaiters(ErrStartAborted)
|
||||||
@@ -293,7 +315,7 @@ func (p *ProcessCommand) run() {
|
|||||||
setState(StateShutdown)
|
setState(StateShutdown)
|
||||||
res := <-resultCh
|
res := <-resultCh
|
||||||
if res.cmd != nil {
|
if res.cmd != nil {
|
||||||
p.killProcess(res.cmd, res.cmdDone, 100*time.Millisecond)
|
p.killProcess(res.cmd, res.cancel, res.cmdDone, 100*time.Millisecond)
|
||||||
}
|
}
|
||||||
notifyWaiters(fmt.Errorf("[%s] shutdown", p.id))
|
notifyWaiters(fmt.Errorf("[%s] shutdown", p.id))
|
||||||
respondRun(fmt.Errorf("[%s] shutdown", p.id))
|
respondRun(fmt.Errorf("[%s] shutdown", p.id))
|
||||||
@@ -310,9 +332,10 @@ func (p *ProcessCommand) run() {
|
|||||||
case stop := <-p.stopCh:
|
case stop := <-p.stopCh:
|
||||||
if cmd != nil {
|
if cmd != nil {
|
||||||
setState(StateStopping)
|
setState(StateStopping)
|
||||||
p.killProcess(cmd, cmdDone, stop.timeout)
|
p.killProcess(cmd, cmdCancel, cmdDone, stop.timeout)
|
||||||
cmd = nil
|
cmd = nil
|
||||||
cmdDone = nil
|
cmdDone = nil
|
||||||
|
cmdCancel = nil
|
||||||
p.handler.Store(nil)
|
p.handler.Store(nil)
|
||||||
}
|
}
|
||||||
// Stop is a no-op (and not an error) when already Stopped — this
|
// Stop is a no-op (and not an error) when already Stopped — this
|
||||||
@@ -377,16 +400,26 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
|
|||||||
reverseProxy.ServeHTTP(w, r)
|
reverseProxy.ServeHTTP(w, r)
|
||||||
})
|
})
|
||||||
|
|
||||||
cmd := exec.Command(args[0], args[1:]...)
|
// cmdCtx + cmd.Cancel are wired as a safety net: if the context is ever
|
||||||
|
// cancelled while the process is alive, cmd.Cancel sends SIGTERM / CmdStop
|
||||||
|
// and the runtime escalates to SIGKILL after cmd.WaitDelay. In the normal
|
||||||
|
// teardown path killProcess sends the stop signal directly instead, so
|
||||||
|
// cmd.WaitDelay only acts as the inherited-pipe backstop measured from
|
||||||
|
// process exit (see killProcess).
|
||||||
|
cmdCtx, cmdCancel := context.WithCancel(context.Background())
|
||||||
|
cmd := exec.CommandContext(cmdCtx, args[0], args[1:]...)
|
||||||
cmd.Stderr = p.processLogger
|
cmd.Stderr = p.processLogger
|
||||||
cmd.Stdout = p.processLogger
|
cmd.Stdout = p.processLogger
|
||||||
cmd.Env = append(cmd.Environ(), p.config.Env...)
|
cmd.Env = append(cmd.Environ(), p.config.Env...)
|
||||||
|
cmd.Cancel = func() error { return p.sendStopSignal(cmd) }
|
||||||
|
cmd.WaitDelay = p.waitDelay
|
||||||
setProcAttributes(cmd)
|
setProcAttributes(cmd)
|
||||||
|
|
||||||
p.proxyLogger.Debugf("<%s> Executing start command: %s, env: %s", p.id, strings.Join(args, " "), strings.Join(p.config.Env, ", "))
|
p.proxyLogger.Debugf("<%s> Executing start command: %s, env: %s", p.id, strings.Join(args, " "), strings.Join(p.config.Env, ", "))
|
||||||
|
|
||||||
cmdDone := make(chan struct{})
|
cmdDone := make(chan struct{})
|
||||||
if err := cmd.Start(); err != nil {
|
if err := cmd.Start(); err != nil {
|
||||||
|
cmdCancel()
|
||||||
return startResult{err: fmt.Errorf("failed to start command '%s': %w", strings.Join(args, " "), err)}
|
return startResult{err: fmt.Errorf("failed to start command '%s': %w", strings.Join(args, " "), err)}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -402,21 +435,28 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
|
|||||||
close(cmdDone)
|
close(cmdDone)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
abort := func(err error) startResult {
|
||||||
|
p.killProcess(cmd, cmdCancel, cmdDone, 5*time.Second)
|
||||||
|
return startResult{err: err}
|
||||||
|
}
|
||||||
|
prematureExit := func() startResult {
|
||||||
|
cmdCancel()
|
||||||
|
return startResult{err: fmt.Errorf("upstream command exited prematurely")}
|
||||||
|
}
|
||||||
|
|
||||||
if startCtx.Err() != nil {
|
if startCtx.Err() != nil {
|
||||||
p.killProcess(cmd, cmdDone, 5*time.Second)
|
return abort(ErrStartAborted)
|
||||||
return startResult{err: ErrStartAborted}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
checkEndpoint := strings.TrimSpace(p.config.CheckEndpoint)
|
checkEndpoint := strings.TrimSpace(p.config.CheckEndpoint)
|
||||||
if checkEndpoint == "none" {
|
if checkEndpoint == "none" {
|
||||||
return startResult{cmd: cmd, cmdDone: cmdDone, handlerFn: handlerFn}
|
return startResult{cmd: cmd, cmdDone: cmdDone, cancel: cmdCancel, handlerFn: handlerFn}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait 250ms for the command to start up before health checking
|
// Wait 250ms for the command to start up before health checking
|
||||||
select {
|
select {
|
||||||
case <-startCtx.Done():
|
case <-startCtx.Done():
|
||||||
p.killProcess(cmd, cmdDone, 5*time.Second)
|
return abort(ErrStartAborted)
|
||||||
return startResult{err: ErrStartAborted}
|
|
||||||
case <-time.After(250 * time.Millisecond):
|
case <-time.After(250 * time.Millisecond):
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -424,16 +464,14 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-startCtx.Done():
|
case <-startCtx.Done():
|
||||||
p.killProcess(cmd, cmdDone, 5*time.Second)
|
return abort(ErrStartAborted)
|
||||||
return startResult{err: ErrStartAborted}
|
|
||||||
case <-cmdDone:
|
case <-cmdDone:
|
||||||
return startResult{err: fmt.Errorf("upstream command exited prematurely")}
|
return prematureExit()
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
if time.Now().After(deadline) {
|
if time.Now().After(deadline) {
|
||||||
p.killProcess(cmd, cmdDone, 5*time.Second)
|
return abort(fmt.Errorf("health check timed out after %v", healthCheckTimeout))
|
||||||
return startResult{err: fmt.Errorf("health check timed out after %v", healthCheckTimeout)}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
req, _ := http.NewRequestWithContext(startCtx, "GET", p.config.CheckEndpoint, nil)
|
req, _ := http.NewRequestWithContext(startCtx, "GET", p.config.CheckEndpoint, nil)
|
||||||
@@ -445,28 +483,28 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
|
|||||||
p.proxyLogger.Infof("<%s> Health check passed on %s%s", p.id, p.config.Proxy, p.config.CheckEndpoint)
|
p.proxyLogger.Infof("<%s> Health check passed on %s%s", p.id, p.config.Proxy, p.config.CheckEndpoint)
|
||||||
break
|
break
|
||||||
} else if startCtx.Err() != nil {
|
} else if startCtx.Err() != nil {
|
||||||
p.killProcess(cmd, cmdDone, 5*time.Second)
|
return abort(ErrStartAborted)
|
||||||
return startResult{err: ErrStartAborted}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-startCtx.Done():
|
case <-startCtx.Done():
|
||||||
p.killProcess(cmd, cmdDone, 5*time.Second)
|
return abort(ErrStartAborted)
|
||||||
return startResult{err: ErrStartAborted}
|
|
||||||
case <-cmdDone:
|
case <-cmdDone:
|
||||||
return startResult{err: fmt.Errorf("upstream command exited prematurely")}
|
return prematureExit()
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return startResult{cmd: cmd, cmdDone: cmdDone, handlerFn: handlerFn}
|
return startResult{cmd: cmd, cmdDone: cmdDone, cancel: cmdCancel, handlerFn: handlerFn}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cmdDone <-chan struct{}, gracefulTimeout time.Duration) {
|
// sendStopSignal runs the configured CmdStop (if any) or sends SIGTERM to
|
||||||
|
// the upstream process. Wired up as cmd.Cancel so it fires whenever the
|
||||||
|
// cmd's context is cancelled.
|
||||||
|
func (p *ProcessCommand) sendStopSignal(cmd *exec.Cmd) error {
|
||||||
if cmd == nil || cmd.Process == nil {
|
if cmd == nil || cmd.Process == nil {
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.config.CmdStop != "" {
|
if p.config.CmdStop != "" {
|
||||||
stopArgs, err := config.SanitizeCommand(
|
stopArgs, err := config.SanitizeCommand(
|
||||||
strings.ReplaceAll(p.config.CmdStop, "${PID}", fmt.Sprintf("%d", cmd.Process.Pid)),
|
strings.ReplaceAll(p.config.CmdStop, "${PID}", fmt.Sprintf("%d", cmd.Process.Pid)),
|
||||||
@@ -475,12 +513,48 @@ func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cmdDone <-chan struct{}, gra
|
|||||||
stopCmd := exec.Command(stopArgs[0], stopArgs[1:]...)
|
stopCmd := exec.Command(stopArgs[0], stopArgs[1:]...)
|
||||||
stopCmd.Env = cmd.Env
|
stopCmd.Env = cmd.Env
|
||||||
setProcAttributes(stopCmd)
|
setProcAttributes(stopCmd)
|
||||||
stopCmd.Run()
|
return stopCmd.Run()
|
||||||
} else {
|
|
||||||
cmd.Process.Signal(syscall.SIGTERM)
|
|
||||||
}
|
}
|
||||||
} else {
|
// fall through to SIGTERM if sanitize failed
|
||||||
cmd.Process.Signal(syscall.SIGTERM)
|
}
|
||||||
|
// On Unix this SIGTERMs the whole process group so a forked grandchild
|
||||||
|
// (e.g. a shell wrapper that backgrounds the real binary) is taken down
|
||||||
|
// with the parent rather than orphaned.
|
||||||
|
return terminateProcessTree(cmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
// killProcess terminates the upstream process. The flow:
|
||||||
|
//
|
||||||
|
// 1. Send the graceful stop signal (CmdStop / SIGTERM) directly — NOT by
|
||||||
|
// cancelling cmdCtx. Cancelling the context would start cmd.WaitDelay
|
||||||
|
// immediately, which force-kills the process WaitDelay after the signal
|
||||||
|
// and would silently cap gracefulTimeout at WaitDelay whenever
|
||||||
|
// gracefulTimeout is the longer of the two.
|
||||||
|
// 2. We wait up to gracefulTimeout for the process to exit on its own.
|
||||||
|
// 3. If still alive, we SIGKILL the process group directly (Unix) so any
|
||||||
|
// forked descendant is force-terminated alongside the parent.
|
||||||
|
// 4. We wait on cmdDone. cmd.WaitDelay (set when the cmd was built) is the
|
||||||
|
// critical backstop here: once the process exits, if a forked grandchild
|
||||||
|
// inherited the stdout/stderr pipes and is still holding them, the runtime
|
||||||
|
// force-closes the pipes WaitDelay after the exit and cmd.Wait() unblocks.
|
||||||
|
// Because we never cancelled the context, that WaitDelay timer measures
|
||||||
|
// from process exit (see os/exec awaitGoroutines), not from this call.
|
||||||
|
// Without WaitDelay this select would hang forever (the v219 bug).
|
||||||
|
//
|
||||||
|
// cancel() is still invoked (deferred) to release the context, but only after
|
||||||
|
// the process has exited and os/exec's ctx watcher has already torn down, so it
|
||||||
|
// never re-fires cmd.Cancel.
|
||||||
|
func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cancel context.CancelFunc, cmdDone <-chan struct{}, gracefulTimeout time.Duration) {
|
||||||
|
if cancel == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Deliver CmdStop / SIGTERM in a goroutine so a slow or hanging CmdStop
|
||||||
|
// cannot block the run() goroutine; the gracefulTimeout + Process.Kill
|
||||||
|
// path below still guarantees teardown.
|
||||||
|
if cmd != nil {
|
||||||
|
go func() { _ = p.sendStopSignal(cmd) }()
|
||||||
}
|
}
|
||||||
|
|
||||||
timer := time.NewTimer(gracefulTimeout)
|
timer := time.NewTimer(gracefulTimeout)
|
||||||
@@ -488,10 +562,16 @@ func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cmdDone <-chan struct{}, gra
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-cmdDone:
|
case <-cmdDone:
|
||||||
|
return
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
cmd.Process.Kill()
|
|
||||||
<-cmdDone
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cmd != nil {
|
||||||
|
// SIGKILL the whole process group on Unix so any descendant that
|
||||||
|
// ignored or outlived the graceful signal is force-terminated too.
|
||||||
|
_ = killProcessTree(cmd)
|
||||||
|
}
|
||||||
|
<-cmdDone
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProcessCommand) ID() string {
|
func (p *ProcessCommand) ID() string {
|
||||||
|
|||||||
@@ -0,0 +1,262 @@
|
|||||||
|
//go:build !windows
|
||||||
|
|
||||||
|
package process
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/mostlygeek/llama-swap/internal/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestProcessCommand_StopForkingWrapper is a regression for the bug reported
|
||||||
|
// against v219 where Stop would hang indefinitely when the upstream command
|
||||||
|
// is a shell wrapper that forks the real binary (e.g. `#!/bin/bash` then
|
||||||
|
// `"$@"`). After SIGTERM the wrapper dies but the grandchild inherits the
|
||||||
|
// stdout/stderr pipes; cmd.Wait() blocks waiting for the pipe-copy goroutine
|
||||||
|
// to drain EOF, which never happens while the grandchild holds the fds.
|
||||||
|
//
|
||||||
|
// The fix is cmd.WaitDelay (combined with exec.CommandContext + cmd.Cancel),
|
||||||
|
// which causes the runtime to force-close the pipes after the delay so
|
||||||
|
// cmd.Wait() — and therefore Stop — returns.
|
||||||
|
func TestProcessCommand_StopForkingWrapper(t *testing.T) {
|
||||||
|
skipIfNoSimpleResponder(t)
|
||||||
|
|
||||||
|
port := getFreePort(t)
|
||||||
|
dir := t.TempDir()
|
||||||
|
pidFile := filepath.Join(dir, "child.pid")
|
||||||
|
|
||||||
|
// Wrapper script: backgrounds the child (which inherits stdout/stderr),
|
||||||
|
// records its PID for cleanup, then waits. When SIGTERM hits bash it
|
||||||
|
// dies without forwarding the signal; the grandchild keeps running and
|
||||||
|
// keeps the inherited pipe fds open. This is the scenario reported in
|
||||||
|
// the v219 regression.
|
||||||
|
wrapper := filepath.Join(dir, "wrapper.sh")
|
||||||
|
script := fmt.Sprintf("#!/bin/bash\n%q -port %d -silent &\necho $! > %q\nwait\n",
|
||||||
|
simpleResponderPath, port, pidFile)
|
||||||
|
if err := os.WriteFile(wrapper, []byte(script), 0o755); err != nil {
|
||||||
|
t.Fatalf("WriteFile: %v", err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { killChildFromPidFile(pidFile) })
|
||||||
|
|
||||||
|
p := newProcessCommand(t, config.ModelConfig{
|
||||||
|
Cmd: wrapper,
|
||||||
|
Proxy: fmt.Sprintf("http://127.0.0.1:%d", port),
|
||||||
|
CheckEndpoint: "/health",
|
||||||
|
HealthCheckTimeout: 10,
|
||||||
|
})
|
||||||
|
// Shrink the pipe-close backstop so the test doesn't sit at the
|
||||||
|
// production default (10s). Must be set before Run() so doStart picks
|
||||||
|
// it up when building the cmd.
|
||||||
|
const testWaitDelay = 250 * time.Millisecond
|
||||||
|
p.waitDelay = testWaitDelay
|
||||||
|
|
||||||
|
runErr := runAsync(t, p)
|
||||||
|
|
||||||
|
// Stop must return within a bounded time even though the grandchild
|
||||||
|
// is still holding the pipe open. Budget is generous on top of
|
||||||
|
// testWaitDelay to absorb scheduling jitter on slow CI runners; the
|
||||||
|
// pre-fix behaviour was an unbounded hang, so any reasonable cap
|
||||||
|
// distinguishes pass from fail.
|
||||||
|
stopReturned := make(chan error, 1)
|
||||||
|
stopStart := time.Now()
|
||||||
|
go func() { stopReturned <- p.Stop(testStopTimeout) }()
|
||||||
|
|
||||||
|
const stopBudget = testWaitDelay + 2*time.Second
|
||||||
|
select {
|
||||||
|
case err := <-stopReturned:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Stop: %v", err)
|
||||||
|
}
|
||||||
|
t.Logf("Stop returned in %v", time.Since(stopStart))
|
||||||
|
case <-time.After(stopBudget):
|
||||||
|
t.Fatalf("Stop did not return within %v — cmd.Wait() likely hung on inherited pipe", stopBudget)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := p.State(); got != StateStopped {
|
||||||
|
t.Errorf("after Stop: expected state %s, got %s", StateStopped, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-runErr:
|
||||||
|
case <-time.After(testReturnTimeout):
|
||||||
|
t.Errorf("Run did not return after Stop")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProcessCommand_StopHonorsGracefulTimeout is a regression for the bug
|
||||||
|
// where cmd.WaitDelay capped the graceful shutdown window. killProcess used to
|
||||||
|
// cancel the cmd context to deliver SIGTERM, which starts cmd.WaitDelay
|
||||||
|
// immediately; a process whose SIGTERM handler needs longer than WaitDelay to
|
||||||
|
// finish was force-killed early even though Stop was given a much longer
|
||||||
|
// timeout. The fix sends the signal directly so WaitDelay measures from process
|
||||||
|
// exit (its inherited-pipe backstop role), leaving the graceful window to the
|
||||||
|
// caller's Stop timeout.
|
||||||
|
func TestProcessCommand_StopHonorsGracefulTimeout(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
marker := filepath.Join(dir, "graceful.done")
|
||||||
|
ready := filepath.Join(dir, "trap.ready")
|
||||||
|
|
||||||
|
// On SIGTERM, sleep past the (short) WaitDelay, then write the marker and
|
||||||
|
// exit cleanly. If WaitDelay still drove the kill, bash would be SIGKILLed
|
||||||
|
// mid-handler and the marker would never be written. The ready file is
|
||||||
|
// written only after the trap is installed so the test does not race
|
||||||
|
// SIGTERM ahead of it (CheckEndpoint:none marks ready before bash runs).
|
||||||
|
script := filepath.Join(dir, "graceful.sh")
|
||||||
|
body := fmt.Sprintf(
|
||||||
|
"#!/bin/bash\ncleanup() { sleep 0.6; echo done > %q; exit 0; }\ntrap cleanup SIGTERM\necho ready > %q\nwhile true; do sleep 0.1; done\n",
|
||||||
|
marker, ready,
|
||||||
|
)
|
||||||
|
if err := os.WriteFile(script, []byte(body), 0o755); err != nil {
|
||||||
|
t.Fatalf("WriteFile: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
p := newProcessCommand(t, config.ModelConfig{
|
||||||
|
Cmd: script,
|
||||||
|
Proxy: "http://127.0.0.1:1", // unused: health check disabled
|
||||||
|
CheckEndpoint: "none",
|
||||||
|
})
|
||||||
|
// WaitDelay shorter than the handler's 0.6s sleep, and far shorter than the
|
||||||
|
// Stop timeout below — this is the window the old code mis-killed in.
|
||||||
|
p.waitDelay = 200 * time.Millisecond
|
||||||
|
|
||||||
|
runErr := runAsync(t, p)
|
||||||
|
|
||||||
|
// Wait until the trap is installed before stopping.
|
||||||
|
trapDeadline := time.Now().Add(2 * time.Second)
|
||||||
|
for {
|
||||||
|
if _, err := os.Stat(ready); err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if time.Now().After(trapDeadline) {
|
||||||
|
t.Fatalf("script did not install SIGTERM trap in time")
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
stopStart := time.Now()
|
||||||
|
if err := p.Stop(5 * time.Second); err != nil {
|
||||||
|
t.Fatalf("Stop: %v", err)
|
||||||
|
}
|
||||||
|
elapsed := time.Since(stopStart)
|
||||||
|
|
||||||
|
// The handler must have run to completion (marker written) rather than
|
||||||
|
// being force-killed at waitDelay.
|
||||||
|
if _, err := os.Stat(marker); err != nil {
|
||||||
|
t.Fatalf("graceful handler did not complete (marker missing): %v", err)
|
||||||
|
}
|
||||||
|
// And Stop must have waited for the handler (>~0.6s), not returned at the
|
||||||
|
// 200ms waitDelay.
|
||||||
|
if elapsed < 500*time.Millisecond {
|
||||||
|
t.Fatalf("Stop returned in %v — process was killed before its graceful handler finished", elapsed)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := p.State(); got != StateStopped {
|
||||||
|
t.Errorf("after Stop: expected state %s, got %s", StateStopped, got)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-runErr:
|
||||||
|
case <-time.After(testReturnTimeout):
|
||||||
|
t.Errorf("Run did not return after Stop")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProcessCommand_StopReapsForkedGrandchild verifies that stopping a forking
|
||||||
|
// wrapper takes down the backgrounded grandchild too, rather than leaving it as
|
||||||
|
// an orphan. The fix is Setpgid (runtime_unix.go): the wrapper leads its own
|
||||||
|
// process group, so the stop signal is delivered to the whole group via the
|
||||||
|
// negative PID and reaches the grandchild the wrapper never reaped.
|
||||||
|
func TestProcessCommand_StopReapsForkedGrandchild(t *testing.T) {
|
||||||
|
skipIfNoSimpleResponder(t)
|
||||||
|
|
||||||
|
port := getFreePort(t)
|
||||||
|
dir := t.TempDir()
|
||||||
|
pidFile := filepath.Join(dir, "child.pid")
|
||||||
|
|
||||||
|
wrapper := filepath.Join(dir, "wrapper.sh")
|
||||||
|
script := fmt.Sprintf("#!/bin/bash\n%q -port %d -silent &\necho $! > %q\nwait\n",
|
||||||
|
simpleResponderPath, port, pidFile)
|
||||||
|
if err := os.WriteFile(wrapper, []byte(script), 0o755); err != nil {
|
||||||
|
t.Fatalf("WriteFile: %v", err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { killChildFromPidFile(pidFile) })
|
||||||
|
|
||||||
|
p := newProcessCommand(t, config.ModelConfig{
|
||||||
|
Cmd: wrapper,
|
||||||
|
Proxy: fmt.Sprintf("http://127.0.0.1:%d", port),
|
||||||
|
CheckEndpoint: "/health",
|
||||||
|
HealthCheckTimeout: 10,
|
||||||
|
})
|
||||||
|
|
||||||
|
runErr := runAsync(t, p)
|
||||||
|
|
||||||
|
// Read the grandchild PID the wrapper recorded.
|
||||||
|
var childPID int
|
||||||
|
deadline := time.Now().Add(2 * time.Second)
|
||||||
|
for {
|
||||||
|
data, err := os.ReadFile(pidFile)
|
||||||
|
if err == nil {
|
||||||
|
if pid, perr := strconv.Atoi(strings.TrimSpace(string(data))); perr == nil && pid > 0 {
|
||||||
|
childPID = pid
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
t.Fatalf("wrapper did not record grandchild PID")
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := p.Stop(testStopTimeout); err != nil {
|
||||||
|
t.Fatalf("Stop: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// After Stop the grandchild must be gone. Signal 0 probes liveness without
|
||||||
|
// actually sending a signal; give it a brief window to exit after the
|
||||||
|
// group SIGTERM.
|
||||||
|
proc, err := os.FindProcess(childPID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("FindProcess: %v", err)
|
||||||
|
}
|
||||||
|
gone := false
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
if err := proc.Signal(syscall.Signal(0)); err != nil {
|
||||||
|
gone = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if !gone {
|
||||||
|
t.Errorf("grandchild PID %d still alive after Stop — process group was not reaped", childPID)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-runErr:
|
||||||
|
case <-time.After(testReturnTimeout):
|
||||||
|
t.Errorf("Run did not return after Stop")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// killChildFromPidFile reads a PID written by the wrapper script and SIGKILLs
|
||||||
|
// it so leaked orphans don't accumulate between test runs. Best-effort.
|
||||||
|
func killChildFromPidFile(pidFile string) {
|
||||||
|
data, err := os.ReadFile(pidFile)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pid, err := strconv.Atoi(strings.TrimSpace(string(data)))
|
||||||
|
if err != nil || pid <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
proc, err := os.FindProcess(pid)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = proc.Kill()
|
||||||
|
}
|
||||||
@@ -4,9 +4,41 @@ package process
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
"syscall"
|
||||||
)
|
)
|
||||||
|
|
||||||
// setProcAttributes sets platform-specific process attributes
|
// setProcAttributes starts the upstream in its own process group (Setpgid) so
|
||||||
|
// the entire process tree can be signalled at once via its negative PID. This
|
||||||
|
// is what lets us reap a forked grandchild — e.g. a shell wrapper that
|
||||||
|
// backgrounds the real binary and exits — instead of leaking it as an orphan
|
||||||
|
// that holds the inherited stdout/stderr pipes open.
|
||||||
func setProcAttributes(cmd *exec.Cmd) {
|
func setProcAttributes(cmd *exec.Cmd) {
|
||||||
// No-op on Unix systems
|
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
||||||
|
}
|
||||||
|
|
||||||
|
// terminateProcessTree sends SIGTERM to the whole process group led by the
|
||||||
|
// command, giving every process in the tree a chance to shut down gracefully.
|
||||||
|
func terminateProcessTree(cmd *exec.Cmd) error {
|
||||||
|
return signalProcessTree(cmd, syscall.SIGTERM)
|
||||||
|
}
|
||||||
|
|
||||||
|
// killProcessTree sends SIGKILL to the whole process group, force-terminating
|
||||||
|
// every process in the tree.
|
||||||
|
func killProcessTree(cmd *exec.Cmd) error {
|
||||||
|
return signalProcessTree(cmd, syscall.SIGKILL)
|
||||||
|
}
|
||||||
|
|
||||||
|
// signalProcessTree signals the process group led by cmd.Process. Because the
|
||||||
|
// child was started with Setpgid it is its own group leader (pgid == pid), so
|
||||||
|
// targeting -pid reaches the child and every descendant still in the group.
|
||||||
|
// Falls back to signalling just the child if the group send fails (e.g. the
|
||||||
|
// group has already drained), so we never silently skip the signal.
|
||||||
|
func signalProcessTree(cmd *exec.Cmd, sig syscall.Signal) error {
|
||||||
|
if cmd == nil || cmd.Process == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := syscall.Kill(-cmd.Process.Pid, sig); err != nil {
|
||||||
|
return cmd.Process.Signal(sig)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,3 +14,23 @@ func setProcAttributes(cmd *exec.Cmd) {
|
|||||||
CreationFlags: 0x08000000, // CREATE_NO_WINDOW
|
CreationFlags: 0x08000000, // CREATE_NO_WINDOW
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// terminateProcessTree asks the upstream process to stop. Windows has no
|
||||||
|
// process-group signalling here — process-tree teardown is handled by the
|
||||||
|
// configured CmdStop, which defaults to `taskkill /f /t` — so this preserves
|
||||||
|
// the previous single-process SIGTERM behaviour.
|
||||||
|
func terminateProcessTree(cmd *exec.Cmd) error {
|
||||||
|
if cmd == nil || cmd.Process == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return cmd.Process.Signal(syscall.SIGTERM)
|
||||||
|
}
|
||||||
|
|
||||||
|
// killProcessTree force-terminates the upstream process. Tree teardown on
|
||||||
|
// Windows relies on CmdStop (taskkill /t); this kills the launched process.
|
||||||
|
func killProcessTree(cmd *exec.Cmd) error {
|
||||||
|
if cmd == nil || cmd.Process == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return cmd.Process.Kill()
|
||||||
|
}
|
||||||
|
|||||||
+16
-12
@@ -745,24 +745,28 @@ func (b *baseRouter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// finishLoading stops the loading stream and fences its goroutine off from
|
||||||
|
// the ResponseWriter before the real handler (or ServeHTTP's return)
|
||||||
|
// reclaims it. release() must run even when waitForCompletion times out:
|
||||||
|
// otherwise a still-streaming goroutine flushes a finalized response and
|
||||||
|
// panics on the recycled *bufio.Writer.
|
||||||
|
finishLoading := func() {
|
||||||
|
cancelLoad()
|
||||||
|
if lw != nil {
|
||||||
|
lw.waitForCompletion(1 * time.Second)
|
||||||
|
lw.release()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var resp handlerResp
|
var resp handlerResp
|
||||||
select {
|
select {
|
||||||
case resp = <-hr.respond:
|
case resp = <-hr.respond:
|
||||||
cancelLoad()
|
finishLoading()
|
||||||
if lw != nil {
|
|
||||||
lw.waitForCompletion(1 * time.Second)
|
|
||||||
}
|
|
||||||
case <-req.Context().Done():
|
case <-req.Context().Done():
|
||||||
cancelLoad()
|
finishLoading()
|
||||||
if lw != nil {
|
|
||||||
lw.waitForCompletion(1 * time.Second)
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
case <-b.shutdownCtx.Done():
|
case <-b.shutdownCtx.Done():
|
||||||
cancelLoad()
|
finishLoading()
|
||||||
if lw != nil {
|
|
||||||
lw.waitForCompletion(1 * time.Second)
|
|
||||||
}
|
|
||||||
SendError(w, req, fmt.Errorf("%s is shutting down", b.name))
|
SendError(w, req, fmt.Errorf("%s is shutting down", b.name))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -38,6 +38,13 @@ type loadingWriter struct {
|
|||||||
pendingMu sync.Mutex
|
pendingMu sync.Mutex
|
||||||
pendingUpdate string
|
pendingUpdate string
|
||||||
|
|
||||||
|
// writeMu serializes writes to the underlying writer and guards released.
|
||||||
|
// Once released is set, the streaming goroutine must not touch the writer
|
||||||
|
// again — ServeHTTP has reclaimed it (to run the real handler or to return)
|
||||||
|
// and writing/flushing a finalized response panics.
|
||||||
|
writeMu sync.Mutex
|
||||||
|
released bool
|
||||||
|
|
||||||
// closed by start when the goroutine finishes (after cleanup messages)
|
// closed by start when the goroutine finishes (after cleanup messages)
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
|
|
||||||
@@ -217,12 +224,33 @@ func (s *loadingWriter) sendData(data string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = fmt.Fprintf(s.writer, "data: %s\n\n", jsonData)
|
s.writeMu.Lock()
|
||||||
if err != nil {
|
defer s.writeMu.Unlock()
|
||||||
|
// Once ServeHTTP has reclaimed the writer (release), writing/flushing it
|
||||||
|
// races the real handler or panics on a finalized response. Stop here.
|
||||||
|
if s.released {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = fmt.Fprintf(s.writer, "data: %s\n\n", jsonData); err != nil {
|
||||||
s.logger.Debugf("<%s> Failed to write SSE data (client likely disconnected): %v", s.modelName, err)
|
s.logger.Debugf("<%s> Failed to write SSE data (client likely disconnected): %v", s.modelName, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.Flush()
|
if flusher, ok := s.writer.(http.Flusher); ok {
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// release fences the loadingWriter off from the underlying ResponseWriter.
|
||||||
|
// After it returns, the streaming goroutine will not write to or flush the
|
||||||
|
// writer again: any in-flight write completes under writeMu first, and later
|
||||||
|
// writes short-circuit on released. The caller can then safely hand the writer
|
||||||
|
// to the real handler or let ServeHTTP return without racing a finalized
|
||||||
|
// response (a use-after-return Flush panics on the recycled *bufio.Writer).
|
||||||
|
func (s *loadingWriter) release() {
|
||||||
|
s.writeMu.Lock()
|
||||||
|
s.released = true
|
||||||
|
s.writeMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *loadingWriter) Header() http.Header {
|
func (s *loadingWriter) Header() http.Header {
|
||||||
|
|||||||
Reference in New Issue
Block a user