93 changes: 93 additions & 0 deletions wide_ep_fault_tolerance/README.md
@@ -0,0 +1,93 @@
# Wide EP Fault Tolerance

Demonstrates data-parallel (DP) group fault tolerance and autoscaling for MoE LLM serving with Ray Serve. Uses gang-scheduled DP deployments where all workers in a DP group are restarted atomically when one fails.

Check out the [blog post](https://www.anyscale.com/blog/dp-group-fault-tolerance-vllm-wideep-ray-serve-llm) for a detailed walkthrough of the Wide EP Fault Tolerance feature.

## Install the Anyscale CLI

```bash
pip install -U anyscale
anyscale login
```

## Install `uv`

```bash
curl -LsSf https://astral.sh/uv/install.sh | sh
```

## Deploy the service

Clone the example from GitHub.

```bash
git clone https://github.com/anyscale/examples.git
cd examples/wide_ep_fault_tolerance
```

Deploy the service. By default it uses `microsoft/Phi-tiny-MoE-instruct` with autoscaling enabled (`num_replicas: auto`).

```bash
anyscale service deploy -f service.yaml
anyscale service wait --name wide-ep-fault-tolerance --state RUNNING --timeout-s 600
```

Set `SERVICE_URL` and `SERVICE_TOKEN` from the deploy output:

```bash
export SERVICE_URL=<SERVICE_URL>
export SERVICE_TOKEN=<SERVICE_TOKEN>
```
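
Before running the demos, you can optionally sanity-check the endpoint and token against the OpenAI-compatible model listing (this assumes the standard `/v1/models` route that Ray Serve LLM's OpenAI-compatible server exposes):

```bash
curl -H "Authorization: Bearer $SERVICE_TOKEN" "$SERVICE_URL/v1/models"
```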

## Fault tolerance demo

Start constant traffic in one terminal:

```bash
uv run --with locust --with requests run_locust.py \
--host $SERVICE_URL \
--token $SERVICE_TOKEN \
--traffic-pattern constant \
--baseline-users 10
```

In another terminal, kill a GPU worker process on a randomly chosen node via the service's `/simulate-fault` endpoint:

```bash
curl -X POST -H "Authorization: Bearer $SERVICE_TOKEN" $SERVICE_URL/simulate-fault
```

Observe recovery:

- The **Locust output** shows a brief spike in errors as the affected DP group tears down.
- The **Service dashboard** shows the replica count drop and then recover.
- The surviving DP group continues serving requests throughout.
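
To see the same behavior from the client side, you can send a one-off chat completion while recovery is in progress; it should succeed because the surviving DP group keeps serving. This is a minimal check that assumes the same OpenAI-compatible `/v1/chat/completions` route and model that the load test uses:

```bash
curl -s -X POST "$SERVICE_URL/v1/chat/completions" \
  -H "Authorization: Bearer $SERVICE_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
        "model": "microsoft/Phi-tiny-MoE-instruct",
        "messages": [{"role": "user", "content": "Say hello."}],
        "max_tokens": 32
      }'
```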

## Autoscaling demo

Run a shaped traffic pattern to trigger scale-up/down:

```bash
uv run --with locust --with requests run_locust.py \
--host $SERVICE_URL \
--token $SERVICE_TOKEN \
--traffic-pattern varying \
--baseline-users 5 \
--peak-users 40
```

The load test runs a 14-minute shaped traffic pattern (baseline -> ramp up -> peak -> ramp down -> baseline). The service autoscales as the traffic pattern shifts. Watch the replica count change in the **Services** tab.
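
For a client-side view of the scale-up, an optional sketch is a loop that samples one lightweight request per interval and prints a timestamp, status code, and total latency (it assumes the same `/v1/chat/completions` route and model as the load test):

```bash
while true; do
  curl -s -o /dev/null \
    -w "$(date +%T)  status=%{http_code}  latency=%{time_total}s\n" \
    -X POST "$SERVICE_URL/v1/chat/completions" \
    -H "Authorization: Bearer $SERVICE_TOKEN" \
    -H "Content-Type: application/json" \
    -d '{"model": "microsoft/Phi-tiny-MoE-instruct", "messages": [{"role": "user", "content": "ping"}], "max_tokens": 8}'
  sleep 15
done
```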

## Understanding the example

- This example is built with [Ray Serve LLM](https://docs.ray.io/en/latest/serve/llm/index.html), leveraging vLLM as the engine and Ray Serve as the orchestration framework to deploy LLM applications at scale.
- `service.yaml` deploys `microsoft/Phi-tiny-MoE-instruct` with `data_parallel_size: 2` and `num_replicas: auto` (autoscaling between 1-4 DP groups, 2 ranks per group).
- `kill_worker_proc.py` is deployed as a separate Ray Serve application at `/simulate-fault`. It uses `nvidia-smi` to find a GPU process on a random worker node and kills it with `SIGKILL`.
- Ray Serve gang scheduling ensures that if one worker in a DP group fails, the entire group is torn down and restarted together — preventing partial failures from leaving the deployment in an inconsistent state.

## Shutdown

```bash
anyscale service terminate --name wide-ep-fault-tolerance
```
66 changes: 66 additions & 0 deletions wide_ep_fault_tolerance/kill_worker_proc.py
@@ -0,0 +1,66 @@
"""Simulate engine failure by killing a GPU worker process.

Deployed as a separate Ray Serve application so it doesn't interfere with the WideEP application.

Usage:
curl -X POST -H "Authorization: Bearer $SERVICE_TOKEN" $SERVICE_URL/simulate-fault
"""

import random
import subprocess

import ray
from ray import serve
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from starlette.requests import Request
from starlette.responses import JSONResponse


@ray.remote(num_cpus=0)
def _find_and_kill_gpu_process():
result = subprocess.run(
["nvidia-smi", "--query-compute-apps=pid", "--format=csv,noheader,nounits"],
capture_output=True,
text=True,
)
if result.returncode != 0:
return {"error": f"nvidia-smi failed: {result.stderr}"}

pids = [int(p.strip()) for p in result.stdout.strip().splitlines() if p.strip()]
if not pids:
return {"error": "No GPU processes found on this node"}

    victim_pid = pids[0]  # The target node is chosen at random by the caller; kill the first GPU process it reports.
kill_result = subprocess.run(["kill", "-9", str(victim_pid)])
if kill_result.returncode != 0:
return {"error": f"Failed to kill PID {victim_pid}"}

return {"killed_pid": victim_pid, "node_ip": ray.util.get_node_ip_address()}


@serve.deployment(num_replicas=1)
class KillWorkerProc:
async def __call__(self, request: Request):
if request.method != "POST":
return JSONResponse(
{"error": "Use POST"}, status_code=405, headers={"Allow": "POST"}
)

# Find nodes that have GPUs.
gpu_nodes = [
n for n in ray.nodes() if n["Alive"] and n["Resources"].get("GPU", 0) > 0
]
if not gpu_nodes:
return JSONResponse({"error": "No live GPU nodes found"}, status_code=404)

target = random.choice(gpu_nodes)
result = await _find_and_kill_gpu_process.options(
scheduling_strategy=NodeAffinitySchedulingStrategy(
node_id=target["NodeID"], soft=False
)
).remote()

return JSONResponse(result)


app = KillWorkerProc.bind()
170 changes: 170 additions & 0 deletions wide_ep_fault_tolerance/locustfile.py
@@ -0,0 +1,170 @@
"""Locust load test for an OpenAI-compatible LLM service.

Supports two traffic patterns (--traffic-pattern):
- constant: Steady traffic at --baseline-users (runs indefinitely, Ctrl+C to stop)
- varying: Shaped 14-min pattern: baseline 2m -> ramp up 4m -> peak 2m -> ramp down 4m -> baseline 2m
"""

import json
import os

from locust import HttpUser, LoadTestShape, task, constant, events


@events.init_command_line_parser.add_listener
def add_custom_args(parser):
parser.add_argument(
"--token",
type=str,
default=os.environ.get("ANYSCALE_API_TOKEN", ""),
help="Bearer token for Anyscale service auth",
)
parser.add_argument(
"--route-prefix",
type=str,
default="/v1",
help="Route prefix for the service",
)
parser.add_argument(
"--max-tokens",
type=int,
default=256,
help="Max tokens to generate per request",
)
parser.add_argument(
"--prompt",
type=str,
default="Write a short paragraph about distributed systems.",
help="Prompt to send to the LLM",
)
parser.add_argument(
"--baseline-users",
type=int,
default=10,
help="Baseline number of users (default: 10)",
)
parser.add_argument(
"--peak-users",
type=int,
default=50,
help="Peak number of users (default: 50)",
)
parser.add_argument(
"--ramp-rate",
type=float,
default=5,
help="Users spawned/despawned per second during ramps (default: 5)",
)
parser.add_argument(
"--traffic-pattern",
type=str,
required=True,
choices=["constant", "varying"],
help="Traffic pattern: 'constant' for steady load, 'varying' for shaped 14-min pattern",
)
parser.add_argument(
"--model",
type=str,
default="microsoft/Phi-tiny-MoE-instruct",
help="Model ID to send in chat completion requests",
)


class TrafficShape(LoadTestShape):
"""
Supports two modes based on --traffic-pattern:

constant:
Holds --baseline-users forever (Ctrl+C to stop).

varying (14-min shaped pattern):
0:00 - 2:00 baseline (steady)
2:00 - 6:00 ramp up from baseline to peak
6:00 - 8:00 peak (steady)
8:00 - 12:00 ramp down from peak to baseline
12:00 - 14:00 baseline (steady)
14:00 stop
"""

def tick(self):
opts = self.runner.environment.parsed_options
baseline = opts.baseline_users
peak = opts.peak_users
spawn_rate = opts.ramp_rate

if opts.traffic_pattern == "constant":
return baseline, spawn_rate

# --- varying: 14-min shaped pattern ---
run_time = self.get_run_time()

t_baseline1 = 120 # 2 min
t_ramp_up = 240 # 4 min
t_peak = 120 # 2 min
t_ramp_down = 240 # 4 min
t_baseline2 = 120 # 2 min

c1 = t_baseline1
c2 = c1 + t_ramp_up
c3 = c2 + t_peak
c4 = c3 + t_ramp_down
c5 = c4 + t_baseline2

if run_time < c1:
return baseline, spawn_rate
elif run_time < c2:
progress = (run_time - c1) / t_ramp_up
users = int(baseline + (peak - baseline) * progress)
return users, spawn_rate
elif run_time < c3:
return peak, spawn_rate
elif run_time < c4:
progress = (run_time - c3) / t_ramp_down
users = int(peak - (peak - baseline) * progress)
return users, spawn_rate
elif run_time < c5:
return baseline, spawn_rate
else:
return None


class LLMUser(HttpUser):
"""Simulates a user sending chat completion requests to an LLM service."""

    wait_time = constant(0)  # No think time: each simulated user sends requests back-to-back.

def on_start(self):
token = self.environment.parsed_options.token
self.route_prefix = self.environment.parsed_options.route_prefix
self.max_tokens = self.environment.parsed_options.max_tokens
self.prompt = self.environment.parsed_options.prompt

self.model = self.environment.parsed_options.model
self.headers = {"Content-Type": "application/json"}
if token:
self.headers["Authorization"] = f"Bearer {token}"

@task
def chat_completion(self):
payload = {
"model": self.model,
"messages": [{"role": "user", "content": self.prompt}],
"max_tokens": self.max_tokens,
"temperature": 0.7,
}

with self.client.post(
f"{self.route_prefix}/chat/completions",
json=payload,
headers=self.headers,
catch_response=True,
) as response:
if response.status_code == 200:
try:
data = response.json()
if "choices" not in data:
response.failure("Response missing 'choices' field")
except json.JSONDecodeError:
response.failure("Invalid JSON response")
else:
response.failure(f"HTTP {response.status_code}")