package google

import (
	"context"
	"encoding/json"
	"io"
	"iter"
	"strconv"
	"sync"

	"google.golang.org/genai"

	"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)

// Stream implements llm.Model over the SDK's range-over-func stream
// (iter.Seq2), adapted to majordomo's pull-based Stream via iter.Pull2.
//
// Callers should Close the returned stream when they stop reading before
// io.EOF; Close calls the iter.Pull2 stop function that releases the
// pulled iterator.
func (m *model) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
	req = req.Apply(opts...)
	if err := m.enforceCapabilities(req); err != nil {
		return nil, err
	}
	client, err := m.provider.genaiClient(ctx)
	if err != nil {
		return nil, err
	}
	system, contents, err := m.buildContents(req)
	if err != nil {
		return nil, err
	}
	cfg, err := m.buildConfig(req, system)
	if err != nil {
		return nil, err
	}

	seq := client.Models.GenerateContentStream(ctx, m.id, contents, cfg)
	next, stop := iter.Pull2(iter.Seq2[*genai.GenerateContentResponse, error](seq))
	return &stream{model: m, next: next, stop: stop}, nil
}

// stream adapts a pulled Gemini response iterator to llm.Stream. Text
// deltas and tool calls are emitted as they arrive and are also
// accumulated, so that one aggregate llm.Response can be emitted as the
// final event before io.EOF.
type stream struct {
	model *model
	next  func() (*genai.GenerateContentResponse, error, bool)
	stop  func()

	// mu serializes Next and Close. The iter package forbids calling the
	// Pull2 next and stop functions from multiple goroutines at once.
	mu        sync.Mutex
	closeOnce sync.Once

	// finished means no more chunks will be pulled: Next drains pending
	// and then reports io.EOF.
	finished bool
	// err is the terminal, already-mapped error; Next replays it on every
	// call after the failure instead of synthesizing a final Response.
	err error
	// pending holds decoded events not yet returned by Next.
	pending []llm.StreamEvent

	// Aggregates used by queueFinal to build the final llm.Response.
	text      []byte
	toolCalls []llm.ToolCall
	usage     llm.Usage
	finish    genai.FinishReason
}

// Next returns the next stream event.
//
// Events already decoded are returned first. When none remain, chunks are
// pulled from the SDK iterator and decoded. Once the iterator is exhausted,
// a final event carrying the aggregated llm.Response is returned, followed
// by io.EOF on every later call.
//
// An error from the iterator is passed through the model's mapError and
// becomes terminal: it is returned again on every subsequent call.
func (s *stream) Next() (llm.StreamEvent, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for {
		if len(s.pending) > 0 {
			ev := s.pending[0]
			s.pending = s.pending[1:]
			return ev, nil
		}
		if s.err != nil {
			return llm.StreamEvent{}, s.err
		}
		if s.finished {
			return llm.StreamEvent{}, io.EOF
		}

		chunk, err, ok := s.next()
		if !ok {
			// Iterator exhausted: emit the aggregate, then EOF.
			s.queueFinal()
			continue
		}
		if err != nil {
			s.err = s.model.mapError(err)
			return llm.StreamEvent{}, s.err
		}
		s.consume(chunk)
	}
}

// consume folds one SDK chunk into the stream state. It records the
// reported usage, tracks the first candidate's finish reason, and queues
// an event for each visible text part and function call in that candidate.
func (s *stream) consume(chunk *genai.GenerateContentResponse) {
	if chunk == nil {
		return
	}
	if um := chunk.UsageMetadata; um != nil {
		s.usage = llm.Usage{
			InputTokens:  int(um.PromptTokenCount),
			OutputTokens: int(um.CandidatesTokenCount + um.ThoughtsTokenCount),
		}
	}
	if len(chunk.Candidates) == 0 {
		return
	}
	cand := chunk.Candidates[0]
	if cand == nil {
		return
	}
	if cand.FinishReason != "" {
		s.finish = cand.FinishReason
	}
	if cand.Content == nil {
		return
	}
	for _, part := range cand.Content.Parts {
		if part == nil {
			continue
		}
		// Thought parts are skipped; only visible text is streamed.
		if part.Text != "" && !part.Thought {
			s.text = append(s.text, part.Text...)
			s.pending = append(s.pending, llm.StreamEvent{TextDelta: part.Text})
		}
		if fc := part.FunctionCall; fc != nil {
			s.queueToolCall(fc)
		}
	}
}

// queueToolCall records a function call and queues it as a ToolCall event.
//
// Function calls arrive whole per chunk in the Gemini stream, so no
// argument reassembly is needed. A call without an ID gets the positional
// ID "call_N". Arguments that are empty or fail to marshal are sent as
// "{}" (deliberate best effort).
func (s *stream) queueToolCall(fc *genai.FunctionCall) {
	id := fc.ID
	if id == "" {
		id = "call_" + strconv.Itoa(len(s.toolCalls))
	}
	args, err := json.Marshal(fc.Args)
	if err != nil || len(fc.Args) == 0 {
		args = json.RawMessage("{}")
	}
	call := llm.ToolCall{ID: id, Name: fc.Name, Arguments: args}
	s.toolCalls = append(s.toolCalls, call)
	s.pending = append(s.pending, llm.StreamEvent{ToolCall: &call})
}

// queueFinal queues the aggregate llm.Response (text, tool calls, usage,
// mapped finish reason) and marks the stream finished.
func (s *stream) queueFinal() {
	resp := &llm.Response{
		Model:        s.model.qualified(),
		Usage:        s.usage,
		FinishReason: mapFinish(s.finish, len(s.toolCalls) > 0),
	}
	if len(s.text) > 0 {
		resp.Parts = append(resp.Parts, llm.Text(string(s.text)))
	}
	if len(s.toolCalls) > 0 {
		// Copy so the response does not alias the stream's internal slice.
		resp.ToolCalls = append([]llm.ToolCall(nil), s.toolCalls...)
	}
	s.pending = append(s.pending, llm.StreamEvent{Response: resp})
	s.finished = true
}

// Close stops the underlying SDK iterator. It is idempotent.
//
// Close takes the same lock as Next, because the iter.Pull2 next and stop
// functions must not run concurrently. As a result, a Close issued from
// another goroutine waits for any in-flight Next to return. Cancelling the
// ctx given to Stream is presumably what aborts a blocked read; verify
// against the genai client.
//
// After Close, Next returns io.EOF, or the earlier terminal error if one
// occurred. Events queued but not yet read are discarded.
func (s *stream) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.closeOnce.Do(s.stop)
	s.finished = true
	s.pending = nil
	return nil
}