// Package fanout is executus's programmatic swarm primitive: run a function over // many items concurrently with bounded global and per-key concurrency, returning // one result per item in input order. // // This is distinct from the LLM-callable agent_spawn_parallel tool. fanout is a // plain Go API a host drives directly — it is what Gadfly uses to run an // N-models × M-lenses review fleet (flatten the matrix into items, key each by // its provider, cap per-provider concurrency) and what any host uses to scatter // bounded agent runs and gather structured results for consolidation. // // fanout has no dependency beyond the stdlib; a caller wires per-provider caps // from config (Mort: convar; Gadfly: GADFLY_PROVIDER_CONCURRENCY). package fanout import ( "context" "sync" ) // Result pairs a task's output with its error and original index. fn errors are // captured here, not propagated — one failing task never aborts the batch. type Result[T any] struct { Index int Value T Err error } // Options bound a fan-out. // // MaxConcurrent — cap on total in-flight tasks (0 = unbounded). // PerKey — cap on in-flight tasks sharing a key bucket; a key absent // from the map (or mapped to <=0) is uncapped beyond // MaxConcurrent. Used for per-provider concurrency. // Key — maps an item to its bucket; nil means all items are unkeyed. type Options[A any] struct { MaxConcurrent int PerKey map[string]int Key func(A) string } // Run executes fn over items concurrently under opts and returns one Result per // item, in input order. Context cancellation stops un-started tasks (their // Result carries ctx.Err()); already-running tasks observe ctx through fn. func Run[A any, T any](ctx context.Context, items []A, opts Options[A], fn func(ctx context.Context, item A) (T, error)) []Result[T] { results := make([]Result[T], len(items)) var global chan struct{} if opts.MaxConcurrent > 0 { global = make(chan struct{}, opts.MaxConcurrent) } // Build per-key semaphores up front; the map is read-only during the run so // concurrent reads are safe. keySems := make(map[string]chan struct{}, len(opts.PerKey)) for k, n := range opts.PerKey { if n > 0 { keySems[k] = make(chan struct{}, n) } } var wg sync.WaitGroup for i, it := range items { wg.Add(1) go func(i int, it A) { defer wg.Done() results[i].Index = i if err := ctx.Err(); err != nil { results[i].Err = err return } // Acquire global then key (consistent order avoids deadlock). if global != nil { select { case global <- struct{}{}: defer func() { <-global }() case <-ctx.Done(): results[i].Err = ctx.Err() return } } if opts.Key != nil { if ks := keySems[opts.Key(it)]; ks != nil { select { case ks <- struct{}{}: defer func() { <-ks }() case <-ctx.Done(): results[i].Err = ctx.Err() return } } } v, err := fn(ctx, it) results[i].Value = v results[i].Err = err }(i, it) } wg.Wait() return results } // Values returns the successful values (Err == nil) from a result slice, in // order. Convenience for consolidation steps that ignore failures. func Values[T any](rs []Result[T]) []T { out := make([]T, 0, len(rs)) for _, r := range rs { if r.Err == nil { out = append(out, r.Value) } } return out } // Errors returns the non-nil errors from a result slice, in order. func Errors[T any](rs []Result[T]) []error { var out []error for _, r := range rs { if r.Err != nil { out = append(out, r.Err) } } return out }