LevelDB 阅读笔记

LevelDB 的背景

LevelDB 是 Google 出品的一个单机版 Key-Value 存储引擎,LevelDB 的想法是来自于 Google 大名鼎鼎的 Bigtable,因为 Bigtable 是通过 Google 很多内部的代码实现的,这些代码并没有开源,所以两位作者 Jeffrey Dean 和 Sanjay Ghemawat 做了一个更适合开源的版本,也就是我们现在看到的 LevelDB。LevelDB 是一个适用于写多读少的数据库,里面最核心的思想就是 LSM 树(log-structured merge-tree),以下简称 LSM,LSM 更多是一种思想,LevelDB 通过 LSM 的实现方式,减少随机写的次数,提高了写入的效率,并且通过 Compaction 这种内部的数据整合机制,达到了平衡读写速率的效果。在这里可以看到在每个 Value 都不是非常大(100,000 bytes each)的时候,LevelDB 都有不俗的表现。

LevelDB 总体架构

其实数据库无外乎就是要解决两个问题:怎么存储数据怎么读取数据。为了解决上面的两个核心问题,就需要数据库这样一个调度系统,而内存和硬盘就是被调度的对象,解决好了内存和硬盘之间的互动关系,自然也就能理解数据库是怎么读写数据了。我们可以先看一下 LevelDB 的写入概括图:

LevelDB概括图

在这个图里面确实还有很多别的东西没有包括进来,比如 log 文件,manifest 文件,current 文件等,这些是用来做数据恢复,记录数据库版本的,关于这些部分的内容我以后会慢慢完善,现在暂时不会涉及到。

从概括图我们可以看到会有以下几个比较重要的元素,Memtable, Immutable Memtable, Compaction, Level0 ~ Level6, SSTable, 我们会分成每个小部分来讲解,最后会把这所有的元素联系起来。


Memtable

在 LevelDB 的里面,Memtable 是完全存在于内存里面的,其核心就是一个跳表(skiplist),所有的 KV 都是根据 Key 排好序的,所以在设计 Memtable 的时候,要使用一种数据结构,方便于插入,并且这种插入操作的复杂度不能过高。其实这种数据结构有很多,比如 AVL(Adelson-Velskii and Landis)树,BST(Binary Search Tree)等,这些数据结构的插入复杂度都是 O(logN),但树状数据结构都会有个问题,就是数据如果按照某种特定的顺序插入的话,可能在某一个时间点进行树高的调整,也就是让我们的树更加 balance,而往往这一个开销是比较大的。所以 LevelDB 采用了另一种数据结构,跳表。跳表是一种随机化的数据结构,而跳表是线性结构,所以不需要进行 balance 这个操作,通过随机化,插入复杂度是 amortized O(logN)。

skiplist

我们拿上面这个图来作为例子来对跳表有个基本的认识,在跳表的最下面一层,[4, 15, 20, 34, 40, 45, 55, 57, 60, 90],这个里面存储了跳表的所有元素,并且这些元素都是有序的,而除了最下面一层其他层元素,可以将其他层视为“高速公路”,通过这些“高速公路”我们可以相对比较快的找到我们想要的最底层元素。我们首先规定一下,在上图中,[4, 45]所在的层数为第 0 层,[4, 34, 45, 57]所处的层数为第 1 层,然后[4, 15, 20, 34, 40, 45, 55, 57, 60, 90]所处的层数为第 2 层,然后我们在下面分别分析一下这个数据结构的查找和插入是怎么实现的。

我们先看看 skiplist 里面的节点是怎样,直接上代码:

struct SkipList<Key,Comparator>::Node {
  explicit Node(const Key& k) : key(k) { }

  Key const key;

  // Accessors/mutators for links.  Wrapped in methods so we can
  // add the appropriate barriers as necessary.
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return reinterpret_cast<Node*>(next_[n].Acquire_Load());
  }
  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    next_[n].Release_Store(x);
  }

  // No-barrier variants that can be safely used in a few locations.
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
  }
  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].NoBarrier_Store(x);
  }

 private:
  // Array of length equal to the node height.  next_[0] is lowest level link.
  port::AtomicPointer next_[1];
};

这里面牵扯到 memory barrier 的概念,memory barrier 是用来处理并发的,这个概念以后我们再解释,主要要注意的是port::AtomicPointer next_[1];,这是个变长的数组,它的长度和节点的高度是一致的,这个后面会用到。

查找

比如我们现在要查找 55 的所在位置。我们的指针一开始在第 0 层元素 4 的位置,我们看同一层元素 4 的下一个指针所指的值,其值为 45,因为 45<55,因此按照跳表的性质,我们知道 55 肯定在 45 后面,所以我们在同一层进行跳跃,现在我们的指针处在第 0 层 45 的位置,然后我们看第 0 层 45 的下一个指针,其值为空,所以我们这时候会往下跳跃,也就是我们现在的指针所在的位置为第 1 层 45 的位置,然后我们还是按照之前的步骤,看第 1 层 45 的下一个元素,其值为 57,因为 55<57 的,所以我们知道 55 这个元素一定在 57 之前,所以这时候我们的指针会从第 1 层 45 的位置继续往下跳,跳到第 2 层 45 的位置,接着我们重复上面的步骤继续在同层跳跃,我们会发现第 2 层 45 的下个元素就是 55 了。

我们来看一下代码上的实现:

typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
    const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    Node* next = x->Next(level);
    if (KeyIsAfterNode(key, next)) { // 如果key在本节点node之后,继续前进
      // Keep searching in this list
      x = next;
    } else {
      if (prev != nullptr) prev[level] = x; // 如果小于本节点node,把本节点的level层上的前节点指针记录进数组prev中
      if (level == 0) {
        return next; // 如果是最后一层,则返回下一个位置
      } else {
        // Switch to next list
        level--; // 否则继续往下跳
      }
    }
  }
}

插入

比如我们现在要插入一个数字 56,我们会先调用FindGreaterOrEqual函数,这个函数会给我们两个信息,第一个是返回值,返回值是新的节点应该插入的位置,另一个是prev数组,每一次在换层的时候,我们prev会记录下当前层最后一个节点(if (prev != nullptr) prev[level] = x;)。那么当我们要插入数字 56 的时候,prev会记录下图中红色的节点。

prev_path

正如我上面提到的,skiplist 是一个随机数化据结构,随机化体现在高度上,我们会随机生成一个节点的高度,比如我们现在随机生成的高度是 3,那么通过prev,我们就可以插入新的节点 56,结果如下图所示。

insert

我们来看一下代码上的实现:

void SkipList<Key,Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);  //查找待插入的位置

  // Our data structure does not allow duplicate insertion
  // 不允许插入相同的数据
  assert(x == nullptr || !Equal(key, x->key));

  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    //fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
    // 并发的问题我们以后再讨论,先搞懂整个流程。
    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (nullptr), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since nullptr sorts after all
    // keys.  In the latter case the reader will use the new node.
    max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
  }

  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    // 把新插入的节点x的next指针设为prev的next所指的值
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    // 把prev的指针的next设为新的节点
    prev[i]->SetNext(i, x);
  }
}

skiplist 比较核心的东西就是以上这些了,确实我没有提到关于删除的操作,LevelDB 是采用 mark as deleted 的方式来删除数据,在从内存写到硬盘里面的时候会删除这些数据,这里暂时不提及了,以后会慢慢补全。

Immutable Memtable

Immutable Memtable 其实是由 Memtable 转换过来的,当 Memtable 的大小达到了write_buffer_size(默认的是 4MB)的时候,会专门有个线程将指向 Immutable Memtable 的指针指向 Memtable,并且建立一个全新 Memtable,所以 Immutable Memtable 其实本质也就是一个 skiplist,和 Memtable 的区别在于 Immutable Memtable 是只读的。

我们来看看代码是怎么实现的:

Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;   // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}

核心代码:

imm_ = mem_;
mem_ = new MemTable(internal_comparator_);
mem_->Ref();

那么我们什么时候会触发这几行代码呢?我们看一下几个else if的条件:

(1) allow_delay && versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger
(2) !force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)
(3) imm_ != nullptr
(4) versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger

当以上条件均不满足的时候就会触发将 Memtable 转换成 Immutable Memtable。也就是说当

  1. 当前的写操作不允许异步写或者当前第 0 层的文件数量小于 8 个文件数量(kL0_SlowdownWritesTrigger)
  2. 当前的写操作不允许异步写或者当前 Memtable 的大小大于 4MB(write_buffer_size)
  3. 当前 Immutable 的指针为空
  4. 当前第 0 层的文件数量小于 12 个文件数量(kL0_StopWritesTrigger)

上述条件均满足的时候时候,就会将 Memtable 转换成 Immutable Memtable。

总的来说,当 Memtable 一直增长到一定的大小(4MB),并且磁盘中 Level0 的文件数量不是那么多的时候,我们就会将 Memtable 转换成 Immutable Memtable,并且可能准备需要做 Compaction 了。关于磁盘中的数据怎么存储,以及 Level 的解释会在后面有所提及。现在只要理解 Memtable 和 Immutable Table 就好了。

SSTable

SSTable 是存在于磁盘上的,我们可以先对 SSTable 有个逻辑结构上的认识。

SSTable

SSTable 中一共存在这几种不同类型的数据,Data Block, Filter Block, Meta Index Block, Index Block, Footer。我们一个个来理解:

Data Block

首先在 LevelDB 里面一个 Block 的大小定义为 4k,在讲 Data Block 的结构之前,我们要先了解一个简单的压缩存储方法,在下文中,Entry 表示的是每个 KV 在 Data Block 中的一条记录。

假设我们现在一共有 4 个 KV 记录,分别是:

  1. abcdefghij : 123
  2. abcdefgk : 1234
  3. abcdlmn : 456
  4. abcdz : holy

我们可以看到,由于这 4 个 KV 的存储是按顺序存储的(这个正是 LevelDB 的特性,自从数据进入内存那一刻,直到写到硬盘上,所有数据都是范围内有序的),他们的 key 存在一些 common prefix,如果我们把所有的 key 都完整的记录下来,其实相当于存储了一些冗杂的信息。一个比较直观的压缩方法是我们可以记录下 common prefix,那么这样的话每一个 Entry 只需要记录下不同的部分就可以了,LevelDB 的做法是这样的:每间隔若干个 KV Entry 将会记录下完整的 Key 的值(这个值叫做block_restart_interval,默认的间隔为 16,也就是 16 个 KV 后就会记录下完整的 Key),每个重新存储完整 key 的点称之为 Restart point。Restart point 的作用是在读取 SSTable 的时候可以加速读取速度。在每个间隔内,我们每次插入一个 KV,将会记录下这个新插入的 Key 的值和上一个插入的 Key 的 common prefix,以及别的信息,我们来看一下 Entry 的结构:

Entry

那么假设我们的block_restart_interval为 3,那么我们的 Entry 将会是这样子的:

Entry_Example

从上图中我们可以看到所有的数据其实是存在一个很长的 string 里面的,这个也是非常中规中矩的数据库存储数据的方式,拿 string 来存储 Key 的值以及 Value 的值很合理,但是我们不要忽略我们也记录下了长度,长度是一个整型数据,LevelDB 里面用了一种自己的编码方式,这个后面再说,这种特殊的编码方式可以将整型数据尽可能小的存储成 string 的形式,只要长度这个整型数据小于 128 就可以当成是一个 byte。

当我们有了这个很长的 string 以后,我们还要记录下每个重新存储完整 key 的点的偏移量,以便后面读取的时候可以提高查找速度,这个正是我们上面提到的 Restart point。还是以我们上图为例子,我们一共有两个 Restart point,分别是第一个 Entry 以及第四个 Entry,所以 Restart point 这个数组记录下来的便是 0(第一个 Entry 在 string 这个数组的偏移量)以及 33(第四个 Entry 在 string 这个数组的偏移量)。

核心代码:

void BlockBuilder::Add(const Slice& key, const Slice& value) {
  Slice last_key_piece(last_key_);
  assert(!finished_);
  assert(counter_ <= options_->block_restart_interval);
  assert(buffer_.empty() // No values yet?
         || options_->comparator->Compare(key, last_key_piece) > 0);
  size_t shared = 0;
  if (counter_ < options_->block_restart_interval) { // 在一个重启点的间隔内
    // See how much sharing to do with previous string
    const size_t min_length = std::min(last_key_piece.size(), key.size()); // 计算和上个key的shared common prefix
    while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
      shared++;
    }
  } else {
    // Restart compression
    restarts_.push_back(buffer_.size()); // 否则记录下重启点的偏移量
    counter_ = 0;
  }
  const size_t non_shared = key.size() - shared;

  // Add "<shared><non_shared><value_size>" to buffer_
  PutVarint32(&buffer_, shared); // key的shared common prefix的大小
  PutVarint32(&buffer_, non_shared); // key的非 shared common prefix的大小
  PutVarint32(&buffer_, value.size()); // Value的大小

  // Add string delta to buffer_ followed by value
  buffer_.append(key.data() + shared, non_shared); // key的非shared common prefix的内容
  buffer_.append(value.data(), value.size()); // value的内容

  // Update state
  last_key_.resize(shared);
  last_key_.append(key.data() + shared, non_shared);
  assert(Slice(last_key_) == key);
  counter_++;
}

当我们写满了一个 data block 之后,最后要记得把重启点也写入我们的 data block,并且记录下重启点数组restarts_的长度。

核心代码:

Slice BlockBuilder::Finish() {
  // Append restart array
  for (size_t i = 0; i < restarts_.size(); i++) {
    PutFixed32(&buffer_, restarts_[i]); // 记录下重启点的位置
  }
  PutFixed32(&buffer_, restarts_.size()); //记录下重启点数组的长度
  finished_ = true;
  return Slice(buffer_);
}

我们现在可以对整个 data block 的结构有个比较完整的认识了,下图就是整个 data block 的结构了:

data_block

Filter Block 以及 Meta Index Block

这一块暂时不提及,这里涉及到 bloom filter 的数据结构。

Index Block

正如我们上面看到的 data block 的组成结构,都是有序的数据块,其实很自然的可以想到了二分搜索,index block 正是这个想法,在读取的时候,index block 可以更方便的用二分搜索来查找。 Index block 的结构其实很简单,主要就是要记录下上一个 data block 最后一个 key 和下一个 data block 第一个 key 的分割值(我们称这个值为 Max Key),以及上一个 data block 的偏移量以及大小(这些信息是存储在一个叫做BlockHandle里面的)。我们可以直接看看 index block 的结构:

index_block

其他的东西都比较好理解,这里解释一下这个 Max Key,比如:

上个 data block 最后一个 key 的值:helloleveldb

下个 data block 第一个 key 的值:helloocean

那么 Max Key 则是 hellom。Max Key 要确保比上个 data block 所有的 key 都要大,比下个 data block 所有的 key 都要小。

核心代码:

void TableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->num_entries > 0) {
    assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
  }

  if (r->pending_index_entry) { //Invariant: r->pending_index_entry is true only if data_block is empty.
    // 当且仅当遇到了一个新的data block的时候,pending_index_entry才为true
    assert(r->data_block.empty());
    r->options.comparator->FindShortestSeparator(&r->last_key, key); // FindShortestSeparator这个函数就是用来计算Max Key的
    std::string handle_encoding;
    r->pending_handle.EncodeTo(&handle_encoding);
    r->index_block.Add(r->last_key, Slice(handle_encoding)); // index block和data block的本质都是一样的,调用的Add函数也是block_builder里面的Add函数
    // 只不过现在的key为Max Key,value为BlockHandle pending_handle,这个里面记录了上个data block的偏移量以及大小
    r->pending_index_entry = false; // 重新将pending_index_entry设为false,等待下次遇到新的data block
  }

  if (r->filter_block != nullptr) {
    r->filter_block->AddKey(key);
  }

  r->last_key.assign(key.data(), key.size());
  r->num_entries++;
  r->data_block.Add(key, value);

  const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  if (estimated_block_size >= r->options.block_size) {
    Flush();
  }
}

Status TableBuilder::Finish() {
  Rep* r = rep_;
  Flush();
  assert(!r->closed);
  r->closed = true;

  BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;

  // Write filter block
  if (ok() && r->filter_block != nullptr) {
    WriteRawBlock(r->filter_block->Finish(), kNoCompression,
                  &filter_block_handle);
  }

  // Write metaindex block
  if (ok()) {
    BlockBuilder meta_index_block(&r->options);
    if (r->filter_block != nullptr) {
      // Add mapping from "filter.Name" to location of filter data
      std::string key = "filter.";
      key.append(r->options.filter_policy->Name());
      std::string handle_encoding;
      filter_block_handle.EncodeTo(&handle_encoding);
      meta_index_block.Add(key, handle_encoding);
    }

    // TODO(postrelease): Add stats and other meta blocks
    WriteBlock(&meta_index_block, &metaindex_block_handle);
  }

  // Write index block
  // 这里的代码结构和上面Add函数一致,可以参考上面我写的注释,除了这里会调用WriteBlock函数,下面会有解释
  if (ok()) {
    if (r->pending_index_entry) {
      r->options.comparator->FindShortSuccessor(&r->last_key);
      std::string handle_encoding;
      r->pending_handle.EncodeTo(&handle_encoding);
      r->index_block.Add(r->last_key, Slice(handle_encoding));
      r->pending_index_entry = false;
    }
    WriteBlock(&r->index_block, &index_block_handle);
  }

  // Write footer
  if (ok()) {
    Footer footer;
    footer.set_metaindex_handle(metaindex_block_handle);
    footer.set_index_handle(index_block_handle);
    std::string footer_encoding;
    footer.EncodeTo(&footer_encoding);
    r->status = r->file->Append(footer_encoding);
    if (r->status.ok()) {
      r->offset += footer_encoding.size();
    }
  }
  return r->status;
}

void TableBuilder::Flush() {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->data_block.empty()) return;
  assert(!r->pending_index_entry);
  // 将block写入磁盘,这里调用了WriteBlock函数,Write Block在下面解释
  WriteBlock(&r->data_block, &r->pending_handle);
  if (ok()) {
    // 重新将pending_index_entry设为false,等待下次遇到新的data block
    r->pending_index_entry = true;
    r->status = r->file->Flush();
  }
  if (r->filter_block != nullptr) {
    r->filter_block->StartBlock(r->offset);
  }
}

void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
  // File format contains a sequence of blocks where each block has:
  //    block_data: uint8[n]
  //    type: uint8
  //    crc: uint32
  assert(ok());
  Rep* r = rep_;
  Slice raw = block->Finish();

  Slice block_contents;
  CompressionType type = r->options.compression;
  // TODO(postrelease): Support more compression options: zlib?
  // 这里是一些压缩算法,以后慢慢补全
  switch (type) {
    case kNoCompression:
      block_contents = raw;
      break;

    case kSnappyCompression: {
      std::string* compressed = &r->compressed_output;
      if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
          compressed->size() < raw.size() - (raw.size() / 8u)) {
        block_contents = *compressed;
      } else {
        // Snappy not supported, or compressed less than 12.5%, so just
        // store uncompressed form
        block_contents = raw;
        type = kNoCompression;
      }
      break;
    }
  }
  // 真正核心在WriteRawBlock函数
  WriteRawBlock(block_contents, type, handle);
  r->compressed_output.clear();
  block->Reset();
}

void TableBuilder::WriteRawBlock(const Slice& block_contents,
                                 CompressionType type,
                                 BlockHandle* handle) {
  Rep* r = rep_;
  // 这里记录下了上个data block的大小以及偏移量
  handle->set_offset(r->offset);
  handle->set_size(block_contents.size());
  r->status = r->file->Append(block_contents);
  if (r->status.ok()) {
    char trailer[kBlockTrailerSize];
    trailer[0] = type;
    // 关于crc的校验的部分我会以后补全,现在并不是重点。
    uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
    crc = crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
    EncodeFixed32(trailer+1, crc32c::Mask(crc));
    r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
    if (r->status.ok()) {
      r->offset += block_contents.size() + kBlockTrailerSize;
    }
  }
}

Footer

最后一部分是 Footer,Footer 记录下了 meta index block 以及 index block 的信息,它位于 table 的底部,Footer 是 SST 文件解析开始的地方,读文件会先读取这一部分的信息,通过 Footer 中记录的这两个关键元信息 Block 的位置(Metaindex Block 以及 Index Block),和前面几个不同的是 footer 的长度是固定的。

核心代码:

Status TableBuilder::Finish() {
  Rep* r = rep_;
  Flush();
  assert(!r->closed);
  r->closed = true;

  BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;

  // Write filter block
  if (ok() && r->filter_block != nullptr) {
    WriteRawBlock(r->filter_block->Finish(), kNoCompression,
                  &filter_block_handle);
  }

  // Write metaindex block
  if (ok()) {
    BlockBuilder meta_index_block(&r->options);
    if (r->filter_block != nullptr) {
      // Add mapping from "filter.Name" to location of filter data
      std::string key = "filter.";
      key.append(r->options.filter_policy->Name());
      std::string handle_encoding;
      filter_block_handle.EncodeTo(&handle_encoding);
      meta_index_block.Add(key, handle_encoding);
    }

    // TODO(postrelease): Add stats and other meta blocks
    WriteBlock(&meta_index_block, &metaindex_block_handle);
  }

  // Write index block
  if (ok()) {
    if (r->pending_index_entry) {
      r->options.comparator->FindShortSuccessor(&r->last_key);
      std::string handle_encoding;
      r->pending_handle.EncodeTo(&handle_encoding);
      r->index_block.Add(r->last_key, Slice(handle_encoding));
      r->pending_index_entry = false;
    }
    WriteBlock(&r->index_block, &index_block_handle);
  }

  // Write footer
  if (ok()) {
    Footer footer;
    // 记录下metaindex block以及index block的偏移量以及大小
    footer.set_metaindex_handle(metaindex_block_handle);
    footer.set_index_handle(index_block_handle);
    std::string footer_encoding;
    footer.EncodeTo(&footer_encoding);
    r->status = r->file->Append(footer_encoding);
    if (r->status.ok()) {
      r->offset += footer_encoding.size();
    }
  }
  return r->status;
}

class Footer {
 public:
  Footer() { }

  // The block handle for the metaindex block of the table
  const BlockHandle& metaindex_handle() const { return metaindex_handle_; }
  void set_metaindex_handle(const BlockHandle& h) { metaindex_handle_ = h; }

  // The block handle for the index block of the table
  const BlockHandle& index_handle() const {
    return index_handle_;
  }
  void set_index_handle(const BlockHandle& h) {
    index_handle_ = h;
  }

  void EncodeTo(std::string* dst) const;
  Status DecodeFrom(Slice* input);

  // Encoded length of a Footer.  Note that the serialization of a
  // Footer will always occupy exactly this many bytes.  It consists
  // of two block handles and a magic number.
  // footer是固定长度的,两个handle的大小,以及一个checksum的8byte
  enum {
    kEncodedLength = 2*BlockHandle::kMaxEncodedLength + 8
  };

 private:
  // Handle就是记录下大小和偏移量的封装
  BlockHandle metaindex_handle_;
  BlockHandle index_handle_;
};

目前为止,我们已经把几个核心的部分都介绍完了,我们提到了内存里面的 Memtable,Immutable Memtable,在硬盘上的 SSTable,通过代码也了解到了他们的结构。我们也稍微提及了一下 Memtable 和 Immutable Memtable 转换的过程,但是我们还没讲到最重要的一环,就是整个写入的过程,也就是整个 LevelDB 的数据流,其核心就是 LSM,LevelDB 伴随着 LSM 还有一个机制就是 Compaction。

LSM 以及 Compaction

LSM 这个是整个 LevelDB 的核心问题了,LSM 这个是一个思想,并不能说是一个具体的算法,而 Compaction 是一个 LevelDB 调整内部数据存储结构的过程,是为了达到读写均衡的目的。在网上看了很多关于 LSM 别人写的 blog,我一直没有找到解释的非常透彻的文章,最原生态的 paper 可以在这里找到,但是这是一篇非常 academic 的 paper,阅读起来难度不小。我个人猜测为什么大部分的 blog 并没有讲的很透测的原因是他们并没有深入到硬件结构上面,基本上在讲 LSM 的时候都是一笔带过,然后大部分的篇幅在讲 Compaction 的过程,可是我们学习一个思想,很重要的是要思考为什么,为什么 LSM 是一个适用于写多读少的场景,这个对我们以后的成长才有帮助。

这里扯一点题外话,自己在辞职后,开始反复会问问自己这些个工具是怎么做出来的,为什么要这样做,如果我自己来设计,我会怎么设计,才慢慢体会到了一些“大工匠”的能力,在我看来,一个“大工匠”应该是对整个计算机的底层到高层都非常熟悉的人,正如 LevelDB 的作者,Jeff Dean 和 Sanjay Ghemawat,为了查第一代谷歌的核心系统的 bug,可以从上层的 Application 一直查到 0 和 1 的级别。我曾经听到过一个非常好的比喻,虽然这个比喻是用来形容disruptor的作者的,但我觉得很恰当。这个比喻是这么说的,设计这个工具的人(指 disruptor 的作者),就好比一个经验非常丰富的 F1 赛车手。往往经验丰富的赛车手在坐上车子后,只要通过听发动机的声音,以及在驾驶的时候感受到和往常一些微妙的差别,便可以很清楚的告诉身边的人车子的毛病出在哪一个地方。这样的人在我心里这就是“大工匠”吧!

炉石传说里面的大工匠

废话不多说了,一起先来看看我们计算机硬盘的构造。

硬盘构造

我们来看看传统的磁盘的构造,传统的磁盘是由盘片(platters) 组成的,每个盘片上面会有磁性记录材料,而这些盘片的中央有一个旋转的主轴,这个主轴可以使得盘片以每分钟 5400 ~ 15000 转的速度旋转。每个盘片的表面会有一个个磁道(track),而每个磁道被划分为一个个扇区(sector),一般一个扇区是 512byte,也就是 8 个连续的扇区组成一个文件系统块(block),文件系统块就是文件系统中最小存储单元的抽象,而每个盘面上面都会有一个悬空的 磁头(head),磁头在不同的磁道上面的机械过程,我们称之为寻道(seek)

https://commons.wikimedia.org/wiki/File:Cylinder_Head_Sector.svg

( 感谢此图片由 LionKimbro 提供,来源:https://commons.wikimedia.org/wiki/File:Cylinder_Head_Sector.svg

那么当我们要写入一个文件到我们的磁盘,我们看看需要哪些操作:

  1. 磁头需要移动到对应的磁道(seek time)
  2. 通过磁盘的旋转,磁头需要在磁道上找到对应的扇区(spinning time)
  3. 将数据写入到扇区所需要的传输时间(transfer time)

在上述三个时间中,spinning time 是非常小的,因为盘片的旋转速度非常快,而 transfer time 是固有的时间,也就是写入那么多数据,transfer time 就会必须花费那么多的时间。所以最费时间的就是 seek time 了,seek time 的开销,是所有数据库都在一直试图缩短的时间。在顺序写入的过程中,我们的磁头只需要一次的 seek time 以及 spinning time 在加上固有的 transfer time 我们便可以将数据写入。而随机写的过程中,每一次的写操作,都需要一次 seek time + spinning time + transfer time,大部分的时间全部浪费在了 seek time 上面。

那么传统的数据库(比如 MySQL 的存储引擎 InnoDB)多是以 B+树的形式来维护 index 的,每次写入一个数据的时候,我们最起码要做这几个事情:

  1. Update 我们的 index,也就是 update 我们的 B+树的叶子节点,如果 B+树比较大,叶子节点是存在于磁盘上面的话,这里我们必须要做一次随机写的 IO。
  2. 通过找到的 block 的所在地,将新加入的数据写入磁盘,这里也必须做一次随机写的 IO,这里又会耗费一定的时间。

那么这里就暴露了 B+树索引的一个弊端,通过上面的描述,当我们不停写入数据的时候,我们反复在做上面两个步骤。举个例子,比如我们先插入一个数据 A,我们通过上面两个步骤,找到了 A 所应该在的位置,A 所在的位置应该是 5 号磁道中的 25 号扇区,然后我们将 A 写入磁盘,但是我们下个要写入的数据 B,通过上面两个步骤,找到了 B 所在的位置是 100 号磁道 48 号扇区,那么磁头必须移动到该位置才能写入新的数据。在不停寻找 block 的过程中,时间大量花费在了 seek time 上面,这个正是所谓的随机写所带来的问题。

LSM

我们仔细思考一下上面设计的问题,这个设计最大的弊端在于内存中维护着的是一个 index,也就是我们的 B+树,数据在进入内存的那一刻起,首先需要做两个事情,更新 index,然后才写入硬盘。内存中并不是真正的数据,而是数据应该所在的位置,数据的顺序是依靠着 B+树来维护的,就好像我们经常听到的 Master-Slave 的结构,index 正是我们的 Master,他负责指引数据的走向,内存的东西和硬盘的东西并没有相似之处。

而 LSM 是完全另外一种想法,我们来看看 LevelDB 的写入过程,当一个写入操作发生的时候可以分为几个步骤:

  1. 这个写操作会先写到内存里面,也就是我们的 Memtable(这里会涉及到批量写的问题,LevelDB 用了一个生产者消费者模型,这个会后面会提到)
  2. 当 Memtable 写满了以后,就会出现我们之前介绍 Memtable 提到的情况,LevelDB 会把 Memtable 转换成 Immutable Table(这个条件上面介绍 Memtable 和 Immutable Table 的时候已经提及了),相当于这时候我们 freeze 了我们的 Memtable。
  3. Immutable Table 会慢慢的写入第 0 层的 SSTable,至此,这个数据变成了持久化的存储了。
  4. Compaction 这个机制会不断调整各个 Level 之间的 Table,尽量使得每个 SSTable 不会有过多的 Missed Read,随着 Compaction 的不断调整,低 Level 的一些 table 会慢慢合并到高 Level 的 table 里面。

其中第三步和第四部分别叫做 Minor Compaction 以及 Major Compaction,等介绍到 Compaction 的时候我们再细说。

我们可以看到,当一个数据进入了 LevelDB 的时候,他在内存的 skiplist 上已经是有序的了,然后从内存的 skiplist 往第 0 层的 SSTable 上写,也就是我们的硬盘上,依旧是保持有序的状态。并且最重要的原因是对于整个 SSTable 文件的写入,这时候我们在硬盘上只需要做顺序写就可以了,不存在任何寻找磁道的过程,这样的写法省略了相当多的时间,我们的磁头一直就在做一个事情,就是沿着磁道往里面写!这其实就是为什么 LevelDB 非常适合写多读少的情景。

对 LSM 这个想法有一定了解后,我们来看看真个写入过程

当我们调用put函数的时候,代码会生成一个batch的实例作为数据库的最小写入单元,这个单元的结构如下:

batch

这个结构里面的Type有两个值,分别是kTypeDeletion,还有kTypeValue,在 LevelDB 里面删除操作是一种 Mark as deleted 的操作,通过 type 的控制,我们在 Compaction 阶段会删除掉已经删除的数据,Compaction 在后面会介绍。

在代码中,LevelDB 用了一个 deque 的数据结构来作为一个生产者消费者模型,我们假设同时有 w1, w2, w3, w4, w5, w6 几个并发同时要求写入,这些写操作都是可以竞争 mutext lock 的,这些写操作中会有一个拿到 mutext lock,这个写操作会被 push 到 writers中,这个 writers就是一个 deque 的数据结构,其他写操作暂时没办法拿到 lock,还不能被 push 进这个 deque,比如拿到这个 lock 的写操作是 w1,那么 w1 处在这个 deque 的开头,并且 status 并不是 done,所以会跳出循环,这时候其他的写操作还不能放入 deque 内,在确保有空间写入 Memtable 后(MakeRoomForWrite),我们会做一个BuildBatchGroup,因为现在 deque 里面只有一个写操作,也就是这个写操作就是一个 group,然后我们释放掉这个锁,一旦释放掉这个锁,后面的写操作 w2,w3,w4,w5,w6 就可以就可以继续竞争锁并且放入我们的 deque 里面,并且因为现在 deque 里面已经有一个 w1 写操作了,所以后面的写操作都不在 deque 的最前面,所以都会在 while 循环里面 wait,继而剩下的写操作继续竞争锁,所以我们的 deque 现在可能变成(w3,w5,w2)。然后我们的 w1 会写入 log 以及 memtable,因为这时候只有一个线程在写入我们的 log 以及 memtable,所以是安全的,直到 w1 完成了 log 以及 memtable 的写入,又会竞争到 mutex lock,这时我们的 deque 因为得不到锁,所以不会再被修改了,现在的 deque 可能是(w3,w5,w2,w4)这个状态,然后我们的 w1 会被 pop 出来,接着会唤醒 deque 前面的第一个元素也就是我们的 w3。w3 被唤醒了,发现自己在 deque 最前面,便可以调用BuildBatchGroup了,在这个函数里面会遍历整个 deque,并且把剩下的 w3,w5,w2,w4 都合并成一个 batch,直到 w3 释放了锁,deque 又可以写入新的写操作,会变成(w3, w5, w2, w4, w6, w9, w8)。然后就会和前面一样了,将 batch 写入 log 以及 memtable,w3 会被 pop 出来,对 w5, w2, w4 唤醒,他们会退出循环,再唤醒 w6。整个过程就是这样的,这就是一个生产者消费者模型,只不过这里很巧妙的把其中一个生产者作为了消费者而已。这段代码真的写的很巧妙,需要一定时间去理解。

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

void WriteBatch::Put(const Slice& key, const Slice& value) {
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeValue));
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  EncodeFixed32(&b->rep_[8], n);
}

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != nullptr) {  // nullptr batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}

整个写入的过程大概就是这样了,我们上面提到了MakeRoomForWrite这个函数,其实前面介绍 Memtable 怎么转换成 Immutable Table 我也提到了这个函数,这个函数里面有个非常重要的机制,就是 Compaction,其实 LSM 是一种思想,而 Compaction 更是一种具体的做法,也是 LevelDB 里面非常重要的一个保证读写均衡的机制。

Compaction

前面我们介绍了 Memtable,Immutable Table,SSTable,对写入的过程也做了一些介绍,前面也提到了 Memtable 是在什么情况下写入 Immutable Table 的,那么剩下来的问题就是 Immutable Table 是怎么写到 SSTable 的,以及各个 Level 之间的 SSTable 是怎么调整的。这里就是 LevelDB 的 Compaction 机制。

Compaction 分为两种方式,一种是 Minor Compaction,还有一种是 Major Compaction。

在我们看 Compaction 之前,我觉得值得一提的是 Compaction 这个机制在代码中的存在是怎样的。我们前面提到了MakeRoomForWrite,这个函数就是确保在插入数据的时候,Memtable 一定有足够的空间。

Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;   // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}

只有在!force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)的时候,我们才会有 break 语句,这就是确保我们的 Memtable 有足够的空间写入新的数据。

而当我们的 Memtable 转换成 Immutable Table 的时候,是会触发另一个函数的MaybeScheduleCompaction,我们直接看下这个函数。

void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (background_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.Acquire_Load()) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == nullptr &&
             manual_compaction_ == nullptr &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    background_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}

当我们需要 Compaction 的时候,这个函数其实就是在做两个事情,首先这个函数会开启另一个另一个线程,并且从第一个条件看出,LevelDB 在同一个时刻,是只允许有一个背景线程存在的。这个线程会做两个事情:

  1. 将 Immutable Table 写入到 SSTable
  2. 根据一定条件,将某些 Level 的 SSTable 合并到高一级的 Level SSTable

当 Compaction 完成后,我们会尝试开启另外一个线程,因为可能又有新的 Immutable Table 会写入 SSTable,总之,我们可以看出,LevelDB 会一直有且仅有一个线程在后面跑着做着 Compaction 的工作。

核心代码:

void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(background_compaction_scheduled_);
  if (shutting_down_.Acquire_Load()) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();
  }

  background_compaction_scheduled_ = false;

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  background_work_finished_signal_.SignalAll();
}

Minor Compaction

Minor Compaction 其实就是将 Immutable Table 写入 SSTable 的过程,这个过程的做法也是相当简单的,正如我们前面提到,Immutable Table 是一个 skiplist,那么我们要做的就是遍历整个 skiplist,然后将数据写入 SSTable,直接看代码吧。

核心代码:

void DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != nullptr);

  // Save the contents of the memtable as a new Table
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  Status s = WriteLevel0Table(imm_, &edit, base);
  base->Unref();

  if (s.ok() && shutting_down_.Acquire_Load()) {
    s = Status::IOError("Deleting DB during memtable compaction");
  }

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    s = versions_->LogAndApply(&edit, &mutex_);
  }

  if (s.ok()) {
    // Commit to the new state
    imm_->Unref();
    imm_ = nullptr;
    has_imm_.Release_Store(nullptr);
    DeleteObsoleteFiles();
  } else {
    RecordBackgroundError(s);
  }
}

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
                                Version* base) {
  mutex_.AssertHeld();
  const uint64_t start_micros = env_->NowMicros();
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  pending_outputs_.insert(meta.number);
  Iterator* iter = mem->NewIterator();
  Log(options_.info_log, "Level-0 table #%llu: started",
      (unsigned long long) meta.number);

  Status s;
  {
    mutex_.Unlock();
    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
    mutex_.Lock();
  }

  Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
      (unsigned long long) meta.number,
      (unsigned long long) meta.file_size,
      s.ToString().c_str());
  delete iter;
  pending_outputs_.erase(meta.number);


  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    if (base != nullptr) {
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    }
    edit->AddFile(level, meta.number, meta.file_size,
                  meta.smallest, meta.largest);
  }

  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.file_size;
  stats_[level].Add(stats);
  return s;
}

int Version::PickLevelForMemTableOutput(
    const Slice& smallest_user_key,
    const Slice& largest_user_key) {
  int level = 0;
  if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
    // Push to next level if there is no overlap in next level,
    // and the #bytes overlapping in the level after that are limited.
    InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
    std::vector<FileMetaData*> overlaps;
    while (level < config::kMaxMemCompactLevel) {
      if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
        break;
      }
      if (level + 2 < config::kNumLevels) {
        // Check that file does not overlap too many grandparent bytes.
        GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
        const int64_t sum = TotalFileSize(overlaps);
        if (sum > MaxGrandParentOverlapBytes(vset_->options_)) {
          break;
        }
      }
      level++;
    }
  }
  return level;
}

其实我们可以在PickLevelForMemTableOutput看到,Immutable Table 并不一定是放到 Level0 的,但这里我们要说一个 LevelDB 的 SSTable 文件结构。

Imutable Table 虽然可以一开始就放到 Level1,但是要确保的是和 Level1 其他的 SSTable 的 Key 值是没有重复的,并且和 Level2 的 SSTable 不能够有过多的重复的 Key,但是如果直接放到 Level0 就没有这个要求。对于 LevelDB 来说,Level0 的 SSTable,因为是直接从 Immutable Table 放进来的,所以 Level0 的 SSTable 是会有重复的 Key 的,正是这个原因,我们在读取 LevelDB 数据的时候,如果内存中的 Memtable 和 Immutable Table 都 miss 了的话,我们只能够将 Level0 的所有 SSTable 都读一遍,虽然有 Cache,但是为了数据的准确性,这个是必须的。

如果我们一直往 Level0 里面放数据,而 Level0 的 SSTable 他们的 Key 是有重复的,随着数据越来越多,Level0 的 SSTable 也会越来越多,读性能会很差,LevelDB 用了 Major Compaction 这个机制确保在 level >= 1 的情况下,同 Level 的 SSTable 是不可以有重复 Key 的,这也就是为什么上面PickLevelForMemTableOutput,Immutable Table 并不一定是放到 Level0 的,要确保的是和 Level1 其他的 SSTable 的 Key 值是没有重复的,并且和 Level2 的 SSTable 不能够有过多的重复的 Key。

Major Compaction

触发 Major Compaction 有以下三个条件:

  1. 第 0 层的文件超过了上限(4 个)
  2. 当第 i 层的文件总的大小超过(10 ^ i) MB(i 表示的是层数)
  3. 当某个文件 Missed Read 的次数过多

Major Compaction 的优先级也是跟着这个来的。条件 1 保证的是确保 Memtable 有足够的空间给予我们新插入的数据,当第 0 层文件过多,而我们又要做 Compaction 的时候,第 0 层一定是优先级最高的,因为我们必须腾出空间给我们新的数据去写入内存。条件 2 是为了不让某一层的文件的总的大小过大,LevelDB 中除了最后一层,其余层数的所有文件大小是有要求的,score = static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);当这个得分过高的时候,证明了该层需要做 Compaction,但是为什么当某一层的文件总大小超过一定数量就需要 Compaction 呢,这个问题是这样解释的,因为 SSTable 是写在硬盘上的,也就是每次读取,最多只会访问一个 SSTable,那么这个消耗是固定的,那么我们怎样才能更快,答案就是降低 Compaction 的开销,如果某一层的文件过大了,Compaction 的开销自然会大,所以尽量要均摊我们每一层的文件大小。关于第三个条件,作者 Jeff Dean 是这么认为的,一次 IO 寻道时间(seek time)为 10ms,读写 1MB 为 10ms(transfer time),也就是(100MB/s),那么对一个 1MB 文件做 Compaction 的时间为,读这个 1MB 文件的时间+下一层读 12MB(最坏情况)+写入下层 12MB(最坏情况)的总时间,也就是 25MB 数据的 IO,也就是 250ms,也就是相当于 25 次寻道时间,就可以做一次 1MB 数据的 Compaction 了,也就是 1 次的寻道时间大约等于 40kb 数据的 Compaction 时间,但 Jeff dean 觉得我们应该保守点的认为一次寻道时间大概约等于 16kb 的数据,所以也就是当一个文件最大允许 Missed Read 的次数为f->allowed_seeks = (f->file_size / 16384);.

bool Version::UpdateStats(const GetStats& stats) {
  FileMetaData* f = stats.seek_file;
  if (f != nullptr) {
    f->allowed_seeks--;
    if (f->allowed_seeks <= 0 && file_to_compact_ == nullptr) {
      file_to_compact_ = f;
      file_to_compact_level_ = stats.seek_file_level;
      return true;
    }
  }
  return false;
}

Compaction* VersionSet::PickCompaction() {
  Compaction* c;
  int level;

  // We prefer compactions triggered by too much data in a level over
  // the compactions triggered by seeks.
  const bool size_compaction = (current_->compaction_score_ >= 1);
  const bool seek_compaction = (current_->file_to_compact_ != nullptr);
  if (size_compaction) {
    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level+1 < config::kNumLevels);
    c = new Compaction(options_, level);

    // Pick the first file that comes after compact_pointer_[level]
    for (size_t i = 0; i < current_->files_[level].size(); i++) {
      FileMetaData* f = current_->files_[level][i];
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
        c->inputs_[0].push_back(f);
        break;
      }
    }
    if (c->inputs_[0].empty()) {
      // Wrap-around to the beginning of the key space
      c->inputs_[0].push_back(current_->files_[level][0]);
    }
  } else if (seek_compaction) {
    level = current_->file_to_compact_level_;
    c = new Compaction(options_, level);
    c->inputs_[0].push_back(current_->file_to_compact_);
  } else {
    return nullptr;
  }

  c->input_version_ = current_;
  c->input_version_->Ref();

  // Files in level 0 may overlap each other, so pick up all overlapping ones
  if (level == 0) {
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    // Note that the next call will discard the file we placed in
    // c->inputs_[0] earlier and replace it with an overlapping set
    // which will include the picked file.
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }

  SetupOtherInputs(c);

  return c;
}

这里有一个 version 的东西,这个可以单独拿出来讲一个篇章,version 我自己也还没有深入看,但是 version 会记录下当前数据库每一层上次做 compaction 的最大的 key 的值compact_pointer_,把 version 当成是一个记录数据库状态的类就可以了。这里要注意一下 Level0 的 SSTable,正如我前面说的 Level0 的几个 SSTable 是有重复的 Key 的,所以在确定了输入的 Level0 的文件后,要在 Level0 里面先寻找有重叠 Key 的文件,然后我们去 Level1 的文件找和上面几个 Level0 有重叠的文件,当我们确定了 Level1 的文件后,还要反过来再看 Level0 的几个文件,因为新确定的 Level1 的文件里面,可能包含了一些 key,而这些 key 也存在于别的 Level0 的文件中,而一开始这些 Level0 的文件并不存在于我们一开始确定的输入文件里面。这个也就是写放大的问题。

举个例子,比如红色的 Level0 的文件为初始输入文件,橙色为同层的有 overlaping key 的输入文件,蓝色为下一层的输入文件,而最外围的紫色的框才是最终的输入文件。

write_expand

最后我们来看一下 Compaction 的过程了。首先考虑一种最简单的情况,第 i 层的文件和第 i+1 层的文件没有任何的 key 重叠,并且与第 i+2 层的 key 重叠的数量不是很多,我们叫这种为IsTrivialMove,这时候情况很简单,只要直接第 i 层的文件往下移一层就可以了。

bool Compaction::IsTrivialMove() const {
  const VersionSet* vset = input_version_->vset_;
  // Avoid a move if there is lots of overlapping grandparent data.
  // Otherwise, the move could create a parent file that will require
  // a very expensive merge later on.
  return (num_input_files(0) == 1 && num_input_files(1) == 0 &&
          TotalFileSize(grandparents_) <=
              MaxGrandParentOverlapBytes(vset->options_));
}

void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  if (imm_ != nullptr) {
    CompactMemTable();
    return;
  }

  Compaction* c;
  bool is_manual = (manual_compaction_ != nullptr);
  InternalKey manual_end;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == nullptr);
    if (c != nullptr) {
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
    Log(options_.info_log,
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
        m->level,
        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  } else {
    c = versions_->PickCompaction();
  }

  Status status;
  if (c == nullptr) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) { // 这里就是最简单的情况了。
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    VersionSet::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number),
        c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(),
        versions_->LevelSummary(&tmp));
  } else {
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact); // 这里就是最核心的Compaction了
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();
    DeleteObsoleteFiles();
  }
  delete c;

  if (status.ok()) {
    // Done
  } else if (shutting_down_.Acquire_Load()) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(options_.info_log,
        "Compaction error: %s", status.ToString().c_str());
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      m->tmp_storage = manual_end;
      m->begin = &m->tmp_storage;
    }
    manual_compaction_ = nullptr;
  }
}

接下来我们看一下最硬核的DoCompactionWork的部分了,这部分的代码很长,是一个 200 行的代码,我会把一些核心的代码提取出来:

首先我们会获取一个 iterator,在前面的代码中,我们会把需要合并的文件放入 compaction 中,这里就是获取一个可以可以找出每一个 input 的 KV 的迭代器。

Iterator* input = versions_->MakeInputIterator(compact->compaction);
input->SeekToFirst();

接下来就是 iterate 我们所有的 input 文件,是一个巨大的 loop:

 for (; input->Valid() && !shutting_down_.Acquire_Load(); )

每次循环一开始都要看看我们的 Immutable Table 是不是为空,如果为空,那么先 Compact Immtable Table,这里是保证 Compaction 的优先级,确保新进来的数据可以写进内存的 Memtable。

接着我们会提取出我们的 KV 值,然后我们需要做一个判断,判断当前这个 KV 准备写入的 SSTable 是不是会与其下一层的 SSTable 有过多的 overlap,如果 overlap 的大小太大了,就不要将这个 KV 写入当前的 SSTable 了,当前的 SSTable 可以直接结束写入了。因为如果当前的 output 和下一层的 SSTable 有过多的 Key 的 overlap,下次 Compaction 还是要处理新生成的 SSTable,并且正是因为当前 SSTable 和下一层的好几个 SSTable 重复的 Key 太多了,下次 Compaction 就会比较耗时,Compaction 会更加成为一个 performance 的瓶颈。

 int64_t overlapped_bytes_;  // Bytes of overlap between current output
                              // and grandparent files

bool Compaction::ShouldStopBefore(const Slice& internal_key) {
  const VersionSet* vset = input_version_->vset_;
  // Scan to find earliest grandparent file that contains key.
  const InternalKeyComparator* icmp = &vset->icmp_;
  while (grandparent_index_ < grandparents_.size() &&
      icmp->Compare(internal_key,
                    grandparents_[grandparent_index_]->largest.Encode()) > 0) {
    if (seen_key_) {
      overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
    }
    grandparent_index_++;
  }
  seen_key_ = true;

  if (overlapped_bytes_ > MaxGrandParentOverlapBytes(vset->options_)) {
    // Too much overlap for current output; start new output
    overlapped_bytes_ = 0;
    return true;
  } else {
    return false;
  }
}

接着是判断当前的 Key 是不是第一次遇到,如果以前遇到过了,说明这个 Key 已经过期了,就可以丢掉了,因为第一次遇到的那个 Key 一定是更加“新鲜”的。所以旧的可以丢弃掉。

if (!has_current_user_key ||
        user_comparator()->Compare(ikey.user_key,
                                   Slice(current_user_key)) != 0) {
      // First occurrence of this user key
      current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
      has_current_user_key = true;
      last_sequence_for_key = kMaxSequenceNumber;
    }

然后会有另外的判断,这里牵扯到了快照(snap_shot)以及 sequence 的部分,这部分我还没有读透,先不提及,但是值得一提的是我前面提到了 LevelDB 是一种 Mark as Deleted 的做法,前面说到写操作单元的时候,里面有个 type,kTypeDeletion就是在这里用到的,Compaction 的时候如果遇到了这个 type,就不需要再将这个数据保留下来了。

if (last_sequence_for_key <= compact->smallest_snapshot) {
        // Hidden by an newer entry for same user key
        drop = true;    // (A)
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= compact->smallest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
      }

      last_sequence_for_key = ikey.sequence;

如果上面的条件都没有满足,证明了这个 KV 是可以下入新的 SSTable 的,代码如下:

 if (!drop) {
    // Open output file if necessary
    if (compact->builder == nullptr) {
      status = OpenCompactionOutputFile(compact);
      if (!status.ok()) {
        break;
      }
    }
    if (compact->builder->NumEntries() == 0) {
      compact->current_output()->smallest.DecodeFrom(key);
    }
    compact->current_output()->largest.DecodeFrom(key);
    compact->builder->Add(key, input->value()); // 将KV加入新的SSTable

    // Close output file if it is big enough
    // 确保SSTable的size不会过大
    if (compact->builder->FileSize() >=
        compact->compaction->MaxOutputFileSize()) {
      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
    }
  }

这里值得看的东西是MaxOutputFileSize,这个值的大小是根据当前所在的层数决定的,当我们发现新生成的 SSTable 已经够大了,就需要停止写入了,转而写入新的 SSTable,其实也就是一次的 Compaction,有可能产生多个 SSTable 的。下面这个图可能可以更直观看到 Compaction 的过程。

major_compaction

到此为止,Compaction 里面重要的部分将的也差不多了,Compaction 是 LevelDB 里面最硬核的东西了,其实我们回过头来看看,Compaction 的目的是什么。

  1. Minor Compaction 的目的很明确,这也是优先级最高的 Compaction,确保我们新的数据写进来的时候,不会因为 Immutable Table 满了,或者是 Level0 的 SSTable 满了而写不进来。
  2. Major Compaction 使得我们的数据在每一层 Level 都更加均衡,并且保证在 Level>=1 是没有重叠的 Key 的,这个其实是为了提高读取的性能,否则每一次读都要做个 full scan。
  3. 在 Compaction 的时候,我们会删除一些已经被删掉的旧的 KV。因为 LevelDB 的删除机制是 Mark as Deleted,每个数据都是 Append 进来的,所以写的速度是比较快的,但是弊端就是硬盘上会有旧的、重复的垃圾数据,而这些数据在慢慢 Compaction 的过程中会被删除。
  4. 减少整个读写系统不必要的开销,这个就是上面提到的计算,每次 Missed Read 都会造成资源的浪费,Compcation 把 SSTable 重组,这样就可以相对减少某个 SSTable 的 Missed Read 的次数,减少不必要的开销。

小结

传统数据库的引擎多采用的是 B+树,我读书的时候拜读了一部分数据库系统组成原理,所以我自己刚看 LevelDB 的思想的时候,觉得 LSM 是一种很反常规的做法,确实花了不少时间去阅读代码,理解 LSM 的想法,和传统的方法相比,我个人觉得 LevelDB 更好的发挥了 Memory 和 disk 各自的优势,避免了各自的劣势(内存快,我们就尽快把数据往里面写,硬盘慢,我们就尽量顺序写,写的东西虽然不是那么的“美观”,可能有重复数据等,那就开个线程慢慢调整。),而不是像 B 树一样,把内存当成一个指挥官,这个指挥官会来控制写入读取,这样忽略了写盘时磁头运动所产生的大量的消耗。正如我提到的其实数据库就是一个调度系统,是玩一个怎么让内存和硬盘相互协调工作的游戏,其实我们现在用的一些分布式的大轮子(Redis,HBase,Cassendra 等),在单机上面也是玩这个游戏,就好像 Bigtable 下面的 LevelDB 一样。Skiplist,开链哈希,bloom filter,LSM,版本号,Recovery 等小工具,在上面的轮子里面都是普遍存在的。

还有一些比较重要的东西没在这里提到,比如用到的 Lock-free programming,多线程的处理,LevelDB 编码方式,版本号,快照,Cache,log,以及怎么 recovery 等,因为自己还没有觉得有把握说 100%都理解了,所以也不敢写出来,我会慢慢再看看剩余的部分,然后再在这里填坑。确实有一些地方还没有讲的很透彻,甚至自己也可能理解上会有偏差,如果发现了这样的地方,也欢迎大家多交流。