package client import ( "context" "encoding/json" "fmt" "io" "net" "net/http" "strings" "sync" "gitea.stevedudenhoeffer.com/steve/foreman/internal/webhook" ) // webhookEvent mirrors the webhook.Event structure for deserialization of // inbound webhook payloads from the foreman dispatcher. type webhookEvent struct { JobID string `json:"job_id"` State string `json:"state"` PreviousState string `json:"previous_state"` Model string `json:"model"` Attempt int `json:"attempt"` Result json.RawMessage `json:"result"` Artifacts json.RawMessage `json:"artifacts"` Error *string `json:"error"` } // webhookReceiver is an ephemeral HTTP server that receives webhook events // from foreman for a single job submission. type webhookReceiver struct { listener net.Listener server *http.Server secret string mu sync.Mutex result chan webhookEvent jobID string } // newWebhookReceiver creates and starts an ephemeral HTTP server on a random // port to receive foreman webhook events. // // Why: the preferred delivery mode is push-based via webhooks, which avoids // polling overhead and delivers results faster. // What: binds a random port, starts an HTTP server handling POST requests, // and returns the receiver for the caller to wait on. // Test: create a receiver, POST a done event to its URL, verify it arrives on // the result channel. func newWebhookReceiver(secret string) (*webhookReceiver, error) { listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { return nil, &bindError{err: err} } recv := &webhookReceiver{ listener: listener, secret: secret, result: make(chan webhookEvent, 1), } mux := http.NewServeMux() mux.HandleFunc("/", recv.handleWebhook) recv.server = &http.Server{Handler: mux} go recv.server.Serve(listener) return recv, nil } // addr returns the listener address for constructing the webhook URL. func (r *webhookReceiver) addr() string { return r.listener.Addr().String() } // webhookURL returns the full URL that foreman should POST events to. func (r *webhookReceiver) webhookURL(jobID string) string { r.mu.Lock() r.jobID = jobID r.mu.Unlock() return fmt.Sprintf("http://%s/webhook/%s", r.addr(), jobID) } // shutdown gracefully shuts down the receiver. func (r *webhookReceiver) shutdown() { r.server.Shutdown(context.Background()) } // handleWebhook processes inbound webhook POST requests from foreman. func (r *webhookReceiver) handleWebhook(w http.ResponseWriter, req *http.Request) { if req.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } body, err := io.ReadAll(req.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) return } // Verify HMAC signature if a secret is configured. if r.secret != "" { sig := req.Header.Get("X-Foreman-Signature") if !webhook.VerifySignature(body, sig, r.secret) { w.WriteHeader(http.StatusUnauthorized) return } } var event webhookEvent if err := json.Unmarshal(body, &event); err != nil { w.WriteHeader(http.StatusBadRequest) return } // Only accept events for our job ID. r.mu.Lock() expectedJobID := r.jobID r.mu.Unlock() if expectedJobID != "" && event.JobID != expectedJobID { w.WriteHeader(http.StatusOK) return } // Forward terminal events to the result channel. if event.State == "done" || event.State == "failed" { select { case r.result <- event: default: } } w.WriteHeader(http.StatusOK) } // submitWithWebhook submits a job with a local webhook receiver and waits for // the terminal event. func (c *Client) submitWithWebhook(ctx context.Context, req SubmitRequest) (*Result, error) { recv, err := newWebhookReceiver(c.webhookSecret) if err != nil { return nil, err } defer recv.shutdown() // Build the wire request with a placeholder webhook URL. We need the job ID // first to build the correct path, but we need to set the URL before submitting. // Use a path-less URL initially — foreman sends to whatever URL we provide. webhookURL := fmt.Sprintf("http://%s/webhook/pending", recv.addr()) wireReq := jobSubmitRequest{ Model: req.Model, Messages: req.Messages, Stream: req.Stream, Tools: req.Tools, Options: req.Options, Think: req.Think, StateWebhookURL: webhookURL, } jobID, err := c.postJob(ctx, wireReq) if err != nil { return nil, fmt.Errorf("submit job: %w", err) } // Now that we have the job ID, tell the receiver to filter for it. recv.mu.Lock() recv.jobID = jobID recv.mu.Unlock() // Wait for the terminal webhook event or context cancellation. select { case event := <-recv.result: return webhookEventToResult(event), nil case <-ctx.Done(): return nil, fmt.Errorf("context cancelled while waiting for job %s: %w", jobID, ctx.Err()) } } // webhookEventToResult converts a webhook event to a Result. func webhookEventToResult(e webhookEvent) *Result { r := &Result{ JobID: e.JobID, State: e.State, Model: e.Model, Attempt: e.Attempt, Result: e.Result, } if e.Error != nil { r.Error = *e.Error } // Parse artifacts from the webhook event if present. if len(e.Artifacts) > 0 { var artifacts []Artifact if err := json.Unmarshal(e.Artifacts, &artifacts); err == nil { r.Artifacts = artifacts } } if r.Artifacts == nil { r.Artifacts = []Artifact{} } return r } // bindError is returned when the webhook receiver cannot bind a local port. type bindError struct { err error } func (e *bindError) Error() string { return fmt.Sprintf("webhook receiver bind failed: %v", e.err) } func (e *bindError) Unwrap() error { return e.err } // isBindError checks if an error is a webhook receiver bind failure, indicating // the client should fall back to polling. func isBindError(err error) bool { if err == nil { return false } _, ok := err.(*bindError) if ok { return true } // Also check the error message for wrapped bind errors. return strings.Contains(err.Error(), "webhook receiver bind failed") }