package audit

import (
	"context"
	"sort"
	"sync"
	"time"
)

// Memory is an in-process Storage: it retains runs + logs in memory so a light
// host (or a test) gets queryable run history with zero setup. It is bounded
// only by process memory — a host that runs forever should PurgeOlderThan
// periodically, or use a persistent Storage. Construct with NewMemory.
//
// Mort uses its GORM/MySQL Storage; contrib/store adds a durable SQLite one.
// Memory is the zero-dependency default behind audit.NewSink(audit.NewMemory()).
type Memory struct {
	mu    sync.RWMutex             // guards order, runs and logs
	order []string                 // run ids in insertion order
	runs  map[string]SkillRun      // by run id
	logs  map[string][]SkillRunLog // by run id
}

// NewMemory returns an empty in-memory Storage.
func NewMemory() *Memory {
	return &Memory{
		runs: map[string]SkillRun{},
		logs: map[string][]SkillRunLog{},
	}
}

// Compile-time check that *Memory satisfies Storage.
var _ Storage = (*Memory)(nil)

// Initialize is a no-op: the in-memory store needs no setup. It always
// returns nil.
func (m *Memory) Initialize(context.Context) error { return nil }

// StartRun stores run under run.ID. A new id is appended to the insertion
// order; an id that is already present has its stored run replaced and keeps
// its original position. It always returns nil.
func (m *Memory) StartRun(_ context.Context, run SkillRun) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, exists := m.runs[run.ID]; !exists {
		m.order = append(m.order, run.ID)
	}
	m.runs[run.ID] = run
	return nil
}

// FinishRun stamps the run identified by runID with the current time as its
// FinishedAt and copies the final status, output, error and counters from s.
// It returns ErrNotFound when no run with that id is stored.
func (m *Memory) FinishRun(_ context.Context, runID string, s RunStats) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	r, ok := m.runs[runID]
	if !ok {
		return ErrNotFound
	}

	now := time.Now()
	r.FinishedAt = &now
	r.Status = s.Status
	r.Output = s.Output
	r.Error = s.Error
	r.ToolCallsCount = s.ToolCalls
	r.RuntimeSeconds = s.RuntimeSeconds
	r.TotalInputTokens = s.InputTokens
	r.TotalOutputTokens = s.OutputTokens
	r.TotalThinkingTokens = s.ThinkingTokens

	// The map holds values, so the modified copy has to be written back.
	m.runs[runID] = r
	return nil
}

// AppendLog appends log to the entries kept for log.RunID. It does not check
// that a run with that id has been started. It always returns nil.
func (m *Memory) AppendLog(_ context.Context, log SkillRunLog) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.logs[log.RunID] = append(m.logs[log.RunID], log)
	return nil
}

// GetRun returns a pointer to a copy of the stored run, or ErrNotFound when
// runID is unknown.
func (m *Memory) GetRun(_ context.Context, runID string) (*SkillRun, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	r, ok := m.runs[runID]
	if !ok {
		return nil, ErrNotFound
	}
	return &r, nil
}

// ListLogsByRun returns a copy of the logs kept for runID, ordered by
// ascending Sequence; entries with equal Sequence keep their append order.
// An id with no logs yields an empty result and a nil error.
func (m *Memory) ListLogsByRun(_ context.Context, runID string) ([]SkillRunLog, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	// Copy first so sorting never reorders the stored slice.
	ls := append([]SkillRunLog(nil), m.logs[runID]...)
	sort.SliceStable(ls, func(i, j int) bool {
		return ls[i].Sequence < ls[j].Sequence
	})
	return ls, nil
}

// newestFirst returns the retained runs in reverse insertion order, optionally
// filtered. A nil keep selects every run. Caller holds at least RLock.
func (m *Memory) newestFirst(keep func(SkillRun) bool) []SkillRun {
	out := make([]SkillRun, 0, len(m.order))
	for i := len(m.order) - 1; i >= 0; i-- {
		r := m.runs[m.order[i]]
		if keep == nil || keep(r) {
			out = append(out, r)
		}
	}
	return out
}

// oldestFirst returns the retained runs in insertion (oldest-first) order,
// optionally filtered. A nil keep selects every run. Caller holds at least
// RLock.
func (m *Memory) oldestFirst(keep func(SkillRun) bool) []SkillRun {
	out := make([]SkillRun, 0, len(m.order))
	for _, id := range m.order {
		r := m.runs[id]
		if keep == nil || keep(r) {
			out = append(out, r)
		}
	}
	return out
}

// countMatching returns how many retained runs satisfy keep without building
// a slice of them. A nil keep counts every run. Caller holds at least RLock.
func (m *Memory) countMatching(keep func(SkillRun) bool) int64 {
	var n int64
	for _, id := range m.order {
		if keep == nil || keep(m.runs[id]) {
			n++
		}
	}
	return n
}

// page returns the window of rs that starts at offset and holds at most limit
// elements. A negative offset is treated as 0, an offset at or past the end
// yields nil, and a limit <= 0 means "no limit".
func page(rs []SkillRun, offset, limit int) []SkillRun {
	if offset < 0 {
		offset = 0
	}
	if offset >= len(rs) {
		return nil
	}
	rs = rs[offset:]
	if limit > 0 && limit < len(rs) {
		rs = rs[:limit]
	}
	return rs
}

// matchesSkill reports whether r belongs to skillID, hiding runs whose status
// is "dry_run" unless includeDryRun is set. It is shared by the per-skill list
// and count methods so a page and its total always use the same predicate.
func (m *Memory) matchesSkill(r SkillRun, skillID string, includeDryRun bool) bool {
	return r.SkillID == skillID && (includeDryRun || r.Status != "dry_run")
}

// ListRunsBySkill returns up to limit runs of skillID, newest first, without
// dry runs. It is ListRunsBySkillPaginated with offset 0.
func (m *Memory) ListRunsBySkill(ctx context.Context, skillID string, limit int) ([]SkillRun, error) {
	return m.ListRunsBySkillPaginated(ctx, skillID, 0, limit, false)
}

// ListRunsBySkillPaginated returns the runs of skillID, newest first, skipping
// offset matches and returning at most limit of them (limit <= 0 returns all
// remaining matches). Runs with status "dry_run" are included only when
// includeDryRun is true.
func (m *Memory) ListRunsBySkillPaginated(_ context.Context, skillID string, offset, limit int, includeDryRun bool) ([]SkillRun, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	matches := m.newestFirst(func(r SkillRun) bool {
		return m.matchesSkill(r, skillID, includeDryRun)
	})
	return page(matches, offset, limit), nil
}

// CountRunsBySkill returns the number of runs ListRunsBySkillPaginated would
// select for the same skillID and includeDryRun, ignoring offset and limit.
func (m *Memory) CountRunsBySkill(_ context.Context, skillID string, includeDryRun bool) (int64, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	return m.countMatching(func(r SkillRun) bool {
		return m.matchesSkill(r, skillID, includeDryRun)
	}), nil
}

// ListRunsByCaller returns up to limit runs whose CallerID equals callerID,
// newest first. Runs with status "dry_run" are always excluded; limit <= 0
// returns every match.
func (m *Memory) ListRunsByCaller(_ context.Context, callerID string, limit int) ([]SkillRun, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	matches := m.newestFirst(func(r SkillRun) bool {
		return r.CallerID == callerID && r.Status != "dry_run"
	})
	return page(matches, 0, limit), nil
}

// matchesFilter reports whether r satisfies every criterion set in f. Empty
// string fields and zero times in f are treated as "not set".
func (m *Memory) matchesFilter(r SkillRun, f RunFilter) bool {
	switch {
	case f.Status != "":
		// An explicit Status (even "dry_run") matches regardless of IncludeDryRun.
		if r.Status != f.Status {
			return false
		}
	case !f.IncludeDryRun && r.Status == "dry_run":
		return false
	}

	if f.SkillID != "" && r.SkillID != f.SkillID {
		return false
	}
	if f.CallerID != "" && r.CallerID != f.CallerID {
		return false
	}
	if f.ChannelID != "" && r.ChannelID != f.ChannelID {
		return false
	}
	if f.TopLevelOnly && r.ParentRunID != "" {
		return false
	}
	// Since and Until are both inclusive bounds on StartedAt.
	if !f.Since.IsZero() && r.StartedAt.Before(f.Since) {
		return false
	}
	if !f.Until.IsZero() && r.StartedAt.After(f.Until) {
		return false
	}
	return true
}

// ListRunsFiltered returns the runs matching f, newest first, skipping offset
// matches. A limit that is <= 0 or greater than 500 is replaced by 50.
func (m *Memory) ListRunsFiltered(_ context.Context, f RunFilter, offset, limit int) ([]SkillRun, error) {
	// Bound admin scans, per the Storage contract.
	if limit <= 0 || limit > 500 {
		limit = 50
	}

	m.mu.RLock()
	defer m.mu.RUnlock()

	matches := m.newestFirst(func(r SkillRun) bool {
		return m.matchesFilter(r, f)
	})
	return page(matches, offset, limit), nil
}

// CountRunsFiltered returns the number of retained runs matching f.
func (m *Memory) CountRunsFiltered(_ context.Context, f RunFilter) (int64, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	return m.countMatching(func(r SkillRun) bool {
		return m.matchesFilter(r, f)
	}), nil
}

// PurgeOlderThan removes every run whose FinishedAt is set and lies before t,
// together with that run's logs, and returns how many runs were removed. Runs
// that have not finished are never removed, and logs stored under an id that
// has no run are left untouched.
func (m *Memory) PurgeOlderThan(_ context.Context, t time.Time) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	var purged int64
	// Zero capacity forces append to allocate a fresh backing array, so the
	// survivors are not written over the slice being ranged and the old array
	// can be collected.
	kept := m.order[:0:0]
	for _, id := range m.order {
		r := m.runs[id]
		if r.FinishedAt != nil && r.FinishedAt.Before(t) {
			delete(m.runs, id)
			delete(m.logs, id)
			purged++
			continue
		}
		kept = append(kept, id)
	}
	m.order = kept
	return purged, nil
}

// ListChildrenByParent returns the runs whose ParentRunID equals parentRunID,
// oldest first.
func (m *Memory) ListChildrenByParent(_ context.Context, parentRunID string) ([]SkillRun, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	return m.oldestFirst(func(r SkillRun) bool {
		return r.ParentRunID == parentRunID
	}), nil
}

// WalkParentChain follows ParentRunID links upward from runID and returns the
// visited runs with the topmost ancestor first and the queried run last. The
// walk stops at a run with no parent, at an id that is not stored, at an id
// already visited (a cycle), or once MaxParentChainDepth runs were collected.
// An unknown runID yields an empty chain and a nil error.
func (m *Memory) WalkParentChain(_ context.Context, runID string) ([]SkillRun, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var chain []SkillRun
	seen := map[string]bool{}
	for id := runID; id != "" && len(chain) < MaxParentChainDepth; {
		r, ok := m.runs[id]
		if !ok || seen[id] {
			break
		}
		seen[id] = true
		chain = append(chain, r)
		id = r.ParentRunID
	}

	// Contract: root first, the queried run last. We walked child→root, so reverse.
	for i, j := 0, len(chain)-1; i < j; i, j = i+1, j-1 {
		chain[i], chain[j] = chain[j], chain[i]
	}
	return chain, nil
}

// ListFinishedRunsBefore returns up to limit runs whose FinishedAt is set and
// lies before cutoff, in insertion (oldest-first) order. A limit <= 0 returns
// nil.
func (m *Memory) ListFinishedRunsBefore(_ context.Context, cutoff time.Time, limit int) ([]SkillRun, error) {
	// Contract: a real bound is required.
	if limit <= 0 {
		return nil, nil
	}

	m.mu.RLock()
	defer m.mu.RUnlock()

	finished := m.oldestFirst(func(r SkillRun) bool {
		return r.FinishedAt != nil && r.FinishedAt.Before(cutoff)
	})
	return page(finished, 0, limit), nil
}

// LastRunBySkills returns, for each requested skill id that has a qualifying
// run, the latest StartedAt among those runs. Only runs with status "ok"
// qualify unless includeFailed is true. Skills without a qualifying run are
// absent from the result; the returned map is never nil.
func (m *Memory) LastRunBySkills(_ context.Context, skillIDs []string, includeFailed bool) (map[string]time.Time, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	want := make(map[string]bool, len(skillIDs))
	for _, id := range skillIDs {
		want[id] = true
	}

	out := map[string]time.Time{}
	for _, id := range m.order {
		r := m.runs[id]
		if !want[r.SkillID] {
			continue
		}
		// Contract: only status=="ok" counts unless includeFailed.
		if !includeFailed && r.Status != "ok" {
			continue
		}
		if r.StartedAt.After(out[r.SkillID]) {
			out[r.SkillID] = r.StartedAt
		}
	}
	return out, nil
}