Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions pkg/model/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,19 @@ func (ml *ModelLoader) ShutdownModel(modelName string) error {
ml.mu.Lock()
defer ml.mu.Unlock()

return ml.deleteProcess(modelName)
return ml.deleteProcess(modelName, false)
}

// ShutdownModelForce tears down the named backend right away rather than
// letting an in-flight gRPC call drain first. The watchdog's busy-killer
// depends on it: that killer only triggers after a backend has been stuck on
// a call beyond the busy timeout, and the graceful ShutdownModel would wait on
// that stuck call indefinitely while holding ml.mu, blocking every other model
// load. See deleteProcess for what the force flag skips.
//
// The loader mutex is held for the whole call. The returned error is whatever
// deleteProcess reports.
func (ml *ModelLoader) ShutdownModelForce(modelName string) error {
	const force = true

	ml.mu.Lock()
	defer ml.mu.Unlock()

	return ml.deleteProcess(modelName, force)
}

func (ml *ModelLoader) CheckIsLoaded(s string) *Model {
Expand Down Expand Up @@ -411,7 +423,7 @@ func (ml *ModelLoader) checkIsLoaded(s string) *Model {
// Timeouts may mean the node is busy, so keep the model cached.
if isConnectionError(err) {
xlog.Warn("Remote model unreachable (connection error), removing from cache", "model", s, "error", err)
if delErr := ml.deleteProcess(s); delErr != nil {
if delErr := ml.deleteProcess(s, false); delErr != nil {
xlog.Error("error cleaning up remote model", "error", delErr, "model", s)
}
return nil
Expand All @@ -422,7 +434,7 @@ func (ml *ModelLoader) checkIsLoaded(s string) *Model {
if !process.IsAlive() {
xlog.Debug("GRPC Process is not responding", "model", s)
// stop and delete the process, this forces to re-load the model and re-create again the service
err := ml.deleteProcess(s)
err := ml.deleteProcess(s, false)
if err != nil {
xlog.Error("error stopping process", "error", err, "process", s)
}
Expand Down
55 changes: 36 additions & 19 deletions pkg/model/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,57 @@ var (
modelNotFoundErr = errors.New("model not found")
)

func (ml *ModelLoader) deleteProcess(s string) error {
// deleteProcess stops and removes a backend. The force flag trades a graceful
// shutdown for a prompt one and is meant for the watchdog's busy-killer: a
// backend that has been busy past the watchdog timeout is, by definition,
// stuck in an in-flight gRPC call. Waiting for that call to finish before
// stopping the process (the graceful path) would block forever — and since
// deleteProcess runs under ml.mu, it would stall every other model load,
// including the shared opus backend load at the start of every realtime
// (WebRTC) session, hanging new connections at "Connected, waiting for
// session...". The force path stops the process first, which drops the
// in-flight call's gRPC connection and unblocks it, then cleans up.
func (ml *ModelLoader) deleteProcess(s string, force bool) error {
model, ok := ml.store.Get(s)
if !ok {
xlog.Debug("Model not found", "model", s)
return modelNotFoundErr
}

retries := 1
for model.GRPC(false, ml.wd).IsBusy() {
xlog.Debug("Model busy. Waiting.", "model", s)
dur := time.Duration(retries*2) * time.Second
if dur > retryTimeout {
dur = retryTimeout
}
time.Sleep(dur)
retries++
if !force {
retries := 1
for model.GRPC(false, ml.wd).IsBusy() {
xlog.Debug("Model busy. Waiting.", "model", s)
dur := time.Duration(retries*2) * time.Second
if dur > retryTimeout {
dur = retryTimeout
}
time.Sleep(dur)
retries++

if retries > 10 && forceBackendShutdown {
xlog.Warn("Model is still busy after retries. Forcing shutdown.", "model", s, "retries", retries)
break
if retries > 10 && forceBackendShutdown {
xlog.Warn("Model is still busy after retries. Forcing shutdown.", "model", s, "retries", retries)
break
}
}
}

xlog.Debug("Deleting process", "model", s)
xlog.Debug("Deleting process", "model", s, "force", force)

// Run unload hooks (e.g. close MCP sessions)
for _, hook := range ml.onUnloadHooks {
hook(s)
}

// Free GPU resources before stopping the process to ensure VRAM is released
xlog.Debug("Calling Free() to release GPU resources", "model", s)
if err := model.GRPC(false, ml.wd).Free(context.Background()); err != nil {
xlog.Warn("Error freeing GPU resources", "error", err, "model", s)
// Free GPU resources before stopping the process to ensure VRAM is
// released. Skipped on force-shutdown: a stuck-busy backend won't answer
// a Free RPC (it's hung on the same stuck call), and stopping the process
// releases its VRAM anyway.
if !force {
xlog.Debug("Calling Free() to release GPU resources", "model", s)
if err := model.GRPC(false, ml.wd).Free(context.Background()); err != nil {
xlog.Warn("Error freeing GPU resources", "error", err, "model", s)
}
}

process := model.Process()
Expand Down Expand Up @@ -103,7 +120,7 @@ func (ml *ModelLoader) StopGRPC(filter GRPCProcessFilter) error {
return true
})
for _, k := range toDelete {
e := ml.deleteProcess(k)
e := ml.deleteProcess(k, false)
err = errors.Join(err, e)
}
return err
Expand Down
87 changes: 66 additions & 21 deletions pkg/model/watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ type WatchDog struct {

// ProcessManager is the set of backend shutdown operations the watchdog
// invokes on models it has decided to stop. Both methods take the model name
// and return an error when the shutdown could not be carried out.
type ProcessManager interface {
// ShutdownModel stops the backend through the graceful path, which waits
// for an in-flight gRPC call to finish before stopping the process.
ShutdownModel(modelName string) error
// ShutdownModelForce stops the backend without waiting for an in-flight
// gRPC call to finish. Used when the watchdog evicts a backend that is
// stuck busy: the graceful ShutdownModel would block on that stuck call
// (while holding the loader's mutex), stalling every other model load.
ShutdownModelForce(modelName string) error
}

// NewWatchDog creates a new WatchDog with the provided options.
Expand Down Expand Up @@ -342,6 +347,17 @@ type modelUsageInfo struct {
sizeBytes int64 // on-disk file size; 0 if unknown
}

// evictionTarget is a model selected for eviction, retaining whether it was busy
// at selection time. A busy target must be shut down via ShutdownModelForce:
// the graceful path waits for its in-flight gRPC call to finish, but a busy
// eviction only happens when that call is stuck (busy-killer) or the operator
// opted into forceEvictionWhenBusy — either way, waiting would deadlock while
// holding the loader mutex.
type evictionTarget struct {
model string // model name passed to the process manager's shutdown call
wasBusy bool // busy state captured at selection time; true selects ShutdownModelForce
}

// EnforceLRULimitResult contains the result of LRU enforcement
type EnforceLRULimitResult struct {
EvictedCount int // Number of models successfully evicted
Expand Down Expand Up @@ -410,13 +426,10 @@ func (wd *WatchDog) EnforceLRULimit(pendingLoads int) EnforceLRULimitResult {
needMore := len(modelsToShutdown) < modelsToEvict && skippedBusyCount > 0
wd.Unlock()

// Now shutdown models without holding the watchdog lock to prevent deadlock
for _, model := range modelsToShutdown {
if err := wd.pm.ShutdownModel(model); err != nil {
xlog.Error("[WatchDog] error shutting down model during LRU eviction", "error", err, "model", model)
}
xlog.Debug("[WatchDog] LRU eviction complete", "model", model)
}
// Now shutdown models without holding the watchdog lock to prevent
// deadlock. Busy targets go through the force path so the loader doesn't
// wait on their stuck in-flight call (which would re-deadlock here).
wd.shutdownEvicted(modelsToShutdown, "LRU eviction")

if needMore {
xlog.Warn("[WatchDog] LRU eviction incomplete", "evicted", len(modelsToShutdown), "needed", modelsToEvict, "skippedBusy", skippedBusyCount, "reason", "some models are busy with active API calls")
Expand All @@ -431,9 +444,10 @@ func (wd *WatchDog) EnforceLRULimit(pendingLoads int) EnforceLRULimitResult {
// collectEvictionsLocked walks `candidates` (already in eviction order) and
// untracks up to `maxToEvict` models that are eligible for eviction. Pinned
// models are always skipped; busy models are skipped unless `force` is true.
// Returns the names of evicted models and the number skipped because they
// were busy. Must be called with wd.Lock() held.
func (wd *WatchDog) collectEvictionsLocked(candidates []modelUsageInfo, maxToEvict int, force bool) (evicted []string, skippedBusy int) {
// Returns the evicted models (with their busy state at selection time) and
// the number skipped because they were busy. Must be called with wd.Lock()
// held.
func (wd *WatchDog) collectEvictionsLocked(candidates []modelUsageInfo, maxToEvict int, force bool) (evicted []evictionTarget, skippedBusy int) {
for i := 0; len(evicted) < maxToEvict && i < len(candidates); i++ {
m := candidates[i]
if wd.pinnedModels[m.model] {
Expand All @@ -447,12 +461,31 @@ func (wd *WatchDog) collectEvictionsLocked(candidates []modelUsageInfo, maxToEvi
continue
}
xlog.Info("[WatchDog] evicting model", "model", m.model, "busy", isBusy)
evicted = append(evicted, m.model)
evicted = append(evicted, evictionTarget{model: m.model, wasBusy: isBusy})
wd.untrack(m.address)
}
return evicted, skippedBusy
}

// shutdownEvicted stops every target in order, choosing the shutdown path from
// the busy state recorded when the target was selected: busy targets go
// through ShutdownModelForce, idle ones through ShutdownModel. The force path
// matters because the graceful one waits for the in-flight call while holding
// the loader's mutex, so a stuck call would deadlock it.
//
// label names the kind of eviction and is spliced into the log messages. A
// failed shutdown is logged and does not stop the loop; the debug "complete"
// line is emitted for every target regardless of the outcome.
func (wd *WatchDog) shutdownEvicted(targets []evictionTarget, label string) {
	for i := range targets {
		name, busy := targets[i].model, targets[i].wasBusy

		// Pick the shutdown path up front so there is a single call site.
		stop := wd.pm.ShutdownModel
		if busy {
			stop = wd.pm.ShutdownModelForce
		}

		if stopErr := stop(name); stopErr != nil {
			xlog.Error("[WatchDog] error shutting down model during "+label, "error", stopErr, "model", name, "busy", busy)
		}
		xlog.Debug("[WatchDog] "+label+" complete", "model", name, "busy", busy)
	}
}

// EnforceGroupExclusivity evicts every loaded model that shares at least one
// concurrency group with the requested model. The pinned/busy/retry semantics
// match EnforceLRULimit so the loader's retry loop can stay generic.
Expand Down Expand Up @@ -502,12 +535,7 @@ func (wd *WatchDog) EnforceGroupExclusivity(requestedModel string) EnforceLRULim
needMore := len(modelsToShutdown) < len(conflicts)
wd.Unlock()

for _, m := range modelsToShutdown {
if err := wd.pm.ShutdownModel(m); err != nil {
xlog.Error("[WatchDog] error shutting down model during group eviction", "error", err, "model", m)
}
xlog.Debug("[WatchDog] Group eviction complete", "model", m)
}
wd.shutdownEvicted(modelsToShutdown, "group eviction")

if needMore {
xlog.Warn("[WatchDog] Group eviction incomplete", "requested", requestedModel, "evicted", len(modelsToShutdown), "needed", len(conflicts), "skippedBusy", skippedBusyCount, "reason", "some conflicts are busy or pinned")
Expand Down Expand Up @@ -623,12 +651,19 @@ func (wd *WatchDog) checkBusy() {
}
wd.Unlock()

// Now shutdown models without holding the watchdog lock to prevent deadlock
// The busy-killer targets backends whose in-flight gRPC call has been
// stuck past the busy timeout. Use the force path so the loader stops
// the process FIRST (dropping the stuck call's gRPC connection) instead
// of waiting for that call to finish — the graceful path would block on
// that stuck call while holding the loader mutex and stall every other
// model load (notably the opus backend load at the start of every
// realtime WebRTC session, which hangs new sessions at "Connected,
// waiting for session...").
for _, model := range modelsToShutdown {
if err := wd.pm.ShutdownModel(model); err != nil {
if err := wd.pm.ShutdownModelForce(model); err != nil {
xlog.Error("[watchdog] error shutting down model", "error", err, "model", model)
}
xlog.Debug("[WatchDog] model shut down", "model", model)
xlog.Debug("[WatchDog] busy model shut down", "model", model)
}
}

Expand Down Expand Up @@ -743,10 +778,20 @@ func (wd *WatchDog) evictLRUModel() {

xlog.Info("[WatchDog] Memory reclaimer evicting LRU model", "model", lruModel.model, "lastUsed", lruModel.lastUsed)

// A busy target only gets here when forceEvictionWhenBusy is true, i.e.
// the operator accepted evicting models with active calls. Route it
// through the force path so the loader stops the process first instead
// of blocking on the stuck in-flight call while holding its mutex.
_, wasBusy := wd.busyTime[lruModel.address]

wd.Unlock()

// Shutdown the model
if err := wd.pm.ShutdownModel(lruModel.model); err != nil && err != modelNotFoundErr {
shutdown := wd.pm.ShutdownModel
if wasBusy {
shutdown = wd.pm.ShutdownModelForce
}
if err := shutdown(lruModel.model); err != nil && err != modelNotFoundErr {
xlog.Error("[WatchDog] error shutting down model during memory reclamation", "error", err, "model", lruModel.model)
} else {
// Untrack the model
Expand Down
51 changes: 51 additions & 0 deletions pkg/model/watchdog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
// mockProcessManager is a test double for the watchdog's process manager. It
// records which models were asked to shut down and through which path, and
// can be primed to return an error for specific model names.
type mockProcessManager struct {
mu sync.Mutex // locked by the mock's methods before touching the fields below
shutdownCalls []string // every shutdown request, graceful or forced, in call order
forceCalls []string // requests that arrived via ShutdownModelForce
gracefulCalls []string // requests that arrived via ShutdownModel
shutdownErrors map[string]error // per-model error returned by the shutdown methods when present
}

Expand All @@ -27,6 +29,18 @@ func (m *mockProcessManager) ShutdownModel(modelName string) error {
m.mu.Lock()
defer m.mu.Unlock()
m.shutdownCalls = append(m.shutdownCalls, modelName)
m.gracefulCalls = append(m.gracefulCalls, modelName)
if err, ok := m.shutdownErrors[modelName]; ok {
return err
}
return nil
}

func (m *mockProcessManager) ShutdownModelForce(modelName string) error {
m.mu.Lock()
defer m.mu.Unlock()
m.shutdownCalls = append(m.shutdownCalls, modelName)
m.forceCalls = append(m.forceCalls, modelName)
if err, ok := m.shutdownErrors[modelName]; ok {
return err
}
Expand All @@ -41,6 +55,14 @@ func (m *mockProcessManager) getShutdownCalls() []string {
return result
}

// getForceShutdownCalls returns a snapshot of the model names recorded by
// ShutdownModelForce, in call order. The result is a fresh, non-nil slice, so
// callers can inspect or modify it without holding the mock's mutex or
// affecting the recorded history.
func (m *mockProcessManager) getForceShutdownCalls() []string {
	m.mu.Lock()
	defer m.mu.Unlock()

	snapshot := make([]string, 0, len(m.forceCalls))
	return append(snapshot, m.forceCalls...)
}

var _ = Describe("WatchDog", func() {
var (
wd *model.WatchDog
Expand Down Expand Up @@ -666,6 +688,35 @@ var _ = Describe("WatchDog", func() {
})
})

// Busy Killer: covers the watchdog's handling of a backend that stays marked
// busy longer than the configured busy timeout.
Context("Busy Killer", func() {
It("force-shuts down a model that is stuck busy past the busy timeout", func() {
// Regression: a backend stuck on an in-flight gRPC call must be
// killed via the force path (stop the process first), not the
// graceful one (wait for the stuck call to finish, which would
// deadlock while holding the loader mutex and stall every other
// model load — e.g. the opus backend load at the start of every
// realtime WebRTC session, hanging new "Connected, waiting for
// session..." connections).
//
// The busy timeout and check interval are kept in the low
// milliseconds so the busy-killer has many chances to fire inside
// the 300ms Eventually window below.
wd = model.NewWatchDog(
model.WithProcessManager(pm),
model.WithBusyTimeout(10*time.Millisecond),
model.WithBusyCheck(true),
model.WithWatchdogInterval(20*time.Millisecond),
)

// Register the backend address and never unmark it, so it stays busy.
wd.AddAddressModelMap("addr1", "stuckModel")
wd.Mark("addr1") // busy — simulates an in-flight gRPC call

// Run the watchdog loop in the background for the duration of the
// spec. NOTE(review): Shutdown presumably stops that loop — confirm.
go wd.Run()
defer wd.Shutdown()

// Poll the mock every 10ms until the forced path has recorded the model.
Eventually(func() []string {
return pm.getForceShutdownCalls()
}, "300ms", "10ms").Should(ContainElement("stuckModel"))
// The mock appends to shutdownCalls from both shutdown methods, so the
// combined log must contain the model too.
Expect(pm.getShutdownCalls()).To(ContainElement("stuckModel"))
})
})

Context("Concurrency Groups", func() {
Describe("ReplaceModelGroups / GetModelGroups", func() {
It("returns nil for unknown models", func() {
Expand Down