From 8c43d1ba42ef3c3d312bd357e17c9031e79f7fa3 Mon Sep 17 00:00:00 2001 From: wsa Date: Fri, 15 Aug 2025 13:32:04 +0800 Subject: [PATCH] support BRPOP/BLPOP command --- src/tendisplus/commands/command_test.cpp | 210 ++++++++++++++++++++ src/tendisplus/commands/list.cpp | 126 ++++++++++++ src/tendisplus/network/network.cpp | 68 +++++++ src/tendisplus/network/network.h | 20 ++ src/tendisplus/network/worker_pool.cpp | 35 ++++ src/tendisplus/network/worker_pool.h | 34 ++++ src/tendisplus/network/worker_pool_test.cpp | 32 +++ src/tendisplus/server/server_entry.cpp | 2 + src/tendisplus/server/server_entry.h | 205 +++++++++++++++++++ src/tendisplus/server/session.h | 4 + src/tendisplus/utils/status.h | 11 +- 11 files changed, 743 insertions(+), 4 deletions(-) diff --git a/src/tendisplus/commands/command_test.cpp b/src/tendisplus/commands/command_test.cpp index 4dc9b51f..1599e689 100644 --- a/src/tendisplus/commands/command_test.cpp +++ b/src/tendisplus/commands/command_test.cpp @@ -1392,6 +1392,216 @@ TEST(Command, testObject) { #endif } +void testBrpop(std::shared_ptr svr) { + asio::io_context ioContext; + asio::ip::tcp::socket socket(ioContext), socket1(ioContext); + NetSession sess(svr, std::move(socket), 1, false, nullptr, nullptr); + sess.setArgs({"lpush", "list", "a"}); + auto expect = Command::runSessionCmd(&sess); + EXPECT_TRUE(expect.ok()); + sess.setArgs({"brpop", "list", "10"}); + expect = Command::runSessionCmd(&sess); + EXPECT_TRUE(expect.ok()); +} + +TEST(Command, brpop) { + const auto guard = MakeGuard([] { destroyEnv(); }); + + EXPECT_TRUE(setupEnv()); + + auto cfg = makeServerParam(); + auto server = makeServerEntry(cfg); + + testBrpop(server); + +#ifndef _WIN32 + server->stop(); + EXPECT_EQ(server.use_count(), 1); +#endif +} + +void testBlockCommand(std::shared_ptr svr) { + asio::io_context ioContext; + asio::ip::tcp::socket socket(ioContext), socket1(ioContext), + socket2(ioContext); + auto sess = std::make_shared( + svr, std::move(socket), 1, false, nullptr, nullptr); + auto sess1 = std::make_shared( + svr, std::move(socket1), 2, false, nullptr, nullptr); + auto sess2 = std::make_shared( + svr, std::move(socket2), 3, false, nullptr, nullptr); + svr->addSession(sess); + svr->addSession(sess1); + svr->addSession(sess2); + { + sess->setArgs({"blpop", "list1", "1"}); + auto expect = Command::runSessionCmd(sess.get()); + EXPECT_EQ(sess->isBlocked(), true); + EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD); + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + EXPECT_EQ(sess->isBlocked(), true); + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + EXPECT_EQ(sess->isBlocked(), true); + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + EXPECT_EQ(sess->isBlocked(), false); + } + { + sess->setArgs({"blpop", "list1", "0"}); + auto expect = Command::runSessionCmd(sess.get()); + EXPECT_EQ(sess->isBlocked(), true); + EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD); + sess1->setArgs({"rpush", "list1", "a"}); + expect = Command::runSessionCmd(sess1.get()); + std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT_EQ(sess->isBlocked(), false); + } + { + sess2->setArgs({"brpop", "list1", "0"}); + auto expect = Command::runSessionCmd(sess2.get()); + EXPECT_EQ(sess2->isBlocked(), true); + EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD); + sess->setArgs({"brpop", "list1", "0"}); + expect = Command::runSessionCmd(sess.get()); + EXPECT_EQ(sess->isBlocked(), true); + EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD); + + svr->endSession(sess2->id()); + sess1->setArgs({"lpush", "list1", "a"}); + expect = Command::runSessionCmd(sess1.get()); + std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT_EQ(sess->isBlocked(), false); + } +} + +TEST(Command, testBlockCommand) { + const auto guard = MakeGuard([] { destroyEnv(); }); + + EXPECT_TRUE(setupEnv()); + + auto cfg = makeServerParam(); + auto server = makeServerEntry(cfg); + testBlockCommand(server); + +#ifndef _WIN32 + server->stop(); + EXPECT_EQ(server.use_count(), 1); +#endif +} + +TEST(Command, BlockCommand) { + const auto guard = MakeGuard([] { destroyEnv(); }); + + EXPECT_TRUE(setupEnv()); + + auto cfg = makeServerParam(); + auto server = makeServerEntry(cfg); + asio::io_context ioContext; + asio::ip::tcp::socket socket(ioContext), socket1(ioContext), + socket2(ioContext); + auto sess = std::make_shared( + server, std::move(socket), 1, false, nullptr, nullptr); + auto sess1 = std::make_shared( + server, std::move(socket1), 2, false, nullptr, nullptr); + auto sess2 = std::make_shared( + server, std::move(socket2), 3, false, nullptr, nullptr); + server->addSession(sess); + server->addSession(sess1); + server->addSession(sess2); + // test blpop case 1: block command nerver block permanently due to lost + // wakeup + for (int i = 0; i < 20; i++) { + sess1->setArgs({"brpop", "list1", "list2", "0"}); + auto expect = Command::runSessionCmd(sess1.get()); + EXPECT_EQ(sess1->isBlocked(), true); + EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD); + sess2->setArgs({"brpop", "list1", "0"}); + expect = Command::runSessionCmd(sess2.get()); + EXPECT_EQ(sess2->isBlocked(), true); + EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD); + sess->setArgs({"lpush", "list2", "b"}); + expect = Command::runSessionCmd(sess.get()); + sess->setArgs({"lpush", "list1", "a"}); + expect = Command::runSessionCmd(sess.get()); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + EXPECT_EQ(sess1->isBlocked(), false); + EXPECT_EQ(sess2->isBlocked(), false); + } + // test blpop case 2: block command nerver block session on the same key twice + for (int i = 0; i < 20; i++) { + sess1->setArgs({"brpop", "list1", "list1", "0"}); + auto expect = Command::runSessionCmd(sess1.get()); + EXPECT_EQ(sess1->isBlocked(), true); + EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD); + sess2->setArgs({"brpop", "list1", "0"}); + expect = Command::runSessionCmd(sess2.get()); + EXPECT_EQ(sess2->isBlocked(), true); + EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD); + sess->setArgs({"lpush", "list1", "a", "b"}); + expect = Command::runSessionCmd(sess.get()); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_EQ(sess1->isBlocked(), false); + EXPECT_EQ(sess2->isBlocked(), false); + } + // test blpop case 3: block command will be waked up by any key which is in + // the key list + for (int i = 0; i < 20; i++) { + sess1->setArgs({"brpop", "list1", "list2", "0"}); + auto expect = Command::runSessionCmd(sess1.get()); + EXPECT_EQ(sess1->isBlocked(), true); + EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD); + auto key = genRand() % 2 ? "list1" : "list2"; + sess->setArgs({"lpush", key, "a"}); + expect = Command::runSessionCmd(sess.get()); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_EQ(sess1->isBlocked(), false); + } + // test blpop case 4: block command will not be waked up by any key when key + // is was poped + for (int i = 0; i < 20; i++) { + sess1->setArgs({"brpop", "list1", "list2", "0"}); + auto expect = Command::runSessionCmd(sess1.get()); + EXPECT_EQ(sess1->isBlocked(), true); + EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD); + auto key = genRand() % 2 ? "list1" : "list2"; + { + std::lock_guard lock(sess1->_mtx); + sess->setArgs({"lpush", key, "a"}); + expect = Command::runSessionCmd(sess.get()); + sess->setArgs({"lpop", key}); + expect = Command::runSessionCmd(sess.get()); + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_EQ(sess1->isBlocked(), true); + sess->setArgs({"lpush", key, "a"}); + expect = Command::runSessionCmd(sess.get()); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_EQ(sess1->isBlocked(), false); + } + // test blpop case 5: block command will be timeout or wakeup + for (int i = 0; i < 20; i++) { + sess1->setArgs({"brpop", "list1", "list2", "1"}); + auto expect = Command::runSessionCmd(sess1.get()); + EXPECT_EQ(sess1->isBlocked(), true); + EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD); + auto key = genRand() % 2 ? "list1" : "list2"; + { + std::lock_guard lock(sess1->_mtx); + sess->setArgs({"lpush", key, "a"}); + expect = Command::runSessionCmd(sess.get()); + std::this_thread::sleep_for(std::chrono::milliseconds(995)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_EQ(sess1->isBlocked(), false); + sess->setArgs({"lpop", key}); + expect = Command::runSessionCmd(sess.get()); + } +#ifndef _WIN32 + server->stop(); + EXPECT_EQ(server.use_count(), 1); +#endif +} + + void testRenameCommand(std::shared_ptr svr) { asio::io_context ioContext; asio::ip::tcp::socket socket(ioContext), socket1(ioContext); diff --git a/src/tendisplus/commands/list.cpp b/src/tendisplus/commands/list.cpp index f5bbc20d..541e4a6b 100644 --- a/src/tendisplus/commands/list.cpp +++ b/src/tendisplus/commands/list.cpp @@ -295,6 +295,131 @@ class RPopCommand : public ListPopWrapper { RPopCommand() : ListPopWrapper(ListPos::LP_TAIL, "wF") {} } rpopCommand; +class BlockPopWrapper : public Command { + public: + explicit BlockPopWrapper(ListPos pos, const char* sflags) + : Command(pos == ListPos::LP_HEAD ? "blpop" : "brpop", sflags), _pos(pos) {} + + ssize_t arity() const { + return -2; + } + + int32_t firstkey() const { + return 1; + } + + int32_t lastkey() const { + return 1; + } + + int32_t keystep() const { + return 1; + } + + Expected tryPop(Session* sess, const std::string& key) { + SessionCtx* pCtx = sess->getCtx(); + INVARIANT(pCtx != nullptr); + + auto server = sess->getServerEntry(); + auto expdb = server->getSegmentMgr()->getDbWithKeyLock( + sess, key, mgl::LockMode::LOCK_X); + if (!expdb.ok()) { + return expdb.status(); + } + Expected rv = + Command::expireKeyIfNeeded(sess, key, RecordType::RT_LIST_META); + if (!rv.ok()) { + return rv.status(); + } + + // record exists + RecordKey metaRk(expdb.value().chunkId, + pCtx->getDbId(), + RecordType::RT_LIST_META, + key, + ""); + PStore kvstore = expdb.value().store; + + auto ptxn = sess->getCtx()->createTransaction(kvstore); + if (!ptxn.ok()) { + return ptxn.status(); + } + Expected s1 = + genericPop(sess, kvstore, ptxn.value(), metaRk, rv, _pos); + if (!s1.ok()) { + return s1.status(); + } + auto s = sess->getCtx()->commitTransaction(ptxn.value()); + if (s.ok()) { + std::stringstream ss; + Command::fmtMultiBulkLen(ss, 2); + Command::fmtBulk(ss, key); + Command::fmtBulk(ss, s1.value()); + return ss.str(); + } + return s.status(); + } + + Expected tryPop(Session* sess, + const std::vector& keys) { + for (const auto& key : keys) { + auto status = tryPop(sess, key); + if (status.ok()) { + return status; + } + } + return {ErrorCodes::ERR_NOTFOUND, "not found"}; + } + + Expected run(Session* sess) final { + const std::vector& args = sess->getArgs(); + std::vector keys; + for (size_t i = 1; i + 1 < args.size(); i++) { + keys.push_back(args[i]); + } + Expected timeout = tendisplus::stod(args.back()); + if (!timeout.ok()) { + return {timeout.status().code(), "timeout is not a valid float"}; + } + auto status = tryPop(sess, keys); + if (status.ok()) { + return status; + } + startBlocking(sess, keys, timeout.value()); + return {ErrorCodes::ERR_BLOCKCMD, "block command"}; + } + + void startBlocking(Session* sess, + const std::vector& keys, + double timeout) { + auto nSess = dynamic_cast(sess); + if (!nSess) { + return; + } + auto executor = [this, nSess](const std::string& key) { + return tryPop(nSess, key); + }; + + auto duration_sec = std::chrono::duration(timeout); + auto microsec = + std::chrono::duration_cast(duration_sec); + nSess->pauseSession(std::move(executor), keys, microsec); + } + + private: + ListPos _pos; +}; + +class BLPopCommand : public BlockPopWrapper { + public: + BLPopCommand() : BlockPopWrapper(ListPos::LP_HEAD, "wF") {} +} blpopCommand; + +class BRPopCommand : public BlockPopWrapper { + public: + BRPopCommand() : BlockPopWrapper(ListPos::LP_TAIL, "wF") {} +} brpopCommand; + class ListPushWrapper : public Command { public: explicit ListPushWrapper(const std::string& name, @@ -368,6 +493,7 @@ class ListPushWrapper : public Command { } auto s = sess->getCtx()->commitTransaction(ptxn.value()); if (s.ok()) { + server->notifyKeyAvailable(key, valargs.size()); return s1.value(); } else if (s.status().code() != ErrorCodes::ERR_COMMIT_RETRY) { return s.status(); diff --git a/src/tendisplus/network/network.cpp b/src/tendisplus/network/network.cpp index dfbdfed0..26790232 100644 --- a/src/tendisplus/network/network.cpp +++ b/src/tendisplus/network/network.cpp @@ -562,6 +562,74 @@ void NetSession::schedule() { _server->schedule([this, self]() { stepState(); }, _ioCtxId); } +void NetSession::addBlockTimer(std::chrono::microseconds timeout) { + if (!timeout.count() || !_isBlocked.load(std::memory_order_acquire)) + return; + auto self(shared_from_this()); + auto timeout_task = [this, self]() { + std::lock_guard lk(_mtx); + if (!_isBlocked.load(std::memory_order_relaxed)) + return; + _isBlocked.store(false, std::memory_order_release); + _server->unblockSessionOnKeys(this, _blockedKeys); + clearBlockStatus(); + setResponse(Command::fmtNull()); + drainRsp(); + }; + _timerId = _server->schedule_at(std::move(timeout_task), _ioCtxId, timeout); +} + +void NetSession::cancelTimer() { + if (_timerId.first == UINT32_MAX) { + return; + } + _server->cancelTimer(_timerId.first, _timerId.second); + _timerId = {UINT32_MAX, 0}; +} + +void NetSession::clearBlockStatus() { + _server->unblockSessionOnKeys(this, _blockedKeys); + _blockedKeys.clear(); + cancelTimer(); +} + +void NetSession::setBlockingCompletionCb( + std::function(const std::string&)> cb) { + _blocking_completion_cb = std::move(cb); +} + +bool NetSession::isBlocked() const { + return _isBlocked.load(std::memory_order_acquire); +} + +void NetSession::pauseSession( + std::function(const std::string&)> cb, + const std::vector& keys, + std::chrono::microseconds timeout) { + _isBlocked.store(true, std::memory_order_release); + _server->blockSessionOnKeys(this, keys); + _blockedKeys = keys; + _blocking_completion_cb = std::move(cb); + addBlockTimer(timeout); +} + +Expected NetSession::resumeSession(const std::string& key) { + std::lock_guard lk(_mtx); + if (!_isBlocked.load(std::memory_order_relaxed)) { + return {ErrorCodes::ERR_FINISHCMD, "session not blocked"}; + } + auto status = _blocking_completion_cb(key); + if (!status.ok()) { + return status; + } + _isBlocked.store(false, std::memory_order_release); + setResponse(std::move(status.value())); + drainRsp(); + clearBlockStatus(); + return status; +} + + asio::ip::tcp::socket NetSession::borrowConn() { return std::move(_sock); } diff --git a/src/tendisplus/network/network.h b/src/tendisplus/network/network.h index 16dcfd9e..154b1a5a 100644 --- a/src/tendisplus/network/network.h +++ b/src/tendisplus/network/network.h @@ -191,6 +191,12 @@ class NetSession : public Session { const std::vector& getArgs() const; void setArgs(const std::vector&); + bool isBlocked() const override; + void pauseSession(std::function(const std::string&)>, + const std::vector&, + std::chrono::microseconds); + Expected resumeSession(const std::string&); + enum class State { Created, DrainReqNet, @@ -233,6 +239,7 @@ class NetSession : public Session { private: FRIEND_TEST(NetSession, drainReqInvalid); FRIEND_TEST(NetSession, Completed); + FRIEND_TEST(Command, BlockCommand); FRIEND_TEST(Command, common); friend class NoSchedNetSession; @@ -245,6 +252,12 @@ class NetSession : public Session { // utils to shift parsed partial params from _queryBuf void shiftQueryBuf(ssize_t start, ssize_t end); + void cancelTimer(); + void clearBlockStatus(); + void addBlockTimer(std::chrono::microseconds); + void setBlockingCompletionCb( + std::function(const std::string&)>); + protected: uint64_t _connId; bool _closeAfterRsp; @@ -284,6 +297,13 @@ class NetSession : public Session { bool _haveExceedSoftLimit; const std::chrono::steady_clock::time_point _firstTimePoint; std::chrono::steady_clock::time_point _softLimitReachedTime; + + std::vector _blockedKeys; + std::pair _timerId; + std::atomic _isBlocked{false}; + std::function(const std::string&)> + _blocking_completion_cb; + mutable std::mutex _mtx; }; } // namespace tendisplus diff --git a/src/tendisplus/network/worker_pool.cpp b/src/tendisplus/network/worker_pool.cpp index 0ec84bbd..ade8e6be 100644 --- a/src/tendisplus/network/worker_pool.cpp +++ b/src/tendisplus/network/worker_pool.cpp @@ -126,6 +126,7 @@ void WorkerPool::stop() { LOG(INFO) << "workerPool begins to stop..."; _isRunning.store(false, std::memory_order_relaxed); _ioCtx->stop(); + clear_all_timers(); for (auto& t : _threads) { t.second.join(); } @@ -225,4 +226,38 @@ void WorkerPool::resizeDecrease(size_t size) { } } +uint64_t WorkerPool::schedule_timer(std::function cb, + std::chrono::microseconds timeout) { + if (!_isRunning) + return 0; + + const uint64_t timer_id = ++_idGenerator; + auto timer_handler = [this, timer_id, cb = std::move(cb), timeout] { + auto timer = std::make_shared(*_ioCtx); + timer->expires_after(timeout); + // store timer + { + std::lock_guard lock(_timersMutex); + _activeTimers[timer_id] = timer; + } + auto task = + [this, timer_id, cb = std::move(cb)](const asio::error_code& ec) mutable { + // cancled or stopped + if (ec == asio::error::operation_aborted || !_isRunning.load()) + return; + // remove timer + { + std::lock_guard lock(_timersMutex); + _activeTimers.erase(timer_id); + } + + // schedule task + schedule([cb = std::move(cb)] { cb(); }); + }; + timer->async_wait(std::move(task)); + }; + asio::post(*_ioCtx, std::move(timer_handler)); + return timer_id; +} + } // namespace tendisplus diff --git a/src/tendisplus/network/worker_pool.h b/src/tendisplus/network/worker_pool.h index 3142d4b1..b10fd090 100644 --- a/src/tendisplus/network/worker_pool.h +++ b/src/tendisplus/network/worker_pool.h @@ -10,10 +10,12 @@ #include #include #include +#include #include #include #include "asio.hpp" // NOLINT(build/include_subdir) +#include "asio/steady_timer.hpp" #include "tendisplus/server/server_params.h" #include "tendisplus/utils/atomic_utility.h" @@ -88,6 +90,15 @@ class WorkerPool { std::string getName() const { return _name; } + // add a timer + uint64_t timer_add(std::function cb, + const std::chrono::microseconds& timeout) { + return schedule_timer(std::move(cb), timeout); + } + + void timer_cancel(uint64_t timer_id) { + asio::post(*_ioCtx, [this, timer_id] { cancel_timer(timer_id); }); + } private: void consumeTasks(size_t idx); @@ -100,6 +111,29 @@ class WorkerPool { std::shared_ptr _matrix; std::atomic _idGenerator; std::map _threads; + uint64_t schedule_timer(std::function cb, + std::chrono::microseconds timeout); + // cancel a timer + void cancel_timer(uint64_t timer_id) { + std::lock_guard lock(_timersMutex); + auto it = _activeTimers.find(timer_id); + if (it != _activeTimers.end()) { + it->second->cancel(); + _activeTimers.erase(it); + } + } + + // clear all timers + void clear_all_timers() { + std::lock_guard lock(_timersMutex); + for (auto& [id, timer] : _activeTimers) { + timer->cancel(); + } + _activeTimers.clear(); + } + mutable std::mutex _timersMutex; + using Timer = asio::steady_timer; + std::unordered_map> _activeTimers; }; } // namespace tendisplus diff --git a/src/tendisplus/network/worker_pool_test.cpp b/src/tendisplus/network/worker_pool_test.cpp index d42e7db9..1b82be10 100644 --- a/src/tendisplus/network/worker_pool_test.cpp +++ b/src/tendisplus/network/worker_pool_test.cpp @@ -1,6 +1,8 @@ // Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved. // Please refer to the license text that comes with this tendis open source // project for additional information. +#include +#include #include "gtest/gtest.h" @@ -72,3 +74,33 @@ TEST(Workerpool, schedule) { t.join(); auto guard = tendisplus::MakeGuard([]() { tendisplus::destroyEnv(); }); } + +TEST(Workerpool, addTimer) { + auto matrix = std::make_shared(); + auto pool = std::make_shared("test-pool", matrix); + pool->startup(3); + + std::atomic val{5}; + pool->timer_add([&val]() { val.store(10, std::memory_order_seq_cst); }, + std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::seconds(1)); + ASSERT_EQ(val.load(std::memory_order_seq_cst), 10); + pool->stop(); + auto guard = tendisplus::MakeGuard([]() { tendisplus::destroyEnv(); }); +} + +TEST(Workerpool, cancelTimer) { + auto matrix = std::make_shared(); + auto pool = std::make_shared("test-pool", matrix); + std::thread t([&pool]() { pool->startup(3); }); + std::atomic val{1}; + auto timerId = + pool->timer_add([&val]() { val.store(42, std::memory_order_seq_cst); }, + std::chrono::seconds(2)); + pool->timer_cancel(timerId); + std::this_thread::sleep_for(std::chrono::seconds(3)); + ASSERT_EQ(val.load(std::memory_order_seq_cst), 1); + pool->stop(); + t.join(); + auto guard = tendisplus::MakeGuard([]() { tendisplus::destroyEnv(); }); +} diff --git a/src/tendisplus/server/server_entry.cpp b/src/tendisplus/server/server_entry.cpp index f3b02ca6..965ed2da 100644 --- a/src/tendisplus/server/server_entry.cpp +++ b/src/tendisplus/server/server_entry.cpp @@ -1578,6 +1578,8 @@ bool ServerEntry::processRequest(Session* sess) { auto expect = Command::runSessionCmd(sess); if (!expect.ok()) { + if (expect.status().code() == ErrorCodes::ERR_BLOCKCMD) + return true; auto s = sess->setResponse(Command::fmtErr(expect.status().toString())); if (!s.ok()) { return false; diff --git a/src/tendisplus/server/server_entry.h b/src/tendisplus/server/server_entry.h index cc9498a6..411c970d 100644 --- a/src/tendisplus/server/server_entry.h +++ b/src/tendisplus/server/server_entry.h @@ -5,6 +5,7 @@ #ifndef SRC_TENDISPLUS_SERVER_SERVER_ENTRY_H_ #define SRC_TENDISPLUS_SERVER_SERVER_ENTRY_H_ +#include #include #include #include @@ -41,6 +42,83 @@ #define SLOWLOG_ENTRY_MAX_STRING 128; namespace tendisplus { + +template +class LinkedSet { + private: + std::list orderList; + std::unordered_map::iterator> indexMap; + + public: + bool contains(const T& value) const { + return indexMap.find(value) != indexMap.end(); + } + + size_t size() const { + return indexMap.size(); + } + + bool empty() const { + return indexMap.empty(); + } + + void push_back(const T& value) { + if (contains(value)) { + orderList.erase(indexMap[value]); + indexMap.erase(value); + } + + orderList.push_back(value); + indexMap[value] = std::prev(orderList.end()); + } + + void push_front(const T& value) { + if (contains(value)) + return; + orderList.push_front(value); + indexMap[value] = orderList.begin(); + } + + T pop_front() { + if (orderList.empty()) { + throw std::out_of_range("LinkedSet is empty"); + } + T value = orderList.front(); + orderList.pop_front(); + indexMap.erase(value); + return value; + } + + void erase(const T& value) { + auto it = indexMap.find(value); + if (it != indexMap.end()) { + orderList.erase(it->second); + indexMap.erase(it); + } + } + + const T& front() const { + if (orderList.empty()) { + throw std::out_of_range("LinkedSet is empty"); + } + return orderList.front(); + } + + const T& back() const { + if (orderList.empty()) { + throw std::out_of_range("LinkedSet is empty"); + } + return orderList.back(); + } + + auto begin() const { + return orderList.begin(); + } + + auto end() const { + return orderList.end(); + } +}; class Session; class NetworkAsio; class NetworkMatrix; @@ -189,6 +267,22 @@ class ServerEntry : public std::enable_shared_from_this { } _executorList[ctxId]->schedule(std::forward(task)); } + template + std::pair schedule_at(fn&& task, + uint32_t ctxId, + std::chrono::microseconds us) { + std::shared_lock lock(_exeThreadMutex); + if (ctxId == UINT32_MAX || ctxId >= _executorList.size()) { + ctxId = _scheduleNum.fetch_add(1, std::memory_order_relaxed) % + _executorList.size(); + } + auto timerId = _executorList[ctxId]->timer_add(std::forward(task), us); + return {ctxId, timerId}; + } + void cancelTimer(uint32_t ctxId, uint64_t timerId) { + std::shared_lock lock(_exeThreadMutex); + _executorList[ctxId]->timer_cancel(timerId); + } uint32_t getExeThreadNum() const { std::shared_lock lock(_exeThreadMutex); return _executorList.size() * _executorList.back()->size(); @@ -420,6 +514,113 @@ class ServerEntry : public std::enable_shared_from_this { void CloseChannelBySlot(SlotsBitmap slots); void CloseAllChannel(); + void blockSessionOnKeys(Session* sess, const std::vector& keys) { + std::lock_guard lk(_mutex_block_sesslist); + for (auto& key : keys) { + _block_sesslist[key].push_back(sess->id()); + } + } + + void unblockSession(uint64_t sessId) { + std::lock_guard lk(_mutex_block_sesslist); + for (auto it = _block_sesslist.begin(); it != _block_sesslist.end();) { + auto& sessionSet = it->second; + sessionSet.erase(sessId); + + if (sessionSet.empty()) { + it = _block_sesslist.erase(it); + } else { + ++it; + } + } + } + + void unblockSessionOnKeys(Session* sess, + const std::vector& keys) { + std::lock_guard lk(_mutex_block_sesslist); + for (auto& key : keys) { + auto& sessList = _block_sesslist[key]; + sessList.erase(sess->id()); + if (sessList.empty()) { + _block_sesslist.erase(key); + } + } + } + + bool hasBlockedSessions(const std::string& key) { + std::lock_guard lk(_mutex_block_sesslist); + auto iter = _block_sesslist.find(key); + return iter != _block_sesslist.end() && !iter->second.empty(); + } + + std::vector notifyKey(const std::string& key, size_t max_wakeups) { + std::lock_guard lk(_mutex_block_sesslist); + std::vector ready_list; + auto iter = _block_sesslist.find(key); + if (iter == _block_sesslist.end() || iter->second.empty()) { + return ready_list; + } + auto& blokedList = iter->second; + size_t count = 0; + for (auto id_iter = blokedList.begin(); + id_iter != blokedList.end() && count < max_wakeups; + ++id_iter) { + auto sess = getSession(*id_iter); + if (sess && !sess->isBlocked()) { // avoid notify twice + continue; + } + ready_list.push_back(*id_iter); + count++; + } + + return ready_list; + } + + void notifyKeyAvailable(const std::string& key, size_t max_wakeups) { + if (!hasBlockedSessions(key)) + return; // no blocked session, do nothing + std::vector ready_list = notifyKey(key, max_wakeups); + + auto executor = [this, ready_list = std::move(ready_list), key]() mutable { + size_t wakeupsMore = 0, idx = 0; + while (idx < ready_list.size()) { + while (idx < ready_list.size()) { + auto s_id = ready_list[idx++]; + auto sess = getSession(s_id); + auto netSess = dynamic_cast(sess.get()); + if (netSess && !netSess->isBlocked()) { + wakeupsMore++; + } else if (netSess) { + auto status = netSess->resumeSession(key); + if (!status.ok()) { + if (!netSess->isBlocked()) { + // when a blocking command completes, causing subsequent command + // execution to fail, wake an extra session.​ + wakeupsMore++; + } else { + break; + } + } + } else { + // remove session_id from blocked list + unblockSession(s_id); + wakeupsMore++; + } + } + + if (idx == ready_list.size() && wakeupsMore > 0) { + ready_list = notifyKey(key, wakeupsMore); // notify more + wakeupsMore = 0; + idx = 0; + } else if (idx < ready_list.size()) { + break; + } + } + }; + uint32_t ctxId = UINT32_MAX; + schedule(std::move(executor), ctxId); + } + private: ServerEntry(); Status adaptSomeThreadNumByCpuNum(const std::shared_ptr& cfg); @@ -521,6 +722,10 @@ class ServerEntry : public std::enable_shared_from_this { SlowlogStat _slowlogStat; LatencyMonitorSet _latencyMonitorSet; uint32_t _lastJeprofDumpMemoryGB; + + std::unordered_map> + _block_sesslist; // using linkedset + mutable std::mutex _mutex_block_sesslist; }; } // namespace tendisplus diff --git a/src/tendisplus/server/session.h b/src/tendisplus/server/session.h index ecc1c9b3..5fa9895f 100644 --- a/src/tendisplus/server/session.h +++ b/src/tendisplus/server/session.h @@ -66,6 +66,10 @@ class Session : public std::enable_shared_from_this { return {ErrorCodes::ERR_NETWORK, ""}; } + virtual bool isBlocked() const { + return false; + } + std::string getName() const; void setName(const std::string&); Type getType() const { diff --git a/src/tendisplus/utils/status.h b/src/tendisplus/utils/status.h index 78041f79..8c8fef9d 100644 --- a/src/tendisplus/utils/status.h +++ b/src/tendisplus/utils/status.h @@ -68,6 +68,9 @@ enum class ErrorCodes { ERR_CLUSTER_REDIR_DOWN_UNBOUND, ERR_LUA, ERR_LUA_NOSCRIPT, + + ERR_BLOCKCMD, + ERR_FINISHCMD, }; class Status { @@ -184,10 +187,10 @@ Expected makeExpected(Args&&... args) { } \ } while (0) -#define RET_IF_MEMORY_REQUEST_FAILED(SESS, SIZE) \ - auto tempStatus = (SESS) -> memLimitRequest((SIZE)); \ - if (!tempStatus.ok()) { \ - return tempStatus; \ +#define RET_IF_MEMORY_REQUEST_FAILED(SESS, SIZE) \ + auto tempStatus = (SESS)->memLimitRequest((SIZE)); \ + if (!tempStatus.ok()) { \ + return tempStatus; \ } } // namespace tendisplus