feat: add durable queue, single worker, and drain-by-model scheduling
Replace the Phase 2 in-flight chat gate (buffered channel) with a real SQLite-backed job queue and single worker loop. Every /api/chat request now creates a job row, blocks until the worker completes it, and returns the result transparently.

Key changes:
- internal/store: NextJob (drain-by-model ordering), IncrementAttempt, ResetInterruptedJobs, DeleteTerminalJobsBefore; busy_timeout pragma
- internal/worker: single-threaded worker loop with Notifier for sync handler completion signaling; retry on ConnectionError, terminal fail on HTTPError; crash recovery resets interrupted jobs on startup
- internal/webhook: dispatcher infrastructure for async webhook delivery
- internal/server: chat handler rewritten to enqueue+wait; old chatGate removed; embeddings remain direct concurrent proxies (ADR-0013)
- internal/config: FOREMAN_MAX_ATTEMPTS, FOREMAN_JOB_TTL

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+126
-13
@@ -103,23 +103,13 @@ CREATE TABLE IF NOT EXISTS artifacts (
|
||||
// What: opens the DB, sets pragmas, runs CREATE TABLE IF NOT EXISTS.
|
||||
// Test: call Open with a temp dir path, assert no error and that tables exist.
|
||||
func Open(path string) (*Store, error) {
|
||||
db, err := sql.Open("sqlite", path)
|
||||
// Append pragmas to the DSN so they apply to every connection in the pool.
|
||||
dsn := path + "?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)&_pragma=busy_timeout(5000)"
|
||||
db, err := sql.Open("sqlite", dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open sqlite %q: %w", path, err)
|
||||
}
|
||||
|
||||
// Enable WAL mode for concurrent readers.
|
||||
if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("enable WAL mode: %w", err)
|
||||
}
|
||||
|
||||
// Enable foreign keys.
|
||||
if _, err := db.Exec("PRAGMA foreign_keys=ON"); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("enable foreign keys: %w", err)
|
||||
}
|
||||
|
||||
if _, err := db.Exec(migration); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("run migration: %w", err)
|
||||
@@ -364,3 +354,126 @@ func (s *Store) GetArtifactsByJob(jobID string) ([]Artifact, error) {
|
||||
|
||||
return artifacts, rows.Err()
|
||||
}
|
||||
|
||||
// NextJob returns the next queued job using drain-by-model ordering. Jobs for the
|
||||
// currently-resident model are preferred to avoid swap costs, then ordered by
|
||||
// creation time.
|
||||
//
|
||||
// Why: the worker loop must pick the optimal next job to minimize model swaps
|
||||
// (ADR-0009 drain-by-model heuristic).
|
||||
// What: queries for the first queued job, sorting by model affinity then FIFO.
|
||||
// Test: enqueue jobs for two models, set currentModel to one, verify it drains
|
||||
// that model first before switching.
|
||||
func (s *Store) NextJob(currentModel string) (Job, error) {
|
||||
var j Job
|
||||
var payload, result []byte
|
||||
|
||||
err := s.db.QueryRow(
|
||||
`SELECT id, model, payload, state, result, error, attempt, max_attempts,
|
||||
state_webhook_url, created_at, updated_at, started_at, completed_at
|
||||
FROM jobs
|
||||
WHERE state = ?
|
||||
ORDER BY (CASE WHEN model = ? THEN 0 ELSE 1 END) ASC, created_at ASC
|
||||
LIMIT 1`, string(JobStateQueued), currentModel,
|
||||
).Scan(
|
||||
&j.ID, &j.Model, &payload, &j.State, &result, &j.Error,
|
||||
&j.Attempt, &j.MaxAttempts, &j.StateWebhookURL,
|
||||
&j.CreatedAt, &j.UpdatedAt, &j.StartedAt, &j.CompletedAt,
|
||||
)
|
||||
if err != nil {
|
||||
return Job{}, fmt.Errorf("next job: %w", err)
|
||||
}
|
||||
|
||||
j.Payload = json.RawMessage(payload)
|
||||
if result != nil {
|
||||
j.Result = json.RawMessage(result)
|
||||
}
|
||||
|
||||
return j, nil
|
||||
}
|
||||
|
||||
// IncrementAttempt bumps the attempt counter on a job and resets it to queued.
|
||||
//
|
||||
// Why: retry logic needs to record each attempt while re-queuing the job.
|
||||
// What: increments attempt by 1 and sets state back to queued.
|
||||
// Test: create a job, increment twice, verify attempt=2 and state=queued.
|
||||
func (s *Store) IncrementAttempt(id string) error {
|
||||
now := time.Now().UTC()
|
||||
res, err := s.db.Exec(
|
||||
`UPDATE jobs SET attempt = attempt + 1, state = ?, updated_at = ? WHERE id = ?`,
|
||||
string(JobStateQueued), now, id,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("increment attempt for job %s: %w", id, err)
|
||||
}
|
||||
|
||||
rows, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("check rows affected for job %s: %w", id, err)
|
||||
}
|
||||
if rows == 0 {
|
||||
return fmt.Errorf("job %s not found", id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResetInterruptedJobs moves any loading or working jobs back to queued. Called
|
||||
// on startup to recover from a crash mid-execution.
|
||||
//
|
||||
// Why: if the daemon restarts while a job is in-flight, the job must not be stuck
|
||||
// in a non-terminal, non-queued state forever.
|
||||
// What: updates all loading/working jobs to queued.
|
||||
// Test: create jobs in loading/working states, call Reset, verify all are queued.
|
||||
func (s *Store) ResetInterruptedJobs() (int64, error) {
|
||||
now := time.Now().UTC()
|
||||
res, err := s.db.Exec(
|
||||
`UPDATE jobs SET state = ?, updated_at = ? WHERE state IN (?, ?)`,
|
||||
string(JobStateQueued), now,
|
||||
string(JobStateLoading), string(JobStateWorking),
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("reset interrupted jobs: %w", err)
|
||||
}
|
||||
|
||||
rows, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("check rows affected: %w", err)
|
||||
}
|
||||
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// DeleteTerminalJobsBefore deletes terminal jobs (done or failed) and their
|
||||
// artifacts older than the given cutoff time.
|
||||
//
|
||||
// Why: prevents unbounded storage growth by pruning old completed work (ADR-0008).
|
||||
// What: deletes artifacts first (FK), then jobs with completed_at before cutoff.
|
||||
// Test: create old terminal jobs, call with a recent cutoff, verify they are gone.
|
||||
func (s *Store) DeleteTerminalJobsBefore(cutoff time.Time) (int64, error) {
|
||||
// Delete artifacts for terminal jobs first (foreign key).
|
||||
_, err := s.db.Exec(
|
||||
`DELETE FROM artifacts WHERE job_id IN (
|
||||
SELECT id FROM jobs WHERE state IN (?, ?) AND completed_at < ?
|
||||
)`,
|
||||
string(JobStateDone), string(JobStateFailed), cutoff,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("delete old artifacts: %w", err)
|
||||
}
|
||||
|
||||
res, err := s.db.Exec(
|
||||
`DELETE FROM jobs WHERE state IN (?, ?) AND completed_at < ?`,
|
||||
string(JobStateDone), string(JobStateFailed), cutoff,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("delete old jobs: %w", err)
|
||||
}
|
||||
|
||||
rows, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("check rows affected: %w", err)
|
||||
}
|
||||
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user