4d2f85d139
First Tier-2 battery, plugging into run.Ports.Audit: - storage.go/writer.go: skillaudit's Storage interface + per-run Writer moved clean (only utils->fmt); the Writer already matches run.RunRecorder's shape. - sink.go: Sink adapts a Storage to run.Audit (StartRun -> a run row + a Writer wrapped as run.RunRecorder, converting run.RunStats on Close). NewSink(nil) is equivalent to no audit. Compile-time proofs: Sink is run.Audit, recorder is run.RunRecorder. - memory.go: NewMemory() — a zero-dependency, queryable in-process Storage (retains runs + logs; all 17 read/filter/purge/walk methods) so a light host gets run history with no setup. Mort keeps its GORM Storage; contrib/store adds durable SQLite at P4. End-to-end test: wire audit.NewSink(audit.NewMemory()) into the executor, run an agent, and the run is recorded with terminal status/output and queryable by caller. CI invariant verified: core imports ZERO from the audit battery (proper battery direction; battery imports core, never the reverse). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
323 lines
12 KiB
Go
323 lines
12 KiB
Go
package audit
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
|
)
|
|
|
|
// stepTextMax caps the per-step assistant-text preview persisted on a
|
|
// "step" event. Large enough to capture the model's reasoning around a
|
|
// (mis)fired tool call — the single best clue to WHY a model emitted a
|
|
// malformed call — but bounded so the longtext payload can't balloon.
|
|
const stepTextMax = 2000
|
|
|
|
// Writer wraps a Storage with the OnStep / OnTool callbacks suitable for
|
|
// wiring into the majordomo agent loop's step observer, tracking sequence
|
|
// numbers and tool-call counts internally.
|
|
//
|
|
// Why: the agent loop's observer hooks are unaware of run identity; the
|
|
// writer captures the runID + skill metadata at construction so the
|
|
// per-event callbacks stay simple. AppendLog failures are logged but
|
|
// never fatal — audit must not break user-visible execution.
|
|
//
|
|
// What: NewWriter(storage, runID) → use OnStep / OnTool / Close. Close
|
|
// records the final FinishRun. The executors translate each agent.Step
|
|
// into one OnStep call (1-indexed iteration, the step's *llm.Response)
|
|
// plus one OnTool call per executed tool.
|
|
//
|
|
// Test: see writer_test.go for sequence ordering and finish semantics.
|
|
type Writer struct {
|
|
storage Storage
|
|
runID string
|
|
sequence atomic.Int32
|
|
calls atomic.Int32
|
|
mu sync.Mutex // guards Close idempotency + token tally
|
|
closed bool
|
|
|
|
// V5 token accumulator — summed across each OnStep's resp.Usage.
|
|
// Reads come from TokenStats() so the executor can pass them to
|
|
// FinishRun. atomics-on-Int64 would also work, but mu already
|
|
// guards Close + we need consistent multi-field reads anyway
|
|
// (input + output + thinking). The mutex hot-path overhead is
|
|
// negligible vs the LLM call latency that dominates step time.
|
|
inputTokens int64
|
|
outputTokens int64
|
|
thinkingTokens int64
|
|
|
|
// Per-step wall-clock + run-level model attribution (guarded by mu).
|
|
// startedAt anchors the first step's duration; lastStepAt is the
|
|
// previous step's observation time; resolvedModelLogged ensures the
|
|
// one-shot "resolved_model" run-level event fires at most once.
|
|
startedAt time.Time
|
|
lastStepAt time.Time
|
|
resolvedModelLogged bool
|
|
}
|
|
|
|
// NewWriter constructs a Writer. The caller is expected to have already
|
|
// called Storage.StartRun.
|
|
func NewWriter(storage Storage, runID string) *Writer {
|
|
return &Writer{storage: storage, runID: runID, startedAt: time.Now()}
|
|
}
|
|
|
|
// OnStep records one agent-loop step: a "step" event with the iteration
|
|
// number and the response's text size.
|
|
//
|
|
// V5: also tallies per-step token usage. majordomo populates
|
|
// resp.Usage when the provider reports it; for providers that don't,
|
|
// the fields stay 0 and the tally stays at 0 — the formatter then
|
|
// renders "—" rather than a misleading "$0.00".
|
|
//
|
|
// Why we tally here vs in the agent loop: the loop's Result.Usage is a
|
|
// run total; the audit row needs the same numbers, but the writer also
|
|
// serves the live RunState accessor mid-run, so a per-step running sum
|
|
// is the right shape. Global usage attribution is handled by the llms
|
|
// package's instrumented models — the writer tally is strictly the
|
|
// per-run audit roll-up.
|
|
func (w *Writer) OnStep(iter int, resp *llm.Response) {
|
|
if w == nil || w.storage == nil {
|
|
return
|
|
}
|
|
now := time.Now()
|
|
payload := map[string]any{"iter": iter}
|
|
|
|
w.mu.Lock()
|
|
// Per-step wall-clock: time since the previous observed step, or since
|
|
// run start for the first step. A long gap localises a slow/hung model
|
|
// call — the signal that was missing when an animate step-0 call hung
|
|
// ~5 min. NOTE: this is step-to-step wall time (model call + the prior
|
|
// step's tool execution), not pure model latency.
|
|
prev := w.lastStepAt
|
|
if prev.IsZero() {
|
|
prev = w.startedAt
|
|
}
|
|
if !prev.IsZero() {
|
|
payload["step_ms"] = now.Sub(prev).Milliseconds()
|
|
}
|
|
w.lastStepAt = now
|
|
if resp != nil {
|
|
w.inputTokens += int64(resp.Usage.InputTokens)
|
|
w.outputTokens += int64(resp.Usage.OutputTokens)
|
|
// Thinking/reasoning tokens are a first-class Usage field in
|
|
// majordomo (populated by the providers that report them).
|
|
w.thinkingTokens += int64(resp.Usage.ReasoningTokens)
|
|
}
|
|
// One-shot run-level served-model attribution: the FIRST step with a
|
|
// resolved model name emits a "resolved_model" event so a run that
|
|
// errors before producing a useful step still records which model
|
|
// served it. resp.Model is failover-aware ("provider/model-id" of the
|
|
// element that actually served), unlike the static configured head.
|
|
logResolvedModel := ""
|
|
if resp != nil && resp.Model != "" && !w.resolvedModelLogged {
|
|
w.resolvedModelLogged = true
|
|
logResolvedModel = resp.Model
|
|
}
|
|
w.mu.Unlock()
|
|
|
|
if resp != nil {
|
|
payload["text_len"] = len(resp.Text())
|
|
// Served model + why generation stopped — the two scalars that turn
|
|
// a "model misbehaved" guess into a fact. finish_reason on an
|
|
// empty-tool-call step disambiguates truncation (length) from a
|
|
// deliberate empty emission (tool_calls).
|
|
if resp.Model != "" {
|
|
payload["model"] = resp.Model
|
|
}
|
|
if resp.FinishReason != "" {
|
|
payload["finish_reason"] = string(resp.FinishReason)
|
|
}
|
|
// Per-step token breakdown (OnStep already reads these into the run
|
|
// total above; persisting the per-step slice costs nothing more).
|
|
payload["in_tokens"] = resp.Usage.InputTokens
|
|
payload["out_tokens"] = resp.Usage.OutputTokens
|
|
if resp.Usage.ReasoningTokens > 0 {
|
|
payload["thinking_tokens"] = resp.Usage.ReasoningTokens
|
|
}
|
|
if resp.Usage.CacheReadTokens > 0 {
|
|
payload["cache_read_tokens"] = resp.Usage.CacheReadTokens
|
|
}
|
|
// The model's own narration accompanying this step — the smoking gun
|
|
// for WHY a malformed tool call was emitted. Capped; suppressed when
|
|
// the step fired a secret-bearing tool (mcp_call/email_send/http_*)
|
|
// whose narration could echo the secret it's about to send.
|
|
if t := strings.TrimSpace(resp.Text()); t != "" {
|
|
if stepHasSecretTool(resp) {
|
|
payload["text_redacted"] = true
|
|
} else {
|
|
payload["text"] = truncate(t, stepTextMax)
|
|
}
|
|
}
|
|
} else {
|
|
payload["text_len"] = 0
|
|
}
|
|
|
|
w.appendLog("step", payload)
|
|
|
|
if logResolvedModel != "" {
|
|
w.appendLog("resolved_model", map[string]any{"model": logResolvedModel})
|
|
}
|
|
}
|
|
|
|
// stepHasSecretTool reports whether a step's response fired a tool whose
|
|
// surrounding narration could leak a secret (MCP args, email body/
|
|
// recipients, raw HTTP request). Mirrors the steps.go redaction list so
|
|
// the audit trace never persists secret-adjacent assistant text.
|
|
func stepHasSecretTool(resp *llm.Response) bool {
|
|
if resp == nil {
|
|
return false
|
|
}
|
|
for _, c := range resp.ToolCalls {
|
|
switch c.Name {
|
|
case "mcp_call", "email_send":
|
|
return true
|
|
}
|
|
if strings.HasPrefix(c.Name, "http_") {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// TokenStats returns the running totals tallied from OnStep.
|
|
// Safe to call concurrently. Returned values are a snapshot at call
|
|
// time. Used by the executors to populate RunStats before Close
|
|
// finalises the audit row.
|
|
//
|
|
// Why: the executor needs the totals AND a model name to compute cost,
|
|
// but cost calculation is a different concern from audit persistence.
|
|
// Exposing this getter lets the cost calculation live in the executor
|
|
// where the model is known.
|
|
func (w *Writer) TokenStats() (input, output, thinking int64) {
|
|
if w == nil {
|
|
return 0, 0, 0
|
|
}
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
return w.inputTokens, w.outputTokens, w.thinkingTokens
|
|
}
|
|
|
|
// OnTool records a "tool_call" event with the tool name and a
|
|
// "tool_result" event with the result length. Tool count is incremented
|
|
// for each call. The executors call this once per executed tool call
|
|
// from their step observers (call + matching result content).
|
|
func (w *Writer) OnTool(call llm.ToolCall, result string) {
|
|
if w == nil || w.storage == nil {
|
|
return
|
|
}
|
|
w.calls.Add(1)
|
|
w.appendLog("tool_call", map[string]any{
|
|
"name": call.Name,
|
|
"args": string(call.Arguments),
|
|
"id": call.ID,
|
|
})
|
|
w.appendLog("tool_result", map[string]any{
|
|
"name": call.Name,
|
|
"id": call.ID,
|
|
"result": truncate(result, 4000),
|
|
"truncated": len(result) > 4000,
|
|
})
|
|
}
|
|
|
|
// LogEvent records a custom event mid-run. The executor uses this for
|
|
// diagnostic events (e.g. "compaction_setup" / "compaction_fired")
|
|
// outside the canonical step / tool_call / tool_result / error set.
|
|
// Nil-safe: no-op when receiver or storage is nil.
|
|
//
|
|
// Why: skill_run_logs is the only sink Steve can read from SQL, so
|
|
// diagnostics intended for post-hoc debugging belong here. slog goes
|
|
// to mort.log which is harder to reach from outside the host.
|
|
func (w *Writer) LogEvent(eventType string, payload map[string]any) {
|
|
if w == nil || w.storage == nil {
|
|
return
|
|
}
|
|
w.appendLog(eventType, payload)
|
|
}
|
|
|
|
// LogError records an "error" event mid-run. Distinct from the terminal
|
|
// status set by Close.
|
|
func (w *Writer) LogError(msg string) {
|
|
if w == nil || w.storage == nil {
|
|
return
|
|
}
|
|
w.appendLog("error", map[string]any{"message": msg})
|
|
}
|
|
|
|
// Close finishes the run. The caller assembles a RunStats; the writer
|
|
// fills in ToolCalls (which is bookkept on the writer itself) and
|
|
// hands the full record to FinishRun.
|
|
//
|
|
// Idempotent: subsequent calls are no-ops.
|
|
//
|
|
// Why a struct vs the old positional form: v5 adds four token + cost
|
|
// fields on top of the legacy six. The struct keeps call sites readable
|
|
// and lets future fields slot in without churning every caller.
|
|
//
|
|
// Why context.WithoutCancel: the run's terminal status MUST land in
|
|
// the audit row regardless of the run ctx's state. Pre-fix, child
|
|
// skill runs invoked via skill_invoke / skill_invoke_parallel inherited
|
|
// the parent agent's runCtx as their outer ctx; when the parent
|
|
// timed out at MaxRuntime, every in-flight child's FinishRun fired
|
|
// with that already-cancelled ctx and the row was left in
|
|
// status=running forever. Detaching here is defence in depth — the
|
|
// caller (skillexec.runInner / agentexec.runInner) ALSO detaches at
|
|
// the call site, but a cancelled ctx in the writer's hands MUST NOT
|
|
// drop the audit write. The short timeout (auditFinishTimeout) bounds
|
|
// the write so a hung DB doesn't pin the run goroutine indefinitely.
|
|
func (w *Writer) Close(ctx context.Context, stats RunStats) {
|
|
if w == nil || w.storage == nil {
|
|
return
|
|
}
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
if w.closed {
|
|
return
|
|
}
|
|
w.closed = true
|
|
stats.ToolCalls = int(w.calls.Load())
|
|
// Detach from the caller's deadline + cancellation. Run cleanup
|
|
// must complete even when the run ctx is dead. The fresh
|
|
// auditFinishTimeout caps how long we'll wait on the storage.
|
|
finishCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), auditFinishTimeout)
|
|
defer cancel()
|
|
if err := w.storage.FinishRun(finishCtx, w.runID, stats); err != nil {
|
|
slog.Warn("skillaudit: FinishRun failed", "run_id", w.runID, "error", err)
|
|
}
|
|
}
|
|
|
|
// auditFinishTimeout caps how long Close will wait on the storage's
|
|
// FinishRun call after detaching from the caller's ctx. 10s is generous
|
|
// for a single-row UPDATE against MySQL — anything longer suggests a
|
|
// hung connection that the run goroutine shouldn't keep waiting on.
|
|
const auditFinishTimeout = 10 * time.Second
|
|
|
|
// ToolCallsCount returns how many tool invocations OnTool has seen so
|
|
// far. Useful for budget enforcement.
|
|
func (w *Writer) ToolCallsCount() int { return int(w.calls.Load()) }
|
|
|
|
func (w *Writer) appendLog(eventType string, payload map[string]any) {
|
|
seq := int(w.sequence.Add(1))
|
|
log := SkillRunLog{
|
|
RunID: w.runID,
|
|
Sequence: seq,
|
|
EventType: eventType,
|
|
Payload: payload,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
if err := w.storage.AppendLog(context.Background(), log); err != nil {
|
|
slog.Warn("skillaudit: AppendLog failed", "run_id", w.runID, "seq", seq, "type", eventType, "error", err)
|
|
}
|
|
}
|
|
|
|
func truncate(s string, max int) string {
|
|
if len(s) <= max {
|
|
return s
|
|
}
|
|
return s[:max] + fmt.Sprintf("…[+%d bytes]", len(s)-max)
|
|
}
|