b424261aca
Lifts mort's pkg/logic/llms into executus/model, decoupled from mort: - tiers.go: the tier resolver now reads a host-supplied config.Source under "model.tier.<name>" with host-supplied fallbacks (Configure(cfg, defaults, ttl)), instead of convar.Manager. Tier NAMES + specs are host config; the resolution mechanism (cache, reasoning-suffix dialect, chain validation) is generic. No tier names hard-coded in the harness. - sink.go: usage/trace recording inverted off mort's llmusage/llmtrace into UsageSink / TraceSink seams + a model-owned Span, with nil-safe context attribution helpers (WithModel/WithTraceID/WithUsageTool/WithUsageUser). Both sinks optional (nil = off) so a light host records nothing. - lane decoration repointed to executus/lane; utils.Errorf -> fmt.Errorf. - call.go keeps GenerateWith[T] (instrumented structured output) — this is the structured-output primitive; no separate structured/ package. - llmmeta moved over model/ (the meta-LLM helper: tier allowlist + JSON retry + ledger). Its tests configure a minimal tier table via TestMain. New tests cover the inversion: config overrides fallback, tier registration, reasoning-suffix survival, nested-tier rejection, nil-sink no-ops. Full module: go build/vet/test -race green; core go.sum still free of gorm/redis/discordgo/sqlite. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
374 lines
13 KiB
Go
374 lines
13 KiB
Go
// Package model — lane_transport.go: the lane-aware decorator. Wraps an
// llm.Provider so every model it mints submits its Generate/Stream calls
// through the matching named lane's bounded worker pool (lane selection
// per lane_mapping.go), and stamps every returned error with per-call
// attribution (caller id, run id, prompt snapshot) for the failover log.
//
// Why intercept at the llm.Provider layer: majordomo's Provider and Model
// are small public interfaces, so the decorator slots between the chain
// executor and the real provider with no fork. Every chain attempt calls
// laneModel.Generate, which queues on the lane, runs the real call, and
// wraps failures with CallInfo — the ChainConfig.Observer (which receives
// no context) recovers the attribution from the error itself.
//
// Test: lane_transport_test.go covers mapping correctness, the
// concurrency-limiting behavior, and error attribution.
// lane_chatbot_test.go is the regression guard proving chatbot-path LLM
// calls actually go through the lane.
|
|
package model
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
|
"github.com/google/uuid"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/executus/lane"
|
|
)
|
|
|
|
// Execution timeouts applied by the lane and model-timeout decorators.
const (
	// defaultLaneExecTimeout is the execution backstop applied inside a lane
	// job once it leaves the queue: the caller's deadline is detached (queue
	// wait must not consume the LLM execution budget) and replaced with this
	// hard cap so a hung provider can't leak workers.
	defaultLaneExecTimeout = 5 * time.Minute

	// foremanModelTimeout is the hard per-call timeout for foreman targets —
	// slow local LLMs that may block on model loads and upstream queues.
	foremanModelTimeout = 30 * time.Minute

	// foremanLaneExecTimeout is the lane execution backstop for foreman
	// targets. Slightly above foremanModelTimeout so the model-level timeout
	// (the documented contract) is the one that fires.
	foremanLaneExecTimeout = foremanModelTimeout + time.Minute
)
|
|
|
|
// laneCallerKey is the context key for the per-call caller identity used
// for fair-share queueing.
type laneCallerKey struct{}

// runIDKey is the context key for the per-call run id used for failover
// event attribution.
type runIDKey struct{}

// ContextWithLaneCaller attaches a caller identity to ctx. The lane
// decorator reads this when constructing a Job so fair-share queueing
// can isolate heavy users, and snapshots it into error attribution for
// the failover log.
//
// Empty string is a no-op (ctx is returned unchanged) and lumps every
// empty-caller invocation into a single fair-share bucket; production
// callers should always populate it.
func ContextWithLaneCaller(ctx context.Context, callerID string) context.Context {
	if callerID == "" {
		return ctx
	}
	return context.WithValue(ctx, laneCallerKey{}, callerID)
}

// LaneCallerFromContext returns the caller identity attached via
// ContextWithLaneCaller, or "" if none is set.
func LaneCallerFromContext(ctx context.Context) string {
	// A missing value fails the type assertion and yields the zero string.
	s, _ := ctx.Value(laneCallerKey{}).(string)
	return s
}

// ContextWithRunID attaches a skill/agent run id to ctx. Snapshotted into
// error attribution so failover events can be correlated to runs.
//
// Empty string is a no-op: ctx is returned unchanged.
func ContextWithRunID(ctx context.Context, runID string) context.Context {
	if runID == "" {
		return ctx
	}
	return context.WithValue(ctx, runIDKey{}, runID)
}

// RunIDFromContext returns the run id attached via ContextWithRunID, or
// "" if none is set.
func RunIDFromContext(ctx context.Context) string {
	// A missing value fails the type assertion and yields the zero string.
	s, _ := ctx.Value(runIDKey{}).(string)
	return s
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Error attribution
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// CallInfo is the per-call attribution snapshot the lane decorator stamps
|
|
// onto every error it returns. majordomo's ChainConfig.Observer receives
|
|
// a bare FailoverEvent (no context); the failover log recovers caller,
|
|
// run id, and the prompt chain from the event's error via
|
|
// CallInfoFromError.
|
|
type CallInfo struct {
|
|
// CallerID is the fair-share caller identity (ContextWithLaneCaller).
|
|
CallerID string
|
|
// RunID is the skill/agent run id (ContextWithRunID); "" if not threaded.
|
|
RunID string
|
|
// Messages is the request's message chain at call time, for the
|
|
// failover log's persist_prompts feature.
|
|
Messages []llm.Message
|
|
}
|
|
|
|
// callInfoError carries CallInfo along an error chain without changing
|
|
// the error's message or classification (Unwrap preserves errors.Is/As).
|
|
type callInfoError struct {
|
|
inner error
|
|
info CallInfo
|
|
}
|
|
|
|
// Error reports the wrapped error's message unchanged.
func (e *callInfoError) Error() string {
	return e.inner.Error()
}
|
|
// Unwrap exposes the wrapped error so errors.Is / errors.As see through
// the attribution layer.
func (e *callInfoError) Unwrap() error {
	return e.inner
}
|
|
|
|
// WithCallInfo stamps attribution onto err. nil err returns nil.
|
|
func WithCallInfo(err error, info CallInfo) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
return &callInfoError{inner: err, info: info}
|
|
}
|
|
|
|
// CallInfoFromError extracts the attribution stamped by the lane
|
|
// decorator (or WithCallInfo), if any.
|
|
func CallInfoFromError(err error) (CallInfo, bool) {
|
|
var cie *callInfoError
|
|
if errors.As(err, &cie) {
|
|
return cie.info, true
|
|
}
|
|
return CallInfo{}, false
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Lane decoration
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// LaneRegistry is the narrow surface the lane decorator needs from
|
|
// pkg/lane.Registry. Defined as an interface so tests can substitute a
|
|
// fake registry without spinning up a real one.
|
|
type LaneRegistry interface {
|
|
GetOrCreate(ctx context.Context, name string) lane.Lane
|
|
}
|
|
|
|
// laneProvider decorates an llm.Provider so every model it mints routes
|
|
// calls through the lane named by LaneFor(provider/model). With a nil
|
|
// registry the queueing is skipped but error attribution still applies.
|
|
type laneProvider struct {
|
|
inner llm.Provider
|
|
registry LaneRegistry
|
|
execTimeout time.Duration
|
|
}
|
|
|
|
// WrapProviderForLane returns a provider whose models submit each
|
|
// Generate/Stream call through the lane named by LaneFor(name/model) in
|
|
// the registry, and stamp CallInfo attribution onto every error.
|
|
//
|
|
// A nil registry disables queueing (calls pass straight through) but the
|
|
// decoration — and with it error attribution — remains, so failover
|
|
// logging works in lane-less deployments and tests.
|
|
func WrapProviderForLane(inner llm.Provider, registry LaneRegistry) llm.Provider {
|
|
return wrapProviderForLane(inner, registry, defaultLaneExecTimeout)
|
|
}
|
|
|
|
func wrapProviderForLane(inner llm.Provider, registry LaneRegistry, execTimeout time.Duration) llm.Provider {
|
|
if inner == nil {
|
|
return nil
|
|
}
|
|
if execTimeout <= 0 {
|
|
execTimeout = defaultLaneExecTimeout
|
|
}
|
|
return &laneProvider{inner: inner, registry: registry, execTimeout: execTimeout}
|
|
}
|
|
|
|
// Name reports the wrapped provider's name; the decorator is transparent
// to name-based lookups.
func (p *laneProvider) Name() string {
	return p.inner.Name()
}
|
|
|
|
func (p *laneProvider) Model(id string, opts ...llm.ModelOption) (llm.Model, error) {
|
|
m, err := p.inner.Model(id, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &laneModel{
|
|
inner: m,
|
|
registry: p.registry,
|
|
laneName: LaneFor(p.inner.Name() + "/" + id),
|
|
execTimeout: p.execTimeout,
|
|
}, nil
|
|
}
|
|
|
|
// laneModel routes one model's calls through its lane and stamps error
|
|
// attribution. The lane name is resolved once at Model() time — the
|
|
// provider name and model id are both known there, unlike legacy gollm where
|
|
// the request had to be inspected per call.
|
|
type laneModel struct {
|
|
inner llm.Model
|
|
registry LaneRegistry
|
|
laneName string
|
|
execTimeout time.Duration
|
|
}
|
|
|
|
// Capabilities reports the wrapped model's capabilities unchanged.
func (m *laneModel) Capabilities() llm.Capabilities {
	return m.inner.Capabilities()
}
|
|
|
|
// laneJob adapts an in-flight call to the lane.Job interface. The call's
// results are not stored on the struct: the run closure captures them in
// variables the submitter reads after SubmitWait returns.
type laneJob struct {
	// id uniquely identifies this job.
	id string
	// callerID is the fair-share identity the job is attributed to.
	callerID string
	// run performs the actual call.
	run func(ctx context.Context) error
}

// ID returns the job's unique identifier.
func (j *laneJob) ID() string { return j.id }

// CallerID returns the fair-share caller identity ("" if none was set).
func (j *laneJob) CallerID() string { return j.callerID }

// Priority is always 0: every job from this decorator has the same priority.
func (j *laneJob) Priority() int { return 0 }

// Run executes the wrapped call and returns its error.
func (j *laneJob) Run(ctx context.Context) error { return j.run(ctx) }
|
|
|
|
func (m *laneModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
|
|
// Fold options now so the job closure and the attribution snapshot
|
|
// both see the final request.
|
|
req = req.Apply(opts...)
|
|
info := CallInfo{
|
|
CallerID: LaneCallerFromContext(ctx),
|
|
RunID: RunIDFromContext(ctx),
|
|
Messages: req.Messages,
|
|
}
|
|
|
|
resp, err := m.submit(ctx, func(execCtx context.Context) (*llm.Response, error) {
|
|
return m.inner.Generate(execCtx, req)
|
|
})
|
|
if err != nil {
|
|
return resp, WithCallInfo(err, info)
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (m *laneModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
|
|
req = req.Apply(opts...)
|
|
info := CallInfo{
|
|
CallerID: LaneCallerFromContext(ctx),
|
|
RunID: RunIDFromContext(ctx),
|
|
Messages: req.Messages,
|
|
}
|
|
|
|
l := m.lane(ctx)
|
|
if l == nil {
|
|
s, err := m.inner.Stream(ctx, req)
|
|
if err != nil {
|
|
return nil, WithCallInfo(err, info)
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// Streams hold their lane slot only while ESTABLISHING the stream —
|
|
// holding it for the full consumption would deadlock a slow consumer
|
|
// against the pool. The caller's ctx is used as-is (no deadline
|
|
// detach): severing cancellation from a long-lived stream would leak
|
|
// connections.
|
|
var (
|
|
stream llm.Stream
|
|
serr error
|
|
)
|
|
job := &laneJob{
|
|
id: uuid.New().String(),
|
|
callerID: info.CallerID,
|
|
run: func(context.Context) error {
|
|
stream, serr = m.inner.Stream(ctx, req)
|
|
return serr
|
|
},
|
|
}
|
|
if err := l.SubmitWait(ctx, job); err != nil {
|
|
return nil, WithCallInfo(err, info)
|
|
}
|
|
if serr != nil {
|
|
return nil, WithCallInfo(serr, info)
|
|
}
|
|
return stream, nil
|
|
}
|
|
|
|
// lane resolves the lane for this model, or nil when queueing is
|
|
// disabled (nil registry, or a registry that declines the name).
|
|
func (m *laneModel) lane(ctx context.Context) lane.Lane {
|
|
if m.registry == nil {
|
|
return nil
|
|
}
|
|
return m.registry.GetOrCreate(ctx, m.laneName)
|
|
}
|
|
|
|
// submit runs fn through the lane (or directly when queueing is off).
|
|
//
|
|
// Inside a lane job the caller's deadline is detached so queue wait does
|
|
// not consume the execution budget — ctx VALUES (usage attribution,
|
|
// trace ids) are preserved, only cancellation/deadline are severed — and
|
|
// an execTimeout backstop prevents runaway calls. Queue-phase
|
|
// cancellation still works: SubmitWait waits on the original ctx, so a
|
|
// caller that gives up while queued exits immediately.
|
|
func (m *laneModel) submit(ctx context.Context, fn func(context.Context) (*llm.Response, error)) (*llm.Response, error) {
|
|
l := m.lane(ctx)
|
|
if l == nil {
|
|
return fn(ctx)
|
|
}
|
|
var (
|
|
resp *llm.Response
|
|
err error
|
|
)
|
|
job := &laneJob{
|
|
id: uuid.New().String(),
|
|
callerID: LaneCallerFromContext(ctx),
|
|
run: func(context.Context) error {
|
|
execCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), m.execTimeout)
|
|
defer cancel()
|
|
resp, err = fn(execCtx)
|
|
// Returning err lets the lane's pool propagate it to
|
|
// SubmitWait; the captured err is what we surface.
|
|
return err
|
|
},
|
|
}
|
|
if serr := l.SubmitWait(ctx, job); serr != nil && err == nil {
|
|
return nil, serr
|
|
}
|
|
return resp, err
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Model timeout decoration (foreman)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// timeoutProvider wraps a provider so every minted model enforces a hard
|
|
// per-call deadline on Generate. Used for foreman targets (slow local
|
|
// LLMs). Stream is passed through: a wall-clock deadline on a long-lived
|
|
// stream would sever it mid-consumption.
|
|
type timeoutProvider struct {
|
|
inner llm.Provider
|
|
timeout time.Duration
|
|
}
|
|
|
|
// withModelTimeout decorates p so its models' Generate calls carry a
|
|
// hard timeout.
|
|
func withModelTimeout(p llm.Provider, d time.Duration) llm.Provider {
|
|
if p == nil || d <= 0 {
|
|
return p
|
|
}
|
|
return &timeoutProvider{inner: p, timeout: d}
|
|
}
|
|
|
|
// Name reports the wrapped provider's name unchanged.
func (p *timeoutProvider) Name() string {
	return p.inner.Name()
}
|
|
|
|
func (p *timeoutProvider) Model(id string, opts ...llm.ModelOption) (llm.Model, error) {
|
|
m, err := p.inner.Model(id, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &timeoutModel{inner: m, timeout: p.timeout}, nil
|
|
}
|
|
|
|
type timeoutModel struct {
|
|
inner llm.Model
|
|
timeout time.Duration
|
|
}
|
|
|
|
// Capabilities reports the wrapped model's capabilities unchanged.
func (m *timeoutModel) Capabilities() llm.Capabilities {
	return m.inner.Capabilities()
}
|
|
|
|
func (m *timeoutModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
|
|
ctx, cancel := context.WithTimeout(ctx, m.timeout)
|
|
defer cancel()
|
|
return m.inner.Generate(ctx, req, opts...)
|
|
}
|
|
|
|
func (m *timeoutModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
|
|
return m.inner.Stream(ctx, req, opts...)
|
|
}
|