Commits (65)
4dc402e
set init flag
lawirz Mar 18, 2024
a8d1619
Adapt to constructor interface changes and removal of ROCE
lawirz Mar 23, 2024
7b6a18b
initialize private design_ parameter
lawirz Mar 24, 2024
7486550
created generic(xrt+coyote) test script
lawirz Mar 24, 2024
7da1f6b
test-generic fix
lawirz Mar 25, 2024
5d37c0f
Set up runscripts
lawirz Apr 20, 2024
e4b0904
Neural net added to test cases + adapted runscript
lawirz Apr 23, 2024
b788417
Fixed initalization errors, added logging to python part
lawirz Apr 26, 2024
c607564
Fixed rank initialization errors
lawirz Apr 27, 2024
cb092f4
Small pg fix
lawirz Apr 27, 2024
84131ba
Redefining exchange_qp fixes most errors
lawirz May 1, 2024
9cc61bd
Added barriers, waits and removed get-retcode.
lawirz May 2, 2024
cb229c6
Restructured tests and incorporated torchvision test
lawirz May 3, 2024
6feceed
Simulator fix
lawirz May 14, 2024
a94a5e2
Introduced initialization helper functions
lawirz May 17, 2024
756e342
Refactored Scatter, Gather and Allgather
lawirz May 18, 2024
c7cbb7e
Refactored rest(send, recv, alltoall)
lawirz May 19, 2024
52b4433
Added MNIST test
lawirz May 25, 2024
6f95caa
Added test for multi-dimensional tensors
lawirz May 27, 2024
f930995
Compatibility changes for new dev branch
lawirz May 31, 2024
fd62eef
Adapted tests to not stop on error
lawirz Jun 4, 2024
f70049d
Added Resnet50 imagenet testcase
lawirz Jun 7, 2024
fe7e507
Added option for sidestepping using OpenMPI (gather, scatter, bcast)
lawirz Jun 16, 2024
e78e2fe
Added Performance measurements
lawirz Jun 16, 2024
3b9465d
Added Allgather and Allreduce Sidestep
lawirz Jun 19, 2024
fa3202a
Added Waiting to SIM
lawirz Jun 19, 2024
f9623d2
Updated README
lawirz Jun 23, 2024
c166a1f
Further README additions
lawirz Jun 25, 2024
75da95e
Added Benchmarking
lawirz Jun 28, 2024
d3a8d5c
Added multidimensional support for Gather and Allgather
lawirz Jun 28, 2024
d353764
MNIST fixes
lawirz Jun 29, 2024
fe43a32
Removed ACCL measured duration to avoid wait time
lawirz Jul 1, 2024
496c13b
Added multidim tensor segment for AlltoAll
lawirz Jul 1, 2024
90e80b2
Furter timestamping on bcast
lawirz Jul 1, 2024
f5a9508
Using new change buffer type to reuse buffers
lawirz Jul 21, 2024
1c422db
Added model saving to the resnet50 test
lawirz Jul 21, 2024
b9b2410
Switch to network_utils initialization
lawirz Jul 29, 2024
736c582
Enabled sidstepping of eager allreduce
lawirz Jul 29, 2024
b3eb6cb
Adapted PG to simplified ACCL interface
lawirz Jul 29, 2024
8665bc6
Disabled bcast and ar sidestepping. added ar segmenting
lawirz Aug 2, 2024
8730249
Added rounding to allreduce. dlrm works
lawirz Aug 3, 2024
c6782f4
Added more fine-grained benchmarking to ar and a2a
lawirz Aug 6, 2024
747961c
Added sidestep bcast with allreduce for resnet50
lawirz Aug 11, 2024
e8ffe0e
Replaced tensor copies with memcpy(solves performance issues)
lawirz Aug 11, 2024
8aaae98
Implemented constant message sizes for resnet50
lawirz Aug 18, 2024
3f00f33
Parametrized splitting
lawirz Aug 20, 2024
2533a4e
Improved Microbenchmarking
lawirz Aug 25, 2024
30e8feb
Added plot scripts
lawirz Aug 25, 2024
36efec4
Added ACCL device measurement to bench
lawirz Aug 25, 2024
39d00e5
Added sleep measurement
lawirz Aug 25, 2024
0e3cf79
Initialization fix and cleanup
lawirz Aug 26, 2024
98a7527
Cleanup of debug statements
lawirz Aug 29, 2024
16ea0f6
Added more naming to bench
lawirz Aug 30, 2024
4b91f0e
Added pytorch side bench to all collectives
lawirz Aug 30, 2024
c8cbb74
plotting support for all collectives
lawirz Aug 30, 2024
fcad6ca
attempt to fix resnet
lawirz Aug 30, 2024
9185e60
fixed segmentation bug
lawirz Aug 31, 2024
3424514
Added measurements to all collectives
lawirz Aug 31, 2024
9c4ef13
Attempt to replace tensor copies
lawirz Sep 1, 2024
ae9072a
Cleaned up test-generic
lawirz Sep 11, 2024
142a437
Main code cleanup
lawirz Sep 11, 2024
e95ac22
Removed old coyote initialization
lawirz Sep 11, 2024
8f52d63
Merge branch 'pytorch_ddp' of https://gitlab.ethz.ch/lawirz/accl into…
lawirz Sep 11, 2024
3ce5185
Removed some comments
lawirz Sep 11, 2024
17c8c56
cleanup + documentation
lawirz Sep 29, 2024
53 changes: 53 additions & 0 deletions integrations/pytorch_ddp/DEVELOPMENT.md
@@ -0,0 +1,53 @@
This document explains the current state of development and tries to document some of the decisions made.

## Structure

The integration consists of:

- wrapper, bindings and helper functionality in ./accl_process_group
- the main C++ files in ./src
- the ACCL repository that the process group builds on top of, in ./accl . It is replicated here so that you can try different versions
- test scripts in ./test

## Build process

Check the ./install.py helper for dependency versions

./setup.py sets up the build

See the README section on how to avoid the long pip build.

## Basics

- Currently only runs via Coyote RDMA. XRT and GPU support were dropped. The simulator still runs over XRT UDP, though
- Needs an MPI library to work (set in setup.py). Tested only with MPICH
- The test setup in run.sh is for the HACC cluster
- Use ACCL_DEBUG=1 both during builds and runs
- Everything runs in rendezvous mode
- If you call collectives directly, they run synchronously, but e.g. the allreduce used internally by DDP executes asynchronously (see the sketch after this list)
- The PG allocates 2 buffers and reuses them to avoid reallocation. This is supposed to be replaced with a host buffer constructor that takes an existing memory region. To change the buffer type you need to use the change_buffer_type branch (maybe already pulled in) at https://github.com/lawirz/ACCL
- The torch profiler can see the overall execution time, but setting it up to measure sub-operations within the worker thread was attempted and failed.
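
For illustration, a minimal Python sketch of that synchronous/asynchronous difference, assuming the ACCL process group has already been created and `torch.distributed` initialized as described in the README; the model and tensor shapes are arbitrary placeholders.

```python
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

t = torch.ones(8)
dist.all_reduce(t)                    # direct call: executes synchronously

model = DDP(torch.nn.Linear(8, 1))    # DDP buckets gradients for allreduce
out = model(torch.randn(4, 8)).sum()
out.backward()                        # DDP's internal allreduce here runs asynchronously
```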

## ProcessGroupACCL.cpp

### ProcessGroup structure

A lot of the design comes from ProcessGroupMPI. There is a concept of WorkEntries, which schedule work on a separate worker thread. This is currently done with a single worker thread, as in the MPI PG. There is still a lock, probably only relevant for a few management operations from the DDP side. With async execution in ACCL, we could try a different structure with AsyncWork, as is done in the Gloo PG, I think.
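
As a rough illustration of that pattern, here is a Python-only sketch; `WorkEntry`, `enqueue` and the queue are illustrative names, not the actual C++ types.

```python
import queue
import threading

class WorkEntry:
    """Illustrative stand-in for the C++ WorkEntry: a deferred collective."""
    def __init__(self, run):
        self.run = run                  # callable that executes the collective
        self.done = threading.Event()   # completion signal (cf. Work::wait)

work_queue: "queue.Queue[WorkEntry]" = queue.Queue()

def worker_loop():
    # Single worker thread, as in the MPI process group: entries execute in order.
    while True:
        entry = work_queue.get()
        try:
            entry.run()
        finally:
            entry.done.set()
            work_queue.task_done()

threading.Thread(target=worker_loop, daemon=True).start()

def enqueue(run) -> WorkEntry:
    entry = WorkEntry(run)
    work_queue.put(entry)
    return entry  # caller may entry.done.wait() for synchronous semantics
```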

### Collectives

- There are small wrappers that do a few checks (mostly copied from the MPI PG), perform the sidestep, and then set up the WorkEntry
- The WorkEntries manage the segmentation, which is not yet correctly implemented everywhere. Some collectives still use a version that relies on the input having a one-dimensional shape. Others that require multiple segmentations, such as Scatter, have similar limitations
- Input is copied to the pre-allocated buffer. Copies using memcpy generally seem to be much faster than tensor.copy_, for some reason
- ACCL does a host-to-host call. The driver figures out that it is host-to-host from the buffer type. The compressed type should be added as an argument to make that work again
- The result is copied back to the original tensor (the whole copy-in / call / copy-back flow is sketched below)
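
A minimal Python sketch of this flow, assuming pre-allocated staging buffers and a placeholder `accl_allreduce` callable; the real implementation is C++ and uses memcpy plus ACCL's buffer objects.

```python
import torch

# Pre-allocated, reused host staging buffers (sizes are arbitrary here).
in_buf = torch.empty(2**20, dtype=torch.float32)
out_buf = torch.empty_like(in_buf)

def run_allreduce(tensor: torch.Tensor, accl_allreduce) -> None:
    flat = tensor.reshape(-1)
    n = flat.numel()
    # Copy the input into the reused staging buffer (the C++ code uses memcpy,
    # which was observed to be faster than tensor.copy_).
    in_buf[:n].copy_(flat)
    # Host-to-host ACCL call on the staging buffers (placeholder callable).
    accl_allreduce(in_buf[:n], out_buf[:n])
    # Copy the result back into the caller's tensor.
    flat.copy_(out_buf[:n])
```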

## Hardware issues

A lot of collectives still fail in hardware. The following can produce issues:

- Mixing datatypes, especially ints
- High variability in length
- MPI sidestepping (can't explain why this causes issues)

If you run test-resnet50, you will encounter them.
36 changes: 36 additions & 0 deletions integrations/pytorch_ddp/README.md
@@ -12,6 +12,8 @@ python3 -m venv venv
source venv/bin/activate
```

Activate an XRT 21 version; later versions have led to issues in the past.

<details><summary>Installation without GPU support</summary>
To install the plugin without GPU support, simply run the following from within the venv:

@@ -42,8 +44,42 @@ source venv/bin/activate
</details>

## Running the plugin

Make sure to source the `setup.sh` script in this directory to load the ACCL plugin before starting a Python script.
Example usage can be found in the various test files under [`test/`](test).

Do make sure not to run python from within the root directory of `pytorch_ddp`, because Python will try to import the
local incomplete [`accl_process_group/`](accl_process_group) folder instead of the actual installation.

The provided `test/run.sh` will launch a test script via mpirun.
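
As a rough sketch of what a script launched this way might do (not a drop-in example: the `Rank` fields, IP addresses and rendezvous settings shown here are placeholders; consult the test scripts for working values):

```python
import os
import torch
import torch.distributed as dist
import accl_process_group as accl

rank = int(os.environ.get("RANK", "0"))

# Placeholder ranks; the Rank fields (ip, port, session id, buffer size) are assumptions.
ranks = [accl.Rank("10.0.0.1", 5500, 0, 1024),
         accl.Rank("10.0.0.2", 5500, 0, 1024)]

# Registers the "ACCL" backend (see accl_process_group/process_group_wrapper.py).
accl.create_process_group(ranks, accl.ACCLDesign.cyt_rdma, bufsize=1024)

# Requires the usual rendezvous environment (MASTER_ADDR, MASTER_PORT, ...).
dist.init_process_group(backend="ACCL", rank=rank, world_size=len(ranks))

t = torch.ones(16)
dist.all_reduce(t)   # runs synchronously when called directly
```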

## Setup overview

- The whole ProcessGroup is wrapped in OpenMPI, which is used for initialization
- You can use the OpenMPI implementation of certain collectives via the "sidestep" flags in ProcessGroupACCL.cpp
- Recompilation using `./install` or `pip install .` can be very slow; instead you can run `python setup.py build_ext --inplace` and then copy the binary or other files directly, e.g. `cp accl_process_group/_c/ProcessGroupACCL.cpython-38-x86_64-linux-gnu.so ~/.local/lib/python3.8/site-packages/accl_process_group/_c/`
- The `install.py` script will not reinstall the driver when ACCL is updated; you will need to rebuild it yourself
- Set `ACCL_DEBUG=1` if you want more output (also set it during the build). Stdout is sometimes incomplete (in the simulator), so it is best to log most things to stderr
- The run script currently just prints the command to run (better not to use the `&` at the end), which you then run manually. This is because of bad experiences with missing output (maybe coinciding with the issues mentioned above) and with termination on multiple machines; it should also work if you comment out the `exit 0` and the `&` at the end of mpirun. Don't forget that you should still run the script to clear the log files
- ACCL only supports sizes up to 4 MB. If you give it larger tensors, the PG will try to segment them along the first dimension (see the sketch after this list). Not all collectives correctly handle multi-dimensional tensors yet
- Setting up the simulator with 4 MB buffers takes a long time; set it lower for quick tests
- You can init the process group as if it were UDP and run on a `cyt_rdma` simulator
- There is no reason not to support the RDMA + SIM initialization; it just hasn't been implemented yet. Certain case-splits assume no-sim if cyt_rdma is given...
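
A hedged sketch of that first-dimension segmentation; the 4 MB limit comes from the note above, while the function name and chunking strategy are illustrative, not the PG's actual code.

```python
import torch

MAX_BYTES = 4 * 1024 * 1024  # ACCL message size limit mentioned above

def segment_first_dim(tensor: torch.Tensor):
    """Yield slices of `tensor` along dim 0 that each fit within MAX_BYTES."""
    assert tensor.dim() >= 1
    bytes_per_row = tensor[0].numel() * tensor.element_size()
    rows_per_chunk = max(1, MAX_BYTES // bytes_per_row)  # a single oversized row is not handled here
    for start in range(0, tensor.shape[0], rows_per_chunk):
        yield tensor[start:start + rows_per_chunk]

# Usage: run the collective on each chunk in turn, e.g.
# for chunk in segment_first_dim(t):
#     dist.all_reduce(chunk)
```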

### How to install torchvision

- Install torch using the script
- Clone vision and check out the matching version v0.16.0
- Clone libpng and configure it with the prefix set to a local directory
- Add its bin directory to the PATH
- Not sure if needed: supply the library and include paths to torchvision as in their development doc
- Disable the version check in the torchvision setup.py, because it doesn't correctly parse the version
- Run the vision setup.py with the debug, include, library and use-png flags

### Tests available
Check `test/run.sh` for ACCL_SCRIPT examples

- `test-generic.py` tests everything in isolation, plus a small two-layer model learning a linear function
- `test-mnist.py` should also be able to run non-distributed (check the arguments)
- `test-imagenet.py` fine-tunes Resnet50 following <https://docs.ray.io/en/latest/train/examples/pytorch/pytorch_resnet_finetune.html> and should also be able to run non-distributed
- For DLRM you will need a small fork of the DLRM repo with ACCL support, hosted at <https://gitlab.ethz.ch/lawirz/dlrm>. It contains a `run.sh`
2 changes: 1 addition & 1 deletion integrations/pytorch_ddp/accl_process_group/__init__.py
@@ -17,5 +17,5 @@

from ._c.ProcessGroupACCL import ProcessGroupACCL, Rank, DataType, ACCLDesign
from .process_group_wrapper import create_process_group, \
create_process_group_coyote, create_simulate_process_group, initialize, \
initialize, \
set_compression, get_compression, get_local_qp, set_remote_qp
109 changes: 28 additions & 81 deletions integrations/pytorch_ddp/accl_process_group/process_group_wrapper.py
@@ -19,117 +19,58 @@
from typing import Optional
from . import ProcessGroupACCL, Rank, DataType, ACCLDesign
import torch
import logging
from torch.distributed import Backend
from torch.distributed.distributed_c10d import ProcessGroup, Store

import sys
import os

process_group: Optional[ProcessGroupACCL] = None

#Configure logging
logger = logging.getLogger(__name__)
if "ACCL_DEBUG" in os.environ and os.environ["ACCL_DEBUG"]=="1":
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.WARNING)

def create_process_group(
ranks: list[Rank],
xclbin: str, device_index: int, design: ACCLDesign,
ranks: list[Rank], design: ACCLDesign,
*, nbufs: int = 16, bufsize: int = 1024,
compression: Optional[dict[DataType, DataType]] = None,
p2p_enabled: bool = False, profiling_ranks: Optional[list[int]] = None,
profiling_timeout: float = 0.0, rsfec: bool = False,
simulation: bool = False,
initialize: bool = True) -> ProcessGroup:
if design == ACCLDesign.cyt_rdma or design == ACCLDesign.cyt_tcp:
raise RuntimeError(f"{design} is an incompatible design for XRT")

if compression is None:
compression = {}
else:
# Copy compression since it will be used later in the lambda function
compression = compression.copy()

logger.debug(f'Compression: {compression}')

if profiling_ranks is None:
profiling_ranks = []
else:
profiling_ranks = profiling_ranks.copy()

def create_process_group_wrapper(store, rank, size, _timeout):
global process_group
if process_group is not None:
raise RuntimeError("ACCL ProcessGroup already created, "
"can only create one.")

pg = ProcessGroupACCL(store, rank, size, ranks, False, design,
xclbin=xclbin, device_index=device_index,
bufsize=bufsize, rsfec=rsfec, nbufs=nbufs,
compression=compression,
p2p_enabled=p2p_enabled,
profiling_ranks=profiling_ranks,
profiling_timeout=profiling_timeout)

process_group = pg
if initialize:
pg.initialize()

return pg

Backend.register_backend("ACCL", create_process_group_wrapper)

def create_simulate_process_group(ranks: list[Rank], *,
nbufs: int = 16, udp: bool = False,
compression: Optional[dict[DataType,
DataType]] = None,
bufsize: int = 1024,
initialize: bool = True) -> ProcessGroup:
if compression is None:
compression = {}
else:
# Copy compression since it will be used later in the lambda function
compression = compression.copy()
logger.debug(f'Profiling_ranks: {profiling_ranks}')

def create_process_group_wrapper(store, rank, size, _timeout):
global process_group
if process_group is not None:
raise RuntimeError("ACCL ProcessGroup already created, "
"can only create one.")

design = ACCLDesign.udp if udp else ACCLDesign.tcp
# if simulation:
#overwrite the design choice in simulation
# design = ACCLDesign.udp

pg = ProcessGroupACCL(store, rank, size, ranks, True, design,
compression=compression, nbufs=nbufs,
bufsize=bufsize)

process_group = pg
if initialize:
pg.initialize()

return pg

Backend.register_backend("ACCL", create_process_group_wrapper)

def create_process_group_coyote(
ranks: list[Rank], design: ACCLDesign,
*, nbufs: int = 16, bufsize: int = 1024,
compression: Optional[dict[DataType, DataType]] = None,
p2p_enabled: bool = False, profiling_ranks: Optional[list[int]] = None,
profiling_timeout: float = 0.0, rsfec: bool = False,
initialize: bool = False) -> ProcessGroup:
if design != ACCLDesign.cyt_rdma and design != ACCLDesign.cyt_tcp:
raise RuntimeError(f"{design} is an incompatible design for coyote")

if compression is None:
compression = {}
else:
# Copy compression since it will be used later in the lambda function
compression = compression.copy()

if profiling_ranks is None:
profiling_ranks = []
else:
profiling_ranks = profiling_ranks.copy()

def create_process_group_wrapper(store, rank, size, _timeout):
global process_group
if process_group is not None:
raise RuntimeError("ACCL ProcessGroup already created, "
"can only create one.")

pg = ProcessGroupACCL(store, rank, size, ranks, False, design,
logger.debug(f'Creating ProcessGroupACCL for: rank {rank}')

pg = ProcessGroupACCL(store, rank, size, ranks, simulation, design,
bufsize=bufsize, rsfec=rsfec, nbufs=nbufs,
compression=compression,
p2p_enabled=p2p_enabled,
@@ -138,31 +79,37 @@ def create_process_group_wrapper(store, rank, size, _timeout):

process_group = pg
if initialize:
logger.debug('Initializing Process Group')
pg.initialize()

return pg

Backend.register_backend("ACCL", create_process_group_wrapper)
#CPU only for now
logger.debug('Registering ACCL Backend')
Backend.register_backend("ACCL", create_process_group_wrapper, devices='cpu')

def initialize() -> None:
logger.debug('Initialize called')
if process_group is None:
raise RuntimeError("Cannot initialize before ACCL ProcessGroup "
"is created.")
process_group.initialize()

def get_local_qp(rank: int) -> list[int]:
logger.debug('Get_local_qp called')
if process_group is None:
raise RuntimeError("Cannot get local qp before ACCL ProcessGroup "
"is created.")
return process_group.get_local_qp(rank)

def set_remote_qp(rank: int, qp: list[int]) -> None:
logger.debug('Set_remote_qp called')
if process_group is None:
raise RuntimeError("Cannot set remote qp before ACCL ProcessGroup "
"is created.")
return process_group.set_remote_qp(rank, qp)

def set_compression(compression: dict[DataType, DataType]):
logger.debug(f'Setting compression to {compression}')
if process_group is None:
raise RuntimeError("Cannot set compression before ACCL ProcessGroup "
"is initialized.")
33 changes: 26 additions & 7 deletions integrations/pytorch_ddp/include/ProcessGroupACCL.hpp
@@ -266,17 +266,21 @@ class TORCH_API ProcessGroupACCL : public ProcessGroup {

void run_send(at::Tensor tensor, int dstRank, int tag);
void run_recv(at::Tensor tensor, int rcvRank, int tag);
void run_broadcast(at::Tensor tensor, const BroadcastOptions &opts);
void run_allreduce(at::Tensor tensor, const AllreduceOptions &opts);
void run_reduce(at::Tensor tensor, const ReduceOptions &opts);
void run_allgather(at::Tensor srctensor,
void run_broadcast(at::Tensor in_tensor, const BroadcastOptions &opts);
void run_allreduce(at::Tensor in_tensor, const AllreduceOptions &opts);
void run_reduce(at::Tensor in_tensor, const ReduceOptions &opts);
void run_allgather(at::Tensor in_tensor,
const std::vector<at::Tensor> &dsttensors);
void run_gather(at::Tensor srctensor,
void run_gather(at::Tensor in_tensor,
const std::vector<at::Tensor> &dsttensors,
const GatherOptions &opts);
void run_scatter(std::vector<at::Tensor> &srctensors, at::Tensor dsttensor,
void run_scatter(std::vector<at::Tensor> &in_tensors, at::Tensor dsttensor,
const ScatterOptions &opts);
void run_alltoall(at::Tensor srctensor, at::Tensor dsttensor, const AllToAllOptions &opts);

void run_alltoall(at::Tensor in_tensor, at::Tensor dsttensor, const AllToAllOptions &opts);

void run_alltoall_vec(std::vector<at::Tensor> &in_tensor_vec,
std::vector<at::Tensor> &out_tensor_vec, const AllToAllOptions &opts);

ACCL::dataType get_compressed_type(c10::ScalarType datatype);

@@ -292,6 +296,17 @@ class TORCH_API ProcessGroupACCL : public ProcessGroup {
// Global states
static void initACCLOnce();
static void acclExit();

void init_input_tensor(at::Tensor &tensor, std::unique_ptr<ACCL::BaseBuffer> &data, bool do_on_root, bool do_on_others, int opts_root_rank = 0);

void init_input_tensor_new(at::Tensor &tensor, ACCL::BaseBuffer *data, bool do_on_root, bool do_on_others, int opts_root_rank = 0);

void init_input_data_vec(std::vector<at::Tensor> &tensor_vec, std::unique_ptr<ACCL::BaseBuffer> &data, const at::TensorOptions &options, bool do_on_root, bool do_on_others, int opts_root_rank = 0);

void copy_back_tensor(at::Tensor tensor_original, std::unique_ptr<ACCL::BaseBuffer> &data, bool do_on_root, bool do_on_others, int opts_root_rank = 0);

void copy_back_tensorvec(const std::vector<at::Tensor> &dsttensorvec, std::unique_ptr<ACCL::BaseBuffer> &data, at::Tensor &dsttensor, int numel, int offset, bool do_on_root, bool do_on_others, int opts_root_rank = 0);

static std::once_flag onceFlagInitACCL;

static std::mutex pgGlobalMutex_;
@@ -309,6 +324,7 @@ class TORCH_API ProcessGroupACCL : public ProcessGroup {

ACCL::CoyoteDevice *cyt_device;
std::vector<fpga::ibvQpConn*> ibvQpConn_vec;
xrt::device xrt_device;

std::unique_ptr<ACCL::ACCL> accl;
uint64_t bufsize;
@@ -318,6 +334,9 @@ class TORCH_API ProcessGroupACCL : public ProcessGroup {
bool initialized;
xrt::bo buf0;
xrt::bo buf1;

std::unique_ptr<ACCL::BaseBuffer> in_buf;
std::unique_ptr<ACCL::BaseBuffer> out_buf;
};

} // namespace c10d
12 changes: 0 additions & 12 deletions integrations/pytorch_ddp/include/coyote_init.hpp

This file was deleted.
