Compare commits

...

5 Commits

Author SHA1 Message Date
Benson Wong 6ea551362e process,router: make model shutdown and load-streaming robust
Note: The original proxy/process_unix.go had a no-op setProcAttributes,
so it did not stop grandchild processes either. This patch adds that
capability and improves reliability.

--

Stop() no longer hangs on a shell wrapper that forks the real binary.
The upstream is built with exec.CommandContext + cmd.Cancel +
cmd.WaitDelay, so cmd.Wait() returns even when a forked grandchild
inherits the stdout/stderr pipes. killProcess sends the stop signal
directly (not by cancelling the context) so cmd.WaitDelay measures from
process exit and never silently caps the caller's graceful timeout.

The upstream is also started in its own process group (Setpgid) on Unix,
so the graceful SIGTERM — and the SIGKILL escalation after the timeout —
are delivered to the whole group via the negative PID. A forked
grandchild is reaped with its parent instead of leaking as an orphan.

The loading-spinner SSE goroutine can no longer panic when it outlives
the request. net/http recycles the response writer via Reset(nil) once
ServeHTTP returns; the orphaned goroutine then flushed against a
nil-backed writer and crashed with a SIGSEGV. A release() fence on
loadingWriter lets any in-flight write finish then short-circuits later
writes/flushes, and all three ServeHTTP select branches run a
finishLoading helper (cancelLoad, waitForCompletion, release) before the
writer is reclaimed.

- internal/process: exec.CommandContext + WaitDelay, Setpgid process
groups, group-wide SIGTERM/SIGKILL teardown
- internal/router: release() fence + finishLoading on loadingWriter

fixes #804
2026-05-31 10:11:12 -07:00
Benson Wong 03d58e53fa Add load testing tool to the UI (#805)
Wouldn't it be nice to test the performance, swapping and concurrency
from the UI? Now we can! This is a port of `cmd/test-concurrency` into the UI.

Here's a demo of it working with a swap matrix: 

https://github.com/user-attachments/assets/b6bb12ec-0381-46f1-a6b8-27d1c3c0ddb3
2026-05-30 17:04:30 -07:00
Luiszzzor c790d0ee03 fix: update the concurrency middleware to respond with a JSON payload (#798)
Update the concurrency middleware to respond with a JSON payload instead
of plain text when the request limit is reached, for compatibility with
the OpenAI API standard.

---------

Co-authored-by: Ludwik <l.czarnota@samsung.com>
2026-05-29 23:59:32 -07:00
Benson Wong 4ca9c478a2 Makefile,internal/server: various release tweaks 2026-05-29 15:27:08 -07:00
Benson Wong 146a9eab24 ui-svelte: update build directory (#801)
Fixes #799
2026-05-29 14:45:05 -07:00
14 changed files with 1128 additions and 63 deletions
+2 -4
View File
@@ -32,11 +32,9 @@ jobs:
uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # 6.4.0 uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # 6.4.0
with: with:
node-version: "24" node-version: "24"
- name: Install dependencies and build UI - name: Build UI
run: | run: |
cd ui-svelte make ui
npm ci
npm run build
- name: Run GoReleaser - name: Run GoReleaser
uses: goreleaser/goreleaser-action@1a80836c5c9d9e5755a25cb59ec6f45a3b5f41a8 #7.2.1 uses: goreleaser/goreleaser-action@1a80836c5c9d9e5755a25cb59ec6f45a3b5f41a8 #7.2.1
-1
View File
@@ -8,4 +8,3 @@ dist/
# UI build output; placeholder.txt is kept so the go:embed succeeds. # UI build output; placeholder.txt is kept so the go:embed succeeds.
internal/server/ui_dist/* internal/server/ui_dist/*
!internal/server/ui_dist/placeholder.txt
+1 -2
View File
@@ -41,8 +41,7 @@ ui/node_modules:
# build react UI # build react UI
ui: ui/node_modules ui: ui/node_modules
cd ui-svelte && npm run build cd ui-svelte && npm run build
mkdir -p internal/server/ui_dist touch internal/server/ui_dist/placeholder.txt
cp -R proxy/ui_dist/. internal/server/ui_dist/
# Build OSX binary # Build OSX binary
mac: ui mac: ui
+112 -32
View File
@@ -11,7 +11,6 @@ import (
"os/exec" "os/exec"
"strings" "strings"
"sync/atomic" "sync/atomic"
"syscall"
"time" "time"
"github.com/mostlygeek/llama-swap/internal/config" "github.com/mostlygeek/llama-swap/internal/config"
@@ -22,6 +21,15 @@ import (
var ErrStartAborted = fmt.Errorf("aborted") var ErrStartAborted = fmt.Errorf("aborted")
// cmdWaitDelay is the upper bound the runtime will wait for child I/O to
// drain after the process exits before force-closing the stdout/stderr
// pipes. Required so that cmd.Wait() returns even when a forked grandchild
// inherits and holds the pipes open (e.g. a shell wrapper that backgrounds
// the real binary). killProcess sends the stop signal directly (not via the
// cmd context), so this delay is measured from process exit rather than from
// the stop request, and stays independent of the caller's graceful timeout.
const cmdWaitDelay = 10 * time.Second
type runReq struct { type runReq struct {
timeout time.Duration timeout time.Duration
respond chan error respond chan error
@@ -39,6 +47,7 @@ type waitReadyReq struct {
type startResult struct { type startResult struct {
cmd *exec.Cmd cmd *exec.Cmd
cmdDone chan struct{} cmdDone chan struct{}
cancel context.CancelFunc
handlerFn http.HandlerFunc handlerFn http.HandlerFunc
err error err error
} }
@@ -51,6 +60,11 @@ type ProcessCommand struct {
processLogger *logmon.Monitor processLogger *logmon.Monitor
proxyLogger *logmon.Monitor proxyLogger *logmon.Monitor
// waitDelay is assigned to cmd.WaitDelay when starting the upstream
// process. Defaults to cmdWaitDelay; tests override it to keep the
// pipe-close backstop from dominating their runtime.
waitDelay time.Duration
runCh chan runReq runCh chan runReq
stopCh chan stopReq stopCh chan stopReq
waitReadyCh chan waitReadyReq waitReadyCh chan waitReadyReq
@@ -85,6 +99,7 @@ func New(
runCh: make(chan runReq), runCh: make(chan runReq),
stopCh: make(chan stopReq), stopCh: make(chan stopReq),
waitReadyCh: make(chan waitReadyReq), waitReadyCh: make(chan waitReadyReq),
waitDelay: cmdWaitDelay,
} }
p.state.Store(StateStopped) p.state.Store(StateStopped)
@@ -122,6 +137,7 @@ func (p *ProcessCommand) run() {
var ( var (
cmd *exec.Cmd cmd *exec.Cmd
cmdDone <-chan struct{} cmdDone <-chan struct{}
cmdCancel context.CancelFunc
readyWaiters []waitReadyReq readyWaiters []waitReadyReq
// runResp parks the in-flight Run caller's response channel. The // runResp parks the in-flight Run caller's response channel. The
// interface contract is that Run blocks until the process is // interface contract is that Run blocks until the process is
@@ -164,9 +180,10 @@ func (p *ProcessCommand) run() {
setState(StateShutdown) setState(StateShutdown)
if cmd != nil { if cmd != nil {
p.handler.Store(nil) p.handler.Store(nil)
p.killProcess(cmd, cmdDone, 100*time.Millisecond) p.killProcess(cmd, cmdCancel, cmdDone, 100*time.Millisecond)
cmd = nil cmd = nil
cmdDone = nil cmdDone = nil
cmdCancel = nil
} }
notifyWaiters(fmt.Errorf("[%s] shutdown", p.id)) notifyWaiters(fmt.Errorf("[%s] shutdown", p.id))
respondRun(fmt.Errorf("[%s] shutdown", p.id)) respondRun(fmt.Errorf("[%s] shutdown", p.id))
@@ -177,8 +194,12 @@ func (p *ProcessCommand) run() {
// cmdDone is nil while no process is running, so this case is // cmdDone is nil while no process is running, so this case is
// dormant outside of StateReady. // dormant outside of StateReady.
case <-cmdDone: case <-cmdDone:
if cmdCancel != nil {
cmdCancel()
}
cmd = nil cmd = nil
cmdDone = nil cmdDone = nil
cmdCancel = nil
p.handler.Store(nil) p.handler.Store(nil)
setState(StateStopped) setState(StateStopped)
respondRun(fmt.Errorf("[%s] upstream exited unexpectedly", p.id)) respondRun(fmt.Errorf("[%s] upstream exited unexpectedly", p.id))
@@ -226,6 +247,7 @@ func (p *ProcessCommand) run() {
if res.err == nil { if res.err == nil {
cmd = res.cmd cmd = res.cmd
cmdDone = res.cmdDone cmdDone = res.cmdDone
cmdCancel = res.cancel
fn := res.handlerFn fn := res.handlerFn
p.handler.Store(&fn) p.handler.Store(&fn)
setState(StateReady) setState(StateReady)
@@ -273,7 +295,7 @@ func (p *ProcessCommand) run() {
cancelStart() cancelStart()
res := <-resultCh res := <-resultCh
if res.cmd != nil { if res.cmd != nil {
p.killProcess(res.cmd, res.cmdDone, stop.timeout) p.killProcess(res.cmd, res.cancel, res.cmdDone, stop.timeout)
} }
setState(StateStopped) setState(StateStopped)
notifyWaiters(ErrStartAborted) notifyWaiters(ErrStartAborted)
@@ -293,7 +315,7 @@ func (p *ProcessCommand) run() {
setState(StateShutdown) setState(StateShutdown)
res := <-resultCh res := <-resultCh
if res.cmd != nil { if res.cmd != nil {
p.killProcess(res.cmd, res.cmdDone, 100*time.Millisecond) p.killProcess(res.cmd, res.cancel, res.cmdDone, 100*time.Millisecond)
} }
notifyWaiters(fmt.Errorf("[%s] shutdown", p.id)) notifyWaiters(fmt.Errorf("[%s] shutdown", p.id))
respondRun(fmt.Errorf("[%s] shutdown", p.id)) respondRun(fmt.Errorf("[%s] shutdown", p.id))
@@ -310,9 +332,10 @@ func (p *ProcessCommand) run() {
case stop := <-p.stopCh: case stop := <-p.stopCh:
if cmd != nil { if cmd != nil {
setState(StateStopping) setState(StateStopping)
p.killProcess(cmd, cmdDone, stop.timeout) p.killProcess(cmd, cmdCancel, cmdDone, stop.timeout)
cmd = nil cmd = nil
cmdDone = nil cmdDone = nil
cmdCancel = nil
p.handler.Store(nil) p.handler.Store(nil)
} }
// Stop is a no-op (and not an error) when already Stopped — this // Stop is a no-op (and not an error) when already Stopped — this
@@ -377,16 +400,26 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
reverseProxy.ServeHTTP(w, r) reverseProxy.ServeHTTP(w, r)
}) })
cmd := exec.Command(args[0], args[1:]...) // cmdCtx + cmd.Cancel are wired as a safety net: if the context is ever
// cancelled while the process is alive, cmd.Cancel sends SIGTERM / CmdStop
// and the runtime escalates to SIGKILL after cmd.WaitDelay. In the normal
// teardown path killProcess sends the stop signal directly instead, so
// cmd.WaitDelay only acts as the inherited-pipe backstop measured from
// process exit (see killProcess).
cmdCtx, cmdCancel := context.WithCancel(context.Background())
cmd := exec.CommandContext(cmdCtx, args[0], args[1:]...)
cmd.Stderr = p.processLogger cmd.Stderr = p.processLogger
cmd.Stdout = p.processLogger cmd.Stdout = p.processLogger
cmd.Env = append(cmd.Environ(), p.config.Env...) cmd.Env = append(cmd.Environ(), p.config.Env...)
cmd.Cancel = func() error { return p.sendStopSignal(cmd) }
cmd.WaitDelay = p.waitDelay
setProcAttributes(cmd) setProcAttributes(cmd)
p.proxyLogger.Debugf("<%s> Executing start command: %s, env: %s", p.id, strings.Join(args, " "), strings.Join(p.config.Env, ", ")) p.proxyLogger.Debugf("<%s> Executing start command: %s, env: %s", p.id, strings.Join(args, " "), strings.Join(p.config.Env, ", "))
cmdDone := make(chan struct{}) cmdDone := make(chan struct{})
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
cmdCancel()
return startResult{err: fmt.Errorf("failed to start command '%s': %w", strings.Join(args, " "), err)} return startResult{err: fmt.Errorf("failed to start command '%s': %w", strings.Join(args, " "), err)}
} }
@@ -402,21 +435,28 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
close(cmdDone) close(cmdDone)
}() }()
abort := func(err error) startResult {
p.killProcess(cmd, cmdCancel, cmdDone, 5*time.Second)
return startResult{err: err}
}
prematureExit := func() startResult {
cmdCancel()
return startResult{err: fmt.Errorf("upstream command exited prematurely")}
}
if startCtx.Err() != nil { if startCtx.Err() != nil {
p.killProcess(cmd, cmdDone, 5*time.Second) return abort(ErrStartAborted)
return startResult{err: ErrStartAborted}
} }
checkEndpoint := strings.TrimSpace(p.config.CheckEndpoint) checkEndpoint := strings.TrimSpace(p.config.CheckEndpoint)
if checkEndpoint == "none" { if checkEndpoint == "none" {
return startResult{cmd: cmd, cmdDone: cmdDone, handlerFn: handlerFn} return startResult{cmd: cmd, cmdDone: cmdDone, cancel: cmdCancel, handlerFn: handlerFn}
} }
// Wait 250ms for the command to start up before health checking // Wait 250ms for the command to start up before health checking
select { select {
case <-startCtx.Done(): case <-startCtx.Done():
p.killProcess(cmd, cmdDone, 5*time.Second) return abort(ErrStartAborted)
return startResult{err: ErrStartAborted}
case <-time.After(250 * time.Millisecond): case <-time.After(250 * time.Millisecond):
} }
@@ -424,16 +464,14 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
for { for {
select { select {
case <-startCtx.Done(): case <-startCtx.Done():
p.killProcess(cmd, cmdDone, 5*time.Second) return abort(ErrStartAborted)
return startResult{err: ErrStartAborted}
case <-cmdDone: case <-cmdDone:
return startResult{err: fmt.Errorf("upstream command exited prematurely")} return prematureExit()
default: default:
} }
if time.Now().After(deadline) { if time.Now().After(deadline) {
p.killProcess(cmd, cmdDone, 5*time.Second) return abort(fmt.Errorf("health check timed out after %v", healthCheckTimeout))
return startResult{err: fmt.Errorf("health check timed out after %v", healthCheckTimeout)}
} }
req, _ := http.NewRequestWithContext(startCtx, "GET", p.config.CheckEndpoint, nil) req, _ := http.NewRequestWithContext(startCtx, "GET", p.config.CheckEndpoint, nil)
@@ -445,28 +483,28 @@ func (p *ProcessCommand) doStart(startCtx context.Context, healthCheckTimeout ti
p.proxyLogger.Infof("<%s> Health check passed on %s%s", p.id, p.config.Proxy, p.config.CheckEndpoint) p.proxyLogger.Infof("<%s> Health check passed on %s%s", p.id, p.config.Proxy, p.config.CheckEndpoint)
break break
} else if startCtx.Err() != nil { } else if startCtx.Err() != nil {
p.killProcess(cmd, cmdDone, 5*time.Second) return abort(ErrStartAborted)
return startResult{err: ErrStartAborted}
} }
select { select {
case <-startCtx.Done(): case <-startCtx.Done():
p.killProcess(cmd, cmdDone, 5*time.Second) return abort(ErrStartAborted)
return startResult{err: ErrStartAborted}
case <-cmdDone: case <-cmdDone:
return startResult{err: fmt.Errorf("upstream command exited prematurely")} return prematureExit()
case <-time.After(time.Second): case <-time.After(time.Second):
} }
} }
return startResult{cmd: cmd, cmdDone: cmdDone, handlerFn: handlerFn} return startResult{cmd: cmd, cmdDone: cmdDone, cancel: cmdCancel, handlerFn: handlerFn}
} }
func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cmdDone <-chan struct{}, gracefulTimeout time.Duration) { // sendStopSignal runs the configured CmdStop (if any) or sends SIGTERM to
// the upstream process. Wired up as cmd.Cancel so it fires whenever the
// cmd's context is cancelled.
func (p *ProcessCommand) sendStopSignal(cmd *exec.Cmd) error {
if cmd == nil || cmd.Process == nil { if cmd == nil || cmd.Process == nil {
return return nil
} }
if p.config.CmdStop != "" { if p.config.CmdStop != "" {
stopArgs, err := config.SanitizeCommand( stopArgs, err := config.SanitizeCommand(
strings.ReplaceAll(p.config.CmdStop, "${PID}", fmt.Sprintf("%d", cmd.Process.Pid)), strings.ReplaceAll(p.config.CmdStop, "${PID}", fmt.Sprintf("%d", cmd.Process.Pid)),
@@ -475,12 +513,48 @@ func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cmdDone <-chan struct{}, gra
stopCmd := exec.Command(stopArgs[0], stopArgs[1:]...) stopCmd := exec.Command(stopArgs[0], stopArgs[1:]...)
stopCmd.Env = cmd.Env stopCmd.Env = cmd.Env
setProcAttributes(stopCmd) setProcAttributes(stopCmd)
stopCmd.Run() return stopCmd.Run()
} else {
cmd.Process.Signal(syscall.SIGTERM)
} }
} else { // fall through to SIGTERM if sanitize failed
cmd.Process.Signal(syscall.SIGTERM) }
// On Unix this SIGTERMs the whole process group so a forked grandchild
// (e.g. a shell wrapper that backgrounds the real binary) is taken down
// with the parent rather than orphaned.
return terminateProcessTree(cmd)
}
// killProcess terminates the upstream process. The flow:
//
// 1. Send the graceful stop signal (CmdStop / SIGTERM) directly — NOT by
// cancelling cmdCtx. Cancelling the context would start cmd.WaitDelay
// immediately, which force-kills the process WaitDelay after the signal
// and would silently cap gracefulTimeout at WaitDelay whenever
// gracefulTimeout is the longer of the two.
// 2. We wait up to gracefulTimeout for the process to exit on its own.
// 3. If still alive, we SIGKILL the process group directly (Unix) so any
// forked descendant is force-terminated alongside the parent.
// 4. We wait on cmdDone. cmd.WaitDelay (set when the cmd was built) is the
// critical backstop here: once the process exits, if a forked grandchild
// inherited the stdout/stderr pipes and is still holding them, the runtime
// force-closes the pipes WaitDelay after the exit and cmd.Wait() unblocks.
// Because we never cancelled the context, that WaitDelay timer measures
// from process exit (see os/exec awaitGoroutines), not from this call.
// Without WaitDelay this select would hang forever (the v219 bug).
//
// cancel() is still invoked (deferred) to release the context, but only after
// the process has exited and os/exec's ctx watcher has already torn down, so it
// never re-fires cmd.Cancel.
func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cancel context.CancelFunc, cmdDone <-chan struct{}, gracefulTimeout time.Duration) {
if cancel == nil {
return
}
defer cancel()
// Deliver CmdStop / SIGTERM in a goroutine so a slow or hanging CmdStop
// cannot block the run() goroutine; the gracefulTimeout + Process.Kill
// path below still guarantees teardown.
if cmd != nil {
go func() { _ = p.sendStopSignal(cmd) }()
} }
timer := time.NewTimer(gracefulTimeout) timer := time.NewTimer(gracefulTimeout)
@@ -488,10 +562,16 @@ func (p *ProcessCommand) killProcess(cmd *exec.Cmd, cmdDone <-chan struct{}, gra
select { select {
case <-cmdDone: case <-cmdDone:
return
case <-timer.C: case <-timer.C:
cmd.Process.Kill()
<-cmdDone
} }
if cmd != nil {
// SIGKILL the whole process group on Unix so any descendant that
// ignored or outlived the graceful signal is force-terminated too.
_ = killProcessTree(cmd)
}
<-cmdDone
} }
func (p *ProcessCommand) ID() string { func (p *ProcessCommand) ID() string {
@@ -0,0 +1,262 @@
//go:build !windows
package process
import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"syscall"
"testing"
"time"
"github.com/mostlygeek/llama-swap/internal/config"
)
// TestProcessCommand_StopForkingWrapper is a regression test for the bug
// reported against v219 where Stop would hang indefinitely when the upstream
// command is a shell wrapper that forks the real binary (e.g. `#!/bin/bash`
// then `"$@"`). After SIGTERM the wrapper dies but the grandchild inherits the
// stdout/stderr pipes; cmd.Wait() blocks waiting for the pipe-copy goroutine
// to drain EOF, which never happens while the grandchild holds the fds.
//
// The fix is cmd.WaitDelay (combined with exec.CommandContext + cmd.Cancel),
// which causes the runtime to force-close the pipes after the delay so
// cmd.Wait() — and therefore Stop — returns.
//
// The test passes when Stop returns without error inside stopBudget, the
// process reports StateStopped, and the Run call started by runAsync returns.
func TestProcessCommand_StopForkingWrapper(t *testing.T) {
skipIfNoSimpleResponder(t)
port := getFreePort(t)
dir := t.TempDir()
pidFile := filepath.Join(dir, "child.pid")
// Wrapper script: backgrounds the child (which inherits stdout/stderr),
// records its PID for cleanup, then waits. When SIGTERM hits bash it
// dies without forwarding the signal; the grandchild keeps running and
// keeps the inherited pipe fds open. This is the scenario reported in
// the v219 regression.
//
// NOTE(review): if the stop signal is delivered to the whole process group,
// the grandchild may receive SIGTERM as well and exit on its own, in which
// case the inherited-pipe hang is no longer what this test exercises —
// confirm whether a variant that ignores SIGTERM is needed to keep the
// WaitDelay path covered.
wrapper := filepath.Join(dir, "wrapper.sh")
script := fmt.Sprintf("#!/bin/bash\n%q -port %d -silent &\necho $! > %q\nwait\n",
simpleResponderPath, port, pidFile)
if err := os.WriteFile(wrapper, []byte(script), 0o755); err != nil {
t.Fatalf("WriteFile: %v", err)
}
// Best-effort backstop: kill whatever PID the wrapper recorded so a leaked
// grandchild does not survive the test.
t.Cleanup(func() { killChildFromPidFile(pidFile) })
p := newProcessCommand(t, config.ModelConfig{
Cmd: wrapper,
Proxy: fmt.Sprintf("http://127.0.0.1:%d", port),
CheckEndpoint: "/health",
HealthCheckTimeout: 10,
})
// Shrink the pipe-close backstop so the test doesn't sit at the
// production default (10s). Must be set before Run() so doStart picks
// it up when building the cmd.
const testWaitDelay = 250 * time.Millisecond
p.waitDelay = testWaitDelay
runErr := runAsync(t, p)
// Stop must return within a bounded time even though the grandchild
// is still holding the pipe open. Budget is generous on top of
// testWaitDelay to absorb scheduling jitter on slow CI runners; the
// pre-fix behaviour was an unbounded hang, so any reasonable cap
// distinguishes pass from fail.
//
// stopReturned is buffered (capacity 1) so the goroutine calling Stop can
// always deliver its single result and exit, even after the test has
// already failed on the budget timeout below.
stopReturned := make(chan error, 1)
stopStart := time.Now()
go func() { stopReturned <- p.Stop(testStopTimeout) }()
const stopBudget = testWaitDelay + 2*time.Second
select {
case err := <-stopReturned:
if err != nil {
t.Fatalf("Stop: %v", err)
}
t.Logf("Stop returned in %v", time.Since(stopStart))
case <-time.After(stopBudget):
t.Fatalf("Stop did not return within %v — cmd.Wait() likely hung on inherited pipe", stopBudget)
}
if got := p.State(); got != StateStopped {
t.Errorf("after Stop: expected state %s, got %s", StateStopped, got)
}
// runErr comes from runAsync (defined elsewhere); presumably it yields once
// Run has returned — confirm against the helper. Only the fact that it
// yields is checked here, not the value.
select {
case <-runErr:
case <-time.After(testReturnTimeout):
t.Errorf("Run did not return after Stop")
}
}
// TestProcessCommand_StopHonorsGracefulTimeout is a regression test for the
// bug where cmd.WaitDelay capped the graceful shutdown window. killProcess used
// to cancel the cmd context to deliver SIGTERM, which starts cmd.WaitDelay
// immediately; a process whose SIGTERM handler needs longer than WaitDelay to
// finish was force-killed early even though Stop was given a much longer
// timeout. The fix sends the signal directly so WaitDelay measures from process
// exit (its inherited-pipe backstop role), leaving the graceful window to the
// caller's Stop timeout.
//
// The test passes when the script's SIGTERM handler runs to completion (the
// marker file exists), Stop took at least 500ms, the process reports
// StateStopped, and the Run call started by runAsync returns.
func TestProcessCommand_StopHonorsGracefulTimeout(t *testing.T) {
dir := t.TempDir()
marker := filepath.Join(dir, "graceful.done")
ready := filepath.Join(dir, "trap.ready")
// On SIGTERM, sleep past the (short) WaitDelay, then write the marker and
// exit cleanly. If WaitDelay still drove the kill, bash would be SIGKILLed
// mid-handler and the marker would never be written. The ready file is
// written only after the trap is installed so the test does not race
// SIGTERM ahead of it (CheckEndpoint:none marks ready before bash runs).
script := filepath.Join(dir, "graceful.sh")
body := fmt.Sprintf(
"#!/bin/bash\ncleanup() { sleep 0.6; echo done > %q; exit 0; }\ntrap cleanup SIGTERM\necho ready > %q\nwhile true; do sleep 0.1; done\n",
marker, ready,
)
if err := os.WriteFile(script, []byte(body), 0o755); err != nil {
t.Fatalf("WriteFile: %v", err)
}
p := newProcessCommand(t, config.ModelConfig{
Cmd: script,
Proxy: "http://127.0.0.1:1", // unused: health check disabled
CheckEndpoint: "none",
})
// WaitDelay shorter than the handler's 0.6s sleep, and far shorter than the
// Stop timeout below — this is the window the old code mis-killed in.
// It must be assigned before runAsync so the started cmd picks it up.
p.waitDelay = 200 * time.Millisecond
runErr := runAsync(t, p)
// Wait until the trap is installed before stopping: poll for the ready
// file every 10ms and give up after 2s.
trapDeadline := time.Now().Add(2 * time.Second)
for {
if _, err := os.Stat(ready); err == nil {
break
}
if time.Now().After(trapDeadline) {
t.Fatalf("script did not install SIGTERM trap in time")
}
time.Sleep(10 * time.Millisecond)
}
// The 5s graceful timeout comfortably exceeds the handler's 0.6s sleep, so
// a correct implementation never needs to escalate to a forced kill here.
stopStart := time.Now()
if err := p.Stop(5 * time.Second); err != nil {
t.Fatalf("Stop: %v", err)
}
elapsed := time.Since(stopStart)
// The handler must have run to completion (marker written) rather than
// being force-killed at waitDelay.
if _, err := os.Stat(marker); err != nil {
t.Fatalf("graceful handler did not complete (marker missing): %v", err)
}
// And Stop must have waited for the handler (>~0.6s), not returned at the
// 200ms waitDelay. The 500ms threshold leaves slack below the 0.6s sleep
// while still sitting well above waitDelay.
if elapsed < 500*time.Millisecond {
t.Fatalf("Stop returned in %v — process was killed before its graceful handler finished", elapsed)
}
if got := p.State(); got != StateStopped {
t.Errorf("after Stop: expected state %s, got %s", StateStopped, got)
}
// runErr comes from runAsync (defined elsewhere); presumably it yields once
// Run has returned — confirm against the helper.
select {
case <-runErr:
case <-time.After(testReturnTimeout):
t.Errorf("Run did not return after Stop")
}
}
// TestProcessCommand_StopReapsForkedGrandchild checks that Stop on a forking
// shell wrapper also takes down the backgrounded grandchild instead of leaving
// it behind as an orphan.
//
// The wrapper backgrounds the responder binary, records the child's PID in a
// file, and waits. The test reads that PID, calls Stop, and then probes the PID
// with signal 0 until the probe fails (process gone) or roughly a second has
// passed. It relies on the upstream being started in its own process group so
// that the stop signal sent to the negative PID reaches every member of the
// group, including a grandchild the wrapper never reaped.
func TestProcessCommand_StopReapsForkedGrandchild(t *testing.T) {
	skipIfNoSimpleResponder(t)

	port := getFreePort(t)
	tmp := t.TempDir()
	pidPath := filepath.Join(tmp, "child.pid")
	wrapperPath := filepath.Join(tmp, "wrapper.sh")

	wrapperBody := fmt.Sprintf("#!/bin/bash\n%q -port %d -silent &\necho $! > %q\nwait\n",
		simpleResponderPath, port, pidPath)
	if writeErr := os.WriteFile(wrapperPath, []byte(wrapperBody), 0o755); writeErr != nil {
		t.Fatalf("WriteFile: %v", writeErr)
	}
	// Best-effort backstop so a surviving grandchild does not outlive the test.
	t.Cleanup(func() { killChildFromPidFile(pidPath) })

	modelCfg := config.ModelConfig{
		Cmd:                wrapperPath,
		Proxy:              fmt.Sprintf("http://127.0.0.1:%d", port),
		CheckEndpoint:      "/health",
		HealthCheckTimeout: 10,
	}
	p := newProcessCommand(t, modelCfg)
	runErr := runAsync(t, p)

	// Poll for the PID the wrapper recorded. The file may briefly exist but be
	// empty, so anything that does not parse to a positive integer is retried
	// until the 2s deadline.
	childPID := 0
	pidDeadline := time.Now().Add(2 * time.Second)
	for childPID == 0 {
		if raw, readErr := os.ReadFile(pidPath); readErr == nil {
			parsed, convErr := strconv.Atoi(strings.TrimSpace(string(raw)))
			if convErr == nil && parsed > 0 {
				childPID = parsed
				continue
			}
		}
		if time.Now().After(pidDeadline) {
			t.Fatalf("wrapper did not record grandchild PID")
		}
		time.Sleep(10 * time.Millisecond)
	}

	if stopErr := p.Stop(testStopTimeout); stopErr != nil {
		t.Fatalf("Stop: %v", stopErr)
	}

	// Signal 0 probes liveness without delivering a signal. Allow up to 100
	// probes 10ms apart for the grandchild to exit after the group SIGTERM.
	proc, findErr := os.FindProcess(childPID)
	if findErr != nil {
		t.Fatalf("FindProcess: %v", findErr)
	}
	stillAlive := true
	for attempt := 0; stillAlive && attempt < 100; attempt++ {
		if probeErr := proc.Signal(syscall.Signal(0)); probeErr != nil {
			stillAlive = false
		} else {
			time.Sleep(10 * time.Millisecond)
		}
	}
	if stillAlive {
		t.Errorf("grandchild PID %d still alive after Stop — process group was not reaped", childPID)
	}

	// Finally, the Run call started by runAsync must have unblocked.
	select {
	case <-runErr:
	case <-time.After(testReturnTimeout):
		t.Errorf("Run did not return after Stop")
	}
}
// killChildFromPidFile is best-effort test cleanup: it reads the PID a wrapper
// script wrote to pidFile and kills that process so leaked orphans don't pile
// up between test runs.
//
// It does nothing when the file cannot be read, when its whitespace-trimmed
// contents are not a positive integer, or when the process cannot be looked
// up. Any error from the kill itself is deliberately ignored.
func killChildFromPidFile(pidFile string) {
	raw, readErr := os.ReadFile(pidFile)
	if readErr != nil {
		return
	}

	pid, convErr := strconv.Atoi(strings.TrimSpace(string(raw)))
	if convErr != nil || pid <= 0 {
		return
	}

	if proc, findErr := os.FindProcess(pid); findErr == nil {
		// Best-effort: the process may already be gone.
		_ = proc.Kill()
	}
}
+34 -2
View File
@@ -4,9 +4,41 @@ package process
import ( import (
"os/exec" "os/exec"
"syscall"
) )
// setProcAttributes sets platform-specific process attributes // setProcAttributes starts the upstream in its own process group (Setpgid) so
// the entire process tree can be signalled at once via its negative PID. This
// is what lets us reap a forked grandchild — e.g. a shell wrapper that
// backgrounds the real binary and exits — instead of leaking it as an orphan
// that holds the inherited stdout/stderr pipes open.
func setProcAttributes(cmd *exec.Cmd) { func setProcAttributes(cmd *exec.Cmd) {
// No-op on Unix systems cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
}
// terminateProcessTree is the graceful half of teardown: it delegates to
// signalProcessTree with SIGTERM so that the command's process group gets a
// chance to shut down cleanly. It returns whatever signalProcessTree returns.
func terminateProcessTree(cmd *exec.Cmd) error {
	const gracefulSig = syscall.SIGTERM
	return signalProcessTree(cmd, gracefulSig)
}
// killProcessTree is the forced half of teardown: it delegates to
// signalProcessTree with SIGKILL so that the command's process group is
// terminated without a chance to handle the signal. It returns whatever
// signalProcessTree returns.
func killProcessTree(cmd *exec.Cmd) error {
	const forceSig = syscall.SIGKILL
	return signalProcessTree(cmd, forceSig)
}
// signalProcessTree signals the process group led by cmd.Process. Because the
// child was started with Setpgid it is its own group leader (pgid == pid), so
// targeting -pid reaches the child and every descendant still in the group.
// Falls back to signalling just the child if the group send fails (e.g. the
// group has already drained), so we never silently skip the signal.
func signalProcessTree(cmd *exec.Cmd, sig syscall.Signal) error {
if cmd == nil || cmd.Process == nil {
return nil
}
if err := syscall.Kill(-cmd.Process.Pid, sig); err != nil {
return cmd.Process.Signal(sig)
}
return nil
} }
+20
View File
@@ -14,3 +14,23 @@ func setProcAttributes(cmd *exec.Cmd) {
CreationFlags: 0x08000000, // CREATE_NO_WINDOW CreationFlags: 0x08000000, // CREATE_NO_WINDOW
} }
} }
// terminateProcessTree asks the upstream process to stop by sending SIGTERM to
// the launched process only. There is no process-group signalling on this
// platform in this file; per the original note, tree teardown on Windows is
// expected to come from the configured CmdStop (said to default to
// `taskkill /f /t`; that default is not visible here), so this keeps the
// previous single-process SIGTERM behaviour.
//
// A nil cmd, or a cmd whose process was never started, is a no-op that
// returns nil.
//
// NOTE(review): os.Process.Signal documents limited support on Windows —
// confirm SIGTERM is actually delivered here rather than returning an error.
func terminateProcessTree(cmd *exec.Cmd) error {
	if cmd != nil && cmd.Process != nil {
		return cmd.Process.Signal(syscall.SIGTERM)
	}
	return nil
}
// killProcessTree force-terminates the launched process via Process.Kill. Only
// that one process is targeted; per the original note, descendant teardown on
// Windows relies on CmdStop (taskkill /t).
//
// A nil cmd, or a cmd whose process was never started, is a no-op that
// returns nil.
func killProcessTree(cmd *exec.Cmd) error {
	if cmd != nil && cmd.Process != nil {
		return cmd.Process.Kill()
	}
	return nil
}
+16 -12
View File
@@ -745,24 +745,28 @@ func (b *baseRouter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}() }()
} }
// finishLoading stops the loading stream and fences its goroutine off from
// the ResponseWriter before the real handler (or ServeHTTP's return)
// reclaims it. release() must run even when waitForCompletion times out:
// otherwise a still-streaming goroutine flushes a finalized response and
// panics on the recycled *bufio.Writer.
finishLoading := func() {
cancelLoad()
if lw != nil {
lw.waitForCompletion(1 * time.Second)
lw.release()
}
}
var resp handlerResp var resp handlerResp
select { select {
case resp = <-hr.respond: case resp = <-hr.respond:
cancelLoad() finishLoading()
if lw != nil {
lw.waitForCompletion(1 * time.Second)
}
case <-req.Context().Done(): case <-req.Context().Done():
cancelLoad() finishLoading()
if lw != nil {
lw.waitForCompletion(1 * time.Second)
}
return return
case <-b.shutdownCtx.Done(): case <-b.shutdownCtx.Done():
cancelLoad() finishLoading()
if lw != nil {
lw.waitForCompletion(1 * time.Second)
}
SendError(w, req, fmt.Errorf("%s is shutting down", b.name)) SendError(w, req, fmt.Errorf("%s is shutting down", b.name))
return return
} }
+31 -3
View File
@@ -38,6 +38,13 @@ type loadingWriter struct {
pendingMu sync.Mutex pendingMu sync.Mutex
pendingUpdate string pendingUpdate string
// writeMu serializes writes to the underlying writer and guards released.
// Once released is set, the streaming goroutine must not touch the writer
// again — ServeHTTP has reclaimed it (to run the real handler or to return)
// and writing/flushing a finalized response panics.
writeMu sync.Mutex
released bool
// closed by start when the goroutine finishes (after cleanup messages) // closed by start when the goroutine finishes (after cleanup messages)
done chan struct{} done chan struct{}
@@ -217,12 +224,33 @@ func (s *loadingWriter) sendData(data string) {
return return
} }
_, err = fmt.Fprintf(s.writer, "data: %s\n\n", jsonData) s.writeMu.Lock()
if err != nil { defer s.writeMu.Unlock()
// Once ServeHTTP has reclaimed the writer (release), writing/flushing it
// races the real handler or panics on a finalized response. Stop here.
if s.released {
return
}
if _, err = fmt.Fprintf(s.writer, "data: %s\n\n", jsonData); err != nil {
s.logger.Debugf("<%s> Failed to write SSE data (client likely disconnected): %v", s.modelName, err) s.logger.Debugf("<%s> Failed to write SSE data (client likely disconnected): %v", s.modelName, err)
return return
} }
s.Flush() if flusher, ok := s.writer.(http.Flusher); ok {
flusher.Flush()
}
}
// release fences the loadingWriter off from the underlying ResponseWriter.
// After it returns, the streaming goroutine will not write to or flush the
// writer again: any in-flight write completes under writeMu first, and later
// writes short-circuit on released. The caller can then safely hand the writer
// to the real handler or let ServeHTTP return without racing a finalized
// response (a use-after-return Flush panics on the recycled *bufio.Writer).
func (s *loadingWriter) release() {
s.writeMu.Lock()
s.released = true
s.writeMu.Unlock()
} }
func (s *loadingWriter) Header() http.Header { func (s *loadingWriter) Header() http.Header {
+3 -1
View File
@@ -45,7 +45,9 @@ func CreateConcurrencyMiddleware(cfg config.Config) chain.Middleware {
return return
} }
if !sem.TryAcquire(1) { if !sem.TryAcquire(1) {
http.Error(w, "Too many requests", http.StatusTooManyRequests) w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusTooManyRequests)
w.Write([]byte(`{"error":"Too many requests"}`))
return return
} }
defer sem.Release(1) defer sem.Release(1)
-1
View File
@@ -1 +0,0 @@
placeholder so //go:embed ui_dist succeeds before the UI is built
@@ -0,0 +1,632 @@
<script lang="ts">
import { models } from "../../stores/api";
import { persistentStore } from "../../stores/persistent";
import { streamChatCompletion } from "../../lib/chatApi";
// Lifecycle of one request, rendered as its status badge.
type Status = "waiting" | "streaming" | "done" | "error";
// Timing bucket a request is currently accruing elapsed time into.
type Phase = "waiting" | "loading" | "reasoning" | "content";
// Live state of one request; stored in `runs` keyed by TestEntry.id.
type RunState = {
status: Status;
// Text of the loader block split out of reasoning_content by ingestReasoning.
loadingText: string;
// Reasoning text that is not part of the loader block.
reasoningContent: string;
// Concatenation of every `content` chunk received so far.
content: string;
// True once ingestReasoning has decided where (or whether) the loader block ends.
loadingDone: boolean;
// Milliseconds accrued while `phase` was the matching value (see accrue in runOne).
waitingMs: number;
loadingMs: number;
reasoningMs: number;
contentMs: number;
phase: Phase;
// Milliseconds since runOne started this request.
elapsedMs: number;
// Only set when status is "error".
error?: string;
};
// One queued request: a unique id plus the model it targets.
type TestEntry = { id: string; model: string };
// Marker that ingestReasoning looks for at the start and end of the loader block.
const LOAD_MARKER = "━━━━━";
// Values restored by resetDefaults.
const DEFAULT_PROMPT = "Write a few sentences about the history of computing.";
const DEFAULT_MAX_TOKENS = 256;
// Persisted settings and queue (persistentStore semantics live in stores/persistent).
const promptStore = persistentStore<string>("concurrency-prompt", DEFAULT_PROMPT);
const maxTokensStore = persistentStore<number>("concurrency-max-tokens", DEFAULT_MAX_TOKENS);
const testListStore = persistentStore<TestEntry[]>("concurrency-test-list", []);
// Per-entry run state keyed by TestEntry.id; replaced wholesale at the start of run().
let runs = $state<Record<string, RunState>>({});
let isRunning = $state(false);
// Non-null only while run() is in flight; stop() aborts through it.
let abortController: AbortController | null = null;
// Index of the card being dragged and of the card currently hovered, or null.
let dragIndex = $state<number | null>(null);
let dragOverIndex = $state<number | null>(null);
const timelineCollapsedStore = persistentStore<boolean>("concurrency-timeline-collapsed", false);
// Width of the timeline axis in ms: the longest run, floored at 100 so the
// divisions by timelineMaxMs in the template never hit zero.
let timelineMaxMs = $derived(Math.max(100, ...Object.values(runs).map((r) => r.elapsedMs)));
// Models offered in the picker: everything not flagged `unlisted`.
let availableModels = $derived($models.filter((m) => !m.unlisted));
let hasModels = $derived(availableModels.length > 0);
// Go is enabled only when idle, with at least one queued entry and a non-blank prompt.
let canRun = $derived(!isRunning && $testListStore.length > 0 && $promptStore.trim() !== "");
// Produces a unique id for a queued entry. Uses crypto.randomUUID() when the
// runtime exposes it; otherwise joins the current timestamp with a random
// base-36 suffix.
function newId(): string {
  const hasNativeUuid = typeof crypto !== "undefined" && "randomUUID" in crypto;
  if (!hasNativeUuid) {
    const stamp = Date.now();
    const suffix = Math.random().toString(36).slice(2);
    return `${stamp}-${suffix}`;
  }
  return crypto.randomUUID();
}
// Appends a new queue entry for `modelId`. The queue is frozen while a run is
// in progress, so the call is ignored then.
function addModel(modelId: string) {
  if (!isRunning) {
    testListStore.update((current) => current.concat({ id: newId(), model: modelId }));
  }
}
// Drops the entry with the given id from the queue and discards its run
// state. Ignored while a run is in progress.
function removeEntry(id: string) {
  if (isRunning) return;
  testListStore.update((current) => current.filter((entry) => entry.id !== id));
  // Rebuild `runs` without the removed key so the state is reassigned, not mutated.
  const { [id]: _removed, ...remaining } = runs;
  runs = remaining;
}
// Empties the queue and all run state. Ignored while a run is in progress.
function clearAll() {
  if (!isRunning) {
    testListStore.set([]);
    runs = {};
  }
}
// Begins dragging the card at index `i`. Records the source index and, when
// the event carries a dataTransfer, marks the operation as a move.
function onDragStart(i: number, e: DragEvent) {
  if (isRunning) return;
  dragIndex = i;
  const transfer = e.dataTransfer;
  if (!transfer) return;
  transfer.effectAllowed = "move";
  transfer.setData("text/plain", String(i));
}
// Marks the card at index `i` as the current drop target. Only acts when a
// drag started by onDragStart is active and no run is in progress.
function onDragOver(i: number, e: DragEvent) {
  const dragging = !isRunning && dragIndex !== null;
  if (!dragging) return;
  // preventDefault is what allows the element to accept the drop.
  e.preventDefault();
  const transfer = e.dataTransfer;
  if (transfer) {
    transfer.dropEffect = "move";
  }
  dragOverIndex = i;
}
// Completes a drag: moves the dragged entry to index `i` in the queue and
// clears the drag indices. Dropping a card onto itself leaves the queue as is.
function onDrop(i: number, e: DragEvent) {
  if (isRunning || dragIndex === null) return;
  e.preventDefault();
  const source = dragIndex;
  const target = i;
  dragIndex = null;
  dragOverIndex = null;
  if (source !== target) {
    testListStore.update((current) => {
      const reordered = current.slice();
      const [moved] = reordered.splice(source, 1);
      reordered.splice(target, 0, moved);
      return reordered;
    });
  }
}
// Clears both drag indices when a drag finishes, whether or not it was dropped.
function onDragEnd() {
  dragIndex = dragOverIndex = null;
}
// Returns a fresh RunState: waiting status and phase, no text, all timers zero.
function emptyRun(): RunState {
  const fresh: RunState = {
    status: "waiting",
    phase: "waiting",
    loadingDone: false,
    loadingText: "",
    reasoningContent: "",
    content: "",
    waitingMs: 0,
    loadingMs: 0,
    reasoningMs: 0,
    contentMs: 0,
    elapsedMs: 0,
  };
  return fresh;
}
// Separates the llama-swap loading block (delimited by LOAD_MARKER and
// delivered as reasoning_content) from the model's own reasoning tokens.
//
// `prev` supplies the text buffered so far and `chunk` is the newly received
// reasoning_content. The result carries the updated loadingText and
// reasoningContent, whether the loading block is resolved, and the phase the
// run should be in after this chunk.
function ingestReasoning(
  prev: RunState,
  chunk: string
): { loadingText: string; reasoningContent: string; loadingDone: boolean; nowPhase: Phase } {
  type Parsed = { loadingText: string; reasoningContent: string; loadingDone: boolean; nowPhase: Phase };

  // The loading block was already resolved: everything is model reasoning.
  if (prev.loadingDone) {
    return {
      loadingText: prev.loadingText,
      reasoningContent: prev.reasoningContent + chunk,
      loadingDone: true,
      nowPhase: "reasoning",
    };
  }

  const buffered = prev.loadingText + chunk;

  // Still undecided or still inside the block: keep everything in loadingText.
  const keepBuffering = (): Parsed => ({
    loadingText: buffered,
    reasoningContent: prev.reasoningContent,
    loadingDone: false,
    nowPhase: "loading",
  });
  // No loading block present: hand the whole buffer over as reasoning.
  const treatAsReasoning = (): Parsed => ({
    loadingText: "",
    reasoningContent: prev.reasoningContent + buffered,
    loadingDone: true,
    nowPhase: "reasoning",
  });

  // Fewer characters than the marker: decide only if the prefix already mismatches.
  if (buffered.length < LOAD_MARKER.length) {
    return LOAD_MARKER.startsWith(buffered) ? keepBuffering() : treatAsReasoning();
  }
  if (!buffered.startsWith(LOAD_MARKER)) return treatAsReasoning();

  // Inside a loading block: search for the closing marker after the opening one.
  const closeAt = buffered.indexOf(LOAD_MARKER, LOAD_MARKER.length);
  if (closeAt < 0) return keepBuffering();

  // The block runs through the end of the closing marker's line.
  const eol = buffered.indexOf("\n", closeAt);
  const cut = eol < 0 ? buffered.length : eol + 1;
  // Strip the trailing " \n" the loader sends after the closing marker.
  const tail = buffered.slice(cut).replace(/^[ \t]*\n?/, "");
  return {
    loadingText: buffered.slice(0, cut),
    reasoningContent: prev.reasoningContent + tail,
    loadingDone: true,
    nowPhase: tail === "" ? "waiting" : "reasoning",
  };
}
// Streams one chat completion for `entry` and keeps runs[entry.id] up to date
// with the received text, the current phase and per-phase timings. `signal`
// is passed to streamChatCompletion. Errors thrown by the stream are caught
// and recorded on the run as status "error".
async function runOne(entry: TestEntry, signal: AbortSignal) {
const start = performance.now();
// Timestamp of the last accrual; shared by the ticker and the stream loop.
let phaseStart = start;
runs[entry.id] = { ...emptyRun(), status: "streaming" };
// Returns the four phase counters with the time elapsed since phaseStart
// added to the counter that matches prev.phase. Does not move phaseStart;
// callers in the ticker and stream loop do that themselves.
const accrue = (
prev: RunState,
now: number
): { waitingMs: number; loadingMs: number; reasoningMs: number; contentMs: number } => {
const delta = now - phaseStart;
const base = {
waitingMs: prev.waitingMs,
loadingMs: prev.loadingMs,
reasoningMs: prev.reasoningMs,
contentMs: prev.contentMs,
};
if (prev.phase === "waiting") return { ...base, waitingMs: base.waitingMs + delta };
if (prev.phase === "loading") return { ...base, loadingMs: base.loadingMs + delta };
if (prev.phase === "reasoning") return { ...base, reasoningMs: base.reasoningMs + delta };
if (prev.phase === "content") return { ...base, contentMs: base.contentMs + delta };
return base;
};
// Every 50 ms, while the run is still streaming, accrue time and refresh
// elapsedMs so the timers advance between chunks.
const ticker = window.setInterval(() => {
const prev = runs[entry.id];
if (!prev || prev.status !== "streaming") return;
const now = performance.now();
const accrued = accrue(prev, now);
phaseStart = now;
runs[entry.id] = { ...prev, ...accrued, elapsedMs: now - start };
}, 50);
try {
const stream = streamChatCompletion(entry.model, [{ role: "user", content: $promptStore }], signal, {
endpoint: "v1/chat/completions",
max_tokens: $maxTokensStore,
});
for await (const chunk of stream) {
if (chunk.done) break;
const prev = runs[entry.id];
// The run state is gone: stop consuming the stream.
if (!prev) break;
const now = performance.now();
// Charge the time since the last accrual to the phase we were in BEFORE
// this chunk, then decide the phase this chunk moves us into.
const accrued = accrue(prev, now);
phaseStart = now;
let nextPhase: Phase = prev.phase;
let loadingText = prev.loadingText;
let reasoningContent = prev.reasoningContent;
let loadingDone = prev.loadingDone;
if (chunk.reasoning_content) {
const parsed = ingestReasoning(prev, chunk.reasoning_content);
loadingText = parsed.loadingText;
reasoningContent = parsed.reasoningContent;
loadingDone = parsed.loadingDone;
nextPhase = parsed.nowPhase;
}
// Content takes precedence when a chunk carries both fields.
if (chunk.content) nextPhase = "content";
runs[entry.id] = {
...prev,
...accrued,
loadingText,
reasoningContent,
content: prev.content + (chunk.content ?? ""),
loadingDone,
phase: nextPhase,
elapsedMs: now - start,
};
}
// Stream finished: accrue the final slice and mark the run done.
const prev = runs[entry.id];
if (prev) {
const now = performance.now();
const accrued = accrue(prev, now);
runs[entry.id] = { ...prev, ...accrued, status: "done", elapsedMs: now - start };
}
} catch (err) {
const prev = runs[entry.id] ?? emptyRun();
const now = performance.now();
const accrued = accrue(prev, now);
// An error named "AbortError" is reported as "aborted" rather than by its message.
const aborted = err instanceof Error && err.name === "AbortError";
runs[entry.id] = {
...prev,
...accrued,
status: "error",
elapsedMs: now - start,
error: aborted ? "aborted" : err instanceof Error ? err.message : String(err),
};
} finally {
// Always stop the ticker, whichever way the run ended.
window.clearInterval(ticker);
}
}
// Launches every queued entry concurrently and waits for all of them to
// settle. Resets `runs` to a fresh state per entry first, and always clears
// isRunning and abortController afterwards.
async function run() {
  if (!canRun) return;
  const entries = $testListStore;
  runs = Object.fromEntries(entries.map((entry) => [entry.id, emptyRun()]));
  isRunning = true;
  const controller = new AbortController();
  abortController = controller;
  try {
    // All requests share one signal so stop() cancels them together.
    const pending = entries.map((entry) => runOne(entry, controller.signal));
    await Promise.allSettled(pending);
  } finally {
    isRunning = false;
    abortController = null;
  }
}
// Aborts the in-flight run, if there is one.
function stop() {
  if (abortController) {
    abortController.abort();
  }
}
// CSS classes for the "waiting" timeline segment: red when the run failed
// while waiting, otherwise the neutral waiting colour.
function waitingBarClass(run: RunState): string {
  const failedHere = run.status === "error" && run.phase === "waiting";
  return failedHere ? "bg-red-500" : "bg-slate-200 dark:bg-white/10";
}
// CSS classes for the "loading" timeline segment: red when the run failed
// while loading, otherwise the neutral loading colour.
function loadingBarClass(run: RunState): string {
  const failedHere = run.status === "error" && run.phase === "loading";
  return failedHere ? "bg-red-500" : "bg-slate-400 dark:bg-slate-500";
}
// CSS classes for the "reasoning" timeline segment: red when the run failed
// while reasoning, otherwise purple.
function reasoningBarClass(run: RunState): string {
  const failedHere = run.status === "error" && run.phase === "reasoning";
  return failedHere ? "bg-red-500" : "bg-purple-500";
}
// CSS classes for the "content" timeline segment: red when the run failed
// while streaming content, green once the run is done, amber otherwise.
function contentBarClass(run: RunState): string {
  const inProgress = "bg-amber-400 dark:bg-amber-500";
  switch (run.status) {
    case "error":
      return run.phase === "content" ? "bg-red-500" : inProgress;
    case "done":
      return "bg-green-500";
    default:
      return inProgress;
  }
}
// Picks the spacing in ms between timeline ticks for an axis that spans
// `maxMs`. Each tier is [inclusive upper bound of maxMs, step]; anything above
// the last tier uses 30 s steps.
function niceStepMs(maxMs: number): number {
  const tiers: Array<[number, number]> = [
    [500, 100],
    [2000, 500],
    [5000, 1000],
    [20000, 5000],
    [60000, 10000],
  ];
  for (const [ceiling, step] of tiers) {
    if (maxMs <= ceiling) return step;
  }
  return 30000;
}
// Label for a timeline tick: the bare millisecond count below one second,
// otherwise seconds with one decimal unless the value is a whole second.
function formatTickMs(ms: number): string {
  const subSecond = ms < 1000;
  if (subSecond) return String(ms);
  const decimals = ms % 1000 === 0 ? 0 : 1;
  const seconds = (ms / 1000).toFixed(decimals);
  return `${seconds}s`;
}
// Tick positions in ms for the timeline axis: 0, step, 2*step, ... up to and
// including timelineMaxMs, where the step comes from niceStepMs.
let timelineTicks = $derived.by(() => {
  const interval = niceStepMs(timelineMaxMs);
  const marks: number[] = [];
  let at = 0;
  while (at <= timelineMaxMs) {
    marks.push(at);
    at += interval;
  }
  return marks;
});
// CSS classes for the status badge, one colour scheme per Status value.
function statusBadgeClass(status: Status): string {
  const palette: Record<Status, string> = {
    waiting: "bg-gray-200 text-gray-700 dark:bg-gray-700 dark:text-gray-200",
    streaming: "bg-amber-200 text-amber-900 dark:bg-amber-500/30 dark:text-amber-200",
    done: "bg-green-200 text-green-900 dark:bg-green-500/30 dark:text-green-200",
    error: "bg-red-200 text-red-900 dark:bg-red-500/30 dark:text-red-200",
  };
  return palette[status];
}
// Formats a duration for display: whole milliseconds below one second
// ("850ms"), otherwise seconds with two decimals ("1.50s").
//
// The threshold is checked on the rounded value. Durations come from
// performance.now() and are fractional, so checking the raw value let
// anything in [999.5, 1000) render as "1000ms" instead of "1.00s".
function formatElapsed(ms: number): string {
  const wholeMs = Math.round(ms);
  if (wholeMs < 1000) return `${wholeMs}ms`;
  return `${(ms / 1000).toFixed(2)}s`;
}
// Restores the prompt and max_tokens settings to their defaults.
// Ignored while a run is in progress, matching addModel, removeEntry and
// clearAll, so the handler enforces the same rule as the disabled button.
function resetDefaults() {
  if (isRunning) return;
  promptStore.set(DEFAULT_PROMPT);
  maxTokensStore.set(DEFAULT_MAX_TOKENS);
}
</script>
<div class="flex flex-col md:flex-row gap-4 h-full min-h-0">
<!-- Left column: run controls, model picker, settings -->
<div class="md:w-72 shrink-0 flex flex-col gap-3 min-h-0">
<!-- Run controls -->
<div class="flex items-center gap-2">
{#if isRunning}
<button class="btn bg-red-500 hover:bg-red-600 text-white border-red-500" onclick={stop}>
<span class="inline-block w-3 h-3 bg-white align-middle mr-2"></span>Stop
</button>
{:else}
<button
class="btn bg-primary text-btn-primary-text hover:opacity-90"
onclick={run}
disabled={!canRun}
title={$testListStore.length === 0 ? "Add models from the list below" : "Run concurrent requests"}
>
<span class="inline-block align-middle mr-2" aria-hidden="true"></span>Go
</button>
{/if}
<button class="btn btn--sm" onclick={clearAll} disabled={isRunning || $testListStore.length === 0}>
Clear ({$testListStore.length})
</button>
</div>
<!-- Available models -->
<div class="flex flex-col min-h-0 flex-1">
<div class="text-xs font-medium text-txtsecondary mb-1">
Models <span class="text-[10px] font-normal">— click to queue (add the same model more than once to test parallel requests)</span>
</div>
<div class="flex-1 border border-gray-200 dark:border-white/10 rounded overflow-y-auto min-h-0">
{#if !hasModels}
<div class="p-3 text-sm text-txtsecondary text-center">No models configured.</div>
{:else}
<ul class="divide-y divide-gray-100 dark:divide-white/5">
{#each availableModels as m (m.id)}
<li>
<button
class="w-full text-left px-2 py-1.5 text-sm hover:bg-secondary-hover transition-colors disabled:opacity-50 disabled:cursor-not-allowed flex items-center gap-2"
onclick={() => addModel(m.id)}
disabled={isRunning}
title="Add {m.id}"
>
<span class="text-primary" aria-hidden="true">+</span>
<span class="truncate flex-1">{m.id}</span>
</button>
</li>
{/each}
</ul>
{/if}
</div>
</div>
<!-- Settings -->
<div class="flex flex-col gap-2 border-t border-gray-200 dark:border-white/10 pt-3">
<div class="flex items-center justify-between">
<label for="concurrency-prompt" class="text-xs font-medium text-txtsecondary">Prompt</label>
<button
class="text-[10px] text-txtsecondary hover:text-txtmain underline"
onclick={resetDefaults}
disabled={isRunning}
>
reset defaults
</button>
</div>
<textarea
id="concurrency-prompt"
class="w-full px-2 py-1.5 text-sm rounded border border-gray-200 dark:border-white/10 bg-surface focus:outline-none focus:ring-2 focus:ring-primary resize-none"
rows="3"
bind:value={$promptStore}
disabled={isRunning}
></textarea>
<label for="concurrency-max-tokens" class="text-xs font-medium text-txtsecondary">max_tokens</label>
<input
id="concurrency-max-tokens"
type="number"
min="1"
class="w-full px-2 py-1.5 text-sm rounded border border-gray-200 dark:border-white/10 bg-surface focus:outline-none focus:ring-2 focus:ring-primary"
bind:value={$maxTokensStore}
disabled={isRunning}
/>
</div>
</div>
<!-- Right column: result panels (draggable to reorder) -->
<div class="flex-1 min-w-0 min-h-0 overflow-y-auto">
{#if $testListStore.length === 0}
<div class="h-full flex items-center justify-center px-6">
<div class="max-w-md text-sm text-txtsecondary space-y-4">
<h4 class="text-base font-semibold text-txtmain pb-0">Load Test</h4>
<p>
Fire several streaming chat completions at llama-swap at the same time to see how it handles parallel
loading and concurrent inference. Each request streams into its own panel with a live timer and status.
</p>
<ol class="list-decimal list-inside space-y-1">
<li>Click models on the left to queue them — repeat a model to hit it with parallel requests.</li>
<li>Tweak the prompt and <code>max_tokens</code> if you want.</li>
<li>Press <span class="font-semibold text-txtmain">Go</span> to launch them concurrently.</li>
</ol>
<p class="text-xs">Tip: drag a result card's header to reorder, or hit × to drop it.</p>
</div>
</div>
{:else}
<!-- Gantt-style timeline -->
<div class="mb-3 border border-gray-200 dark:border-white/10 rounded">
<button
class="w-full flex items-center gap-2 px-2 py-1.5 text-xs font-medium text-txtsecondary hover:bg-secondary-hover transition-colors {$timelineCollapsedStore ? 'rounded' : 'rounded-t border-b border-gray-200 dark:border-white/10'}"
onclick={() => timelineCollapsedStore.update((v) => !v)}
aria-expanded={!$timelineCollapsedStore}
>
<svg
class="w-4 h-4 transition-transform {$timelineCollapsedStore ? '-rotate-90' : ''}"
fill="none"
stroke="currentColor"
viewBox="0 0 24 24"
aria-hidden="true"
>
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M19 9l-7 7-7-7"></path>
</svg>
<span>Timeline</span>
{#if !$timelineCollapsedStore}
<span class="flex items-center gap-3 text-[10px] text-txtsecondary font-normal ml-3" aria-hidden="true">
<span class="flex items-center gap-1"><span class="inline-block w-2.5 h-2.5 rounded-sm bg-slate-200 dark:bg-white/10 border border-gray-300 dark:border-white/10"></span>waiting</span>
<span class="flex items-center gap-1"><span class="inline-block w-2.5 h-2.5 rounded-sm bg-slate-400 dark:bg-slate-500"></span>loading</span>
<span class="flex items-center gap-1"><span class="inline-block w-2.5 h-2.5 rounded-sm bg-purple-500"></span>reasoning</span>
<span class="flex items-center gap-1"><span class="inline-block w-2.5 h-2.5 rounded-sm bg-amber-400 dark:bg-amber-500"></span>streaming</span>
<span class="flex items-center gap-1"><span class="inline-block w-2.5 h-2.5 rounded-sm bg-green-500"></span>done</span>
<span class="flex items-center gap-1"><span class="inline-block w-2.5 h-2.5 rounded-sm bg-red-500"></span>error</span>
</span>
{/if}
<span class="ml-auto tabular-nums text-txtsecondary">
max {formatElapsed(timelineMaxMs)} · {$testListStore.length} request{$testListStore.length === 1 ? "" : "s"}
</span>
</button>
{#if !$timelineCollapsedStore}
<div class="px-2 py-2">
<!-- X axis ticks -->
<div class="flex" aria-hidden="true">
<div class="w-40 shrink-0"></div>
<div class="relative flex-1 h-4 border-b border-gray-200 dark:border-white/10">
{#each timelineTicks as t (t)}
<div
class="absolute top-0 bottom-0 border-l border-gray-200 dark:border-white/10"
style="left: {(t / timelineMaxMs) * 100}%;"
>
<span class="absolute -top-0.5 left-1 text-[10px] text-txtsecondary tabular-nums">{formatTickMs(t)}</span>
</div>
{/each}
</div>
<div class="w-16 shrink-0"></div>
</div>
<!-- Bars -->
<div class="flex flex-col gap-1 mt-1">
{#each $testListStore as entry, i (entry.id)}
{@const run = runs[entry.id]}
{@const waitingPct = run ? (run.waitingMs / timelineMaxMs) * 100 : 0}
{@const loadingPct = run ? (run.loadingMs / timelineMaxMs) * 100 : 0}
{@const reasoningPct = run ? (run.reasoningMs / timelineMaxMs) * 100 : 0}
{@const contentPct = run ? (run.contentMs / timelineMaxMs) * 100 : 0}
<div class="flex items-center text-xs">
<div class="w-40 shrink-0 flex items-center gap-1 pr-2 text-txtsecondary">
<span class="tabular-nums w-5 text-right">{i + 1}.</span>
<span class="truncate" title={entry.model}>{entry.model}</span>
</div>
<div class="relative flex-1 h-4">
{#each timelineTicks as t (t)}
<div
class="absolute top-0 bottom-0 border-l border-gray-100 dark:border-white/5"
style="left: {(t / timelineMaxMs) * 100}%;"
aria-hidden="true"
></div>
{/each}
{#if run && run.waitingMs > 0}
<div
class="absolute top-0.5 bottom-0.5 rounded-l-sm transition-all {waitingBarClass(run)}"
style="left: 0; width: {waitingPct}%;"
title="waiting {formatElapsed(run.waitingMs)}"
></div>
{/if}
{#if run && run.loadingMs > 0}
<div
class="absolute top-0.5 bottom-0.5 transition-all {loadingBarClass(run)} {run.waitingMs === 0 ? 'rounded-l-sm' : ''}"
style="left: {waitingPct}%; width: {loadingPct}%;"
title="loading {formatElapsed(run.loadingMs)}"
></div>
{/if}
{#if run && run.reasoningMs > 0}
<div
class="absolute top-0.5 bottom-0.5 transition-all {reasoningBarClass(run)} {run.waitingMs === 0 && run.loadingMs === 0 ? 'rounded-l-sm' : ''}"
style="left: {waitingPct + loadingPct}%; width: {reasoningPct}%;"
title="reasoning {formatElapsed(run.reasoningMs)}"
></div>
{/if}
{#if run && run.contentMs > 0}
<div
class="absolute top-0.5 bottom-0.5 transition-all {contentBarClass(run)} {run.waitingMs === 0 && run.loadingMs === 0 && run.reasoningMs === 0 ? 'rounded-l-sm' : ''} {run.status === 'done' || run.status === 'error' ? 'rounded-r-sm' : ''}"
style="left: {waitingPct + loadingPct + reasoningPct}%; width: {contentPct}%;"
title="content {formatElapsed(run.contentMs)}"
></div>
{/if}
</div>
<div class="w-16 shrink-0 pl-2 tabular-nums text-txtsecondary text-right">
{run ? formatElapsed(run.elapsedMs) : "—"}
</div>
</div>
{/each}
</div>
</div>
{/if}
</div>
<div class="grid grid-cols-1 lg:grid-cols-2 xl:grid-cols-3 gap-3" role="list">
{#each $testListStore as entry, i (entry.id)}
{@const run = runs[entry.id]}
{@const status = run?.status ?? "waiting"}
<div
class="border rounded flex flex-col min-h-0 transition-colors {dragOverIndex === i && dragIndex !== i
? 'border-primary ring-2 ring-primary/40'
: 'border-gray-200 dark:border-white/10'} {dragIndex === i ? 'opacity-40' : ''}"
style="height: 280px;"
role="listitem"
ondragover={(e) => onDragOver(i, e)}
ondrop={(e) => onDrop(i, e)}
>
<div
class="shrink-0 flex items-center gap-2 px-2 py-1.5 border-b border-gray-200 dark:border-white/10 bg-secondary/40 rounded-t"
draggable={!isRunning}
role="button"
tabindex="-1"
aria-label="Drag to reorder {entry.model}"
ondragstart={(e) => onDragStart(i, e)}
ondragend={onDragEnd}
class:cursor-grab={!isRunning}
title={isRunning ? "" : "Drag to reorder"}
>
<span class="text-txtsecondary select-none" aria-hidden="true">⋮⋮</span>
<span class="text-txtsecondary tabular-nums text-xs w-5 text-right">{i + 1}.</span>
<span class="flex-1 truncate text-sm font-medium" title={entry.model}>{entry.model}</span>
<span class="text-xs tabular-nums text-txtsecondary">
{run ? formatElapsed(run.elapsedMs) : "—"}
</span>
<span class="status text-[10px] {statusBadgeClass(status)}">{status}</span>
<button
class="w-5 h-5 flex items-center justify-center text-txtsecondary hover:text-red-500 transition-colors rounded disabled:opacity-30 disabled:cursor-not-allowed"
onclick={() => removeEntry(entry.id)}
disabled={isRunning}
aria-label="Remove"
tabindex="-1"
>
×
</button>
</div>
<div class="flex-1 min-h-0 overflow-y-auto font-mono text-xs px-2 py-1.5">
{#if run?.loadingText}
<div class="bg-secondary/40 dark:bg-white/5 text-txtsecondary rounded px-2 py-1 mb-2 whitespace-pre-wrap">{run.loadingText.trim()}</div>
{/if}
{#if run?.reasoningContent}
<div class="text-purple-700 dark:text-purple-300 whitespace-pre-wrap">{run.reasoningContent}</div>
{/if}
{#if run?.content}
<div class="whitespace-pre-wrap {run.reasoningContent ? 'mt-2' : ''}">{run.content}</div>
{/if}
{#if run?.status === "error" && run?.error}
<div class="text-red-500 mt-2">[error] {run.error}</div>
{/if}
</div>
</div>
{/each}
</div>
{/if}
</div>
</div>
+14 -4
View File
@@ -5,8 +5,9 @@
import AudioInterface from "../components/playground/AudioInterface.svelte"; import AudioInterface from "../components/playground/AudioInterface.svelte";
import SpeechInterface from "../components/playground/SpeechInterface.svelte"; import SpeechInterface from "../components/playground/SpeechInterface.svelte";
import RerankInterface from "../components/playground/RerankInterface.svelte"; import RerankInterface from "../components/playground/RerankInterface.svelte";
import ConcurrencyInterface from "../components/playground/ConcurrencyInterface.svelte";
type Tab = "chat" | "images" | "speech" | "audio" | "rerank"; type Tab = "chat" | "images" | "speech" | "audio" | "rerank" | "concurrency";
const selectedTabStore = persistentStore<Tab>("playground-selected-tab", "chat"); const selectedTabStore = persistentStore<Tab>("playground-selected-tab", "chat");
let mobileMenuOpen = $state(false); let mobileMenuOpen = $state(false);
@@ -17,6 +18,7 @@
{ id: "speech", label: "Speech" }, { id: "speech", label: "Speech" },
{ id: "audio", label: "Transcription" }, { id: "audio", label: "Transcription" },
{ id: "rerank", label: "Rerank" }, { id: "rerank", label: "Rerank" },
{ id: "concurrency", label: "Load Test" },
]; ];
function selectTab(tab: Tab) { function selectTab(tab: Tab) {
@@ -25,7 +27,7 @@
} }
function getTabLabel(tabId: Tab): string { function getTabLabel(tabId: Tab): string {
return tabs.find(t => t.id === tabId)?.label || ""; return tabs.find((t) => t.id === tabId)?.label || "";
} }
</script> </script>
@@ -49,10 +51,15 @@
</svg> </svg>
</button> </button>
{#if mobileMenuOpen} {#if mobileMenuOpen}
<div class="absolute top-full left-0 right-0 mt-1 bg-surface border border-gray-200 dark:border-white/10 rounded shadow-lg z-10"> <div
class="absolute top-full left-0 right-0 mt-1 bg-surface border border-gray-200 dark:border-white/10 rounded shadow-lg z-10"
>
{#each tabs as tab (tab.id)} {#each tabs as tab (tab.id)}
<button <button
class="w-full px-4 py-2 text-left hover:bg-secondary-hover transition-colors first:rounded-t last:rounded-b {$selectedTabStore === tab.id ? 'bg-primary/10 font-medium' : ''}" class="w-full px-4 py-2 text-left hover:bg-secondary-hover transition-colors first:rounded-t last:rounded-b {$selectedTabStore ===
tab.id
? 'bg-primary/10 font-medium'
: ''}"
onclick={() => selectTab(tab.id)} onclick={() => selectTab(tab.id)}
> >
{tab.label} {tab.label}
@@ -94,6 +101,9 @@
<div class="h-full" class:tab-hidden={$selectedTabStore !== "rerank"}> <div class="h-full" class:tab-hidden={$selectedTabStore !== "rerank"}>
<RerankInterface /> <RerankInterface />
</div> </div>
<div class="h-full" class:tab-hidden={$selectedTabStore !== "concurrency"}>
<ConcurrencyInterface />
</div>
</div> </div>
</div> </div>
+1 -1
View File
@@ -22,7 +22,7 @@ export default defineConfig({
], ],
base: "/ui/", base: "/ui/",
build: { build: {
outDir: "../proxy/ui_dist", outDir: "../internal/server/ui_dist",
assetsDir: "assets", assetsDir: "assets",
}, },
server: { server: {