From 054efc2b9097b4cd360bc6918ca9d69bac9f3819 Mon Sep 17 00:00:00 2001 From: Core Alcoser Date: Tue, 27 Jan 2026 00:57:43 -0600 Subject: [PATCH 1/6] fix: memory safety issues in accl.cpp - Fix exception thrown as pointer (memory leak): Changed `throw new std::runtime_error(...)` to `throw std::runtime_error(...)` in configure_arithmetic() - Fix uninitialized pointer: Initialize `buf = nullptr` in setup_rendezvous_spare_buffers() to prevent undefined behavior if device type doesn't match any condition - Add bounds validation: Check utility_spares.size() before accessing elements to provide clear error messages instead of std::out_of_range - Replace magic number: Extract hardcoded `3` to named constant REQUIRED_SPARE_BUFFERS for clarity Fixes #214 Co-Authored-By: Claude Opus 4.5 --- driver/xrt/src/accl.cpp | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/driver/xrt/src/accl.cpp b/driver/xrt/src/accl.cpp index 9e98efb7..5dddf225 100644 --- a/driver/xrt/src/accl.cpp +++ b/driver/xrt/src/accl.cpp @@ -1115,8 +1115,8 @@ void ACCL::initialize(const std::vector &ranks, int local_rank, void ACCL::configure_arithmetic() { if (communicators.size() < 1) { - throw new std::runtime_error("Communicators unconfigured, please call " - "configure_communicator() first"); + throw std::runtime_error("Communicators unconfigured, please call " + "configure_communicator() first"); } for (auto &[_key, arithcfg] : arith_config) { @@ -1172,9 +1172,11 @@ void ACCL::setup_eager_rx_buffers(size_t n_egr_rx_bufs, addr_t egr_rx_buf_size, } void ACCL::setup_rendezvous_spare_buffers(addr_t rndzv_spare_buf_size, const std::vector &devicemem){ + constexpr int REQUIRED_SPARE_BUFFERS = 3; max_rndzv_msg_size = rndzv_spare_buf_size; - for(int i=0; i<3; i++){ - Buffer *buf; + + for(int i=0; i *buf = nullptr; if (sim_mode) { buf = new SimBuffer(new int8_t[max_rndzv_msg_size](), max_rndzv_msg_size, dataType::int8, static_cast(cclo)->get_context()); @@ -1184,15 +1186,27 @@ void ACCL::setup_rendezvous_spare_buffers(addr_t rndzv_spare_buf_size, const std } else if(cclo->get_device_type() == CCLO::coyote_device){ buf = new CoyoteBuffer(max_rndzv_msg_size, dataType::int8, static_cast(cclo)); } + + if (buf == nullptr) { + throw std::runtime_error("Failed to create spare buffer: unsupported device type"); + } + buf->sync_to_device(); utility_spares.emplace_back(buf); } - cclo->write(CCLO_ADDR::SPARE1_OFFSET, utility_spares.at(0)->address() & 0xffffffff); - cclo->write(CCLO_ADDR::SPARE1_OFFSET+4, (utility_spares.at(0)->address() >> 32) & 0xffffffff); - cclo->write(CCLO_ADDR::SPARE2_OFFSET, utility_spares.at(1)->address() & 0xffffffff); - cclo->write(CCLO_ADDR::SPARE2_OFFSET+4, (utility_spares.at(1)->address() >> 32) & 0xffffffff); - cclo->write(CCLO_ADDR::SPARE3_OFFSET, utility_spares.at(2)->address() & 0xffffffff); - cclo->write(CCLO_ADDR::SPARE3_OFFSET+4, (utility_spares.at(2)->address() >> 32) & 0xffffffff); + + if (utility_spares.size() < REQUIRED_SPARE_BUFFERS) { + throw std::runtime_error("Insufficient utility spare buffers allocated: expected " + + std::to_string(REQUIRED_SPARE_BUFFERS) + ", got " + + std::to_string(utility_spares.size())); + } + + cclo->write(CCLO_ADDR::SPARE1_OFFSET, utility_spares[0]->address() & 0xffffffff); + cclo->write(CCLO_ADDR::SPARE1_OFFSET+4, (utility_spares[0]->address() >> 32) & 0xffffffff); + cclo->write(CCLO_ADDR::SPARE2_OFFSET, utility_spares[1]->address() & 0xffffffff); + cclo->write(CCLO_ADDR::SPARE2_OFFSET+4, (utility_spares[1]->address() >> 32) & 0xffffffff); + cclo->write(CCLO_ADDR::SPARE3_OFFSET, utility_spares[2]->address() & 0xffffffff); + cclo->write(CCLO_ADDR::SPARE3_OFFSET+4, (utility_spares[2]->address() >> 32) & 0xffffffff); } void ACCL::configure_tuning_parameters(){ From e859db6ffcc036df99f5590d879e55be0c60e098 Mon Sep 17 00:00:00 2001 From: Core Alcoser Date: Tue, 27 Jan 2026 01:08:58 -0600 Subject: [PATCH 2/6] fix: add bounds checking to HLS depacketizer session arrays - Add MAX_SESSIONS, SESSION_INDEX_BITS, and SESSION_INDEX_MASK constants to eth_intf.h for centralized configuration - Fix potential buffer overflow in tcp_depacketizer.cpp: - session_id is 16-bit (0-65535) but arrays are 1024 elements - Add session index masking to ensure safe array access - Replace hardcoded 1024 with MAX_SESSIONS constant - Apply same fix to udp_depacketizer.cpp and rdma_depacketizer.cpp The bit masking approach (session_id & SESSION_INDEX_MASK) is: - Hardware efficient (simple AND operation) - Maintains II=1 pipeline performance - Prevents out-of-bounds memory access in FPGA fabric Co-Authored-By: Claude Opus 4.5 --- kernels/cclo/hls/eth_intf/eth_intf.h | 7 +++++++ .../cclo/hls/eth_intf/rdma_depacketizer.cpp | 19 +++++++++++-------- .../cclo/hls/eth_intf/tcp_depacketizer.cpp | 19 +++++++++++-------- .../cclo/hls/eth_intf/udp_depacketizer.cpp | 19 +++++++++++-------- 4 files changed, 40 insertions(+), 24 deletions(-) diff --git a/kernels/cclo/hls/eth_intf/eth_intf.h b/kernels/cclo/hls/eth_intf/eth_intf.h index 35ad9b4c..5a0ef64c 100644 --- a/kernels/cclo/hls/eth_intf/eth_intf.h +++ b/kernels/cclo/hls/eth_intf/eth_intf.h @@ -27,6 +27,13 @@ #define DWIDTH16 16 #define DWIDTH8 8 +// Maximum number of concurrent sessions supported +// Used for session state arrays in depacketizers +// Must be a power of 2 for efficient index masking +#define MAX_SESSIONS 1024 +#define SESSION_INDEX_BITS 10 +#define SESSION_INDEX_MASK (MAX_SESSIONS - 1) // 0x3FF for 1024 sessions + #define RATE_CONTROL typedef ap_axiu pkt512; diff --git a/kernels/cclo/hls/eth_intf/rdma_depacketizer.cpp b/kernels/cclo/hls/eth_intf/rdma_depacketizer.cpp index 8f822023..7dde66ba 100644 --- a/kernels/cclo/hls/eth_intf/rdma_depacketizer.cpp +++ b/kernels/cclo/hls/eth_intf/rdma_depacketizer.cpp @@ -66,8 +66,8 @@ void rdma_depacketizer( unsigned constexpr bytes_per_word = DATA_WIDTH/8; unsigned const max_dma_bytes = 8*1024*1024-1;//TODO: convert to argument? - static unsigned int remaining[1024] = {0};//one RAMB36 for max 1024 sessions - static ap_uint target_strm[1024] = {0};//one RAMB18 for max 1024 sessions + static unsigned int remaining[MAX_SESSIONS] = {0};//one RAMB36 for max sessions + static ap_uint target_strm[MAX_SESSIONS] = {0};//one RAMB18 for max sessions static eth_notification notif; static bool continue_notif = false; @@ -91,6 +91,9 @@ void rdma_depacketizer( notif = STREAM_READ(notif_in); } + // Mask session_id to prevent out-of-bounds array access + unsigned int session_idx = notif.session_id & SESSION_INDEX_MASK; + #ifndef ACCL_SYNTHESIS std::stringstream ss; ss << "RDMA Depacketizer: Processing incoming fragment count=" << notif.length << " for qpn " << notif.session_id << "\n"; @@ -99,8 +102,8 @@ void rdma_depacketizer( //get remaining message bytes, from local storage //TODO: cache latest accessed value - if(prev_session_id != notif.session_id){ - message_rem = remaining[notif.session_id]; + if(prev_session_id != session_idx){ + message_rem = remaining[session_idx]; } if(message_rem == 0){//if remaining bytes is zero, then this is the start of a new message @@ -144,10 +147,10 @@ void rdma_depacketizer( //decrement the length to reflect the fact that we have removed the 64B header //Note: the rxHandler must make sure to not give us fragments less than 64B notif.length -= bytes_per_word; - target_strm[notif.session_id] = message_strm; + target_strm[session_idx] = message_strm; } } else{//if remaining bytes is not zero, then this is a continuation of an old message - message_strm = target_strm[notif.session_id]; + message_strm = target_strm[session_idx]; } //write out notifications //in case the fragment spans the end of the current message and beginning of another, @@ -175,11 +178,11 @@ void rdma_depacketizer( STREAM_WRITE(notif_out, downstream_notif); } //update session info (remaining bytes and target of currently processing message) - remaining[notif.session_id] = message_rem; + remaining[session_idx] = message_rem; //if we're not finished with this fragment, skip notification read on the next run continue_notif = (notif.length > 0); continue_message = (notif.length > 0) && (message_rem > 0); //update session id for caching - prev_session_id = notif.session_id; + prev_session_id = session_idx; } } \ No newline at end of file diff --git a/kernels/cclo/hls/eth_intf/tcp_depacketizer.cpp b/kernels/cclo/hls/eth_intf/tcp_depacketizer.cpp index 9dec6332..0f2493ca 100644 --- a/kernels/cclo/hls/eth_intf/tcp_depacketizer.cpp +++ b/kernels/cclo/hls/eth_intf/tcp_depacketizer.cpp @@ -62,8 +62,8 @@ void tcp_depacketizer( unsigned constexpr bytes_per_word = DATA_WIDTH/8; unsigned const max_dma_bytes = 8*1024*1024-1;//TODO: convert to argument? - static unsigned int remaining[1024] = {0};//one RAMB36 for max 1024 sessions - static ap_uint target_strm[1024] = {0};//one RAMB18 for max 1024 sessions + static unsigned int remaining[MAX_SESSIONS] = {0};//one RAMB36 for max sessions + static ap_uint target_strm[MAX_SESSIONS] = {0};//one RAMB18 for max sessions static eth_notification notif; static bool continue_notif = false; @@ -85,6 +85,9 @@ void tcp_depacketizer( notif = STREAM_READ(notif_in); } + // Mask session_id to prevent out-of-bounds array access + unsigned int session_idx = notif.session_id & SESSION_INDEX_MASK; + #ifndef ACCL_SYNTHESIS std::stringstream ss; ss << "TCP Depacketizer: Processing incoming fragment count=" << notif.length << " for session " << notif.session_id << "\n"; @@ -93,8 +96,8 @@ void tcp_depacketizer( //get remaining message bytes, from local storage //TODO: cache latest accessed value - if(prev_session_id != notif.session_id){ - message_rem = remaining[notif.session_id]; + if(prev_session_id != session_idx){ + message_rem = remaining[session_idx]; } downstream_notif.session_id = notif.session_id; @@ -114,9 +117,9 @@ void tcp_depacketizer( //decrement the length to reflect the fact that we have removed the 64B header //Note: the rxHandler must make sure to not give us fragments less than 64B notif.length -= bytes_per_word; - target_strm[notif.session_id] = message_strm; + target_strm[session_idx] = message_strm; } else{//if remaining bytes is not zero, then this is a continuation of an old message - message_strm = target_strm[notif.session_id]; + message_strm = target_strm[session_idx]; } //write out notifications //in case the fragment spans the end of the current message and beginning of another, @@ -142,10 +145,10 @@ void tcp_depacketizer( STREAM_WRITE(notif_out, downstream_notif); } //update session info (remaining bytes and target of currently processing message) - remaining[notif.session_id] = message_rem; + remaining[session_idx] = message_rem; //if we're not finished with this fragment, skip notification read on the next run continue_notif = (notif.length > 0); continue_message = (notif.length > 0) && (message_rem > 0); //update session id for caching - prev_session_id = notif.session_id; + prev_session_id = session_idx; } \ No newline at end of file diff --git a/kernels/cclo/hls/eth_intf/udp_depacketizer.cpp b/kernels/cclo/hls/eth_intf/udp_depacketizer.cpp index 9984a805..e98e6f4b 100644 --- a/kernels/cclo/hls/eth_intf/udp_depacketizer.cpp +++ b/kernels/cclo/hls/eth_intf/udp_depacketizer.cpp @@ -99,8 +99,8 @@ void udp_depacketizer( unsigned const max_dma_bytes = 8*1024*1024-1;//TODO: convert to argument? //session states (for each session, remaining bytes and STRM values for ongoing messages) - static unsigned int remaining[1024] = {0};//one RAMB36 for max 1024 sessions - static ap_uint target_strm[1024] = {0};//one RAMB18 for max 1024 sessions + static unsigned int remaining[MAX_SESSIONS] = {0};//one RAMB36 for max sessions + static ap_uint target_strm[MAX_SESSIONS] = {0};//one RAMB18 for max sessions eth_notification notif; @@ -116,6 +116,9 @@ void udp_depacketizer( inword = STREAM_READ(in); notif.session_id = inword.dest; + // Mask session_id to prevent out-of-bounds array access + unsigned int session_idx = notif.session_id & SESSION_INDEX_MASK; + #ifndef ACCL_SYNTHESIS std::stringstream ss; ss << "UDP Depacketizer: Processing incoming fragment for session " << notif.session_id << "\n"; @@ -124,8 +127,8 @@ void udp_depacketizer( #endif //get remaining message bytes, from local storage - if(prev_session_id != notif.session_id){ - message_rem = remaining[notif.session_id]; + if(prev_session_id != session_idx){ + message_rem = remaining[session_idx]; } if(message_rem == 0){//if remaining bytes is zero, then this is the start of a new message @@ -137,14 +140,14 @@ void udp_depacketizer( //Note: the rxHandler must make sure to not give us fragments less than 64B notif.length = message_rem; notif.type = 0; //for SOM - target_strm[notif.session_id] = message_strm; + target_strm[session_idx] = message_strm; if(message_strm == 0){ //put SOM notification and header in output streams STREAM_WRITE(sts, hdr); STREAM_WRITE(notif_out, notif); } } else{//if remaining bytes is not zero, then this is a continuation of an old message - message_strm = target_strm[notif.session_id]; + message_strm = target_strm[session_idx]; inword.dest = message_strm; STREAM_WRITE(out, inword); current_bytes += (message_rem < bytes_per_word) ? message_rem : bytes_per_word; @@ -175,7 +178,7 @@ void udp_depacketizer( STREAM_WRITE(notif_out, notif); } //update session info (remaining bytes and target of currently processing message) - remaining[notif.session_id] = message_rem; + remaining[session_idx] = message_rem; //update session id for caching - prev_session_id = notif.session_id; + prev_session_id = session_idx; } \ No newline at end of file From 7e1c89a9020eb62ab087d5fcdd0aa1c14f249fd3 Mon Sep 17 00:00:00 2001 From: Core Alcoser Date: Tue, 27 Jan 2026 01:17:17 -0600 Subject: [PATCH 3/6] refactor: migrate ACCL class to smart pointers - Convert CCLO* cclo to std::unique_ptr for automatic memory management - Convert std::vector*> to std::vector>> for both eager_rx_buffers and utility_spares vectors - Update constructors to use std::make_unique<>() for XRTDevice and SimDevice - Remove explicit delete in destructor (unique_ptr handles cleanup) - Update deinit() to work with unique_ptr (no manual delete needed) - Update setup_eager_rx_buffers() and setup_rendezvous_spare_buffers() to create buffers as unique_ptr and use std::move() for ownership transfer - Update all template buffer creation functions in accl.hpp to use cclo.get() when casting to derived device types Benefits: - Eliminates memory leaks from early returns or exceptions - Makes ownership semantics explicit - Follows modern C++ best practices (Rule of Zero) - No manual memory management required Note: Coyote constructor still accepts raw CoyoteDevice* for backwards compatibility - ownership is transferred to the unique_ptr. Co-Authored-By: Claude Opus 4.5 --- driver/xrt/include/accl.hpp | 35 ++++++++++++------------ driver/xrt/src/accl.cpp | 54 ++++++++++++++++++++++++------------- 2 files changed, 53 insertions(+), 36 deletions(-) diff --git a/driver/xrt/include/accl.hpp b/driver/xrt/include/accl.hpp index a98667fd..3728c030 100644 --- a/driver/xrt/include/accl.hpp +++ b/driver/xrt/include/accl.hpp @@ -30,6 +30,7 @@ #include "accl/simdevice.hpp" #include "accl/coyotebuffer.hpp" #include "accl/coyotedevice.hpp" +#include #include #include #include @@ -778,12 +779,12 @@ ACCLRequest *barrier(communicatorId comm_id = GLOBAL_COMM, std::unique_ptr> create_buffer_host(size_t length, dataType type) { if (sim_mode) { return std::unique_ptr>(new SimBuffer( - length, type, static_cast(cclo)->get_context(), true)); + length, type, static_cast(cclo.get())->get_context(), true)); } else if (cclo->get_device_type() == CCLO::xrt_device) { return std::unique_ptr>(new XRTBuffer( - length, type, *(static_cast(cclo)->get_device()), xrt::bo::flags::host_only, (xrt::memory_group)0)); + length, type, *(static_cast(cclo.get())->get_device()), xrt::bo::flags::host_only, (xrt::memory_group)0)); } else { - return std::unique_ptr>(new CoyoteBuffer(length, type, cclo)); + return std::unique_ptr>(new CoyoteBuffer(length, type, static_cast(cclo.get()))); } } @@ -808,13 +809,13 @@ ACCLRequest *barrier(communicatorId comm_id = GLOBAL_COMM, std::unique_ptr> create_buffer(size_t length, dataType type, unsigned mem_grp) { if (sim_mode) { return std::unique_ptr>(new SimBuffer( - length, type, static_cast(cclo)->get_context(), false, mem_grp)); + length, type, static_cast(cclo.get())->get_context(), false, mem_grp)); } else if (cclo->get_device_type() == CCLO::xrt_device) { return std::unique_ptr>(new XRTBuffer( - length, type, *(static_cast(cclo)->get_device()), (xrt::memory_group)mem_grp)); + length, type, *(static_cast(cclo.get())->get_device()), (xrt::memory_group)mem_grp)); } else { return std::unique_ptr>(new CoyoteBuffer( - length, type, cclo)); + length, type, static_cast(cclo.get()))); } } @@ -874,10 +875,10 @@ ACCLRequest *barrier(communicatorId comm_id = GLOBAL_COMM, if (sim_mode) { return std::unique_ptr>( new SimBuffer(host_buffer, length, type, - static_cast(cclo)->get_context(), false, mem_grp)); + static_cast(cclo.get())->get_context(), false, mem_grp)); } else if(cclo->get_device_type() == CCLO::xrt_device ){ return std::unique_ptr>(new XRTBuffer( - host_buffer, length, type, *(static_cast(cclo)->get_device()), (xrt::memory_group)mem_grp)); + host_buffer, length, type, *(static_cast(cclo.get())->get_device()), (xrt::memory_group)mem_grp)); } return std::unique_ptr>(nullptr); } @@ -905,8 +906,8 @@ ACCLRequest *barrier(communicatorId comm_id = GLOBAL_COMM, dataType type) { if (sim_mode) { return std::unique_ptr>( - new SimBuffer(bo, *(static_cast(cclo)->get_device()), length, type, - static_cast(cclo)->get_context())); + new SimBuffer(bo, *(static_cast(cclo.get())->get_device()), length, type, + static_cast(cclo.get())->get_context())); } else { return std::unique_ptr>( new XRTBuffer(bo, length, type)); @@ -956,10 +957,10 @@ ACCLRequest *barrier(communicatorId comm_id = GLOBAL_COMM, unsigned mem_grp) { if (sim_mode) { return std::unique_ptr>(new SimBuffer( - length, type, static_cast(cclo)->get_context())); + length, type, static_cast(cclo.get())->get_context())); } else if(cclo->get_device_type() == CCLO::xrt_device ){ return std::unique_ptr>(new XRTBuffer( - length, type, *(static_cast(cclo)->get_device()), xrt::bo::flags::p2p, (xrt::memory_group)mem_grp)); + length, type, *(static_cast(cclo.get())->get_device()), xrt::bo::flags::p2p, (xrt::memory_group)mem_grp)); } else { //for Coyote there's no concept of a p2p buffer throw std::runtime_error("p2p buffers not supported in Coyote"); @@ -967,7 +968,7 @@ ACCLRequest *barrier(communicatorId comm_id = GLOBAL_COMM, } /** - * Construct a new coyote buffer object without an existing host buffer + * Construct a new coyote buffer object without an existing host buffer * * Coyote buffer object doesn't have a notion of memory banks * @@ -982,7 +983,7 @@ ACCLRequest *barrier(communicatorId comm_id = GLOBAL_COMM, if (sim_mode) { throw std::runtime_error("create_coyotebuffer sim_mode unsupported!!!"); } else { - return std::unique_ptr>(new CoyoteBuffer(length, type, static_cast(cclo))); + return std::unique_ptr>(new CoyoteBuffer(length, type, static_cast(cclo.get()))); } } @@ -1066,7 +1067,7 @@ ACCLRequest *barrier(communicatorId comm_id = GLOBAL_COMM, void close_con(communicatorId comm_id = GLOBAL_COMM); private: - CCLO *cclo{}; + std::unique_ptr cclo; // Supported types and corresponding arithmetic config arithConfigMap arith_config; // Address to put new configurations like arithmetic configs @@ -1074,11 +1075,11 @@ ACCLRequest *barrier(communicatorId comm_id = GLOBAL_COMM, addr_t current_config_address{}; // RX spare buffers for eager mode addr_t max_eager_msg_size{}; - std::vector *> eager_rx_buffers; + std::vector>> eager_rx_buffers; addr_t eager_rx_buffer_size{}; // Spare buffers for use in rendezvous reduces addr_t max_rndzv_msg_size{}; - std::vector *> utility_spares; + std::vector>> utility_spares; // List of communicators, to which users will add std::vector communicators; // safety checks diff --git a/driver/xrt/src/accl.cpp b/driver/xrt/src/accl.cpp index 5dddf225..e32d59c8 100644 --- a/driver/xrt/src/accl.cpp +++ b/driver/xrt/src/accl.cpp @@ -32,7 +32,7 @@ ACCL::ACCL(xrt::device &device, xrt::ip &cclo_ip, xrt::kernel &hostctrl_ip, const arithConfigMap &arith_config) : arith_config(arith_config), sim_mode(false), _devicemem(devicemem), rxbufmem(rxbufmem) { - cclo = new XRTDevice(cclo_ip, hostctrl_ip, device); + cclo = std::make_unique(cclo_ip, hostctrl_ip, device); } // Simulation constructor @@ -40,10 +40,11 @@ ACCL::ACCL(unsigned int sim_start_port, unsigned int local_rank, const arithConfigMap &arith_config) : arith_config(arith_config), sim_mode(true), _devicemem(0), rxbufmem({}) { - cclo = new SimDevice(sim_start_port, local_rank); + cclo = std::make_unique(sim_start_port, local_rank); } // constructor for coyote fpga device +// Note: Takes ownership of the CoyoteDevice pointer ACCL::ACCL(CoyoteDevice *dev, const arithConfigMap &arith_config) : arith_config(arith_config), sim_mode(false), _devicemem(0), rxbufmem(0), cclo(dev) {} @@ -51,7 +52,7 @@ ACCL::ACCL(CoyoteDevice *dev, const arithConfigMap &arith_config) // destructor ACCL::~ACCL() { deinit(); - delete cclo; + // cclo is automatically cleaned up by unique_ptr } void ACCL::soft_reset() { @@ -77,13 +78,13 @@ void ACCL::deinit() { for (auto &buf : eager_rx_buffers) { buf->free_buffer(); - delete buf; + // unique_ptr automatically handles deletion } eager_rx_buffers.clear(); for (auto &buf : utility_spares) { buf->free_buffer(); - delete buf; + // unique_ptr automatically handles deletion } utility_spares.clear(); } @@ -1134,19 +1135,28 @@ void ACCL::setup_eager_rx_buffers(size_t n_egr_rx_bufs, addr_t egr_rx_buf_size, eager_rx_buffer_size = egr_rx_buf_size; for (size_t i = 0; i < n_egr_rx_bufs; ++i) { // create, clear and sync buffers to device - Buffer *buf; + std::unique_ptr> buf; if (sim_mode) { - buf = new SimBuffer(new int8_t[eager_rx_buffer_size](), eager_rx_buffer_size, dataType::int8, - static_cast(cclo)->get_context()); + buf = std::make_unique>( + new int8_t[eager_rx_buffer_size](), eager_rx_buffer_size, dataType::int8, + static_cast(cclo.get())->get_context()); } else if(cclo->get_device_type() == CCLO::xrt_device ){ - buf = new XRTBuffer(eager_rx_buffer_size, dataType::int8, *(static_cast(cclo)->get_device()), devicemem[i % devicemem.size()]); + buf = std::make_unique>( + eager_rx_buffer_size, dataType::int8, + *(static_cast(cclo.get())->get_device()), + devicemem[i % devicemem.size()]); } else if(cclo->get_device_type() == CCLO::coyote_device){ - buf = new CoyoteBuffer(eager_rx_buffer_size, dataType::int8, static_cast(cclo)); + buf = std::make_unique>( + eager_rx_buffer_size, dataType::int8, + static_cast(cclo.get())); + } + + if (!buf) { + throw std::runtime_error("Failed to create eager RX buffer: unsupported device type"); } buf->sync_to_device(); - eager_rx_buffers.emplace_back(buf); // program this buffer into the accelerator address += 4; cclo->write(address, 0); @@ -1159,6 +1169,7 @@ void ACCL::setup_eager_rx_buffers(size_t n_egr_rx_bufs, addr_t egr_rx_buf_size, address += 4; cclo->write(address, 0); } + eager_rx_buffers.push_back(std::move(buf)); } //write buffer len @@ -1176,23 +1187,28 @@ void ACCL::setup_rendezvous_spare_buffers(addr_t rndzv_spare_buf_size, const std max_rndzv_msg_size = rndzv_spare_buf_size; for(int i=0; i *buf = nullptr; + std::unique_ptr> buf; if (sim_mode) { - buf = new SimBuffer(new int8_t[max_rndzv_msg_size](), max_rndzv_msg_size, dataType::int8, - static_cast(cclo)->get_context()); + buf = std::make_unique>( + new int8_t[max_rndzv_msg_size](), max_rndzv_msg_size, dataType::int8, + static_cast(cclo.get())->get_context()); } else if(cclo->get_device_type() == CCLO::xrt_device ){ - buf = new XRTBuffer(max_rndzv_msg_size, dataType::int8, - *(static_cast(cclo)->get_device()), devicemem[i % devicemem.size()]); + buf = std::make_unique>( + max_rndzv_msg_size, dataType::int8, + *(static_cast(cclo.get())->get_device()), + devicemem[i % devicemem.size()]); } else if(cclo->get_device_type() == CCLO::coyote_device){ - buf = new CoyoteBuffer(max_rndzv_msg_size, dataType::int8, static_cast(cclo)); + buf = std::make_unique>( + max_rndzv_msg_size, dataType::int8, + static_cast(cclo.get())); } - if (buf == nullptr) { + if (!buf) { throw std::runtime_error("Failed to create spare buffer: unsupported device type"); } buf->sync_to_device(); - utility_spares.emplace_back(buf); + utility_spares.push_back(std::move(buf)); } if (utility_spares.size() < REQUIRED_SPARE_BUFFERS) { From 932e479b548b2878ffa1ca18f742a3268a848429 Mon Sep 17 00:00:00 2001 From: Core Alcoser Date: Tue, 27 Jan 2026 01:24:20 -0600 Subject: [PATCH 4/6] fix: replace assertions with proper exception handling Replace assert() calls with std::logic_error exceptions in device request handling code. Assertions are compiled out in Release builds, making critical invariant checks no-ops in production. Changes: - xrtdevice.cpp: Replace 2 assertions in FPGARequest::start() and XRTDevice::launch_request() - simdevice.cpp: Replace 2 assertions in SimRequest::start() and SimDevice::launch_request() - coyotedevice.cpp: Replace 2 assertions in CoyoteRequest::start() and CoyoteDevice::launch_request() All assertions checked operation status invariants: - start() requires EXECUTING state - launch_request() requires QUEUED state Replace with includes. Benefits: - Checks remain active in Release builds - Proper exception with descriptive message instead of abort() - Consistent error handling throughout codebase - Easier debugging with clear error messages Co-Authored-By: Claude Opus 4.5 --- driver/xrt/src/coyotedevice.cpp | 9 +++++++-- driver/xrt/src/simdevice.cpp | 10 +++++++--- driver/xrt/src/xrtdevice.cpp | 10 +++++++--- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/driver/xrt/src/coyotedevice.cpp b/driver/xrt/src/coyotedevice.cpp index fd96b904..b48100e9 100644 --- a/driver/xrt/src/coyotedevice.cpp +++ b/driver/xrt/src/coyotedevice.cpp @@ -21,6 +21,7 @@ #include "cProcess.hpp" #include #include +#include static void finish_coyote_request(ACCL::CoyoteRequest *req) { req->wait_kernel(); @@ -36,7 +37,9 @@ static void finish_coyote_request(ACCL::CoyoteRequest *req) { namespace ACCL { void CoyoteRequest::start() { - assert(this->get_status() == operationStatus::EXECUTING); + if (this->get_status() != operationStatus::EXECUTING) { + throw std::logic_error("CoyoteRequest::start() called but request is not in EXECUTING state"); + } int function, arg_id = 0; @@ -443,7 +446,9 @@ void CoyoteDevice::launch_request() { // This guarantees permission to only one thread trying to start an operation if (queue.run()) { CoyoteRequest *req = queue.front(); - assert(req->get_status() == operationStatus::QUEUED); + if (req->get_status() != operationStatus::QUEUED) { + throw std::logic_error("CoyoteDevice::launch_request() - request is not in QUEUED state"); + } req->set_status(operationStatus::EXECUTING); req->start(); } diff --git a/driver/xrt/src/simdevice.cpp b/driver/xrt/src/simdevice.cpp index b0519f0d..67445f97 100644 --- a/driver/xrt/src/simdevice.cpp +++ b/driver/xrt/src/simdevice.cpp @@ -18,8 +18,8 @@ #include "accl/simdevice.hpp" #include "accl/common.hpp" -#include #include +#include #include "zmq_client.h" static void finish_sim_request(ACCL::SimRequest *req) { @@ -36,7 +36,9 @@ static void finish_sim_request(ACCL::SimRequest *req) { namespace ACCL { void SimRequest::start() { - assert(this->get_status() == operationStatus::EXECUTING); + if (this->get_status() != operationStatus::EXECUTING) { + throw std::logic_error("SimRequest::start() called but request is not in EXECUTING state"); + } int function; @@ -171,7 +173,9 @@ void SimDevice::launch_request() { // This guarantees permission to only one thread trying to start an operation if (queue.run()) { SimRequest *req = queue.front(); - assert(req->get_status() == operationStatus::QUEUED); + if (req->get_status() != operationStatus::QUEUED) { + throw std::logic_error("SimDevice::launch_request() - request is not in QUEUED state"); + } req->set_status(operationStatus::EXECUTING); req->start(); } diff --git a/driver/xrt/src/xrtdevice.cpp b/driver/xrt/src/xrtdevice.cpp index 8d93c464..003131a7 100644 --- a/driver/xrt/src/xrtdevice.cpp +++ b/driver/xrt/src/xrtdevice.cpp @@ -19,7 +19,7 @@ #include "accl/xrtdevice.hpp" #include "accl/common.hpp" #include -#include +#include static void finish_fpga_request(ACCL::FPGARequest *req) { req->wait_kernel(); @@ -34,7 +34,9 @@ static void finish_fpga_request(ACCL::FPGARequest *req) { namespace ACCL { void FPGARequest::start() { - assert(this->get_status() == operationStatus::EXECUTING); + if (this->get_status() != operationStatus::EXECUTING) { + throw std::logic_error("FPGARequest::start() called but request is not in EXECUTING state"); + } int function, arg_id = 0; @@ -288,7 +290,9 @@ void XRTDevice::launch_request() { // This guarantees permission to only one thread trying to start an operation if (queue.run()) { FPGARequest *req = queue.front(); - assert(req->get_status() == operationStatus::QUEUED); + if (req->get_status() != operationStatus::QUEUED) { + throw std::logic_error("XRTDevice::launch_request() - request is not in QUEUED state"); + } req->set_status(operationStatus::EXECUTING); req->start(); } From 591fe5d9b514d270144496711ce483e783478c1c Mon Sep 17 00:00:00 2001 From: Core Alcoser Date: Tue, 27 Jan 2026 01:30:00 -0600 Subject: [PATCH 5/6] fix: add comprehensive input validation to network utilities Add input validation to all public functions in accl_network_utils to prevent crashes and undefined behavior from invalid parameters. Changes: - Add validate_local_rank() helper function for consistent validation - configure_vnx(): Validate local_rank bounds - configure_tcp(): Validate local_rank bounds - exchange_qp(): Validate master_rank, slave_rank, local_rank bounds and check for empty vectors - configure_cyt_rdma(): Validate local_rank and null device pointer - configure_cyt_tcp(): Validate local_rank and null device pointer - get_ips(): Validate world_size is positive and <= 254 - generate_ranks() (both overloads): - Validate local_rank is non-negative - Validate world_size is positive - Validate local_rank < world_size - Check for port number overflow (start_port + count > 65535) - initialize_accl(): - Validate local_rank bounds - Validate nbufs > 0 - Validate bufsize > 0 - Require xclbin path for non-simulator mode All validation uses descriptive exception messages with function name and parameter values to aid debugging. Exception types: - std::invalid_argument: Invalid parameter values - std::out_of_range: Index out of bounds - std::overflow_error: Port number overflow Co-Authored-By: Claude Opus 4.5 --- .../src/accl_network_utils.cpp | 108 +++++++++++++++++- 1 file changed, 104 insertions(+), 4 deletions(-) diff --git a/driver/utils/accl_network_utils/src/accl_network_utils.cpp b/driver/utils/accl_network_utils/src/accl_network_utils.cpp index ad14c448..76f6bece 100644 --- a/driver/utils/accl_network_utils/src/accl_network_utils.cpp +++ b/driver/utils/accl_network_utils/src/accl_network_utils.cpp @@ -127,9 +127,28 @@ bool check_arp(vnx::Networklayer &network_layer, } // namespace namespace accl_network_utils { + +// Helper function to validate local_rank against ranks vector +inline void validate_local_rank(int local_rank, size_t ranks_size, const std::string &function_name) { + if (ranks_size == 0) { + throw std::invalid_argument(function_name + ": ranks vector is empty"); + } + if (local_rank < 0) { + throw std::invalid_argument(function_name + ": local_rank cannot be negative (got " + + std::to_string(local_rank) + ")"); + } + if (static_cast(local_rank) >= ranks_size) { + throw std::out_of_range(function_name + ": local_rank (" + std::to_string(local_rank) + + ") is out of range for ranks vector of size " + + std::to_string(ranks_size)); + } +} + void configure_vnx(vnx::CMAC &cmac, vnx::Networklayer &network_layer, const std::vector &ranks, int local_rank, bool rsfec) { + validate_local_rank(local_rank, ranks.size(), "configure_vnx"); + if (ranks.size() > vnx::max_sockets_size) { throw std::runtime_error("Too many ranks. VNX supports up to " + std::to_string(vnx::max_sockets_size) + @@ -195,6 +214,8 @@ void configure_vnx(vnx::CMAC &cmac, vnx::Networklayer &network_layer, void configure_tcp(XRTBuffer &tx_buf_network, XRTBuffer &rx_buf_network, xrt::kernel &network_krnl, xrt::kernel &session_krnl, std::vector &ranks, int local_rank) { + validate_local_rank(local_rank, ranks.size(), "configure_tcp"); + tx_buf_network.sync_to_device(); rx_buf_network.sync_to_device(); @@ -270,7 +291,25 @@ void configure_tcp(XRTBuffer &tx_buf_network, XRTBuffer &rx_buf_ } void exchange_qp(unsigned int master_rank, unsigned int slave_rank, unsigned int local_rank, std::vector &ibvQpConn_vec, std::vector &ranks){ - + // Validate all rank indices + if (ranks.empty()) { + throw std::invalid_argument("exchange_qp: ranks vector is empty"); + } + if (ibvQpConn_vec.empty()) { + throw std::invalid_argument("exchange_qp: ibvQpConn_vec is empty"); + } + if (master_rank >= ranks.size()) { + throw std::out_of_range("exchange_qp: master_rank (" + std::to_string(master_rank) + + ") out of range for ranks size " + std::to_string(ranks.size())); + } + if (slave_rank >= ranks.size()) { + throw std::out_of_range("exchange_qp: slave_rank (" + std::to_string(slave_rank) + + ") out of range for ranks size " + std::to_string(ranks.size())); + } + if (slave_rank >= ibvQpConn_vec.size() || master_rank >= ibvQpConn_vec.size()) { + throw std::out_of_range("exchange_qp: rank index out of range for ibvQpConn_vec"); + } + if (local_rank == master_rank) { std::cout<<"Local rank "< &ranks, int local_rank, ACCL::CoyoteDevice* device){ + validate_local_rank(local_rank, ranks.size(), "configure_cyt_rdma"); + + if (device == nullptr) { + throw std::invalid_argument("configure_cyt_rdma: device pointer is null"); + } std::cout<<"Initializing QP connections..."< &ranks, int local_rank, ACCL:: } void configure_cyt_tcp(std::vector &ranks, int local_rank, ACCL::CoyoteDevice* device){ + validate_local_rank(local_rank, ranks.size(), "configure_cyt_tcp"); + + if (device == nullptr) { + throw std::invalid_argument("configure_cyt_tcp: device pointer is null"); + } + std::cout<<"Configuring Coyote TCP..."< get_ips(fs::path config_file) { } std::vector get_ips(bool local, int world_size) { + if (world_size <= 0) { + throw std::invalid_argument("get_ips: world_size must be positive (got " + + std::to_string(world_size) + ")"); + } + if (world_size > 254) { + throw std::invalid_argument("get_ips: world_size cannot exceed 254 for IP generation (got " + + std::to_string(world_size) + ")"); + } + std::vector ips{}; for (int i = 0; i < world_size; ++i) { if (local) { @@ -424,11 +483,24 @@ std::vector get_ips(bool local, int world_size) { std::vector generate_ranks(fs::path config_file, int local_rank, std::uint16_t start_port, unsigned int rxbuf_size) { + if (local_rank < 0) { + throw std::invalid_argument("generate_ranks: local_rank cannot be negative"); + } + std::vector ranks{}; std::vector ips = get_ips(config_file); + if (ips.empty()) { + throw std::runtime_error("generate_ranks: no IPs found in config file"); + } + + // Check for port overflow + if (static_cast(start_port) + ips.size() - 1 > 65535) { + throw std::overflow_error("generate_ranks: start_port + number of ranks would exceed max port 65535"); + } + for (int i = 0; i < static_cast(ips.size()); ++i) { - rank_t new_rank = {ips[i], start_port + i, i, rxbuf_size}; + rank_t new_rank = {ips[i], static_cast(start_port + i), i, rxbuf_size}; ranks.emplace_back(new_rank); } @@ -438,10 +510,26 @@ std::vector generate_ranks(fs::path config_file, int local_rank, std::vector generate_ranks(bool local, int local_rank, int world_size, std::uint16_t start_port, unsigned int rxbuf_size) { + if (local_rank < 0) { + throw std::invalid_argument("generate_ranks: local_rank cannot be negative"); + } + if (world_size <= 0) { + throw std::invalid_argument("generate_ranks: world_size must be positive"); + } + if (local_rank >= world_size) { + throw std::out_of_range("generate_ranks: local_rank (" + std::to_string(local_rank) + + ") must be less than world_size (" + std::to_string(world_size) + ")"); + } + + // Check for port overflow + if (static_cast(start_port) + world_size - 1 > 65535) { + throw std::overflow_error("generate_ranks: start_port + world_size would exceed max port 65535"); + } + std::vector ranks{}; std::vector ips = get_ips(local, world_size); for (int i = 0; i < static_cast(ips.size()); ++i) { - rank_t new_rank = {ips[i], start_port + i, i, rxbuf_size}; + rank_t new_rank = {ips[i], static_cast(start_port + i), i, rxbuf_size}; ranks.emplace_back(new_rank); } @@ -451,8 +539,20 @@ std::vector generate_ranks(bool local, int local_rank, int world_size, std::unique_ptr initialize_accl(std::vector &ranks, int local_rank, bool simulator, acclDesign design, xrt::device device, - fs::path xclbin, unsigned int nbufs, unsigned int bufsize, + fs::path xclbin, unsigned int nbufs, unsigned int bufsize, unsigned int egrsize, bool rsfec) { + validate_local_rank(local_rank, ranks.size(), "initialize_accl"); + + if (nbufs == 0) { + throw std::invalid_argument("initialize_accl: nbufs must be greater than 0"); + } + if (bufsize == 0) { + throw std::invalid_argument("initialize_accl: bufsize must be greater than 0"); + } + if (!simulator && xclbin.empty()) { + throw std::invalid_argument("initialize_accl: xclbin path required for non-simulator mode"); + } + std::size_t world_size = ranks.size(); std::unique_ptr accl; From f3443b74a65e547cf67c038d33c66c9cc3c056e6 Mon Sep 17 00:00:00 2001 From: Core Alcoser Date: Tue, 27 Jan 2026 01:36:55 -0600 Subject: [PATCH 6/6] refactor: replace magic numbers with named constants - Add configuration constants to constants.hpp: - MAX_STREAM_ID (246) for stream_put validation - DEFAULT_RESET_TIMEOUT_MS (100) for soft reset operations - DEFAULT_OPERATION_TIMEOUT (1000000) for hardware operations - FLAT_TREE_MAX_COUNT (32KB) for gather/reduce operations - DEFAULT_GATHER_FLAT_TREE_MAX_FANIN (2) - DEFAULT_BCAST_FLAT_TREE_MAX_RANKS (3) - DEFAULT_REDUCE_FLAT_TREE_MAX_RANKS (4) - Add MAX_DMA_BYTES constant to eth_intf.h (8MB-1) - Update tcp_depacketizer.cpp, udp_depacketizer.cpp, rdma_depacketizer.cpp to use the shared constant This improves code maintainability and self-documentation by giving meaningful names to configuration values. Co-Authored-By: Claude Opus 4.5 --- driver/xrt/include/accl/constants.hpp | 18 ++++++++++++ driver/xrt/src/accl.cpp | 29 ++++++++++--------- kernels/cclo/hls/eth_intf/eth_intf.h | 4 +++ .../cclo/hls/eth_intf/rdma_depacketizer.cpp | 3 +- .../cclo/hls/eth_intf/tcp_depacketizer.cpp | 3 +- .../cclo/hls/eth_intf/udp_depacketizer.cpp | 5 ++-- 6 files changed, 41 insertions(+), 21 deletions(-) diff --git a/driver/xrt/include/accl/constants.hpp b/driver/xrt/include/accl/constants.hpp index a8db21b2..10c28d32 100644 --- a/driver/xrt/include/accl/constants.hpp +++ b/driver/xrt/include/accl/constants.hpp @@ -40,6 +40,24 @@ const addr_t EXCHANGE_MEM_ADDRESS_RANGE = 0x2000; /** Global Communicator */ const communicatorId GLOBAL_COMM = 0x0; +/** + * Configuration constants + */ +/** Maximum valid stream ID for stream_put operations (0-246, so max is 246) */ +constexpr unsigned int MAX_STREAM_ID = 246; +/** Default timeout in milliseconds for soft reset operations */ +constexpr unsigned int DEFAULT_RESET_TIMEOUT_MS = 100; +/** Default timeout value for ACCL operations (in hardware cycles) */ +constexpr unsigned int DEFAULT_OPERATION_TIMEOUT = 1000000; +/** Maximum count for flat tree gather/reduce operations (32 KB) */ +constexpr unsigned int FLAT_TREE_MAX_COUNT = 32 * 1024; +/** Default max fanin for flat tree gather operations */ +constexpr unsigned int DEFAULT_GATHER_FLAT_TREE_MAX_FANIN = 2; +/** Default max ranks for flat tree broadcast operations */ +constexpr unsigned int DEFAULT_BCAST_FLAT_TREE_MAX_RANKS = 3; +/** Default max ranks for flat tree reduce operations */ +constexpr unsigned int DEFAULT_REDUCE_FLAT_TREE_MAX_RANKS = 4; + /** * Address offsets inside the HOSTCTRL internal memory * diff --git a/driver/xrt/src/accl.cpp b/driver/xrt/src/accl.cpp index e32d59c8..0a95fba3 100644 --- a/driver/xrt/src/accl.cpp +++ b/driver/xrt/src/accl.cpp @@ -62,7 +62,7 @@ void ACCL::soft_reset() { options.scenario = operation::config; options.cfg_function = cfgFunc::reset_periph; ACCLRequest *handle = call_async(options); - std::chrono::milliseconds timeout(100); + std::chrono::milliseconds timeout(DEFAULT_RESET_TIMEOUT_MS); if(!wait(handle, timeout)){ throw std::runtime_error("CCLO failed to soft reset"); } @@ -179,8 +179,8 @@ ACCLRequest *ACCL::stream_put(BaseBuffer &srcbuf, unsigned int count, std::vector waitfor) { CCLO::Options options{}; - if (stream_id > 246) { - throw std::invalid_argument("Stream ID must < 247"); + if (stream_id > MAX_STREAM_ID) { + throw std::invalid_argument("Stream ID must be <= " + std::to_string(MAX_STREAM_ID)); } if (from_fpga == false) { @@ -211,8 +211,8 @@ ACCLRequest *ACCL::stream_put(dataType src_data_type, unsigned int count, std::vector waitfor) { CCLO::Options options{}; - if (stream_id > 246) { - throw std::invalid_argument("Stream ID must < 247"); + if (stream_id > MAX_STREAM_ID) { + throw std::invalid_argument("Stream ID must be <= " + std::to_string(MAX_STREAM_ID)); } options.scenario = operation::send; @@ -1097,7 +1097,7 @@ void ACCL::initialize(const std::vector &ranks, int local_rank, cclo->write(CCLO_ADDR::CFGRDY_OFFSET, 1); debug("Set timeout"); - set_timeout(1000000); + set_timeout(DEFAULT_OPERATION_TIMEOUT); debug("Set max eager size: " + std::to_string(max_egr_size)); set_max_eager_msg_size(max_egr_size); @@ -1227,14 +1227,15 @@ void ACCL::setup_rendezvous_spare_buffers(addr_t rndzv_spare_buf_size, const std void ACCL::configure_tuning_parameters(){ //tune gather to reduce fan-in of flat tree above certain message sizes - cclo->write(CCLO_ADDR::GATHER_FLAT_TREE_MAX_FANIN_OFFSET, 2); - cclo->write(CCLO_ADDR::GATHER_FLAT_TREE_MAX_COUNT_OFFSET, 32*1024); - //tune bcast to execute flat tree up to 3 ranks - cclo->write(CCLO_ADDR::BCAST_FLAT_TREE_MAX_RANKS_OFFSET, 3); - //tune reduce to execute flat tree up to 4 ranks or up to 32KB - unsigned int max_reduce_flat_tree_size = 4; - cclo->write(CCLO_ADDR::REDUCE_FLAT_TREE_MAX_RANKS_OFFSET, max_reduce_flat_tree_size); - cclo->write(CCLO_ADDR::REDUCE_FLAT_TREE_MAX_COUNT_OFFSET, std::min(max_rndzv_msg_size/max_reduce_flat_tree_size, (long unsigned int)32*1024)); + cclo->write(CCLO_ADDR::GATHER_FLAT_TREE_MAX_FANIN_OFFSET, DEFAULT_GATHER_FLAT_TREE_MAX_FANIN); + cclo->write(CCLO_ADDR::GATHER_FLAT_TREE_MAX_COUNT_OFFSET, FLAT_TREE_MAX_COUNT); + //tune bcast to execute flat tree up to configured max ranks + cclo->write(CCLO_ADDR::BCAST_FLAT_TREE_MAX_RANKS_OFFSET, DEFAULT_BCAST_FLAT_TREE_MAX_RANKS); + //tune reduce to execute flat tree up to configured max ranks or up to FLAT_TREE_MAX_COUNT + cclo->write(CCLO_ADDR::REDUCE_FLAT_TREE_MAX_RANKS_OFFSET, DEFAULT_REDUCE_FLAT_TREE_MAX_RANKS); + cclo->write(CCLO_ADDR::REDUCE_FLAT_TREE_MAX_COUNT_OFFSET, + std::min(max_rndzv_msg_size / DEFAULT_REDUCE_FLAT_TREE_MAX_RANKS, + static_cast(FLAT_TREE_MAX_COUNT))); } void ACCL::check_return_value(const std::string function_name, ACCLRequest *request) { diff --git a/kernels/cclo/hls/eth_intf/eth_intf.h b/kernels/cclo/hls/eth_intf/eth_intf.h index 5a0ef64c..8a36d300 100644 --- a/kernels/cclo/hls/eth_intf/eth_intf.h +++ b/kernels/cclo/hls/eth_intf/eth_intf.h @@ -34,6 +34,10 @@ #define SESSION_INDEX_BITS 10 #define SESSION_INDEX_MASK (MAX_SESSIONS - 1) // 0x3FF for 1024 sessions +// Maximum bytes per DMA transfer (8 MB - 1 byte) +// Used in depacketizers to limit fragment sizes +#define MAX_DMA_BYTES (8*1024*1024-1) + #define RATE_CONTROL typedef ap_axiu pkt512; diff --git a/kernels/cclo/hls/eth_intf/rdma_depacketizer.cpp b/kernels/cclo/hls/eth_intf/rdma_depacketizer.cpp index 7dde66ba..44109bfc 100644 --- a/kernels/cclo/hls/eth_intf/rdma_depacketizer.cpp +++ b/kernels/cclo/hls/eth_intf/rdma_depacketizer.cpp @@ -64,7 +64,6 @@ void rdma_depacketizer( #pragma HLS PIPELINE II=1 style=flp unsigned constexpr bytes_per_word = DATA_WIDTH/8; - unsigned const max_dma_bytes = 8*1024*1024-1;//TODO: convert to argument? static unsigned int remaining[MAX_SESSIONS] = {0};//one RAMB36 for max sessions static ap_uint target_strm[MAX_SESSIONS] = {0};//one RAMB18 for max sessions @@ -171,7 +170,7 @@ void rdma_depacketizer( notif.length = (notif.length < bytes_per_word) ? 0u : (unsigned int)notif.length-bytes_per_word;//floor at zero current_bytes += (message_rem < bytes_per_word) ? message_rem : bytes_per_word; message_rem = (message_rem < bytes_per_word) ? 0u : message_rem-bytes_per_word;//slight problem here if the message doesnt end on a 64B boundary... - } while(notif.length > 0 && message_rem > 0 && current_bytes < (max_dma_bytes-bytes_per_word)); + } while(notif.length > 0 && message_rem > 0 && current_bytes < (MAX_DMA_BYTES-bytes_per_word)); if(message_strm == 0){ downstream_notif.type = 2; //for EOF downstream_notif.length = current_bytes; diff --git a/kernels/cclo/hls/eth_intf/tcp_depacketizer.cpp b/kernels/cclo/hls/eth_intf/tcp_depacketizer.cpp index 0f2493ca..2f3a0971 100644 --- a/kernels/cclo/hls/eth_intf/tcp_depacketizer.cpp +++ b/kernels/cclo/hls/eth_intf/tcp_depacketizer.cpp @@ -60,7 +60,6 @@ void tcp_depacketizer( #pragma HLS PIPELINE II=1 style=flp unsigned constexpr bytes_per_word = DATA_WIDTH/8; - unsigned const max_dma_bytes = 8*1024*1024-1;//TODO: convert to argument? static unsigned int remaining[MAX_SESSIONS] = {0};//one RAMB36 for max sessions static ap_uint target_strm[MAX_SESSIONS] = {0};//one RAMB18 for max sessions @@ -138,7 +137,7 @@ void tcp_depacketizer( notif.length = (notif.length < bytes_per_word) ? 0u : (unsigned int)notif.length-bytes_per_word;//floor at zero current_bytes += (message_rem < bytes_per_word) ? message_rem : bytes_per_word; message_rem = (message_rem < bytes_per_word) ? 0u : message_rem-bytes_per_word;//slight problem here if the message doesnt end on a 64B boundary... - } while(notif.length > 0 && message_rem > 0 && current_bytes < (max_dma_bytes-bytes_per_word)); + } while(notif.length > 0 && message_rem > 0 && current_bytes < (MAX_DMA_BYTES-bytes_per_word)); if(message_strm == 0){ downstream_notif.type = 2; //for EOF downstream_notif.length = current_bytes; diff --git a/kernels/cclo/hls/eth_intf/udp_depacketizer.cpp b/kernels/cclo/hls/eth_intf/udp_depacketizer.cpp index e98e6f4b..e8c07392 100644 --- a/kernels/cclo/hls/eth_intf/udp_depacketizer.cpp +++ b/kernels/cclo/hls/eth_intf/udp_depacketizer.cpp @@ -96,7 +96,6 @@ void udp_depacketizer( #pragma HLS PIPELINE II=1 style=flp unsigned const bytes_per_word = DATA_WIDTH/8; - unsigned const max_dma_bytes = 8*1024*1024-1;//TODO: convert to argument? //session states (for each session, remaining bytes and STRM values for ongoing messages) static unsigned int remaining[MAX_SESSIONS] = {0};//one RAMB36 for max sessions @@ -155,7 +154,7 @@ void udp_depacketizer( } //write out SOF if(message_strm == 0){ - notif.length = max_dma_bytes; + notif.length = MAX_DMA_BYTES; notif.type = 1; //for SOF STREAM_WRITE(notif_out, notif); } @@ -169,7 +168,7 @@ void udp_depacketizer( STREAM_WRITE(out, inword); current_bytes += (message_rem < bytes_per_word) ? message_rem : bytes_per_word; message_rem = (message_rem < bytes_per_word) ? 0u : message_rem-bytes_per_word;//slight problem here if the message doesnt end on a 64B boundary... - } while(inword.last == 0 && message_rem > 0 && current_bytes < (max_dma_bytes-bytes_per_word)); + } while(inword.last == 0 && message_rem > 0 && current_bytes < (MAX_DMA_BYTES-bytes_per_word)); } //write out EOF if(message_strm == 0){