Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6ea551362e | |||
| 03d58e53fa | |||
| c790d0ee03 | |||
| 4ca9c478a2 | |||
| 146a9eab24 |
@@ -32,11 +32,9 @@ jobs:
|
||||
uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # 6.4.0
|
||||
with:
|
||||
node-version: "24"
|
||||
- name: Install dependencies and build UI
|
||||
- name: Build UI
|
||||
run: |
|
||||
cd ui-svelte
|
||||
npm ci
|
||||
npm run build
|
||||
make ui
|
||||
|
||||
- name: Run GoReleaser
|
||||
uses: goreleaser/goreleaser-action@1a80836c5c9d9e5755a25cb59ec6f45a3b5f41a8 #7.2.1
|
||||
|
||||
@@ -8,4 +8,3 @@ dist/
|
||||
|
||||
# UI build output; placeholder.txt is kept so the go:embed succeeds.
|
||||
internal/server/ui_dist/*
|
||||
!internal/server/ui_dist/placeholder.txt
|
||||
|
||||
@@ -41,8 +41,7 @@ ui/node_modules:
|
||||
# build the Svelte UI (ui-svelte)
|
||||
ui: ui/node_modules
|
||||
cd ui-svelte && npm run build
|
||||
mkdir -p internal/server/ui_dist
|
||||
cp -R proxy/ui_dist/. internal/server/ui_dist/
|
||||
touch internal/server/ui_dist/placeholder.txt
|
||||
|
||||
# Build OSX binary
|
||||
mac: ui
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
"os/exec"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/mostlygeek/llama-swap/internal/config"
|
||||
@@ -22,6 +21,15 @@ import (
|
||||
|
||||
var ErrStartAborted = fmt.Errorf("aborted")
|
||||
|
||||
// cmdWaitDelay is the upper bound the runtime will wait for child I/O to
|
||||
// drain after the process exits before force-closing the stdout/stderr
|
||||
// pipes. Required so that cmd.Wait() returns even when a forked grandchild
|
||||
// inherits and holds the pipes open (e.g. a shell wrapper that backgrounds
|
||||
// the real binary). killProcess sends the stop signal directly (not via the
|
||||
// cmd context), so this delay is measured from process exit rather than from
|
||||
// the stop request, and stays independent of the caller's graceful timeout.
|
||||
const cmdWaitDelay = 10 * time.Second
|
||||
|
||||
type runReq struct {
|
||||
timeout time.Duration
|
||||
respond chan error
|
||||
@@ -39,6 +47,7 @@ type waitReadyReq struct {
|
||||
type startResult struct {
|
||||
cmd *exec.Cmd
|
||||
cmdDone chan struct{}
|
||||
cancel context.CancelFunc
|
||||
handlerFn http.HandlerFunc
|
||||
err error
|
||||
}
|
||||
@@ -51,6 +60,11 @@ type ProcessCommand struct {
|
||||
processLogger *logmon.Monitor
|
||||
proxyLogger *logmon.Monitor
|
||||
|
||||
// waitDelay is assigned to cmd.WaitDelay when starting the upstream
|
||||
// process. Defaults to cmdWaitDelay; tests override it to keep the
|
||||
// pipe-close backstop from dominating their runtime.
|
||||
waitDelay time.Duration
|
||||
|
||||
runCh chan runReq
|
||||
stopCh chan stopReq
|
||||
waitReadyCh chan waitReadyReq
|
||||
@@ -85,6 +99,7 @@ func New(
|
||||
runCh: make(chan runReq),
|
||||
stopCh: make(chan stopReq),
|
||||
waitReadyCh: make(chan waitReadyReq),
|
||||
waitDelay: cmdWaitDelay,
|
||||
}
|
||||
p.state.Store(StateStopped)
|
||||
|
||||
@@ -122,6 +137,7 @@ func (p *ProcessCommand) run() {
|
||||
var (
|
||||
cmd *exec.Cmd
|
||||
cmdDone <-chan struct{}
|
||||
cmdCancel context.CancelFunc
|
||||
readyWaiters []waitReadyReq
|
||||
// runResp parks the in-flight Run caller's response channel. The
|
||||
// interface contract is that Run blocks until the process is
|
||||
@@ -164,9 +180,10 @@ func (p *ProcessCommand) run() {
|
||||
setState(StateShutdown)
|
||||
if cmd != nil {
|
||||
p.handler.Store(nil)
|
||||
p.killProcess(cmd, cmdDone, 100*time.Millisecond)
|
||||
p.killProcess(cmd, cmdCancel, cmdDone, 100*time.Millisecond)
|
||||
cmd = nil
|
||||
cmdDone = nil
|
||||
cmdCancel = nil
|
||||
}
|
||||
notifyWaiters(fmt.Errorf("[%s] shutdown", p.id))
|
||||
respondRun(fmt.Errorf("[%s] shutdown", p.id))
|
||||
@@ -177,8 +194,12 @@ func (p *ProcessCommand) run() {
|
||||
// cmdDone is nil while no process is running, so this case is
|
||||
// dormant outside of StateReady.
|
||||
case <-cmdDone:
|
||||
if cmdCancel != nil {
|
||||
cmdCancel()
|
||||
}
|
||||
cmd = nil
|
||||
cmdDone = nil
|
||||
cmdCancel = nil
|
||||
p.handler.Store(nil)
|
||||
setState(StateStopped)
|
||||
respondRun(fmt.Errorf("[%s] upstream exited unexpectedly", p.id))
|
||||
@@ -226,6 +247,7 @@ func (p *ProcessCommand) run() {
|
||||
if res.err == nil {
|
||||
cmd = res.cmd
|
||||
cmdDone = res.cmdDone
|
||||
cmdCancel = res.cancel
|
||||
fn := res.handlerFn
|
||||
p.handler.Store(&fn)
|
||||
setState(StateReady)
|
||||
@@ -273,7 +295,7 @@ func (p *ProcessCommand) run() {
|
||||
cancelStart()
|
||||
res := <-resultCh
|
||||
if res.cmd != nil {
|
||||
p.killProcess(res.cmd, res.cmdDone, stop.timeout)
|
||||
p.killProcess(res.cmd, res.cancel, res.cmdDone, stop.timeout)
|
||||
}
|
||||
setState(StateStopped)
|
||||
notifyWaiters(ErrStartAborted)
|
||||
@@ -293,7 +315,7 @@ func (p *ProcessCommand) run() {
|
||||
setState(StateShutdown)
|
||||
res := <-resultCh
|
||||
if res.cmd != nil {
|
||||
p.killProcess(res.cmd, res.cmdDone, 100*time.Millisecond)
|
||||
p.killProcess(res.cmd, res.cancel, res.cmdDone, 100*time.Millisecond)
|
||||
}
|
||||
notifyWaiters(fmt.Errorf("[%s] shutdown", p.id))
|
||||
respondRun(fmt.Errorf("[%s] shutdown", p.id))
|
||||
@@ -310,9 +332,10 @@ func (p *ProcessCommand) run() {
|
||||
case stop := <-p.stopCh:
|
||||
if cmd != nil {
|
||||
setState(StateStopping)
|
||||
p.killProcess(cmd, cmdDone, stop.timeout)
|
||||
p.killProcess(cmd, cmdCancel, cmdDone, stop.timeout)
|
||||
cmd = nil
|
||||
cmdDone = nil
|
||||
cmdCancel = nil
|
||||
p.handler.Store(nil)
|
||||
}
|
||||
// Stop is a no-op (and not an error) when already Stopped — this
|
||||
@@ -377,16 +400,26 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
|
||||
reverseProxy.ServeHTTP(w, r)
|
||||
})
|
||||
|
||||
cmd := exec.Command(args[0], args[1:]...)
|
||||
// cmdCtx + cmd.Cancel are wired as a safety net: if the context is ever
|
||||
// cancelled while the process is alive, cmd.Cancel sends SIGTERM / CmdStop
|
||||
// and the runtime escalates to SIGKILL after cmd.WaitDelay. In the normal
|
||||
// teardown path killProcess sends the stop signal directly instead, so
|
||||
// cmd.WaitDelay only acts as the inherited-pipe backstop measured from
|
||||
// process exit (see killProcess).
|
||||
cmdCtx, cmdCancel := context.WithCancel(context.Background())
|
||||
cmd := exec.CommandContext(cmdCtx, args[0], args[1:]...)
|
||||
cmd.Stderr = p.processLogger
|
||||
cmd.Stdout = p.processLogger
|
||||
cmd.Env = append(cmd.Environ(), p.config.Env...)
|
||||
cmd.Cancel = func() error { return p.sendStopSignal(cmd) }
|
||||
cmd.WaitDelay = p.waitDelay
|
||||
setProcAttributes(cmd)
|
||||
|
||||
p.proxyLogger.Debugf("<%s> Executing start command: %s, env: %s", p.id, strings.Join(args, " "), strings.Join(p.config.Env, ", "))
|
||||
|
||||
cmdDone := make(chan struct{})
|
||||
if err := cmd.Start(); err != nil {
|
||||
cmdCancel()
|
||||
return startResult{err: fmt.Errorf("failed to start command '%s': %w", strings.Join(args, " "), err)}
|
||||
}
|
||||
|
||||
@@ -402,21 +435,28 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
|
||||
close(cmdDone)
|
||||
}()
|
||||
|
||||
abort := func(err error) startResult {
|
||||
p.killProcess(cmd, cmdCancel, cmdDone, 5*time.Second)
|
||||
return startResult{err: err}
|
||||
}
|
||||
prematureExit := func() startResult {
|
||||
cmdCancel()
|
||||
return startResult{err: fmt.Errorf("upstream command exited prematurely")}
|
||||
}
|
||||
|
||||
if startCtx.Err() != nil {
|
||||
p.killProcess(cmd, cmdDone, 5*time.Second)
|
||||
return startResult{err: ErrStartAborted}
|
||||
return abort(ErrStartAborted)
|
||||
}
|
||||
|
||||
checkEndpoint := strings.TrimSpace(p.config.CheckEndpoint)
|
||||
if checkEndpoint == "none" {
|
||||
return startResult{cmd: cmd, cmdDone: cmdDone, handlerFn: handlerFn}
|
||||
return startResult{cmd: cmd, cmdDone: cmdDone, cancel: cmdCancel, handlerFn: handlerFn}
|
||||
}
|
||||
|
||||
// Wait 250ms for the command to start up before health checking
|
||||
select {
|
||||
case <-startCtx.Done():
|
||||
p.killProcess(cmd, cmdDone, 5*time.Second)
|
||||
return startResult{err: ErrStartAborted}
|
||||
return abort(ErrStartAborted)
|
||||
case <-time.After(250 * time.Millisecond):
|
||||
}
|
||||
|
||||
@@ -424,16 +464,14 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
|
||||
for {
|
||||
select {
|
||||
case <-startCtx.Done():
|
||||
p.killProcess(cmd, cmdDone, 5*time.Second)
|
||||
return startResult{err: ErrStartAborted}
|
||||
return abort(ErrStartAborted)
|
||||
case <-cmdDone:
|
||||
return startResult{err: fmt.Errorf("upstream command exited prematurely")}
|
||||
return prematureExit()
|
||||
default:
|
||||
}
|
||||
|
||||
if time.Now().After(deadline) {
|
||||
p.killProcess(cmd, cmdDone, 5*time.Second)
|
||||
return startResult{err: fmt.Errorf("health check timed out after %v", healthCheckTimeout)}
|
||||
return abort(fmt.Errorf("health check timed out after %v", healthCheckTimeout))
|
||||
}
|
||||
|
||||
req, _ := http.NewRequestWithContext(startCtx, "GET", p.config.CheckEndpoint, nil)
|
||||
@@ -445,28 +483,28 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
|
||||
p.proxyLogger.Infof("<%s> Health check passed on %s%s", p.id, p.config.Proxy, p.config.CheckEndpoint)
|
||||
break
|
||||
} else if startCtx.Err() != nil {
|
||||
p.killProcess(cmd, cmdDone, 5*time.Second)
|
||||
return startResult{err: ErrStartAborted}
|
||||
return abort(ErrStartAborted)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-startCtx.Done():
|
||||
p.killProcess(cmd, cmdDone, 5*time.Second)
|
||||
return startResult{err: ErrStartAborted}
|
||||
return abort(ErrStartAborted)
|
||||
case <-cmdDone:
|
||||
return startResult{err: fmt.Errorf("upstream command exited prematurely")}
|
||||
return prematureExit()
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
}
|
||||
|
||||
return startResult{cmd: cmd, cmdDone: cmdDone, handlerFn: handlerFn}
|
||||
return startResult{cmd: cmd, cmdDone: cmdDone, cancel: cmdCancel, handlerFn: handlerFn}
|
||||
}
|
||||
|
||||
func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cmdDone <-chan struct{}, gracefulTimeout time.Duration) {
|
||||
// sendStopSignal runs the configured CmdStop (if any) or sends SIGTERM to
|
||||
// the upstream process. Wired up as cmd.Cancel so it fires whenever the
|
||||
// cmd's context is cancelled.
|
||||
func (p *ProcessCommand) sendStopSignal(cmd *exec.Cmd) error {
|
||||
if cmd == nil || cmd.Process == nil {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
if p.config.CmdStop != "" {
|
||||
stopArgs, err := config.SanitizeCommand(
|
||||
strings.ReplaceAll(p.config.CmdStop, "${PID}", fmt.Sprintf("%d", cmd.Process.Pid)),
|
||||
@@ -475,12 +513,48 @@ func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cmdDone <-chan struct{}, gra
|
||||
stopCmd := exec.Command(stopArgs[0], stopArgs[1:]...)
|
||||
stopCmd.Env = cmd.Env
|
||||
setProcAttributes(stopCmd)
|
||||
stopCmd.Run()
|
||||
} else {
|
||||
cmd.Process.Signal(syscall.SIGTERM)
|
||||
return stopCmd.Run()
|
||||
}
|
||||
} else {
|
||||
cmd.Process.Signal(syscall.SIGTERM)
|
||||
// fall through to SIGTERM if sanitize failed
|
||||
}
|
||||
// On Unix this SIGTERMs the whole process group so a forked grandchild
|
||||
// (e.g. a shell wrapper that backgrounds the real binary) is taken down
|
||||
// with the parent rather than orphaned.
|
||||
return terminateProcessTree(cmd)
|
||||
}
|
||||
|
||||
// killProcess terminates the upstream process. The flow:
|
||||
//
|
||||
// 1. Send the graceful stop signal (CmdStop / SIGTERM) directly — NOT by
|
||||
// cancelling cmdCtx. Cancelling the context would start cmd.WaitDelay
|
||||
// immediately, which force-kills the process WaitDelay after the signal
|
||||
// and would silently cap gracefulTimeout at WaitDelay whenever
|
||||
// gracefulTimeout is the longer of the two.
|
||||
// 2. We wait up to gracefulTimeout for the process to exit on its own.
|
||||
// 3. If still alive, we SIGKILL the process group directly (Unix) so any
|
||||
// forked descendant is force-terminated alongside the parent.
|
||||
// 4. We wait on cmdDone. cmd.WaitDelay (set when the cmd was built) is the
|
||||
// critical backstop here: once the process exits, if a forked grandchild
|
||||
// inherited the stdout/stderr pipes and is still holding them, the runtime
|
||||
// force-closes the pipes WaitDelay after the exit and cmd.Wait() unblocks.
|
||||
// Because we never cancelled the context, that WaitDelay timer measures
|
||||
// from process exit (see os/exec awaitGoroutines), not from this call.
|
||||
// Without WaitDelay this select would hang forever (the v219 bug).
|
||||
//
|
||||
// cancel() is still invoked (deferred) to release the context, but only after
|
||||
// the process has exited and os/exec's ctx watcher has already torn down, so it
|
||||
// never re-fires cmd.Cancel.
|
||||
func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cancel context.CancelFunc, cmdDone <-chan struct{}, gracefulTimeout time.Duration) {
|
||||
if cancel == nil {
|
||||
return
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
// Deliver CmdStop / SIGTERM in a goroutine so a slow or hanging CmdStop
|
||||
// cannot block the run() goroutine; the gracefulTimeout + Process.Kill
|
||||
// path below still guarantees teardown.
|
||||
if cmd != nil {
|
||||
go func() { _ = p.sendStopSignal(cmd) }()
|
||||
}
|
||||
|
||||
timer := time.NewTimer(gracefulTimeout)
|
||||
@@ -488,10 +562,16 @@ func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cmdDone <-chan struct{}, gra
|
||||
|
||||
select {
|
||||
case <-cmdDone:
|
||||
return
|
||||
case <-timer.C:
|
||||
cmd.Process.Kill()
|
||||
<-cmdDone
|
||||
}
|
||||
|
||||
if cmd != nil {
|
||||
// SIGKILL the whole process group on Unix so any descendant that
|
||||
// ignored or outlived the graceful signal is force-terminated too.
|
||||
_ = killProcessTree(cmd)
|
||||
}
|
||||
<-cmdDone
|
||||
}
|
||||
|
||||
func (p *ProcessCommand) ID() string {
|
||||
|
||||
@@ -0,0 +1,262 @@
|
||||
//go:build !windows
|
||||
|
||||
package process
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/mostlygeek/llama-swap/internal/config"
|
||||
)
|
||||
|
||||
// TestProcessCommand_StopForkingWrapper is a regression for the bug reported
|
||||
// against v219 where Stop would hang indefinitely when the upstream command
|
||||
// is a shell wrapper that forks the real binary (e.g. `#!/bin/bash` then
|
||||
// `"$@"`). After SIGTERM the wrapper dies but the grandchild inherits the
|
||||
// stdout/stderr pipes; cmd.Wait() blocks waiting for the pipe-copy goroutine
|
||||
// to drain EOF, which never happens while the grandchild holds the fds.
|
||||
//
|
||||
// The fix is cmd.WaitDelay (combined with exec.CommandContext + cmd.Cancel),
|
||||
// which causes the runtime to force-close the pipes after the delay so
|
||||
// cmd.Wait() — and therefore Stop — returns.
|
||||
func TestProcessCommand_StopForkingWrapper(t *testing.T) {
|
||||
skipIfNoSimpleResponder(t)
|
||||
|
||||
port := getFreePort(t)
|
||||
dir := t.TempDir()
|
||||
pidFile := filepath.Join(dir, "child.pid")
|
||||
|
||||
// Wrapper script: backgrounds the child (which inherits stdout/stderr),
|
||||
// records its PID for cleanup, then waits. When SIGTERM hits bash it
|
||||
// dies without forwarding the signal; the grandchild keeps running and
|
||||
// keeps the inherited pipe fds open. This is the scenario reported in
|
||||
// the v219 regression.
|
||||
wrapper := filepath.Join(dir, "wrapper.sh")
|
||||
script := fmt.Sprintf("#!/bin/bash\n%q -port %d -silent &\necho $! > %q\nwait\n",
|
||||
simpleResponderPath, port, pidFile)
|
||||
if err := os.WriteFile(wrapper, []byte(script), 0o755); err != nil {
|
||||
t.Fatalf("WriteFile: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { killChildFromPidFile(pidFile) })
|
||||
|
||||
p := newProcessCommand(t, config.ModelConfig{
|
||||
Cmd: wrapper,
|
||||
Proxy: fmt.Sprintf("http://127.0.0.1:%d", port),
|
||||
CheckEndpoint: "/health",
|
||||
HealthCheckTimeout: 10,
|
||||
})
|
||||
// Shrink the pipe-close backstop so the test doesn't sit at the
|
||||
// production default (10s). Must be set before Run() so doStart picks
|
||||
// it up when building the cmd.
|
||||
const testWaitDelay = 250 * time.Millisecond
|
||||
p.waitDelay = testWaitDelay
|
||||
|
||||
runErr := runAsync(t, p)
|
||||
|
||||
// Stop must return within a bounded time even though the grandchild
|
||||
// is still holding the pipe open. Budget is generous on top of
|
||||
// testWaitDelay to absorb scheduling jitter on slow CI runners; the
|
||||
// pre-fix behaviour was an unbounded hang, so any reasonable cap
|
||||
// distinguishes pass from fail.
|
||||
stopReturned := make(chan error, 1)
|
||||
stopStart := time.Now()
|
||||
go func() { stopReturned <- p.Stop(testStopTimeout) }()
|
||||
|
||||
const stopBudget = testWaitDelay + 2*time.Second
|
||||
select {
|
||||
case err := <-stopReturned:
|
||||
if err != nil {
|
||||
t.Fatalf("Stop: %v", err)
|
||||
}
|
||||
t.Logf("Stop returned in %v", time.Since(stopStart))
|
||||
case <-time.After(stopBudget):
|
||||
t.Fatalf("Stop did not return within %v — cmd.Wait() likely hung on inherited pipe", stopBudget)
|
||||
}
|
||||
|
||||
if got := p.State(); got != StateStopped {
|
||||
t.Errorf("after Stop: expected state %s, got %s", StateStopped, got)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-runErr:
|
||||
case <-time.After(testReturnTimeout):
|
||||
t.Errorf("Run did not return after Stop")
|
||||
}
|
||||
}
|
||||
|
||||
// TestProcessCommand_StopHonorsGracefulTimeout is a regression for the bug
|
||||
// where cmd.WaitDelay capped the graceful shutdown window. killProcess used to
|
||||
// cancel the cmd context to deliver SIGTERM, which starts cmd.WaitDelay
|
||||
// immediately; a process whose SIGTERM handler needs longer than WaitDelay to
|
||||
// finish was force-killed early even though Stop was given a much longer
|
||||
// timeout. The fix sends the signal directly so WaitDelay measures from process
|
||||
// exit (its inherited-pipe backstop role), leaving the graceful window to the
|
||||
// caller's Stop timeout.
|
||||
func TestProcessCommand_StopHonorsGracefulTimeout(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
marker := filepath.Join(dir, "graceful.done")
|
||||
ready := filepath.Join(dir, "trap.ready")
|
||||
|
||||
// On SIGTERM, sleep past the (short) WaitDelay, then write the marker and
|
||||
// exit cleanly. If WaitDelay still drove the kill, bash would be SIGKILLed
|
||||
// mid-handler and the marker would never be written. The ready file is
|
||||
// written only after the trap is installed so the test does not race
|
||||
// SIGTERM ahead of it (CheckEndpoint:none marks ready before bash runs).
|
||||
script := filepath.Join(dir, "graceful.sh")
|
||||
body := fmt.Sprintf(
|
||||
"#!/bin/bash\ncleanup() { sleep 0.6; echo done > %q; exit 0; }\ntrap cleanup SIGTERM\necho ready > %q\nwhile true; do sleep 0.1; done\n",
|
||||
marker, ready,
|
||||
)
|
||||
if err := os.WriteFile(script, []byte(body), 0o755); err != nil {
|
||||
t.Fatalf("WriteFile: %v", err)
|
||||
}
|
||||
|
||||
p := newProcessCommand(t, config.ModelConfig{
|
||||
Cmd: script,
|
||||
Proxy: "http://127.0.0.1:1", // unused: health check disabled
|
||||
CheckEndpoint: "none",
|
||||
})
|
||||
// WaitDelay shorter than the handler's 0.6s sleep, and far shorter than the
|
||||
// Stop timeout below — this is the window the old code mis-killed in.
|
||||
p.waitDelay = 200 * time.Millisecond
|
||||
|
||||
runErr := runAsync(t, p)
|
||||
|
||||
// Wait until the trap is installed before stopping.
|
||||
trapDeadline := time.Now().Add(2 * time.Second)
|
||||
for {
|
||||
if _, err := os.Stat(ready); err == nil {
|
||||
break
|
||||
}
|
||||
if time.Now().After(trapDeadline) {
|
||||
t.Fatalf("script did not install SIGTERM trap in time")
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
stopStart := time.Now()
|
||||
if err := p.Stop(5 * time.Second); err != nil {
|
||||
t.Fatalf("Stop: %v", err)
|
||||
}
|
||||
elapsed := time.Since(stopStart)
|
||||
|
||||
// The handler must have run to completion (marker written) rather than
|
||||
// being force-killed at waitDelay.
|
||||
if _, err := os.Stat(marker); err != nil {
|
||||
t.Fatalf("graceful handler did not complete (marker missing): %v", err)
|
||||
}
|
||||
// And Stop must have waited for the handler (>~0.6s), not returned at the
|
||||
// 200ms waitDelay.
|
||||
if elapsed < 500*time.Millisecond {
|
||||
t.Fatalf("Stop returned in %v — process was killed before its graceful handler finished", elapsed)
|
||||
}
|
||||
|
||||
if got := p.State(); got != StateStopped {
|
||||
t.Errorf("after Stop: expected state %s, got %s", StateStopped, got)
|
||||
}
|
||||
select {
|
||||
case <-runErr:
|
||||
case <-time.After(testReturnTimeout):
|
||||
t.Errorf("Run did not return after Stop")
|
||||
}
|
||||
}
|
||||
|
||||
// TestProcessCommand_StopReapsForkedGrandchild verifies that stopping a forking
|
||||
// wrapper takes down the backgrounded grandchild too, rather than leaving it as
|
||||
// an orphan. The fix is Setpgid (runtime_unix.go): the wrapper leads its own
|
||||
// process group, so the stop signal is delivered to the whole group via the
|
||||
// negative PID and reaches the grandchild the wrapper never reaped.
|
||||
func TestProcessCommand_StopReapsForkedGrandchild(t *testing.T) {
|
||||
skipIfNoSimpleResponder(t)
|
||||
|
||||
port := getFreePort(t)
|
||||
dir := t.TempDir()
|
||||
pidFile := filepath.Join(dir, "child.pid")
|
||||
|
||||
wrapper := filepath.Join(dir, "wrapper.sh")
|
||||
script := fmt.Sprintf("#!/bin/bash\n%q -port %d -silent &\necho $! > %q\nwait\n",
|
||||
simpleResponderPath, port, pidFile)
|
||||
if err := os.WriteFile(wrapper, []byte(script), 0o755); err != nil {
|
||||
t.Fatalf("WriteFile: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { killChildFromPidFile(pidFile) })
|
||||
|
||||
p := newProcessCommand(t, config.ModelConfig{
|
||||
Cmd: wrapper,
|
||||
Proxy: fmt.Sprintf("http://127.0.0.1:%d", port),
|
||||
CheckEndpoint: "/health",
|
||||
HealthCheckTimeout: 10,
|
||||
})
|
||||
|
||||
runErr := runAsync(t, p)
|
||||
|
||||
// Read the grandchild PID the wrapper recorded.
|
||||
var childPID int
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for {
|
||||
data, err := os.ReadFile(pidFile)
|
||||
if err == nil {
|
||||
if pid, perr := strconv.Atoi(strings.TrimSpace(string(data))); perr == nil && pid > 0 {
|
||||
childPID = pid
|
||||
break
|
||||
}
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
t.Fatalf("wrapper did not record grandchild PID")
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
if err := p.Stop(testStopTimeout); err != nil {
|
||||
t.Fatalf("Stop: %v", err)
|
||||
}
|
||||
|
||||
// After Stop the grandchild must be gone. Signal 0 probes liveness without
|
||||
// actually sending a signal; give it a brief window to exit after the
|
||||
// group SIGTERM.
|
||||
proc, err := os.FindProcess(childPID)
|
||||
if err != nil {
|
||||
t.Fatalf("FindProcess: %v", err)
|
||||
}
|
||||
gone := false
|
||||
for i := 0; i < 100; i++ {
|
||||
if err := proc.Signal(syscall.Signal(0)); err != nil {
|
||||
gone = true
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
if !gone {
|
||||
t.Errorf("grandchild PID %d still alive after Stop — process group was not reaped", childPID)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-runErr:
|
||||
case <-time.After(testReturnTimeout):
|
||||
t.Errorf("Run did not return after Stop")
|
||||
}
|
||||
}
|
||||
|
||||
// killChildFromPidFile is a best-effort cleanup helper: it reads the PID that
// the wrapper script wrote to pidFile and kills that process so leaked orphans
// do not pile up between test runs. A missing file, unparsable content, or a
// non-positive PID makes it return without doing anything, and the result of
// the kill itself is deliberately ignored.
func killChildFromPidFile(pidFile string) {
	raw, readErr := os.ReadFile(pidFile)
	if readErr != nil {
		return
	}

	pid, convErr := strconv.Atoi(strings.TrimSpace(string(raw)))
	if convErr != nil || pid <= 0 {
		return
	}

	if proc, findErr := os.FindProcess(pid); findErr == nil {
		// Best-effort: the process may already be gone.
		_ = proc.Kill()
	}
}
|
||||
@@ -4,9 +4,41 @@ package process
|
||||
|
||||
import (
|
||||
"os/exec"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// setProcAttributes sets platform-specific process attributes
|
||||
// setProcAttributes starts the upstream in its own process group (Setpgid) so
|
||||
// the entire process tree can be signalled at once via its negative PID. This
|
||||
// is what lets us reap a forked grandchild — e.g. a shell wrapper that
|
||||
// backgrounds the real binary and exits — instead of leaking it as an orphan
|
||||
// that holds the inherited stdout/stderr pipes open.
|
||||
func setProcAttributes(cmd *exec.Cmd) {
|
||||
// No-op on Unix systems
|
||||
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
||||
}
|
||||
|
||||
// terminateProcessTree sends SIGTERM to the whole process group led by the
|
||||
// command, giving every process in the tree a chance to shut down gracefully.
|
||||
func terminateProcessTree(cmd *exec.Cmd) error {
|
||||
return signalProcessTree(cmd, syscall.SIGTERM)
|
||||
}
|
||||
|
||||
// killProcessTree sends SIGKILL to the whole process group, force-terminating
|
||||
// every process in the tree.
|
||||
func killProcessTree(cmd *exec.Cmd) error {
|
||||
return signalProcessTree(cmd, syscall.SIGKILL)
|
||||
}
|
||||
|
||||
// signalProcessTree delivers sig to the process group led by cmd.Process.
//
// It first signals the negative PID, which addresses the process group whose
// ID equals that PID. This relies on the child having been started as its own
// group leader (see setProcAttributes). If the group-wide send fails, it falls
// back to signalling only the child so the signal is never silently skipped;
// in that case the fallback's result is returned and the group-send error is
// dropped.
//
// A nil cmd, or a cmd that has no Process, is a no-op that returns nil.
func signalProcessTree(cmd *exec.Cmd, sig syscall.Signal) error {
	if cmd == nil {
		return nil
	}
	leader := cmd.Process
	if leader == nil {
		return nil
	}

	groupErr := syscall.Kill(-leader.Pid, sig)
	if groupErr == nil {
		return nil
	}
	// Group send failed (e.g. the group has already drained): signal the
	// child directly instead.
	return leader.Signal(sig)
}
|
||||
|
||||
@@ -14,3 +14,23 @@ func setProcAttributes(cmd *exec.Cmd) {
|
||||
CreationFlags: 0x08000000, // CREATE_NO_WINDOW
|
||||
}
|
||||
}
|
||||
|
||||
// terminateProcessTree asks the upstream process to stop by sending SIGTERM to
// the launched process only. There is no process-group signalling on this
// platform: tearing down the whole tree is left to the configured CmdStop,
// which defaults to `taskkill /f /t`. A nil cmd, or a cmd with no Process, is
// a no-op that returns nil.
//
// NOTE(review): Go's os.Process.Signal documentation describes limited signal
// support on Windows — confirm whether this SIGTERM is expected to take effect
// or simply to return an error to the caller.
func terminateProcessTree(cmd *exec.Cmd) error {
	if cmd != nil && cmd.Process != nil {
		return cmd.Process.Signal(syscall.SIGTERM)
	}
	return nil
}
|
||||
|
||||
// killProcessTree force-terminates the launched upstream process via
// Process.Kill. Only that single process is targeted here; tree-wide teardown
// on Windows relies on CmdStop (taskkill /t). A nil cmd, or a cmd with no
// Process, is a no-op that returns nil.
func killProcessTree(cmd *exec.Cmd) error {
	if cmd != nil && cmd.Process != nil {
		return cmd.Process.Kill()
	}
	return nil
}
|
||||
|
||||
+16
-12
@@ -745,24 +745,28 @@ func (b *baseRouter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
}()
|
||||
}
|
||||
|
||||
// finishLoading stops the loading stream and fences its goroutine off from
|
||||
// the ResponseWriter before the real handler (or ServeHTTP's return)
|
||||
// reclaims it. release() must run even when waitForCompletion times out:
|
||||
// otherwise a still-streaming goroutine flushes a finalized response and
|
||||
// panics on the recycled *bufio.Writer.
|
||||
finishLoading := func() {
|
||||
cancelLoad()
|
||||
if lw != nil {
|
||||
lw.waitForCompletion(1 * time.Second)
|
||||
lw.release()
|
||||
}
|
||||
}
|
||||
|
||||
var resp handlerResp
|
||||
select {
|
||||
case resp = <-hr.respond:
|
||||
cancelLoad()
|
||||
if lw != nil {
|
||||
lw.waitForCompletion(1 * time.Second)
|
||||
}
|
||||
finishLoading()
|
||||
case <-req.Context().Done():
|
||||
cancelLoad()
|
||||
if lw != nil {
|
||||
lw.waitForCompletion(1 * time.Second)
|
||||
}
|
||||
finishLoading()
|
||||
return
|
||||
case <-b.shutdownCtx.Done():
|
||||
cancelLoad()
|
||||
if lw != nil {
|
||||
lw.waitForCompletion(1 * time.Second)
|
||||
}
|
||||
finishLoading()
|
||||
SendError(w, req, fmt.Errorf("%s is shutting down", b.name))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -38,6 +38,13 @@ type loadingWriter struct {
|
||||
pendingMu sync.Mutex
|
||||
pendingUpdate string
|
||||
|
||||
// writeMu serializes writes to the underlying writer and guards released.
|
||||
// Once released is set, the streaming goroutine must not touch the writer
|
||||
// again — ServeHTTP has reclaimed it (to run the real handler or to return)
|
||||
// and writing/flushing a finalized response panics.
|
||||
writeMu sync.Mutex
|
||||
released bool
|
||||
|
||||
// closed by start when the goroutine finishes (after cleanup messages)
|
||||
done chan struct{}
|
||||
|
||||
@@ -217,12 +224,33 @@ func (s *loadingWriter) sendData(data string) {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = fmt.Fprintf(s.writer, "data: %s\n\n", jsonData)
|
||||
if err != nil {
|
||||
s.writeMu.Lock()
|
||||
defer s.writeMu.Unlock()
|
||||
// Once ServeHTTP has reclaimed the writer (release), writing/flushing it
|
||||
// races the real handler or panics on a finalized response. Stop here.
|
||||
if s.released {
|
||||
return
|
||||
}
|
||||
|
||||
if _, err = fmt.Fprintf(s.writer, "data: %s\n\n", jsonData); err != nil {
|
||||
s.logger.Debugf("<%s> Failed to write SSE data (client likely disconnected): %v", s.modelName, err)
|
||||
return
|
||||
}
|
||||
s.Flush()
|
||||
if flusher, ok := s.writer.(http.Flusher); ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
// release fences the loadingWriter off from the underlying ResponseWriter.
|
||||
// After it returns, the streaming goroutine will not write to or flush the
|
||||
// writer again: any in-flight write completes under writeMu first, and later
|
||||
// writes short-circuit on released. The caller can then safely hand the writer
|
||||
// to the real handler or let ServeHTTP return without racing a finalized
|
||||
// response (a use-after-return Flush panics on the recycled *bufio.Writer).
|
||||
func (s *loadingWriter) release() {
|
||||
s.writeMu.Lock()
|
||||
s.released = true
|
||||
s.writeMu.Unlock()
|
||||
}
|
||||
|
||||
func (s *loadingWriter) Header() http.Header {
|
||||
|
||||
@@ -45,7 +45,9 @@ func CreateConcurrencyMiddleware(cfg config.Config) chain.Middleware {
|
||||
return
|
||||
}
|
||||
if !sem.TryAcquire(1) {
|
||||
http.Error(w, "Too many requests", http.StatusTooManyRequests)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusTooManyRequests)
|
||||
w.Write([]byte(`{"error":"Too many requests"}`))
|
||||
return
|
||||
}
|
||||
defer sem.Release(1)
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
placeholder so //go:embed ui_dist succeeds before the UI is built
|
||||
|
||||
@@ -0,0 +1,632 @@
|
||||
<script lang="ts">
|
||||
import { models } from "../../stores/api";
|
||||
import { persistentStore } from "../../stores/persistent";
|
||||
import { streamChatCompletion } from "../../lib/chatApi";
|
||||
|
||||
// Overall state of one request; rendered in the card's status badge.
type Status = "waiting" | "streaming" | "done" | "error";
// Timeline segment that elapsed time is currently being charged to.
type Phase = "waiting" | "loading" | "reasoning" | "content";
// Live state of a single request; `runs` maps TestEntry.id to one of these.
type RunState = {
  status: Status;
  // Raw llama-swap loading block as received, markers included.
  loadingText: string;
  // reasoning_content received outside the loading block.
  reasoningContent: string;
  // Concatenation of all content chunks.
  content: string;
  // True once reasoning chunks are no longer buffered as loading text.
  loadingDone: boolean;
  // Milliseconds accrued in each phase.
  waitingMs: number;
  loadingMs: number;
  reasoningMs: number;
  contentMs: number;
  phase: Phase;
  // Milliseconds since the request was started.
  elapsedMs: number;
  // Message displayed when status is "error".
  error?: string;
};
// One queued request: a unique id plus the model it targets.
type TestEntry = { id: string; model: string };
|
||||
|
||||
// Marker that opens and closes the llama-swap loading block in reasoning_content.
const LOAD_MARKER = "━━━━━";

// Values restored by resetDefaults and used as the stores' initial values.
const DEFAULT_PROMPT = "Write a few sentences about the history of computing.";
const DEFAULT_MAX_TOKENS = 256;

// Prompt, max_tokens and the queued entries live in persistentStore instances
// (presumably kept across page reloads — verify against stores/persistent).
const promptStore = persistentStore<string>("concurrency-prompt", DEFAULT_PROMPT);
const maxTokensStore = persistentStore<number>("concurrency-max-tokens", DEFAULT_MAX_TOKENS);
const testListStore = persistentStore<TestEntry[]>("concurrency-test-list", []);

// Per-entry run state, keyed by TestEntry.id.
let runs = $state<Record<string, RunState>>({});
let isRunning = $state(false);
// Controller for the batch in flight; null while idle.
let abortController: AbortController | null = null;
// Index of the card being dragged, and of the card currently hovered during a drag.
let dragIndex = $state<number | null>(null);
let dragOverIndex = $state<number | null>(null);

const timelineCollapsedStore = persistentStore<boolean>("concurrency-timeline-collapsed", false);

// Timeline scale: the largest elapsedMs of any run, never below 100 ms.
let timelineMaxMs = $derived(Math.max(100, ...Object.values(runs).map((r) => r.elapsedMs)));

// Models offered in the picker; unlisted models are filtered out.
let availableModels = $derived($models.filter((m) => !m.unlisted));
let hasModels = $derived(availableModels.length > 0);
// A run may start only when idle, with at least one queued entry and a non-blank prompt.
let canRun = $derived(!isRunning && $testListStore.length > 0 && $promptStore.trim() !== "");
|
||||
|
||||
// Produces a unique id for a queued entry: a UUID when crypto.randomUUID is
// available, otherwise the current timestamp plus a random base-36 suffix.
function newId(): string {
  const canUseUuid = typeof crypto !== "undefined" && "randomUUID" in crypto;
  return canUseUuid ? crypto.randomUUID() : `${Date.now()}-${Math.random().toString(36).slice(2)}`;
}
|
||||
|
||||
// Appends a new entry for modelId to the test list; ignored while a run is active.
function addModel(modelId: string) {
  if (!isRunning) {
    testListStore.update((queued) => queued.concat({ id: newId(), model: modelId }));
  }
}
|
||||
|
||||
// Removes one entry from the test list and discards its run state; ignored
// while a run is active.
function removeEntry(id: string) {
  if (isRunning) return;
  testListStore.update((queued) => queued.filter((entry) => entry.id !== id));
  // Rebuild `runs` without the removed key so the state is reassigned, not mutated.
  const { [id]: _removed, ...remaining } = runs;
  runs = remaining;
}
|
||||
|
||||
// Empties the test list and all run state; ignored while a run is active.
function clearAll() {
  if (!isRunning) {
    testListStore.set([]);
    runs = {};
  }
}
|
||||
|
||||
// Begins dragging card i: records the source index and, when the event
// carries a DataTransfer, marks the drag as a move with the index as payload.
function onDragStart(i: number, e: DragEvent) {
  if (isRunning) return;
  dragIndex = i;
  const transfer = e.dataTransfer;
  if (!transfer) return;
  transfer.effectAllowed = "move";
  transfer.setData("text/plain", String(i));
}
|
||||
|
||||
// Marks card i as the current drop target. preventDefault is what allows the
// drop; nothing happens when no drag is in progress or a run is active.
function onDragOver(i: number, e: DragEvent) {
  const dragging = !isRunning && dragIndex !== null;
  if (!dragging) return;
  e.preventDefault();
  const transfer = e.dataTransfer;
  if (transfer) {
    transfer.dropEffect = "move";
  }
  dragOverIndex = i;
}
|
||||
|
||||
// Completes a drag: clears the drag state and moves the dragged entry to
// position i in the test list. Dropping a card onto itself changes nothing.
function onDrop(i: number, e: DragEvent) {
  if (isRunning || dragIndex === null) return;
  e.preventDefault();
  const source = dragIndex;
  dragIndex = null;
  dragOverIndex = null;
  if (source !== i) {
    testListStore.update((queued) => {
      const reordered = queued.slice();
      const [moved] = reordered.splice(source, 1);
      reordered.splice(i, 0, moved);
      return reordered;
    });
  }
}
|
||||
|
||||
// Clears all drag state when a drag finishes, whether or not it was dropped.
function onDragEnd() {
  dragOverIndex = null;
  dragIndex = null;
}
|
||||
|
||||
// Builds a fresh RunState: status and phase "waiting", no text received,
// every timer at zero.
function emptyRun(): RunState {
  const zeroedTimers = { waitingMs: 0, loadingMs: 0, reasoningMs: 0, contentMs: 0, elapsedMs: 0 };
  const fresh: RunState = {
    status: "waiting",
    phase: "waiting",
    loadingText: "",
    reasoningContent: "",
    content: "",
    loadingDone: false,
    ...zeroedTimers,
  };
  return fresh;
}
|
||||
|
||||
// Separates the llama-swap loading block from the model's own reasoning.
// The loading block arrives as reasoning_content wrapped between two
// LOAD_MARKER runs. While it is incomplete the raw text is buffered in
// loadingText; text that does not start with the marker, and everything after
// the block has closed, is appended to reasoningContent.
function ingestReasoning(
  prev: RunState,
  chunk: string
): { loadingText: string; reasoningContent: string; loadingDone: boolean; nowPhase: Phase } {
  // Result when the text belongs to the model's reasoning.
  const asReasoning = (loadingText: string, extra: string) => ({
    loadingText,
    reasoningContent: prev.reasoningContent + extra,
    loadingDone: true,
    nowPhase: "reasoning" as Phase,
  });
  // Result when the loading block is still open (or still undecided).
  const stillLoading = (buffered: string) => ({
    loadingText: buffered,
    reasoningContent: prev.reasoningContent,
    loadingDone: false,
    nowPhase: "loading" as Phase,
  });

  if (prev.loadingDone) return asReasoning(prev.loadingText, chunk);

  const buffered = prev.loadingText + chunk;
  const markerLen = LOAD_MARKER.length;

  // Too short to decide: keep buffering only while it could still become the marker.
  if (buffered.length < markerLen) {
    return LOAD_MARKER.startsWith(buffered) ? stillLoading(buffered) : asReasoning("", buffered);
  }

  if (!buffered.startsWith(LOAD_MARKER)) return asReasoning("", buffered);

  // Inside a loading block: search for the closing marker after the opening one.
  const closeAt = buffered.indexOf(LOAD_MARKER, markerLen);
  if (closeAt === -1) return stillLoading(buffered);

  // The block ends with the closing marker's line (or the buffer end if the
  // newline has not arrived yet).
  const eol = buffered.indexOf("\n", closeAt);
  const cut = eol === -1 ? buffered.length : eol + 1;
  // Strip the trailing " \n" the loader sends after the closing marker.
  const tail = buffered.slice(cut).replace(/^[ \t]*\n?/, "");
  return {
    loadingText: buffered.slice(0, cut),
    reasoningContent: prev.reasoningContent + tail,
    loadingDone: true,
    nowPhase: tail ? "reasoning" : "waiting",
  };
}
|
||||
|
||||
// Executes one streaming chat completion for `entry` and keeps runs[entry.id]
// up to date while it streams: accumulated text, current phase, per-phase
// timings and total elapsed time. Failures (including aborts via `signal`)
// are recorded on the run as status "error" rather than rethrown.
async function runOne(entry: TestEntry, signal: AbortSignal) {
  const start = performance.now();
  // Start of the interval that has not yet been charged to a phase.
  let phaseStart = start;
  runs[entry.id] = { ...emptyRun(), status: "streaming" };

  // Returns the four phase timers with the time since phaseStart added to the
  // timer of prev.phase. Callers advance phaseStart themselves.
  const accrue = (
    prev: RunState,
    now: number
  ): { waitingMs: number; loadingMs: number; reasoningMs: number; contentMs: number } => {
    const delta = now - phaseStart;
    const base = {
      waitingMs: prev.waitingMs,
      loadingMs: prev.loadingMs,
      reasoningMs: prev.reasoningMs,
      contentMs: prev.contentMs,
    };
    if (prev.phase === "waiting") return { ...base, waitingMs: base.waitingMs + delta };
    if (prev.phase === "loading") return { ...base, loadingMs: base.loadingMs + delta };
    if (prev.phase === "reasoning") return { ...base, reasoningMs: base.reasoningMs + delta };
    if (prev.phase === "content") return { ...base, contentMs: base.contentMs + delta };
    return base;
  };

  // Every 50 ms, charge elapsed time to the current phase so the timers keep
  // moving between chunks; does nothing once the run is no longer streaming.
  const ticker = window.setInterval(() => {
    const prev = runs[entry.id];
    if (!prev || prev.status !== "streaming") return;
    const now = performance.now();
    const accrued = accrue(prev, now);
    phaseStart = now;
    runs[entry.id] = { ...prev, ...accrued, elapsedMs: now - start };
  }, 50);

  try {
    const stream = streamChatCompletion(entry.model, [{ role: "user", content: $promptStore }], signal, {
      endpoint: "v1/chat/completions",
      max_tokens: $maxTokensStore,
    });
    for await (const chunk of stream) {
      if (chunk.done) break;
      const prev = runs[entry.id];
      // The run state is gone (e.g. `runs` was replaced); stop consuming.
      if (!prev) break;
      const now = performance.now();
      // Time up to this chunk is charged to the phase that was active before it.
      const accrued = accrue(prev, now);
      phaseStart = now;

      let nextPhase: Phase = prev.phase;
      let loadingText = prev.loadingText;
      let reasoningContent = prev.reasoningContent;
      let loadingDone = prev.loadingDone;

      if (chunk.reasoning_content) {
        const parsed = ingestReasoning(prev, chunk.reasoning_content);
        loadingText = parsed.loadingText;
        reasoningContent = parsed.reasoningContent;
        loadingDone = parsed.loadingDone;
        nextPhase = parsed.nowPhase;
      }
      // Content takes precedence when a chunk carries both kinds of text.
      if (chunk.content) nextPhase = "content";

      runs[entry.id] = {
        ...prev,
        ...accrued,
        loadingText,
        reasoningContent,
        content: prev.content + (chunk.content ?? ""),
        loadingDone,
        phase: nextPhase,
        elapsedMs: now - start,
      };
    }
    // Stream ended normally: charge the final interval and mark the run done.
    const prev = runs[entry.id];
    if (prev) {
      const now = performance.now();
      const accrued = accrue(prev, now);
      runs[entry.id] = { ...prev, ...accrued, status: "done", elapsedMs: now - start };
    }
  } catch (err) {
    const prev = runs[entry.id] ?? emptyRun();
    const now = performance.now();
    const accrued = accrue(prev, now);
    // An AbortError is reported as "aborted" instead of its own message.
    const aborted = err instanceof Error && err.name === "AbortError";
    runs[entry.id] = {
      ...prev,
      ...accrued,
      status: "error",
      elapsedMs: now - start,
      error: aborted ? "aborted" : err instanceof Error ? err.message : String(err),
    };
  } finally {
    window.clearInterval(ticker);
  }
}
|
||||
|
||||
// Starts every queued entry at once. Run state is reset to a fresh RunState
// per entry, all requests share one AbortController, and the running flag is
// cleared once every request has settled.
async function run() {
  if (!canRun) return;
  const entries = $testListStore;
  runs = Object.fromEntries(entries.map((entry) => [entry.id, emptyRun()]));
  isRunning = true;
  const controller = new AbortController();
  abortController = controller;
  try {
    await Promise.allSettled(entries.map((entry) => runOne(entry, controller.signal)));
  } finally {
    isRunning = false;
    abortController = null;
  }
}
|
||||
|
||||
// Aborts the batch in flight, if there is one.
function stop() {
  if (abortController) {
    abortController.abort();
  }
}
|
||||
|
||||
// Colour classes for the waiting segment; red when the run failed while waiting.
function waitingBarClass(run: RunState): string {
  const failedHere = run.status === "error" && run.phase === "waiting";
  return failedHere ? "bg-red-500" : "bg-slate-200 dark:bg-white/10";
}
|
||||
|
||||
// Colour classes for the loading segment; red when the run failed while loading.
function loadingBarClass(run: RunState): string {
  const failedHere = run.status === "error" && run.phase === "loading";
  return failedHere ? "bg-red-500" : "bg-slate-400 dark:bg-slate-500";
}
|
||||
|
||||
// Colour classes for the reasoning segment; red when the run failed while reasoning.
function reasoningBarClass(run: RunState): string {
  const failedHere = run.status === "error" && run.phase === "reasoning";
  return failedHere ? "bg-red-500" : "bg-purple-500";
}
|
||||
|
||||
// Colour classes for the content segment: red when the run failed while
// producing content, green once the run is done, amber otherwise.
function contentBarClass(run: RunState): string {
  const inProgress = "bg-amber-400 dark:bg-amber-500";
  switch (run.status) {
    case "error":
      return run.phase === "content" ? "bg-red-500" : inProgress;
    case "done":
      return "bg-green-500";
    default:
      return inProgress;
  }
}
|
||||
|
||||
// Picks the spacing, in milliseconds, between timeline ticks for a timeline
// that spans maxMs.
//
// Up to five minutes a fixed ladder of round steps is used. Beyond that the
// step grows in whole minutes so the axis never carries more than about ten
// ticks; a fixed 30 s step would otherwise add one tick (rendered once per
// row) for every further half minute of runtime.
function niceStepMs(maxMs: number): number {
  // [upper bound of the range, step used within it]
  const ladder: [number, number][] = [
    [500, 100],
    [2000, 500],
    [5000, 1000],
    [20000, 5000],
    [60000, 10000],
    [300000, 30000],
  ];
  for (const [limit, step] of ladder) {
    if (maxMs <= limit) return step;
  }
  const minute = 60000;
  return Math.ceil(maxMs / 10 / minute) * minute;
}
|
||||
|
||||
// Label for a timeline tick: the bare millisecond count below one second,
// otherwise seconds with one decimal only when the value is not a whole second.
function formatTickMs(ms: number): string {
  return ms < 1000 ? `${ms}` : `${(ms / 1000).toFixed(ms % 1000 === 0 ? 0 : 1)}s`;
}
|
||||
|
||||
// Tick positions (ms) for the timeline axis: multiples of the chosen step,
// from 0 up to and including timelineMaxMs.
let timelineTicks = $derived.by(() => {
  const step = niceStepMs(timelineMaxMs);
  const positions: number[] = [];
  let at = 0;
  while (at <= timelineMaxMs) {
    positions.push(at);
    at += step;
  }
  return positions;
});
|
||||
|
||||
// Colour classes for the status badge, one palette per Status value.
function statusBadgeClass(status: Status): string {
  const palette: Record<Status, string> = {
    waiting: "bg-gray-200 text-gray-700 dark:bg-gray-700 dark:text-gray-200",
    streaming: "bg-amber-200 text-amber-900 dark:bg-amber-500/30 dark:text-amber-200",
    done: "bg-green-200 text-green-900 dark:bg-green-500/30 dark:text-green-200",
    error: "bg-red-200 text-red-900 dark:bg-red-500/30 dark:text-red-200",
  };
  return palette[status];
}
|
||||
|
||||
// Formats a duration: rounded milliseconds below one second, otherwise
// seconds with two decimals.
function formatElapsed(ms: number): string {
  return ms < 1000 ? `${Math.round(ms)}ms` : `${(ms / 1000).toFixed(2)}s`;
}
|
||||
|
||||
// Restores the prompt and max_tokens settings to their default values.
function resetDefaults() {
  maxTokensStore.set(DEFAULT_MAX_TOKENS);
  promptStore.set(DEFAULT_PROMPT);
}
|
||||
</script>
|
||||
|
||||
<div class="flex flex-col md:flex-row gap-4 h-full min-h-0">
|
||||
<!-- Left column: run controls, model picker, settings -->
|
||||
<div class="md:w-72 shrink-0 flex flex-col gap-3 min-h-0">
|
||||
<!-- Run controls -->
|
||||
<div class="flex items-center gap-2">
|
||||
{#if isRunning}
|
||||
<button class="btn bg-red-500 hover:bg-red-600 text-white border-red-500" onclick={stop}>
|
||||
<span class="inline-block w-3 h-3 bg-white align-middle mr-2"></span>Stop
|
||||
</button>
|
||||
{:else}
|
||||
<button
|
||||
class="btn bg-primary text-btn-primary-text hover:opacity-90"
|
||||
onclick={run}
|
||||
disabled={!canRun}
|
||||
title={$testListStore.length === 0 ? "Add models from the list below" : "Run concurrent requests"}
|
||||
>
|
||||
<span class="inline-block align-middle mr-2" aria-hidden="true">▶</span>Go
|
||||
</button>
|
||||
{/if}
|
||||
<button class="btn btn--sm" onclick={clearAll} disabled={isRunning || $testListStore.length === 0}>
|
||||
Clear ({$testListStore.length})
|
||||
</button>
|
||||
</div>
|
||||
|
||||
<!-- Available models -->
|
||||
<div class="flex flex-col min-h-0 flex-1">
|
||||
<div class="text-xs font-medium text-txtsecondary mb-1">
|
||||
Models <span class="text-[10px] font-normal">— click to queue (add the same model more than once to test parallel requests)</span>
|
||||
</div>
|
||||
<div class="flex-1 border border-gray-200 dark:border-white/10 rounded overflow-y-auto min-h-0">
|
||||
{#if !hasModels}
|
||||
<div class="p-3 text-sm text-txtsecondary text-center">No models configured.</div>
|
||||
{:else}
|
||||
<ul class="divide-y divide-gray-100 dark:divide-white/5">
|
||||
{#each availableModels as m (m.id)}
|
||||
<li>
|
||||
<button
|
||||
class="w-full text-left px-2 py-1.5 text-sm hover:bg-secondary-hover transition-colors disabled:opacity-50 disabled:cursor-not-allowed flex items-center gap-2"
|
||||
onclick={() => addModel(m.id)}
|
||||
disabled={isRunning}
|
||||
title="Add {m.id}"
|
||||
>
|
||||
<span class="text-primary" aria-hidden="true">+</span>
|
||||
<span class="truncate flex-1">{m.id}</span>
|
||||
</button>
|
||||
</li>
|
||||
{/each}
|
||||
</ul>
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Settings -->
|
||||
<div class="flex flex-col gap-2 border-t border-gray-200 dark:border-white/10 pt-3">
|
||||
<div class="flex items-center justify-between">
|
||||
<label for="concurrency-prompt" class="text-xs font-medium text-txtsecondary">Prompt</label>
|
||||
<button
|
||||
class="text-[10px] text-txtsecondary hover:text-txtmain underline"
|
||||
onclick={resetDefaults}
|
||||
disabled={isRunning}
|
||||
>
|
||||
reset defaults
|
||||
</button>
|
||||
</div>
|
||||
<textarea
|
||||
id="concurrency-prompt"
|
||||
class="w-full px-2 py-1.5 text-sm rounded border border-gray-200 dark:border-white/10 bg-surface focus:outline-none focus:ring-2 focus:ring-primary resize-none"
|
||||
rows="3"
|
||||
bind:value={$promptStore}
|
||||
disabled={isRunning}
|
||||
></textarea>
|
||||
<label for="concurrency-max-tokens" class="text-xs font-medium text-txtsecondary">max_tokens</label>
|
||||
<input
|
||||
id="concurrency-max-tokens"
|
||||
type="number"
|
||||
min="1"
|
||||
class="w-full px-2 py-1.5 text-sm rounded border border-gray-200 dark:border-white/10 bg-surface focus:outline-none focus:ring-2 focus:ring-primary"
|
||||
bind:value={$maxTokensStore}
|
||||
disabled={isRunning}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Right column: result panels (draggable to reorder) -->
|
||||
<div class="flex-1 min-w-0 min-h-0 overflow-y-auto">
|
||||
{#if $testListStore.length === 0}
|
||||
<div class="h-full flex items-center justify-center px-6">
|
||||
<div class="max-w-md text-sm text-txtsecondary space-y-4">
|
||||
<h4 class="text-base font-semibold text-txtmain pb-0">Load Test</h4>
|
||||
<p>
|
||||
Fire several streaming chat completions at llama-swap at the same time to see how it handles parallel
|
||||
loading and concurrent inference. Each request streams into its own panel with a live timer and status.
|
||||
</p>
|
||||
<ol class="list-decimal list-inside space-y-1">
|
||||
<li>Click models on the left to queue them — repeat a model to hit it with parallel requests.</li>
|
||||
<li>Tweak the prompt and <code>max_tokens</code> if you want.</li>
|
||||
<li>Press <span class="font-semibold text-txtmain">Go</span> to launch them concurrently.</li>
|
||||
</ol>
|
||||
<p class="text-xs">Tip: drag a result card's header to reorder, or hit × to drop it.</p>
|
||||
</div>
|
||||
</div>
|
||||
{:else}
|
||||
<!-- Gantt-style timeline -->
|
||||
<div class="mb-3 border border-gray-200 dark:border-white/10 rounded">
|
||||
<button
|
||||
class="w-full flex items-center gap-2 px-2 py-1.5 text-xs font-medium text-txtsecondary hover:bg-secondary-hover transition-colors {$timelineCollapsedStore ? 'rounded' : 'rounded-t border-b border-gray-200 dark:border-white/10'}"
|
||||
onclick={() => timelineCollapsedStore.update((v) => !v)}
|
||||
aria-expanded={!$timelineCollapsedStore}
|
||||
>
|
||||
<svg
|
||||
class="w-4 h-4 transition-transform {$timelineCollapsedStore ? '-rotate-90' : ''}"
|
||||
fill="none"
|
||||
stroke="currentColor"
|
||||
viewBox="0 0 24 24"
|
||||
aria-hidden="true"
|
||||
>
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M19 9l-7 7-7-7"></path>
|
||||
</svg>
|
||||
<span>Timeline</span>
|
||||
{#if !$timelineCollapsedStore}
|
||||
<span class="flex items-center gap-3 text-[10px] text-txtsecondary font-normal ml-3" aria-hidden="true">
|
||||
<span class="flex items-center gap-1"><span class="inline-block w-2.5 h-2.5 rounded-sm bg-slate-200 dark:bg-white/10 border border-gray-300 dark:border-white/10"></span>waiting</span>
|
||||
<span class="flex items-center gap-1"><span class="inline-block w-2.5 h-2.5 rounded-sm bg-slate-400 dark:bg-slate-500"></span>loading</span>
|
||||
<span class="flex items-center gap-1"><span class="inline-block w-2.5 h-2.5 rounded-sm bg-purple-500"></span>reasoning</span>
|
||||
<span class="flex items-center gap-1"><span class="inline-block w-2.5 h-2.5 rounded-sm bg-amber-400 dark:bg-amber-500"></span>streaming</span>
|
||||
<span class="flex items-center gap-1"><span class="inline-block w-2.5 h-2.5 rounded-sm bg-green-500"></span>done</span>
|
||||
<span class="flex items-center gap-1"><span class="inline-block w-2.5 h-2.5 rounded-sm bg-red-500"></span>error</span>
|
||||
</span>
|
||||
{/if}
|
||||
<span class="ml-auto tabular-nums text-txtsecondary">
|
||||
max {formatElapsed(timelineMaxMs)} · {$testListStore.length} request{$testListStore.length === 1 ? "" : "s"}
|
||||
</span>
|
||||
</button>
|
||||
{#if !$timelineCollapsedStore}
|
||||
<div class="px-2 py-2">
|
||||
<!-- X axis ticks -->
|
||||
<div class="flex" aria-hidden="true">
|
||||
<div class="w-40 shrink-0"></div>
|
||||
<div class="relative flex-1 h-4 border-b border-gray-200 dark:border-white/10">
|
||||
{#each timelineTicks as t (t)}
|
||||
<div
|
||||
class="absolute top-0 bottom-0 border-l border-gray-200 dark:border-white/10"
|
||||
style="left: {(t / timelineMaxMs) * 100}%;"
|
||||
>
|
||||
<span class="absolute -top-0.5 left-1 text-[10px] text-txtsecondary tabular-nums">{formatTickMs(t)}</span>
|
||||
</div>
|
||||
{/each}
|
||||
</div>
|
||||
<div class="w-16 shrink-0"></div>
|
||||
</div>
|
||||
<!-- Bars -->
|
||||
<div class="flex flex-col gap-1 mt-1">
|
||||
{#each $testListStore as entry, i (entry.id)}
|
||||
{@const run = runs[entry.id]}
|
||||
{@const waitingPct = run ? (run.waitingMs / timelineMaxMs) * 100 : 0}
|
||||
{@const loadingPct = run ? (run.loadingMs / timelineMaxMs) * 100 : 0}
|
||||
{@const reasoningPct = run ? (run.reasoningMs / timelineMaxMs) * 100 : 0}
|
||||
{@const contentPct = run ? (run.contentMs / timelineMaxMs) * 100 : 0}
|
||||
<div class="flex items-center text-xs">
|
||||
<div class="w-40 shrink-0 flex items-center gap-1 pr-2 text-txtsecondary">
|
||||
<span class="tabular-nums w-5 text-right">{i + 1}.</span>
|
||||
<span class="truncate" title={entry.model}>{entry.model}</span>
|
||||
</div>
|
||||
<div class="relative flex-1 h-4">
|
||||
{#each timelineTicks as t (t)}
|
||||
<div
|
||||
class="absolute top-0 bottom-0 border-l border-gray-100 dark:border-white/5"
|
||||
style="left: {(t / timelineMaxMs) * 100}%;"
|
||||
aria-hidden="true"
|
||||
></div>
|
||||
{/each}
|
||||
{#if run && run.waitingMs > 0}
|
||||
<div
|
||||
class="absolute top-0.5 bottom-0.5 rounded-l-sm transition-all {waitingBarClass(run)}"
|
||||
style="left: 0; width: {waitingPct}%;"
|
||||
title="waiting {formatElapsed(run.waitingMs)}"
|
||||
></div>
|
||||
{/if}
|
||||
{#if run && run.loadingMs > 0}
|
||||
<div
|
||||
class="absolute top-0.5 bottom-0.5 transition-all {loadingBarClass(run)} {run.waitingMs === 0 ? 'rounded-l-sm' : ''}"
|
||||
style="left: {waitingPct}%; width: {loadingPct}%;"
|
||||
title="loading {formatElapsed(run.loadingMs)}"
|
||||
></div>
|
||||
{/if}
|
||||
{#if run && run.reasoningMs > 0}
|
||||
<div
|
||||
class="absolute top-0.5 bottom-0.5 transition-all {reasoningBarClass(run)} {run.waitingMs === 0 && run.loadingMs === 0 ? 'rounded-l-sm' : ''}"
|
||||
style="left: {waitingPct + loadingPct}%; width: {reasoningPct}%;"
|
||||
title="reasoning {formatElapsed(run.reasoningMs)}"
|
||||
></div>
|
||||
{/if}
|
||||
{#if run && run.contentMs > 0}
|
||||
<div
|
||||
class="absolute top-0.5 bottom-0.5 transition-all {contentBarClass(run)} {run.waitingMs === 0 && run.loadingMs === 0 && run.reasoningMs === 0 ? 'rounded-l-sm' : ''} {run.status === 'done' || run.status === 'error' ? 'rounded-r-sm' : ''}"
|
||||
style="left: {waitingPct + loadingPct + reasoningPct}%; width: {contentPct}%;"
|
||||
title="content {formatElapsed(run.contentMs)}"
|
||||
></div>
|
||||
{/if}
|
||||
</div>
|
||||
<div class="w-16 shrink-0 pl-2 tabular-nums text-txtsecondary text-right">
|
||||
{run ? formatElapsed(run.elapsedMs) : "—"}
|
||||
</div>
|
||||
</div>
|
||||
{/each}
|
||||
</div>
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
<div class="grid grid-cols-1 lg:grid-cols-2 xl:grid-cols-3 gap-3" role="list">
|
||||
{#each $testListStore as entry, i (entry.id)}
|
||||
{@const run = runs[entry.id]}
|
||||
{@const status = run?.status ?? "waiting"}
|
||||
<div
|
||||
class="border rounded flex flex-col min-h-0 transition-colors {dragOverIndex === i && dragIndex !== i
|
||||
? 'border-primary ring-2 ring-primary/40'
|
||||
: 'border-gray-200 dark:border-white/10'} {dragIndex === i ? 'opacity-40' : ''}"
|
||||
style="height: 280px;"
|
||||
role="listitem"
|
||||
ondragover={(e) => onDragOver(i, e)}
|
||||
ondrop={(e) => onDrop(i, e)}
|
||||
>
|
||||
<div
|
||||
class="shrink-0 flex items-center gap-2 px-2 py-1.5 border-b border-gray-200 dark:border-white/10 bg-secondary/40 rounded-t"
|
||||
draggable={!isRunning}
|
||||
role="button"
|
||||
tabindex="-1"
|
||||
aria-label="Drag to reorder {entry.model}"
|
||||
ondragstart={(e) => onDragStart(i, e)}
|
||||
ondragend={onDragEnd}
|
||||
class:cursor-grab={!isRunning}
|
||||
title={isRunning ? "" : "Drag to reorder"}
|
||||
>
|
||||
<span class="text-txtsecondary select-none" aria-hidden="true">⋮⋮</span>
|
||||
<span class="text-txtsecondary tabular-nums text-xs w-5 text-right">{i + 1}.</span>
|
||||
<span class="flex-1 truncate text-sm font-medium" title={entry.model}>{entry.model}</span>
|
||||
<span class="text-xs tabular-nums text-txtsecondary">
|
||||
{run ? formatElapsed(run.elapsedMs) : "—"}
|
||||
</span>
|
||||
<span class="status text-[10px] {statusBadgeClass(status)}">{status}</span>
|
||||
<button
|
||||
class="w-5 h-5 flex items-center justify-center text-txtsecondary hover:text-red-500 transition-colors rounded disabled:opacity-30 disabled:cursor-not-allowed"
|
||||
onclick={() => removeEntry(entry.id)}
|
||||
disabled={isRunning}
|
||||
aria-label="Remove"
|
||||
tabindex="-1"
|
||||
>
|
||||
×
|
||||
</button>
|
||||
</div>
|
||||
<div class="flex-1 min-h-0 overflow-y-auto font-mono text-xs px-2 py-1.5">
|
||||
{#if run?.loadingText}
|
||||
<div class="bg-secondary/40 dark:bg-white/5 text-txtsecondary rounded px-2 py-1 mb-2 whitespace-pre-wrap">{run.loadingText.trim()}</div>
|
||||
{/if}
|
||||
{#if run?.reasoningContent}
|
||||
<div class="text-purple-700 dark:text-purple-300 whitespace-pre-wrap">{run.reasoningContent}</div>
|
||||
{/if}
|
||||
{#if run?.content}
|
||||
<div class="whitespace-pre-wrap {run.reasoningContent ? 'mt-2' : ''}">{run.content}</div>
|
||||
{/if}
|
||||
{#if run?.status === "error" && run?.error}
|
||||
<div class="text-red-500 mt-2">[error] {run.error}</div>
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
{/each}
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
@@ -5,8 +5,9 @@
|
||||
import AudioInterface from "../components/playground/AudioInterface.svelte";
|
||||
import SpeechInterface from "../components/playground/SpeechInterface.svelte";
|
||||
import RerankInterface from "../components/playground/RerankInterface.svelte";
|
||||
import ConcurrencyInterface from "../components/playground/ConcurrencyInterface.svelte";
|
||||
|
||||
type Tab = "chat" | "images" | "speech" | "audio" | "rerank";
|
||||
type Tab = "chat" | "images" | "speech" | "audio" | "rerank" | "concurrency";
|
||||
|
||||
const selectedTabStore = persistentStore<Tab>("playground-selected-tab", "chat");
|
||||
let mobileMenuOpen = $state(false);
|
||||
@@ -17,6 +18,7 @@
|
||||
{ id: "speech", label: "Speech" },
|
||||
{ id: "audio", label: "Transcription" },
|
||||
{ id: "rerank", label: "Rerank" },
|
||||
{ id: "concurrency", label: "Load Test" },
|
||||
];
|
||||
|
||||
function selectTab(tab: Tab) {
|
||||
@@ -25,7 +27,7 @@
|
||||
}
|
||||
|
||||
function getTabLabel(tabId: Tab): string {
|
||||
return tabs.find(t => t.id === tabId)?.label || "";
|
||||
return tabs.find((t) => t.id === tabId)?.label || "";
|
||||
}
|
||||
</script>
|
||||
|
||||
@@ -49,10 +51,15 @@
|
||||
</svg>
|
||||
</button>
|
||||
{#if mobileMenuOpen}
|
||||
<div class="absolute top-full left-0 right-0 mt-1 bg-surface border border-gray-200 dark:border-white/10 rounded shadow-lg z-10">
|
||||
<div
|
||||
class="absolute top-full left-0 right-0 mt-1 bg-surface border border-gray-200 dark:border-white/10 rounded shadow-lg z-10"
|
||||
>
|
||||
{#each tabs as tab (tab.id)}
|
||||
<button
|
||||
class="w-full px-4 py-2 text-left hover:bg-secondary-hover transition-colors first:rounded-t last:rounded-b {$selectedTabStore === tab.id ? 'bg-primary/10 font-medium' : ''}"
|
||||
class="w-full px-4 py-2 text-left hover:bg-secondary-hover transition-colors first:rounded-t last:rounded-b {$selectedTabStore ===
|
||||
tab.id
|
||||
? 'bg-primary/10 font-medium'
|
||||
: ''}"
|
||||
onclick={() => selectTab(tab.id)}
|
||||
>
|
||||
{tab.label}
|
||||
@@ -94,6 +101,9 @@
|
||||
<div class="h-full" class:tab-hidden={$selectedTabStore !== "rerank"}>
|
||||
<RerankInterface />
|
||||
</div>
|
||||
<div class="h-full" class:tab-hidden={$selectedTabStore !== "concurrency"}>
|
||||
<ConcurrencyInterface />
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ export default defineConfig({
|
||||
],
|
||||
base: "/ui/",
|
||||
build: {
|
||||
outDir: "../proxy/ui_dist",
|
||||
outDir: "../internal/server/ui_dist",
|
||||
assetsDir: "assets",
|
||||
},
|
||||
server: {
|
||||
|
||||
Reference in New Issue
Block a user