package worker

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"path/filepath"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"gitea.stevedudenhoeffer.com/steve/foreman/internal/ollama"
	"gitea.stevedudenhoeffer.com/steve/foreman/internal/store"
	"gitea.stevedudenhoeffer.com/steve/foreman/internal/webhook"
)

// openTestDB opens a brand-new store backed by a "test.db" file inside the
// test's own temporary directory, so each test gets an isolated database.
// The store is closed via t.Cleanup when the test finishes.
func openTestDB(t *testing.T) *store.Store {
	t.Helper()

	dbPath := filepath.Join(t.TempDir(), "test.db")
	db, err := store.Open(dbPath)
	if err != nil {
		t.Fatalf("Open(%q): %v", dbPath, err)
	}

	t.Cleanup(func() { db.Close() })
	return db
}

// newTestWorker wires a Worker to a fresh test store, the supplied Ollama
// client, a new Notifier, and a webhook dispatcher built with an empty URL.
// All log output goes to io.Discard. It returns the worker together with the
// store and notifier so tests can seed jobs and wait for their results.
func newTestWorker(t *testing.T, client ollama.Client) (*Worker, *store.Store, *Notifier) {
	t.Helper()

	db := openTestDB(t)
	quiet := slog.New(slog.NewJSONHandler(io.Discard, nil))
	inventory := ollama.NewModelInventory(client, quiet)
	results := NewNotifier()
	hooks := webhook.NewDispatcher("", quiet)

	return New(db, client, inventory, results, hooks, quiet), db, results
}

// stubOllamaClient implements ollama.Client for worker tests.
type stubOllamaClient struct { chatFunc func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) tags *ollama.TagsResponse ps *ollama.PsResponse mu sync.Mutex chatCalls []ollama.ChatRequest callCount atomic.Int32 } func (s *stubOllamaClient) Chat(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { s.callCount.Add(1) s.mu.Lock() s.chatCalls = append(s.chatCalls, req) s.mu.Unlock() if s.chatFunc != nil { return s.chatFunc(ctx, req, stream) } return &ollama.ChatResponse{ Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "test response"}, }, nil, nil } func (s *stubOllamaClient) Embed(ctx context.Context, req ollama.EmbedRequest) (*ollama.EmbedResponse, error) { return nil, fmt.Errorf("not implemented") } func (s *stubOllamaClient) Tags(ctx context.Context) (*ollama.TagsResponse, error) { if s.tags != nil { return s.tags, nil } return &ollama.TagsResponse{}, nil } func (s *stubOllamaClient) Ps(ctx context.Context) (*ollama.PsResponse, error) { if s.ps != nil { return s.ps, nil } return &ollama.PsResponse{}, nil } func (s *stubOllamaClient) RawChat(ctx context.Context, body []byte) (*http.Response, error) { return nil, fmt.Errorf("not implemented") } func (s *stubOllamaClient) RawEmbed(ctx context.Context, body []byte) (*http.Response, error) { return nil, fmt.Errorf("not implemented") } func TestWorker_ExecutesSingleJob(t *testing.T) { client := &stubOllamaClient{} w, st, notifier := newTestWorker(t, client) // Create a job. job := store.Job{ ID: "01TEST001", Model: "qwen3:30b", Payload: json.RawMessage(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}`), MaxAttempts: 3, } if _, err := st.CreateJob(job); err != nil { t.Fatalf("CreateJob: %v", err) } // Register a waiter. waitCh := notifier.Register("01TEST001") // Run the worker. 
ctx, cancel := context.WithCancel(context.Background()) defer cancel() go w.Run(ctx) // Wait for the job to complete. select { case <-waitCh: case <-time.After(5 * time.Second): t.Fatal("timed out waiting for job to complete") } // Check the result. state, result, errMsg, ok := notifier.Result("01TEST001") if !ok { t.Fatal("no result available") } if state != store.JobStateDone { t.Errorf("state = %q, want %q", state, store.JobStateDone) } if errMsg != nil { t.Errorf("unexpected error: %s", *errMsg) } if result == nil { t.Fatal("result should not be nil") } // Verify the job in the store. got, err := st.GetJob("01TEST001") if err != nil { t.Fatalf("GetJob: %v", err) } if got.State != store.JobStateDone { t.Errorf("stored state = %q, want %q", got.State, store.JobStateDone) } if got.CompletedAt == nil { t.Error("CompletedAt should be set") } // Verify artifact was created. artifact, err := st.GetArtifact("01TEST001", "completion") if err != nil { t.Fatalf("GetArtifact: %v", err) } if artifact.ContentType != "application/json" { t.Errorf("artifact content_type = %q, want %q", artifact.ContentType, "application/json") } } func TestWorker_SerialExecution(t *testing.T) { var inflight atomic.Int32 var maxInflight atomic.Int32 client := &stubOllamaClient{ chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { cur := inflight.Add(1) defer inflight.Add(-1) for { old := maxInflight.Load() if cur <= old || maxInflight.CompareAndSwap(old, cur) { break } } time.Sleep(30 * time.Millisecond) return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "ok"}}, nil, nil }, } w, st, notifier := newTestWorker(t, client) // Create multiple jobs. 
for i := 0; i < 3; i++ { id := fmt.Sprintf("01SERIAL%03d", i) job := store.Job{ ID: id, Model: "qwen3:30b", Payload: json.RawMessage(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}`), MaxAttempts: 3, } if _, err := st.CreateJob(job); err != nil { t.Fatalf("CreateJob: %v", err) } } // Register waiters for all jobs. waitCh := notifier.Register("01SERIAL002") ctx, cancel := context.WithCancel(context.Background()) defer cancel() go w.Run(ctx) // Wait for last job. select { case <-waitCh: case <-time.After(5 * time.Second): t.Fatal("timed out waiting for jobs to complete") } if got := maxInflight.Load(); got > 1 { t.Errorf("max concurrent executions = %d, want 1", got) } if got := client.callCount.Load(); got != 3 { t.Errorf("chat call count = %d, want 3", got) } } func TestWorker_DrainByModel(t *testing.T) { var executionOrder []string var mu sync.Mutex client := &stubOllamaClient{ ps: &ollama.PsResponse{ Models: []ollama.RunningModel{ {Name: "qwen3:30b"}, }, }, chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { mu.Lock() executionOrder = append(executionOrder, req.Model) mu.Unlock() return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "ok"}}, nil, nil }, } w, st, notifier := newTestWorker(t, client) // Refresh inventory to pick up the running model. if err := w.inventory.Refresh(context.Background()); err != nil { t.Fatalf("Refresh: %v", err) } // Create jobs: interleave two models, but qwen3:30b is currently resident. 
// job1: qwen3:14b (not resident) // job2: qwen3:30b (resident) // job3: qwen3:14b (not resident) // job4: qwen3:30b (resident) jobs := []struct { id string model string }{ {"01DRAIN001", "qwen3:14b"}, {"01DRAIN002", "qwen3:30b"}, {"01DRAIN003", "qwen3:14b"}, {"01DRAIN004", "qwen3:30b"}, } for _, j := range jobs { job := store.Job{ ID: j.id, Model: j.model, Payload: json.RawMessage(fmt.Sprintf(`{"model":"%s","messages":[{"role":"user","content":"hi"}]}`, j.model)), MaxAttempts: 3, } if _, err := st.CreateJob(job); err != nil { t.Fatalf("CreateJob %s: %v", j.id, err) } } // Wait for last job. waitCh := notifier.Register("01DRAIN004") // Also register for the non-resident ones so we know when everything is done. waitCh3 := notifier.Register("01DRAIN003") ctx, cancel := context.WithCancel(context.Background()) defer cancel() go w.Run(ctx) // Wait for all jobs. for _, ch := range []<-chan struct{}{waitCh, waitCh3} { select { case <-ch: case <-time.After(5 * time.Second): t.Fatal("timed out waiting for jobs to complete") } } mu.Lock() defer mu.Unlock() // Drain-by-model: the resident model (qwen3:30b) jobs should execute first, // then the non-resident model (qwen3:14b) jobs. if len(executionOrder) != 4 { t.Fatalf("executed %d jobs, want 4", len(executionOrder)) } // First two should be qwen3:30b (the resident model). if executionOrder[0] != "qwen3:30b" || executionOrder[1] != "qwen3:30b" { t.Errorf("first two executions = %v, want [qwen3:30b, qwen3:30b]", executionOrder[:2]) } // Last two should be qwen3:14b. 
if executionOrder[2] != "qwen3:14b" || executionOrder[3] != "qwen3:14b" { t.Errorf("last two executions = %v, want [qwen3:14b, qwen3:14b]", executionOrder[2:]) } } func TestWorker_RetryOnConnectionError(t *testing.T) { callCount := atomic.Int32{} client := &stubOllamaClient{ chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { n := callCount.Add(1) if n == 1 { // First call fails with connection error. return nil, nil, &ollama.ConnectionError{URL: "http://test", Err: fmt.Errorf("connection refused")} } // Second call succeeds. return &ollama.ChatResponse{Model: req.Model, Done: true, Message: &ollama.Message{Role: "assistant", Content: "ok"}}, nil, nil }, } w, st, notifier := newTestWorker(t, client) job := store.Job{ ID: "01RETRY001", Model: "qwen3:30b", Payload: json.RawMessage(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}`), MaxAttempts: 3, } if _, err := st.CreateJob(job); err != nil { t.Fatalf("CreateJob: %v", err) } waitCh := notifier.Register("01RETRY001") ctx, cancel := context.WithCancel(context.Background()) defer cancel() go w.Run(ctx) select { case <-waitCh: case <-time.After(5 * time.Second): t.Fatal("timed out waiting for job to complete") } state, _, _, _ := notifier.Result("01RETRY001") if state != store.JobStateDone { t.Errorf("state = %q, want %q", state, store.JobStateDone) } if got := callCount.Load(); got != 2 { t.Errorf("chat calls = %d, want 2 (1 fail + 1 success)", got) } // Verify attempt was incremented in the store. 
got, err := st.GetJob("01RETRY001") if err != nil { t.Fatalf("GetJob: %v", err) } if got.Attempt != 1 { t.Errorf("attempt = %d, want 1 (incremented once from retry)", got.Attempt) } } func TestWorker_MaxAttemptsExhausted(t *testing.T) { client := &stubOllamaClient{ chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { return nil, nil, &ollama.ConnectionError{URL: "http://test", Err: fmt.Errorf("connection refused")} }, } w, st, notifier := newTestWorker(t, client) job := store.Job{ ID: "01MAXATT001", Model: "qwen3:30b", Payload: json.RawMessage(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}`), MaxAttempts: 2, } if _, err := st.CreateJob(job); err != nil { t.Fatalf("CreateJob: %v", err) } waitCh := notifier.Register("01MAXATT001") ctx, cancel := context.WithCancel(context.Background()) defer cancel() go w.Run(ctx) select { case <-waitCh: case <-time.After(5 * time.Second): t.Fatal("timed out waiting for job to fail") } state, _, errMsg, ok := notifier.Result("01MAXATT001") if !ok { t.Fatal("no result available") } if state != store.JobStateFailed { t.Errorf("state = %q, want %q", state, store.JobStateFailed) } if errMsg == nil { t.Fatal("error message should be set") } got, _ := st.GetJob("01MAXATT001") if got.State != store.JobStateFailed { t.Errorf("stored state = %q, want %q", got.State, store.JobStateFailed) } } func TestWorker_HTTPErrorIsTerminal(t *testing.T) { client := &stubOllamaClient{ chatFunc: func(ctx context.Context, req ollama.ChatRequest, stream bool) (*ollama.ChatResponse, <-chan ollama.ChatResponse, error) { return nil, nil, &ollama.HTTPError{StatusCode: 400, Body: "bad request"} }, } w, st, notifier := newTestWorker(t, client) job := store.Job{ ID: "01HTTP001", Model: "qwen3:30b", Payload: json.RawMessage(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}`), MaxAttempts: 3, } if _, err := st.CreateJob(job); err != nil { 
t.Fatalf("CreateJob: %v", err) } waitCh := notifier.Register("01HTTP001") ctx, cancel := context.WithCancel(context.Background()) defer cancel() go w.Run(ctx) select { case <-waitCh: case <-time.After(5 * time.Second): t.Fatal("timed out waiting for job to fail") } state, _, _, _ := notifier.Result("01HTTP001") if state != store.JobStateFailed { t.Errorf("state = %q, want %q (HTTP errors should be terminal)", state, store.JobStateFailed) } // Verify only one attempt was made (no retries for HTTP errors). if got := client.callCount.Load(); got != 1 { t.Errorf("chat calls = %d, want 1 (HTTP errors should not retry)", got) } } func TestWorker_ResetInterruptedJobsOnStartup(t *testing.T) { client := &stubOllamaClient{} w, st, notifier := newTestWorker(t, client) // Manually create jobs in loading and working states (simulating a crash). job1 := store.Job{ ID: "01RESET001", Model: "qwen3:30b", Payload: json.RawMessage(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}`), MaxAttempts: 3, } if _, err := st.CreateJob(job1); err != nil { t.Fatalf("CreateJob: %v", err) } if err := st.UpdateJobState("01RESET001", store.JobStateLoading, nil, nil); err != nil { t.Fatalf("UpdateJobState: %v", err) } job2 := store.Job{ ID: "01RESET002", Model: "qwen3:30b", Payload: json.RawMessage(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hello"}]}`), MaxAttempts: 3, } if _, err := st.CreateJob(job2); err != nil { t.Fatalf("CreateJob: %v", err) } if err := st.UpdateJobState("01RESET002", store.JobStateWorking, nil, nil); err != nil { t.Fatalf("UpdateJobState: %v", err) } // Register waiters. waitCh1 := notifier.Register("01RESET001") waitCh2 := notifier.Register("01RESET002") // Start the worker — it should reset interrupted jobs and then process them. ctx, cancel := context.WithCancel(context.Background()) defer cancel() go w.Run(ctx) // Wait for both jobs to complete. 
for _, ch := range []<-chan struct{}{waitCh1, waitCh2} { select { case <-ch: case <-time.After(5 * time.Second): t.Fatal("timed out waiting for reset jobs to complete") } } // Both should be done now. for _, id := range []string{"01RESET001", "01RESET002"} { got, err := st.GetJob(id) if err != nil { t.Fatalf("GetJob %s: %v", id, err) } if got.State != store.JobStateDone { t.Errorf("job %s state = %q, want %q", id, got.State, store.JobStateDone) } } } func TestNotifier_RegisterAndComplete(t *testing.T) { n := NewNotifier() ch := n.Register("test-job") // Channel should not be closed yet. select { case <-ch: t.Fatal("channel should not be closed before completion") default: } // Complete the job. result := json.RawMessage(`{"done":true}`) n.Complete("test-job", store.JobStateDone, result, nil) // Channel should be closed now. select { case <-ch: // Expected. default: t.Fatal("channel should be closed after completion") } // Get the result. state, res, errMsg, ok := n.Result("test-job") if !ok { t.Fatal("result should be available") } if state != store.JobStateDone { t.Errorf("state = %q, want %q", state, store.JobStateDone) } if string(res) != `{"done":true}` { t.Errorf("result = %s, want %s", res, `{"done":true}`) } if errMsg != nil { t.Errorf("unexpected error: %s", *errMsg) } // Second call should return not-found (cleaned up). _, _, _, ok = n.Result("test-job") if ok { t.Error("result should be cleaned up after first retrieval") } } func TestNotifier_CompleteWithoutRegister(t *testing.T) { n := NewNotifier() // Complete a job that nobody is waiting for. Should not panic. n.Complete("orphan-job", store.JobStateDone, nil, nil) // Result should still be retrievable even without a registered waiter. 
state, _, _, ok := n.Result("orphan-job") if !ok { t.Fatal("result should be available even without registered waiter") } if state != store.JobStateDone { t.Errorf("state = %q, want %q", state, store.JobStateDone) } } func TestWorker_WakeSignal(t *testing.T) { client := &stubOllamaClient{} w, st, notifier := newTestWorker(t, client) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go w.Run(ctx) // Give the worker time to start and block on the empty queue. time.Sleep(50 * time.Millisecond) // Now add a job and wake the worker. job := store.Job{ ID: "01WAKE001", Model: "qwen3:30b", Payload: json.RawMessage(`{"model":"qwen3:30b","messages":[{"role":"user","content":"hi"}]}`), MaxAttempts: 3, } if _, err := st.CreateJob(job); err != nil { t.Fatalf("CreateJob: %v", err) } waitCh := notifier.Register("01WAKE001") w.Wake() select { case <-waitCh: case <-time.After(5 * time.Second): t.Fatal("timed out: worker did not process job after wake signal") } state, _, _, _ := notifier.Result("01WAKE001") if state != store.JobStateDone { t.Errorf("state = %q, want %q", state, store.JobStateDone) } } func TestStore_NextJobDrainByModel(t *testing.T) { st := openTestDB(t) // Create jobs interleaved. for _, j := range []struct { id string model string }{ {"01A", "modelA"}, {"01B", "modelB"}, {"01C", "modelA"}, {"01D", "modelB"}, } { _, err := st.CreateJob(store.Job{ ID: j.id, Model: j.model, Payload: json.RawMessage(`{}`), }) if err != nil { t.Fatalf("CreateJob: %v", err) } } // With currentModel = modelB, we should get modelB jobs first. j1, err := st.NextJob("modelB") if err != nil { t.Fatalf("NextJob: %v", err) } if j1.Model != "modelB" { t.Errorf("first job model = %q, want modelB", j1.Model) } // Mark it done and get next. 
st.UpdateJobState(j1.ID, store.JobStateDone, nil, nil) j2, err := st.NextJob("modelB") if err != nil { t.Fatalf("NextJob: %v", err) } if j2.Model != "modelB" { t.Errorf("second job model = %q, want modelB", j2.Model) } // Mark done, now should get modelA. st.UpdateJobState(j2.ID, store.JobStateDone, nil, nil) j3, err := st.NextJob("modelB") if err != nil { t.Fatalf("NextJob: %v", err) } if j3.Model != "modelA" { t.Errorf("third job model = %q, want modelA", j3.Model) } } func TestStore_NextJobEmptyQueue(t *testing.T) { st := openTestDB(t) _, err := st.NextJob("any") if !errors.Is(err, sql.ErrNoRows) { t.Errorf("NextJob on empty queue: err = %v, want sql.ErrNoRows", err) } } func TestStore_IncrementAttempt(t *testing.T) { st := openTestDB(t) _, err := st.CreateJob(store.Job{ ID: "01INC", Model: "m", Payload: json.RawMessage(`{}`), }) if err != nil { t.Fatalf("CreateJob: %v", err) } // Mark as working, then increment. st.UpdateJobState("01INC", store.JobStateWorking, nil, nil) if err := st.IncrementAttempt("01INC"); err != nil { t.Fatalf("IncrementAttempt: %v", err) } got, _ := st.GetJob("01INC") if got.Attempt != 1 { t.Errorf("attempt = %d, want 1", got.Attempt) } if got.State != store.JobStateQueued { t.Errorf("state = %q, want %q (should be re-queued)", got.State, store.JobStateQueued) } } func TestStore_ResetInterruptedJobs(t *testing.T) { st := openTestDB(t) for _, j := range []struct { id string state store.JobState }{ {"01A", store.JobStateQueued}, {"01B", store.JobStateLoading}, {"01C", store.JobStateWorking}, {"01D", store.JobStateDone}, {"01E", store.JobStateFailed}, } { _, err := st.CreateJob(store.Job{ID: j.id, Model: "m", Payload: json.RawMessage(`{}`)}) if err != nil { t.Fatalf("CreateJob: %v", err) } if j.state != store.JobStateQueued { st.UpdateJobState(j.id, j.state, nil, nil) } } n, err := st.ResetInterruptedJobs() if err != nil { t.Fatalf("ResetInterruptedJobs: %v", err) } if n != 2 { t.Errorf("reset count = %d, want 2", n) } // Verify loading and 
working are back to queued. for _, id := range []string{"01B", "01C"} { j, _ := st.GetJob(id) if j.State != store.JobStateQueued { t.Errorf("job %s state = %q, want %q", id, j.State, store.JobStateQueued) } } // Verify done and failed are untouched. for _, tc := range []struct { id string want store.JobState }{ {"01D", store.JobStateDone}, {"01E", store.JobStateFailed}, } { j, _ := st.GetJob(tc.id) if j.State != tc.want { t.Errorf("job %s state = %q, want %q", tc.id, j.State, tc.want) } } } func TestStore_DeleteTerminalJobsBefore(t *testing.T) { st := openTestDB(t) // Create some terminal jobs. for _, j := range []struct { id string state store.JobState }{ {"01OLD1", store.JobStateDone}, {"01OLD2", store.JobStateFailed}, {"01ACTIVE", store.JobStateQueued}, } { _, err := st.CreateJob(store.Job{ID: j.id, Model: "m", Payload: json.RawMessage(`{}`)}) if err != nil { t.Fatalf("CreateJob: %v", err) } if j.state != store.JobStateQueued { errMsg := "some error" var errPtr *string if j.state == store.JobStateFailed { errPtr = &errMsg } st.UpdateJobState(j.id, j.state, nil, errPtr) } } // Delete terminal jobs older than right now (all terminal jobs are "old"). cutoff := time.Now().UTC().Add(1 * time.Minute) n, err := st.DeleteTerminalJobsBefore(cutoff) if err != nil { t.Fatalf("DeleteTerminalJobsBefore: %v", err) } if n != 2 { t.Errorf("deleted = %d, want 2", n) } // Active job should still exist. _, err = st.GetJob("01ACTIVE") if err != nil { t.Errorf("active job should still exist: %v", err) } // Deleted jobs should be gone. for _, id := range []string{"01OLD1", "01OLD2"} { _, err := st.GetJob(id) if !errors.Is(err, sql.ErrNoRows) { t.Errorf("job %s should be deleted but got err: %v", id, err) } } }