Files
majordomo/registry.go
steve 0147a79d18
CI / Tidy (push) Successful in 9m31s
CI / Build & Test (push) Successful in 10m13s
feat: conversion-driven extensions — resolvers, DefineTool, hooks, ops controls
Phase 9a (ADR-0014): Registry.RegisterResolver for dynamic tiers;
DefineTool[Args] typed tools; Usage cache/reasoning detail fields wired
through anthropic/openai/google; WithPromptCaching (Anthropic
cache_control); agent supervision hooks (WithMaxStepsFunc, WithSteer,
WithCompactor, WithToolErrorLimits + ErrToolLoop); health
Bench/Unbench/Snapshot; ChainConfig.Observer failover events.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 13:30:06 +02:00

285 lines
8.9 KiB
Go

package majordomo
import (
"fmt"
"net/http"
"os"
"strings"
"sync"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/health"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)
// Registry owns providers, aliases/tiers, env-DSN scheme factories, the
// model health tracker, and Parse. It is safe for concurrent use.
type Registry struct {
// mu is taken by the Register* methods, Provider, and providerFor around
// access to providers, aliases, resolvers, schemes, and envErrs.
mu sync.RWMutex
// providers maps a provider name (its Name(), or the name it was lazily
// resolved under) to the provider instance.
providers map[string]llm.Provider
// aliases maps a bare alias name to the spec it expands to.
aliases map[string]string
// resolvers holds dynamic alias resolvers in registration order.
resolvers []Resolver
// schemes maps an env-DSN scheme to the factory that builds its provider.
schemes map[string]SchemeFactory
// envErrs records LLM_* entries that failed to load so the failure
// surfaces when (and only when) that provider name is actually used.
envErrs map[string]error
// tracker is the health tracker created in New and returned by Health.
tracker *health.Tracker
// chainCfg is the failover-chain configuration captured at construction.
chainCfg ChainConfig
// envLookup reads a single env var for lazy LLM_* resolution in
// providerFor.
envLookup func(string) string
}
// Resolver dynamically resolves a bare alias token to a spec string —
// the hook for DB- or config-backed tiers that change at runtime. Resolvers
// are checked after static aliases, in registration order; the returned
// spec is expanded recursively (chains, nested aliases) with cycle
// detection.
type Resolver interface {
// Resolve returns the spec for name and ok=true, or ok=false when this
// resolver does not handle name.
Resolve(name string) (spec string, ok bool)
}
// ResolverFunc lets an ordinary function be used wherever a Resolver is
// expected.
type ResolverFunc func(name string) (string, bool)

// Resolve implements Resolver by calling f with name and passing its
// results through unchanged.
func (f ResolverFunc) Resolve(name string) (string, bool) {
	spec, ok := f(name)
	return spec, ok
}
// SchemeFactory builds a provider instance from an env DSN. name is the
// registry name the provider will be registered under (e.g. "m1" for
// LLM_M1); dsn carries the scheme, credential, and host. It returns the
// constructed provider, or an error when one cannot be built from dsn.
type SchemeFactory func(name string, dsn DSN) (llm.Provider, error)
// ChainConfig tunes failover-chain execution. The zero value selects the
// defaults described on each field.
type ChainConfig struct {
// TransientRetries is the number of immediate same-target retries after
// a transient error. 0 selects the default (DefaultTransientRetries, 1);
// a negative value disables retries.
TransientRetries int
// AdvanceOnPermanent, when true, makes the chain advance to the next
// element on permanent errors other than model-not-found instead of
// returning immediately. Model-not-found always advances (without
// penalizing health); auth/malformed errors default to fail-fast
// because failing over cannot help a bad request.
AdvanceOnPermanent bool
// Classify overrides the default error classifier (llm.Classify). When
// nil, llm.Classify is used.
Classify func(error) llm.ErrorClass
// Observer, when non-nil, receives one event per failover decision
// (failed attempt, bench, benched-skip) — the hook for persisting
// failover logs. Called synchronously; keep it fast or hand off.
Observer func(FailoverEvent)
}
// FailoverEvent describes one failover decision in a chain. It is the
// value passed to ChainConfig.Observer.
type FailoverEvent struct {
// Target is the "provider/model" key the event concerns.
Target string
// Err is the failure (nil for benched-skip events).
Err error
// Class is the error classification (meaningful only when Err != nil).
Class llm.ErrorClass
// Attempt is the 0-based attempt number on this target in this request.
Attempt int
// Benched reports that this failure benched the target.
Benched bool
// Skipped reports that the target was skipped because it is benched.
Skipped bool
}
// DefaultTransientRetries is the number of same-target retries after a
// transient error that is used when ChainConfig.TransientRetries is left
// at 0.
const DefaultTransientRetries = 1
// retries reports the effective same-target retry count: a positive
// TransientRetries is used as-is, a negative one disables retries (0), and
// zero selects DefaultTransientRetries.
func (c ChainConfig) retries() int {
	n := c.TransientRetries
	if n > 0 {
		return n
	}
	if n < 0 {
		return 0
	}
	return DefaultTransientRetries
}
// classify maps err to an error class, using the configured Classify
// override when one is set and llm.Classify otherwise.
func (c ChainConfig) classify(err error) llm.ErrorClass {
	if c.Classify == nil {
		return llm.Classify(err)
	}
	return c.Classify(err)
}
// registryConfig collects the settings that RegistryOption values apply
// before New builds the Registry.
type registryConfig struct {
// health is handed to health.NewTracker.
health health.Config
// chain becomes the registry's failover-chain configuration.
chain ChainConfig
// envLookup reads one env var by name; New defaults it to os.Getenv.
envLookup func(string) string
// environ returns "KEY=value" entries for the eager LLM_* scan; New
// defaults it to os.Environ.
environ func() []string
// skipEnv disables the eager LLM_* scan in New.
skipEnv bool
// httpClient is forwarded to registerBuiltins.
httpClient *http.Client
}
// RegistryOption configures New. Each option mutates the configuration New
// assembles before it builds the Registry; options are applied in the
// order they are passed.
type RegistryOption func(*registryConfig)
// WithHealthConfig replaces the health tracker configuration (thresholds,
// cooldowns, clock) wholesale. Because the entire config is overwritten,
// including its Clock, pass WithClock after this option if both are used.
func WithHealthConfig(cfg health.Config) RegistryOption {
	return func(target *registryConfig) {
		target.health = cfg
	}
}
// WithChainConfig replaces the failover-chain configuration (retry count,
// permanent-error policy, classifier, observer).
func WithChainConfig(cfg ChainConfig) RegistryOption {
	return func(target *registryConfig) {
		target.chain = cfg
	}
}
// WithClock sets the clock used by the health tracker; tests supply a fake
// clock to step through backoff windows deterministically. Only the Clock
// field of the health configuration is touched.
func WithClock(clock func() time.Time) RegistryOption {
	return func(target *registryConfig) {
		target.health.Clock = clock
	}
}
// WithEnvLookup sets the env-var lookup used for lazy LLM_* resolution
// during Parse (os.Getenv when this option is not given). Tests use it to
// avoid touching the process environment.
func WithEnvLookup(lookup func(string) string) RegistryOption {
	return func(target *registryConfig) {
		target.envLookup = lookup
	}
}
// WithoutEnvProviders turns off the eager LLM_* scan that New performs at
// construction time. Lazy per-name resolution during Parse still works
// (use WithEnvLookup to control it in tests).
func WithoutEnvProviders() RegistryOption {
	return func(target *registryConfig) {
		target.skipEnv = true
	}
}
// WithHTTPClient sets the HTTP client used by built-in providers and
// env-DSN scheme factories created by this registry (proxies, custom TLS,
// test servers). Providers registered explicitly via RegisterProvider keep
// whatever client they were built with.
func WithHTTPClient(c *http.Client) RegistryOption {
	return func(target *registryConfig) {
		target.httpClient = c
	}
}
// New creates a Registry with all built-in providers and scheme factories
// registered, then loads LLM_* env-DSN providers from the process
// environment (unless WithoutEnvProviders is given). Malformed LLM_* entries
// do not fail construction; the error surfaces if that provider name is
// referenced in Parse.
//
// Options are applied in order. Nil options are skipped, and an env hook
// left nil by an option (for example WithEnvLookup(nil)) falls back to its
// os-backed default, so a misbuilt option list cannot trigger a
// nil-function panic here or later during provider resolution.
func New(opts ...RegistryOption) *Registry {
	cfg := registryConfig{
		envLookup: os.Getenv,
		environ:   os.Environ,
	}
	for _, opt := range opts {
		if opt == nil {
			// Tolerate conditionally-built option lists.
			continue
		}
		opt(&cfg)
	}
	// Restore defaults for hooks an option may have cleared; both are
	// called unconditionally further down the line.
	if cfg.envLookup == nil {
		cfg.envLookup = os.Getenv
	}
	if cfg.environ == nil {
		cfg.environ = os.Environ
	}

	r := &Registry{
		providers: make(map[string]llm.Provider),
		aliases:   make(map[string]string),
		schemes:   make(map[string]SchemeFactory),
		envErrs:   make(map[string]error),
		tracker:   health.NewTracker(cfg.health),
		chainCfg:  cfg.chain,
		envLookup: cfg.envLookup,
	}
	registerBuiltins(r, cfg.httpClient)

	if !cfg.skipEnv {
		entries := cfg.environ()
		env := make(map[string]string, len(entries))
		for _, kv := range entries {
			// Entries without "=" carry no value and are ignored.
			if k, v, ok := strings.Cut(kv, "="); ok {
				env[k] = v
			}
		}
		// Errors are recorded per-name and surfaced on use; see envErrs.
		_ = r.LoadEnv(env)
	}
	return r
}
// RegisterProvider stores p under the name reported by p.Name(), replacing
// any provider previously registered under that name.
func (r *Registry) RegisterProvider(p llm.Provider) {
	r.mu.Lock()
	defer r.mu.Unlock()

	key := p.Name()
	r.providers[key] = p
}
// RegisterAlias maps a bare name (no slash) to a spec, replacing any
// earlier mapping for that name. The spec may be a single target, another
// alias, or a comma-separated chain; Parse expands aliases inline and
// recursively, with cycle detection.
func (r *Registry) RegisterAlias(name, spec string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.aliases[name] = spec
}
// RegisterResolver adds a dynamic alias resolver (e.g. database-backed
// tiers) to the end of the resolver list. Resolvers are consulted in
// registration order, after static aliases.
func (r *Registry) RegisterResolver(res Resolver) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.resolvers = append(r.resolvers, res)
}
// RegisterScheme installs factory for the given env-DSN scheme, replacing
// any factory already registered for it. This lets consumers wire custom
// provider kinds into LLM_* definitions.
func (r *Registry) RegisterScheme(scheme string, factory SchemeFactory) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.schemes[scheme] = factory
}
// Provider looks up a registered provider by name. The boolean result
// reports whether a provider is registered under that name.
func (r *Registry) Provider(name string) (llm.Provider, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	found, present := r.providers[name]
	return found, present
}
// Health returns the registry's health tracker (read-mostly; useful for
// diagnostics and tests).
func (r *Registry) Health() *health.Tracker {
	return r.tracker
}
// providerFor resolves a provider name in three steps: an already
// registered provider wins; otherwise a recorded env-load error for that
// name is returned; otherwise the name is resolved lazily from its LLM_*
// env var (go-llm parity: "m5" → env LLM_M5, "my-prov" → LLM_MY_PROV).
// A lazily built provider is cached in the registry. An unset or empty env
// var yields an error wrapping ErrUnknownProvider.
func (r *Registry) providerFor(name string) (llm.Provider, error) {
	// Snapshot both the registration and any recorded load failure under a
	// single read lock.
	r.mu.RLock()
	registered, found := r.providers[name]
	loadErr := r.envErrs[name]
	r.mu.RUnlock()

	switch {
	case found:
		return registered, nil
	case loadErr != nil:
		return nil, loadErr
	}

	// Lazy path: dashes become underscores and the name is upper-cased to
	// form the env var key.
	envKey := "LLM_" + strings.ToUpper(strings.ReplaceAll(name, "-", "_"))
	rawDSN := r.envLookup(envKey)
	if rawDSN == "" {
		return nil, fmt.Errorf("%w: %q (checked registry and %s env var)", ErrUnknownProvider, name, envKey)
	}

	// The provider is built without holding the lock.
	built, err := r.providerFromDSN(name, rawDSN)
	if err != nil {
		return nil, fmt.Errorf("parse %s: %w", envKey, err)
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	// Another goroutine may have registered this name in the meantime; the
	// first registration is kept and ours is discarded.
	if winner, raced := r.providers[name]; raced {
		return winner, nil
	}
	r.providers[name] = built
	return built, nil
}