diff --git a/gigl/distributed/base_dist_loader.py b/gigl/distributed/base_dist_loader.py index 203c8520d..496b32381 100644 --- a/gigl/distributed/base_dist_loader.py +++ b/gigl/distributed/base_dist_loader.py @@ -426,16 +426,6 @@ def create_mp_producer( channel = BaseDistLoader.create_colocated_channel(worker_options) if isinstance(sampler_options, PPRSamplerOptions): degree_tensors = dataset.degree_tensor - if isinstance(degree_tensors, dict): - logger.info( - f"Pre-computed degree tensors for PPR sampling across " - f"{len(degree_tensors)} edge types." - ) - else: - logger.info( - f"Pre-computed degree tensor for PPR sampling with " - f"{degree_tensors.size(0)} nodes." - ) else: degree_tensors = None return DistSamplingProducer( diff --git a/gigl/distributed/dist_dataset.py b/gigl/distributed/dist_dataset.py index cd38c5653..c0cf6f207 100644 --- a/gigl/distributed/dist_dataset.py +++ b/gigl/distributed/dist_dataset.py @@ -80,9 +80,7 @@ def __init__( edge_feature_info: Optional[ Union[FeatureInfo, dict[EdgeType, FeatureInfo]] ] = None, - degree_tensor: Optional[ - Union[torch.Tensor, dict[EdgeType, torch.Tensor]] - ] = None, + degree_tensor: Optional[dict[NodeType, torch.Tensor]] = None, max_labels_per_anchor_node: Optional[int] = None, ) -> None: """ @@ -108,7 +106,7 @@ def __init__( Note this will be None in the homogeneous case if the data has no node features, or will only contain node types with node features in the heterogeneous case. edge_feature_info: Optional[Union[FeatureInfo, dict[EdgeType, FeatureInfo]]]: Dimension of edge features and its data type, will be a dict if heterogeneous. Note this will be None in the homogeneous case if the data has no edge features, or will only contain edge types with edge features in the heterogeneous case. - degree_tensor: Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]]: Pre-computed degree tensor. Lazily computed on first access via the degree_tensor property. + degree_tensor: Optional[dict[NodeType, torch.Tensor]]: Pre-computed degree tensor keyed by node type. Lazily computed on first access via the degree_tensor property. max_labels_per_anchor_node (Optional[int]): Optional cap for how many labels to materialize per anchor node for ABLP label fetching. """ @@ -146,9 +144,7 @@ def __init__( self._node_feature_info = node_feature_info self._edge_feature_info = edge_feature_info - self._degree_tensor: Optional[ - Union[torch.Tensor, dict[EdgeType, torch.Tensor]] - ] = degree_tensor + self._degree_tensor: Optional[dict[NodeType, torch.Tensor]] = degree_tensor self._max_labels_per_anchor_node = max_labels_per_anchor_node # TODO (mkolodner-sc): Modify so that we don't need to rely on GLT's base variable naming (i.e. partition_idx, num_partitions) in favor of more clear @@ -307,13 +303,15 @@ def edge_feature_info( @property def degree_tensor( self, - ) -> Union[torch.Tensor, dict[EdgeType, torch.Tensor]]: + ) -> dict[NodeType, torch.Tensor]: """ - Lazily compute and return the degree tensor for the graph. + Lazily compute and return the total degree tensor per node type. On first access, computes node degrees from the graph partition and uses - all-reduce to aggregate across all machines. Requires torch.distributed - to be initialized. + all-reduce to aggregate across all machines. Degrees are summed across + all incident edge types per anchor node type before the all-reduce, so + the per-edge-type tensor is never stored. Requires torch.distributed to + be initialized. Over-counting correction (for processes sharing the same data on the same machine) is handled automatically by detecting the distributed topology. @@ -321,9 +319,9 @@ def degree_tensor( The result is cached for subsequent accesses. Returns: - Union[torch.Tensor, dict[EdgeType, torch.Tensor]]: The aggregated degree tensor. - - For homogeneous graphs: A tensor of shape [num_nodes]. - - For heterogeneous graphs: A dict mapping EdgeType to degree tensors. + dict[NodeType, torch.Tensor]: Total degree tensors keyed by node type. + For homogeneous graphs the single entry uses + ``DEFAULT_HOMOGENEOUS_NODE_TYPE`` as its key. Raises: RuntimeError: If torch.distributed is not initialized. @@ -333,7 +331,9 @@ def degree_tensor( if self.graph is None: raise ValueError("Dataset graph is None. Cannot compute degrees.") - self._degree_tensor = compute_and_broadcast_degree_tensor(self.graph) + self._degree_tensor = compute_and_broadcast_degree_tensor( + self.graph, self._edge_dir + ) return self._degree_tensor @property @@ -902,7 +902,7 @@ def share_ipc( Optional[Union[int, dict[NodeType, int]]]: Number of test nodes on the current machine. Will be a dict if heterogeneous. Optional[Union[FeatureInfo, dict[NodeType, FeatureInfo]]]: Node feature dim and its data type, will be a dict if heterogeneous Optional[Union[FeatureInfo, dict[EdgeType, FeatureInfo]]]: Edge feature dim and its data type, will be a dict if heterogeneous - Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]]: Degree tensors, will be a dict if heterogeneous + Optional[dict[NodeType, torch.Tensor]]: Degree tensors keyed by node type Optional[int]: Optional per-anchor label cap for ABLP label fetching """ # TODO (mkolodner-sc): Investigate moving share_memory calls to the build() function @@ -1188,7 +1188,7 @@ def _rebuild_distributed_dataset( Optional[ Union[FeatureInfo, dict[EdgeType, FeatureInfo]] ], # Edge feature dim and its data type - Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]], # Degree tensors + Optional[dict[NodeType, torch.Tensor]], # Degree tensors Optional[int], # Optional per-anchor label cap for ABLP label fetching ], ): diff --git a/gigl/distributed/dist_ppr_sampler.py b/gigl/distributed/dist_ppr_sampler.py index 402e381c1..dc9671974 100644 --- a/gigl/distributed/dist_ppr_sampler.py +++ b/gigl/distributed/dist_ppr_sampler.py @@ -17,7 +17,7 @@ from graphlearn_torch.utils import merge_dict from gigl.distributed.base_sampler import BaseDistNeighborSampler -from gigl.types.graph import is_label_edge_type +from gigl.types.graph import DEFAULT_HOMOGENEOUS_NODE_TYPE, is_label_edge_type # Trailing "." is an intentional separator. These constants are used both to # write metadata keys (f"{KEY}{repr(edge_type)}" → e.g. "ppr_edge_index.('user', 'to', 'story')") @@ -26,14 +26,14 @@ PPR_EDGE_INDEX_METADATA_KEY = "ppr_edge_index." PPR_WEIGHT_METADATA_KEY = "ppr_weight." -# Sentinel type names for homogeneous graphs. The PPR algorithm uses -# dict[NodeType, ...] internally for both homo and hetero graphs; these -# sentinels let the homogeneous path reuse the same dict-based code. -_PPR_HOMOGENEOUS_NODE_TYPE = "ppr_homogeneous_node_type" +# Sentinel edge type for homogeneous graphs. The PPR algorithm uses +# dict[NodeType, ...] internally for both homo and hetero graphs; the +# DEFAULT_HOMOGENEOUS_NODE_TYPE sentinel lets the homogeneous path reuse +# the same dict-based code. _PPR_HOMOGENEOUS_EDGE_TYPE = ( - _PPR_HOMOGENEOUS_NODE_TYPE, + DEFAULT_HOMOGENEOUS_NODE_TYPE, "to", - _PPR_HOMOGENEOUS_NODE_TYPE, + DEFAULT_HOMOGENEOUS_NODE_TYPE, ) @@ -74,10 +74,10 @@ class DistPPRNeighborSampler(BaseDistNeighborSampler): but require more computation. Typical values: 1e-4 to 1e-6. max_ppr_nodes: Maximum number of nodes to return per seed based on PPR scores. num_neighbors_per_hop: Maximum number of neighbors to fetch per hop. - total_degree_dtype: Dtype for precomputed total-degree tensors. Defaults - to ``torch.int32``. Use a larger dtype if nodes have exceptionally high - aggregate degrees. - degree_tensors: Pre-computed degree tensors from the dataset. + degree_tensors: Pre-computed total-degree tensors (int32), keyed by NodeType. + Must be pre-computed by the caller + (e.g. via :func:`build_ppr_total_degree_tensors`) so that workers + share a single allocation rather than recomputing per-worker. """ def __init__( @@ -87,8 +87,7 @@ def __init__( eps: float = 1e-4, max_ppr_nodes: int = 50, num_neighbors_per_hop: int = 100_000, - total_degree_dtype: torch.dtype = torch.int32, - degree_tensors: Union[torch.Tensor, dict[EdgeType, torch.Tensor]], + degree_tensors: dict[NodeType, torch.Tensor], max_fetch_iterations: Optional[int] = None, **kwargs, ): @@ -125,23 +124,17 @@ def __init__( self._node_type_to_edge_types[anchor_type].append(etype) else: - self._node_type_to_edge_types[_PPR_HOMOGENEOUS_NODE_TYPE] = [ + self._node_type_to_edge_types[DEFAULT_HOMOGENEOUS_NODE_TYPE] = [ _PPR_HOMOGENEOUS_EDGE_TYPE ] self._is_homogeneous = True - # Precompute total degree per node type: the sum of degrees across all - # edge types traversable from that node type. This is a graph-level - # property used on every PPR iteration, so computing it once at init - # avoids per-node summation and cache lookups in the hot loop. - # TODO (mkolodner-sc): This trades memory for throughput — we - # materialize a tensor per node type to avoid recomputing total degree - # on every neighbor during sampling. Computing it here (rather than in - # the dataset) also keeps the door open for edge-specific degree - # strategies. If memory becomes a bottleneck, revisit this. - self._node_type_to_total_degree: dict[NodeType, torch.Tensor] = ( - self._build_total_degree_tensors(degree_tensors, total_degree_dtype) - ) + # Total-degree tensors keyed by NodeType, pre-computed by the caller. + # Callers (create_mp_producer for colocated, SharedDistSamplingBackend + # for graph-store) run build_ppr_total_degree_tensors once in the parent + # process and place the result in shared memory so all worker processes + # map the same allocation. + self._node_type_to_total_degree: dict[NodeType, torch.Tensor] = degree_tensors # Build integer ID mappings for the C++ forward-push kernel. String # NodeType / EdgeType keys are only used at the Python boundary @@ -191,58 +184,6 @@ def __init__( for nt in all_node_types ] - def _build_total_degree_tensors( - self, - degree_tensors: Union[torch.Tensor, dict[EdgeType, torch.Tensor]], - dtype: torch.dtype, - ) -> dict[NodeType, torch.Tensor]: - """Build total-degree tensors by summing per-edge-type degrees for each node type. - - For homogeneous graphs, the total degree is just the single degree tensor. - For heterogeneous graphs, it sums degree tensors across all edge types - traversable from each node type, padding shorter tensors with zeros. - - Args: - degree_tensors: Per-edge-type degree tensors from the dataset. - dtype: Dtype for the output tensors. - - Returns: - Dict mapping node type to a 1-D tensor of total degrees. - """ - result: dict[NodeType, torch.Tensor] = {} - - if self._is_homogeneous: - assert isinstance(degree_tensors, torch.Tensor) - # Single edge type: degree values fit directly in the target dtype. - result[_PPR_HOMOGENEOUS_NODE_TYPE] = degree_tensors.to(dtype) - else: - assert isinstance(degree_tensors, dict) - dtype_max = torch.iinfo(dtype).max - for node_type, edge_types in self._node_type_to_edge_types.items(): - max_len = 0 - for et in edge_types: - if et not in degree_tensors: - raise ValueError( - f"Edge type {et} not found in degree tensors. " - f"Available: {list(degree_tensors.keys())}" - ) - max_len = max(max_len, len(degree_tensors[et])) - - # Each degree tensor is indexed by node ID (derived from CSR - # indptr), so index i in every edge type's tensor refers to - # the same node. Element-wise summation gives the total degree - # per node across all edge types. Shorter tensors are padded - # implicitly (only the first len(et_degrees) entries are added). - # Sum in int64: aggregate degrees are bounded by partition size - # and fit comfortably within int64 range in practice. - summed = torch.zeros(max_len, dtype=torch.int64) - for et in edge_types: - et_degrees = degree_tensors[et] - summed[: len(et_degrees)] += et_degrees.to(torch.int64) - result[node_type] = summed.clamp(max=dtype_max).to(dtype) - - return result - def _get_destination_type(self, edge_type: EdgeType) -> NodeType: """Get the node type at the destination end of an edge type.""" return edge_type[0] if self.edge_dir == "in" else edge_type[-1] @@ -362,7 +303,7 @@ async def _compute_ppr_scores( valid_counts = tensor([1, 3, 2, 0]) """ if seed_node_type is None: - seed_node_type = _PPR_HOMOGENEOUS_NODE_TYPE + seed_node_type = DEFAULT_HOMOGENEOUS_NODE_TYPE device = seed_nodes.device ppr_state = PPRForwardPush( @@ -422,12 +363,12 @@ async def _compute_ppr_scores( if self._is_homogeneous: assert ( len(ntype_to_flat_ids) == 1 - and _PPR_HOMOGENEOUS_NODE_TYPE in ntype_to_flat_ids + and DEFAULT_HOMOGENEOUS_NODE_TYPE in ntype_to_flat_ids ) return ( - ntype_to_flat_ids[_PPR_HOMOGENEOUS_NODE_TYPE], - ntype_to_flat_weights[_PPR_HOMOGENEOUS_NODE_TYPE], - ntype_to_valid_counts[_PPR_HOMOGENEOUS_NODE_TYPE], + ntype_to_flat_ids[DEFAULT_HOMOGENEOUS_NODE_TYPE], + ntype_to_flat_weights[DEFAULT_HOMOGENEOUS_NODE_TYPE], + ntype_to_valid_counts[DEFAULT_HOMOGENEOUS_NODE_TYPE], ) else: return ( diff --git a/gigl/distributed/dist_sampling_producer.py b/gigl/distributed/dist_sampling_producer.py index 3a51715e2..15d29a48c 100644 --- a/gigl/distributed/dist_sampling_producer.py +++ b/gigl/distributed/dist_sampling_producer.py @@ -30,7 +30,7 @@ SamplingConfig, SamplingType, ) -from graphlearn_torch.typing import EdgeType +from graphlearn_torch.typing import NodeType from graphlearn_torch.utils import seed_everything from torch._C import _set_worker_signal_handlers from torch.utils.data.dataloader import DataLoader @@ -55,7 +55,7 @@ def _sampling_worker_loop( sampling_completed_worker_count, # mp.Value mp_barrier: Barrier, sampler_options: SamplerOptions, - degree_tensors: Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]], + degree_tensors: Optional[dict[NodeType, torch.Tensor]], ): dist_sampler = None try: @@ -180,9 +180,7 @@ def __init__( worker_options: MpDistSamplingWorkerOptions, channel: ChannelBase, sampler_options: SamplerOptions, - degree_tensors: Optional[ - Union[torch.Tensor, dict[EdgeType, torch.Tensor]] - ] = None, + degree_tensors: Optional[dict[NodeType, torch.Tensor]] = None, ): super().__init__(data, sampler_input, sampling_config, worker_options, channel) self._sampler_options = sampler_options diff --git a/gigl/distributed/graph_store/shared_dist_sampling_producer.py b/gigl/distributed/graph_store/shared_dist_sampling_producer.py index 0f7461196..b45f8deae 100644 --- a/gigl/distributed/graph_store/shared_dist_sampling_producer.py +++ b/gigl/distributed/graph_store/shared_dist_sampling_producer.py @@ -93,7 +93,7 @@ SamplingConfig, SamplingType, ) -from graphlearn_torch.typing import EdgeType +from graphlearn_torch.typing import NodeType from torch._C import _set_worker_signal_handlers from gigl.common.logger import Logger @@ -103,6 +103,7 @@ SamplerRuntime, create_dist_sampler, ) +from gigl.utils.share_memory import share_memory logger = Logger() @@ -338,7 +339,7 @@ def _shared_sampling_worker_loop( event_queue: mp.Queue, mp_barrier: Barrier, sampler_options: SamplerOptions, - degree_tensors: Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]], + degree_tensors: Optional[dict[NodeType, torch.Tensor]], ) -> None: """Run one shared graph-store worker that schedules many input channels. @@ -835,7 +836,7 @@ def __init__( worker_options: RemoteDistSamplingWorkerOptions, sampling_config: SamplingConfig, sampler_options: SamplerOptions, - degree_tensors: Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]], + degree_tensors: Optional[dict[NodeType, torch.Tensor]], ) -> None: """Initialize the shared sampling backend. @@ -871,7 +872,10 @@ def __init__( self._completed_workers: defaultdict[tuple[int, int], set[int]] = defaultdict( set ) - self._degree_tensors = degree_tensors + # Move degree tensors to shared memory so all spawned workers map the + # same allocation instead of each pickling a private copy. + self._degree_tensors: Optional[dict[NodeType, torch.Tensor]] = degree_tensors + share_memory(self._degree_tensors) def init_backend(self) -> None: """Initialize worker processes once for this backend. diff --git a/gigl/distributed/sampler_options.py b/gigl/distributed/sampler_options.py index fccd7a3ba..08cd27352 100644 --- a/gigl/distributed/sampler_options.py +++ b/gigl/distributed/sampler_options.py @@ -10,7 +10,6 @@ from dataclasses import dataclass from typing import Optional, Union -import torch from graphlearn_torch.typing import EdgeType from gigl.common.logger import Logger @@ -58,9 +57,6 @@ class PPRSamplerOptions: hub nodes receive diminishing residual per neighbor, so capping the fetch has little effect on PPR accuracy while keeping per-hop RPC cost bounded. Set large to approximate fetching all neighbors. - total_degree_dtype: Dtype for precomputed total-degree tensors. Defaults - to ``torch.int32``, which supports total degrees up to ~2 billion. - Use a larger dtype if nodes have exceptionally high aggregate degrees. max_fetch_iterations: Maximum number of iterations that issue RPC neighbor fetches. After this many fetch iterations, subsequent iterations push residuals using only already-cached neighbor lists (no new RPCs). @@ -73,7 +69,6 @@ class PPRSamplerOptions: eps: float = 1e-4 max_ppr_nodes: int = 50 num_neighbors_per_hop: int = 1_000 - total_degree_dtype: torch.dtype = torch.int32 max_fetch_iterations: Optional[int] = None diff --git a/gigl/distributed/utils/degree.py b/gigl/distributed/utils/degree.py index 7374f53ed..d33ec74f0 100644 --- a/gigl/distributed/utils/degree.py +++ b/gigl/distributed/utils/degree.py @@ -5,8 +5,9 @@ and aggregate them across distributed machines. Degrees are computed from the CSR (Compressed Sparse Row) topology stored in GraphLearn-Torch Graph objects. -Note: Degree tensors are not moved to shared memory and may be duplicated across -processes on the same machine. +Degrees are accumulated per anchor node type (summing across all edge types +incident to that node type) before the distributed all-reduce, so callers +receive ``dict[NodeType, torch.Tensor]`` directly with no further conversion. Requirements ============ @@ -27,24 +28,28 @@ import torch from graphlearn_torch.data import Graph +from graphlearn_torch.typing import NodeType from torch_geometric.typing import EdgeType from gigl.common.logger import Logger from gigl.distributed.utils.device import get_device_from_process_group from gigl.distributed.utils.networking import get_internal_ip_from_all_ranks -from gigl.types.graph import is_label_edge_type +from gigl.types.graph import DEFAULT_HOMOGENEOUS_NODE_TYPE, is_label_edge_type logger = Logger() def compute_and_broadcast_degree_tensor( graph: Union[Graph, dict[EdgeType, Graph]], -) -> Union[torch.Tensor, dict[EdgeType, torch.Tensor]]: - """ - Compute node degrees from a graph and aggregate across all machines. + edge_dir: str, +) -> dict[NodeType, torch.Tensor]: + """Compute node degrees from a graph and aggregate across all machines. - Computes degrees from the CSR row pointers (indptr) and performs all-reduce - to aggregate across ranks. + For each non-label edge type, degrees are derived from the CSR row pointers + (indptr). For heterogeneous graphs, degrees are summed across all edge types + incident to each anchor node type **locally** before the all-reduce, so the + per-edge-type tensor is only a transient intermediate and is never stored, + returned, or transmitted over RPC. Over-counting correction (for processes sharing the same data) is handled automatically by detecting the distributed topology. @@ -52,13 +57,16 @@ def compute_and_broadcast_degree_tensor( Args: graph: A Graph (homogeneous) or dict[EdgeType, Graph] (heterogeneous). For heterogeneous graphs, label edge types are automatically excluded - from the computation — they are supervision edges and should not - contribute to node degree for graph traversal algorithms like PPR. + — they are supervision edges and should not contribute to node degree + for graph traversal algorithms like PPR. + edge_dir: Sampling direction — ``"in"`` or ``"out"``. Determines which + end of each edge is the anchor node type for degree accumulation. Returns: - Union[torch.Tensor, dict[EdgeType, torch.Tensor]]: The aggregated degree tensors. - - For homogeneous graphs: A tensor of shape [num_nodes]. - - For heterogeneous graphs: A dict mapping non-label EdgeType to degree tensors. + dict[NodeType, torch.Tensor]: Aggregated degree tensors keyed by node + type. For homogeneous graphs the single entry uses + ``DEFAULT_HOMOGENEOUS_NODE_TYPE`` as its key. Values are int32 + tensors of shape ``[num_nodes_of_that_type]``. Raises: RuntimeError: If torch.distributed is not initialized. @@ -69,52 +77,50 @@ def compute_and_broadcast_degree_tensor( "compute_and_broadcast_degree_tensor requires torch.distributed to be initialized." ) - # Compute local degrees from graph topology + local_dict: dict[NodeType, torch.Tensor] = {} + if isinstance(graph, Graph): topo = graph.topo if topo is None or topo.indptr is None: raise ValueError("Topology/indptr not available for graph.") - local_degrees: Union[torch.Tensor, dict[EdgeType, torch.Tensor]] = ( - _compute_degrees_from_indptr(topo.indptr) + local_dict[DEFAULT_HOMOGENEOUS_NODE_TYPE] = _compute_degrees_from_indptr( + topo.indptr ) else: - local_dict: dict[EdgeType, torch.Tensor] = {} for edge_type, edge_graph in graph.items(): - # Label edge types are supervision edges and should not contribute - # to node degree for graph traversal algorithms like PPR. if is_label_edge_type(edge_type): continue + anchor_type: NodeType = edge_type[-1] if edge_dir == "in" else edge_type[0] topo = edge_graph.topo if topo is None or topo.indptr is None: logger.warning( f"Topology/indptr not available for edge type {edge_type}, using empty tensor." ) - local_dict[edge_type] = torch.empty(0, dtype=torch.int16) + degrees = torch.empty(0, dtype=torch.int32) else: - local_dict[edge_type] = _compute_degrees_from_indptr(topo.indptr) - local_degrees = local_dict + degrees = _compute_degrees_from_indptr(topo.indptr) + + if anchor_type in local_dict: + existing = local_dict[anchor_type] + max_len = max(len(existing), len(degrees)) + summed = _pad_to_size(existing, max_len).to(torch.int64) + summed[: len(degrees)] += degrees.to(torch.int64) + local_dict[anchor_type] = summed.to(torch.int32) + else: + local_dict[anchor_type] = degrees - # All-reduce across ranks (over-counting correction handled internally) - result = _all_reduce_degrees(local_degrees) + result = _all_reduce_degrees(local_dict) - # Log results - if isinstance(result, torch.Tensor): - if result.numel() > 0: + for node_type, degrees in result.items(): + if degrees.numel() > 0: logger.info( - f"{result.size(0)} nodes, max={result.max().item()}, min={result.min().item()}" + f"{node_type}: {degrees.size(0)} nodes, " + f"max={degrees.max().item()}, min={degrees.min().item()}" ) else: - logger.info("Graph contained 0 nodes when computing degrees") - else: - for edge_type, degrees in result.items(): - if degrees.numel() > 0: - logger.info( - f"{edge_type}: {degrees.size(0)} nodes, max={degrees.max().item()}, min={degrees.min().item()}" - ) - else: - logger.info( - f"Graph contained 0 nodes for edge type {edge_type} when computing degrees" - ) + logger.info( + f"Graph contained 0 nodes for node type {node_type} when computing degrees" + ) return result @@ -131,33 +137,25 @@ def _pad_to_size(tensor: torch.Tensor, target_size: int) -> torch.Tensor: return torch.cat([tensor, padding]) -def _clamp_to_int16(tensor: torch.Tensor) -> torch.Tensor: - """Clamp tensor values to int16 max and convert dtype.""" - max_int16 = torch.iinfo(torch.int16).max - return tensor.clamp(max=max_int16).to(torch.int16) - - def _compute_degrees_from_indptr(indptr: torch.Tensor) -> torch.Tensor: """Compute degrees from CSR row pointers: degree[i] = indptr[i+1] - indptr[i].""" - return (indptr[1:] - indptr[:-1]).to(torch.int16) + return (indptr[1:] - indptr[:-1]).to(torch.int32) def _all_reduce_degrees( - local_degrees: Union[torch.Tensor, dict[EdgeType, torch.Tensor]], -) -> Union[torch.Tensor, dict[EdgeType, torch.Tensor]]: - """All-reduce degree tensors across ranks, handling both homogeneous and heterogeneous cases. + local_degrees: dict[NodeType, torch.Tensor], +) -> dict[NodeType, torch.Tensor]: + """All-reduce degree tensors across ranks. - For heterogeneous graphs, iterates over the edge types in local_degrees. All partitions - are expected to have entries for all edge types (even if some have empty tensors). - - Moves tensors to GPU for the all-reduce if using NCCL backend (which requires CUDA), - otherwise keeps tensors on CPU (for Gloo backend). + Moves tensors to GPU for the all-reduce if using NCCL backend (which + requires CUDA), otherwise keeps tensors on CPU (for Gloo backend). Over-counting correction: - In distributed training, multiple processes on the same machine often share the - same graph partition data (via shared memory). When we all-reduce degrees, each - process contributes its "local" degrees - but if 4 processes on one machine all - read the same partition, that partition's degrees get summed 4 times instead of 1. + In distributed training, multiple processes on the same machine often + share the same graph partition data (via shared memory). When we + all-reduce degrees, each process contributes its "local" degrees — but + if 4 processes on one machine all read the same partition, that + partition's degrees get summed 4 times instead of 1. Example: Machine A has 2 processes sharing partition with degrees [3, 5, 2]. Machine B has 2 processes sharing partition with degrees [1, 4, 6]. @@ -168,16 +166,16 @@ def _all_reduce_degrees( With correction: divide by local_world_size (2 per machine) = [4, 9, 8] (correct: [3+1, 5+4, 2+6]) - This function detects how many processes share the same machine by comparing - IP addresses, then divides by that count to correct the over-counting. + This function detects how many processes share the same machine by + comparing IP addresses, then divides by that count to correct the + over-counting. Args: - local_degrees: Either a single tensor (homogeneous) or dict mapping EdgeType - to tensors (heterogeneous). For heterogeneous graphs, all partitions must - have entries for all edge types. + local_degrees: Dict mapping NodeType to local degree tensors. + All partitions must have entries for all node types. Returns: - Aggregated degree tensors in the same format as input. + Aggregated degree tensors keyed by NodeType. Raises: RuntimeError: If torch.distributed is not initialized. @@ -187,38 +185,25 @@ def _all_reduce_degrees( "_all_reduce_degrees requires torch.distributed to be initialized." ) - # Compute local_world_size: number of processes on the same machine sharing data all_ips = get_internal_ip_from_all_ranks() my_rank = torch.distributed.get_rank() my_ip = all_ips[my_rank] local_world_size = Counter(all_ips)[my_ip] - # NCCL backend requires CUDA tensors; Gloo works with CPU device = get_device_from_process_group() def reduce_tensor(tensor: torch.Tensor) -> torch.Tensor: """All-reduce a single tensor with size sync and over-counting correction.""" - # Synchronize max size across all ranks local_size = torch.tensor([tensor.size(0)], dtype=torch.long, device=device) torch.distributed.all_reduce(local_size, op=torch.distributed.ReduceOp.MAX) max_size = int(local_size.item()) - # Pad, convert to int64 (all_reduce doesn't support int16), move to device padded = _pad_to_size(tensor, max_size).to(torch.int64).to(device) torch.distributed.all_reduce(padded, op=torch.distributed.ReduceOp.SUM) - # Correct for over-counting, move back to CPU, and clamp to int16 - # TODO (mkolodner-sc): Potentially want to paramaterize this in the future if we want degrees higher than the int16 max. - return _clamp_to_int16((padded // local_world_size).cpu()) - - # Homogeneous case - if isinstance(local_degrees, torch.Tensor): - return reduce_tensor(local_degrees) - - # Heterogeneous case: all-reduce each edge type - # Sort edge types for deterministic ordering across ranks - result: dict[EdgeType, torch.Tensor] = {} - for edge_type in sorted(local_degrees.keys()): - result[edge_type] = reduce_tensor(local_degrees[edge_type]) + return (padded // local_world_size).to(torch.int32).cpu() + result: dict[NodeType, torch.Tensor] = {} + for node_type in sorted(local_degrees.keys()): + result[node_type] = reduce_tensor(local_degrees[node_type]) return result diff --git a/gigl/distributed/utils/dist_sampler.py b/gigl/distributed/utils/dist_sampler.py index 0333f4138..db5dba1af 100644 --- a/gigl/distributed/utils/dist_sampler.py +++ b/gigl/distributed/utils/dist_sampler.py @@ -10,7 +10,7 @@ RemoteDistSamplingWorkerOptions, ) from graphlearn_torch.sampler import EdgeSamplerInput, NodeSamplerInput, SamplingConfig -from graphlearn_torch.typing import EdgeType +from graphlearn_torch.typing import NodeType from gigl.distributed.dist_neighbor_sampler import DistNeighborSampler from gigl.distributed.dist_ppr_sampler import DistPPRNeighborSampler @@ -35,7 +35,7 @@ def create_dist_sampler( worker_options: Union[MpDistSamplingWorkerOptions, RemoteDistSamplingWorkerOptions], channel: ChannelBase, sampler_options: SamplerOptions, - degree_tensors: Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]], + degree_tensors: Optional[dict[NodeType, torch.Tensor]], current_device: torch.device, ) -> SamplerRuntime: """Create a GiGL sampler runtime for one channel on one worker. @@ -84,7 +84,6 @@ def create_dist_sampler( max_ppr_nodes=sampler_options.max_ppr_nodes, max_fetch_iterations=sampler_options.max_fetch_iterations, num_neighbors_per_hop=sampler_options.num_neighbors_per_hop, - total_degree_dtype=sampler_options.total_degree_dtype, degree_tensors=degree_tensors, ) else: