From 6cef4a67de1aee7ef265a82a32a0ff41c8489264 Mon Sep 17 00:00:00 2001 From: Qinghua Zhou Date: Wed, 26 Nov 2025 22:57:49 +0000 Subject: [PATCH 1/7] Align the counts for allreduce nvls kernel --- apps/nccl/src/allreduce.cu | 12 +++++++++++- test/torch/correctness_test.py | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/apps/nccl/src/allreduce.cu b/apps/nccl/src/allreduce.cu index d5b8200a4..b8a1d0e41 100644 --- a/apps/nccl/src/allreduce.cu +++ b/apps/nccl/src/allreduce.cu @@ -250,6 +250,7 @@ ncclResult_t AllreducePacket::allreduceKernelFunc(const std::shared_ptrscratchBuffer_.get(), output, ctx->memoryChannelDeviceHandles.get(), nullptr, nullptr, nullptr, channelInOffset, 0, this->scratchBufferSize_, ctx->rank, ctx->nRanksPerNode, ctx->workSize, count, stream, deviceFlag7_.get(), @@ -443,6 +444,15 @@ ncclResult_t AllreduceNvlsWithCopy::allreduceKernelFunc(const std::shared_ptr>&) { + // NVLS kernels require that (size / nRanksPerNode) is aligned to 16 bytes + size_t elemSize = ncclTypeSize(dtype); + size_t nRanksPerNode = ctx->nRanksPerNode; + size_t alignmentBytes = 16; + + // Calculate the minimum element count that ensures per-rank size is 16-byte aligned + size_t elementsPerRankAlign = (alignmentBytes * nRanksPerNode) / elemSize; + size_t alignedCount = ((count + elementsPerRankAlign - 1) / elementsPerRankAlign) * elementsPerRankAlign; + AllreduceFunc allreduce = dispatch(ncclSum, dtype); if (!allreduce) { WARN("Unsupported operation or data type for allreduce, dtype=%d", dtype); @@ -451,7 +461,7 @@ ncclResult_t AllreduceNvlsWithCopy::allreduceKernelFunc(const std::shared_ptrscratchBuffer_.get(), output, this->memoryChannelsDeviceHandle_.get(), nullptr, ctx->switchChannelDeviceHandles.get(), nullptr, 0, 0, this->scratchBufferSize_, ctx->rank, - ctx->nRanksPerNode, ctx->workSize, count, stream, nullptr, nullptr, nullptr, 0); + ctx->nRanksPerNode, ctx->workSize, alignedCount, stream, nullptr, nullptr, nullptr, 0); if (error != cudaSuccess) { WARN("AllreduceNvlsWithCopy failed with error: %s", cudaGetErrorString(error)); return ncclUnhandledCudaError; diff --git a/test/torch/correctness_test.py b/test/torch/correctness_test.py index ca50064e2..f23cd7cd3 100644 --- a/test/torch/correctness_test.py +++ b/test/torch/correctness_test.py @@ -65,7 +65,7 @@ def _init_dist(): rank = int(os.environ["RANK"]) world_size = int(os.environ["WORLD_SIZE"]) local_rank = int(os.environ.get("LOCAL_RANK", os.environ["RANK"])) - dist.init_process_group(backend=backend, rank=rank, world_size=world_size, device_id=local_rank) + dist.init_process_group(backend=backend, rank=rank, world_size=world_size, device_id=torch.device(f"cuda:{local_rank}")) torch.cuda.set_device(local_rank) From eb162ca6a17e8b8223a84565f1d03133fc52e92a Mon Sep 17 00:00:00 2001 From: Qinghua Zhou Date: Wed, 26 Nov 2025 23:25:24 +0000 Subject: [PATCH 2/7] Use getDataTypeSize instead of ncclTypeSize --- apps/nccl/src/allreduce.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/nccl/src/allreduce.cu b/apps/nccl/src/allreduce.cu index 3a63e7aea..e070a0ae1 100644 --- a/apps/nccl/src/allreduce.cu +++ b/apps/nccl/src/allreduce.cu @@ -448,7 +448,7 @@ ncclResult_t AllreduceNvlsWithCopy::allreduceKernelFunc(const std::shared_ptr>&) { // NVLS kernels require that (size / nRanksPerNode) is aligned to 16 bytes - size_t elemSize = ncclTypeSize(dtype); + size_t elemSize = getDataTypeSize(dtype); size_t nRanksPerNode = ctx->nRanksPerNode; size_t alignmentBytes = 16; From 677f014cf056ee8a1d2c09340d0a20bc246faa51 Mon Sep 17 00:00:00 2001 From: Qinghua Zhou Date: Mon, 1 Dec 2025 20:00:58 +0000 Subject: [PATCH 3/7] Remove empty line --- apps/nccl/src/allreduce.cu | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/nccl/src/allreduce.cu b/apps/nccl/src/allreduce.cu index e070a0ae1..cf25f4edf 100644 --- a/apps/nccl/src/allreduce.cu +++ b/apps/nccl/src/allreduce.cu @@ -252,7 +252,6 @@ ncclResult_t AllreducePacket::allreduceKernelFunc(const std::shared_ptr(dtype)); return ncclInvalidArgument; } - cudaError_t error = allreduce(input, this->scratchBuffer_.get(), output, ctx->memoryChannelDeviceHandles.get(), nullptr, nullptr, nullptr, channelInOffset, 0, this->scratchBufferSize_, ctx->rank, ctx->nRanksPerNode, ctx->workSize, count, stream, deviceFlag7_.get(), From 094ca80ddccfb8c094a520f3ade955d573c14053 Mon Sep 17 00:00:00 2001 From: Qinghua Zhou Date: Mon, 1 Dec 2025 20:11:17 +0000 Subject: [PATCH 4/7] Update pylint --- test/torch/correctness_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/torch/correctness_test.py b/test/torch/correctness_test.py index f23cd7cd3..6fbd1f912 100644 --- a/test/torch/correctness_test.py +++ b/test/torch/correctness_test.py @@ -65,7 +65,9 @@ def _init_dist(): rank = int(os.environ["RANK"]) world_size = int(os.environ["WORLD_SIZE"]) local_rank = int(os.environ.get("LOCAL_RANK", os.environ["RANK"])) - dist.init_process_group(backend=backend, rank=rank, world_size=world_size, device_id=torch.device(f"cuda:{local_rank}")) + dist.init_process_group( + backend=backend, rank=rank, world_size=world_size, device_id=torch.device(f"cuda:{local_rank}") + ) torch.cuda.set_device(local_rank) From e86683422f183bfff4961be2419c8bd38179eba1 Mon Sep 17 00:00:00 2001 From: Qinghua Zhou Date: Thu, 4 Dec 2025 22:12:56 +0000 Subject: [PATCH 5/7] Add padding to make the total size divisible by (nRanksPerNode * alignment) in allreduce11 --- apps/nccl/src/allreduce.cu | 11 +---------- apps/nccl/src/allreduce.hpp | 38 +++++++++++++++++++++++++++++++++---- src/include/logger.hpp | 1 + 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/apps/nccl/src/allreduce.cu b/apps/nccl/src/allreduce.cu index cf25f4edf..257db2ccc 100644 --- a/apps/nccl/src/allreduce.cu +++ b/apps/nccl/src/allreduce.cu @@ -446,15 +446,6 @@ ncclResult_t AllreduceNvlsWithCopy::allreduceKernelFunc(const std::shared_ptr>&) { - // NVLS kernels require that (size / nRanksPerNode) is aligned to 16 bytes - size_t elemSize = getDataTypeSize(dtype); - size_t nRanksPerNode = ctx->nRanksPerNode; - size_t alignmentBytes = 16; - - // Calculate the minimum element count that ensures per-rank size is 16-byte aligned - size_t elementsPerRankAlign = (alignmentBytes * nRanksPerNode) / elemSize; - size_t alignedCount = ((count + elementsPerRankAlign - 1) / elementsPerRankAlign) * elementsPerRankAlign; - AllreduceFunc allreduce = dispatch(ncclSum, dtype); if (!allreduce) { WARN("Unsupported operation or data type for allreduce, dtype=%d", static_cast(dtype)); @@ -463,7 +454,7 @@ ncclResult_t AllreduceNvlsWithCopy::allreduceKernelFunc(const std::shared_ptrscratchBuffer_.get(), output, this->memoryChannelsDeviceHandle_.get(), nullptr, ctx->switchChannelDeviceHandles.get(), nullptr, 0, 0, this->scratchBufferSize_, ctx->rank, - ctx->nRanksPerNode, ctx->workSize, alignedCount, stream, nullptr, nullptr, nullptr, 0); + ctx->nRanksPerNode, ctx->workSize, count, stream, nullptr, nullptr, nullptr, 0); if (error != cudaSuccess) { WARN("AllreduceNvlsWithCopy failed with error: %s", cudaGetErrorString(error)); return ncclUnhandledCudaError; diff --git a/apps/nccl/src/allreduce.hpp b/apps/nccl/src/allreduce.hpp index 82adc323b..087016adc 100644 --- a/apps/nccl/src/allreduce.hpp +++ b/apps/nccl/src/allreduce.hpp @@ -963,8 +963,20 @@ __global__ void __launch_bounds__(1024, 1) int nBlocksForReduce = nRanksPerNode; int copyReduceRatio = nBlocksForCopy / nBlocksForReduce; size_t scratchSizePerRank = scratchBufferSize / nRanksPerNode; - size_t sizePerRank = size / nRanksPerNode; - assert(sizePerRank % alignment == 0); + + // Pad size to be divisible by (nRanksPerNode * alignment) + // This ensures each rank gets an aligned portion + size_t paddingNeeded = (nRanksPerNode * alignment - (size % (nRanksPerNode * alignment))) % (nRanksPerNode * alignment); + size_t paddedSize = size + paddingNeeded; + size_t sizePerRank = paddedSize / nRanksPerNode; // Always aligned to 16 bytes + + // Calculate actual size this rank should process (without padding) + size_t actualSizeThisRank = sizePerRank; + if (rank == nRanksPerNode - 1) { + // Last rank might have less actual data due to padding + actualSizeThisRank = size - (sizePerRank * (nRanksPerNode - 1)); + } + uint32_t sizePerBlock = ((sizePerRank + (nBlocksForCopy - 1)) / nBlocksForCopy + alignment - 1) / alignment * alignment; uint32_t lastBlockSize = sizePerRank - (nBlocksForCopy - 1) * sizePerBlock; @@ -1008,7 +1020,16 @@ __global__ void __launch_bounds__(1024, 1) uint32_t scratchOffset = scratchIt * unitSize + bid * scratchSizePerBlock + i * scratchSizePerRank; char* srcData = (char*)src + blockOffset; char* dstData = (char*)scratch + scratchOffset; - mscclpp::copy(dstData, srcData, iterSize, tid, blockDim.x); + // Calculate actual copy size - don't copy beyond actual data on last rank + size_t actualCopySize = iterSize; + if (i == nRanksPerNode - 1 && blockOffset + iterSize > i * sizePerRank + actualSizeThisRank) { + // On last rank, clamp to actual data size + actualCopySize = (i * sizePerRank + actualSizeThisRank > blockOffset) + ? (i * sizePerRank + actualSizeThisRank - blockOffset) : 0; + } + if (actualCopySize > 0) { + mscclpp::copy(dstData, srcData, actualCopySize, tid, blockDim.x); + } } __syncthreads(); if (tid < nPeers) { @@ -1067,7 +1088,16 @@ __global__ void __launch_bounds__(1024, 1) i * scratchSizePerRank; char* srcData = (char*)scratch + scratchOffset; char* dstData = (char*)dst + blockOffset; - mscclpp::copy(dstData, srcData, iterSize, tid, blockDim.x); + // Calculate actual copy size - don't copy beyond actual data on last rank + size_t actualCopySize = iterSize; + if (i == nRanksPerNode - 1 && blockOffset + iterSize > i * sizePerRank + actualSizeThisRank) { + // On last rank, clamp to actual data size + actualCopySize = (i * sizePerRank + actualSizeThisRank > blockOffset) + ? (i * sizePerRank + actualSizeThisRank - blockOffset) : 0; + } + if (actualCopySize > 0) { + mscclpp::copy(dstData, srcData, actualCopySize, tid, blockDim.x); + } } __syncthreads(); if (tid == 0) { diff --git a/src/include/logger.hpp b/src/include/logger.hpp index e3e129fb6..67a5fcdc6 100644 --- a/src/include/logger.hpp +++ b/src/include/logger.hpp @@ -6,6 +6,7 @@ #include +#include #include #include #include From 1de1e89ebfafac3f7f6baf8fa0f96b02255b76bb Mon Sep 17 00:00:00 2001 From: Qinghua Zhou Date: Fri, 5 Dec 2025 01:59:57 +0000 Subject: [PATCH 6/7] Update clang-format --- apps/nccl/src/allreduce.hpp | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/apps/nccl/src/allreduce.hpp b/apps/nccl/src/allreduce.hpp index 087016adc..618f10795 100644 --- a/apps/nccl/src/allreduce.hpp +++ b/apps/nccl/src/allreduce.hpp @@ -965,10 +965,10 @@ __global__ void __launch_bounds__(1024, 1) size_t scratchSizePerRank = scratchBufferSize / nRanksPerNode; // Pad size to be divisible by (nRanksPerNode * alignment) - // This ensures each rank gets an aligned portion - size_t paddingNeeded = (nRanksPerNode * alignment - (size % (nRanksPerNode * alignment))) % (nRanksPerNode * alignment); + size_t paddingNeeded = + (nRanksPerNode * alignment - (size % (nRanksPerNode * alignment))) % (nRanksPerNode * alignment); size_t paddedSize = size + paddingNeeded; - size_t sizePerRank = paddedSize / nRanksPerNode; // Always aligned to 16 bytes + size_t sizePerRank = paddedSize / nRanksPerNode; // Calculate actual size this rank should process (without padding) size_t actualSizeThisRank = sizePerRank; @@ -1025,7 +1025,8 @@ __global__ void __launch_bounds__(1024, 1) if (i == nRanksPerNode - 1 && blockOffset + iterSize > i * sizePerRank + actualSizeThisRank) { // On last rank, clamp to actual data size actualCopySize = (i * sizePerRank + actualSizeThisRank > blockOffset) - ? (i * sizePerRank + actualSizeThisRank - blockOffset) : 0; + ? (i * sizePerRank + actualSizeThisRank - blockOffset) + : 0; } if (actualCopySize > 0) { mscclpp::copy(dstData, srcData, actualCopySize, tid, blockDim.x); @@ -1088,12 +1089,12 @@ __global__ void __launch_bounds__(1024, 1) i * scratchSizePerRank; char* srcData = (char*)scratch + scratchOffset; char* dstData = (char*)dst + blockOffset; - // Calculate actual copy size - don't copy beyond actual data on last rank + size_t actualCopySize = iterSize; if (i == nRanksPerNode - 1 && blockOffset + iterSize > i * sizePerRank + actualSizeThisRank) { - // On last rank, clamp to actual data size actualCopySize = (i * sizePerRank + actualSizeThisRank > blockOffset) - ? (i * sizePerRank + actualSizeThisRank - blockOffset) : 0; + ? (i * sizePerRank + actualSizeThisRank - blockOffset) + : 0; } if (actualCopySize > 0) { mscclpp::copy(dstData, srcData, actualCopySize, tid, blockDim.x); From f1561c9637fad908a7ae0ba2bbf373d9cd01e6f5 Mon Sep 17 00:00:00 2001 From: Qinghua Zhou Date: Tue, 13 Jan 2026 21:59:49 +0000 Subject: [PATCH 7/7] Add ut test to track input size alignment --- .../templates/ut-size-alignment.yaml | 151 ++++++++++++++++++ .azure-pipelines/ut-size-alignment.yaml | 42 +++++ 2 files changed, 193 insertions(+) create mode 100644 .azure-pipelines/templates/ut-size-alignment.yaml create mode 100644 .azure-pipelines/ut-size-alignment.yaml diff --git a/.azure-pipelines/templates/ut-size-alignment.yaml b/.azure-pipelines/templates/ut-size-alignment.yaml new file mode 100644 index 000000000..2e471958a --- /dev/null +++ b/.azure-pipelines/templates/ut-size-alignment.yaml @@ -0,0 +1,151 @@ +# .azure-pipelines/templates/ut-size-alignment.yaml +# ---------------------------------------- +# A step-template that runs unit tests to track the input size alignment using PyTorch and MSCCLPP. +# +# Parameters: +# subscription – Azure subscription to use for VMSS start/stop +# vmssName – Name of the VMSS to use +# sshKeySecureFile – the secureFile name for your SSH key +# pytorchImage – PyTorch Docker image to use for unit tests + +parameters: +- name: subscription + type: string +- name: vmssName + type: string +- name: sshKeySecureFile + type: string +- name: pytorchImage + type: string + default: "mcr.microsoft.com/mirror/nvcr/nvidia/pytorch:25.03-py3" + +steps: +- task: Bash@3 + name: Build + displayName: Build MSCCLPP + inputs: + targetType: inline + script: | + mkdir build && cd build + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON .. + make -j + workingDirectory: '$(System.DefaultWorkingDirectory)' + +- task: Bash@3 + name: InstallPackages + displayName: Install Packages + inputs: + targetType: inline + script: | + sudo apt-get update -y + sudo apt-get install pssh -y + curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash + +- task: DownloadSecureFile@1 + name: SshKeyFile + displayName: Download key file + inputs: + secureFile: ${{ parameters.sshKeySecureFile }} + +- task: AzureCLI@2 + name: StartVMSS + displayName: Start VMSS + inputs: + azureSubscription: ${{ parameters.subscription }} + scriptType: bash + scriptLocation: inlineScript + inlineScript: | + az vmss start --name ${{ parameters.vmssName }} --resource-group mscclpp + +- task: Bash@3 + name: DeployBenchmarkEnv + displayName: Deploy Benchmark Environment + inputs: + targetType: inline + script: | + set -e + HOSTFILE=$(System.DefaultWorkingDirectory)/test/deploy/hostfile_ci + ROOT_DIR=$(System.DefaultWorkingDirectory) + SSH_OPTION="StrictHostKeyChecking=no" + KeyFilePath=${SSHKEYFILE_SECUREFILEPATH} + + # Get the host IP + HOST_IP=$(head -1 ${HOSTFILE} | awk '{print $1}') + + # Pull the PyTorch image on the remote machine + ssh -i ${KeyFilePath} -o ${SSH_OPTION} ${HOST_IP} "sudo docker pull ${{ parameters.pytorchImage }}" + + # Stop any existing container + ssh -i ${KeyFilePath} -o ${SSH_OPTION} ${HOST_IP} "sudo docker rm -f mscclpp-benchmark || true" + + # Copy MSCCLPP build to the remote machine + scp -i ${KeyFilePath} -o ${SSH_OPTION} -r ${ROOT_DIR}/build ${HOST_IP}:/tmp/mscclpp-build + scp -i ${KeyFilePath} -o ${SSH_OPTION} -r ${ROOT_DIR}/test ${HOST_IP}:/tmp/mscclpp-test + + # Start the PyTorch container with MSCCLPP mounted + ssh -i ${KeyFilePath} -o ${SSH_OPTION} ${HOST_IP} "sudo docker run -d --name mscclpp-benchmark \ + --gpus all \ + --ipc=host \ + --ulimit memlock=-1 \ + --ulimit stack=67108864 \ + -v /tmp/mscclpp-build:/root/mscclpp/build \ + -v /tmp/mscclpp-test:/root/mscclpp/test \ + ${{ parameters.pytorchImage }} \ + sleep infinity" + workingDirectory: '$(System.DefaultWorkingDirectory)' + +- task: Bash@3 + name: AllReduceBenchmarkMultipleSizes + displayName: Run AllReduce Benchmark Test (Multiple Sizes to test the alignment) + inputs: + targetType: inline + script: | + set -e + HOSTFILE=$(System.DefaultWorkingDirectory)/test/deploy/hostfile_ci + SSH_OPTION="StrictHostKeyChecking=no" + KeyFilePath=${SSHKEYFILE_SECUREFILEPATH} + HOST_IP=$(head -1 ${HOSTFILE} | awk '{print $1}') + + : > azureuser@${HOST_IP} + tail -f azureuser@${HOST_IP} & + CHILD_PID=$! + + # Run with different element counts + for nelem in 10556576 10556587 10556592 10556608 1048576 9999999 12345678; do + echo "Running AllReduce benchmark with nelem=${nelem}" + ssh -i ${KeyFilePath} -o ${SSH_OPTION} ${HOST_IP} "sudo docker exec -t mscclpp-benchmark bash -c \" \ + set -e; \ + cd /root/mscclpp; \ + LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so torchrun --nproc_per_node=8 test/torch/correctness_test.py --collective allreduce --nelem ${nelem} --dtype float\"" + done + + kill $CHILD_PID || true + workingDirectory: '$(System.DefaultWorkingDirectory)' + +- task: Bash@3 + name: Cleanup + displayName: Cleanup Benchmark Container + condition: always() + inputs: + targetType: inline + script: | + set -e + HOSTFILE=$(System.DefaultWorkingDirectory)/test/deploy/hostfile_ci + SSH_OPTION="StrictHostKeyChecking=no" + KeyFilePath=${SSHKEYFILE_SECUREFILEPATH} + HOST_IP=$(head -1 ${HOSTFILE} | awk '{print $1}') + + ssh -i ${KeyFilePath} -o ${SSH_OPTION} ${HOST_IP} "sudo docker rm -f mscclpp-benchmark || true" + ssh -i ${KeyFilePath} -o ${SSH_OPTION} ${HOST_IP} "sudo rm -rf /tmp/mscclpp-build /tmp/mscclpp-test || true" + workingDirectory: '$(System.DefaultWorkingDirectory)' + +- task: AzureCLI@2 + name: StopVMSS + displayName: Deallocate VMSS + condition: always() + inputs: + azureSubscription: ${{ parameters.subscription }} + scriptType: bash + scriptLocation: inlineScript + inlineScript: | + az vmss deallocate --name ${{ parameters.vmssName }} --resource-group mscclpp diff --git a/.azure-pipelines/ut-size-alignment.yaml b/.azure-pipelines/ut-size-alignment.yaml new file mode 100644 index 000000000..8ccd0700e --- /dev/null +++ b/.azure-pipelines/ut-size-alignment.yaml @@ -0,0 +1,42 @@ +trigger: + branches: + include: + - main + - release/* + paths: + exclude: + - .devcontainer/** + - .github/** + - docker/** + - docs/** + - '**/*.md' + +pr: + branches: + include: + - main + - release/* + drafts: false + paths: + exclude: + - .devcontainer/** + - .github/** + - docker/** + - docs/** + - '**/*.md' + +jobs: +- job: BenchmarkTestH100 + displayName: Benchmark Test H100 + pool: + name: msccl-ci-h100 + container: + image: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda12.4 + + steps: + - template: templates/ut-size-alignment.yaml + parameters: + subscription: mscclpp-ci-h100 + vmssName: mscclpp-h100-ci + sshKeySecureFile: mscclpp.pem + pytorchImage: mcr.microsoft.com/mirror/nvcr/nvidia/pytorch:25.03-py3