feat: scaffold project with config, store, health endpoint, CI, and Dockerfile

Phase 1 of foreman: initialize the Go module, project layout, and core
infrastructure. Includes env-based configuration (FOREMAN_* namespace),
SQLite-backed durable job queue with WAL mode via modernc.org/sqlite,
stdlib HTTP server with /healthz and optional bearer-token auth middleware,
subcommand dispatch (serve + stubs), Gitea CI workflow, multi-stage
distroless Dockerfile, and comprehensive tests for all packages.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-23 17:58:36 -04:00
parent d5702f7a75
commit 9cdf4b2472
15 changed files with 1474 additions and 0 deletions
+366
View File
@@ -0,0 +1,366 @@
// Package store provides a SQLite-backed durable queue for foreman jobs and artifacts.
//
// Why: jobs must survive daemon restarts so async callers and webhooks never lose
// work (ADR-0008). SQLite in WAL mode gives durable single-writer/multi-reader
// semantics with no external dependencies.
// What: opens a SQLite database, runs migrations, and exposes CRUD for jobs and
// artifacts.
// Test: use t.TempDir() for an isolated DB per test; verify all CRUD operations
// and state transitions.
package store
import (
"database/sql"
"encoding/json"
"fmt"
"time"
_ "modernc.org/sqlite"
)
// JobState represents the lifecycle state of a job.
type JobState string
const (
JobStateQueued JobState = "queued"
JobStateLoading JobState = "loading"
JobStateWorking JobState = "working"
JobStateDone JobState = "done"
JobStateFailed JobState = "failed"
)
// Job represents a queued unit of work.
type Job struct {
ID string `json:"id"`
Model string `json:"model"`
Payload json.RawMessage `json:"payload"`
State JobState `json:"state"`
Result json.RawMessage `json:"result,omitempty"`
Error *string `json:"error,omitempty"`
Attempt int `json:"attempt"`
MaxAttempts int `json:"max_attempts"`
StateWebhookURL *string `json:"state_webhook_url,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
}
// Artifact represents a named, typed blob attached to a completed job.
type Artifact struct {
ID int64 `json:"id"`
JobID string `json:"job_id"`
Name string `json:"name"`
ContentType string `json:"content_type"`
Data []byte `json:"-"`
Size int64 `json:"size"`
CreatedAt time.Time `json:"created_at"`
}
// Store wraps a SQLite database with job and artifact operations.
type Store struct {
db *sql.DB
}
// migration is the DDL that creates the schema. It runs once on Open via
// IF NOT EXISTS guards.
const migration = `
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
model TEXT NOT NULL,
payload BLOB NOT NULL,
state TEXT NOT NULL DEFAULT 'queued',
result BLOB,
error TEXT,
attempt INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
state_webhook_url TEXT,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
started_at DATETIME,
completed_at DATETIME
);
CREATE INDEX IF NOT EXISTS idx_jobs_state ON jobs(state);
CREATE INDEX IF NOT EXISTS idx_jobs_model_state ON jobs(model, state);
CREATE TABLE IF NOT EXISTS artifacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT NOT NULL REFERENCES jobs(id),
name TEXT NOT NULL,
content_type TEXT NOT NULL,
data BLOB NOT NULL,
size INTEGER NOT NULL,
created_at DATETIME NOT NULL,
UNIQUE(job_id, name)
);
`
// Open creates or opens a SQLite database at path, enables WAL mode, and runs
// migrations.
//
// Why: single entry point ensures WAL mode and schema are always applied.
// What: opens the DB, sets pragmas, runs CREATE TABLE IF NOT EXISTS.
// Test: call Open with a temp dir path, assert no error and that tables exist.
func Open(path string) (*Store, error) {
db, err := sql.Open("sqlite", path)
if err != nil {
return nil, fmt.Errorf("open sqlite %q: %w", path, err)
}
// Enable WAL mode for concurrent readers.
if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
db.Close()
return nil, fmt.Errorf("enable WAL mode: %w", err)
}
// Enable foreign keys.
if _, err := db.Exec("PRAGMA foreign_keys=ON"); err != nil {
db.Close()
return nil, fmt.Errorf("enable foreign keys: %w", err)
}
if _, err := db.Exec(migration); err != nil {
db.Close()
return nil, fmt.Errorf("run migration: %w", err)
}
return &Store{db: db}, nil
}
// Close closes the underlying database connection.
func (s *Store) Close() error {
return s.db.Close()
}
// CreateJob inserts a new job into the queue.
//
// Why: the async /jobs endpoint and the sync passthrough both need to enqueue work.
// What: inserts a job row with state "queued" and returns the stored Job.
// Test: create a job, then GetJob by ID, assert fields match.
func (s *Store) CreateJob(job Job) (Job, error) {
now := time.Now().UTC()
job.State = JobStateQueued
job.CreatedAt = now
job.UpdatedAt = now
if job.MaxAttempts == 0 {
job.MaxAttempts = 3
}
_, err := s.db.Exec(
`INSERT INTO jobs (id, model, payload, state, attempt, max_attempts, state_webhook_url, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
job.ID, job.Model, []byte(job.Payload), string(job.State),
job.Attempt, job.MaxAttempts, job.StateWebhookURL,
job.CreatedAt, job.UpdatedAt,
)
if err != nil {
return Job{}, fmt.Errorf("insert job %s: %w", job.ID, err)
}
return job, nil
}
// GetJob retrieves a job by ID.
//
// Why: callers need to poll job status via GET /jobs/{id} and the worker needs to
// read jobs from the queue.
// What: queries the jobs table by primary key and scans into a Job struct.
// Test: create a job, GetJob, assert all fields round-trip correctly.
func (s *Store) GetJob(id string) (Job, error) {
var j Job
var payload, result []byte
err := s.db.QueryRow(
`SELECT id, model, payload, state, result, error, attempt, max_attempts,
state_webhook_url, created_at, updated_at, started_at, completed_at
FROM jobs WHERE id = ?`, id,
).Scan(
&j.ID, &j.Model, &payload, &j.State, &result, &j.Error,
&j.Attempt, &j.MaxAttempts, &j.StateWebhookURL,
&j.CreatedAt, &j.UpdatedAt, &j.StartedAt, &j.CompletedAt,
)
if err != nil {
return Job{}, fmt.Errorf("get job %s: %w", id, err)
}
j.Payload = json.RawMessage(payload)
if result != nil {
j.Result = json.RawMessage(result)
}
return j, nil
}
// UpdateJobState transitions a job to a new state and updates associated fields.
//
// Why: the worker loop drives jobs through their lifecycle (queued -> loading ->
// working -> done/failed), and each transition must be persisted durably.
// What: updates the state, updated_at, and optionally result/error/timestamps.
// Test: create a job, advance through states, assert each transition persists.
func (s *Store) UpdateJobState(id string, state JobState, result json.RawMessage, errMsg *string) error {
now := time.Now().UTC()
var resultBytes []byte
if result != nil {
resultBytes = []byte(result)
}
var startedAt, completedAt *time.Time
switch state {
case JobStateLoading, JobStateWorking:
startedAt = &now
case JobStateDone, JobStateFailed:
completedAt = &now
}
res, err := s.db.Exec(
`UPDATE jobs SET state = ?, result = ?, error = ?, updated_at = ?,
started_at = COALESCE(?, started_at),
completed_at = COALESCE(?, completed_at)
WHERE id = ?`,
string(state), resultBytes, errMsg, now, startedAt, completedAt, id,
)
if err != nil {
return fmt.Errorf("update job %s state to %s: %w", id, state, err)
}
rows, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("check rows affected for job %s: %w", id, err)
}
if rows == 0 {
return fmt.Errorf("job %s not found", id)
}
return nil
}
// ListJobs returns jobs, optionally filtered by state. If state is nil, all jobs
// are returned ordered by created_at descending.
//
// Why: the GET /jobs endpoint needs to list jobs with optional state filtering.
// What: queries the jobs table with an optional WHERE clause on state.
// Test: create jobs in different states, list with and without filter, assert counts.
func (s *Store) ListJobs(state *JobState) ([]Job, error) {
var rows *sql.Rows
var err error
if state != nil {
rows, err = s.db.Query(
`SELECT id, model, payload, state, result, error, attempt, max_attempts,
state_webhook_url, created_at, updated_at, started_at, completed_at
FROM jobs WHERE state = ? ORDER BY created_at DESC`, string(*state),
)
} else {
rows, err = s.db.Query(
`SELECT id, model, payload, state, result, error, attempt, max_attempts,
state_webhook_url, created_at, updated_at, started_at, completed_at
FROM jobs ORDER BY created_at DESC`,
)
}
if err != nil {
return nil, fmt.Errorf("list jobs: %w", err)
}
defer rows.Close()
var jobs []Job
for rows.Next() {
var j Job
var payload, result []byte
if err := rows.Scan(
&j.ID, &j.Model, &payload, &j.State, &result, &j.Error,
&j.Attempt, &j.MaxAttempts, &j.StateWebhookURL,
&j.CreatedAt, &j.UpdatedAt, &j.StartedAt, &j.CompletedAt,
); err != nil {
return nil, fmt.Errorf("scan job row: %w", err)
}
j.Payload = json.RawMessage(payload)
if result != nil {
j.Result = json.RawMessage(result)
}
jobs = append(jobs, j)
}
return jobs, rows.Err()
}
// CreateArtifact attaches a named artifact to a job.
//
// Why: completed jobs produce artifacts (the completion response, structured data,
// etc.) that must be stored durably for webhook delivery and polling (ADR-0006).
// What: inserts a row into the artifacts table with the blob data.
// Test: create a job, attach an artifact, retrieve it, assert data matches.
func (s *Store) CreateArtifact(artifact Artifact) (Artifact, error) {
now := time.Now().UTC()
artifact.CreatedAt = now
artifact.Size = int64(len(artifact.Data))
res, err := s.db.Exec(
`INSERT INTO artifacts (job_id, name, content_type, data, size, created_at)
VALUES (?, ?, ?, ?, ?, ?)`,
artifact.JobID, artifact.Name, artifact.ContentType,
artifact.Data, artifact.Size, artifact.CreatedAt,
)
if err != nil {
return Artifact{}, fmt.Errorf("insert artifact %q for job %s: %w", artifact.Name, artifact.JobID, err)
}
id, err := res.LastInsertId()
if err != nil {
return Artifact{}, fmt.Errorf("get artifact id: %w", err)
}
artifact.ID = id
return artifact, nil
}
// GetArtifact retrieves a single artifact by job ID and name.
//
// Why: the GET /jobs/{id}/artifacts/{name} endpoint serves individual artifacts.
// What: queries by the (job_id, name) unique key and returns the full blob.
// Test: create an artifact, get it by job_id+name, assert data round-trips.
func (s *Store) GetArtifact(jobID, name string) (Artifact, error) {
var a Artifact
err := s.db.QueryRow(
`SELECT id, job_id, name, content_type, data, size, created_at
FROM artifacts WHERE job_id = ? AND name = ?`, jobID, name,
).Scan(&a.ID, &a.JobID, &a.Name, &a.ContentType, &a.Data, &a.Size, &a.CreatedAt)
if err != nil {
return Artifact{}, fmt.Errorf("get artifact %q for job %s: %w", name, jobID, err)
}
return a, nil
}
// GetArtifactsByJob returns all artifacts for a given job.
//
// Why: the GET /jobs/{id} response includes artifact metadata for the caller to
// decide which to fetch.
// What: queries all artifacts by job_id, ordered by name.
// Test: attach multiple artifacts to a job, list them, assert all returned.
func (s *Store) GetArtifactsByJob(jobID string) ([]Artifact, error) {
rows, err := s.db.Query(
`SELECT id, job_id, name, content_type, data, size, created_at
FROM artifacts WHERE job_id = ? ORDER BY name`, jobID,
)
if err != nil {
return nil, fmt.Errorf("list artifacts for job %s: %w", jobID, err)
}
defer rows.Close()
var artifacts []Artifact
for rows.Next() {
var a Artifact
if err := rows.Scan(&a.ID, &a.JobID, &a.Name, &a.ContentType, &a.Data, &a.Size, &a.CreatedAt); err != nil {
return nil, fmt.Errorf("scan artifact row: %w", err)
}
artifacts = append(artifacts, a)
}
return artifacts, rows.Err()
}
+376
View File
@@ -0,0 +1,376 @@
package store
import (
"database/sql"
"encoding/json"
"errors"
"path/filepath"
"testing"
)
// openTestDB creates a fresh SQLite store in a temp directory for test isolation.
func openTestDB(t *testing.T) *Store {
t.Helper()
path := filepath.Join(t.TempDir(), "test.db")
s, err := Open(path)
if err != nil {
t.Fatalf("Open(%q): %v", path, err)
}
t.Cleanup(func() { s.Close() })
return s
}
func TestOpen_CreatesTablesAndWAL(t *testing.T) {
s := openTestDB(t)
// Verify WAL mode is active.
var mode string
if err := s.db.QueryRow("PRAGMA journal_mode").Scan(&mode); err != nil {
t.Fatalf("query journal_mode: %v", err)
}
if mode != "wal" {
t.Errorf("journal_mode = %q, want %q", mode, "wal")
}
// Verify tables exist by querying their metadata.
for _, table := range []string{"jobs", "artifacts"} {
var name string
err := s.db.QueryRow(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?", table,
).Scan(&name)
if err != nil {
t.Errorf("table %q not found: %v", table, err)
}
}
}
func TestCreateJob_And_GetJob(t *testing.T) {
s := openTestDB(t)
webhook := "http://example.com/webhook"
job := Job{
ID: "01ARZ3NDEKTSV4RRFFQ69G5FAV",
Model: "qwen3:30b",
Payload: json.RawMessage(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}`),
MaxAttempts: 5,
StateWebhookURL: &webhook,
}
created, err := s.CreateJob(job)
if err != nil {
t.Fatalf("CreateJob: %v", err)
}
if created.State != JobStateQueued {
t.Errorf("State = %q, want %q", created.State, JobStateQueued)
}
if created.CreatedAt.IsZero() {
t.Error("CreatedAt should be set")
}
if created.MaxAttempts != 5 {
t.Errorf("MaxAttempts = %d, want 5", created.MaxAttempts)
}
got, err := s.GetJob(created.ID)
if err != nil {
t.Fatalf("GetJob: %v", err)
}
if got.ID != created.ID {
t.Errorf("ID = %q, want %q", got.ID, created.ID)
}
if got.Model != "qwen3:30b" {
t.Errorf("Model = %q, want %q", got.Model, "qwen3:30b")
}
if got.State != JobStateQueued {
t.Errorf("State = %q, want %q", got.State, JobStateQueued)
}
if got.StateWebhookURL == nil || *got.StateWebhookURL != webhook {
t.Errorf("StateWebhookURL = %v, want %q", got.StateWebhookURL, webhook)
}
}
func TestCreateJob_DefaultMaxAttempts(t *testing.T) {
s := openTestDB(t)
job := Job{
ID: "01ARZ3NDEKTSV4RRFFQ69G5FA2",
Model: "qwen3:14b",
Payload: json.RawMessage(`{}`),
}
created, err := s.CreateJob(job)
if err != nil {
t.Fatalf("CreateJob: %v", err)
}
if created.MaxAttempts != 3 {
t.Errorf("MaxAttempts = %d, want 3 (default)", created.MaxAttempts)
}
}
func TestGetJob_NotFound(t *testing.T) {
s := openTestDB(t)
_, err := s.GetJob("nonexistent")
if !errors.Is(err, sql.ErrNoRows) {
t.Errorf("GetJob(nonexistent) error = %v, want sql.ErrNoRows wrapped", err)
}
}
func TestUpdateJobState(t *testing.T) {
s := openTestDB(t)
job := Job{
ID: "01ARZ3NDEKTSV4RRFFQ69G5FA3",
Model: "qwen3:30b",
Payload: json.RawMessage(`{}`),
}
if _, err := s.CreateJob(job); err != nil {
t.Fatalf("CreateJob: %v", err)
}
// Transition: queued -> loading
if err := s.UpdateJobState(job.ID, JobStateLoading, nil, nil); err != nil {
t.Fatalf("UpdateJobState to loading: %v", err)
}
got, _ := s.GetJob(job.ID)
if got.State != JobStateLoading {
t.Errorf("State = %q, want %q", got.State, JobStateLoading)
}
if got.StartedAt == nil {
t.Error("StartedAt should be set after loading")
}
// Transition: loading -> working
if err := s.UpdateJobState(job.ID, JobStateWorking, nil, nil); err != nil {
t.Fatalf("UpdateJobState to working: %v", err)
}
// Transition: working -> done with result
result := json.RawMessage(`{"response":"hello"}`)
if err := s.UpdateJobState(job.ID, JobStateDone, result, nil); err != nil {
t.Fatalf("UpdateJobState to done: %v", err)
}
got, _ = s.GetJob(job.ID)
if got.State != JobStateDone {
t.Errorf("State = %q, want %q", got.State, JobStateDone)
}
if got.CompletedAt == nil {
t.Error("CompletedAt should be set after done")
}
if string(got.Result) != `{"response":"hello"}` {
t.Errorf("Result = %s, want %s", got.Result, `{"response":"hello"}`)
}
}
func TestUpdateJobState_Failed(t *testing.T) {
s := openTestDB(t)
job := Job{
ID: "01ARZ3NDEKTSV4RRFFQ69G5FA4",
Model: "qwen3:30b",
Payload: json.RawMessage(`{}`),
}
if _, err := s.CreateJob(job); err != nil {
t.Fatalf("CreateJob: %v", err)
}
errMsg := "target unreachable after 3 attempts"
if err := s.UpdateJobState(job.ID, JobStateFailed, nil, &errMsg); err != nil {
t.Fatalf("UpdateJobState to failed: %v", err)
}
got, _ := s.GetJob(job.ID)
if got.State != JobStateFailed {
t.Errorf("State = %q, want %q", got.State, JobStateFailed)
}
if got.Error == nil || *got.Error != errMsg {
t.Errorf("Error = %v, want %q", got.Error, errMsg)
}
}
func TestUpdateJobState_NotFound(t *testing.T) {
s := openTestDB(t)
err := s.UpdateJobState("nonexistent", JobStateDone, nil, nil)
if err == nil {
t.Error("UpdateJobState for nonexistent job should fail")
}
}
func TestListJobs_All(t *testing.T) {
s := openTestDB(t)
for i, id := range []string{"01A", "01B", "01C"} {
state := JobStateQueued
job := Job{ID: id, Model: "m", Payload: json.RawMessage(`{}`)}
if _, err := s.CreateJob(job); err != nil {
t.Fatalf("CreateJob[%d]: %v", i, err)
}
if i == 1 {
_ = s.UpdateJobState(id, JobStateDone, nil, nil)
_ = state // suppress unused warning
}
}
all, err := s.ListJobs(nil)
if err != nil {
t.Fatalf("ListJobs(nil): %v", err)
}
if len(all) != 3 {
t.Errorf("len = %d, want 3", len(all))
}
}
func TestListJobs_FilterByState(t *testing.T) {
s := openTestDB(t)
for _, id := range []string{"01D", "01E", "01F"} {
job := Job{ID: id, Model: "m", Payload: json.RawMessage(`{}`)}
if _, err := s.CreateJob(job); err != nil {
t.Fatalf("CreateJob: %v", err)
}
}
// Move one to done.
_ = s.UpdateJobState("01E", JobStateDone, nil, nil)
queued := JobStateQueued
jobs, err := s.ListJobs(&queued)
if err != nil {
t.Fatalf("ListJobs(queued): %v", err)
}
if len(jobs) != 2 {
t.Errorf("len = %d, want 2", len(jobs))
}
for _, j := range jobs {
if j.State != JobStateQueued {
t.Errorf("unexpected state %q in filtered results", j.State)
}
}
}
func TestCreateArtifact_And_GetArtifact(t *testing.T) {
s := openTestDB(t)
// Need a job first (foreign key).
job := Job{ID: "01ART", Model: "m", Payload: json.RawMessage(`{}`)}
if _, err := s.CreateJob(job); err != nil {
t.Fatalf("CreateJob: %v", err)
}
artifact := Artifact{
JobID: "01ART",
Name: "completion",
ContentType: "application/json",
Data: []byte(`{"response":"hello world"}`),
}
created, err := s.CreateArtifact(artifact)
if err != nil {
t.Fatalf("CreateArtifact: %v", err)
}
if created.ID == 0 {
t.Error("ID should be set after insert")
}
if created.Size != int64(len(artifact.Data)) {
t.Errorf("Size = %d, want %d", created.Size, len(artifact.Data))
}
got, err := s.GetArtifact("01ART", "completion")
if err != nil {
t.Fatalf("GetArtifact: %v", err)
}
if string(got.Data) != `{"response":"hello world"}` {
t.Errorf("Data = %q, want %q", got.Data, `{"response":"hello world"}`)
}
if got.ContentType != "application/json" {
t.Errorf("ContentType = %q", got.ContentType)
}
}
func TestGetArtifact_NotFound(t *testing.T) {
s := openTestDB(t)
_, err := s.GetArtifact("noexist", "noname")
if err == nil {
t.Error("GetArtifact should fail for nonexistent artifact")
}
}
func TestGetArtifactsByJob(t *testing.T) {
s := openTestDB(t)
job := Job{ID: "01ARTS", Model: "m", Payload: json.RawMessage(`{}`)}
if _, err := s.CreateJob(job); err != nil {
t.Fatalf("CreateJob: %v", err)
}
for _, name := range []string{"completion", "metadata"} {
_, err := s.CreateArtifact(Artifact{
JobID: "01ARTS",
Name: name,
ContentType: "application/json",
Data: []byte(`{}`),
})
if err != nil {
t.Fatalf("CreateArtifact(%q): %v", name, err)
}
}
artifacts, err := s.GetArtifactsByJob("01ARTS")
if err != nil {
t.Fatalf("GetArtifactsByJob: %v", err)
}
if len(artifacts) != 2 {
t.Errorf("len = %d, want 2", len(artifacts))
}
// Should be ordered by name.
if artifacts[0].Name != "completion" {
t.Errorf("artifacts[0].Name = %q, want %q", artifacts[0].Name, "completion")
}
if artifacts[1].Name != "metadata" {
t.Errorf("artifacts[1].Name = %q, want %q", artifacts[1].Name, "metadata")
}
}
func TestCreateArtifact_DuplicateNameFails(t *testing.T) {
s := openTestDB(t)
job := Job{ID: "01DUP", Model: "m", Payload: json.RawMessage(`{}`)}
if _, err := s.CreateJob(job); err != nil {
t.Fatalf("CreateJob: %v", err)
}
a := Artifact{
JobID: "01DUP",
Name: "completion",
ContentType: "application/json",
Data: []byte(`{}`),
}
if _, err := s.CreateArtifact(a); err != nil {
t.Fatalf("first CreateArtifact: %v", err)
}
_, err := s.CreateArtifact(a)
if err == nil {
t.Error("duplicate artifact (job_id, name) should fail")
}
}
func TestGetArtifactsByJob_Empty(t *testing.T) {
s := openTestDB(t)
job := Job{ID: "01EMPTY", Model: "m", Payload: json.RawMessage(`{}`)}
if _, err := s.CreateJob(job); err != nil {
t.Fatalf("CreateJob: %v", err)
}
artifacts, err := s.GetArtifactsByJob("01EMPTY")
if err != nil {
t.Fatalf("GetArtifactsByJob: %v", err)
}
if len(artifacts) != 0 {
t.Errorf("len = %d, want 0", len(artifacts))
}
}