// Package lane — sampler.go: periodic occupancy sampler (v7). // // Why a dedicated sampler goroutine: /skills/admin/queues shows current // state but operators need a timeline ("ollama lane was saturated for 4 // hours yesterday afternoon"). Sampling at fixed intervals is the // simplest way to capture that without instrumenting every Submit/ // complete path. Sampling is best-effort observability — if the // goroutine dies, charts show a gap; nothing else breaks. // // Why in pkg/lane (vs pkg/logic/skills/lane_sampler.go): the sampler // reads from the lane registry which lives here. The persistence layer // (skill_lane_samples table) lives in skills, so the sampler takes a // narrow LaneSampleSink interface — production wires // `skills.Storage.RecordLaneSample`; tests substitute a fake. package lane import ( "context" "log/slog" "sync" "time" ) // LaneSampleSink is the persistence surface the sampler writes to. // Production wires skills.Storage; tests substitute a recording fake. // // Why a narrow interface (vs importing skills.Storage): pkg/lane is a // generic primitive that must NOT import the application's skills // package — that would create an import cycle. Using a small typed // interface keeps lane decoupled. type LaneSampleSink interface { RecordLaneSample(ctx context.Context, lane string, running, queued int, sampledAt time.Time) error } // LaneSampleSinkFunc adapts a closure to LaneSampleSink. Useful in // production wiring (mort.go) where the underlying storage method has // a different shape. type LaneSampleSinkFunc func(ctx context.Context, lane string, running, queued int, sampledAt time.Time) error // RecordLaneSample satisfies LaneSampleSink. func (f LaneSampleSinkFunc) RecordLaneSample(ctx context.Context, lane string, running, queued int, sampledAt time.Time) error { if f == nil { return nil } return f(ctx, lane, running, queued, sampledAt) } // LaneSamplePurger is the periodic-sweeper surface. 
// Production wires skills.Storage.PurgeLaneSamples.
type LaneSamplePurger interface {
	PurgeLaneSamples(ctx context.Context, olderThan time.Time) (int64, error)
}

// LaneSamplePurgerFunc lets a plain function act as a LaneSamplePurger.
type LaneSamplePurgerFunc func(ctx context.Context, olderThan time.Time) (int64, error)

// Compile-time proof that the adapter satisfies the interface.
var _ LaneSamplePurger = LaneSamplePurgerFunc(nil)

// PurgeLaneSamples implements LaneSamplePurger by invoking f. A nil
// LaneSamplePurgerFunc does nothing and reports (0, nil).
func (f LaneSamplePurgerFunc) PurgeLaneSamples(ctx context.Context, olderThan time.Time) (int64, error) {
	if f != nil {
		return f(ctx, olderThan)
	}
	return 0, nil
}

// Sampler takes periodic snapshots of every lane in the registry and
// hands one sample per lane to the configured sink. When a purger is
// configured it also periodically asks it to drop samples older than the
// retention window.
//
// Test: sampler_test.go drives Sample() synchronously with a fake
// clock + recording sink.
type Sampler struct {
	registry      *Registry
	sink          LaneSampleSink
	purger        LaneSamplePurger
	interval      time.Duration
	retention     time.Duration
	purgeInterval time.Duration
	clock         func() time.Time

	// Run-time state; Start and Stop read and write it while holding mu.
	mu      sync.Mutex
	running bool
	stopCh  chan struct{}
	doneCh  chan struct{}
}

// NewSampler builds a Sampler. It does not start it; call Start.
//
//   - interval: how often to sample (typically 30s in production).
//     A non-positive value falls back to 30s.
//   - retention: age beyond which samples are purged (typically 7d).
//     A non-positive value falls back to 7 days.
//   - clock: time source; nil means time.Now.
//
// The retention sweep cadence is fixed at 24h.
func NewSampler(registry *Registry, sink LaneSampleSink, purger LaneSamplePurger, interval, retention time.Duration, clock func() time.Time) *Sampler {
	const (
		defaultInterval  = 30 * time.Second
		defaultRetention = 7 * 24 * time.Hour
		sweepEvery       = 24 * time.Hour
	)

	s := &Sampler{
		registry:      registry,
		sink:          sink,
		purger:        purger,
		interval:      defaultInterval,
		retention:     defaultRetention,
		purgeInterval: sweepEvery,
		clock:         time.Now,
	}
	if interval > 0 {
		s.interval = interval
	}
	if retention > 0 {
		s.retention = retention
	}
	if clock != nil {
		s.clock = clock
	}
	return s
}

// Start spawns the background sampling goroutine, which exits when ctx
// is cancelled or Stop is called. Repeated calls to Start without a Stop
// in between are no-ops.
func (s *Sampler) Start(ctx context.Context) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.running {
		return
	}
	// Hand the goroutine its own channels rather than letting it re-read
	// s.stopCh / s.doneCh: those fields are replaced by a later Stop+Start
	// pair, and reading them unlocked from the goroutine is a data race
	// that could close the *next* run's done channel.
	stop := make(chan struct{})
	done := make(chan struct{})
	s.running = true
	s.stopCh = stop
	s.doneCh = done
	go s.loop(ctx, stop, done)
}

// Stop signals the sampler to exit and waits for the goroutine to
// finish. Idempotent. It is also safe to call after the Start context
// has been cancelled: the goroutine closes its done channel on every
// exit path, so Stop returns promptly instead of blocking forever.
func (s *Sampler) Stop() {
	s.mu.Lock()
	if !s.running {
		s.mu.Unlock()
		return
	}
	stop, done := s.stopCh, s.doneCh
	s.running = false
	s.stopCh = nil
	s.doneCh = nil
	s.mu.Unlock()

	// Only the Stop call that flipped running to false reaches here, so
	// this close happens exactly once per run.
	close(stop)
	<-done
}

// Sample runs one sampling pass synchronously. It reads Stats from every
// lane returned by the registry and records one sample per lane through
// the sink, all stamped with the same clock reading. Test entry point —
// production callers use Start.
//
// It is a no-op when the registry or sink is nil. Sink errors are logged
// and skipped (best-effort observability). If ctx is cancelled mid-pass,
// the remaining lanes are skipped.
func (s *Sampler) Sample(ctx context.Context) {
	if s.registry == nil || s.sink == nil {
		return
	}
	now := s.clock()
	for _, l := range s.registry.List() {
		// On shutdown a cancelled ctx would most likely make every
		// remaining write fail and log one warning per lane; bail instead.
		if ctx.Err() != nil {
			return
		}
		st := l.Stats()
		if err := s.sink.RecordLaneSample(ctx, st.Name, st.Running, st.Queued, now); err != nil {
			// Best-effort observability — log and continue, never block.
			slog.Warn("lane sampler: record failed", "lane", st.Name, "error", err)
		}
	}
}

// PurgeOnce runs one retention sweep synchronously: it asks the purger to
// drop samples older than clock() minus the retention window. Failures
// are logged, not returned. No-op without a purger. Test entry point.
func (s *Sampler) PurgeOnce(ctx context.Context) {
	if s.purger == nil {
		return
	}
	cutoff := s.clock().Add(-s.retention)
	if _, err := s.purger.PurgeLaneSamples(ctx, cutoff); err != nil {
		slog.Warn("lane sampler: purge failed", "error", err)
	}
}

// loop is the sampler's main goroutine. It performs one retention sweep
// at startup, then calls Sample every interval and PurgeOnce every
// purgeInterval until ctx is cancelled or stop is closed. done is closed
// on exit regardless of which signal ended the loop, so Stop never waits
// on a channel that will not be closed.
func (s *Sampler) loop(ctx context.Context, stop <-chan struct{}, done chan<- struct{}) {
	defer close(done)

	// A ticker's first tick arrives a full period after creation (24h as
	// configured by NewSampler). A process restarted more often than that
	// would never purge and the sample table would grow without bound,
	// so sweep once up front.
	if ctx.Err() == nil {
		s.PurgeOnce(ctx)
	}

	sampleTicker := time.NewTicker(s.interval)
	defer sampleTicker.Stop()
	purgeTicker := time.NewTicker(s.purgeInterval)
	defer purgeTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-stop:
			return
		case <-sampleTicker.C:
			s.Sample(ctx)
		case <-purgeTicker.C:
			s.PurgeOnce(ctx)
		}
	}
}