P4: Tier-2 batteries — audit + budget + persona noun #4

Closed
steve wants to merge 4 commits from phase-4-batteries into main
6 changed files with 311 additions and 1 deletions
Showing only changes of commit 3f14aae032 - Show all commits
+2 -1
View File
@@ -68,7 +68,8 @@ BATTERIES (opt-in siblings, each nil-safe + a default):
critic/ two-tier timeout state machine + Escalator [P4]
schedule/ cron runner cores [P4]
checkpoint/ durable resume seam [P4]
budget/ rolling-window tracker (+ NoOp) [P4]
budget/ DBBudget rolling-7d + NoOp (run.Budget); [P4]
BudgetStorage iface + Memory default
contrib/store/ SECOND module (+ modernc.org/sqlite): [P4]
in-memory + pure-Go SQLite impls of every *Store seam
+167
View File
@@ -0,0 +1,167 @@
// Package skillexec runs saved Skill definitions via majordomo's agent
// loop (gitea.stevedudenhoeffer.com/steve/majordomo/agent).
//
// Why: a Skill is data; the executor turns data into a running agent
// (resolve model, build toolbox, start audit, run the agent loop,
// finish audit, deliver).
package budget
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
)
// BudgetTracker enforces per-user GPU budgets in v2. v1 ships
// NoOpBudget which always allows. The interface exists now so the v2
// migration is a single line in the executor.
//
// Why interface now: the executor's Check/Commit calls would need to
// be added in v2 anyway; doing it now means v2 only swaps NoOp for
// DBBudget without touching call sites.
type BudgetTracker interface {
// Check reports whether the caller has remaining budget. Returns
// nil for "yes" or an error describing the exhaustion.
Check(ctx context.Context, callerID string) error
// Commit records that the caller spent runtimeSeconds of budget on
// this run. Called after the agent completes (success or error).
Commit(ctx context.Context, callerID string, runtimeSeconds float64)
}
// NoOpBudget always allows and never records. v1 default.
type NoOpBudget struct{}
// NewNoOpBudget constructs the no-op tracker.
func NewNoOpBudget() BudgetTracker { return NoOpBudget{} }
// Check always returns nil.
func (NoOpBudget) Check(_ context.Context, _ string) error { return nil }
// Commit is a no-op.
func (NoOpBudget) Commit(_ context.Context, _ string, _ float64) {}
// ErrBudgetExceeded is returned by DBBudget.Check when the caller's
// 7-day rolling window has hit the convar-configured cap.
//
// Why a sentinel: callers (executor, audit writer) need to distinguish
// budget rejection from generic errors so they can record
// status="budget_exceeded" instead of "error" and skip user-visible
// delivery side-effects.
var ErrBudgetExceeded = errors.New("weekly skill budget exceeded")
// BudgetNotifier is the optional callback DBBudget invokes when a
// Check rejects a caller. Production wires a Discord-DM hook so the
// user knows why their skill failed; tests inject a recorder.
//
// nil is allowed and is silently skipped.
type BudgetNotifier func(ctx context.Context, userID string, secondsUsed, cap float64)
// DBBudget enforces per-user weekly GPU budgets via the BudgetStorage
// interface. The "weekly" cap is a rolling 7-day window — see
// SkillBudget for the rollover semantics.
//
// Why a closure for the limit instead of an int field: the cap comes
// from a runtime convar. Reading it on every Check means a `.convar
// set skills.user_budget_seconds_per_week 7200` takes effect on the
// next call without restarting the bot or rewiring the executor.
type DBBudget struct {
storage BudgetStorage
// weeklyLimit returns the current cap in seconds. Reads convar at
// every Check so a runtime convar bump takes effect on the next
// call.
weeklyLimit func() float64
// notify is called when a Check rejects a caller. Optional —
// production wires a Discord-DM hook so the user knows why their
// skill failed. nil-safe.
notify BudgetNotifier
// now is the time source. Test injects a fake clock; production
// uses time.Now.
now func() time.Time
}
// NewDBBudget constructs a DBBudget. now may be nil — defaults to
// time.Now.
//
// Why time injection: budget rollover is time-sensitive; tests need to
// fast-forward past the 7-day boundary deterministically. now=nil
// means production callers (mort.go) don't have to think about it.
//
// Test: pass a closure that returns a fixed instant; assert rollover
// only happens when (now - WindowStart) >= 7 days.
func NewDBBudget(storage BudgetStorage, weeklyLimit func() float64, notify BudgetNotifier, now func() time.Time) *DBBudget {
if now == nil {
now = time.Now
}
return &DBBudget{
storage: storage,
weeklyLimit: weeklyLimit,
notify: notify,
now: now,
}
}
// Check returns ErrBudgetExceeded if the caller has spent at least
// weeklyLimit seconds in the current rolling 7-day window.
//
// Why anonymous callerID="" is unbudgeted: scheduler-driven and
// system-initiated runs don't have a Discord user to bill; charging
// "system" would conflate them with a real user. The scheduler sets
// CallerID to the skill owner where applicable, so cron-loop
// abusiveness still consumes the owner's budget.
//
// Why cap<=0 means "disabled": operator wants a runtime kill-switch.
// Setting the convar to "0" turns enforcement off without restart.
//
// Test: Get returns nil → Check returns nil; Get returns row with
// SecondsUsed >= cap → Check returns ErrBudgetExceeded and notify is
// invoked; window expired (>=7d) → Check returns nil regardless of
// SecondsUsed.
func (b *DBBudget) Check(ctx context.Context, callerID string) error {
if callerID == "" {
return nil
}
bud, err := b.storage.Get(ctx, callerID)
if err != nil {
return fmt.Errorf("budget: %w", err)
}
if bud != nil {
if b.now().Sub(bud.WindowStart) < 7*24*time.Hour {
cap := b.weeklyLimit()
if cap > 0 && bud.SecondsUsed >= cap {
if b.notify != nil {
b.notify(ctx, callerID, bud.SecondsUsed, cap)
}
return ErrBudgetExceeded
}
}
}
return nil
}
// Commit records the run's runtime against the caller's budget.
// Failures are logged but never returned — budget accounting must
// not break user-visible execution.
//
// Why callerID="" is a no-op: matches Check's anonymous-caller
// shortcut; system runs don't get billed.
//
// Why runtimeSeconds<=0 is a no-op: a run that errored before
// resolving a model has wallSecs near 0 in floating-point terms but
// can also be exactly 0 (synthetic test fixtures). Skipping avoids
// spurious 0-runs rows from short-lived failures.
//
// Test: Commit(50) → Get reports SecondsUsed=50; storage failure
// surfaces only as a slog.Warn (no panic, no return).
func (b *DBBudget) Commit(ctx context.Context, callerID string, runtimeSeconds float64) {
if callerID == "" || runtimeSeconds <= 0 {
return
}
if err := b.storage.Add(ctx, callerID, runtimeSeconds, b.now()); err != nil {
slog.Warn("skills budget: commit failed", "user", callerID, "error", err)
}
}
+44
View File
@@ -0,0 +1,44 @@
package budget
import (
"context"
"errors"
"testing"
"time"
)
func TestDBBudgetRollingWindow(t *testing.T) {
ctx := context.Background()
mem := NewMemory()
now := time.Now()
clock := func() time.Time { return now }
b := NewDBBudget(mem, func() float64 { return 100 }, nil, clock)
// Under cap: allowed.
if err := b.Check(ctx, "u"); err != nil {
t.Fatalf("fresh caller should pass: %v", err)
}
b.Commit(ctx, "u", 60)
if err := b.Check(ctx, "u"); err != nil {
t.Fatalf("60/100 should pass: %v", err)
}
// Over cap: rejected.
b.Commit(ctx, "u", 50) // 110 total
if err := b.Check(ctx, "u"); !errors.Is(err, ErrBudgetExceeded) {
t.Fatalf("110/100 should be ErrBudgetExceeded, got %v", err)
}
// Window rolls over after 7 days: allowed again.
now = now.Add(8 * 24 * time.Hour)
b.Commit(ctx, "u", 1) // triggers rollover inside Add
if err := b.Check(ctx, "u"); err != nil {
t.Fatalf("after window rollover should pass: %v", err)
}
}
func TestNoOpBudgetAlwaysAllows(t *testing.T) {
b := NewNoOpBudget()
if err := b.Check(context.Background(), "anyone"); err != nil {
t.Fatalf("NoOp must always allow: %v", err)
}
b.Commit(context.Background(), "anyone", 1e9) // no-op
}
+56
View File
@@ -0,0 +1,56 @@
package budget
import (
"context"
"sync"
"time"
)
// Memory is a zero-dependency in-process BudgetStorage: per-user rolling-window
// usage held in memory (lost on restart). The default behind DBBudget for a
// light host or tests; mort uses its GORM Storage, contrib/store adds SQLite.
type Memory struct {
mu sync.Mutex
rows map[string]*SkillBudget
}
// NewMemory returns an empty in-memory BudgetStorage.
func NewMemory() *Memory { return &Memory{rows: map[string]*SkillBudget{}} }
var _ BudgetStorage = (*Memory)(nil)
func (m *Memory) Initialize(context.Context) error { return nil }
func (m *Memory) Get(_ context.Context, userID string) (*SkillBudget, error) {
m.mu.Lock()
defer m.mu.Unlock()
r, ok := m.rows[userID]
if !ok {
return nil, nil
}
cp := *r // copy out so callers can't mutate our row
return &cp, nil
}
func (m *Memory) Add(_ context.Context, userID string, secondsUsed float64, now time.Time) error {
m.mu.Lock()
defer m.mu.Unlock()
r, ok := m.rows[userID]
if !ok {
m.rows[userID] = &SkillBudget{
UserID: userID, WindowStart: now,
SecondsUsed: secondsUsed, RunsCount: 1, UpdatedAt: now,
}
return nil
}
// Roll the window over if it's older than the window length.
if now.Sub(r.WindowStart) >= budgetWindow {
r.WindowStart = now
r.SecondsUsed = 0
r.RunsCount = 0
}
r.SecondsUsed += secondsUsed
r.RunsCount++
r.UpdatedAt = now
return nil
}
+9
View File
@@ -0,0 +1,9 @@
package budget
import "gitea.stevedudenhoeffer.com/steve/executus/run"
// The budget trackers plug directly into run.Ports.Budget (Check/Commit match).
var (
_ run.Budget = NoOpBudget{}
_ run.Budget = (*DBBudget)(nil)
)
+33
View File
@@ -0,0 +1,33 @@
package budget
import (
"context"
"time"
)
// BudgetStorage is the persistence seam behind DBBudget: one budget row per
// user, with an atomic Add that rolls the 7-day window over transparently. Mort
// backs this with GORM/MySQL (the skill_budgets table); Memory() is the
// zero-dependency default; contrib/store adds a durable SQLite one.
type BudgetStorage interface {
// Initialize runs any schema setup. Safe to call repeatedly.
Initialize(ctx context.Context) error
// Get returns the user's current budget row, or (nil, nil) if none exists.
Get(ctx context.Context, userID string) (*SkillBudget, error)
// Add increments seconds_used + runs_count atomically, rolling the window
// over when WindowStart is older than 7 days (reset to now, fresh count).
// Creates the row if absent.
Add(ctx context.Context, userID string, secondsUsed float64, now time.Time) error
}
// SkillBudget is one user's rolling-window usage row.
type SkillBudget struct {
UserID string
WindowStart time.Time
SecondsUsed float64
RunsCount int
UpdatedAt time.Time
}
// budgetWindow is the rolling window length the storage rolls over at.
const budgetWindow = 7 * 24 * time.Hour