diff --git a/mk/spdk.common.mk b/mk/spdk.common.mk index 0c771f54e41..613d7378de2 100644 --- a/mk/spdk.common.mk +++ b/mk/spdk.common.mk @@ -321,7 +321,7 @@ LDFLAGS += -fsanitize=fuzzer-no-link SYS_LIBS += $(CONFIG_FUZZER_LIB) endif -SPDK_GIT_COMMIT := 0efce164f90d98de445e756559933f981650dc77 +SPDK_GIT_COMMIT := dce40dd4f63bfa77ebf8397768cd9131fee56505 ifneq (, $(SPDK_GIT_COMMIT)) COMMON_CFLAGS += -DSPDK_GIT_COMMIT=$(SPDK_GIT_COMMIT) endif diff --git a/mk/spdk.modules.mk b/mk/spdk.modules.mk index f11be8c4430..dfaad4f2f2a 100644 --- a/mk/spdk.modules.mk +++ b/mk/spdk.modules.mk @@ -74,7 +74,7 @@ endif ifeq ($(CONFIG_RBD),y) BLOCKDEV_MODULES_LIST += bdev_rbd -BLOCKDEV_MODULES_PRIVATE_LIBS += -lrados -lrbd +BLOCKDEV_MODULES_PRIVATE_LIBS += -lrados -lrbd -lstdc++ endif ifeq ($(CONFIG_DAOS),y) diff --git a/module/bdev/rbd/Makefile b/module/bdev/rbd/Makefile index 375dfaba213..782e69496b4 100644 --- a/module/bdev/rbd/Makefile +++ b/module/bdev/rbd/Makefile @@ -10,6 +10,7 @@ SO_VER := 8 SO_MINOR := 0 C_SRCS = bdev_rbd.c bdev_rbd_rpc.c +CXX_SRCS = bdev_rbd_spdk_context_wq.cpp LIBNAME = bdev_rbd SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map diff --git a/module/bdev/rbd/bdev_rbd.c b/module/bdev/rbd/bdev_rbd.c index 45e4709a803..533fee28c3a 100644 --- a/module/bdev/rbd/bdev_rbd.c +++ b/module/bdev/rbd/bdev_rbd.c @@ -6,6 +6,7 @@ #include "spdk/stdinc.h" #include "bdev_rbd.h" +#include "bdev_rbd_spdk_context_wq.h" #include #include @@ -28,6 +29,7 @@ static int bdev_rbd_count = 0; * global parameter to control CRC32C usage in RBD write operations. 
*/ static bool g_rbd_with_crc32c = false; +static bool g_rbd_with_spdk_wq = false; struct bdev_rbd_pool_ctx { rados_t *cluster_p; @@ -75,6 +77,9 @@ struct bdev_rbd { int (*reservation_fn_cbk)(void *ns); char cluster_fsid[37]; + /* SPDK ContextWQ for this bdev */ + struct bdev_rbd_spdk_context_wq *spdk_context_wq; + }; struct bdev_rbd_io_channel { @@ -224,6 +229,16 @@ bdev_rbd_free(struct bdev_rbd *rbd) rbd_close(rbd->image); } + /* Clean up SPDK ContextWQ after RBD image is closed. + * This ensures no new I/O completions can occur after ContextWQ is destroyed. + * The drain() function in the destructor will wait for any pending messages + * to complete before the ContextWQ is actually destroyed. + */ + if (rbd->spdk_context_wq != NULL) { + bdev_rbd_spdk_context_wq_destroy(rbd->spdk_context_wq); + rbd->spdk_context_wq = NULL; + } + free(rbd->disk.name); free(rbd->rbd_name); free(rbd->user_id); @@ -488,11 +503,21 @@ bdev_rbd_init_context(void *arg) } assert(io_ctx != NULL); + if (g_rbd_with_spdk_wq) { + /* Find reactor thread, create SpdkContextWQ if available, then open with context_wq (NULL uses default AsioContextWQ) */ + struct spdk_thread *reactor_thread = bdev_rbd_find_reactor_thread(); + rbd->spdk_context_wq = bdev_rbd_spdk_context_wq_create_from_ioctx( + *io_ctx, reactor_thread); + } else { + rbd->spdk_context_wq = NULL; + SPDK_NOTICELOG("rbd_with_spdk_wq is disabled, using AsioContextWQ for RBD image %s/%s\n", + rbd->pool_name, rbd->rbd_name); + } if (rbd->rbd_read_only) { SPDK_DEBUGLOG(bdev_rbd, "Will open RBD image %s/%s as read-only\n", rbd->pool_name, rbd->rbd_name); - rc = rbd_open_read_only(*io_ctx, rbd->rbd_name, &rbd->image, NULL); + rc = rbd_open_read_only_with_context_wq(*io_ctx, rbd->rbd_name, &rbd->image, NULL, rbd->spdk_context_wq); } else { - rc = rbd_open(*io_ctx, rbd->rbd_name, &rbd->image, NULL); + rc = rbd_open_with_context_wq(*io_ctx, rbd->rbd_name, &rbd->image, NULL, rbd->spdk_context_wq); } if (rc < 0) { SPDK_ERRLOG("Failed to open 
specified rbd device\n"); @@ -1948,3 +1973,16 @@ bdev_rbd_set_with_crc32c(bool enable) { g_rbd_with_crc32c = enable; } + +bool +bdev_rbd_get_with_spdk_wq(void) +{ + return g_rbd_with_spdk_wq; +} + +/** enable or disable SPDK ContextWQ for RBD operations */ +void +bdev_rbd_set_with_spdk_wq(bool enable) +{ + g_rbd_with_spdk_wq = enable; +} diff --git a/module/bdev/rbd/bdev_rbd.h b/module/bdev/rbd/bdev_rbd.h index 8ed607ce95b..f6f9529e2bc 100644 --- a/module/bdev/rbd/bdev_rbd.h +++ b/module/bdev/rbd/bdev_rbd.h @@ -96,4 +96,19 @@ bool bdev_rbd_get_with_crc32c(void); */ void bdev_rbd_set_with_crc32c(bool enable); +/** + * Get the current rbd_with_spdk_wq setting. + * + * \return true if SPDK ContextWQ is enabled, false otherwise + */ +bool bdev_rbd_get_with_spdk_wq(void); + +/** + * Set the rbd_with_spdk_wq parameter to enable/disable SPDK ContextWQ + * for RBD operations. + * + * \param enable true to enable SPDK ContextWQ, false to disable (uses AsioContextWQ) + */ +void bdev_rbd_set_with_spdk_wq(bool enable); + #endif /* SPDK_BDEV_RBD_H */ diff --git a/module/bdev/rbd/bdev_rbd_rpc.c b/module/bdev/rbd/bdev_rbd_rpc.c index db8530e5a28..7181bf5e19a 100644 --- a/module/bdev/rbd/bdev_rbd_rpc.c +++ b/module/bdev/rbd/bdev_rbd_rpc.c @@ -439,3 +439,53 @@ SPDK_RPC_REGISTER("bdev_rbd_wait_for_latest_osdmap", rpc_bdev_rbd_wait_for_lates SPDK_RPC_REGISTER("bdev_rbd_get_with_crc32c", rpc_bdev_rbd_get_with_crc32c, SPDK_RPC_RUNTIME) SPDK_RPC_REGISTER("bdev_rbd_set_with_crc32c", rpc_bdev_rbd_set_with_crc32c, SPDK_RPC_STARTUP) + +/** + * RPC function to get the current rbd_with_spdk_wq setting + */ +static void +rpc_bdev_rbd_get_with_spdk_wq(struct spdk_jsonrpc_request *request, + const struct spdk_json_val *params) +{ + struct spdk_json_write_ctx *w; + + w = spdk_jsonrpc_begin_result(request); + spdk_json_write_bool(w, bdev_rbd_get_with_spdk_wq()); + spdk_jsonrpc_end_result(request, w); +} + +/** + * RPC function to set the rbd_with_spdk_wq parameter + */ +struct 
rpc_bdev_rbd_set_with_spdk_wq { + bool enable; +}; + +static const struct spdk_json_object_decoder rpc_bdev_rbd_set_with_spdk_wq_decoders[] = { + {"enable", offsetof(struct rpc_bdev_rbd_set_with_spdk_wq, enable), spdk_json_decode_bool}, +}; + +static void +rpc_bdev_rbd_set_with_spdk_wq(struct spdk_jsonrpc_request *request, + const struct spdk_json_val *params) +{ + struct rpc_bdev_rbd_set_with_spdk_wq req = {}; + struct spdk_json_write_ctx *w; + + if (spdk_json_decode_object(params, rpc_bdev_rbd_set_with_spdk_wq_decoders, + SPDK_COUNTOF(rpc_bdev_rbd_set_with_spdk_wq_decoders), + &req)) { + spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS, + "Missing or invalid enable parameter"); + return; + } + + bdev_rbd_set_with_spdk_wq(req.enable); + + w = spdk_jsonrpc_begin_result(request); + spdk_json_write_bool(w, bdev_rbd_get_with_spdk_wq()); + spdk_jsonrpc_end_result(request, w); +} + +SPDK_RPC_REGISTER("bdev_rbd_get_with_spdk_wq", rpc_bdev_rbd_get_with_spdk_wq, SPDK_RPC_RUNTIME) +SPDK_RPC_REGISTER("bdev_rbd_set_with_spdk_wq", rpc_bdev_rbd_set_with_spdk_wq, SPDK_RPC_STARTUP) diff --git a/module/bdev/rbd/bdev_rbd_spdk_context_wq.cpp b/module/bdev/rbd/bdev_rbd_spdk_context_wq.cpp new file mode 100644 index 00000000000..0ddfeb11423 --- /dev/null +++ b/module/bdev/rbd/bdev_rbd_spdk_context_wq.cpp @@ -0,0 +1,255 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright (C) 2025,2026 IBM, Inc. + * All rights reserved. + */ + +#include +#include +#include +#include +#include + +#include "bdev_rbd_spdk_context_wq.h" + +extern "C" { +#include "spdk/stdinc.h" +#include "spdk/thread.h" +#include "spdk/log.h" +#include "spdk/env.h" +#include "spdk_internal/event.h" +} + +/** + * Encapsulates the set of reactor threads for round-robin assignment. + */ +class ReactorThreadPool { +public: + /** Ensures reactor list exists (lock-free one-time init via CAS). 
*/ + static void ensure_discovered() { + if (g_reactor_list.load(std::memory_order_acquire) != nullptr) { + return; + } + auto *p = new std::vector(); + discover_into(p); + std::vector *expected = nullptr; + if (!g_reactor_list.compare_exchange_strong(expected, p, std::memory_order_release)) { + delete p; + } + } + + static struct spdk_thread *get_next() { + std::vector *list = + g_reactor_list.load(std::memory_order_acquire); + if (list == nullptr || list->empty()) { + SPDK_ERRLOG("bdev_rbd: reactor thread pool is empty, no reactor threads available for SpdkContextWQ\n"); + return NULL; + } + size_t n = list->size(); + size_t idx = g_reactor_next.fetch_add(1, std::memory_order_relaxed) % n; + struct spdk_thread *t = (*list)[idx]; + const char *name = spdk_thread_get_name(t); + SPDK_NOTICELOG("bdev_rbd: next reactor thread=%p (id=%lu, name=%s, index=%zu/%zu)\n", + t, spdk_thread_get_id(t), name ? name : "NULL", idx, n); + return t; + } + +private: + static void discover_into(std::vector *vec) { + uint32_t lcore; + SPDK_ENV_FOREACH_CORE(lcore) { + struct spdk_reactor *reactor = spdk_reactor_get(lcore); + if (reactor == NULL || !reactor->flags.is_valid) { + continue; + } + if (reactor->thread_count == 0) { + continue; + } + struct spdk_lw_thread *lw_thread = TAILQ_FIRST(&reactor->threads); + if (lw_thread == NULL) { + continue; + } + struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread); + if (thread == NULL || spdk_thread_is_app_thread(thread)) { + continue; + } + const char *name = spdk_thread_get_name(thread); + SPDK_NOTICELOG("bdev_rbd: discovered reactor thread=%p (id=%lu, name=%s, index=%zu)\n", + thread, spdk_thread_get_id(thread), name ? 
name : "NULL", vec->size()); + vec->push_back(thread); + } + if (!vec->empty()) { + SPDK_NOTICELOG("bdev_rbd: reactor thread pool: discovered %zu reactor(s) for round-robin\n", + vec->size()); + } + } + + static std::atomic *> g_reactor_list; + static std::atomic g_reactor_next; +}; + +std::atomic *> ReactorThreadPool::g_reactor_list{nullptr}; +std::atomic ReactorThreadPool::g_reactor_next{0}; + +namespace librbd { +namespace asio { + +SpdkContextWQ::SpdkContextWQ(void* cct, struct spdk_thread* reactor_thread) + : ContextWQ(cct), m_reactor_thread(reactor_thread) { + assert(reactor_thread != nullptr); +} + +SpdkContextWQ::~SpdkContextWQ() { + // Set shutdown flag to reject new operations + m_shutdown.store(true, std::memory_order_release); + + // Wait for all pending messages to complete + drain(); + + // Verify all messages are processed + uint64_t queued = m_queued_ops.load(std::memory_order_acquire); + if (queued > 0) { + SPDK_ERRLOG("SpdkContextWQ::~SpdkContextWQ: Warning: %lu operations still pending during destruction\n", queued); + } +} + +void SpdkContextWQ::queue(Context *ctx, int r) { + // Check if shutdown is in progress + if (m_shutdown.load(std::memory_order_acquire)) { + // ContextWQ is shutting down, complete with error + SPDK_ERRLOG("SpdkContextWQ::queue: ContextWQ is shutting down, rejecting new operation\n"); + rbd_context_complete(ctx, -ESHUTDOWN); + return; + } + + // Increment queued operations counter + m_queued_ops.fetch_add(1, std::memory_order_acq_rel); + + // Allocate message structure to pass context and return value + auto msg = new SpdkContextMsg{ctx, r, this}; + + // Schedule work on the SPDK reactor thread + int rc = spdk_thread_send_msg(m_reactor_thread, spdk_msg_handler, msg); + if (rc != 0) { + // If message send failed, we need to clean up and complete with error + m_queued_ops.fetch_sub(1, std::memory_order_acq_rel); + delete msg; + // Complete context with error using public API + SPDK_ERRLOG("SpdkContextWQ::queue: calling 
rbd_context_complete(ctx=%p, r=%d) on error path\n", ctx, rc); + rbd_context_complete(ctx, rc); + } +} + +void SpdkContextWQ::spdk_msg_handler(void *arg) { + auto msg = static_cast(arg); + + if (msg == nullptr) { + SPDK_ERRLOG("SpdkContextWQ::spdk_msg_handler: FATAL: msg is nullptr\n"); + return; + } + + if (msg->wq == nullptr) { + SPDK_ERRLOG("SpdkContextWQ::spdk_msg_handler: FATAL: msg->wq is nullptr\n"); + delete msg; + return; + } + + if (msg->ctx == nullptr) { + SPDK_ERRLOG("SpdkContextWQ::spdk_msg_handler: FATAL: msg->ctx is nullptr\n"); + delete msg; + return; + } + + // Execute the context callback on the reactor thread + rbd_context_complete(msg->ctx, msg->r); + + uint64_t queued_ops_before = msg->wq->m_queued_ops.load(std::memory_order_acquire); + if (queued_ops_before == 0) { + SPDK_ERRLOG("SpdkContextWQ::spdk_msg_handler: WARNING: m_queued_ops is 0, expected > 0\n"); + } + + // Update queued ops counter + msg->wq->m_queued_ops.fetch_sub(1, std::memory_order_acq_rel); + + // Free the message structure + delete msg; +} + +void SpdkContextWQ::drain() { + // Wait for all pending messages to be processed. + // Note: This relies on the SPDK reactor thread to be actively polling. 
+ // TODO: conf parameter, non busy wait implementation + const int max_iterations = 100000; // 10 seconds at 100us per iteration + int iterations = 0; + + // Wait for all queued operations to complete + while (m_queued_ops.load(std::memory_order_acquire) > 0 && + iterations < max_iterations) { + // Yield to allow SPDK reactor thread to process messages + spdk_delay_us(100); + ++iterations; + } + + uint64_t queued = m_queued_ops.load(std::memory_order_acquire); + if (queued > 0) { + SPDK_ERRLOG("SpdkContextWQ::drain: Incomplete drain - queued_ops=%lu after %d iterations\n", + queued, iterations); + } +} + +} // namespace asio +} // namespace librbd + +// C API implementation +extern "C" { + +struct bdev_rbd_spdk_context_wq* bdev_rbd_spdk_context_wq_create_from_ioctx(rados_ioctx_t io_ctx, struct spdk_thread* reactor_thread) +{ + if (io_ctx == NULL || reactor_thread == NULL) { + return NULL; + } + + // Convert rados_ioctx_t to librados::IoCtx to get CephContext + librados::IoCtx ioctx; + librados::IoCtx::from_rados_ioctx_t(io_ctx, ioctx); + void* cct_ptr = ioctx.cct(); + + if (cct_ptr == NULL) { + SPDK_ERRLOG("Failed to get CephContext from rados_ioctx_t\n"); + return NULL; + } + + // Create SpdkContextWQ + uint64_t thread_id = spdk_thread_get_id(reactor_thread); + const char *thread_name = spdk_thread_get_name(reactor_thread); + try { + auto wq = new librbd::asio::SpdkContextWQ(cct_ptr, reactor_thread); + // Cast to opaque struct pointer for type safety + struct bdev_rbd_spdk_context_wq* result = reinterpret_cast(wq); + SPDK_NOTICELOG("bdev_rbd_spdk_context_wq_create_from_ioctx: Successfully created SpdkContextWQ=%p with reactor thread=%p (id=%lu, name=%s)\n", + result, reactor_thread, thread_id, thread_name ? thread_name : "NULL"); + return result; + } catch (...) { + SPDK_ERRLOG("bdev_rbd_spdk_context_wq_create_from_ioctx: Failed to create SpdkContextWQ with reactor thread=%p (id=%lu, name=%s)\n", + reactor_thread, thread_id, thread_name ? 
thread_name : "NULL"); + return NULL; + } +} + +void bdev_rbd_spdk_context_wq_destroy(struct bdev_rbd_spdk_context_wq* context_wq) +{ + if (context_wq == NULL) { + return; + } + + // Cast back to SpdkContextWQ and delete + auto wq = reinterpret_cast(context_wq); + delete wq; +} + +struct spdk_thread* bdev_rbd_find_reactor_thread(void) +{ + ReactorThreadPool::ensure_discovered(); + return ReactorThreadPool::get_next(); +} + +} // extern "C" diff --git a/module/bdev/rbd/bdev_rbd_spdk_context_wq.h b/module/bdev/rbd/bdev_rbd_spdk_context_wq.h new file mode 100644 index 00000000000..e905715066f --- /dev/null +++ b/module/bdev/rbd/bdev_rbd_spdk_context_wq.h @@ -0,0 +1,110 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright (C) 2025,2026 IBM, Inc. + * All rights reserved. + */ + +#ifndef SPDK_BDEV_RBD_SPDK_CONTEXT_WQ_H +#define SPDK_BDEV_RBD_SPDK_CONTEXT_WQ_H + +// Forward declaration for SPDK thread +struct spdk_thread; + +// Forward declaration for rbd_image_t (defined in librbd.h) +// We use void* here to avoid including librbd.h in the header +#ifndef rbd_image_t +typedef void* rbd_image_t; +#endif + +// Forward declaration for rados_ioctx_t (defined in librados.h) +#ifndef rados_ioctx_t +typedef void* rados_ioctx_t; +#endif + +// Opaque type for SpdkContextWQ - provides type safety in C code +// The actual implementation is C++ and is hidden behind this opaque pointer +struct bdev_rbd_spdk_context_wq; + +// C API for creating SpdkContextWQ from C code (bdev_rbd.c) +// These declarations are available to both C and C++ code +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Create a SpdkContextWQ from rados_ioctx_t and SPDK reactor thread. + * The returned pointer must be freed by calling bdev_rbd_spdk_context_wq_destroy(). 
+ * + * @param io_ctx RADOS I/O context (rados_ioctx_t) + * @param reactor_thread SPDK reactor thread + * @return Pointer to SpdkContextWQ, or NULL on error + */ +struct bdev_rbd_spdk_context_wq* bdev_rbd_spdk_context_wq_create_from_ioctx(rados_ioctx_t io_ctx, struct spdk_thread* reactor_thread); + +/** + * Destroy a SpdkContextWQ created by bdev_rbd_spdk_context_wq_create_from_ioctx(). + * + * @param context_wq Pointer to SpdkContextWQ + */ +void bdev_rbd_spdk_context_wq_destroy(struct bdev_rbd_spdk_context_wq* context_wq); + +/** + * Return the next reactor thread for SpdkContextWQ (round-robin). + * Lazy-initializes the reactor list on first call (lock-free, atomic CAS). + * Successive calls cycle through the discovered reactors so RBD images are spread across them. + * Returns NULL if no reactor threads were discovered (error is logged). + */ +struct spdk_thread* bdev_rbd_find_reactor_thread(void); + +#ifdef __cplusplus +} + +// C++ class definition - only available when compiling C++ code +#include + +namespace librbd { +namespace asio { + +/** + * ContextWQ implementation that schedules work on SPDK reactor threads + */ +class SpdkContextWQ : public ContextWQ { +public: + explicit SpdkContextWQ(void* cct, struct spdk_thread* reactor_thread); + ~SpdkContextWQ(); + + void drain() override; + + /** + * Queue a context to be executed on the SPDK reactor thread. + * + * @param ctx Context to execute + * @param r Return value to pass to context + */ + void queue(Context *ctx, int r = 0) override; + +private: + struct spdk_thread* m_reactor_thread; + std::atomic m_shutdown{false}; // Flag to prevent new operations during shutdown + + /** + * Message handler for SPDK thread messages. + * Executes the context callback on the reactor thread. + */ + static void spdk_msg_handler(void *arg); + + /** + * Internal structure to pass context and return value through SPDK message. 
+ */ + struct SpdkContextMsg { + Context* ctx; + int r; + SpdkContextWQ* wq; // Pointer to ContextWQ (protected by lifetime guard via m_queued_ops) + }; +}; + +} // namespace asio +} // namespace librbd + +#endif // __cplusplus + +#endif /* SPDK_BDEV_RBD_SPDK_CONTEXT_WQ_H */ diff --git a/python/spdk/rpc/bdev.py b/python/spdk/rpc/bdev.py index ac8ba045c17..9b60791760a 100644 --- a/python/spdk/rpc/bdev.py +++ b/python/spdk/rpc/bdev.py @@ -1167,6 +1167,24 @@ def bdev_rbd_set_with_crc32c(client, enable): return client.call('bdev_rbd_set_with_crc32c', params) +def bdev_rbd_get_with_spdk_wq(client): + """Get SPDK ContextWQ usage in RBD operations. + Returns: + True if SPDK ContextWQ is enabled, False otherwise + """ + return client.call('bdev_rbd_get_with_spdk_wq') + + +def bdev_rbd_set_with_spdk_wq(client, enable): + """Set SPDK ContextWQ usage in RBD operations. + Args: + enable: enable or disable SPDK ContextWQ (False uses AsioContextWQ) + """ + params = dict() + params['enable'] = enable + return client.call('bdev_rbd_set_with_spdk_wq', params) + + @deprecated_method def bdev_error_create(client, base_name, uuid=None): """Construct an error injection block device.