[examples] Add WideEP DP group fault tolerance example #54
Merged
16 commits:
- f386d20: Add WideEP DP group fault tolerance example (jeffreywang-anyscale)
- 2668214: Fix (jeffreywang-anyscale)
- f2bb6b1: fix: correct worker node memory spec for 4xL4 instance (robertnishihara)
- cbdd69e: rename: wide_ep_dp_group_fault_tolerance -> wide_ep_fault_tolerance (robertnishihara)
- ce30029: rename: dp_group_autoscaling_service.yaml -> service.yaml (robertnishihara)
- 8b4ebd7: rename: dp_group_fault_tolerance_demo.py -> fault_tolerance_demo.py (robertnishihara)
- f5c2a32: docs: simplify README and add job.yaml for fault tolerance demo (robertnishihara)
- 2728fc2: docs: replace job-based FT demo with live service instructions (robertnishihara)
- e495bfa: Unify to services flow (jeffreywang-anyscale)
- e18399e: [experimental] deepseek example (jeffreywang-anyscale)
- e5c2f65: Allow to configure model (jeffreywang-anyscale)
- fc60442: CR feedback (jeffreywang-anyscale)
- 1c4db41: Minor edit (jeffreywang-anyscale)
- c53576c: Remove uv (jeffreywang-anyscale)
- 24fcbab: uv installation (jeffreywang-anyscale)
- 37a93ec: Add Ray Serve LLM (jeffreywang-anyscale)
New file: `README.md` (+93 lines)

# Wide EP Fault Tolerance

Demonstrates data-parallel (DP) group fault tolerance and autoscaling for MoE LLM serving with Ray Serve. Uses gang-scheduled DP deployments where all workers in a DP group are restarted atomically when one fails.

Check out the [blog post](https://www.anyscale.com/blog/dp-group-fault-tolerance-vllm-wideep-ray-serve-llm) for a detailed walkthrough of the Wide EP Fault Tolerance feature.

## Install the Anyscale CLI

```bash
pip install -U anyscale
anyscale login
```

## Install `uv`

```bash
curl -LsSf https://astral.sh/uv/install.sh | sh
```

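`uv` is only needed later to run the load generator (`uv run --with locust --with requests run_locust.py`) without managing a virtualenv by hand. You can confirm the install with the command below (the installer may ask you to open a new shell or update `PATH` first):

```bash
uv --version
```
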
## Deploy the service

Clone the example from GitHub:

```bash
git clone https://github.com/anyscale/examples.git
cd examples/wide_ep_fault_tolerance
```

Deploy the service. By default it uses `microsoft/Phi-tiny-MoE-instruct` with autoscaling enabled (`num_replicas: auto`).

```bash
anyscale service deploy -f service.yaml
anyscale service wait --name wide-ep-fault-tolerance --state RUNNING --timeout-s 600
```

Set `SERVICE_URL` and `SERVICE_TOKEN` from the deploy output:

```bash
export SERVICE_URL=<SERVICE_URL>
export SERVICE_TOKEN=<SERVICE_TOKEN>
```

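You can optionally sanity-check the endpoint before running the demos. The request below assumes the service exposes its OpenAI-compatible API under the default `/v1` route prefix, which is also what `run_locust.py` uses:

```bash
curl -s -X POST "$SERVICE_URL/v1/chat/completions" \
  -H "Authorization: Bearer $SERVICE_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
        "model": "microsoft/Phi-tiny-MoE-instruct",
        "messages": [{"role": "user", "content": "Say hello."}],
        "max_tokens": 32
      }'
```
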
## Fault tolerance demo

Start constant traffic in one terminal:

```bash
uv run --with locust --with requests run_locust.py \
  --host $SERVICE_URL \
  --token $SERVICE_TOKEN \
  --traffic-pattern constant \
  --baseline-users 10
```

In another terminal, kill a random GPU worker process via the service's `/simulate-fault` endpoint:

```bash
curl -X POST -H "Authorization: Bearer $SERVICE_TOKEN" $SERVICE_URL/simulate-fault
```

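On success the endpoint returns JSON identifying the victim, of the form `{"killed_pid": <pid>, "node_ip": <ip>}`; if no GPU node or GPU process is found, it returns an `error` field instead (see `kill_worker_proc.py` below for the exact responses).
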
Observe recovery:

- The **Locust output** shows a brief spike in errors as the affected DP group tears down.
- The **Service dashboard** shows the replica count drop and then recover.
- The surviving DP group continues serving requests throughout.

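For a quick signal alongside the Locust output, a simple polling loop (a sketch, not part of the example files) makes the error window and recovery visible from a third terminal:

```bash
# Print a timestamped HTTP status code every 2 seconds. Expect mostly 200s,
# possibly a few non-200s right after the fault while the affected DP group
# restarts, then steady 200s again.
while true; do
  code=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$SERVICE_URL/v1/chat/completions" \
    -H "Authorization: Bearer $SERVICE_TOKEN" \
    -H "Content-Type: application/json" \
    -d '{"model": "microsoft/Phi-tiny-MoE-instruct", "messages": [{"role": "user", "content": "ping"}], "max_tokens": 8}')
  echo "$(date +%T)  $code"
  sleep 2
done
```
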
## Autoscaling demo

Run a shaped traffic pattern to trigger scale-up and scale-down:

```bash
uv run --with locust --with requests run_locust.py \
  --host $SERVICE_URL \
  --token $SERVICE_TOKEN \
  --traffic-pattern varying \
  --baseline-users 5 \
  --peak-users 40
```

The load test runs a 14-minute shaped traffic pattern (baseline -> ramp up -> peak -> ramp down -> baseline). The service autoscales as the traffic pattern shifts. Watch the replica count change in the Services tab.

## Understanding the example

- This example is built with [Ray Serve LLM](https://docs.ray.io/en/latest/serve/llm/index.html), using vLLM as the engine and Ray Serve as the orchestration framework to deploy LLM applications at scale.
- `service.yaml` deploys `microsoft/Phi-tiny-MoE-instruct` with `data_parallel_size: 2` and `num_replicas: auto` (autoscaling between 1 and 4 DP groups, with 2 ranks per group).
- `kill_worker_proc.py` is deployed as a separate Ray Serve application at `/simulate-fault`. It uses `nvidia-smi` to find a GPU process on a random worker node and kills it with `SIGKILL`.
- Ray Serve gang scheduling ensures that if one worker in a DP group fails, the entire group is torn down and restarted together, preventing partial failures from leaving the deployment in an inconsistent state.

## Shutdown

```bash
anyscale service terminate --name wide-ep-fault-tolerance
```

New file: `kill_worker_proc.py` (+66 lines)

```python
"""Simulate engine failure by killing a GPU worker process.

Deployed as a separate Ray Serve application so it doesn't interfere with the WideEP application.

Usage:
    curl -X POST -H "Authorization: Bearer $SERVICE_TOKEN" $SERVICE_URL/simulate-fault
"""

import random
import subprocess

import ray
from ray import serve
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from starlette.requests import Request
from starlette.responses import JSONResponse


@ray.remote(num_cpus=0)
def _find_and_kill_gpu_process():
    result = subprocess.run(
        ["nvidia-smi", "--query-compute-apps=pid", "--format=csv,noheader,nounits"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        return {"error": f"nvidia-smi failed: {result.stderr}"}

    pids = [int(p.strip()) for p in result.stdout.strip().splitlines() if p.strip()]
    if not pids:
        return {"error": "No GPU processes found on this node"}

    victim_pid = pids[0]
    kill_result = subprocess.run(["kill", "-9", str(victim_pid)])
    if kill_result.returncode != 0:
        return {"error": f"Failed to kill PID {victim_pid}"}

    return {"killed_pid": victim_pid, "node_ip": ray.util.get_node_ip_address()}


@serve.deployment(num_replicas=1)
class KillWorkerProc:
    async def __call__(self, request: Request):
        if request.method != "POST":
            return JSONResponse(
                {"error": "Use POST"}, status_code=405, headers={"Allow": "POST"}
            )

        # Find nodes that have GPUs.
        gpu_nodes = [
            n for n in ray.nodes() if n["Alive"] and n["Resources"].get("GPU", 0) > 0
        ]
        if not gpu_nodes:
            return JSONResponse({"error": "No live GPU nodes found"}, status_code=404)

        target = random.choice(gpu_nodes)
        result = await _find_and_kill_gpu_process.options(
            scheduling_strategy=NodeAffinitySchedulingStrategy(
                node_id=target["NodeID"], soft=False
            )
        ).remote()

        return JSONResponse(result)


app = KillWorkerProc.bind()
```

New file: `run_locust.py` (+170 lines)

```python
"""Locust load test for an OpenAI-compatible LLM service.

Supports two traffic patterns (--traffic-pattern):
  - constant: Steady traffic at --baseline-users (runs indefinitely, Ctrl+C to stop)
  - varying: Shaped 14-min pattern: baseline 2m -> ramp up 4m -> peak 2m -> ramp down 4m -> baseline 2m
"""

import json
import os

from locust import HttpUser, LoadTestShape, task, constant, events


@events.init_command_line_parser.add_listener
def add_custom_args(parser):
    parser.add_argument(
        "--token",
        type=str,
        default=os.environ.get("ANYSCALE_API_TOKEN", ""),
        help="Bearer token for Anyscale service auth",
    )
    parser.add_argument(
        "--route-prefix",
        type=str,
        default="/v1",
        help="Route prefix for the service",
    )
    parser.add_argument(
        "--max-tokens",
        type=int,
        default=256,
        help="Max tokens to generate per request",
    )
    parser.add_argument(
        "--prompt",
        type=str,
        default="Write a short paragraph about distributed systems.",
        help="Prompt to send to the LLM",
    )
    parser.add_argument(
        "--baseline-users",
        type=int,
        default=10,
        help="Baseline number of users (default: 10)",
    )
    parser.add_argument(
        "--peak-users",
        type=int,
        default=50,
        help="Peak number of users (default: 50)",
    )
    parser.add_argument(
        "--ramp-rate",
        type=float,
        default=5,
        help="Users spawned/despawned per second during ramps (default: 5)",
    )
    parser.add_argument(
        "--traffic-pattern",
        type=str,
        required=True,
        choices=["constant", "varying"],
        help="Traffic pattern: 'constant' for steady load, 'varying' for shaped 14-min pattern",
    )
    parser.add_argument(
        "--model",
        type=str,
        default="microsoft/Phi-tiny-MoE-instruct",
        help="Model ID to send in chat completion requests",
    )


class TrafficShape(LoadTestShape):
    """
    Supports two modes based on --traffic-pattern:

    constant:
        Holds --baseline-users forever (Ctrl+C to stop).

    varying (14-min shaped pattern):
        0:00 - 2:00   baseline (steady)
        2:00 - 6:00   ramp up from baseline to peak
        6:00 - 8:00   peak (steady)
        8:00 - 12:00  ramp down from peak to baseline
        12:00 - 14:00 baseline (steady)
        14:00         stop
    """

    def tick(self):
        opts = self.runner.environment.parsed_options
        baseline = opts.baseline_users
        peak = opts.peak_users
        spawn_rate = opts.ramp_rate

        if opts.traffic_pattern == "constant":
            return baseline, spawn_rate

        # --- varying: 14-min shaped pattern ---
        run_time = self.get_run_time()

        t_baseline1 = 120  # 2 min
        t_ramp_up = 240  # 4 min
        t_peak = 120  # 2 min
        t_ramp_down = 240  # 4 min
        t_baseline2 = 120  # 2 min

        c1 = t_baseline1
        c2 = c1 + t_ramp_up
        c3 = c2 + t_peak
        c4 = c3 + t_ramp_down
        c5 = c4 + t_baseline2

        if run_time < c1:
            return baseline, spawn_rate
        elif run_time < c2:
            progress = (run_time - c1) / t_ramp_up
            users = int(baseline + (peak - baseline) * progress)
            return users, spawn_rate
        elif run_time < c3:
            return peak, spawn_rate
        elif run_time < c4:
            progress = (run_time - c3) / t_ramp_down
            users = int(peak - (peak - baseline) * progress)
            return users, spawn_rate
        elif run_time < c5:
            return baseline, spawn_rate
        else:
            return None


class LLMUser(HttpUser):
    """Simulates a user sending chat completion requests to an LLM service."""

    wait_time = constant(0)

    def on_start(self):
        token = self.environment.parsed_options.token
        self.route_prefix = self.environment.parsed_options.route_prefix
        self.max_tokens = self.environment.parsed_options.max_tokens
        self.prompt = self.environment.parsed_options.prompt

        self.model = self.environment.parsed_options.model
        self.headers = {"Content-Type": "application/json"}
        if token:
            self.headers["Authorization"] = f"Bearer {token}"

    @task
    def chat_completion(self):
        payload = {
            "model": self.model,
            "messages": [{"role": "user", "content": self.prompt}],
            "max_tokens": self.max_tokens,
            "temperature": 0.7,
        }

        with self.client.post(
            f"{self.route_prefix}/chat/completions",
            json=payload,
            headers=self.headers,
            catch_response=True,
        ) as response:
            if response.status_code == 200:
                try:
                    data = response.json()
                    if "choices" not in data:
                        response.failure("Response missing 'choices' field")
                except json.JSONDecodeError:
                    response.failure("Invalid JSON response")
            else:
                response.failure(f"HTTP {response.status_code}")
```