Files
majordomo/chain.go
T
steve 323558ed72 feat(llm): ReasoningEffort request option and ErrUnsupported sentinel
Groundwork for the provider phase: reasoning levels map to native knobs
(OpenAI reasoning_effort, Ollama think); ErrUnsupported marks declared
capability mismatches that chains advance past without health penalty.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 12:40:47 +02:00

135 lines
4.8 KiB
Go

package majordomo
import (
"context"
"errors"
"fmt"
"gitea.stevedudenhoeffer.com/steve/majordomo/health"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)
// ErrChainExhausted reports that every element of a failover chain failed
// (or was skipped while backed off). It is never returned bare: the chain
// joins it with the per-target errors via errors.Join, so callers should
// detect it with errors.Is(err, ErrChainExhausted) rather than ==.
var ErrChainExhausted = errors.New("all chain targets failed")
// chainTarget is one resolved element of a failover chain.
type chainTarget struct {
// key identifies the target for health tracking: "provider/model-id".
// It is also the prefix of every per-target error the chain reports.
key string
// model is the backend that Generate and Stream attempts are sent to.
model llm.Model
}
// chain implements llm.Model over an ordered list of targets with
// health-tracked failover. A single-element spec is a chain of one — the
// behavior (retry-on-transient, backoff bookkeeping) is identical, so
// callers never branch on what Parse returned.
//
// Semantics (ADR-0006, ADR-0008):
//   - Targets are tried head-to-tail; targets currently backed off are
//     skipped.
//   - A transient error is retried on the same target (ChainConfig
//     TransientRetries, default 1). Every failed attempt counts toward the
//     target's consecutive-failure threshold; when the tracker benches the
//     target (default: 2 consecutive transient failures → exponential
//     capped cooldown) the chain stops retrying it and advances.
//   - Model-not-found (llm.ErrModelNotFound) and declared capability
//     mismatches (llm.ErrUnsupported) advance to the next target without
//     penalizing health. Other permanent errors fail fast by default;
//     AdvanceOnPermanent makes them advance too, likewise without a
//     health penalty.
//   - Any success resets the target's health.
//   - The context is checked before every attempt; once it is done the
//     chain stops and returns the context's error as-is.
//   - When every target fails or is skipped, the returned error joins
//     ErrChainExhausted with each target's reason.
type chain struct {
// targets is the failover order; index 0 is the preferred (head) target.
targets []chainTarget
// tracker answers whether a target is currently backed off and receives
// the success/failure report for every attempt outcome that affects health.
tracker *health.Tracker
// cfg supplies the per-target retry count, the error classifier, and the
// AdvanceOnPermanent policy switch.
cfg ChainConfig
}
// Targets lists the health-tracking key ("provider/model") of every
// resolved target, head first. It exists for diagnostics and tests; the
// returned slice is freshly allocated, so callers may modify it freely.
func (c *chain) Targets() []string {
	keys := make([]string, 0, len(c.targets))
	for _, target := range c.targets {
		keys = append(keys, target.key)
	}
	return keys
}
// Capabilities answers with the capabilities of the chain's head element,
// i.e. its preferred target (ADR-0008). Per-attempt media normalization
// uses the capabilities of whichever target is actually being tried, not
// this value.
//
// The chain must hold at least one target: the head is read by index, so
// an empty chain panics here.
func (c *chain) Capabilities() llm.Capabilities {
	head := c.targets[0]
	return head.model.Capabilities()
}
// Generate folds opts into req once, then runs the request through the
// failover algorithm in chainDo: each eligible target is asked to Generate
// the same resolved request until one succeeds or the chain gives up.
func (c *chain) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
	req = req.Apply(opts...)

	// Options are already applied, so each target receives the resolved
	// request with no further options.
	generate := func(ctx context.Context, target chainTarget) (*llm.Response, error) {
		return target.model.Generate(ctx, req)
	}
	return chainDo(ctx, c, generate)
}
// Stream folds opts into req once, then runs stream establishment through
// the failover algorithm in chainDo. Failover covers only opening the
// stream: once a target has returned an open stream, mid-stream errors
// propagate to the consumer instead of restarting on another target
// (replaying half-delivered output would duplicate content).
func (c *chain) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
	req = req.Apply(opts...)

	// Options are already applied, so each target receives the resolved
	// request with no further options.
	open := func(ctx context.Context, target chainTarget) (llm.Stream, error) {
		return target.model.Stream(ctx, req)
	}
	return chainDo(ctx, c, open)
}
// chainDo drives the head-to-tail failover algorithm around attempt. It is
// generic over the result type so Generate (response) and Stream (stream)
// share one implementation.
//
// For each target in order:
//   - a target the tracker reports as unavailable is skipped and recorded;
//   - otherwise attempt is called, and on success the tracker is told and
//     the result is returned;
//   - a permanent error either advances to the next target without a
//     health report (model-not-found, unsupported, or AdvanceOnPermanent)
//     or is returned immediately, prefixed with the target key;
//   - any other error is reported to the tracker as a failure and retried
//     on the same target until the retry budget is spent or the tracker
//     benches the target.
//
// The context is checked before every attempt; a done context ends the
// call with the bare context error. If no target succeeds, the result is
// ErrChainExhausted joined with one reason per target, in chain order.
func chainDo[T any](ctx context.Context, c *chain, attempt func(context.Context, chainTarget) (T, error)) (T, error) {
	var none T

	// The sentinel leads the joined error; per-target reasons follow it.
	reasons := []error{ErrChainExhausted}

nextTarget:
	for _, target := range c.targets {
		if !c.tracker.Available(target.key) {
			resumeAt := c.tracker.BackedOffUntil(target.key)
			reasons = append(reasons, fmt.Errorf("%s: skipped (backed off until %s)", target.key, resumeAt.Format("15:04:05.000")))
			continue
		}

		maxRetries := c.cfg.retries()
		for retry := 0; ; retry++ {
			if ctxErr := ctx.Err(); ctxErr != nil {
				return none, ctxErr
			}

			out, err := attempt(ctx, target)
			if err == nil {
				c.tracker.ReportSuccess(target.key)
				return out, nil
			}

			switch {
			case c.cfg.classify(err) != llm.ClassPermanent:
				// Transient: every failed attempt counts toward the
				// target's consecutive-failure threshold. Give up on this
				// target once the tracker benches it (a freshly backed-off
				// target is not worth more retries) or the retry budget is
				// spent; otherwise fall through to another attempt.
				if benched := c.tracker.ReportFailure(target.key); benched || retry >= maxRetries {
					reasons = append(reasons, fmt.Errorf("%s: %w", target.key, err))
					continue nextTarget
				}

			case errors.Is(err, llm.ErrModelNotFound) || errors.Is(err, llm.ErrUnsupported) || c.cfg.AdvanceOnPermanent:
				// Not a health problem (or policy says keep going):
				// advance without penalizing the target.
				reasons = append(reasons, fmt.Errorf("%s: %w", target.key, err))
				continue nextTarget

			default:
				// Failing over cannot fix a bad request or bad credentials.
				return none, fmt.Errorf("%s: %w", target.key, err)
			}
		}
	}

	return none, errors.Join(reasons...)
}