Files
go-llm/v2/stream.go
T
steve cbaf41f50c
CI / Root Module (push) Failing after 1m30s
CI / Lint (push) Failing after 1m1s
CI / V2 Module (push) Successful in 3m41s
feat(v2): add ReasoningLevel option; thinking/reasoning across providers
Introduces an opt-in level-based reasoning toggle (low/medium/high) that
each provider translates to its native parameter:

- Anthropic: thinking.budget_tokens (1024/8000/24000), with temperature
  forced to default and MaxTokens auto-grown above the budget.
- OpenAI/xAI/Groq via openaicompat: reasoning_effort string, gated by a
  new Rules.SupportsReasoning predicate so non-reasoning models don't
  receive the parameter. xAI uses Rules.MapReasoningEffort to remap
  "medium" to "high" since its API only accepts low|high.
- Google: thinking_config.thinking_budget + include_thoughts:true.
- DeepSeek: SupportsReasoning=false (reasoner is always-on; the
  reasoning_content trace was already extracted via openaicompat).

Reasoning content is surfaced as Response.Thinking on Complete and as
StreamEventThinking deltas during streaming. Provider-side: extracted
from Anthropic thinking content blocks, Google's part.Thought=true
parts, and the non-standard reasoning_content field that DeepSeek and
Groq emit (parsed out of raw JSON since openai-go doesn't type it).

Public API:
  - llm.ReasoningLevel + ReasoningLow/Medium/High constants
  - llm.WithReasoning(level) request option
  - Model.WithReasoning(level) for baked-in defaults
  - provider.Request.Reasoning, provider.Response.Thinking
  - provider.StreamEventThinking

Tests cover Rules-based gating, MapReasoningEffort, reasoning_content
extraction (Complete + Stream), Anthropic budget mapping, and
temperature suppression when thinking is enabled. Existing behavior is
unchanged when Reasoning is the empty string.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-25 03:58:42 +00:00

165 lines
3.7 KiB
Go

package llm
import (
"context"
"fmt"
"io"
"gitea.stevedudenhoeffer.com/steve/go-llm/v2/provider"
)
// StreamEventType identifies the kind of stream event.
type StreamEventType = provider.StreamEventType

// Stream event kinds, re-exported from the provider package so callers
// can switch on event types without importing provider directly.
const (
	StreamEventText      = provider.StreamEventText      // incremental text delta
	StreamEventToolStart = provider.StreamEventToolStart // a tool call has started
	StreamEventToolDelta = provider.StreamEventToolDelta // partial tool-call arguments
	StreamEventToolEnd   = provider.StreamEventToolEnd   // a tool call is complete
	StreamEventDone      = provider.StreamEventDone      // final aggregated response follows
	StreamEventError     = provider.StreamEventError     // the stream failed; Error is set
	StreamEventThinking  = provider.StreamEventThinking  // reasoning/thinking text delta
)
// StreamEvent represents a single event in a streaming response.
// Which fields are populated depends on Type.
type StreamEvent struct {
	// Type identifies the kind of event and therefore which fields are set.
	Type StreamEventType
	// Text is set for StreamEventText — the text delta.
	Text string
	// ToolCall is set for StreamEventToolStart/ToolDelta/ToolEnd.
	ToolCall *ToolCall
	// ToolIndex identifies which tool call is being updated.
	ToolIndex int
	// Error is set for StreamEventError.
	Error error
	// Response is set for StreamEventDone — the complete, aggregated response.
	Response *Response
}
// StreamReader reads streaming events from an LLM response.
// Must be closed when done.
type StreamReader struct {
	events <-chan StreamEvent  // converted public events from the forwarding goroutine
	cancel context.CancelFunc  // stops the underlying provider stream; invoked by Close
	done   bool                // set once EOF or a Done/closed-channel event has been seen
}
// newStreamReader starts a provider stream and returns a StreamReader that
// surfaces the converted public events.
//
// Two goroutines cooperate:
//   - the producer runs p.Stream, writing provider events into
//     providerEvents, and closes that channel when Stream returns;
//   - the forwarder converts each provider event via convertStreamEvent
//     into publicEvents, and closes publicEvents when providerEvents is
//     closed or ctx is cancelled.
//
// Cancelling ctx (via StreamReader.Close) is the shutdown signal for both.
func newStreamReader(ctx context.Context, p provider.Provider, req provider.Request) (*StreamReader, error) {
	ctx, cancel := context.WithCancel(ctx)
	// Buffered so short bursts of events don't block either goroutine.
	providerEvents := make(chan provider.StreamEvent, 32)
	publicEvents := make(chan StreamEvent, 32)
	go func() {
		defer close(publicEvents)
		for pev := range providerEvents {
			ev := convertStreamEvent(pev)
			select {
			case publicEvents <- ev:
			case <-ctx.Done():
				// Reader was closed or the caller's context expired;
				// stop forwarding so we don't block on a dead consumer.
				return
			}
		}
	}()
	go func() {
		defer close(providerEvents)
		if err := p.Stream(ctx, req, providerEvents); err != nil {
			// Best-effort error report: surface the failure as an event.
			// The non-blocking send means the error is silently dropped if
			// the buffer is full — the forwarder may already be gone.
			select {
			case providerEvents <- provider.StreamEvent{Type: provider.StreamEventError, Error: err}:
			default:
			}
		}
	}()
	return &StreamReader{
		events: publicEvents,
		cancel: cancel,
	}, nil
}
// convertStreamEvent maps a provider-level stream event onto the public
// StreamEvent type, copying the tool call and response so callers never
// alias provider-owned values.
func convertStreamEvent(pev provider.StreamEvent) StreamEvent {
	ev := StreamEvent{
		Type:      pev.Type,
		Text:      pev.Text,
		ToolIndex: pev.ToolIndex,
		// Direct assignment suffices: a nil provider error stays nil,
		// so the previous nil-guard was redundant.
		Error: pev.Error,
	}
	if pev.ToolCall != nil {
		ev.ToolCall = &ToolCall{
			ID:        pev.ToolCall.ID,
			Name:      pev.ToolCall.Name,
			Arguments: pev.ToolCall.Arguments,
		}
	}
	if pev.Response != nil {
		resp := convertProviderResponse(*pev.Response)
		ev.Response = &resp
	}
	return ev
}
// Next returns the next event from the stream.
// Returns io.EOF when the stream is complete.
func (sr *StreamReader) Next() (StreamEvent, error) {
	if sr.done {
		return StreamEvent{}, io.EOF
	}
	ev, open := <-sr.events
	switch {
	case !open:
		// Channel closed: the forwarding goroutine has finished.
		sr.done = true
		return StreamEvent{}, io.EOF
	case ev.Type == StreamEventError:
		// Hand back the event alongside its error so callers can inspect both.
		return ev, ev.Error
	case ev.Type == StreamEventDone:
		// Final event; subsequent calls report io.EOF.
		sr.done = true
	}
	return ev, nil
}
// Close closes the stream reader and releases resources.
// It cancels the underlying context, which signals both stream goroutines
// to stop. Close always returns nil; the error return satisfies io.Closer.
func (sr *StreamReader) Close() error {
	sr.cancel()
	return nil
}
// Collect reads all events and returns the final aggregated Response.
// It drains the stream until EOF, remembering the Response carried by the
// last Done event; any mid-stream error aborts collection immediately.
func (sr *StreamReader) Collect() (Response, error) {
	var final *Response
	for {
		ev, err := sr.Next()
		switch {
		case err == io.EOF:
			if final == nil {
				// The stream ended without ever delivering a Done response.
				return Response{}, fmt.Errorf("stream completed without final response")
			}
			return *final, nil
		case err != nil:
			return Response{}, err
		}
		if ev.Type == StreamEventDone && ev.Response != nil {
			final = ev.Response
		}
	}
}
// Text is a convenience that collects the stream and returns just the text.
// On any collection error the returned string is empty.
func (sr *StreamReader) Text() (string, error) {
	var out string
	resp, err := sr.Collect()
	if err == nil {
		out = resp.Text
	}
	return out, err
}