Files
executus/lane/pool.go
T
steve ca243a2d50
executus CI / test (push) Failing after 24s
P0: stand up executus harness module above majordomo
Batteries-included agent-harness base, extracted from mort's agent layer.
This first cut establishes the module + the zero-coupling core primitives:

- lane, dispatchguard, pendingattach, run/progress.go: moved verbatim from mort
- config: host config Source seam + env-var default (nil-safe helpers)
- deliver: output-egress seam + Discard/Stdout defaults
- identity: AdminPolicy + MemberResolver seams (nil-safe)
- fanout: programmatic N×M swarm (bounded global + per-key concurrency)
- README/CLAUDE.md with the vibe-coded banner; CI with Go gates +
  the "core stays majordomo+stdlib only" invariant

Core builds with stdlib only today; majordomo enters at P1 (model/structured).
go build/vet/test -race all green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 19:18:37 -04:00

695 lines
21 KiB
Go

package lane
import (
"context"
"sync"
"time"
)
// pool implements Lane with a slot-counting mutex plus a pluggable queue
// policy.
//
// Queued work is dispatched from complete() (and from SetMaxConcurrent
// when the cap is raised). When a job finishes, the next queued job (if
// any) is dequeued under the same lock that removed the finished job from
// the running map, so no two goroutines can pick the same queued job.
//
// Why a mutex + map instead of a buffered channel used as a semaphore:
// Stats, Cancel, and the dispatch/preemption decisions all need to inspect
// the "running" and "queued" state together. One mutex over both keeps
// that cheap and consistent.
//
// Concurrency cap: len(running) normally stays <= maxConcurrent. A
// successful preemption, however, dispatches the incoming job before the
// victim has exited, so running may temporarily exceed the cap (see
// tryPreemptLocked). Lowering the cap via SetMaxConcurrent likewise lets
// in-flight jobs finish rather than stopping them.
//
// Test: pool_test.go covers slot-available, slot-full, cancel,
// SubmitWait blocking, Stats accuracy, throughput sampling, and
// SetMaxConcurrent.
type pool struct {
	name string

	// mu guards every field below.
	mu            sync.Mutex
	maxConcurrent int
	running       map[string]*runningJob // keyed by Job.ID()
	queue         queuePolicy
	closed        bool

	// completions holds job-finish timestamps used for the Throughput1m
	// stat and the throughput-based ETA fallback. complete() appends one
	// entry per finished job; entries older than 60s are pruned on append
	// and on read. There is no explicit cap: the size is bounded only by
	// how many jobs finish within a 60s window.
	completions []time.Time

	// runtimes is a bounded sliding window of completed-job wall-clock
	// runtimes used by SubmitWithMaxWait's ETA estimator. Its length is
	// capped at etaWindowSize. A zero etaWindowSize (the value the
	// constructors leave) is treated as 16 by recordRuntimeLocked. v9.
	runtimes      []time.Duration
	etaWindowSize int

	// preemptPolicy is consulted by tryPreemptLocked and minRuntimeLocked
	// and can be replaced via SetPreemptionPolicy. A nil policy does NOT
	// disable preemption. Jobs that implement Preemptible and report
	// IsPreemptible() can still be preempted, subject to the default 30s
	// min-runtime guard. Jobs that do not opt in are never preempted. v9.
	preemptPolicy PreemptionPolicy
}
// runningJob is the per-dispatch state for a job that occupies a lane
// slot, or, after preemption, is still in the process of releasing one.
// The preempted flag is written and read under pool.mu.
type runningJob struct {
	job Job

	// startedAt is the dispatch wall-clock time. It feeds the preemption
	// min-runtime guard and the oldest-first tie-break in tryPreemptLocked.
	// It is also the start of the runtime sample that complete() records
	// for the ETA estimator.
	startedAt time.Time

	// runCtx is the context passed to Job.Run. It is derived from
	// context.Background, not from any caller context. cancel is its
	// CancelCauseFunc, which is invoked with ErrPreempted when this job is
	// chosen as a preemption victim. v9.
	runCtx context.Context
	cancel context.CancelCauseFunc

	// preempted is set true when the lane scheduler chose this job for
	// preemption. The worker reads it after Run returns and reports
	// ErrPreempted instead of whatever error Run returned. v9.
	preempted bool
}
// queuedJob is the in-queue representation of a submitted job that could
// not be dispatched immediately.
//
// done is buffered (capacity 1) so that whoever reports the outcome can
// send without blocking even if no one is receiving. The reporter is either
// the worker in runQueued or queuePolicy.Cancel. Submit never reads done,
// and SubmitWait may already have given up on ctx.Done. An unread result
// is simply garbage-collected with the channel.
type queuedJob struct {
	job        Job
	enqueuedAt time.Time

	// done receives exactly one jobResult once the job's outcome is known:
	// either Run returned, or the job was cancelled before dispatch. It is
	// never closed.
	done chan jobResult
}
// jobResult carries the outcome of one job back to whoever waits on the
// job's done channel. A nil err means Run succeeded.
type jobResult struct {
	err error // Run's own error, ErrPreempted, or ErrCancelled
}
// queuePolicy decides the order in which queued jobs are dispatched.
// fifoPolicy is the default; fairSharePolicy lives in
// policy_fair_share.go.
//
// The policy is pluggable because lanes have different needs. The
// LLM-transport lane wants fair-share across callers, while
// single-resource lanes (e.g. gpu-imagine, max_concurrent=1) are fine with
// plain FIFO. Keeping the interface small leaves room for weighted fair
// share or strict priority later.
//
// The pool invokes every method with pool.mu held.
type queuePolicy interface {
	// Enqueue adds j to the queue. Implementations may place it anywhere
	// (e.g. ordered by caller or priority).
	Enqueue(j *queuedJob)

	// Dequeue removes and returns the next job to dispatch, or nil when
	// the queue is empty.
	Dequeue() *queuedJob

	// Cancel removes the job whose ID is jobID, sends ErrCancelled on its
	// done channel, and reports whether such a job was found.
	Cancel(jobID string) bool

	// Len reports how many jobs are currently queued.
	Len() int

	// OldestEnqueueTime returns the earliest enqueue timestamp among the
	// queued jobs, or nil when the queue is empty.
	OldestEnqueueTime() *time.Time
}
// New constructs a pool with FIFO queueing. A non-positive maxConcurrent
// is clamped to 1.
//
// Why New / NewWithFairShare instead of one constructor taking a policy:
// lanes are usually instantiated by name from convars, and an explicit
// constructor keeps call sites readable ("we want fair-share for the
// ollama lane").
func New(name string, maxConcurrent int) Lane {
	return NewWithPolicy(name, maxConcurrent, newFIFOPolicy())
}
// NewWithPolicy constructs a pool whose backlog is ordered by the given
// queue policy. NewWithFairShare and tests that exercise custom orderings
// use it. A nil policy falls back to FIFO, and a non-positive
// maxConcurrent is clamped to 1.
func NewWithPolicy(name string, maxConcurrent int, policy queuePolicy) Lane {
	p := &pool{
		name:          name,
		maxConcurrent: 1,
		running:       make(map[string]*runningJob),
		queue:         policy,
	}
	if maxConcurrent > 0 {
		p.maxConcurrent = maxConcurrent
	}
	if p.queue == nil {
		p.queue = newFIFOPolicy()
	}
	return p
}
// Name returns the lane name supplied at construction.
func (p *pool) Name() string {
	return p.name
}
// Submit hands job to the lane without waiting for it to finish.
//
// The job starts at once if a slot is free, or if a lower-priority
// preemptible job can be evicted to make room. In that case Submit returns
// (0, 0, nil). Otherwise the job is queued, and Submit returns the queue
// length after enqueueing together with a throughput-based ETA. A closed
// lane yields ErrLaneClosed. ctx is not consulted.
func (p *pool) Submit(ctx context.Context, job Job) (int, time.Duration, error) {
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		return 0, 0, ErrLaneClosed
	}

	// Short-circuit: preemption is only attempted when the lane is full.
	if len(p.running) < p.maxConcurrent || p.tryPreemptLocked(job) {
		rj := p.newRunningJobLocked(job)
		p.running[job.ID()] = rj
		p.mu.Unlock()
		// Fire-and-forget: run still needs somewhere to deliver its
		// result, so give it a buffered channel nobody reads.
		go p.run(rj, make(chan jobResult, 1))
		return 0, 0, nil
	}

	p.queue.Enqueue(&queuedJob{
		job:        job,
		enqueuedAt: time.Now(),
		done:       make(chan jobResult, 1),
	})
	depth := p.queue.Len()
	wait := p.estimateETALocked(depth)
	p.mu.Unlock()
	return depth, wait, nil
}
// SubmitWithMaxWait is Submit with an admission gate. If the estimated
// wait for a newly queued job would exceed maxWait, it returns ErrLaneBusy
// and does NOT enqueue the job. maxWait <= 0 disables the gate and
// delegates to Submit. v9.
//
// The estimate comes from estimateWaitLocked. It uses the average of
// recent completed-job runtimes when any are recorded, and otherwise falls
// back to the throughput-based estimate. That estimate in turn defaults to
// 1s per queue position when nothing completed in the last minute.
//
// A rejected submission costs the caller nothing. The position and the
// estimated wait that drove the decision are returned alongside
// ErrLaneBusy so callers can log/report the exact gate value.
func (p *pool) SubmitWithMaxWait(ctx context.Context, job Job, maxWait time.Duration) (int, time.Duration, error) {
	if maxWait <= 0 {
		return p.Submit(ctx, job)
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return 0, 0, ErrLaneClosed
	}
	if len(p.running) < p.maxConcurrent || p.tryPreemptLocked(job) {
		rj := p.newRunningJobLocked(job)
		p.running[job.ID()] = rj
		// Launching under the lock is fine: run only takes p.mu after
		// the job's Run has returned.
		go p.run(rj, make(chan jobResult, 1))
		return 0, 0, nil
	}

	// Gate on the position this job would take at the tail of the queue.
	tail := p.queue.Len() + 1
	wait := p.estimateWaitLocked(tail)
	if wait > maxWait {
		return tail, wait, ErrLaneBusy
	}
	p.queue.Enqueue(&queuedJob{
		job:        job,
		enqueuedAt: time.Now(),
		done:       make(chan jobResult, 1),
	})
	return tail, wait, nil
}
// newRunningJobLocked builds the per-dispatch state for job and stamps
// startedAt with the current time. Each job gets its own cancellable
// context, derived from context.Background rather than from any caller
// context, so the preemption path has a handle to interrupt it.
//
// The caller MUST hold p.mu and is responsible for inserting the result
// into p.running. v9.
func (p *pool) newRunningJobLocked(job Job) *runningJob {
	rj := &runningJob{job: job, startedAt: time.Now()}
	rj.runCtx, rj.cancel = context.WithCancelCause(context.Background())
	return rj
}
// tryPreemptLocked looks for a running job to evict in favour of incoming.
// If it finds one, it cancels that job and reports true. The caller MUST
// hold p.mu and should only call it when the lane is full. v9.
//
// Preemption is skipped entirely when an installed policy reports
// Enabled() == false. With no policy installed, preemption is still
// attempted.
//
// A running job is a candidate victim only if all of these hold:
//   - it implements Preemptible and IsPreemptible() returns true;
//   - it has not already been chosen as a victim;
//   - its priority is strictly LOWER than incoming's;
//   - it has been running for at least minRuntimeLocked(). This is the
//     anti-thrash guard that lets a freshly dispatched job make progress.
//
// Among the candidates, the lowest priority loses, with ties broken by the
// earliest startedAt, so the least valuable work is sacrificed first.
// incoming itself does not need to be Preemptible: higher-priority work
// gets the slot regardless. Mark a job preemptible only if you'd accept
// losing its work to whatever higher-priority job arrives next.
//
// The victim is NOT removed from p.running here. Its context is cancelled
// with cause ErrPreempted, but Run may take a while to honour that, and
// the slot is released only when its worker calls complete(). Callers
// dispatch incoming immediately on a true result, so len(p.running) can
// temporarily exceed maxConcurrent by one per victim that has not exited
// yet.
//
// complete() only refills a slot when len(running) < maxConcurrent, so the
// lane settles back to its cap once victims exit. The overshoot is
// accepted because preemption is opt-in and expected to be rare.
func (p *pool) tryPreemptLocked(incoming Job) bool {
	if p.preemptPolicy != nil && !p.preemptPolicy.Enabled() {
		return false
	}

	minRuntime := p.minRuntimeLocked()
	incomingPrio := incoming.Priority()
	now := time.Now()

	var victim *runningJob
	for _, rj := range p.running {
		pj, ok := rj.job.(Preemptible)
		if !ok || !pj.IsPreemptible() {
			continue
		}
		if rj.preempted {
			continue // already chosen earlier; don't double-cancel
		}
		prio := rj.job.Priority()
		if prio >= incomingPrio || now.Sub(rj.startedAt) < minRuntime {
			continue
		}
		if victim == nil ||
			prio < victim.job.Priority() ||
			(prio == victim.job.Priority() && rj.startedAt.Before(victim.startedAt)) {
			victim = rj
		}
	}
	if victim == nil {
		return false
	}

	victim.preempted = true
	if victim.cancel != nil {
		victim.cancel(ErrPreempted)
	}
	return true
}
// minRuntimeLocked returns the configured preemption min-runtime, or
// the default of 30s when the policy is nil. Caller MUST hold p.mu.
//
// A configured policy returning d == 0 is honored as "no min-runtime
// guard" (preempt immediately). d < 0 falls back to the default.
func (p *pool) minRuntimeLocked() time.Duration {
if p.preemptPolicy == nil {
return 30 * time.Second
}
d := p.preemptPolicy.MinRuntime()
if d < 0 {
return 30 * time.Second
}
return d
}
// SetPreemptionPolicy replaces the lane's preemption policy (nil is
// allowed). Jobs that are already running are not re-evaluated; only
// preemption decisions made after this call consult the new policy. v9.
func (p *pool) SetPreemptionPolicy(policy PreemptionPolicy) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.preemptPolicy = policy
}
// SetETAWindowSize changes how many recent job runtimes the
// SubmitWithMaxWait ETA estimator averages over. If the window shrinks,
// the oldest samples are discarded. Non-positive sizes are ignored. v9.
func (p *pool) SetETAWindowSize(n int) {
	if n > 0 {
		p.mu.Lock()
		p.etaWindowSize = n
		if excess := len(p.runtimes) - n; excess > 0 {
			p.runtimes = p.runtimes[excess:]
		}
		p.mu.Unlock()
	}
}
// SubmitWait submits job and blocks until it finishes (or is cancelled
// while still queued), returning the job's error. A closed lane yields
// ErrLaneClosed.
//
// A job that starts immediately (free slot or successful preemption) is
// always waited for. If ctx is done first, SubmitWait still blocks until
// Run returns and then reports ctx.Err(). The caller's ctx is not wired
// into the job's run context, so cancelling it does not interrupt Run.
//
// A queued job is pulled from the queue when ctx is done, and SubmitWait
// returns ctx.Err() right away. If the job had already been dispatched,
// SubmitWait waits for it to finish and then returns ctx.Err().
func (p *pool) SubmitWait(ctx context.Context, job Job) error {
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		return ErrLaneClosed
	}

	// Short-circuit: preemption is only attempted when the lane is full.
	if len(p.running) < p.maxConcurrent || p.tryPreemptLocked(job) {
		rj := p.newRunningJobLocked(job)
		p.running[job.ID()] = rj
		result := make(chan jobResult, 1)
		p.mu.Unlock()
		go p.run(rj, result)

		select {
		case res := <-result:
			return res.err
		case <-ctx.Done():
			<-result // the job keeps running; wait it out
			return ctx.Err()
		}
	}

	qj := &queuedJob{
		job:        job,
		enqueuedAt: time.Now(),
		done:       make(chan jobResult, 1),
	}
	p.queue.Enqueue(qj)
	p.mu.Unlock()

	select {
	case res := <-qj.done:
		return res.err
	case <-ctx.Done():
	}
	// The caller gave up. If the job is still queued, Cancel removes it.
	// Otherwise it has already been dispatched (or finished), so wait for
	// its result.
	if p.Cancel(job.ID()) != nil {
		<-qj.done
	}
	return ctx.Err()
}
// run executes a job that was dispatched immediately (not via the queue).
// It releases the job's slot via complete(), which may dispatch the next
// queued job, and then reports the outcome on done.
//
// The slot is released BEFORE the result is delivered. Otherwise a caller
// woken by done (e.g. SubmitWait) could see the finished job still counted
// as running. It could also resubmit a job with the same ID before
// complete() ran, and complete() would then delete the NEW job's entry
// from p.running, corrupting slot accounting. done must be buffered (every
// caller in this package passes capacity 1) so the send never blocks.
//
// v9: the job runs under its own cancellable context (rj.runCtx), so the
// preemption path can interrupt it. Jobs that ignore ctx.Done behave as
// they did pre-v9. Jobs that honour it see cancellation as soon as they
// are chosen as a preemption victim. Either way, a preempted job's result
// is reported as ErrPreempted regardless of what Run returned.
func (p *pool) run(rj *runningJob, done chan<- jobResult) {
	err := rj.job.Run(p.newJobContext(rj))
	finishedAt := time.Now()

	p.mu.Lock()
	preempted := rj.preempted
	p.mu.Unlock()
	if preempted {
		// Let SubmitWait callers distinguish "preempted" from a generic
		// ctx cause.
		err = ErrPreempted
	}

	// startedAt is set once before this goroutine starts and never
	// modified, so it can be read without the lock.
	p.complete(rj.job.ID(), rj.startedAt, finishedAt)
	done <- jobResult{err: err}
}
// runQueued executes a job that waited in the queue and was later
// dispatched by complete() or SetMaxConcurrent. It mirrors run, except
// that the outcome is delivered on the queued job's own done channel,
// which a SubmitWait caller may be waiting on. A preempted job's result is
// reported as ErrPreempted.
//
// The slot is released via complete() BEFORE the result is delivered. A
// woken caller therefore never sees the finished job still occupying a
// slot, and a same-ID resubmission cannot have its running entry deleted
// by this job's completion. qj.done is buffered (capacity 1) and receives
// only this one send, so it never blocks even if nobody is waiting.
func (p *pool) runQueued(rj *runningJob, qj *queuedJob) {
	err := qj.job.Run(p.newJobContext(rj))
	finishedAt := time.Now()

	p.mu.Lock()
	preempted := rj.preempted
	p.mu.Unlock()
	if preempted {
		err = ErrPreempted
	}

	// startedAt is set once before this goroutine starts and never
	// modified, so it can be read without the lock.
	p.complete(qj.job.ID(), rj.startedAt, finishedAt)
	qj.done <- jobResult{err: err}
}
// newJobContext picks the context handed to Job.Run. That is the job's own
// cancellable rj.runCtx, or context.Background when none was set.
// newRunningJobLocked always sets runCtx, so the fallback only matters for
// runningJob values built some other way. v9.
func (p *pool) newJobContext(rj *runningJob) context.Context {
	if ctx := rj.runCtx; ctx != nil {
		return ctx
	}
	return context.Background()
}
// complete releases the slot held by jobID, all under a single hold of
// p.mu. It:
//   - removes the job from the running map;
//   - records a throughput sample at finishedAt, plus a runtime sample
//     when startedAt is known;
//   - dispatches the next queued job into the freed slot, unless the lane
//     is closed or still at/over its cap.
func (p *pool) complete(jobID string, startedAt, finishedAt time.Time) {
	p.mu.Lock()
	defer p.mu.Unlock()

	delete(p.running, jobID)
	p.completions = append(p.completions, finishedAt)
	p.pruneCompletionsLocked(finishedAt)
	if !startedAt.IsZero() {
		// V9: feed the SubmitWithMaxWait ETA estimator.
		p.recordRuntimeLocked(finishedAt.Sub(startedAt))
	}

	if p.closed || len(p.running) >= p.maxConcurrent {
		return
	}
	if next := p.queue.Dequeue(); next != nil {
		rj := p.newRunningJobLocked(next.job)
		p.running[next.job.ID()] = rj
		// Safe under the lock: runQueued only takes p.mu after its
		// job's Run has returned.
		go p.runQueued(rj, next)
	}
}
// recordRuntimeLocked appends to the rolling runtime window used by
// SubmitWithMaxWait's ETA estimator. Caller MUST hold p.mu. v9.
func (p *pool) recordRuntimeLocked(d time.Duration) {
if d <= 0 {
return
}
cap := p.etaWindowSize
if cap <= 0 {
cap = 16
}
p.runtimes = append(p.runtimes, d)
if len(p.runtimes) > cap {
p.runtimes = p.runtimes[len(p.runtimes)-cap:]
}
}
// Cancel removes a still-queued job from the lane. Per the queuePolicy
// contract, the job's done channel receives ErrCancelled. Cancel returns
// ErrNotQueued for unknown IDs and for jobs that are already running;
// running jobs are not affected.
func (p *pool) Cancel(jobID string) error {
	p.mu.Lock()
	found := p.queue.Cancel(jobID)
	p.mu.Unlock()
	if !found {
		return ErrNotQueued
	}
	return nil
}
// Stats returns a point-in-time snapshot of the lane. Throughput1m is the
// number of jobs that finished in the last minute. Stale completion
// samples are pruned as a side effect.
func (p *pool) Stats() LaneStats {
	p.mu.Lock()
	defer p.mu.Unlock()

	p.pruneCompletionsLocked(time.Now())
	stats := LaneStats{
		Name:          p.name,
		MaxConcurrent: p.maxConcurrent,
		Running:       len(p.running),
		Queued:        p.queue.Len(),
		Throughput1m:  len(p.completions),
	}
	stats.OldestQueuedAt = p.queue.OldestEnqueueTime()
	return stats
}
// SetMaxConcurrent changes the lane's concurrency cap. Non-positive values
// are ignored. Raising the cap immediately dispatches queued jobs into the
// new slots, unless the lane is closed. Lowering it never stops in-flight
// jobs; the lane drains down to the new cap as they complete.
func (p *pool) SetMaxConcurrent(n int) {
	if n <= 0 {
		return
	}
	p.mu.Lock()
	defer p.mu.Unlock()

	p.maxConcurrent = n
	if p.closed {
		return
	}
	for len(p.running) < p.maxConcurrent {
		next := p.queue.Dequeue()
		if next == nil {
			return
		}
		rj := p.newRunningJobLocked(next.job)
		p.running[next.job.ID()] = rj
		// Launching under the lock is safe: runQueued only takes p.mu
		// after its job's Run has returned.
		go p.runQueued(rj, next)
	}
}
// pruneCompletionsLocked drops completion timestamps that are more than a
// minute older than now. Caller must hold p.mu.
//
// It compacts in place: the surviving tail is copied to the front of the
// existing backing array, so steady-state pruning does not allocate.
//
// The scan stops at the first in-window entry, which assumes completions
// is sorted ascending. Callers compute finishedAt before complete()
// acquires the lock, so entries are appended in lock-acquisition order.
// When finish times race, the slice may therefore be only approximately
// sorted, and an out-of-order stale entry can survive until a later prune.
func (p *pool) pruneCompletionsLocked(now time.Time) {
	cutoff := now.Add(-time.Minute)
	stale := 0
	for stale < len(p.completions) && p.completions[stale].Before(cutoff) {
		stale++
	}
	if stale == 0 {
		return
	}
	// When every entry is stale, copy moves nothing and this truncates to
	// an empty slice that keeps its capacity.
	p.completions = p.completions[:copy(p.completions, p.completions[stale:])]
}
// estimateWaitLocked returns a best-effort estimate of how long a job at
// 1-based queue position pos will wait before it is dispatched. Caller
// MUST hold p.mu. v9.
//
// With runtime samples available, the lane drains maxConcurrent jobs per
// "round" of average runtime. A job at position pos therefore waits
// ceil(pos/maxConcurrent) rounds, and the estimate is
// ceil(pos/maxConcurrent) * avgRuntime. Time already spent by the
// currently running jobs is not credited, so the estimate leans
// pessimistic. With no usable samples it falls back to estimateETALocked.
func (p *pool) estimateWaitLocked(pos int) time.Duration {
	if pos <= 0 {
		return 0
	}
	if len(p.runtimes) == 0 {
		return p.estimateETALocked(pos)
	}
	var total time.Duration
	for _, d := range p.runtimes {
		total += d
	}
	avg := total / time.Duration(len(p.runtimes))
	if avg <= 0 {
		return p.estimateETALocked(pos)
	}
	// Defensive: the constructors already clamp maxConcurrent to >= 1.
	concurrency := p.maxConcurrent
	if concurrency <= 0 {
		concurrency = 1
	}
	rounds := (pos + concurrency - 1) / concurrency // ceil(pos / concurrency)
	return avg * time.Duration(rounds)
}
// estimateETALocked returns a rough throughput-based ETA for a job at the
// given 1-based queue position. Caller must hold p.mu.
//
// Why best-effort: production callers (the Discord "queued (~30s)" reply)
// only need an order-of-magnitude estimate. Throughput is the number of
// completions in the last 60s, and the ETA is the time needed to drain pos
// jobs at that rate. With no completions in the window, it falls back to a
// conservative 1s per queue position.
//
// Stale samples are pruned first. completions is otherwise pruned only
// when a job completes or Stats is read, so after an idle spell it can
// still hold finishes from long ago. Those would overstate throughput and
// understate the ETA.
func (p *pool) estimateETALocked(pos int) time.Duration {
	if pos <= 0 {
		return 0
	}
	p.pruneCompletionsLocked(time.Now())
	finished := len(p.completions) // jobs completed in the last 60s
	if finished == 0 {
		// Fallback: assume ~1s per queued job; better than zero.
		return time.Duration(pos) * time.Second
	}
	perJob := 60.0 / float64(finished) // seconds per job at current throughput
	return time.Duration(perJob * float64(pos) * float64(time.Second))
}
// fifoPolicy is a simple slice-backed FIFO queue and the default policy
// used by New. The pool calls every method with pool.mu held, so the type
// does no locking of its own.
type fifoPolicy struct {
	queue []*queuedJob
}

func newFIFOPolicy() queuePolicy { return &fifoPolicy{} }

// Enqueue appends j at the tail.
func (f *fifoPolicy) Enqueue(j *queuedJob) {
	f.queue = append(f.queue, j)
}

// Dequeue pops the head of the queue, or returns nil when it is empty.
func (f *fifoPolicy) Dequeue() *queuedJob {
	if len(f.queue) == 0 {
		return nil
	}
	j := f.queue[0]
	// Clear the vacated slot so the backing array doesn't keep the job
	// alive after it has been handed off.
	f.queue[0] = nil
	f.queue = f.queue[1:]
	return j
}

// Cancel removes the first queued job whose ID is jobID and sends
// ErrCancelled on its done channel. The channel is buffered and the job
// will never be dispatched, so the send does not block. Cancel reports
// whether a job was removed.
func (f *fifoPolicy) Cancel(jobID string) bool {
	for i, j := range f.queue {
		if j.job.ID() != jobID {
			continue
		}
		// Shift the tail left, then clear the now-duplicated last slot.
		// Without this the backing array would keep a stale reference,
		// the same retention Dequeue avoids.
		last := len(f.queue) - 1
		copy(f.queue[i:], f.queue[i+1:])
		f.queue[last] = nil
		f.queue = f.queue[:last]
		j.done <- jobResult{err: ErrCancelled}
		return true
	}
	return false
}

// Len reports the number of queued jobs.
func (f *fifoPolicy) Len() int { return len(f.queue) }

// OldestEnqueueTime returns the head job's enqueue time, or nil when the
// queue is empty. The head is the oldest entry because jobs are appended in
// order, with enqueuedAt stamped under pool.mu.
func (f *fifoPolicy) OldestEnqueueTime() *time.Time {
	if len(f.queue) == 0 {
		return nil
	}
	t := f.queue[0].enqueuedAt
	return &t
}