diff --git a/CLAUDE.md b/CLAUDE.md index ce1de0d..e602bb4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -75,10 +75,11 @@ BATTERIES (opt-in siblings, each nil-safe + a default): budget/ DBBudget rolling-7d + NoOp (run.Budget); [P4 ✓] BudgetStorage iface + Memory default -contrib/store/ SECOND module (+ modernc.org/sqlite): [P4 ~] - pure-Go SQLite impls of the *Store seams. budget + - persona + skill ✓ (JSON-blob+indexed cols, round-trip - tested); audit pending. +contrib/store/ SECOND module (+ modernc.org/sqlite): [P4 ✓] + pure-Go SQLite impls of ALL store seams: budget + + persona + skill + audit ✓ (JSON-blob+indexed cols, + round-trip tested). CI proves the driver lands HERE, + not in the core go.sum. CI proves the driver lands HERE, not in the core go.sum. ``` diff --git a/contrib/store/audit_store.go b/contrib/store/audit_store.go new file mode 100644 index 0000000..96ee04a --- /dev/null +++ b/contrib/store/audit_store.go @@ -0,0 +1,356 @@ +package store + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "gitea.stevedudenhoeffer.com/steve/executus/audit" +) + +// auditStore is the SQLite-backed audit.Storage: one row per run (+ a JSON +// `inputs` blob), one row per log event. The run-list/filter/walk queries are +// indexed on the columns they filter; the log payload is a JSON blob. +type auditStore struct{ db *sql.DB } + +// Audit returns a durable audit.Storage backed by this database. +func (d *DB) Audit() audit.Storage { return &auditStore{db: d.sql} } + +var _ audit.Storage = (*auditStore)(nil) + +func (s *auditStore) Initialize(ctx context.Context) error { + _, err := s.db.ExecContext(ctx, ` +CREATE TABLE IF NOT EXISTS skill_runs ( + id TEXT PRIMARY KEY, + skill_id TEXT NOT NULL DEFAULT '', + caller_id TEXT NOT NULL DEFAULT '', + channel_id TEXT NOT NULL DEFAULT '', + parent_run_id TEXT NOT NULL DEFAULT '', + inputs TEXT NOT NULL DEFAULT '{}', + started_at INTEGER NOT NULL DEFAULT 0, + finished_at INTEGER NOT NULL DEFAULT 0, -- 0 = still running + status TEXT NOT NULL DEFAULT 'running', + output TEXT NOT NULL DEFAULT '', + error TEXT NOT NULL DEFAULT '', + tool_calls INTEGER NOT NULL DEFAULT 0, + runtime_seconds REAL NOT NULL DEFAULT 0, + total_input_tokens INTEGER NOT NULL DEFAULT 0, + total_output_tokens INTEGER NOT NULL DEFAULT 0, + total_thinking_tokens INTEGER NOT NULL DEFAULT 0 +); +CREATE INDEX IF NOT EXISTS idx_runs_skill ON skill_runs(skill_id, started_at); +CREATE INDEX IF NOT EXISTS idx_runs_caller ON skill_runs(caller_id, started_at); +CREATE INDEX IF NOT EXISTS idx_runs_parent ON skill_runs(parent_run_id); +CREATE INDEX IF NOT EXISTS idx_runs_started ON skill_runs(started_at); +CREATE TABLE IF NOT EXISTS skill_run_logs ( + run_id TEXT NOT NULL, + seq INTEGER NOT NULL, + event_type TEXT NOT NULL, + payload TEXT NOT NULL DEFAULT '{}', + created_at INTEGER NOT NULL, + PRIMARY KEY (run_id, seq) +);`) + if err != nil { + return fmt.Errorf("auditStore.Initialize: %w", err) + } + return nil +} + +func unixOrZero(t time.Time) int64 { + if t.IsZero() { + return 0 + } + return t.Unix() +} + +func (s *auditStore) StartRun(ctx context.Context, r audit.SkillRun) error { + inputs, _ := json.Marshal(r.Inputs) + var fin int64 + if r.FinishedAt != nil { + fin = unixOrZero(*r.FinishedAt) + } + status := r.Status + if status == "" { + status = "running" + } + _, err := s.db.ExecContext(ctx, ` +INSERT INTO skill_runs (id, skill_id, caller_id, channel_id, parent_run_id, inputs, started_at, finished_at, + status, output, error, tool_calls, runtime_seconds, total_input_tokens, total_output_tokens, total_thinking_tokens) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(id) DO UPDATE SET + skill_id=excluded.skill_id, caller_id=excluded.caller_id, channel_id=excluded.channel_id, + parent_run_id=excluded.parent_run_id, inputs=excluded.inputs, started_at=excluded.started_at`, + r.ID, r.SkillID, r.CallerID, r.ChannelID, r.ParentRunID, string(inputs), unixOrZero(r.StartedAt), fin, + status, r.Output, r.Error, r.ToolCallsCount, r.RuntimeSeconds, + r.TotalInputTokens, r.TotalOutputTokens, r.TotalThinkingTokens) + if err != nil { + return fmt.Errorf("auditStore.StartRun: %w", err) + } + return nil +} + +func (s *auditStore) FinishRun(ctx context.Context, runID string, st audit.RunStats) error { + res, err := s.db.ExecContext(ctx, ` +UPDATE skill_runs SET finished_at=?, status=?, output=?, error=?, tool_calls=?, runtime_seconds=?, + total_input_tokens=?, total_output_tokens=?, total_thinking_tokens=? WHERE id=?`, + time.Now().Unix(), st.Status, st.Output, st.Error, st.ToolCalls, st.RuntimeSeconds, + st.InputTokens, st.OutputTokens, st.ThinkingTokens, runID) + if err != nil { + return fmt.Errorf("auditStore.FinishRun: %w", err) + } + if n, _ := res.RowsAffected(); n == 0 { + return audit.ErrNotFound + } + return nil +} + +func (s *auditStore) AppendLog(ctx context.Context, l audit.SkillRunLog) error { + payload, _ := json.Marshal(l.Payload) + created := unixOrZero(l.CreatedAt) + if created == 0 { + created = time.Now().Unix() + } + _, err := s.db.ExecContext(ctx, + `INSERT OR REPLACE INTO skill_run_logs (run_id, seq, event_type, payload, created_at) VALUES (?, ?, ?, ?, ?)`, + l.RunID, l.Sequence, l.EventType, string(payload), created) + if err != nil { + return fmt.Errorf("auditStore.AppendLog: %w", err) + } + return nil +} + +// runCols is the SELECT column list matching scanRun. +const runCols = `id, skill_id, caller_id, channel_id, parent_run_id, inputs, started_at, finished_at, + status, output, error, tool_calls, runtime_seconds, total_input_tokens, total_output_tokens, total_thinking_tokens` + +func scanRun(sc interface{ Scan(...any) error }) (*audit.SkillRun, error) { + var r audit.SkillRun + var inputs string + var started, finished int64 + if err := sc.Scan(&r.ID, &r.SkillID, &r.CallerID, &r.ChannelID, &r.ParentRunID, &inputs, + &started, &finished, &r.Status, &r.Output, &r.Error, &r.ToolCallsCount, &r.RuntimeSeconds, + &r.TotalInputTokens, &r.TotalOutputTokens, &r.TotalThinkingTokens); err != nil { + return nil, err + } + _ = json.Unmarshal([]byte(inputs), &r.Inputs) + r.StartedAt = time.Unix(started, 0).UTC() + if finished > 0 { + t := time.Unix(finished, 0).UTC() + r.FinishedAt = &t + } + return &r, nil +} + +func (s *auditStore) GetRun(ctx context.Context, runID string) (*audit.SkillRun, error) { + row := s.db.QueryRowContext(ctx, `SELECT `+runCols+` FROM skill_runs WHERE id = ?`, runID) + r, err := scanRun(row) + if errors.Is(err, sql.ErrNoRows) { + return nil, audit.ErrNotFound + } + return r, err +} + +func (s *auditStore) queryRuns(ctx context.Context, tail string, args ...any) ([]audit.SkillRun, error) { + rows, err := s.db.QueryContext(ctx, `SELECT `+runCols+` FROM skill_runs `+tail, args...) + if err != nil { + return nil, err + } + defer rows.Close() + var out []audit.SkillRun + for rows.Next() { + r, err := scanRun(rows) + if err != nil { + return nil, err + } + out = append(out, *r) + } + return out, rows.Err() +} + +func (s *auditStore) ListLogsByRun(ctx context.Context, runID string) ([]audit.SkillRunLog, error) { + rows, err := s.db.QueryContext(ctx, + `SELECT run_id, seq, event_type, payload, created_at FROM skill_run_logs WHERE run_id = ? ORDER BY seq`, runID) + if err != nil { + return nil, fmt.Errorf("auditStore.ListLogsByRun: %w", err) + } + defer rows.Close() + var out []audit.SkillRunLog + for rows.Next() { + var l audit.SkillRunLog + var payload string + var created int64 + if err := rows.Scan(&l.RunID, &l.Sequence, &l.EventType, &payload, &created); err != nil { + return nil, err + } + _ = json.Unmarshal([]byte(payload), &l.Payload) + l.CreatedAt = time.Unix(created, 0).UTC() + out = append(out, l) + } + return out, rows.Err() +} + +func (s *auditStore) ListRunsBySkill(ctx context.Context, skillID string, limit int) ([]audit.SkillRun, error) { + return s.ListRunsBySkillPaginated(ctx, skillID, 0, limit, false) +} + +func (s *auditStore) ListRunsBySkillPaginated(ctx context.Context, skillID string, offset, limit int, includeDryRun bool) ([]audit.SkillRun, error) { + w := `WHERE skill_id = ?` + args := []any{skillID} + if !includeDryRun { + w += ` AND status != 'dry_run'` + } + return s.queryRuns(ctx, w+` ORDER BY started_at DESC `+limitOffset(limit, offset), args...) +} + +func (s *auditStore) CountRunsBySkill(ctx context.Context, skillID string, includeDryRun bool) (int64, error) { + q := `SELECT COUNT(*) FROM skill_runs WHERE skill_id = ?` + if !includeDryRun { + q += ` AND status != 'dry_run'` + } + var n int64 + err := s.db.QueryRowContext(ctx, q, skillID).Scan(&n) + return n, err +} + +func (s *auditStore) ListRunsByCaller(ctx context.Context, callerID string, limit int) ([]audit.SkillRun, error) { + return s.queryRuns(ctx, `WHERE caller_id = ? AND status != 'dry_run' ORDER BY started_at DESC `+limitOffset(limit, 0), callerID) +} + +func (s *auditStore) buildFilter(f audit.RunFilter) (string, []any) { + var conds []string + var args []any + if !f.IncludeDryRun { + conds = append(conds, `status != 'dry_run'`) + } + if f.Status != "" { + conds = append(conds, `status = ?`) + args = append(args, f.Status) + } + if f.SkillID != "" { + conds = append(conds, `skill_id = ?`) + args = append(args, f.SkillID) + } + if f.CallerID != "" { + conds = append(conds, `caller_id = ?`) + args = append(args, f.CallerID) + } + if f.ChannelID != "" { + conds = append(conds, `channel_id = ?`) + args = append(args, f.ChannelID) + } + if f.TopLevelOnly { + conds = append(conds, `parent_run_id = ''`) + } + if !f.Since.IsZero() { + conds = append(conds, `started_at >= ?`) + args = append(args, f.Since.Unix()) + } + if !f.Until.IsZero() { + conds = append(conds, `started_at <= ?`) + args = append(args, f.Until.Unix()) + } + where := "" + if len(conds) > 0 { + where = `WHERE ` + strings.Join(conds, " AND ") + } + return where, args +} + +func (s *auditStore) ListRunsFiltered(ctx context.Context, f audit.RunFilter, offset, limit int) ([]audit.SkillRun, error) { + where, args := s.buildFilter(f) + return s.queryRuns(ctx, where+` ORDER BY started_at DESC `+limitOffset(limit, offset), args...) +} + +func (s *auditStore) CountRunsFiltered(ctx context.Context, f audit.RunFilter) (int64, error) { + where, args := s.buildFilter(f) + var n int64 + err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM skill_runs `+where, args...).Scan(&n) + return n, err +} + +func (s *auditStore) PurgeOlderThan(ctx context.Context, t time.Time) (int64, error) { + res, err := s.db.ExecContext(ctx, `DELETE FROM skill_runs WHERE finished_at > 0 AND finished_at < ?`, t.Unix()) + if err != nil { + return 0, fmt.Errorf("auditStore.PurgeOlderThan: %w", err) + } + n, _ := res.RowsAffected() + // Best-effort orphan-log cleanup. + _, _ = s.db.ExecContext(ctx, `DELETE FROM skill_run_logs WHERE run_id NOT IN (SELECT id FROM skill_runs)`) + return n, nil +} + +func (s *auditStore) ListChildrenByParent(ctx context.Context, parentRunID string) ([]audit.SkillRun, error) { + return s.queryRuns(ctx, `WHERE parent_run_id = ? ORDER BY started_at DESC`, parentRunID) +} + +func (s *auditStore) WalkParentChain(ctx context.Context, runID string) ([]audit.SkillRun, error) { + var chain []audit.SkillRun + seen := map[string]bool{} + for id := runID; id != ""; { + if seen[id] { + break + } + seen[id] = true + r, err := s.GetRun(ctx, id) + if errors.Is(err, audit.ErrNotFound) { + break + } + if err != nil { + return nil, err + } + chain = append(chain, *r) + id = r.ParentRunID + } + return chain, nil +} + +func (s *auditStore) ListFinishedRunsBefore(ctx context.Context, cutoff time.Time, limit int) ([]audit.SkillRun, error) { + return s.queryRuns(ctx, + `WHERE finished_at > 0 AND finished_at < ? ORDER BY started_at DESC `+limitOffset(limit, 0), cutoff.Unix()) +} + +func (s *auditStore) LastRunBySkills(ctx context.Context, skillIDs []string, includeFailed bool) (map[string]time.Time, error) { + out := map[string]time.Time{} + if len(skillIDs) == 0 { + return out, nil + } + q := `SELECT skill_id, MAX(started_at) FROM skill_runs WHERE skill_id IN (` + + strings.TrimSuffix(strings.Repeat("?,", len(skillIDs)), ",") + `)` + args := make([]any, 0, len(skillIDs)) + for _, id := range skillIDs { + args = append(args, id) + } + if !includeFailed { + q += ` AND status NOT IN ('error','timeout')` + } + q += ` GROUP BY skill_id` + rows, err := s.db.QueryContext(ctx, q, args...) + if err != nil { + return nil, fmt.Errorf("auditStore.LastRunBySkills: %w", err) + } + defer rows.Close() + for rows.Next() { + var id string + var ts int64 + if err := rows.Scan(&id, &ts); err != nil { + return nil, err + } + out[id] = time.Unix(ts, 0).UTC() + } + return out, rows.Err() +} + +// limitOffset renders an optional LIMIT/OFFSET clause (limit<=0 = no limit). +func limitOffset(limit, offset int) string { + if limit <= 0 { + return "" + } + if offset > 0 { + return fmt.Sprintf("LIMIT %d OFFSET %d", limit, offset) + } + return fmt.Sprintf("LIMIT %d", limit) +} diff --git a/contrib/store/audit_store_test.go b/contrib/store/audit_store_test.go new file mode 100644 index 0000000..1ea7039 --- /dev/null +++ b/contrib/store/audit_store_test.go @@ -0,0 +1,67 @@ +package store + +import ( + "context" + "testing" + "time" + + "gitea.stevedudenhoeffer.com/steve/executus/audit" +) + +func TestSQLiteAuditStore(t *testing.T) { + ctx := context.Background() + db, err := Open(":memory:") + if err != nil { + t.Fatal(err) + } + defer db.Close() + st := db.Audit() + if err := st.Initialize(ctx); err != nil { + t.Fatal(err) + } + + now := time.Now().UTC() + // parent run + if err := st.StartRun(ctx, audit.SkillRun{ID: "r1", SkillID: "agent-x", CallerID: "c1", + Inputs: map[string]any{"q": "hi"}, StartedAt: now}); err != nil { + t.Fatal(err) + } + // child run + st.StartRun(ctx, audit.SkillRun{ID: "r2", SkillID: "skill-y", CallerID: "c1", ParentRunID: "r1", StartedAt: now.Add(time.Second)}) + + st.AppendLog(ctx, audit.SkillRunLog{RunID: "r1", Sequence: 1, EventType: "step", Payload: map[string]any{"i": 1}, CreatedAt: now}) + if err := st.FinishRun(ctx, "r1", audit.RunStats{Status: "ok", Output: "done", ToolCalls: 2, InputTokens: 10, OutputTokens: 5}); err != nil { + t.Fatal(err) + } + + got, err := st.GetRun(ctx, "r1") + if err != nil || got.Status != "ok" || got.Output != "done" || got.FinishedAt == nil || + got.Inputs["q"] != "hi" || got.TotalInputTokens != 10 { + t.Fatalf("GetRun: %v %+v", err, got) + } + if logs, _ := st.ListLogsByRun(ctx, "r1"); len(logs) != 1 || logs[0].EventType != "step" { + t.Errorf("ListLogsByRun = %+v", logs) + } + if kids, _ := st.ListChildrenByParent(ctx, "r1"); len(kids) != 1 || kids[0].ID != "r2" { + t.Errorf("ListChildrenByParent = %+v", kids) + } + if chain, _ := st.WalkParentChain(ctx, "r2"); len(chain) != 2 || chain[1].ID != "r1" { + t.Errorf("WalkParentChain = %+v", chain) + } + if byCaller, _ := st.ListRunsByCaller(ctx, "c1", 10); len(byCaller) != 2 { + t.Errorf("ListRunsByCaller = %d, want 2", len(byCaller)) + } + // filter: top-level only + tl, _ := st.ListRunsFiltered(ctx, audit.RunFilter{TopLevelOnly: true}, 0, 10) + if len(tl) != 1 || tl[0].ID != "r1" { + t.Errorf("TopLevelOnly filter = %+v", tl) + } + // last-run map + last, _ := st.LastRunBySkills(ctx, []string{"agent-x", "skill-y"}, true) + if _, ok := last["agent-x"]; !ok { + t.Errorf("LastRunBySkills missing agent-x: %+v", last) + } + if n, _ := st.CountRunsBySkill(ctx, "agent-x", false); n != 1 { + t.Errorf("CountRunsBySkill = %d, want 1", n) + } +}