P4: Tier-2 batteries — audit + budget + persona noun #4
@@ -61,13 +61,17 @@ CORE (majordomo + stdlib):
|
|||||||
tools/{web,net,store,compose,meta,comms} generic tools [P3]
|
tools/{web,net,store,compose,meta,comms} generic tools [P3]
|
||||||
|
|
||||||
BATTERIES (opt-in siblings, each nil-safe + a default):
|
BATTERIES (opt-in siblings, each nil-safe + a default):
|
||||||
persona/ Agent noun + AgentStore seam + yml loader [P4]
|
persona/ Agent noun + Storage seam + builtin loader [P4 ~]
|
||||||
|
+ ToRunnable() bridge to run.RunnableAgent +
|
||||||
|
Memory default (host: chatbot/commands/personalization)
|
||||||
skill/ rich Skill + SkillStore seam + toml loader [P4]
|
skill/ rich Skill + SkillStore seam + toml loader [P4]
|
||||||
audit/ run-trace Sink (+ Noop/Slog) [P4]
|
audit/ run.Audit Sink + Writer + queryable Memory [P4 ✓]
|
||||||
|
default (skillaudit Storage iface; GORM stays in mort)
|
||||||
critic/ two-tier timeout state machine + Escalator [P4]
|
critic/ two-tier timeout state machine + Escalator [P4]
|
||||||
schedule/ cron runner cores [P4]
|
schedule/ cron runner cores [P4]
|
||||||
checkpoint/ durable resume seam [P4]
|
checkpoint/ durable resume seam [P4]
|
||||||
budget/ rolling-window tracker (+ NoOp) [P4]
|
budget/ DBBudget rolling-7d + NoOp (run.Budget); [P4 ✓]
|
||||||
|
BudgetStorage iface + Memory default
|
||||||
|
|
||||||
contrib/store/ SECOND module (+ modernc.org/sqlite): [P4]
|
contrib/store/ SECOND module (+ modernc.org/sqlite): [P4]
|
||||||
in-memory + pure-Go SQLite impls of every *Store seam
|
in-memory + pure-Go SQLite impls of every *Store seam
|
||||||
|
|||||||
@@ -0,0 +1,78 @@
|
|||||||
|
package audit_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/audit"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestAuditBatteryEndToEnd wires the audit battery (Memory storage) into
|
||||||
|
// run.Ports.Audit, runs an agent, and verifies the run was recorded and is
|
||||||
|
// queryable — proving Sink/Writer/Memory satisfy the core seams end to end.
|
||||||
|
func TestAuditBatteryEndToEnd(t *testing.T) {
|
||||||
|
mem := audit.NewMemory()
|
||||||
|
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m", fake.Reply("the answer"))
|
||||||
|
m, err := fp.Model("m")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
|
||||||
|
return ctx, m, nil
|
||||||
|
},
|
||||||
|
Ports: run.Ports{Audit: audit.NewSink(mem)},
|
||||||
|
})
|
||||||
|
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{ID: "agent-1", Name: "a", ModelTier: "m"},
|
||||||
|
tool.Invocation{RunID: "run-xyz", CallerID: "caller-1"},
|
||||||
|
"question")
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Fatalf("run error: %v", res.Err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The run was recorded with a terminal status + output.
|
||||||
|
got, err := mem.GetRun(context.Background(), "run-xyz")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetRun: %v", err)
|
||||||
|
}
|
||||||
|
if got.Status != "ok" {
|
||||||
|
t.Errorf("status = %q, want ok", got.Status)
|
||||||
|
}
|
||||||
|
if got.Output != "the answer" {
|
||||||
|
t.Errorf("output = %q, want %q", got.Output, "the answer")
|
||||||
|
}
|
||||||
|
if got.FinishedAt == nil {
|
||||||
|
t.Error("FinishedAt should be set after the run")
|
||||||
|
}
|
||||||
|
if got.SkillID != "agent-1" {
|
||||||
|
t.Errorf("SkillID = %q, want agent-1 (the subject id)", got.SkillID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// And it is queryable by caller.
|
||||||
|
runs, err := mem.ListRunsByCaller(context.Background(), "caller-1", 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ListRunsByCaller: %v", err)
|
||||||
|
}
|
||||||
|
if len(runs) != 1 || runs[0].ID != "run-xyz" {
|
||||||
|
t.Errorf("ListRunsByCaller = %+v, want [run-xyz]", runs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNilSinkRecordsNothing: NewSink(nil) is equivalent to no audit.
|
||||||
|
func TestNilSinkRecordsNothing(t *testing.T) {
|
||||||
|
s := audit.NewSink(nil)
|
||||||
|
if rec := s.StartRun(context.Background(), run.RunInfo{RunID: "r"}); rec != nil {
|
||||||
|
t.Error("NewSink(nil).StartRun should return a nil recorder")
|
||||||
|
}
|
||||||
|
}
|
||||||
+280
@@ -0,0 +1,280 @@
|
|||||||
|
package audit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Memory is an in-process Storage: it retains runs + logs in memory so a light
|
||||||
|
// host (or a test) gets queryable run history with zero setup. It is bounded
|
||||||
|
// only by process memory — a host that runs forever should PurgeOlderThan
|
||||||
|
// periodically, or use a persistent Storage. Construct with NewMemory.
|
||||||
|
//
|
||||||
|
// Mort uses its GORM/MySQL Storage; contrib/store adds a durable SQLite one.
|
||||||
|
// Memory is the zero-dependency default behind audit.NewSink(audit.NewMemory()).
|
||||||
|
type Memory struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
order []string // run ids in insertion order
|
||||||
|
runs map[string]SkillRun // by run id
|
||||||
|
logs map[string][]SkillRunLog // by run id
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMemory returns an empty in-memory Storage.
|
||||||
|
func NewMemory() *Memory {
|
||||||
|
return &Memory{runs: map[string]SkillRun{}, logs: map[string][]SkillRunLog{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Storage = (*Memory)(nil)
|
||||||
|
|
||||||
|
// Initialize is a no-op for Memory and always returns nil.
func (m *Memory) Initialize(_ context.Context) error {
	return nil
}
|
||||||
|
|
||||||
|
func (m *Memory) StartRun(_ context.Context, run SkillRun) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
if _, ok := m.runs[run.ID]; !ok {
|
||||||
|
m.order = append(m.order, run.ID)
|
||||||
|
}
|
||||||
|
m.runs[run.ID] = run
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) FinishRun(_ context.Context, runID string, s RunStats) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
r, ok := m.runs[runID]
|
||||||
|
if !ok {
|
||||||
|
return ErrNotFound
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
r.FinishedAt = &now
|
||||||
|
r.Status = s.Status
|
||||||
|
r.Output = s.Output
|
||||||
|
r.Error = s.Error
|
||||||
|
r.ToolCallsCount = s.ToolCalls
|
||||||
|
r.RuntimeSeconds = s.RuntimeSeconds
|
||||||
|
r.TotalInputTokens = s.InputTokens
|
||||||
|
r.TotalOutputTokens = s.OutputTokens
|
||||||
|
r.TotalThinkingTokens = s.ThinkingTokens
|
||||||
|
m.runs[runID] = r
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) AppendLog(_ context.Context, log SkillRunLog) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
m.logs[log.RunID] = append(m.logs[log.RunID], log)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) GetRun(_ context.Context, runID string) (*SkillRun, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
r, ok := m.runs[runID]
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
return &r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) ListLogsByRun(_ context.Context, runID string) ([]SkillRunLog, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
ls := append([]SkillRunLog(nil), m.logs[runID]...)
|
||||||
|
sort.SliceStable(ls, func(i, j int) bool { return ls[i].Sequence < ls[j].Sequence })
|
||||||
|
return ls, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// newestFirst returns the retained runs in reverse insertion order, optionally
|
||||||
|
// filtered. Caller holds at least RLock.
|
||||||
|
func (m *Memory) newestFirst(keep func(SkillRun) bool) []SkillRun {
|
||||||
|
out := make([]SkillRun, 0, len(m.order))
|
||||||
|
for i := len(m.order) - 1; i >= 0; i-- {
|
||||||
|
r := m.runs[m.order[i]]
|
||||||
|
if keep == nil || keep(r) {
|
||||||
|
out = append(out, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// oldestFirst returns the retained runs in insertion (oldest-first) order,
|
||||||
|
// optionally filtered. Caller holds at least RLock.
|
||||||
|
func (m *Memory) oldestFirst(keep func(SkillRun) bool) []SkillRun {
|
||||||
|
out := make([]SkillRun, 0, len(m.order))
|
||||||
|
for _, id := range m.order {
|
||||||
|
r := m.runs[id]
|
||||||
|
if keep == nil || keep(r) {
|
||||||
|
out = append(out, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func page(rs []SkillRun, offset, limit int) []SkillRun {
|
||||||
|
if offset < 0 {
|
||||||
|
offset = 0
|
||||||
|
}
|
||||||
|
if offset >= len(rs) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
rs = rs[offset:]
|
||||||
|
if limit > 0 && limit < len(rs) {
|
||||||
|
rs = rs[:limit]
|
||||||
|
}
|
||||||
|
return rs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) ListRunsBySkill(ctx context.Context, skillID string, limit int) ([]SkillRun, error) {
|
||||||
|
return m.ListRunsBySkillPaginated(ctx, skillID, 0, limit, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) ListRunsBySkillPaginated(_ context.Context, skillID string, offset, limit int, includeDryRun bool) ([]SkillRun, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
return page(m.newestFirst(func(r SkillRun) bool {
|
||||||
|
return r.SkillID == skillID && (includeDryRun || r.Status != "dry_run")
|
||||||
|
}), offset, limit), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) CountRunsBySkill(_ context.Context, skillID string, includeDryRun bool) (int64, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
return int64(len(m.newestFirst(func(r SkillRun) bool {
|
||||||
|
return r.SkillID == skillID && (includeDryRun || r.Status != "dry_run")
|
||||||
|
}))), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) ListRunsByCaller(_ context.Context, callerID string, limit int) ([]SkillRun, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
return page(m.newestFirst(func(r SkillRun) bool {
|
||||||
|
return r.CallerID == callerID && r.Status != "dry_run"
|
||||||
|
}), 0, limit), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) matchesFilter(r SkillRun, f RunFilter) bool {
|
||||||
|
if f.Status != "" {
|
||||||
|
if r.Status != f.Status {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// An explicit Status (even "dry_run") matches regardless of IncludeDryRun.
|
||||||
|
} else if !f.IncludeDryRun && r.Status == "dry_run" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if f.SkillID != "" && r.SkillID != f.SkillID {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if f.CallerID != "" && r.CallerID != f.CallerID {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if f.ChannelID != "" && r.ChannelID != f.ChannelID {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if f.TopLevelOnly && r.ParentRunID != "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if !f.Since.IsZero() && r.StartedAt.Before(f.Since) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if !f.Until.IsZero() && r.StartedAt.After(f.Until) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) ListRunsFiltered(_ context.Context, f RunFilter, offset, limit int) ([]SkillRun, error) {
|
||||||
|
if limit <= 0 || limit > 500 {
|
||||||
|
limit = 50 // bound admin scans, per the Storage contract
|
||||||
|
}
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
return page(m.newestFirst(func(r SkillRun) bool { return m.matchesFilter(r, f) }), offset, limit), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) CountRunsFiltered(_ context.Context, f RunFilter) (int64, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
return int64(len(m.newestFirst(func(r SkillRun) bool { return m.matchesFilter(r, f) }))), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) PurgeOlderThan(_ context.Context, t time.Time) (int64, error) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
var purged int64
|
||||||
|
kept := m.order[:0:0]
|
||||||
|
for _, id := range m.order {
|
||||||
|
r := m.runs[id]
|
||||||
|
if r.FinishedAt != nil && r.FinishedAt.Before(t) {
|
||||||
|
delete(m.runs, id)
|
||||||
|
delete(m.logs, id)
|
||||||
|
purged++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
kept = append(kept, id)
|
||||||
|
}
|
||||||
|
m.order = kept
|
||||||
|
return purged, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) ListChildrenByParent(_ context.Context, parentRunID string) ([]SkillRun, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
return m.oldestFirst(func(r SkillRun) bool { return r.ParentRunID == parentRunID }), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) WalkParentChain(_ context.Context, runID string) ([]SkillRun, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
var chain []SkillRun
|
||||||
|
seen := map[string]bool{}
|
||||||
|
for id := runID; id != "" && len(chain) < MaxParentChainDepth; {
|
||||||
|
r, ok := m.runs[id]
|
||||||
|
if !ok || seen[id] {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
seen[id] = true
|
||||||
|
chain = append(chain, r)
|
||||||
|
id = r.ParentRunID
|
||||||
|
}
|
||||||
|
// Contract: root first, the queried run last. We walked child→root, so reverse.
|
||||||
|
for i, j := 0, len(chain)-1; i < j; i, j = i+1, j-1 {
|
||||||
|
chain[i], chain[j] = chain[j], chain[i]
|
||||||
|
}
|
||||||
|
return chain, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) ListFinishedRunsBefore(_ context.Context, cutoff time.Time, limit int) ([]SkillRun, error) {
|
||||||
|
if limit <= 0 {
|
||||||
|
return nil, nil // contract: a real bound is required
|
||||||
|
}
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
return page(m.oldestFirst(func(r SkillRun) bool {
|
||||||
|
return r.FinishedAt != nil && r.FinishedAt.Before(cutoff)
|
||||||
|
}), 0, limit), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) LastRunBySkills(_ context.Context, skillIDs []string, includeFailed bool) (map[string]time.Time, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
want := map[string]bool{}
|
||||||
|
for _, id := range skillIDs {
|
||||||
|
want[id] = true
|
||||||
|
}
|
||||||
|
out := map[string]time.Time{}
|
||||||
|
for _, id := range m.order {
|
||||||
|
r := m.runs[id]
|
||||||
|
if !want[r.SkillID] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !includeFailed && r.Status != "ok" {
|
||||||
|
continue // contract: only status=="ok" counts unless includeFailed
|
||||||
|
}
|
||||||
|
if r.StartedAt.After(out[r.SkillID]) {
|
||||||
|
out[r.SkillID] = r.StartedAt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,58 @@
|
|||||||
|
package audit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestOnToolRedactsSecretTools: a secret-bearing tool's args/result must NOT be
|
||||||
|
// persisted verbatim in the audit log.
|
||||||
|
func TestOnToolRedactsSecretTools(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
mem := NewMemory()
|
||||||
|
mem.StartRun(ctx, SkillRun{ID: "r1"})
|
||||||
|
w := NewWriter(mem, "r1")
|
||||||
|
|
||||||
|
secret := `{"url":"https://x","headers":{"Authorization":"Bearer SUPERSECRET"}}`
|
||||||
|
w.OnTool(llm.ToolCall{Name: "http_get", ID: "1", Arguments: []byte(secret)}, "TOPSECRETBODY")
|
||||||
|
// a non-secret tool is logged verbatim
|
||||||
|
w.OnTool(llm.ToolCall{Name: "think", ID: "2", Arguments: []byte(`{"thought":"hi"}`)}, "ok")
|
||||||
|
|
||||||
|
logs, _ := mem.ListLogsByRun(ctx, "r1")
|
||||||
|
var dump strings.Builder
|
||||||
|
for _, l := range logs {
|
||||||
|
for k, v := range l.Payload {
|
||||||
|
dump.WriteString(k)
|
||||||
|
dump.WriteString("=")
|
||||||
|
if s, ok := v.(string); ok {
|
||||||
|
dump.WriteString(s)
|
||||||
|
}
|
||||||
|
dump.WriteString(" ")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
all := dump.String()
|
||||||
|
if strings.Contains(all, "SUPERSECRET") || strings.Contains(all, "TOPSECRETBODY") {
|
||||||
|
t.Fatalf("secret leaked into audit log: %s", all)
|
||||||
|
}
|
||||||
|
// the redaction marker is present, and the non-secret tool's args survive
|
||||||
|
foundRedacted, foundThink := false, false
|
||||||
|
for _, l := range logs {
|
||||||
|
if l.EventType == "tool_call" {
|
||||||
|
if r, _ := l.Payload["args_redacted"].(bool); r {
|
||||||
|
foundRedacted = true
|
||||||
|
}
|
||||||
|
if a, _ := l.Payload["args"].(string); strings.Contains(a, "thought") {
|
||||||
|
foundThink = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !foundRedacted {
|
||||||
|
t.Error("secret tool_call should carry args_redacted=true")
|
||||||
|
}
|
||||||
|
if !foundThink {
|
||||||
|
t.Error("non-secret tool args should be logged verbatim")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,81 @@
|
|||||||
|
package audit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Sink adapts an audit Storage to the run.Audit port: StartRun opens a run row
|
||||||
|
// and returns a per-run recorder (a Writer) that the executor feeds with steps,
|
||||||
|
// tool calls, and the terminal roll-up. This is what plugs the audit battery
|
||||||
|
// into run.Ports.Audit — mort backs it with its GORM Storage, a light host with
|
||||||
|
// Memory() (or omits it entirely).
|
||||||
|
type Sink struct{ storage Storage }
|
||||||
|
|
||||||
|
// NewSink wraps a Storage as a run.Audit. A nil Storage yields a Sink whose
|
||||||
|
// StartRun returns nil (the executor then records nothing) — so NewSink(nil) is
|
||||||
|
// equivalent to leaving run.Ports.Audit unset.
|
||||||
|
func NewSink(storage Storage) *Sink { return &Sink{storage: storage} }
|
||||||
|
|
||||||
|
// compile-time proof the adapter satisfies the core seams.
|
||||||
|
var (
|
||||||
|
_ run.Audit = (*Sink)(nil)
|
||||||
|
_ run.RunRecorder = (*recorder)(nil)
|
||||||
|
)
|
||||||
|
|
||||||
|
// StartRun records the run start and returns a recorder. Implements run.Audit.
|
||||||
|
func (s *Sink) StartRun(ctx context.Context, info run.RunInfo) run.RunRecorder {
|
||||||
|
if s == nil || s.storage == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
started := info.StartedAt
|
||||||
|
if started.IsZero() {
|
||||||
|
started = time.Now()
|
||||||
|
}
|
||||||
|
// Best-effort: a failed StartRun must not break the user-visible run, but we
|
||||||
|
// surface it (a swallowed failure leaves orphan log events with no run row).
|
||||||
|
if err := s.storage.StartRun(ctx, SkillRun{
|
||||||
|
ID: info.RunID,
|
||||||
|
SkillID: info.SubjectID,
|
||||||
|
CallerID: info.CallerID,
|
||||||
|
ChannelID: info.ChannelID,
|
||||||
|
ParentRunID: info.ParentRunID,
|
||||||
|
Inputs: info.Inputs,
|
||||||
|
StartedAt: started,
|
||||||
|
Status: "running",
|
||||||
|
}); err != nil {
|
||||||
|
slog.Warn("audit: StartRun failed; the run row is missing so its log events will orphan",
|
||||||
|
"run_id", info.RunID, "error", err)
|
||||||
|
}
|
||||||
|
return &recorder{w: NewWriter(s.storage, info.RunID)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// recorder adapts a *Writer to run.RunRecorder, converting run.RunStats to the
|
||||||
|
// audit RunStats on Close (the two have identical fields).
|
||||||
|
type recorder struct{ w *Writer }
|
||||||
|
|
||||||
|
func (r *recorder) TokenStats() (in, out, thinking int64) { return r.w.TokenStats() }
|
||||||
|
func (r *recorder) ToolCallsCount() int { return r.w.ToolCallsCount() }
|
||||||
|
func (r *recorder) OnStep(iter int, resp *llm.Response) { r.w.OnStep(iter, resp) }
|
||||||
|
func (r *recorder) OnTool(call llm.ToolCall, result string) { r.w.OnTool(call, result) }
|
||||||
|
func (r *recorder) LogEvent(eventType string, payload map[string]any) {
|
||||||
|
r.w.LogEvent(eventType, payload)
|
||||||
|
}
|
||||||
|
func (r *recorder) LogError(msg string) { r.w.LogError(msg) }
|
||||||
|
func (r *recorder) Close(ctx context.Context, s run.RunStats) {
|
||||||
|
r.w.Close(ctx, RunStats{
|
||||||
|
Status: s.Status,
|
||||||
|
Output: s.Output,
|
||||||
|
Error: s.Error,
|
||||||
|
ToolCalls: s.ToolCalls,
|
||||||
|
RuntimeSeconds: s.RuntimeSeconds,
|
||||||
|
InputTokens: s.InputTokens,
|
||||||
|
OutputTokens: s.OutputTokens,
|
||||||
|
ThinkingTokens: s.ThinkingTokens,
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -0,0 +1,245 @@
|
|||||||
|
// Package audit persists skill execution traces: per-run summary rows
|
||||||
|
// (skill_runs) and per-step event logs (skill_run_logs). The executor in
|
||||||
|
// pkg/logic/skillexec emits events through a Writer; the storage layer is
|
||||||
|
// kept separate so tests can mock it and so retention pruning has a clear
|
||||||
|
// home.
|
||||||
|
//
|
||||||
|
// Why: agentic runs can be long, multi-tool affairs. Without a structured
|
||||||
|
// audit trail, debugging "why did the LLM do that?" is impossible. The
|
||||||
|
// log table is keyed by (run_id, sequence) so insert order is preserved.
|
||||||
|
package audit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrNotFound is returned when a run lookup fails.
|
||||||
|
var ErrNotFound = errors.New("skill run not found")
|
||||||
|
|
||||||
|
// SkillRun is the summary row for one invocation — one per call to
// Executor.Run. Status starts at "running" and ends at one of ok / error /
// timeout / budget_exceeded / dry_run.
type SkillRun struct {
	ID        string // run id; the key every lookup uses
	SkillID   string // subject of the run (holds the agent UUID for agent runs)
	CallerID  string // who triggered the run
	ChannelID string // where the run was triggered

	Inputs     map[string]any
	StartedAt  time.Time
	FinishedAt *time.Time // nil until FinishRun records the terminal state

	Status string // running|ok|error|timeout|budget_exceeded|dry_run
	Output string
	Error  string

	ToolCallsCount int
	RuntimeSeconds float64

	// ParentRunID is the run_id of the parent skill that invoked this run via
	// skill_invoke; it is empty for top-level invocations. The gorm model
	// indexes it so the call-tree queries (ListChildrenByParent and
	// WalkParentChain) stay cheap.
	ParentRunID string

	// Token roll-ups, summed over every model completion in the run (one
	// Usage per OnStep). Each stays 0 when the provider exposed no usage.
	TotalInputTokens, TotalOutputTokens, TotalThinkingTokens int64
}
|
||||||
|
|
||||||
|
// RunStats is the terminal state of a run, as handed to FinishRun.
//
// Why a struct: FinishRun originally took six positional arguments and the
// token columns would have pushed that higher. Bundling them keeps call sites
// readable and lets future fields be added without touching every caller.
type RunStats struct {
	Status         string  // ok|error|timeout|budget_exceeded|dry_run
	Output         string  // final agent output (empty on error)
	Error          string  // error message (empty on success)
	ToolCalls      int     // total OnTool count
	RuntimeSeconds float64 // wall-clock duration

	// Token roll-ups; each is 0 when the provider exposed no token usage.
	InputTokens, OutputTokens, ThinkingTokens int64
}
|
||||||
|
|
||||||
|
// SkillRunLog is a single event recorded while a run executes.
type SkillRunLog struct {
	// RunID is the run this event belongs to.
	RunID string
	// Sequence orders the events of one run; ListLogsByRun sorts by it.
	Sequence int
	// EventType is one of step, tool_call, tool_result or error.
	EventType string
	// Payload is the opaque JSON body emitted by the writer.
	Payload map[string]any
	// CreatedAt is the event's timestamp.
	CreatedAt time.Time
}
|
||||||
|
|
||||||
|
// RunFilter bundles the predicates for the cross-surface "recent runs" query
// (ListRunsFiltered / CountRunsFiltered). Every field is optional, and the
// zero value matches the most recent runs across ALL audited surfaces (agents
// and skills). It backs the admin agent-trace debug view and the Claude debug
// API's /runs list.
//
// Why a struct: the debug list filters along several independent axes and
// more are expected, so a struct avoids a positional-argument explosion.
type RunFilter struct {
	// Status is an exact status match. Empty means every status, with
	// dry_run rows excluded unless IncludeDryRun is set.
	Status string
	// SkillID is an exact skill_id match (the agent UUID for agent runs).
	SkillID string
	// CallerID is an exact caller match (Discord member id).
	CallerID string
	// ChannelID is an exact channel id match.
	ChannelID string

	// TopLevelOnly keeps only root runs (parent_run_id = ''), hiding nested
	// sub-agent / sub-skill runs from the firehose. The debug list turns it
	// on by default; an "include nested" toggle clears it.
	TopLevelOnly bool

	// IncludeDryRun surfaces status="dry_run" sandbox rows, which are
	// excluded by default. It is ignored when Status is set explicitly: an
	// explicit Status=="dry_run" still matches.
	IncludeDryRun bool

	// Since and Until bound started_at as a half-open window:
	// started_at >= Since and started_at < Until. A zero value leaves that
	// side unbounded.
	Since, Until time.Time
}
|
||||||
|
|
||||||
|
// Storage is the persistence interface for skill runs and per-step logs.
|
||||||
|
//
|
||||||
|
// Why: tests substitute fake implementations; production wires
|
||||||
|
// NewGormStorage. Keep the interface narrow — the system only needs CRUD
|
||||||
|
// plus the retention prune helper.
|
||||||
|
type Storage interface {
|
||||||
|
Initialize(ctx context.Context) error
|
||||||
|
|
||||||
|
// StartRun inserts the run with status=running. The caller MUST
|
||||||
|
// invoke FinishRun later (or the row stays in running indefinitely
|
||||||
|
// — operationally that signals a crash mid-run, which is useful
|
||||||
|
// signal).
|
||||||
|
StartRun(ctx context.Context, run SkillRun) error
|
||||||
|
|
||||||
|
// FinishRun updates the running row with terminal status, output
|
||||||
|
// and stats. Idempotent on second call (last write wins).
|
||||||
|
//
|
||||||
|
// V5: takes a RunStats struct so token + cost columns can be
|
||||||
|
// written alongside the legacy fields without changing the
|
||||||
|
// signature for every future addition.
|
||||||
|
FinishRun(ctx context.Context, runID string, stats RunStats) error
|
||||||
|
|
||||||
|
// AppendLog adds one event to the run's log. Sequence numbers must
|
||||||
|
// be unique per run; the writer is responsible for monotonic
|
||||||
|
// ordering.
|
||||||
|
AppendLog(ctx context.Context, log SkillRunLog) error
|
||||||
|
|
||||||
|
// GetRun returns the run summary, or ErrNotFound.
|
||||||
|
GetRun(ctx context.Context, runID string) (*SkillRun, error)
|
||||||
|
|
||||||
|
// ListLogsByRun returns all logs for a run in sequence order.
|
||||||
|
ListLogsByRun(ctx context.Context, runID string) ([]SkillRunLog, error)
|
||||||
|
|
||||||
|
// ListRunsBySkill returns recent runs for a skill, newest first,
|
||||||
|
// capped at limit. Excludes dry-run rows by default — use
|
||||||
|
// ListRunsBySkillPaginated with includeDryRun=true to see them.
|
||||||
|
ListRunsBySkill(ctx context.Context, skillID string, limit int) ([]SkillRun, error)
|
||||||
|
|
||||||
|
// ListRunsBySkillPaginated returns recent runs for a skill, newest
|
||||||
|
// first, with offset+limit. When includeDryRun is false, rows with
|
||||||
|
// status="dry_run" are excluded (matches the wizard's sandbox
|
||||||
|
// status; see skillaudit.Writer / wizardtools docs).
|
||||||
|
//
|
||||||
|
// Why a separate paginated method vs. expanding ListRunsBySkill:
|
||||||
|
// callers that need the legacy "last N" view (Discord .skill runs,
|
||||||
|
// chatbot tool result) want the simpler signature; the paginated
|
||||||
|
// view is webui-specific.
|
||||||
|
ListRunsBySkillPaginated(ctx context.Context, skillID string,
|
||||||
|
offset, limit int, includeDryRun bool) ([]SkillRun, error)
|
||||||
|
|
||||||
|
// CountRunsBySkill returns the total number of runs for a skill.
|
||||||
|
// When includeDryRun is false, dry-run rows are excluded so the
|
||||||
|
// count matches the default ListRunsBySkillPaginated result.
|
||||||
|
CountRunsBySkill(ctx context.Context, skillID string, includeDryRun bool) (int64, error)
|
||||||
|
|
||||||
|
// ListRunsByCaller returns recent runs by a caller, newest first,
|
||||||
|
// capped at limit.
|
||||||
|
ListRunsByCaller(ctx context.Context, callerID string, limit int) ([]SkillRun, error)
|
||||||
|
|
||||||
|
// ListRunsFiltered returns runs matching f, newest first
|
||||||
|
// (started_at DESC), with offset+limit. With an all-zero filter it
|
||||||
|
// returns the most recent runs across EVERY audited surface (agents +
|
||||||
|
// skills) — the cross-surface feed behind the admin agent-trace debug
|
||||||
|
// view and the Claude debug API. dry_run rows are excluded unless
|
||||||
|
// f.IncludeDryRun or f.Status=="dry_run". limit is clamped (<=0 or
|
||||||
|
// >500 → 50) to bound admin scans.
|
||||||
|
ListRunsFiltered(ctx context.Context, f RunFilter, offset, limit int) ([]SkillRun, error)
|
||||||
|
|
||||||
|
// CountRunsFiltered returns the total rows matching f (ignoring
|
||||||
|
// offset/limit), for pagination math.
|
||||||
|
CountRunsFiltered(ctx context.Context, f RunFilter) (int64, error)
|
||||||
|
|
||||||
|
// PurgeOlderThan deletes runs (and their logs) whose StartedAt is
|
||||||
|
// strictly before t. Returns the number of runs deleted.
|
||||||
|
PurgeOlderThan(ctx context.Context, t time.Time) (int64, error)
|
||||||
|
|
||||||
|
// ListChildrenByParent returns all SkillRun rows where
|
||||||
|
// parent_run_id == parentRunID, oldest first. Used for the
|
||||||
|
// call-tree view (skill_invoke trace section) and as a building
|
||||||
|
// block for WalkParentChain.
|
||||||
|
//
|
||||||
|
// Returns an empty slice when parentRunID has no children. An
|
||||||
|
// empty parentRunID never matches anything (no row stores ""
|
||||||
|
// as a parent — that's the top-level sentinel).
|
||||||
|
ListChildrenByParent(ctx context.Context, parentRunID string) ([]SkillRun, error)
|
||||||
|
|
||||||
|
// WalkParentChain walks from runID up via parent_run_id, returning
|
||||||
|
// the chain of SkillRun summaries (oldest = root first, newest =
|
||||||
|
// runID last). Used by the loop guard in skill_invoke.
|
||||||
|
//
|
||||||
|
// Cap walk depth at 32 to prevent pathological loops in the data
|
||||||
|
// itself: if the parent_run_id chain has been corrupted (e.g. by
|
||||||
|
// a bad migration) and forms a cycle, we want a bounded result
|
||||||
|
// rather than an infinite loop.
|
||||||
|
WalkParentChain(ctx context.Context, runID string) ([]SkillRun, error)
|
||||||
|
|
||||||
|
// ListFinishedRunsBefore returns runs whose FinishedAt is strictly
|
||||||
|
// before cutoff, oldest first, capped at limit. limit <= 0 yields
|
||||||
|
// no rows (the caller is expected to specify a real bound).
|
||||||
|
//
|
||||||
|
// Why: skills.StorageSweeper drives the run-scope storage purge from
|
||||||
|
// this query. The sweeper picks up only finished runs so an
|
||||||
|
// in-flight run's run-scope KV/files cannot be deleted out from
|
||||||
|
// under it.
|
||||||
|
//
|
||||||
|
// Test: storage_test.go covers the include/exclude boundaries
|
||||||
|
// (running rows excluded; finished-after-cutoff excluded; finished-
|
||||||
|
// before-cutoff included).
|
||||||
|
ListFinishedRunsBefore(ctx context.Context, cutoff time.Time, limit int) ([]SkillRun, error)
|
||||||
|
|
||||||
|
// LastRunBySkills returns the most recent StartedAt timestamp per
|
||||||
|
// skill in the input ID list. Skills with no rows simply have no
|
||||||
|
// entry in the result map (caller distinguishes "never run" from
|
||||||
|
// "run but no timestamp" by map key presence).
|
||||||
|
//
|
||||||
|
// When includeFailed is true, all non-dry-run statuses count
|
||||||
|
// (ok / error / timeout / budget_exceeded / preempted / lane_busy).
|
||||||
|
// When false, only status="ok" rows count — useful for "last
|
||||||
|
// successful run" semantics on dashboards where errored runs
|
||||||
|
// shouldn't surface as recent activity.
|
||||||
|
//
|
||||||
|
// Empty skillIDs short-circuits to an empty map without touching
|
||||||
|
// the DB.
|
||||||
|
LastRunBySkills(ctx context.Context, skillIDs []string, includeFailed bool) (map[string]time.Time, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxParentChainDepth is the safety cap for WalkParentChain. The loop
|
||||||
|
// guard in skill_invoke enforces a separate (smaller) MaxInvokeDepth
|
||||||
|
// at the tool layer; this cap exists only to bound the walk in the
|
||||||
|
// presence of corrupted data.
|
||||||
|
const MaxParentChainDepth = 32 // the same bound of 32 that the WalkParentChain docs cite for the walk
|
||||||
+359
@@ -0,0 +1,359 @@
|
|||||||
|
package audit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// stepTextMax caps the per-step assistant-text preview persisted on a
|
||||||
|
// "step" event. Large enough to capture the model's reasoning around a
|
||||||
|
// (mis)fired tool call — the single best clue to WHY a model emitted a
|
||||||
|
// malformed call — but bounded so the longtext payload can't balloon.
|
||||||
|
const stepTextMax = 2000 // a byte count: OnStep applies it through truncate, which measures len(s)
|
||||||
|
|
||||||
|
// Writer wraps a Storage with the OnStep / OnTool callbacks suitable for
|
||||||
|
// wiring into the majordomo agent loop's step observer, tracking sequence
|
||||||
|
// numbers and tool-call counts internally.
|
||||||
|
//
|
||||||
|
// Why: the agent loop's observer hooks are unaware of run identity; the
|
||||||
|
// writer captures the runID + skill metadata at construction so the
|
||||||
|
// per-event callbacks stay simple. AppendLog failures are logged but
|
||||||
|
// never fatal — audit must not break user-visible execution.
|
||||||
|
//
|
||||||
|
// What: NewWriter(storage, runID) → use OnStep / OnTool / Close. Close
|
||||||
|
// records the final FinishRun. The executors translate each agent.Step
|
||||||
|
// into one OnStep call (1-indexed iteration, the step's *llm.Response)
|
||||||
|
// plus one OnTool call per executed tool.
|
||||||
|
//
|
||||||
|
// Test: see writer_test.go for sequence ordering and finish semantics.
|
||||||
|
type Writer struct {
|
||||||
|
storage Storage
|
||||||
|
runID string
|
||||||
|
sequence atomic.Int32
|
||||||
|
calls atomic.Int32
|
||||||
|
mu sync.Mutex // guards Close idempotency + token tally
|
||||||
|
closed bool
|
||||||
|
|
||||||
|
// V5 token accumulator — summed across each OnStep's resp.Usage.
|
||||||
|
// Reads come from TokenStats() so the executor can pass them to
|
||||||
|
// FinishRun. atomics-on-Int64 would also work, but mu already
|
||||||
|
// guards Close + we need consistent multi-field reads anyway
|
||||||
|
// (input + output + thinking). The mutex hot-path overhead is
|
||||||
|
// negligible vs the LLM call latency that dominates step time.
|
||||||
|
inputTokens int64
|
||||||
|
outputTokens int64
|
||||||
|
thinkingTokens int64
|
||||||
|
|
||||||
|
// Per-step wall-clock + run-level model attribution (guarded by mu).
|
||||||
|
// startedAt anchors the first step's duration; lastStepAt is the
|
||||||
|
// previous step's observation time; resolvedModelLogged ensures the
|
||||||
|
// one-shot "resolved_model" run-level event fires at most once.
|
||||||
|
startedAt time.Time
|
||||||
|
lastStepAt time.Time
|
||||||
|
resolvedModelLogged bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWriter constructs a Writer. The caller is expected to have already
|
||||||
|
// called Storage.StartRun.
|
||||||
|
func NewWriter(storage Storage, runID string) *Writer {
|
||||||
|
return &Writer{storage: storage, runID: runID, startedAt: time.Now()}
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnStep records one agent-loop step: a "step" event with the iteration
|
||||||
|
// number and the response's text size.
|
||||||
|
//
|
||||||
|
// V5: also tallies per-step token usage. majordomo populates
|
||||||
|
// resp.Usage when the provider reports it; for providers that don't,
|
||||||
|
// the fields stay 0 and the tally stays at 0 — the formatter then
|
||||||
|
// renders "—" rather than a misleading "$0.00".
|
||||||
|
//
|
||||||
|
// Why we tally here vs in the agent loop: the loop's Result.Usage is a
|
||||||
|
// run total; the audit row needs the same numbers, but the writer also
|
||||||
|
// serves the live RunState accessor mid-run, so a per-step running sum
|
||||||
|
// is the right shape. Global usage attribution is handled by the llms
|
||||||
|
// package's instrumented models — the writer tally is strictly the
|
||||||
|
// per-run audit roll-up.
|
||||||
|
func (w *Writer) OnStep(iter int, resp *llm.Response) {
|
||||||
|
if w == nil || w.storage == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
payload := map[string]any{"iter": iter}
|
||||||
|
|
||||||
|
w.mu.Lock()
|
||||||
|
// Per-step wall-clock: time since the previous observed step, or since
|
||||||
|
// run start for the first step. A long gap localises a slow/hung model
|
||||||
|
// call — the signal that was missing when an animate step-0 call hung
|
||||||
|
// ~5 min. NOTE: this is step-to-step wall time (model call + the prior
|
||||||
|
// step's tool execution), not pure model latency.
|
||||||
|
prev := w.lastStepAt
|
||||||
|
if prev.IsZero() {
|
||||||
|
prev = w.startedAt
|
||||||
|
}
|
||||||
|
if !prev.IsZero() {
|
||||||
|
payload["step_ms"] = now.Sub(prev).Milliseconds()
|
||||||
|
}
|
||||||
|
w.lastStepAt = now
|
||||||
|
if resp != nil {
|
||||||
|
w.inputTokens += int64(resp.Usage.InputTokens)
|
||||||
|
w.outputTokens += int64(resp.Usage.OutputTokens)
|
||||||
|
// Thinking/reasoning tokens are a first-class Usage field in
|
||||||
|
// majordomo (populated by the providers that report them).
|
||||||
|
w.thinkingTokens += int64(resp.Usage.ReasoningTokens)
|
||||||
|
}
|
||||||
|
// One-shot run-level served-model attribution: the FIRST step with a
|
||||||
|
// resolved model name emits a "resolved_model" event so a run that
|
||||||
|
// errors before producing a useful step still records which model
|
||||||
|
// served it. resp.Model is failover-aware ("provider/model-id" of the
|
||||||
|
// element that actually served), unlike the static configured head.
|
||||||
|
logResolvedModel := ""
|
||||||
|
if resp != nil && resp.Model != "" && !w.resolvedModelLogged {
|
||||||
|
w.resolvedModelLogged = true
|
||||||
|
logResolvedModel = resp.Model
|
||||||
|
}
|
||||||
|
w.mu.Unlock()
|
||||||
|
|
||||||
|
if resp != nil {
|
||||||
|
payload["text_len"] = len(resp.Text())
|
||||||
|
// Served model + why generation stopped — the two scalars that turn
|
||||||
|
// a "model misbehaved" guess into a fact. finish_reason on an
|
||||||
|
// empty-tool-call step disambiguates truncation (length) from a
|
||||||
|
// deliberate empty emission (tool_calls).
|
||||||
|
if resp.Model != "" {
|
||||||
|
payload["model"] = resp.Model
|
||||||
|
}
|
||||||
|
if resp.FinishReason != "" {
|
||||||
|
payload["finish_reason"] = string(resp.FinishReason)
|
||||||
|
}
|
||||||
|
// Per-step token breakdown (OnStep already reads these into the run
|
||||||
|
// total above; persisting the per-step slice costs nothing more).
|
||||||
|
payload["in_tokens"] = resp.Usage.InputTokens
|
||||||
|
payload["out_tokens"] = resp.Usage.OutputTokens
|
||||||
|
if resp.Usage.ReasoningTokens > 0 {
|
||||||
|
payload["thinking_tokens"] = resp.Usage.ReasoningTokens
|
||||||
|
}
|
||||||
|
if resp.Usage.CacheReadTokens > 0 {
|
||||||
|
payload["cache_read_tokens"] = resp.Usage.CacheReadTokens
|
||||||
|
}
|
||||||
|
// The model's own narration accompanying this step — the smoking gun
|
||||||
|
// for WHY a malformed tool call was emitted. Capped; suppressed when
|
||||||
|
// the step fired a secret-bearing tool (mcp_call/email_send/http_*)
|
||||||
|
// whose narration could echo the secret it's about to send.
|
||||||
|
if t := strings.TrimSpace(resp.Text()); t != "" {
|
||||||
|
if stepHasSecretTool(resp) {
|
||||||
|
payload["text_redacted"] = true
|
||||||
|
} else {
|
||||||
|
payload["text"] = truncate(t, stepTextMax)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
payload["text_len"] = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
w.appendLog("step", payload)
|
||||||
|
|
||||||
|
if logResolvedModel != "" {
|
||||||
|
w.appendLog("resolved_model", map[string]any{"model": logResolvedModel})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// isSecretTool reports whether the named tool's arguments or results may carry
// secrets (MCP args, email bodies/recipients, HTTP auth headers/bodies) and so
// must be redacted from the persisted audit log. It is the single source of
// truth for both the step-narration redaction in OnStep and the arg/result
// redaction in OnTool.
//
// NOTE: the match is a fixed list — the exact names "mcp_call" and
// "email_send" plus every name starting with "http_". A NEW secret-bearing
// tool must be added here, or its args/results will be logged verbatim.
func isSecretTool(name string) bool {
	return name == "mcp_call" ||
		name == "email_send" ||
		strings.HasPrefix(name, "http_")
}
|
||||||
|
|
||||||
|
func stepHasSecretTool(resp *llm.Response) bool {
|
||||||
|
if resp == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for _, c := range resp.ToolCalls {
|
||||||
|
if isSecretTool(c.Name) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// TokenStats returns the running totals tallied from OnStep.
|
||||||
|
// Safe to call concurrently. Returned values are a snapshot at call
|
||||||
|
// time. Used by the executors to populate RunStats before Close
|
||||||
|
// finalises the audit row.
|
||||||
|
//
|
||||||
|
// Why: the executor needs the totals AND a model name to compute cost,
|
||||||
|
// but cost calculation is a different concern from audit persistence.
|
||||||
|
// Exposing this getter lets the cost calculation live in the executor
|
||||||
|
// where the model is known.
|
||||||
|
func (w *Writer) TokenStats() (input, output, thinking int64) {
|
||||||
|
if w == nil {
|
||||||
|
return 0, 0, 0
|
||||||
|
}
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
return w.inputTokens, w.outputTokens, w.thinkingTokens
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnTool records a "tool_call" event with the tool name and a
|
||||||
|
// "tool_result" event with the result length. Tool count is incremented
|
||||||
|
// for each call. The executors call this once per executed tool call
|
||||||
|
// from their step observers (call + matching result content).
|
||||||
|
func (w *Writer) OnTool(call llm.ToolCall, result string) {
|
||||||
|
if w == nil || w.storage == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.calls.Add(1)
|
||||||
|
// Redact the args/result of secret-bearing tools — these fields actually
|
||||||
|
// CARRY the secret (MCP args, email body/recipients, HTTP auth/body), so
|
||||||
|
// logging them verbatim would defeat the OnStep narration redaction.
|
||||||
|
if isSecretTool(call.Name) {
|
||||||
|
w.appendLog("tool_call", map[string]any{
|
||||||
|
"name": call.Name,
|
||||||
|
"id": call.ID,
|
||||||
|
"args_redacted": true,
|
||||||
|
"args_len": len(call.Arguments),
|
||||||
|
})
|
||||||
|
w.appendLog("tool_result", map[string]any{
|
||||||
|
"name": call.Name,
|
||||||
|
"id": call.ID,
|
||||||
|
"result_redacted": true,
|
||||||
|
"result_len": len(result),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.appendLog("tool_call", map[string]any{
|
||||||
|
"name": call.Name,
|
||||||
|
"args": string(call.Arguments),
|
||||||
|
"id": call.ID,
|
||||||
|
})
|
||||||
|
w.appendLog("tool_result", map[string]any{
|
||||||
|
"name": call.Name,
|
||||||
|
"id": call.ID,
|
||||||
|
"result": truncate(result, 4000),
|
||||||
|
"truncated": len(result) > 4000,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogEvent records a custom event mid-run. The executor uses this for
|
||||||
|
// diagnostic events (e.g. "compaction_setup" / "compaction_fired")
|
||||||
|
// outside the canonical step / tool_call / tool_result / error set.
|
||||||
|
// Nil-safe: no-op when receiver or storage is nil.
|
||||||
|
//
|
||||||
|
// Why: skill_run_logs is the only sink Steve can read from SQL, so
|
||||||
|
// diagnostics intended for post-hoc debugging belong here. slog goes
|
||||||
|
// to mort.log which is harder to reach from outside the host.
|
||||||
|
func (w *Writer) LogEvent(eventType string, payload map[string]any) {
|
||||||
|
if w == nil || w.storage == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.appendLog(eventType, payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogError records an "error" event mid-run. Distinct from the terminal
|
||||||
|
// status set by Close.
|
||||||
|
func (w *Writer) LogError(msg string) {
|
||||||
|
if w == nil || w.storage == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.appendLog("error", map[string]any{"message": msg})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close finishes the run. The caller assembles a RunStats; the writer
|
||||||
|
// fills in ToolCalls (which is bookkept on the writer itself) and
|
||||||
|
// hands the full record to FinishRun.
|
||||||
|
//
|
||||||
|
// Idempotent: subsequent calls are no-ops.
|
||||||
|
//
|
||||||
|
// Why a struct vs the old positional form: v5 adds four token + cost
|
||||||
|
// fields on top of the legacy six. The struct keeps call sites readable
|
||||||
|
// and lets future fields slot in without churning every caller.
|
||||||
|
//
|
||||||
|
// Why context.WithoutCancel: the run's terminal status MUST land in
|
||||||
|
// the audit row regardless of the run ctx's state. Pre-fix, child
|
||||||
|
// skill runs invoked via skill_invoke / skill_invoke_parallel inherited
|
||||||
|
// the parent agent's runCtx as their outer ctx; when the parent
|
||||||
|
// timed out at MaxRuntime, every in-flight child's FinishRun fired
|
||||||
|
// with that already-cancelled ctx and the row was left in
|
||||||
|
// status=running forever. Detaching here is defence in depth — the
|
||||||
|
// caller (skillexec.runInner / agentexec.runInner) ALSO detaches at
|
||||||
|
// the call site, but a cancelled ctx in the writer's hands MUST NOT
|
||||||
|
// drop the audit write. The short timeout (auditFinishTimeout) bounds
|
||||||
|
// the write so a hung DB doesn't pin the run goroutine indefinitely.
|
||||||
|
func (w *Writer) Close(ctx context.Context, stats RunStats) {
|
||||||
|
if w == nil || w.storage == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
if w.closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.closed = true
|
||||||
|
stats.ToolCalls = int(w.calls.Load())
|
||||||
|
// Detach from the caller's deadline + cancellation. Run cleanup
|
||||||
|
// must complete even when the run ctx is dead. The fresh
|
||||||
|
// auditFinishTimeout caps how long we'll wait on the storage.
|
||||||
|
finishCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), auditFinishTimeout)
|
||||||
|
defer cancel()
|
||||||
|
if err := w.storage.FinishRun(finishCtx, w.runID, stats); err != nil {
|
||||||
|
slog.Warn("skillaudit: FinishRun failed", "run_id", w.runID, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// auditFinishTimeout caps how long Close will wait on the storage's
|
||||||
|
// FinishRun call after detaching from the caller's ctx. 10s is generous
|
||||||
|
// for a single-row UPDATE against MySQL — anything longer suggests a
|
||||||
|
// hung connection that the run goroutine shouldn't keep waiting on.
|
||||||
|
const auditFinishTimeout = 10 * time.Second // deadline Close puts on the detached FinishRun context
|
||||||
|
|
||||||
|
// auditAppendTimeout bounds each per-event AppendLog on the hot path so a hung
|
||||||
|
// storage backend can't block the run goroutine.
|
||||||
|
const auditAppendTimeout = 3 * time.Second // deadline appendLog puts on each AppendLog context
|
||||||
|
|
||||||
|
// ToolCallsCount returns how many tool invocations OnTool has seen so
|
||||||
|
// far. Useful for budget enforcement.
|
||||||
|
func (w *Writer) ToolCallsCount() int { return int(w.calls.Load()) }
|
||||||
|
|
||||||
|
func (w *Writer) appendLog(eventType string, payload map[string]any) {
|
||||||
|
seq := int(w.sequence.Add(1))
|
||||||
|
log := SkillRunLog{
|
||||||
|
RunID: w.runID,
|
||||||
|
Sequence: seq,
|
||||||
|
EventType: eventType,
|
||||||
|
Payload: payload,
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
}
|
||||||
|
// Bound the write: a hung storage backend must not block the run goroutine
|
||||||
|
// on the hot path (every step/tool event flows through here). Detached from
|
||||||
|
// any caller deadline — the log write is independent of the run's context.
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), auditAppendTimeout)
|
||||||
|
defer cancel()
|
||||||
|
if err := w.storage.AppendLog(ctx, log); err != nil {
|
||||||
|
slog.Warn("skillaudit: AppendLog failed", "run_id", w.runID, "seq", seq, "type", eventType, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// truncate returns s unchanged when it is at most max bytes long. Otherwise it
// returns a prefix of at most max bytes followed by a "…[+N bytes]" marker,
// where N is the number of bytes that were dropped.
//
// The cut never lands inside a multi-byte UTF-8 sequence: when byte max is a
// continuation byte, the cut backs up (by at most 3 bytes, the longest run of
// continuation bytes in valid UTF-8) to the start of that rune. Without this a
// truncated preview could end in half a character and persist invalid UTF-8.
// For input that is not valid UTF-8 the back-off is still bounded, so at most
// 3 extra bytes are dropped.
//
// A negative max is treated as 0 rather than panicking on the slice bound.
func truncate(s string, max int) string {
	if max < 0 {
		max = 0
	}
	if len(s) <= max {
		return s
	}
	cut := max
	// 0b10xxxxxx marks a UTF-8 continuation byte; s[cut] is in range because
	// cut <= max < len(s).
	for back := 0; back < 3 && cut > 0 && s[cut]&0xC0 == 0x80; back++ {
		cut--
	}
	return s[:cut] + fmt.Sprintf("…[+%d bytes]", len(s)-cut)
}
|
||||||
@@ -0,0 +1,169 @@
|
|||||||
|
// Package budget gates and meters per-caller resource use over a rolling
// 7-day window (run.Ports.Budget). DBBudget is the durable tracker; NoOpBudget
// disables metering; the BudgetStorage seam backs it (Memory / contrib SQLite).
//
// Why: a Skill is data; the executor turns data into a running agent
// (resolve model, build toolbox, start audit, run the agent loop from
// gitea.stevedudenhoeffer.com/steve/majordomo/agent, finish audit, deliver).
// This package supplies the Check/Commit budget gate the executor calls for
// each such run.
|
||||||
|
package budget
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BudgetTracker is the budget gate the executor talks to: Check decides
// whether a caller may run, Commit records what a run cost. NoOpBudget (always
// allows, records nothing) and DBBudget (rolling-window enforcement) both
// implement it.
//
// Why an interface: the executor's Check/Commit call sites stay fixed while
// the implementation behind them is swapped — e.g. NoOpBudget for DBBudget.
type BudgetTracker interface {
	// Check reports whether callerID still has budget: nil means yes, a
	// non-nil error describes the exhaustion.
	Check(ctx context.Context, callerID string) error

	// Commit records that callerID spent runtimeSeconds of budget on a run.
	// It is called after the agent completes, on success or on error.
	Commit(ctx context.Context, callerID string, runtimeSeconds float64)
}
|
||||||
|
|
||||||
|
// NoOpBudget always allows and never records. v1 default.
|
||||||
|
type NoOpBudget struct{}
|
||||||
|
|
||||||
|
// NewNoOpBudget constructs the no-op tracker.
|
||||||
|
func NewNoOpBudget() BudgetTracker { return NoOpBudget{} }
|
||||||
|
|
||||||
|
// Check always returns nil.
|
||||||
|
func (NoOpBudget) Check(_ context.Context, _ string) error { return nil }
|
||||||
|
|
||||||
|
// Commit is a no-op.
|
||||||
|
func (NoOpBudget) Commit(_ context.Context, _ string, _ float64) {}
|
||||||
|
|
||||||
|
// ErrBudgetExceeded is returned by DBBudget.Check when the caller's
|
||||||
|
// 7-day rolling window has hit the convar-configured cap.
|
||||||
|
//
|
||||||
|
// Why a sentinel: callers (executor, audit writer) need to distinguish
|
||||||
|
// budget rejection from generic errors so they can record
|
||||||
|
// status="budget_exceeded" instead of "error" and skip user-visible
|
||||||
|
// delivery side-effects.
|
||||||
|
var ErrBudgetExceeded = errors.New("weekly skill budget exceeded") // DBBudget.Check returns this value unwrapped; match it with errors.Is
|
||||||
|
|
||||||
|
// BudgetNotifier is the optional callback DBBudget invokes when a
|
||||||
|
// Check rejects a caller. Production wires a Discord-DM hook so the
|
||||||
|
// user knows why their skill failed; tests inject a recorder.
|
||||||
|
//
|
||||||
|
// nil is allowed and is silently skipped.
|
||||||
|
type BudgetNotifier func(ctx context.Context, userID string, secondsUsed, cap float64) // DBBudget.Check passes the rejected caller's ID, its row's SecondsUsed and the current cap
|
||||||
|
|
||||||
|
// DBBudget enforces per-user weekly GPU budgets via the BudgetStorage
|
||||||
|
// interface. The "weekly" cap is a rolling 7-day window — see
|
||||||
|
// SkillBudget for the rollover semantics.
|
||||||
|
//
|
||||||
|
// Why a closure for the limit instead of an int field: the cap comes
|
||||||
|
// from a runtime convar. Reading it on every Check means a `.convar
|
||||||
|
// set skills.user_budget_seconds_per_week 7200` takes effect on the
|
||||||
|
// next call without restarting the bot or rewiring the executor.
|
||||||
|
type DBBudget struct {
|
||||||
|
storage BudgetStorage
|
||||||
|
// weeklyLimit returns the current cap in seconds. Reads convar at
|
||||||
|
// every Check so a runtime convar bump takes effect on the next
|
||||||
|
// call.
|
||||||
|
weeklyLimit func() float64
|
||||||
|
|
||||||
|
// notify is called when a Check rejects a caller. Optional —
|
||||||
|
// production wires a Discord-DM hook so the user knows why their
|
||||||
|
// skill failed. nil-safe.
|
||||||
|
notify BudgetNotifier
|
||||||
|
|
||||||
|
// now is the time source. Test injects a fake clock; production
|
||||||
|
// uses time.Now.
|
||||||
|
now func() time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDBBudget constructs a DBBudget. now may be nil — defaults to
|
||||||
|
// time.Now.
|
||||||
|
//
|
||||||
|
// Why time injection: budget rollover is time-sensitive; tests need to
|
||||||
|
// fast-forward past the 7-day boundary deterministically. now=nil
|
||||||
|
// means production callers (mort.go) don't have to think about it.
|
||||||
|
//
|
||||||
|
// Test: pass a closure that returns a fixed instant; assert rollover
|
||||||
|
// only happens when (now - WindowStart) >= 7 days.
|
||||||
|
func NewDBBudget(storage BudgetStorage, weeklyLimit func() float64, notify BudgetNotifier, now func() time.Time) *DBBudget {
|
||||||
|
if now == nil {
|
||||||
|
now = time.Now
|
||||||
|
}
|
||||||
|
return &DBBudget{
|
||||||
|
storage: storage,
|
||||||
|
weeklyLimit: weeklyLimit,
|
||||||
|
notify: notify,
|
||||||
|
now: now,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check returns ErrBudgetExceeded if the caller has spent at least
|
||||||
|
// weeklyLimit seconds in the current rolling 7-day window.
|
||||||
|
//
|
||||||
|
// Why anonymous callerID="" is unbudgeted: scheduler-driven and
|
||||||
|
// system-initiated runs don't have a Discord user to bill; charging
|
||||||
|
// "system" would conflate them with a real user. The scheduler sets
|
||||||
|
// CallerID to the skill owner where applicable, so cron-loop
|
||||||
|
// abusiveness still consumes the owner's budget.
|
||||||
|
//
|
||||||
|
// Why cap<=0 means "disabled": operator wants a runtime kill-switch.
|
||||||
|
// Setting the convar to "0" turns enforcement off without restart.
|
||||||
|
//
|
||||||
|
// Test: Get returns nil → Check returns nil; Get returns row with
|
||||||
|
// SecondsUsed >= cap → Check returns ErrBudgetExceeded and notify is
|
||||||
|
// invoked; window expired (>=7d) → Check returns nil regardless of
|
||||||
|
// SecondsUsed.
|
||||||
|
func (b *DBBudget) Check(ctx context.Context, callerID string) error {
|
||||||
|
if callerID == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
bud, err := b.storage.Get(ctx, callerID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("budget: %w", err)
|
||||||
|
}
|
||||||
|
if bud != nil {
|
||||||
|
if b.now().Sub(bud.WindowStart) < budgetWindow {
|
||||||
|
cap := b.weeklyLimit()
|
||||||
|
if cap > 0 && bud.SecondsUsed >= cap {
|
||||||
|
if b.notify != nil {
|
||||||
|
b.notify(ctx, callerID, bud.SecondsUsed, cap)
|
||||||
|
}
|
||||||
|
return ErrBudgetExceeded
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit records the run's runtime against the caller's budget.
|
||||||
|
// Failures are logged but never returned — budget accounting must
|
||||||
|
// not break user-visible execution.
|
||||||
|
//
|
||||||
|
// Why callerID="" is a no-op: matches Check's anonymous-caller
|
||||||
|
// shortcut; system runs don't get billed.
|
||||||
|
//
|
||||||
|
// Why runtimeSeconds<=0 is a no-op: a run that errored before
|
||||||
|
// resolving a model has wallSecs near 0 in floating-point terms but
|
||||||
|
// can also be exactly 0 (synthetic test fixtures). Skipping avoids
|
||||||
|
// spurious 0-runs rows from short-lived failures.
|
||||||
|
//
|
||||||
|
// Test: Commit(50) → Get reports SecondsUsed=50; storage failure
|
||||||
|
// surfaces only as a slog.Warn (no panic, no return).
|
||||||
|
func (b *DBBudget) Commit(ctx context.Context, callerID string, runtimeSeconds float64) {
|
||||||
|
if callerID == "" || runtimeSeconds <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := b.storage.Add(ctx, callerID, runtimeSeconds, b.now()); err != nil {
|
||||||
|
slog.Warn("skills budget: commit failed", "user", callerID, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,44 @@
|
|||||||
|
package budget
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDBBudgetRollingWindow(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
mem := NewMemory()
|
||||||
|
now := time.Now()
|
||||||
|
clock := func() time.Time { return now }
|
||||||
|
b := NewDBBudget(mem, func() float64 { return 100 }, nil, clock)
|
||||||
|
|
||||||
|
// Under cap: allowed.
|
||||||
|
if err := b.Check(ctx, "u"); err != nil {
|
||||||
|
t.Fatalf("fresh caller should pass: %v", err)
|
||||||
|
}
|
||||||
|
b.Commit(ctx, "u", 60)
|
||||||
|
if err := b.Check(ctx, "u"); err != nil {
|
||||||
|
t.Fatalf("60/100 should pass: %v", err)
|
||||||
|
}
|
||||||
|
// Over cap: rejected.
|
||||||
|
b.Commit(ctx, "u", 50) // 110 total
|
||||||
|
if err := b.Check(ctx, "u"); !errors.Is(err, ErrBudgetExceeded) {
|
||||||
|
t.Fatalf("110/100 should be ErrBudgetExceeded, got %v", err)
|
||||||
|
}
|
||||||
|
// Window rolls over after 7 days: allowed again.
|
||||||
|
now = now.Add(8 * 24 * time.Hour)
|
||||||
|
b.Commit(ctx, "u", 1) // triggers rollover inside Add
|
||||||
|
if err := b.Check(ctx, "u"); err != nil {
|
||||||
|
t.Fatalf("after window rollover should pass: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNoOpBudgetAlwaysAllows(t *testing.T) {
|
||||||
|
b := NewNoOpBudget()
|
||||||
|
if err := b.Check(context.Background(), "anyone"); err != nil {
|
||||||
|
t.Fatalf("NoOp must always allow: %v", err)
|
||||||
|
}
|
||||||
|
b.Commit(context.Background(), "anyone", 1e9) // no-op
|
||||||
|
}
|
||||||
@@ -0,0 +1,56 @@
|
|||||||
|
package budget
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Memory is a zero-dependency in-process BudgetStorage: per-user rolling-window
|
||||||
|
// usage held in memory (lost on restart). The default behind DBBudget for a
|
||||||
|
// light host or tests; mort uses its GORM Storage, contrib/store adds SQLite.
|
||||||
|
type Memory struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
rows map[string]*SkillBudget
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMemory returns an empty in-memory BudgetStorage.
|
||||||
|
func NewMemory() *Memory { return &Memory{rows: map[string]*SkillBudget{}} }
|
||||||
|
|
||||||
|
var _ BudgetStorage = (*Memory)(nil)
|
||||||
|
|
||||||
|
func (m *Memory) Initialize(context.Context) error { return nil }
|
||||||
|
|
||||||
|
func (m *Memory) Get(_ context.Context, userID string) (*SkillBudget, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
r, ok := m.rows[userID]
|
||||||
|
if !ok {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
cp := *r // copy out so callers can't mutate our row
|
||||||
|
return &cp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) Add(_ context.Context, userID string, secondsUsed float64, now time.Time) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
r, ok := m.rows[userID]
|
||||||
|
if !ok {
|
||||||
|
m.rows[userID] = &SkillBudget{
|
||||||
|
UserID: userID, WindowStart: now,
|
||||||
|
SecondsUsed: secondsUsed, RunsCount: 1, UpdatedAt: now,
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Roll the window over if it's older than the window length.
|
||||||
|
if now.Sub(r.WindowStart) >= budgetWindow {
|
||||||
|
r.WindowStart = now
|
||||||
|
r.SecondsUsed = 0
|
||||||
|
r.RunsCount = 0
|
||||||
|
}
|
||||||
|
r.SecondsUsed += secondsUsed
|
||||||
|
r.RunsCount++
|
||||||
|
r.UpdatedAt = now
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,9 @@
|
|||||||
|
package budget
|
||||||
|
|
||||||
|
import "gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
|
||||||
|
// The budget trackers plug directly into run.Ports.Budget (Check/Commit match).
|
||||||
|
var (
|
||||||
|
_ run.Budget = NoOpBudget{}
|
||||||
|
_ run.Budget = (*DBBudget)(nil)
|
||||||
|
)
|
||||||
@@ -0,0 +1,33 @@
|
|||||||
|
package budget
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BudgetStorage is the persistence seam behind DBBudget: one budget row per
|
||||||
|
// user, with an atomic Add that rolls the 7-day window over transparently. Mort
|
||||||
|
// backs this with GORM/MySQL (the skill_budgets table); Memory() is the
|
||||||
|
// zero-dependency default; contrib/store adds a durable SQLite one.
|
||||||
|
type BudgetStorage interface {
|
||||||
|
// Initialize runs any schema setup. Safe to call repeatedly.
|
||||||
|
Initialize(ctx context.Context) error
|
||||||
|
// Get returns the user's current budget row, or (nil, nil) if none exists.
|
||||||
|
Get(ctx context.Context, userID string) (*SkillBudget, error)
|
||||||
|
// Add increments seconds_used + runs_count atomically, rolling the window
|
||||||
|
// over when WindowStart is older than 7 days (reset to now, fresh count).
|
||||||
|
// Creates the row if absent.
|
||||||
|
Add(ctx context.Context, userID string, secondsUsed float64, now time.Time) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// SkillBudget is the rolling-window usage row kept for a single user.
type SkillBudget struct {
	// UserID identifies the user this row belongs to.
	UserID string
	// WindowStart is when the current rolling window began.
	WindowStart time.Time
	// SecondsUsed is the usage, in seconds, accumulated in the current window.
	SecondsUsed float64
	// RunsCount is the number of runs recorded in the current window.
	RunsCount int
	// UpdatedAt is the timestamp of the most recent update to this row.
	UpdatedAt time.Time
}
|
||||||
|
|
||||||
|
// budgetWindow is the rolling window length the storage rolls over at.
|
||||||
|
// Memory.Add restarts a row's window once now.Sub(WindowStart) reaches this length.
const budgetWindow = 7 * 24 * time.Hour
|
||||||
@@ -6,6 +6,7 @@ require (
|
|||||||
gitea.stevedudenhoeffer.com/steve/majordomo v0.0.0-20260626223738-1fd7109a42f3
|
gitea.stevedudenhoeffer.com/steve/majordomo v0.0.0-20260626223738-1fd7109a42f3
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
golang.org/x/crypto v0.53.0
|
golang.org/x/crypto v0.53.0
|
||||||
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
|||||||
@@ -125,6 +125,7 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h
|
|||||||
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
|
|||||||
@@ -0,0 +1,191 @@
|
|||||||
|
// Package persona implements the Agent noun: a persisted persona +
|
||||||
|
// execution spec + palette of skills/sub-agents/low-level tools, with
|
||||||
|
// optional trigger metadata (schedule, webhook, chatbot channel
|
||||||
|
// listener) and personalization sources.
|
||||||
|
//
|
||||||
|
// Phase 1 scope: this package introduces Agent as a persisted noun
|
||||||
|
// with CRUD only — no execution path, no palette resolution, no
|
||||||
|
// trigger handling. See /Users/steve/.claude/plans/serene-churning-micali.md
|
||||||
|
// for the staged rollout. Later phases add agentexec, agent_invoke,
|
||||||
|
// trigger dispatch (schedule/webhook/chatbot), and CommandBinding.
|
||||||
|
//
|
||||||
|
// The three-layer storage pattern from pkg/logic/storage/CLAUDE.md
|
||||||
|
// applies — when adding a field to Agent, you MUST update
|
||||||
|
// pkg/logic/storage/agents.go (gormAgent, agentFromStorage,
|
||||||
|
// toStorage) or persistence will silently break.
|
||||||
|
package persona
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
// Agent is the domain definition of an Agent persona + execution spec.
|
||||||
|
//
|
||||||
|
// Why: an Agent is the "configured invoker" — model tier + base
|
||||||
|
// system prompt + a palette of capabilities (skills, sub-agents,
|
||||||
|
// low-level tools) it may exercise during a run. Where a Skill is a
|
||||||
|
// reusable parameterised callable (a library function), an Agent is
|
||||||
|
// the actor with a persistent persona that calls those skills. The
|
||||||
|
// struct is flat — every field lives on its own column on the
|
||||||
|
// agents table; JSON columns are used only for variable-length
|
||||||
|
// collections (palette lists, tags, etc.).
|
||||||
|
//
|
||||||
|
// What: identity + persona + execution caps + palette + triggers +
|
||||||
|
// personalization + UX, all on one struct. Several field families
|
||||||
|
// (Palette, Triggers, Personalization) are persisted now but NOT
|
||||||
|
// exercised until later phases — they exist so the schema is stable
|
||||||
|
// and future phases can light up behaviour without DB migrations.
|
||||||
|
//
|
||||||
|
// Test: see pkg/logic/agents/storage_round_trip_test.go for
|
||||||
|
// Save/Get/GetByName/List/Delete coverage.
|
||||||
|
type Agent struct {
|
||||||
|
// Identity
|
||||||
|
ID string // UUID
|
||||||
|
Name string // unique per OwnerID; human-friendly identifier
|
||||||
|
Description string
|
||||||
|
OwnerID string // Discord member ID
|
||||||
|
AuthoredBy string // Discord member ID; usually == OwnerID
|
||||||
|
Version int // monotonic, for future versioning
|
||||||
|
CreatedAt time.Time
|
||||||
|
UpdatedAt time.Time
|
||||||
|
|
||||||
|
// Extends names the parent agent this agent inherits from. Only used
|
||||||
|
// during builtin loading — the loader resolves extends references and
|
||||||
|
// merges fields before persisting. The resolved agent is a standalone
|
||||||
|
// entity; Extends is NOT persisted in the database. Only single-level
|
||||||
|
// extends is supported (no chains).
|
||||||
|
Extends string
|
||||||
|
|
||||||
|
// SystemPromptPrepend, when non-empty, is prepended to the system
|
||||||
|
// prompt (with a trailing newline separator). Used by the extends
|
||||||
|
// mechanism so a child agent can prepend persona instructions to the
|
||||||
|
// parent's full system prompt without duplicating it. Like Extends,
|
||||||
|
// this is resolved at load time and NOT persisted — the final
|
||||||
|
// SystemPrompt on the persisted Agent already has the prepend applied.
|
||||||
|
SystemPromptPrepend string
|
||||||
|
|
||||||
|
// Persona / execution spec
|
||||||
|
ModelTier string // "fast" | "standard" | "thinking" | provider/model
|
||||||
|
SystemPrompt string // base persona prompt (Phase 5 layers personalization on top)
|
||||||
|
MaxIterations int // 0 → use convar default at execution time
|
||||||
|
MaxToolCalls int // 0 → use convar default at execution time
|
||||||
|
MaxRuntime time.Duration // stored as MaxRuntimeNs int64 in GORM (avoid duration-driver flakiness)
|
||||||
|
ExecutionLane string // lane name; empty = default at execution time
|
||||||
|
EncryptionEnabled bool // Phase 1 stores the flag; envelope-encryption bridge wires in a later phase
|
||||||
|
|
||||||
|
// Run-critic (two-tier timeout). When CriticEnabled is false (the
|
||||||
|
// default) MaxRuntime is the hard kill, exactly as before. When true,
|
||||||
|
// MaxRuntime becomes a SOFT trigger: at MaxRuntime the run-critic
|
||||||
|
// activates and periodically reviews the run; the hard backstop (the
|
||||||
|
// absolute kill) is MaxRuntime × the multiplier. CriticBackstopMultiplier
|
||||||
|
// of 0 means "use the convar default" (agents.critic.backstop_multiplier_default,
|
||||||
|
// default 6×). See pkg/logic/agentcritic.
|
||||||
|
CriticEnabled bool
|
||||||
|
CriticBackstopMultiplier float64
|
||||||
|
|
||||||
|
// Palette — what this Agent may invoke (Phase 2 reads these).
|
||||||
|
// Stored as JSON arrays; not exercised by Phase 1 CRUD.
|
||||||
|
SkillPalette []string // skill IDs/names
|
||||||
|
SubAgentPalette []string // agent IDs/names
|
||||||
|
LowLevelTools []string // skilltools registry names
|
||||||
|
|
||||||
|
// Personalization (Phase 5 reads these). Each layer name maps to
|
||||||
|
// a registered PersonalizationProvider that returns text appended
|
||||||
|
// to SystemPrompt at run time. Empty list = base prompt only.
|
||||||
|
PersonalizationSources []string
|
||||||
|
|
||||||
|
// Triggers — persisted now, NOT dispatched by Phase 1.
|
||||||
|
//
|
||||||
|
// Schedule: cron expression or "daily"/"weekly" shorthand. Empty
|
||||||
|
// = on-demand only. NextRunAt + LastScheduledRunAt are scheduler
|
||||||
|
// bookkeeping for Phase 3's per-Agent scheduler.
|
||||||
|
Schedule string
|
||||||
|
NextRunAt *time.Time
|
||||||
|
LastScheduledRunAt *time.Time
|
||||||
|
|
||||||
|
// Webhook trigger metadata. WebhookSecret empty = inbound
|
||||||
|
// webhooks disabled. WebhookSignatureRequired defaults true at
|
||||||
|
// save time (see Skill's lesson: don't store a GORM default on a
|
||||||
|
// bool where false is a legitimate explicit value — application
|
||||||
|
// layer is the source of truth).
|
||||||
|
WebhookSecret string
|
||||||
|
WebhookSignatureRequired bool
|
||||||
|
WebhookIPAllowlist []string // CIDR strings; stored as JSON array
|
||||||
|
|
||||||
|
// Chatbot trigger metadata. ChatbotChannelFilter names a filter
|
||||||
|
// registered in pkg/logic/skills' ChannelFilterRegistry — when
|
||||||
|
// the migrated chatbot dispatches via this Agent, the filter
|
||||||
|
// gates which channels it listens in.
|
||||||
|
ChatbotChannelFilter string
|
||||||
|
|
||||||
|
// UX
|
||||||
|
//
|
||||||
|
// DefaultEmoji is an optional identity emoji for this agent.
|
||||||
|
// Used as the __start__ fallback and forwarded to the invoking
|
||||||
|
// Discord message when a parent calls this agent via agent_invoke.
|
||||||
|
DefaultEmoji string
|
||||||
|
|
||||||
|
// StateReactEmoji maps tool names (and reserved keys "__start__",
|
||||||
|
// "__end__", "__error__") to Discord emoji that the executor
|
||||||
|
// reacts with as the run progresses. Empty map = no reactions.
|
||||||
|
StateReactEmoji map[string]string
|
||||||
|
|
||||||
|
// Tags is a free-form set of short labels for organisation +
|
||||||
|
// discovery on the agents list page (Phase 1 admin commands +
|
||||||
|
// future web UI).
|
||||||
|
Tags []string
|
||||||
|
|
||||||
|
// Phases defines a multi-phase pipeline for this agent. When
|
||||||
|
// non-empty, the executor runs agentexec's sequential phase runner
|
||||||
|
// instead of the single agent loop. Empty = single-loop agent.
|
||||||
|
//
|
||||||
|
// Phases IS persisted (JSON struct-slice column `phases` on
|
||||||
|
// gormAgent). It used to be transient — "TOML is the only source of
|
||||||
|
// truth" — but every production dispatch path resolves the agent from
|
||||||
|
// the DB, where the dropped Phases meant research / deepresearch
|
||||||
|
// silently degraded to a single-loop run (the executor's
|
||||||
|
// `len(a.Phases) > 0` pipeline branch was dead). The builtin loader
|
||||||
|
// still seeds phases from YAML; persisting them is what makes the
|
||||||
|
// pipeline branch fire for DB-loaded agents.
|
||||||
|
Phases []AgentPhase
|
||||||
|
}
|
||||||
|
|
||||||
|
// AgentPhase is a single stage of the multi-phase pipeline declared on an
// agent definition. agentexec's phase runner (pipeline.go) executes it
// directly; there is no intermediate execution-spec struct.
//
// What: a name, a prompt template, model/iteration overrides, a tool list,
// optional/fallback flags, and the IsRunFunc indicator.
//
// Test: see builtin_loader_test.go for YAML round-trip coverage.
type AgentPhase struct {
	// Name identifies the phase (e.g. "scout", "plan", "investigate").
	Name string

	// SystemPrompt is this phase's prompt. It supports template variables:
	// {{.Query}} for the original query and {{.<PhaseName>}} for the output of
	// an earlier phase (e.g. {{.scout}}, {{.plan}}).
	SystemPrompt string

	// ModelTier overrides the agent's ModelTier for this phase; empty means
	// use the agent default.
	ModelTier string

	// MaxIter overrides the agent's MaxIterations for this phase; 0 means use
	// the agent default.
	MaxIter int

	// Tools lists the tool names available in this phase only. They are
	// resolved from the agent's low-level tools + palette at execution time.
	Tools []string

	// Optional marks a phase whose errors do not abort the pipeline.
	Optional bool

	// FallbackMessage is used when an optional phase fails.
	// Default: "(Phase <Name> encountered an error)"
	FallbackMessage string

	// IsRunFunc marks the phase as a bare LLM call with no tool loop: when
	// true the executor makes a single model.Complete call instead of running
	// the full agent loop.
	IsRunFunc bool
}
|
||||||
@@ -0,0 +1,599 @@
|
|||||||
|
package persona
|
||||||
|
|
||||||
|
// Phase 6 — Builtin Agent loader.
|
||||||
|
//
|
||||||
|
// Why: Phase 1-5 introduced the Agent noun, runtime, triggers,
|
||||||
|
// CommandBinding, and ChatBot bridge — but every Agent in production
|
||||||
|
// was either (a) a wrapper auto-migrated from a triggered Skill, or
|
||||||
|
// (b) admin-created via `.agent new`. There were no SHIPPED Agents
|
||||||
|
// authored as builtins. Phase 6 adds an idempotent boot-time loader so
|
||||||
|
// the repo can ship canonical Agent definitions (alongside the
|
||||||
|
// existing skills/<name>/skill.yml builtins) without manual admin
|
||||||
|
// creation per deploy.
|
||||||
|
//
|
||||||
|
// What: scans `<builtinsDir>/agents/*/agent.yml`, decodes each YAML
|
||||||
|
// into an Agent, and upserts via Storage.SaveAgent under the deterministic
|
||||||
|
// system owner ID "builtin". Skill-palette entries are validated AT LOAD
|
||||||
|
// TIME against the live skills storage; missing skills warn but do not
|
||||||
|
// fail the load (the skill might arrive later via a different code
|
||||||
|
// path, and runtime resolution happens at invocation time anyway).
|
||||||
|
//
|
||||||
|
// Bypass note (v3 lesson, mirrored): like skills.LoadBuiltins, this
|
||||||
|
// loader writes via Storage.SaveAgent directly. There is no agents
|
||||||
|
// equivalent of SaveUserSkill's save-time gates today (Phase 1-5 don't
|
||||||
|
// have authoring requirements on agents), but if such gates appear in
|
||||||
|
// future phases, this loader MUST keep bypassing them — builtins are
|
||||||
|
// trusted infrastructure.
|
||||||
|
//
|
||||||
|
// Test: pkg/logic/agents/builtin_loader_test.go covers happy path,
|
||||||
|
// idempotent re-load, missing-skill warn capture, and malformed YAML
|
||||||
|
// surfaced as a per-bundle warning (not a fatal error).
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/fs"
|
||||||
|
"log/slog"
|
||||||
|
"net"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"gopkg.in/yaml.v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BuiltinAgentOwnerID is the fixed system owner ID stamped on every Agent that
// LoadBuiltinAgents creates. It is a non-empty string so the (owner_id, name)
// unique index tells builtins apart from user-authored Agents: Discord member
// IDs are numeric, so "builtin" cannot collide with one. The skills builtin
// loader uses owner_id="" instead; the two systems are independent storage
// scopes, so they do not need to agree.
const BuiltinAgentOwnerID = "builtin"
|
||||||
|
|
||||||
|
// SkillExistenceChecker is the minimal surface LoadBuiltinAgents uses to
// validate skill_palette entries at load time. Production wires in
// skills.Storage, which already exposes ListByName for lookups that are not
// scoped to an owner. Passing nil means "skip palette validation" (handy for
// tests that do not care).
//
// Why a dedicated narrow interface rather than importing skills.Storage:
// agents already depends on skills transitively via migrate_from_skills, but
// the loader only needs the answer to "does a skill with this name exist
// somewhere?" — a single Boolean. Keeping the interface this small lets the
// loader be tested without a full skills storage stub.
type SkillExistenceChecker interface {
	// SkillExistsByName reports whether at least one skill row carries the
	// given name under any owner. Builtins live under owner_id="" and users
	// own their own rows; the loader only asks whether ANY row with this name
	// exists.
	SkillExistsByName(ctx context.Context, name string) (bool, error)
}
|
||||||
|
|
||||||
|
// LoadBuiltinAgents discovers and seeds builtin Agents from `builtinsDir`.
|
||||||
|
// `builtinsDir` is the root that contains an `agents/` subdirectory;
|
||||||
|
// per-agent YAML lives at `agents/<name>/agent.yml`. Returns the count
|
||||||
|
// of agents seeded or updated (skipped rows do not contribute to the
|
||||||
|
// count). Returns nil error when the agents/ directory is absent — a
|
||||||
|
// deployment without any builtin agents is valid; the loader is then a
|
||||||
|
// no-op.
|
||||||
|
//
|
||||||
|
// Idempotency contract: existing Agent rows (matched by (owner_id="builtin",
|
||||||
|
// name)) are UPDATED to the freshly-parsed YAML on each boot. ID +
|
||||||
|
// CreatedAt are preserved; UpdatedAt is refreshed. User clones of a
|
||||||
|
// builtin Agent (different owner_id, same name) are NEVER touched —
|
||||||
|
// the loader only writes to (owner_id="builtin", name) rows.
|
||||||
|
//
|
||||||
|
// `skillChecker` may be nil; when non-nil, each SkillPalette entry is
|
||||||
|
// looked up and a WARN log emitted (with the agent + missing skill
|
||||||
|
// name) for absent references. The Agent row is still seeded with the
|
||||||
|
// palette intact — runtime resolution at invocation time is the
|
||||||
|
// authoritative gate.
|
||||||
|
func LoadBuiltinAgents(ctx context.Context, store Storage, builtinsDir fs.FS, skillChecker SkillExistenceChecker) (int, error) {
|
||||||
|
if store == nil {
|
||||||
|
return 0, errors.New("agents.LoadBuiltinAgents: nil store")
|
||||||
|
}
|
||||||
|
if builtinsDir == nil {
|
||||||
|
return 0, errors.New("agents.LoadBuiltinAgents: nil builtinsDir FS")
|
||||||
|
}
|
||||||
|
entries, err := fs.ReadDir(builtinsDir, "agents")
|
||||||
|
if err != nil {
|
||||||
|
// Missing agents/ directory is benign — a deployment may not
|
||||||
|
// ship any builtins. Other errors propagate so a permission /
|
||||||
|
// IO problem surfaces loudly.
|
||||||
|
if errors.Is(err, fs.ErrNotExist) {
|
||||||
|
slog.Info("agents: no builtin agents directory", "path", "agents")
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return 0, fmt.Errorf("agents: read agents dir: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Phase 1: parse all agent manifests into a map keyed by name.
|
||||||
|
// The map is needed so extends references can be resolved before
|
||||||
|
// any agent is upserted.
|
||||||
|
type parsedEntry struct {
|
||||||
|
agent *Agent
|
||||||
|
dir string
|
||||||
|
}
|
||||||
|
parsed := make(map[string]*parsedEntry)
|
||||||
|
var parseOrder []string // preserve FS iteration order for deterministic upsert
|
||||||
|
|
||||||
|
var scanned, failed int
|
||||||
|
for _, entry := range entries {
|
||||||
|
if !entry.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
manifestPath := path.Join("agents", entry.Name(), "agent.yml")
|
||||||
|
data, readErr := fs.ReadFile(builtinsDir, manifestPath)
|
||||||
|
if readErr != nil {
|
||||||
|
slog.Debug("agents: skipping (no agent.yml)", "dir", entry.Name(), "error", readErr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
scanned++
|
||||||
|
ag, parseErr := decodeAgentManifest(data)
|
||||||
|
if parseErr != nil {
|
||||||
|
slog.Warn("agents: invalid agent.yml", "dir", entry.Name(), "error", parseErr)
|
||||||
|
failed++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
parsed[ag.Name] = &parsedEntry{agent: ag, dir: entry.Name()}
|
||||||
|
parseOrder = append(parseOrder, ag.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Phase 2: resolve extends references. Only single-level is
|
||||||
|
// supported — chains (A extends B extends C) are rejected.
|
||||||
|
for _, name := range parseOrder {
|
||||||
|
pe := parsed[name]
|
||||||
|
ag := pe.agent
|
||||||
|
if ag.Extends == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
parent, ok := parsed[ag.Extends]
|
||||||
|
if !ok {
|
||||||
|
slog.Warn("agents: extends references unknown agent",
|
||||||
|
"agent", ag.Name, "extends", ag.Extends)
|
||||||
|
failed++
|
||||||
|
delete(parsed, name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if parent.agent.Extends != "" {
|
||||||
|
slog.Warn("agents: extends chain not supported — parent also uses extends",
|
||||||
|
"agent", ag.Name, "extends", ag.Extends,
|
||||||
|
"parent_extends", parent.agent.Extends)
|
||||||
|
failed++
|
||||||
|
delete(parsed, name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if ag.Extends == ag.Name {
|
||||||
|
slog.Warn("agents: agent extends itself", "agent", ag.Name)
|
||||||
|
failed++
|
||||||
|
delete(parsed, name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
resolveExtends(ag, parent.agent)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Phase 3: palette validation + upsert.
|
||||||
|
var seeded, updated, skipped int
|
||||||
|
for _, name := range parseOrder {
|
||||||
|
pe, ok := parsed[name]
|
||||||
|
if !ok {
|
||||||
|
continue // removed during extends resolution
|
||||||
|
}
|
||||||
|
ag := pe.agent
|
||||||
|
|
||||||
|
if skillChecker != nil {
|
||||||
|
for _, sk := range ag.SkillPalette {
|
||||||
|
ok, lookupErr := skillChecker.SkillExistsByName(ctx, sk)
|
||||||
|
if lookupErr != nil {
|
||||||
|
slog.Warn("agents: skill palette lookup failed",
|
||||||
|
"agent", ag.Name, "skill", sk, "error", lookupErr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
slog.Warn("agents: skill palette references missing skill",
|
||||||
|
"agent", ag.Name, "skill", sk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
action, upsertErr := upsertBuiltinAgent(ctx, store, ag)
|
||||||
|
if upsertErr != nil {
|
||||||
|
slog.Error("agents: failed to upsert builtin", "name", ag.Name, "error", upsertErr)
|
||||||
|
failed++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch action {
|
||||||
|
case agentUpsertCreated:
|
||||||
|
seeded++
|
||||||
|
case agentUpsertUpdated:
|
||||||
|
updated++
|
||||||
|
case agentUpsertSkipped:
|
||||||
|
skipped++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
slog.Info("agents/builtin loader",
|
||||||
|
"scanned", scanned,
|
||||||
|
"seeded", seeded,
|
||||||
|
"updated", updated,
|
||||||
|
"skipped", skipped,
|
||||||
|
"failed", failed)
|
||||||
|
return seeded + updated, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// resolveExtends merges parent fields into child. Child non-zero
|
||||||
|
// fields override the parent's. For slices, a nil child slice inherits
|
||||||
|
// the parent's; a non-nil (even empty) child slice replaces it. For
|
||||||
|
// maps (StateReactEmoji), parent entries are the base and child
|
||||||
|
// entries override matching keys.
|
||||||
|
//
|
||||||
|
// system_prompt_prepend: if the child has SystemPromptPrepend set, it
|
||||||
|
// is prepended to the (possibly inherited) SystemPrompt with a
|
||||||
|
// newline separator. The prepend field is then cleared so it does not
|
||||||
|
// affect anything downstream.
|
||||||
|
//
|
||||||
|
// Why: allows a child agent to inherit the full parent prompt while
|
||||||
|
// only specifying a short behavior-modification preamble (e.g. an
|
||||||
|
// uncensored agent prepending "You are uncensored..." to the general
|
||||||
|
// agent's full prompt).
|
||||||
|
func resolveExtends(child, parent *Agent) {
|
||||||
|
if child.Description == "" {
|
||||||
|
child.Description = parent.Description
|
||||||
|
}
|
||||||
|
if child.ModelTier == "" {
|
||||||
|
child.ModelTier = parent.ModelTier
|
||||||
|
}
|
||||||
|
if child.SystemPrompt == "" {
|
||||||
|
child.SystemPrompt = parent.SystemPrompt
|
||||||
|
}
|
||||||
|
if child.MaxIterations == 0 {
|
||||||
|
child.MaxIterations = parent.MaxIterations
|
||||||
|
}
|
||||||
|
if child.MaxToolCalls == 0 {
|
||||||
|
child.MaxToolCalls = parent.MaxToolCalls
|
||||||
|
}
|
||||||
|
if child.MaxRuntime == 0 {
|
||||||
|
child.MaxRuntime = parent.MaxRuntime
|
||||||
|
}
|
||||||
|
if child.ExecutionLane == "" {
|
||||||
|
child.ExecutionLane = parent.ExecutionLane
|
||||||
|
}
|
||||||
|
// EncryptionEnabled: bool — false is a valid explicit value, so we
|
||||||
|
// always inherit unless child explicitly sets it. Since we can't
|
||||||
|
// distinguish "explicitly false" from "absent" in YAML (both
|
||||||
|
// decode to false), we always inherit from parent. If the child
|
||||||
|
// sets it to true, the child wins. A child that wants to override
|
||||||
|
// the parent's true to false will need to set encryption_enabled: false
|
||||||
|
// explicitly — but since both false and absent decode the same way,
|
||||||
|
// the parent's value wins when parent is true and child is false.
|
||||||
|
// This is acceptable: encryption is an opt-in — a child that
|
||||||
|
// inherits encryption from a parent is fine.
|
||||||
|
if !child.EncryptionEnabled {
|
||||||
|
child.EncryptionEnabled = parent.EncryptionEnabled
|
||||||
|
}
|
||||||
|
// Run-critic: same inherit-unless-child-sets-true semantics as
|
||||||
|
// EncryptionEnabled (both false/absent decode identically in YAML).
|
||||||
|
if !child.CriticEnabled {
|
||||||
|
child.CriticEnabled = parent.CriticEnabled
|
||||||
|
}
|
||||||
|
if child.CriticBackstopMultiplier == 0 {
|
||||||
|
child.CriticBackstopMultiplier = parent.CriticBackstopMultiplier
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slices: nil = inherit; non-nil (even empty) = child overrides.
|
||||||
|
if child.SkillPalette == nil {
|
||||||
|
child.SkillPalette = parent.SkillPalette
|
||||||
|
}
|
||||||
|
if child.SubAgentPalette == nil {
|
||||||
|
child.SubAgentPalette = parent.SubAgentPalette
|
||||||
|
}
|
||||||
|
if child.LowLevelTools == nil {
|
||||||
|
child.LowLevelTools = parent.LowLevelTools
|
||||||
|
}
|
||||||
|
if child.PersonalizationSources == nil {
|
||||||
|
child.PersonalizationSources = parent.PersonalizationSources
|
||||||
|
}
|
||||||
|
if child.Tags == nil {
|
||||||
|
child.Tags = parent.Tags
|
||||||
|
}
|
||||||
|
if child.WebhookIPAllowlist == nil {
|
||||||
|
child.WebhookIPAllowlist = parent.WebhookIPAllowlist
|
||||||
|
}
|
||||||
|
if child.Phases == nil {
|
||||||
|
child.Phases = parent.Phases
|
||||||
|
}
|
||||||
|
|
||||||
|
// Triggers (Schedule, ChatbotChannelFilter, WebhookSecret, …) are
|
||||||
|
// deliberately NOT inherited. A trigger is an ACTIVATION decision —
|
||||||
|
// "this agent fires on a schedule" / "this agent is a chatbot tool in
|
||||||
|
// these channels" — and silently inheriting it from a parent persona
|
||||||
|
// is a behavioural surprise: `uncensored extends general` would inherit
|
||||||
|
// general's `chatbot_channel_filter: "none"` (match-every-channel) and
|
||||||
|
// surface the unfiltered model as a direct chatbot tool everywhere the
|
||||||
|
// instant agents.triggers.enabled flips on. A child that wants a trigger
|
||||||
|
// must declare it explicitly. (Persona, caps, palette, and tools are
|
||||||
|
// inherited above — those are capability, not activation.)
|
||||||
|
|
||||||
|
// DefaultEmoji: child wins if set; otherwise inherit.
|
||||||
|
if child.DefaultEmoji == "" {
|
||||||
|
child.DefaultEmoji = parent.DefaultEmoji
|
||||||
|
}
|
||||||
|
|
||||||
|
// Maps: merge — parent is the base, child entries override.
|
||||||
|
if child.StateReactEmoji == nil && parent.StateReactEmoji != nil {
|
||||||
|
child.StateReactEmoji = make(map[string]string, len(parent.StateReactEmoji))
|
||||||
|
for k, v := range parent.StateReactEmoji {
|
||||||
|
child.StateReactEmoji[k] = v
|
||||||
|
}
|
||||||
|
} else if parent.StateReactEmoji != nil {
|
||||||
|
merged := make(map[string]string, len(parent.StateReactEmoji)+len(child.StateReactEmoji))
|
||||||
|
for k, v := range parent.StateReactEmoji {
|
||||||
|
merged[k] = v
|
||||||
|
}
|
||||||
|
for k, v := range child.StateReactEmoji {
|
||||||
|
merged[k] = v
|
||||||
|
}
|
||||||
|
child.StateReactEmoji = merged
|
||||||
|
}
|
||||||
|
|
||||||
|
// SystemPromptPrepend: prepend to the (now resolved) SystemPrompt.
|
||||||
|
if child.SystemPromptPrepend != "" {
|
||||||
|
child.SystemPrompt = child.SystemPromptPrepend + "\n\n" + child.SystemPrompt
|
||||||
|
child.SystemPromptPrepend = "" // consumed
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear Extends — the resolution is complete, the persisted agent
|
||||||
|
// is standalone.
|
||||||
|
child.Extends = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// agentUpsertAction describes the outcome of one upsertBuiltinAgent call. It
// is package-private and exists for the loader and this package's tests; the
// loader's public surface returns a count rather than a per-row action.
type agentUpsertAction int

const (
	// agentUpsertCreated: no existing row was found and a new one was inserted.
	agentUpsertCreated agentUpsertAction = iota
	// agentUpsertUpdated: an existing row was overwritten in place.
	agentUpsertUpdated
	// agentUpsertSkipped is reserved; the current loader never returns it
	// because every row that parses is upserted.
	agentUpsertSkipped
)
|
||||||
|
|
||||||
|
// upsertBuiltinAgent looks up an existing (BuiltinAgentOwnerID, name)
|
||||||
|
// row. If absent, inserts a new row with a freshly-minted UUID.
|
||||||
|
// Otherwise updates the existing row in place, preserving ID + CreatedAt.
|
||||||
|
//
|
||||||
|
// Why not version-skip like skills.upsertBuiltin: the Agent struct has
|
||||||
|
// a Version int field but it's a monotonic counter, not a semver
|
||||||
|
// string for change detection. Agent YAML doesn't carry a "version"
|
||||||
|
// at the wire shape; every boot writes the latest YAML content,
|
||||||
|
// trusting the YAML file in-repo IS the source of truth. The Agent's
|
||||||
|
// internal Version int auto-increments on each loader pass so admin
|
||||||
|
// inspection (`.agent show`) reveals "how many times has the loader
|
||||||
|
// touched this row".
|
||||||
|
func upsertBuiltinAgent(ctx context.Context, store Storage, fresh *Agent) (agentUpsertAction, error) {
|
||||||
|
existing, err := store.GetAgentByName(ctx, BuiltinAgentOwnerID, fresh.Name)
|
||||||
|
if err != nil && !errors.Is(err, ErrNotFound) {
|
||||||
|
return agentUpsertCreated, fmt.Errorf("lookup builtin agent %q: %w", fresh.Name, err)
|
||||||
|
}
|
||||||
|
if errors.Is(err, ErrNotFound) {
|
||||||
|
fresh.ID = uuid.New().String()
|
||||||
|
fresh.OwnerID = BuiltinAgentOwnerID
|
||||||
|
fresh.AuthoredBy = BuiltinAgentOwnerID
|
||||||
|
if fresh.Version == 0 {
|
||||||
|
fresh.Version = 1
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
fresh.CreatedAt = now
|
||||||
|
fresh.UpdatedAt = now
|
||||||
|
if saveErr := store.SaveAgent(ctx, fresh); saveErr != nil {
|
||||||
|
return agentUpsertCreated, saveErr
|
||||||
|
}
|
||||||
|
slog.Info("agents: created builtin", "name", fresh.Name, "id", fresh.ID)
|
||||||
|
return agentUpsertCreated, nil
|
||||||
|
}
|
||||||
|
// Update in place. Preserve ID, OwnerID, AuthoredBy, CreatedAt.
|
||||||
|
// Bump Version so admins can see "the loader has touched this N
|
||||||
|
// times" — useful when investigating a builtin that was
|
||||||
|
// hand-edited via the future web UI and unexpectedly reverted on
|
||||||
|
// next boot.
|
||||||
|
fresh.ID = existing.ID
|
||||||
|
fresh.OwnerID = BuiltinAgentOwnerID
|
||||||
|
fresh.AuthoredBy = BuiltinAgentOwnerID
|
||||||
|
fresh.Version = existing.Version + 1
|
||||||
|
fresh.CreatedAt = existing.CreatedAt
|
||||||
|
fresh.UpdatedAt = time.Now()
|
||||||
|
// Carry forward operator/scheduler-owned fields that the manifest
|
||||||
|
// never sets (decodeAgentManifest leaves these zero by design — a
|
||||||
|
// secret in-repo would be a credential leak). Without this, every
|
||||||
|
// boot CLOBBERS an operator-armed webhook secret + signature flag
|
||||||
|
// back to empty/false and nukes the scheduler's next-fire cursor, so
|
||||||
|
// a scheduled or webhook-armed builtin silently breaks on each deploy.
|
||||||
|
fresh.WebhookSecret = existing.WebhookSecret
|
||||||
|
fresh.WebhookSignatureRequired = existing.WebhookSignatureRequired
|
||||||
|
fresh.NextRunAt = existing.NextRunAt
|
||||||
|
fresh.LastScheduledRunAt = existing.LastScheduledRunAt
|
||||||
|
if saveErr := store.SaveAgent(ctx, fresh); saveErr != nil {
|
||||||
|
return agentUpsertUpdated, saveErr
|
||||||
|
}
|
||||||
|
slog.Info("agents: updated builtin",
|
||||||
|
"name", fresh.Name, "id", fresh.ID, "version", fresh.Version)
|
||||||
|
return agentUpsertUpdated, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// builtinAgentManifest is the YAML wire format for agents/<name>/agent.yml.
|
||||||
|
// The schema is intentionally a SUBSET of the Agent struct — future
|
||||||
|
// fields can be added without breaking existing manifests so long as
|
||||||
|
// we keep KnownFields(true) decoding (so a typo on a key surfaces as
|
||||||
|
// an error rather than silently dropping data).
|
||||||
|
//
|
||||||
|
// See pkg/logic/agents/CLAUDE.md for the schema reference.
|
||||||
|
type builtinAgentManifest struct {
|
||||||
|
Name string `yaml:"name"`
|
||||||
|
Description string `yaml:"description"`
|
||||||
|
ModelTier string `yaml:"model_tier"`
|
||||||
|
SystemPrompt string `yaml:"system_prompt"`
|
||||||
|
SystemPromptPrepend string `yaml:"system_prompt_prepend"`
|
||||||
|
MaxIterations int `yaml:"max_iterations"`
|
||||||
|
MaxToolCalls int `yaml:"max_tool_calls"`
|
||||||
|
MaxRuntimeSeconds int `yaml:"max_runtime_seconds"`
|
||||||
|
ExecutionLane string `yaml:"execution_lane"`
|
||||||
|
EncryptionEnabled bool `yaml:"encryption_enabled"`
|
||||||
|
|
||||||
|
// Run-critic two-tier timeout. CriticEnabled flips MaxRuntime from a
|
||||||
|
// hard kill into a soft trigger; CriticBackstopMultiplier (0 => convar
|
||||||
|
// default 6×) sets the hard backstop = MaxRuntime × multiplier.
|
||||||
|
CriticEnabled bool `yaml:"critic_enabled"`
|
||||||
|
CriticBackstopMultiplier float64 `yaml:"critic_backstop_multiplier"`
|
||||||
|
|
||||||
|
// Extends names a parent agent whose fields are inherited. The
|
||||||
|
// child's non-zero fields override the parent; nil/empty slices
|
||||||
|
// inherit the parent's. Maps (state_react) are merged — child
|
||||||
|
// entries override parent entries with the same key. Only single-
|
||||||
|
// level extends is supported (no chains).
|
||||||
|
Extends string `yaml:"extends"`
|
||||||
|
|
||||||
|
SkillPalette []string `yaml:"skill_palette"`
|
||||||
|
SubAgentPalette []string `yaml:"sub_agent_palette"`
|
||||||
|
LowLevelTools []string `yaml:"low_level_tools"`
|
||||||
|
|
||||||
|
PersonalizationSources []string `yaml:"personalization_sources"`
|
||||||
|
|
||||||
|
// Triggers — builtin agents typically don't ship with triggers
|
||||||
|
// (admins flip these on per-deployment), but the keys are accepted
|
||||||
|
// so a sufficiently sophisticated builtin (e.g. a scheduled "weekly
|
||||||
|
// digest" agent) can ship triggers in-repo. Default empty.
|
||||||
|
Schedule string `yaml:"schedule"`
|
||||||
|
WebhookIPAllowlist []string `yaml:"webhook_ip_allowlist"`
|
||||||
|
ChatbotChannelFilter string `yaml:"chatbot_channel_filter"`
|
||||||
|
|
||||||
|
DefaultEmoji string `yaml:"default_emoji"`
|
||||||
|
StateReact map[string]string `yaml:"state_react"`
|
||||||
|
Tags []string `yaml:"tags"`
|
||||||
|
|
||||||
|
// Pipeline phases — when non-empty, the executor runs the
|
||||||
|
// sequential phase runner instead of the single agent loop.
|
||||||
|
Phases []builtinAgentPhaseManifest `yaml:"phases"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// builtinAgentPhaseManifest mirrors one entry of the `phases` list in
// agents/<name>/agent.yml. decodeAgentManifest converts each entry
// field-for-field into an AgentPhase.
type builtinAgentPhaseManifest struct {
	Name            string   `yaml:"name"`
	SystemPrompt    string   `yaml:"system_prompt"`
	ModelTier       string   `yaml:"model_tier"`
	MaxIter         int      `yaml:"max_iter"`
	Tools           []string `yaml:"tools"`
	Optional        bool     `yaml:"optional"`
	FallbackMessage string   `yaml:"fallback_message"`
	IsRunFunc       bool     `yaml:"is_run_func"`
}
|
||||||
|
|
||||||
|
// decodeAgentManifest parses an agent.yml bundle into a domain Agent.
|
||||||
|
// Uses KnownFields(true) so a typo'd key surfaces as a parse error
|
||||||
|
// rather than silently dropping the value.
|
||||||
|
//
|
||||||
|
// What this method does NOT set:
|
||||||
|
// - ID (loader mints UUID on insert / preserves existing on update)
|
||||||
|
// - OwnerID + AuthoredBy (loader sets to BuiltinAgentOwnerID)
|
||||||
|
// - Version (loader increments on update)
|
||||||
|
// - CreatedAt + UpdatedAt (loader stamps)
|
||||||
|
// - WebhookSecret (operator generates via admin tooling at deploy
|
||||||
|
// time — shipping a secret in-repo would be a credential leak)
|
||||||
|
// - NextRunAt + LastScheduledRunAt (scheduler bookkeeping; nil at
|
||||||
|
// load time, populated on first scheduled fire)
|
||||||
|
// - WebhookSignatureRequired (application-layer default applies on
|
||||||
|
// first save; a `default:true` GORM tag would substitute on every
|
||||||
|
// write — see the v8 lesson on this exact trap)
|
||||||
|
func decodeAgentManifest(data []byte) (*Agent, error) {
|
||||||
|
var m builtinAgentManifest
|
||||||
|
dec := yaml.NewDecoder(strings.NewReader(string(data)))
|
||||||
|
dec.KnownFields(true)
|
||||||
|
if err := dec.Decode(&m); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode agent.yml: %w", err)
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(m.Name) == "" {
|
||||||
|
return nil, errors.New("agent.yml: missing required field 'name'")
|
||||||
|
}
|
||||||
|
// system_prompt is required UNLESS the agent uses extends (the parent
|
||||||
|
// will supply it) or system_prompt_prepend (the prepend will be
|
||||||
|
// combined with the parent's system_prompt after extends resolution).
|
||||||
|
if strings.TrimSpace(m.SystemPrompt) == "" && strings.TrimSpace(m.Extends) == "" && strings.TrimSpace(m.SystemPromptPrepend) == "" {
|
||||||
|
return nil, errors.New("agent.yml: missing required field 'system_prompt'")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert YAML phase manifests to domain AgentPhase structs.
|
||||||
|
var phases []AgentPhase
|
||||||
|
for _, pm := range m.Phases {
|
||||||
|
if strings.TrimSpace(pm.Name) == "" {
|
||||||
|
return nil, errors.New("agent.yml: phase missing required field 'name'")
|
||||||
|
}
|
||||||
|
phases = append(phases, AgentPhase{
|
||||||
|
Name: strings.TrimSpace(pm.Name),
|
||||||
|
SystemPrompt: pm.SystemPrompt,
|
||||||
|
ModelTier: strings.TrimSpace(pm.ModelTier),
|
||||||
|
MaxIter: pm.MaxIter,
|
||||||
|
Tools: pm.Tools,
|
||||||
|
Optional: pm.Optional,
|
||||||
|
FallbackMessage: pm.FallbackMessage,
|
||||||
|
IsRunFunc: pm.IsRunFunc,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate the webhook IP allow-list (CIDR or bare IP); drop + warn on
|
||||||
|
// malformed entries so a typo can't silently widen or void the allow-list.
|
||||||
|
allowlist := validateIPAllowlist(m.WebhookIPAllowlist, m.Name)
|
||||||
|
|
||||||
|
ag := &Agent{
|
||||||
|
Name: strings.TrimSpace(m.Name),
|
||||||
|
Description: m.Description,
|
||||||
|
Extends: strings.TrimSpace(m.Extends),
|
||||||
|
SystemPromptPrepend: m.SystemPromptPrepend,
|
||||||
|
ModelTier: strings.TrimSpace(m.ModelTier),
|
||||||
|
SystemPrompt: m.SystemPrompt,
|
||||||
|
MaxIterations: m.MaxIterations,
|
||||||
|
MaxToolCalls: m.MaxToolCalls,
|
||||||
|
MaxRuntime: time.Duration(m.MaxRuntimeSeconds) * time.Second,
|
||||||
|
ExecutionLane: strings.TrimSpace(m.ExecutionLane),
|
||||||
|
EncryptionEnabled: m.EncryptionEnabled,
|
||||||
|
CriticEnabled: m.CriticEnabled,
|
||||||
|
CriticBackstopMultiplier: m.CriticBackstopMultiplier,
|
||||||
|
SkillPalette: m.SkillPalette,
|
||||||
|
SubAgentPalette: m.SubAgentPalette,
|
||||||
|
LowLevelTools: m.LowLevelTools,
|
||||||
|
PersonalizationSources: m.PersonalizationSources,
|
||||||
|
Schedule: strings.TrimSpace(m.Schedule),
|
||||||
|
WebhookIPAllowlist: allowlist,
|
||||||
|
ChatbotChannelFilter: strings.TrimSpace(m.ChatbotChannelFilter),
|
||||||
|
DefaultEmoji: m.DefaultEmoji,
|
||||||
|
StateReactEmoji: m.StateReact,
|
||||||
|
Tags: m.Tags,
|
||||||
|
Phases: phases,
|
||||||
|
}
|
||||||
|
return ag, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// validateIPAllowlist filters entries down to those that parse as either a
// CIDR block or a bare IP address, preserving input order. Each entry is
// whitespace-trimmed first; blank entries are skipped silently, and anything
// else that fails both parsers is dropped with a warning that names the
// agent (a typo must not silently widen or void the webhook allow-list).
// The struct field documents "CIDR strings", so this enforces it at load
// time.
//
// The result is nil when no entry survives.
func validateIPAllowlist(entries []string, agent string) []string {
	var kept []string
	for _, raw := range entries {
		entry := strings.TrimSpace(raw)
		if entry == "" {
			continue
		}
		_, _, cidrErr := net.ParseCIDR(entry)
		if cidrErr != nil && net.ParseIP(entry) == nil {
			slog.Warn("agents: dropping malformed webhook_ip_allowlist entry (not a CIDR or IP)", "agent", agent, "entry", entry)
			continue
		}
		kept = append(kept, entry)
	}
	return kept
}
|
||||||
@@ -0,0 +1,17 @@
|
|||||||
|
package persona
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestValidateIPAllowlist(t *testing.T) {
|
||||||
|
in := []string{"10.0.0.0/8", " 192.168.1.5 ", "not-an-ip", "", "2001:db8::/32", "garbage/99"}
|
||||||
|
got := validateIPAllowlist(in, "test")
|
||||||
|
want := map[string]bool{"10.0.0.0/8": true, "192.168.1.5": true, "2001:db8::/32": true}
|
||||||
|
if len(got) != len(want) {
|
||||||
|
t.Fatalf("got %v, want %d valid entries", got, len(want))
|
||||||
|
}
|
||||||
|
for _, e := range got {
|
||||||
|
if !want[e] {
|
||||||
|
t.Errorf("unexpected entry kept: %q", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,120 @@
|
|||||||
|
package persona
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Memory is a zero-dependency in-process Storage for agent personas — a light
|
||||||
|
// host (or tests) gets persona persistence with no DB. Mort keeps its
|
||||||
|
// GORM/MySQL Storage; contrib/store adds a durable SQLite one.
|
||||||
|
type Memory struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
agents map[string]*Agent // by ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMemory returns an empty in-memory persona Storage.
|
||||||
|
func NewMemory() *Memory { return &Memory{agents: map[string]*Agent{}} }
|
||||||
|
|
||||||
|
var _ Storage = (*Memory)(nil)
|
||||||
|
|
||||||
|
// InitializeAgentStorage has nothing to prepare for the in-memory store and
// always returns nil.
func (m *Memory) InitializeAgentStorage(_ context.Context) error {
	return nil
}
|
||||||
|
|
||||||
|
func (m *Memory) SaveAgent(_ context.Context, a *Agent) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
cp := *a
|
||||||
|
m.agents[a.ID] = &cp
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) GetAgent(_ context.Context, id string) (*Agent, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
a, ok := m.agents[id]
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
cp := *a
|
||||||
|
return &cp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) GetAgentByName(_ context.Context, ownerID, name string) (*Agent, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
for _, a := range m.agents {
|
||||||
|
if a.OwnerID == ownerID && a.Name == name {
|
||||||
|
cp := *a
|
||||||
|
return &cp, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) listWhere(keep func(*Agent) bool) []*Agent {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
out := make([]*Agent, 0, len(m.agents))
|
||||||
|
for _, a := range m.agents {
|
||||||
|
if keep == nil || keep(a) {
|
||||||
|
cp := *a
|
||||||
|
out = append(out, &cp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) ListAgents(_ context.Context, ownerID string) ([]*Agent, error) {
|
||||||
|
return m.listWhere(func(a *Agent) bool { return a.OwnerID == ownerID }), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) ListAllAgents(context.Context) ([]*Agent, error) {
|
||||||
|
return m.listWhere(nil), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) DeleteAgent(_ context.Context, id string) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
delete(m.agents, id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) GetAgentByWebhookSecret(_ context.Context, secret string) (*Agent, error) {
|
||||||
|
if secret == "" {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
for _, a := range m.agents {
|
||||||
|
if a.WebhookSecret == secret {
|
||||||
|
cp := *a
|
||||||
|
return &cp, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) ListAgentsByChatbotChannelFilter(context.Context) ([]*Agent, error) {
|
||||||
|
return m.listWhere(func(a *Agent) bool { return a.ChatbotChannelFilter != "" }), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) ListScheduledAgents(_ context.Context, dueBefore time.Time) ([]*Agent, error) {
|
||||||
|
return m.listWhere(func(a *Agent) bool {
|
||||||
|
return a.Schedule != "" && a.NextRunAt != nil && !a.NextRunAt.After(dueBefore)
|
||||||
|
}), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Memory) MarkAgentScheduledRun(_ context.Context, agentID string, ranAt, nextAt time.Time) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
a, ok := m.agents[agentID]
|
||||||
|
if !ok {
|
||||||
|
return ErrNotFound
|
||||||
|
}
|
||||||
|
a.LastScheduledRunAt = &ranAt
|
||||||
|
a.NextRunAt = &nextAt
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,45 @@
|
|||||||
|
package persona
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestToRunnable(t *testing.T) {
|
||||||
|
a := &Agent{
|
||||||
|
ID: "id1", Name: "helper", SystemPrompt: "be nice", ModelTier: "fast",
|
||||||
|
MaxIterations: 5, MaxRuntime: 30 * time.Second,
|
||||||
|
LowLevelTools: []string{"think"}, SkillPalette: []string{"animate"},
|
||||||
|
CriticEnabled: true, CriticBackstopMultiplier: 2,
|
||||||
|
Phases: []AgentPhase{{Name: "p1", ModelTier: "thinking", MaxIter: 3, Tools: []string{"now"}, Optional: true}},
|
||||||
|
}
|
||||||
|
r := a.ToRunnable()
|
||||||
|
if r.ID != "id1" || r.ModelTier != "fast" || r.MaxIterations != 5 || !r.Critic.Enabled {
|
||||||
|
t.Fatalf("ToRunnable mapping wrong: %+v", r)
|
||||||
|
}
|
||||||
|
if len(r.Phases) != 1 || r.Phases[0].MaxIterations != 3 || !r.Phases[0].Optional {
|
||||||
|
t.Fatalf("phase mapping wrong: %+v", r.Phases)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMemoryStoreRoundTrip(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
m := NewMemory()
|
||||||
|
a := &Agent{ID: "a1", Name: "n", OwnerID: "o1"}
|
||||||
|
if err := m.SaveAgent(ctx, a); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
got, err := m.GetAgent(ctx, "a1")
|
||||||
|
if err != nil || got.Name != "n" {
|
||||||
|
t.Fatalf("GetAgent: %v %+v", err, got)
|
||||||
|
}
|
||||||
|
byName, err := m.GetAgentByName(ctx, "o1", "n")
|
||||||
|
if err != nil || byName.ID != "a1" {
|
||||||
|
t.Fatalf("GetAgentByName: %v %+v", err, byName)
|
||||||
|
}
|
||||||
|
list, _ := m.ListAgents(ctx, "o1")
|
||||||
|
if len(list) != 1 {
|
||||||
|
t.Fatalf("ListAgents = %d", len(list))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,37 @@
|
|||||||
|
package persona
|
||||||
|
|
||||||
|
import "gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
|
||||||
|
// ToRunnable lowers an Agent persona into the kernel's run.RunnableAgent DTO —
|
||||||
|
// the bridge that lets run.Executor run a persona WITHOUT importing this
|
||||||
|
// battery (the inversion of mort's agentexec.Run(*agents.Agent)). It maps the
|
||||||
|
// static shape only; per-run personalization, palette resolution, the critic,
|
||||||
|
// audit, etc. are supplied separately via run.Ports.
|
||||||
|
func (a *Agent) ToRunnable() run.RunnableAgent {
|
||||||
|
ra := run.RunnableAgent{
|
||||||
|
ID: a.ID,
|
||||||
|
Name: a.Name,
|
||||||
|
SystemPrompt: a.SystemPrompt,
|
||||||
|
ModelTier: a.ModelTier,
|
||||||
|
MaxIterations: a.MaxIterations,
|
||||||
|
MaxRuntime: a.MaxRuntime,
|
||||||
|
LowLevelTools: a.LowLevelTools,
|
||||||
|
SkillPalette: a.SkillPalette,
|
||||||
|
SubAgentPalette: a.SubAgentPalette,
|
||||||
|
Critic: run.CriticConfig{
|
||||||
|
Enabled: a.CriticEnabled,
|
||||||
|
BackstopMultiplier: a.CriticBackstopMultiplier,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, p := range a.Phases {
|
||||||
|
ra.Phases = append(ra.Phases, run.Phase{
|
||||||
|
Name: p.Name,
|
||||||
|
SystemPrompt: p.SystemPrompt,
|
||||||
|
ModelTier: p.ModelTier,
|
||||||
|
MaxIterations: p.MaxIter,
|
||||||
|
Tools: p.Tools,
|
||||||
|
Optional: p.Optional,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return ra
|
||||||
|
}
|
||||||
@@ -0,0 +1,115 @@
|
|||||||
|
package persona
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrNotFound is the sentinel reported when an agent lookup matches nothing.
// Test for it with errors.Is(err, ErrNotFound) rather than comparing
// messages.
var ErrNotFound = errors.New("agent not found")
|
||||||
|
|
||||||
|
// Storage is the persistence interface for the agents system.
|
||||||
|
//
|
||||||
|
// Why: tests substitute fake implementations; production wires
|
||||||
|
// through pkg/logic/storage's Grand Storage which embeds this
|
||||||
|
// interface. Mirrors the three-layer pattern in
|
||||||
|
// pkg/logic/storage/CLAUDE.md (domain → GORM → DB).
|
||||||
|
//
|
||||||
|
// What: Phase 1 CRUD plus Phase 3 trigger queries
|
||||||
|
// (ListDueScheduled, GetAgentByWebhookSecret,
|
||||||
|
// ListAgentsByChatbotChannelFilter, MarkScheduledRun). Trigger
|
||||||
|
// queries are read by the agentsched runner, webhook router, and
|
||||||
|
// chatbot tool provider; all are gated behind the
|
||||||
|
// agents.triggers.enabled convar so old skill-driven paths keep
|
||||||
|
// running until the convar flips.
|
||||||
|
//
|
||||||
|
// Test: see storage_round_trip_test.go for round-trip coverage.
|
||||||
|
type Storage interface {
|
||||||
|
// (Mort's Discord command-binding CRUD — the CommandBindingStorage
|
||||||
|
// embedding — stays a host concern and is NOT part of the executus
|
||||||
|
// persona Storage seam.)
|
||||||
|
|
||||||
|
// InitializeAgentStorage prepares storage (e.g. AutoMigrate)
|
||||||
|
// and is idempotent. Called from the grand storage's
|
||||||
|
// InitializeAll path.
|
||||||
|
InitializeAgentStorage(ctx context.Context) error
|
||||||
|
|
||||||
|
// SaveAgent creates or updates an Agent by ID. ID must be
|
||||||
|
// non-empty (Phase 1 admin commands mint a UUID).
|
||||||
|
SaveAgent(ctx context.Context, a *Agent) error
|
||||||
|
|
||||||
|
// GetAgent returns the agent with the given ID, or ErrNotFound.
|
||||||
|
GetAgent(ctx context.Context, id string) (*Agent, error)
|
||||||
|
|
||||||
|
// GetAgentByName resolves (owner_id, name) → agent. ownerID
|
||||||
|
// must match exactly (Phase 1 has no shared/public visibility
|
||||||
|
// yet; every agent is owned).
|
||||||
|
GetAgentByName(ctx context.Context, ownerID, name string) (*Agent, error)
|
||||||
|
|
||||||
|
// ListAgents returns every agent owned by the given member ID,
|
||||||
|
// sorted by Name ASC.
|
||||||
|
ListAgents(ctx context.Context, ownerID string) ([]*Agent, error)
|
||||||
|
|
||||||
|
// ListAllAgents returns every agent across all owners, sorted by
|
||||||
|
// (OwnerID ASC, Name ASC) so builtin rows (OwnerID="builtin")
|
||||||
|
// group together, then numeric Discord-ID owners in lexical order,
|
||||||
|
// then chatbot-shadow rows whose OwnerID is the chatbot owner's
|
||||||
|
// Discord ID but whose Name carries the "chatbot:" prefix.
|
||||||
|
//
|
||||||
|
// Why: Phase 1 admin commands ran owner-scoped (a steve-owned
|
||||||
|
// agent list shows ONLY steve's rows), which hid builtin and
|
||||||
|
// shadow Agents from the admin view. `.agent list` for admins now
|
||||||
|
// uses this method to surface every row. Non-admin invocations
|
||||||
|
// (or `.agent list --mine`) keep using ListAgents.
|
||||||
|
//
|
||||||
|
// Storage MAY back this with a single full-table scan — admin
|
||||||
|
// row counts are small (dozens to low hundreds), so no need for
|
||||||
|
// pagination at this phase.
|
||||||
|
ListAllAgents(ctx context.Context) ([]*Agent, error)
|
||||||
|
|
||||||
|
// DeleteAgent removes an agent by ID. Idempotent — deleting a
|
||||||
|
// missing row returns nil.
|
||||||
|
DeleteAgent(ctx context.Context, id string) error
|
||||||
|
|
||||||
|
// GetAgentByWebhookSecret resolves a posted /webhooks/<secret> URL
|
||||||
|
// to the matching agent. Returns ErrNotFound when no agent has
|
||||||
|
// the secret. Phase 3 webhook router consults this AFTER the
|
||||||
|
// existing Skill lookup falls through, but only when
|
||||||
|
// agents.triggers.enabled is true.
|
||||||
|
//
|
||||||
|
// Empty secret is rejected with ErrNotFound (empty WebhookSecret
|
||||||
|
// rows are NOT webhook-enabled — the application layer guards
|
||||||
|
// this, the lookup defends against accidental match).
|
||||||
|
GetAgentByWebhookSecret(ctx context.Context, secret string) (*Agent, error)
|
||||||
|
|
||||||
|
// ListAgentsByChatbotChannelFilter returns every agent with a
|
||||||
|
// non-empty ChatbotChannelFilter. Phase 3 chatbot tool provider
|
||||||
|
// uses this on every chatbot turn to assemble the per-channel
|
||||||
|
// tool list (gated by agents.triggers.enabled). The result is
|
||||||
|
// not channel-filtered here — the provider applies the channel
|
||||||
|
// filter predicate (registered in skills.ChannelFilterRegistry)
|
||||||
|
// to each row.
|
||||||
|
//
|
||||||
|
// Why no channel filter at the storage layer: the filter is a
|
||||||
|
// runtime predicate (e.g. dm_only depends on the live Discord
|
||||||
|
// channel kind cache), not a static column we can index on.
|
||||||
|
ListAgentsByChatbotChannelFilter(ctx context.Context) ([]*Agent, error)
|
||||||
|
|
||||||
|
// ListScheduledAgents returns every agent with a non-empty
|
||||||
|
// Schedule whose NextRunAt is at or before `dueBefore`. Result
|
||||||
|
// is ordered by NextRunAt ASC so the scheduler runner can drain
|
||||||
|
// in oldest-due-first order. Mirrors skills.Storage.ListDueScheduled.
|
||||||
|
//
|
||||||
|
// Phase 3 scheduler reads this on every tick when
|
||||||
|
// agents.triggers.enabled is true. The (Schedule, NextRunAt)
|
||||||
|
// composite index backs the query — see gorm tags on gormAgent.
|
||||||
|
ListScheduledAgents(ctx context.Context, dueBefore time.Time) ([]*Agent, error)
|
||||||
|
|
||||||
|
// MarkAgentScheduledRun atomically updates LastScheduledRunAt
|
||||||
|
// and NextRunAt for the given agent. Called by the agentsched
|
||||||
|
// runner after each scheduled invocation. Mirrors
|
||||||
|
// skills.Storage.MarkScheduledRun semantics.
|
||||||
|
MarkAgentScheduledRun(ctx context.Context, agentID string, ranAt, nextAt time.Time) error
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user