[feat] provide save/load checkpoint interfaces #124

Open
dodatboii wants to merge 1 commit into Ascend:main from dodatboii:dev_dump

Conversation

@dodatboii

Contributor

Summary

  • Add tq.save_checkpoint(checkpoint_dir, *, include_storage, metadata) and tq.load_checkpoint(checkpoint_dir) as top-level public APIs
  • Controller and each storage unit write state directly to file in-process (only a bool ACK goes through Ray object store), avoiding large payload transmission over Ray
  • Save order: controller first, then storage units in parallel — guarantees consistency without pausing the data flow (storage unit data is always a superset of what the controller has confirmed)
  • Atomic save via a .tmp directory that is renamed on success and deleted on failure, ensuring no partial checkpoint is left on disk
  • Load validates storage unit count before touching any state; restoration is position-based (global_idx % num_units) so storage unit IDs regenerated across restarts are handled correctly
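The atomic-save bullet above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: `atomic_save_dir` and the `write_fn` callback are hypothetical names, and the sketch assumes the final directory does not exist yet and lives on the same filesystem as the `.tmp` directory.

```python
import os
import shutil
from pathlib import Path


def atomic_save_dir(final_dir, write_fn):
    """Write into `<final_dir>.tmp`, then rename it into place.

    `write_fn(tmp_dir)` is a hypothetical callback that writes every
    checkpoint file. Assumes `final_dir` does not exist yet.
    """
    final_dir = Path(final_dir)
    tmp_dir = final_dir.with_name(final_dir.name + ".tmp")
    if tmp_dir.exists():
        # Clear leftovers from an earlier save that crashed mid-write.
        shutil.rmtree(tmp_dir)
    tmp_dir.mkdir(parents=True)
    try:
        write_fn(tmp_dir)
        # A same-filesystem rename is atomic on POSIX, so readers see
        # either no checkpoint or a complete one.
        os.rename(tmp_dir, final_dir)
    except BaseException:
        # On any failure, remove the partial directory and re-raise.
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise
    return final_dir
```

The rename is the only step that makes the checkpoint visible under its final name, which is why a failure before that point leaves nothing behind once the cleanup runs.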

Checkpoint layout:

checkpoint_dir/
├── metadata.json          # timestamp, storage unit list, user metadata
├── controller_state.pkl   # TransferQueueController full state
└── storage_units/
    ├── su_0_<id>.pkl
    └── su_1_<id>.pkl
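One way to read the layout above together with the position-based restoration described in the summary is sketched below. It assumes the number in `su_<pos>_<id>.pkl` is the unit's position; the helper names `unit_position` and `map_checkpoint_to_units` are hypothetical and are not taken from the PR.

```python
import re
from pathlib import Path

# Assumed file-name pattern: su_<position>_<old unit id>.pkl
_SU_FILE = re.compile(r"^su_(\d+)_(.+)\.pkl$")


def unit_position(global_idx, num_units):
    """Position-based routing as stated in the summary."""
    return global_idx % num_units


def map_checkpoint_to_units(storage_dir, current_unit_ids):
    """Return {current_unit_id: checkpoint_file}, matched by position.

    Old IDs embedded in the file names are ignored, so units that were
    assigned new IDs after a restart still receive the right file.
    """
    by_pos = {}
    for path in Path(storage_dir).iterdir():
        match = _SU_FILE.match(path.name)
        if match:
            by_pos[int(match.group(1))] = path
    # Validate before touching any state: the modulo routing only stays
    # consistent if the unit count is unchanged.
    if len(by_pos) != len(current_unit_ids):
        raise ValueError(
            f"checkpoint has {len(by_pos)} storage units, "
            f"current system has {len(current_unit_ids)}"
        )
    missing = [p for p in range(len(current_unit_ids)) if p not in by_pos]
    if missing:
        raise ValueError(f"checkpoint is missing positions {missing}")
    return {uid: by_pos[pos] for pos, uid in enumerate(current_unit_ids)}
```

Matching by position rather than by ID is a design choice that follows the summary; the docstring quoted later in the review asks for matching IDs as well, so the actual check in the PR may be stricter than this sketch.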

Test plan

  • pytest tests/e2e/test_checkpoint_e2e.py -v
    • save creates expected files and metadata structure
    • load restores controller partitions, key mappings, and per-sample tags
    • load restores storage data and round-trips tensors correctly across multiple partitions
    • include_storage=False saves only controller state
    • error cases: uninitialized system, missing directory/metadata, storage unit count mismatch
    • failed save leaves no partial directory on disk
    • non-tensor fields (NonTensorStack) and variable-length (jagged) tensor fields survive round-trip

Signed-off-by: yxstev <zhangyixiang9@huawei.com>
@ascend-robot


CLA Signature Pass

dodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍

@0oshowero0 0oshowero0 requested a review from Copilot June 15, 2026 03:47

Copilot AI left a comment

Contributor

Copilot was unable to review this pull request because the user who requested the review has reached their quota limit.

Comment thread transfer_queue/interface.py

Args:
checkpoint_dir: Directory to save the checkpoint (created if not exists).
include_storage: Whether to include storage unit data.

Collaborator

This parameter should only be valid for KV backends such as MooncakeStore or Yuanrong.

Collaborator

For SimpleStorage, we should always store the storage contents, so we need to add backend-specific check logic.


def save_checkpoint(
checkpoint_dir: str | Path,
*,

Collaborator

do we need this?

Comment thread transfer_queue/interface.py
try:
# Step 1: controller dumps itself to file
controller_path = tmp_dir / _CONTROLLER_FILE
success = ray.get(_TQ_CONTROLLER.dump_to_file.remote(str(controller_path)))

Collaborator

I suggest using ZMQ rather than a Ray remote call here to protect data integrity.

Comment thread transfer_queue/interface.py
Comment on lines +1166 to +1168
The ordered storage unit list of the current system must exactly match the
checkpoint (same count, same IDs, same positions). This is required because
data routing is position-based (global_idx % num_units).

Collaborator

Only for SimpleStorage

Comment thread transfer_queue/interface.py
"""
return self.zmq_server_info

def dump_to_file(self, path: str) -> bool:

Collaborator

We need to define an abstract interface for this in the base manager.

Comment thread transfer_queue/interface.py
},
"sampler": self.sampler.get_state() if hasattr(self.sampler, "get_state") else None,
"tq_config": self.tq_config,
"connected_storage_managers": set(self._connected_storage_managers),

Collaborator

Is this necessary? The newly initialized storage managers will have different IDs.

"allocated_indexes": set(self.index_manager.allocated_indexes),
},
"sampler": self.sampler.get_state() if hasattr(self.sampler, "get_state") else None,
"tq_config": self.tq_config,

Collaborator

Be careful with this: it may differ from the new config during reinit.

_STORAGE_UNITS_DIR = "storage_units"


def save_checkpoint(

Collaborator

The current implementation only works with a shared network directory. Clearly label this limitation in the docs.

4 participants