package ollama

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"strconv"
	"sync"

	"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)

// Stream implements llm.Model over Ollama's NDJSON streaming. It also
// transparently handles foreman's non-streaming degradation (a single
// buffered JSON object): one JSON line parses as the final chunk.
//
// The response body is released only by the returned stream's Close.
func (m *model) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
	req = req.Apply(opts...)
	if err := m.enforceCapabilities(req); err != nil {
		return nil, err
	}
	wireReq, err := m.buildRequest(req, true)
	if err != nil {
		return nil, err
	}
	resp, err := m.do(ctx, wireReq)
	if err != nil {
		return nil, err
	}
	sc := bufio.NewScanner(resp.Body)
	// Single NDJSON lines can far exceed the 64KB default (thinking dumps,
	// tool payloads, foreman's whole-response-as-one-line degradation).
	sc.Buffer(make([]byte, 64<<10), 16<<20)
	return &stream{model: m, body: resp.Body, scanner: sc}, nil
}

// stream adapts an Ollama NDJSON response body to llm.Stream. Incremental
// events (text deltas, tool calls) are queued as lines are decoded, and a
// single aggregated llm.Response event is queued last.
type stream struct {
	model   *model
	body    io.Closer
	scanner *bufio.Scanner

	// closeMu guards closed. It is deliberately separate from mu so that
	// Close never waits behind a Next that is blocked reading the body;
	// closing the body is how such a read gets aborted.
	closeMu sync.Mutex
	closed  bool

	// mu serializes Next and guards every field below it.
	mu         sync.Mutex
	finished   bool              // final Response queued; nothing more is read
	toolCalls  []llm.ToolCall    // all tool calls seen, in arrival order
	text       []byte            // concatenation of every text delta
	pending    []llm.StreamEvent // decoded events not yet returned by Next
	usage      llm.Usage         // token counts taken from the done chunk
	doneReason string            // DoneReason reported by the done chunk
}

// Next returns the next stream event. Text deltas and tool calls are returned
// in the order they are decoded. When the done chunk arrives, or the body
// ends cleanly without one, a single event carrying the aggregated
// llm.Response is returned. After that, Next returns io.EOF.
//
// Body read errors and malformed lines are returned wrapped with the
// qualified model name.
func (s *stream) Next() (llm.StreamEvent, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for {
		if len(s.pending) > 0 {
			ev := s.pending[0]
			s.pending = s.pending[1:]
			return ev, nil
		}
		if s.finished {
			return llm.StreamEvent{}, io.EOF
		}
		if !s.scanner.Scan() {
			if err := s.scanner.Err(); err != nil {
				return llm.StreamEvent{}, fmt.Errorf("ollama %s: read stream: %w", s.model.qualified(), err)
			}
			// EOF without a done chunk: synthesize the final response from
			// what we accumulated rather than losing it.
			s.queueFinal()
			continue
		}

		line := s.scanner.Bytes()
		if len(line) == 0 {
			continue
		}
		var chunk chatResponse
		if err := json.Unmarshal(line, &chunk); err != nil {
			return llm.StreamEvent{}, fmt.Errorf("ollama %s: decode stream chunk: %w", s.model.qualified(), err)
		}

		if chunk.Message.Content != "" {
			s.text = append(s.text, chunk.Message.Content...)
			s.pending = append(s.pending, llm.StreamEvent{TextDelta: chunk.Message.Content})
		}

		// Tool calls arrive complete per chunk (no partial-argument deltas
		// in the native protocol).
		base := len(s.toolCalls)
		for i, tc := range chunk.Message.ToolCalls {
			id := tc.ID
			if id == "" {
				// No server-supplied ID: derive one that is unique within
				// this stream from the call's overall position.
				id = "call_" + strconv.Itoa(base+i)
			}
			args := tc.Function.Arguments
			if len(args) == 0 {
				args = json.RawMessage("{}")
			}
			call := llm.ToolCall{ID: id, Name: tc.Function.Name, Arguments: args}
			s.toolCalls = append(s.toolCalls, call)
			// Give the event its own copy instead of a pointer into
			// s.toolCalls, so a caller mutating the event cannot alter the
			// aggregated final Response (queueFinal isolates it the same way).
			evCall := call
			s.pending = append(s.pending, llm.StreamEvent{ToolCall: &evCall})
		}

		if chunk.Done {
			s.usage = llm.Usage{InputTokens: chunk.PromptEvalCount, OutputTokens: chunk.EvalCount}
			s.doneReason = chunk.DoneReason
			s.queueFinal()
		}
	}
}

// queueFinal appends the final Response event and marks the stream done.
// The caller must hold s.mu.
func (s *stream) queueFinal() {
	resp := &llm.Response{
		Model:        s.model.qualified(),
		Usage:        s.usage,
		FinishReason: finishReason(s.doneReason, len(s.toolCalls) > 0),
	}
	if len(s.text) > 0 {
		resp.Parts = append(resp.Parts, llm.Text(string(s.text)))
	}
	if len(s.toolCalls) > 0 {
		// Copy so later appends to s.toolCalls cannot leak into resp.
		resp.ToolCalls = append([]llm.ToolCall(nil), s.toolCalls...)
	}
	s.pending = append(s.pending, llm.StreamEvent{Response: resp})
	s.finished = true
}

// Close releases the response body. It is idempotent: only the first call
// closes the body and reports its error; later calls return nil.
//
// Close does not acquire s.mu, so it may be called from another goroutine
// while Next is blocked reading. For a net/http response body, closing makes
// that pending read fail, and Next then returns the wrapped read error.
func (s *stream) Close() error {
	s.closeMu.Lock()
	defer s.closeMu.Unlock()
	if s.closed {
		return nil
	}
	s.closed = true
	return s.body.Close()
}