package store

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"gitea.stevedudenhoeffer.com/steve/executus/audit"
)

// auditStore is the SQLite-backed audit.Storage: one row per run (+ a JSON
// `inputs` blob), one row per log event. The run-list/filter/walk queries are
// indexed on the columns they filter; the log payload is a JSON blob.
type auditStore struct{ db *sql.DB }

// Audit returns a durable audit.Storage backed by this database.
func (d *DB) Audit() audit.Storage { return &auditStore{db: d.sql} }

var _ audit.Storage = (*auditStore)(nil)

// Initialize creates the skill_runs and skill_run_logs tables and the
// skill_runs indexes. Every statement uses IF NOT EXISTS, so calling it on an
// already-initialized database does not fail on existing objects. The whole
// DDL batch is sent in a single ExecContext call.
func (s *auditStore) Initialize(ctx context.Context) error {
	_, err := s.db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS skill_runs (
	id                    TEXT PRIMARY KEY,
	skill_id              TEXT NOT NULL DEFAULT '',
	caller_id             TEXT NOT NULL DEFAULT '',
	channel_id            TEXT NOT NULL DEFAULT '',
	parent_run_id         TEXT NOT NULL DEFAULT '',
	inputs                TEXT NOT NULL DEFAULT '{}',
	started_at            INTEGER NOT NULL DEFAULT 0,
	finished_at           INTEGER NOT NULL DEFAULT 0, -- 0 = still running
	status                TEXT NOT NULL DEFAULT 'running',
	output                TEXT NOT NULL DEFAULT '',
	error                 TEXT NOT NULL DEFAULT '',
	tool_calls            INTEGER NOT NULL DEFAULT 0,
	runtime_seconds       REAL NOT NULL DEFAULT 0,
	total_input_tokens    INTEGER NOT NULL DEFAULT 0,
	total_output_tokens   INTEGER NOT NULL DEFAULT 0,
	total_thinking_tokens INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_runs_skill   ON skill_runs(skill_id, started_at);
CREATE INDEX IF NOT EXISTS idx_runs_caller  ON skill_runs(caller_id, started_at);
CREATE INDEX IF NOT EXISTS idx_runs_parent  ON skill_runs(parent_run_id);
CREATE INDEX IF NOT EXISTS idx_runs_started ON skill_runs(started_at);
CREATE TABLE IF NOT EXISTS skill_run_logs (
	run_id     TEXT NOT NULL,
	seq        INTEGER NOT NULL,
	event_type TEXT NOT NULL,
	payload    TEXT NOT NULL DEFAULT '{}',
	created_at INTEGER NOT NULL,
	PRIMARY KEY (run_id, seq)
);`)
	if err != nil {
		return fmt.Errorf("auditStore.Initialize: %w", err)
	}
	return nil
}

// unixOrZero converts t to Unix seconds, mapping the zero time.Time to 0
// (time.Time{}.Unix() would otherwise be a large negative number).
func unixOrZero(t time.Time) int64 {
	if t.IsZero() {
		return 0
	}
	return t.Unix()
}

// StartRun inserts the run row for r. An empty r.Status is stored as
// "running" and a nil r.FinishedAt as 0.
//
// If a row with the same id already exists, only the identity columns
// (skill, caller, channel, parent), inputs and started_at are overwritten;
// finished_at, status, output, error and the counters keep their stored
// values.
func (s *auditStore) StartRun(ctx context.Context, r audit.SkillRun) error {
	inputs, err := json.Marshal(r.Inputs)
	if err != nil {
		// Best-effort: an unmarshalable inputs value must not prevent the run
		// from being recorded. Store the column default rather than an empty
		// string so the blob is always valid JSON.
		inputs = []byte(`{}`)
	}

	var fin int64
	if r.FinishedAt != nil {
		fin = unixOrZero(*r.FinishedAt)
	}

	status := r.Status
	if status == "" {
		status = "running"
	}

	_, err = s.db.ExecContext(ctx, `
INSERT INTO skill_runs (id, skill_id, caller_id, channel_id, parent_run_id, inputs,
	started_at, finished_at, status, output, error, tool_calls, runtime_seconds,
	total_input_tokens, total_output_tokens, total_thinking_tokens)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
	skill_id=excluded.skill_id, caller_id=excluded.caller_id,
	channel_id=excluded.channel_id, parent_run_id=excluded.parent_run_id,
	inputs=excluded.inputs, started_at=excluded.started_at`,
		r.ID, r.SkillID, r.CallerID, r.ChannelID, r.ParentRunID, string(inputs),
		unixOrZero(r.StartedAt), fin, status, r.Output, r.Error,
		r.ToolCallsCount, r.RuntimeSeconds,
		r.TotalInputTokens, r.TotalOutputTokens, r.TotalThinkingTokens)
	if err != nil {
		return fmt.Errorf("auditStore.StartRun: %w", err)
	}
	return nil
}

// FinishRun stamps the run identified by runID with the current time as
// finished_at and records the final status, output, error and counters from
// st. It returns audit.ErrNotFound when no row has that id.
func (s *auditStore) FinishRun(ctx context.Context, runID string, st audit.RunStats) error {
	res, err := s.db.ExecContext(ctx, `
UPDATE skill_runs SET finished_at=?, status=?, output=?, error=?, tool_calls=?,
	runtime_seconds=?, total_input_tokens=?, total_output_tokens=?, total_thinking_tokens=?
WHERE id=?`,
		time.Now().Unix(), st.Status, st.Output, st.Error, st.ToolCalls,
		st.RuntimeSeconds, st.InputTokens, st.OutputTokens, st.ThinkingTokens, runID)
	if err != nil {
		return fmt.Errorf("auditStore.FinishRun: %w", err)
	}

	// A failure to read the affected-row count is not the same thing as
	// "no such run"; report it as its own error instead of ErrNotFound.
	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("auditStore.FinishRun: %w", err)
	}
	if n == 0 {
		return audit.ErrNotFound
	}
	return nil
}

// AppendLog stores one log event. A zero l.CreatedAt is replaced by the
// current time. An existing row with the same (run_id, seq) is replaced.
func (s *auditStore) AppendLog(ctx context.Context, l audit.SkillRunLog) error {
	payload, err := json.Marshal(l.Payload)
	if err != nil {
		// Best-effort: keep the event even if its payload cannot be encoded,
		// storing the column default so the blob stays valid JSON.
		payload = []byte(`{}`)
	}

	created := unixOrZero(l.CreatedAt)
	if created == 0 {
		created = time.Now().Unix()
	}

	_, err = s.db.ExecContext(ctx,
		`INSERT OR REPLACE INTO skill_run_logs (run_id, seq, event_type, payload, created_at) VALUES (?, ?, ?, ?, ?)`,
		l.RunID, l.Sequence, l.EventType, string(payload), created)
	if err != nil {
		return fmt.Errorf("auditStore.AppendLog: %w", err)
	}
	return nil
}

// runCols is the SELECT column list matching scanRun.
const runCols = `id, skill_id, caller_id, channel_id, parent_run_id, inputs, started_at, finished_at, status, output, error, tool_calls, runtime_seconds, total_input_tokens, total_output_tokens, total_thinking_tokens`

// scanRun reads one skill_runs row, selected with runCols, from sc (a
// *sql.Row or *sql.Rows). Timestamps are returned in UTC. A stored
// started_at of 0 yields the zero time.Time (the inverse of unixOrZero) and
// a stored finished_at of 0 yields a nil FinishedAt.
func scanRun(sc interface{ Scan(...any) error }) (*audit.SkillRun, error) {
	var (
		r                 audit.SkillRun
		inputs            string
		started, finished int64
	)
	err := sc.Scan(&r.ID, &r.SkillID, &r.CallerID, &r.ChannelID, &r.ParentRunID,
		&inputs, &started, &finished, &r.Status, &r.Output, &r.Error,
		&r.ToolCallsCount, &r.RuntimeSeconds,
		&r.TotalInputTokens, &r.TotalOutputTokens, &r.TotalThinkingTokens)
	if err != nil {
		return nil, err
	}

	// Best-effort: a malformed inputs blob leaves r.Inputs at its zero value
	// instead of failing the whole read.
	_ = json.Unmarshal([]byte(inputs), &r.Inputs)

	if started != 0 {
		r.StartedAt = time.Unix(started, 0).UTC()
	}
	if finished > 0 {
		t := time.Unix(finished, 0).UTC()
		r.FinishedAt = &t
	}
	return &r, nil
}

// GetRun returns the run with the given id, or audit.ErrNotFound when no
// such row exists.
func (s *auditStore) GetRun(ctx context.Context, runID string) (*audit.SkillRun, error) {
	row := s.db.QueryRowContext(ctx, `SELECT `+runCols+` FROM skill_runs WHERE id = ?`, runID)
	r, err := scanRun(row)
	if errors.Is(err, sql.ErrNoRows) {
		return nil, audit.ErrNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("auditStore.GetRun: %w", err)
	}
	return r, nil
}

// queryRuns runs `SELECT <runCols> FROM skill_runs <tail>` with args bound to
// the placeholders in tail and returns every row. tail is concatenated into
// the statement verbatim, so callers must pass only fixed SQL fragments and
// bind all values through args.
func (s *auditStore) queryRuns(ctx context.Context, tail string, args ...any) ([]audit.SkillRun, error) {
	rows, err := s.db.QueryContext(ctx, `SELECT `+runCols+` FROM skill_runs `+tail, args...)
	if err != nil {
		return nil, fmt.Errorf("auditStore.queryRuns: %w", err)
	}
	defer rows.Close()

	var out []audit.SkillRun
	for rows.Next() {
		r, err := scanRun(rows)
		if err != nil {
			return nil, fmt.Errorf("auditStore.queryRuns: %w", err)
		}
		out = append(out, *r)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("auditStore.queryRuns: %w", err)
	}
	return out, nil
}

// ListLogsByRun returns the log events of a run ordered by sequence number.
// CreatedAt is returned in UTC.
func (s *auditStore) ListLogsByRun(ctx context.Context, runID string) ([]audit.SkillRunLog, error) {
	rows, err := s.db.QueryContext(ctx,
		`SELECT run_id, seq, event_type, payload, created_at FROM skill_run_logs WHERE run_id = ? ORDER BY seq`,
		runID)
	if err != nil {
		return nil, fmt.Errorf("auditStore.ListLogsByRun: %w", err)
	}
	defer rows.Close()

	var out []audit.SkillRunLog
	for rows.Next() {
		var (
			l       audit.SkillRunLog
			payload string
			created int64
		)
		if err := rows.Scan(&l.RunID, &l.Sequence, &l.EventType, &payload, &created); err != nil {
			return nil, fmt.Errorf("auditStore.ListLogsByRun: %w", err)
		}
		// Best-effort: a malformed payload blob leaves l.Payload at its zero
		// value instead of failing the whole listing.
		_ = json.Unmarshal([]byte(payload), &l.Payload)
		l.CreatedAt = time.Unix(created, 0).UTC()
		out = append(out, l)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("auditStore.ListLogsByRun: %w", err)
	}
	return out, nil
}

// ListRunsBySkill returns up to limit runs of a skill, newest first,
// excluding dry runs. limit <= 0 means no limit.
func (s *auditStore) ListRunsBySkill(ctx context.Context, skillID string, limit int) ([]audit.SkillRun, error) {
	return s.ListRunsBySkillPaginated(ctx, skillID, 0, limit, false)
}

// ListRunsBySkillPaginated returns one page of a skill's runs, newest first.
// Rows with status 'dry_run' are excluded unless includeDryRun is set.
// limit <= 0 means no limit.
func (s *auditStore) ListRunsBySkillPaginated(ctx context.Context, skillID string, offset, limit int, includeDryRun bool) ([]audit.SkillRun, error) {
	w := `WHERE skill_id = ?`
	args := []any{skillID}
	if !includeDryRun {
		w += ` AND status != 'dry_run'`
	}
	return s.queryRuns(ctx, w+` ORDER BY started_at DESC `+limitOffset(limit, offset), args...)
}

// CountRunsBySkill counts a skill's runs, excluding rows with status
// 'dry_run' unless includeDryRun is set.
func (s *auditStore) CountRunsBySkill(ctx context.Context, skillID string, includeDryRun bool) (int64, error) {
	q := `SELECT COUNT(*) FROM skill_runs WHERE skill_id = ?`
	if !includeDryRun {
		q += ` AND status != 'dry_run'`
	}
	var n int64
	if err := s.db.QueryRowContext(ctx, q, skillID).Scan(&n); err != nil {
		return 0, fmt.Errorf("auditStore.CountRunsBySkill: %w", err)
	}
	return n, nil
}

// ListRunsByCaller returns up to limit runs started by callerID, newest
// first, excluding dry runs. limit <= 0 means no limit.
func (s *auditStore) ListRunsByCaller(ctx context.Context, callerID string, limit int) ([]audit.SkillRun, error) {
	return s.queryRuns(ctx,
		`WHERE caller_id = ? AND status != 'dry_run' ORDER BY started_at DESC `+limitOffset(limit, 0),
		callerID)
}

// buildFilter translates f into a WHERE clause (empty when f constrains
// nothing) and the arguments for its placeholders, in placeholder order.
// Since and Until are inclusive bounds on started_at, compared at
// whole-second resolution.
func (s *auditStore) buildFilter(f audit.RunFilter) (string, []any) {
	var (
		conds []string
		args  []any
	)
	if !f.IncludeDryRun {
		conds = append(conds, `status != 'dry_run'`)
	}
	if f.Status != "" {
		conds = append(conds, `status = ?`)
		args = append(args, f.Status)
	}
	if f.SkillID != "" {
		conds = append(conds, `skill_id = ?`)
		args = append(args, f.SkillID)
	}
	if f.CallerID != "" {
		conds = append(conds, `caller_id = ?`)
		args = append(args, f.CallerID)
	}
	if f.ChannelID != "" {
		conds = append(conds, `channel_id = ?`)
		args = append(args, f.ChannelID)
	}
	if f.TopLevelOnly {
		conds = append(conds, `parent_run_id = ''`)
	}
	if !f.Since.IsZero() {
		conds = append(conds, `started_at >= ?`)
		args = append(args, f.Since.Unix())
	}
	if !f.Until.IsZero() {
		conds = append(conds, `started_at <= ?`)
		args = append(args, f.Until.Unix())
	}

	where := ""
	if len(conds) > 0 {
		where = `WHERE ` + strings.Join(conds, " AND ")
	}
	return where, args
}

// ListRunsFiltered returns one page of the runs matching f, newest first.
// limit <= 0 means no limit.
func (s *auditStore) ListRunsFiltered(ctx context.Context, f audit.RunFilter, offset, limit int) ([]audit.SkillRun, error) {
	where, args := s.buildFilter(f)
	return s.queryRuns(ctx, where+` ORDER BY started_at DESC `+limitOffset(limit, offset), args...)
}

// CountRunsFiltered counts the runs matching f.
func (s *auditStore) CountRunsFiltered(ctx context.Context, f audit.RunFilter) (int64, error) {
	where, args := s.buildFilter(f)
	var n int64
	err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM skill_runs `+where, args...).Scan(&n)
	if err != nil {
		return 0, fmt.Errorf("auditStore.CountRunsFiltered: %w", err)
	}
	return n, nil
}

// PurgeOlderThan deletes every finished run whose finished_at is before t and
// returns how many run rows were removed. Rows with finished_at = 0 (still
// running) are never deleted. Afterwards it removes, best-effort, all log
// rows whose run_id has no matching run row.
func (s *auditStore) PurgeOlderThan(ctx context.Context, t time.Time) (int64, error) {
	res, err := s.db.ExecContext(ctx,
		`DELETE FROM skill_runs WHERE finished_at > 0 AND finished_at < ?`, t.Unix())
	if err != nil {
		return 0, fmt.Errorf("auditStore.PurgeOlderThan: %w", err)
	}
	// The delete itself succeeded; a failure here only affects the reported
	// count, so it is deliberately ignored.
	n, _ := res.RowsAffected()

	// Best-effort orphan-log cleanup: a failure leaves orphaned log rows that
	// the next purge will retry, so the error is deliberately ignored.
	_, _ = s.db.ExecContext(ctx,
		`DELETE FROM skill_run_logs WHERE run_id NOT IN (SELECT id FROM skill_runs)`)
	return n, nil
}

// ListChildrenByParent returns every run whose parent_run_id is parentRunID,
// newest first.
func (s *auditStore) ListChildrenByParent(ctx context.Context, parentRunID string) ([]audit.SkillRun, error) {
	return s.queryRuns(ctx, `WHERE parent_run_id = ? ORDER BY started_at DESC`, parentRunID)
}

// WalkParentChain returns the run identified by runID followed by each of its
// ancestors, obtained by following ParentRunID one GetRun at a time. The walk
// stops at an empty parent id, at an id that is not found, or at an id that
// was already visited (which guards against cycles).
func (s *auditStore) WalkParentChain(ctx context.Context, runID string) ([]audit.SkillRun, error) {
	var chain []audit.SkillRun
	seen := map[string]bool{}
	for id := runID; id != ""; {
		if seen[id] {
			break
		}
		seen[id] = true

		r, err := s.GetRun(ctx, id)
		if errors.Is(err, audit.ErrNotFound) {
			break
		}
		if err != nil {
			return nil, err
		}
		chain = append(chain, *r)
		id = r.ParentRunID
	}
	return chain, nil
}

// ListFinishedRunsBefore returns up to limit finished runs whose finished_at
// is before cutoff, ordered by started_at descending. limit <= 0 means no
// limit.
func (s *auditStore) ListFinishedRunsBefore(ctx context.Context, cutoff time.Time, limit int) ([]audit.SkillRun, error) {
	return s.queryRuns(ctx,
		`WHERE finished_at > 0 AND finished_at < ? ORDER BY started_at DESC `+limitOffset(limit, 0),
		cutoff.Unix())
}

// LastRunBySkills returns, for each of skillIDs that has at least one
// matching run, the latest started_at (UTC). Runs with status 'error' or
// 'timeout' are ignored unless includeFailed is set. Skills without a
// matching run are absent from the map; an empty skillIDs yields an empty,
// non-nil map without touching the database.
func (s *auditStore) LastRunBySkills(ctx context.Context, skillIDs []string, includeFailed bool) (map[string]time.Time, error) {
	out := make(map[string]time.Time, len(skillIDs))
	if len(skillIDs) == 0 {
		return out, nil
	}

	// One "?" placeholder per id: "?,?,?".
	placeholders := strings.TrimSuffix(strings.Repeat("?,", len(skillIDs)), ",")
	q := `SELECT skill_id, MAX(started_at) FROM skill_runs WHERE skill_id IN (` + placeholders + `)`
	args := make([]any, 0, len(skillIDs))
	for _, id := range skillIDs {
		args = append(args, id)
	}
	if !includeFailed {
		q += ` AND status NOT IN ('error','timeout')`
	}
	q += ` GROUP BY skill_id`

	rows, err := s.db.QueryContext(ctx, q, args...)
	if err != nil {
		return nil, fmt.Errorf("auditStore.LastRunBySkills: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			id string
			ts int64
		)
		if err := rows.Scan(&id, &ts); err != nil {
			return nil, fmt.Errorf("auditStore.LastRunBySkills: %w", err)
		}
		out[id] = time.Unix(ts, 0).UTC()
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("auditStore.LastRunBySkills: %w", err)
	}
	return out, nil
}

// limitOffset renders an optional LIMIT/OFFSET clause (limit<=0 = no limit).
// A positive offset is honoured even without a limit; offset <= 0 is omitted.
func limitOffset(limit, offset int) string {
	switch {
	case limit > 0 && offset > 0:
		return fmt.Sprintf("LIMIT %d OFFSET %d", limit, offset)
	case limit > 0:
		return fmt.Sprintf("LIMIT %d", limit)
	case offset > 0:
		// SQLite only accepts OFFSET after a LIMIT; a negative LIMIT means
		// "no upper bound", so this skips rows without capping the result.
		return fmt.Sprintf("LIMIT -1 OFFSET %d", offset)
	default:
		return ""
	}
}