executus/model/lane_transport.go
steve b424261aca
P1: model layer (convar->config inversion) + llmmeta
Lifts mort's pkg/logic/llms into executus/model, decoupled from mort:

- tiers.go: the tier resolver now reads a host-supplied config.Source under
  "model.tier.<name>" with host-supplied fallbacks (Configure(cfg, defaults,
  ttl)), instead of convar.Manager. Tier NAMES + specs are host config; the
  resolution mechanism (cache, reasoning-suffix dialect, chain validation) is
  generic. No tier names hard-coded in the harness.
- sink.go: usage/trace recording inverted off mort's llmusage/llmtrace into
  UsageSink / TraceSink seams + a model-owned Span, with nil-safe context
  attribution helpers (WithModel/WithTraceID/WithUsageTool/WithUsageUser).
  Both sinks optional (nil = off) so a light host records nothing.
- lane decoration repointed to executus/lane; utils.Errorf -> fmt.Errorf.
- call.go keeps GenerateWith[T] (instrumented structured output) — this is the
  structured-output primitive; no separate structured/ package.
- llmmeta moved over model/ (the meta-LLM helper: tier allowlist + JSON retry
  + ledger). Its tests configure a minimal tier table via TestMain.

New tests cover the inversion: config overrides fallback, tier registration,
reasoning-suffix survival, nested-tier rejection, nil-sink no-ops.

Full module: go build/vet/test -race green; core go.sum still free of
gorm/redis/discordgo/sqlite.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 19:47:13 -04:00

374 lines
13 KiB
Go

// Package model (lane_transport.go): the lane-aware decorator. Wraps an
// llm.Provider so every model it mints submits its Generate/Stream calls
// through the matching named lane's bounded worker pool (lane selection
// per lane_mapping.go), and stamps every returned error with per-call
// attribution (caller id, run id, prompt snapshot) for the failover log.
//
// Why intercept at the llm.Provider layer: majordomo's Provider and Model
// are small public interfaces, so the decorator slots between the chain
// executor and the real provider with no fork. Every chain attempt calls
// laneModel.Generate, which queues on the lane, runs the real call, and
// wraps failures with CallInfo — the ChainConfig.Observer (which receives
// no context) recovers the attribution from the error itself.
//
// Test: lane_transport_test.go covers mapping correctness, the
// concurrency-limiting behavior, and error attribution.
// lane_chatbot_test.go is the regression guard proving chatbot-path LLM
// calls actually go through the lane.
package model

import (
"context"
"errors"
"time"

"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"github.com/google/uuid"

"gitea.stevedudenhoeffer.com/steve/executus/lane"
)

// defaultLaneExecTimeout is the execution backstop applied inside a lane
// job once it leaves the queue: the caller's deadline is detached (queue
// wait must not consume the LLM execution budget) and replaced with this
// hard cap so a hung provider can't leak workers.
const defaultLaneExecTimeout = 5 * time.Minute
// foremanModelTimeout is the hard per-call timeout for foreman targets —
// slow local LLMs that may block on model loads and upstream queues.
const foremanModelTimeout = 30 * time.Minute
// foremanLaneExecTimeout is the lane execution backstop for foreman
// targets. Slightly above foremanModelTimeout so the model-level timeout
// (the documented contract) is the one that fires.
const foremanLaneExecTimeout = foremanModelTimeout + time.Minute
// laneCallerKey is the context key for the per-call caller identity used
// for fair-share queueing.
type laneCallerKey struct{}
// runIDKey is the context key for the per-call run id used for failover
// event attribution.
type runIDKey struct{}
// ContextWithLaneCaller attaches a caller identity to ctx. The lane
// decorator reads this when constructing a Job so fair-share queueing
// can isolate heavy users, and snapshots it into error attribution for
// the failover log.
//
// An empty callerID is a no-op: ctx is returned unchanged, and every call
// without a caller identity shares a single fair-share bucket. Production
// callers should always populate it.
func ContextWithLaneCaller(ctx context.Context, callerID string) context.Context {
if callerID == "" {
return ctx
}
return context.WithValue(ctx, laneCallerKey{}, callerID)
}
// LaneCallerFromContext returns the caller identity attached via
// ContextWithLaneCaller, or "" if none is set.
func LaneCallerFromContext(ctx context.Context) string {
s, _ := ctx.Value(laneCallerKey{}).(string)
return s
}
// ContextWithRunID attaches a skill/agent run id to ctx. Snapshotted into
// error attribution so failover events can be correlated to runs.
func ContextWithRunID(ctx context.Context, runID string) context.Context {
if runID == "" {
return ctx
}
return context.WithValue(ctx, runIDKey{}, runID)
}
// RunIDFromContext returns the run id attached via ContextWithRunID, or
// "" if none is set.
func RunIDFromContext(ctx context.Context) string {
s, _ := ctx.Value(runIDKey{}).(string)
return s
}
// ---------------------------------------------------------------------------
// Error attribution
// ---------------------------------------------------------------------------
// CallInfo is the per-call attribution snapshot the lane decorator stamps
// onto every error it returns. majordomo's ChainConfig.Observer receives
// a bare FailoverEvent (no context); the failover log recovers caller,
// run id, and the prompt chain from the event's error via
// CallInfoFromError.
type CallInfo struct {
// CallerID is the fair-share caller identity (ContextWithLaneCaller).
CallerID string
// RunID is the skill/agent run id (ContextWithRunID); "" if not threaded.
RunID string
// Messages is the request's message chain at call time, for the
// failover log's persist_prompts feature.
Messages []llm.Message
}
// callInfoError carries CallInfo along an error chain without changing
// the error's message or classification (Unwrap preserves errors.Is/As).
type callInfoError struct {
inner error
info CallInfo
}
func (e *callInfoError) Error() string { return e.inner.Error() }
func (e *callInfoError) Unwrap() error { return e.inner }
// WithCallInfo stamps attribution onto err. nil err returns nil.
func WithCallInfo(err error, info CallInfo) error {
if err == nil {
return nil
}
return &callInfoError{inner: err, info: info}
}
// CallInfoFromError extracts the attribution stamped by the lane
// decorator (or WithCallInfo), if any.
func CallInfoFromError(err error) (CallInfo, bool) {
var cie *callInfoError
if errors.As(err, &cie) {
return cie.info, true
}
return CallInfo{}, false
}
// ---------------------------------------------------------------------------
// Lane decoration
// ---------------------------------------------------------------------------
// LaneRegistry is the narrow surface the lane decorator needs from
// lane.Registry (executus/lane). Defined as an interface so tests can
// substitute a fake registry without spinning up a real one.
type LaneRegistry interface {
GetOrCreate(ctx context.Context, name string) lane.Lane
}
// laneProvider decorates an llm.Provider so every model it mints routes
// calls through the lane named by LaneFor(provider/model). With a nil
// registry the queueing is skipped but error attribution still applies.
type laneProvider struct {
inner llm.Provider
registry LaneRegistry
execTimeout time.Duration
}
// WrapProviderForLane returns a provider whose models submit each
// Generate/Stream call through the lane named by LaneFor(name/model) in
// the registry, and stamp CallInfo attribution onto every error.
//
// A nil registry disables queueing (calls pass straight through) but the
// decoration — and with it error attribution — remains, so failover
// logging works in lane-less deployments and tests.
func WrapProviderForLane(inner llm.Provider, registry LaneRegistry) llm.Provider {
return wrapProviderForLane(inner, registry, defaultLaneExecTimeout)
}
func wrapProviderForLane(inner llm.Provider, registry LaneRegistry, execTimeout time.Duration) llm.Provider {
if inner == nil {
return nil
}
if execTimeout <= 0 {
execTimeout = defaultLaneExecTimeout
}
return &laneProvider{inner: inner, registry: registry, execTimeout: execTimeout}
}
func (p *laneProvider) Name() string { return p.inner.Name() }
func (p *laneProvider) Model(id string, opts ...llm.ModelOption) (llm.Model, error) {
m, err := p.inner.Model(id, opts...)
if err != nil {
return nil, err
}
return &laneModel{
inner: m,
registry: p.registry,
laneName: LaneFor(p.inner.Name() + "/" + id),
execTimeout: p.execTimeout,
}, nil
}
// laneModel routes one model's calls through its lane and stamps error
// attribution. The lane name is resolved once at Model() time — the
// provider name and model id are both known there, unlike legacy gollm where
// the request had to be inspected per call.
type laneModel struct {
inner llm.Model
registry LaneRegistry
laneName string
execTimeout time.Duration
}
func (m *laneModel) Capabilities() llm.Capabilities { return m.inner.Capabilities() }
// laneJob adapts an in-flight call to the lane.Job interface. The call's
// result is captured by the run closure (not stored on the struct) and
// read after SubmitWait returns.
type laneJob struct {
id string
callerID string
run func(ctx context.Context) error
}
func (j *laneJob) ID() string { return j.id }
func (j *laneJob) CallerID() string { return j.callerID }
func (j *laneJob) Priority() int { return 0 }
func (j *laneJob) Run(ctx context.Context) error { return j.run(ctx) }
func (m *laneModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
// Fold options now so the job closure and the attribution snapshot
// both see the final request.
req = req.Apply(opts...)
info := CallInfo{
CallerID: LaneCallerFromContext(ctx),
RunID: RunIDFromContext(ctx),
Messages: req.Messages,
}
resp, err := m.submit(ctx, func(execCtx context.Context) (*llm.Response, error) {
return m.inner.Generate(execCtx, req)
})
if err != nil {
return resp, WithCallInfo(err, info)
}
return resp, nil
}
func (m *laneModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
req = req.Apply(opts...)
info := CallInfo{
CallerID: LaneCallerFromContext(ctx),
RunID: RunIDFromContext(ctx),
Messages: req.Messages,
}
l := m.lane(ctx)
if l == nil {
s, err := m.inner.Stream(ctx, req)
if err != nil {
return nil, WithCallInfo(err, info)
}
return s, nil
}
// Streams hold their lane slot only while ESTABLISHING the stream —
// holding it for the full consumption would deadlock a slow consumer
// against the pool. The caller's ctx is used as-is (no deadline
// detach): severing cancellation from a long-lived stream would leak
// connections.
var (
stream llm.Stream
serr error
)
job := &laneJob{
id: uuid.New().String(),
callerID: info.CallerID,
run: func(context.Context) error {
stream, serr = m.inner.Stream(ctx, req)
return serr
},
}
if err := l.SubmitWait(ctx, job); err != nil {
return nil, WithCallInfo(err, info)
}
if serr != nil {
return nil, WithCallInfo(serr, info)
}
return stream, nil
}
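
The reason Stream releases its slot once the stream is established can be shown with a toy pool. This is a sketch only: `slots` is a hypothetical buffered-channel semaphore, not the real lane implementation, and the "stream" is a pre-filled channel.

```go
package main

import "fmt"

// slots is a counting semaphore standing in for a lane's worker pool
// (hypothetical; the real lane type is not shown here).
type slots chan struct{}

// open holds a slot only while the stream is being established and
// releases it before returning, so consuming the stream never occupies
// the pool.
func (s slots) open(name string) <-chan string {
	s <- struct{}{}        // acquire a slot
	defer func() { <-s }() // release as soon as the stream exists

	out := make(chan string, 2)
	out <- name + ":chunk1"
	out <- name + ":chunk2"
	close(out)
	return out
}

func main() {
	pool := make(slots, 1) // a pool with a single slot

	a := pool.open("a")
	// If "a" still held the slot while unconsumed, this call would block
	// forever on a one-slot pool.
	b := pool.open("b")

	fmt.Println(<-a, <-b, len(pool)) // a:chunk1 b:chunk1 0
}
```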
// lane resolves the lane for this model, or nil when queueing is
// disabled (nil registry, or a registry that declines the name).
func (m *laneModel) lane(ctx context.Context) lane.Lane {
if m.registry == nil {
return nil
}
return m.registry.GetOrCreate(ctx, m.laneName)
}
// submit runs fn through the lane (or directly when queueing is off).
//
// Inside a lane job the caller's deadline is detached so queue wait does
// not consume the execution budget — ctx VALUES (usage attribution,
// trace ids) are preserved, only cancellation/deadline are severed — and
// an execTimeout backstop prevents runaway calls. Queue-phase
// cancellation still works: SubmitWait waits on the original ctx, so a
// caller that gives up while queued exits immediately.
func (m *laneModel) submit(ctx context.Context, fn func(context.Context) (*llm.Response, error)) (*llm.Response, error) {
l := m.lane(ctx)
if l == nil {
return fn(ctx)
}
var (
resp *llm.Response
err error
)
job := &laneJob{
id: uuid.New().String(),
callerID: LaneCallerFromContext(ctx),
run: func(context.Context) error {
execCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), m.execTimeout)
defer cancel()
resp, err = fn(execCtx)
// Returning err lets the lane's pool propagate it to
// SubmitWait; the captured err is what we surface.
return err
},
}
if serr := l.SubmitWait(ctx, job); serr != nil && err == nil {
return nil, serr
}
return resp, err
}
// ---------------------------------------------------------------------------
// Model timeout decoration (foreman)
// ---------------------------------------------------------------------------
// timeoutProvider wraps a provider so every minted model enforces a hard
// per-call deadline on Generate. Used for foreman targets (slow local
// LLMs). Stream is passed through: a wall-clock deadline on a long-lived
// stream would sever it mid-consumption.
type timeoutProvider struct {
inner llm.Provider
timeout time.Duration
}
// withModelTimeout decorates p so its models' Generate calls carry a
// hard timeout.
func withModelTimeout(p llm.Provider, d time.Duration) llm.Provider {
if p == nil || d <= 0 {
return p
}
return &timeoutProvider{inner: p, timeout: d}
}
func (p *timeoutProvider) Name() string { return p.inner.Name() }
func (p *timeoutProvider) Model(id string, opts ...llm.ModelOption) (llm.Model, error) {
m, err := p.inner.Model(id, opts...)
if err != nil {
return nil, err
}
return &timeoutModel{inner: m, timeout: p.timeout}, nil
}
type timeoutModel struct {
inner llm.Model
timeout time.Duration
}
func (m *timeoutModel) Capabilities() llm.Capabilities { return m.inner.Capabilities() }
func (m *timeoutModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
ctx, cancel := context.WithTimeout(ctx, m.timeout)
defer cancel()
return m.inner.Generate(ctx, req, opts...)
}
func (m *timeoutModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
return m.inner.Stream(ctx, req, opts...)
}