Files
steve 6fd050855a feat: add durable queue, single worker, and drain-by-model scheduling
Replace the Phase 2 in-flight chat gate (buffered channel) with a real
SQLite-backed job queue and single worker loop. Every /api/chat request
now creates a job row, blocks until the worker completes it, and returns
the result transparently.

Key changes:
- internal/store: NextJob (drain-by-model ordering), IncrementAttempt,
  ResetInterruptedJobs, DeleteTerminalJobsBefore; busy_timeout pragma
- internal/worker: single-threaded worker loop with Notifier for sync
  handler completion signaling; retry on ConnectionError, terminal fail
  on HTTPError; crash recovery resets interrupted jobs on startup
- internal/webhook: dispatcher infrastructure for async webhook delivery
- internal/server: chat handler rewritten to enqueue+wait; old chatGate
  removed; embeddings remain direct concurrent proxies (ADR-0013)
- internal/config: FOREMAN_MAX_ATTEMPTS, FOREMAN_JOB_TTL

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 18:29:32 -04:00

480 lines
15 KiB
Go

// Package store provides a SQLite-backed durable queue for foreman jobs and artifacts.
//
// Why: jobs must survive daemon restarts so async callers and webhooks never lose
// work (ADR-0008). SQLite in WAL mode gives durable single-writer/multi-reader
// semantics with no external dependencies.
// What: opens a SQLite database, runs migrations, and exposes CRUD for jobs and
// artifacts.
// Test: use t.TempDir() for an isolated DB per test; verify all CRUD operations
// and state transitions.
package store
import (
	"database/sql"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	_ "modernc.org/sqlite"
)
// JobState represents the lifecycle state of a job. The string value is what
// gets written to the jobs.state column.
type JobState string
// Lifecycle states. The worker drives a job queued -> loading -> working and
// then into one of the two terminal states, done or failed.
const (
// JobStateQueued is assigned by CreateJob and restored by IncrementAttempt and
// ResetInterruptedJobs; NextJob only selects jobs in this state.
JobStateQueued JobState = "queued"
// JobStateLoading is an in-flight state; entering it stamps started_at.
JobStateLoading JobState = "loading"
// JobStateWorking is an in-flight state; entering it stamps started_at.
JobStateWorking JobState = "working"
// JobStateDone is terminal; entering it stamps completed_at.
JobStateDone JobState = "done"
// JobStateFailed is terminal; entering it stamps completed_at.
JobStateFailed JobState = "failed"
)
// Job represents a queued unit of work. It mirrors one row of the jobs table;
// pointer fields are nil when the corresponding column is NULL.
type Job struct {
// ID is the caller-supplied primary key.
ID string `json:"id"`
// Model is the model name used by NextJob for drain-by-model ordering.
Model string `json:"model"`
// Payload is stored as an opaque blob; this package never decodes it.
Payload json.RawMessage `json:"payload"`
State JobState `json:"state"`
// Result is left nil when the result column is NULL.
Result json.RawMessage `json:"result,omitempty"`
Error *string `json:"error,omitempty"`
// Attempt is bumped by IncrementAttempt each time the job is re-queued.
Attempt int `json:"attempt"`
// MaxAttempts defaults to 3 in CreateJob when left at zero.
MaxAttempts int `json:"max_attempts"`
// StateWebhookURL is optional. NOTE(review): presumably the target for
// state-change webhooks; this package only stores it.
StateWebhookURL *string `json:"state_webhook_url,omitempty"`
// CreatedAt and UpdatedAt are set by CreateJob to the current UTC time.
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
// StartedAt is stamped by UpdateJobState on entering loading or working.
StartedAt *time.Time `json:"started_at,omitempty"`
// CompletedAt is stamped by UpdateJobState on entering done or failed.
CompletedAt *time.Time `json:"completed_at,omitempty"`
}
// Artifact represents a named, typed blob attached to a job. The schema
// enforces uniqueness of (JobID, Name).
type Artifact struct {
// ID is the auto-incremented row id, filled in by CreateArtifact.
ID int64 `json:"id"`
// JobID references jobs.id.
JobID string `json:"job_id"`
Name string `json:"name"`
ContentType string `json:"content_type"`
// Data is the blob itself; it is excluded from JSON encoding.
Data []byte `json:"-"`
// Size is len(Data), computed by CreateArtifact.
Size int64 `json:"size"`
// CreatedAt is set by CreateArtifact to the current UTC time.
CreatedAt time.Time `json:"created_at"`
}
// Store wraps a SQLite database with job and artifact operations. Obtain one
// from Open and release it with Close.
type Store struct {
// db is the database/sql handle that every method issues its queries through.
db *sql.DB
}
// migration is the DDL that creates the schema. Open executes it on every
// call; the IF NOT EXISTS guards make re-running it against an existing
// database a no-op.
//
// It defines the jobs table with indexes on state and on (model, state), and
// the artifacts table, whose job_id references jobs(id) and whose
// (job_id, name) pair is unique.
const migration = `
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
model TEXT NOT NULL,
payload BLOB NOT NULL,
state TEXT NOT NULL DEFAULT 'queued',
result BLOB,
error TEXT,
attempt INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
state_webhook_url TEXT,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
started_at DATETIME,
completed_at DATETIME
);
CREATE INDEX IF NOT EXISTS idx_jobs_state ON jobs(state);
CREATE INDEX IF NOT EXISTS idx_jobs_model_state ON jobs(model, state);
CREATE TABLE IF NOT EXISTS artifacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT NOT NULL REFERENCES jobs(id),
name TEXT NOT NULL,
content_type TEXT NOT NULL,
data BLOB NOT NULL,
size INTEGER NOT NULL,
created_at DATETIME NOT NULL,
UNIQUE(job_id, name)
);
`
// Open creates or opens a SQLite database at path, enables WAL mode, and runs
// migrations.
//
// Why: single entry point ensures WAL mode and schema are always applied.
// What: opens the DB with the journal_mode, foreign_keys and busy_timeout
// pragmas carried in the DSN, then executes the idempotent schema migration.
// Test: call Open with a temp dir path, assert no error and that tables exist.
//
// path may be a bare filename or a DSN that already carries query parameters;
// the pragmas are appended with the matching separator in either case. On any
// failure the returned *Store is nil.
func Open(path string) (*Store, error) {
	// Append pragmas to the DSN so they apply to every connection in the pool.
	// When path already has a query string, extend it instead of starting a
	// second one: a second "?" would be absorbed into the value of the last
	// caller-supplied parameter and the first pragma would be lost.
	sep := "?"
	if strings.Contains(path, "?") {
		sep = "&"
	}
	dsn := path + sep + "_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)&_pragma=busy_timeout(5000)"

	db, err := sql.Open("sqlite", dsn)
	if err != nil {
		return nil, fmt.Errorf("open sqlite %q: %w", path, err)
	}
	if _, err := db.Exec(migration); err != nil {
		// Best-effort cleanup: the migration failure is the error worth
		// reporting, so a secondary Close error is deliberately dropped.
		_ = db.Close()
		return nil, fmt.Errorf("run migration: %w", err)
	}
	return &Store{db: db}, nil
}
// Close releases the underlying database handle and reports any error the
// database/sql layer returns while doing so.
func (s *Store) Close() error {
	err := s.db.Close()
	return err
}
// CreateJob inserts a new job into the queue.
//
// Why: both the async /jobs endpoint and the sync passthrough enqueue work
// through this one call.
// What: forces the state to "queued", stamps created_at/updated_at with the
// current UTC time, defaults MaxAttempts to 3 when zero, inserts the row, and
// returns the Job as stored. On failure a zero Job and a wrapped error are
// returned.
// Test: create a job, then GetJob by ID, assert fields match.
func (s *Store) CreateJob(job Job) (Job, error) {
	const insertJob = `INSERT INTO jobs (id, model, payload, state, attempt, max_attempts, state_webhook_url, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`

	// Normalize the caller's copy first so the value handed back matches
	// exactly what was written.
	stamp := time.Now().UTC()
	job.State, job.CreatedAt, job.UpdatedAt = JobStateQueued, stamp, stamp
	if job.MaxAttempts == 0 {
		job.MaxAttempts = 3
	}

	if _, err := s.db.Exec(insertJob,
		job.ID, job.Model, []byte(job.Payload), string(job.State),
		job.Attempt, job.MaxAttempts, job.StateWebhookURL,
		job.CreatedAt, job.UpdatedAt,
	); err != nil {
		return Job{}, fmt.Errorf("insert job %s: %w", job.ID, err)
	}
	return job, nil
}
// GetJob retrieves a job by ID.
//
// Why: GET /jobs/{id} polling and the worker both need to read a single job.
// What: looks the row up by primary key and scans every column into a Job.
// The scan error (including the no-rows case) is returned wrapped, alongside
// a zero Job.
// Test: create a job, GetJob, assert all fields round-trip correctly.
func (s *Store) GetJob(id string) (Job, error) {
	const selectJob = `SELECT id, model, payload, state, result, error, attempt, max_attempts,
state_webhook_url, created_at, updated_at, started_at, completed_at
FROM jobs WHERE id = ?`

	var (
		job                   Job
		rawPayload, rawResult []byte
	)
	row := s.db.QueryRow(selectJob, id)
	if err := row.Scan(
		&job.ID, &job.Model, &rawPayload, &job.State, &rawResult, &job.Error,
		&job.Attempt, &job.MaxAttempts, &job.StateWebhookURL,
		&job.CreatedAt, &job.UpdatedAt, &job.StartedAt, &job.CompletedAt,
	); err != nil {
		return Job{}, fmt.Errorf("get job %s: %w", id, err)
	}

	job.Payload = json.RawMessage(rawPayload)
	// Result stays nil when the column is NULL.
	if rawResult != nil {
		job.Result = json.RawMessage(rawResult)
	}
	return job, nil
}
// UpdateJobState transitions a job to a new state and updates associated fields.
//
// Why: the worker loop moves jobs through queued -> loading -> working ->
// done/failed, and every step has to be persisted.
// What: overwrites state, result, error and updated_at. Entering loading or
// working stamps started_at; entering done or failed stamps completed_at; any
// other state leaves both timestamps as they were. Returns an error if the
// statement fails or if no row has the given id.
// Test: create a job, advance through states, assert each transition persists.
func (s *Store) UpdateJobState(id string, state JobState, result json.RawMessage, errMsg *string) error {
	const updateState = `UPDATE jobs SET state = ?, result = ?, error = ?, updated_at = ?,
started_at = COALESCE(?, started_at),
completed_at = COALESCE(?, completed_at)
WHERE id = ?`

	stamp := time.Now().UTC()

	// A nil pointer is sent as NULL, which makes COALESCE fall back to the
	// column's current value.
	var started, completed *time.Time
	if state == JobStateLoading || state == JobStateWorking {
		started = &stamp
	} else if state == JobStateDone || state == JobStateFailed {
		completed = &stamp
	}

	// Converting a nil RawMessage yields a nil []byte, so no guard is needed.
	blob := []byte(result)

	res, err := s.db.Exec(updateState, string(state), blob, errMsg, stamp, started, completed, id)
	if err != nil {
		return fmt.Errorf("update job %s state to %s: %w", id, state, err)
	}

	affected, err := res.RowsAffected()
	switch {
	case err != nil:
		return fmt.Errorf("check rows affected for job %s: %w", id, err)
	case affected == 0:
		return fmt.Errorf("job %s not found", id)
	}
	return nil
}
// ListJobs returns jobs ordered by created_at descending. A nil state lists
// every job; a non-nil state restricts the listing to jobs in that state.
//
// Why: the GET /jobs endpoint lists jobs with optional state filtering.
// What: picks the filtered or unfiltered query, runs it, and scans each row
// into a Job. A scan failure aborts with a nil slice; an iteration error is
// returned together with the jobs scanned so far.
// Test: create jobs in different states, list with and without filter, assert counts.
func (s *Store) ListJobs(state *JobState) ([]Job, error) {
	// Start from the unfiltered listing and switch to the filtered form only
	// when a state was supplied.
	query := `SELECT id, model, payload, state, result, error, attempt, max_attempts,
state_webhook_url, created_at, updated_at, started_at, completed_at
FROM jobs ORDER BY created_at DESC`
	var args []interface{}
	if state != nil {
		query = `SELECT id, model, payload, state, result, error, attempt, max_attempts,
state_webhook_url, created_at, updated_at, started_at, completed_at
FROM jobs WHERE state = ? ORDER BY created_at DESC`
		args = append(args, string(*state))
	}

	rows, err := s.db.Query(query, args...)
	if err != nil {
		return nil, fmt.Errorf("list jobs: %w", err)
	}
	defer rows.Close()

	var jobs []Job
	for rows.Next() {
		var (
			job                   Job
			rawPayload, rawResult []byte
		)
		err := rows.Scan(
			&job.ID, &job.Model, &rawPayload, &job.State, &rawResult, &job.Error,
			&job.Attempt, &job.MaxAttempts, &job.StateWebhookURL,
			&job.CreatedAt, &job.UpdatedAt, &job.StartedAt, &job.CompletedAt,
		)
		if err != nil {
			return nil, fmt.Errorf("scan job row: %w", err)
		}
		job.Payload = json.RawMessage(rawPayload)
		// Result stays nil when the column is NULL.
		if rawResult != nil {
			job.Result = json.RawMessage(rawResult)
		}
		jobs = append(jobs, job)
	}
	return jobs, rows.Err()
}
// CreateArtifact attaches a named artifact to a job.
//
// Why: job output (the completion response, structured data, etc.) has to be
// stored durably for webhook delivery and polling (ADR-0006).
// What: stamps CreatedAt with the current UTC time, sets Size to len(Data),
// inserts the row, and returns the artifact with its generated ID. On failure
// a zero Artifact and a wrapped error are returned.
// Test: create a job, attach an artifact, retrieve it, assert data matches.
func (s *Store) CreateArtifact(artifact Artifact) (Artifact, error) {
	const insertArtifact = `INSERT INTO artifacts (job_id, name, content_type, data, size, created_at)
VALUES (?, ?, ?, ?, ?, ?)`

	artifact.CreatedAt = time.Now().UTC()
	artifact.Size = int64(len(artifact.Data))

	res, err := s.db.Exec(insertArtifact,
		artifact.JobID, artifact.Name, artifact.ContentType,
		artifact.Data, artifact.Size, artifact.CreatedAt,
	)
	if err != nil {
		return Artifact{}, fmt.Errorf("insert artifact %q for job %s: %w", artifact.Name, artifact.JobID, err)
	}

	newID, err := res.LastInsertId()
	if err != nil {
		return Artifact{}, fmt.Errorf("get artifact id: %w", err)
	}
	artifact.ID = newID
	return artifact, nil
}
// GetArtifact retrieves a single artifact by job ID and name.
//
// Why: GET /jobs/{id}/artifacts/{name} serves one artifact at a time.
// What: looks the row up by its (job_id, name) unique key and returns it with
// the full blob. The scan error (including the no-rows case) is returned
// wrapped, alongside a zero Artifact.
// Test: create an artifact, get it by job_id+name, assert data round-trips.
func (s *Store) GetArtifact(jobID, name string) (Artifact, error) {
	const selectArtifact = `SELECT id, job_id, name, content_type, data, size, created_at
FROM artifacts WHERE job_id = ? AND name = ?`

	var art Artifact
	row := s.db.QueryRow(selectArtifact, jobID, name)
	if err := row.Scan(
		&art.ID, &art.JobID, &art.Name, &art.ContentType,
		&art.Data, &art.Size, &art.CreatedAt,
	); err != nil {
		return Artifact{}, fmt.Errorf("get artifact %q for job %s: %w", name, jobID, err)
	}
	return art, nil
}
// GetArtifactsByJob returns all artifacts for a given job, ordered by name.
//
// Why: the GET /jobs/{id} response lists a job's artifacts so the caller can
// decide which to fetch.
// What: selects every artifact row with the given job_id, blob included. A
// scan failure aborts with a nil slice; an iteration error is returned
// together with the artifacts scanned so far.
// Test: attach multiple artifacts to a job, list them, assert all returned.
func (s *Store) GetArtifactsByJob(jobID string) ([]Artifact, error) {
	const selectByJob = `SELECT id, job_id, name, content_type, data, size, created_at
FROM artifacts WHERE job_id = ? ORDER BY name`

	rows, err := s.db.Query(selectByJob, jobID)
	if err != nil {
		return nil, fmt.Errorf("list artifacts for job %s: %w", jobID, err)
	}
	defer rows.Close()

	var found []Artifact
	for rows.Next() {
		var art Artifact
		scanErr := rows.Scan(
			&art.ID, &art.JobID, &art.Name, &art.ContentType,
			&art.Data, &art.Size, &art.CreatedAt,
		)
		if scanErr != nil {
			return nil, fmt.Errorf("scan artifact row: %w", scanErr)
		}
		found = append(found, art)
	}
	return found, rows.Err()
}
// NextJob returns the next queued job using drain-by-model ordering: queued
// jobs whose model equals currentModel sort ahead of all others, and within
// each group the oldest created_at wins.
//
// Why: the worker loop picks jobs so as to minimize model swaps (ADR-0009
// drain-by-model heuristic).
// What: selects a single queued row with that ordering and scans it into a
// Job. The job is only read, not claimed or modified. When no job is queued
// the scan's no-rows error is returned wrapped, alongside a zero Job.
// Test: enqueue jobs for two models, set currentModel to one, verify it drains
// that model first before switching.
func (s *Store) NextJob(currentModel string) (Job, error) {
	const selectNext = `SELECT id, model, payload, state, result, error, attempt, max_attempts,
state_webhook_url, created_at, updated_at, started_at, completed_at
FROM jobs
WHERE state = ?
ORDER BY (CASE WHEN model = ? THEN 0 ELSE 1 END) ASC, created_at ASC
LIMIT 1`

	var (
		next                  Job
		rawPayload, rawResult []byte
	)
	row := s.db.QueryRow(selectNext, string(JobStateQueued), currentModel)
	if err := row.Scan(
		&next.ID, &next.Model, &rawPayload, &next.State, &rawResult, &next.Error,
		&next.Attempt, &next.MaxAttempts, &next.StateWebhookURL,
		&next.CreatedAt, &next.UpdatedAt, &next.StartedAt, &next.CompletedAt,
	); err != nil {
		return Job{}, fmt.Errorf("next job: %w", err)
	}

	next.Payload = json.RawMessage(rawPayload)
	// Result stays nil when the column is NULL.
	if rawResult != nil {
		next.Result = json.RawMessage(rawResult)
	}
	return next, nil
}
// IncrementAttempt bumps the attempt counter on a job and resets it to queued.
//
// Why: a retry must be counted at the same moment the job goes back on the
// queue.
// What: in one UPDATE, adds 1 to attempt, sets state to queued and refreshes
// updated_at. Returns an error if the statement fails or if no row has the
// given id.
// Test: create a job, increment twice, verify attempt=2 and state=queued.
func (s *Store) IncrementAttempt(id string) error {
	const requeue = `UPDATE jobs SET attempt = attempt + 1, state = ?, updated_at = ? WHERE id = ?`

	res, err := s.db.Exec(requeue, string(JobStateQueued), time.Now().UTC(), id)
	if err != nil {
		return fmt.Errorf("increment attempt for job %s: %w", id, err)
	}

	affected, err := res.RowsAffected()
	switch {
	case err != nil:
		return fmt.Errorf("check rows affected for job %s: %w", id, err)
	case affected == 0:
		return fmt.Errorf("job %s not found", id)
	}
	return nil
}
// ResetInterruptedJobs moves any loading or working jobs back to queued and
// reports how many rows were changed. It is meant to be called on startup.
//
// Why: a daemon restart mid-execution would otherwise leave the in-flight job
// stuck in a state that NextJob never selects.
// What: a single UPDATE sets state to queued and refreshes updated_at for
// every job currently in loading or working. The attempt counter is untouched.
// Test: create jobs in loading/working states, call Reset, verify all are queued.
func (s *Store) ResetInterruptedJobs() (int64, error) {
	const requeueInFlight = `UPDATE jobs SET state = ?, updated_at = ? WHERE state IN (?, ?)`

	res, err := s.db.Exec(requeueInFlight,
		string(JobStateQueued), time.Now().UTC(),
		string(JobStateLoading), string(JobStateWorking),
	)
	if err != nil {
		return 0, fmt.Errorf("reset interrupted jobs: %w", err)
	}

	recovered, err := res.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("check rows affected: %w", err)
	}
	return recovered, nil
}
// DeleteTerminalJobsBefore deletes terminal jobs (done or failed) whose
// completed_at is older than cutoff, together with their artifacts, and
// returns the number of jobs removed.
//
// Why: prevents unbounded storage growth by pruning old completed work (ADR-0008).
// What: inside one transaction, deletes the matching jobs' artifacts first
// (the artifacts table references jobs) and then the jobs themselves. Either
// both deletes are committed or neither is, so a failure can never leave jobs
// stripped of their artifacts.
// Test: create old terminal jobs, call with a recent cutoff, verify they are gone.
func (s *Store) DeleteTerminalJobsBefore(cutoff time.Time) (int64, error) {
	// Every timestamp this package writes comes from time.Now().UTC().
	// Normalize the cutoff the same way so both sides of the comparison are
	// like-zoned. NOTE(review): if the driver stores time.Time as text, a
	// non-UTC cutoff would be compared lexicographically against UTC strings
	// and select the wrong rows — confirm against the driver docs.
	cutoff = cutoff.UTC()

	tx, err := s.db.Begin()
	if err != nil {
		return 0, fmt.Errorf("begin prune transaction: %w", err)
	}
	// Undo partial work on any early return. After a successful Commit this
	// Rollback is a no-op that returns sql.ErrTxDone, which is safe to ignore.
	defer func() { _ = tx.Rollback() }()

	// Delete artifacts for terminal jobs first (foreign key).
	if _, err := tx.Exec(
		`DELETE FROM artifacts WHERE job_id IN (
SELECT id FROM jobs WHERE state IN (?, ?) AND completed_at < ?
)`,
		string(JobStateDone), string(JobStateFailed), cutoff,
	); err != nil {
		return 0, fmt.Errorf("delete old artifacts: %w", err)
	}

	res, err := tx.Exec(
		`DELETE FROM jobs WHERE state IN (?, ?) AND completed_at < ?`,
		string(JobStateDone), string(JobStateFailed), cutoff,
	)
	if err != nil {
		return 0, fmt.Errorf("delete old jobs: %w", err)
	}
	rows, err := res.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("check rows affected: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return 0, fmt.Errorf("commit prune transaction: %w", err)
	}
	return rows, nil
}