diff --git a/src/client/EnvVars.hpp b/src/client/EnvVars.hpp index 78304f6..533c6c4 100644 --- a/src/client/EnvVars.hpp +++ b/src/client/EnvVars.hpp @@ -70,6 +70,7 @@ class EnvVars int numIterations; // Number of timed iterations to perform. If negative, run for -numIterations seconds instead int numSubIterations; // Number of subiterations to perform int numWarmups; // Number of un-timed warmup iterations to perform + int pingpongStride; // Stride in bytes between flag slots for pingpong laps int showBorders; // Show ASCII box-drawing characaters in tables int showIterations; // Show per-iteration timing info int useInteractive; // Pause for user-input before starting transfer loop @@ -159,6 +160,7 @@ class EnvVars numIterations = GetEnvVar("NUM_ITERATIONS" , 10); numSubIterations = GetEnvVar("NUM_SUBITERATIONS" , 1); numWarmups = GetEnvVar("NUM_WARMUPS" , 3); + pingpongStride = GetEnvVar("PINGPONG_STRIDE" , 8); outputToCsv = GetEnvVar("OUTPUT_TO_CSV" , 0); samplingFactor = GetEnvVar("SAMPLING_FACTOR" , 1); showBorders = GetEnvVar("SHOW_BORDERS" , 1); @@ -348,6 +350,7 @@ class EnvVars printf(" NUM_ITERATIONS - # of timed iterations per test. If negative, run for this many seconds instead\n"); printf(" NUM_SUBITERATIONS - # of sub-iterations to run per iteration. Must be non-negative\n"); printf(" NUM_WARMUPS - # of untimed warmup iterations per test\n"); + printf(" PINGPONG_STRIDE - Stride in bytes between flag slots for pingpong laps (default 8, must be multiple of 8)\n"); printf(" OUTPUT_TO_CSV - Outputs to CSV format if set\n"); #if NIC_EXEC_ENABLED printf(" ROCE_VERSION - RoCE version (default=2)\n"); @@ -467,6 +470,8 @@ class EnvVars "Running %s subiterations", (numSubIterations == 0 ? "infinite" : std::to_string(numSubIterations)).c_str()); Print("NUM_WARMUPS", numWarmups, "Running %d warmup iteration(s) per Test", numWarmups); + Print("PINGPONG_STRIDE", pingpongStride, + "Pingpong flag stride %d bytes per lap", pingpongStride); #if NIC_EXEC_ENABLED Print("ROCE_VERSION", roceVersion, "RoCE version is set to %d", roceVersion); @@ -621,6 +626,7 @@ class EnvVars cfg.general.numIterations = numIterations; cfg.general.numSubIterations = numSubIterations; cfg.general.numWarmups = numWarmups; + cfg.general.pingpongStride = pingpongStride; cfg.general.recordPerIteration = showIterations; cfg.general.useInteractive = useInteractive; diff --git a/src/client/Utilities.hpp b/src/client/Utilities.hpp index 9b7cfb8..cad8735 100644 --- a/src/client/Utilities.hpp +++ b/src/client/Utilities.hpp @@ -518,12 +518,16 @@ namespace TransferBench::Utils size_t numTimedIterations = results.numTimedIterations; for (auto const& exeInfoPair : results.exeResults) { ExeResult const& exeResult = exeInfoPair.second; - numRows += 1 + exeResult.transferIdx.size(); + int displayCount = 0; + for (int idx : exeResult.transferIdx) + if (transfers[idx].laps >= 0) displayCount++; + numRows += 1 + displayCount; if (ev.showIterations) { numRows += (numTimedIterations + 1); // Check that per-iteration information exists for (int idx : exeResult.transferIdx) { + if (transfers[idx].laps < 0) continue; TransferResult const& r = results.tfrResults[idx]; if (r.perIterMsec.size() != numTimedIterations) { Print("[ERROR] Per iteration timing data unavailable: Expected %lu data points, but have %lu\n", @@ -569,87 +573,141 @@ namespace TransferBench::Utils Transfer const& t = transfers[idx]; TransferResult const& r = results.tfrResults[idx]; - table.Set(rowIdx, 0, "Transfer %-4d ", idx); - table.Set(rowIdx, 1, "%8.3f GB/s " , r.avgBandwidthGbPerSec); - table.Set(rowIdx, 2, "%8.3f ms " , r.avgDurationMsec); - table.Set(rowIdx, 3, "%12lu bytes " , r.numBytes); - - char exeSubIndexStr[32] = ""; - if (t.exeSubIndex != -1) - sprintf(exeSubIndexStr, ".%d", t.exeSubIndex); - - if (isMultiRank) { - table.Set(rowIdx, 4, " %s -> R%d%c%d%s:%d -> %s", - MemDevicesToStr(t.srcs).c_str(), - exeDevice.exeRank, ExeTypeStr[t.exeDevice.exeType], t.exeDevice.exeIndex, - exeSubIndexStr, t.numSubExecs, - MemDevicesToStr(t.dsts).c_str()); + if (t.laps < 0) continue; + + if (t.laps > 0) { + // Pingpong row: show latency using ping's round-trip delta + double latencyUs = r.avgDurationMsec * 1000.0; + table.Set(rowIdx, 0, "PingPong %-4d ", idx); + table.Set(rowIdx, 1, "%8.3f us " , latencyUs); + table.Set(rowIdx, 2, "%8.3f ms " , r.avgDurationMsec); + table.Set(rowIdx, 3, "%8d laps " , t.laps); + + Transfer const& pong = transfers[idx + 1]; + if (isMultiRank) { + table.Set(rowIdx, 4, " %s->R%d%c%d->%s <+> %s->R%d%c%d->%s", + MemDevicesToStr(t.srcs).c_str(), + exeDevice.exeRank, ExeTypeStr[t.exeDevice.exeType], t.exeDevice.exeIndex, + MemDevicesToStr(t.dsts).c_str(), + MemDevicesToStr(pong.srcs).c_str(), + pong.exeDevice.exeRank, ExeTypeStr[pong.exeDevice.exeType], pong.exeDevice.exeIndex, + MemDevicesToStr(pong.dsts).c_str()); + } else { + table.Set(rowIdx, 4, " %s->%c%d->%s <+> %s->%c%d->%s", + MemDevicesToStr(t.srcs).c_str(), + ExeTypeStr[t.exeDevice.exeType], t.exeDevice.exeIndex, + MemDevicesToStr(t.dsts).c_str(), + MemDevicesToStr(pong.srcs).c_str(), + ExeTypeStr[pong.exeDevice.exeType], pong.exeDevice.exeIndex, + MemDevicesToStr(pong.dsts).c_str()); + } + table.SetCellAlignment(rowIdx, 4, TableHelper::ALIGN_LEFT); + rowIdx++; + + if (ev.showIterations) { + std::set> times; + double stdDevTime = 0; + for (size_t i = 0; i < numTimedIterations; i++) { + times.insert(std::make_pair(r.perIterMsec[i], i+1)); + double const varTime = fabs(r.avgDurationMsec - r.perIterMsec[i]); + stdDevTime += varTime * varTime; + } + stdDevTime = sqrt(stdDevTime / numTimedIterations); + + for (auto& time : times) { + double iterUs = time.first * 1000.0; + table.Set(rowIdx, 0, "Iter %03d ", time.second); + table.Set(rowIdx, 1, "%8.3f us ", iterUs); + table.Set(rowIdx, 2, "%8.3f ms ", time.first); + rowIdx++; + } + + table.Set(rowIdx, 0, "StandardDev "); + table.Set(rowIdx, 1, "%8.3f us ", stdDevTime * 1000.0); + table.Set(rowIdx, 2, "%8.3f ms ", stdDevTime); + rowIdx++; + table.DrawRowBorder(rowIdx); + } } else { - table.Set(rowIdx, 4, " %s -> %c%d%s:%d -> %s", - MemDevicesToStr(t.srcs).c_str(), - ExeTypeStr[t.exeDevice.exeType], t.exeDevice.exeIndex, - exeSubIndexStr, t.numSubExecs, - MemDevicesToStr(t.dsts).c_str()); - } - table.SetCellAlignment(rowIdx, 4, TableHelper::ALIGN_LEFT); - rowIdx++; - - // Show per-iteration timing information - if (ev.showIterations) { - - // Compute standard deviation and track iterations by speed - std::set> times; - double stdDevTime = 0; - double stdDevBw = 0; - for (int i = 0; i < numTimedIterations; i++) { - times.insert(std::make_pair(r.perIterMsec[i], i+1)); - double const varTime = fabs(r.avgDurationMsec - r.perIterMsec[i]); - stdDevTime += varTime * varTime; - - double iterBandwidthGbs = (t.numBytes / 1.0E9) / r.perIterMsec[i] * 1000.0f; - double const varBw = fabs(iterBandwidthGbs - r.avgBandwidthGbPerSec); - stdDevBw += varBw * varBw; + // Regular transfer row (laps == 0) + table.Set(rowIdx, 0, "Transfer %-4d ", idx); + table.Set(rowIdx, 1, "%8.3f GB/s " , r.avgBandwidthGbPerSec); + table.Set(rowIdx, 2, "%8.3f ms " , r.avgDurationMsec); + table.Set(rowIdx, 3, "%12lu bytes " , r.numBytes); + + char exeSubIndexStr[32] = ""; + if (t.exeSubIndex != -1) + sprintf(exeSubIndexStr, ".%d", t.exeSubIndex); + + if (isMultiRank) { + table.Set(rowIdx, 4, " %s -> R%d%c%d%s:%d -> %s", + MemDevicesToStr(t.srcs).c_str(), + exeDevice.exeRank, ExeTypeStr[t.exeDevice.exeType], t.exeDevice.exeIndex, + exeSubIndexStr, t.numSubExecs, + MemDevicesToStr(t.dsts).c_str()); + } else { + table.Set(rowIdx, 4, " %s -> %c%d%s:%d -> %s", + MemDevicesToStr(t.srcs).c_str(), + ExeTypeStr[t.exeDevice.exeType], t.exeDevice.exeIndex, + exeSubIndexStr, t.numSubExecs, + MemDevicesToStr(t.dsts).c_str()); } - stdDevTime = sqrt(stdDevTime / numTimedIterations); - stdDevBw = sqrt(stdDevBw / numTimedIterations); - - // Loop over iterations (fastest to slowest) - for (auto& time : times) { - double iterDurationMsec = time.first; - double iterBandwidthGbs = (t.numBytes / 1.0E9) / iterDurationMsec * 1000.0f; - - std::set usedXccs; - std::stringstream ss1; - if (exeDevice.exeType == EXE_GPU_GFX) { - if (time.second - 1 < r.perIterCUs.size()) { - ss1 << " CUs: "; - for (auto x : r.perIterCUs[time.second - 1]) { - ss1 << x.first << ":" << std::setfill('0') << std::setw(2) << x.second << " "; - usedXccs.insert(x.first); + table.SetCellAlignment(rowIdx, 4, TableHelper::ALIGN_LEFT); + rowIdx++; + + if (ev.showIterations) { + std::set> times; + double stdDevTime = 0; + double stdDevBw = 0; + for (size_t i = 0; i < numTimedIterations; i++) { + times.insert(std::make_pair(r.perIterMsec[i], i+1)); + double const varTime = fabs(r.avgDurationMsec - r.perIterMsec[i]); + stdDevTime += varTime * varTime; + + double iterBandwidthGbs = (t.numBytes / 1.0E9) / r.perIterMsec[i] * 1000.0f; + double const varBw = fabs(iterBandwidthGbs - r.avgBandwidthGbPerSec); + stdDevBw += varBw * varBw; + } + stdDevTime = sqrt(stdDevTime / numTimedIterations); + stdDevBw = sqrt(stdDevBw / numTimedIterations); + + for (auto& time : times) { + double iterDurationMsec = time.first; + double iterBandwidthGbs = (t.numBytes / 1.0E9) / iterDurationMsec * 1000.0f; + + std::set usedXccs; + std::stringstream ss1; + if (exeDevice.exeType == EXE_GPU_GFX) { + if (time.second - 1 < r.perIterCUs.size()) { + ss1 << " CUs: "; + for (auto x : r.perIterCUs[time.second - 1]) { + ss1 << x.first << ":" << std::setfill('0') << std::setw(2) << x.second << " "; + usedXccs.insert(x.first); + } } } - } - std::stringstream ss2; - if (!usedXccs.empty()) { - ss2 << " XCCs:"; - for (auto x : usedXccs) - ss2 << " " << x; + std::stringstream ss2; + if (!usedXccs.empty()) { + ss2 << " XCCs:"; + for (auto x : usedXccs) + ss2 << " " << x; + } + + table.Set(rowIdx, 0, "Iter %03d ", time.second); + table.Set(rowIdx, 1, "%8.3f GB/s ", iterBandwidthGbs); + table.Set(rowIdx, 2, "%8.3f ms ", iterDurationMsec); + table.Set(rowIdx, 3, ss1.str()); + table.Set(rowIdx, 4, ss2.str()); + rowIdx++; } - table.Set(rowIdx, 0, "Iter %03d ", time.second); - table.Set(rowIdx, 1, "%8.3f GB/s ", iterBandwidthGbs); - table.Set(rowIdx, 2, "%8.3f ms ", iterDurationMsec); - table.Set(rowIdx, 3, ss1.str()); - table.Set(rowIdx, 4, ss2.str()); + table.Set(rowIdx, 0, "StandardDev "); + table.Set(rowIdx, 1, "%8.3f GB/s ", stdDevBw); + table.Set(rowIdx, 2, "%8.3f ms ", stdDevTime); rowIdx++; + table.DrawRowBorder(rowIdx); } - - table.Set(rowIdx, 0, "StandardDev "); - table.Set(rowIdx, 1, "%8.3f GB/s ", stdDevBw); - table.Set(rowIdx, 2, "%8.3f ms ", stdDevTime); - rowIdx++; - table.DrawRowBorder(rowIdx); } } } diff --git a/src/header/TransferBench.hpp b/src/header/TransferBench.hpp index 023991b..07c1010 100644 --- a/src/header/TransferBench.hpp +++ b/src/header/TransferBench.hpp @@ -179,6 +179,9 @@ namespace TransferBench int32_t exeSubIndex = -1; ///< Executor subindex int32_t exeSubSlot = 0; ///< Executor subslot int numSubExecs = 0; ///< Number of subExecutors to use for this Transfer + int laps = 0; ///< 0 = one-way transfer, >0 = ping (forward), <0 = pong (backward). abs(laps) = lap count + MemDevice flag = {}; ///< Memory device for pingpong signal flag (only used when laps != 0) + ///< TODO: pingpong forbid multi-src/dst ? }; /** @@ -191,6 +194,7 @@ namespace TransferBench int numWarmups = 3; ///< Number of un-timed warmup iterations to perform int recordPerIteration = 0; ///< Record per-iteration timing information int useInteractive = 0; ///< Pause for user-input before starting transfer loop + int pingpongStride = 8; ///< Stride in bytes between flag slots for pingpong laps (must be multiple of 8) }; /** @@ -1845,6 +1849,7 @@ namespace { if (general.numIterations != cfg.general.numIterations) ADD_ERROR("cfg.general.numIterations"); if (general.numSubIterations != cfg.general.numSubIterations) ADD_ERROR("cfg.general.numSubIterations"); if (general.numWarmups != cfg.general.numWarmups) ADD_ERROR("cfg.general.numWarmups"); + if (general.pingpongStride != cfg.general.pingpongStride) ADD_ERROR("cfg.general.pingpongStride"); if (general.recordPerIteration != cfg.general.recordPerIteration) ADD_ERROR("cfg.general.recordPerIteration"); if (general.useInteractive != cfg.general.useInteractive) ADD_ERROR("cfg.general.useInteractive"); } @@ -2196,6 +2201,22 @@ namespace { } if (hasFatalError) break; + // Pingpong flag checks + if (t.laps != 0) { + if (t.flag.memType == MEM_NULL) { + errors.push_back({ERR_FATAL, "Transfer %d: Pingpong transfer requires a valid flag memory device", i}); + hasFatalError = true; + } else if (t.flag.memRank != t.exeDevice.exeRank) { + // forbid remote flag + // TODO: elaborate more on this and add UALoE support + errors.push_back({ERR_FATAL, + "Transfer %d: Pingpong flag must be on the same node as the executor (flag rank %d != executor rank %d)", + i, t.flag.memRank, t.exeDevice.exeRank}); + hasFatalError = true; + } + } + if (hasFatalError) break; + // Check executor rank if (t.exeDevice.exeRank < 0 || t.exeDevice.exeRank >= GetNumRanks()) { errors.push_back({ERR_FATAL, @@ -2538,6 +2559,10 @@ namespace { float* src[MAX_SRCS]; ///< Source array pointers float* dst[MAX_DSTS]; ///< Destination array pointers int32_t preferredXccId; ///< XCC ID to execute on (GFX only) + volatile int64_t* flagMem; ///< Pingpong flag base pointer (nullptr for normal transfers) + int laps; ///< 0 = normal, >0 = ping, <0 = pong + int flagStride; ///< Stride in bytes between flag slots per lap + int flagAllocBytes; ///< Total flag allocation size in bytes (for wrap-around) // Prepared int teamSize; ///< Index of this sub executor amongst team @@ -2605,6 +2630,13 @@ namespace { vector>sendWorkRequests; ///< Send work requests per queue pair #endif + // Pingpong + int laps = 0; ///< 0 = normal, >0 = ping, <0 = pong + void* flagMem = nullptr; ///< Pointer to paired transfer's dstMem[0] (the flag to wait on) + int flagStride = 8; ///< Stride in bytes between flag slots per lap + int flagAllocBytes = 8; ///< Total flag allocation in bytes (for wrap-around) + std::future pingpongWaitFuture; ///< Async handle for pingpong wait + // Counters double totalDurationMsec; ///< Total duration for all iterations for this Transfer vector perIterMsec; ///< Duration for each individual iteration @@ -3742,6 +3774,10 @@ static bool IsConfiguredGid(union ibv_gid const& gid) SubExecParam& p = subExecParam[i]; p.numSrcs = rss.srcMem.size(); p.numDsts = rss.dstMem.size(); + p.flagMem = nullptr; + p.laps = transfer.laps; + p.flagStride = 0; + p.flagAllocBytes = 0; p.startCycle = 0; p.stopCycle = 0; p.hwId = 0; @@ -3902,9 +3938,13 @@ static bool IsConfiguredGid(union ibv_gid const& gid) } // Allocate destination memory (on the correct rank) + // For pingpong transfers, allocate a larger buffer to hold multiple flag slots + size_t dstAllocBytes = t.numBytes + cfg.data.byteOffset; + if (t.laps != 0) + dstAllocBytes = std::max(dstAllocBytes, std::min((size_t)1024, (size_t)8 * abs(t.laps))); bool requiresFabricHandle = (dstMemDevice.memRank != exeDevice.exeRank) && IsGpuExeType(exeDevice.exeType); if (dstMemDevice.memRank == localRank) { - ERR_CHECK(AllocateMemory(dstMemDevice, t.numBytes + cfg.data.byteOffset, (void**)&rss.dstMem[iDst], + ERR_CHECK(AllocateMemory(dstMemDevice, dstAllocBytes, (void**)&rss.dstMem[iDst], &rss.dstActualBytes[iDst], requiresFabricHandle ? &rss.dstMemHandle[iDst] : NULL)); } @@ -4163,6 +4203,54 @@ static bool IsConfiguredGid(union ibv_gid const& gid) return ERR_NONE; } +// PingPong Wait primitives +//======================================================================================== + + // Dispatcher: selects the appropriate wait implementation based on executor type and flag memory type. + void PingpongWait(void* flagMem, MemType flagMemType, ExeType exeType) + { + //TODO + } + + // CPU-side spin wait: polls a flag until it becomes non-zero. + // Used by CPU executors and NIC pingpong lambdas. + template + static void CpuWait(volatile T* flag) + { + while (__atomic_load_n(flag, __ATOMIC_ACQUIRE) == T(0)) + ; + } + + // GPU-side spin wait: polls a flag until it becomes non-zero. + // Used by GPU-GFX executors (called inline from the transfer kernel). + template + __device__ void GpuWait(volatile T* flag) + { +#if defined(__NVCC__) + while (atomicAdd(const_cast(flag), T(0)) == T(0)) + ; +#else + while (__hip_atomic_load(flag, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_SYSTEM) == T(0)) + ; +#endif + } + + // Standalone GPU wait kernel: enqueued on a HIP stream for GPU-DMA executors. + // Launched with grid(1,1,1) block(1,1,1) on the same stream as the DMA transfer. + template + __global__ void GpuDmaWaitKernel(volatile T* flag) + { + GpuWait(flag); + } + + // Returns a pointer to the flag slot for the given lap, using strided wrap-around. + __host__ __device__ + static volatile int64_t* FlagSlot(volatile int64_t* base, int lap, int stride, int allocBytes) + { + int offset = (lap * stride) % allocBytes; + return (volatile int64_t*)((volatile char*)base + offset); + } + // CPU Executor-related functions //======================================================================================== @@ -4171,15 +4259,23 @@ static bool IsConfiguredGid(union ibv_gid const& gid) { if (p.N == 0) return; + int const iterations = (p.laps == 0) ? numSubIterations : abs(p.laps); int subIteration = 0; do { + // Pong: wait for ping's signal before each lap + if (p.laps < 0 && p.flagMem) + CpuWait(FlagSlot(p.flagMem, subIteration, p.flagStride, p.flagAllocBytes)); + + // For pingpong, offset dst per lap so the 8-byte write lands on the correct flag slot + int const dstByteOff = (p.laps != 0) ? (subIteration * p.flagStride) % p.flagAllocBytes : 0; + int const dstFloatOff = dstByteOff / (int)sizeof(float); + int const& numSrcs = p.numSrcs; int const& numDsts = p.numDsts; if (numSrcs == 0) { for (int i = 0; i < numDsts; ++i) { - memset(p.dst[i], MEMSET_CHAR, p.N * sizeof(float)); - //for (int j = 0; j < p.N; j++) p.dst[i][j] = MEMSET_VAL; + memset(p.dst[i] + dstFloatOff, MEMSET_CHAR, p.N * sizeof(float)); } } else if (numSrcs == 1) { float const* __restrict__ src = p.src[0]; @@ -4188,23 +4284,27 @@ static bool IsConfiguredGid(union ibv_gid const& gid) for (int j = 0; j < p.N; j++) sum += p.src[0][j]; - // Add a dummy check to ensure the read is not optimized out if (sum != sum) { System::Get().Log("[ERROR] Nan detected\n"); } } else { for (int i = 0; i < numDsts; ++i) - memcpy(p.dst[i], src, p.N * sizeof(float)); + memcpy(p.dst[i] + dstFloatOff, src, p.N * sizeof(float)); } } else { float sum = 0.0f; for (int j = 0; j < p.N; j++) { sum = p.src[0][j]; for (int i = 1; i < numSrcs; i++) sum += p.src[i][j]; - for (int i = 0; i < numDsts; i++) p.dst[i][j] = sum; + for (int i = 0; i < numDsts; i++) p.dst[i][j + dstFloatOff] = sum; } } - } while (++subIteration != numSubIterations); + + // Ping: wait for pong's signal before next lap + if (p.laps > 0 && p.flagMem) + CpuWait(FlagSlot(p.flagMem, subIteration, p.flagStride, p.flagAllocBytes)); + + } while (++subIteration != iterations); } // Execution of a single CPU Transfers @@ -4213,6 +4313,9 @@ static bool IsConfiguredGid(union ibv_gid const& gid) int const exeIndex, TransferResources& rss) { + if (rss.laps >= 0 && rss.flagMem) + usleep(1); + auto cpuStart = std::chrono::high_resolution_clock::now(); vector childThreads; @@ -4224,7 +4327,8 @@ static bool IsConfiguredGid(union ibv_gid const& gid) childThreads.clear(); auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart; - double deltaMsec = (std::chrono::duration_cast>(cpuDelta).count() * 1000.0) / cfg.general.numSubIterations; + int const divisor = (rss.laps != 0) ? abs(rss.laps) : cfg.general.numSubIterations; + double deltaMsec = (std::chrono::duration_cast>(cpuDelta).count() * 1000.0) / divisor; if (iteration >= 0) { rss.totalDurationMsec += deltaMsec; @@ -4299,53 +4403,113 @@ static bool IsConfiguredGid(union ibv_gid const& gid) auto transferCount = exeInfo.resources.size(); std::vector totalTimeMsec(transferCount, 0.0); + // Launch pingpong transfers in dedicated threads + vector> ppAsyncTransfers; + for (int i = 0; i < transferCount; i++) { + auto& rss = exeInfo.resources[i]; + if (rss.laps == 0) continue; + ppAsyncTransfers.emplace_back(std::async(std::launch::async, + [&rss, &totalTimeMsec_i = totalTimeMsec[i], iteration, &cfg, exeIndex]() -> ErrResult { + if (rss.laps >= 0 && rss.flagMem) + usleep(1); + int const laps = abs(rss.laps); + for (int lap = 0; lap < laps; ++lap) { + if (rss.laps < 0) + CpuWait(FlagSlot(static_cast(rss.flagMem), lap, rss.flagStride, rss.flagAllocBytes)); + + int dstByteOff = (lap * rss.flagStride) % rss.flagAllocBytes; + for (auto& wrVec : rss.sendWorkRequests) + for (auto& wr : wrVec) + wr.wr.rdma.remote_addr += dstByteOff; + + auto lapStart = std::chrono::high_resolution_clock::now(); + ERR_CHECK(ExecuteNicTransfer(iteration, cfg, exeIndex, rss)); + + for (auto& wrVec : rss.sendWorkRequests) + for (auto& wr : wrVec) + wr.wr.rdma.remote_addr -= dstByteOff; + + ibv_cq* cq = rss.srcIsExeNic ? rss.srcCompQueue : rss.dstCompQueue; + struct ibv_wc wc; + int completed = 0; + while (completed < rss.qpCount) { + int nc = ibv_poll_cq(cq, 1, &wc); + if (nc > 0) { + if (wc.status != IBV_WC_SUCCESS) + return {ERR_FATAL, "Transfer %d: pingpong NIC completion error [status %d]", rss.transferIdx, wc.status}; + completed++; + } else if (nc < 0) { + return {ERR_FATAL, "Transfer %d: pingpong NIC poll error", rss.transferIdx}; + } + } + + if (rss.laps > 0) + CpuWait(FlagSlot(static_cast(rss.flagMem), lap, rss.flagStride, rss.flagAllocBytes)); + + auto lapDelta = std::chrono::high_resolution_clock::now() - lapStart; + if (iteration >= 0) + totalTimeMsec_i += std::chrono::duration_cast>(lapDelta).count() * 1000.0; + } + return ERR_NONE; + })); + } + + // Normal transfers: existing batch loop (skip pingpong) + size_t normalCount = 0; + for (int i = 0; i < transferCount; i++) + if (exeInfo.resources[i].laps == 0) normalCount++; + int subIterations = 0; auto cpuStart = std::chrono::high_resolution_clock::now(); std::vector transferTimers(transferCount); - do { - std::vector receivedQPs(transferCount, 0); - // post the sends - for (auto i = 0; i < transferCount; i++) { - transferTimers[i] = std::chrono::high_resolution_clock::now(); - ERR_CHECK(ExecuteNicTransfer(iteration, cfg, exeIndex, exeInfo.resources[i])); - } - // poll for completions - size_t completedTransfers = 0; - int pollBatch = std::max(1, cfg.nic.cqPollBatch); - std::vector wc((size_t)pollBatch); - ibv_wc* wc_array = wc.data(); - while (completedTransfers < transferCount) { + if (normalCount > 0) { + do { + std::vector receivedQPs(transferCount, 0); + // post the sends for (auto i = 0; i < transferCount; i++) { - if(receivedQPs[i] < exeInfo.resources[i].qpCount) { - auto& rss = exeInfo.resources[i]; - // Poll the completion queue until all queue pairs are complete - // The order of completion doesn't matter because this completion queue is dedicated to this Transfer - // Use batch polling to drain multiple completions at once for better efficiency - int nc = ibv_poll_cq(rss.srcIsExeNic ? rss.srcCompQueue : rss.dstCompQueue, pollBatch, wc_array); - if (nc > 0) { - // Process all completions in the batch - for (int j = 0; j < nc; j++) { - if (wc_array[j].status != IBV_WC_SUCCESS) { - return {ERR_FATAL, "Transfer %d: Received unsuccessful work completion [status code %d]", rss.transferIdx, wc_array[j].status}; + if (exeInfo.resources[i].laps != 0) continue; + transferTimers[i] = std::chrono::high_resolution_clock::now(); + ERR_CHECK(ExecuteNicTransfer(iteration, cfg, exeIndex, exeInfo.resources[i])); + } + // poll for completions + size_t completedTransfers = 0; + int pollBatch = std::max(1, cfg.nic.cqPollBatch); + std::vector wc((size_t)pollBatch); + ibv_wc* wc_array = wc.data(); + while (completedTransfers < normalCount) { + for (auto i = 0; i < transferCount; i++) { + if (exeInfo.resources[i].laps != 0) continue; + if(receivedQPs[i] < exeInfo.resources[i].qpCount) { + auto& rss = exeInfo.resources[i]; + int nc = ibv_poll_cq(rss.srcIsExeNic ? rss.srcCompQueue : rss.dstCompQueue, pollBatch, wc_array); + if (nc > 0) { + for (int j = 0; j < nc; j++) { + if (wc_array[j].status != IBV_WC_SUCCESS) { + return {ERR_FATAL, "Transfer %d: Received unsuccessful work completion [status code %d]", rss.transferIdx, wc_array[j].status}; + } + receivedQPs[i]++; } - receivedQPs[i]++; + } else if (nc < 0) { + return {ERR_FATAL, "Transfer %d: Received negative work completion", rss.transferIdx}; } - } else if (nc < 0) { - return {ERR_FATAL, "Transfer %d: Received negative work completion", rss.transferIdx}; - } - if(receivedQPs[i] == rss.qpCount) { - auto cpuDelta = std::chrono::high_resolution_clock::now() - transferTimers[i]; - double deltaMsec = std::chrono::duration_cast>(cpuDelta).count() * 1000.0; - if (iteration >= 0) { - totalTimeMsec[i] += deltaMsec; + if(receivedQPs[i] == rss.qpCount) { + auto cpuDelta = std::chrono::high_resolution_clock::now() - transferTimers[i]; + double deltaMsec = std::chrono::duration_cast>(cpuDelta).count() * 1000.0; + if (iteration >= 0) { + totalTimeMsec[i] += deltaMsec; + } + completedTransfers++; } - completedTransfers++; } } } - } - } while(++subIterations < cfg.general.numSubIterations); + } while(++subIterations < cfg.general.numSubIterations); + } + + // Wait for pingpong threads to finish + for (auto& f : ppAsyncTransfers) + ERR_CHECK(f.get()); auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart; double deltaMsec = std::chrono::duration_cast>(cpuDelta).count() * 1000.0 / cfg.general.numSubIterations; @@ -4354,7 +4518,8 @@ static bool IsConfiguredGid(union ibv_gid const& gid) exeInfo.totalDurationMsec += deltaMsec; for (int i = 0; i < transferCount; i++) { auto& rss = exeInfo.resources[i]; - double transferTimeMsec = totalTimeMsec[i] / cfg.general.numSubIterations; + int const divisor = (rss.laps != 0) ? abs(rss.laps) : cfg.general.numSubIterations; + double transferTimeMsec = totalTimeMsec[i] / divisor; rss.totalDurationMsec += transferTimeMsec; if (cfg.general.recordPerIteration) rss.perIterMsec.push_back(transferTimeMsec); @@ -4489,11 +4654,6 @@ static bool IsConfiguredGid(union ibv_gid const& gid) __global__ void __launch_bounds__(LAUNCH_BOUND) GpuReduceKernel(SubExecParam* params, int seType, int waveOrder, int numSubIterations) { - int64_t startCycle; - // For warp-level, each warp's first thread records timing; for threadblock-level, only first thread of block - bool shouldRecordTiming = (seType == 1) ? (threadIdx.x % warpSize == 0) : (threadIdx.x == 0); - if (shouldRecordTiming) startCycle = GetTimestamp(); - // seType: 0=threadblock, 1=warp int subExecIdx; if (seType == 0) { @@ -4508,6 +4668,17 @@ static bool IsConfiguredGid(union ibv_gid const& gid) SubExecParam& p = params[subExecIdx]; + // Delay non-pong transfers so pong can start waiting on the flag first + if (p.laps >= 0 && p.flagMem) { + int64_t t0 = GetTimestamp(); + while (GetTimestamp() - t0 < 1000) {} + } + + int64_t startCycle; + // For warp-level, each warp's first thread records timing; for threadblock-level, only first thread of block + bool shouldRecordTiming = (seType == 1) ? (threadIdx.x % warpSize == 0) : (threadIdx.x == 0); + if (shouldRecordTiming) startCycle = GetTimestamp(); + // For warp-level dispatch, inactive warps should return early if (seType == 1 && p.N == 0) return; @@ -4553,8 +4724,21 @@ static bool IsConfiguredGid(union ibv_gid const& gid) case 5: /* C,W,U */ teamStride = 1; waveStride = nTeams; unrlStride = nTeams * nWaves; teamStride2 = 1; waveStride2 = nTeams; break; } + int const iterations = (p.laps == 0) ? numSubIterations : abs(p.laps); int subIterations = 0; while (1) { + // Pong: wait for ping's signal before each lap + if (p.laps < 0 && p.flagMem) { + if (threadIdx.x == 0) + GpuWait(FlagSlot(p.flagMem, subIterations, p.flagStride, p.flagAllocBytes)); + if (seType == 0) __syncthreads(); else __syncwarp(); + } + + // For pingpong, offset dst per lap so the 8-byte write lands on the correct flag slot + size_t const dstFloatOff = (p.laps != 0) + ? ((subIterations * p.flagStride) % p.flagAllocBytes) / sizeof(float) + : 0; + // First loop: Each wavefront in the team works on UNROLL PACKED_FLOAT per thread size_t const loop1Stride = nTeams * nWaves * UNROLL * warpSize; size_t const loop1Limit = numPackedFloat / loop1Stride * loop1Stride; @@ -4632,12 +4816,19 @@ static bool IsConfiguredGid(union ibv_gid const& gid) } for (int d = 0; d < numDsts; d++) - Store(val, &p.dst[d][idx]); + Store(val, &p.dst[d][idx + dstFloatOff]); } } } - if (++subIterations == numSubIterations) break; + // Ping: wait for pong's signal before next lap + if (p.laps > 0 && p.flagMem) { + if (threadIdx.x == 0) + GpuWait(FlagSlot(p.flagMem, subIterations, p.flagStride, p.flagAllocBytes)); + if (seType == 0) __syncthreads(); else __syncwarp(); + } + + if (++subIterations == iterations) break; } // Wait for all threads to finish @@ -4709,6 +4900,9 @@ static bool IsConfiguredGid(union ibv_gid const& gid) ConfigOptions const& cfg, TransferResources& rss) { + if (rss.laps >= 0 && rss.flagMem) + usleep(1); + auto cpuStart = std::chrono::high_resolution_clock::now(); int numSubExecs = rss.subExecParamCpu.size(); @@ -4739,14 +4933,15 @@ static bool IsConfiguredGid(union ibv_gid const& gid) ERR_CHECK(hipStreamSynchronize(stream)); auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart; - double cpuDeltaMsec = std::chrono::duration_cast>(cpuDelta).count() * 1000.0 / cfg.general.numSubIterations; + int const divisor = (rss.laps != 0) ? abs(rss.laps) : cfg.general.numSubIterations; + double cpuDeltaMsec = std::chrono::duration_cast>(cpuDelta).count() * 1000.0 / divisor; if (iteration >= 0) { double deltaMsec = cpuDeltaMsec; if (startEvent != NULL) { float gpuDeltaMsec; ERR_CHECK(hipEventElapsedTime(&gpuDeltaMsec, startEvent, stopEvent)); - deltaMsec = gpuDeltaMsec / cfg.general.numSubIterations; + deltaMsec = gpuDeltaMsec / divisor; } rss.totalDurationMsec += deltaMsec; if (cfg.general.recordPerIteration) { @@ -4851,7 +5046,8 @@ static bool IsConfiguredGid(union ibv_gid const& gid) } } double deltaMsec = (maxStopCycle - minStartCycle) / (double)(exeInfo.wallClockRate); - deltaMsec /= cfg.general.numSubIterations; + int const divisor = (rss.laps != 0) ? abs(rss.laps) : cfg.general.numSubIterations; + deltaMsec /= divisor; rss.totalDurationMsec += deltaMsec; if (cfg.general.recordPerIteration) { rss.perIterMsec.push_back(deltaMsec); @@ -4876,9 +5072,13 @@ static bool IsConfiguredGid(union ibv_gid const& gid) ConfigOptions const& cfg, TransferResources& resources) { + if (resources.laps >= 0 && resources.flagMem) + usleep(1); + auto cpuStart = std::chrono::high_resolution_clock::now(); ERR_CHECK(hipSetDevice(exeIndex)); + int const iterations = (resources.laps != 0) ? abs(resources.laps) : cfg.general.numSubIterations; int subIterations = 0; if (!useSubIndices && !cfg.dma.useHsaCopy) { if (cfg.dma.useHipEvents) @@ -4886,15 +5086,28 @@ static bool IsConfiguredGid(union ibv_gid const& gid) // Use DMA copy engine do { + // Pong: enqueue wait kernel before DMA copy + if (resources.laps < 0 && resources.flagMem) + GpuDmaWaitKernel<<<1, 1, 0, stream>>>(FlagSlot(static_cast(resources.flagMem), subIterations, resources.flagStride, resources.flagAllocBytes)); + + // For pingpong, offset dst per lap + int dstByteOff = (resources.laps != 0) ? (subIterations * resources.flagStride) % resources.flagAllocBytes : 0; + void* lapDst = (char*)resources.dstMem[0] + dstByteOff; + #if defined(__NVCC__) - ERR_CHECK(cuMemcpyAsync((CUdeviceptr)resources.dstMem[0], + ERR_CHECK(cuMemcpyAsync((CUdeviceptr)lapDst, (CUdeviceptr)resources.srcMem[0], resources.numBytes, stream)); #else - ERR_CHECK(hipMemcpyAsync(resources.dstMem[0], resources.srcMem[0], resources.numBytes, + ERR_CHECK(hipMemcpyAsync(lapDst, resources.srcMem[0], resources.numBytes, hipMemcpyDefault, stream)); #endif - } while (++subIterations != cfg.general.numSubIterations); + + // Ping: enqueue wait kernel after DMA copy (wait for pong before next lap) + if (resources.laps > 0 && resources.flagMem) + GpuDmaWaitKernel<<<1, 1, 0, stream>>>(FlagSlot(static_cast(resources.flagMem), subIterations, resources.flagStride, resources.flagAllocBytes)); + + } while (++subIterations != iterations); if (cfg.dma.useHipEvents) ERR_CHECK(hipEventRecord(stopEvent, stream)); @@ -4905,14 +5118,24 @@ static bool IsConfiguredGid(union ibv_gid const& gid) #else // Use HSA async copy do { + // Pong: enqueue wait kernel before HSA copy + if (resources.laps < 0 && resources.flagMem) { + GpuDmaWaitKernel<<<1, 1, 0, stream>>>(FlagSlot(static_cast(resources.flagMem), subIterations, resources.flagStride, resources.flagAllocBytes)); + ERR_CHECK(hipStreamSynchronize(stream)); + } + + // For pingpong, offset dst per lap + int hsaDstByteOff = (resources.laps != 0) ? (subIterations * resources.flagStride) % resources.flagAllocBytes : 0; + void* hsaLapDst = (char*)resources.dstMem[0] + hsaDstByteOff; + hsa_signal_store_screlease(resources.signal, 1); if (!useSubIndices) { - ERR_CHECK(hsa_amd_memory_async_copy(resources.dstMem[0], resources.dstAgent, + ERR_CHECK(hsa_amd_memory_async_copy(hsaLapDst, resources.dstAgent, resources.srcMem[0], resources.srcAgent, resources.numBytes, 0, NULL, resources.signal)); } else { - HSA_CALL(hsa_amd_memory_async_copy_on_engine(resources.dstMem[0], resources.dstAgent, + HSA_CALL(hsa_amd_memory_async_copy_on_engine(hsaLapDst, resources.dstAgent, resources.srcMem[0], resources.srcAgent, resources.numBytes, 0, NULL, resources.signal, @@ -4922,18 +5145,25 @@ static bool IsConfiguredGid(union ibv_gid const& gid) while(hsa_signal_wait_scacquire(resources.signal, HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE) >= 1); - } while (++subIterations != cfg.general.numSubIterations); + + // Ping: enqueue wait kernel after HSA copy (wait for pong before next lap) + if (resources.laps > 0 && resources.flagMem) { + GpuDmaWaitKernel<<<1, 1, 0, stream>>>(FlagSlot(static_cast(resources.flagMem), subIterations, resources.flagStride, resources.flagAllocBytes)); + ERR_CHECK(hipStreamSynchronize(stream)); + } + + } while (++subIterations != iterations); #endif } auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart; - double cpuDeltaMsec = std::chrono::duration_cast>(cpuDelta).count() * 1000.0 / cfg.general.numSubIterations; + double cpuDeltaMsec = std::chrono::duration_cast>(cpuDelta).count() * 1000.0 / iterations; if (iteration >= 0) { double deltaMsec = cpuDeltaMsec; if (!useSubIndices && !cfg.dma.useHsaCopy && cfg.dma.useHipEvents) { float gpuDeltaMsec; ERR_CHECK(hipEventElapsedTime(&gpuDeltaMsec, startEvent, stopEvent)); - deltaMsec = gpuDeltaMsec / cfg.general.numSubIterations; + deltaMsec = gpuDeltaMsec / iterations; } resources.totalDurationMsec += deltaMsec; if (cfg.general.recordPerIteration) @@ -4953,6 +5183,7 @@ static bool IsConfiguredGid(union ibv_gid const& gid) vector> asyncTransfers; for (int i = 0; i < exeInfo.resources.size(); i++) { + auto& rss = exeInfo.resources[i]; asyncTransfers.emplace_back(std::async(std::launch::async, ExecuteDmaTransfer, iteration, @@ -5107,6 +5338,7 @@ static bool IsConfiguredGid(union ibv_gid const& gid) TransferResources resource = {}; resource.transferIdx = i; + resource.laps = t.laps; ExeInfo& exeInfo = executorMap[exeDevice]; exeInfo.totalBytes += t.numBytes; @@ -5138,6 +5370,54 @@ static bool IsConfiguredGid(union ibv_gid const& gid) } } + // Cross-link flagMem for pingpong pairs (ping and pong are consecutive in transfers) + for (size_t i = 0; i + 1 < transfers.size(); i++) { + if (transfers[i].laps > 0 && transfers[i+1].laps < 0) { + auto pingRss = transferResources[i]; + auto pongRss = transferResources[i + 1]; + pingRss->flagMem = pongRss->dstMem[0]; + pongRss->flagMem = pingRss->dstMem[0]; + int stride = cfg.general.pingpongStride; + int allocBytes = (int)std::min((size_t)1024, (size_t)8 * abs(transfers[i].laps)); + pingRss->flagStride = stride; + pongRss->flagStride = stride; + pingRss->flagAllocBytes = allocBytes; + pongRss->flagAllocBytes = allocBytes; + i++; + } + } + + // Propagate flagMem into SubExecParam for pingpong transfers + for (auto& exeInfoPair : executorMap) { + ExeDevice const& exeDevice = exeInfoPair.first; + ExeInfo& exeInfo = exeInfoPair.second; + if (exeDevice.exeRank != localRank) continue; + + for (auto& rss : exeInfo.resources) { + if (rss.laps == 0) continue; + volatile int64_t* flag = static_cast(rss.flagMem); + + // Update per-transfer CPU-side SubExecParam (used by CPU executor) + for (auto& p : rss.subExecParamCpu) { + p.flagMem = flag; + p.flagStride = rss.flagStride; + p.flagAllocBytes = rss.flagAllocBytes; + } + + // For GFX executors, also update the packed GPU-side buffer + if (exeDevice.exeType == EXE_GPU_GFX) { + int subIdx = rss.subExecIdx[0]; + exeInfo.subExecParamCpu[subIdx].flagMem = flag; + exeInfo.subExecParamCpu[subIdx].flagStride = rss.flagStride; + exeInfo.subExecParamCpu[subIdx].flagAllocBytes = rss.flagAllocBytes; + ERR_APPEND(hipMemcpy(exeInfo.subExecParamGpu + subIdx, + &exeInfo.subExecParamCpu[subIdx], + sizeof(SubExecParam), + hipMemcpyHostToDevice), errResults); + } + } + } + // Prepare reference src/dst arrays - only once for largest size size_t maxN = maxNumBytes / sizeof(float); vector outputBuffer(maxN); @@ -5208,6 +5488,23 @@ static bool IsConfiguredGid(union ibv_gid const& gid) // Wait for all ranks before starting any timing System::Get().Barrier(); + // Zero flag memory for pingpong transfers so each lap's slot starts at 0 + for (size_t i = 0; i + 1 < transfers.size(); i++) { + if (transfers[i].laps > 0 && transfers[i+1].laps < 0) { + size_t allocBytes = std::min((size_t)1024, (size_t)8 * abs(transfers[i].laps)); + for (int k = 0; k < 2; k++) { + void* dst = transferResources[i + k]->dstMem[0]; + if (!dst) continue; + MemType mt = transfers[i + k].dsts[0].memType; + if (mt == MEM_CPU || mt == MEM_NULL) + memset(dst, 0, allocBytes); + else + hipMemset(dst, 0, allocBytes); + } + i++; + } + } + // Start CPU timing for this iteration auto cpuStart = std::chrono::high_resolution_clock::now(); @@ -5684,7 +5981,7 @@ static bool IsConfiguredGid(union ibv_gid const& gid) ErrResult ParseTransfers(std::string line, std::vector& transfers) { - // Replace any round brackets or '->' with spaces, + // Replace round brackets, '->', and ':' with spaces, but preserve '+' for (int i = 1; line[i]; i++) if (line[i] == '(' || line[i] == ')' || line[i] == '-' || line[i] == ':' || line[i] == '>' ) line[i] = ' '; @@ -5746,11 +6043,82 @@ static bool IsConfiguredGid(union ibv_gid const& gid) ERR_CHECK(ParseMemType(dstStr, wct.mem[1])); ERR_CHECK(ParseExeType(exeStr, wct.exe)); - // Perform wildcard expansion - int numRanks = GetNumRanks(); - for (int localRankIndex = 0; localRankIndex < numRanks; localRankIndex++) { - bool localRankModified = RecursiveWildcardTransferExpansion(wct, localRankIndex, numBytes, numSubExecs, transfers); - if (!localRankModified) break; + // Check for '+' to detect pingpong pair + std::string nextToken; + auto pos = iss.tellg(); + bool isPingpong = (iss >> nextToken && nextToken == "+"); + if (!isPingpong) { + iss.clear(); + iss.seekg(pos); + } + + if (isPingpong) { + // Parse the pong (backward) triplet + std::string pongSrcStr, pongExeStr, pongDstStr; + iss >> pongSrcStr >> pongExeStr >> pongDstStr; + if (iss.fail()) + return {ERR_FATAL, + "Parsing error: Incomplete pong triplet for Pingpong %d", i+1}; + + WildcardTransfer pongWct; + ERR_CHECK(ParseMemType(pongSrcStr, pongWct.mem[0])); + ERR_CHECK(ParseMemType(pongDstStr, pongWct.mem[1])); + ERR_CHECK(ParseExeType(pongExeStr, pongWct.exe)); + + // Check for optional trailing 'x' (e.g. x50) + int numLaps = 1; + auto lapsPos = iss.tellg(); + std::string lapsToken; + if (iss >> lapsToken && lapsToken.size() > 1 && + (lapsToken[0] == 'x' || lapsToken[0] == 'X')) { + numLaps = atoi(lapsToken.c_str() + 1); + if (numLaps < 1) + return {ERR_FATAL, "Parsing error: Pingpong %d lap count must be positive (got %d)", i+1, numLaps}; + } else { + iss.clear(); + iss.seekg(lapsPos); + } + + // Expand ping half + std::vector pingTransfers; + int numRanks = GetNumRanks(); + for (int r = 0; r < numRanks; r++) { + if (!RecursiveWildcardTransferExpansion(wct, r, numBytes, numSubExecs, pingTransfers)) + break; + } + + // Expand pong half + std::vector pongTransfers; + for (int r = 0; r < numRanks; r++) { + if (!RecursiveWildcardTransferExpansion(pongWct, r, numBytes, numSubExecs, pongTransfers)) + break; + } + + // Cartesian product: pair every ping with every pong + for (size_t p = 0; p < pingTransfers.size(); p++) { + for (size_t q = 0; q < pongTransfers.size(); q++) { + Transfer ping = pingTransfers[p]; + Transfer pong = pongTransfers[q]; + ping.laps = numLaps; + pong.laps = -numLaps; + ping.numBytes = 8; + pong.numBytes = 8; + ping.numSubExecs = 1; + pong.numSubExecs = 1; + ping.flag = ping.dsts[0]; + pong.flag = pong.dsts[0]; + transfers.push_back(ping); + transfers.push_back(pong); + } + } + + } else { + // Normal transfer -- expand into transfers (existing behavior) + int numRanks = GetNumRanks(); + for (int localRankIndex = 0; localRankIndex < numRanks; localRankIndex++) { + bool localRankModified = RecursiveWildcardTransferExpansion(wct, localRankIndex, numBytes, numSubExecs, transfers); + if (!localRankModified) break; + } } }