P4: contrib/store — audit SQLite store (run history complete)
executus CI / test (pull_request) Successful in 1m41s

db.Audit() satisfies audit.Storage (all 17 methods) over SQLite: one indexed
row per run (+ a JSON inputs blob), one row per log event. Filter/list/walk
queries are indexed on the columns they filter (skill_id, caller_id,
parent_run_id, started_at); WalkParentChain follows parent_run_id with a
seen-set guard; LastRunBySkills is a grouped MAX.

Test covers run start/finish round-trip (inputs map + token roll-up), log
append + ordered read, parent/child + ancestor-chain walks, caller listing,
TopLevelOnly filter, and the last-run-per-skill map.

contrib/store now backs ALL four store seams — budget + persona + skill + audit
— so a host gets turnkey durable persistence (run history, budgets, agents,
skills) with zero store code. Core go.sum still has 0 sqlite refs.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-26 22:50:21 -04:00
parent 954efde474
commit b194a9621d
3 changed files with 428 additions and 4 deletions
+5 -4
View File
@@ -75,10 +75,11 @@ BATTERIES (opt-in siblings, each nil-safe + a default):
budget/ DBBudget rolling-7d + NoOp (run.Budget); [P4 ✓] budget/ DBBudget rolling-7d + NoOp (run.Budget); [P4 ✓]
BudgetStorage iface + Memory default BudgetStorage iface + Memory default
contrib/store/ SECOND module (+ modernc.org/sqlite): [P4 ~] contrib/store/ SECOND module (+ modernc.org/sqlite): [P4 ]
pure-Go SQLite impls of the *Store seams. budget + pure-Go SQLite impls of ALL store seams: budget +
persona + skill ✓ (JSON-blob+indexed cols, round-trip persona + skill + audit ✓ (JSON-blob+indexed cols,
tested); audit pending. round-trip tested). CI proves the driver lands HERE,
not in the core go.sum.
CI proves the driver lands HERE, not in the core go.sum. CI proves the driver lands HERE, not in the core go.sum.
``` ```
+356
View File
@@ -0,0 +1,356 @@
package store
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"gitea.stevedudenhoeffer.com/steve/executus/audit"
)
// auditStore is the SQLite-backed audit.Storage: one row per run (+ a JSON
// `inputs` blob), one row per log event. The run-list/filter/walk queries are
// indexed on the columns they filter; the log payload is a JSON blob.
type auditStore struct{ db *sql.DB }
// Audit returns a durable audit.Storage backed by this database.
func (d *DB) Audit() audit.Storage { return &auditStore{db: d.sql} }
var _ audit.Storage = (*auditStore)(nil)
func (s *auditStore) Initialize(ctx context.Context) error {
_, err := s.db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS skill_runs (
id TEXT PRIMARY KEY,
skill_id TEXT NOT NULL DEFAULT '',
caller_id TEXT NOT NULL DEFAULT '',
channel_id TEXT NOT NULL DEFAULT '',
parent_run_id TEXT NOT NULL DEFAULT '',
inputs TEXT NOT NULL DEFAULT '{}',
started_at INTEGER NOT NULL DEFAULT 0,
finished_at INTEGER NOT NULL DEFAULT 0, -- 0 = still running
status TEXT NOT NULL DEFAULT 'running',
output TEXT NOT NULL DEFAULT '',
error TEXT NOT NULL DEFAULT '',
tool_calls INTEGER NOT NULL DEFAULT 0,
runtime_seconds REAL NOT NULL DEFAULT 0,
total_input_tokens INTEGER NOT NULL DEFAULT 0,
total_output_tokens INTEGER NOT NULL DEFAULT 0,
total_thinking_tokens INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_runs_skill ON skill_runs(skill_id, started_at);
CREATE INDEX IF NOT EXISTS idx_runs_caller ON skill_runs(caller_id, started_at);
CREATE INDEX IF NOT EXISTS idx_runs_parent ON skill_runs(parent_run_id);
CREATE INDEX IF NOT EXISTS idx_runs_started ON skill_runs(started_at);
CREATE TABLE IF NOT EXISTS skill_run_logs (
run_id TEXT NOT NULL,
seq INTEGER NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL DEFAULT '{}',
created_at INTEGER NOT NULL,
PRIMARY KEY (run_id, seq)
);`)
if err != nil {
return fmt.Errorf("auditStore.Initialize: %w", err)
}
return nil
}
func unixOrZero(t time.Time) int64 {
if t.IsZero() {
return 0
}
return t.Unix()
}
func (s *auditStore) StartRun(ctx context.Context, r audit.SkillRun) error {
inputs, _ := json.Marshal(r.Inputs)
var fin int64
if r.FinishedAt != nil {
fin = unixOrZero(*r.FinishedAt)
}
status := r.Status
if status == "" {
status = "running"
}
_, err := s.db.ExecContext(ctx, `
INSERT INTO skill_runs (id, skill_id, caller_id, channel_id, parent_run_id, inputs, started_at, finished_at,
status, output, error, tool_calls, runtime_seconds, total_input_tokens, total_output_tokens, total_thinking_tokens)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
skill_id=excluded.skill_id, caller_id=excluded.caller_id, channel_id=excluded.channel_id,
parent_run_id=excluded.parent_run_id, inputs=excluded.inputs, started_at=excluded.started_at`,
r.ID, r.SkillID, r.CallerID, r.ChannelID, r.ParentRunID, string(inputs), unixOrZero(r.StartedAt), fin,
status, r.Output, r.Error, r.ToolCallsCount, r.RuntimeSeconds,
r.TotalInputTokens, r.TotalOutputTokens, r.TotalThinkingTokens)
if err != nil {
return fmt.Errorf("auditStore.StartRun: %w", err)
}
return nil
}
func (s *auditStore) FinishRun(ctx context.Context, runID string, st audit.RunStats) error {
res, err := s.db.ExecContext(ctx, `
UPDATE skill_runs SET finished_at=?, status=?, output=?, error=?, tool_calls=?, runtime_seconds=?,
total_input_tokens=?, total_output_tokens=?, total_thinking_tokens=? WHERE id=?`,
time.Now().Unix(), st.Status, st.Output, st.Error, st.ToolCalls, st.RuntimeSeconds,
st.InputTokens, st.OutputTokens, st.ThinkingTokens, runID)
if err != nil {
return fmt.Errorf("auditStore.FinishRun: %w", err)
}
if n, _ := res.RowsAffected(); n == 0 {
return audit.ErrNotFound
}
return nil
}
func (s *auditStore) AppendLog(ctx context.Context, l audit.SkillRunLog) error {
payload, _ := json.Marshal(l.Payload)
created := unixOrZero(l.CreatedAt)
if created == 0 {
created = time.Now().Unix()
}
_, err := s.db.ExecContext(ctx,
`INSERT OR REPLACE INTO skill_run_logs (run_id, seq, event_type, payload, created_at) VALUES (?, ?, ?, ?, ?)`,
l.RunID, l.Sequence, l.EventType, string(payload), created)
if err != nil {
return fmt.Errorf("auditStore.AppendLog: %w", err)
}
return nil
}
// runCols is the SELECT column list matching scanRun.
const runCols = `id, skill_id, caller_id, channel_id, parent_run_id, inputs, started_at, finished_at,
status, output, error, tool_calls, runtime_seconds, total_input_tokens, total_output_tokens, total_thinking_tokens`
func scanRun(sc interface{ Scan(...any) error }) (*audit.SkillRun, error) {
var r audit.SkillRun
var inputs string
var started, finished int64
if err := sc.Scan(&r.ID, &r.SkillID, &r.CallerID, &r.ChannelID, &r.ParentRunID, &inputs,
&started, &finished, &r.Status, &r.Output, &r.Error, &r.ToolCallsCount, &r.RuntimeSeconds,
&r.TotalInputTokens, &r.TotalOutputTokens, &r.TotalThinkingTokens); err != nil {
return nil, err
}
_ = json.Unmarshal([]byte(inputs), &r.Inputs)
r.StartedAt = time.Unix(started, 0).UTC()
if finished > 0 {
t := time.Unix(finished, 0).UTC()
r.FinishedAt = &t
}
return &r, nil
}
func (s *auditStore) GetRun(ctx context.Context, runID string) (*audit.SkillRun, error) {
row := s.db.QueryRowContext(ctx, `SELECT `+runCols+` FROM skill_runs WHERE id = ?`, runID)
r, err := scanRun(row)
if errors.Is(err, sql.ErrNoRows) {
return nil, audit.ErrNotFound
}
return r, err
}
func (s *auditStore) queryRuns(ctx context.Context, tail string, args ...any) ([]audit.SkillRun, error) {
rows, err := s.db.QueryContext(ctx, `SELECT `+runCols+` FROM skill_runs `+tail, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var out []audit.SkillRun
for rows.Next() {
r, err := scanRun(rows)
if err != nil {
return nil, err
}
out = append(out, *r)
}
return out, rows.Err()
}
func (s *auditStore) ListLogsByRun(ctx context.Context, runID string) ([]audit.SkillRunLog, error) {
rows, err := s.db.QueryContext(ctx,
`SELECT run_id, seq, event_type, payload, created_at FROM skill_run_logs WHERE run_id = ? ORDER BY seq`, runID)
if err != nil {
return nil, fmt.Errorf("auditStore.ListLogsByRun: %w", err)
}
defer rows.Close()
var out []audit.SkillRunLog
for rows.Next() {
var l audit.SkillRunLog
var payload string
var created int64
if err := rows.Scan(&l.RunID, &l.Sequence, &l.EventType, &payload, &created); err != nil {
return nil, err
}
_ = json.Unmarshal([]byte(payload), &l.Payload)
l.CreatedAt = time.Unix(created, 0).UTC()
out = append(out, l)
}
return out, rows.Err()
}
func (s *auditStore) ListRunsBySkill(ctx context.Context, skillID string, limit int) ([]audit.SkillRun, error) {
return s.ListRunsBySkillPaginated(ctx, skillID, 0, limit, false)
}
func (s *auditStore) ListRunsBySkillPaginated(ctx context.Context, skillID string, offset, limit int, includeDryRun bool) ([]audit.SkillRun, error) {
w := `WHERE skill_id = ?`
args := []any{skillID}
if !includeDryRun {
w += ` AND status != 'dry_run'`
}
return s.queryRuns(ctx, w+` ORDER BY started_at DESC `+limitOffset(limit, offset), args...)
}
func (s *auditStore) CountRunsBySkill(ctx context.Context, skillID string, includeDryRun bool) (int64, error) {
q := `SELECT COUNT(*) FROM skill_runs WHERE skill_id = ?`
if !includeDryRun {
q += ` AND status != 'dry_run'`
}
var n int64
err := s.db.QueryRowContext(ctx, q, skillID).Scan(&n)
return n, err
}
func (s *auditStore) ListRunsByCaller(ctx context.Context, callerID string, limit int) ([]audit.SkillRun, error) {
return s.queryRuns(ctx, `WHERE caller_id = ? AND status != 'dry_run' ORDER BY started_at DESC `+limitOffset(limit, 0), callerID)
}
func (s *auditStore) buildFilter(f audit.RunFilter) (string, []any) {
var conds []string
var args []any
if !f.IncludeDryRun {
conds = append(conds, `status != 'dry_run'`)
}
if f.Status != "" {
conds = append(conds, `status = ?`)
args = append(args, f.Status)
}
if f.SkillID != "" {
conds = append(conds, `skill_id = ?`)
args = append(args, f.SkillID)
}
if f.CallerID != "" {
conds = append(conds, `caller_id = ?`)
args = append(args, f.CallerID)
}
if f.ChannelID != "" {
conds = append(conds, `channel_id = ?`)
args = append(args, f.ChannelID)
}
if f.TopLevelOnly {
conds = append(conds, `parent_run_id = ''`)
}
if !f.Since.IsZero() {
conds = append(conds, `started_at >= ?`)
args = append(args, f.Since.Unix())
}
if !f.Until.IsZero() {
conds = append(conds, `started_at <= ?`)
args = append(args, f.Until.Unix())
}
where := ""
if len(conds) > 0 {
where = `WHERE ` + strings.Join(conds, " AND ")
}
return where, args
}
func (s *auditStore) ListRunsFiltered(ctx context.Context, f audit.RunFilter, offset, limit int) ([]audit.SkillRun, error) {
where, args := s.buildFilter(f)
return s.queryRuns(ctx, where+` ORDER BY started_at DESC `+limitOffset(limit, offset), args...)
}
func (s *auditStore) CountRunsFiltered(ctx context.Context, f audit.RunFilter) (int64, error) {
where, args := s.buildFilter(f)
var n int64
err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM skill_runs `+where, args...).Scan(&n)
return n, err
}
func (s *auditStore) PurgeOlderThan(ctx context.Context, t time.Time) (int64, error) {
res, err := s.db.ExecContext(ctx, `DELETE FROM skill_runs WHERE finished_at > 0 AND finished_at < ?`, t.Unix())
if err != nil {
return 0, fmt.Errorf("auditStore.PurgeOlderThan: %w", err)
}
n, _ := res.RowsAffected()
// Best-effort orphan-log cleanup.
_, _ = s.db.ExecContext(ctx, `DELETE FROM skill_run_logs WHERE run_id NOT IN (SELECT id FROM skill_runs)`)
return n, nil
}
func (s *auditStore) ListChildrenByParent(ctx context.Context, parentRunID string) ([]audit.SkillRun, error) {
return s.queryRuns(ctx, `WHERE parent_run_id = ? ORDER BY started_at DESC`, parentRunID)
}
func (s *auditStore) WalkParentChain(ctx context.Context, runID string) ([]audit.SkillRun, error) {
var chain []audit.SkillRun
seen := map[string]bool{}
for id := runID; id != ""; {
if seen[id] {
break
}
seen[id] = true
r, err := s.GetRun(ctx, id)
if errors.Is(err, audit.ErrNotFound) {
break
}
if err != nil {
return nil, err
}
chain = append(chain, *r)
id = r.ParentRunID
}
return chain, nil
}
func (s *auditStore) ListFinishedRunsBefore(ctx context.Context, cutoff time.Time, limit int) ([]audit.SkillRun, error) {
return s.queryRuns(ctx,
`WHERE finished_at > 0 AND finished_at < ? ORDER BY started_at DESC `+limitOffset(limit, 0), cutoff.Unix())
}
func (s *auditStore) LastRunBySkills(ctx context.Context, skillIDs []string, includeFailed bool) (map[string]time.Time, error) {
out := map[string]time.Time{}
if len(skillIDs) == 0 {
return out, nil
}
q := `SELECT skill_id, MAX(started_at) FROM skill_runs WHERE skill_id IN (` +
strings.TrimSuffix(strings.Repeat("?,", len(skillIDs)), ",") + `)`
args := make([]any, 0, len(skillIDs))
for _, id := range skillIDs {
args = append(args, id)
}
if !includeFailed {
q += ` AND status NOT IN ('error','timeout')`
}
q += ` GROUP BY skill_id`
rows, err := s.db.QueryContext(ctx, q, args...)
if err != nil {
return nil, fmt.Errorf("auditStore.LastRunBySkills: %w", err)
}
defer rows.Close()
for rows.Next() {
var id string
var ts int64
if err := rows.Scan(&id, &ts); err != nil {
return nil, err
}
out[id] = time.Unix(ts, 0).UTC()
}
return out, rows.Err()
}
// limitOffset renders an optional LIMIT/OFFSET clause (limit<=0 = no limit).
func limitOffset(limit, offset int) string {
if limit <= 0 {
return ""
}
if offset > 0 {
return fmt.Sprintf("LIMIT %d OFFSET %d", limit, offset)
}
return fmt.Sprintf("LIMIT %d", limit)
}
+67
View File
@@ -0,0 +1,67 @@
package store
import (
"context"
"testing"
"time"
"gitea.stevedudenhoeffer.com/steve/executus/audit"
)
func TestSQLiteAuditStore(t *testing.T) {
ctx := context.Background()
db, err := Open(":memory:")
if err != nil {
t.Fatal(err)
}
defer db.Close()
st := db.Audit()
if err := st.Initialize(ctx); err != nil {
t.Fatal(err)
}
now := time.Now().UTC()
// parent run
if err := st.StartRun(ctx, audit.SkillRun{ID: "r1", SkillID: "agent-x", CallerID: "c1",
Inputs: map[string]any{"q": "hi"}, StartedAt: now}); err != nil {
t.Fatal(err)
}
// child run
st.StartRun(ctx, audit.SkillRun{ID: "r2", SkillID: "skill-y", CallerID: "c1", ParentRunID: "r1", StartedAt: now.Add(time.Second)})
st.AppendLog(ctx, audit.SkillRunLog{RunID: "r1", Sequence: 1, EventType: "step", Payload: map[string]any{"i": 1}, CreatedAt: now})
if err := st.FinishRun(ctx, "r1", audit.RunStats{Status: "ok", Output: "done", ToolCalls: 2, InputTokens: 10, OutputTokens: 5}); err != nil {
t.Fatal(err)
}
got, err := st.GetRun(ctx, "r1")
if err != nil || got.Status != "ok" || got.Output != "done" || got.FinishedAt == nil ||
got.Inputs["q"] != "hi" || got.TotalInputTokens != 10 {
t.Fatalf("GetRun: %v %+v", err, got)
}
if logs, _ := st.ListLogsByRun(ctx, "r1"); len(logs) != 1 || logs[0].EventType != "step" {
t.Errorf("ListLogsByRun = %+v", logs)
}
if kids, _ := st.ListChildrenByParent(ctx, "r1"); len(kids) != 1 || kids[0].ID != "r2" {
t.Errorf("ListChildrenByParent = %+v", kids)
}
if chain, _ := st.WalkParentChain(ctx, "r2"); len(chain) != 2 || chain[1].ID != "r1" {
t.Errorf("WalkParentChain = %+v", chain)
}
if byCaller, _ := st.ListRunsByCaller(ctx, "c1", 10); len(byCaller) != 2 {
t.Errorf("ListRunsByCaller = %d, want 2", len(byCaller))
}
// filter: top-level only
tl, _ := st.ListRunsFiltered(ctx, audit.RunFilter{TopLevelOnly: true}, 0, 10)
if len(tl) != 1 || tl[0].ID != "r1" {
t.Errorf("TopLevelOnly filter = %+v", tl)
}
// last-run map
last, _ := st.LastRunBySkills(ctx, []string{"agent-x", "skill-y"}, true)
if _, ok := last["agent-x"]; !ok {
t.Errorf("LastRunBySkills missing agent-x: %+v", last)
}
if n, _ := st.CountRunsBySkill(ctx, "agent-x", false); n != 1 {
t.Errorf("CountRunsBySkill = %d, want 1", n)
}
}