6fd050855a
Replace the Phase 2 in-flight chat gate (buffered channel) with a real SQLite-backed job queue and single worker loop. Every /api/chat request now creates a job row, blocks until the worker completes it, and returns the result transparently.

Key changes:
- internal/store: NextJob (drain-by-model ordering), IncrementAttempt, ResetInterruptedJobs, DeleteTerminalJobsBefore; busy_timeout pragma
- internal/worker: single-threaded worker loop with Notifier for sync handler completion signaling; retry on ConnectionError, terminal fail on HTTPError; crash recovery resets interrupted jobs on startup
- internal/webhook: dispatcher infrastructure for async webhook delivery
- internal/server: chat handler rewritten to enqueue+wait; old chatGate removed; embeddings remain direct concurrent proxies (ADR-0013)
- internal/config: FOREMAN_MAX_ATTEMPTS, FOREMAN_JOB_TTL

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
480 lines
15 KiB
Go
480 lines
15 KiB
Go
// Package store provides a SQLite-backed durable queue for foreman jobs and artifacts.
|
|
//
|
|
// Why: jobs must survive daemon restarts so async callers and webhooks never lose
|
|
// work (ADR-0008). SQLite in WAL mode gives durable single-writer/multi-reader
|
|
// semantics with no external dependencies.
|
|
// What: opens a SQLite database, runs migrations, and exposes CRUD for jobs and
|
|
// artifacts.
|
|
// Test: use t.TempDir() for an isolated DB per test; verify all CRUD operations
|
|
// and state transitions.
|
|
package store
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
_ "modernc.org/sqlite"
|
|
)
|
|
|
|
// JobState names a stage in a job's lifecycle. The value is persisted verbatim
// in the jobs.state column.
type JobState string

// Known job states. UpdateJobState documents the intended progression as
// queued -> loading -> working -> done/failed. ResetInterruptedJobs treats
// loading and working as in-flight, and DeleteTerminalJobsBefore treats done
// and failed as terminal.
const (
	JobStateQueued  = JobState("queued")
	JobStateLoading = JobState("loading")
	JobStateWorking = JobState("working")
	JobStateDone    = JobState("done")
	JobStateFailed  = JobState("failed")
)
|
|
|
|
// Job represents a queued unit of work.
|
|
type Job struct {
|
|
ID string `json:"id"`
|
|
Model string `json:"model"`
|
|
Payload json.RawMessage `json:"payload"`
|
|
State JobState `json:"state"`
|
|
Result json.RawMessage `json:"result,omitempty"`
|
|
Error *string `json:"error,omitempty"`
|
|
Attempt int `json:"attempt"`
|
|
MaxAttempts int `json:"max_attempts"`
|
|
StateWebhookURL *string `json:"state_webhook_url,omitempty"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
UpdatedAt time.Time `json:"updated_at"`
|
|
StartedAt *time.Time `json:"started_at,omitempty"`
|
|
CompletedAt *time.Time `json:"completed_at,omitempty"`
|
|
}
|
|
|
|
// Artifact is one row of the artifacts table: a named, typed blob that belongs
// to a job.
type Artifact struct {
	// ID is the autoincrement row id, filled in by CreateArtifact. The pair
	// (JobID, Name) is unique in the schema.
	ID    int64  `json:"id"`
	JobID string `json:"job_id"`
	Name  string `json:"name"`

	// Data carries the blob itself and is deliberately excluded from JSON;
	// Size is recomputed from len(Data) by CreateArtifact.
	ContentType string `json:"content_type"`
	Data        []byte `json:"-"`
	Size        int64  `json:"size"`

	// CreatedAt is stamped (UTC) by CreateArtifact.
	CreatedAt time.Time `json:"created_at"`
}
|
|
|
|
// Store is the handle through which all job and artifact persistence goes. It
// owns the *sql.DB returned by Open; release it with Close.
type Store struct {
	db *sql.DB // connection pool opened by Open
}
|
|
|
|
// migration holds the schema DDL executed by Open. Every statement carries an
// IF NOT EXISTS guard, so re-running it against an existing database is a
// no-op.
const migration = `
CREATE TABLE IF NOT EXISTS jobs (
	id TEXT PRIMARY KEY,
	model TEXT NOT NULL,
	payload BLOB NOT NULL,
	state TEXT NOT NULL DEFAULT 'queued',
	result BLOB,
	error TEXT,
	attempt INTEGER NOT NULL DEFAULT 0,
	max_attempts INTEGER NOT NULL DEFAULT 3,
	state_webhook_url TEXT,
	created_at DATETIME NOT NULL,
	updated_at DATETIME NOT NULL,
	started_at DATETIME,
	completed_at DATETIME
);

CREATE INDEX IF NOT EXISTS idx_jobs_state ON jobs(state);
CREATE INDEX IF NOT EXISTS idx_jobs_model_state ON jobs(model, state);

CREATE TABLE IF NOT EXISTS artifacts (
	id INTEGER PRIMARY KEY AUTOINCREMENT,
	job_id TEXT NOT NULL REFERENCES jobs(id),
	name TEXT NOT NULL,
	content_type TEXT NOT NULL,
	data BLOB NOT NULL,
	size INTEGER NOT NULL,
	created_at DATETIME NOT NULL,
	UNIQUE(job_id, name)
);
`
|
|
|
|
// Open creates or opens a SQLite database at path, enables WAL mode, and runs
|
|
// migrations.
|
|
//
|
|
// Why: single entry point ensures WAL mode and schema are always applied.
|
|
// What: opens the DB, sets pragmas, runs CREATE TABLE IF NOT EXISTS.
|
|
// Test: call Open with a temp dir path, assert no error and that tables exist.
|
|
func Open(path string) (*Store, error) {
|
|
// Append pragmas to the DSN so they apply to every connection in the pool.
|
|
dsn := path + "?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)&_pragma=busy_timeout(5000)"
|
|
db, err := sql.Open("sqlite", dsn)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("open sqlite %q: %w", path, err)
|
|
}
|
|
|
|
if _, err := db.Exec(migration); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("run migration: %w", err)
|
|
}
|
|
|
|
return &Store{db: db}, nil
|
|
}
|
|
|
|
// Close closes the underlying database connection.
|
|
func (s *Store) Close() error {
|
|
return s.db.Close()
|
|
}
|
|
|
|
// CreateJob inserts a new job into the queue.
|
|
//
|
|
// Why: the async /jobs endpoint and the sync passthrough both need to enqueue work.
|
|
// What: inserts a job row with state "queued" and returns the stored Job.
|
|
// Test: create a job, then GetJob by ID, assert fields match.
|
|
func (s *Store) CreateJob(job Job) (Job, error) {
|
|
now := time.Now().UTC()
|
|
job.State = JobStateQueued
|
|
job.CreatedAt = now
|
|
job.UpdatedAt = now
|
|
|
|
if job.MaxAttempts == 0 {
|
|
job.MaxAttempts = 3
|
|
}
|
|
|
|
_, err := s.db.Exec(
|
|
`INSERT INTO jobs (id, model, payload, state, attempt, max_attempts, state_webhook_url, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
job.ID, job.Model, []byte(job.Payload), string(job.State),
|
|
job.Attempt, job.MaxAttempts, job.StateWebhookURL,
|
|
job.CreatedAt, job.UpdatedAt,
|
|
)
|
|
if err != nil {
|
|
return Job{}, fmt.Errorf("insert job %s: %w", job.ID, err)
|
|
}
|
|
|
|
return job, nil
|
|
}
|
|
|
|
// GetJob retrieves a job by ID.
|
|
//
|
|
// Why: callers need to poll job status via GET /jobs/{id} and the worker needs to
|
|
// read jobs from the queue.
|
|
// What: queries the jobs table by primary key and scans into a Job struct.
|
|
// Test: create a job, GetJob, assert all fields round-trip correctly.
|
|
func (s *Store) GetJob(id string) (Job, error) {
|
|
var j Job
|
|
var payload, result []byte
|
|
|
|
err := s.db.QueryRow(
|
|
`SELECT id, model, payload, state, result, error, attempt, max_attempts,
|
|
state_webhook_url, created_at, updated_at, started_at, completed_at
|
|
FROM jobs WHERE id = ?`, id,
|
|
).Scan(
|
|
&j.ID, &j.Model, &payload, &j.State, &result, &j.Error,
|
|
&j.Attempt, &j.MaxAttempts, &j.StateWebhookURL,
|
|
&j.CreatedAt, &j.UpdatedAt, &j.StartedAt, &j.CompletedAt,
|
|
)
|
|
if err != nil {
|
|
return Job{}, fmt.Errorf("get job %s: %w", id, err)
|
|
}
|
|
|
|
j.Payload = json.RawMessage(payload)
|
|
if result != nil {
|
|
j.Result = json.RawMessage(result)
|
|
}
|
|
|
|
return j, nil
|
|
}
|
|
|
|
// UpdateJobState transitions a job to a new state and updates associated fields.
|
|
//
|
|
// Why: the worker loop drives jobs through their lifecycle (queued -> loading ->
|
|
// working -> done/failed), and each transition must be persisted durably.
|
|
// What: updates the state, updated_at, and optionally result/error/timestamps.
|
|
// Test: create a job, advance through states, assert each transition persists.
|
|
func (s *Store) UpdateJobState(id string, state JobState, result json.RawMessage, errMsg *string) error {
|
|
now := time.Now().UTC()
|
|
|
|
var resultBytes []byte
|
|
if result != nil {
|
|
resultBytes = []byte(result)
|
|
}
|
|
|
|
var startedAt, completedAt *time.Time
|
|
switch state {
|
|
case JobStateLoading, JobStateWorking:
|
|
startedAt = &now
|
|
case JobStateDone, JobStateFailed:
|
|
completedAt = &now
|
|
}
|
|
|
|
res, err := s.db.Exec(
|
|
`UPDATE jobs SET state = ?, result = ?, error = ?, updated_at = ?,
|
|
started_at = COALESCE(?, started_at),
|
|
completed_at = COALESCE(?, completed_at)
|
|
WHERE id = ?`,
|
|
string(state), resultBytes, errMsg, now, startedAt, completedAt, id,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("update job %s state to %s: %w", id, state, err)
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("check rows affected for job %s: %w", id, err)
|
|
}
|
|
if rows == 0 {
|
|
return fmt.Errorf("job %s not found", id)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ListJobs returns jobs, optionally filtered by state. If state is nil, all jobs
|
|
// are returned ordered by created_at descending.
|
|
//
|
|
// Why: the GET /jobs endpoint needs to list jobs with optional state filtering.
|
|
// What: queries the jobs table with an optional WHERE clause on state.
|
|
// Test: create jobs in different states, list with and without filter, assert counts.
|
|
func (s *Store) ListJobs(state *JobState) ([]Job, error) {
|
|
var rows *sql.Rows
|
|
var err error
|
|
|
|
if state != nil {
|
|
rows, err = s.db.Query(
|
|
`SELECT id, model, payload, state, result, error, attempt, max_attempts,
|
|
state_webhook_url, created_at, updated_at, started_at, completed_at
|
|
FROM jobs WHERE state = ? ORDER BY created_at DESC`, string(*state),
|
|
)
|
|
} else {
|
|
rows, err = s.db.Query(
|
|
`SELECT id, model, payload, state, result, error, attempt, max_attempts,
|
|
state_webhook_url, created_at, updated_at, started_at, completed_at
|
|
FROM jobs ORDER BY created_at DESC`,
|
|
)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list jobs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var jobs []Job
|
|
for rows.Next() {
|
|
var j Job
|
|
var payload, result []byte
|
|
|
|
if err := rows.Scan(
|
|
&j.ID, &j.Model, &payload, &j.State, &result, &j.Error,
|
|
&j.Attempt, &j.MaxAttempts, &j.StateWebhookURL,
|
|
&j.CreatedAt, &j.UpdatedAt, &j.StartedAt, &j.CompletedAt,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("scan job row: %w", err)
|
|
}
|
|
|
|
j.Payload = json.RawMessage(payload)
|
|
if result != nil {
|
|
j.Result = json.RawMessage(result)
|
|
}
|
|
jobs = append(jobs, j)
|
|
}
|
|
|
|
return jobs, rows.Err()
|
|
}
|
|
|
|
// CreateArtifact attaches a named artifact to a job.
|
|
//
|
|
// Why: completed jobs produce artifacts (the completion response, structured data,
|
|
// etc.) that must be stored durably for webhook delivery and polling (ADR-0006).
|
|
// What: inserts a row into the artifacts table with the blob data.
|
|
// Test: create a job, attach an artifact, retrieve it, assert data matches.
|
|
func (s *Store) CreateArtifact(artifact Artifact) (Artifact, error) {
|
|
now := time.Now().UTC()
|
|
artifact.CreatedAt = now
|
|
artifact.Size = int64(len(artifact.Data))
|
|
|
|
res, err := s.db.Exec(
|
|
`INSERT INTO artifacts (job_id, name, content_type, data, size, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?)`,
|
|
artifact.JobID, artifact.Name, artifact.ContentType,
|
|
artifact.Data, artifact.Size, artifact.CreatedAt,
|
|
)
|
|
if err != nil {
|
|
return Artifact{}, fmt.Errorf("insert artifact %q for job %s: %w", artifact.Name, artifact.JobID, err)
|
|
}
|
|
|
|
id, err := res.LastInsertId()
|
|
if err != nil {
|
|
return Artifact{}, fmt.Errorf("get artifact id: %w", err)
|
|
}
|
|
artifact.ID = id
|
|
|
|
return artifact, nil
|
|
}
|
|
|
|
// GetArtifact retrieves a single artifact by job ID and name.
|
|
//
|
|
// Why: the GET /jobs/{id}/artifacts/{name} endpoint serves individual artifacts.
|
|
// What: queries by the (job_id, name) unique key and returns the full blob.
|
|
// Test: create an artifact, get it by job_id+name, assert data round-trips.
|
|
func (s *Store) GetArtifact(jobID, name string) (Artifact, error) {
|
|
var a Artifact
|
|
|
|
err := s.db.QueryRow(
|
|
`SELECT id, job_id, name, content_type, data, size, created_at
|
|
FROM artifacts WHERE job_id = ? AND name = ?`, jobID, name,
|
|
).Scan(&a.ID, &a.JobID, &a.Name, &a.ContentType, &a.Data, &a.Size, &a.CreatedAt)
|
|
if err != nil {
|
|
return Artifact{}, fmt.Errorf("get artifact %q for job %s: %w", name, jobID, err)
|
|
}
|
|
|
|
return a, nil
|
|
}
|
|
|
|
// GetArtifactsByJob returns all artifacts for a given job.
|
|
//
|
|
// Why: the GET /jobs/{id} response includes artifact metadata for the caller to
|
|
// decide which to fetch.
|
|
// What: queries all artifacts by job_id, ordered by name.
|
|
// Test: attach multiple artifacts to a job, list them, assert all returned.
|
|
func (s *Store) GetArtifactsByJob(jobID string) ([]Artifact, error) {
|
|
rows, err := s.db.Query(
|
|
`SELECT id, job_id, name, content_type, data, size, created_at
|
|
FROM artifacts WHERE job_id = ? ORDER BY name`, jobID,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list artifacts for job %s: %w", jobID, err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var artifacts []Artifact
|
|
for rows.Next() {
|
|
var a Artifact
|
|
if err := rows.Scan(&a.ID, &a.JobID, &a.Name, &a.ContentType, &a.Data, &a.Size, &a.CreatedAt); err != nil {
|
|
return nil, fmt.Errorf("scan artifact row: %w", err)
|
|
}
|
|
artifacts = append(artifacts, a)
|
|
}
|
|
|
|
return artifacts, rows.Err()
|
|
}
|
|
|
|
// NextJob returns the next queued job using drain-by-model ordering. Jobs for the
|
|
// currently-resident model are preferred to avoid swap costs, then ordered by
|
|
// creation time.
|
|
//
|
|
// Why: the worker loop must pick the optimal next job to minimize model swaps
|
|
// (ADR-0009 drain-by-model heuristic).
|
|
// What: queries for the first queued job, sorting by model affinity then FIFO.
|
|
// Test: enqueue jobs for two models, set currentModel to one, verify it drains
|
|
// that model first before switching.
|
|
func (s *Store) NextJob(currentModel string) (Job, error) {
|
|
var j Job
|
|
var payload, result []byte
|
|
|
|
err := s.db.QueryRow(
|
|
`SELECT id, model, payload, state, result, error, attempt, max_attempts,
|
|
state_webhook_url, created_at, updated_at, started_at, completed_at
|
|
FROM jobs
|
|
WHERE state = ?
|
|
ORDER BY (CASE WHEN model = ? THEN 0 ELSE 1 END) ASC, created_at ASC
|
|
LIMIT 1`, string(JobStateQueued), currentModel,
|
|
).Scan(
|
|
&j.ID, &j.Model, &payload, &j.State, &result, &j.Error,
|
|
&j.Attempt, &j.MaxAttempts, &j.StateWebhookURL,
|
|
&j.CreatedAt, &j.UpdatedAt, &j.StartedAt, &j.CompletedAt,
|
|
)
|
|
if err != nil {
|
|
return Job{}, fmt.Errorf("next job: %w", err)
|
|
}
|
|
|
|
j.Payload = json.RawMessage(payload)
|
|
if result != nil {
|
|
j.Result = json.RawMessage(result)
|
|
}
|
|
|
|
return j, nil
|
|
}
|
|
|
|
// IncrementAttempt bumps the attempt counter on a job and resets it to queued.
|
|
//
|
|
// Why: retry logic needs to record each attempt while re-queuing the job.
|
|
// What: increments attempt by 1 and sets state back to queued.
|
|
// Test: create a job, increment twice, verify attempt=2 and state=queued.
|
|
func (s *Store) IncrementAttempt(id string) error {
|
|
now := time.Now().UTC()
|
|
res, err := s.db.Exec(
|
|
`UPDATE jobs SET attempt = attempt + 1, state = ?, updated_at = ? WHERE id = ?`,
|
|
string(JobStateQueued), now, id,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("increment attempt for job %s: %w", id, err)
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("check rows affected for job %s: %w", id, err)
|
|
}
|
|
if rows == 0 {
|
|
return fmt.Errorf("job %s not found", id)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ResetInterruptedJobs moves any loading or working jobs back to queued. Called
|
|
// on startup to recover from a crash mid-execution.
|
|
//
|
|
// Why: if the daemon restarts while a job is in-flight, the job must not be stuck
|
|
// in a non-terminal, non-queued state forever.
|
|
// What: updates all loading/working jobs to queued.
|
|
// Test: create jobs in loading/working states, call Reset, verify all are queued.
|
|
func (s *Store) ResetInterruptedJobs() (int64, error) {
|
|
now := time.Now().UTC()
|
|
res, err := s.db.Exec(
|
|
`UPDATE jobs SET state = ?, updated_at = ? WHERE state IN (?, ?)`,
|
|
string(JobStateQueued), now,
|
|
string(JobStateLoading), string(JobStateWorking),
|
|
)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("reset interrupted jobs: %w", err)
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("check rows affected: %w", err)
|
|
}
|
|
|
|
return rows, nil
|
|
}
|
|
|
|
// DeleteTerminalJobsBefore deletes terminal jobs (done or failed) and their
|
|
// artifacts older than the given cutoff time.
|
|
//
|
|
// Why: prevents unbounded storage growth by pruning old completed work (ADR-0008).
|
|
// What: deletes artifacts first (FK), then jobs with completed_at before cutoff.
|
|
// Test: create old terminal jobs, call with a recent cutoff, verify they are gone.
|
|
func (s *Store) DeleteTerminalJobsBefore(cutoff time.Time) (int64, error) {
|
|
// Delete artifacts for terminal jobs first (foreign key).
|
|
_, err := s.db.Exec(
|
|
`DELETE FROM artifacts WHERE job_id IN (
|
|
SELECT id FROM jobs WHERE state IN (?, ?) AND completed_at < ?
|
|
)`,
|
|
string(JobStateDone), string(JobStateFailed), cutoff,
|
|
)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("delete old artifacts: %w", err)
|
|
}
|
|
|
|
res, err := s.db.Exec(
|
|
`DELETE FROM jobs WHERE state IN (?, ?) AND completed_at < ?`,
|
|
string(JobStateDone), string(JobStateFailed), cutoff,
|
|
)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("delete old jobs: %w", err)
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("check rows affected: %w", err)
|
|
}
|
|
|
|
return rows, nil
|
|
}
|