// Package worker implements the single-worker loop that pulls jobs from the // SQLite queue, executes them against the Ollama target, and records results. // // Why: foreman serializes all chat work through one worker to avoid swap thrash // on the target (ADR-0009). The worker is the only writer of job state transitions. // What: runs a goroutine that picks the next job (drain-by-model), calls Ollama, // stores the result, fires webhooks, and notifies waiting sync handlers. // Test: create with a stub client, enqueue jobs, verify serial execution, // drain-by-model ordering, retry on connection error, and completion notification. package worker import ( "context" "database/sql" "encoding/json" "errors" "fmt" "log/slog" "strconv" "sync" "time" "gitea.stevedudenhoeffer.com/steve/foreman/internal/ollama" "gitea.stevedudenhoeffer.com/steve/foreman/internal/store" "gitea.stevedudenhoeffer.com/steve/foreman/internal/webhook" ) // Notifier manages channels that sync HTTP handlers use to wait for job completion. // // Why: the /api/chat handler blocks until its job finishes; a notification map // avoids polling the DB. // What: maps job IDs to channels; the worker signals completion by closing the channel. // Test: register a waiter, complete the job, verify the channel unblocks. type Notifier struct { mu sync.Mutex waiters map[string]chan struct{} // results stores the terminal job state so the waiter can read it after notification. results map[string]jobResult } type jobResult struct { State store.JobState Result json.RawMessage Error *string } // NewNotifier creates a new Notifier. func NewNotifier() *Notifier { return &Notifier{ waiters: make(map[string]chan struct{}), results: make(map[string]jobResult), } } // Register creates a wait channel for the given job ID. The caller should select // on the returned channel and their context. // // Why: each sync chat handler needs its own completion signal. // What: allocates a buffered channel keyed by job ID. // Test: register, verify channel is open, complete, verify it closes. func (n *Notifier) Register(jobID string) <-chan struct{} { n.mu.Lock() defer n.mu.Unlock() ch := make(chan struct{}) n.waiters[jobID] = ch return ch } // Complete signals that the job has reached a terminal state and stores the result. // // Why: the worker calls this when a job is done or failed; the HTTP handler unblocks. // What: closes the wait channel and stores the result for retrieval. // Test: register, complete, verify the channel is closed and result is available. func (n *Notifier) Complete(jobID string, state store.JobState, result json.RawMessage, errMsg *string) { n.mu.Lock() defer n.mu.Unlock() n.results[jobID] = jobResult{State: state, Result: result, Error: errMsg} if ch, ok := n.waiters[jobID]; ok { close(ch) delete(n.waiters, jobID) } } // Result returns the stored result for a completed job, if any. // // Why: after the wait channel closes, the HTTP handler needs the result data. // What: returns the cached result and cleans up. // Test: complete a job, call Result, verify data, call again, verify cleaned up. func (n *Notifier) Result(jobID string) (store.JobState, json.RawMessage, *string, bool) { n.mu.Lock() defer n.mu.Unlock() r, ok := n.results[jobID] if ok { delete(n.results, jobID) } return r.State, r.Result, r.Error, ok } // Worker is the single-threaded job execution loop. type Worker struct { store *store.Store client ollama.Client inventory *ollama.ModelInventory notifier *Notifier dispatcher *webhook.Dispatcher logger *slog.Logger // keepAlive is the JSON-encoded keep_alive value sent in outbound chat requests // to control how long the worker model stays resident on the target. Derived from // FOREMAN_KEEP_ALIVE config; does not affect the embedder. keepAlive json.RawMessage // wake is signaled when a new job is enqueued. wake chan struct{} } // New creates a new Worker. // // Why: dependency injection makes the worker testable with stub clients and stores. // What: wires all dependencies and creates the wake channel. // Test: create with stubs, call Run in a goroutine, enqueue a job, verify execution. func New( st *store.Store, client ollama.Client, inv *ollama.ModelInventory, notifier *Notifier, dispatcher *webhook.Dispatcher, logger *slog.Logger, keepAlive string, ) *Worker { return &Worker{ store: st, client: client, inventory: inv, notifier: notifier, dispatcher: dispatcher, logger: logger, keepAlive: encodeKeepAlive(keepAlive), wake: make(chan struct{}, 1), } } // encodeKeepAlive converts a FOREMAN_KEEP_ALIVE config string to a json.RawMessage // suitable for the Ollama ChatRequest KeepAlive field. // // Why: Ollama's keep_alive field accepts either a JSON number (seconds, or -1 for // forever) or a JSON string duration ("15m", "1h"). Pure-numeric values and "-1" // are encoded as JSON numbers; everything else is encoded as a JSON string. // What: returns a json.RawMessage containing the appropriate JSON representation. // Test: assert "-1" -> `-1`, "0" -> `0`, "15m" -> `"15m"`, "3600" -> `3600`. func encodeKeepAlive(val string) json.RawMessage { if val == "" { val = "-1" } // If the value parses as an integer, emit it as a JSON number. // This covers "-1", "0", "3600", etc. if _, err := strconv.Atoi(val); err == nil { return json.RawMessage(val) } // Otherwise, emit it as a JSON string (e.g. "15m", "1h"). b, _ := json.Marshal(val) return json.RawMessage(b) } // Wake signals the worker that a new job may be available. Non-blocking. // // Why: the HTTP handlers signal the worker to check for new work immediately // instead of waiting for the next poll cycle. // What: sends on the wake channel (drops if already pending). // Test: call Wake multiple times, verify no blocking. func (w *Worker) Wake() { select { case w.wake <- struct{}{}: default: } } // Run starts the worker loop. It blocks until ctx is cancelled. On startup it // resets any interrupted jobs back to queued. // // Why: the main loop is the core of foreman's job execution (ADR-0009). // What: resets interrupted jobs, then loops: pick next job, execute, record result. // Test: enqueue jobs, run worker with a cancellable context, verify all execute. func (w *Worker) Run(ctx context.Context) { // Reset any jobs stuck in loading/working from a previous crash. if n, err := w.store.ResetInterruptedJobs(); err != nil { w.logger.Error("failed to reset interrupted jobs", "error", err) } else if n > 0 { w.logger.Info("reset interrupted jobs", "count", n) } for { if ctx.Err() != nil { return } currentModel := w.currentWorkerModel() job, err := w.store.NextJob(currentModel) if err != nil { if errors.Is(err, sql.ErrNoRows) { // No jobs available — wait for a wake signal or context cancel. select { case <-w.wake: continue case <-ctx.Done(): return } } w.logger.Error("failed to fetch next job", "error", err) select { case <-time.After(1 * time.Second): continue case <-ctx.Done(): return } } w.executeJob(ctx, job) } } // currentWorkerModel returns the model name currently in the worker slot (slot 2). // The embedder is in slot 1; any other model is the worker model. func (w *Worker) currentWorkerModel() string { residents := w.inventory.ResidentModels() embedModel := w.getEmbedModel() for _, r := range residents { if r.Name != embedModel { return r.Name } } return "" } // getEmbedModel returns the embedder model name from the first resident model // that looks like an embedder slot. This is a heuristic — in practice the embed // model is the one that stays loaded with keep_alive=-1. func (w *Worker) getEmbedModel() string { // We check inventory for a model that matches common embed model patterns. // The simplest approach: the embedder is usually the first (smallest) resident. // However, we can't easily know which is which without config. // For now, return empty — drain-by-model still works because we prefer // whatever model is resident. return "" } // executeJob runs a single job through its lifecycle. func (w *Worker) executeJob(ctx context.Context, job store.Job) { w.logger.Info("executing job", "job_id", job.ID, "model", job.Model, "attempt", job.Attempt) // Determine if we need to load a new model. needsLoad := !w.isModelResident(job.Model) if needsLoad { w.transitionState(job, store.JobStateLoading) } w.transitionState(job, store.JobStateWorking) // Parse the payload into a ChatRequest. var chatReq ollama.ChatRequest if err := json.Unmarshal(job.Payload, &chatReq); err != nil { errMsg := fmt.Sprintf("invalid chat request payload: %v", err) w.failJob(job, &errMsg) return } // Ensure model is set. chatReq.Model = job.Model // Set stream to false for worker execution — we collect the full response. streamFalse := false chatReq.Stream = &streamFalse // Override keep_alive with the configured value so the worker model stays // resident for the desired duration. The embedder is pinned separately. chatReq.KeepAlive = w.keepAlive // Execute the chat request. resp, _, err := w.client.Chat(ctx, chatReq, false) if err != nil { w.handleExecutionError(job, err) return } // Marshal the result. resultBytes, err := json.Marshal(resp) if err != nil { errMsg := fmt.Sprintf("marshal result: %v", err) w.failJob(job, &errMsg) return } result := json.RawMessage(resultBytes) // Store the completion artifact. _, artifactErr := w.store.CreateArtifact(store.Artifact{ JobID: job.ID, Name: "completion", ContentType: "application/json", Data: resultBytes, }) if artifactErr != nil { w.logger.Error("failed to store artifact", "error", artifactErr, "job_id", job.ID) } // Transition to done. if err := w.store.UpdateJobState(job.ID, store.JobStateDone, result, nil); err != nil { w.logger.Error("failed to update job to done", "error", err, "job_id", job.ID) } // Notify waiting sync handlers. w.notifier.Complete(job.ID, store.JobStateDone, result, nil) // Fire webhook if configured. w.fireWebhook(job, store.JobStateDone, store.JobStateWorking, result, nil) w.logger.Info("job completed", "job_id", job.ID, "model", job.Model) } // transitionState updates a job's state and fires a webhook. func (w *Worker) transitionState(job store.Job, newState store.JobState) { prevState := job.State if err := w.store.UpdateJobState(job.ID, newState, nil, nil); err != nil { w.logger.Error("failed to transition job state", "error", err, "job_id", job.ID, "from", prevState, "to", newState) return } w.fireWebhook(job, newState, prevState, nil, nil) job.State = newState } // handleExecutionError handles errors from the Ollama client during job execution. func (w *Worker) handleExecutionError(job store.Job, err error) { var connErr *ollama.ConnectionError if errors.As(err, &connErr) { // Connection error — retryable. w.logger.Warn("job hit connection error, will retry", "job_id", job.ID, "error", err, "attempt", job.Attempt) if job.Attempt+1 >= job.MaxAttempts { errMsg := fmt.Sprintf("connection failed after %d attempts: %v", job.MaxAttempts, err) w.failJob(job, &errMsg) return } // Re-queue with incremented attempt. if err := w.store.IncrementAttempt(job.ID); err != nil { w.logger.Error("failed to increment attempt", "error", err, "job_id", job.ID) } return } // Non-connection error (HTTP 4xx/5xx from target) — terminal failure. errMsg := fmt.Sprintf("chat execution failed: %v", err) w.failJob(job, &errMsg) } // failJob transitions a job to failed and notifies waiters. func (w *Worker) failJob(job store.Job, errMsg *string) { if err := w.store.UpdateJobState(job.ID, store.JobStateFailed, nil, errMsg); err != nil { w.logger.Error("failed to mark job as failed", "error", err, "job_id", job.ID) } w.notifier.Complete(job.ID, store.JobStateFailed, nil, errMsg) w.fireWebhook(job, store.JobStateFailed, job.State, nil, errMsg) w.logger.Warn("job failed", "job_id", job.ID, "error", *errMsg) } // fireWebhook sends a webhook event if the job has a webhook URL configured. func (w *Worker) fireWebhook(job store.Job, state, prevState store.JobState, result json.RawMessage, errMsg *string) { if job.StateWebhookURL == nil || *job.StateWebhookURL == "" || w.dispatcher == nil { return } event := webhook.Event{ JobID: job.ID, State: string(state), PreviousState: string(prevState), Timestamp: time.Now().UTC(), Model: job.Model, Attempt: job.Attempt, Result: result, Error: errMsg, } // If done, include artifact metadata. if state == store.JobStateDone { artifacts, err := w.store.GetArtifactsByJob(job.ID) if err != nil { w.logger.Error("failed to get artifacts for webhook", "error", err, "job_id", job.ID) } else { metas := make([]webhook.ArtifactMeta, len(artifacts)) for i, a := range artifacts { metas[i] = webhook.ArtifactMeta{ Name: a.Name, ContentType: a.ContentType, Size: a.Size, Data: a.Data, } } event.Artifacts = webhook.FormatArtifacts(job.ID, metas) } } w.dispatcher.Fire(*job.StateWebhookURL, event) } // isModelResident checks whether the given model is currently loaded on the target. func (w *Worker) isModelResident(model string) bool { for _, r := range w.inventory.ResidentModels() { if r.Name == model { return true } } return false }