package openai

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"sync"

	"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)

// stream consumes the data-only SSE stream of chat.completion.chunk events.
//
// Delivery contract: TextDelta events as content fragments arrive; ToolCall
// events only once fully assembled (fragments are buffered internally and
// flushed at stream end — simplest correct handling of interleaved parallel
// calls); exactly one final Response event; then io.EOF.
type stream struct {
	m         *model
	body      io.ReadCloser
	sc        *bufio.Scanner
	closeOnce sync.Once
	closeErr  error

	queue []llm.StreamEvent
	done  bool // finalize ran; drain queue then io.EOF

	text    strings.Builder
	calls   []*toolCallAcc // first-appearance order
	byIndex map[int]*toolCallAcc
	finish  string
	usage   llm.Usage
}

// toolCallAcc accumulates one tool call's fragments. The id and name arrive
// on the first fragment for an index; arguments arrive as string pieces to
// concatenate.
type toolCallAcc struct {
	id   string
	name string
	args strings.Builder
}

// Next implements llm.Stream.
//
// It returns queued events first. Once the stream has been finalized and the
// queue is drained it returns io.EOF. Otherwise it reads SSE lines until one
// of three things happens: a chunk produces events, a read or decode error
// occurs, or the stream ends. Both "[DONE]" and a clean EOF end the stream.
func (s *stream) Next() (llm.StreamEvent, error) {
	for {
		if len(s.queue) > 0 {
			ev := s.queue[0]
			s.queue = s.queue[1:]
			return ev, nil
		}
		if s.done {
			return llm.StreamEvent{}, io.EOF
		}
		if !s.sc.Scan() {
			if err := s.sc.Err(); err != nil {
				return llm.StreamEvent{}, fmt.Errorf("openai: read stream: %w", err)
			}
			// Why: some compat servers close the body without a [DONE]
			// sentinel; a clean EOF still finalizes with what arrived.
			s.finalize()
			continue
		}
		line := strings.TrimSpace(s.sc.Text())
		if !strings.HasPrefix(line, "data:") {
			continue // SSE comments, event:/id: fields, blank separators
		}
		payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if payload == "" {
			continue
		}
		if payload == "[DONE]" {
			s.finalize()
			continue
		}
		if err := s.handleChunk([]byte(payload)); err != nil {
			return llm.StreamEvent{}, err
		}
	}
}

// handleChunk folds one chat.completion.chunk into the stream state,
// queueing any events it produces.
//
// It returns an error in two cases: the payload is not valid JSON, or the
// chunk carries an error object, which is returned as *llm.APIError.
func (s *stream) handleChunk(data []byte) error {
	var chunk streamChunk
	if err := json.Unmarshal(data, &chunk); err != nil {
		return fmt.Errorf("openai: decode stream chunk: %w", err)
	}
	if chunk.Error != nil {
		// Mid-stream error event on an otherwise-200 stream. Status stays 0:
		// there is no failing HTTP status to report.
		apiErr := &llm.APIError{
			Provider: s.m.p.name,
			Model:    s.m.id,
			Code:     chunk.Error.Code,
			Message:  chunk.Error.Message,
		}
		if apiErr.Code == "" {
			apiErr.Code = chunk.Error.Type
		}
		return apiErr
	}
	if chunk.Usage != nil {
		s.usage = chunk.Usage.toUsage()
	}
	// Why the guard: the include_usage chunk arrives with an EMPTY choices
	// array; indexing choices[0] unconditionally would panic on it.
	if len(chunk.Choices) == 0 {
		return nil
	}
	choice := chunk.Choices[0]
	if choice.FinishReason != "" {
		s.finish = choice.FinishReason
	}
	if choice.Delta.Content != "" {
		s.text.WriteString(choice.Delta.Content)
		s.queue = append(s.queue, llm.StreamEvent{TextDelta: choice.Delta.Content})
	}
	for _, tc := range choice.Delta.ToolCalls {
		acc := s.byIndex[tc.Index]
		// Why the id comparison: OpenAI sends a call's id only on its first
		// fragment, so a *different* non-empty id at an already-seen index
		// means a compat server reused the index for a new call. A common
		// way this happens is omitting "index", which decodes as 0. Start a
		// fresh accumulator instead of merging two calls' ids, names and
		// arguments into one corrupt call. Later id-less fragments for this
		// index attach to the newest call.
		if acc == nil || (tc.ID != "" && acc.id != "" && tc.ID != acc.id) {
			if s.byIndex == nil {
				s.byIndex = make(map[int]*toolCallAcc)
			}
			acc = &toolCallAcc{}
			s.byIndex[tc.Index] = acc
			s.calls = append(s.calls, acc)
		}
		if tc.ID != "" {
			acc.id = tc.ID
		}
		if tc.Function.Name != "" {
			acc.name = tc.Function.Name
		}
		acc.args.WriteString(tc.Function.Arguments)
	}
	return nil
}

// finalize assembles the buffered tool calls and the final Response, queues
// them (ToolCall events first, Response last), and marks the stream done.
// Calling it more than once is a no-op.
func (s *stream) finalize() {
	if s.done {
		return
	}
	s.done = true

	resp := &llm.Response{Model: s.m.p.name + "/" + s.m.id, Usage: s.usage}
	if s.text.Len() > 0 {
		resp.Parts = []llm.Part{llm.TextPart{Text: s.text.String()}}
	}
	for i, acc := range s.calls {
		id := acc.id
		if id == "" {
			// Why: ToolResult.ID must echo ToolCall.ID; synthesize for
			// compat servers that stream calls without ids.
			id = fmt.Sprintf("call_%d", i)
		}
		args := acc.args.String()
		if strings.TrimSpace(args) == "" {
			// Why: zero-argument calls may stream no argument text at all.
			// An empty json.RawMessage is not valid JSON: decoding it or
			// re-marshaling it fails. Normalize to an empty object.
			args = "{}"
		}
		resp.ToolCalls = append(resp.ToolCalls, llm.ToolCall{
			ID:        id,
			Name:      acc.name,
			Arguments: json.RawMessage(args),
		})
	}
	resp.FinishReason = mapFinish(s.finish, len(resp.ToolCalls) > 0)

	for i := range resp.ToolCalls {
		tc := resp.ToolCalls[i] // copy so the event doesn't alias the slice
		s.queue = append(s.queue, llm.StreamEvent{ToolCall: &tc})
	}
	s.queue = append(s.queue, llm.StreamEvent{Response: resp})
}

// Close implements llm.Stream. Closing the body unblocks any in-flight read
// and aborts the HTTP stream; safe to call at any time, including twice.
func (s *stream) Close() error {
	s.closeOnce.Do(func() { s.closeErr = s.body.Close() })
	return s.closeErr
}