feat: foundations — canonical types, Parse grammar, env DSNs, health, chains
Phase 1 of the majordomo build: - llm/ canonical contract (messages, parts, tools, capabilities, streaming, Model/Provider, error classification) - health/ clock-injected tracker (threshold bench, exponential capped cooldown, reset-on-success) - root Registry + Parse (verbatim model ids, inline recursive alias expansion with cycle detection, chain dedup), LLM_* env-DSN providers (go-llm parity: lazy fallback + eager LoadEnv), health-aware chain executor behind the Model interface - provider/fake scriptable test provider; hermetic test suite incl. the trailing-thinking chain and foreman:// env loading - ADRs 0001-0008, CLAUDE.md, README (honest matrix), CI workflow, docs/phase-1-design.md Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,134 @@
|
||||
package majordomo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/health"
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
)
|
||||
|
||||
// ErrChainExhausted reports that every element of a failover chain failed
|
||||
// (or was skipped while backed off). It is always joined with the
|
||||
// per-target errors.
|
||||
var ErrChainExhausted = errors.New("all chain targets failed")
|
||||
|
||||
// chainTarget is one resolved element of a failover chain.
|
||||
type chainTarget struct {
|
||||
// key identifies the target for health tracking: "provider/model-id".
|
||||
key string
|
||||
model llm.Model
|
||||
}
|
||||
|
||||
// chain implements llm.Model over an ordered list of targets with
|
||||
// health-tracked failover. A single-element spec is a chain of one — the
|
||||
// behavior (retry-on-transient, backoff bookkeeping) is identical, so
|
||||
// callers never branch on what Parse returned.
|
||||
//
|
||||
// Semantics (ADR-0006, ADR-0008):
|
||||
// - Targets are tried head-to-tail; targets currently backed off are
|
||||
// skipped.
|
||||
// - A transient error is retried on the same target (ChainConfig
|
||||
// TransientRetries, default 1). Every failed attempt counts toward the
|
||||
// target's consecutive-failure threshold; when the tracker benches the
|
||||
// target (default: 2 consecutive transient failures → exponential
|
||||
// capped cooldown) the chain stops retrying it and advances.
|
||||
// - Model-not-found advances without penalizing health. Other permanent
|
||||
// errors fail fast by default (AdvanceOnPermanent flips this).
|
||||
// - Any success resets the target's health.
|
||||
// - When every target fails or is skipped, the returned error joins
|
||||
// ErrChainExhausted with each target's reason.
|
||||
type chain struct {
|
||||
targets []chainTarget
|
||||
tracker *health.Tracker
|
||||
cfg ChainConfig
|
||||
}
|
||||
|
||||
// Targets returns the resolved "provider/model" keys in chain order
|
||||
// (diagnostics and tests).
|
||||
func (c *chain) Targets() []string {
|
||||
keys := make([]string, len(c.targets))
|
||||
for i, t := range c.targets {
|
||||
keys[i] = t.key
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
// Capabilities reports the head element's capabilities — the chain's
|
||||
// preferred target (ADR-0008). Per-attempt media normalization uses the
|
||||
// actual target's capabilities, not this value.
|
||||
func (c *chain) Capabilities() llm.Capabilities {
|
||||
return c.targets[0].model.Capabilities()
|
||||
}
|
||||
|
||||
// Generate tries each target per the chain semantics above.
|
||||
func (c *chain) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
|
||||
req = req.Apply(opts...)
|
||||
return chainDo(ctx, c, func(ctx context.Context, t chainTarget) (*llm.Response, error) {
|
||||
return t.model.Generate(ctx, req)
|
||||
})
|
||||
}
|
||||
|
||||
// Stream tries each target per the chain semantics. Failover applies to
|
||||
// establishing the stream; once a stream is open, mid-stream errors
|
||||
// propagate to the consumer rather than restarting on another target
|
||||
// (replaying half-delivered output would duplicate content).
|
||||
func (c *chain) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
|
||||
req = req.Apply(opts...)
|
||||
return chainDo(ctx, c, func(ctx context.Context, t chainTarget) (llm.Stream, error) {
|
||||
return t.model.Stream(ctx, req)
|
||||
})
|
||||
}
|
||||
|
||||
// chainDo runs the head-to-tail failover algorithm around an attempt
|
||||
// function, generic over the result type (response vs stream).
|
||||
func chainDo[T any](ctx context.Context, c *chain, attempt func(context.Context, chainTarget) (T, error)) (T, error) {
|
||||
var zero T
|
||||
var failures []error
|
||||
|
||||
for _, t := range c.targets {
|
||||
if !c.tracker.Available(t.key) {
|
||||
until := c.tracker.BackedOffUntil(t.key)
|
||||
failures = append(failures, fmt.Errorf("%s: skipped (backed off until %s)", t.key, until.Format("15:04:05.000")))
|
||||
continue
|
||||
}
|
||||
|
||||
retries := c.cfg.retries()
|
||||
for attemptN := 0; ; attemptN++ {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return zero, err
|
||||
}
|
||||
result, err := attempt(ctx, t)
|
||||
if err == nil {
|
||||
c.tracker.ReportSuccess(t.key)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
class := c.cfg.classify(err)
|
||||
if class == llm.ClassPermanent {
|
||||
if errors.Is(err, llm.ErrModelNotFound) || c.cfg.AdvanceOnPermanent {
|
||||
// Not a health problem (or policy says keep going):
|
||||
// advance without penalizing the target.
|
||||
failures = append(failures, fmt.Errorf("%s: %w", t.key, err))
|
||||
break
|
||||
}
|
||||
// Failing over cannot fix a bad request or bad credentials.
|
||||
return zero, fmt.Errorf("%s: %w", t.key, err)
|
||||
}
|
||||
|
||||
// Transient: every failed attempt counts toward the target's
|
||||
// consecutive-failure threshold. Retry the same target while
|
||||
// attempts remain — but advance as soon as the tracker benches
|
||||
// it (a freshly backed-off target is not worth more retries).
|
||||
benched := c.tracker.ReportFailure(t.key)
|
||||
if !benched && attemptN < retries {
|
||||
continue
|
||||
}
|
||||
failures = append(failures, fmt.Errorf("%s: %w", t.key, err))
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return zero, errors.Join(append([]error{ErrChainExhausted}, failures...)...)
|
||||
}
|
||||
Reference in New Issue
Block a user