27f196d333
Phase 2 of foreman: the daemon now acts as a transparent Ollama proxy. - internal/ollama: Client interface and HTTP implementation for chat (streaming + non-streaming), embed, tags, ps with auth forwarding, NDJSON streaming via bufio.Scanner, and connection vs HTTP error classification via custom error types. - internal/ollama: ModelInventory with background poller for /api/tags and /api/ps, degraded mode on target unreachable with model retention, automatic recovery on reconnect. - internal/server: Passthrough routes (/api/chat, /api/tags, /api/ps, /api/embed, /api/embeddings) with model validation, chat serialization gate (capacity-1 channel), concurrent embedding bypass (ADR-0013), NDJSON streaming with per-chunk flush, and degraded health reporting. - cmd/foreman: Full serve wiring with Ollama client, poller goroutine, embedder warmup (keep_alive:-1), and signal-based shutdown. The Mac is now usable as a go-llm target through foreman. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
168 lines
5.2 KiB
Go
168 lines
5.2 KiB
Go
package ollama
|
|
|
|
import (
	"context"
	"log/slog"
	"strings"
	"sync"
	"time"
)
|
|
|
|
// ModelInventory maintains an in-memory cache of the Ollama target's installed
// and running models, refreshed by a background poller.
//
// Why: foreman needs a reasonably fresh view of installed models for validation,
// passthrough, and scheduling without hitting the target on every request.
// What: wraps a mutex-protected model list updated by a polling goroutine.
// Test: create with a stub client, poll, verify Models()/HasModel()/Degraded().
type ModelInventory struct {
	// client is queried for /api/tags (Tags) and /api/ps (Ps) on each refresh.
	client Client
	// logger receives poll warnings; Refresh logs through it unconditionally,
	// so it must be non-nil by the time polling runs.
	logger *slog.Logger

	// mu guards every field below: accessors take the read lock, Refresh
	// takes the write lock to publish poll results.
	mu sync.RWMutex
	// models is the installed-model list from the last successful /api/tags
	// poll; it is retained (not cleared) when a later poll fails.
	models []ModelInfo
	// runningModels is the loaded-model list from the last successful /api/ps
	// poll; it is retained when a later ps call fails.
	runningModels []RunningModel
	// lastPoll is when /api/tags last succeeded; zero until the first success.
	lastPoll time.Time
	// degraded is set by Refresh when the target fails a /api/tags poll and
	// cleared when /api/tags next succeeds.
	degraded bool
}
|
|
|
|
// NewModelInventory creates a new ModelInventory backed by the given client.
|
|
//
|
|
// Why: separates construction from the poll loop so callers can control lifecycle.
|
|
// What: returns an inventory ready for polling; call Start to begin the background loop.
|
|
// Test: create, call Refresh manually, assert Models() is populated.
|
|
func NewModelInventory(client Client, logger *slog.Logger) *ModelInventory {
|
|
return &ModelInventory{
|
|
client: client,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Models returns the current known model list.
|
|
//
|
|
// Why: the /api/tags handler and model validation need the cached list.
|
|
// What: returns a copy of the model slice under a read lock.
|
|
// Test: Refresh, call Models(), verify the returned slice matches.
|
|
func (inv *ModelInventory) Models() []ModelInfo {
|
|
inv.mu.RLock()
|
|
defer inv.mu.RUnlock()
|
|
out := make([]ModelInfo, len(inv.models))
|
|
copy(out, inv.models)
|
|
return out
|
|
}
|
|
|
|
// HasModel checks whether a model name is in the current inventory.
|
|
//
|
|
// Why: job submission validates the model name before queuing.
|
|
// What: scans the model list for an exact name match.
|
|
// Test: Refresh with known models, assert HasModel returns true/false correctly.
|
|
func (inv *ModelInventory) HasModel(name string) bool {
|
|
inv.mu.RLock()
|
|
defer inv.mu.RUnlock()
|
|
for _, m := range inv.models {
|
|
if m.Name == name {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// ResidentModels returns the list of currently-loaded models from the last /api/ps poll.
|
|
//
|
|
// Why: the scheduler (Phase 3) uses this to decide if a model swap is needed.
|
|
// What: returns a copy of the running model slice under a read lock.
|
|
// Test: Refresh, call ResidentModels(), verify it matches /api/ps output.
|
|
func (inv *ModelInventory) ResidentModels() []RunningModel {
|
|
inv.mu.RLock()
|
|
defer inv.mu.RUnlock()
|
|
out := make([]RunningModel, len(inv.runningModels))
|
|
copy(out, inv.runningModels)
|
|
return out
|
|
}
|
|
|
|
// LastPoll returns the timestamp of the most recent successful poll.
|
|
//
|
|
// Why: health/diagnostics use this to judge staleness.
|
|
// What: returns the lastPoll time under a read lock.
|
|
// Test: Refresh, assert LastPoll is non-zero and recent.
|
|
func (inv *ModelInventory) LastPoll() time.Time {
|
|
inv.mu.RLock()
|
|
defer inv.mu.RUnlock()
|
|
return inv.lastPoll
|
|
}
|
|
|
|
// Degraded reports whether the target was unreachable on the last poll attempt.
|
|
//
|
|
// Why: the /healthz endpoint surfaces this to operators and probes.
|
|
// What: returns the degraded flag under a read lock.
|
|
// Test: Refresh with an unreachable stub, assert Degraded() is true; then with a
|
|
// reachable stub, assert it clears.
|
|
func (inv *ModelInventory) Degraded() bool {
|
|
inv.mu.RLock()
|
|
defer inv.mu.RUnlock()
|
|
return inv.degraded
|
|
}
|
|
|
|
// Refresh performs an immediate poll of /api/tags and /api/ps on the target.
|
|
//
|
|
// Why: called by the poller goroutine and on-demand (e.g., model-miss re-check).
|
|
// What: fetches tags and ps, updates the cached lists, clears or sets the degraded
|
|
// flag based on success/failure.
|
|
// Test: stub both endpoints, call Refresh, verify Models() and ResidentModels() match.
|
|
func (inv *ModelInventory) Refresh(ctx context.Context) error {
|
|
tags, tagsErr := inv.client.Tags(ctx)
|
|
ps, psErr := inv.client.Ps(ctx)
|
|
|
|
inv.mu.Lock()
|
|
defer inv.mu.Unlock()
|
|
|
|
if tagsErr != nil {
|
|
inv.degraded = true
|
|
inv.logger.Warn("model poll failed: tags",
|
|
"error", tagsErr,
|
|
"retained_models", len(inv.models),
|
|
)
|
|
return tagsErr
|
|
}
|
|
|
|
// Tags succeeded — update model list and clear degraded.
|
|
inv.models = tags.Models
|
|
inv.lastPoll = time.Now()
|
|
inv.degraded = false
|
|
|
|
if psErr != nil {
|
|
// Tags worked but ps failed — log but don't mark degraded for ps alone.
|
|
inv.logger.Warn("model poll partial: ps failed", "error", psErr)
|
|
} else {
|
|
inv.runningModels = ps.Models
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Start begins the background polling loop. It blocks until ctx is cancelled.
|
|
// Call this in a goroutine.
|
|
//
|
|
// Why: continuous polling keeps the model inventory fresh for validation and scheduling.
|
|
// What: polls at the given interval, respecting context cancellation for clean shutdown.
|
|
// Test: start with a short interval and cancelled context, verify it exits cleanly.
|
|
func (inv *ModelInventory) Start(ctx context.Context, interval time.Duration) {
|
|
// Do an initial poll immediately.
|
|
if err := inv.Refresh(ctx); err != nil {
|
|
inv.logger.Warn("initial model poll failed", "error", err)
|
|
}
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
if err := inv.Refresh(ctx); err != nil {
|
|
inv.logger.Warn("periodic model poll failed", "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|