package lane

import (
	"context"
	"errors"
	"sync"
	"testing"
	"time"
)

// fakeStore is an in-memory PersistenceStore used by persistence
// tests. Rows live in a mutex-guarded map keyed by job ID. enqErr and
// updErr, when non-nil, are returned by EnqueueJob and UpdateJobState
// respectively; purgeFn, when non-nil, replaces the built-in purge
// logic of PurgeFinishedJobs.
type fakeStore struct {
	mu      sync.Mutex
	rows    map[string]*storeRow
	enqErr  error
	updErr  error
	purgeFn func(time.Time) (int64, error)
}

// storeRow is the fake's record of a single persisted job.
type storeRow struct {
	jobID, lane, callerID string
	priority              int
	metadata              []byte
	state                 string
	enqueuedAt            time.Time
	startedAt             *time.Time
	finishedAt            *time.Time
}

// newFakeStore returns an empty fakeStore ready for use.
func newFakeStore() *fakeStore {
	return &fakeStore{rows: map[string]*storeRow{}}
}

// EnqueueJob inserts a new row in state "queued" stamped with the
// current time. It returns enqErr if that is set, and an error if a
// row with the same jobID already exists.
func (f *fakeStore) EnqueueJob(_ context.Context, jobID, lane, callerID string, priority int, metadata []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.enqErr != nil {
		return f.enqErr
	}
	if _, exists := f.rows[jobID]; exists {
		return errors.New("duplicate enqueue")
	}
	f.rows[jobID] = &storeRow{
		jobID:      jobID,
		lane:       lane,
		callerID:   callerID,
		priority:   priority,
		metadata:   metadata,
		state:      "queued",
		enqueuedAt: time.Now(),
	}
	return nil
}

// UpdateJobState sets the row's state and records at as startedAt
// (for "running") or finishedAt (for "finished", "cancelled",
// "failed"). It returns updErr if that is set, and an error if the
// row does not exist.
func (f *fakeStore) UpdateJobState(_ context.Context, jobID, state string, at time.Time) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.updErr != nil {
		return f.updErr
	}
	r, ok := f.rows[jobID]
	if !ok {
		return errors.New("not found")
	}
	r.state = state
	// Copy so the stored pointer does not alias the parameter.
	t := at
	switch state {
	case "running":
		r.startedAt = &t
	case "finished", "cancelled", "failed":
		r.finishedAt = &t
	}
	return nil
}

// ListQueuedJobs returns the rows of the given lane in state "queued".
func (f *fakeStore) ListQueuedJobs(_ context.Context, lane string) ([]QueuedJobRef, error) {
	return f.list(lane, "queued"), nil
}

// ListRunningJobs returns the rows of the given lane in state "running".
func (f *fakeStore) ListRunningJobs(_ context.Context, lane string) ([]QueuedJobRef, error) {
	return f.list(lane, "running"), nil
}

// list returns refs for every row matching lane and state. The rows
// come from a map range, so the order of the result is unspecified.
func (f *fakeStore) list(lane, state string) []QueuedJobRef {
	f.mu.Lock()
	defer f.mu.Unlock()
	var out []QueuedJobRef
	for _, r := range f.rows {
		if r.lane == lane && r.state == state {
			out = append(out, QueuedJobRef{
				JobID:      r.jobID,
				Lane:       r.lane,
				CallerID:   r.callerID,
				Priority:   r.priority,
				Metadata:   r.metadata,
				EnqueuedAt: r.enqueuedAt,
			})
		}
	}
	return out
}

// PurgeFinishedJobs deletes rows in a terminal state ("finished",
// "cancelled", "failed") whose finishedAt is before olderThan and
// returns how many were removed. If purgeFn is set it is called
// instead and its result returned as-is.
func (f *fakeStore) PurgeFinishedJobs(_ context.Context, olderThan time.Time) (int64, error) {
	if f.purgeFn != nil {
		return f.purgeFn(olderThan)
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	var deleted int64
	for id, r := range f.rows {
		if (r.state == "finished" || r.state == "cancelled" || r.state == "failed") &&
			r.finishedAt != nil && r.finishedAt.Before(olderThan) {
			delete(f.rows, id)
			deleted++
		}
	}
	return deleted, nil
}

// state returns the current state of jobID, or "" if no such row
// exists.
func (f *fakeStore) state(jobID string) string {
	f.mu.Lock()
	defer f.mu.Unlock()
	if r, ok := f.rows[jobID]; ok {
		return r.state
	}
	return ""
}

// metaJob is a Job impl that exposes Metadata. Used in persistence
// tests that assert metadata round-trip.
type metaJob struct {
	id, caller string
	priority   int
	meta       []byte
	run        func(ctx context.Context) error
}

func (m *metaJob) ID() string                    { return m.id }
func (m *metaJob) CallerID() string              { return m.caller }
func (m *metaJob) Priority() int                 { return m.priority }
func (m *metaJob) Metadata() []byte              { return m.meta }
func (m *metaJob) Run(ctx context.Context) error { return m.run(ctx) }

// TestPersistedLane_Submit_WritesRow verifies Submit writes a row
// with the right fields and that the row transitions to finished,
// with both timestamps set, once Run completes.
func TestPersistedLane_Submit_WritesRow(t *testing.T) {
	store := newFakeStore()
	inner := New("ollama", 1)
	pl := NewPersistedLane(inner, store)

	done := make(chan struct{})
	job := &metaJob{
		id:       "j1",
		caller:   "alice",
		priority: 3,
		meta:     []byte(`{"prompt":"hi"}`),
		run: func(ctx context.Context) error {
			close(done)
			return nil
		},
	}
	if _, _, err := pl.Submit(context.Background(), job); err != nil {
		t.Fatal(err)
	}
	// Bound the wait so a job that never runs fails the test instead
	// of hanging it until the global test timeout.
	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("job j1 never ran")
	}
	// Wait for state-update goroutine to land "finished".
	waitFor(t, func() bool { return store.state("j1") == "finished" })

	r := store.rows["j1"]
	if r.lane != "ollama" || r.callerID != "alice" || r.priority != 3 {
		t.Fatalf("row identity mismatch: %+v", r)
	}
	if string(r.metadata) != `{"prompt":"hi"}` {
		t.Fatalf("metadata mismatch: %s", r.metadata)
	}
	if r.startedAt == nil || r.finishedAt == nil {
		t.Fatalf("expected started_at + finished_at set; row=%+v", r)
	}
}

// TestPersistedLane_Submit_RunErrorMarksFailed verifies a failing Run
// transitions to state=failed.
func TestPersistedLane_Submit_RunErrorMarksFailed(t *testing.T) {
	store := newFakeStore()
	inner := New("test", 1)
	pl := NewPersistedLane(inner, store)

	job := &metaJob{
		id:     "j1",
		caller: "alice",
		run: func(ctx context.Context) error {
			return errors.New("boom")
		},
	}
	if err := pl.SubmitWait(context.Background(), job); err == nil {
		t.Fatal("expected error from Run")
	}
	if got := store.state("j1"); got != "failed" {
		t.Fatalf("expected state=failed, got %s", got)
	}
}

// TestPersistedLane_EnqueueErrorAborts verifies that if EnqueueJob
// errors, the inner lane never sees the job.
func TestPersistedLane_EnqueueErrorAborts(t *testing.T) {
	store := newFakeStore()
	store.enqErr = errors.New("disk full")
	inner := New("test", 1).(*pool)
	pl := NewPersistedLane(inner, store)

	job := &funcJob{
		id:     "j1",
		caller: "alice",
		run: func(ctx context.Context) error {
			// Report with Error rather than Fatal: if the lane ever did
			// invoke Run, it may not be on the test goroutine, and
			// FailNow is only valid on the goroutine running the test.
			t.Error("Run should not be called when persist enqueue fails")
			return nil
		},
	}
	_, _, err := pl.Submit(context.Background(), job)
	if err == nil {
		t.Fatal("expected Submit to fail")
	}
	// Inner lane should be empty.
	if got := inner.Stats().Running + inner.Stats().Queued; got != 0 {
		t.Fatalf("expected inner lane empty, got running+queued=%d", got)
	}
}

// TestPersistedLane_Cancel_QueuedWritesCancelled verifies cancelling a
// queued job writes state=cancelled.
func TestPersistedLane_Cancel_QueuedWritesCancelled(t *testing.T) { store := newFakeStore() inner := New("test", 1).(*pool) pl := NewPersistedLane(inner, store) blocker := newTestJob("blocker") if _, _, err := pl.Submit(context.Background(), blocker); err != nil { t.Fatal(err) } <-blocker.started target := newTestJob("target") if _, _, err := pl.Submit(context.Background(), target); err != nil { t.Fatal(err) } if got := inner.Stats().Queued; got != 1 { t.Fatalf("expected queued=1, got %d", got) } if err := pl.Cancel("target"); err != nil { t.Fatal(err) } waitFor(t, func() bool { return store.state("target") == "cancelled" }) close(blocker.release) } // TestRecover_RunningMarkedFailed verifies that running rows at // recovery time are marked failed (lost-on-restart). func TestRecover_RunningMarkedFailed(t *testing.T) { store := newFakeStore() now := time.Now() store.rows["r1"] = &storeRow{ jobID: "r1", lane: "ollama", callerID: "alice", state: "running", enqueuedAt: now.Add(-1 * time.Hour), startedAt: &now, } inner := New("ollama", 1) pl := NewPersistedLane(inner, store) // reconstructFn never called for running rows. if err := pl.Recover(context.Background(), nil); err != nil { t.Fatal(err) } if got := store.state("r1"); got != "failed" { t.Fatalf("expected r1 → failed, got %s", got) } } // TestRecover_QueuedReSubmitted verifies queued rows are re-submitted // to the inner lane via reconstructFn. 
func TestRecover_QueuedReSubmitted(t *testing.T) { store := newFakeStore() store.rows["q1"] = &storeRow{ jobID: "q1", lane: "ollama", callerID: "alice", state: "queued", enqueuedAt: time.Now(), metadata: []byte("opaque"), } store.rows["q2"] = &storeRow{ jobID: "q2", lane: "ollama", callerID: "bob", state: "queued", enqueuedAt: time.Now(), } inner := New("ollama", 2) pl := NewPersistedLane(inner, store) calls := make(chan string, 2) reconstruct := func(ref QueuedJobRef) Job { return &funcJob{ id: ref.JobID, caller: ref.CallerID, run: func(ctx context.Context) error { calls <- ref.JobID return nil }, } } if err := pl.Recover(context.Background(), reconstruct); err != nil { t.Fatal(err) } got := map[string]bool{} for i := 0; i < 2; i++ { select { case id := <-calls: got[id] = true case <-time.After(time.Second): t.Fatalf("expected 2 reconstructed runs; only got %v", got) } } if !got["q1"] || !got["q2"] { t.Fatalf("expected both q1 and q2 reconstructed; got %v", got) } // After Run completes, both rows should be state=finished. waitFor(t, func() bool { return store.state("q1") == "finished" && store.state("q2") == "finished" }) } // TestRecover_NilReconstructMarksFailed verifies that when // reconstructFn returns nil for a queued row, the row is marked // failed. func TestRecover_NilReconstructMarksFailed(t *testing.T) { store := newFakeStore() store.rows["q1"] = &storeRow{ jobID: "q1", lane: "ollama", callerID: "alice", state: "queued", enqueuedAt: time.Now(), } inner := New("ollama", 1) pl := NewPersistedLane(inner, store) if err := pl.Recover(context.Background(), func(QueuedJobRef) Job { return nil }); err != nil { t.Fatal(err) } if got := store.state("q1"); got != "failed" { t.Fatalf("expected q1 → failed, got %s", got) } } // TestSweeper_PurgesFinishedRows verifies Sweep calls // PurgeFinishedJobs with the right cutoff. 
func TestSweeper_PurgesFinishedRows(t *testing.T) { store := newFakeStore() old := time.Now().Add(-25 * time.Hour) finished := time.Now() store.rows["old"] = &storeRow{ jobID: "old", lane: "x", state: "finished", enqueuedAt: old, finishedAt: &old, } store.rows["recent"] = &storeRow{ jobID: "recent", lane: "x", state: "finished", enqueuedAt: finished, finishedAt: &finished, } sw := NewSweeper(store, func() time.Duration { return 24 * time.Hour }, nil) sw.Sweep(context.Background()) if _, ok := store.rows["old"]; ok { t.Fatal("old row should have been purged") } if _, ok := store.rows["recent"]; !ok { t.Fatal("recent row should remain") } } // TestSweeper_RetentionIsDynamic verifies the retention function is // called per Sweep, so a runtime convar change takes effect. func TestSweeper_RetentionIsDynamic(t *testing.T) { store := newFakeStore() called := 0 retention := func() time.Duration { called++ return time.Hour } sw := NewSweeper(store, retention, nil) sw.Sweep(context.Background()) sw.Sweep(context.Background()) if called != 2 { t.Fatalf("expected retention called twice, got %d", called) } }