ca243a2d50
executus CI / test (push) Failing after 24s
Batteries-included agent-harness base, extracted from mort's agent layer. This first cut establishes the module + the zero-coupling core primitives: - lane, dispatchguard, pendingattach, run/progress.go: moved verbatim from mort - config: host config Source seam + env-var default (nil-safe helpers) - deliver: output-egress seam + Discard/Stdout defaults - identity: AdminPolicy + MemberResolver seams (nil-safe) - fanout: programmatic N×M swarm (bounded global + per-key concurrency) - README/CLAUDE.md with the vibe-coded banner; CI with Go gates + the "core stays majordomo+stdlib only" invariant Core builds with stdlib only today; majordomo enters at P1 (model/structured). go build/vet/test -race all green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
184 lines
7.2 KiB
Go
184 lines
7.2 KiB
Go
// Package lane provides a bounded worker pool primitive with
|
|
// priority-aware fair-share queueing. Used by mort to bound concurrent
|
|
// access to constrained resources (LLM provider connection limits,
|
|
// skill execution slots, etc).
|
|
//
|
|
// Key design constraints:
|
|
// - Submit is non-blocking past the dispatch decision. If a slot is
|
|
// available the job is dispatched immediately; otherwise it is
|
|
// enqueued and Submit returns the queue position. Callers that
|
|
// want "wait until done" semantics use SubmitWait.
|
|
// - Fair-share-by-user prevents one heavy user from starving others
|
|
// (see policy_fair_share.go).
|
|
// - Priority is a tie-breaker within a user's queue (higher first).
|
|
// - Cancel must work for queued jobs; running jobs are owned by the
|
|
// caller's Run goroutine and not killable from here — the caller
|
|
// is expected to wire ctx cancellation if desired.
|
|
// - Stats are sampled cheaply; ETA is best-effort.
|
|
//
|
|
// Persistence (DB-backed restart recovery) is layered ON TOP of the
|
|
// in-memory primitives via pkg/lane/persistence.go.
|
|
package lane
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
)
|
|
|
|
// Job is what callers submit to a Lane. Implementations carry whatever
|
|
// state Run needs.
|
|
//
|
|
// Why: keeping Job a tiny interface lets multiple subsystems (LLM
|
|
// transport wrapper, skill executor, future runners) define their own
|
|
// concrete job types without leaking implementation details into the
|
|
// lane primitives. Persistence is layered on top via the optional
|
|
// MetadataProvider interface in persistence.go.
|
|
//
|
|
// Test: see pool_test.go for end-to-end submit/run/cancel coverage.
|
|
type Job interface {
|
|
// ID is unique per submission; used by Cancel and by the
|
|
// persistence layer to correlate DB rows with in-memory queue
|
|
// entries.
|
|
ID() string
|
|
|
|
// CallerID is the user identity for fair-share queueing. Empty
|
|
// string is allowed but lumps every empty-caller job into a
|
|
// single bucket; production callers should always populate this.
|
|
CallerID() string
|
|
|
|
// Priority is the tie-breaker within a single caller's sub-queue.
|
|
// Higher numbers run first. Default 0.
|
|
Priority() int
|
|
|
|
// Run executes the job. The lane calls Run inside a worker
|
|
// goroutine when a slot is available. Errors are returned to the
|
|
// SubmitWait caller (or logged and dropped for fire-and-forget
|
|
// Submit). The provided context is the lane's worker context;
|
|
// callers SHOULD respect cancellation but the lane does not kill
|
|
// long-running Runs that ignore it.
|
|
Run(ctx context.Context) error
|
|
}
|
|
|
|
// Lane is the bounded worker pool surface.
|
|
//
|
|
// Why an interface: lets tests substitute a fake lane, and lets the
|
|
// persistence wrapper compose around the in-memory implementation
|
|
// without having to extend it.
|
|
//
|
|
// Test: pool_test.go covers the in-memory pool implementation;
|
|
// persistence_test.go covers the persistence wrapper.
|
|
type Lane interface {
|
|
// Name returns the lane's stable identifier (e.g. "ollama").
|
|
Name() string
|
|
|
|
// Submit enqueues the job. If a slot is available, the job is
|
|
// dispatched immediately and Submit returns (0, 0, nil). If the
|
|
// lane is full, Submit returns (queuePos, eta, nil) — the job
|
|
// runs later when a slot frees. Submit does NOT block beyond the
|
|
// dispatch decision; for "wait until done" semantics use
|
|
// SubmitWait.
|
|
//
|
|
// queuePos is the 1-based position in the queue at submission
|
|
// time (1 = next to run). eta is a best-effort estimate based on
|
|
// recent throughput; zero when running immediately.
|
|
Submit(ctx context.Context, job Job) (queuePos int, eta time.Duration, err error)
|
|
|
|
// SubmitWait submits the job and blocks until Run completes or
|
|
// ctx is cancelled. Returns Run's error (or ctx.Err on cancel).
|
|
// When ctx is cancelled while the job is queued, the job is
|
|
// removed from the queue and never runs. When ctx is cancelled
|
|
// while the job is running, SubmitWait still waits for Run to
|
|
// return — Run's own respect for the context is the caller's
|
|
// responsibility.
|
|
SubmitWait(ctx context.Context, job Job) error
|
|
|
|
// Cancel removes a queued job by ID. Returns ErrNotQueued if the
|
|
// job isn't in the queue (already running, finished, or
|
|
// unknown).
|
|
Cancel(jobID string) error
|
|
|
|
// Stats returns a snapshot of the lane's current state.
|
|
Stats() LaneStats
|
|
|
|
// SetMaxConcurrent updates the lane's concurrency cap. Existing
|
|
// running jobs continue to run; new dispatches respect the new
|
|
// cap. Calling this with n <= 0 is a no-op (lanes need at least
|
|
// one slot to make progress).
|
|
SetMaxConcurrent(n int)
|
|
}
|
|
|
|
// LaneStats is a snapshot of a lane's current state. All fields are
|
|
// captured under the lane's mutex so the snapshot is internally
|
|
// consistent.
|
|
type LaneStats struct {
|
|
Name string
|
|
MaxConcurrent int
|
|
Running int
|
|
Queued int
|
|
OldestQueuedAt *time.Time
|
|
Throughput1m int // jobs completed in the last 60s
|
|
}
|
|
|
|
// Sentinels.
|
|
//
|
|
// Why exported sentinels: callers compare with errors.Is so tests and
|
|
// production handlers can distinguish lane-internal failures from
|
|
// caller errors.
|
|
var (
|
|
// ErrNotQueued is returned by Cancel when the job isn't in the
|
|
// queue (already running, finished, or unknown).
|
|
ErrNotQueued = errors.New("lane: job not queued")
|
|
|
|
// ErrLaneClosed is returned by Submit/SubmitWait after Close has
|
|
// been called.
|
|
ErrLaneClosed = errors.New("lane: closed")
|
|
|
|
// ErrCancelled is returned by SubmitWait when the job is
|
|
// cancelled while queued (either via Cancel or by ctx.Done).
|
|
ErrCancelled = errors.New("lane: job cancelled")
|
|
|
|
// ErrPreempted is delivered to a SubmitWait caller when the job's
|
|
// running goroutine was cancelled mid-run because a higher-priority
|
|
// queued job arrived at a full lane and this job was marked
|
|
// preemptible. v9.
|
|
ErrPreempted = errors.New("lane: preempted by higher priority job")
|
|
|
|
// ErrLaneBusy is returned by SubmitWithMaxWait when the estimated
|
|
// queue wait would exceed the caller's maxWait. The job is NOT
|
|
// enqueued — caller may retry or degrade. v9.
|
|
ErrLaneBusy = errors.New("lane: estimated wait exceeds max")
|
|
)
|
|
|
|
// Preemptible is an optional Job extension. A Job that returns true is
|
|
// eligible for preemption: when a higher-priority job arrives at a
|
|
// full lane, the lane scheduler may cancel this job's worker context
|
|
// mid-run. The job's Run method MUST honour ctx.Done for the
|
|
// cancellation to take effect.
|
|
//
|
|
// Why an interface (vs a flag on the Job): keeps the base Job
|
|
// interface tiny and lets each subsystem decide its preemption
|
|
// semantics. Skill jobs implement this by reading
|
|
// `skills.Skill.Preemptible`; LLM-transport jobs leave it
|
|
// unimplemented (they're never preemptible — cancelling an in-flight
|
|
// LLM call costs more than it saves).
|
|
//
|
|
// v9.
|
|
type Preemptible interface {
|
|
IsPreemptible() bool
|
|
}
|
|
|
|
// PreemptionPolicy reports whether a running job should be preempted
|
|
// by an arriving higher-priority queued job. Optional registry-level
|
|
// surface: when nil, the default policy is "preempt the oldest
|
|
// preemptible running job whose runtime exceeds the min-runtime
|
|
// guard". v9.
|
|
type PreemptionPolicy interface {
|
|
// MinRuntime returns the minimum elapsed wall-clock time before a
|
|
// preemptible job may be preempted. Default 30s when nil.
|
|
MinRuntime() time.Duration
|
|
// Enabled reports whether preemption is enabled at all on this
|
|
// lane. Default true when nil.
|
|
Enabled() bool
|
|
}
|