Files
executus/audit/memory.go
steve d82cef46b4
executus CI / test (push) Failing after 1m4s
fix: address verified gadfly P4/#4 findings (audit/budget/persona)
Security (all 3 models — HIGH): audit OnTool persisted raw tool args + results
verbatim for the very tools the OnStep narration-redaction flags as secret
(mcp_call/email_send/http_*) — the args/results are what CARRY the secret, so
they landed in skill_run_logs unredacted. Factored the predicate into
isSecretTool() (single source of truth) and OnTool now emits
args_redacted/result_redacted (+ lengths) for secret tools. Test asserts no
secret reaches the log. (persona) webhook_ip_allowlist entries are now
CIDR/IP-validated at load (malformed dropped + warned) instead of accepted raw.

Contract correctness (glm-5.2 + deepseek) — audit Memory now honors its
documented Storage contract: ListChildrenByParent/ListFinishedRunsBefore return
oldest-first; WalkParentChain returns root-first and honors MaxParentChainDepth;
ListRunsFiltered clamps limit (<=0 or >500 -> 50); ListFinishedRunsBefore with
limit<=0 returns none; an explicit RunFilter.Status (incl. "dry_run") matches
regardless of IncludeDryRun; LastRunBySkills counts only status=="ok" unless
includeFailed. (PurgeOlderThan's FinishedAt key is the SAFE behavior — in-flight
runs retained — so the doc was aligned to it, not the impl.)

Error-handling: appendLog now uses a bounded context (auditAppendTimeout=3s) so
a hung backend can't block the run goroutine on the hot path; Sink.StartRun
logs its (still best-effort) failure instead of swallowing it; budget Memory.Get
uses RLock (RWMutex); budget package doc fixed (was skillexec's); Check uses the
budgetWindow constant, not a duplicated literal.

Triaged false-positive: NewNoOpBudget returning BudgetTracker is assignable to
run.Budget (identical method sets) — no change needed.

Core go.sum still free of host/DB deps.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 00:12:19 -04:00

281 lines
7.6 KiB
Go

package audit
import (
"context"
"sort"
"sync"
"time"
)
// Memory is an in-process Storage: it retains runs + logs in memory so a light
// host (or a test) gets queryable run history with zero setup. It is bounded
// only by process memory — a host that runs forever should PurgeOlderThan
// periodically, or use a persistent Storage. Construct with NewMemory.
//
// Mort uses its GORM/MySQL Storage; contrib/store adds a durable SQLite one.
// Memory is the zero-dependency default behind audit.NewSink(audit.NewMemory()).
//
// The zero value is NOT usable: the maps are nil and the first StartRun or
// AppendLog would panic writing to them. Always go through NewMemory.
type Memory struct {
mu sync.RWMutex // guards order, runs and logs; every method takes it (RLock for reads, Lock for writes)
order []string // run ids in insertion order; StartRun appends an id only the first time it is seen, PurgeOlderThan removes it alongside its runs/logs entries
runs map[string]SkillRun // by run id; stored by value, so callers receive copies
logs map[string][]SkillRunLog // by run id; may hold ids with no runs entry, since AppendLog does not check that the run exists
}
// NewMemory constructs an empty, ready-to-use in-memory Storage. Both maps are
// allocated up front so the first StartRun/AppendLog never writes to a nil map.
func NewMemory() *Memory {
	m := new(Memory)
	m.runs = make(map[string]SkillRun)
	m.logs = make(map[string][]SkillRunLog)
	return m
}
// Compile-time proof that *Memory satisfies the Storage interface.
var _ Storage = (*Memory)(nil)

// Initialize is a no-op for the in-memory store: there is no schema to migrate
// and no connection to open, so it always succeeds.
func (m *Memory) Initialize(_ context.Context) error {
	return nil
}
// StartRun stores run under run.ID. An ID seen for the first time is appended
// to the insertion order. Starting an ID that already exists replaces the
// stored record but keeps its original position. It never returns an error.
func (m *Memory) StartRun(_ context.Context, run SkillRun) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	_, exists := m.runs[run.ID]
	m.runs[run.ID] = run
	if !exists {
		m.order = append(m.order, run.ID)
	}
	return nil
}
// FinishRun marks the run runID as finished at the current wall-clock time and
// copies the final status, output, error, tool-call count, runtime and token
// totals from s. It returns ErrNotFound if no such run is stored. Calling it
// again on the same run overwrites the earlier result and FinishedAt.
func (m *Memory) FinishRun(_ context.Context, runID string, s RunStats) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	run, found := m.runs[runID]
	if !found {
		return ErrNotFound
	}

	finished := time.Now()
	run.FinishedAt = &finished
	run.Status, run.Output, run.Error = s.Status, s.Output, s.Error
	run.ToolCallsCount = s.ToolCalls
	run.RuntimeSeconds = s.RuntimeSeconds
	run.TotalInputTokens = s.InputTokens
	run.TotalOutputTokens = s.OutputTokens
	run.TotalThinkingTokens = s.ThinkingTokens

	// runs holds values, so the updated copy must be written back.
	m.runs[runID] = run
	return nil
}
// AppendLog appends log to the entries kept for log.RunID, in arrival order.
// It does not check that the run exists. ListLogsByRun orders entries by
// Sequence on read, so out-of-order arrivals are fine. It never returns an
// error.
func (m *Memory) AppendLog(_ context.Context, log SkillRunLog) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	existing := m.logs[log.RunID]
	m.logs[log.RunID] = append(existing, log)
	return nil
}
// GetRun returns a copy of the stored run with the given ID, or ErrNotFound.
//
// NOTE: the copy is shallow. Pointer fields such as FinishedAt still point at
// the values held by the store, so callers must not mutate through them.
func (m *Memory) GetRun(_ context.Context, runID string) (*SkillRun, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	if r, ok := m.runs[runID]; ok {
		return &r, nil
	}
	return nil, ErrNotFound
}
// ListLogsByRun returns a private copy of runID's log entries, ordered by
// ascending Sequence. Entries with equal Sequence keep their arrival order. A
// run with no entries yields a nil slice. It never returns an error.
func (m *Memory) ListLogsByRun(_ context.Context, runID string) ([]SkillRunLog, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	stored := m.logs[runID]
	if len(stored) == 0 {
		return nil, nil
	}
	// Copy before sorting so the stored slice's arrival order is left untouched.
	sorted := make([]SkillRunLog, len(stored))
	copy(sorted, stored)
	sort.SliceStable(sorted, func(a, b int) bool {
		return sorted[a].Sequence < sorted[b].Sequence
	})
	return sorted, nil
}
// newestFirst returns copies of the retained runs in reverse insertion order
// (newest first), keeping only those for which keep reports true. A nil keep
// retains everything. The result is never nil. The caller must hold m.mu for
// reading or writing.
func (m *Memory) newestFirst(keep func(SkillRun) bool) []SkillRun {
	result := make([]SkillRun, 0, len(m.order))
	for n := len(m.order); n > 0; n-- {
		candidate := m.runs[m.order[n-1]]
		if keep != nil && !keep(candidate) {
			continue
		}
		result = append(result, candidate)
	}
	return result
}
// oldestFirst returns copies of the retained runs in insertion order (oldest
// first), keeping only those for which keep reports true. A nil keep retains
// everything. The result is never nil. The caller must hold m.mu for reading
// or writing.
func (m *Memory) oldestFirst(keep func(SkillRun) bool) []SkillRun {
	result := make([]SkillRun, 0, len(m.order))
	for i := 0; i < len(m.order); i++ {
		candidate := m.runs[m.order[i]]
		if keep != nil && !keep(candidate) {
			continue
		}
		result = append(result, candidate)
	}
	return result
}
// page returns the window of rs that starts at offset and holds at most limit
// elements:
//   - a negative offset is treated as 0;
//   - an offset at or past the end yields nil;
//   - limit <= 0 means no upper bound.
//
// The returned slice aliases rs.
func page(rs []SkillRun, offset, limit int) []SkillRun {
	if offset < 0 {
		offset = 0
	}
	if offset >= len(rs) {
		return nil
	}
	end := len(rs)
	// Compare against the remaining length rather than computing offset+limit,
	// which could overflow for a very large limit.
	if limit > 0 && limit < end-offset {
		end = offset + limit
	}
	return rs[offset:end]
}
// ListRunsBySkill returns up to limit runs of skillID, newest first, with dry
// runs excluded. limit <= 0 means no cap. It is the first page of
// ListRunsBySkillPaginated.
func (m *Memory) ListRunsBySkill(ctx context.Context, skillID string, limit int) ([]SkillRun, error) {
	const (
		fromStart      = 0
		withoutDryRuns = false
	)
	return m.ListRunsBySkillPaginated(ctx, skillID, fromStart, limit, withoutDryRuns)
}
// ListRunsBySkillPaginated returns one page of skillID's runs, newest first.
// Runs with Status "dry_run" are included only when includeDryRun is true.
// Paging follows page(): a negative offset counts as 0, and limit <= 0 means
// no cap. It never returns an error.
func (m *Memory) ListRunsBySkillPaginated(_ context.Context, skillID string, offset, limit int, includeDryRun bool) ([]SkillRun, error) {
	matches := func(r SkillRun) bool {
		if r.SkillID != skillID {
			return false
		}
		return includeDryRun || r.Status != "dry_run"
	}

	m.mu.RLock()
	defer m.mu.RUnlock()
	return page(m.newestFirst(matches), offset, limit), nil
}
// CountRunsBySkill reports how many retained runs belong to skillID. It applies
// the same dry-run rule as ListRunsBySkillPaginated: runs with Status
// "dry_run" are counted only when includeDryRun is true. It never returns an
// error.
//
// The runs are counted in place. Building a slice of matching copies only to
// take its length would cost an allocation proportional to the history size.
func (m *Memory) CountRunsBySkill(_ context.Context, skillID string, includeDryRun bool) (int64, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var n int64
	// Walk order (not the map) so the counted set is exactly what the list
	// methods would return.
	for _, id := range m.order {
		r := m.runs[id]
		if r.SkillID == skillID && (includeDryRun || r.Status != "dry_run") {
			n++
		}
	}
	return n, nil
}
// ListRunsByCaller returns up to limit of callerID's runs, newest first. Dry
// runs are always excluded, and limit <= 0 means no cap. It never returns an
// error.
func (m *Memory) ListRunsByCaller(_ context.Context, callerID string, limit int) ([]SkillRun, error) {
	isCallersRealRun := func(r SkillRun) bool {
		if r.Status == "dry_run" {
			return false
		}
		return r.CallerID == callerID
	}

	m.mu.RLock()
	defer m.mu.RUnlock()
	recent := m.newestFirst(isCallersRealRun)
	return page(recent, 0, limit), nil
}
// matchesFilter reports whether r satisfies every populated criterion of f.
// Empty strings and zero times mean "don't filter on this".
//
// Status handling: an explicit f.Status is an exact match, so Status "dry_run"
// selects dry runs even when IncludeDryRun is false. With no Status, dry runs
// are dropped unless IncludeDryRun is set.
//
// Since and Until bound StartedAt inclusively. TopLevelOnly keeps only runs
// without a ParentRunID.
func (m *Memory) matchesFilter(r SkillRun, f RunFilter) bool {
	switch {
	case f.Status != "" && r.Status != f.Status:
		return false
	case f.Status == "" && !f.IncludeDryRun && r.Status == "dry_run":
		return false
	case f.SkillID != "" && r.SkillID != f.SkillID,
		f.CallerID != "" && r.CallerID != f.CallerID,
		f.ChannelID != "" && r.ChannelID != f.ChannelID:
		return false
	case f.TopLevelOnly && r.ParentRunID != "":
		return false
	case !f.Since.IsZero() && r.StartedAt.Before(f.Since),
		!f.Until.IsZero() && r.StartedAt.After(f.Until):
		return false
	}
	return true
}
// ListRunsFiltered returns one page of runs matching f (see matchesFilter),
// newest first. To bound admin scans per the Storage contract, a limit that
// is <= 0 or above 500 is replaced by 50. A negative offset counts as 0. It
// never returns an error.
func (m *Memory) ListRunsFiltered(_ context.Context, f RunFilter, offset, limit int) ([]SkillRun, error) {
	const (
		defaultLimit = 50
		maxLimit     = 500
	)
	if limit > maxLimit || limit <= 0 {
		limit = defaultLimit
	}

	keep := func(r SkillRun) bool { return m.matchesFilter(r, f) }

	m.mu.RLock()
	defer m.mu.RUnlock()
	return page(m.newestFirst(keep), offset, limit), nil
}
// CountRunsFiltered reports how many retained runs match f (see
// matchesFilter). Unlike ListRunsFiltered, no paging or limit applies. It
// never returns an error.
//
// The runs are counted in place instead of building and discarding a slice of
// copies of every matching run.
func (m *Memory) CountRunsFiltered(_ context.Context, f RunFilter) (int64, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var n int64
	// Walk order (not the map) so the counted set matches ListRunsFiltered.
	for _, id := range m.order {
		if m.matchesFilter(m.runs[id], f) {
			n++
		}
	}
	return n, nil
}
// PurgeOlderThan deletes every run whose FinishedAt is strictly before t,
// together with its log entries, and returns how many runs were deleted.
// Unfinished runs (nil FinishedAt) are always kept, so in-flight work is never
// purged. The insertion order of the survivors is preserved.
func (m *Memory) PurgeOlderThan(_ context.Context, t time.Time) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Build a fresh slice rather than filtering m.order in place.
	var survivors []string
	var removed int64
	for _, id := range m.order {
		if fin := m.runs[id].FinishedAt; fin == nil || !fin.Before(t) {
			survivors = append(survivors, id)
			continue
		}
		delete(m.runs, id)
		delete(m.logs, id)
		removed++
	}
	m.order = survivors
	return removed, nil
}
// ListChildrenByParent returns every retained run whose ParentRunID equals
// parentRunID, oldest first (insertion order). With no children the result is
// an empty, non-nil slice. It never returns an error.
func (m *Memory) ListChildrenByParent(_ context.Context, parentRunID string) ([]SkillRun, error) {
	isChild := func(r SkillRun) bool { return r.ParentRunID == parentRunID }

	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.oldestFirst(isChild), nil
}
// WalkParentChain follows ParentRunID links upward from runID and returns the
// runs found, root first and the queried run last.
//
// The walk stops when it reaches:
//   - a run with no parent;
//   - an ID that is not stored;
//   - an ID already visited (cycle guard);
//   - MaxParentChainDepth collected runs.
//
// When the depth cap cuts the walk short, the first element is the deepest
// ancestor reached, not the true root. An unknown runID yields nil. It never
// returns an error.
func (m *Memory) WalkParentChain(_ context.Context, runID string) ([]SkillRun, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	visited := make(map[string]struct{})
	var upward []SkillRun // queried run first, then successive ancestors
	for id := runID; id != "" && len(upward) < MaxParentChainDepth; {
		if _, dup := visited[id]; dup {
			break
		}
		run, ok := m.runs[id]
		if !ok {
			break
		}
		visited[id] = struct{}{}
		upward = append(upward, run)
		id = run.ParentRunID
	}

	// Flip child→root into the contract's root→child order.
	n := len(upward)
	for i := 0; i < n/2; i++ {
		upward[i], upward[n-1-i] = upward[n-1-i], upward[i]
	}
	return upward, nil
}
// ListFinishedRunsBefore returns up to limit runs whose FinishedAt is strictly
// before cutoff, in insertion order (the order StartRun first saw each ID).
// Unfinished runs are never returned. Per the Storage contract a real bound is
// required: limit <= 0 returns nil. When nothing matches the result is nil. It
// never returns an error.
//
// The scan stops as soon as limit matches are collected, instead of copying
// every qualifying run and then discarding all but the first limit.
func (m *Memory) ListFinishedRunsBefore(_ context.Context, cutoff time.Time, limit int) ([]SkillRun, error) {
	if limit <= 0 {
		return nil, nil // contract: a real bound is required
	}

	m.mu.RLock()
	defer m.mu.RUnlock()

	var out []SkillRun
	for _, id := range m.order {
		r := m.runs[id]
		if r.FinishedAt == nil || !r.FinishedAt.Before(cutoff) {
			continue
		}
		out = append(out, r)
		if len(out) == limit {
			break
		}
	}
	return out, nil
}
// LastRunBySkills returns, for each skill in skillIDs that has at least one
// qualifying retained run, the latest StartedAt among those runs.
//
// A run qualifies if its Status is "ok", or with any status when
// includeFailed is true. Skills with no qualifying run are absent from the
// map. The returned map is never nil, and it never returns an error.
func (m *Memory) LastRunBySkills(_ context.Context, skillIDs []string, includeFailed bool) (map[string]time.Time, error) {
	out := make(map[string]time.Time, len(skillIDs))
	if len(skillIDs) == 0 {
		return out, nil // nothing requested; skip taking the lock and scanning
	}

	// The lookup set depends only on the argument, so build it outside the lock.
	want := make(map[string]struct{}, len(skillIDs))
	for _, id := range skillIDs {
		want[id] = struct{}{}
	}

	m.mu.RLock()
	defer m.mu.RUnlock()

	for _, id := range m.order {
		r := m.runs[id]
		if _, ok := want[r.SkillID]; !ok {
			continue
		}
		if !includeFailed && r.Status != "ok" {
			continue // contract: only status=="ok" counts unless includeFailed
		}
		// Detect the first qualifying run by map presence. Comparing against a
		// missing entry's zero value would silently drop a run whose StartedAt
		// is unset.
		if prev, seen := out[r.SkillID]; !seen || r.StartedAt.After(prev) {
			out[r.SkillID] = r.StartedAt
		}
	}
	return out, nil
}