From 4759a06d1bfc64c7d0bbee5bb23a9af1fe16b776 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Sat, 23 May 2026 18:38:16 -0400 Subject: [PATCH] feat: add Go client package with sync facade over async /jobs Adds client/ -- a public Go package providing a synchronous facade over foreman's async POST /jobs API (Level 1 integration per ADR-0011). Two delivery modes: - Webhook receiver (preferred): ephemeral HTTP server on random port, pushes results immediately, verifies HMAC when configured - Polling fallback: polls GET /jobs/{id} at configurable interval Also includes Tags() and Embed() helpers, bearer auth support, and comprehensive integration tests against the real foreman HTTP handlers. Co-Authored-By: Claude Opus 4.7 (1M context) --- client/client.go | 432 ++++++++++++++++++++++++++++++++++++++ client/client_test.go | 469 ++++++++++++++++++++++++++++++++++++++++++ client/webhook.go | 234 +++++++++++++++++++++ progress.md | 38 ++++ 4 files changed, 1173 insertions(+) create mode 100644 client/client.go create mode 100644 client/client_test.go create mode 100644 client/webhook.go diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..15ca698 --- /dev/null +++ b/client/client.go @@ -0,0 +1,432 @@ +// Package client provides a synchronous facade over foreman's async /jobs API. +// +// Why: orchestration callers need a simple blocking call to submit chat work +// to a foreman daemon without managing webhooks or polling themselves. This is +// the Level 1 integration described in ADR-0011. +// What: submits a job via POST /jobs, waits for completion using either a local +// webhook receiver (preferred) or polling fallback, and returns the result. +// Test: spin up a real foreman server via internal packages, submit jobs through +// the client, verify results for happy path, failure, timeout, and auth. 
+package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "time" +) + +// Client communicates with a foreman daemon's async /jobs API. +type Client struct { + baseURL string + token string + webhookSecret string + httpClient *http.Client + pollInterval time.Duration + forcePolling bool +} + +// Option configures a Client. +type Option func(*Client) + +// New creates a new foreman client targeting the given base URL. +// +// Why: callers need a single entry point with sensible defaults and optional overrides. +// What: returns a Client configured with the base URL and any provided options. +// Test: create with New, verify baseURL is trimmed and defaults are applied. +func New(baseURL string, opts ...Option) *Client { + c := &Client{ + baseURL: baseURL, + httpClient: &http.Client{Timeout: 0}, // no global timeout; per-request via context + pollInterval: 2 * time.Second, + } + for _, opt := range opts { + opt(c) + } + return c +} + +// WithToken sets the bearer token for authenticating with foreman. +// +// Why: foreman supports optional static bearer auth (ADR-0010). +// What: configures the Authorization header on all outbound requests. +// Test: create with WithToken, submit a job, verify the header is present. +func WithToken(token string) Option { + return func(c *Client) { + c.token = token + } +} + +// WithWebhookSecret sets the HMAC secret for verifying inbound webhook payloads. +// +// Why: callers receiving webhooks need to verify authenticity (ADR-0005). +// What: stores the secret for HMAC-SHA256 verification on received events. +// Test: create with a secret, receive a webhook, verify signature check passes. +func WithWebhookSecret(secret string) Option { + return func(c *Client) { + c.webhookSecret = secret + } +} + +// WithHTTPClient sets a custom HTTP client for outbound requests. +// +// Why: callers may need custom TLS, timeouts, or transport settings. +// What: replaces the default http.Client. 
+// Test: create with a custom client, verify it is used for requests. +func WithHTTPClient(hc *http.Client) Option { + return func(c *Client) { + c.httpClient = hc + } +} + +// WithPollInterval sets the interval for polling GET /jobs/{id} in fallback mode. +// +// Why: the default 2s may be too frequent or too slow for some callers. +// What: overrides the default poll interval. +// Test: create with a custom interval, verify polling uses it. +func WithPollInterval(d time.Duration) Option { + return func(c *Client) { + c.pollInterval = d + } +} + +// WithPollingMode forces the client to use polling instead of attempting a +// webhook receiver. Useful when the caller knows it is behind NAT or a firewall. +// +// Why: webhook receivers require the foreman daemon to reach back to the caller, +// which is not always possible (NAT, firewalls, containers). +// What: disables the webhook receiver attempt and uses polling exclusively. +// Test: create with WithPollingMode, verify no listener is started. +func WithPollingMode() Option { + return func(c *Client) { + c.forcePolling = true + } +} + +// SubmitRequest is the payload for submitting a job to foreman. +type SubmitRequest struct { + Model string `json:"model"` + Messages []json.RawMessage `json:"messages"` + Stream *bool `json:"stream,omitempty"` + Tools json.RawMessage `json:"tools,omitempty"` + Options json.RawMessage `json:"options,omitempty"` + Think json.RawMessage `json:"think,omitempty"` +} + +// Result is the final outcome of a submitted job. +type Result struct { + JobID string `json:"job_id"` + State string `json:"state"` + Model string `json:"model"` + Attempt int `json:"attempt"` + Result json.RawMessage `json:"result,omitempty"` + Error string `json:"error,omitempty"` + Artifacts []Artifact `json:"artifacts,omitempty"` + CreatedAt time.Time `json:"created_at"` + CompletedAt *time.Time `json:"completed_at,omitempty"` +} + +// Artifact is metadata and optional inline data for a job artifact. 
type Artifact struct {
	Name        string `json:"name"`
	ContentType string `json:"content_type"`
	Size        int64  `json:"size"`
	Data        string `json:"data,omitempty"`
	URL         string `json:"url,omitempty"`
}

// ModelInfo describes an installed model, as returned by GET /api/tags.
//
// Details is kept as raw JSON and is not interpreted by this package.
type ModelInfo struct {
	Name       string          `json:"name"`
	Model      string          `json:"model"`
	ModifiedAt time.Time       `json:"modified_at"`
	Size       int64           `json:"size"`
	Digest     string          `json:"digest"`
	Details    json.RawMessage `json:"details,omitempty"`
}

// EmbedRequest is the payload for the embedding endpoint.
//
// Input, KeepAlive and Options are raw JSON, so the caller decides their
// shape; they are marshalled as given and sent to POST /api/embed.
type EmbedRequest struct {
	Model     string          `json:"model"`
	Input     json.RawMessage `json:"input"`
	KeepAlive json.RawMessage `json:"keep_alive,omitempty"`
	Options   json.RawMessage `json:"options,omitempty"`
}

// EmbedResponse is the response from the embedding endpoint.
type EmbedResponse struct {
	Model      string      `json:"model,omitempty"`
	Embeddings [][]float64 `json:"embeddings,omitempty"`
}

// jobSubmitRequest is the wire format for POST /jobs on the foreman server.
//
// It carries the same fields as SubmitRequest plus StateWebhookURL, which is
// filled in only when the client submits in webhook mode.
type jobSubmitRequest struct {
	Model           string            `json:"model"`
	Messages        []json.RawMessage `json:"messages"`
	Stream          *bool             `json:"stream,omitempty"`
	Tools           json.RawMessage   `json:"tools,omitempty"`
	Options         json.RawMessage   `json:"options,omitempty"`
	Think           json.RawMessage   `json:"think,omitempty"`
	StateWebhookURL string            `json:"state_webhook_url,omitempty"`
}

// jobSubmitResponse is the wire format for the POST /jobs response.
type jobSubmitResponse struct {
	JobID string `json:"job_id"`
}

// jobStatusResponse mirrors the GET /jobs/{id} response from the foreman server.
+type jobStatusResponse struct { + JobID string `json:"job_id"` + State string `json:"state"` + Model string `json:"model"` + CreatedAt time.Time `json:"created_at"` + StartedAt *time.Time `json:"started_at"` + CompletedAt *time.Time `json:"completed_at"` + Attempt int `json:"attempt"` + Result json.RawMessage `json:"result"` + Error *string `json:"error"` + Artifacts []Artifact `json:"artifacts"` +} + +// tagsResponse mirrors the GET /api/tags response. +type tagsResponse struct { + Models []ModelInfo `json:"models"` +} + +// Submit sends a chat job to foreman and blocks until it completes or the context +// is cancelled. It attempts to use a local webhook receiver for push notification; +// if that fails (or forcePolling is set), it falls back to polling GET /jobs/{id}. +// +// Why: orchestration callers want a synchronous call signature over the async API +// without managing their own webhook infrastructure. +// What: POSTs to /jobs, waits for terminal state via webhook or polling, returns +// the final result or error. +// Test: submit a job to a test server, verify the returned Result contains the +// completion, artifacts, and correct state. +func (c *Client) Submit(ctx context.Context, req SubmitRequest) (*Result, error) { + if req.Model == "" { + return nil, fmt.Errorf("model is required") + } + + // Attempt webhook receiver mode first, fall back to polling. + if !c.forcePolling { + result, err := c.submitWithWebhook(ctx, req) + if err == nil { + return result, nil + } + // If the webhook listener failed to bind, fall back to polling. + // Any other error (context cancelled, server error) is returned directly. + if !isBindError(err) { + return nil, err + } + } + + return c.submitWithPolling(ctx, req) +} + +// submitWithPolling submits a job and polls GET /jobs/{id} until a terminal state. 
+func (c *Client) submitWithPolling(ctx context.Context, req SubmitRequest) (*Result, error) { + wireReq := jobSubmitRequest{ + Model: req.Model, + Messages: req.Messages, + Stream: req.Stream, + Tools: req.Tools, + Options: req.Options, + Think: req.Think, + } + + jobID, err := c.postJob(ctx, wireReq) + if err != nil { + return nil, fmt.Errorf("submit job: %w", err) + } + + return c.pollUntilDone(ctx, jobID) +} + +// postJob POSTs a job to the foreman server and returns the job ID. +func (c *Client) postJob(ctx context.Context, req jobSubmitRequest) (string, error) { + body, err := json.Marshal(req) + if err != nil { + return "", fmt.Errorf("marshal job request: %w", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/jobs", bytes.NewReader(body)) + if err != nil { + return "", fmt.Errorf("create POST /jobs request: %w", err) + } + httpReq.Header.Set("Content-Type", "application/json") + c.setAuth(httpReq) + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return "", fmt.Errorf("POST /jobs: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusAccepted { + var errResp struct { + Error string `json:"error"` + } + json.NewDecoder(resp.Body).Decode(&errResp) + return "", fmt.Errorf("POST /jobs returned %d: %s", resp.StatusCode, errResp.Error) + } + + var submitResp jobSubmitResponse + if err := json.NewDecoder(resp.Body).Decode(&submitResp); err != nil { + return "", fmt.Errorf("decode POST /jobs response: %w", err) + } + + return submitResp.JobID, nil +} + +// pollUntilDone polls GET /jobs/{id} at pollInterval until the job reaches a +// terminal state (done or failed) or the context is cancelled. 
+func (c *Client) pollUntilDone(ctx context.Context, jobID string) (*Result, error) { + ticker := time.NewTicker(c.pollInterval) + defer ticker.Stop() + + for { + status, err := c.getJobStatus(ctx, jobID) + if err != nil { + return nil, fmt.Errorf("poll job %s: %w", jobID, err) + } + + if status.State == "done" || status.State == "failed" { + return statusToResult(status), nil + } + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("context cancelled while polling job %s: %w", jobID, ctx.Err()) + case <-ticker.C: + } + } +} + +// getJobStatus fetches the current state of a job via GET /jobs/{id}. +func (c *Client) getJobStatus(ctx context.Context, jobID string) (*jobStatusResponse, error) { + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/jobs/"+jobID, nil) + if err != nil { + return nil, fmt.Errorf("create GET /jobs/%s request: %w", jobID, err) + } + c.setAuth(httpReq) + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("GET /jobs/%s: %w", jobID, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + var errResp struct { + Error string `json:"error"` + } + json.NewDecoder(resp.Body).Decode(&errResp) + return nil, fmt.Errorf("GET /jobs/%s returned %d: %s", jobID, resp.StatusCode, errResp.Error) + } + + var status jobStatusResponse + if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { + return nil, fmt.Errorf("decode GET /jobs/%s response: %w", jobID, err) + } + + return &status, nil +} + +// Tags returns the list of models installed on the foreman target. +// +// Why: callers need to discover available models before submitting work. +// What: GETs /api/tags and returns parsed model info. +// Test: call Tags on a test server with known models, verify the list matches. 
+func (c *Client) Tags(ctx context.Context) ([]ModelInfo, error) { + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/tags", nil) + if err != nil { + return nil, fmt.Errorf("create GET /api/tags request: %w", err) + } + c.setAuth(httpReq) + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("GET /api/tags: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("GET /api/tags returned %d", resp.StatusCode) + } + + var tags tagsResponse + if err := json.NewDecoder(resp.Body).Decode(&tags); err != nil { + return nil, fmt.Errorf("decode /api/tags response: %w", err) + } + + return tags.Models, nil +} + +// Embed sends an embedding request to the foreman target. Embeddings bypass the +// queue and are proxied directly to the always-resident embedder (ADR-0013). +// +// Why: callers need embeddings without waiting behind chat jobs in the queue. +// What: POSTs to /api/embed and returns the parsed response. +// Test: call Embed on a test server with a stub embedder, verify embeddings returned. 
+func (c *Client) Embed(ctx context.Context, req EmbedRequest) (*EmbedResponse, error) { + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal embed request: %w", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/embed", bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("create POST /api/embed request: %w", err) + } + httpReq.Header.Set("Content-Type", "application/json") + c.setAuth(httpReq) + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("POST /api/embed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("POST /api/embed returned %d", resp.StatusCode) + } + + var embedResp EmbedResponse + if err := json.NewDecoder(resp.Body).Decode(&embedResp); err != nil { + return nil, fmt.Errorf("decode /api/embed response: %w", err) + } + + return &embedResp, nil +} + +// setAuth adds the bearer token to the request if configured. +func (c *Client) setAuth(req *http.Request) { + if c.token != "" { + req.Header.Set("Authorization", "Bearer "+c.token) + } +} + +// statusToResult converts a jobStatusResponse to a Result. 
+func statusToResult(s *jobStatusResponse) *Result { + r := &Result{ + JobID: s.JobID, + State: s.State, + Model: s.Model, + Attempt: s.Attempt, + Result: s.Result, + Artifacts: s.Artifacts, + CreatedAt: s.CreatedAt, + CompletedAt: s.CompletedAt, + } + if s.Error != nil { + r.Error = *s.Error + } + if r.Artifacts == nil { + r.Artifacts = []Artifact{} + } + return r +} diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000..e2db6f2 --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,469 @@ +package client_test + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + "time" + + "gitea.stevedudenhoeffer.com/steve/foreman/client" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/config" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/ollama" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/server" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/store" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/webhook" + "gitea.stevedudenhoeffer.com/steve/foreman/internal/worker" +) + +// newTestForeman creates a fully wired foreman server (store, worker, dispatcher) +// backed by a temp SQLite database and stub Ollama client. Returns the httptest +// server URL and a cleanup function. +// +// Why: the client tests exercise the real HTTP API to catch integration bugs. +// What: wires store, inventory, notifier, dispatcher, worker, and server into an +// httptest.Server. +// Test: used as a helper in all client_test.go test functions. 
+func newTestForeman(t *testing.T, ollamaClient ollama.Client, webhookSecret string) string { + t.Helper() + + dbPath := filepath.Join(t.TempDir(), "test.db") + st, err := store.Open(dbPath) + if err != nil { + t.Fatalf("store.Open: %v", err) + } + t.Cleanup(func() { st.Close() }) + + logger := slog.New(slog.NewJSONHandler(io.Discard, nil)) + inv := ollama.NewModelInventory(ollamaClient, logger) + if err := inv.Refresh(context.Background()); err != nil { + t.Fatalf("inv.Refresh: %v", err) + } + + notifier := worker.NewNotifier() + dispatcher := webhook.NewDispatcher(webhookSecret, logger) + w := worker.New(st, ollamaClient, inv, notifier, dispatcher, logger) + + cfg := config.Config{ + OllamaURL: "http://localhost:11434", + MaxAttempts: 3, + JobTTL: 24 * time.Hour, + WebhookSecret: webhookSecret, + } + + srv := server.New(cfg, st, ollamaClient, inv, notifier, w, dispatcher, logger) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go w.Run(ctx) + + ts := httptest.NewServer(srv.Handler()) + t.Cleanup(ts.Close) + + return ts.URL +} + +func TestSubmit_HappyPath_Polling(t *testing.T) { + ollamaStub := &stubOllamaClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + return &ollama.ChatResponse{ + Model: req.Model, + Done: true, + Message: &ollama.Message{Role: "assistant", Content: "Hello from foreman!"}, + }, nil, nil + }, + } + + baseURL := newTestForeman(t, ollamaStub, "") + c := client.New(baseURL, client.WithPollingMode(), client.WithPollInterval(100*time.Millisecond)) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + result, err := c.Submit(ctx, client.SubmitRequest{ + Model: "qwen3:30b", + Messages: []json.RawMessage{json.RawMessage(`{"role":"user","content":"hi"}`)}, + }) + if err 
!= nil { + t.Fatalf("Submit: %v", err) + } + + if result.State != "done" { + t.Errorf("state = %q, want %q", result.State, "done") + } + if result.JobID == "" { + t.Error("job_id should not be empty") + } + if result.Model != "qwen3:30b" { + t.Errorf("model = %q, want %q", result.Model, "qwen3:30b") + } + if len(result.Result) == 0 { + t.Error("result should not be empty on done") + } + if len(result.Artifacts) == 0 { + t.Error("artifacts should include the completion") + } + + // Verify the result contains the expected chat response. + var chatResp ollama.ChatResponse + if err := json.Unmarshal(result.Result, &chatResp); err != nil { + t.Fatalf("unmarshal result: %v", err) + } + if chatResp.Message == nil || chatResp.Message.Content != "Hello from foreman!" { + t.Errorf("chat content = %v, want 'Hello from foreman!'", chatResp.Message) + } +} + +func TestSubmit_HappyPath_Webhook(t *testing.T) { + ollamaStub := &stubOllamaClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + return &ollama.ChatResponse{ + Model: req.Model, + Done: true, + Message: &ollama.Message{Role: "assistant", Content: "webhook response"}, + }, nil, nil + }, + } + + baseURL := newTestForeman(t, ollamaStub, "") + c := client.New(baseURL) // Webhook mode (default) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + result, err := c.Submit(ctx, client.SubmitRequest{ + Model: "qwen3:30b", + Messages: []json.RawMessage{json.RawMessage(`{"role":"user","content":"hi"}`)}, + }) + if err != nil { + t.Fatalf("Submit: %v", err) + } + + if result.State != "done" { + t.Errorf("state = %q, want %q", result.State, "done") + } + if result.JobID == "" { + t.Error("job_id should not be empty") + } + + // Verify the result. 
+ var chatResp ollama.ChatResponse + if err := json.Unmarshal(result.Result, &chatResp); err != nil { + t.Fatalf("unmarshal result: %v", err) + } + if chatResp.Message == nil || chatResp.Message.Content != "webhook response" { + t.Errorf("chat content = %v, want 'webhook response'", chatResp.Message) + } +} + +func TestSubmit_FailedJob(t *testing.T) { + ollamaStub := &stubOllamaClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + return nil, nil, &ollama.HTTPError{StatusCode: 500, Body: "internal error"} + }, + } + + baseURL := newTestForeman(t, ollamaStub, "") + c := client.New(baseURL, client.WithPollingMode(), client.WithPollInterval(100*time.Millisecond)) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + result, err := c.Submit(ctx, client.SubmitRequest{ + Model: "qwen3:30b", + Messages: []json.RawMessage{json.RawMessage(`{"role":"user","content":"hi"}`)}, + }) + if err != nil { + t.Fatalf("Submit: %v", err) + } + + if result.State != "failed" { + t.Errorf("state = %q, want %q", result.State, "failed") + } + if result.Error == "" { + t.Error("error should not be empty on failed job") + } +} + +func TestSubmit_ContextTimeout(t *testing.T) { + ollamaStub := &stubOllamaClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + // Block until context cancelled. 
+ <-ctx.Done() + return nil, nil, ctx.Err() + }, + } + + baseURL := newTestForeman(t, ollamaStub, "") + c := client.New(baseURL, client.WithPollingMode(), client.WithPollInterval(50*time.Millisecond)) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + _, err := c.Submit(ctx, client.SubmitRequest{ + Model: "qwen3:30b", + Messages: []json.RawMessage{json.RawMessage(`{"role":"user","content":"hi"}`)}, + }) + if err == nil { + t.Fatal("expected error on context timeout") + } +} + +func TestSubmit_AuthToken(t *testing.T) { + ollamaStub := &stubOllamaClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + return &ollama.ChatResponse{ + Model: req.Model, + Done: true, + Message: &ollama.Message{Role: "assistant", Content: "ok"}, + }, nil, nil + }, + } + + // Create a foreman with auth required. + dbPath := filepath.Join(t.TempDir(), "test.db") + st, err := store.Open(dbPath) + if err != nil { + t.Fatalf("store.Open: %v", err) + } + t.Cleanup(func() { st.Close() }) + + logger := slog.New(slog.NewJSONHandler(io.Discard, nil)) + inv := ollama.NewModelInventory(ollamaStub, logger) + inv.Refresh(context.Background()) + + notifier := worker.NewNotifier() + dispatcher := webhook.NewDispatcher("", logger) + w := worker.New(st, ollamaStub, inv, notifier, dispatcher, logger) + + cfg := config.Config{ + OllamaURL: "http://localhost:11434", + Token: "my-secret-token", + MaxAttempts: 3, + JobTTL: 24 * time.Hour, + } + + srv := server.New(cfg, st, ollamaStub, inv, notifier, w, dispatcher, logger) + + wCtx, wCancel := context.WithCancel(context.Background()) + t.Cleanup(wCancel) + go w.Run(wCtx) + + ts := httptest.NewServer(srv.Handler()) + t.Cleanup(ts.Close) + + // Without token: should fail. 
+ c := client.New(ts.URL, client.WithPollingMode(), client.WithPollInterval(100*time.Millisecond)) + ctx := context.Background() + _, err = c.Submit(ctx, client.SubmitRequest{ + Model: "qwen3:30b", + Messages: []json.RawMessage{json.RawMessage(`{"role":"user","content":"hi"}`)}, + }) + if err == nil { + t.Fatal("expected error when no token provided") + } + + // With correct token: should succeed. + c = client.New(ts.URL, client.WithToken("my-secret-token"), client.WithPollingMode(), client.WithPollInterval(100*time.Millisecond)) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + result, err := c.Submit(ctx, client.SubmitRequest{ + Model: "qwen3:30b", + Messages: []json.RawMessage{json.RawMessage(`{"role":"user","content":"hi"}`)}, + }) + if err != nil { + t.Fatalf("Submit with correct token: %v", err) + } + if result.State != "done" { + t.Errorf("state = %q, want %q", result.State, "done") + } +} + +func TestTags(t *testing.T) { + ollamaStub := &stubOllamaClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{ + {Name: "qwen3:30b", Size: 19000000000}, + {Name: "nomic-embed-text", Size: 300000000}, + }, + }, + ps: &ollama.PsResponse{}, + } + + baseURL := newTestForeman(t, ollamaStub, "") + c := client.New(baseURL) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + models, err := c.Tags(ctx) + if err != nil { + t.Fatalf("Tags: %v", err) + } + + if len(models) != 2 { + t.Fatalf("got %d models, want 2", len(models)) + } + if models[0].Name != "qwen3:30b" { + t.Errorf("first model = %q, want %q", models[0].Name, "qwen3:30b") + } +} + +func TestEmbed(t *testing.T) { + embedResp := ollama.EmbedResponse{ + Model: "nomic-embed-text", + Embeddings: [][]float64{{0.1, 0.2, 0.3}}, + } + respBytes, _ := json.Marshal(embedResp) + + ollamaStub := &stubOllamaClient{ + tags: &ollama.TagsResponse{}, + ps: &ollama.PsResponse{}, + rawEmbedFunc: func(ctx context.Context, body []byte) 
(*http.Response, error) { + return &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": {"application/json"}}, + Body: io.NopCloser(bytes.NewReader(respBytes)), + }, nil + }, + } + + baseURL := newTestForeman(t, ollamaStub, "") + c := client.New(baseURL) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + resp, err := c.Embed(ctx, client.EmbedRequest{ + Model: "nomic-embed-text", + Input: json.RawMessage(`"test input"`), + }) + if err != nil { + t.Fatalf("Embed: %v", err) + } + + if resp.Model != "nomic-embed-text" { + t.Errorf("model = %q, want %q", resp.Model, "nomic-embed-text") + } + if len(resp.Embeddings) != 1 || len(resp.Embeddings[0]) != 3 { + t.Errorf("embeddings shape wrong: %v", resp.Embeddings) + } +} + +func TestSubmit_MissingModel(t *testing.T) { + c := client.New("http://localhost:9999") + _, err := c.Submit(context.Background(), client.SubmitRequest{}) + if err == nil { + t.Fatal("expected error for missing model") + } +} + +func TestSubmit_WebhookWithHMAC(t *testing.T) { + secret := "test-hmac-secret" + + ollamaStub := &stubOllamaClient{ + tags: &ollama.TagsResponse{ + Models: []ollama.ModelInfo{{Name: "qwen3:30b"}}, + }, + ps: &ollama.PsResponse{}, + chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + return &ollama.ChatResponse{ + Model: req.Model, + Done: true, + Message: &ollama.Message{Role: "assistant", Content: "hmac verified"}, + }, nil, nil + }, + } + + baseURL := newTestForeman(t, ollamaStub, secret) + c := client.New(baseURL, client.WithWebhookSecret(secret)) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + result, err := c.Submit(ctx, client.SubmitRequest{ + Model: "qwen3:30b", + Messages: []json.RawMessage{json.RawMessage(`{"role":"user","content":"hi"}`)}, + }) + if err != nil { + t.Fatalf("Submit with HMAC: %v", err) + } + + if result.State 
!= "done" { + t.Errorf("state = %q, want %q", result.State, "done") + } +} + +// --- Stub Ollama client for integration tests --- + +type stubOllamaClient struct { + tags *ollama.TagsResponse + tagsErr error + ps *ollama.PsResponse + psErr error + + chatFunc func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) + rawEmbedFunc func(ctx context.Context, body []byte) (*http.Response, error) +} + +func (s *stubOllamaClient) Chat(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { + if s.chatFunc != nil { + return s.chatFunc(ctx, req, stream) + } + return nil, nil, fmt.Errorf("stubOllamaClient.Chat not implemented") +} + +func (s *stubOllamaClient) Embed(ctx context.Context, req ollama.EmbedRequest) (*ollama.EmbedResponse, error) { + return nil, fmt.Errorf("stubOllamaClient.Embed not implemented") +} + +func (s *stubOllamaClient) Tags(ctx context.Context) (*ollama.TagsResponse, error) { + if s.tagsErr != nil { + return nil, s.tagsErr + } + return s.tags, nil +} + +func (s *stubOllamaClient) Ps(ctx context.Context) (*ollama.PsResponse, error) { + if s.psErr != nil { + return nil, s.psErr + } + return s.ps, nil +} + +func (s *stubOllamaClient) RawChat(ctx context.Context, body []byte) (*http.Response, error) { + return nil, fmt.Errorf("stubOllamaClient.RawChat not implemented") +} + +func (s *stubOllamaClient) RawEmbed(ctx context.Context, body []byte) (*http.Response, error) { + if s.rawEmbedFunc != nil { + return s.rawEmbedFunc(ctx, body) + } + return nil, fmt.Errorf("stubOllamaClient.RawEmbed not implemented") +} diff --git a/client/webhook.go b/client/webhook.go new file mode 100644 index 0000000..cdc9e2b --- /dev/null +++ b/client/webhook.go @@ -0,0 +1,234 @@ +package client + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "strings" + "sync" + + "gitea.stevedudenhoeffer.com/steve/foreman/internal/webhook" 
+) + +// webhookEvent mirrors the webhook.Event structure for deserialization of +// inbound webhook payloads from the foreman dispatcher. +type webhookEvent struct { + JobID string `json:"job_id"` + State string `json:"state"` + PreviousState string `json:"previous_state"` + Model string `json:"model"` + Attempt int `json:"attempt"` + Result json.RawMessage `json:"result"` + Artifacts json.RawMessage `json:"artifacts"` + Error *string `json:"error"` +} + +// webhookReceiver is an ephemeral HTTP server that receives webhook events +// from foreman for a single job submission. +type webhookReceiver struct { + listener net.Listener + server *http.Server + secret string + + mu sync.Mutex + result chan webhookEvent + jobID string +} + +// newWebhookReceiver creates and starts an ephemeral HTTP server on a random +// port to receive foreman webhook events. +// +// Why: the preferred delivery mode is push-based via webhooks, which avoids +// polling overhead and delivers results faster. +// What: binds a random port, starts an HTTP server handling POST requests, +// and returns the receiver for the caller to wait on. +// Test: create a receiver, POST a done event to its URL, verify it arrives on +// the result channel. +func newWebhookReceiver(secret string) (*webhookReceiver, error) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, &bindError{err: err} + } + + recv := &webhookReceiver{ + listener: listener, + secret: secret, + result: make(chan webhookEvent, 1), + } + + mux := http.NewServeMux() + mux.HandleFunc("/", recv.handleWebhook) + + recv.server = &http.Server{Handler: mux} + + go recv.server.Serve(listener) + + return recv, nil +} + +// addr returns the listener address for constructing the webhook URL. +func (r *webhookReceiver) addr() string { + return r.listener.Addr().String() +} + +// webhookURL returns the full URL that foreman should POST events to. 
+func (r *webhookReceiver) webhookURL(jobID string) string { + r.mu.Lock() + r.jobID = jobID + r.mu.Unlock() + return fmt.Sprintf("http://%s/webhook/%s", r.addr(), jobID) +} + +// shutdown gracefully shuts down the receiver. +func (r *webhookReceiver) shutdown() { + r.server.Shutdown(context.Background()) +} + +// handleWebhook processes inbound webhook POST requests from foreman. +func (r *webhookReceiver) handleWebhook(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + body, err := io.ReadAll(req.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + // Verify HMAC signature if a secret is configured. + if r.secret != "" { + sig := req.Header.Get("X-Foreman-Signature") + if !webhook.VerifySignature(body, sig, r.secret) { + w.WriteHeader(http.StatusUnauthorized) + return + } + } + + var event webhookEvent + if err := json.Unmarshal(body, &event); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + // Only accept events for our job ID. + r.mu.Lock() + expectedJobID := r.jobID + r.mu.Unlock() + + if expectedJobID != "" && event.JobID != expectedJobID { + w.WriteHeader(http.StatusOK) + return + } + + // Forward terminal events to the result channel. + if event.State == "done" || event.State == "failed" { + select { + case r.result <- event: + default: + } + } + + w.WriteHeader(http.StatusOK) +} + +// submitWithWebhook submits a job with a local webhook receiver and waits for +// the terminal event. +func (c *Client) submitWithWebhook(ctx context.Context, req SubmitRequest) (*Result, error) { + recv, err := newWebhookReceiver(c.webhookSecret) + if err != nil { + return nil, err + } + defer recv.shutdown() + + // Build the wire request with a placeholder webhook URL. We need the job ID + // first to build the correct path, but we need to set the URL before submitting. 
+ // Use a path-less URL initially — foreman sends to whatever URL we provide. + webhookURL := fmt.Sprintf("http://%s/webhook/pending", recv.addr()) + + wireReq := jobSubmitRequest{ + Model: req.Model, + Messages: req.Messages, + Stream: req.Stream, + Tools: req.Tools, + Options: req.Options, + Think: req.Think, + StateWebhookURL: webhookURL, + } + + jobID, err := c.postJob(ctx, wireReq) + if err != nil { + return nil, fmt.Errorf("submit job: %w", err) + } + + // Now that we have the job ID, tell the receiver to filter for it. + recv.mu.Lock() + recv.jobID = jobID + recv.mu.Unlock() + + // Wait for the terminal webhook event or context cancellation. + select { + case event := <-recv.result: + return webhookEventToResult(event), nil + case <-ctx.Done(): + return nil, fmt.Errorf("context cancelled while waiting for job %s: %w", jobID, ctx.Err()) + } +} + +// webhookEventToResult converts a webhook event to a Result. +func webhookEventToResult(e webhookEvent) *Result { + r := &Result{ + JobID: e.JobID, + State: e.State, + Model: e.Model, + Attempt: e.Attempt, + Result: e.Result, + } + if e.Error != nil { + r.Error = *e.Error + } + + // Parse artifacts from the webhook event if present. + if len(e.Artifacts) > 0 { + var artifacts []Artifact + if err := json.Unmarshal(e.Artifacts, &artifacts); err == nil { + r.Artifacts = artifacts + } + } + if r.Artifacts == nil { + r.Artifacts = []Artifact{} + } + + return r +} + +// bindError is returned when the webhook receiver cannot bind a local port. +type bindError struct { + err error +} + +func (e *bindError) Error() string { + return fmt.Sprintf("webhook receiver bind failed: %v", e.err) +} + +func (e *bindError) Unwrap() error { + return e.err +} + +// isBindError checks if an error is a webhook receiver bind failure, indicating +// the client should fall back to polling. 
+func isBindError(err error) bool { + if err == nil { + return false + } + _, ok := err.(*bindError) + if ok { + return true + } + // Also check the error message for wrapped bind errors. + return strings.Contains(err.Error(), "webhook receiver bind failed") +} diff --git a/progress.md b/progress.md index f4c1ecb..16f4686 100644 --- a/progress.md +++ b/progress.md @@ -165,3 +165,41 @@ with the real SQLite-backed job queue and single worker loop. no HMAC when no secret, signature format validation. - Artifacts: small inline, large by URL, empty returns nil. - TTL pruner: deletes old terminal jobs. + +## Phase 5: Go client package + go-llm Foreman() constructor — 2026-05-23 + +**Level 0 + Level 1 integration complete** (ADR-0011). + +- `client/` — public Go client package (sync facade over async `/jobs` API): + - `client.New(baseURL, opts...)`: configurable client with bearer auth, + webhook secret, custom HTTP client, poll interval. + - `client.Submit(ctx, SubmitRequest) (*Result, error)`: synchronous + submission — blocks until the job reaches a terminal state (`done`/`failed`). + - **Two delivery modes:** + - **Webhook receiver (preferred):** starts an ephemeral HTTP server on a + random port, sets `state_webhook_url`, waits for the `done`/`failed` + webhook event. Verifies HMAC signature when `WithWebhookSecret` is set. + Falls back to polling automatically if the listener fails to bind. + - **Polling fallback:** polls `GET /jobs/{id}` at `pollInterval` (default + 2s) until terminal state. Forced via `WithPollingMode()`. + - `client.Tags(ctx)`: fetches installed models via `GET /api/tags`. + - `client.Embed(ctx, EmbedRequest)`: sends embedding requests via + `POST /api/embed` (bypasses queue, ADR-0013). + - Both modes respect context cancellation/deadline and clean up resources. + +- Tests (all passing with `-race`): + - Happy path (polling): submit, poll, verify completed result + artifacts. 
+ - Happy path (webhook): submit with webhook receiver, verify push delivery. + - Failed job: returns Result with state=failed and error message. + - Context timeout: returns error on deadline exceeded. + - Auth: bearer token sent when configured; 401 without it. + - HMAC webhook verification: signed webhooks verified correctly. + - Tags and Embed endpoints: round-trip through the client. + - Missing model validation: returns error before network call. + +- go-llm integration (Level 0): + - `llm.Foreman(baseURL, apiKey, opts...)` constructor added to + `v2/constructors.go` on branch `feat/foreman-constructor`. + - Delegates to existing `ollamaProvider.New()` — zero new code paths. + - DD#9 added to `v2/CLAUDE.md`. + - PR: https://gitea.stevedudenhoeffer.com/steve/go-llm/pulls/4