package fanout

import (
	"context"
	"errors"
	"sync/atomic"
	"testing"
	"time"
)

// recordPeak atomically increments *inflight and then raises *peak to the new
// in-flight count when that count is higher than the peak recorded so far.
// The caller is responsible for decrementing *inflight when its work is done.
func recordPeak(inflight, peak *int32) {
	n := atomic.AddInt32(inflight, 1)
	for {
		p := atomic.LoadInt32(peak)
		// Stop once another goroutine has already recorded a value at least
		// as high as ours, or once our CAS wins.
		if n <= p || atomic.CompareAndSwapInt32(peak, p, n) {
			return
		}
	}
}

// TestRunPreservesOrderAndCapturesErrors asserts that Run returns one result
// per input item with Index matching the input position, that the worker's
// error is reported for the failing item (n == 2), and that Values and Errors
// split the results into 4 values and 1 error.
func TestRunPreservesOrderAndCapturesErrors(t *testing.T) {
	items := []int{0, 1, 2, 3, 4}
	worker := func(_ context.Context, n int) (int, error) {
		if n == 2 {
			return 0, errors.New("boom")
		}
		return n * 10, nil
	}

	got := Run(context.Background(), items, Options[int]{MaxConcurrent: 2}, worker)
	if len(got) != len(items) {
		t.Fatalf("len = %d", len(got))
	}

	for i, r := range got {
		if r.Index != i {
			t.Errorf("result[%d].Index = %d", i, r.Index)
		}
		switch {
		case i == 2:
			if r.Err == nil {
				t.Errorf("expected error at index 2")
			}
		case r.Err != nil:
			// The worker only fails for n == 2, so any other error is a bug.
			t.Errorf("result[%d] unexpected error: %v", i, r.Err)
		case r.Value != i*10:
			t.Errorf("result[%d].Value = %d, want %d", i, r.Value, i*10)
		}
	}

	if vals := Values(got); len(vals) != 4 {
		t.Errorf("Values len = %d, want 4", len(vals))
	}
	if errs := Errors(got); len(errs) != 1 {
		t.Errorf("Errors len = %d, want 1", len(errs))
	}
}

// TestMaxConcurrentBound runs 30 short jobs with MaxConcurrent set to 3 and
// asserts that the observed peak number of simultaneously running workers
// never exceeds that limit.
func TestMaxConcurrentBound(t *testing.T) {
	// Named limit rather than max to avoid shadowing the max builtin.
	const limit = 3
	var inflight, peak int32
	items := make([]int, 30)

	Run(context.Background(), items, Options[int]{MaxConcurrent: limit}, func(_ context.Context, _ int) (int, error) {
		recordPeak(&inflight, &peak)
		// Hold the slot briefly so concurrent workers overlap.
		time.Sleep(2 * time.Millisecond)
		atomic.AddInt32(&inflight, -1)
		return 0, nil
	})

	if p := atomic.LoadInt32(&peak); p > limit {
		t.Errorf("peak concurrency %d exceeded MaxConcurrent %d", p, limit)
	}
}

// TestPerKeyCap asserts that the PerKey limit is honored for a keyed provider.
func TestPerKeyCap(t *testing.T) {
	// Every job belongs to provider "slow", which is capped at 1 via PerKey,
	// so the observed peak must be exactly 1 even though MaxConcurrent (8)
	// would allow more.
	var slowInflight, slowPeak int32
	type job struct{ provider string }

	items := make([]job, 12)
	for i := range items {
		items[i] = job{provider: "slow"}
	}

	opts := Options[job]{
		MaxConcurrent: 8,
		PerKey:        map[string]int{"slow": 1},
		Key:           func(j job) string { return j.provider },
	}
	Run(context.Background(), items, opts, func(_ context.Context, _ job) (int, error) {
		recordPeak(&slowInflight, &slowPeak)
		time.Sleep(time.Millisecond)
		atomic.AddInt32(&slowInflight, -1)
		return 0, nil
	})

	if p := atomic.LoadInt32(&slowPeak); p != 1 {
		t.Errorf("per-key cap not honored: slow peak = %d, want 1", p)
	}
}

// TestContextCancellation calls Run with an already-cancelled context and
// asserts that every returned result carries a non-nil error.
func TestContextCancellation(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	items := make([]int, 5)
	got := Run(ctx, items, Options[int]{MaxConcurrent: 2}, func(_ context.Context, _ int) (int, error) {
		return 1, nil
	})

	// Without this guard an empty result slice would make the loop below
	// pass vacuously.
	// NOTE(review): assumes Run returns one result per item even when the
	// context is cancelled up front — confirm against Run's contract.
	if len(got) != len(items) {
		t.Fatalf("len = %d, want %d", len(got), len(items))
	}
	for i, r := range got {
		if r.Err == nil {
			t.Errorf("result[%d] expected ctx error after cancel", i)
		}
	}
}