0147a79d18
Phase 9a (ADR-0014): Registry.RegisterResolver for dynamic tiers; DefineTool[Args] typed tools; Usage cache/reasoning detail fields wired through anthropic/openai/google; WithPromptCaching (Anthropic cache_control); agent supervision hooks (WithMaxStepsFunc, WithSteer, WithCompactor, WithToolErrorLimits + ErrToolLoop); health Bench/Unbench/Snapshot; ChainConfig.Observer failover events. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
156 lines
5.8 KiB
Go
156 lines
5.8 KiB
Go
package majordomo
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/majordomo/health"
|
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
|
"gitea.stevedudenhoeffer.com/steve/majordomo/media"
|
|
)
|
|
|
|
// ErrChainExhausted is the sentinel for a failover chain that ran out of
// targets: each one either failed or was skipped while backed off. It is
// always joined with the per-target errors, so match it with errors.Is
// rather than ==.
var ErrChainExhausted = errors.New("all chain targets failed")
|
|
|
|
// chainTarget is one resolved element of a failover chain.
|
|
type chainTarget struct {
|
|
// key identifies the target for health tracking: "provider/model-id".
|
|
key string
|
|
model llm.Model
|
|
}
|
|
|
|
// chain implements llm.Model over an ordered list of targets with
|
|
// health-tracked failover. A single-element spec is a chain of one — the
|
|
// behavior (retry-on-transient, backoff bookkeeping) is identical, so
|
|
// callers never branch on what Parse returned.
|
|
//
|
|
// Semantics (ADR-0006, ADR-0008):
|
|
// - Targets are tried head-to-tail; targets currently backed off are
|
|
// skipped.
|
|
// - A transient error is retried on the same target (ChainConfig
|
|
// TransientRetries, default 1). Every failed attempt counts toward the
|
|
// target's consecutive-failure threshold; when the tracker benches the
|
|
// target (default: 2 consecutive transient failures → exponential
|
|
// capped cooldown) the chain stops retrying it and advances.
|
|
// - Model-not-found advances without penalizing health. Other permanent
|
|
// errors fail fast by default (AdvanceOnPermanent flips this).
|
|
// - Any success resets the target's health.
|
|
// - When every target fails or is skipped, the returned error joins
|
|
// ErrChainExhausted with each target's reason.
|
|
type chain struct {
|
|
targets []chainTarget
|
|
tracker *health.Tracker
|
|
cfg ChainConfig
|
|
}
|
|
|
|
// Targets returns the resolved "provider/model" keys in chain order
|
|
// (diagnostics and tests).
|
|
func (c *chain) Targets() []string {
|
|
keys := make([]string, len(c.targets))
|
|
for i, t := range c.targets {
|
|
keys[i] = t.key
|
|
}
|
|
return keys
|
|
}
|
|
|
|
// Capabilities reports the head element's capabilities — the chain's
|
|
// preferred target (ADR-0008). Per-attempt media normalization uses the
|
|
// actual target's capabilities, not this value.
|
|
func (c *chain) Capabilities() llm.Capabilities {
|
|
return c.targets[0].model.Capabilities()
|
|
}
|
|
|
|
// Generate tries each target per the chain semantics above.
|
|
func (c *chain) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
|
|
req = req.Apply(opts...)
|
|
return chainDo(ctx, c, req, func(ctx context.Context, t chainTarget, nreq llm.Request) (*llm.Response, error) {
|
|
return t.model.Generate(ctx, nreq)
|
|
})
|
|
}
|
|
|
|
// Stream tries each target per the chain semantics. Failover applies to
|
|
// establishing the stream; once a stream is open, mid-stream errors
|
|
// propagate to the consumer rather than restarting on another target
|
|
// (replaying half-delivered output would duplicate content).
|
|
func (c *chain) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
|
|
req = req.Apply(opts...)
|
|
return chainDo(ctx, c, req, func(ctx context.Context, t chainTarget, nreq llm.Request) (llm.Stream, error) {
|
|
return t.model.Stream(ctx, nreq)
|
|
})
|
|
}
|
|
|
|
// chainDo runs the head-to-tail failover algorithm around an attempt
|
|
// function, generic over the result type (response vs stream). Before each
|
|
// target is tried, the request's media is normalized against THAT target's
|
|
// capabilities (ADR-0008/0009) — a request that cannot be made to fit one
|
|
// target advances to the next without a health penalty.
|
|
func chainDo[T any](ctx context.Context, c *chain, req llm.Request, attempt func(context.Context, chainTarget, llm.Request) (T, error)) (T, error) {
|
|
var zero T
|
|
var failures []error
|
|
|
|
observe := func(ev FailoverEvent) {
|
|
if c.cfg.Observer != nil {
|
|
c.cfg.Observer(ev)
|
|
}
|
|
}
|
|
|
|
for _, t := range c.targets {
|
|
if !c.tracker.Available(t.key) {
|
|
until := c.tracker.BackedOffUntil(t.key)
|
|
failures = append(failures, fmt.Errorf("%s: skipped (backed off until %s)", t.key, until.Format("15:04:05.000")))
|
|
observe(FailoverEvent{Target: t.key, Skipped: true})
|
|
continue
|
|
}
|
|
|
|
nreq, err := media.Normalize(req, t.model.Capabilities())
|
|
if err != nil {
|
|
// Always ErrUnsupported-wrapped: this target cannot take the
|
|
// request by declaration. Advance, no health penalty.
|
|
failures = append(failures, fmt.Errorf("%s: %w", t.key, err))
|
|
continue
|
|
}
|
|
|
|
retries := c.cfg.retries()
|
|
for attemptN := 0; ; attemptN++ {
|
|
if err := ctx.Err(); err != nil {
|
|
return zero, err
|
|
}
|
|
result, err := attempt(ctx, t, nreq)
|
|
if err == nil {
|
|
c.tracker.ReportSuccess(t.key)
|
|
return result, nil
|
|
}
|
|
|
|
class := c.cfg.classify(err)
|
|
if class == llm.ClassPermanent {
|
|
observe(FailoverEvent{Target: t.key, Err: err, Class: class, Attempt: attemptN})
|
|
if errors.Is(err, llm.ErrModelNotFound) || errors.Is(err, llm.ErrUnsupported) || c.cfg.AdvanceOnPermanent {
|
|
// Not a health problem (or policy says keep going):
|
|
// advance without penalizing the target.
|
|
failures = append(failures, fmt.Errorf("%s: %w", t.key, err))
|
|
break
|
|
}
|
|
// Failing over cannot fix a bad request or bad credentials.
|
|
return zero, fmt.Errorf("%s: %w", t.key, err)
|
|
}
|
|
|
|
// Transient: every failed attempt counts toward the target's
|
|
// consecutive-failure threshold. Retry the same target while
|
|
// attempts remain — but advance as soon as the tracker benches
|
|
// it (a freshly backed-off target is not worth more retries).
|
|
benched := c.tracker.ReportFailure(t.key)
|
|
observe(FailoverEvent{Target: t.key, Err: err, Class: class, Attempt: attemptN, Benched: benched})
|
|
if !benched && attemptN < retries {
|
|
continue
|
|
}
|
|
failures = append(failures, fmt.Errorf("%s: %w", t.key, err))
|
|
break
|
|
}
|
|
}
|
|
|
|
return zero, errors.Join(append([]error{ErrChainExhausted}, failures...)...)
|
|
}
|