Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions src/tendisplus/commands/command_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,63 @@ void testScan(std::shared_ptr<ServerEntry> svr) {
EXPECT_FALSE(expect.ok());
}

void testOtherDBScan(std::shared_ptr<ServerEntry> svr) {
asio::io_context ioContext;
asio::ip::tcp::socket socket(ioContext), socket1(ioContext);
NetSession sess(svr, std::move(socket), 1, false, nullptr, nullptr);
for (int i = 0; i < 20000; i++) {
sess.setArgs({"set", randomStr(10, false), std::to_string(i)});
auto expect = Command::runSessionCmd(&sess);
EXPECT_TRUE(expect.ok());
}

{
sess.setArgs({"select", "15"});
auto expect = Command::runSessionCmd(&sess);
EXPECT_TRUE(expect.ok());
sess.setArgs({"scan", "0"});
expect = Command::runSessionCmd(&sess);
EXPECT_TRUE(expect.ok());
EXPECT_NE(expect.value().find("0"), std::string::npos);
EXPECT_NE(expect.value().find("*0"), std::string::npos);

auto key = randomStr(6, false);
sess.setArgs({"set", key, randomStr(10, false)});
expect = Command::runSessionCmd(&sess);
EXPECT_TRUE(expect.ok());

sess.setArgs({"scan", "0"});
expect = Command::runSessionCmd(&sess);
EXPECT_TRUE(expect.ok());
EXPECT_NE(expect.value().find("1"), std::string::npos);
EXPECT_TRUE(expect.value().find(key) != std::string::npos);

auto key2 = randomStr(6, false);
sess.setArgs({"set", key2, randomStr(10, false)});
expect = Command::runSessionCmd(&sess);
EXPECT_TRUE(expect.ok());

sess.setArgs({"scan", "0", "count", "1"});
expect = Command::runSessionCmd(&sess);
EXPECT_TRUE(expect.ok());
EXPECT_NE(expect.value().find("2"), std::string::npos);
EXPECT_TRUE((expect.value().find(key) != std::string::npos ||
expect.value().find(key2) != std::string::npos));
}

{
sess.setArgs({"select", "4"});
auto expect = Command::runSessionCmd(&sess);
EXPECT_TRUE(expect.ok());

sess.setArgs({"scan", "0"});
expect = Command::runSessionCmd(&sess);
EXPECT_TRUE(expect.ok());
EXPECT_NE(expect.value().find("0"), std::string::npos);
EXPECT_NE(expect.value().find("*0"), std::string::npos);
}
}

void testMulti(std::shared_ptr<ServerEntry> svr) {
asio::io_context ioCtx;
asio::ip::tcp::socket socket(ioCtx), socket1(ioCtx);
Expand Down Expand Up @@ -1182,6 +1239,21 @@ TEST(Command, common_scan) {
#endif
}

TEST(Command, otherdb_scan) {
const auto guard = MakeGuard([] { destroyEnv(); });

EXPECT_TRUE(setupEnv());
auto cfg = makeServerParam();
auto server = makeServerEntry(cfg);

testOtherDBScan(server);

#ifndef _WIN32
server->stop();
EXPECT_EQ(server.use_count(), 1);
#endif
}

TEST(Command, tendisex) {
const auto guard = MakeGuard([] { destroyEnv(); });

Expand Down
41 changes: 26 additions & 15 deletions src/tendisplus/commands/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,6 @@ class ZScanbyscoreCommand : public ScanGenericCommand {
Command::fmtBulk(ss, cursor);
Command::fmtMultiBulkLen(ss, rcds.size() * 2);
for (const auto& v : rcds) {
auto str = v.toString() + " ";
std::cout << str;
if (v.getRecordKey().getSecondaryKey() ==
std::to_string(ZSlMetaValue::HEAD_ID)) {
continue;
Expand Down Expand Up @@ -816,7 +814,7 @@ class ScanCommand : public Command {
// NOTE: no need increase seq. If it meets the criteria,
// it must have been increased seq.
// WARNING: scan this kvstore over no need this record ! ! !
if (*scanTimes == scanMaxTimes &&
if (*scanTimes == scanMaxTimes && *seq > 0 &&
expRecord.status().code() != ErrorCodes::ERR_EXHAUST) {
if (result.empty() || (result.back() != record.getRecordKey())) {
result.emplace_back(record.getRecordKey());
Expand All @@ -841,7 +839,22 @@ class ScanCommand : public Command {
// recordType == RecordType::RT_DATA_META) {
// (*seq)++;
// }
auto nextSlot = getNextSlot(slots, recordSlotId);
// check if we should retain current slot
bool retainCurrentSlot = false;
// The structure of the UserKey is: SlotID | Type | DBID | PK | 0 | ...
// The record located by the seek operation may satisfy the SlotID
// condition, but its position could be preceding the target UserKey.
// In this case, the current slot must be retained, and the search
// should continue until the correct position is found.
if (kvstoreSlots.test(recordSlotId)) {
retainCurrentSlot =
(rt2Char(recordType) < rt2Char(RecordType::RT_DATA_META)) ||
(recordType == RecordType::RT_DATA_META && recordDbId < dbId);
}
// retainCurrentSlot = false, need to seek next slot
int32_t nextSlot = !retainCurrentSlot
? getNextSlot(kvstoreSlots, recordSlotId)
: recordSlotId;
// nextSlot == -1 means this kv-store has been iterated over
if (nextSlot == -1) {
break;
Expand All @@ -853,23 +866,21 @@ class ScanCommand : public Command {
continue;
}

/**
* check if this record has expired
* expired key shouldn't increase scanTimes,
* they only haven't been removed yet.
*/
if (record.getRecordValue().getTtl() > 0 &&
record.getRecordValue().getTtl() < msSinceEpoch()) {
continue;
}

// this record should increase seq.
// SLOTS, DBID, TYPE, TTL
// SLOTS, DBID, TYPE all match
// NOTE: lastScanKey has increased seq.
if (record.getRecordKey() != lastScanRecordKey) {
(*seq)++;
}

// check if this record has expired
// expired keys are skipped but still count towards seq (cursor position)
// since they occupy a logical position in the database
if (record.getRecordValue().getTtl() > 0 &&
record.getRecordValue().getTtl() < msSinceEpoch()) {
continue;
}

// filter
if (!filter.filter(record)) {
continue;
Expand Down