Files
steve 7cd7eaff8b
CI / Tidy (push) Successful in 9m42s
CI / Build & Test (push) Successful in 10m28s
CI / Publish Docker Image (push) Successful in 21s
feat: add FOREMAN_KEEP_ALIVE config for worker model residency
Allow configuring how long the worker model stays resident on the Ollama
target after a request via FOREMAN_KEEP_ALIVE env var. Accepts Ollama
duration strings ("-1" forever, "0" unload, "15m", "1h", etc.). Defaults
to "-1" (pin forever). The embedder warm-up is unaffected and always
uses keep_alive=-1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 21:29:37 -04:00

520 lines
17 KiB
Go

package server
import (
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"gitea.stevedudenhoeffer.com/steve/foreman/internal/config"
"gitea.stevedudenhoeffer.com/steve/foreman/internal/ollama"
"gitea.stevedudenhoeffer.com/steve/foreman/internal/store"
"gitea.stevedudenhoeffer.com/steve/foreman/internal/webhook"
"gitea.stevedudenhoeffer.com/steve/foreman/internal/worker"
)
// newJobTestServer creates a fully wired server and worker for job tests and
// returns the server together with its backing store. The worker is already
// running when this returns.
//
// Teardown is registered with t.Cleanup: the worker's context is cancelled and
// the helper waits for the worker's Run to return before the store is closed,
// so the worker never touches a closed database. The stub client must be able
// to answer the model-inventory refresh performed here.
func newJobTestServer(t *testing.T, client ollama.Client, webhookSecret string) (*Server, *store.Store) {
	t.Helper()

	dbPath := filepath.Join(t.TempDir(), "test.db")
	st, err := store.Open(dbPath)
	if err != nil {
		t.Fatalf("store.Open: %v", err)
	}
	// Registered first, so it runs last (t.Cleanup is LIFO) — i.e. after the
	// worker shutdown registered below has completed.
	t.Cleanup(func() { st.Close() })

	logger := slog.New(slog.NewJSONHandler(io.Discard, nil))
	inv := ollama.NewModelInventory(client, logger)
	if err := inv.Refresh(context.Background()); err != nil {
		t.Fatalf("inv.Refresh: %v", err)
	}

	notifier := worker.NewNotifier()
	dispatcher := webhook.NewDispatcher(webhookSecret, logger)
	// The last argument is presumably the worker keep-alive; "-1" matches the
	// FOREMAN_KEEP_ALIVE default (pin forever). TODO confirm against worker.New.
	w := worker.New(st, client, inv, notifier, dispatcher, logger, "-1")

	cfg := config.Config{
		OllamaURL:     "http://localhost:11434",
		MaxAttempts:   3,
		JobTTL:        24 * time.Hour,
		WebhookSecret: webhookSecret,
	}
	srv := New(cfg, st, client, inv, notifier, w, dispatcher, logger)

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		w.Run(ctx)
	}()
	t.Cleanup(func() {
		cancel()
		// Wait (bounded) for the worker to exit so the store cleanup above
		// cannot close the database underneath it.
		select {
		case <-done:
		case <-time.After(5 * time.Second):
			t.Error("worker did not stop within 5s of cancellation")
		}
	})

	return srv, st
}
// TestCreateJob_Returns202 checks that a valid job submission for an installed
// model is accepted asynchronously and answered with a ULID-shaped job id.
func TestCreateJob_Returns202(t *testing.T) {
	stub := &stubClient{
		tags: &ollama.TagsResponse{Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}},
		ps:   &ollama.PsResponse{},
		chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) {
			reply := &ollama.Message{Role: "assistant", Content: "ok"}
			return &ollama.ChatResponse{Model: req.Model, Done: true, Message: reply}, nil, nil
		},
	}
	srv, _ := newJobTestServer(t, stub, "")

	const payload = `{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}`
	rec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(payload)))

	if got, want := rec.Code, http.StatusAccepted; got != want {
		t.Fatalf("status = %d, want %d; body: %s", got, want, rec.Body.String())
	}

	var resp jobSubmitResponse
	if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
		t.Fatalf("decode: %v", err)
	}

	id := resp.JobID
	if id == "" {
		t.Error("job_id should not be empty")
	}
	// A ULID is always 26 characters long.
	if n := len(id); n != 26 {
		t.Errorf("job_id length = %d, want 26 (ULID)", n)
	}
}
// TestCreateJob_UnknownModel404 checks that a job naming a model absent from
// the inventory is rejected with 404 Not Found.
func TestCreateJob_UnknownModel404(t *testing.T) {
	stub := &stubClient{
		tags: &ollama.TagsResponse{Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}},
		ps:   &ollama.PsResponse{},
	}
	srv, _ := newJobTestServer(t, stub, "")

	const payload = `{"model":"nonexistent","messages":[{"role":"user","content":"hi"}]}`
	rec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(payload)))

	if got, want := rec.Code, http.StatusNotFound; got != want {
		t.Fatalf("status = %d, want %d", got, want)
	}
}
func TestCreateJob_MissingModel400(t *testing.T) {
client := &stubClient{
tags: &ollama.TagsResponse{},
ps: &ollama.PsResponse{},
}
srv, _ := newJobTestServer(t, client, "")
body := `{"messages":[{"role":"user","content":"hi"}]}`
req := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body))
rec := httptest.NewRecorder()
srv.Handler().ServeHTTP(rec, req)
if rec.Code != http.StatusBadRequest {
t.Fatalf("status = %d, want %d", rec.Code, http.StatusBadRequest)
}
}
func TestGetJob_Returns404ForUnknown(t *testing.T) {
client := &stubClient{
tags: &ollama.TagsResponse{},
ps: &ollama.PsResponse{},
}
srv, _ := newJobTestServer(t, client, "")
req := httptest.NewRequest(http.MethodGet, "/jobs/01NONEXISTENT0000000000000", nil)
rec := httptest.NewRecorder()
srv.Handler().ServeHTTP(rec, req)
if rec.Code != http.StatusNotFound {
t.Fatalf("status = %d, want %d", rec.Code, http.StatusNotFound)
}
}
// TestGetJob_ReturnsJobState submits a job, polls GET /jobs/{id} until the
// worker reports "done", and then checks the populated status fields.
func TestGetJob_ReturnsJobState(t *testing.T) {
	client := &stubClient{
		tags: &ollama.TagsResponse{
			Models: []ollama.ModelInfo{{Name: "qwen3:30b"}},
		},
		ps: &ollama.PsResponse{},
		chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) {
			return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "hello"}}, nil, nil
		},
	}
	srv, _ := newJobTestServer(t, client, "")

	// Submit a job. Check the submit explicitly: otherwise a rejected submit
	// only surfaces as a confusing polling timeout on an empty job id.
	body := `{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}`
	submitReq := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body))
	submitRec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(submitRec, submitReq)
	if submitRec.Code != http.StatusAccepted {
		t.Fatalf("submit status = %d, want %d; body: %s", submitRec.Code, http.StatusAccepted, submitRec.Body.String())
	}
	var submitResp jobSubmitResponse
	if err := json.NewDecoder(submitRec.Body).Decode(&submitResp); err != nil {
		t.Fatalf("decode submit response: %v", err)
	}

	// Poll until the worker finishes the job; remember the last observed
	// status so a timeout says where the job got stuck.
	var last jobStatusResponse
	deadline := time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) {
		getReq := httptest.NewRequest(http.MethodGet, "/jobs/"+submitResp.JobID, nil)
		getRec := httptest.NewRecorder()
		srv.Handler().ServeHTTP(getRec, getReq)
		var status jobStatusResponse
		if err := json.NewDecoder(getRec.Body).Decode(&status); err != nil {
			t.Fatalf("decode status response (HTTP %d): %v", getRec.Code, err)
		}
		last = status
		if status.State == "done" {
			// Verify all fields.
			if status.JobID != submitResp.JobID {
				t.Errorf("job_id = %q, want %q", status.JobID, submitResp.JobID)
			}
			if status.Model != "qwen3:30b" {
				t.Errorf("model = %q, want %q", status.Model, "qwen3:30b")
			}
			if status.Result == nil {
				t.Error("result should not be nil on done")
			}
			if len(status.Artifacts) == 0 {
				t.Error("artifacts should include the completion")
			}
			return
		}
		time.Sleep(50 * time.Millisecond)
	}
	t.Fatalf("job did not reach done state in time (last state %q)", last.State)
}
func TestGetArtifact_Returns404ForUnknown(t *testing.T) {
client := &stubClient{
tags: &ollama.TagsResponse{},
ps: &ollama.PsResponse{},
}
srv, _ := newJobTestServer(t, client, "")
req := httptest.NewRequest(http.MethodGet, "/jobs/01NOEXIST0000000000000000/artifacts/completion", nil)
rec := httptest.NewRecorder()
srv.Handler().ServeHTTP(rec, req)
if rec.Code != http.StatusNotFound {
t.Fatalf("status = %d, want %d", rec.Code, http.StatusNotFound)
}
}
// TestGetArtifact_ReturnsData runs a job to completion and then checks that its
// "completion" artifact is served as JSON containing the model's reply.
func TestGetArtifact_ReturnsData(t *testing.T) {
	client := &stubClient{
		tags: &ollama.TagsResponse{
			Models: []ollama.ModelInfo{{Name: "qwen3:30b"}},
		},
		ps: &ollama.PsResponse{},
		chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) {
			return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "hello"}}, nil, nil
		},
	}
	srv, _ := newJobTestServer(t, client, "")

	// Submit. Fail fast on a rejected submit instead of polling an empty id.
	body := `{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}`
	submitReq := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body))
	submitRec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(submitRec, submitReq)
	if submitRec.Code != http.StatusAccepted {
		t.Fatalf("submit status = %d, want %d; body: %s", submitRec.Code, http.StatusAccepted, submitRec.Body.String())
	}
	var submitResp jobSubmitResponse
	if err := json.NewDecoder(submitRec.Body).Decode(&submitResp); err != nil {
		t.Fatalf("decode submit response: %v", err)
	}

	// Wait for completion, keeping the last status for the timeout message.
	var last jobStatusResponse
	done := false
	deadline := time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) {
		getReq := httptest.NewRequest(http.MethodGet, "/jobs/"+submitResp.JobID, nil)
		getRec := httptest.NewRecorder()
		srv.Handler().ServeHTTP(getRec, getReq)
		var status jobStatusResponse
		if err := json.NewDecoder(getRec.Body).Decode(&status); err != nil {
			t.Fatalf("decode status response (HTTP %d): %v", getRec.Code, err)
		}
		last = status
		if status.State == "done" {
			done = true
			break
		}
		time.Sleep(50 * time.Millisecond)
	}
	if !done {
		t.Fatalf("job did not complete in time (last state %q)", last.State)
	}

	// Get the artifact.
	artReq := httptest.NewRequest(http.MethodGet, "/jobs/"+submitResp.JobID+"/artifacts/completion", nil)
	artRec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(artRec, artReq)
	if artRec.Code != http.StatusOK {
		t.Fatalf("artifact status = %d, want %d", artRec.Code, http.StatusOK)
	}
	if ct := artRec.Header().Get("Content-Type"); ct != "application/json" {
		t.Errorf("Content-Type = %q, want %q", ct, "application/json")
	}

	// The artifact should decode as the chat response the stub returned.
	var chatResp ollama.ChatResponse
	if err := json.NewDecoder(artRec.Body).Decode(&chatResp); err != nil {
		t.Fatalf("decode artifact: %v", err)
	}
	if chatResp.Message == nil || chatResp.Message.Content != "hello" {
		t.Errorf("artifact content = %v, want message with 'hello'", chatResp.Message)
	}
}
// TestWebhook_LifecycleEvents submits a job with a state_webhook_url and checks
// that the receiver sees at least three lifecycle events. It checks that every
// event carries the job's id and model, and that "queued" and "done" are each
// delivered exactly once.
func TestWebhook_LifecycleEvents(t *testing.T) {
	var mu sync.Mutex
	var events []webhookEvent
	whSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			t.Errorf("read webhook body: %v", err)
			return
		}
		var e webhookEvent
		if err := json.Unmarshal(body, &e); err != nil {
			// Don't record a zero-valued event; it would only show up later as
			// a confusing job_id/model mismatch.
			t.Errorf("decode webhook body %q: %v", body, err)
			return
		}
		mu.Lock()
		events = append(events, e)
		mu.Unlock()
		w.WriteHeader(http.StatusOK)
	}))
	defer whSrv.Close()

	client := &stubClient{
		tags: &ollama.TagsResponse{
			Models: []ollama.ModelInfo{{Name: "qwen3:30b"}},
		},
		ps: &ollama.PsResponse{},
		chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) {
			time.Sleep(20 * time.Millisecond) // Brief work.
			return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "ok"}}, nil, nil
		},
	}
	srv, _ := newJobTestServer(t, client, "")

	// Submit a job with webhook.
	body := fmt.Sprintf(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}],"state_webhook_url":"%s"}`, whSrv.URL)
	submitReq := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body))
	submitRec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(submitRec, submitReq)
	if submitRec.Code != http.StatusAccepted {
		t.Fatalf("submit status = %d, want %d", submitRec.Code, http.StatusAccepted)
	}
	var submitResp jobSubmitResponse
	if err := json.NewDecoder(submitRec.Body).Decode(&submitResp); err != nil {
		t.Fatalf("decode submit response: %v", err)
	}

	// Webhooks are delivered asynchronously, so arrival order is not
	// guaranteed: "done" can land before "queued" or the intermediate state.
	// Seeing "done" alone therefore does not mean the earlier events have
	// arrived. Poll until both boundary events and at least three events in
	// total are present, or the deadline passes.
	ready := func() bool {
		mu.Lock()
		defer mu.Unlock()
		var sawQueued, sawDone bool
		for _, e := range events {
			switch e.State {
			case "queued":
				sawQueued = true
			case "done":
				sawDone = true
			}
		}
		return sawQueued && sawDone && len(events) >= 3
	}
	deadline := time.Now().Add(5 * time.Second)
	for !ready() && time.Now().Before(deadline) {
		time.Sleep(50 * time.Millisecond)
	}

	mu.Lock()
	defer mu.Unlock()

	// Verify we received at least: queued, working (or loading), done.
	if len(events) < 3 {
		t.Fatalf("received %d webhook events, want >= 3", len(events))
	}

	// Verify all events have the correct job_id and model.
	for i, e := range events {
		if e.JobID != submitResp.JobID {
			t.Errorf("event[%d].job_id = %q, want %q", i, e.JobID, submitResp.JobID)
		}
		if e.Model != "qwen3:30b" {
			t.Errorf("event[%d].model = %q, want %q", i, e.Model, "qwen3:30b")
		}
	}

	// "queued" and "done" must each appear exactly once. Wall-clock arrival
	// order is deliberately not asserted (see above).
	stateCount := make(map[string]int)
	for _, e := range events {
		stateCount[e.State]++
	}
	if stateCount["queued"] != 1 {
		t.Errorf("expected exactly 1 'queued' event, got %d", stateCount["queued"])
	}
	if stateCount["done"] != 1 {
		t.Errorf("expected exactly 1 'done' event, got %d", stateCount["done"])
	}
}
// webhookEvent mirrors the webhook Event structure for test deserialization.
// Only the JSON tags matter here: webhook receivers in this file unmarshal
// delivered request bodies into it and inspect JobID, Model and State.
// NOTE(review): keep the tags in sync with the webhook package's Event type.
type webhookEvent struct {
	JobID         string          `json:"job_id"`
	// State is the job state this event reports (e.g. "queued", "done").
	State         string          `json:"state"`
	PreviousState string          `json:"previous_state"`
	Timestamp     time.Time       `json:"timestamp"`
	Model         string          `json:"model"`
	Attempt       int             `json:"attempt"`
	// Result and Artifacts are kept undecoded (raw JSON).
	Result        json.RawMessage `json:"result"`
	Artifacts     json.RawMessage `json:"artifacts"`
	// Error stays nil when the payload's "error" field is absent or null.
	Error         *string         `json:"error"`
}
// TestWebhook_500DoesNotAffectJobState checks that a job still reaches "done"
// when its webhook receiver answers every delivery with 500. It also checks
// that the receiver was actually called, which the assertion depends on.
func TestWebhook_500DoesNotAffectJobState(t *testing.T) {
	var webhookCalls atomic.Int32
	whSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		webhookCalls.Add(1)
		w.WriteHeader(http.StatusInternalServerError)
	}))
	defer whSrv.Close()

	client := &stubClient{
		tags: &ollama.TagsResponse{
			Models: []ollama.ModelInfo{{Name: "qwen3:30b"}},
		},
		ps: &ollama.PsResponse{},
		chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) {
			return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "ok"}}, nil, nil
		},
	}
	srv, _ := newJobTestServer(t, client, "")

	body := fmt.Sprintf(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}],"state_webhook_url":"%s"}`, whSrv.URL)
	submitReq := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body))
	submitRec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(submitRec, submitReq)
	if submitRec.Code != http.StatusAccepted {
		t.Fatalf("submit status = %d, want %d; body: %s", submitRec.Code, http.StatusAccepted, submitRec.Body.String())
	}
	var submitResp jobSubmitResponse
	if err := json.NewDecoder(submitRec.Body).Decode(&submitResp); err != nil {
		t.Fatalf("decode submit response: %v", err)
	}

	// Wait for the job to complete (regardless of webhook failures).
	var last jobStatusResponse
	deadline := time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) {
		getReq := httptest.NewRequest(http.MethodGet, "/jobs/"+submitResp.JobID, nil)
		getRec := httptest.NewRecorder()
		srv.Handler().ServeHTTP(getRec, getReq)
		var status jobStatusResponse
		if err := json.NewDecoder(getRec.Body).Decode(&status); err != nil {
			t.Fatalf("decode status response (HTTP %d): %v", getRec.Code, err)
		}
		last = status
		if status.State == "done" {
			break
		}
		time.Sleep(50 * time.Millisecond)
	}
	if last.State != "done" {
		t.Fatalf("job should complete even when webhook receiver returns 500 (last state %q)", last.State)
	}

	// Completion only proves something if the failing receiver was really
	// hit. Deliveries are asynchronous, so allow them a moment to land.
	callDeadline := time.Now().Add(5 * time.Second)
	for webhookCalls.Load() == 0 && time.Now().Before(callDeadline) {
		time.Sleep(10 * time.Millisecond)
	}
	if webhookCalls.Load() == 0 {
		t.Fatal("webhook receiver was never called; the 500 path was not exercised")
	}
}
// TestWebhook_HMACSignature checks that, with a webhook secret configured,
// deliveries carry an X-Foreman-Signature header of the form
// "sha256=<hex HMAC-SHA256 of the raw body>".
func TestWebhook_HMACSignature(t *testing.T) {
	secret := "test-webhook-secret"
	type capture struct {
		signature string
		body      []byte
	}
	ch := make(chan capture, 10)
	whSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, _ := io.ReadAll(r.Body) // a short read just yields an HMAC mismatch below
		// Never block the handler: only the first delivery is inspected, and
		// a handler stuck on a full channel would make whSrv.Close hang.
		select {
		case ch <- capture{signature: r.Header.Get("X-Foreman-Signature"), body: body}:
		default:
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer whSrv.Close()

	client := &stubClient{
		tags: &ollama.TagsResponse{
			Models: []ollama.ModelInfo{{Name: "qwen3:30b"}},
		},
		ps: &ollama.PsResponse{},
		chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) {
			return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "ok"}}, nil, nil
		},
	}
	srv, _ := newJobTestServer(t, client, secret)

	body := fmt.Sprintf(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}],"state_webhook_url":"%s"}`, whSrv.URL)
	submitReq := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body))
	submitRec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(submitRec, submitReq)
	if submitRec.Code != http.StatusAccepted {
		t.Fatalf("submit status = %d, want %d; body: %s", submitRec.Code, http.StatusAccepted, submitRec.Body.String())
	}

	// Wait for at least one webhook delivery.
	var got capture
	select {
	case got = <-ch:
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for webhook delivery")
	}

	if got.signature == "" {
		t.Fatal("X-Foreman-Signature header should be set when secret is configured")
	}
	if !strings.HasPrefix(got.signature, "sha256=") {
		t.Fatalf("signature format wrong: %q", got.signature)
	}

	// Recompute the HMAC over the exact bytes received.
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(got.body) // hash.Hash.Write never returns an error
	expected := "sha256=" + hex.EncodeToString(mac.Sum(nil))
	if got.signature != expected {
		t.Errorf("HMAC mismatch: got %q, want %q", got.signature, expected)
	}
}
func TestTTLPruner(t *testing.T) {
client := &stubClient{
tags: &ollama.TagsResponse{
Models: []ollama.ModelInfo{{Name: "qwen3:30b"}},
},
ps: &ollama.PsResponse{},
chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) {
return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "ok"}}, nil, nil
},
}
_, st := newJobTestServer(t, client, "")
// Create a terminal job.
job := store.Job{
ID: "01PRUNE001",
Model: "qwen3:30b",
Payload: json.RawMessage(`{}`),
}
st.CreateJob(job)
st.UpdateJobState("01PRUNE001", store.JobStateDone, nil, nil)
// Prune with a future cutoff.
cutoff := time.Now().UTC().Add(1 * time.Minute)
n, err := st.DeleteTerminalJobsBefore(cutoff)
if err != nil {
t.Fatalf("DeleteTerminalJobsBefore: %v", err)
}
if n != 1 {
t.Errorf("deleted = %d, want 1", n)
}
}