Files
gadfly/cmd/gadfly/lens_concurrency_test.go
T
steve b860119332
Build & push image / build-and-push (pull_request) Successful in 11s
Adversarial Review (Gadfly) / review (pull_request) Successful in 25m56s
feat: re-platform agentic review onto executus + large-PR cost controls
Fixes the large-PR token burn: a ~250K-token diff was re-sent every agent
step across models × lenses × passes, draining a metered usage block in
minutes. Small PRs are untouched (every mitigation is size-gated / no-op
under threshold).

- Re-platform the in-process review path onto executus run.Executor: context
  compaction (executus/compact, threshold from the model's real context window
  via executus/model), run-bounding, a per-PR budget gate (Ports.Budget), and
  the wrap-up nudge re-expressed as a run.Critic. Lens fan-out now uses
  executus/fanout. gadfly keeps its own model.go, so GADFLY_ENDPOINT_<NAME>
  aliases and the claude-code engine are unaffected. No majordomo bump; the
  binary stays static (executus core is majordomo+stdlib only).
- Paginate get_diff (per-file `path` + start_line/limit) instead of dumping the
  whole diff; trim the recheck diff embed (60k -> 20k chars).
- entrypoint.sh: downshift the fleet above GADFLY_HUGE_DIFF_BYTES (one cheap
  model, fewer lenses/steps, no recheck) + a swarm-wide GADFLY_PR_BUDGET_SECS
  wall-clock backstop (adds procps for pkill). All advisory; CI never fails.
- README + CLAUDE.md + tests updated.

Note: run.Result exposes no transcript, so the old transcript-based forced-
finalization fallback is dropped; the wrap-up critic nudge is the remaining
"always emit something" mechanism.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 10:43:34 -04:00

190 lines
6.3 KiB
Go

package main
import (
"context"
"sync"
"testing"
"time"
llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
)
// trackingModel decorates an llm.Model so a test can observe how many Generate
// calls are in flight at once. Each Generate invokes onEnter, pauses briefly so
// overlapping callers are observable, and invokes onExit when it returns. The
// hooks and the pause sit OUTSIDE the wrapped model's call, so (unlike the fake
// provider, which holds its mutex while invoking its default func) the count
// reflects genuine caller-side concurrency.
type trackingModel struct {
	// inner is the wrapped model every call is ultimately delegated to.
	inner llm.Model

	// onEnter runs at the start of each Generate call; onExit runs as it returns.
	onEnter, onExit func()
}
// Generate brackets the wrapped model's Generate with the onEnter/onExit hooks
// and a fixed pause, then returns whatever the wrapped model returns.
func (m *trackingModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
	// overlapWindow is how long each call is held open before delegating, so
	// that concurrent lenses pile up here and the overlap becomes observable.
	const overlapWindow = 40 * time.Millisecond

	m.onEnter()
	defer m.onExit()

	time.Sleep(overlapWindow)

	resp, err := m.inner.Generate(ctx, req, opts...)
	return resp, err
}
// Stream is a plain pass-through to the wrapped model; it does not invoke the
// enter/exit hooks and adds no delay.
func (m *trackingModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
	stream, err := m.inner.Stream(ctx, req, opts...)
	return stream, err
}
// Capabilities reports the wrapped model's capabilities unchanged.
func (m *trackingModel) Capabilities() llm.Capabilities {
	caps := m.inner.Capabilities()
	return caps
}
// peakTrackingModel builds a trackingModel around a fake-provider model whose
// default reply is a clean verdict (so the recheck pass is skipped — one
// Generate per lens). It returns that model together with a getter reporting
// the highest number of Generate calls that were in flight simultaneously.
// Any error obtaining the fake model fails the test immediately.
func peakTrackingModel(t *testing.T) (llm.Model, func() int) {
	t.Helper()

	cleanReply := func(_ string, _ llm.Request) fake.Step {
		return fake.Reply("No material issues found.")
	}
	provider := fake.New("fake", fake.WithDefault(cleanReply))
	wrapped, err := provider.Model("mock")
	if err != nil {
		t.Fatal(err)
	}

	// guard protects both counters; the hooks are called from concurrent lenses.
	var (
		guard   sync.Mutex
		active  int
		highest int
	)

	tracked := &trackingModel{
		inner: wrapped,
		onEnter: func() {
			guard.Lock()
			defer guard.Unlock()
			active++
			if active > highest {
				highest = active
			}
		},
		onExit: func() {
			guard.Lock()
			defer guard.Unlock()
			active--
		},
	}

	readPeak := func() int {
		guard.Lock()
		defer guard.Unlock()
		return highest
	}
	return tracked, readPeak
}
// threeLenses returns the fixed three-specialist suite shared by the fan-out
// tests, in the order results are expected to come back.
func threeLenses() []Specialist {
	security := Specialist{Name: "security", Title: "🔒 Security", Focus: "x"}
	correctness := Specialist{Name: "correctness", Title: "🎯 Correctness", Focus: "y"}
	errorHandling := Specialist{Name: "error-handling", Title: "🧯 Errors", Focus: "z"}

	return []Specialist{security, correctness, errorHandling}
}
// assertOrderClean checks that results line up one-to-one with specs: same
// length (fatal otherwise), same names at the same positions, no lens flagged
// as errored, and every verdict equal to verdictClean.
func assertOrderClean(t *testing.T, specs []Specialist, results []specialistResult) {
	t.Helper()

	// A length mismatch is fatal: the positional comparison below indexes
	// specs by result position.
	if gotLen, wantLen := len(results), len(specs); gotLen != wantLen {
		t.Fatalf("got %d results, want %d", gotLen, wantLen)
	}

	for idx := range results {
		res := &results[idx]
		wantName := specs[idx].Name

		if res.spec.Name != wantName {
			t.Errorf("result %d is %q, want %q (order must follow the specialist list, not finish order)", idx, res.spec.Name, wantName)
		}
		if res.errored {
			t.Errorf("lens %q unexpectedly errored", res.spec.Name)
		}
		if res.verdict != verdictClean {
			t.Errorf("lens %q verdict = %v, want clean", res.spec.Name, res.verdict)
		}
	}
}
// TestRunSpecialists_FansOut sets GADFLY_LENS_CONCURRENCY=3 (with the
// per-provider map cleared) and expects all three lenses to be in flight
// together (peak == 3) while results still arrive in specialist order.
func TestRunSpecialists_FansOut(t *testing.T) {
	const wantPeak = 3

	t.Setenv("GADFLY_LENS_CONCURRENCY", "3")
	t.Setenv("GADFLY_PROVIDER_LENS_CONCURRENCY", "")

	tracked, peakOf := peakTrackingModel(t)
	repo, err := newRepoFS(t.TempDir(), "diff --git a/x b/x\n+y\n")
	if err != nil {
		t.Fatal(err)
	}

	lenses := threeLenses()
	outcome := runSpecialists(testEngine(t, tracked, repo), "sys", lenses, "task", "diff")

	if observed := peakOf(); observed != wantPeak {
		t.Errorf("peak concurrent lenses = %d, want 3", observed)
	}
	assertOrderClean(t, lenses, outcome)
}
// TestRunSpecialists_SequentialByDefault leaves GADFLY_LENS_CONCURRENCY empty
// (and the per-provider map cleared) and expects the suite to stay strictly
// serial (peak == 1) — the historical behavior — with results in order.
func TestRunSpecialists_SequentialByDefault(t *testing.T) {
	const wantPeak = 1

	t.Setenv("GADFLY_LENS_CONCURRENCY", "")
	t.Setenv("GADFLY_PROVIDER_LENS_CONCURRENCY", "")

	tracked, peakOf := peakTrackingModel(t)
	repo, err := newRepoFS(t.TempDir(), "diff --git a/x b/x\n+y\n")
	if err != nil {
		t.Fatal(err)
	}

	lenses := threeLenses()
	outcome := runSpecialists(testEngine(t, tracked, repo), "sys", lenses, "task", "diff")

	if observed := peakOf(); observed != wantPeak {
		t.Errorf("peak concurrent lenses = %d, want 1 (sequential by default)", observed)
	}
	assertOrderClean(t, lenses, outcome)
}
// TestRunSpecialists_PerProviderFanOut pins the scalar default to serial but
// gives the m1 lane an override of 3 via GADFLY_PROVIDER_LENS_CONCURRENCY. With
// GADFLY_MODEL prefixed "m1/", the lenses are expected to fan out (peak == 3)
// and still report in specialist order.
func TestRunSpecialists_PerProviderFanOut(t *testing.T) {
	const wantPeak = 3

	t.Setenv("GADFLY_MODEL", "m1/qwen3:14b")
	// Scalar default: serial.
	t.Setenv("GADFLY_LENS_CONCURRENCY", "1")
	// But the m1 lane fans out.
	t.Setenv("GADFLY_PROVIDER_LENS_CONCURRENCY", "m1=3")

	tracked, peakOf := peakTrackingModel(t)
	repo, err := newRepoFS(t.TempDir(), "diff --git a/x b/x\n+y\n")
	if err != nil {
		t.Fatal(err)
	}

	lenses := threeLenses()
	outcome := runSpecialists(testEngine(t, tracked, repo), "sys", lenses, "task", "diff")

	if observed := peakOf(); observed != wantPeak {
		t.Errorf("peak concurrent lenses = %d, want 3 (m1 per-provider override)", observed)
	}
	assertOrderClean(t, lenses, outcome)
}
// TestLensConcurrency walks the resolution matrix for lensConcurrency: the
// scalar default, a scalar override, an invalid scalar, and per-provider
// overrides keyed by the model's resolved lane (same lane rule entrypoint.sh
// uses for GADFLY_PROVIDER_CONCURRENCY). Each case sets the four environment
// variables it depends on and compares the returned value.
func TestLensConcurrency(t *testing.T) {
	type scenario struct {
		name     string
		model    string // GADFLY_MODEL
		provider string // GADFLY_PROVIDER
		scalar   string // GADFLY_LENS_CONCURRENCY
		provMap  string // GADFLY_PROVIDER_LENS_CONCURRENCY
		want     int
	}

	scenarios := []scenario{
		{name: "default", want: 1},
		{name: "scalar", scalar: "4", want: 4},
		{name: "scalar invalid falls back to default", scalar: "nope", want: 1},
		{
			name:     "bare id uses GADFLY_PROVIDER lane",
			model:    "qwen3-coder:480b-cloud",
			provider: "ollama-cloud",
			scalar:   "1",
			provMap:  "ollama-cloud=3",
			want:     3,
		},
		{
			name:    "bare id with no provider defaults to ollama-cloud lane",
			model:   "qwen3-coder:480b-cloud",
			scalar:  "1",
			provMap: "ollama-cloud=7",
			want:    7,
		},
		{
			name:    "prefixed spec uses its prefix lane",
			model:   "m1/qwen3:14b",
			scalar:  "1",
			provMap: "m1=2,ollama-cloud=3",
			want:    2,
		},
		{
			name:    "no lane match falls back to scalar",
			model:   "m5/qwen3.6:35b-mlx",
			scalar:  "5",
			provMap: "m1=2",
			want:    5,
		},
		{
			name:    "override wins over scalar",
			model:   "ollama-cloud/x",
			scalar:  "9",
			provMap: "ollama-cloud=2",
			want:    2,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			// Every variable is set explicitly (possibly to "") so the ambient
			// environment cannot leak into the case.
			env := [...][2]string{
				{"GADFLY_MODEL", sc.model},
				{"GADFLY_PROVIDER", sc.provider},
				{"GADFLY_LENS_CONCURRENCY", sc.scalar},
				{"GADFLY_PROVIDER_LENS_CONCURRENCY", sc.provMap},
			}
			for _, kv := range env {
				t.Setenv(kv[0], kv[1])
			}

			got := lensConcurrency()
			if got == sc.want {
				return
			}
			t.Errorf("lensConcurrency() = %d, want %d", got, sc.want)
		})
	}
}