feat: add Ollama target client, model poller, and native passthrough
Phase 2 of foreman: the daemon now acts as a transparent Ollama proxy.

- internal/ollama: Client interface and HTTP implementation for chat (streaming + non-streaming), embed, tags, and ps, with auth forwarding, NDJSON streaming via bufio.Scanner, and connection vs. HTTP error classification via custom error types.
- internal/ollama: ModelInventory with a background poller for /api/tags and /api/ps, degraded mode (with model retention) when the target is unreachable, and automatic recovery on reconnect.
- internal/server: Passthrough routes (/api/chat, /api/tags, /api/ps, /api/embed, /api/embeddings) with model validation, a chat serialization gate (capacity-1 channel), concurrent embedding bypass (ADR-0013), NDJSON streaming with per-chunk flush, and degraded health reporting.
- cmd/foreman: Full serve wiring with the Ollama client, poller goroutine, embedder warmup (keep_alive:-1), and signal-based shutdown.

The Mac is now usable as a go-llm target through foreman.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,326 @@
|
||||
package ollama
|
||||
|
||||
import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"strings"
)
|
||||
|
||||
// scannerBufSize caps the length of a single NDJSON line the streaming scanner
// will accept: 4 MiB, chosen so that a large tool-call payload still fits on
// one line.
const scannerBufSize = 4 << 20
|
||||
|
||||
// Client defines the interface for communicating with an Ollama target.
|
||||
//
|
||||
// Why: an interface allows the worker loop, passthrough handlers, and tests to
|
||||
// share a single contract and swap in stubs.
|
||||
// What: covers the four Ollama endpoints foreman uses: chat, embed, tags, and ps.
|
||||
// Test: implement with a stub HTTP server; verify round-trip for each method.
|
||||
type Client interface {
|
||||
// Chat sends a chat request. When stream is false, returns (*ChatResponse, nil, nil).
|
||||
// When stream is true, returns (nil, <-chan ChatResponse, nil) with chunks delivered
|
||||
// on the channel. The channel is closed when the stream ends.
|
||||
Chat(ctx context.Context, req ChatRequest, stream bool) (*ChatResponse, <-chan ChatResponse, error)
|
||||
|
||||
// Embed sends an embedding request to /api/embed.
|
||||
Embed(ctx context.Context, req EmbedRequest) (*EmbedResponse, error)
|
||||
|
||||
// Tags returns the list of installed models from /api/tags.
|
||||
Tags(ctx context.Context) (*TagsResponse, error)
|
||||
|
||||
// Ps returns the list of currently-loaded models from /api/ps.
|
||||
Ps(ctx context.Context) (*PsResponse, error)
|
||||
|
||||
// RawChat performs a raw proxied chat request, returning the http.Response for
|
||||
// the caller to stream directly to a downstream client. The caller is responsible
|
||||
// for closing the response body.
|
||||
RawChat(ctx context.Context, body []byte) (*http.Response, error)
|
||||
|
||||
// RawEmbed performs a raw proxied embed request, returning the http.Response.
|
||||
// The caller is responsible for closing the response body.
|
||||
RawEmbed(ctx context.Context, body []byte) (*http.Response, error)
|
||||
}
|
||||
|
||||
// httpClient implements Client on top of net/http.
type httpClient struct {
	// baseURL is the target's root URL; NewClient stores it without trailing slashes.
	baseURL string
	// token, when non-empty, is sent as a bearer token on every request.
	token string
	// httpClient is the underlying transport-level client used for all calls.
	httpClient *http.Client
}
|
||||
|
||||
// NewClient creates a new Ollama HTTP client.
|
||||
//
|
||||
// Why: centralizes base URL, auth token, and HTTP client configuration.
|
||||
// What: returns a Client that makes HTTP requests to the given Ollama base URL.
|
||||
// Test: create with a httptest.Server URL, call Tags, verify correct request path.
|
||||
func NewClient(baseURL, token string) Client {
|
||||
// Trim trailing slash for consistent URL construction.
|
||||
baseURL = strings.TrimRight(baseURL, "/")
|
||||
return &httpClient{
|
||||
baseURL: baseURL,
|
||||
token: token,
|
||||
httpClient: &http.Client{},
|
||||
}
|
||||
}
|
||||
|
||||
// Chat sends a POST /api/chat to the Ollama target.
|
||||
//
|
||||
// Why: the worker loop and sync passthrough both need structured chat access.
|
||||
// What: POSTs the chat request, returns either a single response or a channel of
|
||||
// streamed chunks depending on the stream parameter.
|
||||
// Test: stub a /api/chat endpoint returning NDJSON or a single JSON object; verify
|
||||
// both streaming and non-streaming paths.
|
||||
func (c *httpClient) Chat(ctx context.Context, req ChatRequest, stream bool) (*ChatResponse, <-chan ChatResponse, error) {
|
||||
streamVal := stream
|
||||
req.Stream = &streamVal
|
||||
|
||||
body, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("marshal chat request: %w", err)
|
||||
}
|
||||
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/chat", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("create chat request: %w", err)
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
c.setAuth(httpReq)
|
||||
|
||||
resp, err := c.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, nil, c.wrapConnErr(err)
|
||||
}
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
defer resp.Body.Close()
|
||||
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
return nil, nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
|
||||
}
|
||||
|
||||
if !stream {
|
||||
defer resp.Body.Close()
|
||||
var chatResp ChatResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&chatResp); err != nil {
|
||||
return nil, nil, fmt.Errorf("decode chat response: %w", err)
|
||||
}
|
||||
return &chatResp, nil, nil
|
||||
}
|
||||
|
||||
// Streaming: read NDJSON lines and send on channel.
|
||||
ch := make(chan ChatResponse, 64)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
defer resp.Body.Close()
|
||||
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
scanner.Buffer(make([]byte, 0, scannerBufSize), scannerBufSize)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
var chunk ChatResponse
|
||||
if err := json.Unmarshal(line, &chunk); err != nil {
|
||||
continue
|
||||
}
|
||||
ch <- chunk
|
||||
}
|
||||
}()
|
||||
|
||||
return nil, ch, nil
|
||||
}
|
||||
|
||||
// Embed sends a POST /api/embed to the Ollama target.
|
||||
//
|
||||
// Why: embedding requests bypass the queue and go directly to the target (ADR-0013).
|
||||
// What: POSTs the embed request and returns the parsed response.
|
||||
// Test: stub /api/embed, send a request, verify embeddings in the response.
|
||||
func (c *httpClient) Embed(ctx context.Context, req EmbedRequest) (*EmbedResponse, error) {
|
||||
body, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal embed request: %w", err)
|
||||
}
|
||||
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/embed", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create embed request: %w", err)
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
c.setAuth(httpReq)
|
||||
|
||||
resp, err := c.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, c.wrapConnErr(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
|
||||
}
|
||||
|
||||
var embedResp EmbedResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&embedResp); err != nil {
|
||||
return nil, fmt.Errorf("decode embed response: %w", err)
|
||||
}
|
||||
return &embedResp, nil
|
||||
}
|
||||
|
||||
// Tags fetches GET /api/tags from the Ollama target.
|
||||
//
|
||||
// Why: the model poller needs the installed model list for inventory and validation.
|
||||
// What: GETs /api/tags and returns the parsed response.
|
||||
// Test: stub /api/tags with a model list, verify Tags() returns it.
|
||||
func (c *httpClient) Tags(ctx context.Context) (*TagsResponse, error) {
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/tags", nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create tags request: %w", err)
|
||||
}
|
||||
c.setAuth(httpReq)
|
||||
|
||||
resp, err := c.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, c.wrapConnErr(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
|
||||
}
|
||||
|
||||
var tagsResp TagsResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&tagsResp); err != nil {
|
||||
return nil, fmt.Errorf("decode tags response: %w", err)
|
||||
}
|
||||
return &tagsResp, nil
|
||||
}
|
||||
|
||||
// Ps fetches GET /api/ps from the Ollama target.
|
||||
//
|
||||
// Why: the poller and scheduler need to know which models are currently loaded.
|
||||
// What: GETs /api/ps and returns the parsed response.
|
||||
// Test: stub /api/ps with running models, verify Ps() returns them.
|
||||
func (c *httpClient) Ps(ctx context.Context) (*PsResponse, error) {
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/ps", nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create ps request: %w", err)
|
||||
}
|
||||
c.setAuth(httpReq)
|
||||
|
||||
resp, err := c.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, c.wrapConnErr(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
|
||||
}
|
||||
|
||||
var psResp PsResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&psResp); err != nil {
|
||||
return nil, fmt.Errorf("decode ps response: %w", err)
|
||||
}
|
||||
return &psResp, nil
|
||||
}
|
||||
|
||||
// RawChat performs a raw proxied POST /api/chat, returning the http.Response for
|
||||
// direct streaming to a downstream client.
|
||||
//
|
||||
// Why: the passthrough handler needs raw access to the response body for NDJSON
|
||||
// streaming without double-parsing.
|
||||
// What: POSTs the raw body to /api/chat and returns the raw HTTP response.
|
||||
// Test: stub /api/chat, call RawChat, verify response status and body forwarding.
|
||||
func (c *httpClient) RawChat(ctx context.Context, body []byte) (*http.Response, error) {
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/chat", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create raw chat request: %w", err)
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
c.setAuth(httpReq)
|
||||
|
||||
resp, err := c.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, c.wrapConnErr(err)
|
||||
}
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
defer resp.Body.Close()
|
||||
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// RawEmbed performs a raw proxied POST /api/embed, returning the http.Response.
|
||||
//
|
||||
// Why: the embed passthrough handler proxies the raw body/response without parsing.
|
||||
// What: POSTs the raw body to /api/embed and returns the raw HTTP response.
|
||||
// Test: stub /api/embed, call RawEmbed, verify response forwarding.
|
||||
func (c *httpClient) RawEmbed(ctx context.Context, body []byte) (*http.Response, error) {
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/embed", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create raw embed request: %w", err)
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
c.setAuth(httpReq)
|
||||
|
||||
resp, err := c.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, c.wrapConnErr(err)
|
||||
}
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
defer resp.Body.Close()
|
||||
errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)}
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// setAuth adds the bearer token to the request if configured.
|
||||
func (c *httpClient) setAuth(req *http.Request) {
|
||||
if c.token != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+c.token)
|
||||
}
|
||||
}
|
||||
|
||||
// wrapConnErr checks if the error is a network-level failure and wraps it in a
|
||||
// ConnectionError. Non-network errors are returned as-is.
|
||||
func (c *httpClient) wrapConnErr(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
// Check for common network error types.
|
||||
if _, ok := err.(*net.OpError); ok {
|
||||
return &ConnectionError{URL: c.baseURL, Err: err}
|
||||
}
|
||||
if _, ok := err.(net.Error); ok {
|
||||
return &ConnectionError{URL: c.baseURL, Err: err}
|
||||
}
|
||||
// Also catch connection refused, DNS errors, etc. that might be wrapped.
|
||||
if isConnectionError(err) {
|
||||
return &ConnectionError{URL: c.baseURL, Err: err}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// isConnectionError reports whether err's message contains one of the known
// connection-level failure markers ("connection refused", "no such host",
// "network is unreachable", "i/o timeout", "dial tcp"). It is a text-based
// fallback for errors whose concrete type does not identify them as network
// failures. A nil err yields false.
func isConnectionError(err error) bool {
	// Guard against a nil interface: calling Error() on it would panic.
	if err == nil {
		return false
	}

	msg := err.Error()
	markers := [...]string{
		"connection refused",
		"no such host",
		"network is unreachable",
		"i/o timeout",
		"dial tcp",
	}
	for _, marker := range markers {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
|
||||
Reference in New Issue
Block a user