diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..d81dbe2 --- /dev/null +++ b/.env.example @@ -0,0 +1,26 @@ +# foreman configuration — all env vars are FOREMAN_* namespaced. +# Copy to .env and fill in values for local development. + +# Listen address for the HTTP server (default: :8080) +FOREMAN_ADDR=:8080 + +# Base URL of the Ollama target (required) +FOREMAN_OLLAMA_URL=http://mac.tail:11434 + +# Optional bearer token foreman sends to the Ollama target +FOREMAN_OLLAMA_TOKEN= + +# Optional bearer token callers must present to foreman +FOREMAN_TOKEN= + +# Always-resident embedder model (e.g. nomic-embed-text, qwen3-embedding:0.6b) +FOREMAN_EMBED_MODEL=nomic-embed-text + +# Path to the SQLite database file (default: foreman.db) +FOREMAN_DB_PATH=foreman.db + +# How often to poll the target's /api/tags (default: 30s) +FOREMAN_POLL_INTERVAL=30s + +# Optional HMAC key for signing webhook payloads (ADR-0005) +FOREMAN_WEBHOOK_SECRET= diff --git a/.gitea/workflows/ci.yaml b/.gitea/workflows/ci.yaml new file mode 100644 index 0000000..2eed90a --- /dev/null +++ b/.gitea/workflows/ci.yaml @@ -0,0 +1,26 @@ +name: CI +on: + push: { branches: ["*"] } + pull_request: { branches: ["*"] } +jobs: + build: + name: Build & Test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: { go-version-file: "go.mod" } + - run: go mod download + - run: go build ./... + - run: go vet ./... + - run: go test -race -count=1 ./... + tidy: + name: Tidy + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: { go-version-file: "go.mod" } + - run: | + go mod tidy + git diff --exit-code go.mod go.sum diff --git a/.gitignore b/.gitignore index 5d2f263..0ab90d0 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,13 @@ coverage.* .env.local *.local +# Vendor +vendor/ + +# Go workspace +go.work +go.work.sum + # Editor / OS cruft .DS_Store .idea/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..0835faa --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM golang:1.26-bookworm AS build +WORKDIR /src +COPY go.mod go.sum ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 go build -o /out/foreman ./cmd/foreman + +FROM gcr.io/distroless/static-debian12 +COPY --from=build /out/foreman /foreman +EXPOSE 8080 +ENTRYPOINT ["/foreman", "serve"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..59a496e --- /dev/null +++ b/README.md @@ -0,0 +1,63 @@ +# foreman + +A small, always-on Go daemon that fronts **one** Ollama target. It turns a +single Ollama instance into a queued, observable job endpoint: it polls the +target's installed models, serializes work through the target (managing model +swaps), assigns every job an ID, and reports progress via webhooks. + +On the wire it speaks **native Ollama**, so it doubles as a drop-in `go-llm` +target. + +## Quickstart + +```bash +# Set the required Ollama target URL +export FOREMAN_OLLAMA_URL=http://mac.tail:11434 + +# Run directly +go run ./cmd/foreman serve + +# Or build and run +go build -o foreman ./cmd/foreman +./foreman serve +``` + +## Docker + +```bash +docker build -t foreman . +docker run -e FOREMAN_OLLAMA_URL=http://mac.tail:11434 -p 8080:8080 foreman +``` + +## Configuration + +All configuration is via environment variables, namespaced under `FOREMAN_*`. +See [`.env.example`](.env.example) for the full list. + +| Variable | Default | Description | +|---|---|---| +| `FOREMAN_ADDR` | `:8080` | Listen address | +| `FOREMAN_OLLAMA_URL` | *(required)* | Ollama target base URL | +| `FOREMAN_OLLAMA_TOKEN` | *(empty)* | Bearer token sent to the target | +| `FOREMAN_TOKEN` | *(empty)* | Bearer token callers must present | +| `FOREMAN_EMBED_MODEL` | *(empty)* | Always-resident embedder model | +| `FOREMAN_DB_PATH` | `foreman.db` | SQLite database path | +| `FOREMAN_POLL_INTERVAL` | `30s` | Target model poll interval | +| `FOREMAN_WEBHOOK_SECRET` | *(empty)* | HMAC key for webhook signing | + +## Health check + +```bash +curl http://localhost:8080/healthz +# {"status":"ok","degraded":false} +``` + +## Architecture + +See [`docs/adr/`](docs/adr/) for design decisions. Key points: + +- One daemon per Ollama target (ADR-0001) +- SQLite-backed durable job queue in WAL mode (ADR-0008) +- Single worker loop with drain-by-model scheduling (ADR-0009) +- Native Ollama passthrough + async `/jobs` surface (ADR-0003, ADR-0004) +- Embeddings bypass the queue entirely (ADR-0013) diff --git a/cmd/foreman/main.go b/cmd/foreman/main.go new file mode 100644 index 0000000..9af03f6 --- /dev/null +++ b/cmd/foreman/main.go @@ -0,0 +1,78 @@ +// Package main is the entry point for the foreman daemon. +// +// Why: foreman is a single binary with subcommands; the main package handles +// argument dispatch and wiring. +// What: parses subcommands (serve, submit, jobs, ps) and runs the selected one. +// Test: build and run with --help; test individual packages separately. +package main + +import ( + "fmt" + "log/slog" + "os" + + "gitea.stevedudenhoeffer.com/steve/foreman/internal/config" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/server" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/store" +) + +func main() { + logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + + cmd := "serve" + if len(os.Args) > 1 { + cmd = os.Args[1] + } + + switch cmd { + case "serve": + if err := runServe(logger); err != nil { + logger.Error("serve failed", "error", err) + os.Exit(1) + } + case "submit": + fmt.Fprintln(os.Stderr, "submit: not yet implemented") + os.Exit(1) + case "jobs": + fmt.Fprintln(os.Stderr, "jobs: not yet implemented") + os.Exit(1) + case "ps": + fmt.Fprintln(os.Stderr, "ps: not yet implemented") + os.Exit(1) + default: + fmt.Fprintf(os.Stderr, "unknown command: %s\nusage: foreman [serve|submit|jobs|ps]\n", cmd) + os.Exit(1) + } +} + +// runServe loads configuration, opens the store, and starts the HTTP server. +// +// Why: the serve subcommand is the daemon's primary mode of operation. +// What: wires config -> store -> server and blocks on ListenAndServe. +// Test: tested indirectly via integration tests; each component is unit tested. +func runServe(logger *slog.Logger) error { + cfg, err := config.Load() + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + logger.Info("configuration loaded", + "addr", cfg.Addr, + "ollama_url", cfg.OllamaURL, + "db_path", cfg.DBPath, + "poll_interval", cfg.PollInterval, + "embed_model", cfg.EmbedModel, + "auth_enabled", cfg.Token != "", + ) + + st, err := store.Open(cfg.DBPath) + if err != nil { + return fmt.Errorf("open store: %w", err) + } + defer st.Close() + + srv := server.New(cfg, st, logger) + return srv.ListenAndServe() +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cae6b43 --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module gitea.stevedudenhoeffer.com/steve/foreman + +go 1.26.2 + +require modernc.org/sqlite v1.50.1 + +require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + golang.org/x/sys v0.42.0 // indirect + modernc.org/libc v1.72.3 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..38131a8 --- /dev/null +++ b/go.sum @@ -0,0 +1,51 @@ +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= +golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= +golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= +modernc.org/cc/v4 v4.28.2 h1:3tQ0lf2ADtoby2EtSP+J7IE2SHwEJdP8ioR59wx7XpY= +modernc.org/cc/v4 v4.28.2/go.mod h1:OnovgIhbbMXMu1aISnJ0wvVD1KnW+cAUJkIrAWh+kVI= +modernc.org/ccgo/v4 v4.34.0 h1:yRLPFZieg532OT4rp4JFNIVcquwalMX26G95WQDqwCQ= +modernc.org/ccgo/v4 v4.34.0/go.mod h1:AS5WYMyBakQ+fhsHhtP8mWB82KTGPkNNJDGfGQCe0/A= +modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM= +modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo= +modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.72.3 h1:ZnDF4tXn4NBXFutMMQC4vtbTFSXhhKzR73fv0beZEAU= +modernc.org/libc v1.72.3/go.mod h1:dn0dZNnnn1clLyvRxLxYExxiKRZIRENOfqQ8XEeg4Qs= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.2.0 h1:tGyef5ApycA7FSEOMraay9SaTk5zmbx7Tu+cJs4QKZg= +modernc.org/opt v0.2.0/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.50.1 h1:l+cQvn0sd0zJJtfygGHuQJ5AjlrwXmWPw4KP3ZMwr9w= +modernc.org/sqlite v1.50.1/go.mod h1:tcNzv5p84E0skkmJn038y+hWJbLQXQqEnQfeh5r2JLM= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..a90f219 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,81 @@ +// Package config loads foreman's runtime configuration from environment variables. +// +// Why: centralizes all env-based configuration into a single, validated struct so +// callers never read raw env vars directly. +// What: reads FOREMAN_* environment variables, applies defaults, and validates +// required values. +// Test: set required env vars, call Load(), assert fields match; omit required vars, +// assert Load() returns an error. +package config + +import ( + "fmt" + "os" + "time" +) + +// Config holds all runtime configuration for the foreman daemon. +type Config struct { + // Addr is the listen address for the HTTP server (default ":8080"). + Addr string + + // OllamaURL is the base URL of the Ollama target (required). + OllamaURL string + + // OllamaToken is an optional bearer token sent to the Ollama target. + OllamaToken string + + // Token is an optional bearer token that callers must present to foreman. + Token string + + // EmbedModel is the always-resident embedder model name (e.g. "nomic-embed-text"). + EmbedModel string + + // DBPath is the path to the SQLite database file (default "foreman.db"). + DBPath string + + // PollInterval controls how often the model poller hits the target (default 30s). + PollInterval time.Duration + + // WebhookSecret is an optional HMAC key for signing webhook payloads. + WebhookSecret string +} + +// Load reads configuration from environment variables and returns a validated Config. +// +// Why: provides a single entry point for configuration with sensible defaults. +// What: reads FOREMAN_* env vars, applies defaults, validates required fields. +// Test: call with FOREMAN_OLLAMA_URL set, assert success; call without it, assert error. +func Load() (Config, error) { + cfg := Config{ + Addr: envOr("FOREMAN_ADDR", ":8080"), + OllamaURL: os.Getenv("FOREMAN_OLLAMA_URL"), + OllamaToken: os.Getenv("FOREMAN_OLLAMA_TOKEN"), + Token: os.Getenv("FOREMAN_TOKEN"), + EmbedModel: os.Getenv("FOREMAN_EMBED_MODEL"), + DBPath: envOr("FOREMAN_DB_PATH", "foreman.db"), + WebhookSecret: os.Getenv("FOREMAN_WEBHOOK_SECRET"), + } + + pollStr := envOr("FOREMAN_POLL_INTERVAL", "30s") + dur, err := time.ParseDuration(pollStr) + if err != nil { + return Config{}, fmt.Errorf("invalid FOREMAN_POLL_INTERVAL %q: %w", pollStr, err) + } + cfg.PollInterval = dur + + if cfg.OllamaURL == "" { + return Config{}, fmt.Errorf("FOREMAN_OLLAMA_URL is required") + } + + return cfg, nil +} + +// envOr returns the value of the environment variable named by key, or fallback +// if the variable is empty or unset. +func envOr(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..694ee32 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,96 @@ +package config + +import ( + "os" + "testing" + "time" +) + +func TestLoad_Defaults(t *testing.T) { + // Set required env var, leave optional ones at defaults. + t.Setenv("FOREMAN_OLLAMA_URL", "http://localhost:11434") + // Clear any other vars that might be set. + t.Setenv("FOREMAN_ADDR", "") + t.Setenv("FOREMAN_DB_PATH", "") + t.Setenv("FOREMAN_POLL_INTERVAL", "") + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + if cfg.Addr != ":8080" { + t.Errorf("Addr = %q, want %q", cfg.Addr, ":8080") + } + if cfg.OllamaURL != "http://localhost:11434" { + t.Errorf("OllamaURL = %q, want %q", cfg.OllamaURL, "http://localhost:11434") + } + if cfg.DBPath != "foreman.db" { + t.Errorf("DBPath = %q, want %q", cfg.DBPath, "foreman.db") + } + if cfg.PollInterval != 30*time.Second { + t.Errorf("PollInterval = %v, want %v", cfg.PollInterval, 30*time.Second) + } +} + +func TestLoad_AllEnvVars(t *testing.T) { + t.Setenv("FOREMAN_ADDR", ":9090") + t.Setenv("FOREMAN_OLLAMA_URL", "http://mac.tail:11434") + t.Setenv("FOREMAN_OLLAMA_TOKEN", "ollama-secret") + t.Setenv("FOREMAN_TOKEN", "my-token") + t.Setenv("FOREMAN_EMBED_MODEL", "nomic-embed-text") + t.Setenv("FOREMAN_DB_PATH", "/data/foreman.db") + t.Setenv("FOREMAN_POLL_INTERVAL", "1m") + t.Setenv("FOREMAN_WEBHOOK_SECRET", "hmac-key") + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + if cfg.Addr != ":9090" { + t.Errorf("Addr = %q, want %q", cfg.Addr, ":9090") + } + if cfg.OllamaURL != "http://mac.tail:11434" { + t.Errorf("OllamaURL = %q", cfg.OllamaURL) + } + if cfg.OllamaToken != "ollama-secret" { + t.Errorf("OllamaToken = %q", cfg.OllamaToken) + } + if cfg.Token != "my-token" { + t.Errorf("Token = %q", cfg.Token) + } + if cfg.EmbedModel != "nomic-embed-text" { + t.Errorf("EmbedModel = %q", cfg.EmbedModel) + } + if cfg.DBPath != "/data/foreman.db" { + t.Errorf("DBPath = %q", cfg.DBPath) + } + if cfg.PollInterval != time.Minute { + t.Errorf("PollInterval = %v, want %v", cfg.PollInterval, time.Minute) + } + if cfg.WebhookSecret != "hmac-key" { + t.Errorf("WebhookSecret = %q", cfg.WebhookSecret) + } +} + +func TestLoad_MissingOllamaURL(t *testing.T) { + // Ensure FOREMAN_OLLAMA_URL is unset. + os.Unsetenv("FOREMAN_OLLAMA_URL") + t.Setenv("FOREMAN_OLLAMA_URL", "") + + _, err := Load() + if err == nil { + t.Fatal("Load() should fail when FOREMAN_OLLAMA_URL is empty") + } +} + +func TestLoad_InvalidPollInterval(t *testing.T) { + t.Setenv("FOREMAN_OLLAMA_URL", "http://localhost:11434") + t.Setenv("FOREMAN_POLL_INTERVAL", "not-a-duration") + + _, err := Load() + if err == nil { + t.Fatal("Load() should fail with invalid FOREMAN_POLL_INTERVAL") + } +} diff --git a/internal/server/server.go b/internal/server/server.go new file mode 100644 index 0000000..bfbc68d --- /dev/null +++ b/internal/server/server.go @@ -0,0 +1,117 @@ +// Package server provides the HTTP API for the foreman daemon. +// +// Why: foreman exposes a native Ollama-compatible API plus async job endpoints; +// centralizing routing and middleware here keeps cmd/foreman thin. +// What: creates a stdlib net/http server with health checks, optional bearer-token +// auth, and an extensible mux for later phases. +// Test: start the server with httptest, hit /healthz, verify 200; set a token, +// verify 401 without it. +package server + +import ( + "encoding/json" + "log/slog" + "net/http" + "strings" + + "gitea.stevedudenhoeffer.com/steve/foreman/internal/config" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/store" +) + +// Server holds the HTTP server and its dependencies. +type Server struct { + cfg config.Config + store *store.Store + mux *http.ServeMux + logger *slog.Logger +} + +// New creates a new Server with the given configuration and store. The mux is +// populated with initial routes; callers can add more before calling ListenAndServe. +// +// Why: dependency injection makes the server testable and extensible. +// What: wires config, store, and logger into the server, registers routes. +// Test: create with New, use httptest to exercise routes. +func New(cfg config.Config, st *store.Store, logger *slog.Logger) *Server { + s := &Server{ + cfg: cfg, + store: st, + mux: http.NewServeMux(), + logger: logger, + } + s.routes() + return s +} + +// Handler returns the server's http.Handler, with auth middleware applied. +// +// Why: allows httptest usage in tests without starting a real listener. +// What: wraps the mux with optional bearer-token middleware. +// Test: call Handler(), use httptest.NewServer, exercise endpoints. +func (s *Server) Handler() http.Handler { + var h http.Handler = s.mux + if s.cfg.Token != "" { + h = s.authMiddleware(h) + } + return h +} + +// ListenAndServe starts the HTTP server on the configured address. +func (s *Server) ListenAndServe() error { + s.logger.Info("starting server", "addr", s.cfg.Addr) + return http.ListenAndServe(s.cfg.Addr, s.Handler()) +} + +// routes registers all HTTP routes on the mux. +func (s *Server) routes() { + s.mux.HandleFunc("GET /healthz", s.handleHealthz) +} + +// healthResponse is the JSON shape returned by /healthz. +type healthResponse struct { + Status string `json:"status"` + Degraded bool `json:"degraded"` +} + +// handleHealthz returns the daemon's health status. The degraded flag is a +// placeholder for the model poller's connectivity state (Phase 2). +func (s *Server) handleHealthz(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(healthResponse{ + Status: "ok", + Degraded: false, + }) +} + +// authMiddleware validates the Authorization: Bearer header on all +// requests except /healthz. Returns 401 if the token is missing or wrong. +func (s *Server) authMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // /healthz is always public so load balancers and probes work without auth. + if r.URL.Path == "/healthz" { + next.ServeHTTP(w, r) + return + } + + auth := r.Header.Get("Authorization") + if auth == "" { + http.Error(w, `{"error":"missing authorization header"}`, http.StatusUnauthorized) + return + } + + const prefix = "Bearer " + if !strings.HasPrefix(auth, prefix) { + http.Error(w, `{"error":"invalid authorization header"}`, http.StatusUnauthorized) + return + } + + token := strings.TrimPrefix(auth, prefix) + if token != s.cfg.Token { + http.Error(w, `{"error":"invalid token"}`, http.StatusUnauthorized) + return + } + + next.ServeHTTP(w, r) + }) +} diff --git a/internal/server/server_test.go b/internal/server/server_test.go new file mode 100644 index 0000000..16775a6 --- /dev/null +++ b/internal/server/server_test.go @@ -0,0 +1,140 @@ +package server + +import ( + "encoding/json" + "log/slog" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + + "gitea.stevedudenhoeffer.com/steve/foreman/internal/config" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/store" +) + +// newTestServer creates a Server backed by a temp-dir SQLite store. +func newTestServer(t *testing.T, cfg config.Config) *Server { + t.Helper() + dbPath := filepath.Join(t.TempDir(), "test.db") + st, err := store.Open(dbPath) + if err != nil { + t.Fatalf("store.Open: %v", err) + } + t.Cleanup(func() { st.Close() }) + + logger := slog.Default() + return New(cfg, st, logger) +} + +func TestHealthz_OK(t *testing.T) { + srv := newTestServer(t, config.Config{ + OllamaURL: "http://localhost:11434", + }) + + req := httptest.NewRequest(http.MethodGet, "/healthz", nil) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", rec.Code, http.StatusOK) + } + + var resp healthResponse + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("decode response: %v", err) + } + if resp.Status != "ok" { + t.Errorf("status = %q, want %q", resp.Status, "ok") + } + if resp.Degraded { + t.Error("degraded should be false") + } +} + +func TestHealthz_NoAuthRequired(t *testing.T) { + srv := newTestServer(t, config.Config{ + OllamaURL: "http://localhost:11434", + Token: "secret-token", + }) + + // /healthz should work without any auth header even when token is configured. + req := httptest.NewRequest(http.MethodGet, "/healthz", nil) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("status = %d, want %d (healthz should bypass auth)", rec.Code, http.StatusOK) + } +} + +func TestAuth_RequiredWhenTokenSet(t *testing.T) { + srv := newTestServer(t, config.Config{ + OllamaURL: "http://localhost:11434", + Token: "secret-token", + }) + + tests := []struct { + name string + path string + auth string + want int + }{ + { + name: "no auth header", + path: "/some-route", + auth: "", + want: http.StatusUnauthorized, + }, + { + name: "wrong token", + path: "/some-route", + auth: "Bearer wrong-token", + want: http.StatusUnauthorized, + }, + { + name: "correct token", + path: "/some-route", + auth: "Bearer secret-token", + // Route doesn't exist so we get 404, but auth passed. + want: http.StatusNotFound, + }, + { + name: "invalid scheme", + path: "/some-route", + auth: "Basic dXNlcjpwYXNz", + want: http.StatusUnauthorized, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, tt.path, nil) + if tt.auth != "" { + req.Header.Set("Authorization", tt.auth) + } + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, req) + + if rec.Code != tt.want { + t.Errorf("status = %d, want %d", rec.Code, tt.want) + } + }) + } +} + +func TestAuth_NotRequiredWhenNoToken(t *testing.T) { + srv := newTestServer(t, config.Config{ + OllamaURL: "http://localhost:11434", + // Token intentionally empty. + }) + + // Without a configured token, any request should pass auth (even to a + // nonexistent route, which returns 404 rather than 401). + req := httptest.NewRequest(http.MethodGet, "/some-route", nil) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, req) + + if rec.Code == http.StatusUnauthorized { + t.Error("should not require auth when no token is configured") + } +} diff --git a/internal/store/store.go b/internal/store/store.go new file mode 100644 index 0000000..f82dbd7 --- /dev/null +++ b/internal/store/store.go @@ -0,0 +1,366 @@ +// Package store provides a SQLite-backed durable queue for foreman jobs and artifacts. +// +// Why: jobs must survive daemon restarts so async callers and webhooks never lose +// work (ADR-0008). SQLite in WAL mode gives durable single-writer/multi-reader +// semantics with no external dependencies. +// What: opens a SQLite database, runs migrations, and exposes CRUD for jobs and +// artifacts. +// Test: use t.TempDir() for an isolated DB per test; verify all CRUD operations +// and state transitions. +package store + +import ( + "database/sql" + "encoding/json" + "fmt" + "time" + + _ "modernc.org/sqlite" +) + +// JobState represents the lifecycle state of a job. +type JobState string + +const ( + JobStateQueued JobState = "queued" + JobStateLoading JobState = "loading" + JobStateWorking JobState = "working" + JobStateDone JobState = "done" + JobStateFailed JobState = "failed" +) + +// Job represents a queued unit of work. +type Job struct { + ID string `json:"id"` + Model string `json:"model"` + Payload json.RawMessage `json:"payload"` + State JobState `json:"state"` + Result json.RawMessage `json:"result,omitempty"` + Error *string `json:"error,omitempty"` + Attempt int `json:"attempt"` + MaxAttempts int `json:"max_attempts"` + StateWebhookURL *string `json:"state_webhook_url,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + StartedAt *time.Time `json:"started_at,omitempty"` + CompletedAt *time.Time `json:"completed_at,omitempty"` +} + +// Artifact represents a named, typed blob attached to a completed job. +type Artifact struct { + ID int64 `json:"id"` + JobID string `json:"job_id"` + Name string `json:"name"` + ContentType string `json:"content_type"` + Data []byte `json:"-"` + Size int64 `json:"size"` + CreatedAt time.Time `json:"created_at"` +} + +// Store wraps a SQLite database with job and artifact operations. +type Store struct { + db *sql.DB +} + +// migration is the DDL that creates the schema. It runs once on Open via +// IF NOT EXISTS guards. +const migration = ` +CREATE TABLE IF NOT EXISTS jobs ( + id TEXT PRIMARY KEY, + model TEXT NOT NULL, + payload BLOB NOT NULL, + state TEXT NOT NULL DEFAULT 'queued', + result BLOB, + error TEXT, + attempt INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 3, + state_webhook_url TEXT, + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL, + started_at DATETIME, + completed_at DATETIME +); + +CREATE INDEX IF NOT EXISTS idx_jobs_state ON jobs(state); +CREATE INDEX IF NOT EXISTS idx_jobs_model_state ON jobs(model, state); + +CREATE TABLE IF NOT EXISTS artifacts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id TEXT NOT NULL REFERENCES jobs(id), + name TEXT NOT NULL, + content_type TEXT NOT NULL, + data BLOB NOT NULL, + size INTEGER NOT NULL, + created_at DATETIME NOT NULL, + UNIQUE(job_id, name) +); +` + +// Open creates or opens a SQLite database at path, enables WAL mode, and runs +// migrations. +// +// Why: single entry point ensures WAL mode and schema are always applied. +// What: opens the DB, sets pragmas, runs CREATE TABLE IF NOT EXISTS. +// Test: call Open with a temp dir path, assert no error and that tables exist. +func Open(path string) (*Store, error) { + db, err := sql.Open("sqlite", path) + if err != nil { + return nil, fmt.Errorf("open sqlite %q: %w", path, err) + } + + // Enable WAL mode for concurrent readers. + if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { + db.Close() + return nil, fmt.Errorf("enable WAL mode: %w", err) + } + + // Enable foreign keys. + if _, err := db.Exec("PRAGMA foreign_keys=ON"); err != nil { + db.Close() + return nil, fmt.Errorf("enable foreign keys: %w", err) + } + + if _, err := db.Exec(migration); err != nil { + db.Close() + return nil, fmt.Errorf("run migration: %w", err) + } + + return &Store{db: db}, nil +} + +// Close closes the underlying database connection. +func (s *Store) Close() error { + return s.db.Close() +} + +// CreateJob inserts a new job into the queue. +// +// Why: the async /jobs endpoint and the sync passthrough both need to enqueue work. +// What: inserts a job row with state "queued" and returns the stored Job. +// Test: create a job, then GetJob by ID, assert fields match. +func (s *Store) CreateJob(job Job) (Job, error) { + now := time.Now().UTC() + job.State = JobStateQueued + job.CreatedAt = now + job.UpdatedAt = now + + if job.MaxAttempts == 0 { + job.MaxAttempts = 3 + } + + _, err := s.db.Exec( + `INSERT INTO jobs (id, model, payload, state, attempt, max_attempts, state_webhook_url, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + job.ID, job.Model, []byte(job.Payload), string(job.State), + job.Attempt, job.MaxAttempts, job.StateWebhookURL, + job.CreatedAt, job.UpdatedAt, + ) + if err != nil { + return Job{}, fmt.Errorf("insert job %s: %w", job.ID, err) + } + + return job, nil +} + +// GetJob retrieves a job by ID. +// +// Why: callers need to poll job status via GET /jobs/{id} and the worker needs to +// read jobs from the queue. +// What: queries the jobs table by primary key and scans into a Job struct. +// Test: create a job, GetJob, assert all fields round-trip correctly. +func (s *Store) GetJob(id string) (Job, error) { + var j Job + var payload, result []byte + + err := s.db.QueryRow( + `SELECT id, model, payload, state, result, error, attempt, max_attempts, + state_webhook_url, created_at, updated_at, started_at, completed_at + FROM jobs WHERE id = ?`, id, + ).Scan( + &j.ID, &j.Model, &payload, &j.State, &result, &j.Error, + &j.Attempt, &j.MaxAttempts, &j.StateWebhookURL, + &j.CreatedAt, &j.UpdatedAt, &j.StartedAt, &j.CompletedAt, + ) + if err != nil { + return Job{}, fmt.Errorf("get job %s: %w", id, err) + } + + j.Payload = json.RawMessage(payload) + if result != nil { + j.Result = json.RawMessage(result) + } + + return j, nil +} + +// UpdateJobState transitions a job to a new state and updates associated fields. +// +// Why: the worker loop drives jobs through their lifecycle (queued -> loading -> +// working -> done/failed), and each transition must be persisted durably. +// What: updates the state, updated_at, and optionally result/error/timestamps. +// Test: create a job, advance through states, assert each transition persists. +func (s *Store) UpdateJobState(id string, state JobState, result json.RawMessage, errMsg *string) error { + now := time.Now().UTC() + + var resultBytes []byte + if result != nil { + resultBytes = []byte(result) + } + + var startedAt, completedAt *time.Time + switch state { + case JobStateLoading, JobStateWorking: + startedAt = &now + case JobStateDone, JobStateFailed: + completedAt = &now + } + + res, err := s.db.Exec( + `UPDATE jobs SET state = ?, result = ?, error = ?, updated_at = ?, + started_at = COALESCE(?, started_at), + completed_at = COALESCE(?, completed_at) + WHERE id = ?`, + string(state), resultBytes, errMsg, now, startedAt, completedAt, id, + ) + if err != nil { + return fmt.Errorf("update job %s state to %s: %w", id, state, err) + } + + rows, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("check rows affected for job %s: %w", id, err) + } + if rows == 0 { + return fmt.Errorf("job %s not found", id) + } + + return nil +} + +// ListJobs returns jobs, optionally filtered by state. If state is nil, all jobs +// are returned ordered by created_at descending. +// +// Why: the GET /jobs endpoint needs to list jobs with optional state filtering. +// What: queries the jobs table with an optional WHERE clause on state. +// Test: create jobs in different states, list with and without filter, assert counts. +func (s *Store) ListJobs(state *JobState) ([]Job, error) { + var rows *sql.Rows + var err error + + if state != nil { + rows, err = s.db.Query( + `SELECT id, model, payload, state, result, error, attempt, max_attempts, + state_webhook_url, created_at, updated_at, started_at, completed_at + FROM jobs WHERE state = ? ORDER BY created_at DESC`, string(*state), + ) + } else { + rows, err = s.db.Query( + `SELECT id, model, payload, state, result, error, attempt, max_attempts, + state_webhook_url, created_at, updated_at, started_at, completed_at + FROM jobs ORDER BY created_at DESC`, + ) + } + if err != nil { + return nil, fmt.Errorf("list jobs: %w", err) + } + defer rows.Close() + + var jobs []Job + for rows.Next() { + var j Job + var payload, result []byte + + if err := rows.Scan( + &j.ID, &j.Model, &payload, &j.State, &result, &j.Error, + &j.Attempt, &j.MaxAttempts, &j.StateWebhookURL, + &j.CreatedAt, &j.UpdatedAt, &j.StartedAt, &j.CompletedAt, + ); err != nil { + return nil, fmt.Errorf("scan job row: %w", err) + } + + j.Payload = json.RawMessage(payload) + if result != nil { + j.Result = json.RawMessage(result) + } + jobs = append(jobs, j) + } + + return jobs, rows.Err() +} + +// CreateArtifact attaches a named artifact to a job. +// +// Why: completed jobs produce artifacts (the completion response, structured data, +// etc.) that must be stored durably for webhook delivery and polling (ADR-0006). +// What: inserts a row into the artifacts table with the blob data. +// Test: create a job, attach an artifact, retrieve it, assert data matches. +func (s *Store) CreateArtifact(artifact Artifact) (Artifact, error) { + now := time.Now().UTC() + artifact.CreatedAt = now + artifact.Size = int64(len(artifact.Data)) + + res, err := s.db.Exec( + `INSERT INTO artifacts (job_id, name, content_type, data, size, created_at) + VALUES (?, ?, ?, ?, ?, ?)`, + artifact.JobID, artifact.Name, artifact.ContentType, + artifact.Data, artifact.Size, artifact.CreatedAt, + ) + if err != nil { + return Artifact{}, fmt.Errorf("insert artifact %q for job %s: %w", artifact.Name, artifact.JobID, err) + } + + id, err := res.LastInsertId() + if err != nil { + return Artifact{}, fmt.Errorf("get artifact id: %w", err) + } + artifact.ID = id + + return artifact, nil +} + +// GetArtifact retrieves a single artifact by job ID and name. +// +// Why: the GET /jobs/{id}/artifacts/{name} endpoint serves individual artifacts. +// What: queries by the (job_id, name) unique key and returns the full blob. +// Test: create an artifact, get it by job_id+name, assert data round-trips. +func (s *Store) GetArtifact(jobID, name string) (Artifact, error) { + var a Artifact + + err := s.db.QueryRow( + `SELECT id, job_id, name, content_type, data, size, created_at + FROM artifacts WHERE job_id = ? AND name = ?`, jobID, name, + ).Scan(&a.ID, &a.JobID, &a.Name, &a.ContentType, &a.Data, &a.Size, &a.CreatedAt) + if err != nil { + return Artifact{}, fmt.Errorf("get artifact %q for job %s: %w", name, jobID, err) + } + + return a, nil +} + +// GetArtifactsByJob returns all artifacts for a given job. +// +// Why: the GET /jobs/{id} response includes artifact metadata for the caller to +// decide which to fetch. +// What: queries all artifacts by job_id, ordered by name. +// Test: attach multiple artifacts to a job, list them, assert all returned. +func (s *Store) GetArtifactsByJob(jobID string) ([]Artifact, error) { + rows, err := s.db.Query( + `SELECT id, job_id, name, content_type, data, size, created_at + FROM artifacts WHERE job_id = ? ORDER BY name`, jobID, + ) + if err != nil { + return nil, fmt.Errorf("list artifacts for job %s: %w", jobID, err) + } + defer rows.Close() + + var artifacts []Artifact + for rows.Next() { + var a Artifact + if err := rows.Scan(&a.ID, &a.JobID, &a.Name, &a.ContentType, &a.Data, &a.Size, &a.CreatedAt); err != nil { + return nil, fmt.Errorf("scan artifact row: %w", err) + } + artifacts = append(artifacts, a) + } + + return artifacts, rows.Err() +} diff --git a/internal/store/store_test.go b/internal/store/store_test.go new file mode 100644 index 0000000..cdf228e --- /dev/null +++ b/internal/store/store_test.go @@ -0,0 +1,376 @@ +package store + +import ( + "database/sql" + "encoding/json" + "errors" + "path/filepath" + "testing" +) + +// openTestDB creates a fresh SQLite store in a temp directory for test isolation. +func openTestDB(t *testing.T) *Store { + t.Helper() + path := filepath.Join(t.TempDir(), "test.db") + s, err := Open(path) + if err != nil { + t.Fatalf("Open(%q): %v", path, err) + } + t.Cleanup(func() { s.Close() }) + return s +} + +func TestOpen_CreatesTablesAndWAL(t *testing.T) { + s := openTestDB(t) + + // Verify WAL mode is active. + var mode string + if err := s.db.QueryRow("PRAGMA journal_mode").Scan(&mode); err != nil { + t.Fatalf("query journal_mode: %v", err) + } + if mode != "wal" { + t.Errorf("journal_mode = %q, want %q", mode, "wal") + } + + // Verify tables exist by querying their metadata. + for _, table := range []string{"jobs", "artifacts"} { + var name string + err := s.db.QueryRow( + "SELECT name FROM sqlite_master WHERE type='table' AND name=?", table, + ).Scan(&name) + if err != nil { + t.Errorf("table %q not found: %v", table, err) + } + } +} + +func TestCreateJob_And_GetJob(t *testing.T) { + s := openTestDB(t) + + webhook := "http://example.com/webhook" + job := Job{ + ID: "01ARZ3NDEKTSV4RRFFQ69G5FAV", + Model: "qwen3:30b", + Payload: json.RawMessage(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}`), + MaxAttempts: 5, + StateWebhookURL: &webhook, + } + + created, err := s.CreateJob(job) + if err != nil { + t.Fatalf("CreateJob: %v", err) + } + + if created.State != JobStateQueued { + t.Errorf("State = %q, want %q", created.State, JobStateQueued) + } + if created.CreatedAt.IsZero() { + t.Error("CreatedAt should be set") + } + if created.MaxAttempts != 5 { + t.Errorf("MaxAttempts = %d, want 5", created.MaxAttempts) + } + + got, err := s.GetJob(created.ID) + if err != nil { + t.Fatalf("GetJob: %v", err) + } + + if got.ID != created.ID { + t.Errorf("ID = %q, want %q", got.ID, created.ID) + } + if got.Model != "qwen3:30b" { + t.Errorf("Model = %q, want %q", got.Model, "qwen3:30b") + } + if got.State != JobStateQueued { + t.Errorf("State = %q, want %q", got.State, JobStateQueued) + } + if got.StateWebhookURL == nil || *got.StateWebhookURL != webhook { + t.Errorf("StateWebhookURL = %v, want %q", got.StateWebhookURL, webhook) + } +} + +func TestCreateJob_DefaultMaxAttempts(t *testing.T) { + s := openTestDB(t) + + job := Job{ + ID: "01ARZ3NDEKTSV4RRFFQ69G5FA2", + Model: "qwen3:14b", + Payload: json.RawMessage(`{}`), + } + + created, err := s.CreateJob(job) + if err != nil { + t.Fatalf("CreateJob: %v", err) + } + + if created.MaxAttempts != 3 { + t.Errorf("MaxAttempts = %d, want 3 (default)", created.MaxAttempts) + } +} + +func TestGetJob_NotFound(t *testing.T) { + s := openTestDB(t) + + _, err := s.GetJob("nonexistent") + if !errors.Is(err, sql.ErrNoRows) { + t.Errorf("GetJob(nonexistent) error = %v, want sql.ErrNoRows wrapped", err) + } +} + +func TestUpdateJobState(t *testing.T) { + s := openTestDB(t) + + job := Job{ + ID: "01ARZ3NDEKTSV4RRFFQ69G5FA3", + Model: "qwen3:30b", + Payload: json.RawMessage(`{}`), + } + if _, err := s.CreateJob(job); err != nil { + t.Fatalf("CreateJob: %v", err) + } + + // Transition: queued -> loading + if err := s.UpdateJobState(job.ID, JobStateLoading, nil, nil); err != nil { + t.Fatalf("UpdateJobState to loading: %v", err) + } + got, _ := s.GetJob(job.ID) + if got.State != JobStateLoading { + t.Errorf("State = %q, want %q", got.State, JobStateLoading) + } + if got.StartedAt == nil { + t.Error("StartedAt should be set after loading") + } + + // Transition: loading -> working + if err := s.UpdateJobState(job.ID, JobStateWorking, nil, nil); err != nil { + t.Fatalf("UpdateJobState to working: %v", err) + } + + // Transition: working -> done with result + result := json.RawMessage(`{"response":"hello"}`) + if err := s.UpdateJobState(job.ID, JobStateDone, result, nil); err != nil { + t.Fatalf("UpdateJobState to done: %v", err) + } + got, _ = s.GetJob(job.ID) + if got.State != JobStateDone { + t.Errorf("State = %q, want %q", got.State, JobStateDone) + } + if got.CompletedAt == nil { + t.Error("CompletedAt should be set after done") + } + if string(got.Result) != `{"response":"hello"}` { + t.Errorf("Result = %s, want %s", got.Result, `{"response":"hello"}`) + } +} + +func TestUpdateJobState_Failed(t *testing.T) { + s := openTestDB(t) + + job := Job{ + ID: "01ARZ3NDEKTSV4RRFFQ69G5FA4", + Model: "qwen3:30b", + Payload: json.RawMessage(`{}`), + } + if _, err := s.CreateJob(job); err != nil { + t.Fatalf("CreateJob: %v", err) + } + + errMsg := "target unreachable after 3 attempts" + if err := s.UpdateJobState(job.ID, JobStateFailed, nil, &errMsg); err != nil { + t.Fatalf("UpdateJobState to failed: %v", err) + } + + got, _ := s.GetJob(job.ID) + if got.State != JobStateFailed { + t.Errorf("State = %q, want %q", got.State, JobStateFailed) + } + if got.Error == nil || *got.Error != errMsg { + t.Errorf("Error = %v, want %q", got.Error, errMsg) + } +} + +func TestUpdateJobState_NotFound(t *testing.T) { + s := openTestDB(t) + + err := s.UpdateJobState("nonexistent", JobStateDone, nil, nil) + if err == nil { + t.Error("UpdateJobState for nonexistent job should fail") + } +} + +func TestListJobs_All(t *testing.T) { + s := openTestDB(t) + + for i, id := range []string{"01A", "01B", "01C"} { + state := JobStateQueued + job := Job{ID: id, Model: "m", Payload: json.RawMessage(`{}`)} + if _, err := s.CreateJob(job); err != nil { + t.Fatalf("CreateJob[%d]: %v", i, err) + } + if i == 1 { + _ = s.UpdateJobState(id, JobStateDone, nil, nil) + _ = state // suppress unused warning + } + } + + all, err := s.ListJobs(nil) + if err != nil { + t.Fatalf("ListJobs(nil): %v", err) + } + if len(all) != 3 { + t.Errorf("len = %d, want 3", len(all)) + } +} + +func TestListJobs_FilterByState(t *testing.T) { + s := openTestDB(t) + + for _, id := range []string{"01D", "01E", "01F"} { + job := Job{ID: id, Model: "m", Payload: json.RawMessage(`{}`)} + if _, err := s.CreateJob(job); err != nil { + t.Fatalf("CreateJob: %v", err) + } + } + // Move one to done. + _ = s.UpdateJobState("01E", JobStateDone, nil, nil) + + queued := JobStateQueued + jobs, err := s.ListJobs(&queued) + if err != nil { + t.Fatalf("ListJobs(queued): %v", err) + } + if len(jobs) != 2 { + t.Errorf("len = %d, want 2", len(jobs)) + } + for _, j := range jobs { + if j.State != JobStateQueued { + t.Errorf("unexpected state %q in filtered results", j.State) + } + } +} + +func TestCreateArtifact_And_GetArtifact(t *testing.T) { + s := openTestDB(t) + + // Need a job first (foreign key). + job := Job{ID: "01ART", Model: "m", Payload: json.RawMessage(`{}`)} + if _, err := s.CreateJob(job); err != nil { + t.Fatalf("CreateJob: %v", err) + } + + artifact := Artifact{ + JobID: "01ART", + Name: "completion", + ContentType: "application/json", + Data: []byte(`{"response":"hello world"}`), + } + + created, err := s.CreateArtifact(artifact) + if err != nil { + t.Fatalf("CreateArtifact: %v", err) + } + if created.ID == 0 { + t.Error("ID should be set after insert") + } + if created.Size != int64(len(artifact.Data)) { + t.Errorf("Size = %d, want %d", created.Size, len(artifact.Data)) + } + + got, err := s.GetArtifact("01ART", "completion") + if err != nil { + t.Fatalf("GetArtifact: %v", err) + } + if string(got.Data) != `{"response":"hello world"}` { + t.Errorf("Data = %q, want %q", got.Data, `{"response":"hello world"}`) + } + if got.ContentType != "application/json" { + t.Errorf("ContentType = %q", got.ContentType) + } +} + +func TestGetArtifact_NotFound(t *testing.T) { + s := openTestDB(t) + + _, err := s.GetArtifact("noexist", "noname") + if err == nil { + t.Error("GetArtifact should fail for nonexistent artifact") + } +} + +func TestGetArtifactsByJob(t *testing.T) { + s := openTestDB(t) + + job := Job{ID: "01ARTS", Model: "m", Payload: json.RawMessage(`{}`)} + if _, err := s.CreateJob(job); err != nil { + t.Fatalf("CreateJob: %v", err) + } + + for _, name := range []string{"completion", "metadata"} { + _, err := s.CreateArtifact(Artifact{ + JobID: "01ARTS", + Name: name, + ContentType: "application/json", + Data: []byte(`{}`), + }) + if err != nil { + t.Fatalf("CreateArtifact(%q): %v", name, err) + } + } + + artifacts, err := s.GetArtifactsByJob("01ARTS") + if err != nil { + t.Fatalf("GetArtifactsByJob: %v", err) + } + if len(artifacts) != 2 { + t.Errorf("len = %d, want 2", len(artifacts)) + } + // Should be ordered by name. + if artifacts[0].Name != "completion" { + t.Errorf("artifacts[0].Name = %q, want %q", artifacts[0].Name, "completion") + } + if artifacts[1].Name != "metadata" { + t.Errorf("artifacts[1].Name = %q, want %q", artifacts[1].Name, "metadata") + } +} + +func TestCreateArtifact_DuplicateNameFails(t *testing.T) { + s := openTestDB(t) + + job := Job{ID: "01DUP", Model: "m", Payload: json.RawMessage(`{}`)} + if _, err := s.CreateJob(job); err != nil { + t.Fatalf("CreateJob: %v", err) + } + + a := Artifact{ + JobID: "01DUP", + Name: "completion", + ContentType: "application/json", + Data: []byte(`{}`), + } + if _, err := s.CreateArtifact(a); err != nil { + t.Fatalf("first CreateArtifact: %v", err) + } + + _, err := s.CreateArtifact(a) + if err == nil { + t.Error("duplicate artifact (job_id, name) should fail") + } +} + +func TestGetArtifactsByJob_Empty(t *testing.T) { + s := openTestDB(t) + + job := Job{ID: "01EMPTY", Model: "m", Payload: json.RawMessage(`{}`)} + if _, err := s.CreateJob(job); err != nil { + t.Fatalf("CreateJob: %v", err) + } + + artifacts, err := s.GetArtifactsByJob("01EMPTY") + if err != nil { + t.Fatalf("GetArtifactsByJob: %v", err) + } + if len(artifacts) != 0 { + t.Errorf("len = %d, want 0", len(artifacts)) + } +} diff --git a/progress.md b/progress.md new file mode 100644 index 0000000..f1081bb --- /dev/null +++ b/progress.md @@ -0,0 +1,19 @@ +# foreman — progress + +## Phase 1: Scaffold — 2026-05-23 + +- Go module initialized (`gitea.stevedudenhoeffer.com/steve/foreman`) +- Project layout: `cmd/foreman/`, `internal/config/`, `internal/store/`, `internal/server/` +- `internal/config`: loads all `FOREMAN_*` env vars with defaults and validation +- `internal/store`: SQLite-backed durable queue (WAL mode, `modernc.org/sqlite`) + - `jobs` table: ULID PK, model, payload, state machine, retry tracking, timestamps + - `artifacts` table: named typed blobs per job, unique on (job_id, name) + - Full CRUD: CreateJob, GetJob, UpdateJobState, ListJobs, CreateArtifact, GetArtifact, GetArtifactsByJob +- `internal/server`: stdlib `net/http` server + - `GET /healthz` returning `{"status":"ok","degraded":false}` + - Optional bearer-token auth middleware (skips /healthz) +- `cmd/foreman/main.go`: subcommand dispatch (serve + stubs for submit, jobs, ps) +- CI: `.gitea/workflows/ci.yaml` (build, vet, test -race, tidy check) +- Dockerfile: multi-stage distroless build +- Config files: `.env.example`, `.gitignore` +- Tests: config validation, store CRUD + edge cases, server health + auth middleware