package lane

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"time"
)

// PersistenceStore is the narrow surface PersistedLane needs to
// persist and recover lane jobs across process restarts.
//
// Why an interface here vs reaching into pkg/logic/skills directly:
// keeps the lane primitive generic — anyone with a job-row table that
// satisfies these five methods can plug in. pkg/logic/skills.Storage
// satisfies it via a thin adapter (PersistedSkillsStore).
//
// Test: persistence_test.go covers the round-trip + restart recovery
// flow using an in-memory fake store.
type PersistenceStore interface {
	// EnqueueJob writes a row in state=queued. lane is the lane
	// name; metadata is opaque payload preserved verbatim across
	// restart for reconstruct paths. A non-nil error means the row
	// must be treated as not written.
	EnqueueJob(ctx context.Context, jobID, lane, callerID string, priority int, metadata []byte) error

	// UpdateJobState transitions the row to a new state. The state
	// strings are the QueueJobState values from
	// pkg/logic/skills/skill_queue_job.go ("queued", "running",
	// "finished", "cancelled", "failed"). Stamps the matching
	// timestamp column with at.
	UpdateJobState(ctx context.Context, jobID string, state string, at time.Time) error

	// ListQueuedJobs returns rows in state=queued for the given
	// lane. Used by Recover to re-submit pending work.
	ListQueuedJobs(ctx context.Context, lane string) ([]QueuedJobRef, error)

	// ListRunningJobs returns rows in state=running for the given
	// lane. After a process restart these are unrecoverable (the
	// worker goroutine is gone) and Recover marks them failed.
	ListRunningJobs(ctx context.Context, lane string) ([]QueuedJobRef, error)

	// PurgeFinishedJobs deletes terminal-state rows older than the
	// cutoff. Returns count deleted.
	PurgeFinishedJobs(ctx context.Context, olderThan time.Time) (int64, error)
}

// QueuedJobRef is a thin row reference returned by List* methods.
// Carries enough state for Recover to reconstruct or mark a job.
// // Why a separate type from the skills.QueueJob domain: the lane // package doesn't import the skills package (and would create an // import cycle if it did). The narrow ref type keeps the contract // flat. type QueuedJobRef struct { JobID string Lane string CallerID string Priority int Metadata []byte EnqueuedAt time.Time } // MetadataProvider is the optional interface a Job can implement to // supply its restart-recovery payload. // // Why optional: not every job needs to be reconstructed (raw LLM // transport jobs are issued ad-hoc by callers; a restart just drops // the in-flight ones). Skills set Metadata so the executor can // rehydrate the original Invocation. type MetadataProvider interface { Metadata() []byte } // PersistedLane wraps a Lane with DB persistence. Submit writes a // row before delegating to the inner lane; Run state transitions // update the row in place. // // Why a wrapper vs baking persistence into the pool: keeps the // in-memory primitives test-friendly (pool_test.go runs without a // DB). Production wires a PersistedLane around each named lane that // needs restart recovery; lanes that don't (e.g. transient // LLM-transport lanes used by anonymous callers) can stay // in-memory only. type PersistedLane struct { inner Lane store PersistenceStore } // NewPersistedLane wraps an existing Lane with a persistence store. // The inner lane keeps doing all the in-memory queueing; the // PersistedLane writes a DB row for each Submit and updates state on // transitions. func NewPersistedLane(inner Lane, store PersistenceStore) *PersistedLane { return &PersistedLane{inner: inner, store: store} } // Inner returns the wrapped lane. Used by Recover to bypass the // persistence path on re-submission (the row already exists). func (p *PersistedLane) Inner() Lane { return p.inner } // Name delegates to the inner lane. func (p *PersistedLane) Name() string { return p.inner.Name() } // Submit writes the queued row, then delegates to the inner lane. 
// The job is wrapped so Run-time state transitions update the row. // // On enqueue-row write failure: returns the error WITHOUT submitting // to the inner lane. We don't want to dispatch a job that we couldn't // persist — admin visibility (and restart recovery) would then be // inconsistent with the running set. func (p *PersistedLane) Submit(ctx context.Context, job Job) (int, time.Duration, error) { var meta []byte if mp, ok := job.(MetadataProvider); ok { meta = mp.Metadata() } if err := p.store.EnqueueJob(ctx, job.ID(), p.inner.Name(), job.CallerID(), job.Priority(), meta); err != nil { return 0, 0, fmt.Errorf("persist enqueue: %w", err) } wrapped := &persistedJob{inner: job, store: p.store} return p.inner.Submit(ctx, wrapped) } // SubmitWait writes the queued row and blocks until Run completes // (or ctx is cancelled). Same persistence semantics as Submit. func (p *PersistedLane) SubmitWait(ctx context.Context, job Job) error { var meta []byte if mp, ok := job.(MetadataProvider); ok { meta = mp.Metadata() } if err := p.store.EnqueueJob(ctx, job.ID(), p.inner.Name(), job.CallerID(), job.Priority(), meta); err != nil { return fmt.Errorf("persist enqueue: %w", err) } wrapped := &persistedJob{inner: job, store: p.store} return p.inner.SubmitWait(ctx, wrapped) } // Cancel removes the job from the inner queue and writes // state=cancelled to the persistence store. If Cancel returns // ErrNotQueued (already running, etc.) the row state is NOT touched — // the caller knows the job is past the queue stage. func (p *PersistedLane) Cancel(jobID string) error { if err := p.inner.Cancel(jobID); err != nil { return err } // Inner cancel succeeded — update DB. if uerr := p.store.UpdateJobState(context.Background(), jobID, string(stateCancelled), time.Now()); uerr != nil { // Best-effort: log; return nil because the in-memory // cancellation already happened. 
slog.Warn("lane persist: cancel state update failed", "job", jobID, "error", uerr) } return nil } // Stats delegates to the inner lane. func (p *PersistedLane) Stats() LaneStats { return p.inner.Stats() } // SetMaxConcurrent delegates to the inner lane. func (p *PersistedLane) SetMaxConcurrent(n int) { p.inner.SetMaxConcurrent(n) } // Recover reconciles the persistence store with the in-memory lane // after a process restart. // // - Rows in state=running at restart correspond to jobs whose // worker goroutine was killed. They are marked failed (no // auto-retry — skills can have side effects, see v6 spec // "Restart amnesia"). // - Rows in state=queued are re-submitted to the inner lane via // reconstructFn(ref) → Job. If reconstructFn returns nil the row // is marked failed with reason "lost on restart" — the caller // could not reconstruct the original work. // // Recover bypasses the PersistedLane.Submit path (which would write a // duplicate row). The row already exists in state=queued; we just // re-submit to the in-memory queue and let normal Run-time // transitions take over from there. func (p *PersistedLane) Recover(ctx context.Context, reconstructFn func(QueuedJobRef) Job) error { // 1. Mark running rows as failed. running, err := p.store.ListRunningJobs(ctx, p.inner.Name()) if err != nil { return fmt.Errorf("list running: %w", err) } for _, ref := range running { if uerr := p.store.UpdateJobState(ctx, ref.JobID, string(stateFailed), time.Now()); uerr != nil { slog.Warn("lane recover: failed to mark lost-on-restart", "lane", p.inner.Name(), "job", ref.JobID, "error", uerr) continue } slog.Warn("lane recover: job lost on restart", "lane", p.inner.Name(), "job", ref.JobID) } // 2. Re-submit queued rows. 
queued, err := p.store.ListQueuedJobs(ctx, p.inner.Name()) if err != nil { return fmt.Errorf("list queued: %w", err) } for _, ref := range queued { var job Job if reconstructFn != nil { job = reconstructFn(ref) } if job == nil { if uerr := p.store.UpdateJobState(ctx, ref.JobID, string(stateFailed), time.Now()); uerr != nil { slog.Warn("lane recover: cannot reconstruct, mark-failed errored", "lane", p.inner.Name(), "job", ref.JobID, "error", uerr) } else { slog.Warn("lane recover: cannot reconstruct, marked failed", "lane", p.inner.Name(), "job", ref.JobID) } continue } // Wrap the reconstructed job so Run-time state transitions // still update the existing row (no fresh enqueue). wrapped := &persistedJob{inner: job, store: p.store} if _, _, serr := p.inner.Submit(ctx, wrapped); serr != nil { slog.Warn("lane recover: re-submit failed", "lane", p.inner.Name(), "job", ref.JobID, "error", serr) // Mark failed — job is in DB as queued but in-memory // dispatch never happened. if uerr := p.store.UpdateJobState(ctx, ref.JobID, string(stateFailed), time.Now()); uerr != nil { slog.Warn("lane recover: post-resubmit-failure mark errored", "job", ref.JobID, "error", uerr) } } } return nil } // persistedJob wraps an inner Job to write state transitions on // Run() entry and exit. type persistedJob struct { inner Job store PersistenceStore } func (p *persistedJob) ID() string { return p.inner.ID() } func (p *persistedJob) CallerID() string { return p.inner.CallerID() } func (p *persistedJob) Priority() int { return p.inner.Priority() } func (p *persistedJob) Metadata() []byte { if mp, ok := p.inner.(MetadataProvider); ok { return mp.Metadata() } return nil } func (p *persistedJob) Run(ctx context.Context) error { // Mark running. if uerr := p.store.UpdateJobState(ctx, p.inner.ID(), string(stateRunning), time.Now()); uerr != nil { // Don't abort the run if the audit write fails — the // inner work is what the caller asked for. Log and continue. 
slog.Warn("lane persist: state=running update failed", "job", p.inner.ID(), "error", uerr) } err := p.inner.Run(ctx) terminal := stateFinished if err != nil { // Cancellation surfaced as ErrCancelled (queued cancel) is // already written by PersistedLane.Cancel; if it bubbles up // here that means Run was called and Run returned with the // cancellation error — record as cancelled. if errors.Is(err, ErrCancelled) { terminal = stateCancelled } else { terminal = stateFailed } } if uerr := p.store.UpdateJobState(ctx, p.inner.ID(), string(terminal), time.Now()); uerr != nil { slog.Warn("lane persist: terminal state update failed", "job", p.inner.ID(), "state", terminal, "error", uerr) } return err } // Internal copies of the QueueJobState string constants. Why duplicate // them here vs importing skills: pkg/lane is generic and cannot // import skills (would create a cycle). Production callers wire the // PersistedLane via an adapter that satisfies PersistenceStore — // the strings are part of the contract. const ( stateRunning = "running" stateFinished = "finished" stateCancelled = "cancelled" stateFailed = "failed" ) // Sweeper periodically purges finished/cancelled/failed rows older // than the configured retention window. // // Why a separate goroutine struct vs reusing // pkg/logic/skills.StorageSweeper: the queue rows are owned by the // lane primitive; keeping the sweeper in pkg/lane lets future lane // users (LLM transport, GPU lanes) share it without pulling in skills // concerns. // // Test: persistence_test.go drives Sweep synchronously. type Sweeper struct { store PersistenceStore clock func() time.Time interval time.Duration // retention is computed at Sweep call time so a runtime convar // change takes effect without restart. retention func() time.Duration } // NewSweeper constructs the sweeper. retention may be nil → defaults // to 24h. clock may be nil → time.Now. 
func NewSweeper(store PersistenceStore, retention func() time.Duration, clock func() time.Time) *Sweeper { if clock == nil { clock = time.Now } if retention == nil { retention = func() time.Duration { return 24 * time.Hour } } return &Sweeper{ store: store, clock: clock, retention: retention, interval: time.Hour, } } // SetInterval overrides the loop cadence. interval <= 0 is a no-op. func (s *Sweeper) SetInterval(d time.Duration) { if d > 0 { s.interval = d } } // Start launches the sweeper loop. Returns immediately; cancellation // via ctx. func (s *Sweeper) Start(ctx context.Context) { go s.loop(ctx) } // Sweep runs one purge pass synchronously. Public for tests. func (s *Sweeper) Sweep(ctx context.Context) { cutoff := s.clock().Add(-s.retention()) n, err := s.store.PurgeFinishedJobs(ctx, cutoff) if err != nil { slog.Warn("lane sweeper: purge failed", "error", err) return } if n > 0 { slog.Info("lane sweeper: purged finished jobs", "deleted", n) } } func (s *Sweeper) loop(ctx context.Context) { tick := time.NewTicker(s.interval) defer tick.Stop() // Startup delay so cold-start load doesn't stack everything in // the first second. 90s is a reasonable spread. startup := time.NewTimer(90 * time.Second) defer startup.Stop() for { select { case <-ctx.Done(): return case <-startup.C: s.Sweep(ctx) case <-tick.C: s.Sweep(ctx) } } }