feat: configurable lens fan-out, per-provider (like model concurrency)
Build & push image / build-and-push (push) Successful in 9s
Build & push image / build-and-push (push) Successful in 9s
Specialist lenses ran strictly sequentially within a model. Add a GADFLY_LENS_CONCURRENCY knob (default 1 = unchanged) that overlaps the independent per-lens review+recheck passes, so a model posts its consolidated comment as soon as its lenses finish. Per-provider configurable, mirroring GADFLY_PROVIDER_CONCURRENCY: GADFLY_PROVIDER_LENS_CONCURRENCY takes a "provider=N,..." map keyed by the same provider lanes (modelProvider() mirrors entrypoint's provider_of; providerOverride() mirrors provider_cap). The override wins for the model's lane, else the scalar default. runSpecialists fans out via a bounded worker pool, order-preserving (results written by index) and keeping each lens's own timeout/recheck. repoFS is immutable + fresh-toolbox-per-pass, so lenses share no mutable state (verified under -race). Docs/examples updated; dropped a duplicate GADFLY_TIMEOUT_SECS README row. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,189 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
||||
)
|
||||
|
||||
// trackingModel wraps a real llm.Model and records the maximum number of
|
||||
// concurrent Generate calls, holding each call open briefly so overlap is
|
||||
// observable. The concurrency tracking + sleep happen OUTSIDE the wrapped
|
||||
// model's call, so (unlike the fake provider, which holds its mutex while
|
||||
// invoking its default func) this measures genuine caller-side concurrency.
|
||||
type trackingModel struct {
|
||||
inner llm.Model
|
||||
onEnter func()
|
||||
onExit func()
|
||||
}
|
||||
|
||||
func (m *trackingModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
|
||||
m.onEnter()
|
||||
defer m.onExit()
|
||||
time.Sleep(40 * time.Millisecond) // overlap window: concurrent lenses pile up here
|
||||
return m.inner.Generate(ctx, req, opts...)
|
||||
}
|
||||
|
||||
func (m *trackingModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
|
||||
return m.inner.Stream(ctx, req, opts...)
|
||||
}
|
||||
|
||||
// Capabilities reports whatever the wrapped model reports.
func (m *trackingModel) Capabilities() llm.Capabilities {
	return m.inner.Capabilities()
}
|
||||
|
||||
// peakTrackingModel returns a model that replies with a clean verdict (so the
|
||||
// recheck pass is skipped — one Generate per lens) and a getter for the peak
|
||||
// number of concurrent Generate calls observed.
|
||||
func peakTrackingModel(t *testing.T) (llm.Model, func() int) {
|
||||
t.Helper()
|
||||
p := fake.New("fake", fake.WithDefault(func(_ string, _ llm.Request) fake.Step {
|
||||
return fake.Reply("No material issues found.")
|
||||
}))
|
||||
inner, err := p.Model("mock")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var mu sync.Mutex
|
||||
var inFlight, peak int
|
||||
enter := func() {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
inFlight++
|
||||
if inFlight > peak {
|
||||
peak = inFlight
|
||||
}
|
||||
}
|
||||
exit := func() {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
inFlight--
|
||||
}
|
||||
return &trackingModel{inner: inner, onEnter: enter, onExit: exit},
|
||||
func() int { mu.Lock(); defer mu.Unlock(); return peak }
|
||||
}
|
||||
|
||||
func threeLenses() []Specialist {
|
||||
return []Specialist{
|
||||
{Name: "security", Title: "🔒 Security", Focus: "x"},
|
||||
{Name: "correctness", Title: "🎯 Correctness", Focus: "y"},
|
||||
{Name: "error-handling", Title: "🧯 Errors", Focus: "z"},
|
||||
}
|
||||
}
|
||||
|
||||
func assertOrderClean(t *testing.T, specs []Specialist, results []specialistResult) {
|
||||
t.Helper()
|
||||
if len(results) != len(specs) {
|
||||
t.Fatalf("got %d results, want %d", len(results), len(specs))
|
||||
}
|
||||
for i, r := range results {
|
||||
if r.spec.Name != specs[i].Name {
|
||||
t.Errorf("result %d is %q, want %q (order must follow the specialist list, not finish order)", i, r.spec.Name, specs[i].Name)
|
||||
}
|
||||
if r.errored {
|
||||
t.Errorf("lens %q unexpectedly errored", r.spec.Name)
|
||||
}
|
||||
if r.verdict != verdictClean {
|
||||
t.Errorf("lens %q verdict = %v, want clean", r.spec.Name, r.verdict)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunSpecialists_FansOut: with GADFLY_LENS_CONCURRENCY=3 all three lenses run
|
||||
// at once (peak == 3), and results still come back in specialist order.
|
||||
func TestRunSpecialists_FansOut(t *testing.T) {
|
||||
t.Setenv("GADFLY_LENS_CONCURRENCY", "3")
|
||||
t.Setenv("GADFLY_PROVIDER_LENS_CONCURRENCY", "")
|
||||
mdl, peak := peakTrackingModel(t)
|
||||
fs, err := newRepoFS(t.TempDir(), "diff --git a/x b/x\n+y\n")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
specs := threeLenses()
|
||||
|
||||
results := runSpecialists(mdl, fs, "sys", specs, "task", "diff")
|
||||
|
||||
if got := peak(); got != 3 {
|
||||
t.Errorf("peak concurrent lenses = %d, want 3", got)
|
||||
}
|
||||
assertOrderClean(t, specs, results)
|
||||
}
|
||||
|
||||
// TestRunSpecialists_SequentialByDefault: an unset GADFLY_LENS_CONCURRENCY keeps
|
||||
// the suite strictly serial (peak == 1) — the historical behavior.
|
||||
func TestRunSpecialists_SequentialByDefault(t *testing.T) {
|
||||
t.Setenv("GADFLY_LENS_CONCURRENCY", "")
|
||||
t.Setenv("GADFLY_PROVIDER_LENS_CONCURRENCY", "")
|
||||
mdl, peak := peakTrackingModel(t)
|
||||
fs, err := newRepoFS(t.TempDir(), "diff --git a/x b/x\n+y\n")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
specs := threeLenses()
|
||||
|
||||
results := runSpecialists(mdl, fs, "sys", specs, "task", "diff")
|
||||
|
||||
if got := peak(); got != 1 {
|
||||
t.Errorf("peak concurrent lenses = %d, want 1 (sequential by default)", got)
|
||||
}
|
||||
assertOrderClean(t, specs, results)
|
||||
}
|
||||
|
||||
// TestRunSpecialists_PerProviderFanOut: a per-provider override fans this model's
|
||||
// lenses out even when the scalar default is serial — the model's lane (from
|
||||
// GADFLY_MODEL's prefix) is matched against GADFLY_PROVIDER_LENS_CONCURRENCY.
|
||||
func TestRunSpecialists_PerProviderFanOut(t *testing.T) {
|
||||
t.Setenv("GADFLY_MODEL", "m1/qwen3:14b")
|
||||
t.Setenv("GADFLY_LENS_CONCURRENCY", "1") // scalar default: serial
|
||||
t.Setenv("GADFLY_PROVIDER_LENS_CONCURRENCY", "m1=3") // but the m1 lane fans out
|
||||
mdl, peak := peakTrackingModel(t)
|
||||
fs, err := newRepoFS(t.TempDir(), "diff --git a/x b/x\n+y\n")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
specs := threeLenses()
|
||||
|
||||
results := runSpecialists(mdl, fs, "sys", specs, "task", "diff")
|
||||
|
||||
if got := peak(); got != 3 {
|
||||
t.Errorf("peak concurrent lenses = %d, want 3 (m1 per-provider override)", got)
|
||||
}
|
||||
assertOrderClean(t, specs, results)
|
||||
}
|
||||
|
||||
// TestLensConcurrency covers the resolution matrix: scalar default, scalar
|
||||
// override, and per-provider override keyed by the model's resolved lane (same
|
||||
// lane rule entrypoint.sh uses for GADFLY_PROVIDER_CONCURRENCY).
|
||||
func TestLensConcurrency(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
model string
|
||||
provider string
|
||||
scalar string
|
||||
provMap string
|
||||
want int
|
||||
}{
|
||||
{"default", "", "", "", "", 1},
|
||||
{"scalar", "", "", "4", "", 4},
|
||||
{"scalar invalid falls back to default", "", "", "nope", "", 1},
|
||||
{"bare id uses GADFLY_PROVIDER lane", "qwen3-coder:480b-cloud", "ollama-cloud", "1", "ollama-cloud=3", 3},
|
||||
{"bare id with no provider defaults to ollama-cloud lane", "qwen3-coder:480b-cloud", "", "1", "ollama-cloud=7", 7},
|
||||
{"prefixed spec uses its prefix lane", "m1/qwen3:14b", "", "1", "m1=2,ollama-cloud=3", 2},
|
||||
{"no lane match falls back to scalar", "m5/qwen3.6:35b-mlx", "", "5", "m1=2", 5},
|
||||
{"override wins over scalar", "ollama-cloud/x", "", "9", "ollama-cloud=2", 2},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Setenv("GADFLY_MODEL", tt.model)
|
||||
t.Setenv("GADFLY_PROVIDER", tt.provider)
|
||||
t.Setenv("GADFLY_LENS_CONCURRENCY", tt.scalar)
|
||||
t.Setenv("GADFLY_PROVIDER_LENS_CONCURRENCY", tt.provMap)
|
||||
if got := lensConcurrency(); got != tt.want {
|
||||
t.Errorf("lensConcurrency() = %d, want %d", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
+81
-5
@@ -46,6 +46,15 @@
|
||||
// GADFLY_RECHECK set to 0/false to skip the recheck pass (optional, default on).
|
||||
// GADFLY_RECHECK_MAX_STEPS recheck-pass step cap (optional, default 16).
|
||||
// GADFLY_TIMEOUT_SECS overall deadline in seconds, shared by both passes (optional, default 300).
|
||||
// GADFLY_LENS_CONCURRENCY how many specialist lenses run concurrently within this
|
||||
// model (optional, default 1 = sequential). Total in-flight
|
||||
// model requests ≈ this × entrypoint.sh's per-provider model
|
||||
// concurrency, so keep the product within the backend's budget.
|
||||
// GADFLY_PROVIDER_LENS_CONCURRENCY per-provider override for the above, as a
|
||||
// "provider=N,provider=N" map keyed by the SAME provider
|
||||
// lanes as GADFLY_PROVIDER_CONCURRENCY (e.g.
|
||||
// "ollama-cloud=3,m1=1"). Wins over GADFLY_LENS_CONCURRENCY
|
||||
// for the model's provider; falls back to it otherwise.
|
||||
// GADFLY_MAX_DIFF_CHARS diff chars embedded in the prompt (optional, default 60000;
|
||||
// the full diff is always available via the get_diff tool).
|
||||
//
|
||||
@@ -61,6 +70,7 @@ import (
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
|
||||
@@ -83,6 +93,10 @@ const (
|
||||
// calls and then hard-failing with "max steps reached without a final
|
||||
// answer" — it always has a few steps left to wrap up.
|
||||
defaultWrapUpReserve = 4
|
||||
// defaultLensConcurrency is how many specialist lenses run at once within a
|
||||
// single model. 1 keeps the suite sequential (the historical behavior);
|
||||
// higher values overlap the independent per-lens passes. See runSpecialists.
|
||||
defaultLensConcurrency = 1
|
||||
)
|
||||
|
||||
// wrapUpInstruction is steered into a running agent once it comes within the
|
||||
@@ -176,16 +190,78 @@ func run() error {
|
||||
|
||||
base := string(systemBytes)
|
||||
task := buildTask(diff)
|
||||
results := make([]specialistResult, 0, len(specialists))
|
||||
for _, sp := range specialists {
|
||||
out, errored := reviewWithSpecialist(mdl, fsTools, base, sp, task, diff)
|
||||
results = append(results, specialistResult{spec: sp, out: out, verdict: parseVerdict(out), errored: errored})
|
||||
}
|
||||
results := runSpecialists(mdl, fsTools, base, specialists, task, diff)
|
||||
|
||||
fmt.Println(renderConsolidated(results))
|
||||
return nil
|
||||
}
|
||||
|
||||
// runSpecialists reviews the diff through each lens and returns the results in
|
||||
// the SAME order as specialists, regardless of finish order. Up to
|
||||
// GADFLY_LENS_CONCURRENCY lenses run concurrently; the default of 1 keeps the
|
||||
// suite sequential, exactly as before. Each lens already runs under its own
|
||||
// per-lens timeout (reviewWithSpecialist), so concurrency simply overlaps those
|
||||
// independent passes — and because reviewWithSpecialist builds a fresh toolbox
|
||||
// per pass and the lenses only read the immutable repoFS, they share no mutable
|
||||
// state. Results are stored by index so the consolidated comment keeps the
|
||||
// configured lens order.
|
||||
//
|
||||
// Caution: this fans out WITHIN one model. It multiplies with entrypoint.sh's
|
||||
// per-provider model concurrency, so total concurrent backend requests ≈
|
||||
// (models at once) × (lenses at once). To fan lenses out without oversubscribing
|
||||
// the backend, run models one at a time (provider lane cap 1) and raise this.
|
||||
func runSpecialists(mdl llm.Model, fsTools *repoFS, base string, specialists []Specialist, task, diff string) []specialistResult {
|
||||
results := make([]specialistResult, len(specialists))
|
||||
|
||||
conc := min(lensConcurrency(), len(specialists))
|
||||
|
||||
sem := make(chan struct{}, conc)
|
||||
var wg sync.WaitGroup
|
||||
for i, sp := range specialists {
|
||||
wg.Add(1)
|
||||
sem <- struct{}{} // blocks once `conc` lenses are already in flight
|
||||
go func(i int, sp Specialist) {
|
||||
defer wg.Done()
|
||||
defer func() { <-sem }()
|
||||
out, errored := reviewWithSpecialist(mdl, fsTools, base, sp, task, diff)
|
||||
results[i] = specialistResult{spec: sp, out: out, verdict: parseVerdict(out), errored: errored}
|
||||
}(i, sp)
|
||||
}
|
||||
wg.Wait()
|
||||
return results
|
||||
}
|
||||
|
||||
// lensConcurrency resolves how many specialist lenses run at once for THIS run's
|
||||
// model. It mirrors entrypoint.sh's per-provider MODEL concurrency: a
|
||||
// per-provider override in GADFLY_PROVIDER_LENS_CONCURRENCY ("provider=N,...")
|
||||
// wins for the model's provider, otherwise the GADFLY_LENS_CONCURRENCY scalar
|
||||
// (default 1). The provider is resolved by modelProvider() — the SAME lane rule
|
||||
// entrypoint uses for GADFLY_PROVIDER_CONCURRENCY — so e.g.
|
||||
// "ollama-cloud=3,m1=1" fans cloud lenses out while keeping a slow local box
|
||||
// serial, exactly the way the model map does for whole models.
|
||||
func lensConcurrency() int {
|
||||
if n, ok := providerOverride("GADFLY_PROVIDER_LENS_CONCURRENCY", modelProvider()); ok {
|
||||
return n
|
||||
}
|
||||
return envInt("GADFLY_LENS_CONCURRENCY", defaultLensConcurrency)
|
||||
}
|
||||
|
||||
// providerOverride looks provider up in the comma-separated "provider=N" map
// held in the environment variable envName. It returns (N, true) for the first
// entry whose key equals provider (after trimming whitespace) and whose value
// parses as an integer greater than zero. Entries without "=", with another
// key, or with an invalid or non-positive value are skipped, so a later valid
// entry for the same key can still match. With no usable entry it returns
// (0, false). The syntax is shared with entrypoint.sh's provider_cap lookup.
func providerOverride(envName, provider string) (int, bool) {
	entries := strings.Split(os.Getenv(envName), ",")
	for idx := range entries {
		lane, count, found := strings.Cut(entries[idx], "=")
		if found && strings.TrimSpace(lane) == provider {
			limit, convErr := strconv.Atoi(strings.TrimSpace(count))
			if convErr != nil || limit <= 0 {
				// Malformed value for this lane: keep scanning.
				continue
			}
			return limit, true
		}
	}
	return 0, false
}
|
||||
|
||||
// reviewWithSpecialist runs one lens end-to-end under its OWN timeout, so a slow
|
||||
// model on one lens can't starve the others: a review pass under the
|
||||
// specialist's composed prompt, then the shared adversarial recheck pass. The
|
||||
|
||||
@@ -126,6 +126,23 @@ func buildSpec(provider, model string) string {
|
||||
return provider + "/" + model
|
||||
}
|
||||
|
||||
// modelProvider returns the provider "lane" for THIS run's model, mirroring
|
||||
// entrypoint.sh's provider_of: the segment before the first "/" in GADFLY_MODEL,
|
||||
// else GADFLY_PROVIDER, else the default (ollama-cloud). The binary reviews one
|
||||
// model per invocation, so this is that model's provider — used to resolve
|
||||
// per-provider policy (e.g. lens concurrency) against the SAME provider keys
|
||||
// entrypoint uses for GADFLY_PROVIDER_CONCURRENCY.
|
||||
func modelProvider() string {
|
||||
model := strings.TrimSpace(os.Getenv("GADFLY_MODEL"))
|
||||
if pfx, _, ok := strings.Cut(model, "/"); ok {
|
||||
return strings.TrimSpace(pfx)
|
||||
}
|
||||
if p := strings.TrimSpace(os.Getenv("GADFLY_PROVIDER")); p != "" {
|
||||
return p
|
||||
}
|
||||
return defaultProvider
|
||||
}
|
||||
|
||||
// registerEnvProviders reads named endpoints and aliases from the environment
|
||||
// and registers them with majordomo's default registry, so they can be used as
|
||||
// "<name>/<model>" specs (or bare aliases) in GADFLY_MODEL.
|
||||
|
||||
Reference in New Issue
Block a user