0147a79d18
Phase 9a (ADR-0014): Registry.RegisterResolver for dynamic tiers; DefineTool[Args] typed tools; Usage cache/reasoning detail fields wired through anthropic/openai/google; WithPromptCaching (Anthropic cache_control); agent supervision hooks (WithMaxStepsFunc, WithSteer, WithCompactor, WithToolErrorLimits + ErrToolLoop); health Bench/Unbench/Snapshot; ChainConfig.Observer failover events. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
143 lines
3.4 KiB
Go
143 lines
3.4 KiB
Go
package google
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"iter"
|
|
"strconv"
|
|
"sync"
|
|
|
|
"google.golang.org/genai"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
|
)
|
|
|
|
// Stream implements llm.Model over the SDK's range-over-func stream
|
|
// (iter.Seq2), adapted to majordomo's pull-based Stream via iter.Pull2.
|
|
func (m *model) Stream(ctx context.Context, req llm.Request, opts ...llm.Option) (llm.Stream, error) {
|
|
req = req.Apply(opts...)
|
|
if err := m.enforceCapabilities(req); err != nil {
|
|
return nil, err
|
|
}
|
|
client, err := m.provider.genaiClient(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
system, contents, err := m.buildContents(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cfg, err := m.buildConfig(req, system)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
seq := client.Models.GenerateContentStream(ctx, m.id, contents, cfg)
|
|
next, stop := iter.Pull2(iter.Seq2[*genai.GenerateContentResponse, error](seq))
|
|
return &stream{model: m, next: next, stop: stop}, nil
|
|
}
|
|
|
|
type stream struct {
|
|
model *model
|
|
next func() (*genai.GenerateContentResponse, error, bool)
|
|
stop func()
|
|
|
|
mu sync.Mutex
|
|
closeOnce sync.Once
|
|
finished bool
|
|
pending []llm.StreamEvent
|
|
text []byte
|
|
toolCalls []llm.ToolCall
|
|
usage llm.Usage
|
|
finish genai.FinishReason
|
|
}
|
|
|
|
func (s *stream) Next() (llm.StreamEvent, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
for {
|
|
if len(s.pending) > 0 {
|
|
ev := s.pending[0]
|
|
s.pending = s.pending[1:]
|
|
return ev, nil
|
|
}
|
|
if s.finished {
|
|
return llm.StreamEvent{}, io.EOF
|
|
}
|
|
|
|
chunk, err, ok := s.next()
|
|
if !ok {
|
|
s.queueFinal()
|
|
continue
|
|
}
|
|
if err != nil {
|
|
return llm.StreamEvent{}, s.model.mapError(err)
|
|
}
|
|
|
|
if chunk.UsageMetadata != nil {
|
|
s.usage = llm.Usage{
|
|
InputTokens: int(chunk.UsageMetadata.PromptTokenCount),
|
|
OutputTokens: int(chunk.UsageMetadata.CandidatesTokenCount + chunk.UsageMetadata.ThoughtsTokenCount),
|
|
CacheReadTokens: int(chunk.UsageMetadata.CachedContentTokenCount),
|
|
ReasoningTokens: int(chunk.UsageMetadata.ThoughtsTokenCount),
|
|
}
|
|
}
|
|
if len(chunk.Candidates) == 0 {
|
|
continue
|
|
}
|
|
cand := chunk.Candidates[0]
|
|
if cand.FinishReason != "" {
|
|
s.finish = cand.FinishReason
|
|
}
|
|
if cand.Content == nil {
|
|
continue
|
|
}
|
|
for _, part := range cand.Content.Parts {
|
|
if part == nil {
|
|
continue
|
|
}
|
|
if part.Text != "" && !part.Thought {
|
|
s.text = append(s.text, part.Text...)
|
|
s.pending = append(s.pending, llm.StreamEvent{TextDelta: part.Text})
|
|
}
|
|
// Function calls arrive whole per chunk in the Gemini stream.
|
|
if fc := part.FunctionCall; fc != nil {
|
|
id := fc.ID
|
|
if id == "" {
|
|
id = "call_" + strconv.Itoa(len(s.toolCalls))
|
|
}
|
|
args, err := json.Marshal(fc.Args)
|
|
if err != nil || len(fc.Args) == 0 {
|
|
args = json.RawMessage("{}")
|
|
}
|
|
call := llm.ToolCall{ID: id, Name: fc.Name, Arguments: args}
|
|
s.toolCalls = append(s.toolCalls, call)
|
|
s.pending = append(s.pending, llm.StreamEvent{ToolCall: &call})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *stream) queueFinal() {
|
|
resp := &llm.Response{
|
|
Model: s.model.qualified(),
|
|
Usage: s.usage,
|
|
FinishReason: mapFinish(s.finish, len(s.toolCalls) > 0),
|
|
}
|
|
if len(s.text) > 0 {
|
|
resp.Parts = append(resp.Parts, llm.Text(string(s.text)))
|
|
}
|
|
if len(s.toolCalls) > 0 {
|
|
resp.ToolCalls = append([]llm.ToolCall(nil), s.toolCalls...)
|
|
}
|
|
s.pending = append(s.pending, llm.StreamEvent{Response: resp})
|
|
s.finished = true
|
|
}
|
|
|
|
func (s *stream) Close() error {
|
|
s.closeOnce.Do(s.stop)
|
|
return nil
|
|
}
|