executus/audit/sink.go
steve 09fc1a07e8
fix: address verified gadfly P4/#4 findings (audit/budget/persona)
Security (all 3 models — HIGH): audit OnTool persisted raw tool args + results
verbatim for the very tools the OnStep narration-redaction flags as secret
(mcp_call/email_send/http_*) — the args/results are what CARRY the secret, so
they landed in skill_run_logs unredacted. Factored the predicate into
isSecretTool() (single source of truth) and OnTool now emits
args_redacted/result_redacted (+ lengths) for secret tools. Test asserts no
secret reaches the log. (persona) webhook_ip_allowlist entries are now
CIDR/IP-validated at load (malformed dropped + warned) instead of accepted raw.
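
A minimal sketch of the redaction described above, assuming the predicate matches the tool names listed in this message. The helper toolPayload and the length keys args_len/result_len are hypothetical names for illustration; only isSecretTool, args_redacted and result_redacted come from the commit itself, and the real implementation may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// isSecretTool reports whether a tool's args/results may carry secrets.
// The matched names mirror the commit message; the real predicate may differ.
func isSecretTool(name string) bool {
	switch name {
	case "mcp_call", "email_send":
		return true
	}
	return strings.HasPrefix(name, "http_")
}

// toolPayload (hypothetical helper) builds the log payload for one tool call.
// For secret tools it records only redaction flags and lengths, never content.
func toolPayload(name, args, result string) map[string]any {
	if isSecretTool(name) {
		return map[string]any{
			"tool":            name,
			"args_redacted":   true,
			"result_redacted": true,
			"args_len":        len(args),   // assumed key name
			"result_len":      len(result), // assumed key name
		}
	}
	return map[string]any{"tool": name, "args": args, "result": result}
}

func main() {
	p := toolPayload("http_get", `{"token":"s3cret"}`, "200 OK")
	_, leaked := p["args"]
	fmt.Println("raw args present:", leaked)
}
```

Keeping the predicate in one function means the step narration and the tool log cannot drift apart on which tools count as secret.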

Contract correctness (glm-5.2 + deepseek) — audit Memory now honors its
documented Storage contract: ListChildrenByParent/ListFinishedRunsBefore return
oldest-first; WalkParentChain returns root-first and honors MaxParentChainDepth;
ListRunsFiltered clamps limit (<=0 or >500 -> 50); ListFinishedRunsBefore with
limit<=0 returns none; an explicit RunFilter.Status (incl. "dry_run") matches
regardless of IncludeDryRun; LastRunBySkills counts only status=="ok" unless
includeFailed. (PurgeOlderThan's FinishedAt key is the SAFE behavior — in-flight
runs retained — so the doc was aligned to it, not the impl.)
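
The two limit rules above can be sketched as follows. The function and constant names are hypothetical; only the thresholds (<=0 or >500 falls back to 50, and limit<=0 yields nothing for the finished-runs listing) are taken from this message.

```go
package main

import "fmt"

const (
	defaultListLimit = 50  // fallback stated in the commit message
	maxListLimit     = 500 // upper bound stated in the commit message
)

// clampListLimit (hypothetical name) mirrors the ListRunsFiltered rule:
// a limit that is <=0 or >500 falls back to 50.
func clampListLimit(limit int) int {
	if limit <= 0 || limit > maxListLimit {
		return defaultListLimit
	}
	return limit
}

// firstN (hypothetical name) mirrors the ListFinishedRunsBefore rule:
// limit<=0 returns none; otherwise at most limit entries, order preserved.
func firstN(oldestFirst []string, limit int) []string {
	if limit <= 0 {
		return nil
	}
	if limit > len(oldestFirst) {
		limit = len(oldestFirst)
	}
	return oldestFirst[:limit]
}

func main() {
	fmt.Println(clampListLimit(0), clampListLimit(501), clampListLimit(25))
	fmt.Println(firstN([]string{"run-a", "run-b", "run-c"}, 2))
}
```

The two listings treat a non-positive limit differently on purpose: one substitutes a default page size, the other treats it as "nothing requested".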

Error-handling: appendLog now uses a bounded context (auditAppendTimeout=3s) so
a hung backend can't block the run goroutine on the hot path; Sink.StartRun
logs its (still best-effort) failure instead of swallowing it; budget Memory.Get
uses RLock (RWMutex); budget package doc fixed (was skillexec's); Check uses the
budgetWindow constant, not a duplicated literal.

Triaged false-positive: NewNoOpBudget returning BudgetTracker is assignable to
run.Budget (identical method sets) — no change needed.

Core go.sum still free of host/DB deps.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 23:44:34 -04:00


package audit

import (
	"context"
	"log/slog"
	"time"

	"gitea.stevedudenhoeffer.com/steve/majordomo/llm"

	"gitea.stevedudenhoeffer.com/steve/executus/run"
)

// Sink adapts an audit Storage to the run.Audit port: StartRun opens a run row
// and returns a per-run recorder (a Writer) that the executor feeds with steps,
// tool calls, and the terminal roll-up. This is what plugs the audit battery
// into run.Ports.Audit: mort backs it with its GORM Storage, a light host with
// Memory() (or omits it entirely).
type Sink struct{ storage Storage }

// NewSink wraps a Storage as a run.Audit. A nil Storage yields a Sink whose
// StartRun returns nil (the executor then records nothing), so NewSink(nil) is
// equivalent to leaving run.Ports.Audit unset.
func NewSink(storage Storage) *Sink { return &Sink{storage: storage} }

// compile-time proof the adapter satisfies the core seams.
var (
	_ run.Audit       = (*Sink)(nil)
	_ run.RunRecorder = (*recorder)(nil)
)

// StartRun records the run start and returns a recorder. Implements run.Audit.
func (s *Sink) StartRun(ctx context.Context, info run.RunInfo) run.RunRecorder {
	if s == nil || s.storage == nil {
		return nil
	}
	started := info.StartedAt
	if started.IsZero() {
		started = time.Now()
	}
	// Best-effort: a failed StartRun must not break the user-visible run, but we
	// surface it (a swallowed failure leaves orphan log events with no run row).
	if err := s.storage.StartRun(ctx, SkillRun{
		ID:          info.RunID,
		SkillID:     info.SubjectID,
		CallerID:    info.CallerID,
		ChannelID:   info.ChannelID,
		ParentRunID: info.ParentRunID,
		Inputs:      info.Inputs,
		StartedAt:   started,
		Status:      "running",
	}); err != nil {
		slog.Warn("audit: StartRun failed; the run row is missing so its log events will orphan",
			"run_id", info.RunID, "error", err)
	}
	return &recorder{w: NewWriter(s.storage, info.RunID)}
}

// recorder adapts a *Writer to run.RunRecorder, converting run.RunStats to the
// audit RunStats on Close (the two have identical fields).
type recorder struct{ w *Writer }

func (r *recorder) TokenStats() (in, out, thinking int64)   { return r.w.TokenStats() }
func (r *recorder) ToolCallsCount() int                     { return r.w.ToolCallsCount() }
func (r *recorder) OnStep(iter int, resp *llm.Response)     { r.w.OnStep(iter, resp) }
func (r *recorder) OnTool(call llm.ToolCall, result string) { r.w.OnTool(call, result) }
func (r *recorder) LogEvent(eventType string, payload map[string]any) {
	r.w.LogEvent(eventType, payload)
}
func (r *recorder) LogError(msg string) { r.w.LogError(msg) }
func (r *recorder) Close(ctx context.Context, s run.RunStats) {
	r.w.Close(ctx, RunStats{
		Status:         s.Status,
		Output:         s.Output,
		Error:          s.Error,
		ToolCalls:      s.ToolCalls,
		RuntimeSeconds: s.RuntimeSeconds,
		InputTokens:    s.InputTokens,
		OutputTokens:   s.OutputTokens,
		ThinkingTokens: s.ThinkingTokens,
	})
}
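
The nil-handling in StartRun can be illustrated with a self-contained sketch. The Recorder, Storage and memStorage types below are simplified stand-ins invented for this example, not the real run.RunRecorder or audit Storage interfaces. The point it shows is that returning a bare nil (rather than a nil *recorder) keeps the caller's `rec == nil` check true.

```go
package main

import "fmt"

// Recorder and Storage are simplified stand-ins (hypothetical) for
// run.RunRecorder and the audit Storage interface.
type Recorder interface{ LogEvent(eventType string) }
type Storage interface{ Append(runID, eventType string) }

// memStorage is a toy in-memory backend used only by this sketch.
type memStorage struct{ events []string }

func (m *memStorage) Append(runID, eventType string) {
	m.events = append(m.events, runID+":"+eventType)
}

type recorder struct {
	storage Storage
	runID   string
}

func (r *recorder) LogEvent(eventType string) { r.storage.Append(r.runID, eventType) }

// Sink mirrors the shape of the adapter above.
type Sink struct{ storage Storage }

// Compile-time check that *recorder satisfies Recorder.
var _ Recorder = (*recorder)(nil)

// StartRun returns an untyped nil when there is nothing to record to, so the
// resulting interface value compares equal to nil on the caller's side.
func (s *Sink) StartRun(runID string) Recorder {
	if s == nil || s.storage == nil {
		return nil
	}
	return &recorder{storage: s.storage, runID: runID}
}

func main() {
	var none *Sink
	fmt.Println("nil sink yields nil recorder:", none.StartRun("r1") == nil)

	store := &memStorage{}
	if rec := (&Sink{storage: store}).StartRun("r2"); rec != nil {
		rec.LogEvent("step")
	}
	fmt.Println(store.events)
}
```

Had StartRun returned a typed nil pointer instead, the interface value would be non-nil and a caller guarding with `rec != nil` would go on to call methods on a nil receiver.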