[feat] provide save/load checkpoint interfaces #124
Signed-off-by: yxstev <zhangyixiang9@huawei.com>
CLA Signature Pass
dodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍
| Args: | ||
| checkpoint_dir: Directory to save the checkpoint (created if not exists). | ||
| include_storage: Whether to include storage unit data. |
The `include_storage` parameter should only be valid for KV backends such as MooncakeStore/Yuanrong.
For SimpleStorage, we should always save the storage contents, so we need additional check logic for the different backends.
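A minimal sketch of the backend-specific check suggested above. The `Backend` enum, `KV_BACKENDS`, and `resolve_include_storage` are hypothetical names introduced only for illustration; they are not part of this PR, and the defaults chosen here are assumptions.

```python
from __future__ import annotations

from enum import Enum


class Backend(Enum):
    # Hypothetical identifiers, named after the backends mentioned in the review.
    SIMPLE_STORAGE = "SimpleStorage"
    MOONCAKE_STORE = "MooncakeStore"
    YUANRONG = "Yuanrong"


# Assumption: for KV backends, saving storage unit data is optional.
KV_BACKENDS = {Backend.MOONCAKE_STORE, Backend.YUANRONG}


def resolve_include_storage(backend: Backend, include_storage: bool | None) -> bool:
    """Return the effective include_storage value for the given backend."""
    if backend in KV_BACKENDS:
        # KV backends honor the caller's choice; None is treated as False here.
        return bool(include_storage)
    if include_storage is False:
        # Non-KV backends must always persist their contents.
        raise ValueError(
            f"include_storage=False is not supported for {backend.value}; "
            "storage contents are always saved for this backend."
        )
    return True
```

Raising on an explicit `False` rather than silently overriding it is one possible design choice; logging a warning and forcing `True` would be another.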
| def save_checkpoint( | ||
| checkpoint_dir: str | Path, | ||
| *, |
| try: | ||
| # Step 1: controller dumps itself to file | ||
| controller_path = tmp_dir / _CONTROLLER_FILE | ||
| success = ray.get(_TQ_CONTROLLER.dump_to_file.remote(str(controller_path))) |
I suggest using ZMQ rather than a Ray remote call here to protect data integrity.
| The ordered storage unit list of the current system must exactly match the | ||
| checkpoint (same count, same IDs, same positions). This is required because | ||
| data routing is position-based (global_idx % num_units). |
This only applies to SimpleStorage.
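To illustrate why the docstring requires the unit list to match, here is a small sketch of position-based routing using the `global_idx % num_units` rule quoted above. The helper name `unit_position` is hypothetical and the unit counts are made up for the example.

```python
def unit_position(global_idx: int, num_units: int) -> int:
    """Position of the storage unit that holds a given global index."""
    return global_idx % num_units


# Positions computed when the checkpoint was saved (4 units) versus
# positions computed after a restart with a different unit count (3 units).
saved = [unit_position(i, 4) for i in range(8)]
restored = [unit_position(i, 3) for i in range(8)]

# Indexes whose data would now be looked up on the wrong unit.
misrouted = [i for i in range(8) if saved[i] != restored[i]]
print(misrouted)  # [3, 4, 5, 6, 7]
```

With a mismatched unit count, most indexes resolve to a different position, which is why the check has to happen before any data is restored.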
| """ | ||
| return self.zmq_server_info | ||
| def dump_to_file(self, path: str) -> bool: |
We need to define an abstract interface for this in the base manager.
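One way the requested abstraction could look, as a sketch only. `CheckpointableManager`, `load_from_file`, and `InMemoryManager` are hypothetical names, and pickle is an assumed serialization format; only `dump_to_file(self, path: str) -> bool` appears in the diff.

```python
import pickle
from abc import ABC, abstractmethod


class CheckpointableManager(ABC):
    """Hypothetical base interface for components that can be checkpointed."""

    @abstractmethod
    def dump_to_file(self, path: str) -> bool:
        """Write the component state to `path`; return True on success."""

    @abstractmethod
    def load_from_file(self, path: str) -> bool:
        """Restore the component state from `path`; return True on success."""


class InMemoryManager(CheckpointableManager):
    """Toy implementation used only to exercise the interface."""

    def __init__(self, state: dict):
        self.state = state

    def dump_to_file(self, path: str) -> bool:
        with open(path, "wb") as f:
            pickle.dump(self.state, f)
        return True

    def load_from_file(self, path: str) -> bool:
        with open(path, "rb") as f:
            self.state = pickle.load(f)
        return True
```

Declaring both methods abstract means a backend that forgets to implement one of them fails at instantiation time rather than during a checkpoint.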
| }, | ||
| "sampler": self.sampler.get_state() if hasattr(self.sampler, "get_state") else None, | ||
| "tq_config": self.tq_config, | ||
| "connected_storage_managers": set(self._connected_storage_managers), |
Is this necessary? Newly initialized storage managers will have different IDs.
| "allocated_indexes": set(self.index_manager.allocated_indexes), | ||
| }, | ||
| "sampler": self.sampler.get_state() if hasattr(self.sampler, "get_state") else None, | ||
| "tq_config": self.tq_config, |
Be careful with this: the saved config may differ from the new config used during re-initialization.
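A possible way to surface such a mismatch at load time, sketched under the assumption that both configs can be viewed as plain dictionaries. `diff_config` and the example keys are hypothetical and do not reflect the real `tq_config` schema.

```python
MISSING = "<missing>"  # Marker for a key that exists on only one side.


def diff_config(saved: dict, current: dict) -> dict:
    """Map each differing key to a (saved_value, current_value) pair."""
    differences = {}
    for key in sorted(saved.keys() | current.keys()):
        saved_value = saved.get(key, MISSING)
        current_value = current.get(key, MISSING)
        if saved_value != current_value:
            differences[key] = (saved_value, current_value)
    return differences


# Example with made-up keys: one changed value and one newly added key.
saved_cfg = {"num_units": 4, "backend": "SimpleStorage"}
current_cfg = {"num_units": 2, "backend": "SimpleStorage", "timeout_s": 30}
print(diff_config(saved_cfg, current_cfg))
# {'num_units': (4, 2), 'timeout_s': ('<missing>', 30)}
```

Whether a non-empty result should abort the load or only log a warning is a policy decision left open here.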
| _STORAGE_UNITS_DIR = "storage_units" | ||
| def save_checkpoint( |
The current implementation only works when the checkpoint directory is on a shared network filesystem. Please state this limitation clearly in the docs.
Summary
- Adds `tq.save_checkpoint(checkpoint_dir, *, include_storage, metadata)` and `tq.load_checkpoint(checkpoint_dir)` as top-level public APIs.
- Writes the checkpoint to a temporary (`tmp`) directory that is renamed on success and deleted on failure, ensuring no partial checkpoint is left on disk.
- Matches storage units by position (`global_idx % num_units`) so storage unit IDs regenerated across restarts are handled correctly.

Checkpoint layout:
Test plan
- `pytest tests/e2e/test_checkpoint_e2e.py -v`
- `include_storage=False` saves only controller state