package lane

import (
	"context"
	"sync"
	"time"
)

// pool implements Lane with a slot-counting mutex + a pluggable queue
// policy. Jobs are dispatched immediately by Submit/SubmitWait while a
// slot is free; queued jobs are dispatched from complete() (and from
// SetMaxConcurrent when the cap is raised). When a job finishes,
// complete() pulls the next queued job (if any) under the same lock,
// giving a "release one slot, fill one slot" rhythm with no goroutine
// racing to pick the same job. Preemption is the one exception to the
// cap: the incoming job is dispatched while its victim is still winding
// down, so the lane can briefly hold maxConcurrent+1 running jobs.
//
// Why a mutex + map vs a buffered channel as semaphore: we need to
// inspect "running" + "queued" state for Stats, Cancel, and the
// dispatch decision. A single mutex over the running map and the queue
// keeps that cheap and consistent.
//
// Test: pool_test.go covers slot-available, slot-full, cancel,
// SubmitWait blocking, Stats accuracy, throughput sampling, and
// SetMaxConcurrent.
type pool struct {
	name string

	// mu guards the fields below it.
	mu            sync.Mutex
	maxConcurrent int
	running       map[string]*runningJob
	queue         queuePolicy
	closed        bool

	// completions is a sliding window of job-finish timestamps used
	// for the Throughput1m stat. Appended on every complete(); entries
	// older than 60s are pruned on read and on each append. Bounded by
	// the throughput rate, not by an explicit cap — at 60s/window
	// even a tight loop tops out at a few thousand entries.
	completions []time.Time

	// runtimes is a bounded sliding window of completed-job wall-clock
	// runtimes used by SubmitWithMaxWait's ETA estimator. Capped at
	// etaWindowSize, or 16 when etaWindowSize <= 0. v9.
	runtimes      []time.Duration
	etaWindowSize int

	// preemptPolicy configures preemption and can be replaced after
	// construction via SetPreemptionPolicy. v9. A nil policy does NOT
	// disable preemption: tryPreemptLocked only bails out when a non-nil
	// policy reports !Enabled(), and minRuntimeLocked then falls back to
	// a 30s min-runtime guard. Preemption still only ever targets running
	// jobs that opt in via Preemptible.
	preemptPolicy PreemptionPolicy
}

// runningJob is the per-slot state of a dispatched job.
type runningJob struct {
	job Job
	// startedAt is the dispatch wall-clock time, set once when the
	// runningJob is created. It feeds the preemption min-runtime guard and
	// victim tie-break, and the runtime sample complete() records for the
	// ETA window.
	startedAt time.Time
	// runCtx is the context passed to Job.Run; cancel is its
	// CancelCauseFunc, which the preemption path calls with ErrPreempted.
	// v9.
	runCtx context.Context
	cancel context.CancelCauseFunc
	// preempted is set true (under p.mu) when the lane scheduler chooses
	// this job as a preemption victim; a job already marked is never
	// chosen again. v9.
	preempted bool
}

// queuedJob is the in-queue representation of a Submit. Every
// construction site in this file buffers done with capacity 1 so the
// dispatch goroutine can signal completion without blocking (SubmitWait
// may have given up on ctx.Done before the job runs; dispatch must
// still be able to deliver the result without leaking).
type queuedJob struct {
	job        Job
	enqueuedAt time.Time
	// done receives exactly one jobResult once the job's outcome is
	// known: either Run returned, or the job was cancelled before
	// dispatch. It is never closed.
	done chan jobResult
}

// jobResult carries a job's outcome to whoever waits on its done channel.
type jobResult struct {
	err error
}

// queuePolicy is the pluggable queue ordering. fifoPolicy is the
// default; fairSharePolicy lives in policy_fair_share.go. pool calls
// every method with p.mu held.
//
// Why pluggable: the LLM-transport lane wants fair-share, but
// single-resource lanes (e.g. gpu-imagine, max_concurrent=1) work
// fine with FIFO. Future v7 work might add weighted fair share or
// strict priority — keeping the policy small lets us evolve.
type queuePolicy interface {
	// Enqueue adds a job to the queue. Implementations may reorder
	// the queue based on caller / priority.
	Enqueue(j *queuedJob)
	// Dequeue returns the next job to run, removing it from the
	// queue. Returns nil when empty.
	Dequeue() *queuedJob
	// Cancel removes a job by ID and signals its done channel with
	// ErrCancelled. Returns true if found.
	Cancel(jobID string) bool
	// Len returns the number of queued jobs.
	Len() int
	// OldestEnqueueTime returns the earliest enqueue timestamp, or
	// nil if the queue is empty.
	OldestEnqueueTime() *time.Time
}

// New constructs a pool with FIFO queueing.
// // Why a separate New / NewWithFairShare instead of a single function // taking a policy: lanes are usually instantiated by name from convars // — keeping the constructor selection explicit makes call sites read // clearly ("we want fair-share for the ollama lane"). func New(name string, maxConcurrent int) Lane { if maxConcurrent <= 0 { maxConcurrent = 1 } return &pool{ name: name, maxConcurrent: maxConcurrent, running: make(map[string]*runningJob), queue: newFIFOPolicy(), } } // NewWithPolicy constructs a pool with a caller-supplied queue policy. // Used by NewWithFairShare and by tests that exercise custom orderings. func NewWithPolicy(name string, maxConcurrent int, policy queuePolicy) Lane { if maxConcurrent <= 0 { maxConcurrent = 1 } if policy == nil { policy = newFIFOPolicy() } return &pool{ name: name, maxConcurrent: maxConcurrent, running: make(map[string]*runningJob), queue: policy, } } func (p *pool) Name() string { return p.name } func (p *pool) Submit(ctx context.Context, job Job) (int, time.Duration, error) { p.mu.Lock() if p.closed { p.mu.Unlock() return 0, 0, ErrLaneClosed } if len(p.running) < p.maxConcurrent { // Slot available — dispatch immediately. rj := p.newRunningJobLocked(job) p.running[job.ID()] = rj // We need a done channel even for fire-and-forget Submit so // complete() has somewhere to signal; it's discarded. done := make(chan jobResult, 1) p.mu.Unlock() go p.run(rj, done) return 0, 0, nil } // V9 preemption: incoming job has higher priority than at least one // preemptible running job that has been running for the min-runtime // guard. If we can find such a victim, cancel it and dispatch the // new job into the freed slot. The victim's worker delivers // ErrPreempted on its done channel. if p.tryPreemptLocked(job) { rj := p.newRunningJobLocked(job) p.running[job.ID()] = rj done := make(chan jobResult, 1) p.mu.Unlock() go p.run(rj, done) return 0, 0, nil } // Queue. 
qj := &queuedJob{ job: job, enqueuedAt: time.Now(), done: make(chan jobResult, 1), } p.queue.Enqueue(qj) pos := p.queue.Len() eta := p.estimateETALocked(pos) p.mu.Unlock() return pos, eta, nil } // SubmitWithMaxWait is like Submit but returns ErrLaneBusy without // enqueueing if the estimated wait time would exceed maxWait. maxWait // <= 0 disables the gate (equivalent to Submit). v9. // // ETA is computed from the recent completed-job runtime window; with // no history the estimator falls back to a conservative 1s/slot. // Callers ARE NOT charged for an ErrLaneBusy submission — the job is // never enqueued. The estimated wait at the time of decision is // returned alongside the error so callers can log/report the exact // gate value. func (p *pool) SubmitWithMaxWait(ctx context.Context, job Job, maxWait time.Duration) (int, time.Duration, error) { if maxWait <= 0 { return p.Submit(ctx, job) } p.mu.Lock() if p.closed { p.mu.Unlock() return 0, 0, ErrLaneClosed } if len(p.running) < p.maxConcurrent { rj := p.newRunningJobLocked(job) p.running[job.ID()] = rj done := make(chan jobResult, 1) p.mu.Unlock() go p.run(rj, done) return 0, 0, nil } if p.tryPreemptLocked(job) { rj := p.newRunningJobLocked(job) p.running[job.ID()] = rj done := make(chan jobResult, 1) p.mu.Unlock() go p.run(rj, done) return 0, 0, nil } // Estimate wait at queue tail (current depth + 1). pos := p.queue.Len() + 1 eta := p.estimateWaitLocked(pos) if eta > maxWait { p.mu.Unlock() return pos, eta, ErrLaneBusy } qj := &queuedJob{ job: job, enqueuedAt: time.Now(), done: make(chan jobResult, 1), } p.queue.Enqueue(qj) p.mu.Unlock() return pos, eta, nil } // newRunningJobLocked allocates the per-running-job state. Caller MUST // hold p.mu. v9: every running job carries its own context so the // preemption path has somewhere to deliver cancellation. 
func (p *pool) newRunningJobLocked(job Job) *runningJob { jobCtx, cancel := context.WithCancelCause(context.Background()) return &runningJob{ job: job, startedAt: time.Now(), runCtx: jobCtx, cancel: cancel, } } // tryPreemptLocked picks a preemption victim and cancels it. Returns // true if a slot was freed. Caller MUST hold p.mu and MUST verify // the lane is full before calling. v9. // // Selection: among running jobs that (a) implement Preemptible and // IsPreemptible() returns true, AND (b) have a strictly LOWER priority // than the incoming job, AND (c) have been running for >= MinRuntime, // pick the one with the LOWEST priority; FIFO tie-break by oldest // startedAt. We pick lowest priority first so we always sacrifice the // least-valuable running job. The min-runtime guard prevents thrashing // (a just-dispatched job staying alive long enough to make progress). func (p *pool) tryPreemptLocked(incoming Job) bool { if p.preemptPolicy != nil && !p.preemptPolicy.Enabled() { return false } pol, ok := incoming.(Preemptible) _ = pol _ = ok // We don't gate by "incoming is preemptible". Even non-preemptible // incoming jobs may preempt a preemptible victim: the goal is to // give higher-priority work the slot, regardless of whether THAT // work is itself preemptible. Mark a skill preemptible only when // you'd accept losing its work to whatever priority arrives next. 
minRuntime := p.minRuntimeLocked() now := time.Now() var victim *runningJob for _, rj := range p.running { pj, isPre := rj.job.(Preemptible) if !isPre || !pj.IsPreemptible() { continue } if rj.preempted { continue // already chosen in a prior race; don't double-cancel } if rj.job.Priority() >= incoming.Priority() { continue } if now.Sub(rj.startedAt) < minRuntime { continue } if victim == nil || rj.job.Priority() < victim.job.Priority() || (rj.job.Priority() == victim.job.Priority() && rj.startedAt.Before(victim.startedAt)) { victim = rj } } if victim == nil { return false } victim.preempted = true if victim.cancel != nil { victim.cancel(ErrPreempted) } // We DO NOT remove the victim from p.running here — the worker // goroutine's Run() may take some non-trivial time to honour // cancellation. The slot will free when the worker calls // complete(). Until then, we count this victim as still occupying // a slot. The caller MUST not assume an immediate slot is // available; it should still go through the normal "queue if // full" path. We return true to signal "preemption requested" so // the caller can elect to immediately enqueue at queue head. // // However, the v9 spec wants the higher-priority job to take the // slot directly. We accomplish this by NOT going through the // queue: the caller already verified len(running) >= // maxConcurrent, but by setting victim.preempted=true and // signalling cancel, the victim's worker will exit imminently. // We dispatch the incoming job NOW, accepting that running may // briefly exceed maxConcurrent. The complete() path doesn't // re-enforce the cap; SetMaxConcurrent uses the same "let // in-flight finish" semantics. So the incoming job runs in // parallel with the about-to-die victim, and order-of-magnitude // the lane may briefly hold maxConcurrent+1 jobs. This is // acceptable because preemption is opt-in and rare. 
return true } // minRuntimeLocked returns the configured preemption min-runtime, or // the default of 30s when the policy is nil. Caller MUST hold p.mu. // // A configured policy returning d == 0 is honored as "no min-runtime // guard" (preempt immediately). d < 0 falls back to the default. func (p *pool) minRuntimeLocked() time.Duration { if p.preemptPolicy == nil { return 30 * time.Second } d := p.preemptPolicy.MinRuntime() if d < 0 { return 30 * time.Second } return d } // SetPreemptionPolicy installs a new preemption policy. Existing // running jobs are unaffected; future dispatch decisions consult the // new policy. v9. func (p *pool) SetPreemptionPolicy(policy PreemptionPolicy) { p.mu.Lock() p.preemptPolicy = policy p.mu.Unlock() } // SetETAWindowSize updates the rolling window size used by // SubmitWithMaxWait's ETA estimator. v9. func (p *pool) SetETAWindowSize(n int) { if n <= 0 { return } p.mu.Lock() p.etaWindowSize = n if len(p.runtimes) > n { p.runtimes = p.runtimes[len(p.runtimes)-n:] } p.mu.Unlock() } func (p *pool) SubmitWait(ctx context.Context, job Job) error { p.mu.Lock() if p.closed { p.mu.Unlock() return ErrLaneClosed } if len(p.running) < p.maxConcurrent { rj := p.newRunningJobLocked(job) p.running[job.ID()] = rj done := make(chan jobResult, 1) p.mu.Unlock() go p.run(rj, done) select { case res := <-done: return res.err case <-ctx.Done(): // Run has its own context; we cannot kill it from here. // Wait for it to finish and return ctx.Err to the caller. <-done return ctx.Err() } } // V9 preemption: same path as Submit. 
if p.tryPreemptLocked(job) { rj := p.newRunningJobLocked(job) p.running[job.ID()] = rj done := make(chan jobResult, 1) p.mu.Unlock() go p.run(rj, done) select { case res := <-done: return res.err case <-ctx.Done(): <-done return ctx.Err() } } qj := &queuedJob{ job: job, enqueuedAt: time.Now(), done: make(chan jobResult, 1), } p.queue.Enqueue(qj) p.mu.Unlock() select { case res := <-qj.done: return res.err case <-ctx.Done(): // Try to cancel before dispatch picks it up. if p.Cancel(job.ID()) == nil { return ctx.Err() } // Already dequeued and running — wait for the run to finish. <-qj.done return ctx.Err() } } // run executes the job and arranges for the next queued job to be // dispatched on completion. The done channel is signaled exactly once // with the run's error. // // v9: each running job carries its own cancellable context so the // preemption path can deliver cancellation. Pre-v9 callers passed // context.Background; that semantic is preserved for jobs that ignore // ctx.Done. Jobs that respect ctx will see cancellation immediately // when the lane scheduler chooses them as a preemption victim. func (p *pool) run(rj *runningJob, done chan<- jobResult) { jobCtx := p.newJobContext(rj) err := rj.job.Run(jobCtx) // If the lane chose this job for preemption, override the worker's // returned error with ErrPreempted so SubmitWait callers can // distinguish "preempted" from a generic ctx.Cause. p.mu.Lock() preempted := rj.preempted startedAt := rj.startedAt p.mu.Unlock() if preempted { err = ErrPreempted } done <- jobResult{err: err} p.complete(rj.job.ID(), startedAt, time.Now()) } // runQueued is the dispatch path for jobs that were queued, not // dispatched immediately. Identical to run() except it signals the // queued job's done channel (the caller's SubmitWait waits on it). 
func (p *pool) runQueued(rj *runningJob, qj *queuedJob) { jobCtx := p.newJobContext(rj) err := qj.job.Run(jobCtx) p.mu.Lock() preempted := rj.preempted startedAt := rj.startedAt p.mu.Unlock() if preempted { err = ErrPreempted } qj.done <- jobResult{err: err} p.complete(qj.job.ID(), startedAt, time.Now()) } // newJobContext returns the context the worker passes to Job.Run. v9: // every running job has a cancellable context backing rj.cancel, so // the preemption path can interrupt it. func (p *pool) newJobContext(rj *runningJob) context.Context { if rj.runCtx == nil { return context.Background() } return rj.runCtx } // complete is called when a job's Run returns. It removes the job // from the running map, records throughput, and pulls the next queued // job (if any) to fill the freed slot. func (p *pool) complete(jobID string, startedAt, finishedAt time.Time) { p.mu.Lock() delete(p.running, jobID) p.completions = append(p.completions, finishedAt) p.pruneCompletionsLocked(finishedAt) // V9: track runtime for ETA estimator. if !startedAt.IsZero() { p.recordRuntimeLocked(finishedAt.Sub(startedAt)) } // Pull next queued job under the same lock. if !p.closed && len(p.running) < p.maxConcurrent { next := p.queue.Dequeue() if next != nil { rj := p.newRunningJobLocked(next.job) p.running[next.job.ID()] = rj p.mu.Unlock() go p.runQueued(rj, next) return } } p.mu.Unlock() } // recordRuntimeLocked appends to the rolling runtime window used by // SubmitWithMaxWait's ETA estimator. Caller MUST hold p.mu. v9. 
func (p *pool) recordRuntimeLocked(d time.Duration) { if d <= 0 { return } cap := p.etaWindowSize if cap <= 0 { cap = 16 } p.runtimes = append(p.runtimes, d) if len(p.runtimes) > cap { p.runtimes = p.runtimes[len(p.runtimes)-cap:] } } func (p *pool) Cancel(jobID string) error { p.mu.Lock() defer p.mu.Unlock() if p.queue.Cancel(jobID) { return nil } return ErrNotQueued } func (p *pool) Stats() LaneStats { p.mu.Lock() defer p.mu.Unlock() now := time.Now() p.pruneCompletionsLocked(now) return LaneStats{ Name: p.name, MaxConcurrent: p.maxConcurrent, Running: len(p.running), Queued: p.queue.Len(), OldestQueuedAt: p.queue.OldestEnqueueTime(), Throughput1m: len(p.completions), } } func (p *pool) SetMaxConcurrent(n int) { if n <= 0 { return } p.mu.Lock() p.maxConcurrent = n // If we just raised the cap, dispatch backlog. for len(p.running) < p.maxConcurrent && !p.closed { next := p.queue.Dequeue() if next == nil { break } rj := p.newRunningJobLocked(next.job) p.running[next.job.ID()] = rj // Spin up the goroutine while still holding the lock; the // goroutine itself doesn't take p.mu until complete(). go p.runQueued(rj, next) } p.mu.Unlock() } // pruneCompletionsLocked drops completion timestamps older than 60s. // Caller must hold p.mu. The slice is rebuilt rather than truncated // in place because the throughput counts are typically small (hundreds // at most); avoiding pointer churn here is not worth the complexity // of an in-place compaction. func (p *pool) pruneCompletionsLocked(now time.Time) { cutoff := now.Add(-time.Minute) if len(p.completions) == 0 { return } // Find the first entry within the window — completions is // append-only so it's already sorted ascending. first := 0 for first < len(p.completions) && p.completions[first].Before(cutoff) { first++ } if first == 0 { return } if first >= len(p.completions) { p.completions = p.completions[:0] return } // Copy tail down to head; reuse the backing array. 
n := copy(p.completions, p.completions[first:]) p.completions = p.completions[:n] } // estimateWaitLocked returns the best-effort wait time before the // given queue position is dispatched. Caller MUST hold p.mu. v9 — // uses the recent-runtime window when available, falling back to the // throughput-based estimate. The result reflects the time the // position-`pos` job will sit in the queue: with `maxConcurrent` // running jobs the wait is `(pos / maxConcurrent) * avgRuntime`. func (p *pool) estimateWaitLocked(pos int) time.Duration { if pos <= 0 { return 0 } if len(p.runtimes) == 0 { return p.estimateETALocked(pos) } var total time.Duration for _, d := range p.runtimes { total += d } avg := total / time.Duration(len(p.runtimes)) if avg <= 0 { return p.estimateETALocked(pos) } concurrency := p.maxConcurrent if concurrency <= 0 { concurrency = 1 } // Each "round" through the slots drains `concurrency` jobs in // avg runtime. Position `pos` waits ceil(pos / concurrency) rounds. rounds := (pos + concurrency - 1) / concurrency return avg * time.Duration(rounds) } // estimateETALocked returns a rough ETA for a job at the given // 1-based queue position. Caller must hold p.mu. // // Why best-effort: production callers (Discord "queued (~30s)" reply) // only need an order-of-magnitude estimate. Throughput is sampled over // a 1-minute window; if the window is empty we fall back to a // conservative default of 1s/slot * pos. func (p *pool) estimateETALocked(pos int) time.Duration { if pos <= 0 { return 0 } // throughput per second over the window thr := len(p.completions) if thr == 0 { // Fallback: assume each slot takes ~1s — better than zero. return time.Duration(pos) * time.Second } // We have N completions in the last 60s; the lane's "effective // throughput" is N jobs / 60s. ETA for position `pos` is the // time needed to drain pos jobs at that rate. 
perJob := 60.0 / float64(thr) return time.Duration(perJob * float64(pos) * float64(time.Second)) } // fifoPolicy is a simple slice-backed FIFO queue. Used by the v1 // constructor (New). type fifoPolicy struct { queue []*queuedJob } func newFIFOPolicy() queuePolicy { return &fifoPolicy{} } func (f *fifoPolicy) Enqueue(j *queuedJob) { f.queue = append(f.queue, j) } func (f *fifoPolicy) Dequeue() *queuedJob { if len(f.queue) == 0 { return nil } j := f.queue[0] // Avoid retaining the old reference. f.queue[0] = nil f.queue = f.queue[1:] return j } func (f *fifoPolicy) Cancel(jobID string) bool { for i, j := range f.queue { if j.job.ID() == jobID { // Remove and signal cancelled. f.queue = append(f.queue[:i], f.queue[i+1:]...) j.done <- jobResult{err: ErrCancelled} return true } } return false } func (f *fifoPolicy) Len() int { return len(f.queue) } func (f *fifoPolicy) OldestEnqueueTime() *time.Time { if len(f.queue) == 0 { return nil } t := f.queue[0].enqueuedAt return &t }