diff --git a/.env.example b/.env.example index d81dbe2..37c7752 100644 --- a/.env.example +++ b/.env.example @@ -24,3 +24,9 @@ FOREMAN_POLL_INTERVAL=30s # Optional HMAC key for signing webhook payloads (ADR-0005) FOREMAN_WEBHOOK_SECRET= + +# Maximum retry attempts for a job before marking as failed (default: 3) +FOREMAN_MAX_ATTEMPTS=3 + +# How long to retain completed/failed jobs before pruning (default: 24h) +FOREMAN_JOB_TTL=24h diff --git a/docs/adr/0014-no-webhooks-on-sync-chat.md b/docs/adr/0014-no-webhooks-on-sync-chat.md new file mode 100644 index 0000000..8f23898 --- /dev/null +++ b/docs/adr/0014-no-webhooks-on-sync-chat.md @@ -0,0 +1,40 @@ +# ADR-0014: No webhooks on synchronous /api/chat + +**Status:** Accepted -- 2026-05-23 + +## Context + +The `state_webhook_url` field exists on the async `POST /jobs` surface to notify +callers of state transitions. When Phase 3 promoted `/api/chat` to route through +the internal job queue, the question arose: should webhook events also fire for +synchronous chat requests? + +## Decision + +**`state_webhook_url` is only honored on `POST /jobs`.** Synchronous `/api/chat` +requests do not fire webhooks, even though they internally create job rows. + +### Rationale + +- The `/api/chat` caller holds an open HTTP connection and blocks until the + response is ready. Webhooks would be redundant: the caller already gets the + result directly. +- Adding webhook delivery on the sync path would double the webhook volume with + no consumer benefit. +- The sync path is the go-llm target; webhook handling would add latency and + complexity to the critical hot path. +- Callers who want webhooks should use `POST /jobs` explicitly. + +## Consequences + +- `POST /jobs` is the only entry point that supports `state_webhook_url`. +- `/api/chat` job rows are created without a webhook URL and produce no webhook + traffic. +- This keeps the webhook dispatcher's load proportional to async job volume only. 
+ +## Alternatives considered + +- **Fire webhooks on both paths.** Adds webhook traffic for every go-llm request + with no consumer; rejected. +- **Optional opt-in header on /api/chat.** Over-engineered for a passthrough + endpoint; rejected. diff --git a/docs/adr/README.md b/docs/adr/README.md index 7154f2c..628a2cc 100644 --- a/docs/adr/README.md +++ b/docs/adr/README.md @@ -25,6 +25,8 @@ worker, one queue. No distributed dispatch, no leases, no fair queueing. | 0010 | Authentication and security boundary | Accepted | | 0011 | Go client library and go-llm integration | Accepted | | 0012 | Streaming support | Accepted | +| 0013 | Two-slot residency and embedding bypass | Accepted | +| 0014 | No webhooks on synchronous /api/chat | Accepted | ADR-0003 was resolved in favor of **native Ollama** as the v1 surface: foreman is, on the wire, a private authenticated Ollama deployment, so `go-llm` integrates via diff --git a/internal/server/jobs.go b/internal/server/jobs.go new file mode 100644 index 0000000..c2d1155 --- /dev/null +++ b/internal/server/jobs.go @@ -0,0 +1,241 @@ +package server + +import ( + "crypto/rand" + "database/sql" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "time" + + "github.com/oklog/ulid/v2" + + "gitea.stevedudenhoeffer.com/steve/foreman/internal/store" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/webhook" +) + +// registerJobRoutes adds the async /jobs routes to the mux. +// +// Why: separating route registration allows clean Phase 3/Phase 4 commit separation. +// What: registers POST /jobs, GET /jobs/{id}, and GET /jobs/{id}/artifacts/{name}. +// Test: exercise all /jobs routes via the server handler in jobs_test.go. +func (s *Server) registerJobRoutes() { + s.mux.HandleFunc("POST /jobs", s.handleCreateJob) + s.mux.HandleFunc("GET /jobs/{id}", s.handleGetJob) + s.mux.HandleFunc("GET /jobs/{id}/artifacts/{name}", s.handleGetArtifact) +} + +// jobSubmitRequest is the body shape for POST /jobs. 
It extends the native chat +// payload with optional foreman-specific fields. +type jobSubmitRequest struct { + Model string `json:"model"` + StateWebhookURL string `json:"state_webhook_url,omitempty"` + Messages json.RawMessage `json:"messages"` +} + +// jobSubmitResponse is the response from POST /jobs. +type jobSubmitResponse struct { + JobID string `json:"job_id"` +} + +// jobStatusResponse is the response from GET /jobs/{id}. +type jobStatusResponse struct { + JobID string `json:"job_id"` + State string `json:"state"` + Model string `json:"model"` + CreatedAt time.Time `json:"created_at"` + StartedAt *time.Time `json:"started_at"` + CompletedAt *time.Time `json:"completed_at"` + Attempt int `json:"attempt"` + Result json.RawMessage `json:"result"` + Error *string `json:"error"` + Artifacts []artifactReference `json:"artifacts"` +} + +// artifactReference is artifact metadata returned in job status and webhook events. +type artifactReference struct { + Name string `json:"name"` + ContentType string `json:"content_type"` + Size int64 `json:"size"` + Data string `json:"data,omitempty"` + URL string `json:"url,omitempty"` +} + +// handleCreateJob handles POST /jobs -- the async job submission endpoint. +// +// Why: orchestration callers need fire-and-forget job submission (ADR-0004). +// What: validates model, enqueues the job, returns 202 with job_id immediately. +// Test: submit a job, verify 202 and ULID in response, verify job exists in store. +func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, `{"error":"failed to read request body"}`, http.StatusBadRequest) + return + } + + // Parse the request to get model and webhook URL. 
+ var req jobSubmitRequest + if err := json.Unmarshal(body, &req); err != nil { + http.Error(w, `{"error":"invalid JSON body"}`, http.StatusBadRequest) + return + } + + if req.Model == "" { + http.Error(w, `{"error":"model is required"}`, http.StatusBadRequest) + return + } + + // Validate model exists. One re-poll on miss. + if !s.inventory.HasModel(req.Model) { + if err := s.inventory.Refresh(r.Context()); err != nil { + s.logger.Warn("model re-poll failed", "error", err) + } + if !s.inventory.HasModel(req.Model) { + http.Error(w, `{"error":"model not found"}`, http.StatusNotFound) + return + } + } + + jobID := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String() + + maxAttempts := s.cfg.MaxAttempts + if maxAttempts == 0 { + maxAttempts = 3 + } + + var webhookURL *string + if req.StateWebhookURL != "" { + webhookURL = &req.StateWebhookURL + } + + job := store.Job{ + ID: jobID, + Model: req.Model, + Payload: json.RawMessage(body), + MaxAttempts: maxAttempts, + StateWebhookURL: webhookURL, + } + + if _, err := s.store.CreateJob(job); err != nil { + s.logger.Error("failed to create job", "error", err) + http.Error(w, `{"error":"failed to create job"}`, http.StatusInternalServerError) + return + } + + // Fire initial "queued" webhook if configured. + if webhookURL != nil && s.dispatcher != nil { + s.dispatcher.Fire(*webhookURL, webhook.Event{ + JobID: jobID, + State: string(store.JobStateQueued), + PreviousState: "", + Timestamp: time.Now().UTC(), + Model: req.Model, + Attempt: 0, + }) + } + + // Wake the worker. + if s.workerRef != nil { + s.workerRef.Wake() + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) + json.NewEncoder(w).Encode(jobSubmitResponse{JobID: jobID}) +} + +// handleGetJob handles GET /jobs/{id} -- returns current job state and result. +// +// Why: callers need to poll job status for recovery after missed webhooks (ADR-0004). 
+// What: looks up the job by ID, includes artifact metadata, returns JSON. +// Test: create and complete a job, GET /jobs/{id}, verify all fields. +func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + if id == "" { + http.Error(w, `{"error":"job id is required"}`, http.StatusBadRequest) + return + } + + job, err := s.store.GetJob(id) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + http.Error(w, `{"error":"job not found"}`, http.StatusNotFound) + return + } + s.logger.Error("failed to get job", "error", err, "job_id", id) + http.Error(w, `{"error":"internal error"}`, http.StatusInternalServerError) + return + } + + // Build artifact references. + var artRefs []artifactReference + artifacts, err := s.store.GetArtifactsByJob(id) + if err != nil { + s.logger.Error("failed to get artifacts", "error", err, "job_id", id) + } else { + for _, a := range artifacts { + ref := artifactReference{ + Name: a.Name, + ContentType: a.ContentType, + Size: a.Size, + } + if a.Size <= 256*1024 { + ref.Data = string(a.Data) + } else { + ref.URL = fmt.Sprintf("/jobs/%s/artifacts/%s", id, a.Name) + } + artRefs = append(artRefs, ref) + } + } + if artRefs == nil { + artRefs = []artifactReference{} + } + + resp := jobStatusResponse{ + JobID: job.ID, + State: string(job.State), + Model: job.Model, + CreatedAt: job.CreatedAt, + StartedAt: job.StartedAt, + CompletedAt: job.CompletedAt, + Attempt: job.Attempt, + Result: job.Result, + Error: job.Error, + Artifacts: artRefs, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) +} + +// handleGetArtifact handles GET /jobs/{id}/artifacts/{name} -- serves raw artifact data. +// +// Why: large artifacts are not inlined in webhooks; callers fetch them here (ADR-0006). +// What: looks up the artifact by job_id and name, serves the raw data with its content type. +// Test: store an artifact, GET it, verify content type and data match. 
+func (s *Server) handleGetArtifact(w http.ResponseWriter, r *http.Request) { + jobID := r.PathValue("id") + name := r.PathValue("name") + + if jobID == "" || name == "" { + http.Error(w, `{"error":"job id and artifact name are required"}`, http.StatusBadRequest) + return + } + + artifact, err := s.store.GetArtifact(jobID, name) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + http.Error(w, `{"error":"artifact not found"}`, http.StatusNotFound) + return + } + s.logger.Error("failed to get artifact", "error", err, "job_id", jobID, "name", name) + http.Error(w, `{"error":"internal error"}`, http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", artifact.ContentType) + w.WriteHeader(http.StatusOK) + w.Write(artifact.Data) +} diff --git a/internal/server/jobs_test.go b/internal/server/jobs_test.go new file mode 100644 index 0000000..90e3133 --- /dev/null +++ b/internal/server/jobs_test.go @@ -0,0 +1,506 @@ +package server + +import ( + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "path/filepath" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "gitea.stevedudenhoeffer.com/steve/foreman/internal/config" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/ollama" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/store" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/webhook" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/worker" +) + +// newJobTestServer creates a fully wired server + worker for job tests. It returns +// the server and the store; cancellation is registered via t.Cleanup. The worker is already running. 
+func newJobTestServer(t *testing.T, client ollama.Client, webhookSecret string) (*Server, *store.Store) { + t.Helper() + dbPath := filepath.Join(t.TempDir(), "test.db") + st, err := store.Open(dbPath) + if err != nil { + t.Fatalf("store.Open: %v", err) + } + t.Cleanup(func() { st.Close() }) + + logger := slog.New(slog.NewJSONHandler(io.Discard, nil)) + inv := ollama.NewModelInventory(client, logger) + if err := inv.Refresh(context.Background()); err != nil { + t.Fatalf("inv.Refresh: %v", err) + } + + notifier := worker.NewNotifier() + dispatcher := webhook.NewDispatcher(webhookSecret, logger) + w := worker.New(st, client, inv, notifier, dispatcher, logger) + + cfg := config.Config{ + OllamaURL: "http://localhost:11434", + MaxAttempts: 3, + JobTTL: 24 * time.Hour, + WebhookSecret: webhookSecret, + } + + srv := New(cfg, st, client, inv, notifier, w, dispatcher, logger) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go w.Run(ctx) + + return srv, st +} + +func TestCreateJob_Returns202(t *testing.T) { + client := &stubClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "ok"}}, nil, nil + }, + } + srv, _ := newJobTestServer(t, client, "") + + body := `{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}` + req := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body)) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, req) + + if rec.Code != http.StatusAccepted { + t.Fatalf("status = %d, want %d; body: %s", rec.Code, http.StatusAccepted, rec.Body.String()) + } + + var resp jobSubmitResponse + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) 
+ } + if resp.JobID == "" { + t.Error("job_id should not be empty") + } + // ULID should be 26 characters. + if len(resp.JobID) != 26 { + t.Errorf("job_id length = %d, want 26 (ULID)", len(resp.JobID)) + } +} + +func TestCreateJob_UnknownModel404(t *testing.T) { + client := &stubClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + } + srv, _ := newJobTestServer(t, client, "") + + body := `{"model":"nonexistent","messages":[{"role":"user","content":"hi"}]}` + req := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body)) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Fatalf("status = %d, want %d", rec.Code, http.StatusNotFound) + } +} + +func TestCreateJob_MissingModel400(t *testing.T) { + client := &stubClient{ + tags: &ollama.TagsResponse{}, + ps: &ollama.PsResponse{}, + } + srv, _ := newJobTestServer(t, client, "") + + body := `{"messages":[{"role":"user","content":"hi"}]}` + req := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body)) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", rec.Code, http.StatusBadRequest) + } +} + +func TestGetJob_Returns404ForUnknown(t *testing.T) { + client := &stubClient{ + tags: &ollama.TagsResponse{}, + ps: &ollama.PsResponse{}, + } + srv, _ := newJobTestServer(t, client, "") + + req := httptest.NewRequest(http.MethodGet, "/jobs/01NONEXISTENT0000000000000", nil) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Fatalf("status = %d, want %d", rec.Code, http.StatusNotFound) + } +} + +func TestGetJob_ReturnsJobState(t *testing.T) { + client := &stubClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req 
ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "hello"}}, nil, nil + }, + } + srv, _ := newJobTestServer(t, client, "") + + // Submit a job. + body := `{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}` + submitReq := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body)) + submitRec := httptest.NewRecorder() + srv.Handler().ServeHTTP(submitRec, submitReq) + + var submitResp jobSubmitResponse + json.NewDecoder(submitRec.Body).Decode(&submitResp) + + // Wait for the job to complete. + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + getReq := httptest.NewRequest(http.MethodGet, "/jobs/"+submitResp.JobID, nil) + getRec := httptest.NewRecorder() + srv.Handler().ServeHTTP(getRec, getReq) + + var status jobStatusResponse + json.NewDecoder(getRec.Body).Decode(&status) + if status.State == "done" { + // Verify all fields. 
+ if status.JobID != submitResp.JobID { + t.Errorf("job_id = %q, want %q", status.JobID, submitResp.JobID) + } + if status.Model != "qwen3:30b" { + t.Errorf("model = %q, want %q", status.Model, "qwen3:30b") + } + if status.Result == nil { + t.Error("result should not be nil on done") + } + if len(status.Artifacts) == 0 { + t.Error("artifacts should include the completion") + } + return + } + time.Sleep(50 * time.Millisecond) + } + t.Fatal("job did not reach done state in time") +} + +func TestGetArtifact_Returns404ForUnknown(t *testing.T) { + client := &stubClient{ + tags: &ollama.TagsResponse{}, + ps: &ollama.PsResponse{}, + } + srv, _ := newJobTestServer(t, client, "") + + req := httptest.NewRequest(http.MethodGet, "/jobs/01NOEXIST0000000000000000/artifacts/completion", nil) + rec := httptest.NewRecorder() + srv.Handler().ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Fatalf("status = %d, want %d", rec.Code, http.StatusNotFound) + } +} + +func TestGetArtifact_ReturnsData(t *testing.T) { + client := &stubClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "hello"}}, nil, nil + }, + } + srv, _ := newJobTestServer(t, client, "") + + // Submit and wait. + body := `{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}` + submitReq := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body)) + submitRec := httptest.NewRecorder() + srv.Handler().ServeHTTP(submitRec, submitReq) + + var submitResp jobSubmitResponse + json.NewDecoder(submitRec.Body).Decode(&submitResp) + + // Wait for completion. 
+ deadline := time.Now().Add(5 * time.Second) + var done bool + for time.Now().Before(deadline) { + getReq := httptest.NewRequest(http.MethodGet, "/jobs/"+submitResp.JobID, nil) + getRec := httptest.NewRecorder() + srv.Handler().ServeHTTP(getRec, getReq) + var status jobStatusResponse + json.NewDecoder(getRec.Body).Decode(&status) + if status.State == "done" { + done = true + break + } + time.Sleep(50 * time.Millisecond) + } + if !done { + t.Fatal("job did not complete in time") + } + + // Get the artifact. + artReq := httptest.NewRequest(http.MethodGet, "/jobs/"+submitResp.JobID+"/artifacts/completion", nil) + artRec := httptest.NewRecorder() + srv.Handler().ServeHTTP(artRec, artReq) + + if artRec.Code != http.StatusOK { + t.Fatalf("artifact status = %d, want %d", artRec.Code, http.StatusOK) + } + if artRec.Header().Get("Content-Type") != "application/json" { + t.Errorf("Content-Type = %q, want %q", artRec.Header().Get("Content-Type"), "application/json") + } + + // Verify the artifact is a valid chat response. 
+ var chatResp ollama.ChatResponse + if err := json.NewDecoder(artRec.Body).Decode(&chatResp); err != nil { + t.Fatalf("decode artifact: %v", err) + } + if chatResp.Message == nil || chatResp.Message.Content != "hello" { + t.Errorf("artifact content = %v, want message with 'hello'", chatResp.Message) + } +} + +func TestWebhook_LifecycleEvents(t *testing.T) { + var mu sync.Mutex + var events []webhookEvent + + whSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + var e webhookEvent + json.Unmarshal(body, &e) + mu.Lock() + events = append(events, e) + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer whSrv.Close() + + client := &stubClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + time.Sleep(20 * time.Millisecond) // Brief work. + return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "ok"}}, nil, nil + }, + } + srv, _ := newJobTestServer(t, client, "") + + // Submit a job with webhook. + body := fmt.Sprintf(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}],"state_webhook_url":"%s"}`, whSrv.URL) + submitReq := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body)) + submitRec := httptest.NewRecorder() + srv.Handler().ServeHTTP(submitRec, submitReq) + + if submitRec.Code != http.StatusAccepted { + t.Fatalf("submit status = %d, want %d", submitRec.Code, http.StatusAccepted) + } + + var submitResp jobSubmitResponse + json.NewDecoder(submitRec.Body).Decode(&submitResp) + + // Wait for webhooks to arrive. + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + mu.Lock() + n := len(events) + mu.Unlock() + // We expect at least: queued, working (or loading), done. 
+ if n >= 3 { + break + } + time.Sleep(50 * time.Millisecond) + } + + mu.Lock() + defer mu.Unlock() + + if len(events) < 3 { + t.Fatalf("received %d webhook events, want >= 3", len(events)) + } + + // Verify all events have the correct job_id. + for i, e := range events { + if e.JobID != submitResp.JobID { + t.Errorf("event[%d].job_id = %q, want %q", i, e.JobID, submitResp.JobID) + } + if e.Model != "qwen3:30b" { + t.Errorf("event[%d].model = %q, want %q", i, e.Model, "qwen3:30b") + } + } + + // First event should be "queued". + if events[0].State != "queued" { + t.Errorf("first event state = %q, want %q", events[0].State, "queued") + } + + // Last event should be "done". + lastEvent := events[len(events)-1] + if lastEvent.State != "done" { + t.Errorf("last event state = %q, want %q", lastEvent.State, "done") + } +} + +// webhookEvent mirrors the webhook Event structure for test deserialization. +type webhookEvent struct { + JobID string `json:"job_id"` + State string `json:"state"` + PreviousState string `json:"previous_state"` + Timestamp time.Time `json:"timestamp"` + Model string `json:"model"` + Attempt int `json:"attempt"` + Result json.RawMessage `json:"result"` + Artifacts json.RawMessage `json:"artifacts"` + Error *string `json:"error"` +} + +func TestWebhook_500DoesNotAffectJobState(t *testing.T) { + var webhookCalls atomic.Int32 + + whSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + webhookCalls.Add(1) + w.WriteHeader(http.StatusInternalServerError) + })) + defer whSrv.Close() + + client := &stubClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "ok"}}, nil, nil + }, + } + srv, _ := 
newJobTestServer(t, client, "") + + body := fmt.Sprintf(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}],"state_webhook_url":"%s"}`, whSrv.URL) + submitReq := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body)) + submitRec := httptest.NewRecorder() + srv.Handler().ServeHTTP(submitRec, submitReq) + + var submitResp jobSubmitResponse + json.NewDecoder(submitRec.Body).Decode(&submitResp) + + // Wait for the job to complete (regardless of webhook failures). + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + getReq := httptest.NewRequest(http.MethodGet, "/jobs/"+submitResp.JobID, nil) + getRec := httptest.NewRecorder() + srv.Handler().ServeHTTP(getRec, getReq) + var status jobStatusResponse + json.NewDecoder(getRec.Body).Decode(&status) + if status.State == "done" { + return // Job completed despite webhook failures. + } + time.Sleep(50 * time.Millisecond) + } + t.Fatal("job should complete even when webhook receiver returns 500") +} + +func TestWebhook_HMACSignature(t *testing.T) { + secret := "test-webhook-secret" + + type capture struct { + signature string + body []byte + } + ch := make(chan capture, 10) + + whSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + ch <- capture{signature: r.Header.Get("X-Foreman-Signature"), body: body} + w.WriteHeader(http.StatusOK) + })) + defer whSrv.Close() + + client := &stubClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "ok"}}, nil, nil + }, + } + srv, _ := newJobTestServer(t, client, secret) + + body := 
fmt.Sprintf(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}],"state_webhook_url":"%s"}`, whSrv.URL) + submitReq := httptest.NewRequest(http.MethodPost, "/jobs", strings.NewReader(body)) + submitRec := httptest.NewRecorder() + srv.Handler().ServeHTTP(submitRec, submitReq) + + // Wait for at least one webhook delivery. + var got capture + select { + case got = <-ch: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for webhook delivery") + } + + if got.signature == "" { + t.Fatal("X-Foreman-Signature header should be set when secret is configured") + } + + // Verify the HMAC. + if len(got.signature) < 8 || got.signature[:7] != "sha256=" { + t.Fatalf("signature format wrong: %q", got.signature) + } + + mac := hmac.New(sha256.New, []byte(secret)) + mac.Write(got.body) + expected := "sha256=" + hex.EncodeToString(mac.Sum(nil)) + if got.signature != expected { + t.Errorf("HMAC mismatch: got %q, want %q", got.signature, expected) + } +} + +func TestTTLPruner(t *testing.T) { + client := &stubClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "ok"}}, nil, nil + }, + } + _, st := newJobTestServer(t, client, "") + + // Create a terminal job. + job := store.Job{ + ID: "01PRUNE001", + Model: "qwen3:30b", + Payload: json.RawMessage(`{}`), + } + st.CreateJob(job) + st.UpdateJobState("01PRUNE001", store.JobStateDone, nil, nil) + + // Prune with a future cutoff. 
+ cutoff := time.Now().UTC().Add(1 * time.Minute) + n, err := st.DeleteTerminalJobsBefore(cutoff) + if err != nil { + t.Fatalf("DeleteTerminalJobsBefore: %v", err) + } + if n != 1 { + t.Errorf("deleted = %d, want 1", n) + } +} diff --git a/internal/webhook/dispatcher_test.go b/internal/webhook/dispatcher_test.go new file mode 100644 index 0000000..85a92a9 --- /dev/null +++ b/internal/webhook/dispatcher_test.go @@ -0,0 +1,286 @@ +package webhook + +import ( + "encoding/json" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" +) + +func TestDispatcher_Fire_Delivers(t *testing.T) { + ch := make(chan []byte, 1) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + ch <- body + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + logger := slog.New(slog.NewJSONHandler(io.Discard, nil)) + d := NewDispatcher("", logger) + + event := Event{ + JobID: "01TEST001", + State: "done", + PreviousState: "working", + Timestamp: time.Now().UTC(), + Model: "qwen3:30b", + Attempt: 1, + } + + d.Fire(srv.URL, event) + + var receivedBody []byte + select { + case receivedBody = <-ch: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for webhook delivery") + } + + // Verify the body is valid JSON with the right fields. 
+ var got Event + if err := json.Unmarshal(receivedBody, &got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if got.JobID != "01TEST001" { + t.Errorf("job_id = %q, want %q", got.JobID, "01TEST001") + } + if got.State != "done" { + t.Errorf("state = %q, want %q", got.State, "done") + } + if got.PreviousState != "working" { + t.Errorf("previous_state = %q, want %q", got.PreviousState, "working") + } + if got.Model != "qwen3:30b" { + t.Errorf("model = %q, want %q", got.Model, "qwen3:30b") + } +} + +func TestDispatcher_Fire_RetriesOn500(t *testing.T) { + var attempts atomic.Int32 + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := attempts.Add(1) + if n <= 2 { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + logger := slog.New(slog.NewJSONHandler(io.Discard, nil)) + d := NewDispatcher("", logger) + d.baseDelay = 10 * time.Millisecond // Fast retries for testing. + + event := Event{ + JobID: "01RETRY001", + State: "done", + Model: "qwen3:30b", + } + + d.Fire(srv.URL, event) + + // Wait for retries to complete. + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + if attempts.Load() >= 3 { + break + } + time.Sleep(10 * time.Millisecond) + } + + if got := attempts.Load(); got < 3 { + t.Errorf("attempts = %d, want >= 3 (2 failures + 1 success)", got) + } +} + +func TestDispatcher_Fire_DoesNotBlockCaller(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(100 * time.Millisecond) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + logger := slog.New(slog.NewJSONHandler(io.Discard, nil)) + d := NewDispatcher("", logger) + + start := time.Now() + d.Fire(srv.URL, Event{JobID: "test", State: "done"}) + elapsed := time.Since(start) + + // Fire should return immediately (within the 50ms bound checked below), not wait for the 100ms HTTP call. 
+ if elapsed > 50*time.Millisecond { + t.Errorf("Fire blocked for %v, should return immediately", elapsed) + } +} + +func TestDispatcher_HMAC_Signing(t *testing.T) { + type capture struct { + signature string + body []byte + } + ch := make(chan capture, 1) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + ch <- capture{signature: r.Header.Get("X-Foreman-Signature"), body: body} + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + secret := "test-secret-key" + logger := slog.New(slog.NewJSONHandler(io.Discard, nil)) + d := NewDispatcher(secret, logger) + + event := Event{ + JobID: "01HMAC001", + State: "done", + Model: "qwen3:30b", + } + + d.Fire(srv.URL, event) + + var got capture + select { + case got = <-ch: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for webhook delivery") + } + + if got.signature == "" { + t.Fatal("X-Foreman-Signature header should be set when secret is configured") + } + + if len(got.signature) < 8 || got.signature[:7] != "sha256=" { + t.Fatalf("signature format wrong: %q", got.signature) + } + + // Verify the signature against the received body. + if !VerifySignature(got.body, got.signature, secret) { + t.Error("HMAC verification failed with correct secret") + } + + // Verify wrong secret fails. + if VerifySignature(got.body, got.signature, "wrong-secret") { + t.Error("HMAC verification should fail with wrong secret") + } +} + +func TestDispatcher_NoHMAC_WhenNoSecret(t *testing.T) { + ch := make(chan string, 1) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ch <- r.Header.Get("X-Foreman-Signature") + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + logger := slog.New(slog.NewJSONHandler(io.Discard, nil)) + d := NewDispatcher("", logger) // No secret. 
+ + d.Fire(srv.URL, Event{JobID: "test", State: "done"}) + + var gotSignature string + select { + case gotSignature = <-ch: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for webhook delivery") + } + + if gotSignature != "" { + t.Errorf("X-Foreman-Signature should not be set when no secret is configured, got %q", gotSignature) + } +} + +func TestVerifySignature_InvalidFormat(t *testing.T) { + if VerifySignature([]byte("test"), "invalid", "secret") { + t.Error("should reject signatures without sha256= prefix") + } + if VerifySignature([]byte("test"), "sha256", "secret") { + t.Error("should reject too-short signatures") + } + if VerifySignature([]byte("test"), "", "secret") { + t.Error("should reject empty signature") + } +} + +func TestFormatArtifacts_SmallInline(t *testing.T) { + metas := []ArtifactMeta{ + { + Name: "completion", + ContentType: "application/json", + Size: 100, + Data: []byte(`{"done":true}`), + }, + } + + result := FormatArtifacts("01TEST", metas) + if result == nil { + t.Fatal("result should not be nil") + } + + var parsed []struct { + Name string `json:"name"` + Data string `json:"data"` + URL string `json:"url"` + } + if err := json.Unmarshal(result, &parsed); err != nil { + t.Fatalf("unmarshal: %v", err) + } + + if len(parsed) != 1 { + t.Fatalf("len = %d, want 1", len(parsed)) + } + if parsed[0].Data == "" { + t.Error("small artifact should be inlined") + } + if parsed[0].URL != "" { + t.Error("small artifact should not have a URL") + } +} + +func TestFormatArtifacts_LargeByURL(t *testing.T) { + largeData := make([]byte, 300*1024) // 300KB > 256KB threshold. 
+ metas := []ArtifactMeta{ + { + Name: "completion", + ContentType: "application/json", + Size: int64(len(largeData)), + Data: largeData, + }, + } + + result := FormatArtifacts("01LARGE", metas) + + var parsed []struct { + Name string `json:"name"` + Data string `json:"data"` + URL string `json:"url"` + } + if err := json.Unmarshal(result, &parsed); err != nil { + t.Fatalf("unmarshal: %v", err) + } + + if parsed[0].Data != "" { + t.Error("large artifact should not be inlined") + } + if parsed[0].URL == "" { + t.Error("large artifact should have a URL") + } + if parsed[0].URL != "/jobs/01LARGE/artifacts/completion" { + t.Errorf("URL = %q, want %q", parsed[0].URL, "/jobs/01LARGE/artifacts/completion") + } +} + +func TestFormatArtifacts_Empty(t *testing.T) { + result := FormatArtifacts("01EMPTY", nil) + if result != nil { + t.Errorf("empty artifacts should return nil, got %s", result) + } +} diff --git a/progress.md b/progress.md index 2dd594b..f4c1ecb 100644 --- a/progress.md +++ b/progress.md @@ -117,3 +117,51 @@ with the real SQLite-backed job queue and single worker loop. DeleteTerminalJobsBefore. - Server: chat model validation (404), non-streaming chat through queue, serialization (max 1 concurrent), context cancellation, embed bypass unchanged. + +## Phase 4: Async /jobs surface, webhooks, artifacts — 2026-05-23 + +**M1 core complete** (minus CLI and go-llm constructor, which are separate work). + +- `internal/webhook/` — webhook dispatcher: + - `Dispatcher.Fire(url, event)`: non-blocking goroutine delivery with + exponential backoff retry (1s, 2s, 4s, 8s, 16s — max 5 attempts). + - Optional HMAC-SHA256 signing via `FOREMAN_WEBHOOK_SECRET` — sets + `X-Foreman-Signature: sha256=<signature>` header. + - `VerifySignature()`: exported for webhook receivers. + - `FormatArtifacts()`: inline (data field) for artifacts <= 256KB, URL reference + for larger ones. + - Webhook failures are logged and dropped — never block or fail the job + (ADR-0005). 
+ +- `internal/server/` — new routes: + - `POST /jobs`: validates model, creates job row with optional + `state_webhook_url`, returns `202 Accepted` with `{"job_id":"<ulid>"}`. + Fires initial "queued" webhook. Wakes worker. + - `GET /jobs/{id}`: returns full job state, result, error, and artifact + metadata. 404 for unknown IDs. Artifacts under 256KB are inlined; larger + ones get a URL reference. + - `GET /jobs/{id}/artifacts/{name}`: serves raw artifact data with stored + content type. 404 for unknown job/artifact. + +- `docs/adr/0014-no-webhooks-on-sync-chat.md`: + - `state_webhook_url` is only honored on `POST /jobs`. Sync `/api/chat` does + not fire webhooks (ADR-0014). Rationale: the caller already holds a blocking + HTTP connection. + +- `cmd/foreman/main.go` — full serve wiring: + - Creates webhook dispatcher, notifier, worker. + - Starts worker loop goroutine and TTL pruner goroutine. + - TTL pruner runs every `jobTTL/4` (min 1 minute), deletes terminal jobs + older than `FOREMAN_JOB_TTL` (default 24h). + - Server constructor now receives notifier, worker, and dispatcher. + +- Tests (all passing with `-race`): + - Jobs API: 202 on submit, ULID format, 404 for unknown model, 400 for + missing model, 404 for unknown job, job state after completion, artifact + retrieval, artifact 404. + - Webhooks: full lifecycle events (queued->working->done), 500-returning + receiver does not affect job state, HMAC signature verification. + - Webhook dispatcher: delivery, retry on 500, non-blocking Fire, HMAC signing, + no HMAC when no secret, signature format validation. + - Artifacts: small inline, large by URL, empty returns nil. + - TTL pruner: deletes old terminal jobs.