ca243a2d50
executus CI / test (push) Failing after 24s
Batteries-included agent-harness base, extracted from mort's agent layer. This first cut establishes the module + the zero-coupling core primitives: - lane, dispatchguard, pendingattach, run/progress.go: moved verbatim from mort - config: host config Source seam + env-var default (nil-safe helpers) - deliver: output-egress seam + Discard/Stdout defaults - identity: AdminPolicy + MemberResolver seams (nil-safe) - fanout: programmatic N×M swarm (bounded global + per-key concurrency) - README/CLAUDE.md with the vibe-coded banner; CI with Go gates + the "core stays majordomo+stdlib only" invariant Core builds with stdlib only today; majordomo enters at P1 (model/structured). go build/vet/test -race all green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
193 lines
5.8 KiB
Go
193 lines
5.8 KiB
Go
package lane
|
|
|
|
import (
|
|
"sort"
|
|
"time"
|
|
)
|
|
|
|
// fairSharePolicy implements queuePolicy with per-user sub-queues.
|
|
// Dequeue rotates through users round-robin so one user can't starve
|
|
// others. Within a user's queue, higher priority comes first; ties
|
|
// broken FIFO.
|
|
//
|
|
// Why round-robin not weighted-fair: simple, no tuning. If user A has
|
|
// 5 queued and user B has 1, user B's job runs after at most one of
|
|
// user A's jobs. That matches the v6 spec's "Steve queues 10, Dave
|
|
// queues 1, Dave gets in after at most 1 of Steve's" guarantee.
|
|
//
|
|
// Why a separate file: keeps pool.go focused on the in-memory pool;
|
|
// the queue policy is a swap-out. v7 may add weighted fair share or
|
|
// strict priority.
|
|
type fairSharePolicy struct {
|
|
// perUser maps caller_id → ordered sub-queue (priority desc,
|
|
// FIFO ties).
|
|
perUser map[string][]*queuedJob
|
|
// users is the round-robin rotation order. A user is appended
|
|
// when they first enqueue; removed when their sub-queue empties.
|
|
users []string
|
|
// nextIdx is the index into users for the next Dequeue. Wraps
|
|
// modulo len(users).
|
|
nextIdx int
|
|
}
|
|
|
|
// NewFairSharePolicy returns a queuePolicy with per-user round-robin
|
|
// dequeue and priority-ordered FIFO within each user's sub-queue.
|
|
//
|
|
// Why exported: lets tests (and future callers in pkg/logic/skills)
|
|
// construct lanes with explicit fair-share policy via NewWithPolicy.
|
|
func NewFairSharePolicy() queuePolicy {
|
|
return &fairSharePolicy{
|
|
perUser: make(map[string][]*queuedJob),
|
|
}
|
|
}
|
|
|
|
// NewWithFairShare constructs a Lane backed by a pool with fair-share
|
|
// queueing. Convenience wrapper used by the registry default.
|
|
func NewWithFairShare(name string, maxConcurrent int) Lane {
|
|
return NewWithPolicy(name, maxConcurrent, NewFairSharePolicy())
|
|
}
|
|
|
|
// Enqueue adds the job to the caller's sub-queue, sorted by priority
|
|
// (higher first) with FIFO tie-breaking.
|
|
func (f *fairSharePolicy) Enqueue(j *queuedJob) {
|
|
user := j.job.CallerID()
|
|
if _, ok := f.perUser[user]; !ok {
|
|
f.perUser[user] = []*queuedJob{}
|
|
f.users = append(f.users, user)
|
|
}
|
|
sub := f.perUser[user]
|
|
// Insert sorted by priority desc; FIFO ties via stable insert
|
|
// after the last entry of equal-or-higher priority.
|
|
//
|
|
// Why sort.Search: O(log n) within a single user's queue. Since
|
|
// per-user backlog is typically small, even a linear scan would
|
|
// be fine, but sort.Search keeps the worst case bounded.
|
|
i := sort.Search(len(sub), func(i int) bool {
|
|
return sub[i].job.Priority() < j.job.Priority()
|
|
})
|
|
sub = append(sub, nil)
|
|
copy(sub[i+1:], sub[i:])
|
|
sub[i] = j
|
|
f.perUser[user] = sub
|
|
}
|
|
|
|
// Dequeue rotates users round-robin until it finds a non-empty
|
|
// sub-queue. Returns nil when all sub-queues are empty.
|
|
//
|
|
// Why a single-pass loop bounded by len(users): a user whose sub-queue
|
|
// is empty stays in `users` only briefly (we delete on the empty
|
|
// transition); a single rotation through `users` always finds a non-
|
|
// empty sub-queue if one exists, and an empty rotation means truly
|
|
// empty.
|
|
func (f *fairSharePolicy) Dequeue() *queuedJob {
|
|
if len(f.users) == 0 {
|
|
return nil
|
|
}
|
|
for tries := 0; tries < len(f.users); tries++ {
|
|
// Bounds-safe selection — len(users) might shrink during
|
|
// iteration, so re-bound on every iteration.
|
|
if f.nextIdx >= len(f.users) {
|
|
f.nextIdx = 0
|
|
}
|
|
user := f.users[f.nextIdx]
|
|
sub := f.perUser[user]
|
|
// Advance the cursor for next time, regardless of whether
|
|
// we picked from this user. A round-robin pass that finds
|
|
// every user empty exits the loop.
|
|
f.nextIdx++
|
|
if len(sub) == 0 {
|
|
continue
|
|
}
|
|
j := sub[0]
|
|
sub[0] = nil
|
|
sub = sub[1:]
|
|
if len(sub) == 0 {
|
|
// User's sub-queue is now empty — remove from rotation.
|
|
delete(f.perUser, user)
|
|
f.users = removeStringAt(f.users, f.nextIdx-1)
|
|
// f.nextIdx-1 is the index we just dequeued from. After
|
|
// removing, nextIdx now points at the next user (if any),
|
|
// so we don't decrement.
|
|
if f.nextIdx > len(f.users) {
|
|
f.nextIdx = 0
|
|
}
|
|
} else {
|
|
f.perUser[user] = sub
|
|
}
|
|
return j
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Cancel walks every sub-queue looking for a matching job ID. Returns
|
|
// true if found and removed.
|
|
//
|
|
// Why O(n) scan: callers cancel by job ID without knowing the user.
|
|
// Could maintain a jobID → user index for O(1) cancel; deferred to
|
|
// later if profiling shows it matters. n is bounded by total queued
|
|
// jobs across all users.
|
|
func (f *fairSharePolicy) Cancel(jobID string) bool {
|
|
for user, sub := range f.perUser {
|
|
for i, j := range sub {
|
|
if j.job.ID() == jobID {
|
|
// Remove from sub-queue.
|
|
j.done <- jobResult{err: ErrCancelled}
|
|
f.perUser[user] = append(sub[:i], sub[i+1:]...)
|
|
if len(f.perUser[user]) == 0 {
|
|
delete(f.perUser, user)
|
|
f.users = removeString(f.users, user)
|
|
if f.nextIdx > len(f.users) {
|
|
f.nextIdx = 0
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Len returns the total queued count across every sub-queue.
|
|
func (f *fairSharePolicy) Len() int {
|
|
total := 0
|
|
for _, sub := range f.perUser {
|
|
total += len(sub)
|
|
}
|
|
return total
|
|
}
|
|
|
|
// OldestEnqueueTime returns the earliest enqueue time across every
|
|
// sub-queue. Returns nil if every queue is empty.
|
|
func (f *fairSharePolicy) OldestEnqueueTime() *time.Time {
|
|
var oldest *time.Time
|
|
for _, sub := range f.perUser {
|
|
for _, j := range sub {
|
|
if oldest == nil || j.enqueuedAt.Before(*oldest) {
|
|
t := j.enqueuedAt
|
|
oldest = &t
|
|
}
|
|
}
|
|
}
|
|
return oldest
|
|
}
|
|
|
|
// removeString deletes the first occurrence of target from s and
// returns the shortened slice. The relative order of the remaining
// elements is preserved (round-robin order matters). When target is
// absent, s is returned unchanged.
//
// The removal is done in place: the result shares s's backing array,
// so callers must use the returned slice and stop using s. The slot
// vacated at the end is cleared so the backing array does not keep a
// duplicate of the last string alive.
func removeString(s []string, target string) []string {
	for i, v := range s {
		if v != target {
			continue
		}
		last := len(s) - 1
		copy(s[i:], s[i+1:])
		s[last] = ""
		return s[:last]
	}
	return s
}
|
|
|
|
// removeStringAt deletes the element at idx from s and returns the
// shortened slice, preserving the order of the remaining elements.
// An out-of-range idx (negative or >= len(s)) is ignored and s is
// returned unchanged.
//
// The removal is done in place: the result shares s's backing array,
// so callers must use the returned slice and stop using s. The slot
// vacated at the end is cleared so the backing array does not keep a
// duplicate of the last string alive.
func removeStringAt(s []string, idx int) []string {
	if idx < 0 || idx >= len(s) {
		return s
	}
	last := len(s) - 1
	copy(s[idx:], s[idx+1:])
	s[last] = ""
	return s[:last]
}
|