executus/model/sink.go
steve 69c2eb5f47
executus CI / test (pull_request) Successful in 1m0s
fix: address verified gadfly P2 findings (9 real of 18)
Independently verified all 18 gadfly findings against the code (18-agent
fan-out). Fixed the 9 real ones; the other 9 were false-positive /
hallucinated / valid-tradeoff (no change).

High:
- F1 nil model: a Models resolver returning (ctx,nil,nil) flowed into the
  agent loop and nil-panicked. Now a clean error (Run never panics). +test.
- F9 compactor data-leak: renderTranscript sent tool-call args verbatim to
  the summarizer (a possibly-different provider/tier); secret-bearing tool
  args (mcp_call/email_send/http_*/webhook_*) are now redacted, with a doc
  note that result bodies still flow (summary needs them).

Medium/minor:
- F2 compactor error path returned the folded slice, not the original msgs
  (contradicting the documented non-fatal contract) -> return msgs.
- F3 RunStats.Status only ok/error; now timeout (DeadlineExceeded) /
  cancelled (Canceled) via statusFor. +test.
- F4 step-zip emitted empty-name "ghost" steps when results>calls; now pairs
  min(calls,results) only.
- F5 SetIteration was never called -> RunState.Iteration always 0; the step
  observer now updates it each loop.
- F6 matchPending fallback was LIFO; now FIFO (matches the per-key queue).
- F7 estimateTokens had no default arm (future Part kinds counted as 0);
  unknown parts now counted conservatively.
- F8 cloud_sync silently truncated >1MiB responses -> opaque JSON error; now
  a clear "response exceeded N bytes" via readCapped.
- F12 step observer captured the caller ctx; now the merged runCtx.
- F13 compaction onFire was nil (doc claimed it logged); now wired to
  audit LogEvent("compaction_fired").
- F11 (no pre-dispatch hook in majordomo) documented honestly as a known
  limitation; F18 UsageSink doc clarified cache tokens are subsets of input.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 21:42:46 -04:00
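F9 redacts the args of secret-bearing tool calls before renderTranscript hands them to the summarizer. The sketch below is hypothetical: renderToolCall, secretBearing, the "[redacted]" placeholder, and treating http_ and webhook_ as name prefixes are all assumptions built from the tool names the commit lists. Result bodies are deliberately left alone, matching the note that the summary still needs them.

```go
package main

import (
	"fmt"
	"strings"
)

// secretBearing reports whether a tool's call args may carry credentials. The
// names come from F9; the prefix-matching rule is an assumption.
func secretBearing(tool string) bool {
	switch tool {
	case "mcp_call", "email_send":
		return true
	}
	return strings.HasPrefix(tool, "http_") || strings.HasPrefix(tool, "webhook_")
}

// renderToolCall is hypothetical: it formats one tool call for the summarizer
// transcript, masking the args of secret-bearing tools. Tool result bodies are
// not handled here; per F9 they still reach the summarizer.
func renderToolCall(tool, argsJSON string) string {
	if secretBearing(tool) {
		argsJSON = "[redacted]"
	}
	return fmt.Sprintf("tool_call %s(%s)", tool, argsJSON)
}

func main() {
	fmt.Println(renderToolCall("http_post", `{"auth":"Bearer abc"}`)) // tool_call http_post([redacted])
	fmt.Println(renderToolCall("search", `{"q":"go"}`))               // tool_call search({"q":"go"})
}
```

Keeping the tool name visible while masking only the args lets the summary still record that a call happened without forwarding its credentials to a possibly different provider.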

137 lines
4.1 KiB
Go

package model

import (
	"context"
	"time"
)

// This file is executus's inversion of mort's llmusage / llmtrace coupling.
// The model package owns the MECHANISM (instrument every parsed model's
// Generate, attribute by serving model, emit a span when a trace is active);
// WHERE usage/traces land is a host seam. A host registers a UsageSink and/or
// a TraceSink; both are optional (nil = off), so a light host records nothing.

// --- Usage ---

// UsageSink receives one record per successful Generate through a model parsed
// by this package (ParseModelRequest / ParseModelForContext). Implement it to
// meter or bill; the token detail mirrors majordomo's Response.Usage.
//
// IMPORTANT: cacheReadTokens and cacheWriteTokens are PORTIONS of inputTokens,
// not independent additive values (they let a sink price cached vs fresh input
// differently). The token total is inputTokens+outputTokens; a sink must NOT
// compute total = input+output+cacheRead+cacheWrite, because that double-counts
// the cached input.
type UsageSink interface {
	Record(ctx context.Context, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int)
}

var usageSink UsageSink

// SetUsageSink installs the usage sink (nil disables usage recording). The
// variable is not synchronized, so call it at startup before any model call.
func SetUsageSink(s UsageSink) { usageSink = s }

// --- Trace ---

// Span is one traced model call. The host's TraceSink persists it however it
// likes (a DB row, a log line, an OTel span). String fields carrying structured
// data (Messages, ToolDefinitions, ...) are pre-marshalled JSON.
type Span struct {
	SpanID            string
	TraceID           string
	Model             string
	SystemPrompt      string
	Messages          string
	ToolDefinitions   string
	ResponseText      string
	ResponseToolCalls string
	ToolResults       string
	Error             string
	InputTokens       int
	OutputTokens      int
	DurationMs        int64
	StartedAt         time.Time
	CompletedAt       time.Time
	CreatedAt         time.Time
}

// TraceSink receives a Span for each traced call. A Span is emitted only when
// a non-empty trace id is present on the context (see WithTraceID).
type TraceSink interface {
	WriteSpan(span Span)
}

var traceSink TraceSink

// SetTraceSink installs the trace sink (nil disables tracing). Like the usage
// sink, the variable is not synchronized: call it at startup before any model
// call.
func SetTraceSink(s TraceSink) { traceSink = s }

// TraceSinkActive reports whether a trace sink is installed.
func TraceSinkActive() bool { return traceSink != nil }

// --- Context attribution ---
//
// ParseModelForContext stamps the requested model onto the context so usage
// from a response that doesn't name its serving model can still be attributed.
// A host's tracing/usage middleware stamps a trace id and optional caller/tool
// for diagnostics. Every reader below tolerates a missing key: the model and
// trace-id readers return "", and the tool and user readers return "unknown"
// (also for an empty value).
type (
	ctxKeyModel struct{}
	ctxKeyTrace struct{}
	ctxKeyTool  struct{}
	ctxKeyUser  struct{}
)

// WithModel attributes subsequent usage on ctx to the given model name.
func WithModel(ctx context.Context, model string) context.Context {
	return context.WithValue(ctx, ctxKeyModel{}, model)
}

func modelFromContext(ctx context.Context) string {
	if v, ok := ctx.Value(ctxKeyModel{}).(string); ok {
		return v
	}
	return ""
}

// WithTraceID marks ctx as belonging to a trace; a TraceSink (if installed)
// then receives a Span per call. An empty id (or no id) disables tracing.
func WithTraceID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, ctxKeyTrace{}, id)
}

func traceIDFromContext(ctx context.Context) string {
	if v, ok := ctx.Value(ctxKeyTrace{}).(string); ok {
		return v
	}
	return ""
}

// WithUsageTool / WithUsageUser attach optional attribution used only in the
// "unknown model" diagnostic warning. An unset or empty value reads back as
// "unknown".
func WithUsageTool(ctx context.Context, tool string) context.Context {
	return context.WithValue(ctx, ctxKeyTool{}, tool)
}

func toolFromContext(ctx context.Context) string {
	if v, ok := ctx.Value(ctxKeyTool{}).(string); ok && v != "" {
		return v
	}
	return "unknown"
}

// WithUsageUser attaches the user attribution; see WithUsageTool.
func WithUsageUser(ctx context.Context, user string) context.Context {
	return context.WithValue(ctx, ctxKeyUser{}, user)
}

func userFromContext(ctx context.Context) string {
	if v, ok := ctx.Value(ctxKeyUser{}).(string); ok && v != "" {
		return v
	}
	return "unknown"
}