package lane import ( "context" "sync" ) // ConvarReader is the narrow surface the registry uses to read // per-lane concurrency caps from convars at startup and on Reload. // // Why an interface (not pkg/convar directly): registry is a generic // primitive and shouldn't import the application convar package. // Production wires a thin adapter; tests pass a fake. type ConvarReader interface { Int(ctx context.Context, name string, def int) int } // ConvarReaderFunc adapts a closure into a ConvarReader. type ConvarReaderFunc func(ctx context.Context, name string, def int) int // Int satisfies ConvarReader. func (f ConvarReaderFunc) Int(ctx context.Context, name string, def int) int { if f == nil { return def } return f(ctx, name, def) } // Registry is a manager of named lanes. The default policy is // fair-share; lanes are created lazily on first GetOrCreate, with // concurrency read from convar `lanes..max_concurrent` (default // 1). Reload re-reads convars and updates each lane's MaxConcurrent // in place — useful for runtime tuning without losing in-flight work. // // Why a singleton-ish manager vs constructing lanes ad-hoc: the // registry is the integration point where mort.go wires lanes once // and every subsystem (LLM transport, skill runner) looks them up by // name. Lazy creation lets the registry stay schema-free — adding a // new lane is just "ask for it by name". // // Test: registry_test.go covers GetOrCreate identity, convar read, // and Reload. type Registry struct { mu sync.RWMutex lanes map[string]Lane convars ConvarReader // policyFactory is the queue policy constructor used for new // lanes. Defaults to NewFairSharePolicy. Tests substitute FIFO // when they want deterministic ordering. policyFactory func() queuePolicy } // NewRegistry constructs a registry. convars may be nil — lanes // fall back to the registry's default concurrency (1). func NewRegistry(convars ConvarReader) *Registry { return &Registry{ lanes: make(map[string]Lane), convars: convars, policyFactory: NewFairSharePolicy, } } // SetPolicyFactory overrides the default policy used for new lanes. // Existing lanes are unchanged. Used by tests; production keeps the // fair-share default. func (r *Registry) SetPolicyFactory(f func() queuePolicy) { if f == nil { f = NewFairSharePolicy } r.mu.Lock() r.policyFactory = f r.mu.Unlock() } // Get returns the named lane or nil if it has not been created. // Useful in admin/UI code that wants to show only existing lanes // without creating new ones as a side effect. func (r *Registry) Get(name string) Lane { r.mu.RLock() defer r.mu.RUnlock() return r.lanes[name] } // StatsReader is the read-only stats surface exposed to admin / user // dashboards (Discord queue commands, /skills/admin/queues web view). // *Registry satisfies it; tests substitute a fake. // // Why a narrow interface (vs passing *Registry around): the consumers // only need stats and lane lookup — no creation or mutation surface. // Keeping the dep narrow makes mocks trivial in webui + skills tests. type StatsReader interface { // List returns a snapshot of every registered lane. List() []Lane // Lookup returns the lane by name, or nil. Mirrors Registry.Get // (named differently to avoid the "Get" verb confusion in // dashboards that primarily call Stats). Lookup(name string) Lane } // Lookup satisfies the StatsReader surface alongside Registry.Get. We // expose both verbs so the dashboard code reads naturally without // forcing existing call sites that use Get() to migrate. func (r *Registry) Lookup(name string) Lane { return r.Get(name) } // GetOrCreate returns the named lane, creating it lazily on first // call. Concurrency is read from convar `lanes..max_concurrent` // (default 1). The policy is the registry's policy factory (default // fair-share). // // Why convar name `lanes..max_concurrent` (not // `skills.lane..max_concurrent`): pkg/lane is generic — the // skills system happens to be the first caller, but the LLM transport // wrapper (Phase 3) and other future runners will use the same // registry. The convar namespace `lanes.*` keeps lane configuration // in one place. The skills system can adopt different convar names // if it prefers; in that case, mort.go reads them and calls // SetMaxConcurrent on the resulting lanes after creation. func (r *Registry) GetOrCreate(ctx context.Context, name string) Lane { r.mu.RLock() if l, ok := r.lanes[name]; ok { r.mu.RUnlock() return l } r.mu.RUnlock() r.mu.Lock() defer r.mu.Unlock() // Double-check after upgrading the lock. if l, ok := r.lanes[name]; ok { return l } maxConcurrent := r.readConcurrency(ctx, name) policy := r.policyFactory() if policy == nil { policy = NewFairSharePolicy() } l := NewWithPolicy(name, maxConcurrent, policy) r.lanes[name] = l return l } // List returns a snapshot of all registered lanes. Iteration order is // not guaranteed (Go map randomization). func (r *Registry) List() []Lane { r.mu.RLock() defer r.mu.RUnlock() out := make([]Lane, 0, len(r.lanes)) for _, l := range r.lanes { out = append(out, l) } return out } // Names returns the registered lane names. Used for the admin // "list all lanes" surface. func (r *Registry) Names() []string { r.mu.RLock() defer r.mu.RUnlock() out := make([]string, 0, len(r.lanes)) for n := range r.lanes { out = append(out, n) } return out } // Reload re-reads convars for every registered lane and calls // SetMaxConcurrent on each. Existing running jobs continue to run; // new dispatches respect the updated cap. // // Why a manual Reload instead of reading convars at every dispatch: // dispatch is on the hot path; reading a convar there for every // queued job is wasteful. A periodic Reload (every minute, say) is // cheap and good enough for human-driven config changes. func (r *Registry) Reload(ctx context.Context) { r.mu.RLock() defer r.mu.RUnlock() for name, l := range r.lanes { n := r.readConcurrency(ctx, name) l.SetMaxConcurrent(n) } } // readConcurrency reads `lanes..max_concurrent` with a default // of 1. Defensive against a nil ConvarReader and against negative // values (clamped to 1). func (r *Registry) readConcurrency(ctx context.Context, name string) int { if r.convars == nil { return 1 } n := r.convars.Int(ctx, "lanes."+name+".max_concurrent", 1) if n <= 0 { return 1 } return n }