Files
steve 4759a06d1b feat: add Go client package with sync facade over async /jobs
Adds client/ -- a public Go package providing a synchronous facade over
foreman's async POST /jobs API (Level 1 integration per ADR-0011).

Two delivery modes:
- Webhook receiver (preferred): ephemeral HTTP server on random port,
  pushes results immediately, verifies HMAC when configured
- Polling fallback: polls GET /jobs/{id} at configurable interval

Also includes Tags() and Embed() helpers, bearer auth support, and
comprehensive integration tests against the real foreman HTTP handlers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 18:38:16 -04:00

235 lines
6.0 KiB
Go

package client
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"
"sync"
"gitea.stevedudenhoeffer.com/steve/foreman/internal/webhook"
)
// webhookEvent mirrors the webhook.Event structure for deserialization of
// inbound webhook payloads from the foreman dispatcher.
type webhookEvent struct {
JobID string `json:"job_id"`
State string `json:"state"`
PreviousState string `json:"previous_state"`
Model string `json:"model"`
Attempt int `json:"attempt"`
Result json.RawMessage `json:"result"`
Artifacts json.RawMessage `json:"artifacts"`
Error *string `json:"error"`
}
// webhookReceiver is an ephemeral HTTP server that receives webhook events
// from foreman for a single job submission.
type webhookReceiver struct {
listener net.Listener
server *http.Server
secret string
mu sync.Mutex
result chan webhookEvent
jobID string
}
// newWebhookReceiver creates and starts an ephemeral HTTP server on a random
// port to receive foreman webhook events.
//
// Why: the preferred delivery mode is push-based via webhooks, which avoids
// polling overhead and delivers results faster.
// What: binds a random port, starts an HTTP server handling POST requests,
// and returns the receiver for the caller to wait on.
// Test: create a receiver, POST a done event to its URL, verify it arrives on
// the result channel.
func newWebhookReceiver(secret string) (*webhookReceiver, error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, &bindError{err: err}
}
recv := &webhookReceiver{
listener: listener,
secret: secret,
result: make(chan webhookEvent, 1),
}
mux := http.NewServeMux()
mux.HandleFunc("/", recv.handleWebhook)
recv.server = &http.Server{Handler: mux}
go recv.server.Serve(listener)
return recv, nil
}
// addr returns the listener address for constructing the webhook URL.
func (r *webhookReceiver) addr() string {
return r.listener.Addr().String()
}
// webhookURL returns the full URL that foreman should POST events to.
func (r *webhookReceiver) webhookURL(jobID string) string {
r.mu.Lock()
r.jobID = jobID
r.mu.Unlock()
return fmt.Sprintf("http://%s/webhook/%s", r.addr(), jobID)
}
// shutdown gracefully shuts down the receiver.
func (r *webhookReceiver) shutdown() {
r.server.Shutdown(context.Background())
}
// handleWebhook processes inbound webhook POST requests from foreman.
func (r *webhookReceiver) handleWebhook(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
body, err := io.ReadAll(req.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
// Verify HMAC signature if a secret is configured.
if r.secret != "" {
sig := req.Header.Get("X-Foreman-Signature")
if !webhook.VerifySignature(body, sig, r.secret) {
w.WriteHeader(http.StatusUnauthorized)
return
}
}
var event webhookEvent
if err := json.Unmarshal(body, &event); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
// Only accept events for our job ID.
r.mu.Lock()
expectedJobID := r.jobID
r.mu.Unlock()
if expectedJobID != "" && event.JobID != expectedJobID {
w.WriteHeader(http.StatusOK)
return
}
// Forward terminal events to the result channel.
if event.State == "done" || event.State == "failed" {
select {
case r.result <- event:
default:
}
}
w.WriteHeader(http.StatusOK)
}
// submitWithWebhook submits a job with a local webhook receiver and waits for
// the terminal event.
func (c *Client) submitWithWebhook(ctx context.Context, req SubmitRequest) (*Result, error) {
recv, err := newWebhookReceiver(c.webhookSecret)
if err != nil {
return nil, err
}
defer recv.shutdown()
// Build the wire request with a placeholder webhook URL. We need the job ID
// first to build the correct path, but we need to set the URL before submitting.
// Use a path-less URL initially — foreman sends to whatever URL we provide.
webhookURL := fmt.Sprintf("http://%s/webhook/pending", recv.addr())
wireReq := jobSubmitRequest{
Model: req.Model,
Messages: req.Messages,
Stream: req.Stream,
Tools: req.Tools,
Options: req.Options,
Think: req.Think,
StateWebhookURL: webhookURL,
}
jobID, err := c.postJob(ctx, wireReq)
if err != nil {
return nil, fmt.Errorf("submit job: %w", err)
}
// Now that we have the job ID, tell the receiver to filter for it.
recv.mu.Lock()
recv.jobID = jobID
recv.mu.Unlock()
// Wait for the terminal webhook event or context cancellation.
select {
case event := <-recv.result:
return webhookEventToResult(event), nil
case <-ctx.Done():
return nil, fmt.Errorf("context cancelled while waiting for job %s: %w", jobID, ctx.Err())
}
}
// webhookEventToResult converts a webhook event to a Result.
func webhookEventToResult(e webhookEvent) *Result {
r := &Result{
JobID: e.JobID,
State: e.State,
Model: e.Model,
Attempt: e.Attempt,
Result: e.Result,
}
if e.Error != nil {
r.Error = *e.Error
}
// Parse artifacts from the webhook event if present.
if len(e.Artifacts) > 0 {
var artifacts []Artifact
if err := json.Unmarshal(e.Artifacts, &artifacts); err == nil {
r.Artifacts = artifacts
}
}
if r.Artifacts == nil {
r.Artifacts = []Artifact{}
}
return r
}
// bindError is returned when the webhook receiver cannot bind a local port.
type bindError struct {
err error
}
func (e *bindError) Error() string {
return fmt.Sprintf("webhook receiver bind failed: %v", e.err)
}
func (e *bindError) Unwrap() error {
return e.err
}
// isBindError checks if an error is a webhook receiver bind failure, indicating
// the client should fall back to polling.
func isBindError(err error) bool {
if err == nil {
return false
}
_, ok := err.(*bindError)
if ok {
return true
}
// Also check the error message for wrapped bind errors.
return strings.Contains(err.Error(), "webhook receiver bind failed")
}