From addf3a19d17235be84adc697adc85ba22bfda08d Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 22:14:37 -0400 Subject: [PATCH] =?UTF-8?q?P4:=20audit=20battery=20=E2=80=94=20run.Audit?= =?UTF-8?q?=20Sink=20+=20Writer=20+=20queryable=20Memory=20store?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First Tier-2 battery, plugging into run.Ports.Audit: - storage.go/writer.go: skillaudit's Storage interface + per-run Writer moved clean (only utils->fmt); the Writer already matches run.RunRecorder's shape. - sink.go: Sink adapts a Storage to run.Audit (StartRun -> a run row + a Writer wrapped as run.RunRecorder, converting run.RunStats on Close). NewSink(nil) is equivalent to no audit. Compile-time proofs: Sink is run.Audit, recorder is run.RunRecorder. - memory.go: NewMemory() — a zero-dependency, queryable in-process Storage (retains runs + logs; all 17 read/filter/purge/walk methods) so a light host gets run history with no setup. Mort keeps its GORM Storage; contrib/store adds durable SQLite at P4. End-to-end test: wire audit.NewSink(audit.NewMemory()) into the executor, run an agent, and the run is recorded with terminal status/output and queryable by caller. CI invariant verified: core imports ZERO from the audit battery (proper battery direction; battery imports core, never the reverse). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- CLAUDE.md | 3 +- audit/audit_test.go | 78 +++++++++++ audit/memory.go | 255 +++++++++++++++++++++++++++++++++++ audit/sink.go | 76 +++++++++++ audit/storage.go | 245 +++++++++++++++++++++++++++++++++ audit/writer.go | 322 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 978 insertions(+), 1 deletion(-) create mode 100644 audit/audit_test.go create mode 100644 audit/memory.go create mode 100644 audit/sink.go create mode 100644 audit/storage.go create mode 100644 audit/writer.go diff --git a/CLAUDE.md b/CLAUDE.md index 86a7be9..44fcc59 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -63,7 +63,8 @@ CORE (majordomo + stdlib): BATTERIES (opt-in siblings, each nil-safe + a default): persona/ Agent noun + AgentStore seam + yml loader [P4] skill/ rich Skill + SkillStore seam + toml loader [P4] - audit/ run-trace Sink (+ Noop/Slog) [P4] + audit/ run.Audit Sink + Writer + queryable Memory [P4 ✓] + default (skillaudit Storage iface; GORM stays in mort) critic/ two-tier timeout state machine + Escalator [P4] schedule/ cron runner cores [P4] checkpoint/ durable resume seam [P4] diff --git a/audit/audit_test.go b/audit/audit_test.go new file mode 100644 index 0000000..7cd0980 --- /dev/null +++ b/audit/audit_test.go @@ -0,0 +1,78 @@ +package audit_test + +import ( + "context" + "testing" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + "gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake" + + "gitea.stevedudenhoeffer.com/steve/executus/audit" + "gitea.stevedudenhoeffer.com/steve/executus/run" + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// TestAuditBatteryEndToEnd wires the audit battery (Memory storage) into +// run.Ports.Audit, runs an agent, and verifies the run was recorded and is +// queryable — proving Sink/Writer/Memory satisfy the core seams end to end. 
+func TestAuditBatteryEndToEnd(t *testing.T) { + mem := audit.NewMemory() + + fp := fake.New("fake") + fp.Enqueue("m", fake.Reply("the answer")) + m, err := fp.Model("m") + if err != nil { + t.Fatal(err) + } + + ex := run.New(run.Config{ + Registry: tool.NewRegistry(), + Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { + return ctx, m, nil + }, + Ports: run.Ports{Audit: audit.NewSink(mem)}, + }) + + res := ex.Run(context.Background(), + run.RunnableAgent{ID: "agent-1", Name: "a", ModelTier: "m"}, + tool.Invocation{RunID: "run-xyz", CallerID: "caller-1"}, + "question") + if res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + + // The run was recorded with a terminal status + output. + got, err := mem.GetRun(context.Background(), "run-xyz") + if err != nil { + t.Fatalf("GetRun: %v", err) + } + if got.Status != "ok" { + t.Errorf("status = %q, want ok", got.Status) + } + if got.Output != "the answer" { + t.Errorf("output = %q, want %q", got.Output, "the answer") + } + if got.FinishedAt == nil { + t.Error("FinishedAt should be set after the run") + } + if got.SkillID != "agent-1" { + t.Errorf("SkillID = %q, want agent-1 (the subject id)", got.SkillID) + } + + // And it is queryable by caller. + runs, err := mem.ListRunsByCaller(context.Background(), "caller-1", 10) + if err != nil { + t.Fatalf("ListRunsByCaller: %v", err) + } + if len(runs) != 1 || runs[0].ID != "run-xyz" { + t.Errorf("ListRunsByCaller = %+v, want [run-xyz]", runs) + } +} + +// TestNilSinkRecordsNothing: NewSink(nil) is equivalent to no audit. 
+func TestNilSinkRecordsNothing(t *testing.T) { + s := audit.NewSink(nil) + if rec := s.StartRun(context.Background(), run.RunInfo{RunID: "r"}); rec != nil { + t.Error("NewSink(nil).StartRun should return a nil recorder") + } +} diff --git a/audit/memory.go b/audit/memory.go new file mode 100644 index 0000000..00e0cfe --- /dev/null +++ b/audit/memory.go @@ -0,0 +1,255 @@ +package audit + +import ( + "context" + "sort" + "sync" + "time" +) + +// Memory is an in-process Storage: it retains runs + logs in memory so a light +// host (or a test) gets queryable run history with zero setup. It is bounded +// only by process memory — a host that runs forever should PurgeOlderThan +// periodically, or use a persistent Storage. Construct with NewMemory. +// +// Mort uses its GORM/MySQL Storage; contrib/store adds a durable SQLite one. +// Memory is the zero-dependency default behind audit.NewSink(audit.NewMemory()). +type Memory struct { + mu sync.RWMutex + order []string // run ids in insertion order + runs map[string]SkillRun // by run id + logs map[string][]SkillRunLog // by run id +} + +// NewMemory returns an empty in-memory Storage. 
+func NewMemory() *Memory { + return &Memory{runs: map[string]SkillRun{}, logs: map[string][]SkillRunLog{}} +} + +var _ Storage = (*Memory)(nil) + +func (m *Memory) Initialize(context.Context) error { return nil } + +func (m *Memory) StartRun(_ context.Context, run SkillRun) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.runs[run.ID]; !ok { + m.order = append(m.order, run.ID) + } + m.runs[run.ID] = run + return nil +} + +func (m *Memory) FinishRun(_ context.Context, runID string, s RunStats) error { + m.mu.Lock() + defer m.mu.Unlock() + r, ok := m.runs[runID] + if !ok { + return ErrNotFound + } + now := time.Now() + r.FinishedAt = &now + r.Status = s.Status + r.Output = s.Output + r.Error = s.Error + r.ToolCallsCount = s.ToolCalls + r.RuntimeSeconds = s.RuntimeSeconds + r.TotalInputTokens = s.InputTokens + r.TotalOutputTokens = s.OutputTokens + r.TotalThinkingTokens = s.ThinkingTokens + m.runs[runID] = r + return nil +} + +func (m *Memory) AppendLog(_ context.Context, log SkillRunLog) error { + m.mu.Lock() + defer m.mu.Unlock() + m.logs[log.RunID] = append(m.logs[log.RunID], log) + return nil +} + +func (m *Memory) GetRun(_ context.Context, runID string) (*SkillRun, error) { + m.mu.RLock() + defer m.mu.RUnlock() + r, ok := m.runs[runID] + if !ok { + return nil, ErrNotFound + } + return &r, nil +} + +func (m *Memory) ListLogsByRun(_ context.Context, runID string) ([]SkillRunLog, error) { + m.mu.RLock() + defer m.mu.RUnlock() + ls := append([]SkillRunLog(nil), m.logs[runID]...) + sort.SliceStable(ls, func(i, j int) bool { return ls[i].Sequence < ls[j].Sequence }) + return ls, nil +} + +// newestFirst returns the retained runs in reverse insertion order, optionally +// filtered. Caller holds at least RLock. 
+func (m *Memory) newestFirst(keep func(SkillRun) bool) []SkillRun { + out := make([]SkillRun, 0, len(m.order)) + for i := len(m.order) - 1; i >= 0; i-- { + r := m.runs[m.order[i]] + if keep == nil || keep(r) { + out = append(out, r) + } + } + return out +} + +func page(rs []SkillRun, offset, limit int) []SkillRun { + if offset < 0 { + offset = 0 + } + if offset >= len(rs) { + return nil + } + rs = rs[offset:] + if limit > 0 && limit < len(rs) { + rs = rs[:limit] + } + return rs +} + +func (m *Memory) ListRunsBySkill(ctx context.Context, skillID string, limit int) ([]SkillRun, error) { + return m.ListRunsBySkillPaginated(ctx, skillID, 0, limit, false) +} + +func (m *Memory) ListRunsBySkillPaginated(_ context.Context, skillID string, offset, limit int, includeDryRun bool) ([]SkillRun, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return page(m.newestFirst(func(r SkillRun) bool { + return r.SkillID == skillID && (includeDryRun || r.Status != "dry_run") + }), offset, limit), nil +} + +func (m *Memory) CountRunsBySkill(_ context.Context, skillID string, includeDryRun bool) (int64, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return int64(len(m.newestFirst(func(r SkillRun) bool { + return r.SkillID == skillID && (includeDryRun || r.Status != "dry_run") + }))), nil +} + +func (m *Memory) ListRunsByCaller(_ context.Context, callerID string, limit int) ([]SkillRun, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return page(m.newestFirst(func(r SkillRun) bool { + return r.CallerID == callerID && r.Status != "dry_run" + }), 0, limit), nil +} + +func (m *Memory) matchesFilter(r SkillRun, f RunFilter) bool { + if !f.IncludeDryRun && r.Status == "dry_run" { + return false + } + if f.Status != "" && r.Status != f.Status { + return false + } + if f.SkillID != "" && r.SkillID != f.SkillID { + return false + } + if f.CallerID != "" && r.CallerID != f.CallerID { + return false + } + if f.ChannelID != "" && r.ChannelID != f.ChannelID { + return false + } + if f.TopLevelOnly 
&& r.ParentRunID != "" { + return false + } + if !f.Since.IsZero() && r.StartedAt.Before(f.Since) { + return false + } + if !f.Until.IsZero() && r.StartedAt.After(f.Until) { + return false + } + return true +} + +func (m *Memory) ListRunsFiltered(_ context.Context, f RunFilter, offset, limit int) ([]SkillRun, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return page(m.newestFirst(func(r SkillRun) bool { return m.matchesFilter(r, f) }), offset, limit), nil +} + +func (m *Memory) CountRunsFiltered(_ context.Context, f RunFilter) (int64, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return int64(len(m.newestFirst(func(r SkillRun) bool { return m.matchesFilter(r, f) }))), nil +} + +func (m *Memory) PurgeOlderThan(_ context.Context, t time.Time) (int64, error) { + m.mu.Lock() + defer m.mu.Unlock() + var purged int64 + kept := m.order[:0:0] + for _, id := range m.order { + r := m.runs[id] + if r.FinishedAt != nil && r.FinishedAt.Before(t) { + delete(m.runs, id) + delete(m.logs, id) + purged++ + continue + } + kept = append(kept, id) + } + m.order = kept + return purged, nil +} + +func (m *Memory) ListChildrenByParent(_ context.Context, parentRunID string) ([]SkillRun, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.newestFirst(func(r SkillRun) bool { return r.ParentRunID == parentRunID }), nil +} + +func (m *Memory) WalkParentChain(_ context.Context, runID string) ([]SkillRun, error) { + m.mu.RLock() + defer m.mu.RUnlock() + var chain []SkillRun + seen := map[string]bool{} + for id := runID; id != ""; { + r, ok := m.runs[id] + if !ok || seen[id] { + break + } + seen[id] = true + chain = append(chain, r) + id = r.ParentRunID + } + return chain, nil +} + +func (m *Memory) ListFinishedRunsBefore(_ context.Context, cutoff time.Time, limit int) ([]SkillRun, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return page(m.newestFirst(func(r SkillRun) bool { + return r.FinishedAt != nil && r.FinishedAt.Before(cutoff) + }), 0, limit), nil +} + +func (m *Memory) 
LastRunBySkills(_ context.Context, skillIDs []string, includeFailed bool) (map[string]time.Time, error) { + m.mu.RLock() + defer m.mu.RUnlock() + want := map[string]bool{} + for _, id := range skillIDs { + want[id] = true + } + out := map[string]time.Time{} + for _, id := range m.order { + r := m.runs[id] + if !want[r.SkillID] { + continue + } + if !includeFailed && (r.Status == "error" || r.Status == "timeout") { + continue + } + if r.StartedAt.After(out[r.SkillID]) { + out[r.SkillID] = r.StartedAt + } + } + return out, nil +} diff --git a/audit/sink.go b/audit/sink.go new file mode 100644 index 0000000..b5071f3 --- /dev/null +++ b/audit/sink.go @@ -0,0 +1,76 @@ +package audit + +import ( + "context" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + + "gitea.stevedudenhoeffer.com/steve/executus/run" +) + +// Sink adapts an audit Storage to the run.Audit port: StartRun opens a run row +// and returns a per-run recorder (a Writer) that the executor feeds with steps, +// tool calls, and the terminal roll-up. This is what plugs the audit battery +// into run.Ports.Audit — mort backs it with its GORM Storage, a light host with +// Memory() (or omits it entirely). +type Sink struct{ storage Storage } + +// NewSink wraps a Storage as a run.Audit. A nil Storage yields a Sink whose +// StartRun returns nil (the executor then records nothing) — so NewSink(nil) is +// equivalent to leaving run.Ports.Audit unset. +func NewSink(storage Storage) *Sink { return &Sink{storage: storage} } + +// compile-time proof the adapter satisfies the core seams. +var ( + _ run.Audit = (*Sink)(nil) + _ run.RunRecorder = (*recorder)(nil) +) + +// StartRun records the run start and returns a recorder. Implements run.Audit. 
+func (s *Sink) StartRun(ctx context.Context, info run.RunInfo) run.RunRecorder { + if s == nil || s.storage == nil { + return nil + } + started := info.StartedAt + if started.IsZero() { + started = time.Now() + } + // Best-effort: a failed StartRun must not break the user-visible run. + _ = s.storage.StartRun(ctx, SkillRun{ + ID: info.RunID, + SkillID: info.SubjectID, + CallerID: info.CallerID, + ChannelID: info.ChannelID, + ParentRunID: info.ParentRunID, + Inputs: info.Inputs, + StartedAt: started, + Status: "running", + }) + return &recorder{w: NewWriter(s.storage, info.RunID)} +} + +// recorder adapts a *Writer to run.RunRecorder, converting run.RunStats to the +// audit RunStats on Close (the two have identical fields). +type recorder struct{ w *Writer } + +func (r *recorder) TokenStats() (in, out, thinking int64) { return r.w.TokenStats() } +func (r *recorder) ToolCallsCount() int { return r.w.ToolCallsCount() } +func (r *recorder) OnStep(iter int, resp *llm.Response) { r.w.OnStep(iter, resp) } +func (r *recorder) OnTool(call llm.ToolCall, result string) { r.w.OnTool(call, result) } +func (r *recorder) LogEvent(eventType string, payload map[string]any) { + r.w.LogEvent(eventType, payload) +} +func (r *recorder) LogError(msg string) { r.w.LogError(msg) } +func (r *recorder) Close(ctx context.Context, s run.RunStats) { + r.w.Close(ctx, RunStats{ + Status: s.Status, + Output: s.Output, + Error: s.Error, + ToolCalls: s.ToolCalls, + RuntimeSeconds: s.RuntimeSeconds, + InputTokens: s.InputTokens, + OutputTokens: s.OutputTokens, + ThinkingTokens: s.ThinkingTokens, + }) +} diff --git a/audit/storage.go b/audit/storage.go new file mode 100644 index 0000000..481c653 --- /dev/null +++ b/audit/storage.go @@ -0,0 +1,245 @@ +// Package skillaudit persists skill execution traces: per-run summary rows +// (skill_runs) and per-step event logs (skill_run_logs). 
The executor in +// pkg/logic/skillexec emits events through a Writer; the storage layer is +// kept separate so tests can mock it and so retention pruning has a clear +// home. +// +// Why: agentic runs can be long, multi-tool affairs. Without a structured +// audit trail, debugging "why did the LLM do that?" is impossible. The +// log table is keyed by (run_id, sequence) so insert order is preserved. +package audit + +import ( + "context" + "errors" + "time" +) + +// ErrNotFound is returned when a run lookup fails. +var ErrNotFound = errors.New("skill run not found") + +// SkillRun is the per-invocation summary row. One per call to +// Executor.Run. Status transitions through running → ok / error / +// timeout / budget_exceeded / dry_run. +type SkillRun struct { + ID string + SkillID string + CallerID string + ChannelID string + Inputs map[string]any + StartedAt time.Time + FinishedAt *time.Time + Status string // running|ok|error|timeout|budget_exceeded|dry_run + Output string + Error string + ToolCallsCount int + RuntimeSeconds float64 + // ParentRunID is the run_id of the parent skill that invoked this + // run via skill_invoke. Empty for top-level invocations. Indexed + // in the gorm model so call-tree queries (ListChildrenByParent + + // WalkParentChain) are cheap. + ParentRunID string + + // Token roll-ups, summed across all model completions in this run + // (one Usage per OnStep). All default to 0 when the provider did + // not expose token usage. + TotalInputTokens int64 + TotalOutputTokens int64 + TotalThinkingTokens int64 +} + +// RunStats captures the terminal state of a run for FinishRun. Bundling +// these into one struct (vs a long positional argument list) keeps +// callers readable; future fields slot in here without touching every +// call site. +// +// Why: FinishRun originally took six positional args; adding token +// columns would push it higher. A struct is the idiomatic Go way to +// avoid the positional-arg explosion. 
+type RunStats struct { + Status string // ok|error|timeout|budget_exceeded|dry_run + Output string // final agent output (empty on error) + Error string // error message (empty on success) + ToolCalls int // total OnTool count + RuntimeSeconds float64 // wall-clock duration + + // Token roll-ups (all default to 0 when token usage was not + // exposed by the provider). + InputTokens int64 + OutputTokens int64 + ThinkingTokens int64 +} + +// SkillRunLog is one event recorded during a run. EventType ∈ +// step|tool_call|tool_result|error. Payload is opaque JSON the writer +// emits. +type SkillRunLog struct { + RunID string + Sequence int + EventType string + Payload map[string]any + CreatedAt time.Time +} + +// RunFilter is the predicate bundle for the cross-surface "recent runs" +// query (ListRunsFiltered / CountRunsFiltered). Every field is optional; +// the zero value matches the most recent runs across ALL audited surfaces +// (agents + skills). This powers the admin agent-trace debug view and the +// Claude debug API's /runs list. +// +// Why a struct (vs positional args): the debug list filters along several +// independent axes and more will be added; bundling avoids a positional +// explosion and keeps call sites readable. +type RunFilter struct { + Status string // exact status match; "" = all (dry_run excluded unless IncludeDryRun) + SkillID string // exact skill_id (holds the agent UUID for agent runs) + CallerID string // exact caller (Discord member id) + ChannelID string // exact channel id + + // TopLevelOnly restricts to root runs (parent_run_id = ''), hiding + // nested sub-agent / sub-skill runs from the firehose. The debug list + // defaults this on; an "include nested" toggle clears it. + TopLevelOnly bool + + // IncludeDryRun surfaces status="dry_run" sandbox rows, which are + // excluded by default. Ignored when Status is set explicitly (an + // explicit Status=="dry_run" still matches). 
+ IncludeDryRun bool + + // Since / Until bound started_at: started_at >= Since (zero = no lower + // bound) and started_at < Until (zero = no upper bound). + Since time.Time + Until time.Time +} + +// Storage is the persistence interface for skill runs and per-step logs. +// +// Why: tests substitute fake implementations; production wires +// NewGormStorage. Keep the interface narrow — the system only needs CRUD +// plus the retention prune helper. +type Storage interface { + Initialize(ctx context.Context) error + + // StartRun inserts the run with status=running. The caller MUST + // invoke FinishRun later (or the row stays in running indefinitely + // — operationally that signals a crash mid-run, which is useful + // signal). + StartRun(ctx context.Context, run SkillRun) error + + // FinishRun updates the running row with terminal status, output + // and stats. Idempotent on second call (last write wins). + // + // V5: takes a RunStats struct so token + cost columns can be + // written alongside the legacy fields without changing the + // signature for every future addition. + FinishRun(ctx context.Context, runID string, stats RunStats) error + + // AppendLog adds one event to the run's log. Sequence numbers must + // be unique per run; the writer is responsible for monotonic + // ordering. + AppendLog(ctx context.Context, log SkillRunLog) error + + // GetRun returns the run summary, or ErrNotFound. + GetRun(ctx context.Context, runID string) (*SkillRun, error) + + // ListLogsByRun returns all logs for a run in sequence order. + ListLogsByRun(ctx context.Context, runID string) ([]SkillRunLog, error) + + // ListRunsBySkill returns recent runs for a skill, newest first, + // capped at limit. Excludes dry-run rows by default — use + // ListRunsBySkillPaginated with includeDryRun=true to see them. 
+ ListRunsBySkill(ctx context.Context, skillID string, limit int) ([]SkillRun, error) + + // ListRunsBySkillPaginated returns recent runs for a skill, newest + // first, with offset+limit. When includeDryRun is false, rows with + // status="dry_run" are excluded (matches the wizard's sandbox + // status; see skillaudit.Writer / wizardtools docs). + // + // Why a separate paginated method vs. expanding ListRunsBySkill: + // callers that need the legacy "last N" view (Discord .skill runs, + // chatbot tool result) want the simpler signature; the paginated + // view is webui-specific. + ListRunsBySkillPaginated(ctx context.Context, skillID string, + offset, limit int, includeDryRun bool) ([]SkillRun, error) + + // CountRunsBySkill returns the total number of runs for a skill. + // When includeDryRun is false, dry-run rows are excluded so the + // count matches the default ListRunsBySkillPaginated result. + CountRunsBySkill(ctx context.Context, skillID string, includeDryRun bool) (int64, error) + + // ListRunsByCaller returns recent runs by a caller, newest first, + // capped at limit. + ListRunsByCaller(ctx context.Context, callerID string, limit int) ([]SkillRun, error) + + // ListRunsFiltered returns runs matching f, newest first + // (started_at DESC), with offset+limit. With an all-zero filter it + // returns the most recent runs across EVERY audited surface (agents + + // skills) — the cross-surface feed behind the admin agent-trace debug + // view and the Claude debug API. dry_run rows are excluded unless + // f.IncludeDryRun or f.Status=="dry_run". limit is clamped (<=0 or + // >500 → 50) to bound admin scans. + ListRunsFiltered(ctx context.Context, f RunFilter, offset, limit int) ([]SkillRun, error) + + // CountRunsFiltered returns the total rows matching f (ignoring + // offset/limit), for pagination math. 
+ CountRunsFiltered(ctx context.Context, f RunFilter) (int64, error) + + // PurgeOlderThan deletes runs (and their logs) whose StartedAt is + // strictly before t. Returns the number of runs deleted. + PurgeOlderThan(ctx context.Context, t time.Time) (int64, error) + + // ListChildrenByParent returns all SkillRun rows where + // parent_run_id == parentRunID, oldest first. Used for the + // call-tree view (skill_invoke trace section) and as a building + // block for WalkParentChain. + // + // Returns an empty slice when parentRunID has no children. An + // empty parentRunID never matches anything (no row stores "" + // as a parent — that's the top-level sentinel). + ListChildrenByParent(ctx context.Context, parentRunID string) ([]SkillRun, error) + + // WalkParentChain walks from runID up via parent_run_id, returning + // the chain of SkillRun summaries (oldest = root first, newest = + // runID last). Used by the loop guard in skill_invoke. + // + // Cap walk depth at 32 to prevent pathological loops in the data + // itself: if the parent_run_id chain has been corrupted (e.g. by + // a bad migration) and forms a cycle, we want a bounded result + // rather than an infinite loop. + WalkParentChain(ctx context.Context, runID string) ([]SkillRun, error) + + // ListFinishedRunsBefore returns runs whose FinishedAt is strictly + // before cutoff, oldest first, capped at limit. limit <= 0 yields + // no rows (the caller is expected to specify a real bound). + // + // Why: skills.StorageSweeper drives the run-scope storage purge from + // this query. The sweeper picks up only finished runs so an + // in-flight run's run-scope KV/files cannot be deleted out from + // under it. + // + // Test: storage_test.go covers the include/exclude boundaries + // (running rows excluded; finished-after-cutoff excluded; finished- + // before-cutoff included). 
+ ListFinishedRunsBefore(ctx context.Context, cutoff time.Time, limit int) ([]SkillRun, error) + + // LastRunBySkills returns the most recent StartedAt timestamp per + // skill in the input ID list. Skills with no rows simply have no + // entry in the result map (caller distinguishes "never run" from + // "run but no timestamp" by map key presence). + // + // When includeFailed is true, all non-dry-run statuses count + // (ok / error / timeout / budget_exceeded / preempted / lane_busy). + // When false, only status="ok" rows count — useful for "last + // successful run" semantics on dashboards where errored runs + // shouldn't surface as recent activity. + // + // Empty skillIDs short-circuits to an empty map without touching + // the DB. + LastRunBySkills(ctx context.Context, skillIDs []string, includeFailed bool) (map[string]time.Time, error) +} + +// MaxParentChainDepth is the safety cap for WalkParentChain. The loop +// guard in skill_invoke enforces a separate (smaller) MaxInvokeDepth +// at the tool layer; this cap exists only to bound the walk in the +// presence of corrupted data. +const MaxParentChainDepth = 32 diff --git a/audit/writer.go b/audit/writer.go new file mode 100644 index 0000000..801c89c --- /dev/null +++ b/audit/writer.go @@ -0,0 +1,322 @@ +package audit + +import ( + "context" + "fmt" + "log/slog" + "strings" + "sync" + "sync/atomic" + "time" + + llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +// stepTextMax caps the per-step assistant-text preview persisted on a +// "step" event. Large enough to capture the model's reasoning around a +// (mis)fired tool call — the single best clue to WHY a model emitted a +// malformed call — but bounded so the longtext payload can't balloon. +const stepTextMax = 2000 + +// Writer wraps a Storage with the OnStep / OnTool callbacks suitable for +// wiring into the majordomo agent loop's step observer, tracking sequence +// numbers and tool-call counts internally. 
//
// Why: the agent loop's observer hooks are unaware of run identity; the
// writer captures the runID + skill metadata at construction so the
// per-event callbacks stay simple. AppendLog failures are logged but
// never fatal — audit must not break user-visible execution.
//
// What: NewWriter(storage, runID) → use OnStep / OnTool / Close. Close
// records the final FinishRun. The executors translate each agent.Step
// into one OnStep call (1-indexed iteration, the step's *llm.Response)
// plus one OnTool call per executed tool.
//
// Test: see writer_test.go for sequence ordering and finish semantics.
type Writer struct {
	// storage receives the events; OnStep, OnTool, LogEvent and LogError
	// return immediately when it is nil.
	storage Storage
	// runID is the run this writer was constructed for.
	runID string
	// sequence is the per-run event counter (presumably advanced by the
	// appendLog helper, which is not shown here — confirm).
	sequence atomic.Int32
	// calls counts OnTool invocations.
	calls atomic.Int32
	mu       sync.Mutex // guards Close idempotency + token tally
	// closed marks that Close has already run (see mu).
	closed bool

	// V5 token accumulator — summed across each OnStep's resp.Usage.
	// Reads come from TokenStats() so the executor can pass them to
	// FinishRun. atomics-on-Int64 would also work, but mu already
	// guards Close + we need consistent multi-field reads anyway
	// (input + output + thinking). The mutex hot-path overhead is
	// negligible vs the LLM call latency that dominates step time.
	inputTokens    int64
	outputTokens   int64
	thinkingTokens int64

	// Per-step wall-clock + run-level model attribution (guarded by mu).
	// startedAt anchors the first step's duration; lastStepAt is the
	// previous step's observation time; resolvedModelLogged ensures the
	// one-shot "resolved_model" run-level event fires at most once.
	startedAt           time.Time
	lastStepAt          time.Time
	resolvedModelLogged bool
}

// NewWriter constructs a Writer bound to storage and runID, and stamps
// startedAt with the current time so the first OnStep can report a
// duration. The caller is expected to have already called
// Storage.StartRun.
func NewWriter(storage Storage, runID string) *Writer {
	return &Writer{storage: storage, runID: runID, startedAt: time.Now()}
}

// OnStep records one agent-loop step: a "step" event with the iteration
// number and the response's text size.
+// +// V5: also tallies per-step token usage. majordomo populates +// resp.Usage when the provider reports it; for providers that don't, +// the fields stay 0 and the tally stays at 0 — the formatter then +// renders "—" rather than a misleading "$0.00". +// +// Why we tally here vs in the agent loop: the loop's Result.Usage is a +// run total; the audit row needs the same numbers, but the writer also +// serves the live RunState accessor mid-run, so a per-step running sum +// is the right shape. Global usage attribution is handled by the llms +// package's instrumented models — the writer tally is strictly the +// per-run audit roll-up. +func (w *Writer) OnStep(iter int, resp *llm.Response) { + if w == nil || w.storage == nil { + return + } + now := time.Now() + payload := map[string]any{"iter": iter} + + w.mu.Lock() + // Per-step wall-clock: time since the previous observed step, or since + // run start for the first step. A long gap localises a slow/hung model + // call — the signal that was missing when an animate step-0 call hung + // ~5 min. NOTE: this is step-to-step wall time (model call + the prior + // step's tool execution), not pure model latency. + prev := w.lastStepAt + if prev.IsZero() { + prev = w.startedAt + } + if !prev.IsZero() { + payload["step_ms"] = now.Sub(prev).Milliseconds() + } + w.lastStepAt = now + if resp != nil { + w.inputTokens += int64(resp.Usage.InputTokens) + w.outputTokens += int64(resp.Usage.OutputTokens) + // Thinking/reasoning tokens are a first-class Usage field in + // majordomo (populated by the providers that report them). + w.thinkingTokens += int64(resp.Usage.ReasoningTokens) + } + // One-shot run-level served-model attribution: the FIRST step with a + // resolved model name emits a "resolved_model" event so a run that + // errors before producing a useful step still records which model + // served it. 
resp.Model is failover-aware ("provider/model-id" of the + // element that actually served), unlike the static configured head. + logResolvedModel := "" + if resp != nil && resp.Model != "" && !w.resolvedModelLogged { + w.resolvedModelLogged = true + logResolvedModel = resp.Model + } + w.mu.Unlock() + + if resp != nil { + payload["text_len"] = len(resp.Text()) + // Served model + why generation stopped — the two scalars that turn + // a "model misbehaved" guess into a fact. finish_reason on an + // empty-tool-call step disambiguates truncation (length) from a + // deliberate empty emission (tool_calls). + if resp.Model != "" { + payload["model"] = resp.Model + } + if resp.FinishReason != "" { + payload["finish_reason"] = string(resp.FinishReason) + } + // Per-step token breakdown (OnStep already reads these into the run + // total above; persisting the per-step slice costs nothing more). + payload["in_tokens"] = resp.Usage.InputTokens + payload["out_tokens"] = resp.Usage.OutputTokens + if resp.Usage.ReasoningTokens > 0 { + payload["thinking_tokens"] = resp.Usage.ReasoningTokens + } + if resp.Usage.CacheReadTokens > 0 { + payload["cache_read_tokens"] = resp.Usage.CacheReadTokens + } + // The model's own narration accompanying this step — the smoking gun + // for WHY a malformed tool call was emitted. Capped; suppressed when + // the step fired a secret-bearing tool (mcp_call/email_send/http_*) + // whose narration could echo the secret it's about to send. 
+ if t := strings.TrimSpace(resp.Text()); t != "" { + if stepHasSecretTool(resp) { + payload["text_redacted"] = true + } else { + payload["text"] = truncate(t, stepTextMax) + } + } + } else { + payload["text_len"] = 0 + } + + w.appendLog("step", payload) + + if logResolvedModel != "" { + w.appendLog("resolved_model", map[string]any{"model": logResolvedModel}) + } +} + +// stepHasSecretTool reports whether a step's response fired a tool whose +// surrounding narration could leak a secret (MCP args, email body/ +// recipients, raw HTTP request). Mirrors the steps.go redaction list so +// the audit trace never persists secret-adjacent assistant text. +func stepHasSecretTool(resp *llm.Response) bool { + if resp == nil { + return false + } + for _, c := range resp.ToolCalls { + switch c.Name { + case "mcp_call", "email_send": + return true + } + if strings.HasPrefix(c.Name, "http_") { + return true + } + } + return false +} + +// TokenStats returns the running totals tallied from OnStep. +// Safe to call concurrently. Returned values are a snapshot at call +// time. Used by the executors to populate RunStats before Close +// finalises the audit row. +// +// Why: the executor needs the totals AND a model name to compute cost, +// but cost calculation is a different concern from audit persistence. +// Exposing this getter lets the cost calculation live in the executor +// where the model is known. +func (w *Writer) TokenStats() (input, output, thinking int64) { + if w == nil { + return 0, 0, 0 + } + w.mu.Lock() + defer w.mu.Unlock() + return w.inputTokens, w.outputTokens, w.thinkingTokens +} + +// OnTool records a "tool_call" event with the tool name and a +// "tool_result" event with the result length. Tool count is incremented +// for each call. The executors call this once per executed tool call +// from their step observers (call + matching result content). 
+func (w *Writer) OnTool(call llm.ToolCall, result string) { + if w == nil || w.storage == nil { + return + } + w.calls.Add(1) + w.appendLog("tool_call", map[string]any{ + "name": call.Name, + "args": string(call.Arguments), + "id": call.ID, + }) + w.appendLog("tool_result", map[string]any{ + "name": call.Name, + "id": call.ID, + "result": truncate(result, 4000), + "truncated": len(result) > 4000, + }) +} + +// LogEvent records a custom event mid-run. The executor uses this for +// diagnostic events (e.g. "compaction_setup" / "compaction_fired") +// outside the canonical step / tool_call / tool_result / error set. +// Nil-safe: no-op when receiver or storage is nil. +// +// Why: skill_run_logs is the only sink Steve can read from SQL, so +// diagnostics intended for post-hoc debugging belong here. slog goes +// to mort.log which is harder to reach from outside the host. +func (w *Writer) LogEvent(eventType string, payload map[string]any) { + if w == nil || w.storage == nil { + return + } + w.appendLog(eventType, payload) +} + +// LogError records an "error" event mid-run. Distinct from the terminal +// status set by Close. +func (w *Writer) LogError(msg string) { + if w == nil || w.storage == nil { + return + } + w.appendLog("error", map[string]any{"message": msg}) +} + +// Close finishes the run. The caller assembles a RunStats; the writer +// fills in ToolCalls (which is bookkept on the writer itself) and +// hands the full record to FinishRun. +// +// Idempotent: subsequent calls are no-ops. +// +// Why a struct vs the old positional form: v5 adds four token + cost +// fields on top of the legacy six. The struct keeps call sites readable +// and lets future fields slot in without churning every caller. +// +// Why context.WithoutCancel: the run's terminal status MUST land in +// the audit row regardless of the run ctx's state. 
Pre-fix, child +// skill runs invoked via skill_invoke / skill_invoke_parallel inherited +// the parent agent's runCtx as their outer ctx; when the parent +// timed out at MaxRuntime, every in-flight child's FinishRun fired +// with that already-cancelled ctx and the row was left in +// status=running forever. Detaching here is defence in depth — the +// caller (skillexec.runInner / agentexec.runInner) ALSO detaches at +// the call site, but a cancelled ctx in the writer's hands MUST NOT +// drop the audit write. The short timeout (auditFinishTimeout) bounds +// the write so a hung DB doesn't pin the run goroutine indefinitely. +func (w *Writer) Close(ctx context.Context, stats RunStats) { + if w == nil || w.storage == nil { + return + } + w.mu.Lock() + defer w.mu.Unlock() + if w.closed { + return + } + w.closed = true + stats.ToolCalls = int(w.calls.Load()) + // Detach from the caller's deadline + cancellation. Run cleanup + // must complete even when the run ctx is dead. The fresh + // auditFinishTimeout caps how long we'll wait on the storage. + finishCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), auditFinishTimeout) + defer cancel() + if err := w.storage.FinishRun(finishCtx, w.runID, stats); err != nil { + slog.Warn("skillaudit: FinishRun failed", "run_id", w.runID, "error", err) + } +} + +// auditFinishTimeout caps how long Close will wait on the storage's +// FinishRun call after detaching from the caller's ctx. 10s is generous +// for a single-row UPDATE against MySQL — anything longer suggests a +// hung connection that the run goroutine shouldn't keep waiting on. +const auditFinishTimeout = 10 * time.Second + +// ToolCallsCount returns how many tool invocations OnTool has seen so +// far. Useful for budget enforcement. 
+func (w *Writer) ToolCallsCount() int { return int(w.calls.Load()) } + +func (w *Writer) appendLog(eventType string, payload map[string]any) { + seq := int(w.sequence.Add(1)) + log := SkillRunLog{ + RunID: w.runID, + Sequence: seq, + EventType: eventType, + Payload: payload, + CreatedAt: time.Now(), + } + if err := w.storage.AppendLog(context.Background(), log); err != nil { + slog.Warn("skillaudit: AppendLog failed", "run_id", w.runID, "seq", seq, "type", eventType, "error", err) + } +} + +func truncate(s string, max int) string { + if len(s) <= max { + return s + } + return s[:max] + fmt.Sprintf("…[+%d bytes]", len(s)-max) +}