// Package llms — lane_transport.go: the lane-aware decorator. Wraps an // llm.Provider so every model it mints submits its Generate/Stream calls // through the matching named lane's bounded worker pool (lane selection // per lane_mapping.go), and stamps every returned error with per-call // attribution (caller id, run id, prompt snapshot) for the failover log. // // Why intercept at the llm.Provider layer: majordomo's Provider and Model // are small public interfaces, so the decorator slots between the chain // executor and the real provider with no fork. Every chain attempt calls // laneModel.Generate, which queues on the lane, runs the real call, and // wraps failures with CallInfo — the ChainConfig.Observer (which receives // no context) recovers the attribution from the error itself. // // Test: lane_transport_test.go covers mapping correctness, the // concurrency-limiting behavior, and error attribution. // lane_chatbot_test.go is the regression guard proving chatbot-path LLM // calls actually go through the lane. package model import ( "context" "errors" "time" "gitea.stevedudenhoeffer.com/steve/majordomo/llm" "github.com/google/uuid" "gitea.stevedudenhoeffer.com/steve/executus/lane" ) // defaultLaneExecTimeout is the execution backstop applied inside a lane // job once it leaves the queue: the caller's deadline is detached (queue // wait must not consume the LLM execution budget) and replaced with this // hard cap so a hung provider can't leak workers. const defaultLaneExecTimeout = 5 * time.Minute // foremanModelTimeout is the hard per-call timeout for foreman targets — // slow local LLMs that may block on model loads and upstream queues. const foremanModelTimeout = 30 * time.Minute // foremanLaneExecTimeout is the lane execution backstop for foreman // targets. Slightly above foremanModelTimeout so the model-level timeout // (the documented contract) is the one that fires. 
const foremanLaneExecTimeout = foremanModelTimeout + time.Minute

// laneCallerKey is the context key for the per-call caller identity used
// for fair-share queueing. An unexported struct type keeps the key from
// colliding with keys set by other packages.
type laneCallerKey struct{}

// runIDKey is the context key for the per-call run id used for failover
// event attribution.
type runIDKey struct{}

// ContextWithLaneCaller attaches a caller identity to ctx. The lane
// decorator reads this when constructing a Job so fair-share queueing
// can isolate heavy users, and snapshots it into error attribution for
// the failover log.
//
// Empty string is a no-op (ctx is returned unchanged) and lumps every
// empty-caller invocation into a single fair-share bucket; production
// callers should always populate it.
func ContextWithLaneCaller(ctx context.Context, callerID string) context.Context {
	if callerID == "" {
		return ctx
	}
	return context.WithValue(ctx, laneCallerKey{}, callerID)
}

// LaneCallerFromContext returns the caller identity attached via
// ContextWithLaneCaller, or "" if none is set.
func LaneCallerFromContext(ctx context.Context) string {
	// A missing or non-string value fails the assertion and yields "".
	s, _ := ctx.Value(laneCallerKey{}).(string)
	return s
}

// ContextWithRunID attaches a skill/agent run id to ctx. Snapshotted into
// error attribution so failover events can be correlated to runs. An
// empty runID is a no-op and returns ctx unchanged.
func ContextWithRunID(ctx context.Context, runID string) context.Context {
	if runID == "" {
		return ctx
	}
	return context.WithValue(ctx, runIDKey{}, runID)
}

// RunIDFromContext returns the run id attached via ContextWithRunID, or
// "" if none is set.
func RunIDFromContext(ctx context.Context) string {
	// A missing or non-string value fails the assertion and yields "".
	s, _ := ctx.Value(runIDKey{}).(string)
	return s
}

// ---------------------------------------------------------------------------
// Error attribution
// ---------------------------------------------------------------------------

// CallInfo is the per-call attribution snapshot the lane decorator stamps
// onto every error it returns.
majordomo's ChainConfig.Observer receives // a bare FailoverEvent (no context); the failover log recovers caller, // run id, and the prompt chain from the event's error via // CallInfoFromError. type CallInfo struct { // CallerID is the fair-share caller identity (ContextWithLaneCaller). CallerID string // RunID is the skill/agent run id (ContextWithRunID); "" if not threaded. RunID string // Messages is the request's message chain at call time, for the // failover log's persist_prompts feature. Messages []llm.Message } // callInfoError carries CallInfo along an error chain without changing // the error's message or classification (Unwrap preserves errors.Is/As). type callInfoError struct { inner error info CallInfo } func (e *callInfoError) Error() string { return e.inner.Error() } func (e *callInfoError) Unwrap() error { return e.inner } // WithCallInfo stamps attribution onto err. nil err returns nil. func WithCallInfo(err error, info CallInfo) error { if err == nil { return nil } return &callInfoError{inner: err, info: info} } // CallInfoFromError extracts the attribution stamped by the lane // decorator (or WithCallInfo), if any. func CallInfoFromError(err error) (CallInfo, bool) { var cie *callInfoError if errors.As(err, &cie) { return cie.info, true } return CallInfo{}, false } // --------------------------------------------------------------------------- // Lane decoration // --------------------------------------------------------------------------- // LaneRegistry is the narrow surface the lane decorator needs from // pkg/lane.Registry. Defined as an interface so tests can substitute a // fake registry without spinning up a real one. type LaneRegistry interface { GetOrCreate(ctx context.Context, name string) lane.Lane } // laneProvider decorates an llm.Provider so every model it mints routes // calls through the lane named by LaneFor(provider/model). With a nil // registry the queueing is skipped but error attribution still applies. 
type laneProvider struct { inner llm.Provider registry LaneRegistry execTimeout time.Duration } // WrapProviderForLane returns a provider whose models submit each // Generate/Stream call through the lane named by LaneFor(name/model) in // the registry, and stamp CallInfo attribution onto every error. // // A nil registry disables queueing (calls pass straight through) but the // decoration — and with it error attribution — remains, so failover // logging works in lane-less deployments and tests. func WrapProviderForLane(inner llm.Provider, registry LaneRegistry) llm.Provider { return wrapProviderForLane(inner, registry, defaultLaneExecTimeout) } func wrapProviderForLane(inner llm.Provider, registry LaneRegistry, execTimeout time.Duration) llm.Provider { if inner == nil { return nil } if execTimeout <= 0 { execTimeout = defaultLaneExecTimeout } return &laneProvider{inner: inner, registry: registry, execTimeout: execTimeout} } func (p *laneProvider) Name() string { return p.inner.Name() } func (p *laneProvider) Model(id string, opts ...llm.ModelOption) (llm.Model, error) { m, err := p.inner.Model(id, opts...) if err != nil { return nil, err } return &laneModel{ inner: m, registry: p.registry, laneName: LaneFor(p.inner.Name() + "/" + id), execTimeout: p.execTimeout, }, nil } // laneModel routes one model's calls through its lane and stamps error // attribution. The lane name is resolved once at Model() time — the // provider name and model id are both known there, unlike legacy gollm where // the request had to be inspected per call. type laneModel struct { inner llm.Model registry LaneRegistry laneName string execTimeout time.Duration } func (m *laneModel) Capabilities() llm.Capabilities { return m.inner.Capabilities() } // laneJob adapts an in-flight call to the lane.Job interface. The result // is captured into the struct and read after SubmitWait returns. 
type laneJob struct { id string callerID string run func(ctx context.Context) error } func (j *laneJob) ID() string { return j.id } func (j *laneJob) CallerID() string { return j.callerID } func (j *laneJob) Priority() int { return 0 } func (j *laneJob) Run(ctx context.Context) error { return j.run(ctx) } func (m *laneModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) { // Fold options now so the job closure and the attribution snapshot // both see the final request. req = req.Apply(opts...) info := CallInfo{ CallerID: LaneCallerFromContext(ctx), RunID: RunIDFromContext(ctx), Messages: req.Messages, } resp, err := m.submit(ctx, func(execCtx context.Context) (*llm.Response, error) { return m.inner.Generate(execCtx, req) }) if err != nil { return resp, WithCallInfo(err, info) } return resp, nil } func (m *laneModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) { req = req.Apply(opts...) info := CallInfo{ CallerID: LaneCallerFromContext(ctx), RunID: RunIDFromContext(ctx), Messages: req.Messages, } l := m.lane(ctx) if l == nil { s, err := m.inner.Stream(ctx, req) if err != nil { return nil, WithCallInfo(err, info) } return s, nil } // Streams hold their lane slot only while ESTABLISHING the stream — // holding it for the full consumption would deadlock a slow consumer // against the pool. The caller's ctx is used as-is (no deadline // detach): severing cancellation from a long-lived stream would leak // connections. 
var ( stream llm.Stream serr error ) job := &laneJob{ id: uuid.New().String(), callerID: info.CallerID, run: func(context.Context) error { stream, serr = m.inner.Stream(ctx, req) return serr }, } if err := l.SubmitWait(ctx, job); err != nil { return nil, WithCallInfo(err, info) } if serr != nil { return nil, WithCallInfo(serr, info) } return stream, nil } // lane resolves the lane for this model, or nil when queueing is // disabled (nil registry, or a registry that declines the name). func (m *laneModel) lane(ctx context.Context) lane.Lane { if m.registry == nil { return nil } return m.registry.GetOrCreate(ctx, m.laneName) } // submit runs fn through the lane (or directly when queueing is off). // // Inside a lane job the caller's deadline is detached so queue wait does // not consume the execution budget — ctx VALUES (usage attribution, // trace ids) are preserved, only cancellation/deadline are severed — and // an execTimeout backstop prevents runaway calls. Queue-phase // cancellation still works: SubmitWait waits on the original ctx, so a // caller that gives up while queued exits immediately. func (m *laneModel) submit(ctx context.Context, fn func(context.Context) (*llm.Response, error)) (*llm.Response, error) { l := m.lane(ctx) if l == nil { return fn(ctx) } var ( resp *llm.Response err error ) job := &laneJob{ id: uuid.New().String(), callerID: LaneCallerFromContext(ctx), run: func(context.Context) error { execCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), m.execTimeout) defer cancel() resp, err = fn(execCtx) // Returning err lets the lane's pool propagate it to // SubmitWait; the captured err is what we surface. 
return err }, } if serr := l.SubmitWait(ctx, job); serr != nil && err == nil { return nil, serr } return resp, err } // --------------------------------------------------------------------------- // Model timeout decoration (foreman) // --------------------------------------------------------------------------- // timeoutProvider wraps a provider so every minted model enforces a hard // per-call deadline on Generate. Used for foreman targets (slow local // LLMs). Stream is passed through: a wall-clock deadline on a long-lived // stream would sever it mid-consumption. type timeoutProvider struct { inner llm.Provider timeout time.Duration } // withModelTimeout decorates p so its models' Generate calls carry a // hard timeout. func withModelTimeout(p llm.Provider, d time.Duration) llm.Provider { if p == nil || d <= 0 { return p } return &timeoutProvider{inner: p, timeout: d} } func (p *timeoutProvider) Name() string { return p.inner.Name() } func (p *timeoutProvider) Model(id string, opts ...llm.ModelOption) (llm.Model, error) { m, err := p.inner.Model(id, opts...) if err != nil { return nil, err } return &timeoutModel{inner: m, timeout: p.timeout}, nil } type timeoutModel struct { inner llm.Model timeout time.Duration } func (m *timeoutModel) Capabilities() llm.Capabilities { return m.inner.Capabilities() } func (m *timeoutModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) { ctx, cancel := context.WithTimeout(ctx, m.timeout) defer cancel() return m.inner.Generate(ctx, req, opts...) } func (m *timeoutModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) { return m.inner.Stream(ctx, req, opts...) }