Files
executus/audit/storage.go
T
steve 4d2f85d139 P4: audit battery — run.Audit Sink + Writer + queryable Memory store
First Tier-2 battery, plugging into run.Ports.Audit:
- storage.go/writer.go: skillaudit's Storage interface + per-run Writer moved
  clean (only utils->fmt); the Writer already matches run.RunRecorder's shape.
- sink.go: Sink adapts a Storage to run.Audit (StartRun -> a run row + a Writer
  wrapped as run.RunRecorder, converting run.RunStats on Close). NewSink(nil) is
  equivalent to no audit. Compile-time proofs: Sink is run.Audit, recorder is
  run.RunRecorder.
- memory.go: NewMemory() — a zero-dependency, queryable in-process Storage
  (retains runs + logs; all 17 read/filter/purge/walk methods) so a light host
  gets run history with no setup. Mort keeps its GORM Storage; contrib/store
  adds durable SQLite at P4.

End-to-end test: wire audit.NewSink(audit.NewMemory()) into the executor, run an
agent, and the run is recorded with terminal status/output and queryable by
caller. CI invariant verified: core imports ZERO from the audit battery (proper
battery direction; battery imports core, never the reverse).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 00:12:19 -04:00

246 lines
10 KiB
Go

// Package audit persists skill execution traces: per-run summary rows
// (skill_runs) and per-step event logs (skill_run_logs). The executor in
// pkg/logic/skillexec emits events through a Writer; the storage layer is
// kept separate so tests can mock it and so retention pruning has a clear
// home.
//
// Why: agentic runs can be long, multi-tool affairs. Without a structured
// audit trail, debugging "why did the LLM do that?" is impossible. The
// log table is keyed by (run_id, sequence) so insert order is preserved.
package audit
import (
"context"
"errors"
"time"
)
// ErrNotFound is returned when a run lookup fails, e.g. by Storage.GetRun
// for a run ID that has no row. It is a sentinel value: match it with
// errors.Is so that wrapped errors are recognized too.
var ErrNotFound = errors.New("skill run not found")
// SkillRun is the summary row for one invocation: exactly one is written
// per call to Executor.Run. Status moves from running to a terminal value
// (ok / error / timeout / budget_exceeded / dry_run).
type SkillRun struct {
	// ID identifies the run. SkillID, CallerID and ChannelID are the
	// exact-match keys RunFilter selects on; for agent runs SkillID holds
	// the agent UUID.
	ID, SkillID, CallerID, ChannelID string

	// Inputs is the free-form input map recorded with the run.
	Inputs map[string]any

	// StartedAt is the timestamp that listings order by and that
	// PurgeOlderThan compares against.
	StartedAt time.Time
	// FinishedAt is a pointer so that it can be absent.
	// NOTE(review): presumably nil while Status is running — confirm
	// against the Storage implementations.
	FinishedAt *time.Time

	// Status is one of running|ok|error|timeout|budget_exceeded|dry_run.
	// NOTE(review): the Storage.LastRunBySkills doc additionally names
	// preempted and lane_busy; confirm which list is authoritative.
	Status string

	// Output and Error carry the run's final output text and its error
	// message.
	Output, Error string

	// ToolCallsCount and RuntimeSeconds are per-run roll-up stats.
	ToolCallsCount int
	RuntimeSeconds float64

	// ParentRunID is the run_id of the parent skill that invoked this
	// run via skill_invoke; it is empty for top-level invocations. The
	// gorm model indexes it so that the call-tree queries
	// (ListChildrenByParent and WalkParentChain) stay cheap.
	ParentRunID string

	// Token roll-ups, summed across every model completion in the run
	// (one Usage per OnStep). Each stays 0 when the provider did not
	// expose token usage.
	TotalInputTokens, TotalOutputTokens, TotalThinkingTokens int64
}
// RunStats is the terminal state of a run as handed to FinishRun. Keeping
// the values in one struct, rather than a long positional argument list,
// keeps call sites readable and lets new fields be added here without
// touching every caller.
//
// Why: FinishRun originally took six positional args, and adding token
// columns would have pushed that higher. A struct is the idiomatic Go
// way to avoid the positional-arg explosion.
type RunStats struct {
	// Status is the terminal status: ok|error|timeout|budget_exceeded|dry_run.
	Status string
	// Output is the final agent output; it is empty on error.
	Output string
	// Error is the error message; it is empty on success.
	Error string
	// ToolCalls is the total OnTool count.
	ToolCalls int
	// RuntimeSeconds is the wall-clock duration.
	RuntimeSeconds float64

	// Token roll-ups. Each stays 0 when the provider did not expose
	// token usage.
	InputTokens, OutputTokens, ThinkingTokens int64
}
// SkillRunLog is one event recorded during a run. EventType ∈
// step|tool_call|tool_result|error. Payload is opaque JSON the writer
// emits.
type SkillRunLog struct {
	// RunID is the ID of the run this event belongs to.
	RunID string
	// Sequence orders events within a run. It must be unique per run;
	// the writer is responsible for keeping it monotonic (see
	// Storage.AppendLog).
	Sequence int
	// EventType is one of step|tool_call|tool_result|error.
	EventType string
	// Payload is the event body; storage treats it as opaque.
	Payload map[string]any
	// CreatedAt is the event timestamp.
	// NOTE(review): whether the writer or the storage layer stamps it is
	// not visible in this file — confirm.
	CreatedAt time.Time
}
// RunFilter bundles the predicates for the cross-surface "recent runs"
// query (ListRunsFiltered / CountRunsFiltered). Every field is optional:
// the zero value matches the most recent runs across ALL audited surfaces
// (agents + skills). It backs the admin agent-trace debug view and the
// Claude debug API's /runs list.
//
// Why a struct (vs positional args): the debug list filters along several
// independent axes and more will be added; a struct avoids a positional
// explosion and keeps call sites readable.
type RunFilter struct {
	// Status is an exact status match; "" means all statuses (dry_run is
	// still excluded unless IncludeDryRun is set).
	Status string
	// SkillID is an exact skill_id match (it holds the agent UUID for
	// agent runs).
	SkillID string
	// CallerID is an exact caller match (Discord member id).
	CallerID string
	// ChannelID is an exact channel id match.
	ChannelID string

	// TopLevelOnly restricts the result to root runs
	// (parent_run_id = ''), hiding nested sub-agent / sub-skill runs from
	// the firehose. The debug list defaults this on; an "include nested"
	// toggle clears it.
	TopLevelOnly bool
	// IncludeDryRun surfaces status="dry_run" sandbox rows, which are
	// excluded by default. It is ignored when Status is set explicitly
	// (an explicit Status=="dry_run" still matches).
	IncludeDryRun bool

	// Since and Until bound started_at as a half-open interval:
	// started_at >= Since (zero = no lower bound) and
	// started_at < Until (zero = no upper bound).
	Since, Until time.Time
}
// Storage is the persistence interface for skill runs and per-step logs.
// It covers the run lifecycle (StartRun, FinishRun, AppendLog), lookups
// and filtered listings/counts, call-tree traversal
// (ListChildrenByParent, WalkParentChain) and retention helpers
// (PurgeOlderThan, ListFinishedRunsBefore).
//
// Why: tests substitute fake implementations; production wires
// NewGormStorage.
//
// NOTE(review): this interface was meant to stay narrow (CRUD plus the
// retention prune helper) but now also carries listing, counting and
// call-tree queries; weigh that before adding more methods.
// NewGormStorage is not declared in this file — confirm it is still what
// production wires.
type Storage interface {
	// Initialize prepares the store for use.
	// NOTE(review): the exact contract (schema creation? safe to call
	// twice?) is not documented here — confirm against implementations.
	Initialize(ctx context.Context) error
	// StartRun inserts the run with status=running. The caller MUST
	// invoke FinishRun later (or the row stays in running indefinitely
	// — operationally that signals a crash mid-run, which is useful
	// signal).
	StartRun(ctx context.Context, run SkillRun) error
	// FinishRun updates the running row with terminal status, output
	// and stats. Idempotent on second call (last write wins).
	//
	// V5: takes a RunStats struct so new columns can be written
	// alongside the legacy fields without changing the signature for
	// every future addition.
	// NOTE(review): this comment used to promise "token + cost" columns;
	// RunStats as declared in this file carries token roll-ups only.
	FinishRun(ctx context.Context, runID string, stats RunStats) error
	// AppendLog adds one event to the run's log. Sequence numbers must
	// be unique per run; the writer is responsible for monotonic
	// ordering.
	AppendLog(ctx context.Context, log SkillRunLog) error
	// GetRun returns the run summary, or ErrNotFound.
	GetRun(ctx context.Context, runID string) (*SkillRun, error)
	// ListLogsByRun returns all logs for a run in sequence order.
	ListLogsByRun(ctx context.Context, runID string) ([]SkillRunLog, error)
	// ListRunsBySkill returns recent runs for a skill, newest first,
	// capped at limit. Excludes dry-run rows by default — use
	// ListRunsBySkillPaginated with includeDryRun=true to see them.
	ListRunsBySkill(ctx context.Context, skillID string, limit int) ([]SkillRun, error)
	// ListRunsBySkillPaginated returns recent runs for a skill, newest
	// first, with offset+limit. When includeDryRun is false, rows with
	// status="dry_run" are excluded (matches the wizard's sandbox
	// status; see skillaudit.Writer / wizardtools docs).
	//
	// Why a separate paginated method vs. expanding ListRunsBySkill:
	// callers that need the legacy "last N" view (Discord .skill runs,
	// chatbot tool result) want the simpler signature; the paginated
	// view is webui-specific.
	ListRunsBySkillPaginated(ctx context.Context, skillID string,
		offset, limit int, includeDryRun bool) ([]SkillRun, error)
	// CountRunsBySkill returns the total number of runs for a skill.
	// When includeDryRun is false, dry-run rows are excluded so the
	// count matches the default ListRunsBySkillPaginated result.
	CountRunsBySkill(ctx context.Context, skillID string, includeDryRun bool) (int64, error)
	// ListRunsByCaller returns recent runs by a caller, newest first,
	// capped at limit.
	ListRunsByCaller(ctx context.Context, callerID string, limit int) ([]SkillRun, error)
	// ListRunsFiltered returns runs matching f, newest first
	// (started_at DESC), with offset+limit. With an all-zero filter it
	// returns the most recent runs across EVERY audited surface (agents +
	// skills) — the cross-surface feed behind the admin agent-trace debug
	// view and the Claude debug API. dry_run rows are excluded unless
	// f.IncludeDryRun or f.Status=="dry_run". limit is clamped (<=0 or
	// >500 → 50) to bound admin scans.
	ListRunsFiltered(ctx context.Context, f RunFilter, offset, limit int) ([]SkillRun, error)
	// CountRunsFiltered returns the total rows matching f (ignoring
	// offset/limit), for pagination math.
	CountRunsFiltered(ctx context.Context, f RunFilter) (int64, error)
	// PurgeOlderThan deletes runs (and their logs) whose StartedAt is
	// strictly before t. Returns the number of runs deleted.
	PurgeOlderThan(ctx context.Context, t time.Time) (int64, error)
	// ListChildrenByParent returns all SkillRun rows where
	// parent_run_id == parentRunID, oldest first. Used for the
	// call-tree view (skill_invoke trace section) and as a building
	// block for WalkParentChain.
	//
	// Returns an empty slice when parentRunID has no children. An
	// empty parentRunID never matches anything: "" is the top-level
	// sentinel stored in SkillRun.ParentRunID, not a real parent, so
	// root runs are not reported as children of "".
	ListChildrenByParent(ctx context.Context, parentRunID string) ([]SkillRun, error)
	// WalkParentChain walks from runID up via parent_run_id, returning
	// the chain of SkillRun summaries (oldest = root first, newest =
	// runID last). Used by the loop guard in skill_invoke.
	//
	// The walk depth is capped at MaxParentChainDepth (32) to prevent
	// pathological loops in the data itself: if the parent_run_id
	// chain has been corrupted (e.g. by a bad migration) and forms a
	// cycle, we want a bounded result rather than an infinite loop.
	WalkParentChain(ctx context.Context, runID string) ([]SkillRun, error)
	// ListFinishedRunsBefore returns runs whose FinishedAt is strictly
	// before cutoff, oldest first, capped at limit. limit <= 0 yields
	// no rows (the caller is expected to specify a real bound).
	//
	// Why: skills.StorageSweeper drives the run-scope storage purge from
	// this query. The sweeper picks up only finished runs so an
	// in-flight run's run-scope KV/files cannot be deleted out from
	// under it.
	//
	// Test: storage_test.go covers the include/exclude boundaries
	// (running rows excluded; finished-after-cutoff excluded; finished-
	// before-cutoff included).
	ListFinishedRunsBefore(ctx context.Context, cutoff time.Time, limit int) ([]SkillRun, error)
	// LastRunBySkills returns the most recent StartedAt timestamp per
	// skill in the input ID list. Skills with no rows simply have no
	// entry in the result map (caller distinguishes "never run" from
	// "run but no timestamp" by map key presence).
	//
	// When includeFailed is true, all non-dry-run statuses count
	// (ok / error / timeout / budget_exceeded / preempted / lane_busy).
	// When false, only status="ok" rows count — useful for "last
	// successful run" semantics on dashboards where errored runs
	// shouldn't surface as recent activity.
	//
	// Empty skillIDs short-circuits to an empty map without touching
	// the DB.
	LastRunBySkills(ctx context.Context, skillIDs []string, includeFailed bool) (map[string]time.Time, error)
}
// MaxParentChainDepth is the safety cap on the depth of the walk performed
// by Storage.WalkParentChain. The loop guard in skill_invoke enforces a
// separate (smaller) MaxInvokeDepth at the tool layer; this cap exists
// only to bound the walk when the stored parent_run_id chain is corrupted
// (for example, when it forms a cycle).
const MaxParentChainDepth = 32