package audit

import (
	"context"
	"fmt"
	"log/slog"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)

// stepTextMax caps the per-step assistant-text preview persisted on a
// "step" event. Large enough to capture the model's reasoning around a
// (mis)fired tool call — the single best clue to WHY a model emitted a
// malformed call — but bounded so the longtext payload can't balloon.
const stepTextMax = 2000

// Writer wraps a Storage with the OnStep / OnTool callbacks suitable for
// wiring into the majordomo agent loop's step observer, tracking sequence
// numbers and tool-call counts internally.
//
// Why: the agent loop's observer hooks are unaware of run identity; the
// writer captures the runID (and the run's start time) at construction so
// the per-event callbacks stay simple. AppendLog failures are logged but
// never fatal — audit must not break user-visible execution.
//
// What: NewWriter(storage, runID) → use OnStep / OnTool / Close. Close
// records the final FinishRun. The executors translate each agent.Step
// into one OnStep call (1-indexed iteration, the step's *llm.Response)
// plus one OnTool call per executed tool.
//
// Test: see writer_test.go for sequence ordering and finish semantics.
type Writer struct {
	storage Storage
	runID   string

	// Lock-free counters: event sequence numbers and observed tool calls.
	sequence atomic.Int32
	calls    atomic.Int32

	// mu guards closed, the token tally, and the per-step timing /
	// model-attribution fields below.
	mu     sync.Mutex
	closed bool

	// V5 token accumulator — summed across each OnStep's resp.Usage.
	// Reads come from TokenStats() so the executor can pass them to
	// FinishRun. atomics-on-Int64 would also work, but mu already
	// guards Close + we need consistent multi-field reads anyway
	// (input + output + thinking). The mutex hot-path overhead is
	// negligible vs the LLM call latency that dominates step time.
	inputTokens    int64
	outputTokens   int64
	thinkingTokens int64

	// Per-step wall-clock + run-level model attribution (guarded by mu).
	// startedAt anchors the first step's duration; lastStepAt is the
	// previous step's observation time; resolvedModelLogged ensures the
	// one-shot "resolved_model" run-level event fires at most once.
	startedAt           time.Time
	lastStepAt          time.Time
	resolvedModelLogged bool
}

// NewWriter constructs a Writer whose step timing is anchored at the
// current time. The caller is expected to have already called
// Storage.StartRun.
func NewWriter(storage Storage, runID string) *Writer {
	return &Writer{storage: storage, runID: runID, startedAt: time.Now()}
}

// OnStep records one agent-loop step as a "step" event. The payload always
// carries "iter" and "text_len"; when available it also carries "step_ms"
// (wall time since the previous step, or since NewWriter for the first),
// the served "model", the "finish_reason", per-step "in_tokens" /
// "out_tokens" (plus "thinking_tokens" / "cache_read_tokens" when
// non-zero), and either a capped "text" preview of the assistant's
// narration or "text_redacted" when the step fired a secret-bearing tool.
// The first step that reports a model name additionally emits a one-shot
// "resolved_model" event. Nil-safe: no-op when receiver or storage is nil.
//
// V5: also tallies per-step token usage. majordomo populates
// resp.Usage when the provider reports it; for providers that don't,
// the fields stay 0 and the tally stays at 0 — the formatter then
// renders "—" rather than a misleading "$0.00".
//
// Why we tally here vs in the agent loop: the loop's Result.Usage is a
// run total; the audit row needs the same numbers, but the writer also
// serves the live RunState accessor mid-run, so a per-step running sum
// is the right shape. Global usage attribution is handled by the llms
// package's instrumented models — the writer tally is strictly the
// per-run audit roll-up.
func (w *Writer) OnStep(iter int, resp *llm.Response) {
	if w == nil || w.storage == nil {
		return
	}
	now := time.Now()
	payload := map[string]any{"iter": iter}

	w.mu.Lock()
	// Per-step wall-clock: time since the previous observed step, or since
	// run start for the first step. A long gap localises a slow/hung model
	// call — the signal that was missing when an animate step-0 call hung
	// ~5 min. NOTE: this is step-to-step wall time (model call + the prior
	// step's tool execution), not pure model latency.
	prev := w.lastStepAt
	if prev.IsZero() {
		prev = w.startedAt
	}
	// startedAt is zero only for a Writer not built via NewWriter; skip the
	// duration rather than record a meaningless span since year 1.
	if !prev.IsZero() {
		payload["step_ms"] = now.Sub(prev).Milliseconds()
	}
	w.lastStepAt = now
	if resp != nil {
		w.inputTokens += int64(resp.Usage.InputTokens)
		w.outputTokens += int64(resp.Usage.OutputTokens)
		// Thinking/reasoning tokens are a first-class Usage field in
		// majordomo (populated by the providers that report them).
		w.thinkingTokens += int64(resp.Usage.ReasoningTokens)
	}
	// One-shot run-level served-model attribution: the FIRST step with a
	// resolved model name emits a "resolved_model" event so a run that
	// errors before producing a useful step still records which model
	// served it. resp.Model is failover-aware ("provider/model-id" of the
	// element that actually served), unlike the static configured head.
	logResolvedModel := ""
	if resp != nil && resp.Model != "" && !w.resolvedModelLogged {
		w.resolvedModelLogged = true
		logResolvedModel = resp.Model
	}
	w.mu.Unlock()

	if resp == nil {
		payload["text_len"] = 0
	} else {
		// Evaluate the response text once; it feeds both the size and
		// the narration preview below.
		text := resp.Text()
		payload["text_len"] = len(text)
		// Served model + why generation stopped — the two scalars that turn
		// a "model misbehaved" guess into a fact. finish_reason on an
		// empty-tool-call step disambiguates truncation (length) from a
		// deliberate empty emission (tool_calls).
		if resp.Model != "" {
			payload["model"] = resp.Model
		}
		if resp.FinishReason != "" {
			payload["finish_reason"] = string(resp.FinishReason)
		}
		// Per-step token breakdown (the run total above reads the same
		// fields; persisting the per-step slice costs nothing more).
		payload["in_tokens"] = resp.Usage.InputTokens
		payload["out_tokens"] = resp.Usage.OutputTokens
		if resp.Usage.ReasoningTokens > 0 {
			payload["thinking_tokens"] = resp.Usage.ReasoningTokens
		}
		if resp.Usage.CacheReadTokens > 0 {
			payload["cache_read_tokens"] = resp.Usage.CacheReadTokens
		}
		// The model's own narration accompanying this step — the smoking gun
		// for WHY a malformed tool call was emitted. Capped; suppressed when
		// the step fired a secret-bearing tool (mcp_call/email_send/http_*)
		// whose narration could echo the secret it's about to send.
		if t := strings.TrimSpace(text); t != "" {
			if stepHasSecretTool(resp) {
				payload["text_redacted"] = true
			} else {
				payload["text"] = truncate(t, stepTextMax)
			}
		}
	}

	w.appendLog("step", payload)
	if logResolvedModel != "" {
		w.appendLog("resolved_model", map[string]any{"model": logResolvedModel})
	}
}

// stepHasSecretTool reports whether a step's response fired a tool whose
// surrounding narration could leak a secret (MCP args, email body/
// recipients, raw HTTP request): any call named "mcp_call" or
// "email_send", or any name with the "http_" prefix. Mirrors the steps.go
// redaction list so the audit trace never persists secret-adjacent
// assistant text. A nil response fires no tools.
func stepHasSecretTool(resp *llm.Response) bool {
	if resp == nil {
		return false
	}
	for _, c := range resp.ToolCalls {
		switch c.Name {
		case "mcp_call", "email_send":
			return true
		}
		if strings.HasPrefix(c.Name, "http_") {
			return true
		}
	}
	return false
}

// TokenStats returns the running totals tallied from OnStep.
// Safe to call concurrently. Returned values are a snapshot at call
// time. Used by the executors to populate RunStats before Close
// finalises the audit row. Nil-safe: returns zeros on a nil receiver.
//
// Why: the executor needs the totals AND a model name to compute cost,
// but cost calculation is a different concern from audit persistence.
// Exposing this getter lets the cost calculation live in the executor
// where the model is known.
func (w *Writer) TokenStats() (input, output, thinking int64) {
	if w == nil {
		return 0, 0, 0
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.inputTokens, w.outputTokens, w.thinkingTokens
}

// OnTool records a "tool_call" event (tool name, raw arguments, call ID)
// followed by a "tool_result" event (tool name, call ID, a capped preview
// of the result, and a flag saying whether it was truncated). The tool
// count is incremented once per call. The executors call this once per
// executed tool call from their step observers (call + matching result
// content).
func (w *Writer) OnTool(call llm.ToolCall, result string) { if w == nil || w.storage == nil { return } w.calls.Add(1) w.appendLog("tool_call", map[string]any{ "name": call.Name, "args": string(call.Arguments), "id": call.ID, }) w.appendLog("tool_result", map[string]any{ "name": call.Name, "id": call.ID, "result": truncate(result, 4000), "truncated": len(result) > 4000, }) } // LogEvent records a custom event mid-run. The executor uses this for // diagnostic events (e.g. "compaction_setup" / "compaction_fired") // outside the canonical step / tool_call / tool_result / error set. // Nil-safe: no-op when receiver or storage is nil. // // Why: skill_run_logs is the only sink Steve can read from SQL, so // diagnostics intended for post-hoc debugging belong here. slog goes // to mort.log which is harder to reach from outside the host. func (w *Writer) LogEvent(eventType string, payload map[string]any) { if w == nil || w.storage == nil { return } w.appendLog(eventType, payload) } // LogError records an "error" event mid-run. Distinct from the terminal // status set by Close. func (w *Writer) LogError(msg string) { if w == nil || w.storage == nil { return } w.appendLog("error", map[string]any{"message": msg}) } // Close finishes the run. The caller assembles a RunStats; the writer // fills in ToolCalls (which is bookkept on the writer itself) and // hands the full record to FinishRun. // // Idempotent: subsequent calls are no-ops. // // Why a struct vs the old positional form: v5 adds four token + cost // fields on top of the legacy six. The struct keeps call sites readable // and lets future fields slot in without churning every caller. // // Why context.WithoutCancel: the run's terminal status MUST land in // the audit row regardless of the run ctx's state. 
Pre-fix, child // skill runs invoked via skill_invoke / skill_invoke_parallel inherited // the parent agent's runCtx as their outer ctx; when the parent // timed out at MaxRuntime, every in-flight child's FinishRun fired // with that already-cancelled ctx and the row was left in // status=running forever. Detaching here is defence in depth — the // caller (skillexec.runInner / agentexec.runInner) ALSO detaches at // the call site, but a cancelled ctx in the writer's hands MUST NOT // drop the audit write. The short timeout (auditFinishTimeout) bounds // the write so a hung DB doesn't pin the run goroutine indefinitely. func (w *Writer) Close(ctx context.Context, stats RunStats) { if w == nil || w.storage == nil { return } w.mu.Lock() defer w.mu.Unlock() if w.closed { return } w.closed = true stats.ToolCalls = int(w.calls.Load()) // Detach from the caller's deadline + cancellation. Run cleanup // must complete even when the run ctx is dead. The fresh // auditFinishTimeout caps how long we'll wait on the storage. finishCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), auditFinishTimeout) defer cancel() if err := w.storage.FinishRun(finishCtx, w.runID, stats); err != nil { slog.Warn("skillaudit: FinishRun failed", "run_id", w.runID, "error", err) } } // auditFinishTimeout caps how long Close will wait on the storage's // FinishRun call after detaching from the caller's ctx. 10s is generous // for a single-row UPDATE against MySQL — anything longer suggests a // hung connection that the run goroutine shouldn't keep waiting on. const auditFinishTimeout = 10 * time.Second // ToolCallsCount returns how many tool invocations OnTool has seen so // far. Useful for budget enforcement. 
func (w *Writer) ToolCallsCount() int { return int(w.calls.Load()) } func (w *Writer) appendLog(eventType string, payload map[string]any) { seq := int(w.sequence.Add(1)) log := SkillRunLog{ RunID: w.runID, Sequence: seq, EventType: eventType, Payload: payload, CreatedAt: time.Now(), } if err := w.storage.AppendLog(context.Background(), log); err != nil { slog.Warn("skillaudit: AppendLog failed", "run_id", w.runID, "seq", seq, "type", eventType, "error", err) } } func truncate(s string, max int) string { if len(s) <= max { return s } return s[:max] + fmt.Sprintf("…[+%d bytes]", len(s)-max) }