Files
executus/lane/policy_fair_share.go
steve ca243a2d50
executus CI / test (push) Failing after 24s
P0: stand up executus harness module above majordomo
Batteries-included agent-harness base, extracted from mort's agent layer.
This first cut establishes the module + the zero-coupling core primitives:

- lane, dispatchguard, pendingattach, run/progress.go: moved verbatim from mort
- config: host config Source seam + env-var default (nil-safe helpers)
- deliver: output-egress seam + Discard/Stdout defaults
- identity: AdminPolicy + MemberResolver seams (nil-safe)
- fanout: programmatic N×M swarm (bounded global + per-key concurrency)
- README/CLAUDE.md with the vibe-coded banner; CI with Go gates +
  the "core stays majordomo+stdlib only" invariant

Core builds with stdlib only today; majordomo enters at P1 (model/structured).
go build/vet/test -race all green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 19:18:37 -04:00

193 lines
5.8 KiB
Go

package lane
import (
"sort"
"time"
)
// fairSharePolicy implements queuePolicy with per-user sub-queues.
// Dequeue rotates through users round-robin so one user can't starve
// others. Within a user's queue, higher priority comes first; ties
// are broken FIFO.
//
// Why round-robin not weighted-fair: simple, no tuning. If user A has
// 5 queued and user B has 1, user B's job runs after at most one of
// user A's jobs. That matches the v6 spec's "Steve queues 10, Dave
// queues 1, Dave gets in after at most 1 of Steve's" guarantee.
//
// Why a separate file: keeps pool.go focused on the in-memory pool;
// the queue policy is a swap-out. v7 may add weighted fair share or
// strict priority.
//
// Invariant kept by Enqueue, Dequeue and Cancel: a caller ID appears
// in users exactly when it has a non-empty entry in perUser.
//
// NOTE(review): nothing in this type takes a lock; presumably the
// owning pool serializes access — confirm against the caller.
type fairSharePolicy struct {
// perUser maps caller ID (the value of job.CallerID()) to that
// caller's ordered sub-queue: priority descending, FIFO among
// equal priorities. Entries are deleted as soon as they drain.
perUser map[string][]*queuedJob
// users is the round-robin rotation order. A user is appended
// when they first enqueue; removed when their sub-queue empties.
users []string
// nextIdx is the index into users for the next Dequeue. It may
// temporarily equal len(users); Dequeue wraps it back to 0 before
// indexing.
nextIdx int
}
// NewFairSharePolicy builds an empty fair-share queue policy: jobs are
// dequeued round-robin across callers, and within one caller by
// priority (higher first) with FIFO ordering among equal priorities.
//
// Why exported: lets tests (and future callers in pkg/logic/skills)
// construct lanes with explicit fair-share policy via NewWithPolicy.
func NewFairSharePolicy() queuePolicy {
	policy := new(fairSharePolicy)
	// The map must be allocated up front: Enqueue writes into it, and
	// writing to a nil map panics.
	policy.perUser = map[string][]*queuedJob{}
	return policy
}
// NewWithFairShare returns a Lane named name that runs at most
// maxConcurrent jobs and queues the rest under a fresh fair-share
// policy. It is a thin convenience wrapper over NewWithPolicy, used by
// the registry default.
func NewWithFairShare(name string, maxConcurrent int) Lane {
	policy := NewFairSharePolicy()
	return NewWithPolicy(name, maxConcurrent, policy)
}
// Enqueue files j under its caller's sub-queue, keeping that sub-queue
// ordered by priority (highest first). A job lands behind every queued
// job of equal or higher priority, so equal priorities stay FIFO.
// A caller seen for the first time is appended to the rotation.
func (f *fairSharePolicy) Enqueue(j *queuedJob) {
	caller := j.job.CallerID()
	queue, known := f.perUser[caller]
	if !known {
		// First job for this caller: join the round-robin rotation.
		// The map entry itself is written at the end of the function.
		f.users = append(f.users, caller)
	}

	// The insertion point is the first queued job with strictly lower
	// priority. Binary search keeps this O(log n) even for a long
	// per-caller backlog.
	lowerPriority := func(k int) bool {
		return queue[k].job.Priority() < j.job.Priority()
	}
	pos := sort.Search(len(queue), lowerPriority)

	// Grow by one slot, shift the tail right, and drop j into the gap.
	queue = append(queue, nil)
	copy(queue[pos+1:], queue[pos:])
	queue[pos] = j
	f.perUser[caller] = queue
}
// Dequeue removes and returns the next job in round-robin order across
// users: the head (highest priority, oldest) job of the user under the
// cursor. It returns nil when nothing is queued.
//
// Why a single pass bounded by len(users): a user whose sub-queue is
// empty is removed from `users` on the empty transition, so one
// rotation always finds a job if one exists, and an empty rotation
// means the policy is truly empty.
//
// Cursor handling: when a job is taken and the user still has work,
// the cursor moves to the following slot. When the user drains and is
// removed from `users`, the following user slides into the vacated
// slot, so the cursor stays on that slot. Advancing it there would
// skip that user for this round.
func (f *fairSharePolicy) Dequeue() *queuedJob {
	for tries := 0; tries < len(f.users); tries++ {
		// The cursor may sit one past the end after a previous call;
		// wrap before indexing.
		if f.nextIdx >= len(f.users) {
			f.nextIdx = 0
		}
		idx := f.nextIdx
		user := f.users[idx]
		sub := f.perUser[user]
		if len(sub) == 0 {
			// Defensive: the invariant says this cannot happen, but
			// skip rather than panic if it ever does.
			f.nextIdx = idx + 1
			continue
		}

		j := sub[0]
		sub[0] = nil // drop the reference so the backing array doesn't pin the job
		if len(sub) > 1 {
			f.perUser[user] = sub[1:]
			f.nextIdx = idx + 1
			return j
		}

		// User's sub-queue is now empty — remove from rotation. The
		// next user (if any) now lives at idx, so the cursor stays put.
		delete(f.perUser, user)
		f.users = removeStringAt(f.users, idx)
		f.nextIdx = idx
		if f.nextIdx >= len(f.users) {
			// The removed user was last in the rotation; wrap around.
			f.nextIdx = 0
		}
		return j
	}
	return nil
}
// Cancel removes the queued job whose ID equals jobID, delivers
// ErrCancelled on that job's done channel, and reports whether a
// matching job was found. Only the first match is removed.
//
// Why O(n) scan: callers cancel by job ID without knowing the user.
// Could maintain a jobID → user index for O(1) cancel; deferred to
// later if profiling shows it matters. n is bounded by total queued
// jobs across all users.
//
// If the cancelled job was its user's last, the user leaves the
// rotation. When that user sat before the round-robin cursor, the
// cursor shifts left with the remaining users so the user who was due
// next is still served next.
//
// NOTE(review): the send on done happens while the policy is being
// mutated and will block if the channel is unbuffered with no receiver
// — confirm how done is created.
func (f *fairSharePolicy) Cancel(jobID string) bool {
	for user, sub := range f.perUser {
		for i, j := range sub {
			if j.job.ID() != jobID {
				continue
			}
			j.done <- jobResult{err: ErrCancelled}

			// Remove from sub-queue: close the gap in place and nil
			// the vacated tail slot so the backing array does not keep
			// a stale pointer to a queued job alive.
			last := len(sub) - 1
			copy(sub[i:], sub[i+1:])
			sub[last] = nil
			sub = sub[:last]
			if len(sub) > 0 {
				f.perUser[user] = sub
				return true
			}

			// Sub-queue drained: drop the user from the map and the
			// rotation. Deleting the current key while ranging over a
			// map is safe, and we return immediately afterwards.
			delete(f.perUser, user)
			for idx, u := range f.users {
				if u == user {
					if idx < f.nextIdx {
						// Users after idx shift left by one; follow them.
						f.nextIdx--
					}
					break
				}
			}
			f.users = removeString(f.users, user)
			if f.nextIdx >= len(f.users) {
				f.nextIdx = 0
			}
			return true
		}
	}
	return false
}
// Len reports how many jobs are queued in total, summed over every
// user's sub-queue.
func (f *fairSharePolicy) Len() int {
	var count int
	for user := range f.perUser {
		count += len(f.perUser[user])
	}
	return count
}
// OldestEnqueueTime scans every queued job and returns a pointer to a
// copy of the earliest enqueuedAt timestamp, or nil when no job is
// queued. A full scan is needed because sub-queues are ordered by
// priority, not by enqueue time.
func (f *fairSharePolicy) OldestEnqueueTime() *time.Time {
	var (
		earliest time.Time
		found    bool
	)
	for _, queue := range f.perUser {
		for _, job := range queue {
			// Keep the current candidate unless this job is strictly older.
			if found && !job.enqueuedAt.Before(earliest) {
				continue
			}
			earliest = job.enqueuedAt
			found = true
		}
	}
	if !found {
		return nil
	}
	return &earliest
}
// removeString drops the first occurrence of target from s and returns
// the shortened slice. Order of the remaining elements is preserved
// (round-robin order matters). The removal happens in place: the
// result shares s's backing array, so callers must use the returned
// slice and treat s as invalid. If target is absent, s is returned
// unchanged.
func removeString(s []string, target string) []string {
	for idx := range s {
		if s[idx] != target {
			continue
		}
		// Slide the tail left over the match, then trim the last slot.
		copy(s[idx:], s[idx+1:])
		return s[:len(s)-1]
	}
	return s
}
// removeStringAt drops the element at position idx from s and returns
// the shortened slice, preserving the order of the rest. The removal
// happens in place: the result shares s's backing array. An idx
// outside [0, len(s)) is ignored and s is returned unchanged.
func removeStringAt(s []string, idx int) []string {
	if idx >= 0 && idx < len(s) {
		// Slide the tail left over the removed slot, then trim.
		copy(s[idx:], s[idx+1:])
		s = s[:len(s)-1]
	}
	return s
}