Files
majordomo/chain.go
T
steve 043249e0e1 feat: OpenAI, Anthropic, and native-Ollama providers + media pipeline
Phase 3:
- provider/openai: Chat Completions for OpenAI + compat endpoints (SSE
  streaming with by-index tool-call assembly, response_format json_schema,
  legacy max_tokens option, reasoning_effort)
- provider/anthropic: Messages API (tool_use/tool_result, GA structured
  output via output_config.format, full SSE event parser, 529 transient)
- provider/ollama: one native /api/chat client behind the ollama,
  ollama-cloud, and foreman built-ins (presets; NDJSON streaming tolerant
  of foreman's buffered single-object responses; object tool arguments;
  format-schema structured output; think mapping)
- media/: capability normalization (sniff, downscale, transcode, byte
  ladder, ErrUnsupported), wired into the chain executor per target with
  penalty-free advance past incapable elements
- registry: real provider + scheme wiring, WithHTTPClient option, required
  env-foreman TLS chat round-trip test
- ADR-0009 multimodal strategy, ADR-0010 tools/structured mapping; README
  matrix + CLAUDE.md synced

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 12:58:08 +02:00

147 lines
5.4 KiB
Go

package majordomo
import (
"context"
"errors"
"fmt"
"gitea.stevedudenhoeffer.com/steve/majordomo/health"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/media"
)
// ErrChainExhausted reports that every element of a failover chain failed
// (or was skipped while backed off). It is always joined with the
// per-target errors.
var ErrChainExhausted = errors.New("all chain targets failed")
// chainTarget is one resolved element of a failover chain.
type chainTarget struct {
// key identifies the target for health tracking: "provider/model-id".
key string
model llm.Model
}
// chain implements llm.Model over an ordered list of targets with
// health-tracked failover. A single-element spec is a chain of one — the
// behavior (retry-on-transient, backoff bookkeeping) is identical, so
// callers never branch on what Parse returned.
//
// Semantics (ADR-0006, ADR-0008):
// - Targets are tried head-to-tail; targets currently backed off are
// skipped.
// - A transient error is retried on the same target (ChainConfig
// TransientRetries, default 1). Every failed attempt counts toward the
// target's consecutive-failure threshold; when the tracker benches the
// target (default: 2 consecutive transient failures → exponential
// capped cooldown) the chain stops retrying it and advances.
// - Model-not-found advances without penalizing health. Other permanent
// errors fail fast by default (AdvanceOnPermanent flips this).
// - Any success resets the target's health.
// - When every target fails or is skipped, the returned error joins
// ErrChainExhausted with each target's reason.
type chain struct {
targets []chainTarget
tracker *health.Tracker
cfg ChainConfig
}
// Targets returns the resolved "provider/model" keys in chain order
// (diagnostics and tests).
func (c *chain) Targets() []string {
keys := make([]string, len(c.targets))
for i, t := range c.targets {
keys[i] = t.key
}
return keys
}
// Capabilities reports the head element's capabilities — the chain's
// preferred target (ADR-0008). Per-attempt media normalization uses the
// actual target's capabilities, not this value.
func (c *chain) Capabilities() llm.Capabilities {
return c.targets[0].model.Capabilities()
}
// Generate tries each target per the chain semantics above.
func (c *chain) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
req = req.Apply(opts...)
return chainDo(ctx, c, req, func(ctx context.Context, t chainTarget, nreq llm.Request) (*llm.Response, error) {
return t.model.Generate(ctx, nreq)
})
}
// Stream tries each target per the chain semantics. Failover applies to
// establishing the stream; once a stream is open, mid-stream errors
// propagate to the consumer rather than restarting on another target
// (replaying half-delivered output would duplicate content).
func (c *chain) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
req = req.Apply(opts...)
return chainDo(ctx, c, req, func(ctx context.Context, t chainTarget, nreq llm.Request) (llm.Stream, error) {
return t.model.Stream(ctx, nreq)
})
}
// chainDo runs the head-to-tail failover algorithm around an attempt
// function, generic over the result type (response vs stream). Before each
// target is tried, the request's media is normalized against THAT target's
// capabilities (ADR-0008/0009) — a request that cannot be made to fit one
// target advances to the next without a health penalty.
func chainDo[T any](ctx context.Context, c *chain, req llm.Request, attempt func(context.Context, chainTarget, llm.Request) (T, error)) (T, error) {
var zero T
var failures []error
for _, t := range c.targets {
if !c.tracker.Available(t.key) {
until := c.tracker.BackedOffUntil(t.key)
failures = append(failures, fmt.Errorf("%s: skipped (backed off until %s)", t.key, until.Format("15:04:05.000")))
continue
}
nreq, err := media.Normalize(req, t.model.Capabilities())
if err != nil {
// Always ErrUnsupported-wrapped: this target cannot take the
// request by declaration. Advance, no health penalty.
failures = append(failures, fmt.Errorf("%s: %w", t.key, err))
continue
}
retries := c.cfg.retries()
for attemptN := 0; ; attemptN++ {
if err := ctx.Err(); err != nil {
return zero, err
}
result, err := attempt(ctx, t, nreq)
if err == nil {
c.tracker.ReportSuccess(t.key)
return result, nil
}
class := c.cfg.classify(err)
if class == llm.ClassPermanent {
if errors.Is(err, llm.ErrModelNotFound) || errors.Is(err, llm.ErrUnsupported) || c.cfg.AdvanceOnPermanent {
// Not a health problem (or policy says keep going):
// advance without penalizing the target.
failures = append(failures, fmt.Errorf("%s: %w", t.key, err))
break
}
// Failing over cannot fix a bad request or bad credentials.
return zero, fmt.Errorf("%s: %w", t.key, err)
}
// Transient: every failed attempt counts toward the target's
// consecutive-failure threshold. Retry the same target while
// attempts remain — but advance as soon as the tracker benches
// it (a freshly backed-off target is not worth more retries).
benched := c.tracker.ReportFailure(t.key)
if !benched && attemptN < retries {
continue
}
failures = append(failures, fmt.Errorf("%s: %w", t.key, err))
break
}
}
return zero, errors.Join(append([]error{ErrChainExhausted}, failures...)...)
}