package lane import ( "sort" "time" ) // fairSharePolicy implements queuePolicy with per-user sub-queues. // Dequeue rotates through users round-robin so one user can't starve // others. Within a user's queue, higher priority comes first; ties // broken FIFO. // // Why round-robin not weighted-fair: simple, no tuning. If user A has // 5 queued and user B has 1, user B's job runs after at most one of // user A's jobs. That matches the v6 spec's "Steve queues 10, Dave // queues 1, Dave gets in after at most 1 of Steve's" guarantee. // // Why a separate file: keeps pool.go focused on the in-memory pool; // the queue policy is a swap-out. v7 may add weighted fair share or // strict priority. type fairSharePolicy struct { // perUser maps caller_id → ordered sub-queue (priority desc, // FIFO ties). perUser map[string][]*queuedJob // users is the round-robin rotation order. A user is appended // when they first enqueue; removed when their sub-queue empties. users []string // nextIdx is the index into users for the next Dequeue. Wraps // modulo len(users). nextIdx int } // NewFairSharePolicy returns a queuePolicy with per-user round-robin // dequeue and priority-ordered FIFO within each user's sub-queue. // // Why exported: lets tests (and future callers in pkg/logic/skills) // construct lanes with explicit fair-share policy via NewWithPolicy. func NewFairSharePolicy() queuePolicy { return &fairSharePolicy{ perUser: make(map[string][]*queuedJob), } } // NewWithFairShare constructs a Lane backed by a pool with fair-share // queueing. Convenience wrapper used by the registry default. func NewWithFairShare(name string, maxConcurrent int) Lane { return NewWithPolicy(name, maxConcurrent, NewFairSharePolicy()) } // Enqueue adds the job to the caller's sub-queue, sorted by priority // (higher first) with FIFO tie-breaking. func (f *fairSharePolicy) Enqueue(j *queuedJob) { user := j.job.CallerID() if _, ok := f.perUser[user]; !ok { f.perUser[user] = []*queuedJob{} f.users = append(f.users, user) } sub := f.perUser[user] // Insert sorted by priority desc; FIFO ties via stable insert // after the last entry of equal-or-higher priority. // // Why sort.Search: O(log n) within a single user's queue. Since // per-user backlog is typically small, even a linear scan would // be fine, but sort.Search keeps the worst case bounded. i := sort.Search(len(sub), func(i int) bool { return sub[i].job.Priority() < j.job.Priority() }) sub = append(sub, nil) copy(sub[i+1:], sub[i:]) sub[i] = j f.perUser[user] = sub } // Dequeue rotates users round-robin until it finds a non-empty // sub-queue. Returns nil when all sub-queues are empty. // // Why a single-pass loop bounded by len(users): a user whose sub-queue // is empty stays in `users` only briefly (we delete on the empty // transition); a single rotation through `users` always finds a non- // empty sub-queue if one exists, and an empty rotation means truly // empty. func (f *fairSharePolicy) Dequeue() *queuedJob { if len(f.users) == 0 { return nil } for tries := 0; tries < len(f.users); tries++ { // Bounds-safe selection — len(users) might shrink during // iteration, so re-bound on every iteration. if f.nextIdx >= len(f.users) { f.nextIdx = 0 } user := f.users[f.nextIdx] sub := f.perUser[user] // Advance the cursor for next time, regardless of whether // we picked from this user. A round-robin pass that finds // every user empty exits the loop. f.nextIdx++ if len(sub) == 0 { continue } j := sub[0] sub[0] = nil sub = sub[1:] if len(sub) == 0 { // User's sub-queue is now empty — remove from rotation. delete(f.perUser, user) f.users = removeStringAt(f.users, f.nextIdx-1) // f.nextIdx-1 is the index we just dequeued from. After // removing, nextIdx now points at the next user (if any), // so we don't decrement. if f.nextIdx > len(f.users) { f.nextIdx = 0 } } else { f.perUser[user] = sub } return j } return nil } // Cancel walks every sub-queue looking for a matching job ID. Returns // true if found and removed. // // Why O(n) scan: callers cancel by job ID without knowing the user. // Could maintain a jobID → user index for O(1) cancel; deferred to // later if profiling shows it matters. n is bounded by total queued // jobs across all users. func (f *fairSharePolicy) Cancel(jobID string) bool { for user, sub := range f.perUser { for i, j := range sub { if j.job.ID() == jobID { // Remove from sub-queue. j.done <- jobResult{err: ErrCancelled} f.perUser[user] = append(sub[:i], sub[i+1:]...) if len(f.perUser[user]) == 0 { delete(f.perUser, user) f.users = removeString(f.users, user) if f.nextIdx > len(f.users) { f.nextIdx = 0 } } return true } } } return false } // Len returns the total queued count across every sub-queue. func (f *fairSharePolicy) Len() int { total := 0 for _, sub := range f.perUser { total += len(sub) } return total } // OldestEnqueueTime returns the earliest enqueue time across every // sub-queue. Returns nil if every queue is empty. func (f *fairSharePolicy) OldestEnqueueTime() *time.Time { var oldest *time.Time for _, sub := range f.perUser { for _, j := range sub { if oldest == nil || j.enqueuedAt.Before(*oldest) { t := j.enqueuedAt oldest = &t } } } return oldest } // removeString returns a new slice with the first occurrence of target // removed. Order is preserved (round-robin order matters). func removeString(s []string, target string) []string { for i, v := range s { if v == target { return append(s[:i], s[i+1:]...) } } return s } // removeStringAt returns a new slice with the element at idx removed. // Order is preserved. idx is bounds-checked defensively. func removeStringAt(s []string, idx int) []string { if idx < 0 || idx >= len(s) { return s } return append(s[:idx], s[idx+1:]...) }