27f196d333
Phase 2 of foreman: the daemon now acts as a transparent Ollama proxy. - internal/ollama: Client interface and HTTP implementation for chat (streaming + non-streaming), embed, tags, ps with auth forwarding, NDJSON streaming via bufio.Scanner, and connection vs HTTP error classification via custom error types. - internal/ollama: ModelInventory with background poller for /api/tags and /api/ps, degraded mode on target unreachable with model retention, automatic recovery on reconnect. - internal/server: Passthrough routes (/api/chat, /api/tags, /api/ps, /api/embed, /api/embeddings) with model validation, chat serialization gate (capacity-1 channel), concurrent embedding bypass (ADR-0013), NDJSON streaming with per-chunk flush, and degraded health reporting. - cmd/foreman: Full serve wiring with Ollama client, poller goroutine, embedder warmup (keep_alive:-1), and signal-based shutdown. The Mac is now usable as a go-llm target through foreman. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
327 lines
11 KiB
Go
327 lines
11 KiB
Go
package ollama
|
|
|
|
import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"strings"
)
|
|
|
|
// scannerBufSize caps the length of a single NDJSON line the chat stream reader
// will accept: 4 MiB (4 << 20 bytes), sized so that large tool-call payloads
// still fit on one line.
const scannerBufSize = 4 << 20
|
|
|
|
// Client defines the interface for communicating with an Ollama target.
|
|
//
|
|
// Why: an interface allows the worker loop, passthrough handlers, and tests to
|
|
// share a single contract and swap in stubs.
|
|
// What: covers the four Ollama endpoints foreman uses: chat, embed, tags, and ps.
|
|
// Test: implement with a stub HTTP server; verify round-trip for each method.
|
|
type Client interface {
|
|
// Chat sends a chat request. When stream is false, returns (*ChatResponse, nil, nil).
|
|
// When stream is true, returns (nil, <-chan ChatResponse, nil) with chunks delivered
|
|
// on the channel. The channel is closed when the stream ends.
|
|
Chat(ctx context.Context, req ChatRequest, stream bool) (*ChatResponse, <-chan ChatResponse, error)
|
|
|
|
// Embed sends an embedding request to /api/embed.
|
|
Embed(ctx context.Context, req EmbedRequest) (*EmbedResponse, error)
|
|
|
|
// Tags returns the list of installed models from /api/tags.
|
|
Tags(ctx context.Context) (*TagsResponse, error)
|
|
|
|
// Ps returns the list of currently-loaded models from /api/ps.
|
|
Ps(ctx context.Context) (*PsResponse, error)
|
|
|
|
// RawChat performs a raw proxied chat request, returning the http.Response for
|
|
// the caller to stream directly to a downstream client. The caller is responsible
|
|
// for closing the response body.
|
|
RawChat(ctx context.Context, body []byte) (*http.Response, error)
|
|
|
|
// RawEmbed performs a raw proxied embed request, returning the http.Response.
|
|
// The caller is responsible for closing the response body.
|
|
RawEmbed(ctx context.Context, body []byte) (*http.Response, error)
|
|
}
|
|
|
|
// httpClient implements Client on top of a standard *http.Client.
type httpClient struct {
	// baseURL is the Ollama root URL; endpoint paths such as "/api/chat" are
	// appended to it directly.
	baseURL string

	// token, when non-empty, is sent as a bearer token on every request.
	token string

	// httpClient performs the actual HTTP round trips.
	httpClient *http.Client
}
|
|
|
|
// NewClient creates a new Ollama HTTP client.
|
|
//
|
|
// Why: centralizes base URL, auth token, and HTTP client configuration.
|
|
// What: returns a Client that makes HTTP requests to the given Ollama base URL.
|
|
// Test: create with a httptest.Server URL, call Tags, verify correct request path.
|
|
func NewClient(baseURL, token string) Client {
|
|
// Trim trailing slash for consistent URL construction.
|
|
baseURL = strings.TrimRight(baseURL, "/")
|
|
return &httpClient{
|
|
baseURL: baseURL,
|
|
token: token,
|
|
httpClient: &http.Client{},
|
|
}
|
|
}
|
|
|
|
// Chat sends a POST /api/chat to the Ollama target.
|
|
//
|
|
// Why: the worker loop and sync passthrough both need structured chat access.
|
|
// What: POSTs the chat request, returns either a single response or a channel of
|
|
// streamed chunks depending on the stream parameter.
|
|
// Test: stub a /api/chat endpoint returning NDJSON or a single JSON object; verify
|
|
// both streaming and non-streaming paths.
|
|
func (c *httpClient) Chat(ctx context.Context, req ChatRequest, stream bool) (*ChatResponse, <-chan ChatResponse, error) {
|
|
streamVal := stream
|
|
req.Stream = &streamVal
|
|
|
|
body, err := json.Marshal(req)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("marshal chat request: %w", err)
|
|
}
|
|
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/chat", bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("create chat request: %w", err)
|
|
}
|
|
httpReq.Header.Set("Content-Type", "application/json")
|
|
c.setAuth(httpReq)
|
|
|
|
resp, err := c.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return nil, nil, c.wrapConnErr(err)
|
|
}
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
defer resp.Body.Close()
|
|
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
|
return nil, nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
|
|
}
|
|
|
|
if !stream {
|
|
defer resp.Body.Close()
|
|
var chatResp ChatResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&chatResp); err != nil {
|
|
return nil, nil, fmt.Errorf("decode chat response: %w", err)
|
|
}
|
|
return &chatResp, nil, nil
|
|
}
|
|
|
|
// Streaming: read NDJSON lines and send on channel.
|
|
ch := make(chan ChatResponse, 64)
|
|
go func() {
|
|
defer close(ch)
|
|
defer resp.Body.Close()
|
|
|
|
scanner := bufio.NewScanner(resp.Body)
|
|
scanner.Buffer(make([]byte, 0, scannerBufSize), scannerBufSize)
|
|
for scanner.Scan() {
|
|
line := scanner.Bytes()
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
var chunk ChatResponse
|
|
if err := json.Unmarshal(line, &chunk); err != nil {
|
|
continue
|
|
}
|
|
ch <- chunk
|
|
}
|
|
}()
|
|
|
|
return nil, ch, nil
|
|
}
|
|
|
|
// Embed sends a POST /api/embed to the Ollama target.
|
|
//
|
|
// Why: embedding requests bypass the queue and go directly to the target (ADR-0013).
|
|
// What: POSTs the embed request and returns the parsed response.
|
|
// Test: stub /api/embed, send a request, verify embeddings in the response.
|
|
func (c *httpClient) Embed(ctx context.Context, req EmbedRequest) (*EmbedResponse, error) {
|
|
body, err := json.Marshal(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshal embed request: %w", err)
|
|
}
|
|
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/embed", bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create embed request: %w", err)
|
|
}
|
|
httpReq.Header.Set("Content-Type", "application/json")
|
|
c.setAuth(httpReq)
|
|
|
|
resp, err := c.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return nil, c.wrapConnErr(err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
|
return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
|
|
}
|
|
|
|
var embedResp EmbedResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&embedResp); err != nil {
|
|
return nil, fmt.Errorf("decode embed response: %w", err)
|
|
}
|
|
return &embedResp, nil
|
|
}
|
|
|
|
// Tags fetches GET /api/tags from the Ollama target.
|
|
//
|
|
// Why: the model poller needs the installed model list for inventory and validation.
|
|
// What: GETs /api/tags and returns the parsed response.
|
|
// Test: stub /api/tags with a model list, verify Tags() returns it.
|
|
func (c *httpClient) Tags(ctx context.Context) (*TagsResponse, error) {
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/tags", nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create tags request: %w", err)
|
|
}
|
|
c.setAuth(httpReq)
|
|
|
|
resp, err := c.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return nil, c.wrapConnErr(err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
|
return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
|
|
}
|
|
|
|
var tagsResp TagsResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&tagsResp); err != nil {
|
|
return nil, fmt.Errorf("decode tags response: %w", err)
|
|
}
|
|
return &tagsResp, nil
|
|
}
|
|
|
|
// Ps fetches GET /api/ps from the Ollama target.
|
|
//
|
|
// Why: the poller and scheduler need to know which models are currently loaded.
|
|
// What: GETs /api/ps and returns the parsed response.
|
|
// Test: stub /api/ps with running models, verify Ps() returns them.
|
|
func (c *httpClient) Ps(ctx context.Context) (*PsResponse, error) {
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/ps", nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create ps request: %w", err)
|
|
}
|
|
c.setAuth(httpReq)
|
|
|
|
resp, err := c.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return nil, c.wrapConnErr(err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
|
return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
|
|
}
|
|
|
|
var psResp PsResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&psResp); err != nil {
|
|
return nil, fmt.Errorf("decode ps response: %w", err)
|
|
}
|
|
return &psResp, nil
|
|
}
|
|
|
|
// RawChat performs a raw proxied POST /api/chat, returning the http.Response for
|
|
// direct streaming to a downstream client.
|
|
//
|
|
// Why: the passthrough handler needs raw access to the response body for NDJSON
|
|
// streaming without double-parsing.
|
|
// What: POSTs the raw body to /api/chat and returns the raw HTTP response.
|
|
// Test: stub /api/chat, call RawChat, verify response status and body forwarding.
|
|
func (c *httpClient) RawChat(ctx context.Context, body []byte) (*http.Response, error) {
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/chat", bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create raw chat request: %w", err)
|
|
}
|
|
httpReq.Header.Set("Content-Type", "application/json")
|
|
c.setAuth(httpReq)
|
|
|
|
resp, err := c.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return nil, c.wrapConnErr(err)
|
|
}
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
defer resp.Body.Close()
|
|
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
|
return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// RawEmbed performs a raw proxied POST /api/embed, returning the http.Response.
|
|
//
|
|
// Why: the embed passthrough handler proxies the raw body/response without parsing.
|
|
// What: POSTs the raw body to /api/embed and returns the raw HTTP response.
|
|
// Test: stub /api/embed, call RawEmbed, verify response forwarding.
|
|
func (c *httpClient) RawEmbed(ctx context.Context, body []byte) (*http.Response, error) {
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/embed", bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create raw embed request: %w", err)
|
|
}
|
|
httpReq.Header.Set("Content-Type", "application/json")
|
|
c.setAuth(httpReq)
|
|
|
|
resp, err := c.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return nil, c.wrapConnErr(err)
|
|
}
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
defer resp.Body.Close()
|
|
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
|
return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// setAuth adds the bearer token to the request if configured.
|
|
func (c *httpClient) setAuth(req *http.Request) {
|
|
if c.token != "" {
|
|
req.Header.Set("Authorization", "Bearer "+c.token)
|
|
}
|
|
}
|
|
|
|
// wrapConnErr checks if the error is a network-level failure and wraps it in a
|
|
// ConnectionError. Non-network errors are returned as-is.
|
|
func (c *httpClient) wrapConnErr(err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
// Check for common network error types.
|
|
if _, ok := err.(*net.OpError); ok {
|
|
return &ConnectionError{URL: c.baseURL, Err: err}
|
|
}
|
|
if _, ok := err.(net.Error); ok {
|
|
return &ConnectionError{URL: c.baseURL, Err: err}
|
|
}
|
|
// Also catch connection refused, DNS errors, etc. that might be wrapped.
|
|
if isConnectionError(err) {
|
|
return &ConnectionError{URL: c.baseURL, Err: err}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// isConnectionError reports whether err's message contains any of a fixed set
// of substrings that indicate a connection-level failure. It matches on text
// rather than type, so it also catches errors whose concrete type has been
// lost through wrapping. err must be non-nil.
func isConnectionError(err error) bool {
	text := err.Error()
	for _, marker := range []string{
		"connection refused",
		"no such host",
		"network is unreachable",
		"i/o timeout",
		"dial tcp",
	} {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}
|