package anthropic

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"sync"

	"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)

// wireStreamEvent is one decoded SSE data payload from the Messages API. A
// single struct covers every event kind: the JSON "type" field is always
// present and selects which optional members are meaningful, so the SSE
// "event:" line is informational only and never consulted.
type wireStreamEvent struct {
	Type  string `json:"type"`
	Index int    `json:"index"`

	// Populated on message_start.
	Message *struct {
		Usage wireUsage `json:"usage"`
	} `json:"message"`

	// Populated on content_block_start.
	ContentBlock *struct {
		Type string `json:"type"`
		ID   string `json:"id"`
		Name string `json:"name"`
	} `json:"content_block"`

	// Shared by content_block_delta and message_delta.
	Delta struct {
		Type        string `json:"type"`
		Text        string `json:"text"`
		PartialJSON string `json:"partial_json"`
		StopReason  string `json:"stop_reason"`
	} `json:"delta"`

	// Populated on message_delta.
	Usage *wireUsage `json:"usage"`

	// Populated on error.
	Error *struct {
		Type    string `json:"type"`
		Message string `json:"message"`
	} `json:"error"`
}

// stream exposes the Messages API SSE response as an llm.Stream.
//
// There is deliberately no background reader goroutine: the consumer already
// drives progress by calling Next, so events are parsed lazily inside Next.
// That keeps cancellation, buffering, and error propagation simple — Close
// closes the body, and the following read then fails.
type stream struct {
	provider string
	model    string
	full     string // provider/model
	body     io.ReadCloser
	scanner  *bufio.Scanner

	// Response accumulated across the whole stream.
	parts     []llm.Part
	toolCalls []llm.ToolCall
	usage     llm.Usage
	finish    llm.FinishReason

	// State of the content block currently being received.
	blockType string
	textBuf   strings.Builder
	toolID    string
	toolName  string
	argsBuf   strings.Builder

	done bool // the final Response event has been emitted

	closeOnce sync.Once
	closeErr  error
}

// newStream wraps body in a stream that reports m's provider and model names.
// The finish reason starts out as llm.FinishOther until a message_delta
// supplies a stop reason.
func newStream(m *model, body io.ReadCloser) *stream {
	// A single SSE line carries a whole delta, and large structured-output or
	// tool-argument deltas can exceed the scanner's default 64K line limit,
	// so the ceiling is raised well beyond the initial buffer size.
	const (
		initialLineBuf = 64 * 1024
		maxLineBuf     = 10 * 1024 * 1024
	)
	lines := bufio.NewScanner(body)
	lines.Buffer(make([]byte, 0, initialLineBuf), maxLineBuf)

	st := &stream{
		provider: m.provider.name,
		model:    m.id,
		full:     m.fullName(),
		body:     body,
		scanner:  lines,
		finish:   llm.FinishOther,
	}
	return st
}

// Close implements llm.Stream. The body is closed on the first call only;
// every call returns the error recorded by that first close.
func (s *stream) Close() error {
	closeBody := func() {
		s.closeErr = s.body.Close()
	}
	s.closeOnce.Do(closeBody)
	return s.closeErr
}

// Next implements llm.Stream. It yields TextDelta fragments as they arrive, a
// fully assembled ToolCall at each tool_use content_block_stop, exactly one
// final Response event at message_stop, and io.EOF on every call after that.
func (s *stream) Next() (llm.StreamEvent, error) {
	if s.done {
		return llm.StreamEvent{}, io.EOF
	}
	// Keep consuming wire events until one of them produces something the
	// consumer should see (or the stream fails).
	for {
		payload, err := s.nextData()
		if err != nil {
			return llm.StreamEvent{}, err
		}

		var wire wireStreamEvent
		if err := json.Unmarshal([]byte(payload), &wire); err != nil {
			return llm.StreamEvent{}, fmt.Errorf("%s: decode stream event: %w", s.provider, err)
		}

		out, emit, err := s.handleEvent(&wire)
		if err != nil {
			return llm.StreamEvent{}, err
		}
		if emit {
			return out, nil
		}
	}
}

// handleEvent folds one decoded wire event into the stream state. The bool
// result reports whether the returned event should be handed to the consumer;
// a non-nil error means the API reported a failure mid-stream.
func (s *stream) handleEvent(ev *wireStreamEvent) (llm.StreamEvent, bool, error) {
	var none llm.StreamEvent

	switch ev.Type {
	case "message_start":
		if msg := ev.Message; msg != nil {
			s.usage = msg.Usage.toUsage()
		}

	case "content_block_start":
		s.textBuf.Reset()
		s.argsBuf.Reset()
		s.blockType = ""
		if blk := ev.ContentBlock; blk != nil {
			s.blockType = blk.Type
			if blk.Type == "tool_use" {
				s.toolID = blk.ID
				s.toolName = blk.Name
			}
		}

	case "content_block_delta":
		delta := &ev.Delta
		if delta.Type == "text_delta" {
			s.textBuf.WriteString(delta.Text)
			return llm.StreamEvent{TextDelta: delta.Text}, true, nil
		}
		if delta.Type == "input_json_delta" {
			// Partial JSON is only buffered; consumers never see it.
			s.argsBuf.WriteString(delta.PartialJSON)
		}
		// Anything else (thinking_delta, signature_delta) is tolerated and
		// skipped.

	case "content_block_stop":
		out, ok := s.finishBlock()
		return out, ok, nil

	case "message_delta":
		if reason := ev.Delta.StopReason; reason != "" {
			s.finish = mapStopReason(reason)
		}
		if u := ev.Usage; u != nil {
			// Output tokens arrive cumulatively in the final delta; input
			// tokens were already reported in message_start.
			s.usage.OutputTokens = u.OutputTokens
		}

	case "message_stop":
		final := &llm.Response{
			Parts:        s.parts,
			ToolCalls:    s.toolCalls,
			FinishReason: s.finish,
			Usage:        s.usage,
			Model:        s.full,
		}
		s.done = true
		return llm.StreamEvent{Response: final}, true, nil

	case "error":
		// A failure reported after the 200 response (e.g. overloaded_error).
		// Status is left at 0 because no HTTP status exists for it; the
		// default Classify treats that as transient, which fits overload.
		failure := &llm.APIError{Provider: s.provider, Model: s.model}
		if e := ev.Error; e != nil {
			failure.Code = e.Type
			failure.Message = e.Message
		}
		return none, false, failure
	}

	// ping and unknown event types land here and are ignored.
	return none, false, nil
}

// finishBlock closes out the current content block and records its result in
// the accumulated response. Only tool_use blocks yield a stream event; for
// every other block kind the bool result is false.
func (s *stream) finishBlock() (llm.StreamEvent, bool) {
	// Snapshot the block state, then clear it straight away so that every
	// exit path leaves the stream ready for the next block.
	kind := s.blockType
	text := s.textBuf.String()
	rawArgs := s.argsBuf.String()

	s.blockType = ""
	s.textBuf.Reset()
	s.argsBuf.Reset()

	if kind == "text" {
		if text != "" {
			s.parts = append(s.parts, llm.TextPart{Text: text})
		}
		return llm.StreamEvent{}, false
	}
	if kind != "tool_use" {
		return llm.StreamEvent{}, false
	}

	if rawArgs == "" {
		// A tool invoked without arguments streams zero (or only empty)
		// input_json_delta fragments; the canonical form is "{}".
		rawArgs = "{}"
	}
	call := llm.ToolCall{
		ID:        s.toolID,
		Name:      s.toolName,
		Arguments: json.RawMessage(rawArgs),
	}
	s.toolCalls = append(s.toolCalls, call)
	return llm.StreamEvent{ToolCall: &call}, true
}

// nextData reads SSE lines until one complete event's data is assembled
// (multi-line data fields are joined with "\n" per the SSE spec). "event:"
// lines and comments are ignored; dispatch keys off the JSON "type" field.
func (s *stream) nextData() (string, error) { var data strings.Builder for s.scanner.Scan() { line := s.scanner.Text() if line == "" { if data.Len() > 0 { return data.String(), nil } continue } if rest, ok := strings.CutPrefix(line, "data:"); ok { if data.Len() > 0 { data.WriteByte('\n') } data.WriteString(strings.TrimPrefix(rest, " ")) } } if err := s.scanner.Err(); err != nil { return "", fmt.Errorf("%s: read stream: %w", s.provider, err) } if data.Len() > 0 { return data.String(), nil } // EOF before message_stop: the connection dropped mid-response. return "", fmt.Errorf("%s: stream ended before message_stop: %w", s.provider, io.ErrUnexpectedEOF) }