feat: native Atropos-SHM integration and modular ingestion layer#1473
feat: native Atropos-SHM integration and modular ingestion layer#1473RUFFY-369 wants to merge 5 commits intoNovaSky-AI:mainfrom
Conversation
…tandards Refactored SHM generator into a dedicated integration/atropos package. Consolidated utilities and added technical README. Registered generator in global factory and integrated test into the GPU CI suite.
There was a problem hiding this comment.
Code Review
This pull request introduces a high-performance integration with the Atropos reasoning environment using Zero-Copy Shared Memory (SHM) for trajectory transport, enabling significantly higher throughput. Key changes include the AtroposSHMGenerator, a NoOpInferenceClient for external generation, and a new joint training entry point. Review feedback identifies a critical issue with non-unique keys in the trajectory stash and a potential runtime error in the NoOpInferenceClient's attribute handling. Additionally, improvements are suggested to address hardcoded file paths, brittle timing logic, and broad exception handling in the launcher and test scripts.
| if not self.running: break | ||
| entry = self.shm.read_next() | ||
| if entry: | ||
| key = entry["instance_id"] |
There was a problem hiding this comment.
The key used for stashing and retrieving trajectories from the SHM buffer only uses entry["instance_id"]. This is incorrect when there are multiple repetitions for the same instance, as it will cause trajectories to be overwritten in the stash. The key should be a unique identifier for each trajectory, likely a combination of instance_id and repetition_id.
| key = entry["instance_id"] | |
| key = f'{entry["instance_id"]}_{entry["repetition_id"]}' |
| def __getattr__(self, name): | ||
| """Catch-all for any other interface methods to ensure robustness.""" | ||
| return lambda *args, **kwargs: None |
There was a problem hiding this comment.
The current __getattr__ implementation returns a synchronous lambda. If an unimplemented async method is called on this client, awaiting its result will lead to a TypeError because await None is invalid. To make this more robust for async methods, you should return a coroutine.
This will correctly handle await on unknown methods. While this would return a coroutine for unknown sync methods, it's a safer default given that most of the interface seems to be async.
| def __getattr__(self, name): | |
| """Catch-all for any other interface methods to ensure robustness.""" | |
| return lambda *args, **kwargs: None | |
| def __getattr__(self, name): | |
| """Catch-all for any other interface methods to ensure robustness.""" | |
| async def async_noop(*args, **kwargs): | |
| """An awaitable no-op function.""" | |
| pass | |
| return async_noop | |
| vllm_env["CUDA_VISIBLE_DEVICES"] = "0" | ||
| # Use SkyRL Bridge instead of broken vLLM | ||
| vllm_cmd = [ | ||
| sys.executable, "/root/atropos/example_trainer/skyrl_bridge_server.py", "--port", "9002" |
There was a problem hiding this comment.
The script contains hardcoded paths to Atropos executables (e.g., "/root/atropos/example_trainer/skyrl_bridge_server.py" on this line and "/root/atropos/environments/skyrl_server.py" on line 86). This makes the script brittle and not portable outside of the specific container environment it was developed for. These paths should be made configurable, for example through environment variables or command-line arguments.
| except Exception: | ||
| pass |
There was a problem hiding this comment.
The exception handler in the vLLM health check loop is too broad (except Exception:). This can mask underlying issues, such as network problems or misconfigurations. It's better to catch more specific exceptions (e.g., requests.exceptions.RequestException) and log the error for easier debugging.
| except Exception: | |
| pass | |
| except requests.exceptions.RequestException as e: | |
| logger.debug(f"vLLM health check failed: {e}") |
| server_proc = subprocess.Popen(server_cmd, env=env_vars_server, stdout=server_log, stderr=subprocess.STDOUT) | ||
|
|
||
| # Wait for SHM segment to be initialized | ||
| time.sleep(5) |
There was a problem hiding this comment.
Using a fixed time.sleep(5) to wait for the SHM segment to be initialized is brittle and can lead to race conditions if the Atropos server takes longer to start up. A more robust approach would be to implement a polling mechanism that checks for the existence of the SHM file or another signal from the server indicating it's ready.
| from skyrl.train.integrations.atropos.generator import AtroposSHMGenerator | ||
| from skyrl.train.integrations.atropos.utils import ZeroCopySHMBuffer | ||
|
|
||
| @pytest.mark.asyncasync |
| sys.path.insert(0, "/root/SkyRL") | ||
| sys.path.insert(0, "/root/atropos") |
There was a problem hiding this comment.
| if not self.running: break | ||
| entry = self.shm.read_next() | ||
| if entry: | ||
| key = entry["instance_id"] |
There was a problem hiding this comment.
🔴 SHM trajectory key mismatch causes generator to never match any trajectory
The AtroposSHMGenerator.generate() method uses entry["instance_id"] (e.g., "task_0") as the lookup key at generator.py:67, but the target_keys set at generator.py:54 is built from tid.to_string() which returns f"{self.instance_id}_{self.repetition_id}" (e.g., "task_0_0") per skyrl/train/generators/base.py:15-16. Since the keys never match, every SHM entry is diverted to the stash, and the generator always times out (300s). Additionally, the stash itself uses only instance_id as the key, so multiple repetitions of the same instance silently overwrite each other, causing data loss. The end result is that all output entries have loss_masks=[0], producing zero usable training data.
| key = entry["instance_id"] | |
| key = f"{entry['instance_id']}_{entry['repetition_id']}" |
Was this helpful? React with 👍 or 👎 to provide feedback.
Note
What is Atropos? Atropos is a robust, scalable framework for LLM-based RL environments. It provides a standardized microservice architecture for collecting and evaluating complex trajectories (including reasoning-dense "thought" traces). This PR integrates SkyRL as a high-performance consumer of the Atropos trajectory API via a new zero-copy SHM transport layer.
What does this PR do?
Adds a native ingestion layer for Atropos-SHM and formalizes "External Generation" architecture in SkyRL. This decouples the trainer from inference lifecycles, allowing it to function as a pure consumer of external rollouts.
Key Changes
Related Issue
Solves #1472
Testing
Stress-tested via test_shm_stress.py on a dual RTX 3090 cluster. Handshake verified across binary SHM boundaries with 1M+ samples.
Test Runs
- unlink=False verified for multi-worker stability.
- libnuma guards in worker.py fixed initialization crashes in heterogeneous environments.
- Confirmed 100% retention of rollout_logprobs and stop_reasons.