043249e0e1
Phase 3: - provider/openai: Chat Completions for OpenAI + compat endpoints (SSE streaming with by-index tool-call assembly, response_format json_schema, legacy max_tokens option, reasoning_effort) - provider/anthropic: Messages API (tool_use/tool_result, GA structured output via output_config.format, full SSE event parser, 529 transient) - provider/ollama: one native /api/chat client behind the ollama, ollama-cloud, and foreman built-ins (presets; NDJSON streaming tolerant of foreman's buffered single-object responses; object tool arguments; format-schema structured output; think mapping) - media/: capability normalization (sniff, downscale, transcode, byte ladder, ErrUnsupported), wired into the chain executor per target with penalty-free advance past incapable elements - registry: real provider + scheme wiring, WithHTTPClient option, required env-foreman TLS chat round-trip test - ADR-0009 multimodal strategy, ADR-0010 tools/structured mapping; README matrix + CLAUDE.md synced Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
141 lines
3.6 KiB
Go
141 lines
3.6 KiB
Go
package ollama
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"sync"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
|
)
|
|
|
|
// Stream implements llm.Model over Ollama's NDJSON streaming. It also
|
|
// transparently handles foreman's non-streaming degradation (a single
|
|
// buffered JSON object): one JSON line parses as the final chunk.
|
|
func (m *model) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
|
|
req = req.Apply(opts...)
|
|
if err := m.enforceCapabilities(req); err != nil {
|
|
return nil, err
|
|
}
|
|
wireReq, err := m.buildRequest(req, true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
resp, err := m.do(ctx, wireReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sc := bufio.NewScanner(resp.Body)
|
|
// Single NDJSON lines can far exceed the 64KB default (thinking dumps,
|
|
// tool payloads, foreman's whole-response-as-one-line degradation).
|
|
sc.Buffer(make([]byte, 64<<10), 16<<20)
|
|
|
|
return &stream{model: m, body: resp.Body, scanner: sc}, nil
|
|
}
|
|
|
|
type stream struct {
|
|
model *model
|
|
body io.Closer
|
|
scanner *bufio.Scanner
|
|
|
|
mu sync.Mutex
|
|
closed bool
|
|
finished bool
|
|
toolCalls []llm.ToolCall
|
|
text []byte
|
|
pending []llm.StreamEvent
|
|
usage llm.Usage
|
|
doneReason string
|
|
}
|
|
|
|
func (s *stream) Next() (llm.StreamEvent, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
for {
|
|
if len(s.pending) > 0 {
|
|
ev := s.pending[0]
|
|
s.pending = s.pending[1:]
|
|
return ev, nil
|
|
}
|
|
if s.finished {
|
|
return llm.StreamEvent{}, io.EOF
|
|
}
|
|
if !s.scanner.Scan() {
|
|
if err := s.scanner.Err(); err != nil {
|
|
return llm.StreamEvent{}, fmt.Errorf("ollama %s: read stream: %w", s.model.qualified(), err)
|
|
}
|
|
// EOF without a done chunk: synthesize the final response from
|
|
// what we accumulated rather than losing it.
|
|
s.queueFinal()
|
|
continue
|
|
}
|
|
line := s.scanner.Bytes()
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
|
|
var chunk chatResponse
|
|
if err := json.Unmarshal(line, &chunk); err != nil {
|
|
return llm.StreamEvent{}, fmt.Errorf("ollama %s: decode stream chunk: %w", s.model.qualified(), err)
|
|
}
|
|
|
|
if chunk.Message.Content != "" {
|
|
s.text = append(s.text, chunk.Message.Content...)
|
|
s.pending = append(s.pending, llm.StreamEvent{TextDelta: chunk.Message.Content})
|
|
}
|
|
// Tool calls arrive complete per chunk (no partial-argument deltas
|
|
// in the native protocol).
|
|
base := len(s.toolCalls)
|
|
for i, tc := range chunk.Message.ToolCalls {
|
|
id := tc.ID
|
|
if id == "" {
|
|
id = "call_" + strconv.Itoa(base+i)
|
|
}
|
|
args := tc.Function.Arguments
|
|
if len(args) == 0 {
|
|
args = json.RawMessage("{}")
|
|
}
|
|
call := llm.ToolCall{ID: id, Name: tc.Function.Name, Arguments: args}
|
|
s.toolCalls = append(s.toolCalls, call)
|
|
s.pending = append(s.pending, llm.StreamEvent{ToolCall: &s.toolCalls[len(s.toolCalls)-1]})
|
|
}
|
|
if chunk.Done {
|
|
s.usage = llm.Usage{InputTokens: chunk.PromptEvalCount, OutputTokens: chunk.EvalCount}
|
|
s.doneReason = chunk.DoneReason
|
|
s.queueFinal()
|
|
}
|
|
}
|
|
}
|
|
|
|
// queueFinal appends the final Response event and marks the stream done.
|
|
func (s *stream) queueFinal() {
|
|
resp := &llm.Response{
|
|
Model: s.model.qualified(),
|
|
Usage: s.usage,
|
|
FinishReason: finishReason(s.doneReason, len(s.toolCalls) > 0),
|
|
}
|
|
if len(s.text) > 0 {
|
|
resp.Parts = append(resp.Parts, llm.Text(string(s.text)))
|
|
}
|
|
if len(s.toolCalls) > 0 {
|
|
resp.ToolCalls = append([]llm.ToolCall(nil), s.toolCalls...)
|
|
}
|
|
s.pending = append(s.pending, llm.StreamEvent{Response: resp})
|
|
s.finished = true
|
|
}
|
|
|
|
func (s *stream) Close() error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.closed {
|
|
return nil
|
|
}
|
|
s.closed = true
|
|
return s.body.Close()
|
|
}
|