Files
steve 0147a79d18
CI / Tidy (push) Successful in 9m31s
CI / Build & Test (push) Successful in 10m13s
feat: conversion-driven extensions — resolvers, DefineTool, hooks, ops controls
Phase 9a (ADR-0014): Registry.RegisterResolver for dynamic tiers;
DefineTool[Args] typed tools; Usage cache/reasoning detail fields wired
through anthropic/openai/google; WithPromptCaching (Anthropic
cache_control); agent supervision hooks (WithMaxStepsFunc, WithSteer,
WithCompactor, WithToolErrorLimits + ErrToolLoop); health
Bench/Unbench/Snapshot; ChainConfig.Observer failover events.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 13:30:06 +02:00

143 lines
3.4 KiB
Go

package google
import (
"context"
"encoding/json"
"io"
"iter"
"strconv"
"sync"
"google.golang.org/genai"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)
// Stream implements llm.Model over the SDK's range-over-func stream
// (iter.Seq2), adapted to majordomo's pull-based Stream via iter.Pull2.
func (m *model) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
req = req.Apply(opts...)
if err := m.enforceCapabilities(req); err != nil {
return nil, err
}
client, err := m.provider.genaiClient(ctx)
if err != nil {
return nil, err
}
system, contents, err := m.buildContents(req)
if err != nil {
return nil, err
}
cfg, err := m.buildConfig(req, system)
if err != nil {
return nil, err
}
seq := client.Models.GenerateContentStream(ctx, m.id, contents, cfg)
next, stop := iter.Pull2(iter.Seq2[*genai.GenerateContentResponse, error](seq))
return &stream{model: m, next: next, stop: stop}, nil
}
type stream struct {
model *model
next func() (*genai.GenerateContentResponse, error, bool)
stop func()
mu sync.Mutex
closeOnce sync.Once
finished bool
pending []llm.StreamEvent
text []byte
toolCalls []llm.ToolCall
usage llm.Usage
finish genai.FinishReason
}
func (s *stream) Next() (llm.StreamEvent, error) {
s.mu.Lock()
defer s.mu.Unlock()
for {
if len(s.pending) > 0 {
ev := s.pending[0]
s.pending = s.pending[1:]
return ev, nil
}
if s.finished {
return llm.StreamEvent{}, io.EOF
}
chunk, err, ok := s.next()
if !ok {
s.queueFinal()
continue
}
if err != nil {
return llm.StreamEvent{}, s.model.mapError(err)
}
if chunk.UsageMetadata != nil {
s.usage = llm.Usage{
InputTokens: int(chunk.UsageMetadata.PromptTokenCount),
OutputTokens: int(chunk.UsageMetadata.CandidatesTokenCount + chunk.UsageMetadata.ThoughtsTokenCount),
CacheReadTokens: int(chunk.UsageMetadata.CachedContentTokenCount),
ReasoningTokens: int(chunk.UsageMetadata.ThoughtsTokenCount),
}
}
if len(chunk.Candidates) == 0 {
continue
}
cand := chunk.Candidates[0]
if cand.FinishReason != "" {
s.finish = cand.FinishReason
}
if cand.Content == nil {
continue
}
for _, part := range cand.Content.Parts {
if part == nil {
continue
}
if part.Text != "" && !part.Thought {
s.text = append(s.text, part.Text...)
s.pending = append(s.pending, llm.StreamEvent{TextDelta: part.Text})
}
// Function calls arrive whole per chunk in the Gemini stream.
if fc := part.FunctionCall; fc != nil {
id := fc.ID
if id == "" {
id = "call_" + strconv.Itoa(len(s.toolCalls))
}
args, err := json.Marshal(fc.Args)
if err != nil || len(fc.Args) == 0 {
args = json.RawMessage("{}")
}
call := llm.ToolCall{ID: id, Name: fc.Name, Arguments: args}
s.toolCalls = append(s.toolCalls, call)
s.pending = append(s.pending, llm.StreamEvent{ToolCall: &call})
}
}
}
}
func (s *stream) queueFinal() {
resp := &llm.Response{
Model: s.model.qualified(),
Usage: s.usage,
FinishReason: mapFinish(s.finish, len(s.toolCalls) > 0),
}
if len(s.text) > 0 {
resp.Parts = append(resp.Parts, llm.Text(string(s.text)))
}
if len(s.toolCalls) > 0 {
resp.ToolCalls = append([]llm.ToolCall(nil), s.toolCalls...)
}
s.pending = append(s.pending, llm.StreamEvent{Response: resp})
s.finished = true
}
func (s *stream) Close() error {
s.closeOnce.Do(s.stop)
return nil
}