如果返回值为0则表示相同,否则会返回差值,这里是按照ascll的顺序来进行比较的
// Return true iff "x" is a prefix of "*this" bool starts_with(const Slice& x) const { return ((size_ >= x.size_) && (memcmp(data_, x.data_, x.size_) == 0)); }下面还有两个切片的比较函数
假如说我们调用a.compare(b)
那么比较的逻辑就是先看a和b谁比较长一点。
然后取较小的长度去进行比较
如果在较小的长度内a和b是相同的,那么就是谁长谁就更大
如果在较小的长度内a和b不是想同的,那么就以较小长度内的比较为准
inline int Slice::compare(const Slice& b) const { const size_t min_len = (size_ < b.size_) ? size_ : b.size_; int r = memcmp(data_, b.data_, min_len); if (r == 0) { if (size_ < b.size_) r = -1; else if (size_ > b.size_) r = +1; } return r; }写一个测试代码
auto a = new leveldb::Slice("123"); leveldb::Slice b; b = leveldb::Slice("21"); std:: cout << "a 与 b 的比较结果" << a->compare(b) << std::endl;上述代码会输出-1表示a < b这符合我们的预期
好了我们上面知道了key和value都是以切片的形式进行存储的。ok下面继续分析写操作
随后进入db/db_imp.cc中的DB::Put函数
// Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { WriteBatch batch; batch.Put(key, value); return Write(opt, &batch); }这里会定义一个writeBatch来进行写入操作,它会调用batch.put来实现原子写。
不过我们前面有说写操作会先写Log来防止出现意外。而数据则会先写入memtable中
Write Log操作
在调用write(opt, &batch)的时候
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Writer w(&mutex_); w.batch = updates; w.sync = options.sync; w.done = false; MutexLock l(&mutex_); writers_.push_back(&w); // 排队写入,直到我们在 front while (!w.done && &w != writers_.front()) { w.cv.Wait(); } if (w.done) { return w.status; } { mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));上述操作就是写入log的操作。而下面的代码则是写入memTable
if (status.ok()) { status = WriteBatchInternal::InsertInto(write_batch, mem_); // 这里写入,mem_ 是 MemTable, }关于log写入的具体分析后面会在log详解的时候分析
写入memtable
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { MemTableInserter inserter; inserter.sequence_ = WriteBatchInternal::Sequence(b); inserter.mem_ = memtable; return b->Iterate(&inserter); }上面的代码最终会执行到。memtable.cc中
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, const Slice& value) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] // value_size : varint32 of value.size() // value bytes : char[value.size()] size_t key_size = key.size(); size_t val_size = value.size(); size_t internal_key_size = key_size + 8; const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; // 为要put的key value 创建空间 char* buf = arena_.Allocate(encoded_len); char* p = EncodeVarint32(buf, internal_key_size); // copy进去 std::memcpy(p, key.data(), key_size); p += key_size; EncodeFixed64(p, (s << 8) | type); p += 8; p = EncodeVarint32(p, val_size); std::memcpy(p, value.data(), val_size); assert(p + val_size == buf + encoded_len); // 将索引写入SkipList table_.Insert(buf); }前面说过当memtable满了之后会写入磁盘也就是sstable。对应的代码在MakeRoomForWrite后面再仔细分析了
Status status = MakeRoomForWrite(updates == nullptr); uint64_t last_sequence = versions_->LastSequence(); 2. 读操作的实现同样的还是通过debug的方式追踪代码
代码位于db_impl.cc:DBImpl::Get
// Unlock while reading from files and memtables { mutex_.Unlock(); // First look in the memtable, then in the immutable memtable (if any). LookupKey lkey(key, snapshot); if (mem->Get(lkey, value, &s)) { // Done } else if (imm != nullptr && imm->Get(lkey, value, &s)) { // Done } else { s = current->Get(options, lkey, value, &stats); have_stat_update = true; } mutex_.Lock(); } if (have_stat_update && current->UpdateStats(stats)) { MaybeScheduleCompaction(); } mem->Unref(); if (imm != nullptr) imm->Unref(); current->Unref(); return s;可以发现读操作会先根据key值做一个查找操作loocupKey
随后去memtable中查找。如果memtable没有则会去 immutable中寻找
如果上面两个地方都查不到的话最后则要去sstable中查找。
4. 总结