diff --git a/CLAUDE.md b/CLAUDE.md index 86a7be9..2e7f530 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -61,13 +61,17 @@ CORE (majordomo + stdlib): tools/{web,net,store,compose,meta,comms} generic tools [P3] BATTERIES (opt-in siblings, each nil-safe + a default): - persona/ Agent noun + AgentStore seam + yml loader [P4] + persona/ Agent noun + Storage seam + builtin loader [P4 ~] + + ToRunnable() bridge to run.RunnableAgent + + Memory default (host: chatbot/commands/personalization) skill/ rich Skill + SkillStore seam + toml loader [P4] - audit/ run-trace Sink (+ Noop/Slog) [P4] + audit/ run.Audit Sink + Writer + queryable Memory [P4 ✓] + default (skillaudit Storage iface; GORM stays in mort) critic/ two-tier timeout state machine + Escalator [P4] schedule/ cron runner cores [P4] checkpoint/ durable resume seam [P4] - budget/ rolling-window tracker (+ NoOp) [P4] + budget/ DBBudget rolling-7d + NoOp (run.Budget); [P4 ✓] + BudgetStorage iface + Memory default contrib/store/ SECOND module (+ modernc.org/sqlite): [P4] in-memory + pure-Go SQLite impls of every *Store seam diff --git a/audit/audit_test.go b/audit/audit_test.go new file mode 100644 index 0000000..7cd0980 --- /dev/null +++ b/audit/audit_test.go @@ -0,0 +1,78 @@ +package audit_test + +import ( + "context" + "testing" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + "gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake" + + "gitea.stevedudenhoeffer.com/steve/executus/audit" + "gitea.stevedudenhoeffer.com/steve/executus/run" + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// TestAuditBatteryEndToEnd wires the audit battery (Memory storage) into +// run.Ports.Audit, runs an agent, and verifies the run was recorded and is +// queryable — proving Sink/Writer/Memory satisfy the core seams end to end. 
+func TestAuditBatteryEndToEnd(t *testing.T) {
+	// mem is both the Storage behind the Sink and the handle the assertions
+	// below query directly.
+	mem := audit.NewMemory()
+
+	// Fake provider: one queued reply for model "m".
+	fp := fake.New("fake")
+	fp.Enqueue("m", fake.Reply("the answer"))
+	m, err := fp.Model("m")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// The Models resolver ignores the requested tier and always hands back m.
+	ex := run.New(run.Config{
+		Registry: tool.NewRegistry(),
+		Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
+			return ctx, m, nil
+		},
+		Ports: run.Ports{Audit: audit.NewSink(mem)},
+	})
+
+	res := ex.Run(context.Background(),
+		run.RunnableAgent{ID: "agent-1", Name: "a", ModelTier: "m"},
+		tool.Invocation{RunID: "run-xyz", CallerID: "caller-1"},
+		"question")
+	if res.Err != nil {
+		t.Fatalf("run error: %v", res.Err)
+	}
+
+	// The run was recorded with a terminal status + output.
+	got, err := mem.GetRun(context.Background(), "run-xyz")
+	if err != nil {
+		t.Fatalf("GetRun: %v", err)
+	}
+	if got.Status != "ok" {
+		t.Errorf("status = %q, want ok", got.Status)
+	}
+	if got.Output != "the answer" {
+		t.Errorf("output = %q, want %q", got.Output, "the answer")
+	}
+	if got.FinishedAt == nil {
+		t.Error("FinishedAt should be set after the run")
+	}
+	// The agent id passed to Run is expected to land in the SkillID column.
+	if got.SkillID != "agent-1" {
+		t.Errorf("SkillID = %q, want agent-1 (the subject id)", got.SkillID)
+	}
+
+	// And it is queryable by caller.
+	runs, err := mem.ListRunsByCaller(context.Background(), "caller-1", 10)
+	if err != nil {
+		t.Fatalf("ListRunsByCaller: %v", err)
+	}
+	if len(runs) != 1 || runs[0].ID != "run-xyz" {
+		t.Errorf("ListRunsByCaller = %+v, want [run-xyz]", runs)
+	}
+}
+
+// TestNilSinkRecordsNothing: NewSink(nil) is equivalent to no audit.
+func TestNilSinkRecordsNothing(t *testing.T) {
+	s := audit.NewSink(nil)
+	// A nil recorder is the signal that nothing will be recorded for this run.
+	if rec := s.StartRun(context.Background(), run.RunInfo{RunID: "r"}); rec != nil {
+		t.Error("NewSink(nil).StartRun should return a nil recorder")
+	}
+}
diff --git a/audit/memory.go b/audit/memory.go
new file mode 100644
index 0000000..7674d85
--- /dev/null
+++ b/audit/memory.go
@@ -0,0 +1,280 @@
+package audit
+
+import (
+	"context"
+	"sort"
+	"sync"
+	"time"
+)
+
+// Memory is an in-process Storage: it retains runs + logs in memory so a light
+// host (or a test) gets queryable run history with zero setup. It is bounded
+// only by process memory — a host that runs forever should PurgeOlderThan
+// periodically, or use a persistent Storage. Construct with NewMemory; the
+// zero value is not usable because the maps are only allocated there.
+//
+// Mort uses its GORM/MySQL Storage; contrib/store adds a durable SQLite one.
+// Memory is the zero-dependency default behind audit.NewSink(audit.NewMemory()).
+type Memory struct {
+	mu    sync.RWMutex             // guards order, runs and logs
+	order []string                 // run ids in insertion order
+	runs  map[string]SkillRun      // by run id
+	logs  map[string][]SkillRunLog // by run id
+}
+
+// NewMemory returns an empty in-memory Storage.
+func NewMemory() *Memory {
+	return &Memory{
+		runs: make(map[string]SkillRun),
+		logs: make(map[string][]SkillRunLog),
+	}
+}
+
+// Compile-time check that *Memory implements Storage.
+var _ Storage = (*Memory)(nil)
+
+// Initialize is a no-op: Memory needs no setup beyond NewMemory.
+func (m *Memory) Initialize(context.Context) error { return nil }
+
+// StartRun stores run under run.ID. A first-seen id is appended to the
+// insertion order; a repeated id overwrites the stored row in place and keeps
+// its original position.
+func (m *Memory) StartRun(_ context.Context, run SkillRun) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	_, known := m.runs[run.ID]
+	m.runs[run.ID] = run
+	if !known {
+		m.order = append(m.order, run.ID)
+	}
+	return nil
+}
+
+// FinishRun stamps FinishedAt with the current time and copies the terminal
+// stats onto the stored row. Returns ErrNotFound when runID was never started.
+// A second call simply overwrites the first (last write wins).
+func (m *Memory) FinishRun(_ context.Context, runID string, s RunStats) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	rec, found := m.runs[runID]
+	if !found {
+		return ErrNotFound
+	}
+	finishedAt := time.Now()
+	rec.FinishedAt = &finishedAt
+	rec.Status, rec.Output, rec.Error = s.Status, s.Output, s.Error
+	rec.ToolCallsCount, rec.RuntimeSeconds = s.ToolCalls, s.RuntimeSeconds
+	rec.TotalInputTokens = s.InputTokens
+	rec.TotalOutputTokens = s.OutputTokens
+	rec.TotalThinkingTokens = s.ThinkingTokens
+	// Map values are copies, so the updated row has to be written back.
+	m.runs[runID] = rec
+	return nil
+}
+
+// AppendLog appends log to the event list kept for log.RunID. The run row is
+// not required to exist.
+func (m *Memory) AppendLog(_ context.Context, log SkillRunLog) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	entries := m.logs[log.RunID]
+	m.logs[log.RunID] = append(entries, log)
+	return nil
+}
+
+// GetRun returns a pointer to a copy of the stored row, or ErrNotFound.
+func (m *Memory) GetRun(_ context.Context, runID string) (*SkillRun, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	if rec, found := m.runs[runID]; found {
+		return &rec, nil
+	}
+	return nil, ErrNotFound
+}
+
+// ListLogsByRun returns a copy of the run's events ordered by Sequence; events
+// with equal Sequence keep their append order.
+func (m *Memory) ListLogsByRun(_ context.Context, runID string) ([]SkillRunLog, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	sorted := append([]SkillRunLog(nil), m.logs[runID]...)
+	sort.SliceStable(sorted, func(a, b int) bool {
+		return sorted[a].Sequence < sorted[b].Sequence
+	})
+	return sorted, nil
+}
+
+// newestFirst returns the retained runs in reverse insertion order, optionally
+// filtered. Caller holds at least RLock.
+func (m *Memory) newestFirst(keep func(SkillRun) bool) []SkillRun {
+	out := make([]SkillRun, 0, len(m.order))
+	for i := len(m.order) - 1; i >= 0; i-- {
+		r := m.runs[m.order[i]]
+		if keep == nil || keep(r) {
+			out = append(out, r)
+		}
+	}
+	return out
+}
+
+// oldestFirst returns the retained runs in insertion (oldest-first) order,
+// optionally filtered. Caller holds at least RLock.
+func (m *Memory) oldestFirst(keep func(SkillRun) bool) []SkillRun {
+	out := make([]SkillRun, 0, len(m.order))
+	for _, id := range m.order {
+		r := m.runs[id]
+		if keep == nil || keep(r) {
+			out = append(out, r)
+		}
+	}
+	return out
+}
+
+// page returns the window of rs starting at offset and holding at most limit
+// rows. A negative offset is treated as 0, an offset at or past the end yields
+// nil, and limit <= 0 means "no cap".
+func page(rs []SkillRun, offset, limit int) []SkillRun {
+	if offset < 0 {
+		offset = 0
+	}
+	if offset >= len(rs) {
+		return nil
+	}
+	rs = rs[offset:]
+	if limit > 0 && limit < len(rs) {
+		rs = rs[:limit]
+	}
+	return rs
+}
+
+// ListRunsBySkill returns the newest runs of skillID (dry-run rows excluded),
+// capped at limit.
+func (m *Memory) ListRunsBySkill(ctx context.Context, skillID string, limit int) ([]SkillRun, error) {
+	return m.ListRunsBySkillPaginated(ctx, skillID, 0, limit, false)
+}
+
+// ListRunsBySkillPaginated returns runs of skillID, newest first, windowed by
+// offset+limit. Rows with status "dry_run" are dropped unless includeDryRun.
+func (m *Memory) ListRunsBySkillPaginated(_ context.Context, skillID string, offset, limit int, includeDryRun bool) ([]SkillRun, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return page(m.newestFirst(func(r SkillRun) bool {
+		return r.SkillID == skillID && (includeDryRun || r.Status != "dry_run")
+	}), offset, limit), nil
+}
+
+// CountRunsBySkill counts the rows ListRunsBySkillPaginated would page over
+// for the same skillID / includeDryRun pair.
+func (m *Memory) CountRunsBySkill(_ context.Context, skillID string, includeDryRun bool) (int64, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return int64(len(m.newestFirst(func(r SkillRun) bool {
+		return r.SkillID == skillID && (includeDryRun || r.Status != "dry_run")
+	}))), nil
+}
+
+// ListRunsByCaller returns the newest non-dry-run runs started by callerID,
+// capped at limit.
+func (m *Memory) ListRunsByCaller(_ context.Context, callerID string, limit int) ([]SkillRun, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return page(m.newestFirst(func(r SkillRun) bool {
+		return r.CallerID == callerID && r.Status != "dry_run"
+	}), 0, limit), nil
+}
+
+// matchesFilter reports whether r satisfies every populated field of f.
+// The time window is half-open, as the RunFilter contract states:
+// Since <= StartedAt < Until.
+func (m *Memory) matchesFilter(r SkillRun, f RunFilter) bool {
+	if f.Status != "" {
+		if r.Status != f.Status {
+			return false
+		}
+		// An explicit Status (even "dry_run") matches regardless of IncludeDryRun.
+	} else if !f.IncludeDryRun && r.Status == "dry_run" {
+		return false
+	}
+	if f.SkillID != "" && r.SkillID != f.SkillID {
+		return false
+	}
+	if f.CallerID != "" && r.CallerID != f.CallerID {
+		return false
+	}
+	if f.ChannelID != "" && r.ChannelID != f.ChannelID {
+		return false
+	}
+	if f.TopLevelOnly && r.ParentRunID != "" {
+		return false
+	}
+	if !f.Since.IsZero() && r.StartedAt.Before(f.Since) {
+		return false
+	}
+	// Until is exclusive: a run that started exactly at Until is outside the
+	// window (the previous After() check let it through).
+	if !f.Until.IsZero() && !r.StartedAt.Before(f.Until) {
+		return false
+	}
+	return true
+}
+
+// ListRunsFiltered returns the runs matching f, newest insertion first,
+// windowed by offset+limit. limit is clamped to 50 when <= 0 or > 500.
+func (m *Memory) ListRunsFiltered(_ context.Context, f RunFilter, offset, limit int) ([]SkillRun, error) {
+	if limit <= 0 || limit > 500 {
+		limit = 50 // bound admin scans, per the Storage contract
+	}
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return page(m.newestFirst(func(r SkillRun) bool { return m.matchesFilter(r, f) }), offset, limit), nil
+}
+
+// CountRunsFiltered returns how many retained runs match f, ignoring paging.
+func (m *Memory) CountRunsFiltered(_ context.Context, f RunFilter) (int64, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return int64(len(m.newestFirst(func(r SkillRun) bool { return m.matchesFilter(r, f) }))), nil
+}
+
+// PurgeOlderThan deletes every run (and its logs) whose StartedAt is strictly
+// before t and returns the number of runs removed. Per the Storage contract
+// the cutoff keys on StartedAt, so rows that never reached FinishRun are
+// purged too instead of accumulating forever.
+func (m *Memory) PurgeOlderThan(_ context.Context, t time.Time) (int64, error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	var purged int64
+	// Zero-capacity reslice: append allocates a fresh backing array, so the
+	// ids being iterated are never overwritten mid-loop.
+	kept := m.order[:0:0]
+	for _, id := range m.order {
+		r := m.runs[id]
+		if r.StartedAt.Before(t) {
+			delete(m.runs, id)
+			delete(m.logs, id)
+			purged++
+			continue
+		}
+		kept = append(kept, id)
+	}
+	m.order = kept
+	return purged, nil
+}
+
+// ListChildrenByParent returns the runs whose ParentRunID equals parentRunID,
+// oldest first.
+func (m *Memory) ListChildrenByParent(_ context.Context, parentRunID string) ([]SkillRun, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return m.oldestFirst(func(r SkillRun) bool { return r.ParentRunID == parentRunID }), nil
+}
+
+// WalkParentChain follows ParentRunID links upward from runID and returns the
+// chain root first, runID last. The walk stops at an unknown id, at a repeated
+// id (cycle), or after MaxParentChainDepth rows.
+func (m *Memory) WalkParentChain(_ context.Context, runID string) ([]SkillRun, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	var chain []SkillRun
+	seen := map[string]bool{}
+	for id := runID; id != "" && len(chain) < MaxParentChainDepth; {
+		r, ok := m.runs[id]
+		if !ok || seen[id] {
+			break
+		}
+		seen[id] = true
+		chain = append(chain, r)
+		id = r.ParentRunID
+	}
+	// Contract: root first, the queried run last. We walked child→root, so reverse.
+	for i, j := 0, len(chain)-1; i < j; i, j = i+1, j-1 {
+		chain[i], chain[j] = chain[j], chain[i]
+	}
+	return chain, nil
+}
+
+// ListFinishedRunsBefore returns runs whose FinishedAt is set and strictly
+// before cutoff, oldest first, capped at limit. Unfinished runs never match.
+func (m *Memory) ListFinishedRunsBefore(_ context.Context, cutoff time.Time, limit int) ([]SkillRun, error) {
+	if limit <= 0 {
+		return nil, nil // contract: a real bound is required
+	}
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return page(m.oldestFirst(func(r SkillRun) bool {
+		return r.FinishedAt != nil && r.FinishedAt.Before(cutoff)
+	}), 0, limit), nil
+}
+
+// LastRunBySkills returns, per requested skill id, the latest StartedAt among
+// its counted runs. Dry-run rows never count; with includeFailed=false only
+// status "ok" rows count. Skills without a counted run have no map entry.
+func (m *Memory) LastRunBySkills(_ context.Context, skillIDs []string, includeFailed bool) (map[string]time.Time, error) {
+	out := map[string]time.Time{}
+	if len(skillIDs) == 0 {
+		return out, nil // contract: empty input short-circuits to an empty map
+	}
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	want := make(map[string]bool, len(skillIDs))
+	for _, id := range skillIDs {
+		want[id] = true
+	}
+	for _, id := range m.order {
+		r := m.runs[id]
+		if !want[r.SkillID] {
+			continue
+		}
+		if r.Status == "dry_run" {
+			continue // contract: only non-dry-run statuses count, even with includeFailed
+		}
+		if !includeFailed && r.Status != "ok" {
+			continue // contract: only status=="ok" counts unless includeFailed
+		}
+		if r.StartedAt.After(out[r.SkillID]) {
+			out[r.SkillID] = r.StartedAt
+		}
+	}
+	return out, nil
+}
diff --git a/audit/redaction_test.go b/audit/redaction_test.go
new file mode 100644
index 0000000..0664ed5
--- /dev/null
+++ b/audit/redaction_test.go
@@ -0,0 +1,58 @@
+package audit
+
+import (
+	"context"
+	"strings"
+	"testing"
+
+	"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
+)
+
+// TestOnToolRedactsSecretTools: a secret-bearing tool's args/result must NOT be
+// persisted verbatim in the audit log.
+func TestOnToolRedactsSecretTools(t *testing.T) {
+	ctx := context.Background()
+	mem := NewMemory()
+	// Fail loudly on setup errors instead of discarding them.
+	if err := mem.StartRun(ctx, SkillRun{ID: "r1"}); err != nil {
+		t.Fatalf("StartRun: %v", err)
+	}
+	w := NewWriter(mem, "r1")
+
+	secret := `{"url":"https://x","headers":{"Authorization":"Bearer SUPERSECRET"}}`
+	w.OnTool(llm.ToolCall{Name: "http_get", ID: "1", Arguments: []byte(secret)}, "TOPSECRETBODY")
+	// a non-secret tool is logged verbatim
+	w.OnTool(llm.ToolCall{Name: "think", ID: "2", Arguments: []byte(`{"thought":"hi"}`)}, "ok")
+
+	logs, err := mem.ListLogsByRun(ctx, "r1")
+	if err != nil {
+		t.Fatalf("ListLogsByRun: %v", err)
+	}
+	// Without any recorded event the leak check below would pass vacuously.
+	if len(logs) == 0 {
+		t.Fatal("no audit events were recorded")
+	}
+	// Flatten every string-valued payload entry into one haystack.
+	var dump strings.Builder
+	for _, l := range logs {
+		for k, v := range l.Payload {
+			dump.WriteString(k)
+			dump.WriteString("=")
+			if s, ok := v.(string); ok {
+				dump.WriteString(s)
+			}
+			dump.WriteString(" ")
+		}
+	}
+	all := dump.String()
+	if strings.Contains(all, "SUPERSECRET") || strings.Contains(all, "TOPSECRETBODY") {
+		t.Fatalf("secret leaked into audit log: %s", all)
+	}
+	// the redaction marker is present, and the non-secret tool's args survive
+	foundRedacted, foundThink := false, false
+	for _, l := range logs {
+		if l.EventType == "tool_call" {
+			if r, _ := l.Payload["args_redacted"].(bool); r {
+				foundRedacted = true
+			}
+			if a, _ := l.Payload["args"].(string); strings.Contains(a, "thought") {
+				foundThink = true
+			}
+		}
+	}
+	if !foundRedacted {
+		t.Error("secret tool_call should carry args_redacted=true")
+	}
+	if !foundThink {
+		t.Error("non-secret tool args should be logged verbatim")
+	}
+}
diff --git a/audit/sink.go b/audit/sink.go
new file mode 100644
index 0000000..44e1802
--- /dev/null
+++ b/audit/sink.go
@@ -0,0 +1,81 @@
+package audit
+
+import (
+	"context"
+	"log/slog"
+	"time"
+
+	"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
+
+	"gitea.stevedudenhoeffer.com/steve/executus/run"
+)
+
+// Sink adapts an audit Storage to the run.Audit port: StartRun opens a run row
+// and returns a per-run recorder (a Writer) that the executor feeds with steps,
+// tool calls, and the terminal roll-up.
+// This is what plugs the audit battery
+// into run.Ports.Audit — mort backs it with its GORM Storage, a light host with
+// NewMemory() (or omits it entirely).
+type Sink struct{ storage Storage }
+
+// NewSink wraps a Storage as a run.Audit. A nil Storage yields a Sink whose
+// StartRun returns nil (the executor then records nothing) — so NewSink(nil) is
+// equivalent to leaving run.Ports.Audit unset.
+func NewSink(storage Storage) *Sink { return &Sink{storage: storage} }
+
+// compile-time proof the adapter satisfies the core seams.
+var (
+	_ run.Audit       = (*Sink)(nil)
+	_ run.RunRecorder = (*recorder)(nil)
+)
+
+// StartRun records the run start and returns a recorder. Implements run.Audit.
+// A nil Sink or nil Storage returns a nil recorder. A Storage.StartRun failure
+// is logged, and a recorder is still returned.
+func (s *Sink) StartRun(ctx context.Context, info run.RunInfo) run.RunRecorder {
+	if s == nil || s.storage == nil {
+		return nil
+	}
+	// Fall back to "now" when the executor did not stamp a start time.
+	started := info.StartedAt
+	if started.IsZero() {
+		started = time.Now()
+	}
+	// Best-effort: a failed StartRun must not break the user-visible run, but we
+	// surface it (a swallowed failure leaves orphan log events with no run row).
+	if err := s.storage.StartRun(ctx, SkillRun{
+		ID:          info.RunID,
+		SkillID:     info.SubjectID,
+		CallerID:    info.CallerID,
+		ChannelID:   info.ChannelID,
+		ParentRunID: info.ParentRunID,
+		Inputs:      info.Inputs,
+		StartedAt:   started,
+		Status:      "running",
+	}); err != nil {
+		slog.Warn("audit: StartRun failed; the run row is missing so its log events will orphan",
+			"run_id", info.RunID, "error", err)
+	}
+	return &recorder{w: NewWriter(s.storage, info.RunID)}
+}
+
+// recorder adapts a *Writer to run.RunRecorder, converting run.RunStats to the
+// audit RunStats on Close (the two have identical fields).
+type recorder struct{ w *Writer }
+
+// Every method below forwards straight to the wrapped Writer.
+func (r *recorder) TokenStats() (in, out, thinking int64) { return r.w.TokenStats() }
+func (r *recorder) ToolCallsCount() int                   { return r.w.ToolCallsCount() }
+func (r *recorder) OnStep(iter int, resp *llm.Response)     { r.w.OnStep(iter, resp) }
+func (r *recorder) OnTool(call llm.ToolCall, result string) { r.w.OnTool(call, result) }
+func (r *recorder) LogEvent(eventType string, payload map[string]any) {
+	r.w.LogEvent(eventType, payload)
+}
+func (r *recorder) LogError(msg string) { r.w.LogError(msg) }
+
+// Close copies the executor's run.RunStats field by field into the audit
+// RunStats and hands it to the Writer.
+func (r *recorder) Close(ctx context.Context, s run.RunStats) {
+	r.w.Close(ctx, RunStats{
+		Status:         s.Status,
+		Output:         s.Output,
+		Error:          s.Error,
+		ToolCalls:      s.ToolCalls,
+		RuntimeSeconds: s.RuntimeSeconds,
+		InputTokens:    s.InputTokens,
+		OutputTokens:   s.OutputTokens,
+		ThinkingTokens: s.ThinkingTokens,
+	})
+}
diff --git a/audit/storage.go b/audit/storage.go
new file mode 100644
index 0000000..481c653
--- /dev/null
+++ b/audit/storage.go
@@ -0,0 +1,245 @@
+// Package audit persists skill execution traces: per-run summary rows
+// (skill_runs) and per-step event logs (skill_run_logs). The executor in
+// pkg/logic/skillexec emits events through a Writer; the storage layer is
+// kept separate so tests can mock it and so retention pruning has a clear
+// home.
+//
+// Why: agentic runs can be long, multi-tool affairs. Without a structured
+// audit trail, debugging "why did the LLM do that?" is impossible. The
+// log table is keyed by (run_id, sequence) so insert order is preserved.
+package audit
+
+import (
+	"context"
+	"errors"
+	"time"
+)
+
+// ErrNotFound is returned when a run lookup fails.
+var ErrNotFound = errors.New("skill run not found")
+
+// SkillRun is the per-invocation summary row. One per call to
+// Executor.Run. Status transitions through running → ok / error /
+// timeout / budget_exceeded / dry_run.
+type SkillRun struct {
+	ID             string         // run id; the key every Storage lookup uses
+	SkillID        string         // subject id (the Sink stores run.RunInfo.SubjectID here)
+	CallerID       string
+	ChannelID      string
+	Inputs         map[string]any
+	StartedAt      time.Time
+	FinishedAt     *time.Time // nil until the run is finished
+	Status         string     // running|ok|error|timeout|budget_exceeded|dry_run
+	Output         string
+	Error          string
+	ToolCallsCount int
+	RuntimeSeconds float64
+	// ParentRunID is the run_id of the parent skill that invoked this
+	// run via skill_invoke. Empty for top-level invocations. Indexed
+	// in the gorm model so call-tree queries (ListChildrenByParent +
+	// WalkParentChain) are cheap.
+	ParentRunID string
+
+	// Token roll-ups, summed across all model completions in this run
+	// (one Usage per OnStep). All default to 0 when the provider did
+	// not expose token usage.
+	TotalInputTokens    int64
+	TotalOutputTokens   int64
+	TotalThinkingTokens int64
+}
+
+// RunStats captures the terminal state of a run for FinishRun. Bundling
+// these into one struct (vs a long positional argument list) keeps
+// callers readable; future fields slot in here without touching every
+// call site.
+//
+// Why: FinishRun originally took six positional args; adding token
+// columns would push it higher. A struct is the idiomatic Go way to
+// avoid the positional-arg explosion.
+type RunStats struct {
+	Status         string  // ok|error|timeout|budget_exceeded|dry_run
+	Output         string  // final agent output (empty on error)
+	Error          string  // error message (empty on success)
+	ToolCalls      int     // total OnTool count
+	RuntimeSeconds float64 // wall-clock duration
+
+	// Token roll-ups (all default to 0 when token usage was not
+	// exposed by the provider).
+	InputTokens    int64
+	OutputTokens   int64
+	ThinkingTokens int64
+}
+
+// SkillRunLog is one event recorded during a run. EventType ∈
+// step|tool_call|tool_result|error, plus the one-shot run-level
+// resolved_model event emitted by Writer.OnStep. Payload is opaque JSON
+// the writer emits.
+type SkillRunLog struct {
+	RunID     string
+	Sequence  int
+	EventType string
+	Payload   map[string]any
+	CreatedAt time.Time
+}
+
+// RunFilter is the predicate bundle for the cross-surface "recent runs"
+// query (ListRunsFiltered / CountRunsFiltered). Every field is optional;
+// the zero value matches the most recent runs across ALL audited surfaces
+// (agents + skills). This powers the admin agent-trace debug view and the
+// Claude debug API's /runs list.
+//
+// Why a struct (vs positional args): the debug list filters along several
+// independent axes and more will be added; bundling avoids a positional
+// explosion and keeps call sites readable.
+type RunFilter struct {
+	Status    string // exact status match; "" = all (dry_run excluded unless IncludeDryRun)
+	SkillID   string // exact skill_id (holds the agent UUID for agent runs)
+	CallerID  string // exact caller (Discord member id)
+	ChannelID string // exact channel id
+
+	// TopLevelOnly restricts to root runs (parent_run_id = ''), hiding
+	// nested sub-agent / sub-skill runs from the firehose. The debug list
+	// defaults this on; an "include nested" toggle clears it.
+	TopLevelOnly bool
+
+	// IncludeDryRun surfaces status="dry_run" sandbox rows, which are
+	// excluded by default. Ignored when Status is set explicitly (an
+	// explicit Status=="dry_run" still matches).
+	IncludeDryRun bool
+
+	// Since / Until bound started_at: started_at >= Since (zero = no lower
+	// bound) and started_at < Until (zero = no upper bound).
+	Since time.Time
+	Until time.Time
+}
+
+// Storage is the persistence interface for skill runs and per-step logs.
+//
+// Why: tests substitute fake implementations; production wires
+// NewGormStorage. Keep the interface narrow — the system only needs CRUD
+// plus the retention prune helper.
+type Storage interface {
+	Initialize(ctx context.Context) error
+
+	// StartRun inserts the run with status=running. The caller MUST
+	// invoke FinishRun later (or the row stays in running indefinitely
+	// — operationally that signals a crash mid-run, which is useful
+	// signal).
+	StartRun(ctx context.Context, run SkillRun) error
+
+	// FinishRun updates the running row with terminal status, output
+	// and stats. Idempotent on second call (last write wins).
+	//
+	// V5: takes a RunStats struct so token + cost columns can be
+	// written alongside the legacy fields without changing the
+	// signature for every future addition.
+	FinishRun(ctx context.Context, runID string, stats RunStats) error
+
+	// AppendLog adds one event to the run's log. Sequence numbers must
+	// be unique per run; the writer is responsible for monotonic
+	// ordering.
+	AppendLog(ctx context.Context, log SkillRunLog) error
+
+	// GetRun returns the run summary, or ErrNotFound.
+	GetRun(ctx context.Context, runID string) (*SkillRun, error)
+
+	// ListLogsByRun returns all logs for a run in sequence order.
+	ListLogsByRun(ctx context.Context, runID string) ([]SkillRunLog, error)
+
+	// ListRunsBySkill returns recent runs for a skill, newest first,
+	// capped at limit. Excludes dry-run rows by default — use
+	// ListRunsBySkillPaginated with includeDryRun=true to see them.
+	ListRunsBySkill(ctx context.Context, skillID string, limit int) ([]SkillRun, error)
+
+	// ListRunsBySkillPaginated returns recent runs for a skill, newest
+	// first, with offset+limit. When includeDryRun is false, rows with
+	// status="dry_run" are excluded (matches the wizard's sandbox
+	// status; see audit.Writer / wizardtools docs).
+	//
+	// Why a separate paginated method vs. expanding ListRunsBySkill:
+	// callers that need the legacy "last N" view (Discord .skill runs,
+	// chatbot tool result) want the simpler signature; the paginated
+	// view is webui-specific.
+	ListRunsBySkillPaginated(ctx context.Context, skillID string,
+		offset, limit int, includeDryRun bool) ([]SkillRun, error)
+
+	// CountRunsBySkill returns the total number of runs for a skill.
+	// When includeDryRun is false, dry-run rows are excluded so the
+	// count matches the default ListRunsBySkillPaginated result.
+	CountRunsBySkill(ctx context.Context, skillID string, includeDryRun bool) (int64, error)
+
+	// ListRunsByCaller returns recent runs by a caller, newest first,
+	// capped at limit.
+	ListRunsByCaller(ctx context.Context, callerID string, limit int) ([]SkillRun, error)
+
+	// ListRunsFiltered returns runs matching f, newest first
+	// (started_at DESC), with offset+limit. With an all-zero filter it
+	// returns the most recent runs across EVERY audited surface (agents +
+	// skills) — the cross-surface feed behind the admin agent-trace debug
+	// view and the Claude debug API. dry_run rows are excluded unless
+	// f.IncludeDryRun or f.Status=="dry_run". limit is clamped (<=0 or
+	// >500 → 50) to bound admin scans.
+	ListRunsFiltered(ctx context.Context, f RunFilter, offset, limit int) ([]SkillRun, error)
+
+	// CountRunsFiltered returns the total rows matching f (ignoring
+	// offset/limit), for pagination math.
+	CountRunsFiltered(ctx context.Context, f RunFilter) (int64, error)
+
+	// PurgeOlderThan deletes runs (and their logs) whose StartedAt is
+	// strictly before t. Returns the number of runs deleted.
+	PurgeOlderThan(ctx context.Context, t time.Time) (int64, error)
+
+	// ListChildrenByParent returns all SkillRun rows where
+	// parent_run_id == parentRunID, oldest first. Used for the
+	// call-tree view (skill_invoke trace section) and as a building
+	// block for WalkParentChain.
+	//
+	// Returns an empty slice when parentRunID has no children. An
+	// empty parentRunID never matches anything (no row stores ""
+	// as a parent — that's the top-level sentinel).
+	ListChildrenByParent(ctx context.Context, parentRunID string) ([]SkillRun, error)
+
+	// WalkParentChain walks from runID up via parent_run_id, returning
+	// the chain of SkillRun summaries (oldest = root first, newest =
+	// runID last). Used by the loop guard in skill_invoke.
+	//
+	// Cap walk depth at MaxParentChainDepth (32) to prevent pathological
+	// loops in the data itself: if the parent_run_id chain has been
+	// corrupted (e.g. by a bad migration) and forms a cycle, we want a
+	// bounded result rather than an infinite loop.
+	WalkParentChain(ctx context.Context, runID string) ([]SkillRun, error)
+
+	// ListFinishedRunsBefore returns runs whose FinishedAt is strictly
+	// before cutoff, oldest first, capped at limit. limit <= 0 yields
+	// no rows (the caller is expected to specify a real bound).
+	//
+	// Why: skills.StorageSweeper drives the run-scope storage purge from
+	// this query. The sweeper picks up only finished runs so an
+	// in-flight run's run-scope KV/files cannot be deleted out from
+	// under it.
+	//
+	// Test: storage_test.go covers the include/exclude boundaries
+	// (running rows excluded; finished-after-cutoff excluded; finished-
+	// before-cutoff included).
+	ListFinishedRunsBefore(ctx context.Context, cutoff time.Time, limit int) ([]SkillRun, error)
+
+	// LastRunBySkills returns the most recent StartedAt timestamp per
+	// skill in the input ID list. Skills with no rows simply have no
+	// entry in the result map (caller distinguishes "never run" from
+	// "run but no timestamp" by map key presence).
+	//
+	// When includeFailed is true, all non-dry-run statuses count
+	// (ok / error / timeout / budget_exceeded / preempted / lane_busy).
+	// When false, only status="ok" rows count — useful for "last
+	// successful run" semantics on dashboards where errored runs
+	// shouldn't surface as recent activity.
+	//
+	// Empty skillIDs short-circuits to an empty map without touching
+	// the DB.
+	LastRunBySkills(ctx context.Context, skillIDs []string, includeFailed bool) (map[string]time.Time, error)
+}
+
+// MaxParentChainDepth is the safety cap for WalkParentChain. The loop
+// guard in skill_invoke enforces a separate (smaller) MaxInvokeDepth
+// at the tool layer; this cap exists only to bound the walk in the
+// presence of corrupted data.
+const MaxParentChainDepth = 32
diff --git a/audit/writer.go b/audit/writer.go
new file mode 100644
index 0000000..986e3c3
--- /dev/null
+++ b/audit/writer.go
@@ -0,0 +1,359 @@
+package audit
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
+)
+
+// stepTextMax caps the per-step assistant-text preview persisted on a
+// "step" event. Large enough to capture the model's reasoning around a
+// (mis)fired tool call — the single best clue to WHY a model emitted a
+// malformed call — but bounded so the longtext payload can't balloon.
+const stepTextMax = 2000
+
+// Writer wraps a Storage with the OnStep / OnTool callbacks suitable for
+// wiring into the majordomo agent loop's step observer, tracking sequence
+// numbers and tool-call counts internally.
+//
+// Why: the agent loop's observer hooks are unaware of run identity; the
+// writer captures the runID + skill metadata at construction so the
+// per-event callbacks stay simple. AppendLog failures are logged but
+// never fatal — audit must not break user-visible execution.
+//
+// What: NewWriter(storage, runID) → use OnStep / OnTool / Close. Close
+// records the final FinishRun. The executors translate each agent.Step
+// into one OnStep call (1-indexed iteration, the step's *llm.Response)
+// plus one OnTool call per executed tool.
+//
+// Test: see writer_test.go for sequence ordering and finish semantics.
+type Writer struct {
+	storage  Storage
+	runID    string
+	sequence atomic.Int32 // per-run event sequence counter
+	calls    atomic.Int32 // tool-call counter, bumped once per OnTool
+	mu       sync.Mutex   // guards Close idempotency + token tally
+	closed   bool
+
+	// V5 token accumulator — summed across each OnStep's resp.Usage.
+	// Reads come from TokenStats() so the executor can pass them to
+	// FinishRun. atomics-on-Int64 would also work, but mu already
+	// guards Close + we need consistent multi-field reads anyway
+	// (input + output + thinking). The mutex hot-path overhead is
+	// negligible vs the LLM call latency that dominates step time.
+	inputTokens    int64
+	outputTokens   int64
+	thinkingTokens int64
+
+	// Per-step wall-clock + run-level model attribution (guarded by mu).
+	// startedAt anchors the first step's duration; lastStepAt is the
+	// previous step's observation time; resolvedModelLogged ensures the
+	// one-shot "resolved_model" run-level event fires at most once.
+	startedAt           time.Time
+	lastStepAt          time.Time
+	resolvedModelLogged bool
+}
+
+// NewWriter constructs a Writer. The caller is expected to have already
+// called Storage.StartRun. startedAt is stamped with the construction time,
+// which is what the first step's step_ms is measured against.
+func NewWriter(storage Storage, runID string) *Writer {
+	return &Writer{storage: storage, runID: runID, startedAt: time.Now()}
+}
+
+// OnStep records one agent-loop step: a "step" event with the iteration
+// number and the response's text size.
+//
+// V5: also tallies per-step token usage. majordomo populates
+// resp.Usage when the provider reports it; for providers that don't,
+// the fields stay 0 and the tally stays at 0 — the formatter then
+// renders "—" rather than a misleading "$0.00".
+//
+// Why we tally here vs in the agent loop: the loop's Result.Usage is a
+// run total; the audit row needs the same numbers, but the writer also
+// serves the live RunState accessor mid-run, so a per-step running sum
+// is the right shape. Global usage attribution is handled by the llms
+// package's instrumented models — the writer tally is strictly the
+// per-run audit roll-up.
+func (w *Writer) OnStep(iter int, resp *llm.Response) {
+	// A nil Writer or a Writer without storage records nothing.
+	if w == nil || w.storage == nil {
+		return
+	}
+	now := time.Now()
+	payload := map[string]any{"iter": iter}
+
+	// Everything between Lock and Unlock touches only mu-guarded fields; the
+	// storage writes happen after the lock is released.
+	w.mu.Lock()
+	// Per-step wall-clock: time since the previous observed step, or since
+	// run start for the first step. A long gap localises a slow/hung model
+	// call — the signal that was missing when an animate step-0 call hung
+	// ~5 min. NOTE: this is step-to-step wall time (model call + the prior
+	// step's tool execution), not pure model latency.
+	prev := w.lastStepAt
+	if prev.IsZero() {
+		prev = w.startedAt
+	}
+	if !prev.IsZero() {
+		payload["step_ms"] = now.Sub(prev).Milliseconds()
+	}
+	w.lastStepAt = now
+	if resp != nil {
+		w.inputTokens += int64(resp.Usage.InputTokens)
+		w.outputTokens += int64(resp.Usage.OutputTokens)
+		// Thinking/reasoning tokens are a first-class Usage field in
+		// majordomo (populated by the providers that report them).
+		w.thinkingTokens += int64(resp.Usage.ReasoningTokens)
+	}
+	// One-shot run-level served-model attribution: the FIRST step with a
+	// resolved model name emits a "resolved_model" event so a run that
+	// errors before producing a useful step still records which model
+	// served it. resp.Model is failover-aware ("provider/model-id" of the
+	// element that actually served), unlike the static configured head.
+	logResolvedModel := ""
+	if resp != nil && resp.Model != "" && !w.resolvedModelLogged {
+		w.resolvedModelLogged = true
+		logResolvedModel = resp.Model
+	}
+	w.mu.Unlock()
+
+	if resp != nil {
+		payload["text_len"] = len(resp.Text())
+		// Served model + why generation stopped — the two scalars that turn
+		// a "model misbehaved" guess into a fact. finish_reason on an
+		// empty-tool-call step disambiguates truncation (length) from a
+		// deliberate empty emission (tool_calls).
+		if resp.Model != "" {
+			payload["model"] = resp.Model
+		}
+		if resp.FinishReason != "" {
+			payload["finish_reason"] = string(resp.FinishReason)
+		}
+		// Per-step token breakdown (OnStep already reads these into the run
+		// total above; persisting the per-step slice costs nothing more).
+		payload["in_tokens"] = resp.Usage.InputTokens
+		payload["out_tokens"] = resp.Usage.OutputTokens
+		if resp.Usage.ReasoningTokens > 0 {
+			payload["thinking_tokens"] = resp.Usage.ReasoningTokens
+		}
+		if resp.Usage.CacheReadTokens > 0 {
+			payload["cache_read_tokens"] = resp.Usage.CacheReadTokens
+		}
+		// The model's own narration accompanying this step — the smoking gun
+		// for WHY a malformed tool call was emitted. Capped; suppressed when
+		// the step fired a secret-bearing tool (mcp_call/email_send/http_*)
+		// whose narration could echo the secret it's about to send.
+		if t := strings.TrimSpace(resp.Text()); t != "" {
+			if stepHasSecretTool(resp) {
+				payload["text_redacted"] = true
+			} else {
+				payload["text"] = truncate(t, stepTextMax)
+			}
+		}
+	} else {
+		payload["text_len"] = 0
+	}
+
+	w.appendLog("step", payload)
+
+	// Emitted after the "step" event so the step keeps the earlier position in
+	// the call order.
+	if logResolvedModel != "" {
+		w.appendLog("resolved_model", map[string]any{"model": logResolvedModel})
+	}
+}
+
+// isSecretTool reports whether a tool's arguments/results may carry secrets
+// (MCP args, email bodies/recipients, HTTP auth headers/bodies) and so must be
+// redacted from the persisted audit log. Single source of truth for both the
+// step-narration redaction and the OnTool arg/result redaction: the companion
+// stepHasSecretTool reports whether a step's response fired a tool whose
+// surrounding narration could leak a secret (MCP args, email body/
+// recipients, raw HTTP request), so the audit trace never persists
+// secret-adjacent assistant text. Mirrors the steps.go redaction list.
+// NOTE: matching is a hard-coded list — the exact names mcp_call and
+// email_send plus the http_ prefix — so a NEW secret-bearing tool must be
+// added here or its args/results will be logged verbatim.
+func isSecretTool(name string) bool { + switch name { + case "mcp_call", "email_send": + return true + } + return strings.HasPrefix(name, "http_") +} + +func stepHasSecretTool(resp *llm.Response) bool { + if resp == nil { + return false + } + for _, c := range resp.ToolCalls { + if isSecretTool(c.Name) { + return true + } + } + return false +} + +// TokenStats returns the running totals tallied from OnStep. +// Safe to call concurrently. Returned values are a snapshot at call +// time. Used by the executors to populate RunStats before Close +// finalises the audit row. +// +// Why: the executor needs the totals AND a model name to compute cost, +// but cost calculation is a different concern from audit persistence. +// Exposing this getter lets the cost calculation live in the executor +// where the model is known. +func (w *Writer) TokenStats() (input, output, thinking int64) { + if w == nil { + return 0, 0, 0 + } + w.mu.Lock() + defer w.mu.Unlock() + return w.inputTokens, w.outputTokens, w.thinkingTokens +} + +// OnTool records a "tool_call" event with the tool name and a +// "tool_result" event with the result length. Tool count is incremented +// for each call. The executors call this once per executed tool call +// from their step observers (call + matching result content). +func (w *Writer) OnTool(call llm.ToolCall, result string) { + if w == nil || w.storage == nil { + return + } + w.calls.Add(1) + // Redact the args/result of secret-bearing tools — these fields actually + // CARRY the secret (MCP args, email body/recipients, HTTP auth/body), so + // logging them verbatim would defeat the OnStep narration redaction. 
+ if isSecretTool(call.Name) { + w.appendLog("tool_call", map[string]any{ + "name": call.Name, + "id": call.ID, + "args_redacted": true, + "args_len": len(call.Arguments), + }) + w.appendLog("tool_result", map[string]any{ + "name": call.Name, + "id": call.ID, + "result_redacted": true, + "result_len": len(result), + }) + return + } + w.appendLog("tool_call", map[string]any{ + "name": call.Name, + "args": string(call.Arguments), + "id": call.ID, + }) + w.appendLog("tool_result", map[string]any{ + "name": call.Name, + "id": call.ID, + "result": truncate(result, 4000), + "truncated": len(result) > 4000, + }) +} + +// LogEvent records a custom event mid-run. The executor uses this for +// diagnostic events (e.g. "compaction_setup" / "compaction_fired") +// outside the canonical step / tool_call / tool_result / error set. +// Nil-safe: no-op when receiver or storage is nil. +// +// Why: skill_run_logs is the only sink Steve can read from SQL, so +// diagnostics intended for post-hoc debugging belong here. slog goes +// to mort.log which is harder to reach from outside the host. +func (w *Writer) LogEvent(eventType string, payload map[string]any) { + if w == nil || w.storage == nil { + return + } + w.appendLog(eventType, payload) +} + +// LogError records an "error" event mid-run. Distinct from the terminal +// status set by Close. +func (w *Writer) LogError(msg string) { + if w == nil || w.storage == nil { + return + } + w.appendLog("error", map[string]any{"message": msg}) +} + +// Close finishes the run. The caller assembles a RunStats; the writer +// fills in ToolCalls (which is bookkept on the writer itself) and +// hands the full record to FinishRun. +// +// Idempotent: subsequent calls are no-ops. +// +// Why a struct vs the old positional form: v5 adds four token + cost +// fields on top of the legacy six. The struct keeps call sites readable +// and lets future fields slot in without churning every caller. 
+// +// Why context.WithoutCancel: the run's terminal status MUST land in +// the audit row regardless of the run ctx's state. Pre-fix, child +// skill runs invoked via skill_invoke / skill_invoke_parallel inherited +// the parent agent's runCtx as their outer ctx; when the parent +// timed out at MaxRuntime, every in-flight child's FinishRun fired +// with that already-cancelled ctx and the row was left in +// status=running forever. Detaching here is defence in depth — the +// caller (skillexec.runInner / agentexec.runInner) ALSO detaches at +// the call site, but a cancelled ctx in the writer's hands MUST NOT +// drop the audit write. The short timeout (auditFinishTimeout) bounds +// the write so a hung DB doesn't pin the run goroutine indefinitely. +func (w *Writer) Close(ctx context.Context, stats RunStats) { + if w == nil || w.storage == nil { + return + } + w.mu.Lock() + defer w.mu.Unlock() + if w.closed { + return + } + w.closed = true + stats.ToolCalls = int(w.calls.Load()) + // Detach from the caller's deadline + cancellation. Run cleanup + // must complete even when the run ctx is dead. The fresh + // auditFinishTimeout caps how long we'll wait on the storage. + finishCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), auditFinishTimeout) + defer cancel() + if err := w.storage.FinishRun(finishCtx, w.runID, stats); err != nil { + slog.Warn("skillaudit: FinishRun failed", "run_id", w.runID, "error", err) + } +} + +// auditFinishTimeout caps how long Close will wait on the storage's +// FinishRun call after detaching from the caller's ctx. 10s is generous +// for a single-row UPDATE against MySQL — anything longer suggests a +// hung connection that the run goroutine shouldn't keep waiting on. +const auditFinishTimeout = 10 * time.Second + +// auditAppendTimeout bounds each per-event AppendLog on the hot path so a hung +// storage backend can't block the run goroutine. 
+const auditAppendTimeout = 3 * time.Second + +// ToolCallsCount returns how many tool invocations OnTool has seen so +// far. Useful for budget enforcement. +func (w *Writer) ToolCallsCount() int { return int(w.calls.Load()) } + +func (w *Writer) appendLog(eventType string, payload map[string]any) { + seq := int(w.sequence.Add(1)) + log := SkillRunLog{ + RunID: w.runID, + Sequence: seq, + EventType: eventType, + Payload: payload, + CreatedAt: time.Now(), + } + // Bound the write: a hung storage backend must not block the run goroutine + // on the hot path (every step/tool event flows through here). Detached from + // any caller deadline — the log write is independent of the run's context. + ctx, cancel := context.WithTimeout(context.Background(), auditAppendTimeout) + defer cancel() + if err := w.storage.AppendLog(ctx, log); err != nil { + slog.Warn("skillaudit: AppendLog failed", "run_id", w.runID, "seq", seq, "type", eventType, "error", err) + } +} + +func truncate(s string, max int) string { + if len(s) <= max { + return s + } + return s[:max] + fmt.Sprintf("…[+%d bytes]", len(s)-max) +} diff --git a/budget/budget.go b/budget/budget.go new file mode 100644 index 0000000..5359539 --- /dev/null +++ b/budget/budget.go @@ -0,0 +1,169 @@ +// Package budget gates and meters per-caller resource use over a rolling +// 7-day window (run.Ports.Budget). DBBudget is the durable tracker; NoOpBudget +// disables metering; the BudgetStorage seam backs it (Memory / contrib SQLite). +// loop (gitea.stevedudenhoeffer.com/steve/majordomo/agent). +// +// Why: a Skill is data; the executor turns data into a running agent +// (resolve model, build toolbox, start audit, run the agent loop, +// finish audit, deliver). +package budget + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" +) + +// BudgetTracker enforces per-user GPU budgets in v2. v1 ships +// NoOpBudget which always allows. The interface exists now so the v2 +// migration is a single line in the executor. 
//
// Why interface now: the executor's Check/Commit calls would need to
// be added in v2 anyway; doing it now means v2 only swaps NoOp for
// DBBudget without touching call sites.
type BudgetTracker interface {
	// Check is the gate: it returns nil when the caller may run, or an
	// error describing why the budget is exhausted.
	Check(ctx context.Context, callerID string) error

	// Commit is the meter: it charges runtimeSeconds to the caller for
	// one run. It is called once the agent has completed, whether the
	// run succeeded or failed.
	Commit(ctx context.Context, callerID string, runtimeSeconds float64)
}

// NoOpBudget is the tracker that meters nothing: every Check passes and
// every Commit is discarded. v1 default.
type NoOpBudget struct{}

// NewNoOpBudget returns a NoOpBudget as a BudgetTracker.
func NewNoOpBudget() BudgetTracker {
	var tracker NoOpBudget
	return tracker
}

// Check implements BudgetTracker; it ignores its arguments and allows
// every caller.
func (NoOpBudget) Check(context.Context, string) error { return nil }

// Commit implements BudgetTracker; it ignores its arguments and records
// nothing.
func (NoOpBudget) Commit(context.Context, string, float64) {}

// ErrBudgetExceeded is the sentinel DBBudget.Check returns once a
// caller's usage in the rolling 7-day window has reached the configured
// cap.
//
// Why a sentinel: callers (executor, audit writer) need to distinguish
// budget rejection from generic errors so they can record
// status="budget_exceeded" instead of "error" and skip user-visible
// delivery side-effects.
var ErrBudgetExceeded = errors.New("weekly skill budget exceeded")

// BudgetNotifier is the hook DBBudget calls when a Check rejects a
// caller, receiving the user, the seconds already used, and the cap that
// was hit. Production wires a Discord-DM hook so the user knows why their
// skill failed; tests inject a recorder.
//
// A nil BudgetNotifier is allowed; DBBudget then skips the notification.
type BudgetNotifier func(ctx context.Context, userID string, secondsUsed, cap float64)

// DBBudget enforces per-user weekly GPU budgets via the BudgetStorage
// interface. The "weekly" cap is a rolling 7-day window — see
// SkillBudget for the rollover semantics.
+// +// Why a closure for the limit instead of an int field: the cap comes +// from a runtime convar. Reading it on every Check means a `.convar +// set skills.user_budget_seconds_per_week 7200` takes effect on the +// next call without restarting the bot or rewiring the executor. +type DBBudget struct { + storage BudgetStorage + // weeklyLimit returns the current cap in seconds. Reads convar at + // every Check so a runtime convar bump takes effect on the next + // call. + weeklyLimit func() float64 + + // notify is called when a Check rejects a caller. Optional — + // production wires a Discord-DM hook so the user knows why their + // skill failed. nil-safe. + notify BudgetNotifier + + // now is the time source. Test injects a fake clock; production + // uses time.Now. + now func() time.Time +} + +// NewDBBudget constructs a DBBudget. now may be nil — defaults to +// time.Now. +// +// Why time injection: budget rollover is time-sensitive; tests need to +// fast-forward past the 7-day boundary deterministically. now=nil +// means production callers (mort.go) don't have to think about it. +// +// Test: pass a closure that returns a fixed instant; assert rollover +// only happens when (now - WindowStart) >= 7 days. +func NewDBBudget(storage BudgetStorage, weeklyLimit func() float64, notify BudgetNotifier, now func() time.Time) *DBBudget { + if now == nil { + now = time.Now + } + return &DBBudget{ + storage: storage, + weeklyLimit: weeklyLimit, + notify: notify, + now: now, + } +} + +// Check returns ErrBudgetExceeded if the caller has spent at least +// weeklyLimit seconds in the current rolling 7-day window. +// +// Why anonymous callerID="" is unbudgeted: scheduler-driven and +// system-initiated runs don't have a Discord user to bill; charging +// "system" would conflate them with a real user. The scheduler sets +// CallerID to the skill owner where applicable, so cron-loop +// abusiveness still consumes the owner's budget. 
+// +// Why cap<=0 means "disabled": operator wants a runtime kill-switch. +// Setting the convar to "0" turns enforcement off without restart. +// +// Test: Get returns nil → Check returns nil; Get returns row with +// SecondsUsed >= cap → Check returns ErrBudgetExceeded and notify is +// invoked; window expired (>=7d) → Check returns nil regardless of +// SecondsUsed. +func (b *DBBudget) Check(ctx context.Context, callerID string) error { + if callerID == "" { + return nil + } + bud, err := b.storage.Get(ctx, callerID) + if err != nil { + return fmt.Errorf("budget: %w", err) + } + if bud != nil { + if b.now().Sub(bud.WindowStart) < budgetWindow { + cap := b.weeklyLimit() + if cap > 0 && bud.SecondsUsed >= cap { + if b.notify != nil { + b.notify(ctx, callerID, bud.SecondsUsed, cap) + } + return ErrBudgetExceeded + } + } + } + return nil +} + +// Commit records the run's runtime against the caller's budget. +// Failures are logged but never returned — budget accounting must +// not break user-visible execution. +// +// Why callerID="" is a no-op: matches Check's anonymous-caller +// shortcut; system runs don't get billed. +// +// Why runtimeSeconds<=0 is a no-op: a run that errored before +// resolving a model has wallSecs near 0 in floating-point terms but +// can also be exactly 0 (synthetic test fixtures). Skipping avoids +// spurious 0-runs rows from short-lived failures. +// +// Test: Commit(50) → Get reports SecondsUsed=50; storage failure +// surfaces only as a slog.Warn (no panic, no return). 
+func (b *DBBudget) Commit(ctx context.Context, callerID string, runtimeSeconds float64) { + if callerID == "" || runtimeSeconds <= 0 { + return + } + if err := b.storage.Add(ctx, callerID, runtimeSeconds, b.now()); err != nil { + slog.Warn("skills budget: commit failed", "user", callerID, "error", err) + } +} diff --git a/budget/budget_test.go b/budget/budget_test.go new file mode 100644 index 0000000..d47a7ef --- /dev/null +++ b/budget/budget_test.go @@ -0,0 +1,44 @@ +package budget + +import ( + "context" + "errors" + "testing" + "time" +) + +func TestDBBudgetRollingWindow(t *testing.T) { + ctx := context.Background() + mem := NewMemory() + now := time.Now() + clock := func() time.Time { return now } + b := NewDBBudget(mem, func() float64 { return 100 }, nil, clock) + + // Under cap: allowed. + if err := b.Check(ctx, "u"); err != nil { + t.Fatalf("fresh caller should pass: %v", err) + } + b.Commit(ctx, "u", 60) + if err := b.Check(ctx, "u"); err != nil { + t.Fatalf("60/100 should pass: %v", err) + } + // Over cap: rejected. + b.Commit(ctx, "u", 50) // 110 total + if err := b.Check(ctx, "u"); !errors.Is(err, ErrBudgetExceeded) { + t.Fatalf("110/100 should be ErrBudgetExceeded, got %v", err) + } + // Window rolls over after 7 days: allowed again. 
+ now = now.Add(8 * 24 * time.Hour) + b.Commit(ctx, "u", 1) // triggers rollover inside Add + if err := b.Check(ctx, "u"); err != nil { + t.Fatalf("after window rollover should pass: %v", err) + } +} + +func TestNoOpBudgetAlwaysAllows(t *testing.T) { + b := NewNoOpBudget() + if err := b.Check(context.Background(), "anyone"); err != nil { + t.Fatalf("NoOp must always allow: %v", err) + } + b.Commit(context.Background(), "anyone", 1e9) // no-op +} diff --git a/budget/memory.go b/budget/memory.go new file mode 100644 index 0000000..30745b5 --- /dev/null +++ b/budget/memory.go @@ -0,0 +1,56 @@ +package budget + +import ( + "context" + "sync" + "time" +) + +// Memory is a zero-dependency in-process BudgetStorage: per-user rolling-window +// usage held in memory (lost on restart). The default behind DBBudget for a +// light host or tests; mort uses its GORM Storage, contrib/store adds SQLite. +type Memory struct { + mu sync.RWMutex + rows map[string]*SkillBudget +} + +// NewMemory returns an empty in-memory BudgetStorage. +func NewMemory() *Memory { return &Memory{rows: map[string]*SkillBudget{}} } + +var _ BudgetStorage = (*Memory)(nil) + +func (m *Memory) Initialize(context.Context) error { return nil } + +func (m *Memory) Get(_ context.Context, userID string) (*SkillBudget, error) { + m.mu.RLock() + defer m.mu.RUnlock() + r, ok := m.rows[userID] + if !ok { + return nil, nil + } + cp := *r // copy out so callers can't mutate our row + return &cp, nil +} + +func (m *Memory) Add(_ context.Context, userID string, secondsUsed float64, now time.Time) error { + m.mu.Lock() + defer m.mu.Unlock() + r, ok := m.rows[userID] + if !ok { + m.rows[userID] = &SkillBudget{ + UserID: userID, WindowStart: now, + SecondsUsed: secondsUsed, RunsCount: 1, UpdatedAt: now, + } + return nil + } + // Roll the window over if it's older than the window length. 
+ if now.Sub(r.WindowStart) >= budgetWindow { + r.WindowStart = now + r.SecondsUsed = 0 + r.RunsCount = 0 + } + r.SecondsUsed += secondsUsed + r.RunsCount++ + r.UpdatedAt = now + return nil +} diff --git a/budget/run.go b/budget/run.go new file mode 100644 index 0000000..3cef64a --- /dev/null +++ b/budget/run.go @@ -0,0 +1,9 @@ +package budget + +import "gitea.stevedudenhoeffer.com/steve/executus/run" + +// The budget trackers plug directly into run.Ports.Budget (Check/Commit match). +var ( + _ run.Budget = NoOpBudget{} + _ run.Budget = (*DBBudget)(nil) +) diff --git a/budget/storage.go b/budget/storage.go new file mode 100644 index 0000000..529b660 --- /dev/null +++ b/budget/storage.go @@ -0,0 +1,33 @@ +package budget + +import ( + "context" + "time" +) + +// BudgetStorage is the persistence seam behind DBBudget: one budget row per +// user, with an atomic Add that rolls the 7-day window over transparently. Mort +// backs this with GORM/MySQL (the skill_budgets table); Memory() is the +// zero-dependency default; contrib/store adds a durable SQLite one. +type BudgetStorage interface { + // Initialize runs any schema setup. Safe to call repeatedly. + Initialize(ctx context.Context) error + // Get returns the user's current budget row, or (nil, nil) if none exists. + Get(ctx context.Context, userID string) (*SkillBudget, error) + // Add increments seconds_used + runs_count atomically, rolling the window + // over when WindowStart is older than 7 days (reset to now, fresh count). + // Creates the row if absent. + Add(ctx context.Context, userID string, secondsUsed float64, now time.Time) error +} + +// SkillBudget is one user's rolling-window usage row. +type SkillBudget struct { + UserID string + WindowStart time.Time + SecondsUsed float64 + RunsCount int + UpdatedAt time.Time +} + +// budgetWindow is the rolling window length the storage rolls over at. 
+const budgetWindow = 7 * 24 * time.Hour diff --git a/go.mod b/go.mod index c8ea9c0..7463def 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( gitea.stevedudenhoeffer.com/steve/majordomo v0.0.0-20260626223738-1fd7109a42f3 github.com/google/uuid v1.6.0 golang.org/x/crypto v0.53.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( diff --git a/go.sum b/go.sum index 22e9ecc..a7e5c65 100644 --- a/go.sum +++ b/go.sum @@ -125,6 +125,7 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/persona/agent.go b/persona/agent.go new file mode 100644 index 0000000..19fbce0 --- /dev/null +++ b/persona/agent.go @@ -0,0 +1,191 @@ +// Package persona implements the Agent noun: a persisted persona + +// execution spec + palette of skills/sub-agents/low-level tools, with +// optional trigger metadata (schedule, webhook, chatbot channel +// listener) and personalization sources. +// +// Phase 1 scope: this package introduces Agent as a persisted noun +// with CRUD only — no execution path, no palette resolution, no +// trigger handling. See /Users/steve/.claude/plans/serene-churning-micali.md +// for the staged rollout. Later phases add agentexec, agent_invoke, +// trigger dispatch (schedule/webhook/chatbot), and CommandBinding.
//
// The three-layer storage pattern from pkg/logic/storage/CLAUDE.md
// applies — when adding a field to Agent, you MUST update
// pkg/logic/storage/agents.go (gormAgent, agentFromStorage,
// toStorage) or persistence will silently break.
package persona

import "time"

// Agent is the domain definition of an Agent persona + execution spec.
//
// Why: an Agent is the "configured invoker" — model tier + base
// system prompt + a palette of capabilities (skills, sub-agents,
// low-level tools) it may exercise during a run. Where a Skill is a
// reusable parameterised callable (a library function), an Agent is
// the actor with a persistent persona that calls those skills. The
// struct is flat — every field lives on its own column on the
// agents table; JSON columns are used only for variable-length
// collections (palette lists, tags, etc.).
//
// What: identity + persona + execution caps + palette + triggers +
// personalization + UX, all on one struct. Several field families
// (Palette, Triggers, Personalization) are persisted now but NOT
// exercised until later phases — they exist so the schema is stable
// and future phases can light up behaviour without DB migrations.
//
// Test: see pkg/logic/agents/storage_round_trip_test.go for
// Save/Get/GetByName/List/Delete coverage.
type Agent struct {
	// Identity
	ID          string // UUID
	Name        string // unique per OwnerID; human-friendly identifier
	Description string
	OwnerID     string // Discord member ID
	AuthoredBy  string // Discord member ID; usually == OwnerID
	Version     int    // monotonic, for future versioning
	CreatedAt   time.Time
	UpdatedAt   time.Time

	// Extends names the parent agent this agent inherits from. Only used
	// during builtin loading — the loader resolves extends references and
	// merges fields before persisting. The resolved agent is a standalone
	// entity; Extends is NOT persisted in the database. Only single-level
	// extends is supported (no chains).
	Extends string

	// SystemPromptPrepend, when non-empty, is prepended to the system
	// prompt (with a trailing newline separator). Used by the extends
	// mechanism so a child agent can prepend persona instructions to the
	// parent's full system prompt without duplicating it. Like Extends,
	// this is resolved at load time and NOT persisted — the final
	// SystemPrompt on the persisted Agent already has the prepend applied.
	SystemPromptPrepend string

	// Persona / execution spec
	ModelTier         string        // "fast" | "standard" | "thinking" | provider/model
	SystemPrompt      string        // base persona prompt (Phase 5 layers personalization on top)
	MaxIterations     int           // 0 → use convar default at execution time
	MaxToolCalls      int           // 0 → use convar default at execution time
	MaxRuntime        time.Duration // stored as MaxRuntimeNs int64 in GORM (avoid duration-driver flakiness)
	ExecutionLane     string        // lane name; empty = default at execution time
	EncryptionEnabled bool          // Phase 1 stores the flag; envelope-encryption bridge wires in a later phase

	// Run-critic (two-tier timeout). When CriticEnabled is false (the
	// default) MaxRuntime is the hard kill, exactly as before. When true,
	// MaxRuntime becomes a SOFT trigger: at MaxRuntime the run-critic
	// activates and periodically reviews the run; the hard backstop (the
	// absolute kill) is MaxRuntime × the multiplier. CriticBackstopMultiplier
	// of 0 means "use the convar default" (agents.critic.backstop_multiplier_default,
	// default 6×). See pkg/logic/agentcritic.
	CriticEnabled            bool
	CriticBackstopMultiplier float64

	// Palette — what this Agent may invoke (Phase 2 reads these).
	// Stored as JSON arrays; not exercised by Phase 1 CRUD.
	SkillPalette    []string // skill IDs/names
	SubAgentPalette []string // agent IDs/names
	LowLevelTools   []string // skilltools registry names

	// Personalization (Phase 5 reads these). Each layer name maps to
	// a registered PersonalizationProvider that returns text appended
	// to SystemPrompt at run time. Empty list = base prompt only.
	PersonalizationSources []string

	// Triggers — persisted now, NOT dispatched by Phase 1.
	//
	// Schedule: cron expression or "daily"/"weekly" shorthand. Empty
	// = on-demand only. NextRunAt + LastScheduledRunAt are scheduler
	// bookkeeping for Phase 3's per-Agent scheduler.
	Schedule           string
	NextRunAt          *time.Time
	LastScheduledRunAt *time.Time

	// Webhook trigger metadata. WebhookSecret empty = inbound
	// webhooks disabled. WebhookSignatureRequired defaults true at
	// save time (see Skill's lesson: don't store a GORM default on a
	// bool where false is a legitimate explicit value — application
	// layer is the source of truth).
	WebhookSecret            string
	WebhookSignatureRequired bool
	WebhookIPAllowlist       []string // CIDR strings; stored as JSON array

	// Chatbot trigger metadata. ChatbotChannelFilter names a filter
	// registered in pkg/logic/skills' ChannelFilterRegistry — when
	// the migrated chatbot dispatches via this Agent, the filter
	// gates which channels it listens in.
	ChatbotChannelFilter string

	// UX
	//
	// DefaultEmoji is an optional identity emoji for this agent.
	// Used as the __start__ fallback and forwarded to the invoking
	// Discord message when a parent calls this agent via agent_invoke.
	DefaultEmoji string

	// StateReactEmoji maps tool names (and reserved keys "__start__",
	// "__end__", "__error__") to Discord emoji that the executor
	// reacts with as the run progresses. Empty map = no reactions.
	StateReactEmoji map[string]string

	// Tags is a free-form set of short labels for organisation +
	// discovery on the agents list page (Phase 1 admin commands +
	// future web UI).
	Tags []string

	// Phases defines a multi-phase pipeline for this agent. When
	// non-empty, the executor runs agentexec's sequential phase runner
	// instead of the single agent loop. Empty = single-loop agent.
	//
	// Phases IS persisted (JSON struct-slice column `phases` on
	// gormAgent). It used to be transient — "TOML is the only source of
	// truth" — but every production dispatch path resolves the agent from
	// the DB, where the dropped Phases meant research / deepresearch
	// silently degraded to a single-loop run (the executor's
	// `len(a.Phases) > 0` pipeline branch was dead). The builtin loader
	// still seeds phases from YAML; persisting them is what makes the
	// pipeline branch fire for DB-loaded agents.
	Phases []AgentPhase
}

// AgentPhase describes one stage of a multi-phase pipeline in an
// agent definition. Executed directly by agentexec's phase runner
// (pipeline.go) — there is no intermediate execution-spec struct.
//
// What: name + prompt template + model/iteration overrides + tool
// list + optional/fallback flags + IsRunFunc indicator.
//
// Test: see builtin_loader_test.go for YAML round-trip coverage.
type AgentPhase struct {
	// Name identifies the phase (e.g., "scout", "plan", "investigate").
	Name string

	// SystemPrompt for this phase. Supports template variables:
	// {{.Query}} for the original query, and {{.<PhaseName>}} for
	// prior phase outputs (e.g., {{.scout}}, {{.plan}}).
	SystemPrompt string

	// ModelTier overrides the agent's ModelTier for this phase.
	// Empty = use agent default.
	ModelTier string

	// MaxIter overrides the agent's MaxIterations for this phase.
	// 0 = use agent default.
	MaxIter int

	// Tools are tool names for this phase only. These are resolved
	// from the agent's low-level tools + palette at execution time.
	Tools []string

	// Optional means errors in this phase don't abort the pipeline.
	Optional bool

	// FallbackMessage is used when an optional phase fails.
	// Default: "(Phase encountered an error)"
	FallbackMessage string

	// IsRunFunc indicates this phase is a bare LLM call (no tool
	// loop). When true, the executor makes a single model.Complete
	// call instead of running the full agent loop.
	IsRunFunc bool
}
diff --git a/persona/builtin_loader.go b/persona/builtin_loader.go
new file mode 100644
index 0000000..6e0b71d
--- /dev/null
+++ b/persona/builtin_loader.go
@@ -0,0 +1,599 @@
package persona

// Phase 6 — Builtin Agent loader.
//
// Why: Phase 1-5 introduced the Agent noun, runtime, triggers,
// CommandBinding, and ChatBot bridge — but every Agent in production
// was either (a) a wrapper auto-migrated from a triggered Skill, or
// (b) admin-created via `.agent new`. There were no SHIPPED Agents
// authored as builtins. Phase 6 adds an idempotent boot-time loader so
// the repo can ship canonical Agent definitions (alongside the
// existing skills/<name>/skill.yml builtins) without manual admin
// creation per deploy.
//
// What: scans `<builtinsDir>/agents/*/agent.yml`, decodes each YAML
// into an Agent, and upserts via Storage.SaveAgent under the deterministic
// system owner ID "builtin". Skill-palette entries are validated AT LOAD
// TIME against the live skills storage; missing skills warn but do not
// fail the load (the skill might arrive later via a different code
// path, and runtime resolution happens at invocation time anyway).
//
// Bypass note (v3 lesson, mirrored): like skills.LoadBuiltins, this
// loader writes via Storage.SaveAgent directly. There is no agents
// equivalent of SaveUserSkill's save-time gates today (Phase 1-5 don't
// have authoring requirements on agents), but if such gates appear in
// future phases, this loader MUST keep bypassing them — builtins are
// trusted infrastructure.
//
// Test: pkg/logic/agents/builtin_loader_test.go covers happy path,
// idempotent re-load, missing-skill warn capture, and malformed YAML
// surfaced as a per-bundle warning (not a fatal error).

import (
	"context"
	"errors"
	"fmt"
	"io/fs"
	"log/slog"
	"net"
	"path"
	"strings"
	"time"

	"github.com/google/uuid"
	"gopkg.in/yaml.v3"
)

// BuiltinAgentOwnerID is the deterministic system owner ID used for
// every Agent created by LoadBuiltinAgents. Chosen as a non-empty
// string so the (owner_id, name) unique index distinguishes builtins
// from any user-authored Agent (Discord member IDs are numeric, so
// "builtin" cannot collide). The skills builtin loader uses owner_id=""
// instead; the two systems are independent storage scopes — there's
// no need for consistency here.
const BuiltinAgentOwnerID = "builtin"

// SkillExistenceChecker is the narrow surface LoadBuiltinAgents needs
// to validate skill_palette entries at load time. Production wires
// skills.Storage which already exposes ListByName for non-owner-scoped
// lookups. nil means "skip palette validation" (tests that don't care).
//
// Why a separate narrow interface (vs importing skills.Storage):
// agents already transitively depends on skills via migrate_from_skills,
// but the loader only needs "does a skill with this name exist
// somewhere?" — a single Boolean. Keeping the interface narrow makes
// the loader testable without a full skills storage stub.
type SkillExistenceChecker interface {
	// SkillExistsByName reports whether at least one skill row has the
	// given name across any owner (builtins live under owner_id="";
	// users own their own rows; the loader's validation just wants
	// "does ANY row exist with this name?").
	//
	// NOTE(review): the error result presumably signals a failed lookup
	// rather than an absent skill — confirm against the implementation
	// and against how LoadBuiltinAgents handles it.
	SkillExistsByName(ctx context.Context, name string) (bool, error)
}

// LoadBuiltinAgents discovers and seeds builtin Agents from `builtinsDir`.
+// `builtinsDir` is the root that contains an `agents/` subdirectory; +// per-agent YAML lives at `agents//agent.yml`. Returns the count +// of agents seeded or updated (skipped rows do not contribute to the +// count). Returns nil error when the agents/ directory is absent — a +// deployment without any builtin agents is valid; the loader is then a +// no-op. +// +// Idempotency contract: existing Agent rows (matched by (owner_id="builtin", +// name)) are UPDATED to the freshly-parsed YAML on each boot. ID + +// CreatedAt are preserved; UpdatedAt is refreshed. User clones of a +// builtin Agent (different owner_id, same name) are NEVER touched — +// the loader only writes to (owner_id="builtin", name) rows. +// +// `skillChecker` may be nil; when non-nil, each SkillPalette entry is +// looked up and a WARN log emitted (with the agent + missing skill +// name) for absent references. The Agent row is still seeded with the +// palette intact — runtime resolution at invocation time is the +// authoritative gate. +func LoadBuiltinAgents(ctx context.Context, store Storage, builtinsDir fs.FS, skillChecker SkillExistenceChecker) (int, error) { + if store == nil { + return 0, errors.New("agents.LoadBuiltinAgents: nil store") + } + if builtinsDir == nil { + return 0, errors.New("agents.LoadBuiltinAgents: nil builtinsDir FS") + } + entries, err := fs.ReadDir(builtinsDir, "agents") + if err != nil { + // Missing agents/ directory is benign — a deployment may not + // ship any builtins. Other errors propagate so a permission / + // IO problem surfaces loudly. + if errors.Is(err, fs.ErrNotExist) { + slog.Info("agents: no builtin agents directory", "path", "agents") + return 0, nil + } + return 0, fmt.Errorf("agents: read agents dir: %w", err) + } + + // Phase 1: parse all agent manifests into a map keyed by name. + // The map is needed so extends references can be resolved before + // any agent is upserted. 
+ type parsedEntry struct { + agent *Agent + dir string + } + parsed := make(map[string]*parsedEntry) + var parseOrder []string // preserve FS iteration order for deterministic upsert + + var scanned, failed int + for _, entry := range entries { + if !entry.IsDir() { + continue + } + manifestPath := path.Join("agents", entry.Name(), "agent.yml") + data, readErr := fs.ReadFile(builtinsDir, manifestPath) + if readErr != nil { + slog.Debug("agents: skipping (no agent.yml)", "dir", entry.Name(), "error", readErr) + continue + } + scanned++ + ag, parseErr := decodeAgentManifest(data) + if parseErr != nil { + slog.Warn("agents: invalid agent.yml", "dir", entry.Name(), "error", parseErr) + failed++ + continue + } + parsed[ag.Name] = &parsedEntry{agent: ag, dir: entry.Name()} + parseOrder = append(parseOrder, ag.Name) + } + + // Phase 2: resolve extends references. Only single-level is + // supported — chains (A extends B extends C) are rejected. + for _, name := range parseOrder { + pe := parsed[name] + ag := pe.agent + if ag.Extends == "" { + continue + } + parent, ok := parsed[ag.Extends] + if !ok { + slog.Warn("agents: extends references unknown agent", + "agent", ag.Name, "extends", ag.Extends) + failed++ + delete(parsed, name) + continue + } + if parent.agent.Extends != "" { + slog.Warn("agents: extends chain not supported — parent also uses extends", + "agent", ag.Name, "extends", ag.Extends, + "parent_extends", parent.agent.Extends) + failed++ + delete(parsed, name) + continue + } + if ag.Extends == ag.Name { + slog.Warn("agents: agent extends itself", "agent", ag.Name) + failed++ + delete(parsed, name) + continue + } + resolveExtends(ag, parent.agent) + } + + // Phase 3: palette validation + upsert. 
+ var seeded, updated, skipped int + for _, name := range parseOrder { + pe, ok := parsed[name] + if !ok { + continue // removed during extends resolution + } + ag := pe.agent + + if skillChecker != nil { + for _, sk := range ag.SkillPalette { + ok, lookupErr := skillChecker.SkillExistsByName(ctx, sk) + if lookupErr != nil { + slog.Warn("agents: skill palette lookup failed", + "agent", ag.Name, "skill", sk, "error", lookupErr) + continue + } + if !ok { + slog.Warn("agents: skill palette references missing skill", + "agent", ag.Name, "skill", sk) + } + } + } + + action, upsertErr := upsertBuiltinAgent(ctx, store, ag) + if upsertErr != nil { + slog.Error("agents: failed to upsert builtin", "name", ag.Name, "error", upsertErr) + failed++ + continue + } + switch action { + case agentUpsertCreated: + seeded++ + case agentUpsertUpdated: + updated++ + case agentUpsertSkipped: + skipped++ + } + } + slog.Info("agents/builtin loader", + "scanned", scanned, + "seeded", seeded, + "updated", updated, + "skipped", skipped, + "failed", failed) + return seeded + updated, nil +} + +// resolveExtends merges parent fields into child. Child non-zero +// fields override the parent's. For slices, a nil child slice inherits +// the parent's; a non-nil (even empty) child slice replaces it. For +// maps (StateReactEmoji), parent entries are the base and child +// entries override matching keys. +// +// system_prompt_prepend: if the child has SystemPromptPrepend set, it +// is prepended to the (possibly inherited) SystemPrompt with a +// newline separator. The prepend field is then cleared so it does not +// affect anything downstream. +// +// Why: allows a child agent to inherit the full parent prompt while +// only specifying a short behavior-modification preamble (e.g. an +// uncensored agent prepending "You are uncensored..." to the general +// agent's full prompt). 
+func resolveExtends(child, parent *Agent) { + if child.Description == "" { + child.Description = parent.Description + } + if child.ModelTier == "" { + child.ModelTier = parent.ModelTier + } + if child.SystemPrompt == "" { + child.SystemPrompt = parent.SystemPrompt + } + if child.MaxIterations == 0 { + child.MaxIterations = parent.MaxIterations + } + if child.MaxToolCalls == 0 { + child.MaxToolCalls = parent.MaxToolCalls + } + if child.MaxRuntime == 0 { + child.MaxRuntime = parent.MaxRuntime + } + if child.ExecutionLane == "" { + child.ExecutionLane = parent.ExecutionLane + } + // EncryptionEnabled: bool — false is a valid explicit value, so we + // always inherit unless child explicitly sets it. Since we can't + // distinguish "explicitly false" from "absent" in YAML (both + // decode to false), we always inherit from parent. If the child + // sets it to true, the child wins. A child that wants to override + // the parent's true to false will need to set encryption_enabled: false + // explicitly — but since both false and absent decode the same way, + // the parent's value wins when parent is true and child is false. + // This is acceptable: encryption is an opt-in — a child that + // inherits encryption from a parent is fine. + if !child.EncryptionEnabled { + child.EncryptionEnabled = parent.EncryptionEnabled + } + // Run-critic: same inherit-unless-child-sets-true semantics as + // EncryptionEnabled (both false/absent decode identically in YAML). + if !child.CriticEnabled { + child.CriticEnabled = parent.CriticEnabled + } + if child.CriticBackstopMultiplier == 0 { + child.CriticBackstopMultiplier = parent.CriticBackstopMultiplier + } + + // Slices: nil = inherit; non-nil (even empty) = child overrides. 
+ if child.SkillPalette == nil { + child.SkillPalette = parent.SkillPalette + } + if child.SubAgentPalette == nil { + child.SubAgentPalette = parent.SubAgentPalette + } + if child.LowLevelTools == nil { + child.LowLevelTools = parent.LowLevelTools + } + if child.PersonalizationSources == nil { + child.PersonalizationSources = parent.PersonalizationSources + } + if child.Tags == nil { + child.Tags = parent.Tags + } + if child.WebhookIPAllowlist == nil { + child.WebhookIPAllowlist = parent.WebhookIPAllowlist + } + if child.Phases == nil { + child.Phases = parent.Phases + } + + // Triggers (Schedule, ChatbotChannelFilter, WebhookSecret, …) are + // deliberately NOT inherited. A trigger is an ACTIVATION decision — + // "this agent fires on a schedule" / "this agent is a chatbot tool in + // these channels" — and silently inheriting it from a parent persona + // is a behavioural surprise: `uncensored extends general` would inherit + // general's `chatbot_channel_filter: "none"` (match-every-channel) and + // surface the unfiltered model as a direct chatbot tool everywhere the + // instant agents.triggers.enabled flips on. A child that wants a trigger + // must declare it explicitly. (Persona, caps, palette, and tools are + // inherited above — those are capability, not activation.) + + // DefaultEmoji: child wins if set; otherwise inherit. + if child.DefaultEmoji == "" { + child.DefaultEmoji = parent.DefaultEmoji + } + + // Maps: merge — parent is the base, child entries override. 
+ if child.StateReactEmoji == nil && parent.StateReactEmoji != nil { + child.StateReactEmoji = make(map[string]string, len(parent.StateReactEmoji)) + for k, v := range parent.StateReactEmoji { + child.StateReactEmoji[k] = v + } + } else if parent.StateReactEmoji != nil { + merged := make(map[string]string, len(parent.StateReactEmoji)+len(child.StateReactEmoji)) + for k, v := range parent.StateReactEmoji { + merged[k] = v + } + for k, v := range child.StateReactEmoji { + merged[k] = v + } + child.StateReactEmoji = merged + } + + // SystemPromptPrepend: prepend to the (now resolved) SystemPrompt. + if child.SystemPromptPrepend != "" { + child.SystemPrompt = child.SystemPromptPrepend + "\n\n" + child.SystemPrompt + child.SystemPromptPrepend = "" // consumed + } + + // Clear Extends — the resolution is complete, the persisted agent + // is standalone. + child.Extends = "" +} + +// agentUpsertAction reports what upsertBuiltinAgent did. Exported only +// to the test in this package; the loader's public surface returns a +// count, not a per-row action. +type agentUpsertAction int + +const ( + agentUpsertCreated agentUpsertAction = iota + agentUpsertUpdated + agentUpsertSkipped // reserved; current loader never returns this — every parse-OK row is upserted +) + +// upsertBuiltinAgent looks up an existing (BuiltinAgentOwnerID, name) +// row. If absent, inserts a new row with a freshly-minted UUID. +// Otherwise updates the existing row in place, preserving ID + CreatedAt. +// +// Why not version-skip like skills.upsertBuiltin: the Agent struct has +// a Version int field but it's a monotonic counter, not a semver +// string for change detection. Agent YAML doesn't carry a "version" +// at the wire shape; every boot writes the latest YAML content, +// trusting the YAML file in-repo IS the source of truth. 
The Agent's +// internal Version int auto-increments on each loader pass so admin +// inspection (`.agent show`) reveals "how many times has the loader +// touched this row". +func upsertBuiltinAgent(ctx context.Context, store Storage, fresh *Agent) (agentUpsertAction, error) { + existing, err := store.GetAgentByName(ctx, BuiltinAgentOwnerID, fresh.Name) + if err != nil && !errors.Is(err, ErrNotFound) { + return agentUpsertCreated, fmt.Errorf("lookup builtin agent %q: %w", fresh.Name, err) + } + if errors.Is(err, ErrNotFound) { + fresh.ID = uuid.New().String() + fresh.OwnerID = BuiltinAgentOwnerID + fresh.AuthoredBy = BuiltinAgentOwnerID + if fresh.Version == 0 { + fresh.Version = 1 + } + now := time.Now() + fresh.CreatedAt = now + fresh.UpdatedAt = now + if saveErr := store.SaveAgent(ctx, fresh); saveErr != nil { + return agentUpsertCreated, saveErr + } + slog.Info("agents: created builtin", "name", fresh.Name, "id", fresh.ID) + return agentUpsertCreated, nil + } + // Update in place. Preserve ID, OwnerID, AuthoredBy, CreatedAt. + // Bump Version so admins can see "the loader has touched this N + // times" — useful when investigating a builtin that was + // hand-edited via the future web UI and unexpectedly reverted on + // next boot. + fresh.ID = existing.ID + fresh.OwnerID = BuiltinAgentOwnerID + fresh.AuthoredBy = BuiltinAgentOwnerID + fresh.Version = existing.Version + 1 + fresh.CreatedAt = existing.CreatedAt + fresh.UpdatedAt = time.Now() + // Carry forward operator/scheduler-owned fields that the manifest + // never sets (decodeAgentManifest leaves these zero by design — a + // secret in-repo would be a credential leak). Without this, every + // boot CLOBBERS an operator-armed webhook secret + signature flag + // back to empty/false and nukes the scheduler's next-fire cursor, so + // a scheduled or webhook-armed builtin silently breaks on each deploy. 
+ fresh.WebhookSecret = existing.WebhookSecret + fresh.WebhookSignatureRequired = existing.WebhookSignatureRequired + fresh.NextRunAt = existing.NextRunAt + fresh.LastScheduledRunAt = existing.LastScheduledRunAt + if saveErr := store.SaveAgent(ctx, fresh); saveErr != nil { + return agentUpsertUpdated, saveErr + } + slog.Info("agents: updated builtin", + "name", fresh.Name, "id", fresh.ID, "version", fresh.Version) + return agentUpsertUpdated, nil +} + +// builtinAgentManifest is the YAML wire format for agents//agent.yml. +// The schema is intentionally a SUBSET of the Agent struct — future +// fields can be added without breaking existing manifests so long as +// we keep KnownFields(true) decoding (so a typo on a key surfaces as +// an error rather than silently dropping data). +// +// See pkg/logic/agents/CLAUDE.md for the schema reference. +type builtinAgentManifest struct { + Name string `yaml:"name"` + Description string `yaml:"description"` + ModelTier string `yaml:"model_tier"` + SystemPrompt string `yaml:"system_prompt"` + SystemPromptPrepend string `yaml:"system_prompt_prepend"` + MaxIterations int `yaml:"max_iterations"` + MaxToolCalls int `yaml:"max_tool_calls"` + MaxRuntimeSeconds int `yaml:"max_runtime_seconds"` + ExecutionLane string `yaml:"execution_lane"` + EncryptionEnabled bool `yaml:"encryption_enabled"` + + // Run-critic two-tier timeout. CriticEnabled flips MaxRuntime from a + // hard kill into a soft trigger; CriticBackstopMultiplier (0 => convar + // default 6×) sets the hard backstop = MaxRuntime × multiplier. + CriticEnabled bool `yaml:"critic_enabled"` + CriticBackstopMultiplier float64 `yaml:"critic_backstop_multiplier"` + + // Extends names a parent agent whose fields are inherited. The + // child's non-zero fields override the parent; nil/empty slices + // inherit the parent's. Maps (state_react) are merged — child + // entries override parent entries with the same key. Only single- + // level extends is supported (no chains). 
// builtinAgentPhaseManifest is the YAML wire format for a single
// phases list entry in agents/{name}/agent.yml. Maps 1:1 to
// AgentPhase at decode time.
type builtinAgentPhaseManifest struct {
	// Name is required: decodeAgentManifest rejects a phase whose name
	// is blank and stores the whitespace-trimmed value.
	Name string `yaml:"name"`
	// SystemPrompt is copied to the AgentPhase unchanged.
	SystemPrompt string `yaml:"system_prompt"`
	// ModelTier is whitespace-trimmed at decode time.
	ModelTier string `yaml:"model_tier"`
	// MaxIter is copied to AgentPhase.MaxIter unchanged.
	MaxIter int `yaml:"max_iter"`
	// Tools is copied to AgentPhase.Tools unchanged.
	Tools []string `yaml:"tools"`
	// Optional is copied to AgentPhase.Optional unchanged.
	Optional bool `yaml:"optional"`
	// FallbackMessage is copied to AgentPhase.FallbackMessage unchanged.
	FallbackMessage string `yaml:"fallback_message"`
	// IsRunFunc marks the phase as a bare LLM call (no tool loop), per
	// the AgentPhase field it is copied to.
	IsRunFunc bool `yaml:"is_run_func"`
}
+// +// What this method does NOT set: +// - ID (loader mints UUID on insert / preserves existing on update) +// - OwnerID + AuthoredBy (loader sets to BuiltinAgentOwnerID) +// - Version (loader increments on update) +// - CreatedAt + UpdatedAt (loader stamps) +// - WebhookSecret (operator generates via admin tooling at deploy +// time — shipping a secret in-repo would be a credential leak) +// - NextRunAt + LastScheduledRunAt (scheduler bookkeeping; nil at +// load time, populated on first scheduled fire) +// - WebhookSignatureRequired (application-layer default applies on +// first save; a `default:true` GORM tag would substitute on every +// write — see the v8 lesson on this exact trap) +func decodeAgentManifest(data []byte) (*Agent, error) { + var m builtinAgentManifest + dec := yaml.NewDecoder(strings.NewReader(string(data))) + dec.KnownFields(true) + if err := dec.Decode(&m); err != nil { + return nil, fmt.Errorf("decode agent.yml: %w", err) + } + if strings.TrimSpace(m.Name) == "" { + return nil, errors.New("agent.yml: missing required field 'name'") + } + // system_prompt is required UNLESS the agent uses extends (the parent + // will supply it) or system_prompt_prepend (the prepend will be + // combined with the parent's system_prompt after extends resolution). + if strings.TrimSpace(m.SystemPrompt) == "" && strings.TrimSpace(m.Extends) == "" && strings.TrimSpace(m.SystemPromptPrepend) == "" { + return nil, errors.New("agent.yml: missing required field 'system_prompt'") + } + + // Convert YAML phase manifests to domain AgentPhase structs. 
+ var phases []AgentPhase + for _, pm := range m.Phases { + if strings.TrimSpace(pm.Name) == "" { + return nil, errors.New("agent.yml: phase missing required field 'name'") + } + phases = append(phases, AgentPhase{ + Name: strings.TrimSpace(pm.Name), + SystemPrompt: pm.SystemPrompt, + ModelTier: strings.TrimSpace(pm.ModelTier), + MaxIter: pm.MaxIter, + Tools: pm.Tools, + Optional: pm.Optional, + FallbackMessage: pm.FallbackMessage, + IsRunFunc: pm.IsRunFunc, + }) + } + + // Validate the webhook IP allow-list (CIDR or bare IP); drop + warn on + // malformed entries so a typo can't silently widen or void the allow-list. + allowlist := validateIPAllowlist(m.WebhookIPAllowlist, m.Name) + + ag := &Agent{ + Name: strings.TrimSpace(m.Name), + Description: m.Description, + Extends: strings.TrimSpace(m.Extends), + SystemPromptPrepend: m.SystemPromptPrepend, + ModelTier: strings.TrimSpace(m.ModelTier), + SystemPrompt: m.SystemPrompt, + MaxIterations: m.MaxIterations, + MaxToolCalls: m.MaxToolCalls, + MaxRuntime: time.Duration(m.MaxRuntimeSeconds) * time.Second, + ExecutionLane: strings.TrimSpace(m.ExecutionLane), + EncryptionEnabled: m.EncryptionEnabled, + CriticEnabled: m.CriticEnabled, + CriticBackstopMultiplier: m.CriticBackstopMultiplier, + SkillPalette: m.SkillPalette, + SubAgentPalette: m.SubAgentPalette, + LowLevelTools: m.LowLevelTools, + PersonalizationSources: m.PersonalizationSources, + Schedule: strings.TrimSpace(m.Schedule), + WebhookIPAllowlist: allowlist, + ChatbotChannelFilter: strings.TrimSpace(m.ChatbotChannelFilter), + DefaultEmoji: m.DefaultEmoji, + StateReactEmoji: m.StateReact, + Tags: m.Tags, + Phases: phases, + } + return ag, nil +} + +// validateIPAllowlist keeps only entries that parse as a CIDR block or a bare +// IP; malformed entries are dropped with a warning (a typo must not silently +// widen or void the webhook allow-list). The struct field documents "CIDR +// strings", so this enforces it at load time. 
// validateIPAllowlist filters a webhook allow-list down to the entries
// that are syntactically a CIDR block or a bare IP address.
//
// Each entry is whitespace-trimmed first; blank entries are skipped
// silently, and anything that parses as neither form is dropped with a
// warning naming the agent and the offending entry. Surviving entries
// are returned trimmed, in their original order.
//
// The result is nil — not an empty slice — when no entry survives.
func validateIPAllowlist(entries []string, agent string) []string {
	var kept []string
	for _, raw := range entries {
		candidate := strings.TrimSpace(raw)
		if candidate == "" {
			continue
		}
		_, _, cidrErr := net.ParseCIDR(candidate)
		if cidrErr != nil && net.ParseIP(candidate) == nil {
			slog.Warn("agents: dropping malformed webhook_ip_allowlist entry (not a CIDR or IP)", "agent", agent, "entry", candidate)
			continue
		}
		kept = append(kept, candidate)
	}
	return kept
}
+func NewMemory() *Memory { return &Memory{agents: map[string]*Agent{}} } + +var _ Storage = (*Memory)(nil) + +func (m *Memory) InitializeAgentStorage(context.Context) error { return nil } + +func (m *Memory) SaveAgent(_ context.Context, a *Agent) error { + m.mu.Lock() + defer m.mu.Unlock() + cp := *a + m.agents[a.ID] = &cp + return nil +} + +func (m *Memory) GetAgent(_ context.Context, id string) (*Agent, error) { + m.mu.RLock() + defer m.mu.RUnlock() + a, ok := m.agents[id] + if !ok { + return nil, ErrNotFound + } + cp := *a + return &cp, nil +} + +func (m *Memory) GetAgentByName(_ context.Context, ownerID, name string) (*Agent, error) { + m.mu.RLock() + defer m.mu.RUnlock() + for _, a := range m.agents { + if a.OwnerID == ownerID && a.Name == name { + cp := *a + return &cp, nil + } + } + return nil, ErrNotFound +} + +func (m *Memory) listWhere(keep func(*Agent) bool) []*Agent { + m.mu.RLock() + defer m.mu.RUnlock() + out := make([]*Agent, 0, len(m.agents)) + for _, a := range m.agents { + if keep == nil || keep(a) { + cp := *a + out = append(out, &cp) + } + } + sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name }) + return out +} + +func (m *Memory) ListAgents(_ context.Context, ownerID string) ([]*Agent, error) { + return m.listWhere(func(a *Agent) bool { return a.OwnerID == ownerID }), nil +} + +func (m *Memory) ListAllAgents(context.Context) ([]*Agent, error) { + return m.listWhere(nil), nil +} + +func (m *Memory) DeleteAgent(_ context.Context, id string) error { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.agents, id) + return nil +} + +func (m *Memory) GetAgentByWebhookSecret(_ context.Context, secret string) (*Agent, error) { + if secret == "" { + return nil, ErrNotFound + } + m.mu.RLock() + defer m.mu.RUnlock() + for _, a := range m.agents { + if a.WebhookSecret == secret { + cp := *a + return &cp, nil + } + } + return nil, ErrNotFound +} + +func (m *Memory) ListAgentsByChatbotChannelFilter(context.Context) ([]*Agent, error) { + 
return m.listWhere(func(a *Agent) bool { return a.ChatbotChannelFilter != "" }), nil +} + +func (m *Memory) ListScheduledAgents(_ context.Context, dueBefore time.Time) ([]*Agent, error) { + return m.listWhere(func(a *Agent) bool { + return a.Schedule != "" && a.NextRunAt != nil && !a.NextRunAt.After(dueBefore) + }), nil +} + +func (m *Memory) MarkAgentScheduledRun(_ context.Context, agentID string, ranAt, nextAt time.Time) error { + m.mu.Lock() + defer m.mu.Unlock() + a, ok := m.agents[agentID] + if !ok { + return ErrNotFound + } + a.LastScheduledRunAt = &ranAt + a.NextRunAt = &nextAt + return nil +} diff --git a/persona/persona_test.go b/persona/persona_test.go new file mode 100644 index 0000000..9434f6a --- /dev/null +++ b/persona/persona_test.go @@ -0,0 +1,45 @@ +package persona + +import ( + "context" + "testing" + "time" +) + +func TestToRunnable(t *testing.T) { + a := &Agent{ + ID: "id1", Name: "helper", SystemPrompt: "be nice", ModelTier: "fast", + MaxIterations: 5, MaxRuntime: 30 * time.Second, + LowLevelTools: []string{"think"}, SkillPalette: []string{"animate"}, + CriticEnabled: true, CriticBackstopMultiplier: 2, + Phases: []AgentPhase{{Name: "p1", ModelTier: "thinking", MaxIter: 3, Tools: []string{"now"}, Optional: true}}, + } + r := a.ToRunnable() + if r.ID != "id1" || r.ModelTier != "fast" || r.MaxIterations != 5 || !r.Critic.Enabled { + t.Fatalf("ToRunnable mapping wrong: %+v", r) + } + if len(r.Phases) != 1 || r.Phases[0].MaxIterations != 3 || !r.Phases[0].Optional { + t.Fatalf("phase mapping wrong: %+v", r.Phases) + } +} + +func TestMemoryStoreRoundTrip(t *testing.T) { + ctx := context.Background() + m := NewMemory() + a := &Agent{ID: "a1", Name: "n", OwnerID: "o1"} + if err := m.SaveAgent(ctx, a); err != nil { + t.Fatal(err) + } + got, err := m.GetAgent(ctx, "a1") + if err != nil || got.Name != "n" { + t.Fatalf("GetAgent: %v %+v", err, got) + } + byName, err := m.GetAgentByName(ctx, "o1", "n") + if err != nil || byName.ID != "a1" { + 
t.Fatalf("GetAgentByName: %v %+v", err, byName) + } + list, _ := m.ListAgents(ctx, "o1") + if len(list) != 1 { + t.Fatalf("ListAgents = %d", len(list)) + } +} diff --git a/persona/runnable.go b/persona/runnable.go new file mode 100644 index 0000000..7a4bafd --- /dev/null +++ b/persona/runnable.go @@ -0,0 +1,37 @@ +package persona + +import "gitea.stevedudenhoeffer.com/steve/executus/run" + +// ToRunnable lowers an Agent persona into the kernel's run.RunnableAgent DTO — +// the bridge that lets run.Executor run a persona WITHOUT importing this +// battery (the inversion of mort's agentexec.Run(*agents.Agent)). It maps the +// static shape only; per-run personalization, palette resolution, the critic, +// audit, etc. are supplied separately via run.Ports. +func (a *Agent) ToRunnable() run.RunnableAgent { + ra := run.RunnableAgent{ + ID: a.ID, + Name: a.Name, + SystemPrompt: a.SystemPrompt, + ModelTier: a.ModelTier, + MaxIterations: a.MaxIterations, + MaxRuntime: a.MaxRuntime, + LowLevelTools: a.LowLevelTools, + SkillPalette: a.SkillPalette, + SubAgentPalette: a.SubAgentPalette, + Critic: run.CriticConfig{ + Enabled: a.CriticEnabled, + BackstopMultiplier: a.CriticBackstopMultiplier, + }, + } + for _, p := range a.Phases { + ra.Phases = append(ra.Phases, run.Phase{ + Name: p.Name, + SystemPrompt: p.SystemPrompt, + ModelTier: p.ModelTier, + MaxIterations: p.MaxIter, + Tools: p.Tools, + Optional: p.Optional, + }) + } + return ra +} diff --git a/persona/storage.go b/persona/storage.go new file mode 100644 index 0000000..46eb88f --- /dev/null +++ b/persona/storage.go @@ -0,0 +1,115 @@ +package persona + +import ( + "context" + "errors" + "time" +) + +// ErrNotFound is returned when an agent lookup fails. Callers compare +// with errors.Is(err, ErrNotFound). +var ErrNotFound = errors.New("agent not found") + +// Storage is the persistence interface for the agents system. 
// Storage is the persistence interface for the agents system.
//
// Why: tests substitute fake implementations; production wires
// through pkg/logic/storage's Grand Storage which embeds this
// interface. Mirrors the three-layer pattern in
// pkg/logic/storage/CLAUDE.md (domain → GORM → DB).
//
// What: Phase 1 CRUD plus Phase 3 trigger queries
// (ListScheduledAgents, GetAgentByWebhookSecret,
// ListAgentsByChatbotChannelFilter, MarkAgentScheduledRun). Trigger
// queries are read by the agentsched runner, webhook router, and
// chatbot tool provider; all are gated behind the
// agents.triggers.enabled convar so old skill-driven paths keep
// running until the convar flips.
//
// Test: see storage_round_trip_test.go for round-trip coverage.
type Storage interface {
	// (Mort's Discord command-binding CRUD — the CommandBindingStorage
	// embedding — stays a host concern and is NOT part of the executus
	// persona Storage seam.)

	// InitializeAgentStorage prepares storage (e.g. AutoMigrate)
	// and is idempotent. Called from the grand storage's
	// InitializeAll path.
	InitializeAgentStorage(ctx context.Context) error

	// SaveAgent creates or updates an Agent by ID. ID must be
	// non-empty (Phase 1 admin commands mint a UUID).
	SaveAgent(ctx context.Context, a *Agent) error

	// GetAgent returns the agent with the given ID, or ErrNotFound.
	GetAgent(ctx context.Context, id string) (*Agent, error)

	// GetAgentByName resolves (owner_id, name) → agent. ownerID
	// must match exactly (Phase 1 has no shared/public visibility
	// yet; every agent is owned). Returns ErrNotFound when no row
	// matches.
	GetAgentByName(ctx context.Context, ownerID, name string) (*Agent, error)

	// ListAgents returns every agent owned by the given member ID,
	// sorted by Name ASC.
	ListAgents(ctx context.Context, ownerID string) ([]*Agent, error)

	// ListAllAgents returns every agent across all owners, sorted by
	// (OwnerID ASC, Name ASC) so builtin rows (OwnerID="builtin")
	// group together, then numeric Discord-ID owners in lexical order,
	// then chatbot-shadow rows whose OwnerID is the chatbot owner's
	// Discord ID but whose Name carries the "chatbot:" prefix.
	//
	// Why: Phase 1 admin commands ran owner-scoped (a steve-owned
	// agent list shows ONLY steve's rows), which hid builtin and
	// shadow Agents from the admin view. `.agent list` for admins now
	// uses this method to surface every row. Non-admin invocations
	// (or `.agent list --mine`) keep using ListAgents.
	//
	// Storage MAY back this with a single full-table scan — admin
	// row counts are small (dozens to low hundreds), so no need for
	// pagination at this phase.
	ListAllAgents(ctx context.Context) ([]*Agent, error)

	// DeleteAgent removes an agent by ID. Idempotent — deleting a
	// missing row returns nil.
	DeleteAgent(ctx context.Context, id string) error

	// GetAgentByWebhookSecret resolves a posted /webhooks/{secret} URL
	// to the matching agent. Returns ErrNotFound when no agent has
	// the secret. Phase 3 webhook router consults this AFTER the
	// existing Skill lookup falls through, but only when
	// agents.triggers.enabled is true.
	//
	// Empty secret is rejected with ErrNotFound (empty WebhookSecret
	// rows are NOT webhook-enabled — the application layer guards
	// this, the lookup defends against accidental match).
	GetAgentByWebhookSecret(ctx context.Context, secret string) (*Agent, error)

	// ListAgentsByChatbotChannelFilter returns every agent with a
	// non-empty ChatbotChannelFilter. Phase 3 chatbot tool provider
	// uses this on every chatbot turn to assemble the per-channel
	// tool list (gated by agents.triggers.enabled). The result is
	// not channel-filtered here — the provider applies the channel
	// filter predicate (registered in skills.ChannelFilterRegistry)
	// to each row.
	//
	// Why no channel filter at the storage layer: the filter is a
	// runtime predicate (e.g. dm_only depends on the live Discord
	// channel kind cache), not a static column we can index on.
	ListAgentsByChatbotChannelFilter(ctx context.Context) ([]*Agent, error)

	// ListScheduledAgents returns every agent with a non-empty
	// Schedule whose NextRunAt is at or before `dueBefore`. Result
	// is ordered by NextRunAt ASC so the scheduler runner can drain
	// in oldest-due-first order. Mirrors skills.Storage.ListDueScheduled.
	//
	// Phase 3 scheduler reads this on every tick when
	// agents.triggers.enabled is true. The (Schedule, NextRunAt)
	// composite index backs the query — see gorm tags on gormAgent.
	ListScheduledAgents(ctx context.Context, dueBefore time.Time) ([]*Agent, error)

	// MarkAgentScheduledRun atomically updates LastScheduledRunAt
	// and NextRunAt for the given agent. Called by the agentsched
	// runner after each scheduled invocation. Mirrors
	// skills.Storage.MarkScheduledRun semantics.
	MarkAgentScheduledRun(ctx context.Context, agentID string, ranAt, nextAt time.Time) error
}