diff --git a/pkg/model/loader.go b/pkg/model/loader.go index 5eb40cdb9837..4a72e27a5b11 100644 --- a/pkg/model/loader.go +++ b/pkg/model/loader.go @@ -369,7 +369,19 @@ func (ml *ModelLoader) ShutdownModel(modelName string) error { ml.mu.Lock() defer ml.mu.Unlock() - return ml.deleteProcess(modelName) + return ml.deleteProcess(modelName, false) +} + +// ShutdownModelForce stops a backend without waiting for an in-flight gRPC +// call to finish first. It is used by the watchdog's busy-killer, which only +// fires once a backend has been stuck on a call past the busy timeout — the +// graceful ShutdownModel would block forever on that stuck call (while +// holding ml.mu), preventing every other model load. See deleteProcess. +func (ml *ModelLoader) ShutdownModelForce(modelName string) error { + ml.mu.Lock() + defer ml.mu.Unlock() + + return ml.deleteProcess(modelName, true) } func (ml *ModelLoader) CheckIsLoaded(s string) *Model { @@ -411,7 +423,7 @@ func (ml *ModelLoader) checkIsLoaded(s string) *Model { // Timeouts may mean the node is busy, so keep the model cached. if isConnectionError(err) { xlog.Warn("Remote model unreachable (connection error), removing from cache", "model", s, "error", err) - if delErr := ml.deleteProcess(s); delErr != nil { + if delErr := ml.deleteProcess(s, false); delErr != nil { xlog.Error("error cleaning up remote model", "error", delErr, "model", s) } return nil @@ -422,7 +434,7 @@ func (ml *ModelLoader) checkIsLoaded(s string) *Model { if !process.IsAlive() { xlog.Debug("GRPC Process is not responding", "model", s) // stop and delete the process, this forces to re-load the model and re-create again the service - err := ml.deleteProcess(s) + err := ml.deleteProcess(s, false) if err != nil { xlog.Error("error stopping process", "error", err, "process", s) } diff --git a/pkg/model/process.go b/pkg/model/process.go index 95e3e0758464..a6fc9cef4542 100644 --- a/pkg/model/process.go +++ b/pkg/model/process.go @@ -22,40 +22,57 @@ var ( modelNotFoundErr = errors.New("model not found") ) -func (ml *ModelLoader) deleteProcess(s string) error { +// deleteProcess stops and removes a backend. The force flag trades a graceful +// shutdown for a prompt one and is meant for the watchdog's busy-killer: a +// backend that has been busy past the watchdog timeout is, by definition, +// stuck in an in-flight gRPC call. Waiting for that call to finish before +// stopping the process (the graceful path) would block forever — and since +// deleteProcess runs under ml.mu, it would stall every other model load, +// including the shared opus backend load at the start of every realtime +// (WebRTC) session, hanging new connections at "Connected, waiting for +// session...". The force path stops the process first, which drops the +// in-flight call's gRPC connection and unblocks it, then cleans up. +func (ml *ModelLoader) deleteProcess(s string, force bool) error { model, ok := ml.store.Get(s) if !ok { xlog.Debug("Model not found", "model", s) return modelNotFoundErr } - retries := 1 - for model.GRPC(false, ml.wd).IsBusy() { - xlog.Debug("Model busy. Waiting.", "model", s) - dur := time.Duration(retries*2) * time.Second - if dur > retryTimeout { - dur = retryTimeout - } - time.Sleep(dur) - retries++ + if !force { + retries := 1 + for model.GRPC(false, ml.wd).IsBusy() { + xlog.Debug("Model busy. Waiting.", "model", s) + dur := time.Duration(retries*2) * time.Second + if dur > retryTimeout { + dur = retryTimeout + } + time.Sleep(dur) + retries++ - if retries > 10 && forceBackendShutdown { - xlog.Warn("Model is still busy after retries. Forcing shutdown.", "model", s, "retries", retries) - break + if retries > 10 && forceBackendShutdown { + xlog.Warn("Model is still busy after retries. Forcing shutdown.", "model", s, "retries", retries) + break + } } } - xlog.Debug("Deleting process", "model", s) + xlog.Debug("Deleting process", "model", s, "force", force) // Run unload hooks (e.g. close MCP sessions) for _, hook := range ml.onUnloadHooks { hook(s) } - // Free GPU resources before stopping the process to ensure VRAM is released - xlog.Debug("Calling Free() to release GPU resources", "model", s) - if err := model.GRPC(false, ml.wd).Free(context.Background()); err != nil { - xlog.Warn("Error freeing GPU resources", "error", err, "model", s) + // Free GPU resources before stopping the process to ensure VRAM is + // released. Skipped on force-shutdown: a stuck-busy backend won't answer + // a Free RPC (it's hung on the same stuck call), and stopping the process + // releases its VRAM anyway. + if !force { + xlog.Debug("Calling Free() to release GPU resources", "model", s) + if err := model.GRPC(false, ml.wd).Free(context.Background()); err != nil { + xlog.Warn("Error freeing GPU resources", "error", err, "model", s) + } } process := model.Process() @@ -103,7 +120,7 @@ func (ml *ModelLoader) StopGRPC(filter GRPCProcessFilter) error { return true }) for _, k := range toDelete { - e := ml.deleteProcess(k) + e := ml.deleteProcess(k, false) err = errors.Join(err, e) } return err diff --git a/pkg/model/watchdog.go b/pkg/model/watchdog.go index d6dd18da8d89..a961333477d8 100644 --- a/pkg/model/watchdog.go +++ b/pkg/model/watchdog.go @@ -62,6 +62,11 @@ type WatchDog struct { type ProcessManager interface { ShutdownModel(modelName string) error + // ShutdownModelForce stops the backend without waiting for an in-flight + // gRPC call to finish. Used when the watchdog evicts a backend that is + // stuck busy: the graceful ShutdownModel would block on that stuck call + // (while holding the loader's mutex), stalling every other model load. + ShutdownModelForce(modelName string) error } // NewWatchDog creates a new WatchDog with the provided options. @@ -342,6 +347,17 @@ type modelUsageInfo struct { sizeBytes int64 // on-disk file size; 0 if unknown } +// evictionTarget is a model selected for eviction, retaining whether it was busy +// at selection time. A busy target must be shut down via ShutdownModelForce: +// the graceful path waits for its in-flight gRPC call to finish, but a busy +// eviction only happens when that call is stuck (busy-killer) or the operator +// opted into forceEvictionWhenBusy — either way, waiting would deadlock while +// holding the loader mutex. +type evictionTarget struct { + model string + wasBusy bool +} + // EnforceLRULimitResult contains the result of LRU enforcement type EnforceLRULimitResult struct { EvictedCount int // Number of models successfully evicted @@ -410,13 +426,10 @@ func (wd *WatchDog) EnforceLRULimit(pendingLoads int) EnforceLRULimitResult { needMore := len(modelsToShutdown) < modelsToEvict && skippedBusyCount > 0 wd.Unlock() - // Now shutdown models without holding the watchdog lock to prevent deadlock - for _, model := range modelsToShutdown { - if err := wd.pm.ShutdownModel(model); err != nil { - xlog.Error("[WatchDog] error shutting down model during LRU eviction", "error", err, "model", model) - } - xlog.Debug("[WatchDog] LRU eviction complete", "model", model) - } + // Now shutdown models without holding the watchdog lock to prevent + // deadlock. Busy targets go through the force path so the loader doesn't + // wait on their stuck in-flight call (which would re-deadlock here). + wd.shutdownEvicted(modelsToShutdown, "LRU eviction") if needMore { xlog.Warn("[WatchDog] LRU eviction incomplete", "evicted", len(modelsToShutdown), "needed", modelsToEvict, "skippedBusy", skippedBusyCount, "reason", "some models are busy with active API calls") @@ -431,9 +444,10 @@ func (wd *WatchDog) EnforceLRULimit(pendingLoads int) EnforceLRULimitResult { // collectEvictionsLocked walks `candidates` (already in eviction order) and // untracks up to `maxToEvict` models that are eligible for eviction. Pinned // models are always skipped; busy models are skipped unless `force` is true. -// Returns the names of evicted models and the number skipped because they -// were busy. Must be called with wd.Lock() held. -func (wd *WatchDog) collectEvictionsLocked(candidates []modelUsageInfo, maxToEvict int, force bool) (evicted []string, skippedBusy int) { +// Returns the evicted models (with their busy state at selection time) and +// the number skipped because they were busy. Must be called with wd.Lock() +// held. +func (wd *WatchDog) collectEvictionsLocked(candidates []modelUsageInfo, maxToEvict int, force bool) (evicted []evictionTarget, skippedBusy int) { for i := 0; len(evicted) < maxToEvict && i < len(candidates); i++ { m := candidates[i] if wd.pinnedModels[m.model] { @@ -447,12 +461,31 @@ func (wd *WatchDog) collectEvictionsLocked(candidates []modelUsageInfo, maxToEvi continue } xlog.Info("[WatchDog] evicting model", "model", m.model, "busy", isBusy) - evicted = append(evicted, m.model) + evicted = append(evicted, evictionTarget{model: m.model, wasBusy: isBusy}) wd.untrack(m.address) } return evicted, skippedBusy } +// shutdownEvicted shuts down each evicted model, using the force path for any +// that were busy at selection time so a stuck in-flight call doesn't deadlock +// the graceful ShutdownModel (which waits for that call while holding the +// loader's mutex). +func (wd *WatchDog) shutdownEvicted(targets []evictionTarget, label string) { + for _, t := range targets { + var err error + if t.wasBusy { + err = wd.pm.ShutdownModelForce(t.model) + } else { + err = wd.pm.ShutdownModel(t.model) + } + if err != nil { + xlog.Error("[WatchDog] error shutting down model during "+label, "error", err, "model", t.model, "busy", t.wasBusy) + } + xlog.Debug("[WatchDog] "+label+" complete", "model", t.model, "busy", t.wasBusy) + } +} + // EnforceGroupExclusivity evicts every loaded model that shares at least one // concurrency group with the requested model. The pinned/busy/retry semantics // match EnforceLRULimit so the loader's retry loop can stay generic. @@ -502,12 +535,7 @@ func (wd *WatchDog) EnforceGroupExclusivity(requestedModel string) EnforceLRULim needMore := len(modelsToShutdown) < len(conflicts) wd.Unlock() - for _, m := range modelsToShutdown { - if err := wd.pm.ShutdownModel(m); err != nil { - xlog.Error("[WatchDog] error shutting down model during group eviction", "error", err, "model", m) - } - xlog.Debug("[WatchDog] Group eviction complete", "model", m) - } + wd.shutdownEvicted(modelsToShutdown, "group eviction") if needMore { xlog.Warn("[WatchDog] Group eviction incomplete", "requested", requestedModel, "evicted", len(modelsToShutdown), "needed", len(conflicts), "skippedBusy", skippedBusyCount, "reason", "some conflicts are busy or pinned") @@ -623,12 +651,19 @@ func (wd *WatchDog) checkBusy() { } wd.Unlock() - // Now shutdown models without holding the watchdog lock to prevent deadlock + // The busy-killer targets backends whose in-flight gRPC call has been + // stuck past the busy timeout. Use the force path so the loader stops + // the process FIRST (dropping the stuck call's gRPC connection) instead + // of waiting for that call to finish — the graceful path would block on + // that stuck call while holding the loader mutex and stall every other + // model load (notably the opus backend load at the start of every + // realtime WebRTC session, which hangs new sessions at "Connected, + // waiting for session..."). for _, model := range modelsToShutdown { - if err := wd.pm.ShutdownModel(model); err != nil { + if err := wd.pm.ShutdownModelForce(model); err != nil { xlog.Error("[watchdog] error shutting down model", "error", err, "model", model) } - xlog.Debug("[WatchDog] model shut down", "model", model) + xlog.Debug("[WatchDog] busy model shut down", "model", model) } } @@ -743,10 +778,20 @@ func (wd *WatchDog) evictLRUModel() { xlog.Info("[WatchDog] Memory reclaimer evicting LRU model", "model", lruModel.model, "lastUsed", lruModel.lastUsed) + // A busy target only gets here when forceEvictionWhenBusy is true, i.e. + // the operator accepted evicting models with active calls. Route it + // through the force path so the loader stops the process first instead + // of blocking on the stuck in-flight call while holding its mutex. + _, wasBusy := wd.busyTime[lruModel.address] + wd.Unlock() // Shutdown the model - if err := wd.pm.ShutdownModel(lruModel.model); err != nil && err != modelNotFoundErr { + shutdown := wd.pm.ShutdownModel + if wasBusy { + shutdown = wd.pm.ShutdownModelForce + } + if err := shutdown(lruModel.model); err != nil && err != modelNotFoundErr { xlog.Error("[WatchDog] error shutting down model during memory reclamation", "error", err, "model", lruModel.model) } else { // Untrack the model diff --git a/pkg/model/watchdog_test.go b/pkg/model/watchdog_test.go index a8bd47bf0929..e158b84c005e 100644 --- a/pkg/model/watchdog_test.go +++ b/pkg/model/watchdog_test.go @@ -13,6 +13,8 @@ import ( type mockProcessManager struct { mu sync.Mutex shutdownCalls []string + forceCalls []string + gracefulCalls []string shutdownErrors map[string]error } @@ -27,6 +29,18 @@ func (m *mockProcessManager) ShutdownModel(modelName string) error { m.mu.Lock() defer m.mu.Unlock() m.shutdownCalls = append(m.shutdownCalls, modelName) + m.gracefulCalls = append(m.gracefulCalls, modelName) + if err, ok := m.shutdownErrors[modelName]; ok { + return err + } + return nil +} + +func (m *mockProcessManager) ShutdownModelForce(modelName string) error { + m.mu.Lock() + defer m.mu.Unlock() + m.shutdownCalls = append(m.shutdownCalls, modelName) + m.forceCalls = append(m.forceCalls, modelName) if err, ok := m.shutdownErrors[modelName]; ok { return err } @@ -41,6 +55,14 @@ func (m *mockProcessManager) getShutdownCalls() []string { return result } +func (m *mockProcessManager) getForceShutdownCalls() []string { + m.mu.Lock() + defer m.mu.Unlock() + result := make([]string, len(m.forceCalls)) + copy(result, m.forceCalls) + return result +} + var _ = Describe("WatchDog", func() { var ( wd *model.WatchDog @@ -666,6 +688,35 @@ var _ = Describe("WatchDog", func() { }) }) + Context("Busy Killer", func() { + It("force-shuts down a model that is stuck busy past the busy timeout", func() { + // Regression: a backend stuck on an in-flight gRPC call must be + // killed via the force path (stop the process first), not the + // graceful one (wait for the stuck call to finish, which would + // deadlock while holding the loader mutex and stall every other + // model load — e.g. the opus backend load at the start of every + // realtime WebRTC session, hanging new "Connected, waiting for + // session..." connections). + wd = model.NewWatchDog( + model.WithProcessManager(pm), + model.WithBusyTimeout(10*time.Millisecond), + model.WithBusyCheck(true), + model.WithWatchdogInterval(20*time.Millisecond), + ) + + wd.AddAddressModelMap("addr1", "stuckModel") + wd.Mark("addr1") // busy — simulates an in-flight gRPC call + + go wd.Run() + defer wd.Shutdown() + + Eventually(func() []string { + return pm.getForceShutdownCalls() + }, "300ms", "10ms").Should(ContainElement("stuckModel")) + Expect(pm.getShutdownCalls()).To(ContainElement("stuckModel")) + }) + }) + Context("Concurrency Groups", func() { Describe("ReplaceModelGroups / GetModelGroups", func() { It("returns nil for unknown models", func() {