4759a06d1b
Adds client/ -- a public Go package providing a synchronous facade over
foreman's async POST /jobs API (Level 1 integration per ADR-0011).
Two delivery modes:
- Webhook receiver (preferred): ephemeral HTTP server on random port,
pushes results immediately, verifies HMAC when configured
- Polling fallback: polls GET /jobs/{id} at configurable interval
Also includes Tags() and Embed() helpers, bearer auth support, and
comprehensive integration tests against the real foreman HTTP handlers.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
235 lines
6.0 KiB
Go
235 lines
6.0 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/foreman/internal/webhook"
|
|
)
|
|
|
|
// webhookEvent mirrors the webhook.Event structure for deserialization of
|
|
// inbound webhook payloads from the foreman dispatcher.
|
|
type webhookEvent struct {
|
|
JobID string `json:"job_id"`
|
|
State string `json:"state"`
|
|
PreviousState string `json:"previous_state"`
|
|
Model string `json:"model"`
|
|
Attempt int `json:"attempt"`
|
|
Result json.RawMessage `json:"result"`
|
|
Artifacts json.RawMessage `json:"artifacts"`
|
|
Error *string `json:"error"`
|
|
}
|
|
|
|
// webhookReceiver is an ephemeral HTTP server that receives webhook events
|
|
// from foreman for a single job submission.
|
|
type webhookReceiver struct {
|
|
listener net.Listener
|
|
server *http.Server
|
|
secret string
|
|
|
|
mu sync.Mutex
|
|
result chan webhookEvent
|
|
jobID string
|
|
}
|
|
|
|
// newWebhookReceiver creates and starts an ephemeral HTTP server on a random
|
|
// port to receive foreman webhook events.
|
|
//
|
|
// Why: the preferred delivery mode is push-based via webhooks, which avoids
|
|
// polling overhead and delivers results faster.
|
|
// What: binds a random port, starts an HTTP server handling POST requests,
|
|
// and returns the receiver for the caller to wait on.
|
|
// Test: create a receiver, POST a done event to its URL, verify it arrives on
|
|
// the result channel.
|
|
func newWebhookReceiver(secret string) (*webhookReceiver, error) {
|
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
|
if err != nil {
|
|
return nil, &bindError{err: err}
|
|
}
|
|
|
|
recv := &webhookReceiver{
|
|
listener: listener,
|
|
secret: secret,
|
|
result: make(chan webhookEvent, 1),
|
|
}
|
|
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/", recv.handleWebhook)
|
|
|
|
recv.server = &http.Server{Handler: mux}
|
|
|
|
go recv.server.Serve(listener)
|
|
|
|
return recv, nil
|
|
}
|
|
|
|
// addr returns the listener address for constructing the webhook URL.
|
|
func (r *webhookReceiver) addr() string {
|
|
return r.listener.Addr().String()
|
|
}
|
|
|
|
// webhookURL returns the full URL that foreman should POST events to.
|
|
func (r *webhookReceiver) webhookURL(jobID string) string {
|
|
r.mu.Lock()
|
|
r.jobID = jobID
|
|
r.mu.Unlock()
|
|
return fmt.Sprintf("http://%s/webhook/%s", r.addr(), jobID)
|
|
}
|
|
|
|
// shutdown gracefully shuts down the receiver.
|
|
func (r *webhookReceiver) shutdown() {
|
|
r.server.Shutdown(context.Background())
|
|
}
|
|
|
|
// handleWebhook processes inbound webhook POST requests from foreman.
|
|
func (r *webhookReceiver) handleWebhook(w http.ResponseWriter, req *http.Request) {
|
|
if req.Method != http.MethodPost {
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
body, err := io.ReadAll(req.Body)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Verify HMAC signature if a secret is configured.
|
|
if r.secret != "" {
|
|
sig := req.Header.Get("X-Foreman-Signature")
|
|
if !webhook.VerifySignature(body, sig, r.secret) {
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
return
|
|
}
|
|
}
|
|
|
|
var event webhookEvent
|
|
if err := json.Unmarshal(body, &event); err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Only accept events for our job ID.
|
|
r.mu.Lock()
|
|
expectedJobID := r.jobID
|
|
r.mu.Unlock()
|
|
|
|
if expectedJobID != "" && event.JobID != expectedJobID {
|
|
w.WriteHeader(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
// Forward terminal events to the result channel.
|
|
if event.State == "done" || event.State == "failed" {
|
|
select {
|
|
case r.result <- event:
|
|
default:
|
|
}
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}
|
|
|
|
// submitWithWebhook submits a job with a local webhook receiver and waits for
|
|
// the terminal event.
|
|
func (c *Client) submitWithWebhook(ctx context.Context, req SubmitRequest) (*Result, error) {
|
|
recv, err := newWebhookReceiver(c.webhookSecret)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer recv.shutdown()
|
|
|
|
// Build the wire request with a placeholder webhook URL. We need the job ID
|
|
// first to build the correct path, but we need to set the URL before submitting.
|
|
// Use a path-less URL initially — foreman sends to whatever URL we provide.
|
|
webhookURL := fmt.Sprintf("http://%s/webhook/pending", recv.addr())
|
|
|
|
wireReq := jobSubmitRequest{
|
|
Model: req.Model,
|
|
Messages: req.Messages,
|
|
Stream: req.Stream,
|
|
Tools: req.Tools,
|
|
Options: req.Options,
|
|
Think: req.Think,
|
|
StateWebhookURL: webhookURL,
|
|
}
|
|
|
|
jobID, err := c.postJob(ctx, wireReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("submit job: %w", err)
|
|
}
|
|
|
|
// Now that we have the job ID, tell the receiver to filter for it.
|
|
recv.mu.Lock()
|
|
recv.jobID = jobID
|
|
recv.mu.Unlock()
|
|
|
|
// Wait for the terminal webhook event or context cancellation.
|
|
select {
|
|
case event := <-recv.result:
|
|
return webhookEventToResult(event), nil
|
|
case <-ctx.Done():
|
|
return nil, fmt.Errorf("context cancelled while waiting for job %s: %w", jobID, ctx.Err())
|
|
}
|
|
}
|
|
|
|
// webhookEventToResult converts a webhook event to a Result.
|
|
func webhookEventToResult(e webhookEvent) *Result {
|
|
r := &Result{
|
|
JobID: e.JobID,
|
|
State: e.State,
|
|
Model: e.Model,
|
|
Attempt: e.Attempt,
|
|
Result: e.Result,
|
|
}
|
|
if e.Error != nil {
|
|
r.Error = *e.Error
|
|
}
|
|
|
|
// Parse artifacts from the webhook event if present.
|
|
if len(e.Artifacts) > 0 {
|
|
var artifacts []Artifact
|
|
if err := json.Unmarshal(e.Artifacts, &artifacts); err == nil {
|
|
r.Artifacts = artifacts
|
|
}
|
|
}
|
|
if r.Artifacts == nil {
|
|
r.Artifacts = []Artifact{}
|
|
}
|
|
|
|
return r
|
|
}
|
|
|
|
// bindError is returned when the webhook receiver cannot bind a local port.
|
|
type bindError struct {
|
|
err error
|
|
}
|
|
|
|
func (e *bindError) Error() string {
|
|
return fmt.Sprintf("webhook receiver bind failed: %v", e.err)
|
|
}
|
|
|
|
func (e *bindError) Unwrap() error {
|
|
return e.err
|
|
}
|
|
|
|
// isBindError checks if an error is a webhook receiver bind failure, indicating
|
|
// the client should fall back to polling.
|
|
func isBindError(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
_, ok := err.(*bindError)
|
|
if ok {
|
|
return true
|
|
}
|
|
// Also check the error message for wrapped bind errors.
|
|
return strings.Contains(err.Error(), "webhook receiver bind failed")
|
|
}
|