cb16008b14
db.Personas() satisfies persona.Storage over SQLite. Each Agent is stored as a JSON blob with extracted indexed columns (owner_id, name, webhook_secret, chatbot_channel_filter, schedule, next_run_at) — so the WHOLE struct round-trips (no domain<->GORM<->DB field-loss footgun) while the lookups stay indexable. Test proves the round-trip preserves nested + map fields (SkillPalette, StateReactEmoji), the owner/name + webhook + chatbot-filter queries, the scheduled-due query, and MarkAgentScheduledRun clearing the due window. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
168 lines
5.6 KiB
Go
168 lines
5.6 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/executus/persona"
|
|
)
|
|
|
|
// personaStore is the SQLite-backed persona.Storage. It stores each Agent as a
|
|
// JSON blob in `data` with a handful of extracted, indexed columns for the
|
|
// query methods — so the FULL struct round-trips (no domain↔GORM↔DB field-loss
|
|
// footgun) while owner/name/webhook/schedule lookups stay indexable.
|
|
type personaStore struct{ db *sql.DB }
|
|
|
|
// Personas returns a durable persona.Storage backed by this database.
|
|
func (d *DB) Personas() persona.Storage { return &personaStore{db: d.sql} }
|
|
|
|
var _ persona.Storage = (*personaStore)(nil)
|
|
|
|
func (s *personaStore) InitializeAgentStorage(ctx context.Context) error {
|
|
_, err := s.db.ExecContext(ctx, `
|
|
CREATE TABLE IF NOT EXISTS agents (
|
|
id TEXT PRIMARY KEY,
|
|
owner_id TEXT NOT NULL DEFAULT '',
|
|
name TEXT NOT NULL DEFAULT '',
|
|
webhook_secret TEXT NOT NULL DEFAULT '',
|
|
chatbot_channel_filter TEXT NOT NULL DEFAULT '',
|
|
schedule TEXT NOT NULL DEFAULT '',
|
|
next_run_at INTEGER NOT NULL DEFAULT 0, -- unix seconds; 0 = unset
|
|
data TEXT NOT NULL -- full Agent as JSON
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_agents_owner ON agents(owner_id);
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_agents_owner_name ON agents(owner_id, name);
|
|
CREATE INDEX IF NOT EXISTS idx_agents_sched ON agents(schedule, next_run_at);`)
|
|
if err != nil {
|
|
return fmt.Errorf("personaStore.Initialize: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *personaStore) SaveAgent(ctx context.Context, a *persona.Agent) error {
|
|
blob, err := json.Marshal(a)
|
|
if err != nil {
|
|
return fmt.Errorf("personaStore.SaveAgent: marshal: %w", err)
|
|
}
|
|
var next int64
|
|
if a.NextRunAt != nil && !a.NextRunAt.IsZero() {
|
|
next = a.NextRunAt.Unix()
|
|
}
|
|
_, err = s.db.ExecContext(ctx, `
|
|
INSERT INTO agents (id, owner_id, name, webhook_secret, chatbot_channel_filter, schedule, next_run_at, data)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(id) DO UPDATE SET
|
|
owner_id=excluded.owner_id, name=excluded.name, webhook_secret=excluded.webhook_secret,
|
|
chatbot_channel_filter=excluded.chatbot_channel_filter, schedule=excluded.schedule,
|
|
next_run_at=excluded.next_run_at, data=excluded.data`,
|
|
a.ID, a.OwnerID, a.Name, a.WebhookSecret, a.ChatbotChannelFilter, a.Schedule, next, string(blob))
|
|
if err != nil {
|
|
return fmt.Errorf("personaStore.SaveAgent: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// scanAgents unmarshals the `data` column of every row in rows.
|
|
func scanAgents(rows *sql.Rows) ([]*persona.Agent, error) {
|
|
defer rows.Close()
|
|
var out []*persona.Agent
|
|
for rows.Next() {
|
|
var blob string
|
|
if err := rows.Scan(&blob); err != nil {
|
|
return nil, err
|
|
}
|
|
var a persona.Agent
|
|
if err := json.Unmarshal([]byte(blob), &a); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, &a)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func (s *personaStore) getOne(ctx context.Context, where string, arg ...any) (*persona.Agent, error) {
|
|
var blob string
|
|
err := s.db.QueryRowContext(ctx, `SELECT data FROM agents WHERE `+where, arg...).Scan(&blob)
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
return nil, persona.ErrNotFound
|
|
case err != nil:
|
|
return nil, err
|
|
}
|
|
var a persona.Agent
|
|
if err := json.Unmarshal([]byte(blob), &a); err != nil {
|
|
return nil, err
|
|
}
|
|
return &a, nil
|
|
}
|
|
|
|
func (s *personaStore) GetAgent(ctx context.Context, id string) (*persona.Agent, error) {
|
|
return s.getOne(ctx, "id = ?", id)
|
|
}
|
|
|
|
func (s *personaStore) GetAgentByName(ctx context.Context, ownerID, name string) (*persona.Agent, error) {
|
|
return s.getOne(ctx, "owner_id = ? AND name = ?", ownerID, name)
|
|
}
|
|
|
|
func (s *personaStore) GetAgentByWebhookSecret(ctx context.Context, secret string) (*persona.Agent, error) {
|
|
if secret == "" {
|
|
return nil, persona.ErrNotFound
|
|
}
|
|
return s.getOne(ctx, "webhook_secret = ?", secret)
|
|
}
|
|
|
|
func (s *personaStore) ListAgents(ctx context.Context, ownerID string) ([]*persona.Agent, error) {
|
|
rows, err := s.db.QueryContext(ctx, `SELECT data FROM agents WHERE owner_id = ? ORDER BY name`, ownerID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("personaStore.ListAgents: %w", err)
|
|
}
|
|
return scanAgents(rows)
|
|
}
|
|
|
|
func (s *personaStore) ListAllAgents(ctx context.Context) ([]*persona.Agent, error) {
|
|
rows, err := s.db.QueryContext(ctx, `SELECT data FROM agents ORDER BY name`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("personaStore.ListAllAgents: %w", err)
|
|
}
|
|
return scanAgents(rows)
|
|
}
|
|
|
|
func (s *personaStore) DeleteAgent(ctx context.Context, id string) error {
|
|
if _, err := s.db.ExecContext(ctx, `DELETE FROM agents WHERE id = ?`, id); err != nil {
|
|
return fmt.Errorf("personaStore.DeleteAgent: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *personaStore) ListAgentsByChatbotChannelFilter(ctx context.Context) ([]*persona.Agent, error) {
|
|
rows, err := s.db.QueryContext(ctx, `SELECT data FROM agents WHERE chatbot_channel_filter != '' ORDER BY name`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("personaStore.ListAgentsByChatbotChannelFilter: %w", err)
|
|
}
|
|
return scanAgents(rows)
|
|
}
|
|
|
|
func (s *personaStore) ListScheduledAgents(ctx context.Context, dueBefore time.Time) ([]*persona.Agent, error) {
|
|
rows, err := s.db.QueryContext(ctx,
|
|
`SELECT data FROM agents WHERE schedule != '' AND next_run_at > 0 AND next_run_at <= ? ORDER BY next_run_at`,
|
|
dueBefore.Unix())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("personaStore.ListScheduledAgents: %w", err)
|
|
}
|
|
return scanAgents(rows)
|
|
}
|
|
|
|
func (s *personaStore) MarkAgentScheduledRun(ctx context.Context, agentID string, ranAt, nextAt time.Time) error {
|
|
a, err := s.GetAgent(ctx, agentID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
a.LastScheduledRunAt = &ranAt
|
|
a.NextRunAt = &nextAt
|
|
return s.SaveAgent(ctx, a)
|
|
}
|