Files
steve daf07fd759 feat: add async /jobs surface, state webhooks, and artifact handling
Add the async job submission API, webhook state notifications, and
artifact serving endpoints on top of the Phase 3 queue infrastructure.

Key changes:
- POST /jobs: async job submission with 202 + job_id ULID; optional
  state_webhook_url for push notifications on state transitions
- GET /jobs/{id}: job status polling with result, error, and artifact
  metadata; artifacts <= 256KB inlined, larger ones by URL reference
- GET /jobs/{id}/artifacts/{name}: raw artifact data serving
- Webhook dispatcher: at-least-once delivery with exponential backoff
  (5 retries); optional HMAC-SHA256 signing (X-Foreman-Signature)
- ADR-0014: state_webhook_url only honored on POST /jobs, not sync
  /api/chat (caller already blocks for result)
- Comprehensive tests for /jobs lifecycle, webhook delivery, HMAC
  verification, artifact inline/URL threshold, and TTL pruning

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 18:30:18 -04:00

242 lines
7.3 KiB
Go

package server
import (
	"crypto/rand"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"
	"unicode/utf8"

	"github.com/oklog/ulid/v2"

	"gitea.stevedudenhoeffer.com/steve/foreman/internal/store"
	"gitea.stevedudenhoeffer.com/steve/foreman/internal/webhook"
)
// registerJobRoutes wires the asynchronous job endpoints into the server mux.
//
// Why: keeping /jobs registration in its own method keeps the Phase 3 and
// Phase 4 changes cleanly separated.
// What: binds POST /jobs, GET /jobs/{id}, and GET /jobs/{id}/artifacts/{name}
// to their handlers, in that order.
// Test: exercise all /jobs routes via the server handler in jobs_test.go.
func (s *Server) registerJobRoutes() {
	routes := []struct {
		pattern string
		handler func(http.ResponseWriter, *http.Request)
	}{
		{"POST /jobs", s.handleCreateJob},
		{"GET /jobs/{id}", s.handleGetJob},
		{"GET /jobs/{id}/artifacts/{name}", s.handleGetArtifact},
	}
	for _, route := range routes {
		s.mux.HandleFunc(route.pattern, route.handler)
	}
}
// jobSubmitRequest describes the JSON body accepted by POST /jobs: the native
// chat payload plus optional foreman-specific fields.
type jobSubmitRequest struct {
	// Model names the model the job should run against.
	Model string `json:"model"`

	// StateWebhookURL is an optional URL for job state webhooks; it is left
	// out of encoded JSON when empty.
	StateWebhookURL string `json:"state_webhook_url,omitempty"`

	// Messages holds the chat messages as undecoded JSON.
	Messages json.RawMessage `json:"messages"`
}
// jobSubmitResponse is the JSON body written by POST /jobs on success.
type jobSubmitResponse struct {
	// JobID is the identifier assigned to the newly submitted job.
	JobID string `json:"job_id"`
}
// jobStatusResponse is the JSON body written by GET /jobs/{id}.
//
// Pointer fields and Result encode as JSON null when unset; no field is
// omitted from the output.
type jobStatusResponse struct {
	// Identity and current state.
	JobID string `json:"job_id"`
	State string `json:"state"`
	Model string `json:"model"`

	// Lifecycle timestamps; StartedAt and CompletedAt may be nil.
	CreatedAt   time.Time  `json:"created_at"`
	StartedAt   *time.Time `json:"started_at"`
	CompletedAt *time.Time `json:"completed_at"`

	// Attempt is the job's attempt counter as recorded on the stored job.
	Attempt int `json:"attempt"`

	// Outcome: the raw result JSON and an optional error message.
	Result json.RawMessage `json:"result"`
	Error  *string         `json:"error"`

	// Artifacts lists metadata for the artifacts attached to the job.
	Artifacts []artifactReference `json:"artifacts"`
}
// artifactReference is the artifact metadata returned in job status and
// webhook events.
//
// Data carries the artifact content inline and URL is a path where the content
// can be fetched; each of the two is dropped from the JSON output when empty.
type artifactReference struct {
	// Name identifies the artifact within its job.
	Name string `json:"name"`
	// ContentType is the artifact's stored content type.
	ContentType string `json:"content_type"`
	// Size is the artifact's recorded size.
	Size int64 `json:"size"`
	// Data is the inlined artifact content, if any.
	Data string `json:"data,omitempty"`
	// URL is where the artifact content can be fetched, if not inlined.
	URL string `json:"url,omitempty"`
}
// handleCreateJob handles POST /jobs -- the async job submission endpoint.
//
// Why: orchestration callers need fire-and-forget job submission (ADR-0004).
// What: reads and parses the body, validates the model (refreshing the
// inventory once on a miss) and the optional state_webhook_url, stores the job,
// fires the initial "queued" webhook when a dispatcher is configured, wakes the
// worker, and returns 202 with the new job_id.
// Test: submit a job, verify 202 and ULID in response, verify job exists in store.
//
// Responses: 400 for an unreadable body, invalid JSON, a missing model, or a
// state_webhook_url that is not an absolute http(s) URL; 404 when the model is
// unknown after the refresh; 500 when ID generation or the store write fails.
func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) {
	// NOTE(review): nothing in this handler caps the body size; confirm a limit
	// (for example http.MaxBytesReader) is applied before requests reach it.
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, `{"error":"failed to read request body"}`, http.StatusBadRequest)
		return
	}

	// Decode only the fields this handler needs; the raw body is what gets
	// stored as the job payload.
	var req jobSubmitRequest
	if err := json.Unmarshal(body, &req); err != nil {
		http.Error(w, `{"error":"invalid JSON body"}`, http.StatusBadRequest)
		return
	}
	if req.Model == "" {
		http.Error(w, `{"error":"model is required"}`, http.StatusBadRequest)
		return
	}

	// The webhook URL is caller-supplied and will later be requested by the
	// server, so reject anything that is not an absolute http(s) URL up front
	// instead of accepting a job whose notifications can never be delivered.
	var webhookURL *string
	if req.StateWebhookURL != "" {
		u, err := url.Parse(req.StateWebhookURL)
		if err != nil || u.Host == "" || (u.Scheme != "http" && u.Scheme != "https") {
			http.Error(w, `{"error":"state_webhook_url must be an absolute http or https URL"}`, http.StatusBadRequest)
			return
		}
		webhookURL = &req.StateWebhookURL
	}

	// Validate model exists. One re-poll on miss; a failed refresh is only
	// logged because the second HasModel check decides the outcome.
	if !s.inventory.HasModel(req.Model) {
		if err := s.inventory.Refresh(r.Context()); err != nil {
			s.logger.Warn("model re-poll failed", "error", err)
		}
		if !s.inventory.HasModel(req.Model) {
			http.Error(w, `{"error":"model not found"}`, http.StatusNotFound)
			return
		}
	}

	// Use ulid.New rather than ulid.MustNew so an entropy failure becomes a
	// 500 response instead of a panic inside the handler.
	id, err := ulid.New(ulid.Timestamp(time.Now()), rand.Reader)
	if err != nil {
		s.logger.Error("failed to generate job id", "error", err)
		http.Error(w, `{"error":"failed to create job"}`, http.StatusInternalServerError)
		return
	}
	jobID := id.String()

	// Fall back to 3 attempts when the configured value is unset or not positive.
	maxAttempts := s.cfg.MaxAttempts
	if maxAttempts <= 0 {
		maxAttempts = 3
	}

	job := store.Job{
		ID:              jobID,
		Model:           req.Model,
		Payload:         json.RawMessage(body),
		MaxAttempts:     maxAttempts,
		StateWebhookURL: webhookURL,
	}
	if _, err := s.store.CreateJob(job); err != nil {
		s.logger.Error("failed to create job", "error", err)
		http.Error(w, `{"error":"failed to create job"}`, http.StatusInternalServerError)
		return
	}

	// Fire initial "queued" webhook if configured.
	if webhookURL != nil && s.dispatcher != nil {
		s.dispatcher.Fire(*webhookURL, webhook.Event{
			JobID:         jobID,
			State:         string(store.JobStateQueued),
			PreviousState: "",
			Timestamp:     time.Now().UTC(),
			Model:         req.Model,
			Attempt:       0,
		})
	}

	// Wake the worker.
	if s.workerRef != nil {
		s.workerRef.Wake()
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	// The status line is already sent, so an encode failure can only be logged.
	if err := json.NewEncoder(w).Encode(jobSubmitResponse{JobID: jobID}); err != nil {
		s.logger.Warn("failed to write job response", "error", err, "job_id", jobID)
	}
}
// handleGetJob handles GET /jobs/{id} -- returns current job state and result.
//
// Why: callers need to poll job status for recovery after missed webhooks (ADR-0004).
// What: looks up the job by ID, attaches artifact metadata, and writes the
// status as JSON. Artifacts of at most 256 KiB whose bytes are valid UTF-8 are
// inlined in "data"; all others are referenced by a "url" under
// /jobs/{id}/artifacts/{name}.
// Test: create and complete a job, GET /jobs/{id}, verify all fields.
//
// Responses: 400 when the id is empty, 404 when the store reports
// sql.ErrNoRows, 500 on any other store error. A failure to load artifacts is
// logged and the job is still returned with an empty artifact list.
func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request) {
	// Largest artifact size that is inlined in the response.
	const inlineArtifactLimit = 256 * 1024

	id := r.PathValue("id")
	if id == "" {
		http.Error(w, `{"error":"job id is required"}`, http.StatusBadRequest)
		return
	}

	job, err := s.store.GetJob(id)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			http.Error(w, `{"error":"job not found"}`, http.StatusNotFound)
			return
		}
		s.logger.Error("failed to get job", "error", err, "job_id", id)
		http.Error(w, `{"error":"internal error"}`, http.StatusInternalServerError)
		return
	}

	// Build artifact references. Start from an empty, non-nil slice so the
	// response encodes "artifacts" as [] rather than null even when loading fails.
	artRefs := []artifactReference{}
	artifacts, err := s.store.GetArtifactsByJob(id)
	if err != nil {
		// Best effort: the job status is still useful without artifacts.
		s.logger.Error("failed to get artifacts", "error", err, "job_id", id)
	} else {
		artRefs = make([]artifactReference, 0, len(artifacts))
		for _, a := range artifacts {
			ref := artifactReference{
				Name:        a.Name,
				ContentType: a.ContentType,
				Size:        a.Size,
			}
			// encoding/json replaces invalid UTF-8 with U+FFFD, which would
			// silently corrupt binary content, so only valid UTF-8 is inlined.
			if a.Size <= inlineArtifactLimit && utf8.Valid(a.Data) {
				ref.Data = string(a.Data)
			} else {
				// Escape both segments so names containing spaces, '/', '?'
				// or '#' still yield a URL that routes back to the artifact.
				ref.URL = fmt.Sprintf("/jobs/%s/artifacts/%s", url.PathEscape(id), url.PathEscape(a.Name))
			}
			artRefs = append(artRefs, ref)
		}
	}

	// An empty but non-nil RawMessage is not valid JSON and would make the
	// encode below fail; normalise it to nil so it is emitted as null.
	result := job.Result
	if len(result) == 0 {
		result = nil
	}

	resp := jobStatusResponse{
		JobID:       job.ID,
		State:       string(job.State),
		Model:       job.Model,
		CreatedAt:   job.CreatedAt,
		StartedAt:   job.StartedAt,
		CompletedAt: job.CompletedAt,
		Attempt:     job.Attempt,
		Result:      result,
		Error:       job.Error,
		Artifacts:   artRefs,
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(resp); err != nil {
		s.logger.Warn("failed to write job status", "error", err, "job_id", id)
	}
}
// handleGetArtifact handles GET /jobs/{id}/artifacts/{name} -- serves raw artifact data.
//
// Why: large artifacts are not inlined in webhooks; callers fetch them here (ADR-0006).
// What: looks up the artifact by job_id and name and writes its raw bytes with
// the stored content type, falling back to application/octet-stream when none
// is recorded.
// Test: store an artifact, GET it, verify content type and data match.
//
// Responses: 400 when either path value is empty, 404 when the store reports
// sql.ErrNoRows, 500 on any other store error, 200 with the data otherwise.
func (s *Server) handleGetArtifact(w http.ResponseWriter, r *http.Request) {
	jobID := r.PathValue("id")
	name := r.PathValue("name")
	if jobID == "" || name == "" {
		http.Error(w, `{"error":"job id and artifact name are required"}`, http.StatusBadRequest)
		return
	}

	artifact, err := s.store.GetArtifact(jobID, name)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			http.Error(w, `{"error":"artifact not found"}`, http.StatusNotFound)
			return
		}
		s.logger.Error("failed to get artifact", "error", err, "job_id", jobID, "name", name)
		http.Error(w, `{"error":"internal error"}`, http.StatusInternalServerError)
		return
	}

	// Setting an empty Content-Type would send an empty header, so use a
	// generic binary type when the artifact has none recorded.
	contentType := artifact.ContentType
	if contentType == "" {
		contentType = "application/octet-stream"
	}

	header := w.Header()
	header.Set("Content-Type", contentType)
	// The bytes are stored content served verbatim; tell clients to trust the
	// declared type rather than guessing one from the payload.
	header.Set("X-Content-Type-Options", "nosniff")
	w.WriteHeader(http.StatusOK)

	// The status is already sent, so a write failure can only be logged.
	if _, err := w.Write(artifact.Data); err != nil {
		s.logger.Warn("failed to write artifact", "error", err, "job_id", jobID, "name", name)
	}
}