Files
steve c8559676ed
executus CI / test (push) Has been cancelled
P4b: skill noun + contrib/store (SQLite for budget/persona/skill/audit)
Merges the skill half of the persona/skill pair plus the second nested module.
(Squashed onto main from phase-4b-skill; the audit/budget/persona batteries it
was stacked on already landed via the P4 merge.)

- skill/: clean-redesign Skill noun + LEAN SkillStore (lifecycle/versions/
  schedule only) + ToRunnable + Memory default.
- contrib/store/: separate go.mod carrying modernc.org/sqlite, so the driver
  never enters the core go.sum. db.Budget()/Personas()/Skills()/Audit() back
  all four store seams (JSON-blob + indexed columns; round-trip tested).
  Includes the verified gadfly #5 fixes (AppendVersion tx+UNIQUE+error,
  Mark*ScheduledRun atomic json_set, busy_timeout, NaN guard).
- CI: builds + tests the nested module and asserts it owns the sqlite driver.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 00:15:00 -04:00

175 lines
6.2 KiB
Go

package store
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"time"
"gitea.stevedudenhoeffer.com/steve/executus/persona"
)
// personaStore is the SQLite-backed persona.Storage. It stores each Agent as a
// JSON blob in `data` with a handful of extracted, indexed columns for the
// query methods — so the FULL struct round-trips (no domain↔GORM↔DB field-loss
// footgun) while owner/name/webhook/schedule lookups stay indexable.
type personaStore struct{ db *sql.DB }
// Personas returns a durable persona.Storage backed by this database.
func (d *DB) Personas() persona.Storage { return &personaStore{db: d.sql} }
var _ persona.Storage = (*personaStore)(nil)
func (s *personaStore) InitializeAgentStorage(ctx context.Context) error {
_, err := s.db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
webhook_secret TEXT NOT NULL DEFAULT '',
chatbot_channel_filter TEXT NOT NULL DEFAULT '',
schedule TEXT NOT NULL DEFAULT '',
next_run_at INTEGER NOT NULL DEFAULT 0, -- unix seconds; 0 = unset
data TEXT NOT NULL -- full Agent as JSON
);
CREATE INDEX IF NOT EXISTS idx_agents_owner ON agents(owner_id);
CREATE UNIQUE INDEX IF NOT EXISTS idx_agents_owner_name ON agents(owner_id, name);
CREATE INDEX IF NOT EXISTS idx_agents_sched ON agents(schedule, next_run_at);`)
if err != nil {
return fmt.Errorf("personaStore.Initialize: %w", err)
}
return nil
}
func (s *personaStore) SaveAgent(ctx context.Context, a *persona.Agent) error {
blob, err := json.Marshal(a)
if err != nil {
return fmt.Errorf("personaStore.SaveAgent: marshal: %w", err)
}
var next int64
if a.NextRunAt != nil && !a.NextRunAt.IsZero() {
next = a.NextRunAt.Unix()
}
_, err = s.db.ExecContext(ctx, `
INSERT INTO agents (id, owner_id, name, webhook_secret, chatbot_channel_filter, schedule, next_run_at, data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
owner_id=excluded.owner_id, name=excluded.name, webhook_secret=excluded.webhook_secret,
chatbot_channel_filter=excluded.chatbot_channel_filter, schedule=excluded.schedule,
next_run_at=excluded.next_run_at, data=excluded.data`,
a.ID, a.OwnerID, a.Name, a.WebhookSecret, a.ChatbotChannelFilter, a.Schedule, next, string(blob))
if err != nil {
return fmt.Errorf("personaStore.SaveAgent: %w", err)
}
return nil
}
// scanAgents unmarshals the `data` column of every row in rows.
func scanAgents(rows *sql.Rows) ([]*persona.Agent, error) {
defer rows.Close()
var out []*persona.Agent
for rows.Next() {
var blob string
if err := rows.Scan(&blob); err != nil {
return nil, err
}
var a persona.Agent
if err := json.Unmarshal([]byte(blob), &a); err != nil {
return nil, err
}
out = append(out, &a)
}
return out, rows.Err()
}
func (s *personaStore) getOne(ctx context.Context, where string, arg ...any) (*persona.Agent, error) {
var blob string
err := s.db.QueryRowContext(ctx, `SELECT data FROM agents WHERE `+where, arg...).Scan(&blob)
switch {
case errors.Is(err, sql.ErrNoRows):
return nil, persona.ErrNotFound
case err != nil:
return nil, err
}
var a persona.Agent
if err := json.Unmarshal([]byte(blob), &a); err != nil {
return nil, err
}
return &a, nil
}
func (s *personaStore) GetAgent(ctx context.Context, id string) (*persona.Agent, error) {
return s.getOne(ctx, "id = ?", id)
}
func (s *personaStore) GetAgentByName(ctx context.Context, ownerID, name string) (*persona.Agent, error) {
return s.getOne(ctx, "owner_id = ? AND name = ?", ownerID, name)
}
func (s *personaStore) GetAgentByWebhookSecret(ctx context.Context, secret string) (*persona.Agent, error) {
if secret == "" {
return nil, persona.ErrNotFound
}
return s.getOne(ctx, "webhook_secret = ?", secret)
}
func (s *personaStore) ListAgents(ctx context.Context, ownerID string) ([]*persona.Agent, error) {
rows, err := s.db.QueryContext(ctx, `SELECT data FROM agents WHERE owner_id = ? ORDER BY name`, ownerID)
if err != nil {
return nil, fmt.Errorf("personaStore.ListAgents: %w", err)
}
return scanAgents(rows)
}
func (s *personaStore) ListAllAgents(ctx context.Context) ([]*persona.Agent, error) {
rows, err := s.db.QueryContext(ctx, `SELECT data FROM agents ORDER BY name`)
if err != nil {
return nil, fmt.Errorf("personaStore.ListAllAgents: %w", err)
}
return scanAgents(rows)
}
func (s *personaStore) DeleteAgent(ctx context.Context, id string) error {
if _, err := s.db.ExecContext(ctx, `DELETE FROM agents WHERE id = ?`, id); err != nil {
return fmt.Errorf("personaStore.DeleteAgent: %w", err)
}
return nil
}
func (s *personaStore) ListAgentsByChatbotChannelFilter(ctx context.Context) ([]*persona.Agent, error) {
rows, err := s.db.QueryContext(ctx, `SELECT data FROM agents WHERE chatbot_channel_filter != '' ORDER BY name`)
if err != nil {
return nil, fmt.Errorf("personaStore.ListAgentsByChatbotChannelFilter: %w", err)
}
return scanAgents(rows)
}
func (s *personaStore) ListScheduledAgents(ctx context.Context, dueBefore time.Time) ([]*persona.Agent, error) {
rows, err := s.db.QueryContext(ctx,
`SELECT data FROM agents WHERE schedule != '' AND next_run_at > 0 AND next_run_at <= ? ORDER BY next_run_at`,
dueBefore.Unix())
if err != nil {
return nil, fmt.Errorf("personaStore.ListScheduledAgents: %w", err)
}
return scanAgents(rows)
}
func (s *personaStore) MarkAgentScheduledRun(ctx context.Context, agentID string, ranAt, nextAt time.Time) error {
// Single atomic statement, not Get→mutate→Save: closes the lost-update
// window a concurrent Mark/edit would otherwise open. json_set keeps the
// blob's *time.Time fields consistent with the next_run_at column (Go
// encodes time.Time as RFC3339Nano, so it round-trips through GetAgent).
res, err := s.db.ExecContext(ctx,
`UPDATE agents SET next_run_at=?, data=json_set(data,'$.NextRunAt',?,'$.LastScheduledRunAt',?) WHERE id=?`,
nextAt.Unix(), nextAt.Format(time.RFC3339Nano), ranAt.Format(time.RFC3339Nano), agentID)
if err != nil {
return fmt.Errorf("personaStore.MarkAgentScheduledRun: %w", err)
}
if n, _ := res.RowsAffected(); n == 0 {
return persona.ErrNotFound
}
return nil
}