Files
executus/fanout/fanout.go
T
steve ca243a2d50
executus CI / test (push) Failing after 24s
P0: stand up executus harness module above majordomo
Batteries-included agent-harness base, extracted from mort's agent layer.
This first cut establishes the module + the zero-coupling core primitives:

- lane, dispatchguard, pendingattach, run/progress.go: moved verbatim from mort
- config: host config Source seam + env-var default (nil-safe helpers)
- deliver: output-egress seam + Discard/Stdout defaults
- identity: AdminPolicy + MemberResolver seams (nil-safe)
- fanout: programmatic N×M swarm (bounded global + per-key concurrency)
- README/CLAUDE.md with the vibe-coded banner; CI with Go gates +
  the "core stays majordomo+stdlib only" invariant

Core builds with stdlib only today; majordomo enters at P1 (model/structured).
go build/vet/test -race all green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 19:18:37 -04:00

125 lines
3.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package fanout is executus's programmatic swarm primitive: run a function over
// many items concurrently with bounded global and per-key concurrency, returning
// one result per item in input order.
//
// This is distinct from the LLM-callable agent_spawn_parallel tool. fanout is a
// plain Go API a host drives directly — it is what Gadfly uses to run an
// N-models × M-lenses review fleet (flatten the matrix into items, key each by
// its provider, cap per-provider concurrency) and what any host uses to scatter
// bounded agent runs and gather structured results for consolidation.
//
// fanout has no dependency beyond the stdlib; a caller wires per-provider caps
// from config (Mort: convar; Gadfly: GADFLY_PROVIDER_CONCURRENCY).
package fanout
import (
"context"
"sync"
)
// Result pairs a task's output with its error and original index. fn errors are
// captured here, not propagated — one failing task never aborts the batch.
type Result[T any] struct {
Index int
Value T
Err error
}
// Options bound a fan-out.
//
// MaxConcurrent — cap on total in-flight tasks (0 = unbounded).
// PerKey — cap on in-flight tasks sharing a key bucket; a key absent
// from the map (or mapped to <=0) is uncapped beyond
// MaxConcurrent. Used for per-provider concurrency.
// Key — maps an item to its bucket; nil means all items are unkeyed.
type Options[A any] struct {
MaxConcurrent int
PerKey map[string]int
Key func(A) string
}
// Run executes fn over items concurrently under opts and returns one Result per
// item, in input order. Context cancellation stops un-started tasks (their
// Result carries ctx.Err()); already-running tasks observe ctx through fn.
func Run[A any, T any](ctx context.Context, items []A, opts Options[A], fn func(ctx context.Context, item A) (T, error)) []Result[T] {
results := make([]Result[T], len(items))
var global chan struct{}
if opts.MaxConcurrent > 0 {
global = make(chan struct{}, opts.MaxConcurrent)
}
// Build per-key semaphores up front; the map is read-only during the run so
// concurrent reads are safe.
keySems := make(map[string]chan struct{}, len(opts.PerKey))
for k, n := range opts.PerKey {
if n > 0 {
keySems[k] = make(chan struct{}, n)
}
}
var wg sync.WaitGroup
for i, it := range items {
wg.Add(1)
go func(i int, it A) {
defer wg.Done()
results[i].Index = i
if err := ctx.Err(); err != nil {
results[i].Err = err
return
}
// Acquire global then key (consistent order avoids deadlock).
if global != nil {
select {
case global <- struct{}{}:
defer func() { <-global }()
case <-ctx.Done():
results[i].Err = ctx.Err()
return
}
}
if opts.Key != nil {
if ks := keySems[opts.Key(it)]; ks != nil {
select {
case ks <- struct{}{}:
defer func() { <-ks }()
case <-ctx.Done():
results[i].Err = ctx.Err()
return
}
}
}
v, err := fn(ctx, it)
results[i].Value = v
results[i].Err = err
}(i, it)
}
wg.Wait()
return results
}
// Values returns the successful values (Err == nil) from a result slice, in
// order. Convenience for consolidation steps that ignore failures.
func Values[T any](rs []Result[T]) []T {
out := make([]T, 0, len(rs))
for _, r := range rs {
if r.Err == nil {
out = append(out, r.Value)
}
}
return out
}
// Errors returns the non-nil errors from a result slice, in order.
func Errors[T any](rs []Result[T]) []error {
var out []error
for _, r := range rs {
if r.Err != nil {
out = append(out, r.Err)
}
}
return out
}