Fix event-loop blocking in one-step-off async save/export paths #1446
taivu1998 wants to merge 2 commits into NovaSky-AI:main
Conversation
Code Review
This pull request offloads heavyweight synchronous operations, such as saving checkpoints and models, to separate threads using asyncio.to_thread within the AsyncRayPPOTrainer. This change is intended to prevent blocking the event loop and is accompanied by updated documentation and a new test case. Feedback focuses on potential thread-safety issues introduced by this concurrency, specifically regarding the dataloader's state access during checkpointing and a race condition when resetting the shared all_timings dictionary.
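The problem and the fix can be reproduced in isolation. In the sketch below (all names are illustrative, not SkyRL APIs), a blocking save run inline would freeze a 10 ms heartbeat for its full duration, while `asyncio.to_thread` keeps the heartbeat ticking during the save:

```python
# Minimal repro of the event-loop-blocking issue: a synchronous "save"
# stalls a background coroutine unless it is offloaded to a thread.
import asyncio
import time

ticks = []

async def heartbeat():
    # Stands in for the background generation loop.
    for _ in range(20):
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)

def blocking_save():
    time.sleep(0.2)  # stand-in for heavyweight checkpoint I/O

async def main():
    hb = asyncio.create_task(heartbeat())
    await asyncio.sleep(0.05)
    # Offloaded: the heartbeat keeps ticking while the save runs.
    # Calling blocking_save() inline here would insert a >= 0.2 s gap.
    await asyncio.to_thread(blocking_save)
    await hb
    # Largest gap between heartbeats stays close to the 10 ms period.
    gaps = [b - a for a, b in zip(ticks, ticks[1:])]
    return max(gaps)

max_gap = asyncio.run(main())
print(max_gap < 0.15)
```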
```diff
 if self.cfg.trainer.ckpt_interval > 0 and self.global_step % self.cfg.trainer.ckpt_interval == 0:
     with Timer("save_checkpoints", self.all_timings):
-        self.save_checkpoints()
+        await asyncio.to_thread(self.save_checkpoints)
```
Offloading save_checkpoints to a separate thread while the background generation loop is running introduces potential thread-safety concerns. Specifically, self.save_checkpoints calls self.train_dataloader.state_dict(), which may conflict with the concurrent iteration of the same dataloader in _run_generate_loop. It is important to ensure that the dataloader implementation used here supports concurrent state access while an iteration is in progress.
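One way to sidestep the conflict, sketched below under the assumption that `state_dict()` is cheap relative to the disk write: capture the dataloader state synchronously on the event loop (where coroutines only interleave at `await` points), and offload only the heavyweight write. `TrainerSketch`, `FakeLoader`, and `_write_checkpoint` are hypothetical names used for illustration, not SkyRL APIs.

```python
# Sketch: snapshot dataloader state on the event loop thread, then
# offload only the slow serialization/disk write to a worker thread.
import asyncio

class FakeLoader:
    # Stand-in for a stateful dataloader.
    def state_dict(self):
        return {"batch_idx": 7}

class TrainerSketch:
    def __init__(self, dataloader):
        self.train_dataloader = dataloader
        self.last_saved = None

    async def save_checkpoints_async(self):
        # Runs on the event loop without awaiting, so no other coroutine
        # can advance the dataloader mid-snapshot.
        state = self.train_dataloader.state_dict()
        # Only the heavyweight I/O runs in the worker thread.
        await asyncio.to_thread(self._write_checkpoint, state)

    def _write_checkpoint(self, state):
        # Placeholder for the real serialization / disk write.
        self.last_saved = dict(state)

trainer = TrainerSketch(FakeLoader())
asyncio.run(trainer.save_checkpoints_async())
print(trainer.last_saved)
```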
```diff
-        self.save_models()
+        await asyncio.to_thread(self.save_models)
 self.tracker.log({"timing/" + k: v for k, v in self.all_timings.items()}, step=self.global_step)
 self.all_timings = {}
```
Resetting self.all_timings by assignment (self.all_timings = {}) while the background _run_generate_loop coroutine is active creates a race condition. If the background loop is currently inside a Timer block, it holds a reference to the old dictionary; when it finishes, it will write its timing data to that discarded dictionary, causing the metrics to be lost. While this pattern existed previously, offloading more work to threads increases the window for such concurrency issues. Consider using a thread-safe mechanism to aggregate and clear timings.
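A minimal sketch of one such mechanism, assuming Timer-style context managers: timings accumulate into a single lock-guarded dict that is drained in place rather than replaced, so a late writer can never target a discarded dict (its entry simply lands in the next drain). `ThreadSafeTimings` is a hypothetical helper, not part of the trainer.

```python
# Sketch of a thread-safe timings aggregator with an in-place drain.
import threading
import time
from contextlib import contextmanager

class ThreadSafeTimings:
    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    @contextmanager
    def timer(self, name):
        # Timer-style context manager: records elapsed time on exit.
        start = time.perf_counter()
        try:
            yield
        finally:
            with self._lock:
                self._data[name] = time.perf_counter() - start

    def drain(self):
        # Copy-and-clear under the lock. The underlying dict object is
        # never swapped out, so concurrent writers are never lost.
        with self._lock:
            snapshot = dict(self._data)
            self._data.clear()
            return snapshot

timings = ThreadSafeTimings()
with timings.timer("save_checkpoints"):
    pass  # stand-in for the real work
logged = timings.drain()
print(sorted(logged))
```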
Summary

- Offload the blocking save/export calls in the async trainer example via `asyncio.to_thread(...)`

Why
This addresses issue #188.
The async example in `examples/train/async/async_trainer.py` runs generation in a background coroutine, but its periodic and final `save_checkpoints()` / `save_models()` calls were still executed directly from `async def train()`. Those methods perform heavyweight synchronous work, so running them inline blocks the event loop and stalls the background generation coroutine during save windows.

Changes
- Wrap the periodic and final checkpoint saves in `await asyncio.to_thread(self.save_checkpoints)`
- Wrap the final model export in `await asyncio.to_thread(self.save_models)`
- Add `tests/test_async_trainer_example.py` to verify the periodic and final save/export paths are offloaded

Validation
- `PYTHONPATH=/tmp/skyrl-issue-188 /Users/vuductai/Documents/Projects/SkyRL/.venv/bin/pytest tests/test_async_trainer_example.py -q`
- `PYTHONPATH=/tmp/skyrl-issue-188 /Users/vuductai/Documents/Projects/SkyRL/.venv/bin/python -m py_compile examples/train/async/async_trainer.py tests/test_async_trainer_example.py`
- `git diff --check`

Notes
- `save_checkpoints()` because it updates `self.all_timings` internally during cleanup timing; that pattern already exists in the fully async trainer as well, so I left it out of scope for this patch.
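For reference, the offload behavior described in the Changes list can be asserted by intercepting `asyncio.to_thread` and recording what gets dispatched. The sketch below illustrates the approach; it is a stand-in, not the actual contents of `tests/test_async_trainer_example.py`, and `fake_train` / `DummyTrainer` are hypothetical names.

```python
# Sketch: verify save/export calls are offloaded by recording every
# callable dispatched through asyncio.to_thread.
import asyncio

class DummyTrainer:
    def save_checkpoints(self): pass
    def save_models(self): pass

async def fake_train(trainer):
    # Simplified stand-in for the example's save/export path.
    await asyncio.to_thread(trainer.save_checkpoints)
    await asyncio.to_thread(trainer.save_models)

def test_saves_are_offloaded():
    dispatched = []
    real_to_thread = asyncio.to_thread

    async def recording_to_thread(fn, *args, **kwargs):
        dispatched.append(fn.__name__)
        return await real_to_thread(fn, *args, **kwargs)

    # Patch the module attribute; a real test would use pytest's
    # monkeypatch fixture instead of assigning directly.
    asyncio.to_thread = recording_to_thread
    try:
        asyncio.run(fake_train(DummyTrainer()))
    finally:
        asyncio.to_thread = real_to_thread
    assert dispatched == ["save_checkpoints", "save_models"]
    return dispatched

print(test_saves_are_offloaded())
```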