package llm

import (
	"context"
	"fmt"
	"io"

	"gitea.stevedudenhoeffer.com/steve/go-llm/v2/provider"
)

// StreamEventType identifies the kind of stream event.
// It is an alias for provider.StreamEventType, so values of the two types
// are interchangeable.
type StreamEventType = provider.StreamEventType

// Stream event kinds, re-exported from the provider package so callers of
// this package can refer to them without importing provider directly.
const (
	StreamEventText      = provider.StreamEventText
	StreamEventToolStart = provider.StreamEventToolStart
	StreamEventToolDelta = provider.StreamEventToolDelta
	StreamEventToolEnd   = provider.StreamEventToolEnd
	StreamEventDone      = provider.StreamEventDone
	StreamEventError     = provider.StreamEventError
)

// StreamEvent represents a single event in a streaming response.
// Type says which of the remaining fields carries the payload.
type StreamEvent struct {
	// Type is the kind of event.
	Type StreamEventType

	// Text is set for StreamEventText — the text delta.
	Text string

	// ToolCall is set for StreamEventToolStart/ToolDelta/ToolEnd.
	ToolCall *ToolCall

	// ToolIndex identifies which tool call is being updated.
	ToolIndex int

	// Error is set for StreamEventError.
	Error error

	// Response is set for StreamEventDone — the complete, aggregated response.
	Response *Response
}

// StreamReader reads streaming events from an LLM response.
// Must be closed when done.
type StreamReader struct { events <-chan StreamEvent cancel context.CancelFunc done bool } func newStreamReader(ctx context.Context, p provider.Provider, req provider.Request) (*StreamReader, error) { ctx, cancel := context.WithCancel(ctx) providerEvents := make(chan provider.StreamEvent, 32) publicEvents := make(chan StreamEvent, 32) go func() { defer close(publicEvents) for pev := range providerEvents { ev := convertStreamEvent(pev) select { case publicEvents <- ev: case <-ctx.Done(): return } } }() go func() { defer close(providerEvents) if err := p.Stream(ctx, req, providerEvents); err != nil { select { case providerEvents <- provider.StreamEvent{Type: provider.StreamEventError, Error: err}: default: } } }() return &StreamReader{ events: publicEvents, cancel: cancel, }, nil } func convertStreamEvent(pev provider.StreamEvent) StreamEvent { ev := StreamEvent{ Type: pev.Type, Text: pev.Text, ToolIndex: pev.ToolIndex, } if pev.Error != nil { ev.Error = pev.Error } if pev.ToolCall != nil { tc := ToolCall{ ID: pev.ToolCall.ID, Name: pev.ToolCall.Name, Arguments: pev.ToolCall.Arguments, } ev.ToolCall = &tc } if pev.Response != nil { resp := convertProviderResponse(*pev.Response) ev.Response = &resp } return ev } // Next returns the next event from the stream. // Returns io.EOF when the stream is complete. func (sr *StreamReader) Next() (StreamEvent, error) { if sr.done { return StreamEvent{}, io.EOF } ev, ok := <-sr.events if !ok { sr.done = true return StreamEvent{}, io.EOF } if ev.Type == StreamEventError { return ev, ev.Error } if ev.Type == StreamEventDone { sr.done = true } return ev, nil } // Close closes the stream reader and releases resources. func (sr *StreamReader) Close() error { sr.cancel() return nil } // Collect reads all events and returns the final aggregated Response. 
func (sr *StreamReader) Collect() (Response, error) { var lastResp *Response for { ev, err := sr.Next() if err == io.EOF { break } if err != nil { return Response{}, err } if ev.Type == StreamEventDone && ev.Response != nil { lastResp = ev.Response } } if lastResp == nil { return Response{}, fmt.Errorf("stream completed without final response") } return *lastResp, nil } // Text is a convenience that collects the stream and returns just the text. func (sr *StreamReader) Text() (string, error) { resp, err := sr.Collect() if err != nil { return "", err } return resp.Text, nil }