diff --git a/ucxx_cudf_example/CMakeLists.txt b/ucxx_cudf_example/CMakeLists.txt new file mode 100644 index 0000000..37ba490 --- /dev/null +++ b/ucxx_cudf_example/CMakeLists.txt @@ -0,0 +1,85 @@ +cmake_minimum_required(VERSION 3.26 FATAL_ERROR) + +# C++ only - no CUDA compiler needed, just CUDA runtime +project(ucxx_cudf_example VERSION 1.0.0 LANGUAGES C CXX) + +################################################################################################### +# - Conda environment ----------------------------------------------------------------------------- + +if(DEFINED ENV{CONDA_PREFIX}) + set(CMAKE_PREFIX_PATH "$ENV{CONDA_PREFIX};${CMAKE_PREFIX_PATH}") + set(CONDA_INCLUDE_DIRS "$ENV{CONDA_PREFIX}/include") + set(CONDA_LINK_DIRS "$ENV{CONDA_PREFIX}/lib") + # Point CUDAToolkit to conda environment + set(CUDAToolkit_ROOT "$ENV{CONDA_PREFIX}") + message(STATUS "Conda environment detected, CMAKE_PREFIX_PATH set to: ${CMAKE_PREFIX_PATH}") +endif() + +################################################################################################### +# - Build type ------------------------------------------------------------------------------------ + +set(DEFAULT_BUILD_TYPE "Release") +if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES) + message(STATUS "Setting build type to '${DEFAULT_BUILD_TYPE}' as none was specified.") + set(CMAKE_BUILD_TYPE "${DEFAULT_BUILD_TYPE}" CACHE STRING "Choose the type of build." 
FORCE)
endif()

###################################################################################################
# - Compiler options ------------------------------------------------------------------------------

set(CMAKE_POSITION_INDEPENDENT_CODE ON)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Warning flags are attached to the executable target further down (after add_executable)
# rather than appended to the global CMAKE_CXX_FLAGS.

###################################################################################################
# - Find dependencies -----------------------------------------------------------------------------

# Find CUDA Toolkit (modern CMake way, provides CUDA::cudart target)
find_package(CUDAToolkit REQUIRED)
message(STATUS "Found CUDA Toolkit: ${CUDAToolkit_VERSION}")

# Find cudf (installed via conda)
find_package(cudf REQUIRED)
message(STATUS "Found cudf: ${cudf_VERSION}")

# Find ucxx (installed via conda)
find_package(ucxx REQUIRED)
message(STATUS "Found ucxx: ${ucxx_VERSION}")

# Find rmm (installed via conda)
find_package(rmm REQUIRED)
message(STATUS "Found rmm: ${rmm_VERSION}")

###################################################################################################
# - Build executable ------------------------------------------------------------------------------

add_executable(ucxx_cudf_example src/ucxx_cudf_example.cpp)

# Enable warnings for this target only. CMAKE_CXX_COMPILER_ID replaces the deprecated
# CMAKE_COMPILER_IS_GNUCXX variable.
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
  target_compile_options(ucxx_cudf_example PRIVATE -Wall -Wextra)
endif()

# Add conda paths if available
if(CONDA_INCLUDE_DIRS)
  target_include_directories(ucxx_cudf_example PRIVATE "${CONDA_INCLUDE_DIRS}")
endif()

if(CONDA_LINK_DIRS)
  target_link_directories(ucxx_cudf_example PRIVATE "${CONDA_LINK_DIRS}")
endif()

target_link_libraries(ucxx_cudf_example PRIVATE
  cudf::cudf
  ucxx::ucxx
  rmm::rmm
  CUDA::cudart  # CUDA runtime library
)

###################################################################################################
# - Install targets -------------------------------------------------------------------------------

+include(GNUInstallDirs) + +install(TARGETS ucxx_cudf_example + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} +) diff --git a/ucxx_cudf_example/Dockerfile b/ucxx_cudf_example/Dockerfile new file mode 100644 index 0000000..dcc774e --- /dev/null +++ b/ucxx_cudf_example/Dockerfile @@ -0,0 +1,56 @@ +# UCXX cuDF Example - Dockerfile +# This Dockerfile creates an environment for building and running +# an example that sends cuDF columns via UCXX communication library. + +ARG CUDA_VERSION=12.6.3 +ARG LINUX_VERSION=ubuntu22.04 + +FROM nvidia/cuda:${CUDA_VERSION}-base-${LINUX_VERSION} + +ARG DEBIAN_FRONTEND=noninteractive +ARG PARALLEL_LEVEL=8 +ENV PARALLEL_LEVEL=${PARALLEL_LEVEL} + +# Install basic dependencies +RUN apt-get update -y && apt-get install -y \ + build-essential \ + git \ + wget \ + curl \ + ca-certificates \ + libssl-dev \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Install Miniforge (uses conda-forge by default, no ToS acceptance required) +RUN wget https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-Linux-x86_64.sh -O /tmp/miniforge.sh \ + && bash /tmp/miniforge.sh -b -p /opt/conda \ + && rm /tmp/miniforge.sh + +ENV PATH="/opt/conda/bin:${PATH}" +ENV CONDA_PREFIX="/opt/conda" + +# Enable conda in bash +SHELL ["/bin/bash", "-c"] + +# Copy the project files +COPY . 
/rapids/ucxx_cudf_example + +WORKDIR /rapids/ucxx_cudf_example + +# Create the conda environment with libcudf and libucxx from conda +RUN conda env create -f /rapids/ucxx_cudf_example/conda/ucxx_cudf_example.yml + +# Configure and build the example +RUN source activate ucxx_cudf_example \ + && cmake \ + -DCMAKE_BUILD_TYPE=Release \ + -S /rapids/ucxx_cudf_example \ + -B /rapids/ucxx_cudf_example/build \ + && cmake --build /rapids/ucxx_cudf_example/build -j${PARALLEL_LEVEL} + +# Set up entrypoint to activate conda environment +RUN echo "source activate ucxx_cudf_example" >> ~/.bashrc + +SHELL ["/bin/bash", "-l"] +CMD ["/bin/bash", "-l"] diff --git a/ucxx_cudf_example/README.md b/ucxx_cudf_example/README.md new file mode 100644 index 0000000..53d51f5 --- /dev/null +++ b/ucxx_cudf_example/README.md @@ -0,0 +1,177 @@ +# UCXX cuDF Example + +This example demonstrates how to create a cuDF integer column using `cudf::sequence` and transfer it between endpoints using the UCXX communication library. + +## Overview + +The example: +1. Creates a cuDF column with integer sequence `[0, 1, 2, ..., N-1]` using `cudf::sequence` +2. Sets up a UCXX listener and client endpoint (loopback connection) +3. Sends the column data from the listener endpoint to the client endpoint +4. Verifies the received data matches the original sequence +5. Reconstructs a new cuDF column from the received data + +This demonstrates a pattern useful for distributed GPU computing where cuDF DataFrames/columns need to be transferred between nodes. 

## Dependencies

All dependencies are installed via conda (no source compilation required):
- **libcudf** (>=25.08) - RAPIDS cuDF library (requires C++20)
- **libucxx** (>=0.40) - UCX C++ bindings
- **rmm** (>=25.08) - RAPIDS Memory Manager
- **cuda-toolkit** (with `cuda-version` >=12.2) - CUDA runtime headers and CMake config; the example is plain C++, so no CUDA compiler is needed

## Quick Start

### Option 1: Build with Conda (Local)

```bash
./build_with_conda.sh
```

Then run:
```bash
conda activate ucxx_cudf_example
./build/ucxx_cudf_example
```

### Option 2: Build with Docker

```bash
./build_with_docker.sh
```

Then run:
```bash
docker run --gpus all --rm -it ucxx_cudf_example ./build/ucxx_cudf_example
```

## Usage

```
Usage: ucxx_cudf_example [options]

Transfers a cuDF integer column via UCXX using GPU memory directly.
Data is sent/received in device buffers; only copied to host for verification.

Options:
  -p <port>  Port number (default: 12345)
  -s <size>  Number of elements (default: 1000)
  -h         Show this help
```

## Testing

### Basic Test
Run with default parameters (1000 elements):
```bash
./build/ucxx_cudf_example
```

### Test with Larger Data
```bash
# 100,000 elements (~400KB)
./build/ucxx_cudf_example -s 100000

# 1 million elements (~4MB)
./build/ucxx_cudf_example -s 1000000
```

### Test with Docker
```bash
# Basic test
docker run --gpus all --rm -it ucxx_cudf_example ./build/ucxx_cudf_example

# Test with 100K elements
docker run --gpus all --rm -it ucxx_cudf_example ./build/ucxx_cudf_example -s 100000

# Interactive shell for debugging
docker run --gpus all --rm -it ucxx_cudf_example bash
```

### Expected Output

A successful run produces output like the following (the client port `xxxxx` varies from run to run):
```
=== UCXX cuDF Example (GPU-to-GPU Transfer) ===
Port: 12345, Size: 1000

Creating cuDF sequence column [0, 1, 2, ..., 999] on GPU

Setting up UCXX connection...
Connection established from 127.0.0.1:xxxxx

Transferring column: 1000 elements, 4000 bytes
 Transfer mode: GPU-to-GPU (device buffers)
 Send data: [0, 1, 2, 3, 4, ..., 995, 996, 997, 998, 999]
 Recv data: [0, 1, 2, 3, 4, ..., 995, 996, 997, 998, 999]
 Verification: PASSED (1000 elements match)

=== Example completed successfully ===
```

The key indicators of success:
- `Verification: PASSED` - Data was transferred correctly
- `=== Example completed successfully ===` - All steps completed without errors (the process exits with status 0; on failure it prints `=== Example FAILED ===` and exits with status 1)

## Key Concepts

### cudf::sequence

Creates a column filled with a sequence of values:
```cpp
cudf::numeric_scalar<int32_t> init_scalar(0, true, stream);  // Start at 0
cudf::numeric_scalar<int32_t> step_scalar(1, true, stream);  // Step by 1
auto column = cudf::sequence(size, init_scalar, step_scalar);
```

### UCXX Tag Send/Receive

Tag-based messaging for point-to-point communication:
```cpp
// Send data
endpoint->tagSend(data_ptr, size, ucxx::Tag{tag_value});

// Receive data
endpoint->tagRecv(buffer_ptr, size, ucxx::Tag{tag_value}, ucxx::TagMaskFull);
```

Both calls return a request object immediately; the transfer only makes progress while the worker is progressed. When one thread drives both sides of a transfer, post the receive before waiting for the send to complete, because a large send may not complete until its matching receive exists.

## Extending This Example

To send more complex cuDF data structures:

1. **DataFrames**: Serialize column-by-column, sending metadata first (column names, types, sizes)
2. **Strings columns**: Use `cudf::strings_column_view` to access the offsets and char data separately
3. 
**Nullable columns**: Also transfer the null bitmask + +## Project Structure + +``` +ucxx_cudf_example/ +├── build_with_conda.sh # Build script using conda +├── build_with_docker.sh # Build script using Docker +├── CMakeLists.txt # CMake build configuration +├── Dockerfile # Docker image definition +├── README.md # This file +├── conda/ +│ └── ucxx_cudf_example.yml # Conda environment specification +└── src/ + └── ucxx_cudf_example.cpp # Main example source code +``` + +## License + +Apache-2.0 diff --git a/ucxx_cudf_example/build_with_conda.sh b/ucxx_cudf_example/build_with_conda.sh new file mode 100755 index 0000000..5d7ef04 --- /dev/null +++ b/ucxx_cudf_example/build_with_conda.sh @@ -0,0 +1,100 @@ +#!/bin/bash +# ============================================================================= +# build_with_conda.sh - Build UCXX cuDF Example using Conda +# ============================================================================= +# +# This script sets up a conda environment with libcudf and libucxx installed +# from conda channels (no source compilation required), then builds the example. +# +# Prerequisites: +# - Conda (Miniconda or Anaconda) installed +# - NVIDIA GPU with CUDA support +# +# Usage: +# ./build_with_conda.sh # Build with default settings +# ./build_with_conda.sh clean # Clean build directory and rebuild +# +# ============================================================================= + +set -e # Exit on error + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ENV_NAME="ucxx_cudf_example" +BUILD_DIR="${SCRIPT_DIR}/build" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +echo -e "${GREEN}=== UCXX cuDF Example - Conda Build ===${NC}" + +# ----------------------------------------------------------------------------- +# Check prerequisites +# ----------------------------------------------------------------------------- +if ! 
command -v conda &> /dev/null; then + echo -e "${RED}Error: conda is not installed or not in PATH${NC}" + echo "Please install Miniconda or Anaconda first." + exit 1 +fi + +# Initialize conda for the current shell +eval "$(conda shell.bash hook)" + +# ----------------------------------------------------------------------------- +# Handle clean build +# ----------------------------------------------------------------------------- +if [[ "$1" == "clean" ]]; then + echo -e "${YELLOW}Cleaning build directory...${NC}" + rm -rf "${BUILD_DIR}" +fi + +# ----------------------------------------------------------------------------- +# Create or update conda environment +# ----------------------------------------------------------------------------- +if conda env list | grep -q "^${ENV_NAME} "; then + echo -e "${YELLOW}Conda environment '${ENV_NAME}' already exists.${NC}" + echo "Activating existing environment..." +else + echo -e "${GREEN}Creating conda environment '${ENV_NAME}'...${NC}" + echo "This will install libcudf, libucxx, rmm, and CUDA tools from conda." + echo "This may take several minutes on first run..." 
+ conda env create -f "${SCRIPT_DIR}/conda/${ENV_NAME}.yml" +fi + +# Activate the environment +echo -e "${GREEN}Activating conda environment...${NC}" +conda activate "${ENV_NAME}" + +# ----------------------------------------------------------------------------- +# Configure with CMake +# ----------------------------------------------------------------------------- +echo -e "${GREEN}Configuring with CMake...${NC}" +cmake \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_CUDA_ARCHITECTURES=native \ + -S "${SCRIPT_DIR}" \ + -B "${BUILD_DIR}" + +# ----------------------------------------------------------------------------- +# Build +# ----------------------------------------------------------------------------- +echo -e "${GREEN}Building...${NC}" +cmake --build "${BUILD_DIR}" -j"$(nproc)" + +# ----------------------------------------------------------------------------- +# Done +# ----------------------------------------------------------------------------- +echo "" +echo -e "${GREEN}=== Build Complete ===${NC}" +echo "" +echo "To run the example:" +echo " conda activate ${ENV_NAME}" +echo " ./build/ucxx_cudf_example" +echo "" +echo "Run with custom parameters:" +echo " ./build/ucxx_cudf_example -s 10000 # 10000 elements" +echo " ./build/ucxx_cudf_example -p 54321 # Use port 54321" +echo " ./build/ucxx_cudf_example -h # Show help" + diff --git a/ucxx_cudf_example/build_with_docker.sh b/ucxx_cudf_example/build_with_docker.sh new file mode 100755 index 0000000..224eb6d --- /dev/null +++ b/ucxx_cudf_example/build_with_docker.sh @@ -0,0 +1,99 @@ +#!/bin/bash +# ============================================================================= +# build_with_docker.sh - Build UCXX cuDF Example using Docker +# ============================================================================= +# +# This script builds a Docker image containing the UCXX cuDF example. 
+# The Docker image includes: +# - CUDA runtime and development tools +# - Conda environment with libcudf, libucxx, rmm (installed from conda) +# - Pre-built example binary +# +# Prerequisites: +# - Docker installed and running +# - NVIDIA Container Toolkit (for GPU support) +# +# Usage: +# ./build_with_docker.sh # Build image with default name +# ./build_with_docker.sh my_image # Build image with custom name +# ./build_with_docker.sh --no-cache # Build without Docker cache +# +# ============================================================================= + +set -e # Exit on error + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +DEFAULT_IMAGE_NAME="ucxx_cudf_example" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +# Parse arguments +IMAGE_NAME="${DEFAULT_IMAGE_NAME}" +DOCKER_ARGS="" + +for arg in "$@"; do + case $arg in + --no-cache) + DOCKER_ARGS="--no-cache" + ;; + -*) + DOCKER_ARGS="${DOCKER_ARGS} ${arg}" + ;; + *) + IMAGE_NAME="${arg}" + ;; + esac +done + +echo -e "${GREEN}=== UCXX cuDF Example - Docker Build ===${NC}" + +# ----------------------------------------------------------------------------- +# Check prerequisites +# ----------------------------------------------------------------------------- +if ! command -v docker &> /dev/null; then + echo -e "${RED}Error: docker is not installed or not in PATH${NC}" + exit 1 +fi + +if ! docker info &> /dev/null; then + echo -e "${RED}Error: Docker daemon is not running${NC}" + exit 1 +fi + +# ----------------------------------------------------------------------------- +# Build Docker image +# ----------------------------------------------------------------------------- +echo -e "${GREEN}Building Docker image '${IMAGE_NAME}'...${NC}" +echo "This will:" +echo " 1. Set up a CUDA development environment" +echo " 2. Install Miniconda" +echo " 3. Create conda environment with libcudf, libucxx, rmm" +echo " 4. 
Build the example" +echo "" +echo "This may take 10-20 minutes on first build..." +echo "" + +cd "${SCRIPT_DIR}" +docker build ${DOCKER_ARGS} -t "${IMAGE_NAME}" . + +# ----------------------------------------------------------------------------- +# Done +# ----------------------------------------------------------------------------- +echo "" +echo -e "${GREEN}=== Docker Build Complete ===${NC}" +echo "" +echo "Image: ${IMAGE_NAME}" +echo "" +echo "To run the example:" +echo " docker run --gpus all --rm -it ${IMAGE_NAME} ./build/ucxx_cudf_example" +echo "" +echo "Run with custom parameters:" +echo " docker run --gpus all --rm -it ${IMAGE_NAME} ./build/ucxx_cudf_example -s 10000" +echo "" +echo "Interactive shell:" +echo " docker run --gpus all --rm -it ${IMAGE_NAME} bash" + diff --git a/ucxx_cudf_example/conda/ucxx_cudf_example.yml b/ucxx_cudf_example/conda/ucxx_cudf_example.yml new file mode 100644 index 0000000..39f2d5a --- /dev/null +++ b/ucxx_cudf_example/conda/ucxx_cudf_example.yml @@ -0,0 +1,19 @@ +name: ucxx_cudf_example +channels: + - rapidsai + - conda-forge + - nvidia +dependencies: + - libcudf>=25.08 + - libucxx>=0.40 + - rmm>=25.08 + - cmake>=3.26 + - ninja + - cuda-version>=12.2 + - cuda-toolkit # Provides CUDA runtime headers and CMake config + - gcc_linux-64>=13 + - gxx_linux-64>=13 + - spdlog + - fmt + - python>=3.10 + # Note: libcudf 25.x requires C++20 diff --git a/ucxx_cudf_example/src/ucxx_cudf_example.cpp b/ucxx_cudf_example/src/ucxx_cudf_example.cpp new file mode 100644 index 0000000..4098157 --- /dev/null +++ b/ucxx_cudf_example/src/ucxx_cudf_example.cpp @@ -0,0 +1,457 @@ +/** + * UCXX cuDF Example + * + * This example demonstrates how to: + * 1. Create a simple cuDF integer column using cudf::sequence + * 2. Send the column data directly from GPU memory using UCXX + * 3. Receive the data into GPU memory + * 4. 
Verify the received data matches (copying to host only for verification) + * + * Similar to ucxx/cpp/examples/basic.cpp but transfers cuDF column data + * directly between GPU buffers. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// UCXX headers +#include +#include +#include +#include + +// cuDF headers +#include +#include +#include +#include +#include +#include + +// RMM headers +#include +#include + +// ============================================================================= +// Column Utilities +// ============================================================================= + +/** + * @brief Copy column data from device to host (only used for verification/printing) + */ +std::vector columnToHost(cudf::column_view const& column) +{ + std::vector host_buffer(column.size()); + cudaMemcpy(host_buffer.data(), + column.data(), + column.size() * sizeof(int32_t), + cudaMemcpyDeviceToHost); + cudaStreamSynchronize(0); + return host_buffer; +} + +/** + * @brief Create a cuDF column from a device buffer + * + * @param device_buffer RMM device buffer containing the data (ownership transferred) + * @param num_elements Number of int32 elements in the buffer + */ +std::unique_ptr deviceBufferToColumn(rmm::device_buffer&& device_buffer, + cudf::size_type num_elements) +{ + return std::make_unique(cudf::data_type(cudf::type_id::INT32), + num_elements, + std::move(device_buffer), + rmm::device_buffer{}, // no null mask + 0); // null count +} + +/** + * @brief Allocate a device buffer for receiving column data + */ +rmm::device_buffer allocateDeviceBuffer(size_t size_bytes) +{ + return rmm::device_buffer(size_bytes, cudf::get_default_stream()); +} + +/** + * @brief Create a cuDF integer sequence column [0, 1, 2, ..., size-1] + */ +std::unique_ptr createSequenceColumn(cudf::size_type size) +{ + cudf::numeric_scalar init_scalar(0, true, cudf::get_default_stream()); + cudf::numeric_scalar step_scalar(1, true, 
cudf::get_default_stream()); + return cudf::sequence(size, init_scalar, step_scalar); +} + +/** + * @brief Print preview of column data (first/last few elements) + */ +void printColumnPreview(std::vector const& data, std::string const& label = "") +{ + if (!label.empty()) std::cout << label << " "; + + const int preview_count = std::min(5, static_cast(data.size())); + std::cout << "["; + + for (int i = 0; i < preview_count; ++i) { + std::cout << data[i]; + if (i < preview_count - 1) std::cout << ", "; + } + + if (static_cast(data.size()) > preview_count * 2) { + std::cout << ", ..."; + } + + if (static_cast(data.size()) > preview_count) { + std::cout << ", "; + int start = std::max(preview_count, static_cast(data.size()) - preview_count); + for (size_t i = start; i < data.size(); ++i) { + std::cout << data[i]; + if (i < data.size() - 1) std::cout << ", "; + } + } + std::cout << "]" << std::endl; +} + +/** + * @brief Verify two columns have identical data (copies to host for comparison) + */ +bool verifyColumnsMatch(cudf::column_view const& expected, cudf::column_view const& actual) +{ + if (expected.size() != actual.size()) { + std::cerr << "Size mismatch: expected " << expected.size() << ", got " << actual.size() + << std::endl; + return false; + } + + // Copy both to host for verification + auto expected_host = columnToHost(expected); + auto actual_host = columnToHost(actual); + + for (cudf::size_type i = 0; i < expected.size(); ++i) { + if (expected_host[i] != actual_host[i]) { + std::cerr << "Data mismatch at index " << i << ": expected " << expected_host[i] << ", got " + << actual_host[i] << std::endl; + return false; + } + } + return true; +} + +// ============================================================================= +// UCXX Communication Layer +// ============================================================================= + +/** + * @brief Manages a UCXX connection between a listener and client endpoint + * + * This class encapsulates the setup 
and teardown of UCXX communication, + * providing simple send/receive methods for transferring data. + * Supports both host and device memory transfers. + */ +class UCXXConnection { + public: + /** + * @brief Construct a new UCXX connection on the specified port + */ + explicit UCXXConnection(uint16_t port) : _port(port) + { + // Create UCXX context and worker + _context = ucxx::createContext({}, ucxx::Context::defaultFeatureFlags); + _worker = _context->createWorker(); + + // Setup listener + _listener = _worker->createListener(_port, listenerCallback, this); + + // Create client endpoint that connects to our own listener (loopback) + _client_endpoint = _worker->createEndpointFromHostname("127.0.0.1", _port, true); + + // Wait for connection to be established + while (!_server_endpoint) { + _worker->progress(); + } + + // Perform wireup exchange (using host memory for small control message) + performWireup(); + } + + ~UCXXConnection() = default; + + // Disable copy + UCXXConnection(UCXXConnection const&) = delete; + UCXXConnection& operator=(UCXXConnection const&) = delete; + + /** + * @brief Send data from server to client (supports device memory) + */ + void sendToClient(void const* data, size_t size, uint64_t tag) + { + // Ensure any pending GPU work is complete before sending + cudaStreamSynchronize(0); + auto request = _server_endpoint->tagSend(const_cast(data), size, ucxx::Tag{tag}); + waitForRequest(request); + } + + /** + * @brief Receive data on client from server (supports device memory) + */ + void recvOnClient(void* buffer, size_t size, uint64_t tag) + { + auto request = _client_endpoint->tagRecv(buffer, size, ucxx::Tag{tag}, ucxx::TagMaskFull); + waitForRequest(request); + // Ensure receive is complete before GPU uses the data + cudaStreamSynchronize(0); + } + + /** + * @brief Send data from client to server (supports device memory) + */ + void sendToServer(void const* data, size_t size, uint64_t tag) + { + cudaStreamSynchronize(0); + auto request = 
_client_endpoint->tagSend(const_cast(data), size, ucxx::Tag{tag}); + waitForRequest(request); + } + + /** + * @brief Receive data on server from client (supports device memory) + */ + void recvOnServer(void* buffer, size_t size, uint64_t tag) + { + auto request = _server_endpoint->tagRecv(buffer, size, ucxx::Tag{tag}, ucxx::TagMaskFull); + waitForRequest(request); + cudaStreamSynchronize(0); + } + + /** + * @brief Get the port this connection is using + */ + uint16_t port() const { return _port; } + + private: + uint16_t _port; + std::shared_ptr _context; + std::shared_ptr _worker; + std::shared_ptr _listener; + std::shared_ptr _server_endpoint; // Created from connection request + std::shared_ptr _client_endpoint; // Created by connecting to listener + + void waitForRequest(std::shared_ptr const& request) + { + while (!request->isCompleted()) { + _worker->progress(); + } + request->checkError(); + } + + void performWireup() + { + // Small exchange to let UCX identify capabilities (host memory for control) + int32_t wireup_send = 42; + int32_t wireup_recv = 0; + auto send_req = _server_endpoint->tagSend(&wireup_send, sizeof(int32_t), ucxx::Tag{0}); + auto recv_req = + _client_endpoint->tagRecv(&wireup_recv, sizeof(int32_t), ucxx::Tag{0}, ucxx::TagMaskFull); + waitForRequest(send_req); + waitForRequest(recv_req); + } + + static void listenerCallback(ucp_conn_request_h conn_request, void* arg) + { + auto* self = reinterpret_cast(arg); + + // Log connection + char ip_str[INET6_ADDRSTRLEN]; + char port_str[INET6_ADDRSTRLEN]; + ucp_conn_request_attr_t attr{}; + attr.field_mask = UCP_CONN_REQUEST_ATTR_FIELD_CLIENT_ADDR; + ucp_conn_request_query(conn_request, &attr); + ucxx::utils::sockaddr_get_ip_port_str(&attr.client_address, ip_str, port_str, INET6_ADDRSTRLEN); + std::cout << "Connection established from " << ip_str << ":" << port_str << std::endl; + + // Create server endpoint from connection request + self->_server_endpoint = 
self->_listener->createEndpointFromConnRequest(conn_request, true); + } +}; + +// ============================================================================= +// Column Transfer API (Device-to-Device) +// ============================================================================= + +/** + * @brief Transfer a cuDF column over UCXX using device memory directly + * + * Sends data directly from the source column's device buffer and receives + * into a newly allocated device buffer. No host copies during transfer. + * + * @param conn The UCXX connection to use + * @param column The column to transfer (data stays on GPU) + * @param tag Message tag for this transfer + * @return New column created from received device data + */ +std::unique_ptr transferColumn(UCXXConnection& conn, + cudf::column_view const& column, + uint64_t tag = 1) +{ + size_t data_size = column.size() * sizeof(int32_t); + auto num_elements = column.size(); + + // Get pointer to source data on device + void const* send_ptr = column.data(); + + // Allocate receive buffer on device + auto recv_buffer = allocateDeviceBuffer(data_size); + void* recv_ptr = recv_buffer.data(); + + // Transfer directly between device buffers: server sends, client receives + conn.sendToClient(send_ptr, data_size, tag); + conn.recvOnClient(recv_ptr, data_size, tag); + + // Create new column from received device buffer + return deviceBufferToColumn(std::move(recv_buffer), num_elements); +} + +/** + * @brief Transfer a cuDF column and verify it matches the original + * + * Transfers data directly between GPU buffers using UCXX. + * Only copies to host for verification/printing. 
+ * + * @param conn The UCXX connection to use + * @param column The column to transfer and verify + * @param tag Message tag for this transfer + * @param verbose Print progress messages + * @return true if transfer succeeded and data matches + */ +bool transferAndVerify(UCXXConnection& conn, + cudf::column_view const& column, + uint64_t tag = 1, + bool verbose = true) +{ + size_t data_size = column.size() * sizeof(int32_t); + + if (verbose) { + std::cout << "Transferring column: " << column.size() << " elements, " << data_size + << " bytes" << std::endl; + std::cout << " Transfer mode: GPU-to-GPU (device buffers)" << std::endl; + + // Copy to host only for display + std::cout << " Send data: "; + printColumnPreview(columnToHost(column)); + } + + // Transfer the column (device-to-device) + auto received = transferColumn(conn, column, tag); + + if (verbose) { + // Copy to host only for display + std::cout << " Recv data: "; + printColumnPreview(columnToHost(received->view())); + } + + // Verify by copying both to host and comparing + bool success = verifyColumnsMatch(column, received->view()); + + if (verbose) { + if (success) { + std::cout << " Verification: PASSED (" << column.size() << " elements match)" << std::endl; + } else { + std::cout << " Verification: FAILED" << std::endl; + } + } + + return success; +} + +// ============================================================================= +// Command Line Parsing +// ============================================================================= + +struct Args { + uint16_t port{12345}; + int32_t size{1000}; + bool help{false}; + + bool parse(int argc, char* const argv[]) + { + int c; + while ((c = getopt(argc, argv, "p:s:h")) != -1) { + switch (c) { + case 'p': + port = static_cast(atoi(optarg)); + break; + case 's': + size = atoi(optarg); + break; + case 'h': + default: + help = true; + return false; + } + } + return port > 0 && size > 0; + } + + static void printUsage() + { + std::cerr << "Usage: 
ucxx_cudf_example [options]\n" + << "\n" + << "Transfers a cuDF integer column via UCXX using GPU memory directly.\n" + << "Data is sent/received in device buffers; only copied to host for verification.\n" + << "\n" + << "Options:\n" + << " -p Port number (default: 12345)\n" + << " -s Number of elements (default: 1000)\n" + << " -h Show this help\n"; + } +}; + +// ============================================================================= +// Main +// ============================================================================= + +int main(int argc, char** argv) +{ + // Parse arguments + Args args; + if (!args.parse(argc, argv)) { + Args::printUsage(); + return args.help ? 0 : 1; + } + + std::cout << "=== UCXX cuDF Example (GPU-to-GPU Transfer) ===" << std::endl; + std::cout << "Port: " << args.port << ", Size: " << args.size << std::endl; + std::cout << std::endl; + + // Create test column on GPU + std::cout << "Creating cuDF sequence column [0, 1, 2, ..., " << (args.size - 1) << "] on GPU" + << std::endl; + auto column = createSequenceColumn(args.size); + + // Setup UCXX connection + std::cout << std::endl << "Setting up UCXX connection..." << std::endl; + UCXXConnection conn(args.port); + + // Transfer and verify (GPU-to-GPU, verify on host) + std::cout << std::endl; + bool success = transferAndVerify(conn, column->view()); + + std::cout << std::endl; + if (success) { + std::cout << "=== Example completed successfully ===" << std::endl; + return 0; + } else { + std::cout << "=== Example FAILED ===" << std::endl; + return 1; + } +} +