package lane import ( "context" "errors" "sync/atomic" "testing" "time" ) // preemptibleJob is a testJob extension that opts into preemption and // honours ctx cancellation by returning ctx.Err on the cancel path. type preemptibleJob struct { *testJob preemptible bool // ranWith is set inside Run with the actual error returned by the // honest ctx.Done observer so tests can distinguish "preempted" // from "ran to completion". ranWith atomic.Value // error } func newPreemptibleJob(id string, priority int, preemptible bool) *preemptibleJob { pj := &preemptibleJob{testJob: newTestJob(id), preemptible: preemptible} pj.priority = priority return pj } func (p *preemptibleJob) IsPreemptible() bool { return p.preemptible } // finishedSentinel is a non-nil error stored when Run finishes via // p.release (no preemption). atomic.Value cannot store nil, so we use // this sentinel to disambiguate "Run completed normally" from "not // yet finished". var finishedSentinel = errors.New("test: finished normally") // Run blocks until either ctx is cancelled (preemption) or release is // closed (normal finish). Records which path won so the test asserts. func (p *preemptibleJob) Run(ctx context.Context) error { atomic.AddInt32(&p.runCount, 1) select { case p.started <- struct{}{}: default: } select { case <-ctx.Done(): err := context.Cause(ctx) p.ranWith.Store(err) return err case <-p.release: p.ranWith.Store(finishedSentinel) return p.err } } // fixedPreemptionPolicy is a test PreemptionPolicy with knobs for // MinRuntime + Enabled. type fixedPreemptionPolicy struct { min time.Duration enabled bool } func (f *fixedPreemptionPolicy) MinRuntime() time.Duration { return f.min } func (f *fixedPreemptionPolicy) Enabled() bool { return f.enabled } // TestPool_Preemption_FiresOnHigherPriority verifies that a high- // priority Submit at a full lane preempts a preemptible low-priority // running job that has been running for at least min-runtime. func TestPool_Preemption_FiresOnHigherPriority(t *testing.T) { p := NewWithPolicy("test", 1, NewFairSharePolicy()).(*pool) p.SetPreemptionPolicy(&fixedPreemptionPolicy{min: 0, enabled: true}) low := newPreemptibleJob("low", 0, true) low.caller = "u1" if err := submitNoBlock(p, low); err != nil { t.Fatalf("submit low: %v", err) } <-low.started // Slot is full. Submit a higher-priority job — should preempt. high := newPreemptibleJob("high", 5, false) high.caller = "u2" pos, _, err := p.Submit(context.Background(), high) if err != nil { t.Fatalf("submit high: %v", err) } if pos != 0 { t.Errorf("high pos = %d, want 0 (dispatched after preempt)", pos) } // Wait for the low's Run to return with ctx.Cause = ErrPreempted. deadline := time.Now().Add(2 * time.Second) for time.Now().Before(deadline) { if v := low.ranWith.Load(); v != nil { if err, ok := v.(error); ok && errors.Is(err, ErrPreempted) { goto done } } time.Sleep(5 * time.Millisecond) } t.Fatalf("low Run never returned with ErrPreempted; ranWith=%v", low.ranWith.Load()) done: // Let high finish. close(high.release) // Drain low's release channel to release the goroutine cleanly. close(low.release) } // TestPool_Preemption_RespectsMinRuntime verifies that a high-priority // Submit does NOT preempt a job younger than the min-runtime guard. func TestPool_Preemption_RespectsMinRuntime(t *testing.T) { p := NewWithPolicy("test", 1, NewFairSharePolicy()).(*pool) // Min-runtime in the future so no running job qualifies. p.SetPreemptionPolicy(&fixedPreemptionPolicy{min: time.Hour, enabled: true}) low := newPreemptibleJob("low", 0, true) low.caller = "u1" if err := submitNoBlock(p, low); err != nil { t.Fatalf("submit low: %v", err) } <-low.started high := newPreemptibleJob("high", 5, false) high.caller = "u2" pos, _, err := p.Submit(context.Background(), high) if err != nil { t.Fatalf("submit high: %v", err) } if pos == 0 { t.Errorf("high pos = 0; expected to be queued (preemption blocked by min-runtime)") } // Confirm low was NOT preempted: ranWith stays nil. time.Sleep(20 * time.Millisecond) if v := low.ranWith.Load(); v != nil { if err, ok := v.(error); ok && err != nil { t.Errorf("low was preempted unexpectedly: %v", err) } } close(low.release) close(high.release) } // TestPool_Preemption_NonPreemptibleProtected verifies that a // non-preemptible running job is not chosen as a victim even when a // higher-priority job arrives. func TestPool_Preemption_NonPreemptibleProtected(t *testing.T) { p := NewWithPolicy("test", 1, NewFairSharePolicy()).(*pool) p.SetPreemptionPolicy(&fixedPreemptionPolicy{min: 0, enabled: true}) low := newPreemptibleJob("low", 0, false /* not preemptible */) low.caller = "u1" if err := submitNoBlock(p, low); err != nil { t.Fatalf("submit low: %v", err) } <-low.started high := newPreemptibleJob("high", 5, false) high.caller = "u2" pos, _, err := p.Submit(context.Background(), high) if err != nil { t.Fatalf("submit high: %v", err) } if pos == 0 { t.Errorf("high pos = 0; expected queued (non-preemptible victim)") } close(low.release) close(high.release) } // TestPool_SubmitWithMaxWait_ZeroBlocks verifies that maxWait=0 falls // back to the default Submit path (no early-return). func TestPool_SubmitWithMaxWait_ZeroBlocks(t *testing.T) { p := New("test", 1).(*pool) first := newTestJob("j1") if err := submitNoBlock(p, first); err != nil { t.Fatalf("submit first: %v", err) } <-first.started second := newTestJob("j2") pos, _, err := p.SubmitWithMaxWait(context.Background(), second, 0) if err != nil { t.Fatalf("submit second: %v", err) } if pos == 0 { t.Errorf("expected second to be queued, got pos=0") } close(first.release) close(second.release) } // TestPool_SubmitWithMaxWait_RejectsWhenETAExceedsCap verifies that // SubmitWithMaxWait returns ErrLaneBusy without enqueueing when the // estimated wait exceeds maxWait. func TestPool_SubmitWithMaxWait_RejectsWhenETAExceedsCap(t *testing.T) { p := New("test", 1).(*pool) p.SetETAWindowSize(4) // Run a job that takes ~30ms so the estimator has runtime data. timed := newTestJob("timed") go func() { time.Sleep(30 * time.Millisecond) close(timed.release) }() if err := p.SubmitWait(context.Background(), timed); err != nil { t.Fatalf("timed: %v", err) } // Block the lane. blocker := newTestJob("blocker") go func() { _ = p.SubmitWait(context.Background(), blocker) }() <-blocker.started // Try to submit with maxWait=1ns — definitely shorter than the // average runtime. hopeless := newTestJob("hopeless") pos, eta, err := p.SubmitWithMaxWait(context.Background(), hopeless, time.Nanosecond) if !errors.Is(err, ErrLaneBusy) { t.Fatalf("err = %v, want ErrLaneBusy; pos=%d eta=%s", err, pos, eta) } if eta == 0 { t.Errorf("expected non-zero eta on busy reject, got 0") } // Was hopeless enqueued? Stats should show 0 queued (only blocker // running). stats := p.Stats() if stats.Queued != 0 { t.Errorf("hopeless was enqueued despite ErrLaneBusy: queued=%d", stats.Queued) } close(blocker.release) } // TestPool_SubmitWithMaxWait_AllowsWhenETAUnderCap verifies that // SubmitWithMaxWait does enqueue when the estimated wait is under the // max. func TestPool_SubmitWithMaxWait_AllowsWhenETAUnderCap(t *testing.T) { p := New("test", 1).(*pool) first := newTestJob("first") if err := submitNoBlock(p, first); err != nil { t.Fatalf("submit first: %v", err) } <-first.started second := newTestJob("second") pos, _, err := p.SubmitWithMaxWait(context.Background(), second, time.Hour) if err != nil { t.Fatalf("submit second: %v", err) } if pos != 1 { t.Errorf("second pos = %d, want 1", pos) } close(first.release) close(second.release) } // submitNoBlock is a helper that asynchronously calls SubmitWait so the // caller can inspect the running job's state without blocking on // completion. func submitNoBlock(p Lane, job Job) error { errCh := make(chan error, 1) go func() { errCh <- p.SubmitWait(context.Background(), job) }() // Give the dispatch goroutine a chance to start. select { case err := <-errCh: return err case <-time.After(50 * time.Millisecond): return nil } }