Files
steve 27f196d333 feat: add Ollama target client, model poller, and native passthrough
Phase 2 of foreman: the daemon now acts as a transparent Ollama proxy.

- internal/ollama: Client interface and HTTP implementation for chat
  (streaming + non-streaming), embed, tags, ps with auth forwarding,
  NDJSON streaming via bufio.Scanner, and connection vs HTTP error
  classification via custom error types.
- internal/ollama: ModelInventory with background poller for /api/tags
  and /api/ps, degraded mode on target unreachable with model retention,
  automatic recovery on reconnect.
- internal/server: Passthrough routes (/api/chat, /api/tags, /api/ps,
  /api/embed, /api/embeddings) with model validation, chat serialization
  gate (capacity-1 channel), concurrent embedding bypass (ADR-0013),
  NDJSON streaming with per-chunk flush, and degraded health reporting.
- cmd/foreman: Full serve wiring with Ollama client, poller goroutine,
  embedder warmup (keep_alive:-1), and signal-based shutdown.

The Mac is now usable as a go-llm target through foreman.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 18:07:33 -04:00

202 lines
4.8 KiB
Go

package ollama
import (
"context"
"fmt"
"log/slog"
"net/http"
"testing"
"time"
)
// mockClient implements Client for inventory testing. Only Tags and Ps do
// real work: they delegate to tagsFn and psFn, which each test sets as needed
// (calling one whose func is nil panics, surfacing a misconfigured test).
// Every other method returns a "not implemented" error.
type mockClient struct {
	tagsFn func(ctx context.Context) (*TagsResponse, error)
	psFn   func(ctx context.Context) (*PsResponse, error)
}

// Compile-time check that mockClient satisfies Client, so drift in the
// interface is reported here rather than at a distant call site.
var _ Client = (*mockClient)(nil)

// Chat is not exercised by the inventory tests and always fails.
func (m *mockClient) Chat(ctx context.Context, req ChatRequest, stream bool) (*ChatResponse, <-chan ChatResponse, error) {
	return nil, nil, fmt.Errorf("not implemented")
}

// Embed is not exercised by the inventory tests and always fails.
func (m *mockClient) Embed(ctx context.Context, req EmbedRequest) (*EmbedResponse, error) {
	return nil, fmt.Errorf("not implemented")
}

// Tags delegates to the test-supplied tagsFn.
func (m *mockClient) Tags(ctx context.Context) (*TagsResponse, error) {
	return m.tagsFn(ctx)
}

// Ps delegates to the test-supplied psFn.
func (m *mockClient) Ps(ctx context.Context) (*PsResponse, error) {
	return m.psFn(ctx)
}

// RawChat is not exercised by the inventory tests and always fails.
func (m *mockClient) RawChat(ctx context.Context, body []byte) (*http.Response, error) {
	return nil, fmt.Errorf("not implemented")
}

// RawEmbed is not exercised by the inventory tests and always fails.
func (m *mockClient) RawEmbed(ctx context.Context, body []byte) (*http.Response, error) {
	return nil, fmt.Errorf("not implemented")
}
func TestInventory_RefreshPopulatesModels(t *testing.T) {
client := &mockClient{
tagsFn: func(ctx context.Context) (*TagsResponse, error) {
return &TagsResponse{
Models: []ModelInfo{
{Name: "qwen3:30b"},
{Name: "nomic-embed-text"},
},
}, nil
},
psFn: func(ctx context.Context) (*PsResponse, error) {
return &PsResponse{
Models: []RunningModel{
{Name: "nomic-embed-text"},
},
}, nil
},
}
inv := NewModelInventory(client, slog.Default())
if err := inv.Refresh(context.Background()); err != nil {
t.Fatalf("Refresh: %v", err)
}
models := inv.Models()
if len(models) != 2 {
t.Fatalf("got %d models, want 2", len(models))
}
if !inv.HasModel("qwen3:30b") {
t.Error("HasModel(qwen3:30b) = false, want true")
}
if inv.HasModel("nonexistent") {
t.Error("HasModel(nonexistent) = true, want false")
}
resident := inv.ResidentModels()
if len(resident) != 1 {
t.Fatalf("got %d resident models, want 1", len(resident))
}
if inv.Degraded() {
t.Error("degraded should be false after successful refresh")
}
if inv.LastPoll().IsZero() {
t.Error("lastPoll should be non-zero after refresh")
}
}
// TestInventory_DegradedOnFailure checks that a failed poll following a
// successful one puts the inventory into degraded mode while keeping the
// models learned from the last good poll.
func TestInventory_DegradedOnFailure(t *testing.T) {
	calls := 0
	client := &mockClient{
		// Succeeds on the first call only; every later call fails as though
		// the target had gone away.
		tagsFn: func(ctx context.Context) (*TagsResponse, error) {
			calls++
			if calls > 1 {
				return nil, fmt.Errorf("connection refused")
			}
			return &TagsResponse{Models: []ModelInfo{{Name: "qwen3:30b"}}}, nil
		},
		psFn: func(ctx context.Context) (*PsResponse, error) {
			return &PsResponse{}, nil
		},
	}
	ctx := context.Background()
	inv := NewModelInventory(client, slog.Default())

	// Healthy poll.
	err := inv.Refresh(ctx)
	if err != nil {
		t.Fatalf("first Refresh: %v", err)
	}
	if inv.Degraded() {
		t.Error("should not be degraded after first successful poll")
	}

	// Failing poll: Refresh must report the error, the inventory must turn
	// degraded, and the earlier model list must survive.
	if err = inv.Refresh(ctx); err == nil {
		t.Fatal("expected error on second refresh")
	}
	if !inv.Degraded() {
		t.Error("should be degraded after failed poll")
	}
	if !inv.HasModel("qwen3:30b") {
		t.Error("should retain models after failed poll")
	}
}
// TestInventory_RecoveryFromDegraded verifies that an inventory that entered
// degraded mode because the target was unreachable leaves it again once a
// later poll succeeds, and that the recovering poll repopulates the models.
func TestInventory_RecoveryFromDegraded(t *testing.T) {
	failing := true
	client := &mockClient{
		tagsFn: func(ctx context.Context) (*TagsResponse, error) {
			if failing {
				return nil, fmt.Errorf("connection refused")
			}
			return &TagsResponse{
				Models: []ModelInfo{{Name: "qwen3:30b"}},
			}, nil
		},
		psFn: func(ctx context.Context) (*PsResponse, error) {
			return &PsResponse{}, nil
		},
	}
	inv := NewModelInventory(client, slog.Default())

	// First refresh fails. Assert the error instead of discarding it, so a
	// Refresh that swallows target failures is caught here too.
	if err := inv.Refresh(context.Background()); err == nil {
		t.Fatal("expected error on first refresh")
	}
	if !inv.Degraded() {
		t.Error("should be degraded after failed poll")
	}

	// Target recovers.
	failing = false
	if err := inv.Refresh(context.Background()); err != nil {
		t.Fatalf("recovery Refresh: %v", err)
	}
	if inv.Degraded() {
		t.Error("should not be degraded after successful poll")
	}
	if !inv.HasModel("qwen3:30b") {
		t.Error("HasModel(qwen3:30b) = false after recovery, want true")
	}
}
// TestInventory_StartAndCancel verifies that Start polls the target
// repeatedly (the initial poll plus at least one interval tick) and returns
// promptly once its context is cancelled.
func TestInventory_StartAndCancel(t *testing.T) {
	// polls gets a signal for each Tags call. The send is non-blocking, so the
	// poller can never stall on it. Signalling through a channel rather than a
	// shared counter keeps the test race-free regardless of which goroutine
	// Start polls from.
	polls := make(chan struct{}, 2)
	client := &mockClient{
		tagsFn: func(ctx context.Context) (*TagsResponse, error) {
			select {
			case polls <- struct{}{}:
			default:
			}
			return &TagsResponse{}, nil
		},
		psFn: func(ctx context.Context) (*PsResponse, error) {
			return &PsResponse{}, nil
		},
	}
	inv := NewModelInventory(client, slog.Default())
	ctx, cancel := context.WithCancel(context.Background())
	// Ensures the poller is stopped even if the test fails early.
	defer cancel()

	done := make(chan struct{})
	go func() {
		inv.Start(ctx, 10*time.Millisecond)
		close(done)
	}()

	// Wait for the initial poll plus at least one tick. A bounded wait on real
	// poll events replaces a fixed sleep, which is flaky on loaded machines.
	deadline := time.After(2 * time.Second)
	for seen := 0; seen < 2; seen++ {
		select {
		case <-polls:
		case <-deadline:
			t.Fatalf("poll count = %d, want >= 2 (initial + at least one tick)", seen)
		}
	}

	cancel()
	select {
	case <-done:
		// Clean exit.
	case <-time.After(2 * time.Second):
		t.Fatal("Start did not exit after context cancellation")
	}
}