package scheduler

import (
	"fmt"
	"sort"
	"time"

	"github.com/mostlygeek/llama-swap/internal/logmon"
	"github.com/mostlygeek/llama-swap/internal/process"
)

// Serial is a strict one-model-at-a-time scheduler. Unlike FIFO it never reorders
// or batches: requests run in exact arrival order and at most one request runs at
// any instant. When the next request targets a model other than the one loaded,
// every other running model is evicted and the target is loaded before it runs,
// so a single model occupies memory at a time — at the cost of throughput.
//
// Example: A B C A is served as A B C A. The final A reloads its model even
// though it ran first, because B and C displaced it in between. (FIFO, by
// contrast, would batch the two A requests: A A B C.)
//
// Serial ignores group/eviction policy entirely: it always evicts every other
// running model, regardless of how groups are configured. That is what makes the
// single-model guarantee a property of the scheduler rather than of the config.
//
// Like FIFO, every method runs on the router's single run-loop goroutine, so no
// internal locking is needed.
type Serial struct {
	// name identifies this scheduler in log lines and in the unload error.
	name string
	// logger receives the scheduler's debug output.
	logger *logmon.Monitor
	// effects is how the scheduler reads model state and acts on the outside
	// world: granting requests, starting swaps and stopping processes.
	effects Effects

	// queued holds requests in strict arrival order. It is never reordered.
	queued []HandlerReq

	// active is the one request currently being processed (loading or serving),
	// or nil when idle. phase is meaningful only while active != nil.
	active *HandlerReq
	phase  serialPhase
}

// serialPhase is the lifecycle stage of the active request.
type serialPhase int

const (
	phaseIdle     serialPhase = iota // no active request
	phaseSwapping                    // waiting for OnSwapDone for active.Model
	phaseServing                     // waiting for OnServeDone for active.Model
)

// NewSerial builds a Serial scheduler. It takes no Swapper: eviction is always
// "stop every other running model", so the group planner is not consulted.
func NewSerial(name string, logger *logmon.Monitor, eff Effects) *Serial {
	return &Serial{
		name:    name,
		logger:  logger,
		effects: eff,
	}
}

// OnRequest validates the model and appends the request to the tail of the queue,
// then tries to start the next job. Unknown models fail immediately with
// ErrModelNotFound and are never queued.
func (s *Serial) OnRequest(req HandlerReq) {
	if _, ok := s.effects.ModelState(req.Model); !ok {
		s.logger.Debugf("%s: model %s not handled by this router", s.name, req.Model)
		s.effects.GrantError(req, ErrModelNotFound)
		return
	}
	s.queued = append(s.queued, req)
	broadcastQueuePositions(s.queued)
	s.startNext()
}

// startNext begins processing the head of the queue when nothing is active. It
// fast-paths a request whose model is already the sole loaded-and-ready process;
// otherwise it launches a swap that evicts every other running model first. The
// loop skips over requests for models that vanished (e.g. a config reload) and
// requests whose caller disconnected before they could be served.
func (s *Serial) startNext() {
	if s.active != nil {
		return // a job is already loading or serving
	}
	for len(s.queued) > 0 {
		req := s.queued[0]
		// Zero the vacated slot before re-slicing. Otherwise the backing array
		// keeps the popped request (and whatever it references) reachable for
		// as long as the array itself lives.
		var zero HandlerReq
		s.queued[0] = zero
		s.queued = s.queued[1:]
		broadcastQueuePositions(s.queued)

		state, ok := s.effects.ModelState(req.Model)
		if !ok {
			s.effects.GrantError(req, ErrModelNotFound)
			continue
		}

		// Take a private copy so active does not alias the queue's storage.
		r := req
		s.active = &r
		evict := s.otherRunning(req.Model)

		if state == process.StateReady && len(evict) == 0 {
			// Already loaded and the only model running — serve immediately.
			s.logger.Debugf("%s: serving model %s (already loaded)", s.name, req.Model)
			if s.serve() {
				return
			}
			continue // caller gone; pick the next request
		}

		s.logger.Debugf("%s: swapping to model %s, evicting %v", s.name, req.Model, evict)
		s.phase = phaseSwapping
		s.effects.StartSwap(req.Model, evict)
		return
	}
}

// serve hands the active request its tracked handler.
It returns true when the // request is now serving (await OnServeDone); false when the caller had already // disconnected, in which case active is cleared so the next job can start. func (s *Serial) serve() bool { if s.effects.GrantServe(*s.active, s.active.Model) { s.phase = phaseServing return true } s.logger.Debugf("%s: caller for model %s gone before serve", s.name, s.active.Model) s.active = nil s.phase = phaseIdle return false } // OnSwapDone fires when the load for the active request completes. On success the // request is served; on failure its caller receives the error and the queue // advances. A SwapDone that does not match the active load (e.g. its request was // unloaded or cancelled mid-load) is ignored. func (s *Serial) OnSwapDone(ev SwapDone) { if s.active == nil || s.phase != phaseSwapping || s.active.Model != ev.ModelID { return } if ev.Err != nil { s.logger.Debugf("%s: swap for model %s failed: %v", s.name, ev.ModelID, ev.Err) s.effects.GrantError(*s.active, ev.Err) s.active = nil s.phase = phaseIdle s.startNext() return } if !s.serve() { s.startNext() // caller vanished while the model loaded; move on } } // OnServeDone fires when the active request's handler returns. The slot is freed // and the next queued request begins. func (s *Serial) OnServeDone(ev ServeDoneEvent) { if s.active == nil || s.phase != phaseServing { return } s.active = nil s.phase = phaseIdle s.startNext() } // OnCancel removes a disconnected client's request from the queue. A request that // is already active is left to finish: if it was loading, OnSwapDone's serve() // will find the caller gone (GrantServe false) and advance; if it was serving, // its handler returns normally and reaches OnServeDone. 
func (s *Serial) OnCancel(req HandlerReq) { if len(s.queued) == 0 { return } kept := s.queued[:0] removed := false for _, q := range s.queued { if q.Respond == req.Respond { removed = true continue } kept = append(kept, q) } s.queued = kept if removed { s.logger.Debugf("%s: cancelled request for model %s pruned from queue", s.name, req.Model) broadcastQueuePositions(s.queued) } } // OnUnload reconciles state for an unload, stops the targeted processes, and // advances the queue. It mirrors the FIFO contract: queued requests for unloaded // models are failed; an active *loading* request for an unloaded model is failed // (its swap goroutine is left to finish and its SwapDone is then ignored); an // active *serving* request is left for its handler to end when StopProcesses // kills the upstream. The Stop is synchronous so callers of Unload can rely on // the processes being stopped on return. func (s *Serial) OnUnload(targets []string, timeout time.Duration) { unloadErr := fmt.Errorf("%s: model unloaded", s.name) targetSet := make(map[string]bool, len(targets)) for _, id := range targets { targetSet[id] = true } if s.active != nil && s.phase == phaseSwapping && targetSet[s.active.Model] { s.effects.GrantError(*s.active, unloadErr) s.active = nil s.phase = phaseIdle } if len(s.queued) > 0 { kept := s.queued[:0] for _, q := range s.queued { if targetSet[q.Model] { s.effects.GrantError(q, unloadErr) continue } kept = append(kept, q) } s.queued = kept broadcastQueuePositions(s.queued) } s.effects.StopProcesses(timeout, targets) // A still-serving active request advances via OnServeDone when its killed // handler returns; only start the next job when nothing is active now. if s.active == nil { s.startNext() } } // OnShutdown grants err to every request the scheduler still holds: an active // loading request and all queued requests. A serving request is torn down with // its process by the baseRouter. 
func (s *Serial) OnShutdown(err error) { if s.active != nil && s.phase == phaseSwapping { s.effects.GrantError(*s.active, err) s.active = nil s.phase = phaseIdle } for _, q := range s.queued { s.effects.GrantError(q, err) } s.queued = nil } // otherRunning returns every running model except target, sorted for // deterministic eviction. func (s *Serial) otherRunning(target string) []string { var out []string for id := range s.effects.RunningModels() { if id != target { out = append(out, id) } } sort.Strings(out) return out }