Files
executus/model/call.go
T
steve b424261aca
executus CI / test (pull_request) Successful in 58s
Adversarial Review (Gadfly) / review (pull_request) Successful in 26m27s
executus CI / test (push) Successful in 1m2s
P1: model layer (convar->config inversion) + llmmeta
Lifts mort's pkg/logic/llms into executus/model, decoupled from mort:

- tiers.go: the tier resolver now reads a host-supplied config.Source under
  "model.tier.<name>" with host-supplied fallbacks (Configure(cfg, defaults,
  ttl)), instead of convar.Manager. Tier NAMES + specs are host config; the
  resolution mechanism (cache, reasoning-suffix dialect, chain validation) is
  generic. No tier names hard-coded in the harness.
- sink.go: usage/trace recording inverted off mort's llmusage/llmtrace into
  UsageSink / TraceSink seams + a model-owned Span, with nil-safe context
  attribution helpers (WithModel/WithTraceID/WithUsageTool/WithUsageUser).
  Both sinks optional (nil = off) so a light host records nothing.
- lane decoration repointed to executus/lane; utils.Errorf -> fmt.Errorf.
- call.go keeps GenerateWith[T] (instrumented structured output) — this is the
  structured-output primitive; no separate structured/ package.
- llmmeta moved over model/ (the meta-LLM helper: tier allowlist + JSON retry
  + ledger). Its tests configure a minimal tier table via TestMain.

New tests cover the inversion: config overrides fallback, tier registration,
reasoning-suffix survival, nested-tier rejection, nil-sink no-ops.

Full module: go build/vet/test -race green; core go.sum still free of
gorm/redis/discordgo/sqlite.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 19:47:13 -04:00

416 lines
13 KiB
Go

package model
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"runtime/debug"
"strings"
"time"
majordomo "gitea.stevedudenhoeffer.com/steve/majordomo"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"github.com/google/uuid"
)
// CallResult captures the result of a single tool call execution.
//
// CallAndExecute produces one CallResult per tool call in the model's
// response, in the order the calls appear there.
type CallResult struct {
// Name is the tool name copied from the model's tool call.
Name string
// Arguments is the tool call's arguments converted to a string.
Arguments string
// Result is the content the toolbox returned for the call.
Result string
// Error is non-nil when the toolbox flagged the result as an error; its
// message is the same text as Result.
Error error
}
// instrumentedModel wraps a parsed model so that token usage is handed to
// the usage sink as a side effect of every successful Generate. It is the
// single usage chokepoint: ANY call through a model obtained from
// ParseModelRequest / ParseModelForContext is accounted, whether it is made
// by the helpers in this file, by the agent loop, or by a direct
// model.Generate at a call site.
//
// IMPORTANT: never call RecordUsage on a response that came from a parsed
// model — the usage would be counted twice. RecordUsage exists only for
// models obtained outside this package.
type instrumentedModel struct {
	// inner is the wrapped model every call is delegated to.
	inner llm.Model
}

// Generate delegates to the wrapped model and, when the call returns a
// non-nil response with a nil error, passes that response to recordUsage.
// The wrapped model's return values are handed back unchanged.
func (m *instrumentedModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
	resp, err := m.inner.Generate(ctx, req, opts...)
	if err != nil || resp == nil {
		// Nothing to account for: failed call or no response object.
		return resp, err
	}
	recordUsage(ctx, resp)
	return resp, nil
}

// Stream is a pure pass-through to the wrapped model. No usage is recorded
// by this wrapper on the streaming path.
func (m *instrumentedModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
	stream, err := m.inner.Stream(ctx, req, opts...)
	return stream, err
}

// Capabilities reports the wrapped model's capabilities unchanged.
func (m *instrumentedModel) Capabilities() llm.Capabilities {
	return m.inner.Capabilities()
}
// CallAndExecute performs exactly one round of generation followed by tool
// execution: it sends messages (plus the optional system prompt) to the
// model, runs every tool call in the response through toolbox, and returns
// the per-call results together with the response text. There is no
// looping here — multi-step loops belong to the agent package.
//
// When toolbox is nil, or the model requested no tools, the returned slice
// is nil and only the text is meaningful. A generation failure is returned
// wrapped as "completion failed: ...". A trace span is recorded for every
// outcome when tracing is active.
func CallAndExecute(ctx context.Context, model llm.Model, systemPrompt string, toolbox *llm.Toolbox, messages []llm.Message, opts ...llm.Option) ([]CallResult, string, error) {
	request := llm.Request{System: systemPrompt, Messages: messages}

	// The toolbox option goes first so caller-supplied options follow it.
	genOpts := make([]llm.Option, 0, len(opts)+1)
	if toolbox != nil {
		genOpts = append(genOpts, llm.WithToolbox(toolbox))
	}
	genOpts = append(genOpts, opts...)

	began := time.Now()
	resp, err := model.Generate(ctx, request, genOpts...)
	if err != nil {
		recordSpanFromWrapper(ctx, systemPrompt, messages, toolbox, nil, nil, began, err)
		return nil, "", fmt.Errorf("completion failed: %w", err)
	}

	// results stays nil unless there is a toolbox and at least one call.
	var results []CallResult
	if toolbox != nil {
		for _, call := range resp.ToolCalls {
			outcome := toolbox.Execute(ctx, call)
			entry := CallResult{
				Name:      call.Name,
				Arguments: string(call.Arguments),
				Result:    outcome.Content,
			}
			if outcome.IsError {
				// Tool failures are reported per call, not as a
				// failure of CallAndExecute itself.
				entry.Error = errors.New(outcome.Content)
			}
			results = append(results, entry)
		}
	}

	recordSpanFromWrapper(ctx, systemPrompt, messages, toolbox, resp, results, began, nil)
	return results, resp.Text(), nil
}
// GenerateWith sends messages to the model with an optional system prompt and
// returns structured output parsed into T. T must be a struct. Uses
// majordomo's native structured output (response schema derived from T).
//
// The model is wrapped in a captureModel so that the raw response (usage and
// serving model) is still available even though majordomo.Generate only
// returns T. When tracing is active one Span is written to the trace sink,
// on success and on failure alike. Structured-output parse failures are
// additionally logged with slog.Warn, whether or not tracing is active.
//
// The result and error from majordomo.Generate are returned unchanged.
func GenerateWith[T any](ctx context.Context, model llm.Model, systemPrompt string, messages []llm.Message, opts ...llm.Option) (T, error) {
	req := llm.Request{System: systemPrompt, Messages: messages}
	startTime := time.Now()

	// Capture the raw response so the trace span carries usage and the
	// concrete serving model even though majordomo.Generate only returns T.
	capture := &captureModel{inner: model}
	result, err := majordomo.Generate[T](ctx, capture, req, opts...)

	// Read the clock once, right after the call returns, so DurationMs,
	// CompletedAt and CreatedAt agree with each other (as they do in
	// recordSpanFromWrapper) and exclude span serialization time.
	completedAt := time.Now()

	resolvedModel := resolvedModelName(ctx, capture.resp)
	// isStructuredOutputParseError is nil-safe, so no separate err check.
	parseFailure := isStructuredOutputParseError(err)

	// NOTE(review): the "llms." prefix in the log messages below does not
	// match this package's name (model); it is left unchanged in case log
	// filters key on it.
	if !tracingEnabled(ctx) {
		if parseFailure {
			// Tracing disabled: slog.Warn is the only breadcrumb operators get.
			slog.Warn("llms.GenerateWith: structured-output parse failure (no trace span)",
				"model", resolvedModel,
				"err", err.Error(),
			)
		}
		return result, err
	}

	span := Span{
		SpanID:       uuid.New().String(),
		TraceID:      traceIDFromContext(ctx),
		Model:        resolvedModel,
		SystemPrompt: systemPrompt,
		Messages:     marshalMessages(messages),
		DurationMs:   completedAt.Sub(startTime).Milliseconds(),
		StartedAt:    startTime,
		CompletedAt:  completedAt,
		CreatedAt:    completedAt,
	}
	if capture.resp != nil {
		span.InputTokens = capture.resp.Usage.InputTokens
		span.OutputTokens = capture.resp.Usage.OutputTokens
	}

	if err != nil {
		span.Error = err.Error()
		// Structured-output failure: log loudly so operators can chase
		// down a regression (e.g. a model returning prose or fenced
		// JSON the decoder rejects) from the trace span alone. The
		// error string includes the failing field path on decode
		// errors.
		if parseFailure {
			slog.Warn("llms.GenerateWith: structured-output parse failure",
				"model", resolvedModel,
				"span_id", span.SpanID,
				"trace_id", span.TraceID,
				"err", err.Error(),
			)
		}
	} else if encoded, marshalErr := json.Marshal(result); marshalErr == nil {
		span.ResponseText = string(encoded)
	}
	// A marshal failure above is deliberately not surfaced: the call itself
	// succeeded, so it only costs the span its ResponseText (best-effort).

	traceSink.WriteSpan(span)
	return result, err
}
// captureModel remembers the most recent response that came back with a nil
// error, so wrappers that only ever see the decoded result
// (majordomo.Generate) can still attribute usage and tracing.
type captureModel struct {
	// inner is the model that actually serves the request.
	inner llm.Model
	// resp is the response from the latest Generate that returned no error.
	resp *llm.Response
}

// Generate delegates to the wrapped model and stores the response when the
// call returned no error. Return values are passed through unchanged.
func (m *captureModel) Generate(ctx context.Context, req llm.Request, opts ...llm.Option) (*llm.Response, error) {
	resp, err := m.inner.Generate(ctx, req, opts...)
	if err != nil {
		// Keep whatever was captured earlier; failures are not recorded.
		return resp, err
	}
	m.resp = resp
	return resp, nil
}

// Stream is a pure pass-through; nothing is captured on the streaming path.
func (m *captureModel) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
	stream, err := m.inner.Stream(ctx, req, opts...)
	return stream, err
}

// Capabilities reports the wrapped model's capabilities unchanged.
func (m *captureModel) Capabilities() llm.Capabilities {
	return m.inner.Capabilities()
}
// isStructuredOutputParseError reports whether err looks like a
// structured-output failure from majordomo.Generate. Detection is by
// substring match on the error text, covering the decode path
// ("decode structured response") and the empty-response path
// ("structured response from ... is empty"). It gates the loud slog.Warn so
// transport errors are not tagged as parse failures. A nil err yields false.
func isStructuredOutputParseError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	for _, marker := range [...]string{
		"decode structured response",
		"structured response from",
	} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
// SimpleCall is the no-tools convenience path: it wraps userMessage as a
// single user message, sends it to the model with the optional system
// prompt, and returns the response text. A generation failure is returned
// wrapped as "completion failed: ...". A trace span is recorded for both
// outcomes when tracing is active.
func SimpleCall(ctx context.Context, model llm.Model, systemPrompt string, userMessage string, opts ...llm.Option) (string, error) {
	conversation := []llm.Message{llm.UserText(userMessage)}
	request := llm.Request{System: systemPrompt, Messages: conversation}

	began := time.Now()
	resp, err := model.Generate(ctx, request, opts...)
	if err != nil {
		// The span for a failed call carries the error and no response.
		recordSpanFromWrapper(ctx, systemPrompt, conversation, nil, nil, nil, began, err)
		return "", fmt.Errorf("completion failed: %w", err)
	}

	recordSpanFromWrapper(ctx, systemPrompt, conversation, nil, resp, nil, began, nil)
	return resp.Text(), nil
}
// RecordUsage records LLM token usage from a successful Generate response.
// It is the exported entry point to recordUsage, which does nothing when no
// usage sink is configured or when the response reports zero input and zero
// output tokens.
//
// ONLY call this for models obtained outside this package: models returned
// by ParseModelRequest / ParseModelForContext record usage automatically on
// every Generate, and calling RecordUsage on their responses double-counts.
func RecordUsage(ctx context.Context, resp llm.Response) {
// resp arrives by value; recordUsage takes a pointer, so pass the address
// of the local copy.
recordUsage(ctx, &resp)
}
// RecordSpan records a trace span for a direct model.Generate() call.
// Call this from modules that invoke model.Generate() directly when they
// want the call traced (usage is already recorded automatically for
// parsed models).
//
// It forwards every argument unchanged to recordSpanFromWrapper, which
// writes nothing unless tracing is active for ctx. resp may be nil (for
// example when the call failed), callResults may be empty, startTime is the
// moment the call began, and callErr is the call's error, if any.
func RecordSpan(ctx context.Context, systemPrompt string, messages []llm.Message, toolbox *llm.Toolbox, resp *llm.Response, callResults []CallResult, startTime time.Time, callErr error) {
recordSpanFromWrapper(ctx, systemPrompt, messages, toolbox, resp, callResults, startTime, callErr)
}
// recordUsage hands the token usage of one response to the usage sink.
//
// Attribution prefers the response itself (resp.Model names the chain
// element that actually served the request — more precise than the
// requested spec) and falls back to the context attribution set by
// ParseModelForContext. Nothing is recorded when there is no sink, no
// response, or the response reports zero input and zero output tokens.
// Usage that cannot be attributed to a model is still recorded, after a
// warning is logged.
func recordUsage(ctx context.Context, resp *llm.Response) {
	if resp == nil || usageSink == nil {
		return
	}
	usage := resp.Usage
	if usage.InputTokens == 0 && usage.OutputTokens == 0 {
		return
	}

	name := resolvedModelName(ctx, resp)
	switch name {
	case "", "unknown":
		// Unattributed usage: make it loud so the call site can be fixed.
		switch tool := toolFromContext(ctx); tool {
		case "unknown":
			// No model and no tool: the stack is the only lead left.
			slog.Warn("model usage: recording with both unknown model and tool",
				"user", userFromContext(ctx), "stack", string(debug.Stack()))
		default:
			slog.Warn("model usage: recording with unknown model — caller should set model.WithModel or use model.ParseModelForContext",
				"tool", tool, "user", userFromContext(ctx))
		}
	}

	usageSink.Record(ctx, name, usage.InputTokens, usage.OutputTokens, usage.CacheReadTokens, usage.CacheWriteTokens)
}
// resolvedModelName chooses the name used for usage and trace attribution.
// When the response names its serving model, that name wins, with
// everything up to and including the first "/" dropped
// ("provider/model" → "model"). Otherwise the model requested via the
// context is resolved through ResolveModelName.
func resolvedModelName(ctx context.Context, resp *llm.Response) string {
	if resp == nil || resp.Model == "" {
		return ResolveModelName(modelFromContext(ctx))
	}
	if _, bare, hasProvider := strings.Cut(resp.Model, "/"); hasProvider {
		return bare
	}
	return resp.Model
}
// tracingEnabled reports whether a span should be written for ctx: a trace
// sink must be installed and ctx must carry a non-empty trace ID.
func tracingEnabled(ctx context.Context) bool {
	return traceSink != nil && traceIDFromContext(ctx) != ""
}
// recordSpanFromWrapper writes one trace span describing a model call, or
// does nothing when tracing is not active for ctx.
//
// resp may be nil (failed call); callErr, when non-nil, is stored as the
// span's error text. Tool calls and tool results are attached only when
// present. The span's completion and creation times, and its duration, all
// come from a single clock reading taken here.
func recordSpanFromWrapper(ctx context.Context, systemPrompt string, messages []llm.Message, toolbox *llm.Toolbox, resp *llm.Response, callResults []CallResult, startTime time.Time, callErr error) {
	if !tracingEnabled(ctx) {
		return
	}

	finished := time.Now()
	span := Span{
		SpanID:          uuid.New().String(),
		TraceID:         traceIDFromContext(ctx),
		Model:           resolvedModelName(ctx, resp),
		SystemPrompt:    systemPrompt,
		Messages:        marshalMessages(messages),
		ToolDefinitions: marshalToolDefs(toolbox),
		DurationMs:      finished.Sub(startTime).Milliseconds(),
		StartedAt:       startTime,
		CompletedAt:     finished,
		CreatedAt:       finished,
	}

	if callErr != nil {
		span.Error = callErr.Error()
	}

	if resp != nil {
		usage := resp.Usage
		span.ResponseText = resp.Text()
		span.InputTokens = usage.InputTokens
		span.OutputTokens = usage.OutputTokens
		if calls := resp.ToolCalls; len(calls) != 0 {
			span.ResponseToolCalls = marshalToolCalls(calls)
		}
	}

	if len(callResults) != 0 {
		span.ToolResults = marshalCallResults(callResults)
	}

	traceSink.WriteSpan(span)
}
// --- Serialization helpers ---
// jsonMessage is the trace-span JSON form of one conversation message, as
// produced by marshalMessages.
type jsonMessage struct {
// Role is the message role converted to a string.
Role string `json:"role"`
// Text is the message's text content; omitted from the JSON when empty.
Text string `json:"text,omitempty"`
// ToolCallID is the ID of the message's first tool result, when it has any.
ToolCallID string `json:"tool_call_id,omitempty"`
// ImageCount is the number of image parts in the message; the image data
// itself is not stored.
ImageCount int `json:"image_count,omitempty"`
}
// marshalMessages renders msgs as a JSON array of jsonMessage entries for
// storage in a trace span. Each entry carries the role, the text, a count
// of image parts (not the images themselves) and, when the message has
// tool results, the ID of the first one only. Empty input yields "[]".
func marshalMessages(msgs []llm.Message) string {
	entries := make([]jsonMessage, len(msgs))
	for i, msg := range msgs {
		entry := &entries[i]
		entry.Role = string(msg.Role)
		entry.Text = msg.Text()

		for _, part := range msg.Parts {
			if _, isImage := part.(llm.ImagePart); isImage {
				entry.ImageCount++
			}
		}

		if len(msg.ToolResults) != 0 {
			entry.ToolCallID = msg.ToolResults[0].ID
		}
	}

	// The payload is only strings and an int, so Marshal should not fail;
	// the error is dropped on purpose (span serialization is best-effort).
	encoded, _ := json.Marshal(entries)
	return string(encoded)
}
// jsonToolCall is the trace-span JSON form of one tool call requested by
// the model, as produced by marshalToolCalls.
type jsonToolCall struct {
// ID is the tool call's identifier.
ID string `json:"id"`
// Name is the name of the requested tool.
Name string `json:"name"`
// Arguments is the call's arguments converted to a string.
Arguments string `json:"arguments"`
}
// marshalToolCalls renders the model's tool calls as a JSON array of
// jsonToolCall entries (id, name, arguments as a string) for storage in a
// trace span. Empty input yields "[]".
func marshalToolCalls(calls []llm.ToolCall) string {
	entries := make([]jsonToolCall, len(calls))
	for i, call := range calls {
		entries[i] = jsonToolCall{
			ID:        call.ID,
			Name:      call.Name,
			Arguments: string(call.Arguments),
		}
	}

	// String-only payload, so Marshal should not fail; the error is dropped
	// on purpose (span serialization is best-effort).
	encoded, _ := json.Marshal(entries)
	return string(encoded)
}
// jsonCallResult is the trace-span JSON form of one CallResult, as produced
// by marshalCallResults.
type jsonCallResult struct {
// Name is the executed tool's name.
Name string `json:"name"`
// Arguments is the arguments string the tool was called with.
Arguments string `json:"arguments"`
// Result is the tool's result content.
Result string `json:"result"`
// Error is the CallResult's error message; omitted from the JSON when the
// call had no error.
Error string `json:"error,omitempty"`
}
// marshalCallResults renders executed tool results as a JSON array of
// jsonCallResult entries for storage in a trace span. A result's error, if
// any, is flattened to its message text. Empty input yields "[]".
func marshalCallResults(results []CallResult) string {
	entries := make([]jsonCallResult, len(results))
	for i, res := range results {
		entry := &entries[i]
		entry.Name = res.Name
		entry.Arguments = res.Arguments
		entry.Result = res.Result
		if res.Error != nil {
			entry.Error = res.Error.Error()
		}
	}

	// String-only payload, so Marshal should not fail; the error is dropped
	// on purpose (span serialization is best-effort).
	encoded, _ := json.Marshal(entries)
	return string(encoded)
}
// jsonToolDef is the trace-span JSON form of one tool offered to the model,
// as produced by marshalToolDefs. Only the name and description are kept.
type jsonToolDef struct {
// Name is the tool's name.
Name string `json:"name"`
// Description is the tool's description text.
Description string `json:"description"`
}
// marshalToolDefs renders the toolbox's tools as a JSON array of
// jsonToolDef entries (name and description only) for storage in a trace
// span. Unlike the other marshal helpers it returns the empty string —
// not "[]" — when tb is nil or holds no tools.
func marshalToolDefs(tb *llm.Toolbox) string {
	if tb == nil {
		return ""
	}

	tools := tb.Tools()
	if len(tools) == 0 {
		return ""
	}

	entries := make([]jsonToolDef, len(tools))
	for i, tool := range tools {
		entries[i] = jsonToolDef{
			Name:        tool.Name,
			Description: tool.Description,
		}
	}

	// String-only payload, so Marshal should not fail; the error is dropped
	// on purpose (span serialization is best-effort).
	encoded, _ := json.Marshal(entries)
	return string(encoded)
}