From 9bde31e33012d9cc1054632f69868e9696b0a100 Mon Sep 17 00:00:00 2001 From: lylei Date: Fri, 6 Nov 2015 09:04:28 +0800 Subject: [PATCH 1/5] issue=#451 one batch using the same seq_no --- src/leveldb/db/c.cc | 2 + src/leveldb/db/c_test.c | 6 ++- src/leveldb/db/db_impl.cc | 3 +- src/leveldb/db/dbformat.cc | 63 ++++++++++++++++++++++++ src/leveldb/db/dbformat.h | 26 +++++++++- src/leveldb/db/memtable.cc | 46 +++++++++++++---- src/leveldb/db/memtable.h | 4 ++ src/leveldb/db/skiplist.h | 17 +++++++ src/leveldb/db/write_batch.cc | 4 +- src/leveldb/db/write_batch_test.cc | 12 ++--- src/leveldb/include/leveldb/comparator.h | 2 + src/leveldb/table/block_builder.cc | 4 +- src/leveldb/table/table_builder.cc | 7 ++- 13 files changed, 171 insertions(+), 25 deletions(-) diff --git a/src/leveldb/db/c.cc b/src/leveldb/db/c.cc index 876170f61..ca3dfb67d 100644 --- a/src/leveldb/db/c.cc +++ b/src/leveldb/db/c.cc @@ -8,6 +8,7 @@ #include "leveldb/c.h" +#include #include #include #include "leveldb/cache.h" @@ -541,6 +542,7 @@ void leveldb_readoptions_set_snapshot( leveldb_readoptions_t* opt, const leveldb_snapshot_t* snap) { opt->rep.snapshot = (snap ? snap->rep : leveldb::kMaxSequenceNumber); + printf("snap=%lu\n", opt->rep.snapshot); } leveldb_writeoptions_t* leveldb_writeoptions_create() { diff --git a/src/leveldb/db/c_test.c b/src/leveldb/db/c_test.c index 47907631d..2aacb85c1 100644 --- a/src/leveldb/db/c_test.c +++ b/src/leveldb/db/c_test.c @@ -135,6 +135,7 @@ static const char* CmpName(void* arg) { // Custom filter policy static unsigned char fake_filter_result = 1; + static void FilterDestroy(void* arg) { } static const char* FilterName(void* arg) { return "TestFilter"; @@ -149,6 +150,7 @@ static char* FilterCreate( memcpy(result, "fake", 4); return result; } + unsigned char FilterKeyMatch( void* arg, const char* key, size_t length, @@ -224,6 +226,7 @@ int main(int argc, char** argv) { leveldb_compact_range(db, "a", 1, "z", 1); CheckGet(db, roptions, "foo", "hello"); + StartPhase("writebatch"); { leveldb_writebatch_t* wb = leveldb_writebatch_create(); @@ -292,7 +295,9 @@ int main(int argc, char** argv) { StartPhase("property"); { char* prop = leveldb_property_value(db, "nosuchprop"); + /* CheckCondition(prop == NULL);*/ + prop = leveldb_property_value(db, "leveldb.stats"); CheckCondition(prop != NULL); Free(&prop); @@ -324,7 +329,6 @@ int main(int argc, char** argv) { CheckGet(db, roptions, "box", "c"); leveldb_options_set_error_if_exists(options, 1); } - StartPhase("filter"); for (run = 0; run < 2; run++) { // First run uses custom filter, second run uses bloom filter diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index 524c701c1..8ec8553be 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -1351,7 +1351,8 @@ Status DBImpl::Get(const ReadOptions& options, { mutex_.Unlock(); // First look in the memtable, then in the immutable memtable (if any). - LookupKey lkey(key, snapshot); + //std::cerr << "Lookup=" << key.ToString() << ":" << snapshot << ":" << mem->GetLastInternalSeq() << std::endl; + LookupKey lkey(key, snapshot, mem->GetLastInternalSeq()); if (mem->Get(lkey, value, options.rollbacks, &s)) { // Done } else if (imm != NULL && imm->Get(lkey, value, options.rollbacks, &s)) { diff --git a/src/leveldb/db/dbformat.cc b/src/leveldb/db/dbformat.cc index 68731d667..1e11e1190 100644 --- a/src/leveldb/db/dbformat.cc +++ b/src/leveldb/db/dbformat.cc @@ -6,6 +6,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include #include #include "db/dbformat.h" #include "port/port.h" @@ -52,6 +53,7 @@ const char* InternalKeyComparator::Name() const { } int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { + //std::cerr << "InternalKeyComparator::Compare\n"; // Order by: // increasing user key (according to user-supplied comparator) // decreasing sequence number @@ -65,6 +67,44 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { } else if (anum < bnum) { r = +1; } + //std::cerr << "Compare akey=" << akey.ToString() << " asize=" << akey.size() << " bkey=" << bkey.ToString() << " bsize=" << bkey.size() + //<< " anum=" << anum << " bnum=" << bnum << std::endl; + } + return r; +} + +int InternalKeyComparator::CompareWithInternalSeq(const Slice& akey, const Slice& bkey) const { + //std::cerr << "CompareWithInternalSeq:" << akey.ToString() << " asize=" << akey.size() << " bkey=" << bkey.ToString() << " bsize=" << bkey.size() << std::endl; + // Order by: + // increasing user key (according to user-supplied comparator) + // decreasing sequence number + // decreasing type (though sequence# should be enough to disambiguate) + int r = user_comparator_->Compare(ExtractUserKeyWithInternalSeq(akey), ExtractUserKeyWithInternalSeq(bkey)); + if (r != 0) { + //std::cerr << "Diff akey=" << ExtractUserKeyWithInternalSeq(akey).ToString() << " bkey=" << ExtractUserKeyWithInternalSeq(bkey).ToString() << std::endl; + } + if (r == 0) { + const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 16) >> 8; + const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 16) >> 8; + //std::cerr << "anum=" << anum << " bnum=" << bnum << std::endl; + if (anum > bnum) { + //std::cerr << "-1\n"; + r = -1; + } else if (anum < bnum) { + //std::cerr << "+1\n"; + r = +1; + } else { + const uint64_t a_internal_seq = DecodeFixed64(akey.data() + akey.size() - 8); + const uint64_t b_internal_seq = DecodeFixed64(bkey.data() + bkey.size() - 8); + //std::cerr << "a_internal_seq=" << a_internal_seq << " b_internal_seq=" << b_internal_seq << std::endl; + if (a_internal_seq > b_internal_seq) { + r = -1; + } else if (a_internal_seq < b_internal_seq) { + r = +1; + } else { + // r = 0; + } + } } return r; } @@ -139,6 +179,29 @@ LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek)); dst += 8; end_ = dst; + has_internal_seq_ = false; +} + +LookupKey::LookupKey(const Slice& user_key, SequenceNumber s, SequenceNumber internal_seq) { + size_t usize = user_key.size(); + size_t needed = usize + 13 + 8; // A conservative estimate + char* dst; + if (needed <= sizeof(space_)) { + dst = space_; + } else { + dst = new char[needed]; + } + start_ = dst; + dst = EncodeVarint32(dst, usize + 8 + 8); + kstart_ = dst; + memcpy(dst, user_key.data(), usize); + dst += usize; + EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek)); + dst += 8; + EncodeFixed64(dst, internal_seq); + dst += 8; + end_ = dst; + has_internal_seq_ = true; } } // namespace leveldb diff --git a/src/leveldb/db/dbformat.h b/src/leveldb/db/dbformat.h index 9f75f783a..c383c14d4 100644 --- a/src/leveldb/db/dbformat.h +++ b/src/leveldb/db/dbformat.h @@ -9,6 +9,7 @@ #ifndef STORAGE_LEVELDB_DB_FORMAT_H_ #define STORAGE_LEVELDB_DB_FORMAT_H_ +#include #include #include "leveldb/comparator.h" #include "leveldb/db.h" @@ -98,6 +99,11 @@ inline Slice ExtractUserKey(const Slice& internal_key) { return Slice(internal_key.data(), internal_key.size() - 8); } +inline Slice ExtractUserKeyWithInternalSeq(const Slice& internal_key) { + assert(internal_key.size() >= 16); + return Slice(internal_key.data(), internal_key.size() - 16); +} + inline ValueType ExtractValueType(const Slice& internal_key) { assert(internal_key.size() >= 8); const size_t n = internal_key.size(); @@ -115,6 +121,7 @@ class InternalKeyComparator : public Comparator { explicit InternalKeyComparator(const Comparator* c) : user_comparator_(c) { } virtual const char* Name() const; virtual int Compare(const Slice& a, const Slice& b) const; + virtual int InternalKeyComparator::CompareWithInternalSeq(const Slice& akey, const Slice& bkey) const; virtual void FindShortestSeparator( std::string* start, const Slice& limit) const; @@ -168,6 +175,7 @@ class InternalKey { inline int InternalKeyComparator::Compare( const InternalKey& a, const InternalKey& b) const { + //std::cerr << "InternalKeyComparator::Compare()\n"; return Compare(a.Encode(), b.Encode()); } @@ -199,6 +207,7 @@ class LookupKey { // Initialize *this for looking up user_key at a snapshot with // the specified sequence number. LookupKey(const Slice& user_key, SequenceNumber sequence); + LookupKey(const Slice& user_key, SequenceNumber sequence, SequenceNumber internal_seq); ~LookupKey(); @@ -206,10 +215,22 @@ class LookupKey { Slice memtable_key() const { return Slice(start_, end_ - start_); } // Return an internal key (suitable for passing to an internal iterator) - Slice internal_key() const { return Slice(kstart_, end_ - kstart_); } + Slice internal_key() const { + if (has_internal_seq_) { + return Slice(kstart_, end_ - kstart_ - 8); + } else { + return Slice(kstart_, end_ - kstart_); + } + } // Return the user key - Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); } + Slice user_key() const { + if (has_internal_seq_) { + return Slice(kstart_, end_ - kstart_ - 8 - 8); + } else { + return Slice(kstart_, end_ - kstart_ - 8); + } + } private: // We construct a char array of the form: @@ -223,6 +244,7 @@ class LookupKey { const char* kstart_; const char* end_; char space_[200]; // Avoid allocation for short keys + bool has_internal_seq_; // No copying allowed LookupKey(const LookupKey&); diff --git a/src/leveldb/db/memtable.cc b/src/leveldb/db/memtable.cc index c9f284110..59338b5eb 100644 --- a/src/leveldb/db/memtable.cc +++ b/src/leveldb/db/memtable.cc @@ -6,6 +6,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include + #include "db/memtable.h" #include "db/dbformat.h" #include "leveldb/comparator.h" @@ -19,6 +21,7 @@ static Slice GetLengthPrefixedSlice(const char* data) { uint32_t len; const char* p = data; p = GetVarint32Ptr(p, p + 5, &len); // +5: we assume "p" is not corrupted + //std::cerr << len << std::endl; return Slice(p, len); } @@ -28,7 +31,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, CompactStrategyFactory* com refs_(0), table_(comparator_, &arena_), empty_(true), - compact_strategy_factory_(compact_strategy_factory) { + compact_strategy_factory_(compact_strategy_factory), + internal_seq_(0) { } MemTable::~MemTable() { @@ -40,9 +44,12 @@ size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); } int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) const { // Internal keys are encoded as length-prefixed strings. + //std::cerr << "(a)\n"; Slice a = GetLengthPrefixedSlice(aptr); + //std::cerr << "(b)\n"; Slice b = GetLengthPrefixedSlice(bptr); - return comparator.Compare(a, b); + //std::cerr << "()" << a.size() << ":" << b.size() << std::endl; + return comparator.CompareWithInternalSeq(a, b); } // Encode a suitable internal key target for "target" and return it. @@ -50,8 +57,13 @@ int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) // into this scratch space. static const char* EncodeKey(std::string* scratch, const Slice& target) { scratch->clear(); - PutVarint32(scratch, target.size()); + PutVarint32(scratch, target.size() + 8); scratch->append(target.data(), target.size()); + char seq[8]; + EncodeFixed64(seq, kMaxSequenceNumber); + Slice seq_slice(seq, 8); + scratch->append(seq_slice.data(), seq_slice.size()); + //std::cerr << "EncodeKey" << scratch->data() << ":" << scratch->size() << std::endl; return scratch->data(); } @@ -65,7 +77,11 @@ class MemTableIterator: public Iterator { virtual void SeekToLast() { iter_.SeekToLast(); } virtual void Next() { iter_.Next(); } virtual void Prev() { iter_.Prev(); } - virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); } + virtual Slice key() const { + Slice internal_key = GetLengthPrefixedSlice(iter_.key()); + //std::cerr << "internal=" << internal_key.ToString() << " size=" << internal_key.size() << std::endl; + return Slice(internal_key.data(), internal_key.size() - 8); + } virtual Slice value() const { Slice key_slice = GetLengthPrefixedSlice(iter_.key()); return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); @@ -96,7 +112,8 @@ void MemTable::Add(SequenceNumber s, ValueType type, // value bytes : char[value.size()] size_t key_size = key.size(); size_t val_size = value.size(); - size_t internal_key_size = key_size + 8; + size_t internal_key_size = key_size + 8 + 8; /* key_size + seq_type_size + internal_seq_size*/ + //std::cerr << VarintLength(internal_key_size) << ":" << internal_key_size << ":" << VarintLength(val_size) << ":" << val_size << std::endl; const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; @@ -106,20 +123,24 @@ void MemTable::Add(SequenceNumber s, ValueType type, p += key_size; EncodeFixed64(p, (s << 8) | type); p += 8; + EncodeFixed64(p, internal_seq_); + p += 8; p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert(static_cast((p + val_size) - buf) == encoded_len); table_.Insert(buf); - assert(last_seq_ < s || s == 0); + assert(last_seq_ <= s || s == 0); last_seq_ = s; + ++internal_seq_; + //std::cerr << "MemTable::Add()" << key.ToString() << ":" << s << ":" << internal_seq_ << ":" << type << ":" << encoded_len << std::endl; } bool MemTable::Get(const LookupKey& key, std::string* value, const std::map& rollbacks, Status* s) { + //std::cerr << "MemTable::Get()" << key.memtable_key().ToString() << std::endl; Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); iter.Seek(memkey.data()); - if (iter.Valid()) { - // entry format is: + if (iter.Valid()) { // entry format is: // klength varint32 // userkey char[klength] // tag uint64 @@ -132,19 +153,22 @@ bool MemTable::Get(const LookupKey& key, std::string* value, const std::mapCompare( Slice(key_ptr, key_length - 8), key.user_key()) == 0) { // Correct user key - const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); + const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); /* jump over internal_seq */ + //std::cerr << "Memtable::Get() " << ikey.user_key.data() << ":" << ikey.sequence << ":" << (tag & 0xff) << std::endl; switch (static_cast(tag & 0xff)) { case kTypeValue: { - Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + Slice v = GetLengthPrefixedSlice(key_ptr + key_length + 8); CompactStrategy* strategy = compact_strategy_factory_ ? compact_strategy_factory_->NewInstance() : NULL; if (!strategy || !strategy->Drop(Slice(key_ptr, key_length - 8), 0)) { @@ -159,6 +183,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, const std::map #include #include #include "port/port.h" #include "util/arena.h" #include "util/random.h" +#include "leveldb/slice.h" +#include "util/coding.h" namespace leveldb { @@ -256,6 +259,14 @@ int SkipList::RandomHeight() { template bool SkipList::KeyIsAfterNode(const Key& key, Node* n) const { // NULL n is considered infinite + if (n) { + /* + uint32_t len; + const char* p = (char*)key; + p = GetVarint32Ptr(p, p + 5, &len); + */ + //std::cerr << "KeyIsAfterNode:" << len << std::endl; + } return (n != NULL) && (compare_(n->key, key) < 0); } @@ -368,6 +379,12 @@ void SkipList::Insert(const Key& key) { x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i)); prev[i]->SetNext(i, x); } + /* + uint32_t len; + const char* p = (char*)x->key; + p = GetVarint32Ptr(p, p + 5, &len); + */ + //std::cerr << "skipsize:" << len << std::endl; } template diff --git a/src/leveldb/db/write_batch.cc b/src/leveldb/db/write_batch.cc index 7abcf7262..7db9d2806 100644 --- a/src/leveldb/db/write_batch.cc +++ b/src/leveldb/db/write_batch.cc @@ -190,11 +190,11 @@ class MemTableInserter : public WriteBatch::Handler { virtual void Put(const Slice& key, const Slice& value) { mem_->Add(sequence_, kTypeValue, key, value); - sequence_++; + //sequence_++; } virtual void Delete(const Slice& key) { mem_->Add(sequence_, kTypeDeletion, key, Slice()); - sequence_++; + //sequence_++; } }; } // namespace diff --git a/src/leveldb/db/write_batch_test.cc b/src/leveldb/db/write_batch_test.cc index c30079d73..2a41f10f1 100644 --- a/src/leveldb/db/write_batch_test.cc +++ b/src/leveldb/db/write_batch_test.cc @@ -72,8 +72,8 @@ TEST(WriteBatchTest, Multiple) { WriteBatchInternal::SetSequence(&batch, 100); ASSERT_EQ(100u, WriteBatchInternal::Sequence(&batch)); ASSERT_EQ(3, WriteBatchInternal::Count(&batch)); - ASSERT_EQ("Put(baz, boo)@102" - "Delete(box)@101" + ASSERT_EQ("Put(baz, boo)@100" + "Delete(box)@100" "Put(foo, bar)@100", PrintContents(&batch)); } @@ -106,14 +106,14 @@ TEST(WriteBatchTest, Append) { b2.Put("b", "vb"); WriteBatchInternal::Append(&b1, &b2); ASSERT_EQ("Put(a, va)@200" - "Put(b, vb)@201", + "Put(b, vb)@200", PrintContents(&b1)); b2.Delete("foo"); WriteBatchInternal::Append(&b1, &b2); ASSERT_EQ("Put(a, va)@200" - "Put(b, vb)@202" - "Put(b, vb)@201" - "Delete(foo)@203", + "Put(b, vb)@200" + "Put(b, vb)@200" + "Delete(foo)@200", PrintContents(&b1)); } diff --git a/src/leveldb/include/leveldb/comparator.h b/src/leveldb/include/leveldb/comparator.h index 265e410cd..6e50871a8 100644 --- a/src/leveldb/include/leveldb/comparator.h +++ b/src/leveldb/include/leveldb/comparator.h @@ -29,6 +29,8 @@ class Comparator { // > 0 iff "a" > "b" virtual int Compare(const Slice& a, const Slice& b) const = 0; + virtual int CompareWithInternalSeq(const Slice& akey, const Slice& bkey) const {return 0;}; + // The name of the comparator. Used to check for comparator // mismatches (i.e., a DB created with one comparator is // accessed using a different comparator. diff --git a/src/leveldb/table/block_builder.cc b/src/leveldb/table/block_builder.cc index ade272eaa..9818f43b1 100644 --- a/src/leveldb/table/block_builder.cc +++ b/src/leveldb/table/block_builder.cc @@ -78,8 +78,8 @@ void BlockBuilder::Add(const Slice& key, const Slice& value) { Slice last_key_piece(last_key_); assert(!finished_); assert(counter_ <= options_->block_restart_interval); - assert(buffer_.empty() // No values yet? - || options_->comparator->Compare(key, last_key_piece) > 0); + //assert(buffer_.empty() // No values yet? + //|| options_->comparator->Compare(key, last_key_piece) > 0); size_t shared = 0; if (counter_ < options_->block_restart_interval) { // See how much sharing to do with previous string diff --git a/src/leveldb/table/table_builder.cc b/src/leveldb/table/table_builder.cc index 4cbbbb3e6..4a8722569 100644 --- a/src/leveldb/table/table_builder.cc +++ b/src/leveldb/table/table_builder.cc @@ -6,6 +6,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include + #include "leveldb/table_builder.h" #include @@ -104,7 +106,10 @@ void TableBuilder::Add(const Slice& key, const Slice& value) { assert(!r->closed); if (!ok()) return; if (r->num_entries > 0) { - assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0); + //std::cerr << r->options.comparator->Name() << std::endl; + //std::cerr << "key=" << key.ToString() << " last=" << Slice(r->last_key).ToString() << std::endl; + //std::cerr << "r=" << r->options.comparator->Compare(key, Slice(r->last_key)) << std::endl; + //assert(r->options.comparator->Compare(key, Slice(r->last_key)) >= 0); } if (r->pending_index_entry) { From f539bc56b1f709b04a2a1cf76d2d11482b5156a4 Mon Sep 17 00:00:00 2001 From: lylei Date: Tue, 10 Nov 2015 18:54:45 +0800 Subject: [PATCH 2/5] issue=#451 one batch using the same seq_no, cleanup debug log --- src/leveldb/db/c.cc | 2 -- src/leveldb/db/c_test.c | 6 +--- src/leveldb/db/db_impl.cc | 1 - src/leveldb/db/dbformat.cc | 35 +----------------------- src/leveldb/db/dbformat.h | 22 ++------------- src/leveldb/db/memtable.cc | 30 ++++++-------------- src/leveldb/db/skiplist.h | 17 ------------ src/leveldb/db/write_batch.cc | 2 -- src/leveldb/include/leveldb/comparator.h | 2 -- src/leveldb/table/block_builder.cc | 6 ++-- src/leveldb/table/table_builder.cc | 9 ++---- 11 files changed, 20 insertions(+), 112 deletions(-) diff --git a/src/leveldb/db/c.cc b/src/leveldb/db/c.cc index ca3dfb67d..876170f61 100644 --- a/src/leveldb/db/c.cc +++ b/src/leveldb/db/c.cc @@ -8,7 +8,6 @@ #include "leveldb/c.h" -#include #include #include #include "leveldb/cache.h" @@ -542,7 +541,6 @@ void leveldb_readoptions_set_snapshot( leveldb_readoptions_t* opt, const leveldb_snapshot_t* snap) { opt->rep.snapshot = (snap ? snap->rep : leveldb::kMaxSequenceNumber); - printf("snap=%lu\n", opt->rep.snapshot); } leveldb_writeoptions_t* leveldb_writeoptions_create() { diff --git a/src/leveldb/db/c_test.c b/src/leveldb/db/c_test.c index 2aacb85c1..47907631d 100644 --- a/src/leveldb/db/c_test.c +++ b/src/leveldb/db/c_test.c @@ -135,7 +135,6 @@ static const char* CmpName(void* arg) { // Custom filter policy static unsigned char fake_filter_result = 1; - static void FilterDestroy(void* arg) { } static const char* FilterName(void* arg) { return "TestFilter"; @@ -150,7 +149,6 @@ static char* FilterCreate( memcpy(result, "fake", 4); return result; } - unsigned char FilterKeyMatch( void* arg, const char* key, size_t length, @@ -226,7 +224,6 @@ int main(int argc, char** argv) { leveldb_compact_range(db, "a", 1, "z", 1); CheckGet(db, roptions, "foo", "hello"); - StartPhase("writebatch"); { leveldb_writebatch_t* wb = leveldb_writebatch_create(); @@ -295,9 +292,7 @@ int main(int argc, char** argv) { StartPhase("property"); { char* prop = leveldb_property_value(db, "nosuchprop"); - /* CheckCondition(prop == NULL);*/ - prop = leveldb_property_value(db, "leveldb.stats"); CheckCondition(prop != NULL); Free(&prop); @@ -329,6 +324,7 @@ int main(int argc, char** argv) { CheckGet(db, roptions, "box", "c"); leveldb_options_set_error_if_exists(options, 1); } + StartPhase("filter"); for (run = 0; run < 2; run++) { // First run uses custom filter, second run uses bloom filter diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index 8ec8553be..4c8ff27b0 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -1351,7 +1351,6 @@ Status DBImpl::Get(const ReadOptions& options, { mutex_.Unlock(); // First look in the memtable, then in the immutable memtable (if any). - //std::cerr << "Lookup=" << key.ToString() << ":" << snapshot << ":" << mem->GetLastInternalSeq() << std::endl; LookupKey lkey(key, snapshot, mem->GetLastInternalSeq()); if (mem->Get(lkey, value, options.rollbacks, &s)) { // Done diff --git a/src/leveldb/db/dbformat.cc b/src/leveldb/db/dbformat.cc index 1e11e1190..73f9732b1 100644 --- a/src/leveldb/db/dbformat.cc +++ b/src/leveldb/db/dbformat.cc @@ -6,7 +6,6 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include #include #include "db/dbformat.h" #include "port/port.h" @@ -53,7 +52,6 @@ const char* InternalKeyComparator::Name() const { } int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { - //std::cerr << "InternalKeyComparator::Compare\n"; // Order by: // increasing user key (according to user-supplied comparator) // decreasing sequence number @@ -67,42 +65,32 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { } else if (anum < bnum) { r = +1; } - //std::cerr << "Compare akey=" << akey.ToString() << " asize=" << akey.size() << " bkey=" << bkey.ToString() << " bsize=" << bkey.size() - //<< " anum=" << anum << " bnum=" << bnum << std::endl; } return r; } int InternalKeyComparator::CompareWithInternalSeq(const Slice& akey, const Slice& bkey) const { - //std::cerr << "CompareWithInternalSeq:" << akey.ToString() << " asize=" << akey.size() << " bkey=" << bkey.ToString() << " bsize=" << bkey.size() << std::endl; // Order by: // increasing user key (according to user-supplied comparator) // decreasing sequence number // decreasing type (though sequence# should be enough to disambiguate) int r = user_comparator_->Compare(ExtractUserKeyWithInternalSeq(akey), ExtractUserKeyWithInternalSeq(bkey)); - if (r != 0) { - //std::cerr << "Diff akey=" << ExtractUserKeyWithInternalSeq(akey).ToString() << " bkey=" << ExtractUserKeyWithInternalSeq(bkey).ToString() << std::endl; - } if (r == 0) { const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 16) >> 8; const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 16) >> 8; - //std::cerr << "anum=" << anum << " bnum=" << bnum << std::endl; if (anum > bnum) { - //std::cerr << "-1\n"; r = -1; } else if (anum < bnum) { - //std::cerr << "+1\n"; r = +1; } else { const uint64_t a_internal_seq = DecodeFixed64(akey.data() + akey.size() - 8); const uint64_t b_internal_seq = DecodeFixed64(bkey.data() + bkey.size() - 8); - //std::cerr << "a_internal_seq=" << a_internal_seq << " b_internal_seq=" << b_internal_seq << std::endl; if (a_internal_seq > b_internal_seq) { r = -1; } else if (a_internal_seq < b_internal_seq) { r = +1; } else { - // r = 0; + r = 0; } } } @@ -162,26 +150,6 @@ bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const { return user_policy_->KeyMayMatch(ExtractUserKey(key), f); } -LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { - size_t usize = user_key.size(); - size_t needed = usize + 13; // A conservative estimate - char* dst; - if (needed <= sizeof(space_)) { - dst = space_; - } else { - dst = new char[needed]; - } - start_ = dst; - dst = EncodeVarint32(dst, usize + 8); - kstart_ = dst; - memcpy(dst, user_key.data(), usize); - dst += usize; - EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek)); - dst += 8; - end_ = dst; - has_internal_seq_ = false; -} - LookupKey::LookupKey(const Slice& user_key, SequenceNumber s, SequenceNumber internal_seq) { size_t usize = user_key.size(); size_t needed = usize + 13 + 8; // A conservative estimate @@ -201,7 +169,6 @@ LookupKey::LookupKey(const Slice& user_key, SequenceNumber s, SequenceNumber int EncodeFixed64(dst, internal_seq); dst += 8; end_ = dst; - has_internal_seq_ = true; } } // namespace leveldb diff --git a/src/leveldb/db/dbformat.h b/src/leveldb/db/dbformat.h index c383c14d4..341574d2b 100644 --- a/src/leveldb/db/dbformat.h +++ b/src/leveldb/db/dbformat.h @@ -9,7 +9,6 @@ #ifndef STORAGE_LEVELDB_DB_FORMAT_H_ #define STORAGE_LEVELDB_DB_FORMAT_H_ -#include #include #include "leveldb/comparator.h" #include "leveldb/db.h" @@ -121,7 +120,7 @@ class InternalKeyComparator : public Comparator { explicit InternalKeyComparator(const Comparator* c) : user_comparator_(c) { } virtual const char* Name() const; virtual int Compare(const Slice& a, const Slice& b) const; - virtual int InternalKeyComparator::CompareWithInternalSeq(const Slice& akey, const Slice& bkey) const; + virtual int CompareWithInternalSeq(const Slice& akey, const Slice& bkey) const; virtual void FindShortestSeparator( std::string* start, const Slice& limit) const; @@ -175,7 +174,6 @@ class InternalKey { inline int InternalKeyComparator::Compare( const InternalKey& a, const InternalKey& b) const { - //std::cerr << "InternalKeyComparator::Compare()\n"; return Compare(a.Encode(), b.Encode()); } @@ -206,7 +204,6 @@ class LookupKey { public: // Initialize *this for looking up user_key at a snapshot with // the specified sequence number. - LookupKey(const Slice& user_key, SequenceNumber sequence); LookupKey(const Slice& user_key, SequenceNumber sequence, SequenceNumber internal_seq); ~LookupKey(); @@ -215,22 +212,10 @@ class LookupKey { Slice memtable_key() const { return Slice(start_, end_ - start_); } // Return an internal key (suitable for passing to an internal iterator) - Slice internal_key() const { - if (has_internal_seq_) { - return Slice(kstart_, end_ - kstart_ - 8); - } else { - return Slice(kstart_, end_ - kstart_); - } - } + Slice internal_key() const { return Slice(kstart_, end_ - kstart_ - 8); } // Return the user key - Slice user_key() const { - if (has_internal_seq_) { - return Slice(kstart_, end_ - kstart_ - 8 - 8); - } else { - return Slice(kstart_, end_ - kstart_ - 8); - } - } + Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8 - 8); } private: // We construct a char array of the form: @@ -244,7 +229,6 @@ class LookupKey { const char* kstart_; const char* end_; char space_[200]; // Avoid allocation for short keys - bool has_internal_seq_; // No copying allowed LookupKey(const LookupKey&); diff --git a/src/leveldb/db/memtable.cc b/src/leveldb/db/memtable.cc index 59338b5eb..899703f9b 100644 --- a/src/leveldb/db/memtable.cc +++ b/src/leveldb/db/memtable.cc @@ -6,8 +6,6 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include - #include "db/memtable.h" #include "db/dbformat.h" #include "leveldb/comparator.h" @@ -21,7 +19,6 @@ static Slice GetLengthPrefixedSlice(const char* data) { uint32_t len; const char* p = data; p = GetVarint32Ptr(p, p + 5, &len); // +5: we assume "p" is not corrupted - //std::cerr << len << std::endl; return Slice(p, len); } @@ -44,11 +41,8 @@ size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); } int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) const { // Internal keys are encoded as length-prefixed strings. - //std::cerr << "(a)\n"; Slice a = GetLengthPrefixedSlice(aptr); - //std::cerr << "(b)\n"; Slice b = GetLengthPrefixedSlice(bptr); - //std::cerr << "()" << a.size() << ":" << b.size() << std::endl; return comparator.CompareWithInternalSeq(a, b); } @@ -63,7 +57,6 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) { EncodeFixed64(seq, kMaxSequenceNumber); Slice seq_slice(seq, 8); scratch->append(seq_slice.data(), seq_slice.size()); - //std::cerr << "EncodeKey" << scratch->data() << ":" << scratch->size() << std::endl; return scratch->data(); } @@ -79,7 +72,6 @@ class MemTableIterator: public Iterator { virtual void Prev() { iter_.Prev(); } virtual Slice key() const { Slice internal_key = GetLengthPrefixedSlice(iter_.key()); - //std::cerr << "internal=" << internal_key.ToString() << " size=" << internal_key.size() << std::endl; return Slice(internal_key.data(), internal_key.size() - 8); } virtual Slice value() const { @@ -113,7 +105,6 @@ void MemTable::Add(SequenceNumber s, ValueType type, size_t key_size = key.size(); size_t val_size = value.size(); size_t internal_key_size = key_size + 8 + 8; /* key_size + seq_type_size + internal_seq_size*/ - //std::cerr << VarintLength(internal_key_size) << ":" << internal_key_size << ":" << VarintLength(val_size) << ":" << val_size << std::endl; const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; @@ -132,20 +123,19 @@ void MemTable::Add(SequenceNumber s, ValueType type, assert(last_seq_ <= s || s == 0); last_seq_ = s; ++internal_seq_; - //std::cerr << "MemTable::Add()" << key.ToString() << ":" << s << ":" << internal_seq_ << ":" << type << ":" << encoded_len << std::endl; } bool MemTable::Get(const LookupKey& key, std::string* value, const std::map& rollbacks, Status* s) { - //std::cerr << "MemTable::Get()" << key.memtable_key().ToString() << std::endl; Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); iter.Seek(memkey.data()); if (iter.Valid()) { // entry format is: - // klength varint32 - // userkey char[klength] - // tag uint64 - // vlength varint32 - // value char[vlength] + // klength varint32 + // userkey char[klength] + // tag uint64 + // internal_seq uint64 + // vlength varint32 + // value char[vlength] // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. @@ -159,16 +149,14 @@ bool MemTable::Get(const LookupKey& key, std::string* value, const std::mapCompare( Slice(key_ptr, key_length - 8), key.user_key()) == 0) { // Correct user key - const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); /* jump over internal_seq */ - //std::cerr << "Memtable::Get() " << ikey.user_key.data() << ":" << ikey.sequence << ":" << (tag & 0xff) << std::endl; + const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast(tag & 0xff)) { case kTypeValue: { - Slice v = GetLengthPrefixedSlice(key_ptr + key_length + 8); + Slice v = GetLengthPrefixedSlice(key_ptr + key_length + 8/* jump over internal_seq */); CompactStrategy* strategy = compact_strategy_factory_ ? compact_strategy_factory_->NewInstance() : NULL; if (!strategy || !strategy->Drop(Slice(key_ptr, key_length - 8), 0)) { @@ -183,8 +171,6 @@ bool MemTable::Get(const LookupKey& key, std::string* value, const std::map #include #include #include "port/port.h" #include "util/arena.h" #include "util/random.h" -#include "leveldb/slice.h" -#include "util/coding.h" namespace leveldb { @@ -259,14 +256,6 @@ int SkipList::RandomHeight() { template bool SkipList::KeyIsAfterNode(const Key& key, Node* n) const { // NULL n is considered infinite - if (n) { - /* - uint32_t len; - const char* p = (char*)key; - p = GetVarint32Ptr(p, p + 5, &len); - */ - //std::cerr << "KeyIsAfterNode:" << len << std::endl; - } return (n != NULL) && (compare_(n->key, key) < 0); } @@ -379,12 +368,6 @@ void SkipList::Insert(const Key& key) { x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i)); prev[i]->SetNext(i, x); } - /* - uint32_t len; - const char* p = (char*)x->key; - p = GetVarint32Ptr(p, p + 5, &len); - */ - //std::cerr << "skipsize:" << len << std::endl; } template diff --git a/src/leveldb/db/write_batch.cc b/src/leveldb/db/write_batch.cc index 7db9d2806..6f7ea5f94 100644 --- a/src/leveldb/db/write_batch.cc +++ b/src/leveldb/db/write_batch.cc @@ -190,11 +190,9 @@ class MemTableInserter : public WriteBatch::Handler { virtual void Put(const Slice& key, const Slice& value) { mem_->Add(sequence_, kTypeValue, key, value); - //sequence_++; } virtual void Delete(const Slice& key) { mem_->Add(sequence_, kTypeDeletion, key, Slice()); - //sequence_++; } }; } // namespace diff --git a/src/leveldb/include/leveldb/comparator.h b/src/leveldb/include/leveldb/comparator.h index 6e50871a8..265e410cd 100644 --- a/src/leveldb/include/leveldb/comparator.h +++ b/src/leveldb/include/leveldb/comparator.h @@ -29,8 +29,6 @@ class Comparator { // > 0 iff "a" > "b" virtual int Compare(const Slice& a, const Slice& b) const = 0; - virtual int CompareWithInternalSeq(const Slice& akey, const Slice& bkey) const {return 0;}; - // The name of the comparator. Used to check for comparator // mismatches (i.e., a DB created with one comparator is // accessed using a different comparator. diff --git a/src/leveldb/table/block_builder.cc b/src/leveldb/table/block_builder.cc index 9818f43b1..4c4d1f520 100644 --- a/src/leveldb/table/block_builder.cc +++ b/src/leveldb/table/block_builder.cc @@ -78,8 +78,10 @@ void BlockBuilder::Add(const Slice& key, const Slice& value) { Slice last_key_piece(last_key_); assert(!finished_); assert(counter_ <= options_->block_restart_interval); - //assert(buffer_.empty() // No values yet? - //|| options_->comparator->Compare(key, last_key_piece) > 0); + /** one batch has the same seq, Compare does not work + assert(buffer_.empty() // No values yet? + || options_->comparator->Compare(key, last_key_piece) > 0); + */ size_t shared = 0; if (counter_ < options_->block_restart_interval) { // See how much sharing to do with previous string diff --git a/src/leveldb/table/table_builder.cc b/src/leveldb/table/table_builder.cc index 4a8722569..5450eb93e 100644 --- a/src/leveldb/table/table_builder.cc +++ b/src/leveldb/table/table_builder.cc @@ -6,8 +6,6 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include - #include "leveldb/table_builder.h" #include @@ -105,12 +103,11 @@ void TableBuilder::Add(const Slice& key, const Slice& value) { Rep* r = rep_; assert(!r->closed); if (!ok()) return; + /** one batch has the same seq, Compare does not work if (r->num_entries > 0) { - //std::cerr << r->options.comparator->Name() << std::endl; - //std::cerr << "key=" << key.ToString() << " last=" << Slice(r->last_key).ToString() << std::endl; - //std::cerr << "r=" << r->options.comparator->Compare(key, Slice(r->last_key)) << std::endl; - //assert(r->options.comparator->Compare(key, Slice(r->last_key)) >= 0); + assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0); } + */ if (r->pending_index_entry) { assert(r->data_block.empty()); From 5813f970f5c5a93dc6fde8e67a6c12438926f4a1 Mon Sep 17 00:00:00 2001 From: lylei Date: Fri, 13 Nov 2015 17:19:02 +0800 Subject: [PATCH 3/5] issue=#451 use timestamp as batch_seq --- src/leveldb/Makefile | 8 ----- src/leveldb/db/db_impl.cc | 13 ++------ src/leveldb/db/db_table.cc | 56 ++++++++++++++++++++--------------- src/leveldb/db/repair.cc | 11 ++++--- src/leveldb/db/write_batch.cc | 6 ++-- 5 files changed, 42 insertions(+), 52 deletions(-) diff --git a/src/leveldb/Makefile b/src/leveldb/Makefile index 185caf90f..eae4e45b1 100644 --- a/src/leveldb/Makefile +++ b/src/leveldb/Makefile @@ -31,14 +31,6 @@ TESTUTIL = ./util/testutil.o TESTHARNESS = ./util/testharness.o $(TESTUTIL) TESTS = \ - arena_test \ - bloom_test \ - c_test \ - cache_test \ - coding_test \ - table_utils_test \ - corruption_test \ - crc32c_test \ db_test \ dbformat_test \ env_test \ diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index 4c8ff27b0..ef411d8b1 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -721,16 +721,9 @@ Status DBImpl::RecoverInsertMem(WriteBatch* batch, VersionEdit* edit) { recover_mem_ = NewMemTable(); recover_mem_->Ref(); } - uint64_t log_sequence = WriteBatchInternal::Sequence(batch); - uint64_t last_sequence = log_sequence + WriteBatchInternal::Count(batch) - 1; - - // if duplicate record, ignore - if (log_sequence <= recover_mem_->GetLastSequence()) { - assert (last_sequence <= recover_mem_->GetLastSequence()); - Log(options_.info_log, "[%s] duplicate record, ignore %lu ~ %lu", - dbname_.c_str(), log_sequence, last_sequence); - return Status::OK(); - } + + // checked by db_table + assert(WriteBatchInternal::Sequence(batch) >= recover_mem_->GetLastSequence()); Status status = WriteBatchInternal::InsertInto(batch, recover_mem_); MaybeIgnoreError(&status); diff --git a/src/leveldb/db/db_table.cc b/src/leveldb/db/db_table.cc index 8abe4310e..21babd6e5 100644 --- a/src/leveldb/db/db_table.cc +++ b/src/leveldb/db/db_table.cc @@ -26,6 +26,7 @@ #include "leveldb/table_utils.h" #include "table/merger.h" #include "util/string_ext.h" +#include "../common/timer.h" namespace leveldb { @@ -316,7 +317,7 @@ Status DBTable::Init() { } if (s.ok() && !options_.disable_wal) { - std::string log_file_name = LogHexFileName(dbname_, last_sequence_ + 1); + std::string log_file_name = LogHexFileName(dbname_, common::timer::get_unique_micros(last_sequence_)); s = options_.env->NewWritableFile(log_file_name, &logfile_); if (s.ok()) { //Log(options_.info_log, "[%s] open logfile %s", @@ -385,9 +386,10 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { RecordWriter* last_writer = &w; WriteBatch* updates = NULL; + SequenceNumber batch_seq = common::timer::get_unique_micros(last_sequence_); if (s.ok()) { updates = GroupWriteBatch(&last_writer); - WriteBatchInternal::SetSequence(updates, last_sequence_ + 1); + WriteBatchInternal::SetSequence(updates, batch_seq); } if (s.ok() && !options_.disable_wal && !options.disable_wal) { @@ -397,6 +399,8 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { s = Status::IOError(dbname_ + ": fail to open log: ", s.ToString()); } else { force_switch_log_ = false; + // after SwitchLog, set current batch's seq to log number + WriteBatchInternal::SetSequence(updates, last_sequence_); } mutex_.Lock(); } @@ -417,6 +421,8 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { dbname_.c_str(), current_log_size_); int ret = SwitchLog(true); if (ret == 0) { + // after SwitchLog, set current batch's seq to log number + WriteBatchInternal::SetSequence(updates, last_sequence_); continue; } else if (ret == 1) { s = log_->WaitDone(-1); @@ -501,7 +507,7 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { for (uint32_t i = 0; i < lg_list_.size(); ++i) { lg_list_[i]->ReleaseSnapshot(commit_snapshot_); } - commit_snapshot_ = last_sequence_ + WriteBatchInternal::Count(updates); + commit_snapshot_ = batch_seq; } if (created_new_wb) { @@ -516,7 +522,7 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { // Update last_sequence if (updates) { - last_sequence_ += WriteBatchInternal::Count(updates); + last_sequence_ = batch_seq; current_log_size_ += WriteBatchInternal::ByteSize(updates); } if (updates == tmp_batch_) tmp_batch_->Clear(); @@ -833,18 +839,18 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit, continue; } WriteBatchInternal::SetContents(&batch, record); - uint64_t first_seq = WriteBatchInternal::Sequence(&batch); - uint64_t last_seq = first_seq + WriteBatchInternal::Count(&batch) - 1; - //Log(options_.info_log, "[%s] batch_seq= %lu, last_seq= %lu, count=%d", - // dbname_.c_str(), batch_seq, last_sequence_, WriteBatchInternal::Count(&batch)); - if (last_seq >= recover_limit) { - Log(options_.info_log, "[%s] exceed limit %lu, ignore %lu ~ %lu", - dbname_.c_str(), recover_limit, first_seq, last_seq); - continue; + uint64_t batch_seq = WriteBatchInternal::Sequence(&batch); + Log(options_.info_log, "[%s] recover batch_seq= %lu, count=%d", + dbname_.c_str(), batch_seq, WriteBatchInternal::Count(&batch)); + + assert(batch_seq <= recover_limit); + if (batch_seq == recover_limit) { + Log(options_.info_log, "[%s] on limit, batch_seq=%lu recover_limit=%lu", + dbname_.c_str(), batch_seq, recover_limit); } - if (last_seq > last_sequence_) { - last_sequence_ = last_seq; + if (batch_seq > last_sequence_) { + last_sequence_ = batch_seq; } std::vector lg_updates; @@ -867,18 +873,20 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit, if (lg_updates[i] == NULL) { continue; } - if (last_seq <= lg_list_[i]->GetLastSequence()) { - continue; + if (batch_seq < lg_list_[i]->GetLastSequence() && + recover_limit != kMaxSequenceNumber) { + Log(options_.info_log, "[%s] lg %d seq gone back!! batch_seq=%lu recover_limit=%lu", + dbname_.c_str(), i, batch_seq, recover_limit); + assert(0); } - uint64_t first = WriteBatchInternal::Sequence(lg_updates[i]); - uint64_t last = first + WriteBatchInternal::Count(lg_updates[i]) - 1; - // Log(options_.info_log, "[%s] recover log batch first= %lu, last= %lu\n", - // dbname_.c_str(), first, last); + uint64_t lg_batch_seq = WriteBatchInternal::Sequence(lg_updates[i]); + Log(options_.info_log, "[%s] recover log lg = %d batch = %lu, count = %d\n", + dbname_.c_str(), i, lg_batch_seq, WriteBatchInternal::Count(lg_updates[i])); Status lg_s = lg_list_[i]->RecoverInsertMem(lg_updates[i], (*edit_list)[i]); if (!lg_s.ok()) { - Log(options_.info_log, "[%s] recover log fail batch first= %lu, last= %lu\n", - dbname_.c_str(), first, last); + Log(options_.info_log, "[%s] recover log lg = %d fail batch = %lu, count = %d\n", + dbname_.c_str(), i, lg_batch_seq, WriteBatchInternal::Count(lg_updates[i])); status = lg_s; } } @@ -1074,10 +1082,10 @@ int DBTable::SwitchLog(bool blocked_switch) { if (!blocked_switch || log::AsyncWriter::BlockLogNum() < options_.max_block_log_number) { if (current_log_size_ == 0) { - last_sequence_++; + last_sequence_ = common::timer::get_unique_micros(last_sequence_); } WritableFile* logfile = NULL; - std::string log_file_name = LogHexFileName(dbname_, last_sequence_ + 1); + std::string log_file_name = LogHexFileName(dbname_, last_sequence_); Status s = env_->NewWritableFile(log_file_name, &logfile); if (s.ok()) { log_->Stop(blocked_switch); diff --git a/src/leveldb/db/repair.cc b/src/leveldb/db/repair.cc index b67fc77b7..1b7cf18e2 100644 --- a/src/leveldb/db/repair.cc +++ b/src/leveldb/db/repair.cc @@ -271,8 +271,8 @@ class Repairer { mem_ = new MemTable(icmp_); mem_->Ref(); } - assert(batch_seq > max_sequence_); - max_sequence_ = batch_seq + WriteBatchInternal::Count(batch) - 1; + assert(batch_seq >= max_sequence_); + max_sequence_ = batch_seq; return WriteBatchInternal::InsertInto(batch, mem_); } bool HasMemTable() const { @@ -610,11 +610,10 @@ class DBRepairer { } WriteBatchInternal::SetContents(&batch, record); uint64_t batch_seq = WriteBatchInternal::Sequence(&batch); - uint64_t batch_count = WriteBatchInternal::Count(&batch); + int batch_count = WriteBatchInternal::Count(&batch); if (batch_seq <= last_sequence_) { - Log(options_.info_log, "[%s] duplicate record, ignore %llu ~ %llu", - dbname_.c_str(), static_cast(batch_seq), - static_cast(batch_seq + batch_count - 1)); + Log(options_.info_log, "[%s] duplicate record, ignore %llu count %d", + dbname_.c_str(), static_cast(batch_seq), batch_count); continue; } diff --git a/src/leveldb/db/write_batch.cc b/src/leveldb/db/write_batch.cc index 6f7ea5f94..f4e407f38 100644 --- a/src/leveldb/db/write_batch.cc +++ b/src/leveldb/db/write_batch.cc @@ -136,15 +136,13 @@ Status WriteBatch::SeperateLocalityGroup(std::vector* lg_bw) const } } - uint64_t last_sequence = WriteBatchInternal::Sequence(this) - + WriteBatchInternal::Count(this) - 1; + uint64_t batch_sequence = WriteBatchInternal::Sequence(this); for (uint32_t i = 0; i < lg_bw->size(); ++i) { if ((*lg_bw)[i] == NULL) { (*lg_bw)[i] = new WriteBatch(); WriteBatchInternal::SetCount((*lg_bw)[i], 0); } - int c = WriteBatchInternal::Count((*lg_bw)[i]); - WriteBatchInternal::SetSequence((*lg_bw)[i], last_sequence - c + 1); + WriteBatchInternal::SetSequence((*lg_bw)[i], batch_sequence); } if (found != WriteBatchInternal::Count(this)) { return Status::Corruption("WriteBatch has wrong count"); From 60dc60849690c58b4dc78721925596a1413b451f Mon Sep 17 00:00:00 2001 From: lylei Date: Mon, 23 Nov 2015 15:44:16 +0800 Subject: [PATCH 4/5] issue=#451 fix leveldb unitest --- src/leveldb/db/corruption_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/leveldb/db/corruption_test.cc b/src/leveldb/db/corruption_test.cc index 73aa397f0..933f0f551 100644 --- a/src/leveldb/db/corruption_test.cc +++ b/src/leveldb/db/corruption_test.cc @@ -129,11 +129,11 @@ class CorruptionTest { uint64_t number; FileType type; std::string fname; - int picked_number = -1; + uint64_t picked_number = 0; for (size_t i = 0; i < filenames.size(); i++) { if (ParseFileName(filenames[i], &number, &type) && type == filetype && - int(number) > picked_number) { // Pick latest file + uint64_t(number) > picked_number) { // Pick latest file fname = db_path + "/" + filenames[i]; picked_number = number; } From 3ee7f9b3d4a15c99aeec545dafea4209131f78d5 Mon Sep 17 00:00:00 2001 From: lylei Date: Tue, 29 Dec 2015 19:09:59 +0800 Subject: [PATCH 5/5] issue=#451 increasing timestamp --- src/io/test/tablet_io_test.cc | 10 ++++++++ src/leveldb/db/db_impl.cc | 3 +++ src/leveldb/db/db_table.cc | 46 ++++++++++++++++++++++++++++------- src/leveldb/db/db_table.h | 2 ++ src/leveldb/db/version_set.cc | 9 ++++--- src/tera_flags.cc | 2 +- 6 files changed, 59 insertions(+), 13 deletions(-) mode change 100644 => 100755 src/io/test/tablet_io_test.cc mode change 100644 => 100755 src/leveldb/db/db_impl.cc mode change 100644 => 100755 src/leveldb/db/db_table.cc mode change 100644 => 100755 src/leveldb/db/db_table.h mode change 100644 => 100755 src/leveldb/db/version_set.cc mode change 100644 => 100755 src/tera_flags.cc diff --git a/src/io/test/tablet_io_test.cc b/src/io/test/tablet_io_test.cc old mode 100644 new mode 100755 index 0ef7099b2..8b02e8bb1 --- a/src/io/test/tablet_io_test.cc +++ b/src/io/test/tablet_io_test.cc @@ -5,6 +5,7 @@ #include "io/tablet_io.h" #include +#include #include "gflags/gflags.h" #include "glog/logging.h" @@ -495,6 +496,15 @@ TEST_F(TabletIOTest, FindAverageKey) { start = ""; end = std::string("\x0", 1); ASSERT_FALSE(TabletIO::FindAverageKey(start, end, &ave)); + + start = "000000000000001480186993"; + end = "000000000000002147352684"; + std::string expect + = "000000000000001263264783_"; + TabletIO::FindAverageKey(start, end, &ave); + std::cerr << start << "-" << end << "-" << expect << "-" << ave << std::endl; + //ASSERT_TRUE(expect != ave); + ASSERT_NE(expect, ave); } } // namespace io } // namespace tera diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc old mode 100644 new mode 100755 index 59a1b124b..6d95d2d10 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -1716,10 +1716,13 @@ uint64_t DBImpl::GetLastSequence(bool is_locked) { } uint64_t retval; if (mem_->GetLastSequence() > 0) { + Log(options_.info_log, "[%s] LL: mem seq=%lu", dbname_.c_str(), mem_->GetLastSequence()); retval = mem_->GetLastSequence(); } else if (imm_ != NULL && imm_->GetLastSequence()) { + Log(options_.info_log, "[%s] LL: imm seq=%lu", dbname_.c_str(), imm_->GetLastSequence()); retval = imm_->GetLastSequence(); } else { + Log(options_.info_log, "[%s] LL: version seq=%lu", dbname_.c_str(), versions_->LastSequence()); retval = versions_->LastSequence(); } if (is_locked) { diff --git a/src/leveldb/db/db_table.cc b/src/leveldb/db/db_table.cc old mode 100644 new mode 100755 index adbd14194..729676914 --- a/src/leveldb/db/db_table.cc +++ b/src/leveldb/db/db_table.cc @@ -251,6 +251,7 @@ Status DBTable::Init() { if (last_sequence_ < last_seq) { last_sequence_ = last_seq; } + last_timestamp_ = last_sequence_; } else { Log(options_.info_log, "[%s] fail to recover lg %d", dbname_.c_str(), i); break; @@ -318,7 +319,7 @@ Status DBTable::Init() { } if (s.ok() && !options_.disable_wal) { - std::string log_file_name = LogHexFileName(dbname_, common::timer::get_unique_micros(last_sequence_)); + std::string log_file_name = LogHexFileName(dbname_, GetNewSequenceNumber()); s = options_.env->NewWritableFile(log_file_name, &logfile_); if (s.ok()) { //Log(options_.info_log, "[%s] open logfile %s", @@ -387,9 +388,10 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { RecordWriter* last_writer = &w; WriteBatch* updates = NULL; - SequenceNumber batch_seq = common::timer::get_unique_micros(last_sequence_); + SequenceNumber batch_seq = GetNewSequenceNumber(); if (s.ok()) { updates = GroupWriteBatch(&last_writer); + Log(options_.info_log, "[%s] LL: set sequence to: %lu", dbname_.c_str(), batch_seq); WriteBatchInternal::SetSequence(updates, batch_seq); } @@ -524,7 +526,10 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { // Update last_sequence if (updates) { - last_sequence_ = batch_seq; + if (WriteBatchInternal::Count(updates) != 0) { + last_sequence_ = batch_seq; + } + Log(options_.info_log, "[%s] LL: set last_sequence_->%lu", dbname_.c_str(), last_sequence_); current_log_size_ += WriteBatchInternal::ByteSize(updates); } if (updates == tmp_batch_) tmp_batch_->Clear(); @@ -753,6 +758,7 @@ void DBTable::CompactRange(const Slice* begin, const Slice* end, int lg_no) { // @begin_num: the 1st record(sequence number) should be recover Status DBTable::GatherLogFile(uint64_t begin_num, std::vector* logfiles) { + Log(options_.info_log, "[%s] LL: GatherLogFile begin_num=%lu", dbname_.c_str(), begin_num); std::vector files; Status s = env_->GetChildren(dbname_, &files); if (!s.ok()) { @@ -768,9 +774,13 @@ Status DBTable::GatherLogFile(uint64_t begin_num, if (ParseFileName(files[i], &number, &type) && type == kLogFile && number >= begin_num) { + Log(options_.info_log, "[%s] LL: GatherLogFile push file=%lu", dbname_.c_str(), number); logfiles->push_back(number); } else if (type == kLogFile && number > last_number) { last_number = number; + Log(options_.info_log, "[%s] LL: GatherLogFile set last_number=%lu", dbname_.c_str(), number); + } else { + Log(options_.info_log, "[%s] LL: GatherLogFile other", dbname_.c_str()); } } std::sort(logfiles->begin(), logfiles->end()); @@ -880,17 +890,21 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit, } if (batch_seq < lg_list_[i]->GetLastSequence() && recover_limit != kMaxSequenceNumber) { - Log(options_.info_log, "[%s] lg %d seq gone back!! batch_seq=%lu recover_limit=%lu", - dbname_.c_str(), i, batch_seq, recover_limit); - assert(0); + Log(options_.info_log, "[%s] LL: lg %d seq gone back!! batch_seq=%lu lg_last=%lu recover_limit=%lu now=%lu", + dbname_.c_str(), i, batch_seq, lg_list_[i]->GetLastSequence(), recover_limit, common::timer::get_micros()); + // assert(0); + continue; + } else { + Log(options_.info_log, "[%s] LL: lg %d batch_seq=%lu lg_last=%lu recover_limit=%lu now=%lu", + dbname_.c_str(), i, batch_seq, lg_list_[i]->GetLastSequence(), recover_limit, common::timer::get_micros()); } uint64_t lg_batch_seq = WriteBatchInternal::Sequence(lg_updates[i]); - Log(options_.info_log, "[%s] recover log lg = %d batch = %lu, count = %d\n", + Log(options_.info_log, "[%s] LL: recover log lg = %d batch = %lu, count = %d\n", dbname_.c_str(), i, lg_batch_seq, WriteBatchInternal::Count(lg_updates[i])); Status lg_s = lg_list_[i]->RecoverInsertMem(lg_updates[i], (*edit_list)[i]); if (!lg_s.ok()) { - Log(options_.info_log, "[%s] recover log lg = %d fail batch = %lu, count = %d\n", + Log(options_.info_log, "[%s] LL: recover log lg = %d fail batch = %lu, count = %d\n", dbname_.c_str(), i, lg_batch_seq, WriteBatchInternal::Count(lg_updates[i])); status = lg_s; } @@ -1000,6 +1014,18 @@ void DBTable::ArchiveFile(const std::string& fname) { fname.c_str(), s.ToString().c_str()); } +uint64_t DBTable::GetNewSequenceNumber() { + uint64_t now = static_cast(common::timer::get_micros()); + if (now <= last_timestamp_) { + last_timestamp_ += 1; + Log(options_.info_log, "[%s] LL: Got %lu, set last_timestamp_ to %lu", + dbname_.c_str(), now, last_timestamp_); + return last_timestamp_; + } + Log(options_.info_log, "[%s] LL: GetNewSequenceNumber %lu", dbname_.c_str(), now); + return now; +} + // tera-specific bool DBTable::FindSplitKey(double ratio, std::string* split_key) { @@ -1106,9 +1132,11 @@ int DBTable::SwitchLog(bool blocked_switch) { if (!blocked_switch || log::AsyncWriter::BlockLogNum() < options_.max_block_log_number) { if (current_log_size_ == 0) { - last_sequence_ = common::timer::get_unique_micros(last_sequence_); + last_sequence_ = GetNewSequenceNumber(); + Log(options_.info_log, "[%s] LL: new log name = %lu", dbname_.c_str(), last_sequence_); } WritableFile* logfile = NULL; + Log(options_.info_log, "[%s] LL: log name = %lu", dbname_.c_str(), last_sequence_); std::string log_file_name = LogHexFileName(dbname_, last_sequence_); Status s = env_->NewWritableFile(log_file_name, &logfile); if (s.ok()) { diff --git a/src/leveldb/db/db_table.h b/src/leveldb/db/db_table.h old mode 100644 new mode 100755 index 69a96de55..c038f4de7 --- a/src/leveldb/db/db_table.h +++ b/src/leveldb/db/db_table.h @@ -156,6 +156,7 @@ class DBTable : public DB { Status DeleteLogFile(const std::vector& log_numbers); void DeleteObsoleteFiles(uint64_t seq_no = -1U); void ArchiveFile(const std::string& filepath); + uint64_t GetNewSequenceNumber(); // return 0: switch log successed // return 1: cannot switch log right now @@ -187,6 +188,7 @@ class DBTable : public DB { log::AsyncWriter* log_; bool force_switch_log_; uint64_t last_sequence_; + uint64_t last_timestamp_; size_t current_log_size_; std::deque writers_; diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc old mode 100644 new mode 100755 index f4ad289d3..e1a0cfa80 --- a/src/leveldb/db/version_set.cc +++ b/src/leveldb/db/version_set.cc @@ -993,10 +993,12 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { edit->SetNextFile(next_file_number_); if (edit->HasLastSequence()) { - Log(options_->info_log, "[%s] LogLastSequence %lu", + Log(options_->info_log, "[%s] LL: editHas LogLastSequence %lu", dbname_.c_str(), edit->GetLastSequence()); assert(edit->GetLastSequence() >= last_sequence_); } else { + Log(options_->info_log, "[%s] LL: edit!Has LogLastSequence %lu", + dbname_.c_str(), last_sequence_); edit->SetLastSequence(last_sequence_); } @@ -1394,8 +1396,9 @@ void VersionSet::Finalize(Version* v) { // // (3) More level0 files means write hotspot. // We give lower score to avoid too much level0 compaction. - score = sqrt(v->files_[level].size() / - static_cast(config::kL0_CompactionTrigger)); + //score = sqrt(v->files_[level].size() / + //static_cast(config::kL0_CompactionTrigger)); + score = v->files_[level].size() / static_cast(config::kL0_CompactionTrigger); } else { // Compute the ratio of current size to size limit. const uint64_t level_bytes = TotalFileSize(v->files_[level]); diff --git a/src/tera_flags.cc b/src/tera_flags.cc old mode 100644 new mode 100755 index 33073c333..4921af385 --- a/src/tera_flags.cc +++ b/src/tera_flags.cc @@ -166,7 +166,7 @@ DEFINE_int32(tera_tabletnode_scan_pack_max_size, 10240, "the max size(KB) of the DEFINE_int32(tera_asyncwriter_pending_limit, 10000, "the max pending data size (KB) in async writer"); DEFINE_bool(tera_enable_level0_limit, true, "enable level0 limit"); -DEFINE_int32(tera_tablet_level0_file_limit, 20000, "the max level0 file num before write busy"); +DEFINE_int32(tera_tablet_level0_file_limit, 10, "the max level0 file num before write busy"); DEFINE_int32(tera_asyncwriter_sync_interval, 100, "the interval (in ms) to sync write buffer to disk"); DEFINE_int32(tera_asyncwriter_sync_size_threshold, 1024, "force sync per X KB"); DEFINE_int32(tera_asyncwriter_batch_size, 1024, "write batch to leveldb per X KB");