Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 104 additions & 2 deletions deepspeed/checkpoint/autoep_universal.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,98 @@
CAT_DIM,
EP_IS_EXPERT_PARAM,
EP_NUM_EXPERTS,
FOLDING_METADATA_KEY,
FOLDING_METADATA_VERSION,
FOLDING_TP_SIZE,
FOLDING_TP_RANK,
FOLDING_EP_SIZE,
FOLDING_EP_RANK,
FOLDING_ETP_SIZE,
FOLDING_ETP_RANK,
FOLDING_ZERO_PARTITION_GROUP,
FOLDING_ZERO_PARTITION_RANK,
FOLDING_ZERO_PARTITION_COUNT,
FOLDING_DISPATCH_STRATEGY,
FOLDING_SHARED_EXPERT_PLACEMENT,
FOLDING_FAMILY,
FOLDING_PARAM_FAMILIES,
)


def make_folding_metadata(*,
tp_size,
tp_rank,
ep_size,
ep_rank,
zero_partition_group,
zero_partition_rank,
zero_partition_count,
family,
param_families=None):
metadata = {
"version": FOLDING_METADATA_VERSION,
FOLDING_TP_SIZE: tp_size,
FOLDING_TP_RANK: tp_rank,
FOLDING_EP_SIZE: ep_size,
FOLDING_EP_RANK: ep_rank,
FOLDING_ETP_SIZE: 1,
FOLDING_ETP_RANK: 0,
FOLDING_ZERO_PARTITION_GROUP: zero_partition_group,
FOLDING_ZERO_PARTITION_RANK: zero_partition_rank,
FOLDING_ZERO_PARTITION_COUNT: zero_partition_count,
FOLDING_DISPATCH_STRATEGY: "route_full_partition_dispatch",
FOLDING_SHARED_EXPERT_PLACEMENT: "tp_sharded",
FOLDING_FAMILY: family,
}
if param_families is not None:
metadata[FOLDING_PARAM_FAMILIES] = dict(param_families)
return metadata


def validate_folding_metadata(metadata,
*,
tp_size,
ep_size,
etp_size=1,
tp_rank=None,
ep_rank=None,
etp_rank=None,
zero_partition_group=None,
zero_partition_rank=None,
zero_partition_count=None,
family=None,
param_families=None,
shared_expert_placement=None,
dispatch_strategy=None):
if not isinstance(metadata, dict) or FOLDING_METADATA_KEY not in metadata:
raise RuntimeError("Missing AutoEP+AutoTP folding metadata in folded checkpoint.")
folding = metadata[FOLDING_METADATA_KEY]
if folding.get("version") != FOLDING_METADATA_VERSION:
raise RuntimeError(f"Unsupported folding metadata version: {folding.get('version')}")
expected = {
FOLDING_TP_SIZE: tp_size,
FOLDING_EP_SIZE: ep_size,
FOLDING_ETP_SIZE: etp_size,
}
optional_expected = {
FOLDING_TP_RANK: tp_rank,
FOLDING_EP_RANK: ep_rank,
FOLDING_ETP_RANK: etp_rank,
FOLDING_ZERO_PARTITION_GROUP: zero_partition_group,
FOLDING_ZERO_PARTITION_RANK: zero_partition_rank,
FOLDING_ZERO_PARTITION_COUNT: zero_partition_count,
FOLDING_FAMILY: family,
FOLDING_PARAM_FAMILIES: param_families,
FOLDING_SHARED_EXPERT_PLACEMENT: shared_expert_placement,
FOLDING_DISPATCH_STRATEGY: dispatch_strategy,
}
expected.update({key: value for key, value in optional_expected.items() if value is not None})
for key, value in expected.items():
if folding.get(key) != value:
raise RuntimeError(f"Folding metadata mismatch for {key}: saved={folding.get(key)} runtime={value}")
return folding


def _state_entry(state, param_id):
"""Get optimizer state entry by param id, handling int/str key variants."""
if param_id in state:
Expand Down Expand Up @@ -102,6 +191,13 @@ def resolve_expert_ckpt_path(checkpoint_dir, moe_layer_id, global_expert_id):
raise FileNotFoundError(f"Expert checkpoint file not found: layer_{moe_layer_id} "
f"expert_{global_expert_id} in {checkpoint_dir}")
if len(matches) > 1:
for match in matches:
state = torch.load(match, map_location='cpu', weights_only=False)
if FOLDING_METADATA_KEY in state:
raise NotImplementedError("Universal checkpoint conversion for folded AutoEP+AutoTP expert shards "
"is not supported yet. Load this checkpoint with a matching folded "
"runtime, or consolidate the tensor-parallel expert shards before "
"running ds_to_universal.")
raise NotImplementedError(f"Multiple expert checkpoint files found for layer_{moe_layer_id} "
f"expert_{global_expert_id}: {matches}. Multi-mp_rank expert files "
f"are not yet supported.")
Expand Down Expand Up @@ -138,9 +234,12 @@ def consolidate_autoep_expert_files(checkpoint_dir, output_dir, autoep_layers_me

for wname in ('w1', 'w2', 'w3'):
expert_tensors = []
folding_metadata = None
for global_eid in range(num_experts):
ckpt_path = resolve_expert_ckpt_path(checkpoint_dir, moe_layer_id, global_eid)
sd = torch.load(ckpt_path, map_location='cpu', weights_only=False)
if folding_metadata is None:
folding_metadata = sd.get(FOLDING_METADATA_KEY)
key = f"{prefix}.{wname}.{global_eid}"
if key not in sd:
raise RuntimeError(f"Expected key '{key}' not found in {ckpt_path}")
Expand All @@ -153,12 +252,15 @@ def consolidate_autoep_expert_files(checkpoint_dir, output_dir, autoep_layers_me
param_name = f"{prefix}.{wname}"
param_dir = os.path.join(output_dir, "zero", param_name)
os.makedirs(param_dir, exist_ok=True)
torch.save({
universal_state = {
PARAM: full_tensor,
CAT_DIM: 0,
EP_IS_EXPERT_PARAM: True,
EP_NUM_EXPERTS: num_experts,
}, os.path.join(param_dir, "fp32.pt"))
}
if folding_metadata is not None:
universal_state[FOLDING_METADATA_KEY] = folding_metadata
torch.save(universal_state, os.path.join(param_dir, "fp32.pt"))


def consolidate_autoep_optimizer_states(checkpoint_dir, output_dir, autoep_layers_metadata, ep_size):
Expand Down
19 changes: 19 additions & 0 deletions deepspeed/checkpoint/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,22 @@
EP_IS_EXPERT_PARAM = 'is_expert_param'
EP_NUM_EXPERTS = 'ep_num_experts'
EXPERT_PARAMETER_PATTERNS = 'expert_parameter_patterns'

#########################################
# AutoEP + AutoTP folding metadata keys
#########################################
FOLDING_METADATA_KEY = 'folding'
FOLDING_METADATA_VERSION = 1
FOLDING_TP_SIZE = 'tp_size'
FOLDING_TP_RANK = 'tp_rank'
FOLDING_EP_SIZE = 'ep_size'
FOLDING_EP_RANK = 'ep_rank'
FOLDING_ETP_SIZE = 'etp_size'
FOLDING_ETP_RANK = 'etp_rank'
FOLDING_ZERO_PARTITION_GROUP = 'zero_partition_group'
FOLDING_ZERO_PARTITION_RANK = 'zero_partition_rank'
FOLDING_ZERO_PARTITION_COUNT = 'zero_partition_count'
FOLDING_DISPATCH_STRATEGY = 'dispatch_strategy'
FOLDING_SHARED_EXPERT_PLACEMENT = 'shared_expert_placement'
FOLDING_FAMILY = 'family'
FOLDING_PARAM_FAMILIES = 'param_families'
2 changes: 1 addition & 1 deletion deepspeed/comm/torch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def disable_compiler_collective(func):

def build_shm_op():
builder = get_accelerator().create_op_builder("ShareMemCommBuilder")
if builder is None or not deepspeed.ops.__compatible_ops__[builder.NAME]:
if builder is None or not deepspeed.ops.__compatible_ops__.get(builder.NAME, False):
return None
shm_cpp_module = builder.load()
print(f'DeepSpeed {builder.absolute_name()} built successfully')
Expand Down
40 changes: 29 additions & 11 deletions deepspeed/module_inject/auto_ep_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
available_preset_names,
resolve_autoep_config_defaults,
)
from deepspeed.module_inject.auto_ep_folding import build_folding_spec, validate_folding_global
from deepspeed.utils import logger

__all__ = [
Expand Down Expand Up @@ -45,6 +46,7 @@ def parse_autoep_config(param_dict: dict) -> AutoEPConfig:
config = AutoEPConfig()
config.enabled = param_dict.get("enabled", False)
config.autoep_size = param_dict.get("autoep_size", 1)
config.expert_tensor_parallel_size = param_dict.get("expert_tensor_parallel_size", 1)
config.preset_model = param_dict.get("preset_model", None)
config.moe_layer_pattern = param_dict.get("moe_layer_pattern", None)
config.expert_pattern = param_dict.get("expert_pattern", None)
Expand Down Expand Up @@ -95,6 +97,13 @@ def validate_autoep_config(
pp_size: int,
tp_size: int,
sp_size: int,
*,
zero_stage: int = 0,
tp_preset_model: str | None = None,
use_data_before_expert_parallel: bool = False,
mpu=None,
zero_offload_optimizer: bool = False,
zero_offload_param: bool = False,
) -> None:
"""Validate config constraints. Raises ValueError on invalid config."""
if config.load_balance_coeff is not None:
Expand All @@ -103,17 +112,26 @@ def validate_autoep_config(
if not config.enabled:
return

if tp_size > 1:
raise ValueError("AutoEP does not currently support AutoTP "
f"(tensor_parallel.autotp_size={tp_size}). Disable AutoTP for this run; "
"AutoEP+AutoTP support is planned as follow-up work.")

# ep_size must divide the stage size (world_size / pp_size)
stage_size = world_size // pp_size
if stage_size % config.autoep_size != 0:
raise ValueError(f"autoep_size={config.autoep_size} must divide the stage size "
f"(world_size={world_size} / pp_size={pp_size} = {stage_size}). "
f"Valid autoep_size values: {_divisors(stage_size)}")
folding_spec = build_folding_spec(
world_size=world_size,
pp_size=pp_size,
tp_size=max(tp_size, 1),
ep_size=config.autoep_size,
etp_size=config.expert_tensor_parallel_size,
mp_mode="tp" if tp_size > 1 else "sp",
)
validate_folding_global(
folding_spec,
zero_stage=zero_stage,
sp_size=sp_size,
use_data_before_expert_parallel=use_data_before_expert_parallel,
mpu=mpu,
autoep_enabled=config.enabled,
tp_preset=tp_preset_model,
ep_preset=config.preset_model,
zero_offload_optimizer=zero_offload_optimizer,
zero_offload_param=zero_offload_param,
)

# Validate preset_model if specified
if config.preset_model is not None and config.preset_model not in PRESET_MODELS:
Expand Down
Loading
Loading