feat: add async /jobs surface, state webhooks, and artifact handling
Add the async job submission API, webhook state notifications, and
artifact serving endpoints on top of the Phase 3 queue infrastructure.
Key changes:
- POST /jobs: async job submission with 202 + job_id ULID; optional
state_webhook_url for push notifications on state transitions
- GET /jobs/{id}: job status polling with result, error, and artifact
metadata; artifacts <= 256KB inlined, larger ones by URL reference
- GET /jobs/{id}/artifacts/{name}: raw artifact data serving
- Webhook dispatcher: at-least-once delivery with exponential backoff
(5 retries); optional HMAC-SHA256 signing (X-Foreman-Signature)
- ADR-0014: state_webhook_url only honored on POST /jobs, not sync
/api/chat (caller already blocks for result)
- Comprehensive tests for /jobs lifecycle, webhook delivery, HMAC
verification, artifact inline/URL threshold, and TTL pruning
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,286 @@
|
||||
package webhook
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestDispatcher_Fire_Delivers(t *testing.T) {
|
||||
ch := make(chan []byte, 1)
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
body, _ := io.ReadAll(r.Body)
|
||||
ch <- body
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
logger := slog.New(slog.NewJSONHandler(io.Discard, nil))
|
||||
d := NewDispatcher("", logger)
|
||||
|
||||
event := Event{
|
||||
JobID: "01TEST001",
|
||||
State: "done",
|
||||
PreviousState: "working",
|
||||
Timestamp: time.Now().UTC(),
|
||||
Model: "qwen3:30b",
|
||||
Attempt: 1,
|
||||
}
|
||||
|
||||
d.Fire(srv.URL, event)
|
||||
|
||||
var receivedBody []byte
|
||||
select {
|
||||
case receivedBody = <-ch:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timed out waiting for webhook delivery")
|
||||
}
|
||||
|
||||
// Verify the body is valid JSON with the right fields.
|
||||
var got Event
|
||||
if err := json.Unmarshal(receivedBody, &got); err != nil {
|
||||
t.Fatalf("unmarshal: %v", err)
|
||||
}
|
||||
if got.JobID != "01TEST001" {
|
||||
t.Errorf("job_id = %q, want %q", got.JobID, "01TEST001")
|
||||
}
|
||||
if got.State != "done" {
|
||||
t.Errorf("state = %q, want %q", got.State, "done")
|
||||
}
|
||||
if got.PreviousState != "working" {
|
||||
t.Errorf("previous_state = %q, want %q", got.PreviousState, "working")
|
||||
}
|
||||
if got.Model != "qwen3:30b" {
|
||||
t.Errorf("model = %q, want %q", got.Model, "qwen3:30b")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_Fire_RetriesOn500(t *testing.T) {
|
||||
var attempts atomic.Int32
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
n := attempts.Add(1)
|
||||
if n <= 2 {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
logger := slog.New(slog.NewJSONHandler(io.Discard, nil))
|
||||
d := NewDispatcher("", logger)
|
||||
d.baseDelay = 10 * time.Millisecond // Fast retries for testing.
|
||||
|
||||
event := Event{
|
||||
JobID: "01RETRY001",
|
||||
State: "done",
|
||||
Model: "qwen3:30b",
|
||||
}
|
||||
|
||||
d.Fire(srv.URL, event)
|
||||
|
||||
// Wait for retries to complete.
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
if attempts.Load() >= 3 {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
if got := attempts.Load(); got < 3 {
|
||||
t.Errorf("attempts = %d, want >= 3 (2 failures + 1 success)", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_Fire_DoesNotBlockCaller(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
logger := slog.New(slog.NewJSONHandler(io.Discard, nil))
|
||||
d := NewDispatcher("", logger)
|
||||
|
||||
start := time.Now()
|
||||
d.Fire(srv.URL, Event{JobID: "test", State: "done"})
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// Fire should return immediately (< 10ms), not wait for the HTTP call.
|
||||
if elapsed > 50*time.Millisecond {
|
||||
t.Errorf("Fire blocked for %v, should return immediately", elapsed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_HMAC_Signing(t *testing.T) {
|
||||
type capture struct {
|
||||
signature string
|
||||
body []byte
|
||||
}
|
||||
ch := make(chan capture, 1)
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
body, _ := io.ReadAll(r.Body)
|
||||
ch <- capture{signature: r.Header.Get("X-Foreman-Signature"), body: body}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
secret := "test-secret-key"
|
||||
logger := slog.New(slog.NewJSONHandler(io.Discard, nil))
|
||||
d := NewDispatcher(secret, logger)
|
||||
|
||||
event := Event{
|
||||
JobID: "01HMAC001",
|
||||
State: "done",
|
||||
Model: "qwen3:30b",
|
||||
}
|
||||
|
||||
d.Fire(srv.URL, event)
|
||||
|
||||
var got capture
|
||||
select {
|
||||
case got = <-ch:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timed out waiting for webhook delivery")
|
||||
}
|
||||
|
||||
if got.signature == "" {
|
||||
t.Fatal("X-Foreman-Signature header should be set when secret is configured")
|
||||
}
|
||||
|
||||
if len(got.signature) < 8 || got.signature[:7] != "sha256=" {
|
||||
t.Fatalf("signature format wrong: %q", got.signature)
|
||||
}
|
||||
|
||||
// Verify the signature against the received body.
|
||||
if !VerifySignature(got.body, got.signature, secret) {
|
||||
t.Error("HMAC verification failed with correct secret")
|
||||
}
|
||||
|
||||
// Verify wrong secret fails.
|
||||
if VerifySignature(got.body, got.signature, "wrong-secret") {
|
||||
t.Error("HMAC verification should fail with wrong secret")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_NoHMAC_WhenNoSecret(t *testing.T) {
|
||||
ch := make(chan string, 1)
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ch <- r.Header.Get("X-Foreman-Signature")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
logger := slog.New(slog.NewJSONHandler(io.Discard, nil))
|
||||
d := NewDispatcher("", logger) // No secret.
|
||||
|
||||
d.Fire(srv.URL, Event{JobID: "test", State: "done"})
|
||||
|
||||
var gotSignature string
|
||||
select {
|
||||
case gotSignature = <-ch:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timed out waiting for webhook delivery")
|
||||
}
|
||||
|
||||
if gotSignature != "" {
|
||||
t.Errorf("X-Foreman-Signature should not be set when no secret is configured, got %q", gotSignature)
|
||||
}
|
||||
}
|
||||
|
||||
func TestVerifySignature_InvalidFormat(t *testing.T) {
|
||||
if VerifySignature([]byte("test"), "invalid", "secret") {
|
||||
t.Error("should reject signatures without sha256= prefix")
|
||||
}
|
||||
if VerifySignature([]byte("test"), "sha256", "secret") {
|
||||
t.Error("should reject too-short signatures")
|
||||
}
|
||||
if VerifySignature([]byte("test"), "", "secret") {
|
||||
t.Error("should reject empty signature")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFormatArtifacts_SmallInline(t *testing.T) {
|
||||
metas := []ArtifactMeta{
|
||||
{
|
||||
Name: "completion",
|
||||
ContentType: "application/json",
|
||||
Size: 100,
|
||||
Data: []byte(`{"done":true}`),
|
||||
},
|
||||
}
|
||||
|
||||
result := FormatArtifacts("01TEST", metas)
|
||||
if result == nil {
|
||||
t.Fatal("result should not be nil")
|
||||
}
|
||||
|
||||
var parsed []struct {
|
||||
Name string `json:"name"`
|
||||
Data string `json:"data"`
|
||||
URL string `json:"url"`
|
||||
}
|
||||
if err := json.Unmarshal(result, &parsed); err != nil {
|
||||
t.Fatalf("unmarshal: %v", err)
|
||||
}
|
||||
|
||||
if len(parsed) != 1 {
|
||||
t.Fatalf("len = %d, want 1", len(parsed))
|
||||
}
|
||||
if parsed[0].Data == "" {
|
||||
t.Error("small artifact should be inlined")
|
||||
}
|
||||
if parsed[0].URL != "" {
|
||||
t.Error("small artifact should not have a URL")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFormatArtifacts_LargeByURL(t *testing.T) {
|
||||
largeData := make([]byte, 300*1024) // 300KB > 256KB threshold.
|
||||
metas := []ArtifactMeta{
|
||||
{
|
||||
Name: "completion",
|
||||
ContentType: "application/json",
|
||||
Size: int64(len(largeData)),
|
||||
Data: largeData,
|
||||
},
|
||||
}
|
||||
|
||||
result := FormatArtifacts("01LARGE", metas)
|
||||
|
||||
var parsed []struct {
|
||||
Name string `json:"name"`
|
||||
Data string `json:"data"`
|
||||
URL string `json:"url"`
|
||||
}
|
||||
if err := json.Unmarshal(result, &parsed); err != nil {
|
||||
t.Fatalf("unmarshal: %v", err)
|
||||
}
|
||||
|
||||
if parsed[0].Data != "" {
|
||||
t.Error("large artifact should not be inlined")
|
||||
}
|
||||
if parsed[0].URL == "" {
|
||||
t.Error("large artifact should have a URL")
|
||||
}
|
||||
if parsed[0].URL != "/jobs/01LARGE/artifacts/completion" {
|
||||
t.Errorf("URL = %q, want %q", parsed[0].URL, "/jobs/01LARGE/artifacts/completion")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFormatArtifacts_Empty(t *testing.T) {
|
||||
result := FormatArtifacts("01EMPTY", nil)
|
||||
if result != nil {
|
||||
t.Errorf("empty artifacts should return nil, got %s", result)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user