27f196d333
Phase 2 of foreman: the daemon now acts as a transparent Ollama proxy. - internal/ollama: Client interface and HTTP implementation for chat (streaming + non-streaming), embed, tags, ps with auth forwarding, NDJSON streaming via bufio.Scanner, and connection vs HTTP error classification via custom error types. - internal/ollama: ModelInventory with background poller for /api/tags and /api/ps, degraded mode on target unreachable with model retention, automatic recovery on reconnect. - internal/server: Passthrough routes (/api/chat, /api/tags, /api/ps, /api/embed, /api/embeddings) with model validation, chat serialization gate (capacity-1 channel), concurrent embedding bypass (ADR-0013), NDJSON streaming with per-chunk flush, and degraded health reporting. - cmd/foreman: Full serve wiring with Ollama client, poller goroutine, embedder warmup (keep_alive:-1), and signal-based shutdown. The Mac is now usable as a go-llm target through foreman. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
202 lines
4.8 KiB
Go
202 lines
4.8 KiB
Go
package ollama
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
// mockClient implements Client for inventory testing.
|
|
type mockClient struct {
|
|
tagsFn func(ctx context.Context) (*TagsResponse, error)
|
|
psFn func(ctx context.Context) (*PsResponse, error)
|
|
}
|
|
|
|
func (m *mockClient) Chat(ctx context.Context, req ChatRequest, stream bool) (*ChatResponse, <-chan ChatResponse, error) {
|
|
return nil, nil, fmt.Errorf("not implemented")
|
|
}
|
|
|
|
func (m *mockClient) Embed(ctx context.Context, req EmbedRequest) (*EmbedResponse, error) {
|
|
return nil, fmt.Errorf("not implemented")
|
|
}
|
|
|
|
func (m *mockClient) Tags(ctx context.Context) (*TagsResponse, error) {
|
|
return m.tagsFn(ctx)
|
|
}
|
|
|
|
func (m *mockClient) Ps(ctx context.Context) (*PsResponse, error) {
|
|
return m.psFn(ctx)
|
|
}
|
|
|
|
func (m *mockClient) RawChat(ctx context.Context, body []byte) (*http.Response, error) {
|
|
return nil, fmt.Errorf("not implemented")
|
|
}
|
|
|
|
func (m *mockClient) RawEmbed(ctx context.Context, body []byte) (*http.Response, error) {
|
|
return nil, fmt.Errorf("not implemented")
|
|
}
|
|
|
|
func TestInventory_RefreshPopulatesModels(t *testing.T) {
|
|
client := &mockClient{
|
|
tagsFn: func(ctx context.Context) (*TagsResponse, error) {
|
|
return &TagsResponse{
|
|
Models: []ModelInfo{
|
|
{Name: "qwen3:30b"},
|
|
{Name: "nomic-embed-text"},
|
|
},
|
|
}, nil
|
|
},
|
|
psFn: func(ctx context.Context) (*PsResponse, error) {
|
|
return &PsResponse{
|
|
Models: []RunningModel{
|
|
{Name: "nomic-embed-text"},
|
|
},
|
|
}, nil
|
|
},
|
|
}
|
|
|
|
inv := NewModelInventory(client, slog.Default())
|
|
if err := inv.Refresh(context.Background()); err != nil {
|
|
t.Fatalf("Refresh: %v", err)
|
|
}
|
|
|
|
models := inv.Models()
|
|
if len(models) != 2 {
|
|
t.Fatalf("got %d models, want 2", len(models))
|
|
}
|
|
|
|
if !inv.HasModel("qwen3:30b") {
|
|
t.Error("HasModel(qwen3:30b) = false, want true")
|
|
}
|
|
if inv.HasModel("nonexistent") {
|
|
t.Error("HasModel(nonexistent) = true, want false")
|
|
}
|
|
|
|
resident := inv.ResidentModels()
|
|
if len(resident) != 1 {
|
|
t.Fatalf("got %d resident models, want 1", len(resident))
|
|
}
|
|
|
|
if inv.Degraded() {
|
|
t.Error("degraded should be false after successful refresh")
|
|
}
|
|
if inv.LastPoll().IsZero() {
|
|
t.Error("lastPoll should be non-zero after refresh")
|
|
}
|
|
}
|
|
|
|
func TestInventory_DegradedOnFailure(t *testing.T) {
|
|
callCount := 0
|
|
client := &mockClient{
|
|
tagsFn: func(ctx context.Context) (*TagsResponse, error) {
|
|
callCount++
|
|
if callCount == 1 {
|
|
return &TagsResponse{
|
|
Models: []ModelInfo{{Name: "qwen3:30b"}},
|
|
}, nil
|
|
}
|
|
return nil, fmt.Errorf("connection refused")
|
|
},
|
|
psFn: func(ctx context.Context) (*PsResponse, error) {
|
|
return &PsResponse{}, nil
|
|
},
|
|
}
|
|
|
|
inv := NewModelInventory(client, slog.Default())
|
|
|
|
// First refresh succeeds.
|
|
if err := inv.Refresh(context.Background()); err != nil {
|
|
t.Fatalf("first Refresh: %v", err)
|
|
}
|
|
if inv.Degraded() {
|
|
t.Error("should not be degraded after first successful poll")
|
|
}
|
|
|
|
// Second refresh fails — should retain models but mark degraded.
|
|
if err := inv.Refresh(context.Background()); err == nil {
|
|
t.Fatal("expected error on second refresh")
|
|
}
|
|
if !inv.Degraded() {
|
|
t.Error("should be degraded after failed poll")
|
|
}
|
|
|
|
// Models should be retained.
|
|
if !inv.HasModel("qwen3:30b") {
|
|
t.Error("should retain models after failed poll")
|
|
}
|
|
}
|
|
|
|
func TestInventory_RecoveryFromDegraded(t *testing.T) {
|
|
failing := true
|
|
client := &mockClient{
|
|
tagsFn: func(ctx context.Context) (*TagsResponse, error) {
|
|
if failing {
|
|
return nil, fmt.Errorf("connection refused")
|
|
}
|
|
return &TagsResponse{
|
|
Models: []ModelInfo{{Name: "qwen3:30b"}},
|
|
}, nil
|
|
},
|
|
psFn: func(ctx context.Context) (*PsResponse, error) {
|
|
return &PsResponse{}, nil
|
|
},
|
|
}
|
|
|
|
inv := NewModelInventory(client, slog.Default())
|
|
|
|
// First refresh fails.
|
|
inv.Refresh(context.Background())
|
|
if !inv.Degraded() {
|
|
t.Error("should be degraded after failed poll")
|
|
}
|
|
|
|
// Target recovers.
|
|
failing = false
|
|
if err := inv.Refresh(context.Background()); err != nil {
|
|
t.Fatalf("recovery Refresh: %v", err)
|
|
}
|
|
if inv.Degraded() {
|
|
t.Error("should not be degraded after successful poll")
|
|
}
|
|
}
|
|
|
|
func TestInventory_StartAndCancel(t *testing.T) {
|
|
pollCount := 0
|
|
client := &mockClient{
|
|
tagsFn: func(ctx context.Context) (*TagsResponse, error) {
|
|
pollCount++
|
|
return &TagsResponse{}, nil
|
|
},
|
|
psFn: func(ctx context.Context) (*PsResponse, error) {
|
|
return &PsResponse{}, nil
|
|
},
|
|
}
|
|
|
|
inv := NewModelInventory(client, slog.Default())
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
inv.Start(ctx, 10*time.Millisecond)
|
|
close(done)
|
|
}()
|
|
|
|
// Let it poll a few times.
|
|
time.Sleep(50 * time.Millisecond)
|
|
cancel()
|
|
|
|
select {
|
|
case <-done:
|
|
// Clean exit.
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("Start did not exit after context cancellation")
|
|
}
|
|
|
|
if pollCount < 2 {
|
|
t.Errorf("poll count = %d, want >= 2 (initial + at least one tick)", pollCount)
|
|
}
|
|
}
|