Files
steve 27f196d333 feat: add Ollama target client, model poller, and native passthrough
Phase 2 of foreman: the daemon now acts as a transparent Ollama proxy.

- internal/ollama: Client interface and HTTP implementation for chat
  (streaming + non-streaming), embed, tags, ps with auth forwarding,
  NDJSON streaming via bufio.Scanner, and connection vs HTTP error
  classification via custom error types.
- internal/ollama: ModelInventory with background poller for /api/tags
  and /api/ps, degraded mode on target unreachable with model retention,
  automatic recovery on reconnect.
- internal/server: Passthrough routes (/api/chat, /api/tags, /api/ps,
  /api/embed, /api/embeddings) with model validation, chat serialization
  gate (capacity-1 channel), concurrent embedding bypass (ADR-0013),
  NDJSON streaming with per-chunk flush, and degraded health reporting.
- cmd/foreman: Full serve wiring with Ollama client, poller goroutine,
  embedder warmup (keep_alive:-1), and signal-based shutdown.

The Mac is now usable as a go-llm target through foreman.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 18:07:33 -04:00

327 lines
11 KiB
Go

package ollama
import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"strings"
)
// scannerBufSize caps the length of a single NDJSON line accepted by the
// streaming scanner: 4 MiB, sized so that a large tool-call payload still fits
// on one line.
const scannerBufSize = 4 << 20
// Client defines the interface for communicating with an Ollama target.
//
// Why: an interface allows the worker loop, passthrough handlers, and tests to
// share a single contract and swap in stubs.
// What: covers the four Ollama endpoints foreman uses (chat, embed, tags, ps),
// plus raw passthrough variants of chat and embed that hand back the
// unparsed *http.Response.
// Test: implement with a stub HTTP server; verify round-trip for each method.
type Client interface {
// Chat sends a chat request. Exactly one of the first two results is non-nil
// on success:
//   - stream == false: returns (*ChatResponse, nil, nil).
//   - stream == true:  returns (nil, <-chan ChatResponse, nil); chunks are
//     delivered on the channel, which is closed when the stream ends.
// On failure both are nil and the error is non-nil.
Chat(ctx context.Context, req ChatRequest, stream bool) (*ChatResponse, <-chan ChatResponse, error)
// Embed sends an embedding request to /api/embed and returns the parsed response.
Embed(ctx context.Context, req EmbedRequest) (*EmbedResponse, error)
// Tags returns the list of installed models from /api/tags.
Tags(ctx context.Context) (*TagsResponse, error)
// Ps returns the list of currently-loaded models from /api/ps.
Ps(ctx context.Context) (*PsResponse, error)
// RawChat performs a raw proxied chat request, returning the http.Response for
// the caller to stream directly to a downstream client. body is sent verbatim
// as the request payload. The caller is responsible for closing the response
// body.
RawChat(ctx context.Context, body []byte) (*http.Response, error)
// RawEmbed performs a raw proxied embed request, returning the http.Response.
// body is sent verbatim as the request payload. The caller is responsible for
// closing the response body.
RawEmbed(ctx context.Context, body []byte) (*http.Response, error)
}
// httpClient is the concrete implementation of Client backed by net/http.
type httpClient struct {
// baseURL is the target's base URL; endpoint paths such as "/api/chat" are
// appended to it directly, so it is stored without a trailing slash.
baseURL string
// token is the bearer token sent in the Authorization header; an empty
// token means no Authorization header is set.
token string
// httpClient is the underlying net/http client used for every request.
httpClient *http.Client
}
// NewClient builds a Client that talks to the Ollama instance at baseURL.
//
// Why: keeps base URL normalization, the auth token, and the underlying
// *http.Client in one place.
// What: strips every trailing "/" from baseURL so endpoint paths can be appended
// verbatim, stores token for bearer auth, and pairs both with a fresh
// http.Client.
// Test: create with a httptest.Server URL, call Tags, verify correct request path.
func NewClient(baseURL, token string) Client {
	c := new(httpClient)
	c.baseURL = strings.TrimRight(baseURL, "/")
	c.token = token
	c.httpClient = &http.Client{}
	return c
}
// Chat sends a POST /api/chat to the Ollama target.
//
// Why: the worker loop and sync passthrough both need structured chat access.
// What: POSTs the chat request with req.Stream forced to the stream argument.
// When stream is false it returns the single decoded response. When stream is
// true it returns a channel of decoded NDJSON chunks; the channel is closed when
// the target ends the stream, a read error occurs, or ctx is cancelled.
// Returns a wrapped connection error when the request cannot be sent, and an
// *HTTPError (with up to 4096 bytes of the body) for any non-2xx status.
// Test: stub a /api/chat endpoint returning NDJSON or a single JSON object; verify
// both streaming and non-streaming paths, and that cancelling ctx with an
// unread channel lets the reader goroutine exit.
func (c *httpClient) Chat(ctx context.Context, req ChatRequest, stream bool) (*ChatResponse, <-chan ChatResponse, error) {
	// The stream argument is authoritative; overwrite whatever the caller put
	// in the request so the wire format matches the return shape.
	streamVal := stream
	req.Stream = &streamVal

	body, err := json.Marshal(req)
	if err != nil {
		return nil, nil, fmt.Errorf("marshal chat request: %w", err)
	}

	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/chat", bytes.NewReader(body))
	if err != nil {
		return nil, nil, fmt.Errorf("create chat request: %w", err)
	}
	httpReq.Header.Set("Content-Type", "application/json")
	c.setAuth(httpReq)

	resp, err := c.httpClient.Do(httpReq)
	if err != nil {
		return nil, nil, c.wrapConnErr(err)
	}

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		defer resp.Body.Close()
		// Best-effort: a truncated or unreadable error body still yields a
		// usable HTTPError carrying the status code.
		errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		return nil, nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
	}

	if !stream {
		defer resp.Body.Close()
		var chatResp ChatResponse
		if err := json.NewDecoder(resp.Body).Decode(&chatResp); err != nil {
			return nil, nil, fmt.Errorf("decode chat response: %w", err)
		}
		return &chatResp, nil, nil
	}

	// Streaming: read NDJSON lines and send on channel. The goroutine owns the
	// response body and the channel; both are released when it returns.
	ch := make(chan ChatResponse, 64)
	go func() {
		defer close(ch)
		defer resp.Body.Close()

		scanner := bufio.NewScanner(resp.Body)
		scanner.Buffer(make([]byte, 0, scannerBufSize), scannerBufSize)
		for scanner.Scan() {
			line := scanner.Bytes()
			if len(line) == 0 {
				continue
			}
			var chunk ChatResponse
			if err := json.Unmarshal(line, &chunk); err != nil {
				// Malformed lines are skipped rather than aborting the stream.
				continue
			}
			// Never block forever on a consumer that has gone away: once ctx is
			// cancelled, stop so the goroutine and connection are released.
			select {
			case ch <- chunk:
			case <-ctx.Done():
				return
			}
		}
		// A scan error (read failure, or a line longer than scannerBufSize)
		// ends the stream here; the channel-only contract has no way to report
		// it, so the consumer just sees the channel close.
	}()
	return nil, ch, nil
}
// Embed issues POST /api/embed against the Ollama target.
//
// Why: embedding requests bypass the queue and go directly to the target (ADR-0013).
// What: marshals req to JSON, POSTs it with auth applied, and decodes the reply.
// A transport failure is passed through wrapConnErr; a non-2xx status yields an
// *HTTPError carrying at most 4096 bytes of the response body.
// Test: stub /api/embed, send a request, verify embeddings in the response.
func (c *httpClient) Embed(ctx context.Context, req EmbedRequest) (*EmbedResponse, error) {
	payload, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("marshal embed request: %w", err)
	}

	endpoint := c.baseURL + "/api/embed"
	outgoing, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return nil, fmt.Errorf("create embed request: %w", err)
	}
	outgoing.Header.Set("Content-Type", "application/json")
	c.setAuth(outgoing)

	res, err := c.httpClient.Do(outgoing)
	if err != nil {
		return nil, c.wrapConnErr(err)
	}
	defer res.Body.Close()

	if ok := res.StatusCode >= 200 && res.StatusCode < 300; !ok {
		// Read error deliberately ignored: whatever was read is still reported.
		snippet, _ := io.ReadAll(io.LimitReader(res.Body, 4096))
		return nil, &HTTPError{StatusCode: res.StatusCode, Body: string(snippet)}
	}

	parsed := new(EmbedResponse)
	if err := json.NewDecoder(res.Body).Decode(parsed); err != nil {
		return nil, fmt.Errorf("decode embed response: %w", err)
	}
	return parsed, nil
}
// Tags issues GET /api/tags against the Ollama target.
//
// Why: the model poller needs the installed model list for inventory and validation.
// What: sends the request with auth applied and decodes the JSON reply. A
// transport failure is passed through wrapConnErr; a non-2xx status yields an
// *HTTPError carrying at most 4096 bytes of the response body.
// Test: stub /api/tags with a model list, verify Tags() returns it.
func (c *httpClient) Tags(ctx context.Context) (*TagsResponse, error) {
	endpoint := c.baseURL + "/api/tags"
	outgoing, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("create tags request: %w", err)
	}
	c.setAuth(outgoing)

	res, err := c.httpClient.Do(outgoing)
	if err != nil {
		return nil, c.wrapConnErr(err)
	}
	defer res.Body.Close()

	switch code := res.StatusCode; {
	case code < 200, code >= 300:
		// Read error deliberately ignored: whatever was read is still reported.
		snippet, _ := io.ReadAll(io.LimitReader(res.Body, 4096))
		return nil, &HTTPError{StatusCode: code, Body: string(snippet)}
	}

	parsed := new(TagsResponse)
	if err := json.NewDecoder(res.Body).Decode(parsed); err != nil {
		return nil, fmt.Errorf("decode tags response: %w", err)
	}
	return parsed, nil
}
// Ps issues GET /api/ps against the Ollama target.
//
// Why: the poller and scheduler need to know which models are currently loaded.
// What: sends the request with auth applied and decodes the JSON reply. A
// transport failure is passed through wrapConnErr; a non-2xx status yields an
// *HTTPError carrying at most 4096 bytes of the response body.
// Test: stub /api/ps with running models, verify Ps() returns them.
func (c *httpClient) Ps(ctx context.Context) (*PsResponse, error) {
	outgoing, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/ps", nil)
	if err != nil {
		return nil, fmt.Errorf("create ps request: %w", err)
	}
	c.setAuth(outgoing)

	res, err := c.httpClient.Do(outgoing)
	if err != nil {
		return nil, c.wrapConnErr(err)
	}
	defer res.Body.Close()

	if status := res.StatusCode; !(status >= 200 && status < 300) {
		// Read error deliberately ignored: whatever was read is still reported.
		snippet, _ := io.ReadAll(io.LimitReader(res.Body, 4096))
		return nil, &HTTPError{StatusCode: status, Body: string(snippet)}
	}

	var running PsResponse
	decoder := json.NewDecoder(res.Body)
	if err := decoder.Decode(&running); err != nil {
		return nil, fmt.Errorf("decode ps response: %w", err)
	}
	return &running, nil
}
// RawChat proxies a POST /api/chat and hands back the untouched http.Response so
// it can be streamed straight to a downstream client.
//
// Why: the passthrough handler needs raw access to the response body for NDJSON
// streaming without double-parsing.
// What: POSTs body verbatim as JSON with auth applied. On a 2xx status the
// response is returned with its body still open, and the caller must close it.
// On any other status the body is read (up to 4096 bytes), closed here, and
// reported as an *HTTPError. Transport failures go through wrapConnErr.
// Test: stub /api/chat, call RawChat, verify response status and body forwarding.
func (c *httpClient) RawChat(ctx context.Context, body []byte) (*http.Response, error) {
	endpoint := c.baseURL + "/api/chat"
	outgoing, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("create raw chat request: %w", err)
	}
	outgoing.Header.Set("Content-Type", "application/json")
	c.setAuth(outgoing)

	res, err := c.httpClient.Do(outgoing)
	if err != nil {
		return nil, c.wrapConnErr(err)
	}

	if res.StatusCode >= 200 && res.StatusCode < 300 {
		return res, nil
	}

	// Failure path: this function owns the body, so consume and close it here.
	// Read error deliberately ignored: whatever was read is still reported.
	snippet, _ := io.ReadAll(io.LimitReader(res.Body, 4096))
	res.Body.Close()
	return nil, &HTTPError{StatusCode: res.StatusCode, Body: string(snippet)}
}
// RawEmbed proxies a POST /api/embed and hands back the untouched http.Response.
//
// Why: the embed passthrough handler proxies the raw body/response without parsing.
// What: POSTs body verbatim as JSON with auth applied. On a 2xx status the
// response is returned with its body still open, and the caller must close it.
// On any other status the body is read (up to 4096 bytes), closed here, and
// reported as an *HTTPError. Transport failures go through wrapConnErr.
// Test: stub /api/embed, call RawEmbed, verify response forwarding.
func (c *httpClient) RawEmbed(ctx context.Context, body []byte) (*http.Response, error) {
	payload := bytes.NewReader(body)
	outgoing, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/embed", payload)
	if err != nil {
		return nil, fmt.Errorf("create raw embed request: %w", err)
	}
	outgoing.Header.Set("Content-Type", "application/json")
	c.setAuth(outgoing)

	res, err := c.httpClient.Do(outgoing)
	if err != nil {
		return nil, c.wrapConnErr(err)
	}

	succeeded := res.StatusCode >= 200 && res.StatusCode < 300
	if succeeded {
		return res, nil
	}

	// Failure path: this function owns the body, so consume and close it here.
	// Read error deliberately ignored: whatever was read is still reported.
	snippet, _ := io.ReadAll(io.LimitReader(res.Body, 4096))
	res.Body.Close()
	return nil, &HTTPError{StatusCode: res.StatusCode, Body: string(snippet)}
}
// setAuth attaches "Authorization: Bearer <token>" to req. When no token is
// configured the request is left untouched.
func (c *httpClient) setAuth(req *http.Request) {
	if c.token == "" {
		return
	}
	req.Header.Set("Authorization", "Bearer "+c.token)
}
// wrapConnErr classifies an error returned by the HTTP client and wraps
// network-level failures in a ConnectionError carrying the target base URL.
//
// Why: callers distinguish "target unreachable" from other failures by error
// type, so the classification must see through wrapping and must not mistake a
// caller-side cancellation for an unreachable target.
// What:
//   - nil stays nil.
//   - An error whose chain contains context.Canceled is returned as-is: the
//     caller gave up, which says nothing about the target.
//   - An error whose chain contains a net.Error (this includes *net.OpError and
//     timeouts), or whose message matches isConnectionError, becomes a
//     *ConnectionError.
//   - Anything else is returned as-is.
//
// Test: pass a wrapped *net.OpError and expect *ConnectionError; pass an error
// wrapping context.Canceled and expect it back unchanged; pass nil and expect nil.
func (c *httpClient) wrapConnErr(err error) error {
	if err == nil {
		return nil
	}

	// http.Client.Do reports failures as *url.Error, which itself satisfies
	// net.Error. Cancellation therefore has to be ruled out first, otherwise it
	// would be classified as a connection failure below.
	if errors.Is(err, context.Canceled) {
		return err
	}

	// errors.As walks the whole chain, so wrapped network errors are matched by
	// type; the string check is kept as a fallback for errors with no type info.
	var netErr net.Error
	if errors.As(err, &netErr) || isConnectionError(err) {
		return &ConnectionError{URL: c.baseURL, Err: err}
	}
	return err
}
// isConnectionError reports whether err's message contains one of the
// substrings typically produced by connection-level failures (refused
// connections, DNS lookups, unreachable networks, I/O timeouts, dial errors).
// It is a text-based fallback for errors that carry no usable type information.
// err must be non-nil.
func isConnectionError(err error) bool {
	text := err.Error()
	for _, marker := range [...]string{
		"connection refused",
		"no such host",
		"network is unreachable",
		"i/o timeout",
		"dial tcp",
	} {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}