Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions gigl/distributed/base_dist_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,16 +426,6 @@ def create_mp_producer(
channel = BaseDistLoader.create_colocated_channel(worker_options)
if isinstance(sampler_options, PPRSamplerOptions):
degree_tensors = dataset.degree_tensor
if isinstance(degree_tensors, dict):
logger.info(
f"Pre-computed degree tensors for PPR sampling across "
f"{len(degree_tensors)} edge types."
)
else:
logger.info(
f"Pre-computed degree tensor for PPR sampling with "
f"{degree_tensors.size(0)} nodes."
)
else:
degree_tensors = None
return DistSamplingProducer(
Expand Down
34 changes: 17 additions & 17 deletions gigl/distributed/dist_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ def __init__(
edge_feature_info: Optional[
Union[FeatureInfo, dict[EdgeType, FeatureInfo]]
] = None,
degree_tensor: Optional[
Union[torch.Tensor, dict[EdgeType, torch.Tensor]]
] = None,
degree_tensor: Optional[dict[NodeType, torch.Tensor]] = None,
max_labels_per_anchor_node: Optional[int] = None,
) -> None:
"""
Expand All @@ -108,7 +106,7 @@ def __init__(
Note this will be None in the homogeneous case if the data has no node features, or will only contain node types with node features in the heterogeneous case.
edge_feature_info: Optional[Union[FeatureInfo, dict[EdgeType, FeatureInfo]]]: Dimension of edge features and its data type, will be a dict if heterogeneous.
Note this will be None in the homogeneous case if the data has no edge features, or will only contain edge types with edge features in the heterogeneous case.
degree_tensor: Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]]: Pre-computed degree tensor. Lazily computed on first access via the degree_tensor property.
degree_tensor: Optional[dict[NodeType, torch.Tensor]]: Pre-computed degree tensor keyed by node type. Lazily computed on first access via the degree_tensor property.
max_labels_per_anchor_node (Optional[int]): Optional cap for how many
labels to materialize per anchor node for ABLP label fetching.
"""
Expand Down Expand Up @@ -146,9 +144,7 @@ def __init__(
self._node_feature_info = node_feature_info
self._edge_feature_info = edge_feature_info

self._degree_tensor: Optional[
Union[torch.Tensor, dict[EdgeType, torch.Tensor]]
] = degree_tensor
self._degree_tensor: Optional[dict[NodeType, torch.Tensor]] = degree_tensor
self._max_labels_per_anchor_node = max_labels_per_anchor_node

# TODO (mkolodner-sc): Modify so that we don't need to rely on GLT's base variable naming (i.e. partition_idx, num_partitions) in favor of more clear
Expand Down Expand Up @@ -307,23 +303,25 @@ def edge_feature_info(
@property
def degree_tensor(
self,
) -> Union[torch.Tensor, dict[EdgeType, torch.Tensor]]:
) -> dict[NodeType, torch.Tensor]:
"""
Lazily compute and return the degree tensor for the graph.
Lazily compute and return the total degree tensor per node type.

On first access, computes node degrees from the graph partition and uses
all-reduce to aggregate across all machines. Requires torch.distributed
to be initialized.
all-reduce to aggregate across all machines. Degrees are summed across
all incident edge types per anchor node type before the all-reduce, so
the per-edge-type tensor is never stored. Requires torch.distributed to
be initialized.

Over-counting correction (for processes sharing the same data on the same
machine) is handled automatically by detecting the distributed topology.

The result is cached for subsequent accesses.

Returns:
Union[torch.Tensor, dict[EdgeType, torch.Tensor]]: The aggregated degree tensor.
- For homogeneous graphs: A tensor of shape [num_nodes].
- For heterogeneous graphs: A dict mapping EdgeType to degree tensors.
dict[NodeType, torch.Tensor]: Total degree tensors keyed by node type.
For homogeneous graphs the single entry uses
``DEFAULT_HOMOGENEOUS_NODE_TYPE`` as its key.

Raises:
RuntimeError: If torch.distributed is not initialized.
Expand All @@ -333,7 +331,9 @@ def degree_tensor(
if self.graph is None:
raise ValueError("Dataset graph is None. Cannot compute degrees.")

self._degree_tensor = compute_and_broadcast_degree_tensor(self.graph)
self._degree_tensor = compute_and_broadcast_degree_tensor(
self.graph, self._edge_dir
)
return self._degree_tensor

@property
Expand Down Expand Up @@ -902,7 +902,7 @@ def share_ipc(
Optional[Union[int, dict[NodeType, int]]]: Number of test nodes on the current machine. Will be a dict if heterogeneous.
Optional[Union[FeatureInfo, dict[NodeType, FeatureInfo]]]: Node feature dim and its data type, will be a dict if heterogeneous
Optional[Union[FeatureInfo, dict[EdgeType, FeatureInfo]]]: Edge feature dim and its data type, will be a dict if heterogeneous
Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]]: Degree tensors, will be a dict if heterogeneous
Optional[dict[NodeType, torch.Tensor]]: Degree tensors keyed by node type
Optional[int]: Optional per-anchor label cap for ABLP label fetching
"""
# TODO (mkolodner-sc): Investigate moving share_memory calls to the build() function
Expand Down Expand Up @@ -1188,7 +1188,7 @@ def _rebuild_distributed_dataset(
Optional[
Union[FeatureInfo, dict[EdgeType, FeatureInfo]]
], # Edge feature dim and its data type
Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]], # Degree tensors
Optional[dict[NodeType, torch.Tensor]], # Degree tensors
Optional[int], # Optional per-anchor label cap for ABLP label fetching
],
):
Expand Down
107 changes: 24 additions & 83 deletions gigl/distributed/dist_ppr_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from graphlearn_torch.utils import merge_dict

from gigl.distributed.base_sampler import BaseDistNeighborSampler
from gigl.types.graph import is_label_edge_type
from gigl.types.graph import DEFAULT_HOMOGENEOUS_NODE_TYPE, is_label_edge_type

# Trailing "." is an intentional separator. These constants are used both to
# write metadata keys (f"{KEY}{repr(edge_type)}" → e.g. "ppr_edge_index.('user', 'to', 'story')")
Expand All @@ -26,14 +26,14 @@
PPR_EDGE_INDEX_METADATA_KEY = "ppr_edge_index."
PPR_WEIGHT_METADATA_KEY = "ppr_weight."

# Sentinel type names for homogeneous graphs. The PPR algorithm uses
# dict[NodeType, ...] internally for both homo and hetero graphs; these
# sentinels let the homogeneous path reuse the same dict-based code.
_PPR_HOMOGENEOUS_NODE_TYPE = "ppr_homogeneous_node_type"
# Sentinel edge type for homogeneous graphs. The PPR algorithm uses
# dict[NodeType, ...] internally for both homo and hetero graphs; the
# DEFAULT_HOMOGENEOUS_NODE_TYPE sentinel lets the homogeneous path reuse
# the same dict-based code.
_PPR_HOMOGENEOUS_EDGE_TYPE = (
_PPR_HOMOGENEOUS_NODE_TYPE,
DEFAULT_HOMOGENEOUS_NODE_TYPE,
"to",
_PPR_HOMOGENEOUS_NODE_TYPE,
DEFAULT_HOMOGENEOUS_NODE_TYPE,
)


Expand Down Expand Up @@ -74,10 +74,10 @@ class DistPPRNeighborSampler(BaseDistNeighborSampler):
but require more computation. Typical values: 1e-4 to 1e-6.
max_ppr_nodes: Maximum number of nodes to return per seed based on PPR scores.
num_neighbors_per_hop: Maximum number of neighbors to fetch per hop.
total_degree_dtype: Dtype for precomputed total-degree tensors. Defaults
to ``torch.int32``. Use a larger dtype if nodes have exceptionally high
aggregate degrees.
degree_tensors: Pre-computed degree tensors from the dataset.
degree_tensors: Pre-computed total-degree tensors (int32), keyed by NodeType.
Must be pre-computed by the caller
(e.g. via :func:`build_ppr_total_degree_tensors`) so that workers
share a single allocation rather than recomputing per-worker.
"""

def __init__(
Expand All @@ -87,8 +87,7 @@ def __init__(
eps: float = 1e-4,
max_ppr_nodes: int = 50,
num_neighbors_per_hop: int = 100_000,
total_degree_dtype: torch.dtype = torch.int32,
degree_tensors: Union[torch.Tensor, dict[EdgeType, torch.Tensor]],
degree_tensors: dict[NodeType, torch.Tensor],
max_fetch_iterations: Optional[int] = None,
**kwargs,
):
Expand Down Expand Up @@ -125,23 +124,17 @@ def __init__(

self._node_type_to_edge_types[anchor_type].append(etype)
else:
self._node_type_to_edge_types[_PPR_HOMOGENEOUS_NODE_TYPE] = [
self._node_type_to_edge_types[DEFAULT_HOMOGENEOUS_NODE_TYPE] = [
_PPR_HOMOGENEOUS_EDGE_TYPE
]
self._is_homogeneous = True

# Precompute total degree per node type: the sum of degrees across all
# edge types traversable from that node type. This is a graph-level
# property used on every PPR iteration, so computing it once at init
# avoids per-node summation and cache lookups in the hot loop.
# TODO (mkolodner-sc): This trades memory for throughput — we
# materialize a tensor per node type to avoid recomputing total degree
# on every neighbor during sampling. Computing it here (rather than in
# the dataset) also keeps the door open for edge-specific degree
# strategies. If memory becomes a bottleneck, revisit this.
self._node_type_to_total_degree: dict[NodeType, torch.Tensor] = (
self._build_total_degree_tensors(degree_tensors, total_degree_dtype)
)
# Total-degree tensors keyed by NodeType, pre-computed by the caller.
# Callers (create_mp_producer for colocated, SharedDistSamplingBackend
# for graph-store) run build_ppr_total_degree_tensors once in the parent
# process and place the result in shared memory so all worker processes
# map the same allocation.
self._node_type_to_total_degree: dict[NodeType, torch.Tensor] = degree_tensors

# Build integer ID mappings for the C++ forward-push kernel. String
# NodeType / EdgeType keys are only used at the Python boundary
Expand Down Expand Up @@ -191,58 +184,6 @@ def __init__(
for nt in all_node_types
]

def _build_total_degree_tensors(
self,
degree_tensors: Union[torch.Tensor, dict[EdgeType, torch.Tensor]],
dtype: torch.dtype,
) -> dict[NodeType, torch.Tensor]:
"""Build total-degree tensors by summing per-edge-type degrees for each node type.

For homogeneous graphs, the total degree is just the single degree tensor.
For heterogeneous graphs, it sums degree tensors across all edge types
traversable from each node type, padding shorter tensors with zeros.

Args:
degree_tensors: Per-edge-type degree tensors from the dataset.
dtype: Dtype for the output tensors.

Returns:
Dict mapping node type to a 1-D tensor of total degrees.
"""
result: dict[NodeType, torch.Tensor] = {}

if self._is_homogeneous:
assert isinstance(degree_tensors, torch.Tensor)
# Single edge type: degree values fit directly in the target dtype.
result[_PPR_HOMOGENEOUS_NODE_TYPE] = degree_tensors.to(dtype)
else:
assert isinstance(degree_tensors, dict)
dtype_max = torch.iinfo(dtype).max
for node_type, edge_types in self._node_type_to_edge_types.items():
max_len = 0
for et in edge_types:
if et not in degree_tensors:
raise ValueError(
f"Edge type {et} not found in degree tensors. "
f"Available: {list(degree_tensors.keys())}"
)
max_len = max(max_len, len(degree_tensors[et]))

# Each degree tensor is indexed by node ID (derived from CSR
# indptr), so index i in every edge type's tensor refers to
# the same node. Element-wise summation gives the total degree
# per node across all edge types. Shorter tensors are padded
# implicitly (only the first len(et_degrees) entries are added).
# Sum in int64: aggregate degrees are bounded by partition size
# and fit comfortably within int64 range in practice.
summed = torch.zeros(max_len, dtype=torch.int64)
for et in edge_types:
et_degrees = degree_tensors[et]
summed[: len(et_degrees)] += et_degrees.to(torch.int64)
result[node_type] = summed.clamp(max=dtype_max).to(dtype)

return result

def _get_destination_type(self, edge_type: EdgeType) -> NodeType:
"""Get the node type at the destination end of an edge type."""
return edge_type[0] if self.edge_dir == "in" else edge_type[-1]
Expand Down Expand Up @@ -362,7 +303,7 @@ async def _compute_ppr_scores(
valid_counts = tensor([1, 3, 2, 0])
"""
if seed_node_type is None:
seed_node_type = _PPR_HOMOGENEOUS_NODE_TYPE
seed_node_type = DEFAULT_HOMOGENEOUS_NODE_TYPE
device = seed_nodes.device

ppr_state = PPRForwardPush(
Expand Down Expand Up @@ -422,12 +363,12 @@ async def _compute_ppr_scores(
if self._is_homogeneous:
assert (
len(ntype_to_flat_ids) == 1
and _PPR_HOMOGENEOUS_NODE_TYPE in ntype_to_flat_ids
and DEFAULT_HOMOGENEOUS_NODE_TYPE in ntype_to_flat_ids
)
return (
ntype_to_flat_ids[_PPR_HOMOGENEOUS_NODE_TYPE],
ntype_to_flat_weights[_PPR_HOMOGENEOUS_NODE_TYPE],
ntype_to_valid_counts[_PPR_HOMOGENEOUS_NODE_TYPE],
ntype_to_flat_ids[DEFAULT_HOMOGENEOUS_NODE_TYPE],
ntype_to_flat_weights[DEFAULT_HOMOGENEOUS_NODE_TYPE],
ntype_to_valid_counts[DEFAULT_HOMOGENEOUS_NODE_TYPE],
)
else:
return (
Expand Down
8 changes: 3 additions & 5 deletions gigl/distributed/dist_sampling_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
SamplingConfig,
SamplingType,
)
from graphlearn_torch.typing import EdgeType
from graphlearn_torch.typing import NodeType
from graphlearn_torch.utils import seed_everything
from torch._C import _set_worker_signal_handlers
from torch.utils.data.dataloader import DataLoader
Expand All @@ -55,7 +55,7 @@ def _sampling_worker_loop(
sampling_completed_worker_count, # mp.Value
mp_barrier: Barrier,
sampler_options: SamplerOptions,
degree_tensors: Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]],
degree_tensors: Optional[dict[NodeType, torch.Tensor]],
):
dist_sampler = None
try:
Expand Down Expand Up @@ -180,9 +180,7 @@ def __init__(
worker_options: MpDistSamplingWorkerOptions,
channel: ChannelBase,
sampler_options: SamplerOptions,
degree_tensors: Optional[
Union[torch.Tensor, dict[EdgeType, torch.Tensor]]
] = None,
degree_tensors: Optional[dict[NodeType, torch.Tensor]] = None,
):
super().__init__(data, sampler_input, sampling_config, worker_options, channel)
self._sampler_options = sampler_options
Expand Down
12 changes: 8 additions & 4 deletions gigl/distributed/graph_store/shared_dist_sampling_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
SamplingConfig,
SamplingType,
)
from graphlearn_torch.typing import EdgeType
from graphlearn_torch.typing import NodeType
from torch._C import _set_worker_signal_handlers

from gigl.common.logger import Logger
Expand All @@ -103,6 +103,7 @@
SamplerRuntime,
create_dist_sampler,
)
from gigl.utils.share_memory import share_memory

logger = Logger()

Expand Down Expand Up @@ -338,7 +339,7 @@ def _shared_sampling_worker_loop(
event_queue: mp.Queue,
mp_barrier: Barrier,
sampler_options: SamplerOptions,
degree_tensors: Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]],
degree_tensors: Optional[dict[NodeType, torch.Tensor]],
) -> None:
"""Run one shared graph-store worker that schedules many input channels.

Expand Down Expand Up @@ -835,7 +836,7 @@ def __init__(
worker_options: RemoteDistSamplingWorkerOptions,
sampling_config: SamplingConfig,
sampler_options: SamplerOptions,
degree_tensors: Optional[Union[torch.Tensor, dict[EdgeType, torch.Tensor]]],
degree_tensors: Optional[dict[NodeType, torch.Tensor]],
) -> None:
"""Initialize the shared sampling backend.

Expand Down Expand Up @@ -871,7 +872,10 @@ def __init__(
self._completed_workers: defaultdict[tuple[int, int], set[int]] = defaultdict(
set
)
self._degree_tensors = degree_tensors
# Move degree tensors to shared memory so all spawned workers map the
# same allocation instead of each pickling a private copy.
self._degree_tensors: Optional[dict[NodeType, torch.Tensor]] = degree_tensors
share_memory(self._degree_tensors)

def init_backend(self) -> None:
"""Initialize worker processes once for this backend.
Expand Down
5 changes: 0 additions & 5 deletions gigl/distributed/sampler_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from dataclasses import dataclass
from typing import Optional, Union

import torch
from graphlearn_torch.typing import EdgeType

from gigl.common.logger import Logger
Expand Down Expand Up @@ -58,9 +57,6 @@ class PPRSamplerOptions:
hub nodes receive diminishing residual per neighbor, so capping the fetch
has little effect on PPR accuracy while keeping per-hop RPC cost bounded.
Set large to approximate fetching all neighbors.
total_degree_dtype: Dtype for precomputed total-degree tensors. Defaults
to ``torch.int32``, which supports total degrees up to ~2 billion.
Use a larger dtype if nodes have exceptionally high aggregate degrees.
max_fetch_iterations: Maximum number of iterations that issue RPC neighbor
fetches. After this many fetch iterations, subsequent iterations push
residuals using only already-cached neighbor lists (no new RPCs).
Expand All @@ -73,7 +69,6 @@ class PPRSamplerOptions:
eps: float = 1e-4
max_ppr_nodes: int = 50
num_neighbors_per_hop: int = 1_000
total_degree_dtype: torch.dtype = torch.int32
max_fetch_iterations: Optional[int] = None


Expand Down
Loading