From d0de03472642540cb586579a35b0b69155932050 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 22:53:27 -0400 Subject: [PATCH] feat: configurable lens fan-out, per-provider like model concurrency Specialist lenses ran strictly sequentially within a model. Add a GADFLY_LENS_CONCURRENCY knob (default 1 = unchanged) that overlaps the independent per-lens review+recheck passes, so a model posts its consolidated comment as soon as its lenses finish. Per-provider configurable, mirroring GADFLY_PROVIDER_CONCURRENCY: GADFLY_PROVIDER_LENS_CONCURRENCY takes a "provider=N,..." map keyed by the same provider lanes (modelProvider() mirrors entrypoint's provider_of; providerOverride() mirrors provider_cap). The override wins for the model's lane, else the scalar default. runSpecialists fans out via a bounded worker pool, order-preserving (results written by index) and keeping each lens's own timeout/recheck. repoFS is immutable + fresh-toolbox-per-pass, so lenses share no mutable state (verified under -race). Docs/examples updated; dropped a duplicate GADFLY_TIMEOUT_SECS README row. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 21 +++- cmd/gadfly/lens_concurrency_test.go | 189 ++++++++++++++++++++++++++++ cmd/gadfly/main.go | 86 ++++++++++++- cmd/gadfly/model.go | 17 +++ examples/adversarial-review.yml | 10 ++ 5 files changed, 317 insertions(+), 6 deletions(-) create mode 100644 cmd/gadfly/lens_concurrency_test.go diff --git a/README.md b/README.md index 240efd2..e425b1c 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,24 @@ GADFLY_MODELS: "m1pro/qwen3:14b,qwen3-coder:480b-cloud,gpt-oss:120b-cloud" A model's provider is the spec's first segment (`m1pro/…` → `m1pro`), or `GADFLY_PROVIDER`/ `ollama-cloud` for a bare id. Default (`cap 1`) keeps a single-provider pool fully sequential. +**Lens fan-out (within a model).** By default the specialist lenses run **sequentially** inside +each model (`GADFLY_LENS_CONCURRENCY=1`). Raise it to overlap the independent per-lens +review+recheck passes — the model then posts its consolidated comment as soon as its lenses +finish (so with sequential models, results stream in per model and per-model timings stay +clean). Like the model cap, it's **per-provider configurable**: `GADFLY_PROVIDER_LENS_CONCURRENCY` +takes a `provider=N` map keyed by the **same provider lanes** as `GADFLY_PROVIDER_CONCURRENCY`, +falling back to the `GADFLY_LENS_CONCURRENCY` scalar (default `1`). **It multiplies with the +model cap:** total in-flight requests ≈ *models-at-once × lenses-at-once*, so to fan lenses out +without oversubscribing a backend, keep its model cap low and raise its lens cap: + +```yaml +# Per provider: cloud runs one model at a time but fans its 3 lenses out (3 concurrent requests); +# the slow local box stays fully serial. Both provider lanes still run in parallel. +GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=1,m1=1" +GADFLY_PROVIDER_LENS_CONCURRENCY: "ollama-cloud=3,m1=1" +GADFLY_SPECIALISTS: "security,correctness,error-handling" +``` + ### Triggers 1. A **new/reopened/ready** non-draft PR — automatic. @@ -227,11 +245,12 @@ The reviewer binary reads these (the stub/entrypoint set sane defaults): | `GADFLY_WORKER_MAX_STEPS` | 8 | tool-step cap for a delegated worker run | | `GADFLY_CONCURRENCY` | 1 | default max models run at once **per provider** | | `GADFLY_PROVIDER_CONCURRENCY` | — | per-provider overrides, e.g. `ollama-cloud=3,m1pro=1` | +| `GADFLY_LENS_CONCURRENCY` | 1 | specialist lenses run at once **within a model** (× model cap = total in-flight) | +| `GADFLY_PROVIDER_LENS_CONCURRENCY` | — | per-provider lens overrides, same lanes as `GADFLY_PROVIDER_CONCURRENCY`, e.g. `ollama-cloud=3,m1=1` | | `GADFLY_MAX_STEPS` | 24 | review-pass tool-step cap | | `GADFLY_TIMEOUT_SECS` | 300 | deadline **per specialist lens** (review+recheck) | | `GADFLY_RECHECK` | on | set `0`/`false` to skip the recheck pass | | `GADFLY_RECHECK_MAX_STEPS` | 16 | recheck-pass step cap | -| `GADFLY_TIMEOUT_SECS` | 300 | overall deadline (both passes) | | `GADFLY_MAX_DIFF_CHARS` | 60000 | diff chars embedded in the prompt (full diff via `get_diff`) | | `GADFLY_TRIGGER_PHRASE` | `@gadfly review` | comment phrase that re-triggers | | `GADFLY_ALLOWED_USERS` | *(collaborators)* | comma-separated allow-list for comment triggers | diff --git a/cmd/gadfly/lens_concurrency_test.go b/cmd/gadfly/lens_concurrency_test.go new file mode 100644 index 0000000..2cfc577 --- /dev/null +++ b/cmd/gadfly/lens_concurrency_test.go @@ -0,0 +1,189 @@ +package main + +import ( + "context" + "sync" + "testing" + "time" + + llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + "gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake" +) + +// trackingModel wraps a real llm.Model and records the maximum number of +// concurrent Generate calls, holding each call open briefly so overlap is +// observable. The concurrency tracking + sleep happen OUTSIDE the wrapped +// model's call, so (unlike the fake provider, which holds its mutex while +// invoking its default func) this measures genuine caller-side concurrency. +type trackingModel struct { + inner llm.Model + onEnter func() + onExit func() +} + +func (m *trackingModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) { + m.onEnter() + defer m.onExit() + time.Sleep(40 * time.Millisecond) // overlap window: concurrent lenses pile up here + return m.inner.Generate(ctx, req, opts...) +} + +func (m *trackingModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) { + return m.inner.Stream(ctx, req, opts...) +} + +func (m *trackingModel) Capabilities() llm.Capabilities { return m.inner.Capabilities() } + +// peakTrackingModel returns a model that replies with a clean verdict (so the +// recheck pass is skipped — one Generate per lens) and a getter for the peak +// number of concurrent Generate calls observed. +func peakTrackingModel(t *testing.T) (llm.Model, func() int) { + t.Helper() + p := fake.New("fake", fake.WithDefault(func(_ string, _ llm.Request) fake.Step { + return fake.Reply("No material issues found.") + })) + inner, err := p.Model("mock") + if err != nil { + t.Fatal(err) + } + + var mu sync.Mutex + var inFlight, peak int + enter := func() { + mu.Lock() + defer mu.Unlock() + inFlight++ + if inFlight > peak { + peak = inFlight + } + } + exit := func() { + mu.Lock() + defer mu.Unlock() + inFlight-- + } + return &trackingModel{inner: inner, onEnter: enter, onExit: exit}, + func() int { mu.Lock(); defer mu.Unlock(); return peak } +} + +func threeLenses() []Specialist { + return []Specialist{ + {Name: "security", Title: "🔒 Security", Focus: "x"}, + {Name: "correctness", Title: "🎯 Correctness", Focus: "y"}, + {Name: "error-handling", Title: "🧯 Errors", Focus: "z"}, + } +} + +func assertOrderClean(t *testing.T, specs []Specialist, results []specialistResult) { + t.Helper() + if len(results) != len(specs) { + t.Fatalf("got %d results, want %d", len(results), len(specs)) + } + for i, r := range results { + if r.spec.Name != specs[i].Name { + t.Errorf("result %d is %q, want %q (order must follow the specialist list, not finish order)", i, r.spec.Name, specs[i].Name) + } + if r.errored { + t.Errorf("lens %q unexpectedly errored", r.spec.Name) + } + if r.verdict != verdictClean { + t.Errorf("lens %q verdict = %v, want clean", r.spec.Name, r.verdict) + } + } +} + +// TestRunSpecialists_FansOut: with GADFLY_LENS_CONCURRENCY=3 all three lenses run +// at once (peak == 3), and results still come back in specialist order. +func TestRunSpecialists_FansOut(t *testing.T) { + t.Setenv("GADFLY_LENS_CONCURRENCY", "3") + t.Setenv("GADFLY_PROVIDER_LENS_CONCURRENCY", "") + mdl, peak := peakTrackingModel(t) + fs, err := newRepoFS(t.TempDir(), "diff --git a/x b/x\n+y\n") + if err != nil { + t.Fatal(err) + } + specs := threeLenses() + + results := runSpecialists(mdl, fs, "sys", specs, "task", "diff") + + if got := peak(); got != 3 { + t.Errorf("peak concurrent lenses = %d, want 3", got) + } + assertOrderClean(t, specs, results) +} + +// TestRunSpecialists_SequentialByDefault: an unset GADFLY_LENS_CONCURRENCY keeps +// the suite strictly serial (peak == 1) — the historical behavior. +func TestRunSpecialists_SequentialByDefault(t *testing.T) { + t.Setenv("GADFLY_LENS_CONCURRENCY", "") + t.Setenv("GADFLY_PROVIDER_LENS_CONCURRENCY", "") + mdl, peak := peakTrackingModel(t) + fs, err := newRepoFS(t.TempDir(), "diff --git a/x b/x\n+y\n") + if err != nil { + t.Fatal(err) + } + specs := threeLenses() + + results := runSpecialists(mdl, fs, "sys", specs, "task", "diff") + + if got := peak(); got != 1 { + t.Errorf("peak concurrent lenses = %d, want 1 (sequential by default)", got) + } + assertOrderClean(t, specs, results) +} + +// TestRunSpecialists_PerProviderFanOut: a per-provider override fans this model's +// lenses out even when the scalar default is serial — the model's lane (from +// GADFLY_MODEL's prefix) is matched against GADFLY_PROVIDER_LENS_CONCURRENCY. +func TestRunSpecialists_PerProviderFanOut(t *testing.T) { + t.Setenv("GADFLY_MODEL", "m1/qwen3:14b") + t.Setenv("GADFLY_LENS_CONCURRENCY", "1") // scalar default: serial + t.Setenv("GADFLY_PROVIDER_LENS_CONCURRENCY", "m1=3") // but the m1 lane fans out + mdl, peak := peakTrackingModel(t) + fs, err := newRepoFS(t.TempDir(), "diff --git a/x b/x\n+y\n") + if err != nil { + t.Fatal(err) + } + specs := threeLenses() + + results := runSpecialists(mdl, fs, "sys", specs, "task", "diff") + + if got := peak(); got != 3 { + t.Errorf("peak concurrent lenses = %d, want 3 (m1 per-provider override)", got) + } + assertOrderClean(t, specs, results) +} + +// TestLensConcurrency covers the resolution matrix: scalar default, scalar +// override, and per-provider override keyed by the model's resolved lane (same +// lane rule entrypoint.sh uses for GADFLY_PROVIDER_CONCURRENCY). +func TestLensConcurrency(t *testing.T) { + tests := []struct { + name string + model string + provider string + scalar string + provMap string + want int + }{ + {"default", "", "", "", "", 1}, + {"scalar", "", "", "4", "", 4}, + {"scalar invalid falls back to default", "", "", "nope", "", 1}, + {"bare id uses GADFLY_PROVIDER lane", "qwen3-coder:480b-cloud", "ollama-cloud", "1", "ollama-cloud=3", 3}, + {"bare id with no provider defaults to ollama-cloud lane", "qwen3-coder:480b-cloud", "", "1", "ollama-cloud=7", 7}, + {"prefixed spec uses its prefix lane", "m1/qwen3:14b", "", "1", "m1=2,ollama-cloud=3", 2}, + {"no lane match falls back to scalar", "m5/qwen3.6:35b-mlx", "", "5", "m1=2", 5}, + {"override wins over scalar", "ollama-cloud/x", "", "9", "ollama-cloud=2", 2}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Setenv("GADFLY_MODEL", tt.model) + t.Setenv("GADFLY_PROVIDER", tt.provider) + t.Setenv("GADFLY_LENS_CONCURRENCY", tt.scalar) + t.Setenv("GADFLY_PROVIDER_LENS_CONCURRENCY", tt.provMap) + if got := lensConcurrency(); got != tt.want { + t.Errorf("lensConcurrency() = %d, want %d", got, tt.want) + } + }) + } +} diff --git a/cmd/gadfly/main.go b/cmd/gadfly/main.go index 5aae508..1189864 100644 --- a/cmd/gadfly/main.go +++ b/cmd/gadfly/main.go @@ -46,6 +46,15 @@ // GADFLY_RECHECK set to 0/false to skip the recheck pass (optional, default on). // GADFLY_RECHECK_MAX_STEPS recheck-pass step cap (optional, default 16). // GADFLY_TIMEOUT_SECS overall deadline in seconds, shared by both passes (optional, default 300). +// GADFLY_LENS_CONCURRENCY how many specialist lenses run concurrently within this +// model (optional, default 1 = sequential). Total in-flight +// model requests ≈ this × entrypoint.sh's per-provider model +// concurrency, so keep the product within the backend's budget. +// GADFLY_PROVIDER_LENS_CONCURRENCY per-provider override for the above, as a +// "provider=N,provider=N" map keyed by the SAME provider +// lanes as GADFLY_PROVIDER_CONCURRENCY (e.g. +// "ollama-cloud=3,m1=1"). Wins over GADFLY_LENS_CONCURRENCY +// for the model's provider; falls back to it otherwise. // GADFLY_MAX_DIFF_CHARS diff chars embedded in the prompt (optional, default 60000; // the full diff is always available via the get_diff tool). // @@ -61,6 +70,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "gitea.stevedudenhoeffer.com/steve/majordomo/agent" @@ -83,6 +93,10 @@ const ( // calls and then hard-failing with "max steps reached without a final // answer" — it always has a few steps left to wrap up. defaultWrapUpReserve = 4 + // defaultLensConcurrency is how many specialist lenses run at once within a + // single model. 1 keeps the suite sequential (the historical behavior); + // higher values overlap the independent per-lens passes. See runSpecialists. + defaultLensConcurrency = 1 ) // wrapUpInstruction is steered into a running agent once it comes within the @@ -176,16 +190,78 @@ func run() error { base := string(systemBytes) task := buildTask(diff) - results := make([]specialistResult, 0, len(specialists)) - for _, sp := range specialists { - out, errored := reviewWithSpecialist(mdl, fsTools, base, sp, task, diff) - results = append(results, specialistResult{spec: sp, out: out, verdict: parseVerdict(out), errored: errored}) - } + results := runSpecialists(mdl, fsTools, base, specialists, task, diff) fmt.Println(renderConsolidated(results)) return nil } +// runSpecialists reviews the diff through each lens and returns the results in +// the SAME order as specialists, regardless of finish order. Up to +// GADFLY_LENS_CONCURRENCY lenses run concurrently; the default of 1 keeps the +// suite sequential, exactly as before. Each lens already runs under its own +// per-lens timeout (reviewWithSpecialist), so concurrency simply overlaps those +// independent passes — and because reviewWithSpecialist builds a fresh toolbox +// per pass and the lenses only read the immutable repoFS, they share no mutable +// state. Results are stored by index so the consolidated comment keeps the +// configured lens order. +// +// Caution: this fans out WITHIN one model. It multiplies with entrypoint.sh's +// per-provider model concurrency, so total concurrent backend requests ≈ +// (models at once) × (lenses at once). To fan lenses out without oversubscribing +// the backend, run models one at a time (provider lane cap 1) and raise this. +func runSpecialists(mdl llm.Model, fsTools *repoFS, base string, specialists []Specialist, task, diff string) []specialistResult { + results := make([]specialistResult, len(specialists)) + + conc := min(lensConcurrency(), len(specialists)) + + sem := make(chan struct{}, conc) + var wg sync.WaitGroup + for i, sp := range specialists { + wg.Add(1) + sem <- struct{}{} // blocks once `conc` lenses are already in flight + go func(i int, sp Specialist) { + defer wg.Done() + defer func() { <-sem }() + out, errored := reviewWithSpecialist(mdl, fsTools, base, sp, task, diff) + results[i] = specialistResult{spec: sp, out: out, verdict: parseVerdict(out), errored: errored} + }(i, sp) + } + wg.Wait() + return results +} + +// lensConcurrency resolves how many specialist lenses run at once for THIS run's +// model. It mirrors entrypoint.sh's per-provider MODEL concurrency: a +// per-provider override in GADFLY_PROVIDER_LENS_CONCURRENCY ("provider=N,...") +// wins for the model's provider, otherwise the GADFLY_LENS_CONCURRENCY scalar +// (default 1). The provider is resolved by modelProvider() — the SAME lane rule +// entrypoint uses for GADFLY_PROVIDER_CONCURRENCY — so e.g. +// "ollama-cloud=3,m1=1" fans cloud lenses out while keeping a slow local box +// serial, exactly the way the model map does for whole models. +func lensConcurrency() int { + if n, ok := providerOverride("GADFLY_PROVIDER_LENS_CONCURRENCY", modelProvider()); ok { + return n + } + return envInt("GADFLY_LENS_CONCURRENCY", defaultLensConcurrency) +} + +// providerOverride parses a "provider=N,provider=N" env map and returns the +// value for provider when present and valid (>0). Mirrors entrypoint.sh's +// provider_cap lookup so the two concurrency maps share one syntax. +func providerOverride(envName, provider string) (int, bool) { + for _, item := range strings.Split(os.Getenv(envName), ",") { + k, v, ok := strings.Cut(item, "=") + if !ok || strings.TrimSpace(k) != provider { + continue + } + if n, err := strconv.Atoi(strings.TrimSpace(v)); err == nil && n > 0 { + return n, true + } + } + return 0, false +} + // reviewWithSpecialist runs one lens end-to-end under its OWN timeout, so a slow // model on one lens can't starve the others: a review pass under the // specialist's composed prompt, then the shared adversarial recheck pass. The diff --git a/cmd/gadfly/model.go b/cmd/gadfly/model.go index 03d0a4e..cca8b23 100644 --- a/cmd/gadfly/model.go +++ b/cmd/gadfly/model.go @@ -126,6 +126,23 @@ func buildSpec(provider, model string) string { return provider + "/" + model } +// modelProvider returns the provider "lane" for THIS run's model, mirroring +// entrypoint.sh's provider_of: the segment before the first "/" in GADFLY_MODEL, +// else GADFLY_PROVIDER, else the default (ollama-cloud). The binary reviews one +// model per invocation, so this is that model's provider — used to resolve +// per-provider policy (e.g. lens concurrency) against the SAME provider keys +// entrypoint uses for GADFLY_PROVIDER_CONCURRENCY. +func modelProvider() string { + model := strings.TrimSpace(os.Getenv("GADFLY_MODEL")) + if pfx, _, ok := strings.Cut(model, "/"); ok { + return strings.TrimSpace(pfx) + } + if p := strings.TrimSpace(os.Getenv("GADFLY_PROVIDER")); p != "" { + return p + } + return defaultProvider +} + // registerEnvProviders reads named endpoints and aliases from the environment // and registers them with majordomo's default registry, so they can be used as // "/" specs (or bare aliases) in GADFLY_MODEL. diff --git a/examples/adversarial-review.yml b/examples/adversarial-review.yml index ed488bd..4e39d9d 100644 --- a/examples/adversarial-review.yml +++ b/examples/adversarial-review.yml @@ -55,6 +55,16 @@ jobs: # csv to choose; "all" for everything; or define custom ones via a repo # .gadfly.yml / GADFLY_SPECIALIST_. See README "Specialists". GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS }} + # Lens fan-out (optional; default 1 = lenses run sequentially within a + # model). Raise it to run a model's lenses concurrently so each model + # posts its comment sooner. Total in-flight requests = (models at once) + # × (lenses at once), so to fan out without oversubscribing a backend, + # keep its model cap low and raise its lens cap. Per-provider configurable + # via GADFLY_PROVIDER_LENS_CONCURRENCY (same lanes as the model map): + # GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=1,m1=1" + # GADFLY_PROVIDER_LENS_CONCURRENCY: "ollama-cloud=3,m1=1" + # GADFLY_LENS_CONCURRENCY: ${{ vars.GADFLY_LENS_CONCURRENCY }} + # GADFLY_PROVIDER_LENS_CONCURRENCY: ${{ vars.GADFLY_PROVIDER_LENS_CONCURRENCY }} # --- Models & providers (optional; default = Ollama Cloud) ---------- # Gadfly is majordomo-powered, so it can target other backends. Set a # provider for bare model ids; point at a different endpoint with a