// Package lane provides a bounded worker pool primitive with // priority-aware fair-share queueing. Used by mort to bound concurrent // access to constrained resources (LLM provider connection limits, // skill execution slots, etc). // // Key design constraints: // - Submit is non-blocking past the dispatch decision. If a slot is // available the job is dispatched immediately; otherwise it is // enqueued and Submit returns the queue position. Callers that // want "wait until done" semantics use SubmitWait. // - Fair-share-by-user prevents one heavy user from starving others // (see policy_fair_share.go). // - Priority is a tie-breaker within a user's queue (higher first). // - Cancel must work for queued jobs; running jobs are owned by the // caller's Run goroutine and not killable from here — the caller // is expected to wire ctx cancellation if desired. // - Stats are sampled cheaply; ETA is best-effort. // // Persistence (DB-backed restart recovery) is layered ON TOP of the // in-memory primitives via pkg/lane/persistence.go. package lane import ( "context" "errors" "time" ) // Job is what callers submit to a Lane. Implementations carry whatever // state Run needs. // // Why: keeping Job a tiny interface lets multiple subsystems (LLM // transport wrapper, skill executor, future runners) define their own // concrete job types without leaking implementation details into the // lane primitives. Persistence is layered on top via the optional // MetadataProvider interface in persistence.go. // // Test: see pool_test.go for end-to-end submit/run/cancel coverage. type Job interface { // ID is unique per submission; used by Cancel and by the // persistence layer to correlate DB rows with in-memory queue // entries. ID() string // CallerID is the user identity for fair-share queueing. Empty // string is allowed but lumps every empty-caller job into a // single bucket; production callers should always populate this. CallerID() string // Priority is the tie-breaker within a single caller's sub-queue. // Higher numbers run first. Default 0. Priority() int // Run executes the job. The lane calls Run inside a worker // goroutine when a slot is available. Errors are returned to the // SubmitWait caller (or logged and dropped for fire-and-forget // Submit). The provided context is the lane's worker context; // callers SHOULD respect cancellation but the lane does not kill // long-running Runs that ignore it. Run(ctx context.Context) error } // Lane is the bounded worker pool surface. // // Why an interface: lets tests substitute a fake lane, and lets the // persistence wrapper compose around the in-memory implementation // without having to extend it. // // Test: pool_test.go covers the in-memory pool implementation; // persistence_test.go covers the persistence wrapper. type Lane interface { // Name returns the lane's stable identifier (e.g. "ollama"). Name() string // Submit enqueues the job. If a slot is available, the job is // dispatched immediately and Submit returns (0, 0, nil). If the // lane is full, Submit returns (queuePos, eta, nil) — the job // runs later when a slot frees. Submit does NOT block beyond the // dispatch decision; for "wait until done" semantics use // SubmitWait. // // queuePos is the 1-based position in the queue at submission // time (1 = next to run). eta is a best-effort estimate based on // recent throughput; zero when running immediately. Submit(ctx context.Context, job Job) (queuePos int, eta time.Duration, err error) // SubmitWait submits the job and blocks until Run completes or // ctx is cancelled. Returns Run's error (or ctx.Err on cancel). // When ctx is cancelled while the job is queued, the job is // removed from the queue and never runs. When ctx is cancelled // while the job is running, SubmitWait still waits for Run to // return — Run's own respect for the context is the caller's // responsibility. SubmitWait(ctx context.Context, job Job) error // Cancel removes a queued job by ID. Returns ErrNotQueued if the // job isn't in the queue (already running, finished, or // unknown). Cancel(jobID string) error // Stats returns a snapshot of the lane's current state. Stats() LaneStats // SetMaxConcurrent updates the lane's concurrency cap. Existing // running jobs continue to run; new dispatches respect the new // cap. Calling this with n <= 0 is a no-op (lanes need at least // one slot to make progress). SetMaxConcurrent(n int) } // LaneStats is a snapshot of a lane's current state. All fields are // captured under the lane's mutex so the snapshot is internally // consistent. type LaneStats struct { Name string MaxConcurrent int Running int Queued int OldestQueuedAt *time.Time Throughput1m int // jobs completed in the last 60s } // Sentinels. // // Why exported sentinels: callers compare with errors.Is so tests and // production handlers can distinguish lane-internal failures from // caller errors. var ( // ErrNotQueued is returned by Cancel when the job isn't in the // queue (already running, finished, or unknown). ErrNotQueued = errors.New("lane: job not queued") // ErrLaneClosed is returned by Submit/SubmitWait after Close has // been called. ErrLaneClosed = errors.New("lane: closed") // ErrCancelled is returned by SubmitWait when the job is // cancelled while queued (either via Cancel or by ctx.Done). ErrCancelled = errors.New("lane: job cancelled") // ErrPreempted is delivered to a SubmitWait caller when the job's // running goroutine was cancelled mid-run because a higher-priority // queued job arrived at a full lane and this job was marked // preemptible. v9. ErrPreempted = errors.New("lane: preempted by higher priority job") // ErrLaneBusy is returned by SubmitWithMaxWait when the estimated // queue wait would exceed the caller's maxWait. The job is NOT // enqueued — caller may retry or degrade. v9. ErrLaneBusy = errors.New("lane: estimated wait exceeds max") ) // Preemptible is an optional Job extension. A Job that returns true is // eligible for preemption: when a higher-priority job arrives at a // full lane, the lane scheduler may cancel this job's worker context // mid-run. The job's Run method MUST honour ctx.Done for the // cancellation to take effect. // // Why an interface (vs a flag on the Job): keeps the base Job // interface tiny and lets each subsystem decide its preemption // semantics. Skill jobs implement this by reading // `skills.Skill.Preemptible`; LLM-transport jobs leave it // unimplemented (they're never preemptible — cancelling an in-flight // LLM call costs more than it saves). // // v9. type Preemptible interface { IsPreemptible() bool } // PreemptionPolicy reports whether a running job should be preempted // by an arriving higher-priority queued job. Optional registry-level // surface: when nil, the default policy is "preempt the oldest // preemptible running job whose runtime exceeds the min-runtime // guard". v9. type PreemptionPolicy interface { // MinRuntime returns the minimum elapsed wall-clock time before a // preemptible job may be preempted. Default 30s when nil. MinRuntime() time.Duration // Enabled reports whether preemption is enabled at all on this // lane. Default true when nil. Enabled() bool }