Files
go-llm/v2/stream.go
Steve Dudenhoeffer a4cb4baab5 Add go-llm v2: redesigned API for simpler LLM abstraction
v2 is a new Go module (v2/) with a dramatically simpler API:
- Unified Message type (no more Input marker interface)
- Define[T] for ergonomic tool creation with standard context.Context
- Chat session with automatic tool-call loop (agent loop)
- Streaming via pull-based StreamReader
- MCP one-call connect (MCPStdioServer, MCPHTTPServer, MCPSSEServer)
- Middleware support (logging, retry, timeout, usage tracking)
- Decoupled JSON Schema (map[string]any, no provider coupling)
- Sample tools: WebSearch, Browser, Exec, ReadFile, WriteFile, HTTP
- Providers: OpenAI, Anthropic, Google (all with streaming)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 20:00:08 -05:00

164 lines
3.6 KiB
Go

package llm
import (
"context"
"fmt"
"io"
"gitea.stevedudenhoeffer.com/steve/go-llm/v2/provider"
)
// StreamEventType identifies the kind of stream event.
type StreamEventType = provider.StreamEventType
const (
StreamEventText = provider.StreamEventText
StreamEventToolStart = provider.StreamEventToolStart
StreamEventToolDelta = provider.StreamEventToolDelta
StreamEventToolEnd = provider.StreamEventToolEnd
StreamEventDone = provider.StreamEventDone
StreamEventError = provider.StreamEventError
)
// StreamEvent represents a single event in a streaming response.
type StreamEvent struct {
Type StreamEventType
// Text is set for StreamEventText — the text delta.
Text string
// ToolCall is set for StreamEventToolStart/ToolDelta/ToolEnd.
ToolCall *ToolCall
// ToolIndex identifies which tool call is being updated.
ToolIndex int
// Error is set for StreamEventError.
Error error
// Response is set for StreamEventDone — the complete, aggregated response.
Response *Response
}
// StreamReader reads streaming events from an LLM response.
// Must be closed when done.
type StreamReader struct {
events <-chan StreamEvent
cancel context.CancelFunc
done bool
}
func newStreamReader(ctx context.Context, p provider.Provider, req provider.Request) (*StreamReader, error) {
ctx, cancel := context.WithCancel(ctx)
providerEvents := make(chan provider.StreamEvent, 32)
publicEvents := make(chan StreamEvent, 32)
go func() {
defer close(publicEvents)
for pev := range providerEvents {
ev := convertStreamEvent(pev)
select {
case publicEvents <- ev:
case <-ctx.Done():
return
}
}
}()
go func() {
defer close(providerEvents)
if err := p.Stream(ctx, req, providerEvents); err != nil {
select {
case providerEvents <- provider.StreamEvent{Type: provider.StreamEventError, Error: err}:
default:
}
}
}()
return &StreamReader{
events: publicEvents,
cancel: cancel,
}, nil
}
func convertStreamEvent(pev provider.StreamEvent) StreamEvent {
ev := StreamEvent{
Type: pev.Type,
Text: pev.Text,
ToolIndex: pev.ToolIndex,
}
if pev.Error != nil {
ev.Error = pev.Error
}
if pev.ToolCall != nil {
tc := ToolCall{
ID: pev.ToolCall.ID,
Name: pev.ToolCall.Name,
Arguments: pev.ToolCall.Arguments,
}
ev.ToolCall = &tc
}
if pev.Response != nil {
resp := convertProviderResponse(*pev.Response)
ev.Response = &resp
}
return ev
}
// Next returns the next event from the stream.
// Returns io.EOF when the stream is complete.
func (sr *StreamReader) Next() (StreamEvent, error) {
if sr.done {
return StreamEvent{}, io.EOF
}
ev, ok := <-sr.events
if !ok {
sr.done = true
return StreamEvent{}, io.EOF
}
if ev.Type == StreamEventError {
return ev, ev.Error
}
if ev.Type == StreamEventDone {
sr.done = true
}
return ev, nil
}
// Close closes the stream reader and releases resources.
func (sr *StreamReader) Close() error {
sr.cancel()
return nil
}
// Collect reads all events and returns the final aggregated Response.
func (sr *StreamReader) Collect() (Response, error) {
var lastResp *Response
for {
ev, err := sr.Next()
if err == io.EOF {
break
}
if err != nil {
return Response{}, err
}
if ev.Type == StreamEventDone && ev.Response != nil {
lastResp = ev.Response
}
}
if lastResp == nil {
return Response{}, fmt.Errorf("stream completed without final response")
}
return *lastResp, nil
}
// Text is a convenience that collects the stream and returns just the text.
func (sr *StreamReader) Text() (string, error) {
resp, err := sr.Collect()
if err != nil {
return "", err
}
return resp.Text, nil
}