# Hierarchical (two-level) AllGather for an A100-PCIe system: GPUs are laid
# out as an nrows x ncols grid; data is gathered first across each row, then
# across each column.

import argparse
from msccl.language import *
from msccl.topologies import *
from msccl.language.collectives import AllGather


def allpairs_all_gather(gpuIds, size, offset):
    """All-pairs AllGather among the GPUs in ``gpuIds``.

    Each rank copies ``size`` chunks of its input buffer, starting at
    ``offset``, to every other rank in the group.

    Args:
        gpuIds: list of global GPU ids participating in this sub-group.
        size:   number of chunks each rank contributes.
        offset: starting chunk index within the input buffer.
    """
    ngpus = len(gpuIds)

    # Each rank sends its nth chunk to all other gpus.
    for r1 in range(ngpus):
        for r2 in range(ngpus):
            if r1 != r2:
                index = offset
                # One chunk per copy command, so they can be overlapped by the runtime.
                for _ in range(size):
                    c = chunk(gpuIds[r1], Buffer.input, index, 1)
                    c.copy(gpuIds[r2], Buffer.input, index, sendtb=gpuIds[r2], recvtb=gpuIds[r1])
                    index += 1


def hierarchical_allgather(gpus, instances, protocol):
    """Perform two levels of AllGather: first across rows, then across columns.

    Args:
        gpus:      total number of GPUs in the system.
        instances: number of program instances.
        protocol:  transport protocol ('Simple', 'LL128' or 'LL').
    """
    ncols = 2
    nrows = gpus // ncols
    chunks_per_gpu = 1
    nchunks = gpus * chunks_per_gpu
    topology = fully_connected(gpus)

    collective = AllGather(gpus, chunks_per_gpu, True)
    # Note: if chunks_per_gpu > 1, then interleaved_replication needs to be True.
    ileaved = chunks_per_gpu > 1

    with MSCCLProgram("hierarchical_allgather", topology, collective, instances, protocol=protocol,
                      interleaved_replication=ileaved, threadblock_policy=ThreadblockPolicy.manual,
                      dependence_nop=True):

        # A100-PCIe arrangement:
        #   0 1
        #   2 3
        #
        # A 4 x 3 GPU arrangement: 4 local GPUs, 3 instances, GPU ids are numbered as:
        #   0  1  2
        #   3  4  5
        #   6  7  8
        #   9 10 11
        #
        # Phase 1: AllGather across each row.
        # Each GPU sends 1/(nrows * ncols) of the data to all other GPUs in its row;
        # afterwards each GPU in a row holds 1/ncols of the data.
        size = nchunks // (nrows * ncols)
        for n in range(nrows):
            gpuIds = [n * ncols + m for m in range(ncols)]
            allpairs_all_gather(gpuIds, size, offset=0)

        # Phase 2: AllGather across each column (reverse order of phase 1).
        # Afterwards every GPU in the system holds the complete gathered data.
        size = nchunks // nrows
        for n in range(ncols):
            gpuIds = [n + m * ncols for m in range(nrows)]
            allpairs_all_gather(gpuIds, size, offset=-1 * n * chunks_per_gpu)

        XML()
        Check()


parser = argparse.ArgumentParser()
parser.add_argument('num_gpus', type=int, help='number of gpus')
parser.add_argument('instances', type=int, help='number of instances')
parser.add_argument('--protocol', type=str, default='LL', choices=['Simple', 'LL128', 'LL'], help='Protocol')

args = parser.parse_args()

hierarchical_allgather(args.num_gpus, args.instances, args.protocol)
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import argparse
from msccl.language import *
from msccl.topologies import *
from msccl.language.collectives import AllReduce


def allreduce_ltd_pairs(gpus, instances, protocol):
    """AllReduce using a limited ("ltd") set of reducer ranks.

    A pre-determined subset of GPUs performs the reduction on behalf of the
    whole system (reduce-scatter onto the reducers, local reduction, then an
    all-gather of the reduced chunks back to every rank).

    Args:
        gpus:      total number of GPUs in the system.
        instances: number of program instances.
        protocol:  transport protocol ('Simple', 'LL128' or 'LL').
    """
    # Define ranks/GPUs that perform the reduction in the system.
    # For max hops = 3, one or more inner sets of GPUs can be selected: 1, 2, 9, 10.
    # For max hops = 4, the set 0, 1, 2, 3, 8, 9, 10, 11 can be selected.
    gpuIds = [1, 2, 9, 10]  # GPU ids that perform reduction, max hops = 3

    rsize = len(gpuIds)          # number of reducer ranks (also chunks per rank)
    size = 16                    # chunks multiplier
    chunksperloop = size * rsize # total number of chunks per rank
    topology = fully_connected(gpus)
    collective = AllReduce(gpus, chunksperloop, True)

    with MSCCLProgram("allreduce_ltd_pairs", topology, collective, instances, protocol=protocol,
                      interleaved_replication=False, threadblock_policy=ThreadblockPolicy.manual,
                      dependence_nop=True):

        # Reduce-Scatter phase: each rank sends its nth chunk to the nth
        # pre-determined reducer rank's scratch space (chunk transpose).
        for r1 in range(gpus):
            for r2 in range(rsize):
                if r1 != gpuIds[r2]:
                    index = r2 * size
                    c = chunk(r1, Buffer.input, index, size)  # reference to the source chunk
                    c.copy(gpuIds[r2], 'scratch', sendtb=gpuIds[r2], recvtb=r1)

        # Each reducer rank locally reduces its nth chunk with all remote
        # chunks in its scratch memory, spreading the work over `gpus`
        # threadblocks (sendtb = index % gpus) for better parallelism.
        for r in range(rsize):
            # Walk the entire scratch memory, one chunk from one rank at a time.
            for index in range(0, size * (gpus - 1)):
                c = chunk(gpuIds[r], Buffer.input, r * size + (index % size))
                c.reduce(chunk(gpuIds[r], 'scratch', index), sendtb=(index % gpus))

        # Reduce-Scatter phase done; All-Gather phase now.
        # Each reducer rank sends its fully reduced nth chunk to all other gpus.
        for r1 in range(rsize):
            for r2 in range(gpus):
                if gpuIds[r1] != r2:
                    index = r1 * size
                    c = chunk(gpuIds[r1], Buffer.input, index, size)  # reference to the source chunk
                    c.copy(r2, Buffer.input, index, sendtb=r2, recvtb=gpuIds[r1])

        XML()
        Check()


parser = argparse.ArgumentParser()
parser.add_argument('num_gpus', type=int, help='number of gpus')
parser.add_argument('instances', type=int, help='number of instances')
parser.add_argument('--protocol', type=str, default='LL', choices=['Simple', 'LL128', 'LL'], help='Protocol')

args = parser.parse_args()

allreduce_ltd_pairs(args.num_gpus, args.instances, args.protocol)