diff --git a/src/io/test/tablet_io_test.cc b/src/io/test/tablet_io_test.cc old mode 100644 new mode 100755 index be2776f83..26563f508 --- a/src/io/test/tablet_io_test.cc +++ b/src/io/test/tablet_io_test.cc @@ -5,6 +5,7 @@ #include "io/tablet_io.h" #include +#include #include "gflags/gflags.h" #include "glog/logging.h" @@ -514,6 +515,15 @@ TEST_F(TabletIOTest, FindAverageKey) { start = ""; end = std::string("\x0", 1); ASSERT_FALSE(TabletIO::FindAverageKey(start, end, &ave)); + + start = "000000000000001480186993"; + end = "000000000000002147352684"; + std::string expect + = "000000000000001263264783_"; + TabletIO::FindAverageKey(start, end, &ave); + std::cerr << start << "-" << end << "-" << expect << "-" << ave << std::endl; + //ASSERT_TRUE(expect != ave); + ASSERT_NE(expect, ave); } } // namespace io } // namespace tera diff --git a/src/leveldb/db/corruption_test.cc b/src/leveldb/db/corruption_test.cc index 73aa397f0..933f0f551 100644 --- a/src/leveldb/db/corruption_test.cc +++ b/src/leveldb/db/corruption_test.cc @@ -129,11 +129,11 @@ class CorruptionTest { uint64_t number; FileType type; std::string fname; - int picked_number = -1; + uint64_t picked_number = 0; for (size_t i = 0; i < filenames.size(); i++) { if (ParseFileName(filenames[i], &number, &type) && type == filetype && - int(number) > picked_number) { // Pick latest file + uint64_t(number) > picked_number) { // Pick latest file fname = db_path + "/" + filenames[i]; picked_number = number; } diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc old mode 100644 new mode 100755 index 6eb0add29..1849c70ca --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -735,16 +735,9 @@ Status DBImpl::RecoverInsertMem(WriteBatch* batch, VersionEdit* edit) { recover_mem_ = NewMemTable(); recover_mem_->Ref(); } - uint64_t log_sequence = WriteBatchInternal::Sequence(batch); - uint64_t last_sequence = log_sequence + WriteBatchInternal::Count(batch) - 1; - - // if duplicate record, ignore - if (log_sequence <= recover_mem_->GetLastSequence()) { - assert (last_sequence <= recover_mem_->GetLastSequence()); - Log(options_.info_log, "[%s] duplicate record, ignore %lu ~ %lu", - dbname_.c_str(), log_sequence, last_sequence); - return Status::OK(); - } + + // checked by db_table + assert(WriteBatchInternal::Sequence(batch) >= recover_mem_->GetLastSequence()); Status status = WriteBatchInternal::InsertInto(batch, recover_mem_); MaybeIgnoreError(&status); @@ -1365,7 +1358,7 @@ Status DBImpl::Get(const ReadOptions& options, { mutex_.Unlock(); // First look in the memtable, then in the immutable memtable (if any). - LookupKey lkey(key, snapshot); + LookupKey lkey(key, snapshot, mem->GetLastInternalSeq()); if (mem->Get(lkey, value, options.rollbacks, &s)) { // Done } else if (imm != NULL && imm->Get(lkey, value, options.rollbacks, &s)) { @@ -1772,10 +1765,13 @@ uint64_t DBImpl::GetLastSequence(bool is_locked) { } uint64_t retval; if (mem_->GetLastSequence() > 0) { + Log(options_.info_log, "[%s] LL: mem seq=%lu", dbname_.c_str(), mem_->GetLastSequence()); retval = mem_->GetLastSequence(); } else if (imm_ != NULL && imm_->GetLastSequence()) { + Log(options_.info_log, "[%s] LL: imm seq=%lu", dbname_.c_str(), imm_->GetLastSequence()); retval = imm_->GetLastSequence(); } else { + Log(options_.info_log, "[%s] LL: version seq=%lu", dbname_.c_str(), versions_->LastSequence()); retval = versions_->LastSequence(); } if (is_locked) { diff --git a/src/leveldb/db/db_table.cc b/src/leveldb/db/db_table.cc old mode 100644 new mode 100755 index 4eb16b52e..379456f45 --- a/src/leveldb/db/db_table.cc +++ b/src/leveldb/db/db_table.cc @@ -26,6 +26,7 @@ #include "leveldb/table_utils.h" #include "table/merger.h" #include "util/string_ext.h" +#include "../common/timer.h" namespace leveldb { @@ -250,6 +251,7 @@ Status DBTable::Init() { if (last_sequence_ < last_seq) { last_sequence_ = last_seq; } + last_timestamp_ = last_sequence_; } else { Log(options_.info_log, "[%s] fail to recover lg %d", dbname_.c_str(), i); break; @@ -317,7 +319,7 @@ Status DBTable::Init() { } if (s.ok() && !options_.disable_wal) { - std::string log_file_name = LogHexFileName(dbname_, last_sequence_ + 1); + std::string log_file_name = LogHexFileName(dbname_, GetNewSequenceNumber()); s = options_.env->NewWritableFile(log_file_name, &logfile_); if (s.ok()) { //Log(options_.info_log, "[%s] open logfile %s", @@ -402,9 +404,11 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { RecordWriter* last_writer = &w; WriteBatch* updates = NULL; + SequenceNumber batch_seq = GetNewSequenceNumber(); if (s.ok()) { updates = GroupWriteBatch(&last_writer); - WriteBatchInternal::SetSequence(updates, last_sequence_ + 1); + Log(options_.info_log, "[%s] LL: set sequence to: %lu", dbname_.c_str(), batch_seq); + WriteBatchInternal::SetSequence(updates, batch_seq); } if (s.ok() && !options_.disable_wal && !options.disable_wal) { @@ -414,6 +418,8 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { s = Status::IOError(dbname_ + ": fail to open log: ", s.ToString()); } else { force_switch_log_ = false; + // after SwitchLog, set current batch's seq to log number + WriteBatchInternal::SetSequence(updates, last_sequence_); } mutex_.Lock(); } @@ -435,6 +441,8 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { dbname_.c_str(), current_log_size_, slice.size(), wait_sec); int ret = SwitchLog(true); if (ret == 0) { + // after SwitchLog, set current batch's seq to log number + WriteBatchInternal::SetSequence(updates, last_sequence_); continue; } else if (ret == 1) { s = log_->WaitDone(-1); @@ -519,7 +527,7 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { for (uint32_t i = 0; i < lg_list_.size(); ++i) { lg_list_[i]->ReleaseSnapshot(commit_snapshot_); } - commit_snapshot_ = last_sequence_ + WriteBatchInternal::Count(updates); + commit_snapshot_ = batch_seq; } if (created_new_wb) { @@ -534,7 +542,10 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { // Update last_sequence if (updates) { - last_sequence_ += WriteBatchInternal::Count(updates); + if (WriteBatchInternal::Count(updates) != 0) { + last_sequence_ = batch_seq; + } + Log(options_.info_log, "[%s] LL: set last_sequence_->%lu", dbname_.c_str(), last_sequence_); current_log_size_ += WriteBatchInternal::ByteSize(updates); } if (updates == tmp_batch_) tmp_batch_->Clear(); @@ -763,6 +774,7 @@ void DBTable::CompactRange(const Slice* begin, const Slice* end, int lg_no) { // @begin_num: the 1st record(sequence number) should be recover Status DBTable::GatherLogFile(uint64_t begin_num, std::vector* logfiles) { + Log(options_.info_log, "[%s] LL: GatherLogFile begin_num=%lu", dbname_.c_str(), begin_num); std::vector files; Status s = env_->GetChildren(dbname_, &files); if (!s.ok()) { @@ -778,9 +790,13 @@ Status DBTable::GatherLogFile(uint64_t begin_num, if (ParseFileName(files[i], &number, &type) && type == kLogFile && number >= begin_num) { + Log(options_.info_log, "[%s] LL: GatherLogFile push file=%lu", dbname_.c_str(), number); logfiles->push_back(number); } else if (type == kLogFile && number > last_number) { last_number = number; + Log(options_.info_log, "[%s] LL: GatherLogFile set last_number=%lu", dbname_.c_str(), number); + } else { + Log(options_.info_log, "[%s] LL: GatherLogFile other", dbname_.c_str()); } } std::sort(logfiles->begin(), logfiles->end()); @@ -854,18 +870,18 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit, continue; } WriteBatchInternal::SetContents(&batch, record); - uint64_t first_seq = WriteBatchInternal::Sequence(&batch); - uint64_t last_seq = first_seq + WriteBatchInternal::Count(&batch) - 1; - //Log(options_.info_log, "[%s] batch_seq= %lu, last_seq= %lu, count=%d", - // dbname_.c_str(), batch_seq, last_sequence_, WriteBatchInternal::Count(&batch)); - if (last_seq >= recover_limit) { - Log(options_.info_log, "[%s] exceed limit %lu, ignore %lu ~ %lu", - dbname_.c_str(), recover_limit, first_seq, last_seq); - continue; + uint64_t batch_seq = WriteBatchInternal::Sequence(&batch); + Log(options_.info_log, "[%s] recover batch_seq= %lu, count=%d", + dbname_.c_str(), batch_seq, WriteBatchInternal::Count(&batch)); + + assert(batch_seq <= recover_limit); + if (batch_seq == recover_limit) { + Log(options_.info_log, "[%s] on limit, batch_seq=%lu recover_limit=%lu", + dbname_.c_str(), batch_seq, recover_limit); } - if (last_seq > last_sequence_) { - last_sequence_ = last_seq; + if (batch_seq > last_sequence_) { + last_sequence_ = batch_seq; } std::vector lg_updates; @@ -888,18 +904,24 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit, if (lg_updates[i] == NULL) { continue; } - if (last_seq <= lg_list_[i]->GetLastSequence()) { + if (batch_seq < lg_list_[i]->GetLastSequence() && + recover_limit != kMaxSequenceNumber) { + Log(options_.info_log, "[%s] LL: lg %d seq gone back!! batch_seq=%lu lg_last=%lu recover_limit=%lu now=%lu", + dbname_.c_str(), i, batch_seq, lg_list_[i]->GetLastSequence(), recover_limit, common::timer::get_micros()); + // assert(0); continue; + } else { + Log(options_.info_log, "[%s] LL: lg %d batch_seq=%lu lg_last=%lu recover_limit=%lu now=%lu", + dbname_.c_str(), i, batch_seq, lg_list_[i]->GetLastSequence(), recover_limit, common::timer::get_micros()); } - uint64_t first = WriteBatchInternal::Sequence(lg_updates[i]); - uint64_t last = first + WriteBatchInternal::Count(lg_updates[i]) - 1; - // Log(options_.info_log, "[%s] recover log batch first= %lu, last= %lu\n", - // dbname_.c_str(), first, last); + uint64_t lg_batch_seq = WriteBatchInternal::Sequence(lg_updates[i]); + Log(options_.info_log, "[%s] LL: recover log lg = %d batch = %lu, count = %d\n", + dbname_.c_str(), i, lg_batch_seq, WriteBatchInternal::Count(lg_updates[i])); Status lg_s = lg_list_[i]->RecoverInsertMem(lg_updates[i], (*edit_list)[i]); if (!lg_s.ok()) { - Log(options_.info_log, "[%s] recover log fail batch first= %lu, last= %lu\n", - dbname_.c_str(), first, last); + Log(options_.info_log, "[%s] LL: recover log lg = %d fail batch = %lu, count = %d\n", + dbname_.c_str(), i, lg_batch_seq, WriteBatchInternal::Count(lg_updates[i])); status = lg_s; } } @@ -1008,6 +1030,18 @@ void DBTable::ArchiveFile(const std::string& fname) { fname.c_str(), s.ToString().c_str()); } +uint64_t DBTable::GetNewSequenceNumber() { + uint64_t now = static_cast(common::timer::get_micros()); + if (now <= last_timestamp_) { + last_timestamp_ += 1; + Log(options_.info_log, "[%s] LL: Got %lu, set last_timestamp_ to %lu", + dbname_.c_str(), now, last_timestamp_); + return last_timestamp_; + } + Log(options_.info_log, "[%s] LL: GetNewSequenceNumber %lu", dbname_.c_str(), now); + return now; +} + // tera-specific bool DBTable::FindSplitKey(double ratio, std::string* split_key) { @@ -1114,10 +1148,12 @@ int DBTable::SwitchLog(bool blocked_switch) { if (!blocked_switch || log::AsyncWriter::BlockLogNum() < options_.max_block_log_number) { if (current_log_size_ == 0) { - last_sequence_++; + last_sequence_ = GetNewSequenceNumber(); + Log(options_.info_log, "[%s] LL: new log name = %lu", dbname_.c_str(), last_sequence_); } WritableFile* logfile = NULL; - std::string log_file_name = LogHexFileName(dbname_, last_sequence_ + 1); + Log(options_.info_log, "[%s] LL: log name = %lu", dbname_.c_str(), last_sequence_); + std::string log_file_name = LogHexFileName(dbname_, last_sequence_); Status s = env_->NewWritableFile(log_file_name, &logfile); if (s.ok()) { log_->Stop(blocked_switch); diff --git a/src/leveldb/db/db_table.h b/src/leveldb/db/db_table.h old mode 100644 new mode 100755 index 0b18bb22b..56ac19a0c --- a/src/leveldb/db/db_table.h +++ b/src/leveldb/db/db_table.h @@ -158,6 +158,7 @@ class DBTable : public DB { Status DeleteLogFile(const std::vector& log_numbers); void DeleteObsoleteFiles(uint64_t seq_no = -1U); void ArchiveFile(const std::string& filepath); + uint64_t GetNewSequenceNumber(); // return 0: switch log successed // return 1: cannot switch log right now @@ -189,6 +190,7 @@ class DBTable : public DB { log::AsyncWriter* log_; bool force_switch_log_; uint64_t last_sequence_; + uint64_t last_timestamp_; size_t current_log_size_; std::deque writers_; diff --git a/src/leveldb/db/dbformat.cc b/src/leveldb/db/dbformat.cc index 68731d667..73f9732b1 100644 --- a/src/leveldb/db/dbformat.cc +++ b/src/leveldb/db/dbformat.cc @@ -69,6 +69,34 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { return r; } +int InternalKeyComparator::CompareWithInternalSeq(const Slice& akey, const Slice& bkey) const { + // Order by: + // increasing user key (according to user-supplied comparator) + // decreasing sequence number + // decreasing type (though sequence# should be enough to disambiguate) + int r = user_comparator_->Compare(ExtractUserKeyWithInternalSeq(akey), ExtractUserKeyWithInternalSeq(bkey)); + if (r == 0) { + const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 16) >> 8; + const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 16) >> 8; + if (anum > bnum) { + r = -1; + } else if (anum < bnum) { + r = +1; + } else { + const uint64_t a_internal_seq = DecodeFixed64(akey.data() + akey.size() - 8); + const uint64_t b_internal_seq = DecodeFixed64(bkey.data() + bkey.size() - 8); + if (a_internal_seq > b_internal_seq) { + r = -1; + } else if (a_internal_seq < b_internal_seq) { + r = +1; + } else { + r = 0; + } + } + } + return r; +} + void InternalKeyComparator::FindShortestSeparator( std::string* start, const Slice& limit) const { @@ -122,9 +150,9 @@ bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const { return user_policy_->KeyMayMatch(ExtractUserKey(key), f); } -LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { +LookupKey::LookupKey(const Slice& user_key, SequenceNumber s, SequenceNumber internal_seq) { size_t usize = user_key.size(); - size_t needed = usize + 13; // A conservative estimate + size_t needed = usize + 13 + 8; // A conservative estimate char* dst; if (needed <= sizeof(space_)) { dst = space_; @@ -132,12 +160,14 @@ LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { dst = new char[needed]; } start_ = dst; - dst = EncodeVarint32(dst, usize + 8); + dst = EncodeVarint32(dst, usize + 8 + 8); kstart_ = dst; memcpy(dst, user_key.data(), usize); dst += usize; EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek)); dst += 8; + EncodeFixed64(dst, internal_seq); + dst += 8; end_ = dst; } diff --git a/src/leveldb/db/dbformat.h b/src/leveldb/db/dbformat.h index 9f75f783a..341574d2b 100644 --- a/src/leveldb/db/dbformat.h +++ b/src/leveldb/db/dbformat.h @@ -98,6 +98,11 @@ inline Slice ExtractUserKey(const Slice& internal_key) { return Slice(internal_key.data(), internal_key.size() - 8); } +inline Slice ExtractUserKeyWithInternalSeq(const Slice& internal_key) { + assert(internal_key.size() >= 16); + return Slice(internal_key.data(), internal_key.size() - 16); +} + inline ValueType ExtractValueType(const Slice& internal_key) { assert(internal_key.size() >= 8); const size_t n = internal_key.size(); @@ -115,6 +120,7 @@ class InternalKeyComparator : public Comparator { explicit InternalKeyComparator(const Comparator* c) : user_comparator_(c) { } virtual const char* Name() const; virtual int Compare(const Slice& a, const Slice& b) const; + virtual int CompareWithInternalSeq(const Slice& akey, const Slice& bkey) const; virtual void FindShortestSeparator( std::string* start, const Slice& limit) const; @@ -198,7 +204,7 @@ class LookupKey { public: // Initialize *this for looking up user_key at a snapshot with // the specified sequence number. - LookupKey(const Slice& user_key, SequenceNumber sequence); + LookupKey(const Slice& user_key, SequenceNumber sequence, SequenceNumber internal_seq); ~LookupKey(); @@ -206,10 +212,10 @@ class LookupKey { Slice memtable_key() const { return Slice(start_, end_ - start_); } // Return an internal key (suitable for passing to an internal iterator) - Slice internal_key() const { return Slice(kstart_, end_ - kstart_); } + Slice internal_key() const { return Slice(kstart_, end_ - kstart_ - 8); } // Return the user key - Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); } + Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8 - 8); } private: // We construct a char array of the form: diff --git a/src/leveldb/db/memtable.cc b/src/leveldb/db/memtable.cc index c9f284110..899703f9b 100644 --- a/src/leveldb/db/memtable.cc +++ b/src/leveldb/db/memtable.cc @@ -28,7 +28,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, CompactStrategyFactory* com refs_(0), table_(comparator_, &arena_), empty_(true), - compact_strategy_factory_(compact_strategy_factory) { + compact_strategy_factory_(compact_strategy_factory), + internal_seq_(0) { } MemTable::~MemTable() { @@ -42,7 +43,7 @@ int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) // Internal keys are encoded as length-prefixed strings. Slice a = GetLengthPrefixedSlice(aptr); Slice b = GetLengthPrefixedSlice(bptr); - return comparator.Compare(a, b); + return comparator.CompareWithInternalSeq(a, b); } // Encode a suitable internal key target for "target" and return it. @@ -50,8 +51,12 @@ int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) // into this scratch space. static const char* EncodeKey(std::string* scratch, const Slice& target) { scratch->clear(); - PutVarint32(scratch, target.size()); + PutVarint32(scratch, target.size() + 8); scratch->append(target.data(), target.size()); + char seq[8]; + EncodeFixed64(seq, kMaxSequenceNumber); + Slice seq_slice(seq, 8); + scratch->append(seq_slice.data(), seq_slice.size()); return scratch->data(); } @@ -65,7 +70,10 @@ class MemTableIterator: public Iterator { virtual void SeekToLast() { iter_.SeekToLast(); } virtual void Next() { iter_.Next(); } virtual void Prev() { iter_.Prev(); } - virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); } + virtual Slice key() const { + Slice internal_key = GetLengthPrefixedSlice(iter_.key()); + return Slice(internal_key.data(), internal_key.size() - 8); + } virtual Slice value() const { Slice key_slice = GetLengthPrefixedSlice(iter_.key()); return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); @@ -96,7 +104,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, // value bytes : char[value.size()] size_t key_size = key.size(); size_t val_size = value.size(); - size_t internal_key_size = key_size + 8; + size_t internal_key_size = key_size + 8 + 8; /* key_size + seq_type_size + internal_seq_size*/ const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; @@ -106,25 +114,28 @@ void MemTable::Add(SequenceNumber s, ValueType type, p += key_size; EncodeFixed64(p, (s << 8) | type); p += 8; + EncodeFixed64(p, internal_seq_); + p += 8; p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert(static_cast((p + val_size) - buf) == encoded_len); table_.Insert(buf); - assert(last_seq_ < s || s == 0); + assert(last_seq_ <= s || s == 0); last_seq_ = s; + ++internal_seq_; } bool MemTable::Get(const LookupKey& key, std::string* value, const std::map& rollbacks, Status* s) { Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); iter.Seek(memkey.data()); - if (iter.Valid()) { - // entry format is: - // klength varint32 - // userkey char[klength] - // tag uint64 - // vlength varint32 - // value char[vlength] + if (iter.Valid()) { // entry format is: + // klength varint32 + // userkey char[klength] + // tag uint64 + // internal_seq uint64 + // vlength varint32 + // value char[vlength] // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. @@ -132,6 +143,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, const std::map(tag & 0xff)) { case kTypeValue: { - Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + Slice v = GetLengthPrefixedSlice(key_ptr + key_length + 8/* jump over internal_seq */); CompactStrategy* strategy = compact_strategy_factory_ ? compact_strategy_factory_->NewInstance() : NULL; if (!strategy || !strategy->Drop(Slice(key_ptr, key_length - 8), 0)) { diff --git a/src/leveldb/db/memtable.h b/src/leveldb/db/memtable.h index ba608550e..31a7e519b 100644 --- a/src/leveldb/db/memtable.h +++ b/src/leveldb/db/memtable.h @@ -72,6 +72,9 @@ class MemTable { SequenceNumber GetLastSequence() const { return last_seq_; } + SequenceNumber GetLastInternalSeq() const { + return internal_seq_; + } bool Empty() { return empty_; } @@ -102,6 +105,7 @@ class MemTable { Table table_; bool empty_; CompactStrategyFactory* compact_strategy_factory_; + SequenceNumber internal_seq_; // No copying allowed MemTable(const MemTable&); diff --git a/src/leveldb/db/repair.cc b/src/leveldb/db/repair.cc index b67fc77b7..1b7cf18e2 100644 --- a/src/leveldb/db/repair.cc +++ b/src/leveldb/db/repair.cc @@ -271,8 +271,8 @@ class Repairer { mem_ = new MemTable(icmp_); mem_->Ref(); } - assert(batch_seq > max_sequence_); - max_sequence_ = batch_seq + WriteBatchInternal::Count(batch) - 1; + assert(batch_seq >= max_sequence_); + max_sequence_ = batch_seq; return WriteBatchInternal::InsertInto(batch, mem_); } bool HasMemTable() const { @@ -610,11 +610,10 @@ class DBRepairer { } WriteBatchInternal::SetContents(&batch, record); uint64_t batch_seq = WriteBatchInternal::Sequence(&batch); - uint64_t batch_count = WriteBatchInternal::Count(&batch); + int batch_count = WriteBatchInternal::Count(&batch); if (batch_seq <= last_sequence_) { - Log(options_.info_log, "[%s] duplicate record, ignore %llu ~ %llu", - dbname_.c_str(), static_cast(batch_seq), - static_cast(batch_seq + batch_count - 1)); + Log(options_.info_log, "[%s] duplicate record, ignore %llu count %d", + dbname_.c_str(), static_cast(batch_seq), batch_count); continue; } diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc old mode 100644 new mode 100755 index 7d94c69ed..15c4e1ddd --- a/src/leveldb/db/version_set.cc +++ b/src/leveldb/db/version_set.cc @@ -1001,10 +1001,12 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { edit->SetNextFile(next_file_number_); if (edit->HasLastSequence()) { - Log(options_->info_log, "[%s] LogLastSequence %lu", + Log(options_->info_log, "[%s] LL: editHas LogLastSequence %lu", dbname_.c_str(), edit->GetLastSequence()); assert(edit->GetLastSequence() >= last_sequence_); } else { + Log(options_->info_log, "[%s] LL: edit!Has LogLastSequence %lu", + dbname_.c_str(), last_sequence_); edit->SetLastSequence(last_sequence_); } @@ -1441,8 +1443,9 @@ void VersionSet::Finalize(Version* v) { // // (3) More level0 files means write hotspot. // We give lower score to avoid too much level0 compaction. - score = sqrt(v->files_[level].size() / - static_cast(config::kL0_CompactionTrigger)); + //score = sqrt(v->files_[level].size() / + //static_cast(config::kL0_CompactionTrigger)); + score = v->files_[level].size() / static_cast(config::kL0_CompactionTrigger); } else { // Compute the ratio of current size to size limit. const uint64_t level_bytes = TotalFileSize(v->files_[level]); diff --git a/src/leveldb/db/write_batch.cc b/src/leveldb/db/write_batch.cc index 7abcf7262..f4e407f38 100644 --- a/src/leveldb/db/write_batch.cc +++ b/src/leveldb/db/write_batch.cc @@ -136,15 +136,13 @@ Status WriteBatch::SeperateLocalityGroup(std::vector* lg_bw) const } } - uint64_t last_sequence = WriteBatchInternal::Sequence(this) - + WriteBatchInternal::Count(this) - 1; + uint64_t batch_sequence = WriteBatchInternal::Sequence(this); for (uint32_t i = 0; i < lg_bw->size(); ++i) { if ((*lg_bw)[i] == NULL) { (*lg_bw)[i] = new WriteBatch(); WriteBatchInternal::SetCount((*lg_bw)[i], 0); } - int c = WriteBatchInternal::Count((*lg_bw)[i]); - WriteBatchInternal::SetSequence((*lg_bw)[i], last_sequence - c + 1); + WriteBatchInternal::SetSequence((*lg_bw)[i], batch_sequence); } if (found != WriteBatchInternal::Count(this)) { return Status::Corruption("WriteBatch has wrong count"); @@ -190,11 +188,9 @@ class MemTableInserter : public WriteBatch::Handler { virtual void Put(const Slice& key, const Slice& value) { mem_->Add(sequence_, kTypeValue, key, value); - sequence_++; } virtual void Delete(const Slice& key) { mem_->Add(sequence_, kTypeDeletion, key, Slice()); - sequence_++; } }; } // namespace diff --git a/src/leveldb/db/write_batch_test.cc b/src/leveldb/db/write_batch_test.cc index c30079d73..2a41f10f1 100644 --- a/src/leveldb/db/write_batch_test.cc +++ b/src/leveldb/db/write_batch_test.cc @@ -72,8 +72,8 @@ TEST(WriteBatchTest, Multiple) { WriteBatchInternal::SetSequence(&batch, 100); ASSERT_EQ(100u, WriteBatchInternal::Sequence(&batch)); ASSERT_EQ(3, WriteBatchInternal::Count(&batch)); - ASSERT_EQ("Put(baz, boo)@102" - "Delete(box)@101" + ASSERT_EQ("Put(baz, boo)@100" + "Delete(box)@100" "Put(foo, bar)@100", PrintContents(&batch)); } @@ -106,14 +106,14 @@ TEST(WriteBatchTest, Append) { b2.Put("b", "vb"); WriteBatchInternal::Append(&b1, &b2); ASSERT_EQ("Put(a, va)@200" - "Put(b, vb)@201", + "Put(b, vb)@200", PrintContents(&b1)); b2.Delete("foo"); WriteBatchInternal::Append(&b1, &b2); ASSERT_EQ("Put(a, va)@200" - "Put(b, vb)@202" - "Put(b, vb)@201" - "Delete(foo)@203", + "Put(b, vb)@200" + "Put(b, vb)@200" + "Delete(foo)@200", PrintContents(&b1)); } diff --git a/src/leveldb/table/block_builder.cc b/src/leveldb/table/block_builder.cc index ade272eaa..4c4d1f520 100644 --- a/src/leveldb/table/block_builder.cc +++ b/src/leveldb/table/block_builder.cc @@ -78,8 +78,10 @@ void BlockBuilder::Add(const Slice& key, const Slice& value) { Slice last_key_piece(last_key_); assert(!finished_); assert(counter_ <= options_->block_restart_interval); + /** one batch has the same seq, Compare does not work assert(buffer_.empty() // No values yet? || options_->comparator->Compare(key, last_key_piece) > 0); + */ size_t shared = 0; if (counter_ < options_->block_restart_interval) { // See how much sharing to do with previous string diff --git a/src/leveldb/table/table_builder.cc b/src/leveldb/table/table_builder.cc index 4cbbbb3e6..5450eb93e 100644 --- a/src/leveldb/table/table_builder.cc +++ b/src/leveldb/table/table_builder.cc @@ -103,9 +103,11 @@ void TableBuilder::Add(const Slice& key, const Slice& value) { Rep* r = rep_; assert(!r->closed); if (!ok()) return; + /** one batch has the same seq, Compare does not work if (r->num_entries > 0) { assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0); } + */ if (r->pending_index_entry) { assert(r->data_block.empty()); diff --git a/src/tera_flags.cc b/src/tera_flags.cc old mode 100644 new mode 100755 index 9df85905a..d986cc9d8 --- a/src/tera_flags.cc +++ b/src/tera_flags.cc @@ -178,7 +178,7 @@ DEFINE_int32(tera_tabletnode_scan_pack_max_size, 10240, "the max size(KB) of the DEFINE_int32(tera_asyncwriter_pending_limit, 10000, "the max pending data size (KB) in async writer"); DEFINE_bool(tera_enable_level0_limit, true, "enable level0 limit"); -DEFINE_int32(tera_tablet_level0_file_limit, 20000, "the max level0 file num before write busy"); +DEFINE_int32(tera_tablet_level0_file_limit, 10, "the max level0 file num before write busy"); DEFINE_int32(tera_asyncwriter_sync_interval, 100, "the interval (in ms) to sync write buffer to disk"); DEFINE_int32(tera_asyncwriter_sync_size_threshold, 1024, "force sync per X KB"); DEFINE_int32(tera_asyncwriter_batch_size, 1024, "write batch to leveldb per X KB");