diff --git a/.azure-pipelines/multi-nodes-test.yml b/.azure-pipelines/multi-nodes-test.yml index 3b3ebe1ff..2748813d7 100644 --- a/.azure-pipelines/multi-nodes-test.yml +++ b/.azure-pipelines/multi-nodes-test.yml @@ -93,29 +93,37 @@ jobs: remoteScript: | bash /root/mscclpp/test/deploy/run_tests.sh mscclpp-test - - template: templates/run-remote-task.yml - parameters: - name: RunMultiNodeUnitTest - displayName: Run multi-nodes unit tests - runRemoteArgs: '--hostfile $(System.DefaultWorkingDirectory)/test/deploy/hostfile --host ${{ parameters.vmssName }}000000 --user azureuser' - remoteScript: | - bash /root/mscclpp/test/deploy/run_tests.sh mp-ut + # - template: templates/run-remote-task.yml + # parameters: + # name: RunMultiNodeUnitTest + # displayName: Run multi-nodes unit tests + # runRemoteArgs: '--hostfile $(System.DefaultWorkingDirectory)/test/deploy/hostfile --host ${{ parameters.vmssName }}000000 --user azureuser' + # remoteScript: | + # bash /root/mscclpp/test/deploy/run_tests.sh mp-ut - - template: templates/run-remote-task.yml - parameters: - name: RunMultiNodePythonTests - displayName: Run multi-nodes python tests - runRemoteArgs: '--hostfile $(System.DefaultWorkingDirectory)/test/deploy/hostfile --host ${{ parameters.vmssName }}000000 --user azureuser' - remoteScript: | - bash /root/mscclpp/test/deploy/run_tests.sh pytests + # - template: templates/run-remote-task.yml + # parameters: + # name: RunMultiNodePythonTests + # displayName: Run multi-nodes python tests + # runRemoteArgs: '--hostfile $(System.DefaultWorkingDirectory)/test/deploy/hostfile --host ${{ parameters.vmssName }}000000 --user azureuser' + # remoteScript: | + # bash /root/mscclpp/test/deploy/run_tests.sh pytests + + # - template: templates/run-remote-task.yml + # parameters: + # name: RunMultiNodePythonBenchmark + # displayName: Run multi-nodes python benchmark + # runRemoteArgs: '--hostfile $(System.DefaultWorkingDirectory)/test/deploy/hostfile --host ${{ parameters.vmssName }}000000 --user azureuser' + # remoteScript: | + # bash /root/mscclpp/test/deploy/run_tests.sh py-benchmark - template: templates/run-remote-task.yml parameters: - name: RunMultiNodePythonBenchmark - displayName: Run multi-nodes python benchmark + name: RunMultiNodeExecutorTests + displayName: Run multi-nodes executor tests runRemoteArgs: '--hostfile $(System.DefaultWorkingDirectory)/test/deploy/hostfile --host ${{ parameters.vmssName }}000000 --user azureuser' remoteScript: | - bash /root/mscclpp/test/deploy/run_tests.sh py-benchmark + bash /root/mscclpp/test/deploy/run_tests.sh executor-tests - template: templates/stop.yml parameters: diff --git a/test/deploy/run_tests.sh b/test/deploy/run_tests.sh index 6a70c76e7..dbac9cbfe 100644 --- a/test/deploy/run_tests.sh +++ b/test/deploy/run_tests.sh @@ -96,8 +96,25 @@ function run_py_benchmark() -x MSCCLPP_HOME=/root/mscclpp -npernode 8 python3 /root/mscclpp/python/mscclpp_benchmark/allreduce_bench.py } +function run_executor_tests() +{ + echo "==================Run multi-node executor tests======================" + ALGOS_DIR=/root/mscclpp/test/executor-tests/algos + PLANS_DIR=/root/mscclpp/test/executor-tests/execution-plans + TEST_SCRIPT=/root/mscclpp/python/test/executor_test.py + PYTHON_BIN=/root/venv/bin/python3 + + echo "Generating execution plans" + ${PYTHON_BIN} ${ALGOS_DIR}/multi_node_transfer.py --name multi_node_transfer > ${PLANS_DIR}/multi_node_transfer.json + ${PYTHON_BIN} ${ALGOS_DIR}/multi_node_transfer_pkt.py --name multi_node_transfer_pkt > ${PLANS_DIR}/multi_node_transfer_pkt.json + + echo "Running multi-node transfer test with in-place buffers" + mpirun ${MPI_ARGS} -np 2 -npernode 1 ${MSCCLPP_ENV} ${PYTHON_BIN} $TEST_SCRIPT -path $PLANS_DIR/multi_node_transfer.json --size 1M --in_place + mpirun ${MPI_ARGS} -np 2 -npernode 1 ${MSCCLPP_ENV} ${PYTHON_BIN} $TEST_SCRIPT -path $PLANS_DIR/multi_node_transfer_pkt.json --size 1M --in_place +} + if [ $# -lt 1 ]; then - echo "Usage: $0 " + echo "Usage: $0 " exit 1 fi test_name=$1 @@ -118,6 +135,10 @@ case $test_name in echo "==================Run python benchmark================================" run_py_benchmark ;; + executor-tests) + echo "==================Run executor tests=================================" + run_executor_tests + ;; *) echo "Unknown test name: $test_name" exit 1 diff --git a/test/executor-tests/algos/multi_node_transfer.py b/test/executor-tests/algos/multi_node_transfer.py new file mode 100644 index 000000000..2fff1e7a4 --- /dev/null +++ b/test/executor-tests/algos/multi_node_transfer.py @@ -0,0 +1,85 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Multi-Node Transfer Test + +This file tests the SIGNAL, WAIT, PUT, PUT_WITH_SIGNAL and +PUT_WITH_SIGNAL_AND_FLUSH operations on PortChannels in a multi-node +environment. It implements a 2-GPU allgather using the Simple protocol, +exercising the different port-channel synchronization primitives. +""" + +import argparse +from mscclpp.language.channel import * +from mscclpp.language.rank import * +from mscclpp.language.general import * +from mscclpp.language.program import * +from mscclpp.language.collectives import * + + +def multi_node_transfer(name, num_threads_per_block, min_message_size, max_message_size): + chunksperloop = 2 + gpu_size = 2 + collective = AllGather(gpu_size, chunksperloop, True) + with CollectiveProgram( + name, + collective, + gpu_size, + protocol="Simple", + num_threads_per_block=num_threads_per_block, + use_double_scratch_buffer=False, + min_message_size=min_message_size, + max_message_size=max_message_size, + ): + # Setup ranks, channels, output and scratch buffers for 2-GPU allgather + first_rank = Rank(0) + second_rank = Rank(1) + first_ch1 = PortChannel(1, 0) + second_ch1 = PortChannel(0, 1) + first_ch2 = PortChannel(1, 0) + second_ch2 = PortChannel(0, 1) + first_output_buffer = first_rank.get_output_buffer() + second_output_buffer = second_rank.get_output_buffer() + + # Initial handshake on both port channels: peers exchange SIGNAL/WAIT to + # ensure remote buffers are ready before any data transfer begins. + first_ch1.signal(tb=0) + second_ch1.signal(tb=0) + first_ch1.wait(tb=0) + second_ch1.wait(tb=0) + first_ch2.signal(tb=1) + second_ch2.signal(tb=1) + first_ch2.wait(tb=1) + second_ch2.wait(tb=1) + + # Rank 0 -> rank 1 via ch1: PUT followed by an explicit SIGNAL and FLUSH + first_ch1.put(second_output_buffer[0:1], first_output_buffer[0:1], tb=0) + first_ch1.signal(tb=0) + first_ch1.flush(tb=0) + # Rank 0 -> rank 1 via ch2: PUT_WITH_SIGNAL fuses the data transfer with + # the completion signal, followed by a separate FLUSH + first_ch2.put_with_signal(second_output_buffer[1:2], first_output_buffer[1:2], tb=1) + first_ch2.flush(tb=1) + # Rank 1 -> rank 0 via ch1: PUT_WITH_SIGNAL_AND_FLUSH fuses PUT, SIGNAL + # and FLUSH into a single operation + second_ch1.put_with_signal_and_flush(first_output_buffer[2:4], second_output_buffer[2:4], tb=0) + + # Final WAITs ensure all incoming transfers have completed on each rank + first_ch1.wait(tb=0) + second_ch1.wait(tb=0) + second_ch2.wait(tb=1) + + print(JSON()) + + +parser = argparse.ArgumentParser() + +parser.add_argument("--name", type=str, help="name of the program") +parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block") +parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size") +parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size") + +args = parser.parse_args() + +multi_node_transfer(args.name, args.num_threads_per_block, args.min_message_size, args.max_message_size) diff --git a/test/executor-tests/algos/multi_node_transfer_pkt.py b/test/executor-tests/algos/multi_node_transfer_pkt.py new file mode 100644 index 000000000..6924a5cbc --- /dev/null +++ b/test/executor-tests/algos/multi_node_transfer_pkt.py @@ -0,0 +1,70 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Multi-Node Transfer Pack Test + +This file tests the PUT_PACKETS and READ_PUT_PACKETS operations on +PortChannels in a multi-node environment. It implements a 2-GPU allgather +with the LL (low-latency) packet protocol, using port channels for inter-node +communication. +""" + +import argparse +from mscclpp.language.channel import * +from mscclpp.language.rank import * +from mscclpp.language.general import * +from mscclpp.language.program import * +from mscclpp.language.collectives import * + + +def multi_node_transfer_pkt(name, num_threads_per_block, min_message_size, max_message_size): + chunksperloop = 1 + gpu_size = 2 + collective = AllGather(gpu_size, chunksperloop, True) + with CollectiveProgram( + name, + collective, + gpu_size, + protocol="LL", + num_threads_per_block=num_threads_per_block, + use_double_scratch_buffer=True, + min_message_size=min_message_size, + max_message_size=max_message_size, + ): + # Setup ranks, channels, output and scratch buffers for 2-GPU allgather + first_rank = Rank(0) + second_rank = Rank(1) + first_ch = PortChannel(1, 0) + second_ch = PortChannel(0, 1) + first_output_buffer = first_rank.get_output_buffer() + second_output_buffer = second_rank.get_output_buffer() + first_scratch_buffer = Buffer(0, 2) + second_scratch_buffer = Buffer(1, 2) + + # Each rank stages its own output chunk into its local scratch buffer as packets + first_rank.copy_packets(first_scratch_buffer[0:1], first_output_buffer[0:1], tb=0) + second_rank.copy_packets(second_scratch_buffer[1:2], second_output_buffer[1:2], tb=0) + + # Rank 0 pushes packets to rank 1's scratch via port channel (PUT_PACKETS) + # Rank 1 reads from rank 0's scratch and pushes packets back via port channel (READ_PUT_PACKETS) + first_ch.put_packets(second_scratch_buffer[0:1], first_scratch_buffer[0:1], tb=0) + second_ch.read_put_packets(first_scratch_buffer[1:2], second_scratch_buffer[1:2], tb=1) + + # Both ranks unpack received packets from scratch into their output buffers + first_rank.unpack_packets(first_output_buffer[1:2], first_scratch_buffer[1:2], tb=1) + second_rank.unpack_packets(second_output_buffer[0:1], second_scratch_buffer[0:1], tb=2) + + print(JSON()) + + +parser = argparse.ArgumentParser() + +parser.add_argument("--name", type=str, help="name of the program") +parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block") +parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size") +parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size") + +args = parser.parse_args() + +multi_node_transfer_pkt(args.name, args.num_threads_per_block, args.min_message_size, args.max_message_size) diff --git a/test/executor-tests/algos/test.json b/test/executor-tests/algos/test.json new file mode 100644 index 000000000..2b7195bac --- /dev/null +++ b/test/executor-tests/algos/test.json @@ -0,0 +1,239 @@ +{ + "name": "allgather", + "collective": "allgather", + "protocol": "LL", + "inplace": true, + "reuse_resources": false, + "gpus": [ + { + "id": 0, + "input_chunks": 1, + "output_chunks": 2, + "scratch_chunks": 2, + "threadblocks": [ + { + "id": 0, + "ops": [ + { + "name": "cpkt", + "src_buff": [ + { + "type": "o", + "index": 0, + "size": 1 + } + ], + "dst_buff": [ + { + "type": "s", + "index": 0, + "size": 1 + } + ] + }, + { + "name": "ppkt", + "src_buff": [ + { + "type": "s", + "index": 0, + "size": 1 + } + ], + "dst_buff": [ + { + "buffer_id": 0, + "index": 0, + "size": 1 + } + ], + "channel_ids": [ + 0 + ], + "channel_type": "port" + } + ], + "channels": [ + { + "channel_type": "port", + "channel_ids": [ + 0 + ] + } + ], + "remote_buffer_refs": [ + { + "access_channel_type": "port", + "remote_buffer_ids": [ + 0 + ] + } + ] + }, + { + "id": 1, + "ops": [ + { + "name": "upkt", + "src_buff": [ + { + "type": "s", + "index": 1, + "size": 1 + } + ], + "dst_buff": [ + { + "type": "o", + "index": 1, + "size": 1 + } + ] + } + ], + "channels": [], + "remote_buffer_refs": [] + } + ], + "channels": [ + { + "channel_type": "port", + "connected_to": [ + 1 + ] + } + ], + "remote_buffers": [ + { + "rank": 1, + "type": "s", + "access_channel_types": [ + "port" + ] + } + ], + "semaphores": [] + }, + { + "id": 1, + "input_chunks": 1, + "output_chunks": 2, + "scratch_chunks": 2, + "threadblocks": [ + { + "id": 0, + "ops": [ + { + "name": "cpkt", + "src_buff": [ + { + "type": "o", + "index": 1, + "size": 1 + } + ], + "dst_buff": [ + { + "type": "s", + "index": 1, + "size": 1 + } + ] + } + ], + "channels": [], + "remote_buffer_refs": [] + }, + { + "id": 1, + "ops": [ + { + "name": "rppkt", + "src_buff": [ + { + "type": "s", + "index": 1, + "size": 1 + } + ], + "dst_buff": [ + { + "buffer_id": 0, + "index": 1, + "size": 1 + } + ], + "channel_ids": [ + 0 + ], + "channel_type": "port" + } + ], + "channels": [ + { + "channel_type": "port", + "channel_ids": [ + 0 + ] + } + ], + "remote_buffer_refs": [ + { + "access_channel_type": "port", + "remote_buffer_ids": [ + 0 + ] + } + ] + }, + { + "id": 2, + "ops": [ + { + "name": "upkt", + "src_buff": [ + { + "type": "s", + "index": 0, + "size": 1 + } + ], + "dst_buff": [ + { + "type": "o", + "index": 0, + "size": 1 + } + ] + } + ], + "channels": [], + "remote_buffer_refs": [] + } + ], + "channels": [ + { + "channel_type": "port", + "connected_to": [ + 0 + ] + } + ], + "remote_buffers": [ + { + "rank": 0, + "type": "s", + "access_channel_types": [ + "port" + ] + } + ], + "semaphores": [] + } + ], + "num_threads_per_block": 1024, + "use_double_scratch_buffer": true, + "buffer_alignment": 16, + "min_message_size": 0, + "max_message_size": 18446744073709551615 +}