Files
llama-swap/proxy/processgroup_test.go
T
Benson Wong c3f0d43e6e proxy: fix race conditions during swap (#667)
I pointed Opus 4.7 (high effort) at proxy.ProcessGroup to identify any
race conditions in the swapping code. It found a race condition where
there is a small window in the fast path for routing a request to a
loaded model. There is a very small window where:

- model M1 is loaded and ready for requests
- a request, R1, for M1 comes in 
- a request, R2, for M2 comes in almost immediately after
- R1 acquires the lock, sees M1 is loaded (fast path), releases the lock
`[race window]` and the request is ready to be forwarded
- the race window occurs between the release of the lock and the request
being forwarded
  - the lock is released so requests can be handled concurrently 
- R2 comes in within the `[race window]`, acquires the lock, triggers a
model swap to M2. stopping M1
- R1 is forwarded to a model that is unloaded or in the process of
shutting down creating an error response

In deployed systems the race window is very small and doesn't happen
often. However with #635 and PR #656 I though this deserved a bit more
attention. It is not concluded that this race is the cause of #635 but
the race is likely to happen more often under sustained or high load.

AI Note: Opus 4.7 x-high effort took about an hour to write the original
patch. With the pattern discovered the fix to matrix.go was very quick.
GLM 5.1 using the previous established patterns was able to easily write
the fix for ProcessGroup.StopProcesses().

Supersedes: #656
Updates: #277, #635
2026-04-17 21:23:17 -07:00

346 lines
11 KiB
Go

package proxy
import (
"bytes"
"net/http"
"net/http/httptest"
"runtime"
"sync"
"testing"
"time"
"github.com/mostlygeek/llama-swap/proxy/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var processGroupTestConfig = config.AddDefaultGroupToConfig(config.Config{
HealthCheckTimeout: 15,
Models: map[string]config.ModelConfig{
"model1": getTestSimpleResponderConfig("model1"),
"model2": getTestSimpleResponderConfig("model2"),
"model3": getTestSimpleResponderConfig("model3"),
"model4": getTestSimpleResponderConfig("model4"),
"model5": getTestSimpleResponderConfig("model5"),
},
Groups: map[string]config.GroupConfig{
"G1": {
Swap: true,
Exclusive: true,
Members: []string{"model1", "model2"},
},
"G2": {
Swap: false,
Exclusive: true,
Members: []string{"model3", "model4"},
},
},
})
func TestProcessGroup_DefaultHasCorrectModel(t *testing.T) {
pg := NewProcessGroup(config.DEFAULT_GROUP_ID, processGroupTestConfig, testLogger, testLogger)
assert.True(t, pg.HasMember("model5"))
}
func TestProcessGroup_HasMember(t *testing.T) {
pg := NewProcessGroup("G1", processGroupTestConfig, testLogger, testLogger)
assert.True(t, pg.HasMember("model1"))
assert.True(t, pg.HasMember("model2"))
assert.False(t, pg.HasMember("model3"))
}
// TestProcessGroup_ProxyRequestSwapIsTrueParallel tests that when swap is true
// and multiple requests are made in parallel, only one process is running at a time.
func TestProcessGroup_ProxyRequestSwapIsTrueParallel(t *testing.T) {
if testing.Short() {
t.Skip("skipping slow test")
}
var processGroupTestConfig = config.AddDefaultGroupToConfig(config.Config{
HealthCheckTimeout: 15,
Models: map[string]config.ModelConfig{
// use the same listening so if a model is already running, it will fail
// this is a way to test that swap isolation is working
// properly when there are parallel requests made at the
// same time.
"model1": getTestSimpleResponderConfigPort("model1", 9832),
"model2": getTestSimpleResponderConfigPort("model2", 9832),
"model3": getTestSimpleResponderConfigPort("model3", 9832),
"model4": getTestSimpleResponderConfigPort("model4", 9832),
"model5": getTestSimpleResponderConfigPort("model5", 9832),
},
Groups: map[string]config.GroupConfig{
"G1": {
Swap: true,
Members: []string{"model1", "model2", "model3", "model4", "model5"},
},
},
})
pg := NewProcessGroup("G1", processGroupTestConfig, testLogger, testLogger)
defer pg.StopProcesses(StopWaitForInflightRequest)
tests := []string{"model1", "model2", "model3", "model4", "model5"}
var wg sync.WaitGroup
wg.Add(len(tests))
for _, modelName := range tests {
go func(modelName string) {
defer wg.Done()
req := httptest.NewRequest("POST", "/v1/chat/completions", nil)
w := httptest.NewRecorder()
assert.NoError(t, pg.ProxyRequest(modelName, w, req))
assert.Equal(t, http.StatusOK, w.Code)
assert.Contains(t, w.Body.String(), modelName)
}(modelName)
}
wg.Wait()
}
// TestProcessGroup_ProxyRequestSwapRaceAgainstFastPath verifies that a swap
// request cannot stop the current process while a fast-path request (for the
// already-selected model) is in flight. Without ProcessGroup-level inflight
// tracking, a fast-path request that has released pg.Lock but has not yet
// incremented Process.inFlightRequests races with Stop()'s Wait() and the
// process is killed mid-request.
func TestProcessGroup_ProxyRequestSwapRaceAgainstFastPath(t *testing.T) {
cfg := config.AddDefaultGroupToConfig(config.Config{
HealthCheckTimeout: 15,
Models: map[string]config.ModelConfig{
"model1": getTestSimpleResponderConfig("model1"),
"model2": getTestSimpleResponderConfig("model2"),
},
Groups: map[string]config.GroupConfig{
"G1": {
Swap: true,
Members: []string{"model1", "model2"},
},
},
})
pg := NewProcessGroup("G1", cfg, testLogger, testLogger)
defer pg.StopProcesses(StopImmediately)
// Bypass real subprocesses so the test is fast and deterministic.
pg.processes["model1"].testHandler = newTestHandler("model1")
pg.processes["model2"].testHandler = newTestHandler("model2")
// Prime: run a request through model1 via the swap path so that
// lastUsedProcess == "model1" and subsequent model1 requests take the
// fast path.
primeReq := httptest.NewRequest("POST", "/v1/chat/completions", nil)
primeW := httptest.NewRecorder()
require.NoError(t, pg.ProxyRequest("model1", primeW, primeReq))
require.Equal(t, http.StatusOK, primeW.Code)
require.Equal(t, StateReady, pg.processes["model1"].CurrentState())
require.Equal(t, StateStopped, pg.processes["model2"].CurrentState())
// Fast-path hook: signal arrival at the race window, then wait for
// release. This parks R2 deterministically at the point where pg.Lock
// has been released but Process.inFlightRequests has not yet been
// incremented — the exact window the race exploits.
r2Reached := make(chan struct{})
r2Release := make(chan struct{})
pg.testDelayFastPath = func() {
close(r2Reached)
<-r2Release
}
// R2: fast-path request for model1. Will pause at the test hook.
r2Done := make(chan struct{})
w2 := httptest.NewRecorder()
go func() {
defer close(r2Done)
req := httptest.NewRequest("POST", "/v1/chat/completions", nil)
assert.NoError(t, pg.ProxyRequest("model1", w2, req))
}()
// Deterministically wait for R2 to reach the race window.
<-r2Reached
// R3: swap request for model2. Must wait for R2 to finish before touching
// model1, otherwise model1 gets killed mid-request.
r3Done := make(chan struct{})
w3 := httptest.NewRecorder()
go func() {
defer close(r3Done)
req := httptest.NewRequest("POST", "/v1/chat/completions", nil)
assert.NoError(t, pg.ProxyRequest("model2", w3, req))
}()
// Spin until R3 has acquired pg.Lock and entered the swap critical
// section. In the fixed code, R3 then blocks on pg.inflight.Wait() while
// still holding the lock, so TryLock keeps failing.
for pg.TryLock() {
pg.Unlock()
runtime.Gosched()
}
// Bounded poll: give R3 a chance to demonstrate the bug by mutating
// state. In the fixed code, R3 is blocked on pg.inflight.Wait() and
// nothing changes, so we wait the full window. In the buggy code, R3
// will Stop() model1 and start serving via model2 within microseconds —
// we exit early once the mutation is observable.
deadline := time.Now().Add(100 * time.Millisecond)
for time.Now().Before(deadline) {
if pg.processes["model1"].CurrentState() != StateReady ||
pg.processes["model2"].CurrentState() != StateStopped {
break
}
done := false
select {
case <-r3Done:
done = true
default:
}
if done {
break
}
runtime.Gosched()
}
// Invariant: R3 must be blocked while R2 is still in flight.
select {
case <-r3Done:
t.Fatal("swap completed while fast-path request was still in flight — race not prevented")
default:
}
assert.Equal(t, StateReady, pg.processes["model1"].CurrentState(),
"model1 must stay Ready while a fast-path request is in flight")
assert.Equal(t, StateStopped, pg.processes["model2"].CurrentState(),
"model2 must not be started until R2 finishes and model1 is swapped out")
// Release R2 and let both requests finish.
close(r2Release)
<-r2Done
<-r3Done
assert.Equal(t, http.StatusOK, w2.Code)
assert.Contains(t, w2.Body.String(), "model1")
assert.Equal(t, http.StatusOK, w3.Code)
assert.Contains(t, w3.Body.String(), "model2")
}
// TestProcessGroup_StopProcessesWaitsForInflight verifies that StopProcesses
// (called externally, e.g. from ProxyManager.swapProcessGroup) cannot stop a
// process while a fast-path ProxyRequest is in the [pg.Unlock,
// Process.inFlightRequests.Add(1)] window. Without pg.inflight.Wait() in
// StopProcesses, the external caller bypasses the inflight guard and kills the
// process mid-request.
func TestProcessGroup_StopProcessesWaitsForInflight(t *testing.T) {
cfg := config.AddDefaultGroupToConfig(config.Config{
HealthCheckTimeout: 15,
Models: map[string]config.ModelConfig{
"model1": getTestSimpleResponderConfig("model1"),
"model2": getTestSimpleResponderConfig("model2"),
},
Groups: map[string]config.GroupConfig{
"G1": {
Swap: true,
Members: []string{"model1", "model2"},
},
},
})
pg := NewProcessGroup("G1", cfg, testLogger, testLogger)
defer pg.StopProcesses(StopImmediately)
pg.processes["model1"].testHandler = newTestHandler("model1")
pg.processes["model2"].testHandler = newTestHandler("model2")
// Prime: model1 is active so subsequent model1 requests take the fast path.
primeReq := httptest.NewRequest("POST", "/v1/chat/completions", nil)
primeW := httptest.NewRecorder()
require.NoError(t, pg.ProxyRequest("model1", primeW, primeReq))
require.Equal(t, http.StatusOK, primeW.Code)
require.Equal(t, StateReady, pg.processes["model1"].CurrentState())
// Park a fast-path request at the race window.
r2Reached := make(chan struct{})
r2Release := make(chan struct{})
pg.testDelayFastPath = func() {
close(r2Reached)
<-r2Release
}
r2Done := make(chan struct{})
w2 := httptest.NewRecorder()
go func() {
defer close(r2Done)
req := httptest.NewRequest("POST", "/v1/chat/completions", nil)
assert.NoError(t, pg.ProxyRequest("model1", w2, req))
}()
<-r2Reached
// Simulate an external caller (e.g. ProxyManager.swapProcessGroup) stopping
// the group while a fast-path request is in flight.
r3Done := make(chan struct{})
go func() {
defer close(r3Done)
pg.StopProcesses(StopWaitForInflightRequest)
}()
// Spin until StopProcesses has acquired pg.Lock.
for pg.TryLock() {
pg.Unlock()
runtime.Gosched()
}
// Bounded poll: in the fixed code StopProcesses blocks on pg.inflight.Wait()
// and model1 stays Ready. In the buggy code it proceeds immediately and
// kills model1.
deadline := time.Now().Add(100 * time.Millisecond)
for time.Now().Before(deadline) {
if pg.processes["model1"].CurrentState() != StateReady {
break
}
select {
case <-r3Done:
goto done
default:
}
runtime.Gosched()
}
done:
select {
case <-r3Done:
t.Fatal("StopProcesses completed while a fast-path request was still in flight — race not prevented")
default:
}
assert.Equal(t, StateReady, pg.processes["model1"].CurrentState(),
"model1 must stay Ready while a fast-path request is in flight")
close(r2Release)
<-r2Done
<-r3Done
assert.Equal(t, http.StatusOK, w2.Code)
assert.Contains(t, w2.Body.String(), "model1")
}
func TestProcessGroup_ProxyRequestSwapIsFalse(t *testing.T) {
pg := NewProcessGroup("G2", processGroupTestConfig, testLogger, testLogger)
defer pg.StopProcesses(StopWaitForInflightRequest)
tests := []string{"model3", "model4"}
for _, modelName := range tests {
t.Run(modelName, func(t *testing.T) {
reqBody := `{"x", "y"}`
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
w := httptest.NewRecorder()
assert.NoError(t, pg.ProxyRequest(modelName, w, req))
assert.Equal(t, http.StatusOK, w.Code)
assert.Contains(t, w.Body.String(), modelName)
})
}
// make sure all the processes are running
for _, process := range pg.processes {
assert.Equal(t, StateReady, process.CurrentState())
}
}