69c2eb5f47
executus CI / test (pull_request) Successful in 1m0s
Independently verified all 18 gadfly findings against the code (18-agent
fan-out). Fixed the 9 real ones; the other 9 were false-positive /
hallucinated / valid-tradeoff (no change).
High:
- F1 nil model: a Models resolver returning (ctx,nil,nil) flowed into the
agent loop and nil-panicked. Now a clean error (Run never panics). +test.
- F9 compactor data-leak: renderTranscript sent tool-call args verbatim to
the summarizer (a possibly-different provider/tier); secret-bearing tool
args (mcp_call/email_send/http_*/webhook_*) are now redacted, with a doc
note that result bodies still flow (summary needs them).
Medium/minor:
- F2 compactor error path returned the folded slice, not the original msgs
(contradicting the documented non-fatal contract) -> return msgs.
- F3 RunStats.Status only ok/error; now timeout (DeadlineExceeded) /
cancelled (Canceled) via statusFor. +test.
- F4 step-zip emitted empty-name "ghost" steps when results>calls; now pairs
min(calls,results) only.
- F5 SetIteration was never called -> RunState.Iteration always 0; the step
observer now updates it each loop.
- F6 matchPending fallback was LIFO; now FIFO (matches the per-key queue).
- F7 estimateTokens had no default arm (future Part kinds counted as 0);
unknown parts now counted conservatively.
- F8 cloud_sync silently truncated >1MiB responses -> opaque JSON error; now
a clear "response exceeded N bytes" via readCapped.
- F12 step observer captured the caller ctx; now the merged runCtx.
- F13 compaction onFire was nil (doc claimed it logged); now wired to
audit LogEvent("compaction_fired").
- F11 (no pre-dispatch hook in majordomo) documented honestly as a known
limitation; F18 UsageSink doc clarified cache tokens are subsets of input.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
475 lines
14 KiB
Go
475 lines
14 KiB
Go
// V15.4 — Ollama Cloud dynamic context-length sync.
|
|
//
|
|
// Why: the static map in context_limits.go has to be hand-maintained
|
|
// for every new Ollama Cloud model. Cloud ships new models monthly,
|
|
// and a missing entry silently disables compaction for runs on that
|
|
// model (compactionThresholdForModel returns 0 on MaxContextTokens
|
|
// miss). Dynamic sync removes the maintenance burden and means new
|
|
// cloud models work out-of-the-box.
|
|
//
|
|
// How: at boot, mort kicks off a CloudOllamaLimitCache.RefreshAll in a
|
|
// background goroutine. RefreshAll calls /api/tags to list every
|
|
// available cloud model, then concurrently calls /api/show for each
|
|
// to extract `<family>.context_length` from the response's model_info
|
|
// map. The cache is consulted by the executor's
|
|
// compactionThresholdForModel via the cache-aware
|
|
// MaxContextTokensWithCache helper.
|
|
//
|
|
// Periodic refresh: a daily ticker re-runs RefreshAll so newly
|
|
// released models surface without a mort restart. The interval is
|
|
// intentionally not configurable — cloud model context lengths don't
|
|
// change for a given tag (only the tag pointer can move, e.g. :cloud
|
|
// → larger model), so daily is conservative.
|
|
|
|
package model
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// defaultCloudEndpoint is the base URL of the public Ollama Cloud API.
// NewCloudOllamaLimitCache falls back to it when handed a blank endpoint;
// tests point the cache elsewhere through that constructor's endpoint
// argument.
const (
	defaultCloudEndpoint = "https://ollama.com"
)
|
|
|
|
// CloudOllamaLimitCache maps Ollama Cloud model names to their context
// lengths. Entries are discovered at runtime through /api/tags and
// /api/show rather than being hand-maintained. Build one with
// NewCloudOllamaLimitCache; the methods guard the maps with mu, so a
// single cache may be shared between goroutines.
//
// With no OLLAMA_API_KEY configured, RefreshAll reports an error and
// nothing is ever cached: lookups answer (0, false) and callers fall back
// to the static map / disabled compaction.
type CloudOllamaLimitCache struct {
	// mu guards limit, negative and negativeTTL.
	mu sync.RWMutex
	// limit holds resolved entries: model name → context length.
	limit map[string]int
	// negative remembers failed lookups: model name → time of the failed
	// fetch, compared against negativeTTL.
	negative map[string]time.Time
	// negativeTTL is how long a failed /api/show lookup suppresses
	// retries, so a typo or a recently removed model doesn't hammer
	// Cloud. The constructor sets it to 10 minutes.
	negativeTTL time.Duration

	// Connection settings, fixed at construction time.
	endpoint   string
	apiKey     string
	httpClient *http.Client

	// refreshConcurrency bounds how many /api/show requests RefreshAll
	// keeps in flight at once. The constructor sets it to 8 — enough to
	// get through a ~50-model catalog in well under a minute without
	// hammering Cloud.
	refreshConcurrency int
}
|
|
|
|
// NewCloudOllamaLimitCache constructs a fresh cache. apiKey can be
|
|
// empty — RefreshAll then returns an error and the cache stays empty.
|
|
// endpoint defaults to https://ollama.com when empty. httpClient
|
|
// defaults to a 15s-timeout client.
|
|
func NewCloudOllamaLimitCache(endpoint, apiKey string, httpClient *http.Client) *CloudOllamaLimitCache {
|
|
if strings.TrimSpace(endpoint) == "" {
|
|
endpoint = defaultCloudEndpoint
|
|
}
|
|
endpoint = strings.TrimRight(endpoint, "/")
|
|
if httpClient == nil {
|
|
httpClient = &http.Client{Timeout: 15 * time.Second}
|
|
}
|
|
return &CloudOllamaLimitCache{
|
|
limit: make(map[string]int),
|
|
negative: make(map[string]time.Time),
|
|
endpoint: endpoint,
|
|
apiKey: apiKey,
|
|
httpClient: httpClient,
|
|
refreshConcurrency: 8,
|
|
negativeTTL: 10 * time.Minute,
|
|
}
|
|
}
|
|
|
|
// SetNegativeTTL overrides the negative-cache lifetime. Tests use this
|
|
// to control retry behaviour without sleeping.
|
|
func (c *CloudOllamaLimitCache) SetNegativeTTL(d time.Duration) {
|
|
if c == nil || d < 0 {
|
|
return
|
|
}
|
|
c.mu.Lock()
|
|
c.negativeTTL = d
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
// Lookup returns the cached context length for an Ollama Cloud model
|
|
// name (e.g. "qwen3.5:cloud", "qwen3-coder:480b"). Returns (0, false)
|
|
// on miss. Lookup never makes HTTP calls — it's the hot path consulted
|
|
// by the executor before every run.
|
|
//
|
|
// modelName accepts either the bare model:tag form or the prefixed
|
|
// "ollama-cloud/model:tag" form; the prefix is stripped.
|
|
func (c *CloudOllamaLimitCache) Lookup(modelName string) (int, bool) {
|
|
if c == nil {
|
|
return 0, false
|
|
}
|
|
key := stripCloudPrefix(modelName)
|
|
if key == "" {
|
|
return 0, false
|
|
}
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
v, ok := c.limit[key]
|
|
return v, ok
|
|
}
|
|
|
|
// Size returns the number of cached entries. Useful for logging /
|
|
// health checks.
|
|
func (c *CloudOllamaLimitCache) Size() int {
|
|
if c == nil {
|
|
return 0
|
|
}
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return len(c.limit)
|
|
}
|
|
|
|
// LookupOrFetch returns the cached context length OR, on miss, makes a
|
|
// single /api/show call to populate the cache. Negative results
|
|
// (model not found, /api/show returns no context_length) are cached
|
|
// for negativeTTL to prevent hammering Cloud on a typo. Returns
|
|
// (0, false) when the model is genuinely unknown and (size, true) on
|
|
// any successful resolve.
|
|
//
|
|
// Why this exists: Ollama Cloud's /api/tags lists canonical model
|
|
// names only (e.g. "qwen3.5:397b") but accepts aliases on /api/show
|
|
// (e.g. "qwen3.5:cloud" → same 397b model). The boot-time RefreshAll
|
|
// only sees the canonical names, so common aliases miss the cache.
|
|
// LookupOrFetch fills the gap.
|
|
//
|
|
// The cache is therefore self-healing: any unknown model gets one
|
|
// live /api/show call, the result lands in the cache, and subsequent
|
|
// runs hit immediately. Periodic RefreshAll overwrites everything
|
|
// with the canonical-name results but additionally-fetched aliases
|
|
// linger as positive entries.
|
|
func (c *CloudOllamaLimitCache) LookupOrFetch(ctx context.Context, modelName string) (int, bool) {
|
|
if c == nil {
|
|
return 0, false
|
|
}
|
|
key := stripCloudPrefix(modelName)
|
|
if key == "" {
|
|
return 0, false
|
|
}
|
|
// Fast path: positive hit.
|
|
c.mu.RLock()
|
|
if v, ok := c.limit[key]; ok {
|
|
c.mu.RUnlock()
|
|
return v, true
|
|
}
|
|
// Negative cache check.
|
|
if t, ok := c.negative[key]; ok && time.Since(t) < c.negativeTTL {
|
|
c.mu.RUnlock()
|
|
return 0, false
|
|
}
|
|
c.mu.RUnlock()
|
|
// No API key configured → can't fetch. Don't write a negative
|
|
// entry (when the key gets configured later we want the next call
|
|
// to re-try immediately).
|
|
if strings.TrimSpace(c.apiKey) == "" {
|
|
return 0, false
|
|
}
|
|
// Slow path: live /api/show.
|
|
n, err := c.fetchContextLength(ctx, key)
|
|
if err != nil || n <= 0 {
|
|
slog.Debug("cloud limit cache: lazy fetch miss",
|
|
"model", key, "err", err)
|
|
c.mu.Lock()
|
|
c.negative[key] = time.Now()
|
|
c.mu.Unlock()
|
|
return 0, false
|
|
}
|
|
c.set(key, n)
|
|
slog.Info("cloud limit cache: lazy fetch hit", "model", key, "context_length", n)
|
|
return n, true
|
|
}
|
|
|
|
// set stores a context length. n <= 0 is a no-op.
|
|
func (c *CloudOllamaLimitCache) set(modelName string, n int) {
|
|
if c == nil || n <= 0 {
|
|
return
|
|
}
|
|
key := stripCloudPrefix(modelName)
|
|
if key == "" {
|
|
return
|
|
}
|
|
c.mu.Lock()
|
|
c.limit[key] = n
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
// RefreshAll queries /api/tags then concurrently calls /api/show for
|
|
// every listed model, populating the cache. Returns the number of
|
|
// models successfully cached and the first error encountered (a
|
|
// /api/tags failure aborts; individual /api/show failures are logged
|
|
// but don't abort the whole refresh).
|
|
//
|
|
// Safe to call repeatedly. Cache entries are overwritten with the
|
|
// fresh values; entries for models that have been removed from Cloud
|
|
// are NOT pruned (cheap to keep; pruning risks dropping an entry just
|
|
// before a run that needs it).
|
|
func (c *CloudOllamaLimitCache) RefreshAll(ctx context.Context) (int, error) {
|
|
if c == nil {
|
|
return 0, fmt.Errorf("cloud limit cache: nil receiver")
|
|
}
|
|
if strings.TrimSpace(c.apiKey) == "" {
|
|
return 0, fmt.Errorf("cloud limit cache: OLLAMA_API_KEY unset")
|
|
}
|
|
tags, err := c.fetchTags(ctx)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cloud limit cache: /api/tags: %w", err)
|
|
}
|
|
|
|
concurrency := c.refreshConcurrency
|
|
if concurrency <= 0 {
|
|
concurrency = 8
|
|
}
|
|
sem := make(chan struct{}, concurrency)
|
|
var wg sync.WaitGroup
|
|
var (
|
|
mu sync.Mutex
|
|
success int
|
|
)
|
|
for _, name := range tags {
|
|
name := name
|
|
wg.Add(1)
|
|
sem <- struct{}{}
|
|
go func() {
|
|
defer wg.Done()
|
|
defer func() { <-sem }()
|
|
ctxLen, ferr := c.fetchContextLength(ctx, name)
|
|
if ferr != nil {
|
|
slog.Debug("cloud limit cache: /api/show miss",
|
|
"model", name, "err", ferr)
|
|
return
|
|
}
|
|
c.set(name, ctxLen)
|
|
mu.Lock()
|
|
success++
|
|
mu.Unlock()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
slog.Info("cloud limit cache: refresh complete",
|
|
"models_total", len(tags), "cached", success)
|
|
return success, nil
|
|
}
|
|
|
|
// StartPeriodicRefresh runs RefreshAll once immediately, then on every
|
|
// interval tick. Cancellation via ctx stops the loop. Logs each
|
|
// outcome; never returns an error to the caller (this is a background
|
|
// task — failures are warnings, not show-stoppers).
|
|
//
|
|
// Typical usage: a goroutine spawned at mort boot.
|
|
//
|
|
// go cache.StartPeriodicRefresh(ctx, 24*time.Hour)
|
|
func (c *CloudOllamaLimitCache) StartPeriodicRefresh(ctx context.Context, interval time.Duration) {
|
|
if c == nil {
|
|
return
|
|
}
|
|
if interval <= 0 {
|
|
interval = 24 * time.Hour
|
|
}
|
|
doOne := func() {
|
|
n, err := c.RefreshAll(ctx)
|
|
if err != nil {
|
|
slog.Warn("cloud limit cache: refresh failed",
|
|
"err", err, "cached_size", c.Size())
|
|
return
|
|
}
|
|
slog.Info("cloud limit cache: refreshed",
|
|
"newly_cached_or_updated", n, "cached_size", c.Size())
|
|
}
|
|
doOne()
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-t.C:
|
|
doOne()
|
|
}
|
|
}
|
|
}
|
|
|
|
// fetchTags calls GET /api/tags and returns the model names.
|
|
func (c *CloudOllamaLimitCache) fetchTags(ctx context.Context) ([]string, error) {
|
|
url := c.endpoint + "/api/tags"
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.applyAuth(req)
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
body, err := readCapped(resp.Body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if resp.StatusCode/100 != 2 {
|
|
return nil, fmt.Errorf("status %d: %s", resp.StatusCode, truncate(body, 400))
|
|
}
|
|
var parsed struct {
|
|
Models []struct {
|
|
Name string `json:"name"`
|
|
Model string `json:"model"`
|
|
} `json:"models"`
|
|
}
|
|
if err := json.Unmarshal(body, &parsed); err != nil {
|
|
return nil, fmt.Errorf("parse /api/tags: %w", err)
|
|
}
|
|
out := make([]string, 0, len(parsed.Models))
|
|
for _, m := range parsed.Models {
|
|
name := m.Name
|
|
if name == "" {
|
|
name = m.Model
|
|
}
|
|
if name == "" {
|
|
continue
|
|
}
|
|
out = append(out, name)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// fetchContextLength calls POST /api/show for a model and extracts
|
|
// the largest *.context_length value from model_info. Returns the
|
|
// length and nil on success; (0, err) on any failure.
|
|
//
|
|
// Why "largest" rather than family-keyed: the family field in the
|
|
// /api/show response is sometimes empty or doesn't match the
|
|
// model_info key prefix exactly (Ollama Cloud returns the
|
|
// architecture as the prefix, which usually but not always matches
|
|
// `family`). Scanning for any `*.context_length` is robust.
|
|
func (c *CloudOllamaLimitCache) fetchContextLength(ctx context.Context, modelName string) (int, error) {
|
|
url := c.endpoint + "/api/show"
|
|
body, _ := json.Marshal(map[string]string{"name": modelName})
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
c.applyAuth(req)
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer resp.Body.Close()
|
|
respBody, err := readCapped(resp.Body)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if resp.StatusCode/100 != 2 {
|
|
return 0, fmt.Errorf("status %d: %s", resp.StatusCode, truncate(respBody, 400))
|
|
}
|
|
n, err := parseContextLengthJSON(respBody)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// parseContextLengthJSON extracts the largest `*.context_length` int
|
|
// from an /api/show response body. Exported-ish (lowercase but tested
|
|
// in the same package) so the unit test can exercise it without
|
|
// spinning up an httptest server.
|
|
func parseContextLengthJSON(body []byte) (int, error) {
|
|
var parsed struct {
|
|
ModelInfo map[string]any `json:"model_info"`
|
|
}
|
|
if err := json.Unmarshal(body, &parsed); err != nil {
|
|
return 0, fmt.Errorf("parse: %w", err)
|
|
}
|
|
best := 0
|
|
for k, v := range parsed.ModelInfo {
|
|
if !strings.HasSuffix(k, ".context_length") {
|
|
continue
|
|
}
|
|
n := toInt(v)
|
|
if n > best {
|
|
best = n
|
|
}
|
|
}
|
|
if best <= 0 {
|
|
return 0, fmt.Errorf("no context_length in model_info")
|
|
}
|
|
return best, nil
|
|
}
|
|
|
|
// toInt coerces a JSON-decoded value to int.
//
// Supported inputs are float64 (encoding/json's default for numbers), int,
// int64 and json.Number. Floating-point values are truncated toward zero.
// A json.Number is read as an integer first and, failing that, as a float,
// so forms such as "1e5" or "8192.0" are accepted.
//
// It returns 0 for any other type and for values that do not fit in an
// int (including NaN), rather than relying on an out-of-range conversion
// whose result is implementation-defined.
func toInt(v any) int {
	const (
		maxInt = int(^uint(0) >> 1)
		minInt = -maxInt - 1
	)
	// minInt is a power of two, so both bounds are exact as float64:
	// valid values satisfy minInt <= f < -minInt.
	fromFloat := func(f float64) int {
		if f != f || f < float64(minInt) || f >= -float64(minInt) {
			return 0
		}
		return int(f)
	}
	// Only relevant where int is narrower than int64.
	fromInt64 := func(n int64) int {
		if n < int64(minInt) || n > int64(maxInt) {
			return 0
		}
		return int(n)
	}

	switch x := v.(type) {
	case float64:
		return fromFloat(x)
	case int:
		return x
	case int64:
		return fromInt64(x)
	case json.Number:
		if n, err := x.Int64(); err == nil {
			return fromInt64(n)
		}
		if f, err := x.Float64(); err == nil {
			return fromFloat(f)
		}
	}
	return 0
}
|
|
|
|
// applyAuth sets the Bearer token when an API key is configured.
|
|
func (c *CloudOllamaLimitCache) applyAuth(req *http.Request) {
|
|
if strings.TrimSpace(c.apiKey) == "" {
|
|
return
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(c.apiKey))
|
|
}
|
|
|
|
// stripCloudPrefix normalises a model name to its bare model:tag form:
// surrounding whitespace is trimmed first, then a single leading
// "ollama-cloud/" is removed if present.
func stripCloudPrefix(s string) string {
	return strings.TrimPrefix(strings.TrimSpace(s), "ollama-cloud/")
}
|
|
|
|
// truncate renders b as a string for use in error messages, keeping at
// most n bytes. When b is longer than n the kept prefix is followed by
// "...(truncated)".
//
// A negative n is treated as 0. The cut never lands inside a multi-byte
// UTF-8 sequence: it is moved back to the start of the rune it would have
// split, so the result may be up to 3 bytes shorter than n.
func truncate(b []byte, n int) string {
	if n < 0 {
		n = 0
	}
	if len(b) <= n {
		return string(b)
	}
	// b[cut] is the first dropped byte. While it is a UTF-8 continuation
	// byte (10xxxxxx) the cut sits mid-rune, so step back. A rune is at
	// most 4 bytes long, hence at most 3 steps.
	cut := n
	for back := 0; back < 3 && cut > 0 && b[cut]&0xC0 == 0x80; back++ {
		cut--
	}
	return string(b[:cut]) + "...(truncated)"
}
|
|
|
|
// maxLimitCacheResponseBytes is the largest /api/tags or /api/show body
// the limit cache will accept (1 MiB — far above any real response). The
// bound keeps a misbehaving endpoint from streaming an unbounded body
// before the HTTP client's timeout fires.
const maxLimitCacheResponseBytes = 1024 * 1024

// readCapped reads r to the end and returns its contents, provided they
// fit within maxLimitCacheResponseBytes. A longer body produces an
// explicit "response exceeded N bytes" error instead of a silently
// truncated slice that would later fail JSON decoding with an opaque
// "unexpected end of JSON input". The reader is allowed one byte past the
// cap precisely so that an overflow can be told apart from an exact fit.
func readCapped(r io.Reader) ([]byte, error) {
	limited := &io.LimitedReader{R: r, N: maxLimitCacheResponseBytes + 1}
	data, err := io.ReadAll(limited)
	switch {
	case err != nil:
		return nil, err
	case len(data) > maxLimitCacheResponseBytes:
		return nil, fmt.Errorf("cloud_sync: response exceeded %d bytes", maxLimitCacheResponseBytes)
	}
	return data, nil
}
|