From 30777565ac9e0c833477fd31e82d2a57e56fcb1e Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 9 Mar 2026 20:27:28 +0000 Subject: [PATCH 1/7] Unique QP per channel and env-controlled GID index - Change executor to create one connection (unique QP) per channel entry instead of sharing connections per peer. This is required for HostNoAtomic IB mode where each connection can only forward signals to one semaphore via setSignalForwardingDst. - Add MSCCLPP_IB_GID_INDEX environment variable to override the default GID index (3) used for IB transport. Set to the desired GID index value, or leave unset/-1 to use the default. --- include/mscclpp/env.hpp | 4 ++++ src/core/endpoint.cc | 8 ++++++- src/core/env.cpp | 4 +++- src/core/executor/executor.cc | 39 +++++++++++++++++++++++++---------- 4 files changed, 42 insertions(+), 13 deletions(-) diff --git a/include/mscclpp/env.hpp b/include/mscclpp/env.hpp index 39f73e8d..0c005097 100644 --- a/include/mscclpp/env.hpp +++ b/include/mscclpp/env.hpp @@ -110,6 +110,10 @@ class Env { /// Default is false. const bool forceDisableNvls; + /// Env name: `MSCCLPP_IB_GID_INDEX`. The GID index to use for IB transport. + /// If unset or set to -1, it defaults to `EndpointConfig::Ib::DefaultGidIndex` (3). + const int ibGidIndex; + private: Env(); diff --git a/src/core/endpoint.cc b/src/core/endpoint.cc index 4795aa62..520d33b2 100644 --- a/src/core/endpoint.cc +++ b/src/core/endpoint.cc @@ -49,8 +49,14 @@ Endpoint::Impl::Impl(const EndpointConfig& config, Context::Impl& contextImpl) int maxRecvWr = ibNoAtomic_ ? config_.ib.maxRecvWr : 0; + // Override GID index from environment variable if set + int gidIndex = config_.ib.gidIndex; + if (env()->ibGidIndex >= 0) { + gidIndex = env()->ibGidIndex; + } + ibQp_ = contextImpl.getIbContext(config_.transport) - ->createQp(config_.ib.port, config_.ib.gidIndex, config_.ib.maxCqSize, config_.ib.maxCqPollNum, + ->createQp(config_.ib.port, gidIndex, config_.ib.maxCqSize, config_.ib.maxCqPollNum, config_.ib.maxSendWr, maxRecvWr, config_.ib.maxWrPerSend); ibQpInfo_ = ibQp_->getInfo(); } else if (config_.transport == Transport::Ethernet) { diff --git a/src/core/env.cpp b/src/core/env.cpp index 484b40af..4e25bfea 100644 --- a/src/core/env.cpp +++ b/src/core/env.cpp @@ -65,7 +65,8 @@ Env::Env() ncclSharedLibPath(readEnv("MSCCLPP_NCCL_LIB_PATH", "")), forceNcclFallbackOperation(readEnv("MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION", "")), ncclSymmetricMemory(readEnv("MSCCLPP_NCCL_SYMMETRIC_MEMORY", false)), - forceDisableNvls(readEnv("MSCCLPP_FORCE_DISABLE_NVLS", false)) {} + forceDisableNvls(readEnv("MSCCLPP_FORCE_DISABLE_NVLS", false)), + ibGidIndex(readEnv("MSCCLPP_IB_GID_INDEX", -1)) {} std::shared_ptr env() { static std::shared_ptr globalEnv = std::shared_ptr(new Env()); @@ -93,6 +94,7 @@ std::shared_ptr env() { logEnv("MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION", globalEnv->forceNcclFallbackOperation); logEnv("MSCCLPP_NCCL_SYMMETRIC_MEMORY", globalEnv->ncclSymmetricMemory); logEnv("MSCCLPP_FORCE_DISABLE_NVLS", globalEnv->forceDisableNvls); + logEnv("MSCCLPP_IB_GID_INDEX", globalEnv->ibGidIndex); } return globalEnv; } diff --git a/src/core/executor/executor.cc b/src/core/executor/executor.cc index bf2caf97..63fb3285 100644 --- a/src/core/executor/executor.cc +++ b/src/core/executor/executor.cc @@ -109,7 +109,7 @@ namespace mscclpp { struct ExecutionContext { std::shared_ptr proxyService; - std::unordered_map connections; + std::vector connections; // one connection (unique QP) per channel std::vector> nvlsConnections; MemoryId localMemoryIdBegin = MemoryId(0); @@ -266,15 +266,31 @@ struct Executor::Impl { } }; - std::vector connectedPeers = plan.impl_->getConnectedPeers(); - std::vector> connectionFutures; - for (int peer : connectedPeers) { - Transport transport = - !useIB(rank, peer, this->nranksPerNode) ? Transport::CudaIpc : IBs[rank % this->nranksPerNode]; - connectionFutures.push_back(this->comm->connect(transport, peer)); + // Create one connection (unique QP) per channel entry. Each channel gets its own + // QP — no shared connections. This is required for HostNoAtomic IB mode where each + // connection can only forward signals to one semaphore via setSignalForwardingDst. + int tag = 0; + Transport ibTransport = IBs[rank % this->nranksPerNode]; + std::vector> connFutures; + for (ChannelType channelType : {ChannelType::MEMORY, ChannelType::PORT}) { + std::vector channelInfos = plan.impl_->getChannelInfos(channelType); + for (const auto& info : channelInfos) { + for (int peer : info.connectedPeers) { + Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc; + connFutures.push_back(this->comm->connect(transport, peer, tag++)); + } + } + channelInfos = plan.impl_->getUnpairedChannelInfos(nranks, channelType); + for (const auto& info : channelInfos) { + for (int peer : info.connectedPeers) { + Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc; + connFutures.push_back(this->comm->connect(transport, peer, tag++)); + } + } } - for (size_t i = 0; i < connectionFutures.size(); i++) { - context.connections[connectedPeers[i]] = connectionFutures[i].get(); + + for (auto& future : connFutures) { + context.connections.push_back(future.get()); } std::vector nvlsInfos = plan.impl_->nvlsInfos.at(rank); @@ -328,10 +344,11 @@ struct Executor::Impl { std::vector> futureProxySemaphores; std::vector> memorySemaphores; std::vector proxySemaphores; + int connIdx = 0; auto processChannelInfos = [&](std::vector& channelInfos) { for (ChannelInfo& info : channelInfos) { - for (int peer : info.connectedPeers) { - auto connection = context.connections.at(peer); + for (size_t i = 0; i < info.connectedPeers.size(); i++) { + auto& connection = context.connections[connIdx++]; if (info.channelType == ChannelType::MEMORY) { futureMemorySemaphores.push_back(this->comm->buildSemaphore( connection, this->comm->remoteRankOf(connection), this->comm->tagOf(connection))); From 982b1f3f4e2eec94d3333819d522a20503da62f1 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 9 Mar 2026 22:38:08 +0000 Subject: [PATCH 2/7] update --- include/mscclpp/core.hpp | 3 ++- src/core/endpoint.cc | 8 +------- src/core/executor/executor.cc | 20 ++++++-------------- 3 files changed, 9 insertions(+), 22 deletions(-) diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index 37bdbd51..5b184f0a 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -430,7 +431,7 @@ struct EndpointConfig { int maxWrPerSend = DefaultMaxWrPerSend, Mode mode = Mode::Default) : deviceIndex(deviceIndex), port(port), - gidIndex(gidIndex), + gidIndex(env()->ibGidIndex > 0 ? env()->ibGidIndex : gidIndex), maxCqSize(maxCqSize), maxCqPollNum(maxCqPollNum), maxSendWr(maxSendWr), diff --git a/src/core/endpoint.cc b/src/core/endpoint.cc index 520d33b2..4795aa62 100644 --- a/src/core/endpoint.cc +++ b/src/core/endpoint.cc @@ -49,14 +49,8 @@ Endpoint::Impl::Impl(const EndpointConfig& config, Context::Impl& contextImpl) int maxRecvWr = ibNoAtomic_ ? config_.ib.maxRecvWr : 0; - // Override GID index from environment variable if set - int gidIndex = config_.ib.gidIndex; - if (env()->ibGidIndex >= 0) { - gidIndex = env()->ibGidIndex; - } - ibQp_ = contextImpl.getIbContext(config_.transport) - ->createQp(config_.ib.port, gidIndex, config_.ib.maxCqSize, config_.ib.maxCqPollNum, + ->createQp(config_.ib.port, config_.ib.gidIndex, config_.ib.maxCqSize, config_.ib.maxCqPollNum, config_.ib.maxSendWr, maxRecvWr, config_.ib.maxWrPerSend); ibQpInfo_ = ibQp_->getInfo(); } else if (config_.transport == Transport::Ethernet) { diff --git a/src/core/executor/executor.cc b/src/core/executor/executor.cc index 63fb3285..9229f9ac 100644 --- a/src/core/executor/executor.cc +++ b/src/core/executor/executor.cc @@ -109,7 +109,7 @@ namespace mscclpp { struct ExecutionContext { std::shared_ptr proxyService; - std::vector connections; // one connection (unique QP) per channel + std::vector connections; std::vector> nvlsConnections; MemoryId localMemoryIdBegin = MemoryId(0); @@ -121,8 +121,6 @@ struct ExecutionContext { // local registered memories to keep resources alive std::vector localRegisteredMemories; - std::vector> memorySemaphores; - std::vector proxySemaphores; std::vector memoryChannels; std::vector portChannels; std::vector nvlsChannels; @@ -266,10 +264,7 @@ struct Executor::Impl { } }; - // Create one connection (unique QP) per channel entry. Each channel gets its own - // QP — no shared connections. This is required for HostNoAtomic IB mode where each - // connection can only forward signals to one semaphore via setSignalForwardingDst. - int tag = 0; + std::unordered_map peerTags; Transport ibTransport = IBs[rank % this->nranksPerNode]; std::vector> connFutures; for (ChannelType channelType : {ChannelType::MEMORY, ChannelType::PORT}) { @@ -277,14 +272,14 @@ struct Executor::Impl { for (const auto& info : channelInfos) { for (int peer : info.connectedPeers) { Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc; - connFutures.push_back(this->comm->connect(transport, peer, tag++)); + connFutures.push_back(this->comm->connect(transport, peer, peerTags[peer]++)); } } channelInfos = plan.impl_->getUnpairedChannelInfos(nranks, channelType); for (const auto& info : channelInfos) { for (int peer : info.connectedPeers) { Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc; - connFutures.push_back(this->comm->connect(transport, peer, tag++)); + connFutures.push_back(this->comm->connect(transport, peer, peerTags[peer]++)); } } } @@ -377,18 +372,15 @@ struct Executor::Impl { proxySemaphores.push_back(context.proxyService->addSemaphore(sem.get())); } - context.memorySemaphores = std::move(memorySemaphores); - context.proxySemaphores = std::move(proxySemaphores); - for (ChannelType channelType : channelTypes) { std::vector channelInfos = plan.impl_->getChannelInfos(channelType); int index = 0; for (ChannelInfo& info : channelInfos) { for (size_t i = 0; i < info.connectedPeers.size(); i++) { if (channelType == ChannelType::MEMORY) { - context.memoryChannels.emplace_back(context.memorySemaphores[index++]); + context.memoryChannels.emplace_back(memorySemaphores[index++]); } else if (channelType == ChannelType::PORT) { - context.portChannels.emplace_back(context.proxyService->basePortChannel(context.proxySemaphores[index++])); + context.portChannels.emplace_back(context.proxyService->basePortChannel(proxySemaphores[index++])); } } } From a76dbe8587c733211f1a6ce9050d5effebc9e3de Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 9 Mar 2026 22:40:35 +0000 Subject: [PATCH 3/7] update --- include/mscclpp/env.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/mscclpp/env.hpp b/include/mscclpp/env.hpp index 0c005097..d6047dea 100644 --- a/include/mscclpp/env.hpp +++ b/include/mscclpp/env.hpp @@ -111,7 +111,7 @@ class Env { const bool forceDisableNvls; /// Env name: `MSCCLPP_IB_GID_INDEX`. The GID index to use for IB transport. - /// If unset or set to -1, it defaults to `EndpointConfig::Ib::DefaultGidIndex` (3). + /// If unset or set to -1, it defaults to `EndpointConfig::Ib::DefaultGidIndex` (0). const int ibGidIndex; private: From 8c7298a3572b3f96c67d61c7710c298867d10444 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 9 Mar 2026 23:15:40 +0000 Subject: [PATCH 4/7] Add missing env fields to Python binding --- python/csrc/env_py.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/csrc/env_py.cpp b/python/csrc/env_py.cpp index ce89fd3d..c1d465ae 100644 --- a/python/csrc/env_py.cpp +++ b/python/csrc/env_py.cpp @@ -23,7 +23,12 @@ void register_env(nb::module_& m) { .def_ro("ibv_mode", &Env::ibvMode) .def_ro("cache_dir", &Env::cacheDir) .def_ro("npkit_dump_dir", &Env::npkitDumpDir) - .def_ro("cuda_ipc_use_default_stream", &Env::cudaIpcUseDefaultStream); + .def_ro("cuda_ipc_use_default_stream", &Env::cudaIpcUseDefaultStream) + .def_ro("nccl_shared_lib_path", &Env::ncclSharedLibPath) + .def_ro("force_nccl_fallback_operation", &Env::forceNcclFallbackOperation) + .def_ro("nccl_symmetric_memory", &Env::ncclSymmetricMemory) + .def_ro("force_disable_nvls", &Env::forceDisableNvls) + .def_ro("ib_gid_index", &Env::ibGidIndex); m.def("env", &env); } From aa5d42fc3a9769632e04a2872c927825fc9a1105 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 9 Mar 2026 23:19:40 +0000 Subject: [PATCH 5/7] Change MSCCLPP_IB_GID_INDEX default to 0 --- include/mscclpp/env.hpp | 2 +- src/core/env.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/mscclpp/env.hpp b/include/mscclpp/env.hpp index d6047dea..8b8750f1 100644 --- a/include/mscclpp/env.hpp +++ b/include/mscclpp/env.hpp @@ -111,7 +111,7 @@ class Env { const bool forceDisableNvls; /// Env name: `MSCCLPP_IB_GID_INDEX`. The GID index to use for IB transport. - /// If unset or set to -1, it defaults to `EndpointConfig::Ib::DefaultGidIndex` (0). + /// Default is 0 (`EndpointConfig::Ib::DefaultGidIndex`). const int ibGidIndex; private: diff --git a/src/core/env.cpp b/src/core/env.cpp index 4e25bfea..106998aa 100644 --- a/src/core/env.cpp +++ b/src/core/env.cpp @@ -66,7 +66,7 @@ Env::Env() forceNcclFallbackOperation(readEnv("MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION", "")), ncclSymmetricMemory(readEnv("MSCCLPP_NCCL_SYMMETRIC_MEMORY", false)), forceDisableNvls(readEnv("MSCCLPP_FORCE_DISABLE_NVLS", false)), - ibGidIndex(readEnv("MSCCLPP_IB_GID_INDEX", -1)) {} + ibGidIndex(readEnv("MSCCLPP_IB_GID_INDEX", 0)) {} std::shared_ptr env() { static std::shared_ptr globalEnv = std::shared_ptr(new Env()); From e6595f1be584518bfb3788b75571b0276b528227 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 9 Mar 2026 23:31:36 +0000 Subject: [PATCH 6/7] Use -1 sentinel for MSCCLPP_IB_GID_INDEX default and improve comment --- include/mscclpp/core.hpp | 2 +- include/mscclpp/env.hpp | 3 ++- src/core/env.cpp | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index 5b184f0a..16e8858a 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -431,7 +431,7 @@ struct EndpointConfig { int maxWrPerSend = DefaultMaxWrPerSend, Mode mode = Mode::Default) : deviceIndex(deviceIndex), port(port), - gidIndex(env()->ibGidIndex > 0 ? env()->ibGidIndex : gidIndex), + gidIndex(env()->ibGidIndex >= 0 ? env()->ibGidIndex : gidIndex), maxCqSize(maxCqSize), maxCqPollNum(maxCqPollNum), maxSendWr(maxSendWr), diff --git a/include/mscclpp/env.hpp b/include/mscclpp/env.hpp index 8b8750f1..696255d2 100644 --- a/include/mscclpp/env.hpp +++ b/include/mscclpp/env.hpp @@ -111,7 +111,8 @@ class Env { const bool forceDisableNvls; /// Env name: `MSCCLPP_IB_GID_INDEX`. The GID index to use for IB transport. - /// Default is 0 (`EndpointConfig::Ib::DefaultGidIndex`). + /// When set to a non-negative value, overrides the `gidIndex` parameter passed to `EndpointConfig::Ib`. + /// Default is -1 (unset, uses the constructor argument which defaults to `EndpointConfig::Ib::DefaultGidIndex`). const int ibGidIndex; private: diff --git a/src/core/env.cpp b/src/core/env.cpp index 106998aa..4e25bfea 100644 --- a/src/core/env.cpp +++ b/src/core/env.cpp @@ -66,7 +66,7 @@ Env::Env() forceNcclFallbackOperation(readEnv("MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION", "")), ncclSymmetricMemory(readEnv("MSCCLPP_NCCL_SYMMETRIC_MEMORY", false)), forceDisableNvls(readEnv("MSCCLPP_FORCE_DISABLE_NVLS", false)), - ibGidIndex(readEnv("MSCCLPP_IB_GID_INDEX", 0)) {} + ibGidIndex(readEnv("MSCCLPP_IB_GID_INDEX", -1)) {} std::shared_ptr env() { static std::shared_ptr globalEnv = std::shared_ptr(new Env()); From 61e017ca97fa7de282ee746e721cd6ef8939eb5c Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 17 Apr 2026 18:34:08 +0000 Subject: [PATCH 7/7] Revert env/core changes superseded by main main already provides MSCCLPP_IB_GID_INDEX env override and MSCCLPP_FORCE_DISABLE_GDR, so drop the now-redundant divergences in include/mscclpp/core.hpp, include/mscclpp/env.hpp, and src/core/env.cpp. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- include/mscclpp/core.hpp | 3 +-- include/mscclpp/env.hpp | 3 +-- src/core/env.cpp | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index c0c37970..ca2fc34f 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include @@ -431,7 +430,7 @@ struct EndpointConfig { int maxWrPerSend = DefaultMaxWrPerSend, Mode mode = Mode::Default) : deviceIndex(deviceIndex), port(port), - gidIndex(env()->ibGidIndex >= 0 ? env()->ibGidIndex : gidIndex), + gidIndex(gidIndex), maxCqSize(maxCqSize), maxCqPollNum(maxCqPollNum), maxSendWr(maxSendWr), diff --git a/include/mscclpp/env.hpp b/include/mscclpp/env.hpp index acaf0bfd..a6dd306b 100644 --- a/include/mscclpp/env.hpp +++ b/include/mscclpp/env.hpp @@ -116,8 +116,7 @@ class Env { const bool forceDisableGdr; /// Env name: `MSCCLPP_IB_GID_INDEX`. The GID index to use for IB transport. - /// When set to a non-negative value, overrides the `gidIndex` parameter passed to `EndpointConfig::Ib`. - /// Default is -1 (unset, uses the constructor argument which defaults to `EndpointConfig::Ib::DefaultGidIndex`). + /// Default is 0. Used when `EndpointConfig::Ib::gidIndex` is -1 (unspecified). const int ibGidIndex; private: diff --git a/src/core/env.cpp b/src/core/env.cpp index 2af5bddf..7a42471b 100644 --- a/src/core/env.cpp +++ b/src/core/env.cpp @@ -67,7 +67,7 @@ Env::Env() ncclSymmetricMemory(readEnv("MSCCLPP_NCCL_SYMMETRIC_MEMORY", false)), forceDisableNvls(readEnv("MSCCLPP_FORCE_DISABLE_NVLS", false)), forceDisableGdr(readEnv("MSCCLPP_FORCE_DISABLE_GDR", false)), - ibGidIndex(readEnv("MSCCLPP_IB_GID_INDEX", -1)) {} + ibGidIndex(readEnv("MSCCLPP_IB_GID_INDEX", 0)) {} std::shared_ptr env() { static std::shared_ptr globalEnv = std::shared_ptr(new Env());