package ollama import ( "bufio" "bytes" "context" "encoding/json" "fmt" "io" "net" "net/http" "strings" ) // scannerBufSize is the buffer size for the NDJSON scanner (4 MB). // Large enough to handle big tool-call payloads in a single line. const scannerBufSize = 4 * 1024 * 1024 // Client defines the interface for communicating with an Ollama target. // // Why: an interface allows the worker loop, passthrough handlers, and tests to // share a single contract and swap in stubs. // What: covers the four Ollama endpoints foreman uses: chat, embed, tags, and ps. // Test: implement with a stub HTTP server; verify round-trip for each method. type Client interface { // Chat sends a chat request. When stream is false, returns (*ChatResponse, nil, nil). // When stream is true, returns (nil, <-chan ChatResponse, nil) with chunks delivered // on the channel. The channel is closed when the stream ends. Chat(ctx context.Context, req ChatRequest, stream bool) (*ChatResponse, <-chan ChatResponse, error) // Embed sends an embedding request to /api/embed. Embed(ctx context.Context, req EmbedRequest) (*EmbedResponse, error) // Tags returns the list of installed models from /api/tags. Tags(ctx context.Context) (*TagsResponse, error) // Ps returns the list of currently-loaded models from /api/ps. Ps(ctx context.Context) (*PsResponse, error) // RawChat performs a raw proxied chat request, returning the http.Response for // the caller to stream directly to a downstream client. The caller is responsible // for closing the response body. RawChat(ctx context.Context, body []byte) (*http.Response, error) // RawEmbed performs a raw proxied embed request, returning the http.Response. // The caller is responsible for closing the response body. RawEmbed(ctx context.Context, body []byte) (*http.Response, error) } // httpClient is the concrete implementation of Client backed by net/http. 
type httpClient struct { baseURL string token string httpClient *http.Client } // NewClient creates a new Ollama HTTP client. // // Why: centralizes base URL, auth token, and HTTP client configuration. // What: returns a Client that makes HTTP requests to the given Ollama base URL. // Test: create with a httptest.Server URL, call Tags, verify correct request path. func NewClient(baseURL, token string) Client { // Trim trailing slash for consistent URL construction. baseURL = strings.TrimRight(baseURL, "/") return &httpClient{ baseURL: baseURL, token: token, httpClient: &http.Client{}, } } // Chat sends a POST /api/chat to the Ollama target. // // Why: the worker loop and sync passthrough both need structured chat access. // What: POSTs the chat request, returns either a single response or a channel of // streamed chunks depending on the stream parameter. // Test: stub a /api/chat endpoint returning NDJSON or a single JSON object; verify // both streaming and non-streaming paths. func (c *httpClient) Chat(ctx context.Context, req ChatRequest, stream bool) (*ChatResponse, <-chan ChatResponse, error) { streamVal := stream req.Stream = &streamVal body, err := json.Marshal(req) if err != nil { return nil, nil, fmt.Errorf("marshal chat request: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/chat", bytes.NewReader(body)) if err != nil { return nil, nil, fmt.Errorf("create chat request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") c.setAuth(httpReq) resp, err := c.httpClient.Do(httpReq) if err != nil { return nil, nil, c.wrapConnErr(err) } if resp.StatusCode < 200 || resp.StatusCode >= 300 { defer resp.Body.Close() errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) return nil, nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)} } if !stream { defer resp.Body.Close() var chatResp ChatResponse if err := json.NewDecoder(resp.Body).Decode(&chatResp); err != nil { return nil, nil, 
fmt.Errorf("decode chat response: %w", err) } return &chatResp, nil, nil } // Streaming: read NDJSON lines and send on channel. ch := make(chan ChatResponse, 64) go func() { defer close(ch) defer resp.Body.Close() scanner := bufio.NewScanner(resp.Body) scanner.Buffer(make([]byte, 0, scannerBufSize), scannerBufSize) for scanner.Scan() { line := scanner.Bytes() if len(line) == 0 { continue } var chunk ChatResponse if err := json.Unmarshal(line, &chunk); err != nil { continue } ch <- chunk } }() return nil, ch, nil } // Embed sends a POST /api/embed to the Ollama target. // // Why: embedding requests bypass the queue and go directly to the target (ADR-0013). // What: POSTs the embed request and returns the parsed response. // Test: stub /api/embed, send a request, verify embeddings in the response. func (c *httpClient) Embed(ctx context.Context, req EmbedRequest) (*EmbedResponse, error) { body, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("marshal embed request: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/embed", bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("create embed request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") c.setAuth(httpReq) resp, err := c.httpClient.Do(httpReq) if err != nil { return nil, c.wrapConnErr(err) } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)} } var embedResp EmbedResponse if err := json.NewDecoder(resp.Body).Decode(&embedResp); err != nil { return nil, fmt.Errorf("decode embed response: %w", err) } return &embedResp, nil } // Tags fetches GET /api/tags from the Ollama target. // // Why: the model poller needs the installed model list for inventory and validation. // What: GETs /api/tags and returns the parsed response. 
// Test: stub /api/tags with a model list, verify Tags() returns it. func (c *httpClient) Tags(ctx context.Context) (*TagsResponse, error) { httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/tags", nil) if err != nil { return nil, fmt.Errorf("create tags request: %w", err) } c.setAuth(httpReq) resp, err := c.httpClient.Do(httpReq) if err != nil { return nil, c.wrapConnErr(err) } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)} } var tagsResp TagsResponse if err := json.NewDecoder(resp.Body).Decode(&tagsResp); err != nil { return nil, fmt.Errorf("decode tags response: %w", err) } return &tagsResp, nil } // Ps fetches GET /api/ps from the Ollama target. // // Why: the poller and scheduler need to know which models are currently loaded. // What: GETs /api/ps and returns the parsed response. // Test: stub /api/ps with running models, verify Ps() returns them. func (c *httpClient) Ps(ctx context.Context) (*PsResponse, error) { httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/ps", nil) if err != nil { return nil, fmt.Errorf("create ps request: %w", err) } c.setAuth(httpReq) resp, err := c.httpClient.Do(httpReq) if err != nil { return nil, c.wrapConnErr(err) } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)} } var psResp PsResponse if err := json.NewDecoder(resp.Body).Decode(&psResp); err != nil { return nil, fmt.Errorf("decode ps response: %w", err) } return &psResp, nil } // RawChat performs a raw proxied POST /api/chat, returning the http.Response for // direct streaming to a downstream client. 
// // Why: the passthrough handler needs raw access to the response body for NDJSON // streaming without double-parsing. // What: POSTs the raw body to /api/chat and returns the raw HTTP response. // Test: stub /api/chat, call RawChat, verify response status and body forwarding. func (c *httpClient) RawChat(ctx context.Context, body []byte) (*http.Response, error) { httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/chat", bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("create raw chat request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") c.setAuth(httpReq) resp, err := c.httpClient.Do(httpReq) if err != nil { return nil, c.wrapConnErr(err) } if resp.StatusCode < 200 || resp.StatusCode >= 300 { defer resp.Body.Close() errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)} } return resp, nil } // RawEmbed performs a raw proxied POST /api/embed, returning the http.Response. // // Why: the embed passthrough handler proxies the raw body/response without parsing. // What: POSTs the raw body to /api/embed and returns the raw HTTP response. // Test: stub /api/embed, call RawEmbed, verify response forwarding. func (c *httpClient) RawEmbed(ctx context.Context, body []byte) (*http.Response, error) { httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/embed", bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("create raw embed request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") c.setAuth(httpReq) resp, err := c.httpClient.Do(httpReq) if err != nil { return nil, c.wrapConnErr(err) } if resp.StatusCode < 200 || resp.StatusCode >= 300 { defer resp.Body.Close() errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) return nil, &HTTPError{StatusCode: resp.StatusCode, Body: string(errBody)} } return resp, nil } // setAuth adds the bearer token to the request if configured. 
func (c *httpClient) setAuth(req *http.Request) {
	// An empty token means the target is unauthenticated: leave the header unset.
	if c.token == "" {
		return
	}
	req.Header.Set("Authorization", "Bearer "+c.token)
}

// wrapConnErr classifies err as a network-level failure and, if so, wraps it
// in a *ConnectionError tagged with the target's base URL. Any other non-nil
// error is returned unchanged, and nil stays nil.
//
// An error counts as network-level when it is a *net.OpError, implements
// net.Error, or its message matches one of the patterns in isConnectionError.
//
// NOTE(review): http.Client.Do documents that its errors are *url.Error, and
// *url.Error implements net.Error. So in practice every transport error from
// Do is wrapped here, including a cancelled or expired ctx, and the
// string-matching fallback is rarely reached. Confirm callers want
// cancellation reported as a ConnectionError; errors.Is(err, context.Canceled)
// would separate the two cases.
func (c *httpClient) wrapConnErr(err error) error {
	if err == nil {
		return nil
	}

	var networkLevel bool
	switch err.(type) {
	case *net.OpError, net.Error:
		networkLevel = true
	default:
		// Fall back to message matching for wrapped/untyped connection failures.
		networkLevel = isConnectionError(err)
	}

	if !networkLevel {
		return err
	}
	return &ConnectionError{URL: c.baseURL, Err: err}
}

// isConnectionError reports whether err's message contains one of a fixed set
// of substrings that indicate a connection-level failure (refused connection,
// DNS miss, unreachable network, I/O timeout, or a failed TCP dial). Matching
// on text is a heuristic fallback for errors that are not typed as net errors.
func isConnectionError(err error) bool {
	msg := err.Error()
	for _, needle := range []string{
		"connection refused",
		"no such host",
		"network is unreachable",
		"i/o timeout",
		"dial tcp",
	} {
		if strings.Contains(msg, needle) {
			return true
		}
	}
	return false
}