P1: model layer (convar->config inversion) + llmmeta

Lifts mort's pkg/logic/llms into executus/model, decoupled from mort:

- tiers.go: the tier resolver now reads a host-supplied config.Source under
  "model.tier.<name>" with host-supplied fallbacks (Configure(cfg, defaults,
  ttl)), instead of convar.Manager. Tier NAMES + specs are host config; the
  resolution mechanism (cache, reasoning-suffix dialect, chain validation) is
  generic. No tier names hard-coded in the harness.
- sink.go: usage/trace recording inverted off mort's llmusage/llmtrace into
  UsageSink / TraceSink seams + a model-owned Span, with nil-safe context
  attribution helpers (WithModel/WithTraceID/WithUsageTool/WithUsageUser).
  Both sinks optional (nil = off) so a light host records nothing.
- lane decoration repointed to executus/lane; utils.Errorf -> fmt.Errorf.
- call.go keeps GenerateWith[T] (instrumented structured output) — this is the
  structured-output primitive; no separate structured/ package.
- llmmeta moved over model/ (the meta-LLM helper: tier allowlist + JSON retry
  + ledger). Its tests configure a minimal tier table via TestMain.
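The tier lookup described in the first bullet can be sketched as follows. This is a minimal illustration, not the code from tiers.go: the `Source` interface, `mapSource`, and `resolveTier` are hypothetical names, and the real config.Source may have a different shape. Only the "model.tier.<name>" key layout and the config-over-fallback ordering come from the commit message.

```go
package main

import "fmt"

// Source is a hypothetical stand-in for the host-supplied config.Source;
// the real interface is not shown in this commit excerpt.
type Source interface {
	Get(key string) (string, bool)
}

// mapSource is an in-memory Source used only for this example.
type mapSource map[string]string

func (m mapSource) Get(key string) (string, bool) {
	v, ok := m[key]
	return v, ok
}

// resolveTier looks up "model.tier.<name>" in cfg and falls back to the
// host-supplied defaults when the key is absent or empty. It returns
// ("", false) for a tier the host never registered.
func resolveTier(cfg Source, defaults map[string]string, name string) (string, bool) {
	if cfg != nil {
		if v, ok := cfg.Get("model.tier." + name); ok && v != "" {
			return v, true
		}
	}
	v, ok := defaults[name]
	return v, ok
}

func main() {
	// Tier names and specs below are made-up host configuration.
	cfg := mapSource{"model.tier.fast": "provider-a/small-model"}
	defaults := map[string]string{
		"fast":  "provider-b/fallback-small",
		"smart": "provider-b/fallback-large",
	}
	for _, tier := range []string{"fast", "smart", "missing"} {
		spec, ok := resolveTier(cfg, defaults, tier)
		fmt.Printf("%s -> %q (known=%v)\n", tier, spec, ok)
	}
}
```

Because the tier names live entirely in `cfg` and `defaults`, the lookup itself stays generic, which is the property the commit message claims for the harness.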

New tests cover the inversion: config overrides fallback, tier registration,
reasoning-suffix survival, nested-tier rejection, nil-sink no-ops.

Full module: go build/vet/test -race green; core go.sum still free of
gorm/redis/discordgo/sqlite.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit was merged in pull request #1.
2026-06-26 19:47:13 -04:00
parent 741d7816ed
commit b424261aca
17 changed files with 3698 additions and 3 deletions
@@ -0,0 +1,97 @@
// Package model (bench.go): the mort-flavored facade over majordomo's
// health tracker for the `.failover` Discord commands and the failover
// web UI.
//
// Why a facade (vs exposing health.Tracker directly): the admin surfaces
// want the historical shape — a benched-only list with a manual/auto
// flag. majordomo's tracker treats manual benches (Bench) and automatic
// backoffs identically, so the manual marker is kept mort-side.
package model
import (
"sync"
"time"
)
// BenchedModel is one currently-benched model for admin display.
type BenchedModel struct {
// Model is the "provider/model" target key.
Model string
// Until is the end of the bench window.
Until time.Time
// ConsecutiveFails is the failure count since the last success.
ConsecutiveFails int
// Manual reports the bench was placed by an operator (BenchModel)
// rather than the automatic failure threshold.
Manual bool
}
var (
manualMu sync.Mutex
manualBenches = map[string]time.Time{}
)
// ListBenched returns the currently-benched models, manual and automatic,
// from the live health tracker.
func ListBenched() []BenchedModel {
now := time.Now()
pruneManual(now)
var out []BenchedModel
for _, st := range Health().Snapshot() {
if !st.Until.After(now) {
continue
}
out = append(out, BenchedModel{
Model: st.Key,
Until: st.Until,
ConsecutiveFails: st.ConsecutiveFailures,
Manual: isManual(st.Key, st.Until),
})
}
return out
}
// BenchModel manually benches a model spec until the given time. The
// chain executor skips benched targets until the window expires (or
// UnbenchModel clears it).
func BenchModel(model string, until time.Time) {
Health().Bench(model, until)
manualMu.Lock()
manualBenches[model] = until
manualMu.Unlock()
}
// UnbenchModel clears the bench on a model. Returns true when the model
// was actually benched.
func UnbenchModel(model string) bool {
now := time.Now()
wasBenched := Health().BackedOffUntil(model).After(now)
Health().Unbench(model)
manualMu.Lock()
delete(manualBenches, model)
manualMu.Unlock()
return wasBenched
}
// isManual reports whether the bench window for key matches a manual
// bench placed via BenchModel. An automatic backoff that outlives the
// manual window supersedes the marker.
func isManual(key string, until time.Time) bool {
manualMu.Lock()
defer manualMu.Unlock()
manualUntil, ok := manualBenches[key]
return ok && !until.After(manualUntil)
}
// pruneManual drops expired manual markers so the map can't grow
// unbounded across a long uptime.
func pruneManual(now time.Time) {
manualMu.Lock()
defer manualMu.Unlock()
for k, until := range manualBenches {
if !until.After(now) {
delete(manualBenches, k)
}
}
}
@@ -0,0 +1,415 @@
package model
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"runtime/debug"
"strings"
"time"
majordomo "gitea.stevedudenhoeffer.com/steve/majordomo"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"github.com/google/uuid"
)
// CallResult captures the result of a single tool call execution.
type CallResult struct {
Name string
Arguments string
Result string
Error error
}
// instrumentedModel decorates a parsed model so every successful Generate
// records token usage to the usage sink automatically. This is the
// single usage chokepoint: ANY call through a model from
// ParseModelRequest / ParseModelForContext is accounted, whether it goes
// through the helpers in this file, the agent loop, or a direct
// model.Generate at a call site.
//
// IMPORTANT: do not call RecordUsage on responses from a parsed model —
// that would double-count. RecordUsage exists for models obtained outside
// this package.
type instrumentedModel struct {
inner llm.Model
}
func (m *instrumentedModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
resp, err := m.inner.Generate(ctx, req, opts...)
if err == nil && resp != nil {
recordUsage(ctx, resp)
}
return resp, err
}
func (m *instrumentedModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
return m.inner.Stream(ctx, req, opts...)
}
func (m *instrumentedModel) Capabilities() llm.Capabilities { return m.inner.Capabilities() }
// CallAndExecute sends messages to the model with a toolbox, executes any
// tool calls, and returns the results. It performs a single round of
// generation + tool execution (no looping) — multi-step loops belong to
// the agent package.
func CallAndExecute(ctx context.Context, model llm.Model, systemPrompt string, toolbox *llm.Toolbox, messages []llm.Message, opts ...llm.Option) ([]CallResult, string, error) {
req := llm.Request{System: systemPrompt, Messages: messages}
allOpts := make([]llm.Option, 0, len(opts)+1)
if toolbox != nil {
allOpts = append(allOpts, llm.WithToolbox(toolbox))
}
allOpts = append(allOpts, opts...)
startTime := time.Now()
resp, err := model.Generate(ctx, req, allOpts...)
if err != nil {
recordSpanFromWrapper(ctx, systemPrompt, messages, toolbox, nil, nil, startTime, err)
return nil, "", fmt.Errorf("completion failed: %w", err)
}
if len(resp.ToolCalls) == 0 || toolbox == nil {
recordSpanFromWrapper(ctx, systemPrompt, messages, toolbox, resp, nil, startTime, nil)
return nil, resp.Text(), nil
}
var results []CallResult
for _, call := range resp.ToolCalls {
tr := toolbox.Execute(ctx, call)
cr := CallResult{
Name: call.Name,
Arguments: string(call.Arguments),
Result: tr.Content,
}
if tr.IsError {
cr.Error = errors.New(tr.Content)
}
results = append(results, cr)
}
recordSpanFromWrapper(ctx, systemPrompt, messages, toolbox, resp, results, startTime, nil)
return results, resp.Text(), nil
}
// GenerateWith sends messages to the model with an optional system prompt and
// returns structured output parsed into T. T must be a struct. Uses
// majordomo's native structured output (response schema derived from T).
func GenerateWith[T any](ctx context.Context, model llm.Model, systemPrompt string, messages []llm.Message, opts ...llm.Option) (T, error) {
req := llm.Request{System: systemPrompt, Messages: messages}
startTime := time.Now()
// Capture the raw response so the trace span carries usage and the
// concrete serving model even though majordomo.Generate only returns T.
capture := &captureModel{inner: model}
result, err := majordomo.Generate[T](ctx, capture, req, opts...)
resolvedModel := resolvedModelName(ctx, capture.resp)
if tracingEnabled(ctx) {
// Take one timestamp so DurationMs, CompletedAt and CreatedAt agree,
// matching recordSpanFromWrapper.
now := time.Now()
span := Span{
SpanID: uuid.New().String(),
TraceID: traceIDFromContext(ctx),
Model: resolvedModel,
SystemPrompt: systemPrompt,
Messages: marshalMessages(messages),
DurationMs: now.Sub(startTime).Milliseconds(),
StartedAt: startTime,
CompletedAt: now,
CreatedAt: now,
}
if capture.resp != nil {
span.InputTokens = capture.resp.Usage.InputTokens
span.OutputTokens = capture.resp.Usage.OutputTokens
}
if err != nil {
span.Error = err.Error()
// Structured-output failure: log loudly so operators can chase
// down a regression (e.g. a model returning prose or fenced
// JSON the decoder rejects) from the trace span alone. The
// error string includes the failing field path on decode
// errors.
if isStructuredOutputParseError(err) {
slog.Warn("model.GenerateWith: structured-output parse failure",
"model", resolvedModel,
"span_id", span.SpanID,
"trace_id", span.TraceID,
"err", err.Error(),
)
}
} else {
b, _ := json.Marshal(result)
span.ResponseText = string(b)
}
traceSink.WriteSpan(span)
} else if err != nil && isStructuredOutputParseError(err) {
// Tracing disabled: slog.Warn is the only breadcrumb operators get.
slog.Warn("model.GenerateWith: structured-output parse failure (no trace span)",
"model", resolvedModel,
"err", err.Error(),
)
}
return result, err
}
// captureModel records the last successful response so wrappers that
// only see the decoded result (majordomo.Generate) can still attribute
// usage and tracing.
type captureModel struct {
inner llm.Model
resp *llm.Response
}
func (m *captureModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
resp, err := m.inner.Generate(ctx, req, opts...)
if err == nil {
m.resp = resp
}
return resp, err
}
func (m *captureModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
return m.inner.Stream(ctx, req, opts...)
}
func (m *captureModel) Capabilities() llm.Capabilities { return m.inner.Capabilities() }
// isStructuredOutputParseError reports whether err looks like a
// structured-output failure from majordomo.Generate — either the decode
// path ("decode structured response") or the empty-response path
// ("structured response from ... is empty"). Used to gate the loud
// slog.Warn so transport errors don't get tagged as parse failures.
func isStructuredOutputParseError(err error) bool {
if err == nil {
return false
}
s := err.Error()
return strings.Contains(s, "decode structured response") ||
strings.Contains(s, "structured response from")
}
// SimpleCall sends a single user message to the model with an optional system
// prompt and returns the text response. No tools involved.
func SimpleCall(ctx context.Context, model llm.Model, systemPrompt string, userMessage string, opts ...llm.Option) (string, error) {
msgs := []llm.Message{llm.UserText(userMessage)}
startTime := time.Now()
resp, err := model.Generate(ctx, llm.Request{System: systemPrompt, Messages: msgs}, opts...)
if err != nil {
recordSpanFromWrapper(ctx, systemPrompt, msgs, nil, nil, nil, startTime, err)
return "", fmt.Errorf("completion failed: %w", err)
}
recordSpanFromWrapper(ctx, systemPrompt, msgs, nil, resp, nil, startTime, nil)
return resp.Text(), nil
}
// RecordUsage records LLM token usage from a successful Generate response.
//
// ONLY call this for models obtained outside this package: models returned
// by ParseModelRequest / ParseModelForContext record usage automatically on
// every Generate, and calling RecordUsage on their responses double-counts.
func RecordUsage(ctx context.Context, resp llm.Response) {
recordUsage(ctx, &resp)
}
// RecordSpan records a trace span for a direct model.Generate() call.
// Call this from modules that invoke model.Generate() directly when they
// want the call traced (usage is already recorded automatically for
// parsed models).
func RecordSpan(ctx context.Context, systemPrompt string, messages []llm.Message, toolbox *llm.Toolbox, resp *llm.Response, callResults []CallResult, startTime time.Time, callErr error) {
recordSpanFromWrapper(ctx, systemPrompt, messages, toolbox, resp, callResults, startTime, callErr)
}
// recordUsage records token usage for one response. The model is
// attributed from the response itself when possible (resp.Model names
// the chain element that actually served the request — more precise than
// the requested spec), falling back to the context attribution set by
// ParseModelForContext.
func recordUsage(ctx context.Context, resp *llm.Response) {
if usageSink == nil || resp == nil {
return
}
u := resp.Usage
if u.InputTokens == 0 && u.OutputTokens == 0 {
return
}
model := resolvedModelName(ctx, resp)
if model == "unknown" || model == "" {
tool := toolFromContext(ctx)
if tool == "unknown" {
slog.Warn("model usage: recording with both unknown model and tool",
"user", userFromContext(ctx), "stack", string(debug.Stack()))
} else {
slog.Warn("model usage: recording with unknown model — caller should set model.WithModel or use model.ParseModelForContext",
"tool", tool, "user", userFromContext(ctx))
}
}
usageSink.Record(ctx, model, u.InputTokens, u.OutputTokens, u.CacheReadTokens, u.CacheWriteTokens)
}
// resolvedModelName picks the usage/trace attribution name: the serving
// model from the response when present ("provider/model" → "model"),
// else the context's requested model resolved through the tier table.
func resolvedModelName(ctx context.Context, resp *llm.Response) string {
if resp != nil && resp.Model != "" {
name := resp.Model
if idx := strings.Index(name, "/"); idx >= 0 {
name = name[idx+1:]
}
return name
}
return ResolveModelName(modelFromContext(ctx))
}
// tracingEnabled reports whether a trace sink is configured and ctx carries a trace ID.
func tracingEnabled(ctx context.Context) bool {
if traceSink == nil {
return false
}
return traceIDFromContext(ctx) != ""
}
// recordSpanFromWrapper records a trace span if tracing is active.
func recordSpanFromWrapper(ctx context.Context, systemPrompt string, messages []llm.Message, toolbox *llm.Toolbox, resp *llm.Response, callResults []CallResult, startTime time.Time, callErr error) {
if !tracingEnabled(ctx) {
return
}
now := time.Now()
span := Span{
SpanID: uuid.New().String(),
TraceID: traceIDFromContext(ctx),
Model: resolvedModelName(ctx, resp),
SystemPrompt: systemPrompt,
Messages: marshalMessages(messages),
ToolDefinitions: marshalToolDefs(toolbox),
DurationMs: now.Sub(startTime).Milliseconds(),
StartedAt: startTime,
CompletedAt: now,
CreatedAt: now,
}
if callErr != nil {
span.Error = callErr.Error()
}
if resp != nil {
span.ResponseText = resp.Text()
span.InputTokens = resp.Usage.InputTokens
span.OutputTokens = resp.Usage.OutputTokens
if len(resp.ToolCalls) > 0 {
span.ResponseToolCalls = marshalToolCalls(resp.ToolCalls)
}
}
if len(callResults) > 0 {
span.ToolResults = marshalCallResults(callResults)
}
traceSink.WriteSpan(span)
}
// --- Serialization helpers ---
type jsonMessage struct {
Role string `json:"role"`
Text string `json:"text,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
ImageCount int `json:"image_count,omitempty"`
}
func marshalMessages(msgs []llm.Message) string {
out := make([]jsonMessage, 0, len(msgs))
for _, m := range msgs {
jm := jsonMessage{
Role: string(m.Role),
Text: m.Text(),
}
for _, p := range m.Parts {
if _, ok := p.(llm.ImagePart); ok {
jm.ImageCount++
}
}
if len(m.ToolResults) > 0 {
jm.ToolCallID = m.ToolResults[0].ID
}
out = append(out, jm)
}
b, _ := json.Marshal(out)
return string(b)
}
type jsonToolCall struct {
ID string `json:"id"`
Name string `json:"name"`
Arguments string `json:"arguments"`
}
func marshalToolCalls(calls []llm.ToolCall) string {
out := make([]jsonToolCall, 0, len(calls))
for _, c := range calls {
out = append(out, jsonToolCall{
ID: c.ID,
Name: c.Name,
Arguments: string(c.Arguments),
})
}
b, _ := json.Marshal(out)
return string(b)
}
type jsonCallResult struct {
Name string `json:"name"`
Arguments string `json:"arguments"`
Result string `json:"result"`
Error string `json:"error,omitempty"`
}
func marshalCallResults(results []CallResult) string {
out := make([]jsonCallResult, 0, len(results))
for _, r := range results {
jr := jsonCallResult{
Name: r.Name,
Arguments: r.Arguments,
Result: r.Result,
}
if r.Error != nil {
jr.Error = r.Error.Error()
}
out = append(out, jr)
}
b, _ := json.Marshal(out)
return string(b)
}
type jsonToolDef struct {
Name string `json:"name"`
Description string `json:"description"`
}
func marshalToolDefs(tb *llm.Toolbox) string {
if tb == nil {
return ""
}
tools := tb.Tools()
if len(tools) == 0 {
return ""
}
out := make([]jsonToolDef, 0, len(tools))
for _, t := range tools {
out = append(out, jsonToolDef{
Name: t.Name,
Description: t.Description,
})
}
b, _ := json.Marshal(out)
return string(b)
}
@@ -0,0 +1,453 @@
// V15.4 — Ollama Cloud dynamic context-length sync.
//
// Why: the static map in context_limits.go has to be hand-maintained
// for every new Ollama Cloud model. Cloud ships new models monthly,
// and a missing entry silently disables compaction for runs on that
// model (compactionThresholdForModel returns 0 on MaxContextTokens
// miss). Dynamic sync removes the maintenance burden and means new
// cloud models work out-of-the-box.
//
// How: at boot, mort kicks off a CloudOllamaLimitCache.RefreshAll in a
// background goroutine. RefreshAll calls /api/tags to list every
// available cloud model, then concurrently calls /api/show for each
// to extract `<family>.context_length` from the response's model_info
// map. The cache is consulted by the executor's
// compactionThresholdForModel via the cache-aware
// MaxContextTokensWithCache helper.
//
// Periodic refresh: StartPeriodicRefresh re-runs RefreshAll on a ticker
// (24h unless the caller passes another interval) so newly released
// models surface without a mort restart. Daily is conservative: cloud
// model context lengths don't change for a given tag (only the tag
// pointer can move, e.g. :cloud → larger model).
package model
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"sync"
"time"
)
// defaultCloudEndpoint is the public Ollama Cloud base URL. Override
// in tests via NewCloudOllamaLimitCache's endpoint arg.
const defaultCloudEndpoint = "https://ollama.com"
// CloudOllamaLimitCache holds context-length values for Ollama Cloud
// models, populated dynamically via /api/tags + /api/show. Construct
// with NewCloudOllamaLimitCache. Safe for concurrent use.
//
// Empty when OLLAMA_API_KEY is unset: RefreshAll returns a clear error
// and the cache stays empty. Lookups return (0, false) and callers
// fall back to the static map / disabled compaction.
type CloudOllamaLimitCache struct {
mu sync.RWMutex
limit map[string]int
negative map[string]time.Time // model → fetch-failure time (for TTL)
endpoint string
apiKey string
httpClient *http.Client
// refreshConcurrency caps the number of concurrent /api/show calls
// during RefreshAll. Default 8 — enough to finish a ~50-model
// catalog in well under a minute without hammering Cloud.
refreshConcurrency int
// negativeTTL is how long a /api/show miss is cached before we
// retry. Prevents hammering Cloud on a typo or recently-removed
// model. Default 10 minutes.
negativeTTL time.Duration
}
// NewCloudOllamaLimitCache constructs a fresh cache. apiKey can be
// empty — RefreshAll then returns an error and the cache stays empty.
// endpoint defaults to https://ollama.com when empty. httpClient
// defaults to a 15s-timeout client.
func NewCloudOllamaLimitCache(endpoint, apiKey string, httpClient *http.Client) *CloudOllamaLimitCache {
if strings.TrimSpace(endpoint) == "" {
endpoint = defaultCloudEndpoint
}
endpoint = strings.TrimRight(endpoint, "/")
if httpClient == nil {
httpClient = &http.Client{Timeout: 15 * time.Second}
}
return &CloudOllamaLimitCache{
limit: make(map[string]int),
negative: make(map[string]time.Time),
endpoint: endpoint,
apiKey: apiKey,
httpClient: httpClient,
refreshConcurrency: 8,
negativeTTL: 10 * time.Minute,
}
}
// SetNegativeTTL overrides the negative-cache lifetime. Tests use this
// to control retry behaviour without sleeping.
func (c *CloudOllamaLimitCache) SetNegativeTTL(d time.Duration) {
if c == nil || d < 0 {
return
}
c.mu.Lock()
c.negativeTTL = d
c.mu.Unlock()
}
// Lookup returns the cached context length for an Ollama Cloud model
// name (e.g. "qwen3.5:cloud", "qwen3-coder:480b"). Returns (0, false)
// on miss. Lookup never makes HTTP calls — it's the hot path consulted
// by the executor before every run.
//
// modelName accepts either the bare model:tag form or the prefixed
// "ollama-cloud/model:tag" form; the prefix is stripped.
func (c *CloudOllamaLimitCache) Lookup(modelName string) (int, bool) {
if c == nil {
return 0, false
}
key := stripCloudPrefix(modelName)
if key == "" {
return 0, false
}
c.mu.RLock()
defer c.mu.RUnlock()
v, ok := c.limit[key]
return v, ok
}
// Size returns the number of cached entries. Useful for logging /
// health checks.
func (c *CloudOllamaLimitCache) Size() int {
if c == nil {
return 0
}
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.limit)
}
// LookupOrFetch returns the cached context length OR, on miss, makes a
// single /api/show call to populate the cache. Negative results
// (model not found, /api/show returns no context_length) are cached
// for negativeTTL to prevent hammering Cloud on a typo. Returns
// (0, false) when the model is genuinely unknown and (size, true) on
// any successful resolve.
//
// Why this exists: Ollama Cloud's /api/tags lists canonical model
// names only (e.g. "qwen3.5:397b") but accepts aliases on /api/show
// (e.g. "qwen3.5:cloud" → same 397b model). The boot-time RefreshAll
// only sees the canonical names, so common aliases miss the cache.
// LookupOrFetch fills the gap.
//
// The cache is therefore self-healing: any unknown model gets one
// live /api/show call, the result lands in the cache, and subsequent
// runs hit immediately. Periodic RefreshAll overwrites only the
// canonical-name entries, so lazily fetched aliases linger as
// positive entries.
func (c *CloudOllamaLimitCache) LookupOrFetch(ctx context.Context, modelName string) (int, bool) {
if c == nil {
return 0, false
}
key := stripCloudPrefix(modelName)
if key == "" {
return 0, false
}
// Fast path: positive hit.
c.mu.RLock()
if v, ok := c.limit[key]; ok {
c.mu.RUnlock()
return v, true
}
// Negative cache check.
if t, ok := c.negative[key]; ok && time.Since(t) < c.negativeTTL {
c.mu.RUnlock()
return 0, false
}
c.mu.RUnlock()
// No API key configured → can't fetch. Don't write a negative
// entry (when the key gets configured later we want the next call
// to re-try immediately).
if strings.TrimSpace(c.apiKey) == "" {
return 0, false
}
// Slow path: live /api/show.
n, err := c.fetchContextLength(ctx, key)
if err != nil || n <= 0 {
slog.Debug("cloud limit cache: lazy fetch miss",
"model", key, "err", err)
c.mu.Lock()
c.negative[key] = time.Now()
c.mu.Unlock()
return 0, false
}
c.set(key, n)
slog.Info("cloud limit cache: lazy fetch hit", "model", key, "context_length", n)
return n, true
}
// set stores a context length. n <= 0 is a no-op.
func (c *CloudOllamaLimitCache) set(modelName string, n int) {
if c == nil || n <= 0 {
return
}
key := stripCloudPrefix(modelName)
if key == "" {
return
}
c.mu.Lock()
c.limit[key] = n
c.mu.Unlock()
}
// RefreshAll queries /api/tags then concurrently calls /api/show for
// every listed model, populating the cache. Returns the number of
// models successfully cached. The error is non-nil only when the
// refresh cannot start (nil receiver, missing API key, or a /api/tags
// failure); individual /api/show failures are logged at debug level
// and don't abort the refresh.
//
// Safe to call repeatedly. Cache entries are overwritten with the
// fresh values; entries for models that have been removed from Cloud
// are NOT pruned (cheap to keep; pruning risks dropping an entry just
// before a run that needs it).
func (c *CloudOllamaLimitCache) RefreshAll(ctx context.Context) (int, error) {
if c == nil {
return 0, fmt.Errorf("cloud limit cache: nil receiver")
}
if strings.TrimSpace(c.apiKey) == "" {
return 0, fmt.Errorf("cloud limit cache: OLLAMA_API_KEY unset")
}
tags, err := c.fetchTags(ctx)
if err != nil {
return 0, fmt.Errorf("cloud limit cache: /api/tags: %w", err)
}
concurrency := c.refreshConcurrency
if concurrency <= 0 {
concurrency = 8
}
sem := make(chan struct{}, concurrency)
var wg sync.WaitGroup
var (
mu sync.Mutex
success int
)
for _, name := range tags {
name := name
wg.Add(1)
sem <- struct{}{}
go func() {
defer wg.Done()
defer func() { <-sem }()
ctxLen, ferr := c.fetchContextLength(ctx, name)
if ferr != nil {
slog.Debug("cloud limit cache: /api/show miss",
"model", name, "err", ferr)
return
}
c.set(name, ctxLen)
mu.Lock()
success++
mu.Unlock()
}()
}
wg.Wait()
slog.Info("cloud limit cache: refresh complete",
"models_total", len(tags), "cached", success)
return success, nil
}
// StartPeriodicRefresh runs RefreshAll once immediately, then on every
// interval tick. Cancellation via ctx stops the loop. Logs each
// outcome; never returns an error to the caller (this is a background
// task — failures are warnings, not show-stoppers).
//
// Typical usage: a goroutine spawned at mort boot.
//
// go cache.StartPeriodicRefresh(ctx, 24*time.Hour)
func (c *CloudOllamaLimitCache) StartPeriodicRefresh(ctx context.Context, interval time.Duration) {
if c == nil {
return
}
if interval <= 0 {
interval = 24 * time.Hour
}
doOne := func() {
n, err := c.RefreshAll(ctx)
if err != nil {
slog.Warn("cloud limit cache: refresh failed",
"err", err, "cached_size", c.Size())
return
}
slog.Info("cloud limit cache: refreshed",
"newly_cached_or_updated", n, "cached_size", c.Size())
}
doOne()
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
doOne()
}
}
}
// fetchTags calls GET /api/tags and returns the model names.
func (c *CloudOllamaLimitCache) fetchTags(ctx context.Context) ([]string, error) {
url := c.endpoint + "/api/tags"
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
c.applyAuth(req)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode/100 != 2 {
return nil, fmt.Errorf("status %d: %s", resp.StatusCode, truncate(body, 400))
}
var parsed struct {
Models []struct {
Name string `json:"name"`
Model string `json:"model"`
} `json:"models"`
}
if err := json.Unmarshal(body, &parsed); err != nil {
return nil, fmt.Errorf("parse /api/tags: %w", err)
}
out := make([]string, 0, len(parsed.Models))
for _, m := range parsed.Models {
name := m.Name
if name == "" {
name = m.Model
}
if name == "" {
continue
}
out = append(out, name)
}
return out, nil
}
// fetchContextLength calls POST /api/show for a model and extracts
// the largest *.context_length value from model_info. Returns the
// length and nil on success; (0, err) on any failure.
//
// Why "largest" rather than family-keyed: the family field in the
// /api/show response is sometimes empty or doesn't match the
// model_info key prefix exactly (Ollama Cloud returns the
// architecture as the prefix, which usually but not always matches
// `family`). Scanning for any `*.context_length` is robust.
func (c *CloudOllamaLimitCache) fetchContextLength(ctx context.Context, modelName string) (int, error) {
url := c.endpoint + "/api/show"
body, _ := json.Marshal(map[string]string{"name": modelName})
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return 0, err
}
req.Header.Set("Content-Type", "application/json")
c.applyAuth(req)
resp, err := c.httpClient.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return 0, err
}
if resp.StatusCode/100 != 2 {
return 0, fmt.Errorf("status %d: %s", resp.StatusCode, truncate(respBody, 400))
}
// parseContextLengthJSON already returns (0, err) on failure.
return parseContextLengthJSON(respBody)
}
// parseContextLengthJSON extracts the largest `*.context_length` int
// from an /api/show response body. It is unexported but split out so
// the unit test in the same package can exercise it without spinning
// up an httptest server.
func parseContextLengthJSON(body []byte) (int, error) {
var parsed struct {
ModelInfo map[string]any `json:"model_info"`
}
if err := json.Unmarshal(body, &parsed); err != nil {
return 0, fmt.Errorf("parse: %w", err)
}
best := 0
for k, v := range parsed.ModelInfo {
if !strings.HasSuffix(k, ".context_length") {
continue
}
n := toInt(v)
if n > best {
best = n
}
}
if best <= 0 {
return 0, fmt.Errorf("no context_length in model_info")
}
return best, nil
}
// toInt coerces a JSON-decoded value to int. Handles float64 (the
// encoding/json default for numbers), int, int64, and json.Number;
// returns 0 for anything else.
func toInt(v any) int {
switch x := v.(type) {
case float64:
return int(x)
case int:
return x
case int64:
return int(x)
case json.Number:
if n, err := x.Int64(); err == nil {
return int(n)
}
}
return 0
}
// applyAuth sets the Bearer token when an API key is configured.
func (c *CloudOllamaLimitCache) applyAuth(req *http.Request) {
if strings.TrimSpace(c.apiKey) == "" {
return
}
req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(c.apiKey))
}
// stripCloudPrefix strips an "ollama-cloud/" prefix (and surrounding
// whitespace). Returns the bare model:tag form.
func stripCloudPrefix(s string) string {
s = strings.TrimSpace(s)
s = strings.TrimPrefix(s, "ollama-cloud/")
return s
}
// truncate caps a byte slice for error messages.
func truncate(b []byte, n int) string {
if len(b) <= n {
return string(b)
}
return string(b[:n]) + "...(truncated)"
}
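To make the "largest *.context_length" rule concrete, here is a standalone re-implementation of the scan with a small sample body. The JSON payload is invented for illustration (the `examplearch` keys are not from a real model), and `largestContextLength` is a simplified variant that only handles the float64 values encoding/json produces by default.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// largestContextLength scans model_info for keys ending in
// ".context_length" and returns the largest positive value found.
func largestContextLength(body []byte) (int, error) {
	var parsed struct {
		ModelInfo map[string]any `json:"model_info"`
	}
	if err := json.Unmarshal(body, &parsed); err != nil {
		return 0, fmt.Errorf("parse: %w", err)
	}
	best := 0
	for k, v := range parsed.ModelInfo {
		if !strings.HasSuffix(k, ".context_length") {
			continue
		}
		// encoding/json decodes JSON numbers into float64 for `any`.
		if f, ok := v.(float64); ok && int(f) > best {
			best = int(f)
		}
	}
	if best <= 0 {
		return 0, fmt.Errorf("no context_length in model_info")
	}
	return best, nil
}

func main() {
	// Made-up response: two context_length keys under an architecture
	// prefix, plus unrelated fields that must be ignored.
	body := []byte(`{
		"model_info": {
			"general.architecture": "examplearch",
			"examplearch.context_length": 262144,
			"examplearch.vision.context_length": 4096,
			"examplearch.block_count": 64
		}
	}`)
	n, err := largestContextLength(body)
	fmt.Println(n, err) // 262144 <nil>

	_, err = largestContextLength([]byte(`{"model_info": {}}`))
	fmt.Println(err) // no context_length in model_info
}
```

Scanning by suffix avoids depending on the `family` field matching the key prefix, which is the robustness argument made in the fetchContextLength comment.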
@@ -0,0 +1,224 @@
// V15.2 — per-model context-window limits.
//
// Why: agents need to know when they're about to blow the model's
// max-input cap so they can compact stale tool results out of the
// message history. Pre-15.2 the agent loop had no awareness; a long
// research run that accumulated dozens of HTTP tool results would
// hit Ollama's HTTP 400 "prompt is too long" or Anthropic's similar
// error mid-run with no graceful degradation.
//
// Coverage:
// - Anthropic Claude 4.x (200K default; 1M when the model ID
// includes the "[1m]" suffix per llms.tier reload conventions)
// - OpenAI GPT-4.x / o-series (128K)
// - Gemini 2.x (1M-2M, model-specific)
// - Ollama Cloud (model-specific; hardcoded per known model)
// - Local Ollama: queries `/api/show` once at first use, caches
//
// Returns (0, false) for unknown models — callers should treat
// "unknown" as "don't budget" (the agent's existing iteration cap +
// timeout are the fallback safety nets).
package model
import (
"context"
"strings"
"sync"
)
// MaxContextTokens returns the model's max INPUT context-window size
// in tokens. Output / response tokens are NOT included — most models
// share input + output budget but cap them separately, and the practical
// concern is "how big can my prompt get before the model rejects".
//
// modelID accepts both the bare model name (`claude-sonnet-4-6`) and
// the prefixed form (`anthropic/claude-sonnet-4-6` or
// `ollama-cloud/qwen3-coder:480b`). The prefix is stripped before lookup.
//
// Returns (limit, true) on a known model; (0, false) otherwise.
//
// This function is pure (no I/O). For Ollama Cloud models that aren't
// in the static map, use MaxContextTokensWithCache which consults a
// CloudOllamaLimitCache populated at boot from /api/tags + /api/show.
func MaxContextTokens(modelID string) (int, bool) {
id := normalizeModelID(modelID)
if v, ok := staticContextLimits[id]; ok {
return v, true
}
// Anthropic 1M-context variant marker. Mort's llms tier system
// uses a `[1m]` suffix on the model ID (e.g.
// `claude-opus-4-7[1m]`) to opt into Anthropic's 1M beta context.
if strings.HasSuffix(id, "[1m]") {
return 1_000_000, true
}
// Local-ollama dynamic lookup is wired separately so it can
// query the daemon's /api/show endpoint. The static map covers
// known cloud models.
return 0, false
}
// MaxContextTokensWithCache is the cache-aware variant of
// MaxContextTokens. It tries the static map first; on miss, if the
// model is an Ollama Cloud spec (the `ollama-cloud/` prefix), it
// consults the supplied CloudOllamaLimitCache. Pass nil cache for
// static-only behaviour (equivalent to MaxContextTokens).
//
// This function never makes HTTP calls — the cache must be
// pre-populated (typically via cache.RefreshAll at boot). Callers in
// the hot path can rely on a single map lookup per call. Prefer
// MaxContextTokensResolving when a context is available — it makes a
// single /api/show call to fill the cache on miss, which is essential
// for Cloud aliases that /api/tags doesn't enumerate (e.g. :cloud).
func MaxContextTokensWithCache(modelID string, cloud *CloudOllamaLimitCache) (int, bool) {
if v, ok := MaxContextTokens(modelID); ok {
return v, true
}
if cloud == nil {
return 0, false
}
// Only ollama-cloud/* specs are eligible for the cache. A bare
// model:tag form (no "/") is also allowed because some test paths
// strip the prefix before calling.
id := strings.TrimSpace(modelID)
if !strings.HasPrefix(id, "ollama-cloud/") && strings.Contains(id, "/") {
return 0, false
}
return cloud.Lookup(id)
}
// MaxContextTokensResolving is the cache-aware variant that ALSO
// performs a live /api/show fetch on cache miss (with negative caching
// to prevent thrash). Use this in run-setup paths where one HTTP call
// per unseen model is acceptable — typically the skill executor's
// compaction threshold computation. The fetched result is cached for
// future calls, so subsequent runs hit the in-memory map.
//
// Falls back to the static-only path when the model isn't an
// ollama-cloud/* spec or cache is nil. ctx cancellation aborts the
// fetch and returns (0, false) without writing a negative entry.
func MaxContextTokensResolving(ctx context.Context, modelID string, cloud *CloudOllamaLimitCache) (int, bool) {
if v, ok := MaxContextTokens(modelID); ok {
return v, true
}
if cloud == nil {
return 0, false
}
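// Same eligibility rule as MaxContextTokensWithCache: ollama-cloud/*
// specs, or a bare model:tag form with no provider prefix.
id := strings.TrimSpace(modelID)
if !strings.HasPrefix(id, "ollama-cloud/") && strings.Contains(id, "/") {
return 0, false
}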
return cloud.LookupOrFetch(ctx, id)
}
// normalizeModelID strips provider prefix and reasoning suffix so a
// lookup keyed on the base name works regardless of caller form.
//
// Examples:
// - "anthropic/claude-sonnet-4-6" → "claude-sonnet-4-6"
// - "ollama-cloud/qwen3-coder:480b" → "qwen3-coder:480b"
// - "claude-opus-4-7:high" → "claude-opus-4-7"
func normalizeModelID(id string) string {
id = strings.TrimSpace(id)
if idx := strings.Index(id, "/"); idx >= 0 {
id = id[idx+1:]
}
// Strip :low/:medium/:high reasoning effort suffix used by some
// OpenAI / Anthropic clients.
for _, suffix := range []string{":low", ":medium", ":high"} {
if strings.HasSuffix(id, suffix) {
id = id[:len(id)-len(suffix)]
break
}
}
return id
}
// staticContextLimits is the source of truth for known cloud models.
// Add new entries when adding a model to the llms tier system.
//
// CRITICAL: keep these in sync with the actual provider docs. A wrong
// number here causes EITHER premature compaction (too low, degrades
// agent quality unnecessarily) OR HTTP 400 mid-run (too high). The
// 410K-token failure on `qwen3-coder:480b` is the kind of bug a
// mistyped value would reintroduce.
var staticContextLimits = map[string]int{
// Anthropic Claude 4.x — default 200K input. 1M variant via
// `[1m]` suffix handled in MaxContextTokens above.
"claude-opus-4-7": 200_000,
"claude-opus-4-6": 200_000,
"claude-opus-4-5": 200_000,
"claude-sonnet-4-6": 200_000,
"claude-sonnet-4-5": 200_000,
"claude-haiku-4-5": 200_000,
"claude-haiku-4-5-20251001": 200_000,
// OpenAI: GPT-4.x at 128K input; o-series at 128K-200K; GPT-5 at 400K.
"gpt-4o": 128_000,
"gpt-4o-mini": 128_000,
"gpt-4-turbo": 128_000,
"o1": 200_000,
"o1-mini": 128_000,
"o3-mini": 200_000,
"gpt-5": 400_000,
"gpt-5-mini": 400_000,
// Gemini — varies dramatically by model.
"gemini-2.5-pro": 2_000_000,
"gemini-2.5-flash": 1_000_000,
"gemini-2.5-flash-lite": 1_000_000,
"gemini-1.5-pro": 2_000_000,
"gemini-1.5-flash": 1_000_000,
// Ollama Cloud (turbo). Limits per https://ollama.com/cloud/models
// — verified against the Ollama API show output for each model.
// Update when Ollama publishes new models or extends contexts.
"qwen3-coder:480b": 262_144, // 262K — matches the v15.2 trace
"qwen3:235b": 262_144,
"qwen3:32b": 131_072,
"qwen2.5:72b": 131_072,
"gpt-oss:120b": 131_072,
"gpt-oss:20b": 131_072,
"deepseek-v3.1:671b": 131_072,
"glm-4.6:355b": 131_072,
"kimi-k2:1t": 262_144,
"llama4:scout": 10_000_000, // Llama 4 Scout claims 10M
"llama4:maverick": 1_000_000,
}
// LocalOllamaLimitCache holds the resolved /api/show context_length per
// local-ollama model. Populated on first lookup; never invalidated
// (changing num_ctx requires an ollama restart anyway). Process-wide,
// no per-tenant scoping needed.
type LocalOllamaLimitCache struct {
mu sync.RWMutex
limit map[string]int
}
// NewLocalOllamaLimitCache constructs a fresh cache.
func NewLocalOllamaLimitCache() *LocalOllamaLimitCache {
return &LocalOllamaLimitCache{limit: make(map[string]int)}
}
// Get returns the cached limit or (0, false) when unseen. The caller
// is expected to follow up with a lookup against the live daemon.
func (c *LocalOllamaLimitCache) Get(model string) (int, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
v, ok := c.limit[model]
return v, ok
}
// Set records a resolved limit. Idempotent; no-op when value is <= 0.
func (c *LocalOllamaLimitCache) Set(model string, n int) {
if n <= 0 {
return
}
c.mu.Lock()
defer c.mu.Unlock()
c.limit[model] = n
}
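A minimal sketch of the lookup order that MaxContextTokens documents: normalize the ID, consult the static table, then fall back to the `[1m]` suffix. The two-entry `limits` table and the lowercase helper names are illustrative stand-ins chosen for this example, not the package's real data or API.

```go
package main

import (
	"fmt"
	"strings"
)

// limits is a two-entry stand-in for staticContextLimits (illustrative only).
var limits = map[string]int{
	"claude-sonnet-4-6": 200_000,
	"qwen3-coder:480b":  262_144,
}

// normalize strips a provider prefix and a trailing reasoning-effort suffix.
func normalize(id string) string {
	id = strings.TrimSpace(id)
	if i := strings.Index(id, "/"); i >= 0 {
		id = id[i+1:]
	}
	for _, s := range []string{":low", ":medium", ":high"} {
		if strings.HasSuffix(id, s) {
			return strings.TrimSuffix(id, s)
		}
	}
	return id
}

// maxContextTokens follows the documented order: static table, then "[1m]".
func maxContextTokens(modelID string) (int, bool) {
	id := normalize(modelID)
	if v, ok := limits[id]; ok {
		return v, true
	}
	if strings.HasSuffix(id, "[1m]") {
		return 1_000_000, true
	}
	return 0, false
}

func main() {
	for _, id := range []string{
		"anthropic/claude-sonnet-4-6:high",
		"ollama-cloud/qwen3-coder:480b",
		"claude-opus-4-7[1m]",
		"mystery/model",
	} {
		n, ok := maxContextTokens(id)
		fmt.Printf("%-34s limit=%d known=%v\n", id, n, ok)
	}
}
```

The last ID is unknown and yields (0, false), which callers are expected to read as "don't budget" rather than as a zero-token limit.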
+97
@@ -0,0 +1,97 @@
package model
import (
"testing"
"time"
)
// mapSource is a tiny config.Source for tests: a key->value map, defaults
// returned for misses.
type mapSource map[string]string
func (m mapSource) String(k, d string) string {
if v, ok := m[k]; ok {
return v
}
return d
}
func (m mapSource) Int(string, int) int { panic("unused") }
func (m mapSource) Float(string, float64) float64 { panic("unused") }
func (m mapSource) Bool(string, bool) bool { panic("unused") }
// TestConfigureTierResolution covers the convar->config.Source inversion: the
// host supplies a tier table (names + fallbacks) and a live config source; the
// config value overrides the fallback, and an absent key falls back.
func TestConfigureTierResolution(t *testing.T) {
Configure(
mapSource{"model.tier.fast": "anthropic/claude-haiku-4-5"},
map[string]string{"fast": "openai/gpt-4o-mini", "thinking": "anthropic/claude-opus-4-8"},
time.Minute,
)
defer Configure(nil, nil, 0) // reset package global
if !IsTierName("fast") || !IsTierName("thinking") {
t.Fatal("configured tiers should be registered")
}
if IsTierName("nope") {
t.Fatal("unknown tier must not report as a tier")
}
if names := TierNames(); len(names) != 2 || names[0] != "fast" || names[1] != "thinking" {
t.Fatalf("TierNames = %v, want sorted [fast thinking]", names)
}
// config value overrides the host fallback
if spec, _, ok := defaultResolver.Resolve("fast"); !ok || spec != "anthropic/claude-haiku-4-5" {
t.Fatalf("fast resolve = %q ok=%v; config should override fallback", spec, ok)
}
// fallback used when config has no override for the key
if spec, _, ok := defaultResolver.Resolve("thinking"); !ok || spec != "anthropic/claude-opus-4-8" {
t.Fatalf("thinking resolve = %q ok=%v; should use fallback", spec, ok)
}
// unknown tier
if _, _, ok := defaultResolver.Resolve("nope"); ok {
t.Fatal("Resolve of unknown tier should be ok=false")
}
}
// TestReasoningSuffixOnTier verifies the reasoning-suffix dialect survives the
// move: a tier whose spec carries ":high" yields the bare spec + level "high".
func TestReasoningSuffixOnTier(t *testing.T) {
Configure(nil, map[string]string{"thinking": "anthropic/claude-opus-4-8:high"}, time.Minute)
defer Configure(nil, nil, 0)
spec, level, ok := defaultResolver.Resolve("thinking")
if !ok {
t.Fatal("thinking should resolve")
}
if spec != "anthropic/claude-opus-4-8" {
t.Errorf("spec = %q, want suffix stripped", spec)
}
if level != "high" {
t.Errorf("reasoning level = %q, want high", level)
}
}
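// TestValidateTierValueRejectsNestedTier verifies chain validation: a
// chain that names a registered tier is rejected, while a chain of
// concrete provider/model specs validates.
func TestValidateTierValueRejectsNestedTier(t *testing.T) {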
Configure(nil, map[string]string{"fast": "x/y"}, time.Minute)
defer Configure(nil, nil, 0)
if err := ValidateTierValue("fast,a/b"); err == nil {
t.Error("a chain containing a tier alias must be rejected")
}
if err := ValidateTierValue("a/b,c/d"); err != nil {
t.Errorf("a chain of concrete specs must validate, got %v", err)
}
}
// TestSinksDefaultNil verifies usage/trace recording is inert with no sinks
// installed (the light-host default).
func TestSinksDefaultNil(t *testing.T) {
SetUsageSink(nil)
SetTraceSink(nil)
if TraceSinkActive() {
t.Error("no trace sink should mean inactive")
}
// recordUsage must be a no-op (no panic) with a nil sink.
recordUsage(WithModel(t.Context(), "x"), nil)
}
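The behavior these tests pin down can be sketched in isolation: a config value under "model.tier.<name>" overrides the host fallback, and a trailing reasoning suffix is split off the resolved spec. The `resolveTier` and `splitReasoning` helpers and the plain-map inputs below are hypothetical simplifications for illustration; the real resolver also caches and takes a config.Source.

```go
package main

import (
	"fmt"
	"strings"
)

// splitReasoning separates a trailing ":low", ":medium" or ":high" from a spec.
func splitReasoning(spec string) (bare, level string) {
	for _, l := range []string{"low", "medium", "high"} {
		if strings.HasSuffix(spec, ":"+l) {
			return strings.TrimSuffix(spec, ":"+l), l
		}
	}
	return spec, ""
}

// resolveTier is a hypothetical stand-in for the resolver. A tier exists only
// if the host registered a fallback for it; an override stored under
// "model.tier.<name>" wins over that fallback.
func resolveTier(overrides, fallbacks map[string]string, name string) (spec, level string, ok bool) {
	value, known := fallbacks[name]
	if !known {
		return "", "", false
	}
	if v, set := overrides["model.tier."+name]; set {
		value = v
	}
	spec, level = splitReasoning(value)
	return spec, level, true
}

func main() {
	overrides := map[string]string{"model.tier.fast": "anthropic/claude-haiku-4-5"}
	fallbacks := map[string]string{
		"fast":     "openai/gpt-4o-mini",
		"thinking": "anthropic/claude-opus-4-8:high",
	}
	for _, name := range []string{"fast", "thinking", "nope"} {
		spec, level, ok := resolveTier(overrides, fallbacks, name)
		fmt.Printf("%-8s spec=%q level=%q ok=%v\n", name, spec, level, ok)
	}
}
```

Keying tier existence on the fallback table, rather than on the config source, is one way to keep the set of tier names host-defined while still letting live config change what each tier points at.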
+91
@@ -0,0 +1,91 @@
// Package model (lane_mapping.go) maps a model spec to a stable lane
// name. Pure data + a single function; no dependency on the registry,
// no provider wrapping. Kept separate from lane_transport.go so the
// mapping table can be committed and reviewed in isolation, and so
// admin / webui code that just wants to *display* lane assignments
// doesn't drag in the transport machinery.
//
// Why a fixed table: provider concurrency caps differ — Ollama Pro is
// 3 connections, Anthropic Claude has higher per-tier limits, etc.
// Each provider gets its own lane name so they can be configured
// independently via convars (lanes.<name>.max_concurrent). Lane names
// are user-facing (admin dashboard + convar key suffixes) and need to
// stay stable across deploys; an env-overridable map adds complexity
// for no current benefit.
//
// Test: TestLaneFor_Mapping in lane_transport_test.go.
package model
import "strings"
// Lane name constants. Defined as exported strings so admin code (.skill
// admin set-lane <skill> <lane>), webui dropdowns, and convar consumers
// share a single canonical spelling.
const (
// LaneOllama covers ollama-cloud/* and foreman/* (and any future ollama/* local).
// The local ollama instance is on the same physical resource as
// the cloud account from mort's perspective — the connection cap
// should apply jointly.
LaneOllama = "ollama"
// LaneAnthropicThinking is the lane for Anthropic models in
// extended-thinking mode. Separated from default because thinking
// requests hold connections longer and can starve faster lanes
// when multiplexed.
LaneAnthropicThinking = "anthropic-thinking"
// LaneAnthropicDefault is the lane for non-thinking Anthropic
// requests (haiku, sonnet, opus without -thinking-).
LaneAnthropicDefault = "anthropic-default"
// LaneM1 is the lane for m1/* models (foreman-style router
// pointing at a dedicated local instance). Separated from the
// ollama lane because m1 targets a distinct host with its own
// connection budget.
LaneM1 = "m1"
// LaneLLMDefault is the catch-all lane for any provider/model
// combination not explicitly mapped above.
LaneLLMDefault = "llm-default"
)
// LaneFor returns the lane name for the given model spec. Mapping:
//
//	ollama-cloud/*         → "ollama" (Pro account: 3 connections)
//	foreman/*              → "ollama" (Ollama-backed; shares the cap)
//	anthropic/*-thinking-* → "anthropic-thinking"
//	anthropic/*            → "anthropic-default"
//	m1/*                   → "m1" (dedicated local instance)
//	(anything else)        → "llm-default"
//
// Tier aliases (fast/standard/thinking) flow through this function as
// the resolver's expanded provider/model spec, so callers don't need
// to think about tier indirection. Empty input falls through to
// LaneLLMDefault rather than panicking — defensive against unset
// model specs in edge-case test wiring.
//
// Substring match for "-thinking-" keeps future Anthropic naming
// variations classified correctly without churning this table on
// every model release.
func LaneFor(modelSpec string) string {
s := strings.TrimSpace(modelSpec)
if strings.HasPrefix(s, "ollama-cloud/") {
return LaneOllama
}
if strings.HasPrefix(s, "anthropic/") {
if strings.Contains(s, "-thinking-") {
return LaneAnthropicThinking
}
return LaneAnthropicDefault
}
// Foreman instances are backed by Ollama and share its connection
// cap, so they route to the same lane.
if strings.HasPrefix(s, "foreman/") {
return LaneOllama
}
// m1/ is a foreman-style router pointing at a dedicated local
// instance with its own connection budget. Separate lane so its
// concurrency cap is independent of the shared ollama lane.
if strings.HasPrefix(s, "m1/") {
return LaneM1
}
return LaneLLMDefault
}
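To illustrate why each provider gets its own lane name, here is a rough sketch of independent per-lane concurrency caps built from one buffered channel per lane. The `laneLimiter` type is hypothetical and far simpler than a real lane (no queueing, no fair-share); the cap of 3 for "ollama" comes from the comment above, while the other cap is an assumed value.

```go
package main

import "fmt"

// laneLimiter holds one counting semaphore per lane name (hypothetical type).
type laneLimiter struct {
	slots map[string]chan struct{}
}

// newLaneLimiter builds a semaphore of the given capacity for every lane.
// The map is never mutated afterwards, so concurrent reads are safe.
func newLaneLimiter(caps map[string]int) *laneLimiter {
	l := &laneLimiter{slots: make(map[string]chan struct{}, len(caps))}
	for name, n := range caps {
		l.slots[name] = make(chan struct{}, n)
	}
	return l
}

// tryAcquire claims a slot without blocking. It reports false when the lane
// is full or unknown.
func (l *laneLimiter) tryAcquire(lane string) bool {
	ch, ok := l.slots[lane]
	if !ok {
		return false
	}
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees one slot; it is a no-op on an empty or unknown lane.
func (l *laneLimiter) release(lane string) {
	if ch, ok := l.slots[lane]; ok {
		select {
		case <-ch:
		default:
		}
	}
}

func main() {
	l := newLaneLimiter(map[string]int{"ollama": 3, "anthropic-default": 8})
	for i := 1; i <= 4; i++ {
		fmt.Printf("ollama acquire #%d: %v\n", i, l.tryAcquire("ollama"))
	}
	// A saturated ollama lane does not affect the anthropic lane.
	fmt.Println("anthropic-default acquire:", l.tryAcquire("anthropic-default"))
	l.release("ollama")
	fmt.Println("ollama acquire after release:", l.tryAcquire("ollama"))
}
```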
+373
@@ -0,0 +1,373 @@
// Package model (lane_transport.go) provides the lane-aware decorator. It wraps an
// llm.Provider so every model it mints submits its Generate/Stream calls
// through the matching named lane's bounded worker pool (lane selection
// per lane_mapping.go), and stamps every returned error with per-call
// attribution (caller id, run id, prompt snapshot) for the failover log.
//
// Why intercept at the llm.Provider layer: majordomo's Provider and Model
// are small public interfaces, so the decorator slots between the chain
// executor and the real provider with no fork. Every chain attempt calls
// laneModel.Generate, which queues on the lane, runs the real call, and
// wraps failures with CallInfo — the ChainConfig.Observer (which receives
// no context) recovers the attribution from the error itself.
//
// Test: lane_transport_test.go covers mapping correctness, the
// concurrency-limiting behavior, and error attribution.
// lane_chatbot_test.go is the regression guard proving chatbot-path LLM
// calls actually go through the lane.
package model
import (
"context"
"errors"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"github.com/google/uuid"
"gitea.stevedudenhoeffer.com/steve/executus/lane"
)
// defaultLaneExecTimeout is the execution backstop applied inside a lane
// job once it leaves the queue: the caller's deadline is detached (queue
// wait must not consume the LLM execution budget) and replaced with this
// hard cap so a hung provider can't leak workers.
const defaultLaneExecTimeout = 5 * time.Minute
// foremanModelTimeout is the hard per-call timeout for foreman targets —
// slow local LLMs that may block on model loads and upstream queues.
const foremanModelTimeout = 30 * time.Minute
// foremanLaneExecTimeout is the lane execution backstop for foreman
// targets. Slightly above foremanModelTimeout so the model-level timeout
// (the documented contract) is the one that fires.
const foremanLaneExecTimeout = foremanModelTimeout + time.Minute
// laneCallerKey is the context key for the per-call caller identity used
// for fair-share queueing.
type laneCallerKey struct{}
// runIDKey is the context key for the per-call run id used for failover
// event attribution.
type runIDKey struct{}
// ContextWithLaneCaller attaches a caller identity to ctx. The lane
// decorator reads this when constructing a Job so fair-share queueing
// can isolate heavy users, and snapshots it into error attribution for
// the failover log.
//
// An empty callerID is a no-op: ctx is returned unchanged, so every
// such call lands in a single shared fair-share bucket. Production
// callers should always populate it.
func ContextWithLaneCaller(ctx context.Context, callerID string) context.Context {
if callerID == "" {
return ctx
}
return context.WithValue(ctx, laneCallerKey{}, callerID)
}
// LaneCallerFromContext returns the caller identity attached via
// ContextWithLaneCaller, or "" if none is set.
func LaneCallerFromContext(ctx context.Context) string {
s, _ := ctx.Value(laneCallerKey{}).(string)
return s
}
// ContextWithRunID attaches a skill/agent run id to ctx. Snapshotted into
// error attribution so failover events can be correlated to runs.
func ContextWithRunID(ctx context.Context, runID string) context.Context {
if runID == "" {
return ctx
}
return context.WithValue(ctx, runIDKey{}, runID)
}
// RunIDFromContext returns the run id attached via ContextWithRunID, or
// "" if none is set.
func RunIDFromContext(ctx context.Context) string {
s, _ := ctx.Value(runIDKey{}).(string)
return s
}
// ---------------------------------------------------------------------------
// Error attribution
// ---------------------------------------------------------------------------
// CallInfo is the per-call attribution snapshot the lane decorator stamps
// onto every error it returns. majordomo's ChainConfig.Observer receives
// a bare FailoverEvent (no context); the failover log recovers caller,
// run id, and the prompt chain from the event's error via
// CallInfoFromError.
type CallInfo struct {
// CallerID is the fair-share caller identity (ContextWithLaneCaller).
CallerID string
// RunID is the skill/agent run id (ContextWithRunID); "" if not threaded.
RunID string
// Messages is the request's message chain at call time, for the
// failover log's persist_prompts feature.
Messages []llm.Message
}
// callInfoError carries CallInfo along an error chain without changing
// the error's message or classification (Unwrap preserves errors.Is/As).
type callInfoError struct {
inner error
info CallInfo
}
func (e *callInfoError) Error() string { return e.inner.Error() }
func (e *callInfoError) Unwrap() error { return e.inner }
// WithCallInfo stamps attribution onto err. nil err returns nil.
func WithCallInfo(err error, info CallInfo) error {
if err == nil {
return nil
}
return &callInfoError{inner: err, info: info}
}
// CallInfoFromError extracts the attribution stamped by the lane
// decorator (or WithCallInfo), if any.
func CallInfoFromError(err error) (CallInfo, bool) {
var cie *callInfoError
if errors.As(err, &cie) {
return cie.info, true
}
return CallInfo{}, false
}
// ---------------------------------------------------------------------------
// Lane decoration
// ---------------------------------------------------------------------------
// LaneRegistry is the narrow surface the lane decorator needs from the
// lane package's Registry. Defined as an interface so tests can
// substitute a fake registry without spinning up a real one.
type LaneRegistry interface {
GetOrCreate(ctx context.Context, name string) lane.Lane
}
// laneProvider decorates an llm.Provider so every model it mints routes
// calls through the lane named by LaneFor(provider/model). With a nil
// registry the queueing is skipped but error attribution still applies.
type laneProvider struct {
inner llm.Provider
registry LaneRegistry
execTimeout time.Duration
}
// WrapProviderForLane returns a provider whose models submit each
// Generate/Stream call through the lane named by LaneFor(name/model) in
// the registry, and stamp CallInfo attribution onto every error.
//
// A nil registry disables queueing (calls pass straight through) but the
// decoration — and with it error attribution — remains, so failover
// logging works in lane-less deployments and tests.
func WrapProviderForLane(inner llm.Provider, registry LaneRegistry) llm.Provider {
return wrapProviderForLane(inner, registry, defaultLaneExecTimeout)
}
func wrapProviderForLane(inner llm.Provider, registry LaneRegistry, execTimeout time.Duration) llm.Provider {
if inner == nil {
return nil
}
if execTimeout <= 0 {
execTimeout = defaultLaneExecTimeout
}
return &laneProvider{inner: inner, registry: registry, execTimeout: execTimeout}
}
func (p *laneProvider) Name() string { return p.inner.Name() }
func (p *laneProvider) Model(id string, opts ...llm.ModelOption) (llm.Model, error) {
m, err := p.inner.Model(id, opts...)
if err != nil {
return nil, err
}
return &laneModel{
inner: m,
registry: p.registry,
laneName: LaneFor(p.inner.Name() + "/" + id),
execTimeout: p.execTimeout,
}, nil
}
// laneModel routes one model's calls through its lane and stamps error
// attribution. The lane name is resolved once at Model() time — the
// provider name and model id are both known there, unlike legacy gollm where
// the request had to be inspected per call.
type laneModel struct {
inner llm.Model
registry LaneRegistry
laneName string
execTimeout time.Duration
}
func (m *laneModel) Capabilities() llm.Capabilities { return m.inner.Capabilities() }
// laneJob adapts an in-flight call to the lane.Job interface. The result
// is captured in variables closed over by run and read after SubmitWait
// returns.
type laneJob struct {
id string
callerID string
run func(ctx context.Context) error
}
func (j *laneJob) ID() string { return j.id }
func (j *laneJob) CallerID() string { return j.callerID }
func (j *laneJob) Priority() int { return 0 }
func (j *laneJob) Run(ctx context.Context) error { return j.run(ctx) }
func (m *laneModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
// Fold options now so the job closure and the attribution snapshot
// both see the final request.
req = req.Apply(opts...)
info := CallInfo{
CallerID: LaneCallerFromContext(ctx),
RunID: RunIDFromContext(ctx),
Messages: req.Messages,
}
resp, err := m.submit(ctx, func(execCtx context.Context) (*llm.Response, error) {
return m.inner.Generate(execCtx, req)
})
if err != nil {
return resp, WithCallInfo(err, info)
}
return resp, nil
}
func (m *laneModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
req = req.Apply(opts...)
info := CallInfo{
CallerID: LaneCallerFromContext(ctx),
RunID: RunIDFromContext(ctx),
Messages: req.Messages,
}
l := m.lane(ctx)
if l == nil {
s, err := m.inner.Stream(ctx, req)
if err != nil {
return nil, WithCallInfo(err, info)
}
return s, nil
}
// Streams hold their lane slot only while ESTABLISHING the stream —
// holding it for the full consumption would deadlock a slow consumer
// against the pool. The caller's ctx is used as-is (no deadline
// detach): severing cancellation from a long-lived stream would leak
// connections.
var (
stream llm.Stream
serr error
)
job := &laneJob{
id: uuid.New().String(),
callerID: info.CallerID,
run: func(context.Context) error {
stream, serr = m.inner.Stream(ctx, req)
return serr
},
}
if err := l.SubmitWait(ctx, job); err != nil {
return nil, WithCallInfo(err, info)
}
if serr != nil {
return nil, WithCallInfo(serr, info)
}
return stream, nil
}
// lane resolves the lane for this model, or nil when queueing is
// disabled (nil registry, or a registry that declines the name).
func (m *laneModel) lane(ctx context.Context) lane.Lane {
if m.registry == nil {
return nil
}
return m.registry.GetOrCreate(ctx, m.laneName)
}
// submit runs fn through the lane (or directly when queueing is off).
//
// Inside a lane job the caller's deadline is detached so queue wait does
// not consume the execution budget — ctx VALUES (usage attribution,
// trace ids) are preserved, only cancellation/deadline are severed — and
// an execTimeout backstop prevents runaway calls. Queue-phase
// cancellation still works: SubmitWait waits on the original ctx, so a
// caller that gives up while queued exits immediately.
func (m *laneModel) submit(ctx context.Context, fn func(context.Context) (*llm.Response, error)) (*llm.Response, error) {
l := m.lane(ctx)
if l == nil {
return fn(ctx)
}
var (
resp *llm.Response
err error
)
job := &laneJob{
id: uuid.New().String(),
callerID: LaneCallerFromContext(ctx),
run: func(context.Context) error {
execCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), m.execTimeout)
defer cancel()
resp, err = fn(execCtx)
// Returning err lets the lane's pool propagate it to
// SubmitWait; the captured err is what we surface.
return err
},
}
if serr := l.SubmitWait(ctx, job); serr != nil && err == nil {
return nil, serr
}
return resp, err
}
// ---------------------------------------------------------------------------
// Model timeout decoration (foreman)
// ---------------------------------------------------------------------------
// timeoutProvider wraps a provider so every minted model enforces a hard
// per-call deadline on Generate. Used for foreman targets (slow local
// LLMs). Stream is passed through: a wall-clock deadline on a long-lived
// stream would sever it mid-consumption.
type timeoutProvider struct {
inner llm.Provider
timeout time.Duration
}
// withModelTimeout decorates p so its models' Generate calls carry a
// hard timeout.
func withModelTimeout(p llm.Provider, d time.Duration) llm.Provider {
if p == nil || d <= 0 {
return p
}
return &timeoutProvider{inner: p, timeout: d}
}
func (p *timeoutProvider) Name() string { return p.inner.Name() }
func (p *timeoutProvider) Model(id string, opts ...llm.ModelOption) (llm.Model, error) {
m, err := p.inner.Model(id, opts...)
if err != nil {
return nil, err
}
return &timeoutModel{inner: m, timeout: p.timeout}, nil
}
type timeoutModel struct {
inner llm.Model
timeout time.Duration
}
func (m *timeoutModel) Capabilities() llm.Capabilities { return m.inner.Capabilities() }
func (m *timeoutModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
ctx, cancel := context.WithTimeout(ctx, m.timeout)
defer cancel()
return m.inner.Generate(ctx, req, opts...)
}
func (m *timeoutModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
return m.inner.Stream(ctx, req, opts...)
}
+477
@@ -0,0 +1,477 @@
// Package model is executus's config-driven model-access layer over
// majordomo. It owns the package-level *majordomo.Registry (providers
// with mort's env keys, OpenAI-compat presets, lane-aware decoration,
// the DB-backed tier resolver, legacy shortcut aliases, the foreman
// timeout decorator, and failover/health wiring), plus the mort-facing
// call helpers (ParseModelRequest / ParseModelForContext / GenerateWith /
// CallAndExecute / SimpleCall) and usage/trace recording.
//
// The ":low/:medium/:high" reasoning-suffix dialect is an executus convenience:
// majordomo treats model ids as verbatim, so this package strips the
// suffix from specs and tier values and re-applies it per request via
// llm.WithReasoningEffort on a wrapping Model.
package model
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
majordomo "gitea.stevedudenhoeffer.com/steve/majordomo"
"gitea.stevedudenhoeffer.com/steve/majordomo/health"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/anthropic"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/google"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/ollama"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/openai"
)
// Usage and trace recording live in sink.go: SetUsageSink / SetTraceSink
// install the host seams, and ParseModelForContext stamps the model name on
// the context (via WithModel) for attribution.
// ---------------------------------------------------------------------------
// Package registry
// ---------------------------------------------------------------------------
// buildConfig carries the knobs Wire feeds into buildRegistry. The zero
// value yields a lane-less registry with majordomo's default failover
// behavior — the bootstrap state tests and pre-Wire code paths run on.
type buildConfig struct {
lanes LaneRegistry
// maxRetries maps the llms.failover.max_retries convar onto
// ChainConfig.TransientRetries. <= 0 keeps majordomo's default (1).
maxRetries int
// cooldown maps the llms.failover.cooldown_seconds convar onto
// health.Config.BaseCooldown. <= 0 keeps the mort default (300s).
// Note majordomo grows the cooldown exponentially from this base;
// MaxCooldown is set to max(cooldown, 5m) so the operator dial
// dominates (a 10m base never gets capped below itself).
cooldown time.Duration
// observer receives one event per failover decision (failed attempt,
// bench, benched-skip). Typically failoverlog.NewObserver(...).
observer func(majordomo.FailoverEvent)
}
// defaultFailoverCooldown matches the historical llms.failover.cooldown_seconds
// convar default (300s).
const defaultFailoverCooldown = 300 * time.Second
var (
registryMu sync.RWMutex
registry = buildRegistry(buildConfig{})
)
// Registry returns the current package-level majordomo registry. Most
// callers should use ParseModelRequest / ParseModelForContext instead;
// the registry itself is exposed for admin surfaces (health/bench) and
// for tests that need to substitute providers.
func Registry() *majordomo.Registry {
registryMu.RLock()
defer registryMu.RUnlock()
return registry
}
// Health returns the health tracker of the current registry — the live
// source of truth for benched models. Used by the `.failover` commands
// and the failover web UI (see ListBenched/BenchModel/UnbenchModel for
// the mort-flavored facade).
func Health() *health.Tracker {
return Registry().Health()
}
// setRegistry swaps the package registry. Bench/backoff state of the old
// registry is discarded — Wire is a boot-time operation.
func setRegistry(r *majordomo.Registry) {
registryMu.Lock()
defer registryMu.Unlock()
registry = r
}
// buildRegistry constructs a fully-wired majordomo registry:
//
// - health/chain config from the failover convars (via cfg),
// - mort's providers under their nonstandard env keys (OPENAI_KEY,
// GOOGLE_GEMINI_API_KEY, ...), every one lane-decorated,
// - OpenAI-compat presets (deepseek, moonshot+kimi, xai+grok, groq),
// - scheme factories for LLM_* env DSNs re-registered so DSN-defined
// providers (m1, arbitrary foreman targets) are lane-decorated too,
// with foreman additionally getting the 30-minute model timeout,
// - the legacy shortcut aliases, and
// - the delegating tier resolver (reads defaultResolver at Resolve
// time, so Init() can swap in the DB-backed resolver later).
func buildRegistry(cfg buildConfig) *majordomo.Registry {
cooldown := cfg.cooldown
if cooldown <= 0 {
cooldown = defaultFailoverCooldown
}
maxCooldown := cooldown
if maxCooldown < 5*time.Minute {
maxCooldown = 5 * time.Minute
}
r := majordomo.New(
// Env DSNs are loaded manually below, AFTER the scheme factories
// are overridden — New()'s eager scan would otherwise build
// LLM_*-defined providers with the stock (un-decorated) factories.
majordomo.WithoutEnvProviders(),
majordomo.WithHealthConfig(health.Config{
BaseCooldown: cooldown,
MaxCooldown: maxCooldown,
}),
majordomo.WithChainConfig(majordomo.ChainConfig{
TransientRetries: cfg.maxRetries,
// Legacy gollm failed over on request-specific errors (400/413/422)
// without benching; majordomo fails fast on permanent errors by
// default. AdvanceOnPermanent preserves the availability-first
// behavior mort's executors rely on.
AdvanceOnPermanent: true,
Observer: cfg.observer,
}),
)
wrap := func(p llm.Provider) llm.Provider {
return wrapProviderForLane(p, cfg.lanes, defaultLaneExecTimeout)
}
// Core providers with mort's env keys.
r.RegisterProvider(wrap(openai.New(
openai.WithAPIKey(os.Getenv("OPENAI_KEY")),
)))
r.RegisterProvider(wrap(anthropic.New(
anthropic.WithAPIKey(os.Getenv("ANTHROPIC_API_KEY")),
)))
r.RegisterProvider(wrap(google.New(
google.WithAPIKey(os.Getenv("GOOGLE_GEMINI_API_KEY")),
)))
r.RegisterProvider(wrap(localOllamaProvider()))
// ollama.Cloud reads OLLAMA_API_KEY itself; with the key unset the
// provider still registers and errors clearly at call time (parity
// with the previous behavior).
r.RegisterProvider(wrap(ollama.Cloud()))
// OpenAI-compatible presets. Base URLs mirror legacy gollm's defaults.
for _, preset := range []struct {
name, baseURL, envKey string
}{
{"deepseek", "https://api.deepseek.com/v1", "DEEPSEEK_API_KEY"},
{"moonshot", "https://api.moonshot.ai/v1", "MOONSHOT_API_KEY"},
{"kimi", "https://api.moonshot.ai/v1", "MOONSHOT_API_KEY"}, // alias provider for moonshot
{"xai", "https://api.x.ai/v1", "XAI_API_KEY"},
{"grok", "https://api.x.ai/v1", "XAI_API_KEY"}, // alias provider for xai
{"groq", "https://api.groq.com/openai/v1", "GROQ_API_KEY"},
} {
r.RegisterProvider(wrap(openai.New(
openai.WithName(preset.name),
openai.WithBaseURL(preset.baseURL),
openai.WithAPIKey(os.Getenv(preset.envKey)),
)))
}
// Scheme factories for LLM_* env DSNs. Re-registered so DSN-defined
// providers go through the lane decorator like the built-ins.
//
// foreman targets are slow local LLMs (large model loads, queued
// behind other requests), so their models additionally get a hard
// 30-minute timeout and a matching lane execution backstop — the
// default 5-minute lane backstop would strangle them.
r.RegisterScheme("foreman", func(name string, dsn majordomo.DSN) (llm.Provider, error) {
p := ollama.Foreman(dsn.BaseURL(), dsn.Token, ollama.WithName(name))
return wrapProviderForLane(
withModelTimeout(p, foremanModelTimeout),
cfg.lanes,
foremanLaneExecTimeout,
), nil
})
laneScheme := func(factory majordomo.SchemeFactory) majordomo.SchemeFactory {
return func(name string, dsn majordomo.DSN) (llm.Provider, error) {
p, err := factory(name, dsn)
if err != nil {
return nil, err
}
return wrap(p), nil
}
}
ollamaScheme := laneScheme(func(name string, dsn majordomo.DSN) (llm.Provider, error) {
return ollama.New(
ollama.WithName(name),
ollama.WithBaseURL(dsn.BaseURL()),
ollama.WithToken(dsn.Token),
), nil
})
r.RegisterScheme("ollama", ollamaScheme)
r.RegisterScheme("ollama-cloud", ollamaScheme)
r.RegisterScheme("openai", laneScheme(func(name string, dsn majordomo.DSN) (llm.Provider, error) {
return openai.New(
openai.WithName(name),
openai.WithBaseURL(dsn.BaseURL()),
openai.WithAPIKey(dsn.Token),
), nil
}))
r.RegisterScheme("anthropic", laneScheme(func(name string, dsn majordomo.DSN) (llm.Provider, error) {
return anthropic.New(
anthropic.WithName(name),
anthropic.WithBaseURL(dsn.BaseURL()),
anthropic.WithAPIKey(dsn.Token),
), nil
}))
googleScheme := laneScheme(func(name string, dsn majordomo.DSN) (llm.Provider, error) {
return google.New(
google.WithName(name),
google.WithBaseURL(dsn.BaseURL()),
google.WithAPIKey(dsn.Token),
), nil
})
r.RegisterScheme("google", googleScheme)
r.RegisterScheme("gemini", googleScheme)
// Eager LLM_* env scan, now with the decorated scheme factories in
// place. Malformed entries are recorded per-name and surface on use.
env := make(map[string]string)
for _, kv := range os.Environ() {
if k, v, ok := strings.Cut(kv, "="); ok {
env[k] = v
}
}
_ = r.LoadEnv(env)
// Legacy shortcut aliases (sonnet, haiku, ...). Same strings as the
// historical table; kept in sync with legacyAliasSpecs below.
for name, spec := range legacyAliasSpecs {
r.RegisterAlias(name, spec)
}
// Tier resolver: a delegating closure so Init() and test helpers can
// swap defaultResolver without rebuilding the registry. The resolver
// returns specs with the legacy reasoning suffixes already stripped
// (per chain element); the tier's default reasoning level is applied
// by ParseModelRequest, not here.
r.RegisterResolver(majordomo.ResolverFunc(func(name string) (string, bool) {
res := defaultResolver
if res == nil {
return "", false
}
spec, _, ok := res.Resolve(name)
return spec, ok
}))
return r
}
// localOllamaProvider builds the local Ollama provider, honoring
// OLLAMA_BASE_URL when set (mort's historical env var; ollama.Local
// itself honors OLLAMA_HOST).
func localOllamaProvider() llm.Provider {
if url := os.Getenv("OLLAMA_BASE_URL"); url != "" {
return ollama.Local(ollama.WithBaseURL(url))
}
return ollama.Local()
}
// ---------------------------------------------------------------------------
// Spec parsing
// ---------------------------------------------------------------------------
// ParseModelRequest resolves a model request string to a ready-to-use Model.
// It handles, in order:
//
// - empty spec → tier "fast"
// - the legacy ":low/:medium/:high" reasoning suffix, stripped per chain
// element (ollama tags like ":30b" or ":cloud" are preserved); the
// level is applied to every call via llm.WithReasoningEffort
// - tier aliases (host config under "model.tier.<name>"; a tier value's
// own suffix becomes the default level when the caller didn't supply one)
// - legacy shortcut aliases (sonnet, haiku, opus, ...)
// - provider/model lookup and LLM_* env-DSN fallback (majordomo)
// - comma-separated failover chains with health-tracked bench/backoff
//
// The returned Model is instrumented: token usage from every successful
// Generate is recorded to the installed UsageSink (if any) automatically. Do
// NOT additionally call RecordUsage on responses from a parsed model.
func ParseModelRequest(spec string) (majordomo.Model, error) {
spec = strings.TrimSpace(spec)
if spec == "" {
spec = "fast"
}
clean, level := splitReasoningSpec(spec)
// Tier default reasoning: when the (suffix-free) spec is exactly a
// tier name and the caller didn't ask for a level, the tier value's
// own suffix (e.g. "anthropic/claude-opus-4-6:high") applies.
if level == "" && defaultResolver != nil {
if _, tierLevel, ok := defaultResolver.Resolve(clean); ok {
level = tierLevel
}
}
m, err := Registry().Parse(clean)
if err != nil {
return nil, fmt.Errorf("model %q: %w", spec, err)
}
if level != "" {
m = &reasoningModel{inner: m, level: level}
}
return &instrumentedModel{inner: m}, nil
}
// ParseModelForContext combines ParseModelRequest with WithModel so that
// the resolved model name is recorded in the context for usage tracking.
// Prefer this over bare ParseModelRequest in all new code.
func ParseModelForContext(ctx context.Context, req string) (context.Context, majordomo.Model, error) {
model, err := ParseModelRequest(req)
if err != nil {
return ctx, nil, err
}
ctx = WithModel(ctx, ResolveModelName(req))
return ctx, model, nil
}
// reasoningModel applies a default reasoning effort to every request that
// doesn't carry one already. Mort's legacy ":low/:medium/:high" suffix
// dialect resolves to this wrapper because majordomo treats model ids as
// verbatim (no suffix stripping).
type reasoningModel struct {
inner llm.Model
level string
}
func (m *reasoningModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
req = req.Apply(opts...)
if req.ReasoningEffort == "" {
req.ReasoningEffort = m.level
}
return m.inner.Generate(ctx, req)
}
func (m *reasoningModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
req = req.Apply(opts...)
if req.ReasoningEffort == "" {
req.ReasoningEffort = m.level
}
return m.inner.Stream(ctx, req)
}
func (m *reasoningModel) Capabilities() llm.Capabilities { return m.inner.Capabilities() }
// ---------------------------------------------------------------------------
// Reasoning-suffix dialect
// ---------------------------------------------------------------------------
// reasoningLevels is the set of recognized legacy suffix values.
var reasoningLevels = map[string]bool{"low": true, "medium": true, "high": true}
// splitReasoning peels an optional ":low" / ":medium" / ":high" suffix off
// a single model request string. Returns the input unchanged and "" when no
// recognized level is present, so non-reasoning suffixes (ollama tags like
// ":30b" or ":q4_K_M", date stamps) flow through untouched.
func splitReasoning(s string) (string, string) {
idx := strings.LastIndex(s, ":")
if idx < 0 {
return s, ""
}
if lvl := s[idx+1:]; reasoningLevels[lvl] {
return s[:idx], lvl
}
return s, ""
}
// splitReasoningSpec strips the legacy reasoning suffix from every element
// of a (possibly comma-separated) spec. The returned level is the first
// non-empty per-element level: majordomo chains carry one request-level
// reasoning effort, not one per target, so the earliest element that
// carries a suffix wins. Elements without a suffix are unchanged.
func splitReasoningSpec(spec string) (string, string) {
if !strings.Contains(spec, ",") {
return splitReasoning(strings.TrimSpace(spec))
}
parts := strings.Split(spec, ",")
level := ""
for i, p := range parts {
s, l := splitReasoning(strings.TrimSpace(p))
parts[i] = s
if level == "" {
level = l
}
}
return strings.Join(parts, ","), level
}
// ---------------------------------------------------------------------------
// Usage-attribution name resolution
// ---------------------------------------------------------------------------
// ResolveModelName returns the model portion of a request string, stripping
// any reasoning suffix and resolving tier aliases. The result is used for
// usage attribution (keyed on model name, not provider or reasoning level).
func ResolveModelName(req string) string {
// Strip any reasoning-level suffix before resolving — the level is a
// per-request setting, not part of the model identity.
req, _ = splitReasoning(req)
// Tier expansion: when the request is a tier alias, fold it through the
// resolver and return the model portion of its current config value. The
// empty string is treated as "fast", matching ParseModelRequest's default
// for an empty spec.
if defaultResolver != nil {
key := req
if key == "" {
key = "fast"
}
if spec, _, ok := defaultResolver.Resolve(key); ok && spec != "" {
// A tier may resolve to a comma-separated failover chain. Attribute
// usage to the first (preferred) entry's model name rather than the
// whole chain string.
if i := strings.IndexByte(spec, ','); i >= 0 {
spec = strings.TrimSpace(spec[:i])
}
if idx := strings.Index(spec, "/"); idx >= 0 {
return spec[idx+1:]
}
return spec
}
}
// For non-tier requests, return the model portion after the slash.
// Static aliases are NOT expanded here beyond the legacy table below:
// callers that went through ParseModelRequest already carry the
// concrete spec.
if idx := strings.Index(req, "/"); idx >= 0 {
return req[idx+1:]
}
// Legacy shortcut fallback: callers that pass bare names like "sonnet"
// to ResolveModelName (without going through ParseModelRequest) still
// need the concrete model name for usage keys.
if spec, ok := legacyAliasSpecs[req]; ok {
if idx := strings.Index(spec, "/"); idx >= 0 {
return spec[idx+1:]
}
return spec
}
return req
}
// legacyAliasSpecs maps legacy shortcut names to their full provider/model
// spec. Registered with the registry as static aliases AND consulted by
// ResolveModelName for bare-name usage attribution.
var legacyAliasSpecs = map[string]string{
"openai": "openai/gpt-4o-mini",
"gpt-4": "openai/gpt-4",
"gpt-4o": "openai/gpt-4o",
"gpt-4o-mini": "openai/gpt-4o-mini",
"sonnet": "anthropic/claude-sonnet-4-6",
"sonnet-4.5": "anthropic/claude-sonnet-4-5-20250929",
"haiku": "anthropic/claude-haiku-4-5-20251001",
"opus": "anthropic/claude-opus-4-6",
"gemini": "google/gemini-2.0-flash",
"gemini-flash": "google/gemini-2.0-flash",
"gemini-pro": "google/gemini-2.0-pro",
}
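The reasoning-suffix dialect above can be tried in isolation. The following is a minimal standalone sketch that restates splitReasoning and a simplified splitReasoningSpec (the single-element fast path is folded into the loop); the "ollama/example:30b" spec is a made-up illustration, not a model the registry is known to serve.

```go
package main

import (
	"fmt"
	"strings"
)

// reasoningLevels mirrors the recognized legacy suffix values.
var reasoningLevels = map[string]bool{"low": true, "medium": true, "high": true}

// splitReasoning peels a trailing ":low" / ":medium" / ":high" off one
// element and leaves any other suffix (such as an ollama tag) in place.
func splitReasoning(s string) (string, string) {
	idx := strings.LastIndex(s, ":")
	if idx < 0 {
		return s, ""
	}
	if lvl := s[idx+1:]; reasoningLevels[lvl] {
		return s[:idx], lvl
	}
	return s, ""
}

// splitReasoningSpec applies splitReasoning to each chain element and keeps
// the first level it finds.
func splitReasoningSpec(spec string) (string, string) {
	parts := strings.Split(spec, ",")
	level := ""
	for i, p := range parts {
		s, l := splitReasoning(strings.TrimSpace(p))
		parts[i] = s
		if level == "" {
			level = l
		}
	}
	return strings.Join(parts, ","), level
}

func main() {
	for _, spec := range []string{
		"anthropic/claude-opus-4-6:high",
		"ollama/example:30b", // hypothetical spec; ":30b" is a tag, not a level
		"ollama/example:30b:low, openai/gpt-4o:high",
	} {
		clean, level := splitReasoningSpec(spec)
		fmt.Printf("%q -> spec=%q level=%q\n", spec, clean, level)
	}
}
```

In the chain case both suffixes are stripped, but only the first level ("low") survives, which matches the one-effort-per-request behavior described for majordomo chains.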
@@ -0,0 +1,131 @@
package model
import (
"context"
"time"
)
// This file is executus's inversion of mort's llmusage / llmtrace coupling.
// The model package owns the MECHANISM (instrument every parsed model's
// Generate, attribute by serving model, emit a span when a trace is active);
// WHERE usage/traces land is a host seam. A host registers a UsageSink and/or
// a TraceSink; both are optional (nil = off), so a light host records nothing.
// --- Usage ---
// UsageSink receives one record per successful Generate through a model parsed
// by this package (ParseModelRequest / ParseModelForContext). Implement it to
// meter or bill; the token detail mirrors majordomo's Response.Usage.
type UsageSink interface {
Record(ctx context.Context, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int)
}
var usageSink UsageSink
// SetUsageSink installs the usage sink (nil disables usage recording). Call at
// startup before model calls.
func SetUsageSink(s UsageSink) { usageSink = s }
// --- Trace ---
// Span is one traced model call. The host's TraceSink persists it however it
// likes (a DB row, a log line, an OTel span). String fields carrying structured
// data (Messages, ToolDefinitions, ...) are pre-marshalled JSON.
type Span struct {
SpanID string
TraceID string
Model string
SystemPrompt string
Messages string
ToolDefinitions string
ResponseText string
ResponseToolCalls string
ToolResults string
Error string
InputTokens int
OutputTokens int
DurationMs int64
StartedAt time.Time
CompletedAt time.Time
CreatedAt time.Time
}
// TraceSink receives a Span for each traced call (one is emitted only when a
// trace id is present on the context — see WithTraceID).
type TraceSink interface {
WriteSpan(span Span)
}
var traceSink TraceSink
// SetTraceSink installs the trace sink (nil disables tracing).
func SetTraceSink(s TraceSink) { traceSink = s }
// TraceSinkActive reports whether a trace sink is installed.
func TraceSinkActive() bool { return traceSink != nil }
// --- Context attribution ---
//
// ParseModelForContext stamps the requested model onto the context so usage
// from a response that doesn't name its serving model can still be attributed.
// A host's tracing/usage middleware stamps a trace id and optional caller/tool
// for diagnostics. All reads are nil/empty-safe.
type (
ctxKeyModel struct{}
ctxKeyTrace struct{}
ctxKeyTool struct{}
ctxKeyUser struct{}
)
// WithModel attributes subsequent usage on ctx to the given model name.
func WithModel(ctx context.Context, model string) context.Context {
return context.WithValue(ctx, ctxKeyModel{}, model)
}
func modelFromContext(ctx context.Context) string {
if v, ok := ctx.Value(ctxKeyModel{}).(string); ok {
return v
}
return ""
}
// WithTraceID marks ctx as belonging to a trace; a TraceSink (if installed)
// then receives a Span per call. An empty id (or no id) disables tracing.
func WithTraceID(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, ctxKeyTrace{}, id)
}
func traceIDFromContext(ctx context.Context) string {
if v, ok := ctx.Value(ctxKeyTrace{}).(string); ok {
return v
}
return ""
}
// WithUsageTool attaches the calling tool's name to ctx. It is optional
// attribution used only in the "unknown model" diagnostic warning; reads
// default to "unknown".
func WithUsageTool(ctx context.Context, tool string) context.Context {
return context.WithValue(ctx, ctxKeyTool{}, tool)
}
func toolFromContext(ctx context.Context) string {
if v, ok := ctx.Value(ctxKeyTool{}).(string); ok && v != "" {
return v
}
return "unknown"
}
// WithUsageUser attaches the calling user to ctx. Like WithUsageTool, it is
// used only in the "unknown model" diagnostic warning; reads default to
// "unknown".
func WithUsageUser(ctx context.Context, user string) context.Context {
return context.WithValue(ctx, ctxKeyUser{}, user)
}
func userFromContext(ctx context.Context) string {
if v, ok := ctx.Value(ctxKeyUser{}).(string); ok && v != "" {
return v
}
return "unknown"
}
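To make the UsageSink seam concrete, here is a minimal sketch of a host-side sink that sums tokens per model in memory. The interface is restated locally so the example compiles on its own; memorySink, totals, and demo are hypothetical names, and a real host would more likely write to a database or a metrics backend.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// UsageSink restates the interface from the file above so this sketch is
// self-contained.
type UsageSink interface {
	Record(ctx context.Context, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int)
}

// totals is a hypothetical per-model accumulator.
type totals struct{ Input, Output, CacheRead, CacheWrite int }

// memorySink is a hypothetical in-memory UsageSink guarded by a mutex,
// since model calls may record from several goroutines.
type memorySink struct {
	mu      sync.Mutex
	byModel map[string]totals
}

func newMemorySink() *memorySink { return &memorySink{byModel: make(map[string]totals)} }

// Record adds one call's token counts to the running totals for model.
func (s *memorySink) Record(_ context.Context, model string, in, out, cacheRead, cacheWrite int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	t := s.byModel[model]
	t.Input += in
	t.Output += out
	t.CacheRead += cacheRead
	t.CacheWrite += cacheWrite
	s.byModel[model] = t
}

// Totals returns the accumulated counts for model (zero if never seen).
func (s *memorySink) Totals(model string) totals {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.byModel[model]
}

// Compile-time check that memorySink satisfies the interface.
var _ UsageSink = (*memorySink)(nil)

// demo records two calls against one model and returns the sums.
func demo() totals {
	sink := newMemorySink()
	ctx := context.Background()
	sink.Record(ctx, "claude-sonnet-4-6", 120, 40, 0, 0)
	sink.Record(ctx, "claude-sonnet-4-6", 80, 10, 64, 0)
	return sink.Totals("claude-sonnet-4-6")
}

func main() {
	fmt.Printf("%+v\n", demo())
}
```

A host would pass such a value to SetUsageSink once at startup; leaving the sink nil keeps recording off, as the comments above describe.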
@@ -0,0 +1,162 @@
package model
import (
"fmt"
"sort"
"strings"
"sync"
"time"
"gitea.stevedudenhoeffer.com/steve/executus/config"
)
// tierResolver expands tier aliases (e.g. "fast", "thinking", "agent-working")
// into a concrete model spec or a comma-separated failover chain. The set of
// tier names and their FALLBACK specs are host-supplied (a map passed at
// Configure time); the live value of each tier is read from a config.Source
// under the key "model.tier.<name>", so a host whose config backend mutates at
// runtime (mort's convar) re-targets tiers without a restart, while a static
// host (gadfly's env) just gets the fallback. A small in-process cache (TTL
// from "model.tier.cache_ttl_seconds", default 30s) saves config round-trips on
// the hot path; ReloadTiers clears it.
//
// This is executus's inversion of mort's convar-bound resolver: the MECHANISM
// (tier lookup, reasoning-suffix dialect, chain validation, cache) is generic;
// the tier MAP content (which tiers exist + their default specs) is host config.
type tierResolver struct {
cfg config.Source
defaults map[string]string // tier name -> fallback spec
ttl time.Duration
mu sync.RWMutex
cache map[string]tierEntry
now func() time.Time // overridable for tests
}
type tierEntry struct {
spec string
reasoning string
expires time.Time
}
const tierConfigPrefix = "model.tier."
// NewTierResolver builds a resolver over cfg with the given tier defaults
// (name -> fallback spec). cfg may be nil (the fallbacks are then always used).
// ttl<=0 reads "model.tier.cache_ttl_seconds" (default 30s).
func NewTierResolver(cfg config.Source, defaults map[string]string, ttl time.Duration) *tierResolver {
if ttl <= 0 {
ttl = time.Duration(config.Int(cfg, tierConfigPrefix+"cache_ttl_seconds", 30)) * time.Second
}
if ttl <= 0 {
ttl = 30 * time.Second
}
cp := make(map[string]string, len(defaults))
for k, v := range defaults {
cp[k] = v
}
return &tierResolver{
cfg: cfg,
defaults: cp,
ttl: ttl,
cache: make(map[string]tierEntry),
now: time.Now,
}
}
func (r *tierResolver) has(name string) bool {
_, ok := r.defaults[name]
return ok
}
func (r *tierResolver) names() []string {
out := make([]string, 0, len(r.defaults))
for k := range r.defaults {
out = append(out, k)
}
sort.Strings(out)
return out
}
// Resolve returns the current model spec and default reasoning level for a tier
// name. ok=false if name is not a registered tier. Legacy reasoning suffixes
// (":low/:medium/:high") are stripped per chain element; the first non-empty
// level becomes the tier's default reasoning level (ollama tags like ":cloud"
// pass through). The live value is read from config with the host-supplied
// fallback; an empty resolved value yields ok=true with an empty spec
// (ParseModelRequest surfaces a clear error in that path).
func (r *tierResolver) Resolve(name string) (string, string, bool) {
if !r.has(name) {
return "", "", false
}
now := r.now()
r.mu.RLock()
if e, hit := r.cache[name]; hit && now.Before(e.expires) {
r.mu.RUnlock()
return e.spec, e.reasoning, true
}
r.mu.RUnlock()
r.mu.Lock()
defer r.mu.Unlock()
if e, hit := r.cache[name]; hit && now.Before(e.expires) {
return e.spec, e.reasoning, true
}
raw := strings.TrimSpace(config.String(r.cfg, tierConfigPrefix+name, r.defaults[name]))
spec, level := splitReasoningSpec(raw)
r.cache[name] = tierEntry{spec: spec, reasoning: level, expires: now.Add(r.ttl)}
return spec, level, true
}
// Reload clears the cache so the next Resolve fetches fresh from config.
func (r *tierResolver) Reload() {
r.mu.Lock()
defer r.mu.Unlock()
r.cache = make(map[string]tierEntry)
}
// --- package-level resolver + facade ---
// defaultResolver is initialized as a package-level var (not in init()) so it
// is ready before any other file's init runs — buildRegistry's delegating
// resolver closure reads it at Resolve time. It starts with no tiers; a host
// installs its tier table via Configure.
var defaultResolver = NewTierResolver(nil, nil, 0)
// Configure installs the host's tier table. cfg is the live config source
// (nil = fallbacks only); defaults maps each tier name to its fallback spec;
// ttl<=0 uses the config'd / 30s default. The package registry's delegating
// resolver reads defaultResolver at Resolve time, so swapping it here is
// sufficient — no registry rebuild needed.
func Configure(cfg config.Source, defaults map[string]string, ttl time.Duration) {
defaultResolver = NewTierResolver(cfg, defaults, ttl)
}
// TierNames returns the registered tier alias names (sorted). Exported so UI
// layers can populate tier dropdowns without hardcoding.
func TierNames() []string { return defaultResolver.names() }
// IsTierName reports whether s is a registered tier alias.
func IsTierName(s string) bool { return defaultResolver.has(s) }
// ReloadTiers clears the package resolver's cache so the next request resolves
// freshly from config.
func ReloadTiers() { defaultResolver.Reload() }
// ValidateTierValue returns an error if value cannot be used as a tier spec —
// specifically, when a chain entry is itself a tier name (which would form a
// resolution loop). Chain entries must be concrete provider/model specs.
func ValidateTierValue(value string) error {
for _, part := range strings.Split(value, ",") {
part = strings.TrimSpace(part)
if part == "" {
continue
}
spec, _ := splitReasoning(part)
if IsTierName(spec) {
return fmt.Errorf("tier value %q contains tier alias %q (chains must use concrete provider/model specs, not nested tiers)", value, spec)
}
}
return nil
}
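The caching behavior of Resolve (read-lock fast path, re-check under the write lock, TTL expiry) can be sketched without the config package. In the example below the read field stands in for the config.String lookup, and cachedTiers, entry, and demo are hypothetical names used only for illustration; the clock is injected the same way the resolver's now field is.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// entry is one cached tier value with its expiry.
type entry struct {
	spec    string
	expires time.Time
}

// cachedTiers is a hypothetical, stripped-down tier cache.
type cachedTiers struct {
	mu    sync.RWMutex
	cache map[string]entry
	ttl   time.Duration
	now   func() time.Time         // injectable clock for tests
	read  func(name string) string // stand-in for the live config lookup
}

func (c *cachedTiers) resolve(name string) string {
	now := c.now()

	// Fast path: a fresh entry is served under the read lock.
	c.mu.RLock()
	e, hit := c.cache[name]
	c.mu.RUnlock()
	if hit && now.Before(e.expires) {
		return e.spec
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	// Re-check: another goroutine may have refilled the entry while this
	// one waited for the write lock.
	if e, hit := c.cache[name]; hit && now.Before(e.expires) {
		return e.spec
	}
	spec := c.read(name)
	c.cache[name] = entry{spec: spec, expires: now.Add(c.ttl)}
	return spec
}

// demo changes the "live" value mid-way and returns what resolve reports
// on first read, inside the TTL, and after the TTL has elapsed.
func demo() (first, cached, refreshed string) {
	clock := time.Unix(0, 0)
	live := "openai/gpt-4o-mini"
	c := &cachedTiers{
		cache: make(map[string]entry),
		ttl:   30 * time.Second,
		now:   func() time.Time { return clock },
		read:  func(string) string { return live },
	}
	first = c.resolve("fast")
	live = "anthropic/claude-haiku-4-5-20251001" // config changes at runtime
	clock = clock.Add(10 * time.Second)
	cached = c.resolve("fast") // still inside the TTL
	clock = clock.Add(30 * time.Second)
	refreshed = c.resolve("fast") // TTL elapsed, value re-read
	return first, cached, refreshed
}

func main() {
	fmt.Println(demo())
}
```

The middle read still returns the old spec, which is the staleness window that ReloadTiers exists to cut short.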
@@ -0,0 +1,110 @@
// Package model (wiring.go): the production boot hook that rebuilds the
// package registry with the lane registry, the failover convars, and the
// failover-event observer.
//
// Why a dedicated helper (vs spreading registry construction through
// mort.go): the chatbot regression test in lane_chatbot_test.go and the
// production boot path must call the SAME wiring code. Historically
// mort.go skipped the lane wiring entirely (lanes were defined but never
// installed — 30+ skill_runs in production with 0 skill_queue_jobs rows);
// concentrating the install here means a regression in one path's wiring
// fails the test for the other.
package model
import (
"context"
"log/slog"
"time"
majordomo "gitea.stevedudenhoeffer.com/steve/majordomo"
)
// WireOptions configures Wire. The zero value rebuilds the registry with
// no lanes and default failover behavior.
type WireOptions struct {
// Lanes is the lane registry every provider is decorated with. nil
// disables lane queueing (calls pass straight through) but keeps
// error attribution for the failover log.
Lanes LaneRegistry
// FailoverMaxRetries maps the llms.failover.max_retries convar onto
// majordomo's ChainConfig.TransientRetries (same-target retries after
// a transient error). <= 0 keeps majordomo's default (1).
FailoverMaxRetries int
// FailoverCooldown maps the llms.failover.cooldown_seconds convar
// onto health.Config.BaseCooldown. majordomo grows the cooldown
// exponentially from this base per consecutive bench; the cap is
// max(FailoverCooldown, 5m) so the operator's dial dominates.
// <= 0 keeps the mort default (300s).
FailoverCooldown time.Duration
// FailoverObserver receives one event per failover decision (failed
// attempt, bench, benched-skip). Wire it to failoverlog.NewObserver.
// Attribution (caller/run/prompts) rides on the event's error — see
// CallInfoFromError.
FailoverObserver func(majordomo.FailoverEvent)
}
// Wire rebuilds the package registry from opts and installs it. Call once
// at boot, after the lane registry and the failover convars exist (and
// after Init for DB-backed tiers — though Init and Wire are order-
// independent: the tier resolver is consulted through a delegating
// indirection).
//
// Rebuilding discards in-memory health/bench state — Wire is a boot-time
// operation, not a runtime toggle.
//
// When Lanes is non-nil, the well-known lanes (KnownLanes) are eagerly
// registered so admin dashboards have baseline state from the moment mort
// starts instead of "no lanes registered" until the first LLM call.
//
// Returns the installed registry for inspection (tests, health surfaces).
func Wire(ctx context.Context, opts WireOptions) *majordomo.Registry {
r := buildRegistry(buildConfig{
lanes: opts.Lanes,
maxRetries: opts.FailoverMaxRetries,
cooldown: opts.FailoverCooldown,
observer: opts.FailoverObserver,
})
setRegistry(r)
if opts.Lanes != nil {
names := KnownLanes()
for _, name := range names {
opts.Lanes.GetOrCreate(ctx, name)
}
slog.Info("llms: wired lane-aware registry", "lanes", len(names))
} else {
slog.Warn("llms: Wire called without a lane registry — lane queueing is inert")
}
return r
}
// KnownLanes returns the well-known lane names the LLM transport resolves
// to. Eager-registering these at boot gives admin dashboards
// (`/skills/admin/queues`, `.skill admin queue`) a baseline view from the
// moment mort starts — without this, the dashboard reads "no lanes
// registered" until the first chatbot/skill call materialises the lane
// via lazy GetOrCreate.
//
// Why this list (and not "every lane name ever"): these are the ones
// LaneFor in lane_mapping.go can produce for a real model spec. Future
// non-LLM lanes (e.g. a future image-generation lane) should be eagerly
// registered by their owning subsystem, not here.
//
// LaneSkillDefault is included even though it isn't an LLM-routing
// lane: skills run through it via skillexec.WithLaneRegistry, and the
// skills admin dashboard needs to see it from boot.
//
// Test: wiring_test.go::TestKnownLanes_NonEmpty.
func KnownLanes() []string {
return []string{
LaneOllama,
LaneAnthropicThinking,
LaneAnthropicDefault,
LaneM1,
LaneLLMDefault,
"skill-default",
}
}
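The FailoverCooldown comment describes a base cooldown that grows per consecutive bench up to a cap of max(FailoverCooldown, 5m). The sketch below illustrates that shape only. The clamp in capFor follows the buildRegistry code in this commit, but the doubling factor in cooldownAfter is an assumption made for illustration; majordomo's actual growth curve is not shown here and may differ. All function names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// capFor mirrors the clamp in buildRegistry: the cap is never below five
// minutes, and a larger operator-supplied base raises it.
func capFor(base time.Duration) time.Duration {
	if base < 5*time.Minute {
		return 5 * time.Minute
	}
	return base
}

// cooldownAfter is a hypothetical growth curve: it doubles the cooldown for
// each consecutive bench after the first (an assumption, not majordomo's
// documented behavior) and saturates at the cap.
func cooldownAfter(base time.Duration, benches int) time.Duration {
	limit := capFor(base)
	d := base
	for i := 1; i < benches && d < limit; i++ {
		d *= 2
	}
	if d > limit {
		d = limit
	}
	return d
}

// demoSeconds returns the cooldown in seconds for benches 1 through 5 with
// a one-minute base.
func demoSeconds() []int {
	out := make([]int, 0, 5)
	for n := 1; n <= 5; n++ {
		out = append(out, int(cooldownAfter(time.Minute, n).Seconds()))
	}
	return out
}

func main() {
	fmt.Println(demoSeconds())
}
```

With a base above five minutes the cap equals the base, so under this assumed curve the cooldown would stay flat at the operator's value, which is one reading of "the operator's dial dominates".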