leveldb源码分析

开始

leveldb是由Google两位大牛开发的单机KV存储系统,涉及到了skip list、内存KV table、LRU cache管理、table文件存储、operation log系统等。开始之前先来看看Leveldb的基本框架,几大关键组件

leveldb是一种基于operation log的文件系统,是Log-Structured-Merge Tree的典型实现。LSM源自Ousterhout和Rosenblum在1991年发表的经典论文<<The Design and Implementation of a Log-Structured File System >>。

由于采用了op log,它就可以把随机的磁盘写操作,变成了对op log的append操作,因此提高了IO效率,最新的数据则存储在内存memtable中。

op log文件大小超过限定值时,就定时做check point。Leveldb会生成新的Log文件和Memtable,后台调度会将Immutable Memtable的数据导出到磁盘,形成一个新的SSTable文件。SSTable就是由内存中的数据不断导出并进行Compaction操作后形成的,而且SSTable的所有文件是一种层级结构,第一层为Level 0,第二层为Level 1,依次类推,层级逐渐增高,这也是为何称之为LevelDb的原因。

1. 一些约定

先说下代码中的一些约定:

1.1 字节序

Leveldb对于数字的存储是little-endian的,在把int32或者int64转换为char*的函数中,是按照先低位再高位的顺序存放的,也就是little-endian的。

1.2 VarInt

把一个int32或者int64格式化到字符串中,除了上面说的little-endian字节序外,大部分还是变长存储的,也就是VarInt。对于VarInt,每byte的有效存储是7bit的,用最高的8bit位来表示是否结束,如果是1就表示后面还有一个byte的数字,否则表示结束。直接见Encode和Decode函数。

在操作log中使用的是Fixed存储格式。

1.3 字符比较

是基于unsigned char的,而非char。

2. 基本数据结构

别看是基本数据结构,有些也不是那么简单的,像LRU Cache管理和Skip list那都算是leveldb的核心数据结构。

2.1 Slice

Leveldb中的基本数据结构:

  1. 包括length和一个指向外部字节数组的指针。
  2. 和string一样,允许字符串中包含’\0’。

提供一些基本接口,可以把const char和string转换为Slice;把Slice转换为string,取得数据指针const char。

2.2 Status

Leveldb 中的返回状态,将错误号和错误信息封装成Status类,统一进行处理。并定义了几种具体的返回状态,如成功或者文件不存在等。

为了节省空间Status并没有用std::string来存储错误信息,而是将返回码(code), 错误信息message及长度打包存储于一个字符串数组中。

成功状态OK 是NULL state,否则state 是一个包含如下信息的数组:

1
2
3
state_[0..3] == 消息message长度 
state_[4] == 消息code
state_[5..] ==消息message

2.3 Arena

Leveldb的简单的内存池,它所作的工作十分简单,申请内存时,将申请到的内存块放入std::vector blocks_中,在Arena的生命周期结束后,统一释放掉所有申请到的内存,内部结构如图2.3-1所示。

Arena主要提供了两个申请函数:其中一个直接分配内存,另一个可以申请对齐的内存空间。

Arena没有直接调用delete/free函数,而是由Arena的析构函数统一释放所有的内存。

应该说这是和leveldb特定的应用场景相关的,比如一个memtable使用一个Arena,当memtable被释放时,由Arena统一释放其内存。

2.4 Skip list

Skip list(跳跃表)是一种可以代替平衡树的数据结构。Skip lists应用概率保证平衡,平衡树采用严格的旋转(比如平衡二叉树有左旋右旋)来保证平衡,因此Skip list比较容易实现,而且相比平衡树有着较高的运行效率。

从概率上保持数据结构的平衡比显式的保持数据结构平衡要简单的多。对于大多数应用,用skip list要比用树更自然,算法也会相对简单。由于skip list比较简单,实现起来会比较容易,虽然和平衡树有着相同的时间复杂度(O(logn)),但是skip list的常数项相对小很多。skip list在空间上也比较节省。一个节点平均只需要1.333个指针(甚至更少),并且不需要存储保持平衡的变量。

如图2.4-1所示。

在Leveldb中,skip list是实现memtable的核心数据结构,memtable的KV数据都存储在skip list中。

2.5 Cache

Leveldb内部通过双向链表实现了一个标准版的LRUCache,先上个示意图,看看几个数据之间的关系,如图2.5-1。

Leveldb实现LRUCache的几个步骤

接下来说说Leveldb实现LRUCache的几个步骤,很直观明了。

S1

定义一个LRUHandle结构体,代表cache中的元素。它包含了几个主要的成员:

1
void* value; 

这个存储的是cache的数据;

1
void (*deleter)(const Slice&, void* value);

这个是数据从Cache中清除时执行的清理函数;

后面的三个成员事关LRUCache的数据的组织结构:

1
LRUHandle *next_hash;

指向节点在hash table链表中的下一个hash(key)相同的元素,在有碰撞时Leveldb采用的是链表法。最后一个节点的next_hash为NULL。

1
LRUHandle *next, *prev;

节点在双向链表中的前驱后继节点指针,所有的cache数据都是存储在一个双向list中,最前面的是最新加入的,每次新加入的位置都是head->next。所以每次剔除的规则就是剔除list tail。

S2

Leveldb自己实现了一个hash table:HandleTable,而不是使用系统提供的hash table。这个类就是基本的hash操作:Lookup、Insert和Delete

Hash table的作用是根据key快速查找元素是否在cache中,并返回LRUHandle节点指针,由此就能快速定位节点在hash表和双向链表中的位置。

它是通过LRUHandle的成员next_hash组织起来的。

HandleTable使用LRUHandle list_存储所有的hash节点,其实就是一个二维数组,**一维是不同的hash(key),另一维则是相同hash(key)的碰撞list。

每次当hash节点数超过当前一维数组的长度后,都会做Resize操作:

1
LRUHandle** new_list = new LRUHandle*[new_length];

然后复制list到new_list中,并删除旧的list

S3

基于HandleTable和LRUHandle,实现了一个标准的LRUcache,并内置了mutex保护锁,是线程安全的。

其中存储所有数据的双向链表是LRUHandle lru_,这是一个list head;

Hash表则是HandleTable table_;

S4

ShardedLRUCache类,实际上到S3,一个标准的LRU Cache已经实现了,为何还要更近一步呢?答案就是速度!

为了多线程访问,尽可能快速,减少锁开销,ShardedLRUCache内部有16个LRUCache,查找Key时首先计算key属于哪一个分片,分片的计算方法是取32位hash值的高4位,然后在相应的LRUCache中进行查找,这样就大大减少了多线程的访问锁的开销。

1
LRUCache shard_[kNumShards]

它就是一个包装类,实现都在LRUCache类中。

2.6 其它

此外还有其它几个Random、Hash、CRC32、Histogram等,都在util文件夹下,不仔细分析了。

3.Int Coding

轻松一刻,前面约定中讲过Leveldb使用了很多VarInt型编码,典型的如后面将涉及到的各种key。其中的编码、解码函数分为VarInt和FixedInt两种。int32和int64操作都是类似的。

3.1 Eecode

首先是FixedInt编码,直接上代码,很简单明了。

1
2
3
4
5
6
7
8
9
10
11
void EncodeFixed32(char* buf, uint32_t value)
{
if (port::kLittleEndian) {
memcpy(buf, &value,sizeof(value));
} else {
buf[0] = value & 0xff;
buf[1] = (value >> 8)& 0xff;
buf[2] = (value >> 16)& 0xff;
buf[3] = (value >> 24)& 0xff;
}
}

下面是VarInt编码,int32和int64格式,代码如下,有效位是7bit的,因此把uint32按7bit分割,对unsigned char赋值时,超出0xFF会自动截断,因此直接*(ptr++) = v|B即可,不需要再把(v|B)与0xFF作&操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
char* EncodeVarint32(char* dst, uint32_t v)
{
unsigned char* ptr =reinterpret_cast<unsigned char*>(dst);
static const int B = 128;
if (v < (1<<7)) {
*(ptr++) = v;
} else if (v < (1<<14)){
*(ptr++) = v | B;
*(ptr++) = v>>7;
} else if (v < (1<<21)){
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = v>>14;
} else if (v < (1<<28)){
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = v>>21;
} else {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = (v>>21) | B;
*(ptr++) = v>>28;
}
return reinterpret_cast<char*>(ptr);
}

// 对于uint64,直接循环
char* EncodeVarint64(char* dst, uint64_t v) {
static const int B = 128;
unsigned char* ptr =reinterpret_cast<unsigned char*>(dst);
while (v >= B) {
*(ptr++) = (v & (B-1)) |B;
v >>= 7;
}
*(ptr++) =static_cast<unsigned char>(v);
returnreinterpret_cast<char*>(ptr);
}

3.2 Decode

Fixed Int的Decode,操作,代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
inline uint32_t DecodeFixed32(const char* ptr)
{
if (port::kLittleEndian) {
uint32_t result;
// gcc optimizes this to a plain load
memcpy(&result, ptr,sizeof(result));
return result;
} else {
return((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0])))
|(static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) <<8)
| (static_cast<uint32_t>(static_cast<unsignedchar>(ptr[2])) << 16)
|(static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) <<24));
}
}

再来看看VarInt的解码,很简单,依次读取1byte,直到最高位为0的byte结束,取低7bit,作(<<7)移位操作组合成Int。看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
const char* GetVarint32Ptr(const char* p,
const char* limit,
uint32_t* value)
{
if (p < limit) {
uint32_t result =*(reinterpret_cast<const unsigned char*>(p));
if ((result & 128) == 0) {
*value = result;
return p + 1;
}
}
return GetVarint32PtrFallback(p,limit, value);
}

const char* GetVarint32PtrFallback(const char* p,
const char* limit,
uint32_t* value)
{
uint32_t result = 0;
for (uint32_t shift = 0; shift<= 28 && p < limit; shift += 7) {
uint32_t byte =*(reinterpret_cast<const unsigned char*>(p));
p++;
if (byte & 128) { // More bytes are present
result |= ((byte & 127)<< shift);
} else {
result |= (byte <<shift);
*value = result;
returnreinterpret_cast<const char*>(p);
}
}
return NULL;
}

4. Memtable之一

Memtable是leveldb很重要的一块,leveldb的核心之一。我们肯定关注KV数据在Memtable中是如何组织的,秘密在Skip list中。

4.1 用途

在Leveldb中,所有内存中的KV数据都存储在Memtable中,物理disk则存储在SSTable中。在系统运行过程中,如果Memtable中的数据占用内存到达指定值(Options.write_buffer_size),则Leveldb就自动将Memtable转换为Memtable,并自动生成新的Memtable,也就是Copy-On-Write机制了。

Immutable Memtable则被新的线程Dump到磁盘中,Dump结束则该Immutable Memtable就可以释放了。因名知意,Immutable Memtable是只读的

所以可见,最新的数据都是存储在Memtable中的,Immutable Memtable和物理SSTable则是某个时点的数据。

为了防止系统down机导致内存数据Memtable或者Immutable Memtable丢失,leveldb自然也依赖于log机制来保证可靠性了。

Memtable提供了写入KV记录,删除以及读取KV记录的接口,但是事实上Memtable并不执行真正的删除操作,删除某个Key的Value在Memtable内是作为插入一条记录实施的,但是会打上一个Key的删除标记,真正的删除操作在后面的 Compaction过程中,lazy delete。

4.2 核心是Skip list

另外,Memtable中的KV对是根据Key排序的,leveldb在插入等操作时保证key的有序性。想想,前面看到的Skip list不正是合适的人选吗,因此Memtable的核心数据结构是一个Skip list,Memtable只是一个接口类。当然随之而来的一个问题就是Skip list是如何组织KV数据对的,在后面分析Memtable的插入、查询接口时我们将会看到答案。

4.3 接口说明

先来看看Memtable的接口:

1
2
3
4
5
6
7
8
9
10
11
12
void Ref() { ++refs_; }

void Unref();

Iterator* NewIterator();

void Add(SequenceNumber seq,
ValueType type,
const Slice& key,
const Slice& value);

bool Get(const LookupKey& key, std::string* value, Status* s);

首先Memtable是基于引用计数的机制,如果引用计数为0,则在Unref中删除自己,Ref和Unref就是干这个的。

  • NewIterator是返回一个迭代器,可以遍历访问table的内部数据,很好的设计思想,这种方式隐藏了table的内部实现。外部调用者必须保证使用Iterator访问Memtable的时候该Memtable是live的。
  • Add和Get是添加和获取记录的接口,没有Delete,还记得前面说过,memtable的delete实际上是插入一条type为kTypeDeletion的记录。

4.4 类图

先来看看Memtable相关的整体类层次吧,并不复杂,还是相当清晰的。见图。

4.5 Key结构

Memtable是一个KV存储结构,那么这个key肯定是个重点了,在分析接口实现之前,有必要仔细分析一下Memtable对key的使用。

这里面有5个key的概念,可能会让人混淆,下面就来一个一个的分析。

4.5.1 InternalKey & ParsedInternalKey & User Key

InternalKey是一个复合概念,是有几个部分组合成的一个key,ParsedInternalKey就是对InternalKey分拆后的结果,先来看看ParsedInternalKey的成员,这是一个struct:

1
2
3
4
5
Slice user_key;

SequenceNumber sequence;

ValueType type;

也就是说InternalKey是由User key + SequenceNumber + ValueType组合而成的,顺便先分析下几个Key相关的函数,它们是了解Internal Key和User Key的关键。

首先是InternalKey和ParsedInternalKey相互转换的两个函数,如下。

1
2
3
4
5
bool ParseInternalKey (const Slice& internal_key,
ParsedInternalKey* result);

void AppendInternalKey (std::string* result,
const ParsedInternalKey& key);

函数实现很简单,就是字符串的拼接与把字符串按字节拆分,代码略过。根据实现,容易得到InternalKey的格式为:

1
| User key (string) | sequence number (7 bytes) | value type (1 byte) |

由此还可知道sequence number大小是7 bytes,sequence number是所有基于op log系统的关键数据,它唯一指定了不同操作的时间顺序。

user key放到前面**的原因**是,这样对同一个user key的操作就可以按照sequence number顺序连续存放了,不同的user key是互不相干的,因此把它们的操作放在一起也没有什么意义。

另外用户可以为user key定制比较函数,系统默认是字母序的。

下面的两个函数是分别从InternalKey中拆分出User Key和Value Type的,非常直观,代码也附上吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
inline Slice ExtractUserKey(const Slice& internal_key)
{
assert(internal_key.size() >= 8);
return Slice(internal_key.data(), internal_key.size() - 8);
}

inline ValueType ExtractValueType(const Slice& internal_key)
{
assert(internal_key.size() >= 8);
const size_t n = internal_key.size();
uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
unsigned char c = num & 0xff;
return static_cast<ValueType>(c);
}

4.5.2 LookupKey & Memtable Key

Memtable的查询接口传入的是LookupKey,它也是由User Key和Sequence Number组合而成的,从其构造函数:

1
LookupKey(const Slice& user_key, SequenceNumber s)

中分析出LookupKey的格式为:

1
| Size (int32变长)| User key (string) | sequence number (7 bytes) | value type (1 byte) |

两点:

  • 这里的Size是user key长度+8,也就是整个字符串长度了;
  • value type是kValueTypeForSeek,它等于kTypeValue。

由于LookupKey的size是变长存储的,因此它使用kstart_记录了user key string的起始地址,否则将不能正确的获取size和user key;

LookupKey导出了三个函数,可以分别从LookupKey得到Internal KeyMemtable KeyUser Key,如下:

1
2
3
4
5
6
7
8
// Return a key suitable for lookup in a MemTable.
Slice memtable_key() const { return Slice(start_, end_ - start_); }

// Return an internal key (suitable for passing to an internal iterator)
Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }

// Return the user key
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }

其中start_是LookupKey字符串的开始,end_是结束,kstart_是start_+4,也就是user key字符串的起始地址。

4.Memtable之2

4.6 Comparator

弄清楚了key,接下来就要看看key的使用了,先从Comparator开始分析。首先Comparator是一个抽象类,导出了几个接口。

其中Name()Compare()接口都很明了,另外的两个Find xxx接口都有什么功能呢,直接看程序注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//Advanced functions: these are used to reduce the space requirements 
//for internal data structures like index blocks.
// 这两个函数:用于减少像index blocks这样的内部数据结构占用的空间
// 其中的*start和*key参数都是IN OUT的。
//If *start < limit, changes *start to a short string in [start,limit).
//Simple comparator implementations may return with *start unchanged,
//i.e., an implementation of this method that does nothing is correct.
// 这个函数的作用就是:如果*start < limit,就在[startlimit,)中找到一个
// 短字符串,并赋给*start返回
// 简单的comparator实现可能不改变*start,这也是正确的
virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const = 0;
//Changes *key to a short string >= *key.
//Simple comparator implementations may return with *key unchanged,
//i.e., an implementation of this method that does nothing is correct.
//这个函数的作用就是:找一个>= *key的短字符串
//简单的comparator实现可能不改变*key,这也是正确的
virtual void FindShortSuccessor(std::string* key) const = 0;

其中的实现类有两个,一个是内置的BytewiseComparatorImpl,另一个是InternalKeyComparator。下面分别来分析。

4.6.1 BytewiseComparatorImpl

首先是重载的Name和比较函数,比较函数如其名,就是字符串比较,如下:

1
2
virtual const char* Name() const {return"leveldb.BytewiseComparator";}
virtual int Compare(const Slice& a, const Slice& b) const {return a.compare(b);}

再来看看Byte wise的comparator是如何实现FindShortestSeparator()的,没什么特别的,代码 + 注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
virtual void FindShortestSeparator(std::string* start, 
onst Slice& limit) const
{
// 首先计算共同前缀字符串的长度
size_t min_length = std::min(start->size(), limit.size());
size_t diff_index = 0;
while ((diff_index < min_length) &&
((*start)[diff_index] == limit[diff_index]))
{
diff_index++;
}
if (diff_index >= min_length)
{
// 说明*start是limit的前缀,或者反之,此时不作修改,直接返回
}
else
{
// 尝试执行字符start[diff_index]++,
设置start长度为diff_index+1,并返回
// ++条件:字符< oxff 并且字符+1 < limit上该index的字符
uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
if (diff_byte < static_cast<uint8_t>(0xff) &&
diff_byte + 1 < static_cast<uint8_t>(limit[diff_index]))
{
(*start)[diff_index]++;
start->resize(diff_index + 1);
assert(Compare(*start, limit) < 0);
}
}
}

最后是FindShortSuccessor(),这个更简单了,代码+注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
virtual void FindShortSuccessor(std::string* key) const 
{
// 找到第一个可以++的字符,执行++后,截断字符串;
// 如果找不到说明*key的字符都是0xff啊,那就不作修改,直接返回
size_t n = key->size();
for (size_t i = 0; i < n; i++)
{
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff))
{
(*key)[i] = byte + 1;
key->resize(i+1);
return;
}
}
}

Leveldb内建的基于Byte wise的comparator类就这么多内容了,下面再来看看InternalKeyComparator。

4.6.2 InternalKeyComparator

从上面对Internal Key的讨论可知,由于它是由user key和sequence number和value type组合而成的,因此它还需要user key的比较,所以InternalKeyComparator有一个Comparator usercomparator成员,用于*user key的比较。

在leveldb中的名字为:”leveldb.InternalKeyComparator”,下面来看看比较函数:

1
Compare(const Slice& akey, const Slice& bkey)

代码很简单,其比较逻辑是:

  • S1 首先比较user key,基于用户设置的comparator,如果user key不相等就直接返回比较,否则执行进入S2
  • S2 取出8字节的sequence number | value type,如果akey的 > bkey的则返回-1,如果akey的<bkey的返回1相等返回0

由此可见其排序比较依据依次是:

  1. 首先根据user key按升序排列
  2. 然后根据sequence number按降序排列
  3. 最后根据value type按降序排列

虽然比较时value type并不重要,因为sequence number是唯一的,但是直接取出8byte的sequence number | value type,然后做比较更方便,不需要再次移位提取出7byte的sequence number,又何乐而不为呢。这也是把value type安排在低7byte的好处吧,排序的两个依据就是user key和sequence number

接下来就该看看其FindShortestSeparator()函数实现了,该函数取出Internal Key中的user key字段,根据user指定的comparator找到并替换start,如果start被替换了,就用新的start更新Internal Key,并使用最大的sequence number。否则保持不变。

函数声明:

1
void InternalKeyComparator::FindShortestSeparator(std::string* start, const Slice& limit) const;

函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 尝试更新user key,基于指定的user comparator
Slice user_start = ExtractUserKey(*start);
Slice user_limit = ExtractUserKey(limit);
std::string tmp(user_start.data(), user_start.size());
user_comparator_->FindShortestSeparator(&tmp, user_limit);
if(tmp.size() < user_start.size() && user_comparator_->Compare(user_start, tmp) < 0)
{
// user key在物理上长度变短了,但其逻辑值变大了.生产新的*start时,
// 使用最大的sequence number,以保证排在相同user key记录序列的第一个
PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,
kValueTypeForSeek));
assert(this->Compare(*start, tmp) < 0);
assert(this->Compare(tmp, limit) < 0);
start->swap(tmp);
}

接下来是FindShortSuccessor(std::string* key)函数,该函数取出Internal Key中的user key字段,根据user指定的comparator找到并替换key,如果key被替换了,就用新的key更新Internal Key,并使用最大的sequence number。否则保持不变。实现逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
Slice user_key = ExtractUserKey(*key);
// 尝试更新user key,基于指定的user comparator
std::string tmp(user_key.data(), user_key.size());
user_comparator_->FindShortSuccessor(&tmp);
if(tmp.size()<user_key.size() &&
user_comparator_->Compare(user_key, tmp)<0)
{
// user key在物理上长度变短了,但其逻辑值变大了.生产新的*start时,
// 使用最大的sequence number,以保证排在相同user key记录序列的第一个
PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber, kValueTypeForSeek));
assert(this->Compare(*key, tmp) < 0);
key->swap(tmp);
}

4.7 Memtable::Insert()

把相关的Key和Key Comparator都弄清楚后,是时候分析memtable本身了。首先是向memtable插入记录的接口,函数原型如下:

1
void Add(SequenceNumber seq, ValueType type, const Slice& key, const Slice& value);

代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// KV entry字符串有下面4部分连接而成
//key_size : varint32 of internal_key.size()
//key bytes : char[internal_key.size()]
//value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size +
VarintLength(val_size) + val_size;
char* buf = arena_.Allocate(encoded_len);
char* p = EncodeVarint32(buf, internal_key_size);
memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
assert((p + val_size) - buf == encoded_len);
able_.Insert(buf);

根据代码,我们可以分析出KV记录在skip list的存储格式等信息,首先总长度为:

1
VarInt(Internal Key size) len + internal key size + VarInt(value) len + value size

它们的相互衔接也就是KV的存储格式:

1
| VarInt(Internal Key size) len | internal key |VarInt(value) len |value|

其中前面说过:

1
2
internal key = |user key |sequence number |type |
Internal key size = key size + 8

4.8 Memtable::Get()

Memtable的查找接口,根据一个LookupKey找到响应的记录,函数声明:

1
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s)

函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
// seek到value>= memkey.data()的第一个记录
if (iter.Valid())
{
// 这里不需要再检查sequence number了,因为Seek()已经跳过了所有
// 值更大的sequence number了
const char* entry = iter.key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry+5,
&key_length);
// 比较user key是否相同,key_ptr开始的len(internal key) -8 byte是user key
if (comparator_.comparator.user_comparator()->Compare
(Slice(key_ptr, key_length - 8), key.user_key()) == 0)
{
// len(internal key)的后8byte是 |sequence number | value type|
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff))
{
case kTypeValue:
{
// 只取出value
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;

这段代码,主要就是一个Seek函数,根据传入的LookupmKey得到在emtable中存储的key,然后调用Skip list::Iterator的Seek函数查找。Seek直接调用Skip list的FindGreaterOrEqual(key)接口,返回大于等于key的Iterator。然后取出user key判断时候和传入的user key相同,如果相同取出value,如果记录的Value Type为kTypeDeletion,返回Status::NotFound(Slice())。

4.9 小结

Memtable到此就分析完毕了,本质上就是一个有序的Skip list,排序基于user key的sequence number,其排序比较依据依次是:

  1. 首先根据user key按升序排列
  2. 然后根据sequence number按降序排列
  3. 最后根据value type按降序排列(这个其实无关紧要)

5.操作Log 1

分析完KV在内存中的存储,接下来就是操作日志。所有的写操作都必须先成功的append到操作日志中,然后再更新内存memtable。这样做有两点

  1. 可以将随机的写IO变成append,极大的提高写磁盘速度;
  2. 防止在节点down机导致内存数据丢失,造成数据丢失,这对系统来说是个灾难。

在各种高效的存储系统中,这已经是口水技术了。

5.1 格式

在源码下的文档doc/log_format.txt中,作者详细描述了log格式

1
2
3
4
5
6
7
8
9
The log file contents are a sequence of 32KB blocks. 
The only exception is that the tail of thefile may contain a partial block.
Each block consists of a sequence of records:
block:= record* trailer?
record :=
checksum: uint32 // crc32c of type and data[] ; little-endian
length: uint16 // little-endian
type: uint8 // One of FULL,FIRST, MIDDLE, LAST
data: uint8[length]

A record never starts within the last six bytes of a block (since it won’tfit). Any leftover bytes here form thetrailer, which must consist entirely of zero bytes and must be skipped byreaders.

翻译过来就是:Leveldb把日志文件切分成了大小为32KB的连续block块,block由连续的log record组成,log record的格式为:

注意:CRC32, Length都是little-endian的。

Log Type有4种:FULL = 1、FIRST = 2、MIDDLE = 3、LAST = 4。FULL类型表明该log record包含了完整的user record;而user record可能内容很多,超过了block的可用大小,就需要分成几条log record,第一条类型为FIRST,中间的为MIDDLE,最后一条为LAST。也就是:

  1. FULL,说明该log record包含一个完整的user record;
  2. FIRST,说明是user record的第一条log record
  3. MIDDLE,说明是user record中间的log record
  4. LAST,说明是user record最后的一条log record

翻一下文档上的例子,考虑到如下序列的user records

  • A: length 1000
  • B: length 97270
  • C: length 8000

  • A作为FULL类型的record存储在第一个block中;

  • B将被拆分成3条log record,分别存储在第1、2、3个block中,这时block3还剩6byte,将被填充为0;
  • C将作为FULL类型的record存储在block 4中。

由于一条logrecord长度最短为7,如果一个block的剩余空间<=6byte,那么将被填充为\空字**符串,另外长度为7的log record是不包括任何用户数据的**。

5.2 写日志

写比读简单,而且写入决定了读,所以从写开始分析。有意思的是在写文件时,Leveldb使用了内存映射文件,内存映射文件的读写效率比普通文件要高。其中涉及到的类层次比较简单,如图:

注意Write类的成员typecrc数组,这里存放的为Record Type预先计算的CRC32值,因为Record Type是固定的几种,为了效率。Writer类只有一个接口,就是AddRecord(),传入Slice参数,下面来看函数实现。首先取出slice的字符串指针和长度,初始化begin=true,表明是第一条log record

1
2
3
const char* ptr = slice.data();
size_t left = slice.size();
bool begin = true;

然后进入一个while循环,直到写入出错,或者成功写入全部数据

首先查看当前block是否小于7,如果小于7则补位,并重置block偏移

1
2
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00",leftover));
block_offset_ = 0;

计算block剩余大小,以及本次log record可写入数据长度

1
2
const size_t avail =kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left <avail) ? left : avail

根据两个值,判断log type

1
2
3
4
5
6
RecordType type;
const bool end = (left ==fragment_length); // 两者相等,表明写
if (begin && end) type = kFullType;
else if (begin) type = kFirstType;
else if (end) type = kLastType;
else type = kMiddleType;

调用EmitPhysicalRecord函数,append日志;并更新指针、剩余长度和begin标记

1
2
3
4
s = EmitPhysicalRecord(type, ptr,fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;

接下来看看EmitPhysicalRecord函数,这是实际写入的地方,涉及到log的存储格式。函数声明为:

1
StatusWriter::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n)

参数ptr为用户record数据,参数n为record长度,不包含log header。

计算header,并Append到log文件,共7byte格式为:

1
2
3
4
5
6
7
8
9
| CRC32 (4 byte) | payload length lower + high (2 byte) |   type (1byte)|
char buf[kHeaderSize];
buf[4] = static_cast<char>(n& 0xff);
buf[5] =static_cast<char>(n >> 8);
buf[6] =static_cast<char>(t); // 计算record type和payload的CRC校验值
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
crc = crc32c::Mask(crc); // 空间调整
EncodeFixed32(buf, crc);
dest_->Append(Slice(buf,kHeaderSize));

写入payload,并Flush,更新block的当前偏移

1
2
3
s =dest_->Append(Slice(ptr, n));
s = dest_->Flush();
block_offset_ += kHeaderSize +n;

以上就是写日志的逻辑,很直观。

5.3 读日志

日志读取显然比写入要复杂,要检查checksum,检查是否有损坏等等,处理各种错误。

5.3.1 类层次

Reader主要用到了两个接口,一个是汇报错误的Reporter,另一个是log文件读取类SequentialFile

Reporter的接口只有一个

1
void Corruption(size_t bytes,const Status& status);

SequentialFile有两个接口:

1
2
Status Read(size_t n, Slice* result, char* scratch);
Status Skip(uint64_t n);

说明下,Read接口有一个result参数传递结果就行了,为何还有一个scratch呢,这个就和Slice相关了。它的字符串指针是传入的外部char*指针,自己并不负责内存的管理与分配。因此Read接口需要调用者提供一个字符串指针,实际存放字符串的地方。

Reader类有几个成员变量,需要注意:

1
2
3
4
5
6
bool eof_;          
// 上次Read()返回长度< kBlockSize,暗示到了文件结尾EOF
uint64_t last_record_offset_; // 函数ReadRecord返回的上一个record的偏移
uint64_t end_of_buffer_offset_;// 当前的读取偏移
uint64_t const initial_offset_;// 偏移,从哪里开始读取第一条record
Slice buffer_; // 读取的内容

5.3.2日志读取流程

Reader只有一个接口,那就是ReadRecord,下面来分析下这个函数。

S1

根据initial offset跳转到调用者指定的位置,开始读取日志文件。跳转就是直接调用SequentialFile的Seek接口。
另外,需要先调整调用者传入的initialoffset参数,调整和跳转逻辑在SkipToInitialBlock函数中。

1
2
3
4
if (last_record_offset_ <initial_offset_) 
{ // 当前偏移 < 指定的偏移,需要Seek
if (!SkipToInitialBlock()) return false;
}

下面的代码是SkipToInitialBlock函数调整read offset的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
// 计算在block内的偏移位置,并圆整到开始读取block的起始位置
size_t offset_in_block =initial_offset_ % kBlockSize;
uint64_t block_start_location =initial_offset_ - offset_in_block;
// 如果偏移在最后的6byte里,肯定不是一条完整的记录,跳到下一个block
if (offset_in_block >kBlockSize - 6)
{
offset_in_block = 0;
block_start_location +=kBlockSize;
}
end_of_buffer_offset_ =block_start_location;
// 设置读取偏移
if (block_start_location > 0) file_->Skip(block_start_location); // 跳转

首先计算出在block内的偏移位置,然后圆整到要读取block的起始位置。开始读取日志的时候都要保证读取的是完整的block,这就是调整的目的

同时成员变量endof_buffer_offset记录了这个值,在后续读取中会用到。

S2在开始while循环前首先初始化几个标记:

1
2
3
// 当前是否在fragment内,也就是遇到了FIRST 类型的record
bool in_fragmented_record = false;
uint64_t prospective_record_offset = 0; // 我们正在读取的逻辑record的偏移
S3

进入到while(true)循环,直到读取到KLastType或者KFullType的record,或者到了文件结尾。从日志文件读取完整的record是ReadPhysicalRecord函数完成的。

读取出现错误时,并不会退出循环,而是汇报错误,继续执行,直到成功读取一条user record,或者遇到文件结尾。

S3.1 从文件读取record

1
2
uint64_t physical_record_offset = end_of_buffer_offset_ -buffer_.size();
const unsigned int record_type = ReadPhysicalRecord(&fragment);

physical_record_offset存储的是当前正在读取的record的偏移值。接下来根据不同的record_type类型,分别处理,一共有7种情况:

S3.2 FULL type(kFullType),表明是一条完整的log record,成功返回读取的user record数据。另外需要对早期版本做些work around,早期的Leveldb会在block的结尾生产一条空的kFirstType log record。

1
2
3
4
5
6
7
8
9
10
11
if (in_fragmented_record) 
{
if (scratch->empty())in_fragmented_record = false;
else ReportCorruption(scratch->size(),"partial record without end(1)");
}

prospective_record_offset= physical_record_offset;
scratch->clear(); // 清空scratch,读取成功不需要返回scratch数据
*record = fragment;
last_record_offset_ =prospective_record_offset; // 更新last record offset
return true;

S3.3 FIRST type(kFirstType),表明是一系列logrecord(fragment)的第一个record。同样需要对早期版本做work around。

把数据读取到scratch中,直到成功读取了LAST类型的log record,才把数据返回到result中,继续下次的读取循环。

如果再次遇到FIRSTor FULL类型的log record,如果scratch不为空,就说明日志文件有错误。

1
2
3
4
5
6
7
8
9
10
11
if (in_fragmented_record) 
{
if (scratch->empty())in_fragmented_record = false;
else ReportCorruption(scratch->size(),"partial record without end(2)");
}

prospective_record_offset =physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
//赋值给scratch
in_fragmented_record =true;
// 设置fragment标记为true

S3.4 MIDDLE type(kMiddleType),这个处理很简单,如果不是在fragment中,报告错误,否则直接append到scratch中就可以了。

1
2
3
4
5
6
if (!in_fragmented_record)
{
ReportCorruption(fragment.size(),
"missing start of fragmentedrecord(1)");
}
else {scratch->append(fragment.data(),fragment.size());}

S3.5 LAST type(kLastType),说明是一系列log record(fragment)中的最后一条。如果不在fragment中,报告错误。

1
2
3
4
5
6
7
8
9
10
11
12
if (!in_fragmented_record) 
{
ReportCorruption(fragment.size(),
"missing start of fragmentedrecord(2)");
}
else
{
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
last_record_offset_ =prospective_record_offset;
return true;
}

至此,4种正常的log record type已经处理完成,下面3种情况是其它的错误处理,类型声明在Logger类中:

1
2
3
4
5
6
7
8
9
enum
{
kEof = kMaxRecordType + 1, // 遇到文件结尾
// 非法的record,当前有3中情况会返回bad record:
// * CRC校验失败 (ReadPhysicalRecord reports adrop)
// * 长度为0 (No drop is reported)
// * 在指定的initial_offset之外 (No drop is reported)
kBadRecord = kMaxRecordType +2
};

S3.6 遇到文件结尾kEof,返回false。不返回任何结果。

1
2
3
4
5
6
if (in_fragmented_record) 
{
ReportCorruption(scratch->size(), "partial record withoutend(3)");
scratch->clear();
}
return false;

S3.7 非法的record(kBadRecord),如果在fragment中,则报告错误。

1
2
3
4
5
6
if (in_fragmented_record)
{
ReportCorruption(scratch->size(), "error in middle ofrecord");
in_fragmented_record = false;
scratch->clear();
}

S3.8 缺省分支,遇到非法的record 类型,报告错误,清空scratch。

1
2
3
ReportCorruption(…, "unknownrecord type %u", record_type);
in_fragmented_record = false; // 重置fragment标记
scratch->clear();// 清空scratch

上面就是ReadRecord的全部逻辑,解释起来还有些费力。

5.3.3 从log文件读取record

就是前面讲过的ReadPhysicalRecord函数,它调用SequentialFile的Read接口,从文件读取数据。

该函数开始就进入了一个while(true)循环,其目的是为了读取到一个完整的record。读取的内容存放在成员变量buffer_中。这样的逻辑有些奇怪,实际上,完全不需要一个while(true)循环的。

函数基本逻辑如下:

S1

如果buffer_小于block header大小kHeaderSize,进入如下的几个分支:

S1.1 如果eof_为false,表明还没有到文件结尾,清空buffer,并读取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
buffer_.clear(); 
// 因为上次肯定读取了一个完整的record
Status status =file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ +=buffer_.size();
// 更新buffer读取偏移值
if (!status.ok())
{
// 读取失败,设置eof_为true,报告错误并返回kEof
buffer_.clear();
ReportDrop(kBlockSize,status);
eof_ = true;
return kEof;
}
else if (buffer_.size()< kBlockSize)
{
eof_ = true; // 实际读取字节<指定(Block Size),表明到了文件结尾
}
continue; // 继续下次循环

S1.2 如果eof_为true并且buffer为空,表明已经到了文件结尾,正常结束,返回kEof。

S1.3 否则,也就是eof_为true,buffer不为空,说明文件结尾包含了一个不完整的record,报告错误,返回kEof。

1
2
3
4
size_t drop_size =buffer_.size();
buffer_.clear();
ReportCorruption(drop_size,"truncated record at end of file");
return kEof;
S2 进入到这里表明上次循环中的Read读取到了一个完整的log record,continue后的第二次循环判断buffer_.size() >= kHeaderSize将执行到此处。

解析出log record的header部分,判断长度是否一致。

根据log的格式,前4byte是crc32。后面就是length和type,解析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const char* header = buffer_.data();
const uint32_t length = ((header[4])& 0xff) | ((header[5]&0xff)<<8)
const uint32_t type = header[6];
if (kHeaderSize + length >buffer_.size())
{
// 长度超出了,汇报错误
size_t drop_size =buffer_.size();
buffer_.clear();
ReportCorruption(drop_size,"bad record length");
return kBadRecord; // 返回kBadRecord
}
if (type == kZeroType&& length == 0)
{
// 对于Zero Type类型,不汇报错误
buffer_.clear();
return kBadRecord; // 依然返回kBadRecord
}
S3 校验CRC32,如果校验出错,则汇报错误,并返回kBadRecord。
S4 如果record的开始位置在initial offset之前,则跳过,并返回kBadRecord,否则返回record数据和type。
1
2
3
4
5
6
7
8
9
buffer_.remove_prefix(kHeaderSize+ length);
if (end_of_buffer_offset_ -buffer_.size() - kHeaderSize -
length < initial_offset_)
{
result->clear();
return kBadRecord;
}
*result = Slice(header +kHeaderSize, length);
return type;

从log文件读取record的逻辑就是这样的。至此,读日志的逻辑也完成了。接下来将进入磁盘存储的sstable部分。

6. SSTable之一

SSTable是Leveldb的核心之一,是表数据最终在磁盘上的物理存储。也是体量比较大的模块。

6.1 SSTable的文件组织

作者在文档doc/table_format.txt中描述了表的逻辑结构,如图6.1-1所示。逻辑上可分为两大块,数据存储区Data Block,以及各种Meta信息。

  1. 文件中的k/v对是有序存储的,并且被划分到连续排列的Data Block里面,这些Data Block从文件头开始顺序存储,Data Block的存储格式代码在block_builder.cc中;

  2. 紧跟在Data Block之后的是Meta Block,其格式代码也在block_builder.cc中;Meta Block存储的是Filter信息,比如Bloom过滤器,用于快速定位key是否在data block中。

  3. MetaIndex Block是对Meta Block的索引,它只有一条记录,key是meta index的名字(也就是Filter的名字),value为指向meta index的BlockHandle;BlockHandle是一个结构体,成员offset是Block在文件中的偏移,成员size是block的大小;

  4. Index block是对Data Block的索引,对于其中的每个记录,其key >=Data Block最后一条记录的key,同时<其后Data Block的第一条记录的key;value是指向data index的BlockHandle;

  1. Footer,文件的最后,大小固定,其格式如图6.1-2所示。

  • 成员metaindex_handle指出了meta index block的起始位置和大小;
  • 成员index_handle指出了index block的起始地址和大小;

这两个字段都是BlockHandle对象,可以理解为索引的索引,通过Footer可以直接定位到metaindex和index block。再后面是一个填充区和魔数(0xdb4775248b80fb57)。

6.2 Block存储格式

6.2.1 Block的逻辑存储

Data Block是具体的k/v数据对存储区域,此外还有存储meta的metaIndex Block,存储data block索引信息的Index Block等等,他们都是以Block的方式存储的。来看看Block是如何组织的。每个Block有三部分构成:block data, type, crc32,如图6.2-1所示。

类型type指明使用的是哪种压缩方式,当前支持none和snappy压缩。

虽然block有好几种,但是Block Data都是有序的k/v对,因此写入、读取BlockData的接口都是统一的,对于Block Data的管理也都是相同的。

对Block的写入、读取将在创建、读取sstable时分析,知道了格式之后,其读取写入代码都是很直观的。

由于sstable对数据的存储格式都是Block,因此在分析sstable的读取和写入逻辑之前,我们先来分析下Leveldb对Block Data的管理。

Leveldb对Block Data的管理是读写分离的,读取后的遍历查询操作由Block类实现,BlockData的构建则由BlockBuilder类实现。

6.2.2 重启点-restartpoint

BlockBuilder对key的存储是前缀压缩的,对于有序的字符串来讲,这能极大的减少存储空间。但是却增加了查找的时间复杂度,为了兼顾查找效率,每隔K个key,leveldb就不使用前缀压缩,而是存储整个key,这就是重启点(restartpoint)。

在构建Block时,有参数Options::block_restart_interval定每隔几个key就直接存储一个重启点key。

Block在结尾记录所有重启点的偏移,可以二分查找指定的key。Value直接存储在key的后面,无压缩。

对于一个k/v对,其在block中的存储格式为:

  • 共享前缀长度 shared_bytes: varint32
  • 前缀之后的字符串长度 unshared_bytes: varint32
  • 值的长度 value_length: varint32
  • 前缀之后的字符串 key_delta: char[unshared_bytes]
  • 值 value: char[value_length]

对于重启点,shared_bytes= 0

Block的结尾段格式是:

  • restarts: uint32[num_restarts]
  • num_restarts: uint32 // 重启点个数

元素restarts[i]存储的是block的第i个重启点的偏移。很明显第一个k/v对,总是第一个重启点,也就是restarts[0] = 0;

图给出了block的存储示意图。

总体来看Block可分为k/v存储区和后面的重启点存储区两部分,其中k/v的存储格式如前面所讲,可看做4部分:

前缀压缩的key长度信息 + value长度 + key前缀之后的字符串+ value

最后一个4byte为重启点的个数。

对Block的存储格式了解之后,对Block的构建和读取代码分析就是很直观的事情了。见下面的分析。

6.3 Block的构建与读取

6.3.1 BlockBuilder的接口

首先从Block的构建开始,这就是BlockBuilder类,来看下BlockBuilder的函数接口,一共有5个:

1
2
3
4
5
6
7
8
9
10
11
12
13
void Reset(); // 重设内容,通常在Finish之后调用已构建新的block

//添加k/v,要求:Reset()之后没有调用过Finish();Key > 任何已加入的key

void Add(const Slice& key,const Slice& value);

// 结束构建block,并返回指向block内容的指针

Slice Finish();// 返回Slice的生存周期:Builder的生存周期,or直到Reset()被调用

size_t CurrentSizeEstimate()const; // 返回正在构建block的未压缩大小—估计值

bool empty() const { returnbuffer_.empty();} // 没有entry则返回true

主要成员变量如下:

1
2
3
4
std::string            buffer_;    // block的内容
std::vector<uint32_t> restarts_; // 重启点-后面会分析到
int counter_; // 重启后生成的entry数
std::string last_key_; // 记录最后添加的key

6.3.2 BlockBuilder::Add()

调用Add函数向当前Block中新加入一个k/v对{key, value}。函数处理逻辑如下:

S1

保证新加入的key > 已加入的任何一个key;

1
2
3
4
5
assert(!finished_);  

assert(counter_ <= options_->block_restart_interval);

assert(buffer_.empty() || options_->comparator->Compare(key,last_key_piece) > 0);

S2

如果计数器counter < opions->block_restart_interval,则使用前缀算法压缩key,否则就把key作为一个重启点,无压缩存储;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Slice last_key_piece(last_key_);

if (counter_ < options_->block_restart_interval) { //前缀压缩
// 计算key与last_key_的公共前缀
const size_t min_length= std::min(last_key_piece.size(), key.size());
while ((shared < min_length)&& (last_key_piece[shared] == key[shared])) {
shared++;

} else { // 新的重启点
restarts_.push_back(buffer_.size());
counter_ = 0;
}

Slice last_key_piece(last_key_);

if (counter_ < options_->block_restart_interval) { //前缀压缩
// 计算key与last_key_的公共前缀
const size_t min_length= std::min(last_key_piece.size(), key.size());
while ((shared < min_length)&& (last_key_piece[shared] == key[shared]))
shared++;
} else { // 新的重启点
restarts_.push_back(buffer_.size());
counter_ = 0;
}
S3

根据上面的数据格式存储k/v对,追加到buffer中,并更新block状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
const size_t non_shared = key.size() - shared; // key前缀之后的字符串长度
// append"<shared><non_shared><value_size>" 到buffer_
PutVarint32(&buffer_, shared);
PutVarint32(&buffer_, non_shared);
PutVarint32(&buffer_, value.size());
// 其后是前缀之后的字符串 + value
buffer_.append(key.data() + shared, non_shared);
buffer_.append(value.data(), value.size());
// 更新状态 ,last_key_ = key及计数器counter_
last_key_.resize(shared); // 连一个string的赋值都要照顾到,使内存copy最小化
last_key_.append(key.data() + shared, non_shared);
assert(Slice(last_key_) == key);
counter_++;

6.3.3 BlockBuilder::Finish()

调用该函数完成Block的构建,很简单,压入重启点信息,并返回buffer_,设置结束标记finished_:

1
2
3
4
5
6
7
for (size_t i = 0; i < restarts_.size(); i++) {  // 重启点  
PutFixed32(&buffer_, restarts_[i]);
}

PutFixed32(&buffer_, restarts_.size()); // 重启点数量
finished_ = true;
return Slice(buffer_);

6.3.4 BlockBuilder::Reset() & 大小

还有Reset和CurrentSizeEstimate两个函数,Reset复位函数,清空各个信息;函数CurrentSizeEstimate返回block的预计大小,从函数实现来看,应该在调用Finish之前调用该函数。

1
2
3
4
5
6
7
8
9
10
11
void BlockBuilder::Reset() {  
buffer_.clear(); restarts_.clear(); last_key_.clear();
restarts_.push_back(0); // 第一个重启点位置总是 0
counter_ = 0;
finished_ = false;
}

size_t BlockBuilder::CurrentSizeEstimate () const {
// buffer大小 +重启点数组长度 + 重启点长度(uint32)
return (buffer_.size() + restarts_.size() * sizeof(uint32_t) + sizeof(uint32_t));
}

Block的构建就这些内容了,下面开始分析Block的读取,就是类Block。

6.3.5 Block类接口

对Block的读取是由类Block完成的,先来看看其函数接口和关键成员变量。

Block只有两个函数接口,通过Iterator对象,调用者就可以遍历访问Block的存储的k/v对了;以及几个成员变量,如下:

1
2
3
4
5
6
7
size_t size() const { returnsize_; }
Iterator* NewIterator(constComparator* comparator);

const char* data_; // block数据指针
size_t size_; // block数据大小
uint32_t restart_offset_; // 重启点数组在data_中的偏移
bool owned_; //data_[]是否是Block拥有的

6.3.6 Block初始化

Block的构造函数接受一个BlockContents对象contents初始化,BlockContents是一个有3个成员的结构体。

  • data = Slice();
  • cachable = false; // 无cache
  • heap_allocated = false; // 非heap分配

根据contents为成员赋值

1
data_ = contents.data.data(), size_ =contents.data.size(),owned_ = contents.heap_allocated;

然后从data中解析出重启点数组,如果数据太小,或者重启点计算出错,就设置size_=0,表明该block data解析失败。

1
2
3
4
5
6
if (size_ < sizeof(uint32_t)){
size_ = 0; // 出错了
} else {
restart_offset_ = size_ - (1 +NumRestarts()) * sizeof(uint32_t);
if (restart_offset_ > size_- sizeof(uint32_t)) size_ = 0;
}

NumRestarts()函数就是从最后的uint32解析出重启点的个数,并返回:

1
return DecodeFixed32(data_ +size_ - sizeof(uint32_t))

6.3.7 Block::Iter

这是一个用以遍历Block内部数据的内部类,它继承了Iterator接口。函数NewIterator返回Block::Iter对象:

1
return new Iter(cmp, data_,restart_offset_, num_restarts);

下面我们就分析Iter的实现

主要成员变量有:

1
2
3
4
5
6
const Comparator* constcomparator_; // key比较器
const char* const data_; // block内容
uint32_t const restarts_; // 重启点(uint32数组)在data中的偏移
uint32_t const num_restarts_; // 重启点个数
uint32_t current_; // 当前entry在data中的偏移. >= restarts_表明非法
uint32_t restart_index_; // current_所在的重启点的index

下面来看看对Iterator接口的实现,简单函数略过。

首先是Next()函数,直接调用private函数ParseNextKey()跳到下一个k/v对,函数实现如下:

S1

跳到下一个entry,其位置紧邻在当前value之后。如果已经是最后一个entry了,返回false,标记current为invalid。

1
2
3
4
5
6
7
8
current_ = NextEntryOffset(); // (value_.data() + value_.size()) - data_
const char* p = data_ +current_;
const char* limit = data_ +restarts_; // Restarts come right after data
if (p >= limit) { // entry到头了,标记为invalid.
current_ = restarts_;
restart_index_ =num_restarts_;
return false;
}

S2

解析出entry,解析出错则设置错误状态,记录错误并返回false。解析成功则根据信息组成key和value,并更新重启点index。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
uint32_t shared, non_shared,value_length;
p = DecodeEntry(p, limit,&shared, &non_shared, &value_length);
if (p == NULL || key_.size()< shared) {
CorruptionError();
return false;
} else { // 成功
key_.resize(shared);
key_.append(p, non_shared);
value_ = Slice(p +non_shared, value_length);
while (restart_index_ + 1< num_restarts_ && GetRestartPoint(restart_index_ + 1) < current_) {
++restart_index_; //更新重启点index
}
return true;
}
  • 函数DecodeEntry从字符串[p, limit)解析出key的前缀长度、key前缀之后的字符串长度和value的长度这三个vint32值,代码很简单。
  • 函数CorruptionError将current和restart_index都设置为invalid状态,并在status中设置错误状态。
  • 函数GetRestartPoint从data中读取指定restart index的偏移值restart[index],并返回:
1
DecodeFixed32(data_ + restarts_ +index * sizeof(uint32_t);

接下来看看Prev函数,Previous操作分为两步:首先回到current之前的重启点,然后再向后直到current,实现如下:

S1

首先向前回跳到在current_前面的那个重启点,并定位到重启点的k/v对开始位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const uint32_t original =current_;

while (GetRestartPoint(restart_index_)>= original) {
// 到第一个entry了,标记invalid状态
if (restart_index_ == 0) {
current_ = restarts_;
restart_index_ =num_restarts_;
return;
}
restart_index_--;
}

//根据restart index定位到重启点的k/v对
SeekToRestartPoint(restart_index_);

S2

第二步,从重启点位置开始向后遍历,直到遇到original前面的那个k/v对。

1
do {} while (ParseNextKey() &&NextEntryOffset() < original);

说说上面遇到的SeekToRestartPoint函数,它只是设置了几个有限的状态,其它值将在函数ParseNextKey()中设置。感觉这有点tricky,这里的value_并不是k/v对的value,而只是一个指向k/v对起始位置的0长度指针,这样后面的ParseNextKey函数将会取出重启点的k/v值。

1
2
3
4
5
6
7
8
void SeekToRestartPoint(uint32_tindex) {
key_.clear();
restart_index_ = index;
// ParseNextKey()会设置current_;
//ParseNextKey()从value_结尾开始, 因此需要相应的设置value_
uint32_t offset =GetRestartPoint(index);
value_ = Slice(data_ + offset,0); // value长度设置为0,字符串指针是data_+offset
}

SeekToFirst/Last,这两个函数都很简单,借助于前面的SeekToResartPoint函数就可以完成。

1
2
3
4
5
6
7
8
9
virtual void SeekToFirst() {
SeekToRestartPoint(0);
ParseNextKey();
}

virtual void SeekToLast() {
SeekToRestartPoint(num_restarts_ - 1);
while (ParseNextKey()&& NextEntryOffset() < restarts_) {} //Keep skipping
}

最后一个Seek函数,跳到指定的target(Slice),函数逻辑如下:

S1

二分查找,找到key < target的最后一个重启点,典型的二分查找算法,代码就不再贴了。

S2

找到后,跳转到重启点,其索引由left指定,这是前面二分查找到的结果。如前面所分析的,value指向重启点的地址,而size指定为0,这样ParseNextKey函数将会取出重启点的k/v值。

1
SeekToRestartPoint(left);
S3

自重启点线性向下,直到遇到key>= target的k/v对。

1
2
3
4
while (true) {
if (!ParseNextKey()) return;
if (Compare(key_, target)>= 0) return;
}

上面就是Block::Iter的全部实现逻辑,这样Block的创建和读取遍历都已经分析完毕。

6.4 创建sstable文件

了解了sstable文件的存储格式,以及Data Block的组织,下面就可以分析如何创建sstable文件了。相关代码在table_builder.h/.cc以及block_builder.h/.cc(构建Block)中。

6.4.1 TableBuilder类

构建sstable文件的类是TableBuilder,该类提供了几个有限的方法可以用来添加k/v对,Flush到文件中等等,它依赖于BlockBuilder来构建Block。

TableBuilder的几个接口说明下:

  1. void Add(const Slice& key, const Slice& value),向当前正在构建的表添加新的{key, value}对,要求根据Option指定的Comparator,key必须位于所有前面添加的key之后;
  2. void Flush(),将当前缓存的k/v全部flush到文件中,一个高级方法,大部分的client不需要直接调用该方法;
  3. void Finish(),结束表的构建,该方法被调用后,将不再会使用传入的WritableFile;
  4. void Abandon(),结束表的构建,并丢弃当前缓存的内容,该方法被调用后,将不再会使用传入的WritableFile;【只是设置closed为true,无其他操作
  5. 一旦Finish()/Abandon()方法被调用,将不能再次执行Flush或者Add操作。

下面来看看涉及到的类。

其中WritableFile和op log一样,使用的都是内存映射文件。Options是一些调用者可设置的选项。

TableBuilder只有一个成员变量Rep* rep_,实际上Rep结构体的成员就是TableBuilder所有的成员变量;这样做的目的可能是为了隐藏其内部细节。Rep的定义也是在.cc文件中,对外是透明的。

简单解释下成员的含义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Options options;              // data block的选项
Options index_block_options; // index block的选项
WritableFile* file; // sstable文件
uint64_t offset;
// 要写入data block在sstable文件中的偏移,初始0
Status status; //当前状态-初始ok
BlockBuilder data_block; //当前操作的data block
BlockBuilder index_block; // sstable的index block
std::string last_key; //当前data block最后的k/v对的key
int64_t num_entries; //当前data block的个数,初始0
bool closed; //调用了Finish() or Abandon(),初始false
FilterBlockBuilder*filter_block;
//根据filter数据快速定位key是否在block中
bool pending_index_entry; //见下面的Add函数,初始false
BlockHandle pending_handle; //添加到index block的data block的信息
std::string compressed_output;//压缩后的data block,临时存储,写入后即被清空

Filter block是存储的过滤器信息,它会存储{key, 对应data block在sstable的偏移值},不一定完全精确的,以快速定位给定key是否在data block中。

下面分析如何向sstable中添加k/v对,创建并持久化sstable。其它函数都比较简单,略过。另外对于Abandon,简单设置closed=true即返回。

6.4.2 添加k/v对

这是通过方法Add(constSlice& key, const Slice& value)完成的,没有返回值。下面分析下函数的逻辑:

S1 首先保证文件没有close,也就是没有调用过Finish/Abandon,以及保证当前status是ok的;如果当前有缓存的kv对,保证新加入的key是最大的。

1
2
3
4
5
6
7
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0)
{
assert(r->options.comparator->Compare(key, Slice(r->last_key))> 0);
}

S2 如果标记r->pending_index_entry为true,表明遇到下一个data block的第一个k/v,根据key调整r->last_key,这是通过Comparator的FindShortestSeparator完成的。

1
2
3
4
5
6
7
8
9
if (r->pending_index_entry) 
{
assert(r->data_block.empty());
r->options.comparator->FindShortestSeparator(&r->last_key,key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry =false;
}

接下来将pending_handle加入到index block中{r->last_key, r->pending_handle’sstring}。最后将r->pending_index_entry设置为false。

值得讲讲pending_index_entry这个标记的意义,见代码注释:

直到遇到下一个databock的第一个key时,我们才为上一个datablock生成index entry,这样的好处是:可以为index使用较短的key;比如上一个data block最后一个k/v的key是”the quick brown fox”,其后继data block的第一个key是”the who”,我们就可以用一个较短的字符串“the r”作为上一个data block的index block entry的key。

简而言之,就是在开始下一个datablock时,Leveldb才将上一个data block加入到index block中。标记pending_index_entry就是干这个用的,对应data block的index entry信息就保存在(BlockHandle)pending_handle。

S3 如果filter_block不为空,就把key加入到filter_block中。

1
2
3
4
if (r->filter_block != NULL) 
{
r->filter_block->AddKey(key);
}

S4 设置r->last_key = key,将(key, value)添加到r->data_block中,并更新entry数。

1
2
3
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key,value);

S5 如果data block的个数超过限制,就立刻Flush到文件中。

1
2
const size_testimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >=r->options.block_size) Flush();

6.4.3 Flush文件

该函数逻辑比较简单,直接见代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Rep* r = rep_;
assert(!r->closed); // 首先保证未关闭,且状态ok
if (!ok()) return;
if (r->data_block.empty())return; // data block是空的
// 保证pending_index_entry为false,即data block的Add已经完成
assert(!r->pending_index_entry);
// 写入data block,并设置其index entry信息—BlockHandle对象
WriteBlock(&r->data_block, &r->pending_handle);
//写入成功,则Flush文件,并设置r->pending_index_entry为true,
//以根据下一个data block的first key调整index entry的key—即r->last_key
if (ok())
{
r->pending_index_entry =true;
r->status =r->file->Flush();
}
if (r->filter_block != NULL)
{
//将data block在sstable中的便宜加入到filter block中
r->filter_block->StartBlock(r->offset);
// 并指明开始新的data block
}

6.4.4 WriteBlock函数

在Flush文件时,会调用WriteBlock函数将data block写入到文件中,该函数同时还设置data block的index entry信息。原型为:

1
void WriteBlock(BlockBuilder* block, BlockHandle* handle)

该函数做些预处理工作,序列化要写入的data block,根据需要压缩数据,真正的写入逻辑是在WriteRawBlock函数中。下面分析该函数的处理逻辑。

S1 获得block的序列化数据Slice,根据配置参数决定是否压缩,以及根据压缩格式压缩数据内容。对于Snappy压缩,如果压缩率太低<12.5%,还是作为未压缩内容存储。

BlockBuilder的Finish()函数将data block的数据序列化成一个Slice

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Rep* r = rep_;
Slice raw = block->Finish();
// 获得data block的序列化字符串
Slice block_contents;
CompressionType type =r->options.compression;
switch (type)
{
case kNoCompression: block_contents= raw; break; // 不压缩
case kSnappyCompression:
{
// snappy压缩格式
std::string* compressed =&r->compressed_output;
if(port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size()< raw.size() - (raw.size() / 8u))
{
block_contents =*compressed;
}
else
{
// 如果不支持Snappy,或者压缩率低于12.5%,依然当作不压缩存储
block_contents = raw;
type = kNoCompression;
}
break;
}
}

S2 将data内容写入到文件,并重置block成初始化状态,清空compressedoutput。

1
2
3
WriteRawBlock(block_contents,type, handle);  
r->compressed_output.clear();
block->Reset();

6.4.5 WriteRawBlock函数

在WriteBlock把准备工作都做好后,就可以写入到sstable文件中了。来看函数原型:

1
void WriteRawBlock(const Slice& data, CompressionType, BlockHandle*handle);

函数逻辑很简单,见代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Rep* r = rep_;
handle->set_offset(r->offset);
// 为index设置data block的handle信息
handle->set_size(block_contents.size());
r->status =r->file->Append(block_contents); // 写入data block内容
if (r->status.ok())
{
// 写入1byte的type和4bytes的crc32
chartrailer[kBlockTrailerSize];
trailer[0] = type;
uint32_t crc = crc32c::Value(block_contents.data(),
block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend crc tocover block type
EncodeFixed32(trailer+1, crc32c::Mask(crc));
r->status =r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok())
{
// 写入成功更新offset-下一个data block的写入偏移
r->offset +=block_contents.size() + kBlockTrailerSize;
}
}

6.4.6 Finish函数

调用Finish函数,表明调用者将所有已经添加的k/v对持久化到sstable,并关闭sstable文件。

该函数逻辑很清晰,可分为5部分

S1 首先调用Flush,写入最后的一块data block,然后设置关闭标志closed=true。表明该sstable已经关闭,不能再添加k/v对。

1
2
3
4
5
Rep* r = rep_;
Flush();
assert(!r->closed);
r->closed = true;
BlockHandle filter_block_handle,metaindex_block_handle, index_block_handle;

S2 写入filter block到文件中。

1
2
3
4
if (ok() &&r->filter_block != NULL) 
{
WriteRawBlock(r->filter_block->Finish(), kNoCompression,&filter_block_handle);
}

S3 写入meta index block到文件中。

如果filterblock不为NULL,则加入从”filter.Name”到filter data位置的映射。通过meta index block,可以根据filter名字快速定位到filter的数据区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if (ok()) 
{
BlockBuildermeta_index_block(&r->options);
if (r->filter_block !=NULL)
{
//加入从"filter.Name"到filter data位置的映射
std::string key ="filter.";
key.append(r->options.filter_policy->Name());
std::string handle_encoding;
filter_block_handle.EncodeTo(&handle_encoding);
meta_index_block.Add(key,handle_encoding);
}
// TODO(postrelease): Add stats and other metablocks
WriteBlock(&meta_index_block, &metaindex_block_handle);
}

S4 写入index block,如果成功Flush过data block,那么需要为最后一块data block设置index block,并加入到index block中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (ok()) 
{
if (r->pending_index_entry)
{
// Flush时会被设置为true
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
// 加入到index block中
r->pending_index_entry =false;
}
WriteBlock(&r->index_block, &index_block_handle);
}

S5 写入Footer。

1
2
3
4
5
6
7
8
9
10
11
12
13
if (ok()) 
{
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
r->status =r->file->Append(footer_encoding);
if (r->status.ok())
{
r->offset +=footer_encoding.size();
}
}

整个写入流程就分析完了,对于Datablock和Filter Block的操作将在Data block和Filter Block中单独分析,下面的读取相同。

6.5 读取sstable文件

6.5.1 类层次

Sstable文件的读取逻辑在类Table中,其中涉及到的类还是比较多的,如图6.5-1所示。

img

Table类导出的函数只有3个,先从这三个导出函数开始分析。其中涉及到的类(包括上图中为画出的)都会一一遇到,然后再一一拆解。

本节分析sstable的打开逻辑,后面再分析key的查找与数据遍历。

6.5.2 Table::Open()

打开一个sstable文件,函数声明为:

1
2
static Status Open(const Options& options, RandomAccessFile* file,
uint64_tfile_size, Table** table);

这是Table类的一个静态函数,如果操作成功,指针*table指向新打开的表,否则返回错误。

要打开的文件和大小分别由参数file和file_size指定;option是一些选项;

下面就分析下函数逻辑:

S1

首先从文件的结尾读取Footer,并Decode到Footer对象中,如果文件长度小于Footer的长度,则报错。Footer的decode很简单,就是根据前面的Footer结构,解析并判断magic number是否正确,解析出meta index和index block的偏移和长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
*table = NULL;
if (size <Footer::kEncodedLength)
{
// 文件太短
returnStatus::InvalidArgument("file is too short to be an sstable");
}
charfooter_space[Footer::kEncodedLength]; // Footer大小是固定的
Slice footer_input;
Status s = file->Read(size -Footer::kEncodedLength, Footer::kEncodedLength,
&footer_input, footer_space);
if (!s.ok()) return s;
Footer footer;
s =footer.DecodeFrom(&footer_input);
if (!s.ok()) return s;
S2

解析出了Footer,我们就可以读取index block和meta index了,首先读取index block。

1
2
3
4
5
6
7
8
9
10
BlockContents contents;
Block* index_block = NULL;
if (s.ok())
{
s = ReadBlock(file, ReadOptions(),footer.index_handle(), &contents);
if (s.ok())
{
index_block = newBlock(contents);
}
}

这是通过调用ReadBlock完成的,下面会分析这个函数。

S3

已经成功读取了footer和index block,此时table已经可以响应请求了。构建table对象,并读取metaindex数据构建filter policy。如果option打开了cache,还要为table创建cache。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (s.ok())
{
// 已成功读取footer和index block: 可以响应请求了
Rep* rep = new Table::Rep;
rep->options = options;
rep->file = file;
rep->metaindex_handle =footer.metaindex_handle();
rep->index_block =index_block;
rep->cache_id =(options.block_cache ? options.block_cache->NewId() : 0);
rep->filter_data = rep->filter= NULL;
*table = new Table(rep);
(*table)->ReadMeta(footer);
// 调用ReadMeta读取metaindex
}
else
{
if (index_block) deleteindex_block;
}

到这里,Table的打开操作就已经为完成了。下面来分析上面用到的ReadBlock()和ReadMeta()函数

6.5.3 ReadBlock()

前面讲过block的格式,以及Block的写入(TableBuilder::WriteRawBlock),现在我们可以轻松的分析Block的读取操作了。

这是一个全局函数,声明为:

1
2
Status ReadBlock(RandomAccessFile* file, const ReadOptions& options, 
const BlockHandle&handle, BlockContents* result);

下面来分析实现逻辑:

S1

初始化结果result,BlockContents是一个有3个成员的结构体。

1
2
3
result->data = Slice();
result->cachable = false; // 无cache
result->heap_allocated =false; // 非heap分配
S2

根据handle指定的偏移和大小,读取block内容,type和crc32值,其中常量kBlockTrailerSize=5= 1byte的type和4bytes的crc32。

1
2
Status s = file->Read(handle.offset(),handle.size() + kBlockTrailerSize,
&contents, buf);
S3

如果option要校验CRC32,则计算content + type的CRC32并校验。

S4

最后根据type指定的存储类型,如果是非压缩的,则直接取数据赋给result,否则先解压,把解压结果赋给result,目前支持的是snappy压缩。

另外,文件的Read接口返回的Slice结果,其data指针可能没有使用我们传入的buf,如果没有,那么释放Slice的data指针就是我们的事情,否则就是文件来管理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
if (data != buf) 
{
// 文件自己管理,cacheable等标记设置为false
delete[] buf;
result->data =Slice(data, n);
result->heap_allocated= result->cachable =false;
}
else
{
// 读取者自己管理,标记设置为true
result->data =Slice(buf, n);
result->heap_allocated= result->cachable = true;
}

对于压缩存储,解压后的字符串存储需要读取者自行分配的,所以标记都是true

6.5.4 Table::ReadMeta()

解决完了Block的读取,接下来就是meta的读取了。函数声明为:

1
void Table::ReadMeta(const Footer& footer)

函数逻辑并不复杂。

S1

首先调用ReadBlock读取meta的内容

1
2
3
4
5
6
7
8
if(rep_->options.filter_policy == NULL) return; 
// 不需要metadata
ReadOptions opt;
BlockContents contents;
if (!ReadBlock(rep_->file,opt, footer.metaindex_handle(), &contents).ok())
{
return; // 失败了也没报错,因为没有meta信息也没关系
}
S2

根据读取的content构建Block,找到指定的filter;如果找到了就调用ReadFilter构建filter对象。Block的分析留在后面。

1
2
3
4
5
6
7
8
Block* meta = newBlock(contents);
Iterator* iter =meta->NewIterator(BytewiseComparator());
std::string key ="filter.";
key.append(rep_->options.filter_policy->Name());
iter->Seek(key);
if (iter->Valid() &&iter->key() == Slice(key)) ReadFilter(iter->value());
delete iter;
delete meta;

6.5.5 Table::ReadFilter()

根据指定的偏移和大小,读取filter,函数声明:

1
void ReadFilter(const Slice& filter_handle_value);

简单分析下函数逻辑:

S1

从传入的filter_handle_value Decode出BlockHandle,这是filter的偏移和大小;

1
2
BlockHandle filter_handle;
filter_handle.DecodeFrom(&filter_handle_value);
S2

根据解析出的位置读取filter内容,ReadBlock。如果block的heap_allocated为true,表明需要自行释放内存,因此要把指针保存在filter_data中。最后根据读取的data创建FilterBlockReader对象。

1
2
3
4
5
6
ReadOptions opt;
BlockContents block;
ReadBlock(rep_->file, opt,filter_handle, &block);
if (block.heap_allocated)rep_->filter_data = block.data.data();
// 需要自行释放内存
rep_->filter = newFilterBlockReader(rep_->options.filter_policy, block.data);

以上就是sstable文件的读取操作,不算复杂。

6.6 遍历Table

6.6.1 遍历接口

Table导出了一个返回Iterator的接口,通过Iterator对象,调用者就可以遍历Table的内容,它简单的返回了一个TwoLevelIterator对象。见函数实现:

1
2
3
4
5
6
7
8
9
10
11
Iterator* NewIterator(const ReadOptions&options) const;  
{
return NewTwoLevelIterator(rep_->index_block->NewIterator(rep_->options.comparator),
&Table::BlockReader,const_cast<Table*>(this), options);
}
// 函数NewTwoLevelIterator创建了一个TwoLevelIterator对象:
Iterator* NewTwoLevelIterator(Iterator* index_iter,BlockFunction block_function,
void* arg, constReadOptions& options)
{
return newTwoLevelIterator(index_iter, block_function, arg, options);
}

这里有一个函数指针BlockFunction,类型为:

1
typedef Iterator* (*BlockFunction)(void*, const ReadOptions&, constSlice&);

为什么叫TwoLevelIterator呢,下面就来看看。

6.6.2 TwoLevelIterator

它也是Iterator的子类,之所以叫two level应该是不仅可以迭代其中存储的对象,它还接受了一个函数BlockFunction,可以遍历存储的对象,可见它是专门为Table定制的。
我们已经知道各种Block的存储格式都是相同的,但是各自block data存储的k/v互不相同,于是我们就需要一个途径,能够在使用同一个方式遍历不同的block时,又能解析这些k/v。这就是BlockFunction,它又返回了一个针对block data的Iterator。Block和block data存储的k/v对的key是统一的。
先来看类的主要成员变量:

1
2
3
4
5
6
7
8
9
BlockFunction block_function_; // block操作函数  
void* arg_; // BlockFunction的自定义参数
const ReadOptions options_; // BlockFunction的read option参数
Status status_; // 当前状态
IteratorWrapper index_iter_; // 遍历block的迭代器
IteratorWrapper data_iter_; // May be NULL-遍历block data的迭代器
// 如果data_iter_ != NULL,data_block_handle_保存的是传递给
// block_function_的index value,以用来创建data_iter_
std::string data_block_handle_;

下面分析一下对于Iterator几个接口的实现。

S1

对于其Key和Value接口都是返回的dataiter对应的key和value:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
virtual bool Valid() const 
{
return data_iter_.Valid();
}

virtual Slice key() const
{
assert(Valid());
return data_iter_.key();
}

virtual Slice value() const
{
assert(Valid());
return data_iter_.value();
}

S2 在分析Seek系函数之前,有必要先了解下面这几个函数的用途。

1
2
3
4
5
void InitDataBlock();  
void SetDataIterator(Iterator*data_iter);
//设置date_iter_ = data_iter
voidSkipEmptyDataBlocksForward();
voidSkipEmptyDataBlocksBackward();
S2.1

首先是InitDataBlock(),它是根据index_iter来初始化data_iter,当定位到新的block时,需要更新data Iterator,指向该block中k/v对的合适位置,函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (!index_iter_.Valid()) SetDataIterator(NULL);
// index_iter非法
else
{
Slice handle =index_iter_.value();
if (data_iter_.iter() != NULL&& handle.compare(data_block_handle_) == 0)
{
//data_iter已经在该block data上了,无须改变
}
else
{
// 根据handle数据定位data iter
Iterator* iter =(*block_function_)(arg_, options_, handle);
data_block_handle_.assign(handle.data(), handle.size());
SetDataIterator(iter);
}
}
S2.2

SkipEmptyDataBlocksForward,向前跳过空的datablock,函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
while (data_iter_.iter() == NULL|| !data_iter_.Valid()) 
{
// 跳到下一个block
if (!index_iter_.Valid())
{
// 如果index iter非法,设置data iteration为NULL
SetDataIterator(NULL);
return;
}
index_iter_.Next();
InitDataBlock();
if (data_iter_.iter() != NULL)data_iter_.SeekToFirst();
// 跳转到开始
}
S2.3

SkipEmptyDataBlocksBackward,向后跳过空的datablock,函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
while (data_iter_.iter() == NULL|| !data_iter_.Valid()) 
{
// 跳到前一个block
if (!index_iter_.Valid())
{
// 如果index iter非法,设置data iteration为NULL
SetDataIterator(NULL);
return;
}
index_iter_.Prev();
InitDataBlock();
if (data_iter_.iter() != NULL)data_iter_.SeekToLast();
// 跳转到开始
}
S3

了解了几个跳转的辅助函数,再来看Seek系接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void TwoLevelIterator::Seek(const Slice& target) 
{
index_iter_.Seek(target);
InitDataBlock();
// 根据index iter设置data iter
if (data_iter_.iter() != NULL)data_iter_.Seek(target);
// 调整data iter跳转到target
SkipEmptyDataBlocksForward();
// 调整iter,跳过空的block
}

void TwoLevelIterator::SeekToFirst()
{
index_iter_.SeekToFirst();
InitDataBlock(); // 根据index iter设置data iter
if (data_iter_.iter() != NULL)data_iter_.SeekToFirst();
SkipEmptyDataBlocksForward(); // 调整iter,跳过空的block
}

void TwoLevelIterator::SeekToLast()
{
index_iter_.SeekToLast();
InitDataBlock(); // 根据index iter设置data iter
if (data_iter_.iter() != NULL)data_iter_.SeekToLast();
SkipEmptyDataBlocksBackward();// 调整iter,跳过空的block
}

void TwoLevelIterator::Next()
{
assert(Valid());
data_iter_.Next();
SkipEmptyDataBlocksForward(); // 调整iter,跳过空的block
}

void TwoLevelIterator::Prev()
{
assert(Valid());
data_iter_.Prev();
SkipEmptyDataBlocksBackward();// 调整iter,跳过空的block
}

6.6.3 BlockReader()

上面传递给twolevel Iterator的函数是Table::BlockReader函数,声明如下:

1
2
static Iterator* Table::BlockReader(void* arg, const ReadOptions&options,
constSlice& index_value);

它根据参数指明的blockdata,返回一个iterator对象,调用者就可以通过这个iterator对象遍历blockdata存储的k/v对,这其中用到了LRUCache
函数实现逻辑如下:

S1

从参数中解析出BlockHandle对象,其中arg就是Table对象,index_value存储的是BlockHandle对象,读取Block的索引。

1
2
3
4
5
6
Table* table =reinterpret_cast<Table*>(arg);  
Block* block = NULL;
Cache::Handle* cache_handle =NULL;
BlockHandle handle;
Slice input = index_value;
Status s =handle.DecodeFrom(&input);
S2

根据block handle,首先尝试从cache中直接取出block,不在cache中则调用ReadBlock从文件读取,读取成功后,根据option尝试将block加入到LRU cache中。并在Insert的时候注册了释放函数DeleteCachedBlock。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Cache* block_cache =table->rep_->options.block_cache;  
BlockContents contents;
if (block_cache != NULL)
{
char cache_key_buffer[16];
// cache key的格式为table.cache_id + offset
EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
EncodeFixed64(cache_key_buffer+8, handle.offset());
Slice key(cache_key_buffer,sizeof(cache_key_buffer));
cache_handle =block_cache->Lookup(key); // 尝试从LRU cache中查找
if (cache_handle != NULL)
{
// 找到则直接取值
block =reinterpret_cast<Block*>(block_cache->Value(cache_handle));
}
else
{
// 否则直接从文件读取
s =ReadBlock(table->rep_->file,
options, handle, &contents);
if (s.ok())
{
block = new Block(contents);
if (contents.cachable&& options.fill_cache)
// 尝试加到cache中
cache_handle =block_cache->Insert(key, block,block->size(), &DeleteCachedBlock);
}
}
}
else
{
s =ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok()) block = newBlock(contents);
}
S3

如果读取到了block,调用Block::NewIterator接口创建Iterator,如果cache handle为NULL,则注册DeleteBlock,否则注册ReleaseBlock,事后清理。

1
2
3
4
5
6
7
8
Iterator* iter;  
if (block != NULL)
{
iter =block->NewIterator(table->rep_->options.comparator);
if (cache_handle == NULL) iter->RegisterCleanup(&DeleteBlock,block, NULL);
else iter->RegisterCleanup(&ReleaseBlock,block_cache, cache_handle);
}
else iter = NewErrorIterator(s);

处理结束,最后返回iter。这里简单列下这几个静态函数,都很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void DeleteBlock(void* arg, void* ignored) 
{
deletereinterpret_cast<Block*>(arg);
}

static void DeleteCachedBlock(const Slice& key, void* value)
{
Block* block =reinterpret_cast<Block*>(value);
delete block;
}

static void ReleaseBlock(void* arg, void* h)
{
Cache* cache =reinterpret_cast<Cache*>(arg);
Cache::Handle* handle =reinterpret_cast<Cache::Handle*>(h);
cache->Release(handle);
}

6.7 定位key

这里并不是精确的定位,而是在Table中找到第一个>=指定key的k/v对,然后返回其value在sstable文件中的偏移。也是Table类的一个接口:

1
uint64_t ApproximateOffsetOf(const Slice& key) const;

函数实现比较简单:

S1

调用Block::Iter的Seek函数定位

1
2
3
Iterator* index_iter=rep_->index_block->NewIterator(rep_->options.comparator);  
index_iter->Seek(key);
uint64_t result;
S2

如果index_iter是合法的值,并且Decode成功,返回结果offset。

1
2
3
BlockHandle handle;  
handle.DecodeFrom(&index_iter->value());
result = handle.offset();
S3

其它情况,设置result为rep_->metaindex_handle.offset(),metaindex的偏移在文件结尾附近。

6.8 获取Key—InternalGet()

InternalGet,这是为TableCache开的一个口子。这是一个private函数,声明为:

1
2
Status Table::InternalGet(const ReadOptions& options, constSlice& k,
void*arg, void (*saver)(void*, const Slice&, const Slice&))

其中又有函数指针,在找到数据后,就调用传入的函数指针saver执行调用者的自定义处理逻辑,并且TableCache可能会做缓存。
函数逻辑如下:

S1

首先根据传入的key定位数据,这需要indexblock的Iterator。

1
2
Iterator* iiter =rep_->index_block->NewIterator(rep_->options.comparator);  
iiter->Seek(k);
S2

如果key是合法的,取出其filter指针,如果使用了filter,则检查key是否存在,这可以快速判断,提升效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Status s;  
Slice handle_value =iiter->value();
FilterBlockReader* filter = rep_->filter;
BlockHandle handle;
if (filter != NULL && handle.DecodeFrom(&handle_value).ok() && !filter->KeyMayMatch(handle.offset(),k))
{
// key不存在
}
else
{
// 否则就要读取block,并查找其k/v对
Slice handle = iiter->value();
Iterator* block_iter =BlockReader(this, options, iiter->value());
block_iter->Seek(k);
if (block_iter->Valid())(*saver)(arg, block_iter->key(), block_iter->value());
s = block_iter->status();
delete block_iter;
}
S3

最后返回结果,删除临时变量。

1
2
3
if (s.ok()) s =iiter->status();  
delete iiter;
return s;

随着有关sstable文件读取的结束,sstable的源码也就分析完了,其中我们还遗漏了一些功课要做,那就是Filter和TableCache部分。

7.TableCache

7.1 TableCache简介

TableCache缓存的是Table对象,每个DB一个,它内部使用一个LRUCache缓存所有的table对象,实际上其内容是文件编号{file number, TableAndFile}TableAndFile是一个拥有2个变量的结构体:RandomAccessFile和Table*;

TableCache类的主要成员变量有:

1
2
3
Env* const env_;            // 用来操作文件  
const std::string dbname_; // db名
Cache* cache_; // LRUCache

三个函数接口,其中的参数@file_number是文件编号,@file_size是文件大小:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void Evict(uint64_tfile_number);  
// 该函数用以清除指定文件所有cache的entry,
//函数实现很简单,就是根据file number清除cache对象。
EncodeFixed64(buf,file_number); cache_->Erase(Slice(buf, sizeof(buf)));
Iterator* NewIterator(constReadOptions& options, uint64_t file_number,
uint64_t file_size, Table**tableptr = NULL);
//该函数为指定的file返回一个iterator(对应的文件长度必须是"file_size"字节).
//如果tableptr不是NULL,那么tableptr保存的是底层的Table指针。
//返回的tableptr是cache拥有的,不能被删除,生命周期同返回的iterator

Status Get(constReadOptions& options,
uint64_t file_number,uint64_t file_size,
const Slice& k,void* arg,
void(*handle_result)(void*, const Slice&, const Slice&));

// 这是一个查找函数,如果在指定文件中seek 到internal key "k" 找到一个entry,
//就调用 (*handle_result)(arg,found_key, found_value).

7.2 TableCache::Get()

先来看看Get接口,只有几行代码:

1
2
3
4
5
6
7
8
9
Cache::Handle* handle = NULL;  
Status s =FindTable(file_number, file_size, &handle);
if (s.ok())
{
Table* t =reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
s = t->InternalGet(options,k, arg, saver);
cache_->Release(handle);
}
return s;

首先根据file_number找到Table的cache对象,如果找到了就调用Table::InternalGet,对查找结果的处理在调用者传入的saver回调函数中。
Cache在Lookup找到cache对象后,如果不再使用需要调用Release减引用计数。这个见Cache的接口说明。

7.3 TableCache遍历

函数NewIterator(),返回一个可以遍历Table对象的Iterator指针,函数逻辑:

S1

初始化tableptr,调用FindTable,返回cache对象

1
2
3
4
if (tableptr != NULL) *tableptr =NULL;  
Cache::Handle* handle = NULL;
Status s =FindTable(file_number, file_size, &handle);
if (!s.ok()) returnNewErrorIterator(s);

S2

从cache对象中取出Table对象指针,调用其NewIterator返回Iterator对象,并为Iterator注册一个cleanup函数。

1
2
3
4
5
Table* table =reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;  
Iterator* result =table->NewIterator(options);
result->RegisterCleanup(&UnrefEntry, cache_, handle);
if (tableptr != NULL) *tableptr= table;
return result;

7.4 TableCache::FindTable()

前面的遍历和Get函数都依赖于FindTable这个私有函数完成对cache的查找,下面就来看看该函数的逻辑。函数声明为:

1
2
Status FindTable(uint64_t file_number, uint64_t file_size,
Cache::Handle** handle)

函数流程为:

S1

首先根据file number从cache中查找table,找到就直接返回成功。

1
2
3
4
char buf[sizeof(file_number)];  
EncodeFixed64(buf, file_number);
Slice key(buf, sizeof(buf));
*handle = cache_->Lookup(key);

S2

如果没有找到,说明table不在cache中,则根据file number和db name打开一个RadomAccessFile。Table文件格式为:..sst。如果文件打开成功,则调用Table::Open读取sstable文件。

1
2
3
4
5
std::string fname =TableFileName(dbname_, file_number);  
RandomAccessFile* file = NULL;
Table* table = NULL;
s =env_->NewRandomAccessFile(fname, &file);
if (s.ok()) s =Table::Open(*options_, file, file_size, &table);

S3

如果Table::Open成功则,插入到Cache中。

1
2
TableAndFile* tf = newTableAndFile(table, file);  
*handle = cache_->Insert(key,tf, 1, &DeleteEntry);

如果失败,则删除file,直接返回失败,失败的结果是不会cache的。

7.5 辅助函数

有点啰嗦,不过还是写一下吧。其中一个是为LRUCache注册的删除函数DeleteEntry

1
2
3
4
5
6
7
static void DeleteEntry(const Slice& key, void* value) 
{
TableAndFile* tf =reinterpret_cast<TableAndFile*>(value);
delete tf->table;
delete tf->file;
delete tf;
}

另外一个是为Iterator注册的清除函数UnrefEntry

1
2
3
4
5
6
static void UnrefEntry(void* arg1, void* arg2) 
{
Cache* cache =reinterpret_cast<Cache*>(arg1);
Cache::Handle* h =reinterpret_cast<Cache::Handle*>(arg2);
cache->Release(h);
}

8.FilterPolicy&Bloom之1

8.1 FilterPolicy

因名知意,FilterPolicy是用于key过滤的,可以快速的排除不存在的key。前面介绍Table的时候,在Table::InternalGet函数中有过一面之缘。
FilterPolicy有3个接口:

1
2
3
4
5
virtual const char* Name() const = 0; 
// 返回filter的名字
virtual void CreateFilter(const Slice* keys,
int n, std::string* dst)const = 0;
virtual bool KeyMayMatch(const Slice& key, const Slice& filter)const = 0;

CreateFilter接口,它根据指定的参数创建过滤器,并将结果append到dst中,注意:不能修改dst的原始内容,只做append。
参数@keys[0,n-1]包含依据用户提供的comparator排序的key列表—可重复,并把根据这些key创建的filter追加到@*dst中。

KeyMayMatch,参数@filter包含了调用CreateFilter函数append的数据,如果key在传递函数CreateFilter的key列表中,则必须返回true

注意:它不需要精确,也就是即使key不在前面传递的key列表中,也可以返回true,但是如果key在列表中,就必须返回true

8.2InternalFilterPolicy

这是一个简单的FilterPolicy的wrapper,以方便的把FilterPolicy应用在InternalKey上,InternalKey是Leveldb内部使用的key,这些前面都讲过。它所做的就是从InternalKey拆分得到user key,然后在user key上做FilterPolicy的操作。
它有一个成员:

1
constFilterPolicy* const user_policy_;

Name()返回的是userpolicy->Name()

1
2
3
4
5
6
7
8
9
10
11
bool InternalFilterPolicy::KeyMayMatch(const Slice& key, constSlice& f) const 
{
returnuser_policy_->KeyMayMatch(ExtractUserKey(key), f);
}
void InternalFilterPolicy::CreateFilter(const Slice* keys,
int n,std::string* dst) const
{
Slice* mkey =const_cast<Slice*>(keys);
for (int i = 0; i < n; i++)mkey[i] = ExtractUserKey(keys[i]);
user_policy_->CreateFilter(keys, n, dst);
}

8.3 BloomFilter

8.3.1 基本理论

Bloom Filter实际上是一种hash算法,数学之美系列有专门介绍。它是由巴顿.布隆于一九七零年提出的,它实际上是一个很长的二进制向量和一系列随机映射函数

Bloom Filter将元素映射到一个长度为m的bit向量上的一个bit,当这个bit是1时,就表示这个元素在集合内。使用hash的缺点就是元素很多时可能有冲突,为了减少误判,就使用k个hash函数计算出k个bit,只要有一个bit为0,就说明元素肯定不在集合内。下面的图8.3-1是一个示意图。

在leveldb的实现中,Name()返回”leveldb.BuiltinBloomFilter”,因此metaindex block中的key就是filter.leveldb.BuiltinBloomFilter。Leveldb使用了double hashing来模拟多个hash函数,当然这里不是用来解决冲突的。

和线性再探测(linearprobing)一样,Double hashing从一个hash值开始,重复向前迭代,直到解决冲突或者搜索完hash表。不同的是,double hashing使用的是另外一个hash函数,而不是固定的步长。

给定两个独立的hash函数h1和h2,对于hash表T和值k,第i次迭代计算出的位置就是:h(i, k) = (h1(k) + i*h2(k)) mod |T|。对此,Leveldb选择的hash函数是:

1
2
Gi(x)=H1(x)+iH2(x)
H2(x)=(H1(x)>>17) | (H1(x)<<15)

H1是一个基本的hash函数,H2是由H1循环右移得到的,Gi(x)就是第i次循环得到的hash值。在bloom_filter的数据的最后一个字节存放的是k的值,k实际上就是G(x)的个数,也就是计算时采用的hash函数个数

8.3.2 BloomFilter参数

这里先来说下其两个成员变量:bitsper_keykey_;其实这就是Bloom Hashing的两个关键参数。变量k_实际上就是模拟的hash函数的个数;

关于变量bitsper_key,对于n个key,其hash table的大小就是bits_per_key_。它的值越大,发生冲突的概率就越低,那么bloom hashing误判的概率就越低。因此这是一个时间空间的trade-off

对于hash(key),在平均意义上,发生冲突的概率就是1 / bits_per_key_。它们在构造函数中根据传入的参数bits_per_key初始化

1
2
3
4
bits_per_key_ = bits_per_key;  
k_ =static_cast<size_t>(bits_per_key * 0.69); // 0.69 =~ ln(2)
if (k_ < 1) k_ = 1;
if (k_ > 30) k_ = 30;

模拟hash函数的个数k取值为**bits_per_key*ln(2)**,为何不是0.5或者0.4了,可能是什么理论推导的结果吧,不了解了。

8.3.3 建立BloomFilter

了解了上面的理论,再来看leveldb对Bloom Fil**ter的实现就轻松多了,先来看Bloom Filter的构建。这就是FilterPolicy::CreateFilter接口的实现**:

1
void CreateFilter(const Slice* keys, int n, std::string* dst) const

下面分析其实现代码,大概有如下几个步骤:

S1

首先根据key个数分配filter空间,并圆整到8byte。

1
2
3
4
5
6
7
size_t bits = n * bits_per_key_;  
if (bits < 64) bits = 64;
// 如果n太小FP会很高,限定filter的最小长度
size_t bytes = (bits + 7) / 8; // 圆整到8byte
bits = bytes * 8; // bit计算的空间大小
const size_t init_size =dst->size();
dst->resize(init_size +bytes, 0); // 分配空间
S2

在filter最后的字节位压入hash函数个数

1
2
dst->push_back(static_cast<char>(k_));
// Remember # of probes in filter
S3

对于每个key,使用double-hashing生产一系列的hash值h(K_个),设置bits array的第h位=1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
char* array =&(*dst)[init_size];  
for (size_t i = 0; i < n;i++)
{
// double-hashing,分析参见[Kirsch,Mitzenmacher 2006]
uint32_t h =BloomHash(keys[i]);
// h1函数
const uint32_t delta = (h>> 17) | (h << 15);
// h2函数、由h1 Rotate right 17 bits
for (size_t j = 0; j < k_; j++)
{
// double-hashing生产k_个的hash值
const uint32_t bitpos = h% bits;
// 在bits array上设置第bitpos位
array[bitpos/8] |= (1<< (bitpos % 8));
h += delta;
}
}

Bloom Filter的创建就完成了。

8.3.4 查找BloomFilter

在指定的filer中查找key是否存在,这就是bloom filter的查找函数:
bool KeyMayMatch(const Slice& key, const Slice& bloom_filter),函数逻辑如下:

S1

准备工作,并做些基本判断。

1
2
3
4
5
6
7
8
const size_t len =bloom_filter.size();  
if (len < 2) return false;
const char* array = bloom_filter.data();
const size_t bits = (len - 1)* 8;
const size_t k = array[len-1];
// 使用filter的k,而不是k_,这样更灵活
if (k > 30) return true;
// 为短bloom filter保留,当前认为直接match
S2

计算key的hash值,重复计算阶段的步骤,循环计算k个hash值,只要有一个结果对应的bit位为0,就认为不匹配,否则认为匹配。

1
2
3
4
5
6
7
8
9
10
uint32_t h = BloomHash(key);  
const uint32_t delta = (h>> 17) | (h << 15); // Rotate right 17 bits
for (size_t j = 0; j < k;j++)
{
const uint32_t bitpos = h %bits;
if ((array[bitpos/8] &(1 << (bitpos % 8))) == 0) return false;
// notmatch
h += delta;
}
return true; // match

8.4 Filter Block格式

Filter Block也就是前面sstable中的meta block,位于data block之后。

如果打开db时指定了FilterPolicy,那么每个创建的table都会保存一个filter block,table中的metaindex就包含一条从”filter.到filter block的BlockHandle的映射,其中””是filter policy的Name()函数返回的string

Filter block存储了一连串的filter值,其中第i个filter保存的是block b中所有的key通过FilterPolicy::CreateFilter()计算得到的结果,block b在sstable文件中的偏移满足[ i*base … (i+1)*base-1 ]

当前base是2KB,举个例子,如果block X和Y在sstable的起始位置都在[0KB, 2KB-1]中,X和Y中的所有key调用FilterPolicy::CreateFilter()的计算结果都将生产到同一个filter中,而且该filter是filter block的第一个filter。

Filter block也是一个block,其格式遵从block的基本格式:|block data| type | crc32|。其中block dat的格式如图8.4-1所示。

8.5 构建FilterBlock

8.5.1 FilterBlockBuilder

了解了filter机制,现在来看看filter block的构建,这就是类FilterBlockBuilder。它为指定的table构建所有的filter,结果是一个string字符串,并作为一个block存放在table中。它有三个函数接口:

1
2
3
4
5
6
7
8
// 开始构建新的filter block,TableBuilder在构造函数和Flush中调用  
void StartBlock(uint64_tblock_offset);

// 添加key,TableBuilder每次向data block中加入key时调用
void AddKey(const Slice&key);

// 结束构建,TableBuilder在结束对table的构建时调用
Slice Finish();

FilterBlockBuilder的构建顺序必须满足如下范式:(StartBlock AddKey*)* Finish,显然这和前面讲过的BlockBuilder有所不同。
其成员变量有:

1
2
3
4
5
6
const FilterPolicy* policy_; // filter类型,构造函数参数指定  
std::string keys_; //Flattened key contents
std::vector<size_t> start_; // 各key在keys_中的位置
std::string result_; // 当前计算出的filter data
std::vector<uint32_t>filter_offsets_; // 各个filter在result_中的位置
std::vector<Slice> tmp_keys_;// policy_->CreateFilter()参数

前面说过base是2KB,这对应两个常量kFilterBase =11, kFilterBase =(1<<kFilterBaseLg);其实从后面的实现来看tmpkeys完全不必作为成员变量,直接作为函数GenerateFilter()的栈变量就可以。下面就分别分析三个函数接口。

8.5.2 FilterBlockBuilder::StartBlock()

它根据参数block_offset计算出filter index,然后循环调用GenerateFilter生产新的Filter。

1
2
3
uint64_t filter_index =(block_offset / kFilterBase);  
assert(filter_index >=filter_offsets_.size());
while (filter_index >filter_offsets_.size()) GenerateFilter();

我们来到GenerateFilter这个函数,看看它的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//S1 如果filter中key个数为0,则直接压入result_.size()并返回  
const size_t num_keys =start_.size();
if (num_keys == 0) { // there are no keys for this filter
filter_offsets_.push_back(result_.size()); //result_.size()应该是0
return;
}

//S2 从key创建临时key list,根据key的序列字符串kyes_和各key在keys_
//中的开始位置start_依次提取出key。

start_.push_back(keys_.size()); // Simplify lengthcomputation
tmp_keys_.resize(num_keys);
for (size_t i = 0; i < num_keys; i++) {
const char* base =keys_.data() + start_[i]; // 开始指针
size_t length = start_[i+1] -start_[i]; // 长度
tmp_keys_[i] = Slice(base,length);

}

//S3 为当前的key集合生产filter,并append到result_

filter_offsets_.push_back(result_.size());
policy_->CreateFilter(&tmp_keys_[0], num_keys, &result_);

//S4 清空,重置状态

tmp_keys_.clear();
keys_.clear();
start_.clear();

8.5.3 FilterBlockBuilder::AddKey()

这个接口很简单,就是把key添加到key中,并在start中记录位置。

1
2
3
Slice k = key;  
start_.push_back(keys_.size());
keys_.append(k.data(),k.size());

8.5.4 FilterBlockBuilder::Finish()

调用这个函数说明整个table的data block已经构建完了,可以生产最终的filter block了,在TableBuilder::Finish函数中被调用,向sstable写入meta block。函数逻辑为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//S1 如果start_数字不空,把为的key列表生产filter  

if (!start_.empty()) GenerateFilter();

//S2 从0开始顺序存储各filter的偏移值,见filter block data的数据格式。

const uint32_t array_offset =result_.size();
for (size_t i = 0; i < filter_offsets_.size();i++) {
PutFixed32(&result_,filter_offsets_[i]);
}

//S3 最后是filter个数,和shift常量(11),并返回结果
PutFixed32(&result_,array_offset);
result_.push_back(kFilterBaseLg); // Save encoding parameter in result
return Slice(result_);

8.5.5 简单示例

让我们根据TableBuilder对FilterBlockBuilder接口的调用范式:
(StartBlock AddKey) Finish以及上面的函数实现,结合一个简单例子看看leveldb是如何为data block创建filter block(也就是meta block)的。
考虑两个datablock,在sstable的范围分别是:Block 1 [0, 7KB-1], Block 2 [7KB, 14.1KB]

  • S1 首先TableBuilder为Block 1调用FilterBlockBuilder::StartBlock(0),该函数直接返回;
  • S2 然后依次向Block 1加入k/v,其中会调用FilterBlockBuilder::AddKey,FilterBlockBuilder记录这些key。
  • S3 下一次TableBuilder添加k/v时,例行检查发现Block 1的大小超过设置,则执行Flush操作,Flush操作在写入Block 1后,开始准备Block 2并更新block offset=7KB,最后调用FilterBlockBuilder::StartBlock(7KB),开始为Block 2构建Filter。
  • S4 在FilterBlockBuilder::StartBlock(7KB)中,计算出filter index = 3,触发3次GenerateFilter函数,为Block 1添加的那些key列表创建filter,其中第2、3次循环创建的是空filter。
    • 在StartBlock(7KB)时会向filter的偏移数组filteroffsets压入两个包含空key set的元素,filteroffsets[1]和filteroffsets[2],它们的值都等于7KB-1。
  • S5 Block 2构建结束,TableBuilder调用Finish结束table的构建,这会再次触发Flush操作,在写入Block 2后,为Block 2的key创建filter。
  • 这里如果Block 1的范围是[0, 1.8KB-1],Block 2从1.8KB开始,那么Block 2将会和Block 1共用一个filter,它们的filter都被生成到filter 0中。
    • 当然在TableBuilder构建表时,Block的大小是根据参数配置的,也是基本均匀的。

8.6 读取FilterBlock

8.6.1 FilterBlockReader

FilterBlock的读取操作在FilterBlockReader类中,它的主要功能是根据传入的FilterPolicy和filter,进行key的匹配查找。
它有如下的几个成员变量:

1
2
3
4
5
const FilterPolicy* policy_; // filter策略  
const char* data_; // filter data指针 (at block-start)
const char* offset_; // offset array的开始地址 (at block-end)
size_t num_; // offsetarray元素个数
size_t base_lg_; // 还记得kFilterBaseLg吗

Filter策略和filter block内容都由构造函数传入。一个接口函数,就是key的批判查找:

1
bool KeyMayMatch(uint64_t block_offset, const Slice& key);

8.6.2 构造

在构造函数中,根据存储格式解析出偏移数组开始指针、个数等信息。

1
2
3
4
5
6
7
8
9
10
11
12
FilterBlockReader::FilterBlockReader(const FilterPolicy* policy, 
constSlice& contents)
: policy_(policy),data_(NULL), offset_(NULL), num_(0), base_lg_(0) {

size_t n = contents.size();
if (n < 5) return; // 1 byte forbase_lg_ and 4 for start of offset array
base_lg_ = contents[n-1]; // 最后1byte存的是base
uint32_t last_word =DecodeFixed32(contents.data() + n - 5); //偏移数组的位置
if (last_word > n - 5)return;
data_ = contents.data();
offset_ = data_ + last_word; // 偏移数组开始指针
num_ = (n - 5 - last_word) / 4; // 计算出filter个数

8.6.3 查找

查找函数传入两个参数

  • @block_offset是查找data block在sstable中的偏移,Filter根据此偏移计算filter的编号;
  • @key是查找的key。

声明如下:bool FilterBlockReader::KeyMayMatch(uint64_t block_offset, constSlice& key)

首先计算出filterindex,根据index解析出filter的range,如果是合法的range,就从data_中取出filter,调用policy_做key的匹配查询。函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
uint64_t index = block_offset>> base_lg_; // 计算出filter index  

if (index < num_) {
// 解析出filter的range
uint32_t start =DecodeFixed32(offset_ + index*4);
uint32_t limit =DecodeFixed32(offset_ + index*4 + 4);
if (start <= limit&& limit <= (offset_ - data_)) {
Slice filter = Slice(data_ +start, limit - start); // 根据range得到filter
returnpolicy_->KeyMayMatch(key, filter);
} else if (start == limit) {
return false; // 空filter不匹配任何key
}
}
return true; // 当匹配处理

至此,FilterPolicy和Bloom就分析完了。

9 LevelDB框架之1

到此为止,基本上Leveldb的主要功能组件都已经分析完了,下面就是把它们组合在一起,形成一个高性能的k/v存储系统。这就是leveldb::DB类。

这里先看一下LevelDB的导出接口和涉及的类,后面将依次以接口分析的方式展开。而实际上leveldb::DB只是一个接口类,真正的实现和框架类是DBImpl这个类,正是它集合了上面的各种组件。此外,还有Leveldb对版本的控制,执行版本控制的是VersionVersionSet类。在leveldb的源码中,DBImpl和VersionSet是两个庞然大物,体量基本算是最大的。对于这两个类的分析,也会分散在打开、销毁和快照等等这些功能中,很难在一个地方集中分析。

作者在文档impl.html中描述了leveldb的实现,其中包括文件组织compactionrecovery等等。下面的9.1和9.2基本都是翻译子impl.html文档。在进入框架代码之前,先来了解下leveldb的文件组织和管理。

9.1 DB文件管理

9.1.1 文件类型

对于一个数据库Level包含如下的6种文件:

1/[0-9]+.log:db操作日志
这就是前面分析过的操作日志,log文件包含了最新的db更新,每个更新都以append的方式追加到文件结尾。当log文件达到预定大小时(缺省大约4MB),leveldb就把它转换为一个有序表(如下-2),并创建一个新的log文件。
当前的log文件在内存中的存在形式就是memtable,每次read操作都会访问memtable,以保证read读取到的是最新的数据。

2/[0-9]+.sst:db的sstable文件
这两个就是前面分析过的静态sstable文件,sstable存储了以key排序的元素。每个元素或者是key对应的value,或者是key的删除标记(删除标记可以掩盖更老sstable文件中过期的value)。

Leveldbsstable文件通过level的方式组织起来,从log文件中生成的sstable被放在level 0。当level 0的sstable文件个数超过设置(当前为4个)时,leveldb就把所有的level 0文件,以及有重合的level 1文件merge起来,组织成一个新的level 1文件(每个level 1文件大小为2MB)。

Level 0的SSTable文件(后缀为.sst)和Level>1的文件相比有特殊性:这个层级内的.sst文件,两个文件可能存在key重叠。对于Level>0,同层sstable文件的key不会重叠。考虑level>0,level中的文件的总大小超过10^level MB时(如level=1是10MB,level=2是100MB),那么level中的一个文件,以及所有level+1中和它有重叠的文件,会被merge到level+1层的一系列新文件。Merge操作的作用是将更新从低一级level迁移到最高级,只使用批量读写(最小化seek操作,提高效率)。

3/MANIFEST-[0-9]+:DB元信息文件
它记录的是leveldb的元信息,比如DB使用的Comparator名,以及各SSTable文件的管理信息:如Level层数、文件名、最小key和最大key等等。

4/CURRENT:记录当前正在使用的Manifest文件
它的内容就是当前的manifest文件名;因为在LevleDb的运行过程中,随着Compaction的进行,新的SSTable文件被产生,老的文件被废弃。并生成新的Manifest文件来记载sstable的变动,而CURRENT则用来记录我们关心的Manifest文件。

当db被重新打开时,leveldb总是生产一个新的manifest文件。Manifest文件使用log的格式,对服务状态的改变(新加或删除的文件)都会追加到该log中。

上面的log文件、sst文件、清单文件,末尾都带着序列号,其序号都是单调递增的(随着next_file_number从1开始递增),以保证不和之前的文件名重复。

5/log:系统的运行日志,记录系统的运行信息或者错误日志。

6/dbtmp:临时数据库文件,repair时临时生成的。
这里就涉及到几个关键的number计数器,log文件编号,下一个文件(sstable、log和manifest)编号,sequence。
所有正在使用的文件编号,包括log、sstable和manifest都应该小于下一个文件编号计数器。

9.1.2 Level 0

当操作log超过一定大小时(缺省是1MB),执行如下操作:

  • S1 创建新的memtable和log文件,并重导向新的更新到新memtable和log中;
  • S2 在后台:
  • S2.1 将前一个memtable的内容dump到sstable文件;
  • S2.2 丢弃前一个memtable;
  • S2.3 删除旧的log文件和memtable
  • S2.4 把创建的sstable文件放到level 0

9.2 Compaction

level L的总文件大小查过限制时,我们就在后台执行compaction操作。Compaction操作从level L中选择一个文件f,以及选择中所有和f有重叠的文件。如果某个level (L+1)的文件ff只是和f部分重合,compaction依然选择ff的完整内容作为输入,在compaction后f和ff都会被丢弃。

另外:因为level 0有些特殊(同层文件可能有重合),从level 0到level 1的compaction就需要特殊对待:level 0的compaction可能会选择多个level 0文件,如果它们之间有重叠。

Compaction将选择的文件内容merge起来,并生成到一系列的level (L+1)文件中,如果输出文件超过设置(2MB),就切换到新的。当输出文件的key范围太大以至于和超过10个level (L+2)文件有重合时,也会切换。后一个规则确保了level (L+1)的文件不会和过多的level (L+2)文件有重合,其后的level (L+1) compaction不会选择过多的level (L+2)文件。

老的文件会被丢弃,新创建的文件将加入到server状态中。

Compaction操作在key空间中循环执行,详细讲一点就是,对于每个level,我们记录上次compaction的ending key。Level的下一次compaction将选择ending key之后的第一个文件(如果这样的文件不存在,将会跳到key空间的开始)。

Compaction会忽略被写覆盖的值,如果更高一层的level没有文件的范围包含了这个key,key的删除标记也会被忽略。

9.2.1 时间

Level 0的compaction最多从level 0读取4个1MB的文件,以及所有的level 1文件(10MB),也就是我们将读取14MB,并写入14BM。

Level > 0的compaction,从level L选择一个2MB的文件,最坏情况下,将会和levelL+1的12个文件有重合(10:level L+1的总文件大小是level L的10倍;边界的2:level L的文件范围通常不会和level L+1的文件对齐)。因此Compaction将会读26MB,写26MB。对于100MB/s的磁盘IO来讲,compaction将最坏需要0.5秒。

如果磁盘IO更低,比如10MB/s,那么compaction就需要更长的时间5秒。如果user以10MB/s的速度写入,我们可能生成很多level 0文件(50个来装载5*10MB的数据)。这将会严重影响读取效率,因为需要merge更多的文件。

  • 解决方法1:为了降低该问题,我们可能想增加log切换的阈值,缺点就是,log文件越大,对应的memtable文件就越大,这需要更多的内存。
  • 解决方法2:当level 0文件太多时,人工降低写入速度。
  • 解决方法3:降低merge的开销,如把level 0文件都无压缩的存放在cache中。

9.2.2 文件数

对于更高的level我们可以创建更大的文件,而不是2MB,代价就是更多突发性的compaction。或者,我们可以考虑分区,把文件放存放多目录中。
在2011年2月4号,作者做了一个实验,在ext3文件系统中打开100KB的文件,结果表明可以不需要分区。

  • 文件数 文件打开ms
  • 1000 9
  • 10000 10
  • 100000 16

9.3 Recovery & GC

9.3.1 Recovery

Db恢复的步骤:

  • S1 首先从CURRENT读取最后提交的MANIFEST
  • S2 读取MANIFEST内容
  • S3 清除过期文件
  • S4 这里可以打开所有的sstable文件,但是更好的方案是lazy open
  • S5 把log转换为新的level 0sstable
  • S6 将新写操作导向到新的log文件,从恢复的序号开始

9.3.2 GC

垃圾回收,每次compaction和recovery之后都会有文件被废弃,成为垃圾文件。GC就是删除这些文件的,它在每次compaction和recovery完成之后被调用。

9.4 版本控制

当执行一次compaction后,Leveldb将在当前版本基础上创建一个新版本,当前版本就变成了历史版本。还有,如果你创建了一个Iterator,那么该Iterator所依附的版本将不会被leveldb删除。

在leveldb中,Version就代表了一个版本,它包括当前磁盘及内存中的所有文件信息。在所有的version中,只有一个是CURRENTVersionSet是所有Version的集合,这是个version的管理机构。

前面讲过的VersionEdit记录了Version之间的变化,相当于delta增量,表示又增加了多少文件,删除了文件。也就是说:Version0 + VersionEdit —> Version1

每次文件有变动时,leveldb就把变动记录到一个VersionEdit变量中,然后通过VersionEdit把变动应用到current version上,并把current version的快照,也就是db元信息保存到MANIFEST文件中。

另外,MANIFEST文件组织是以VersionEdit的形式写入的,它本身是一个log文件格式,采用log::Writer/Reader的方式读写,一个VersionEdit就是一条log record。

9.4.1 VersionSet

和DBImpl一样,下面就初识一下Version和VersionSet。先来看看Version的成员:

1
2
3
4
5
6
7
8
std::vector<FileMetaData*>files_[config::kNumLevels]; // sstable文件列表  
// Next fileto compact based on seek stats. 下一个要compact的文件
FileMetaData* file_to_compact_;
int file_to_compact_level_;
// 下一个应该compact的level和compaction分数.
// 分数 < 1 说明compaction并不紧迫. 这些字段在Finalize()中初始化
double compaction_score_;
int compaction_level_;

可见一个Version就是一个sstable文件集合,以及它管理的compact状态。Version通过Version prev和next指针构成了一个Version双向循环链表,表头指针则在VersionSet中(初始都指向自己)。
下面是VersionSet的成员。可见它除了通过Version管理所有的sstable文件外,还关心manifest文件信息,以及控制log文件等编号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//=== 第一组,直接来自于DBImple,构造函数传入  
Env* const env_; // 操作系统封装
const std::string dbname_;
const Options* const options_;
TableCache* const table_cache_; // table cache
const InternalKeyComparatoricmp_;

//=== 第二组,db元信息相关
uint64_t next_file_number_; // log文件编号
uint64_t manifest_file_number_; // manifest文件编号
uint64_t last_sequence_;
uint64_t log_number_; // log编号
uint64_t prev_log_number_; // 0 or backingstore for memtable being compacted

//=== 第三组,menifest文件相关
WritableFile* descriptor_file_;
log::Writer* descriptor_log_;

//=== 第四组,版本管理
Version dummy_versions_; // versions双向链表head.
Version* current_; // ==dummy_versions_.prev_
// level下一次compaction的开始key,空字符串或者合法的InternalKey
std::stringcompact_pointer_[config::kNumLevels];

关于版本控制大概了解其Version和VersionEdit的功能和管理范围,详细的函数操作在后面再慢慢揭开。

9.4.2 VersionEdit

LevelDB中对Manifest的Decode/Encode是通过类VersionEdit完成的,Menifest文件保存了LevelDB的管理元信息。VersionEdit这个名字起的蛮有意思,每一次compaction,都好比是生成了一个新的DB版本,对应的Menifest则保存着这个版本的DB元信息。VersionEdit并不操作文件,只是为Manifest文件读写准备好数据、从读取的数据中解析出DB元信息。
VersionEdit有两个作用:

  1. 当版本间有增量变动时,VersionEdit记录了这种变动;
  2. 写入到MANIFEST时,先将current version的db元信息保存到一个VersionEdit中,然后在组织成一个log record写入文件;

了解了VersionEdit的作用,来看看这个类导出的函数接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void Clear(); // 清空信息  
void Setxxx(); // 一系列的Set函数,设置信息

// 添加sstable文件信息,要求:DB元信息还没有写入磁盘Manifest文件
// @level:.sst文件层次;@file 文件编号-用作文件名 @size 文件大小
// @smallest, @largest:sst文件包含k/v对的最大最小key
void AddFile(int level, uint64_t file, uint64_t file_size,
constInternalKey& smallest, const InternalKey& largest);
void DeleteFile(int level, uint64_t file); // 从指定的level删除文件
void EncodeTo(std::string* dst) const; // 将信息Encode到一个string中
Status DecodeFrom(const Slice& src); // 从Slice中Decode出DB元信息

//===================下面是成员变量,由此可大概窥得DB元信息的内容。
typedef std::set< std::pair<int, uint64_t> > DeletedFileSet;
std::string comparator_; // key comparator名字
uint64_t log_number_; // 日志编号
uint64_t prev_log_number_; // 前一个日志编号
uint64_t next_file_number_; // 下一个文件编号
SequenceNumber last_sequence_; // 上一个seq
bool has_comparator_; // 是否有comparator
bool has_log_number_;// 是否有log_number_
bool has_prev_log_number_;// 是否有prev_log_number_
bool has_next_file_number_;// 是否有next_file_number_
bool has_last_sequence_;// 是否有last_sequence_
std::vector< std::pair<int, InternalKey> >compact_pointers_; // compact点
DeletedFileSet deleted_files_; // 删除文件集合
std::vector< std::pair<int, FileMetaData> > new_files_; // 新文件集合

Set系列的函数都很简单,就是根据参数设置相应的信息。
AddFile函数就是根据参数生产一个FileMetaData对象,把sstable文件信息添加到newfiles数组中。
DeleteFile函数则是把参数指定的文件添加到deleted_files中;
SetCompactPointer函数把{level, key}指定的compact点加入到compact_pointers_中。
执行序列化和发序列化的是DecodeEncode函数,根据这些代码,我们可以了解Manifest文件的存储格式。序列化函数逻辑都很直观,不详细说了。

9.4.3 Manifest文件格式

前面说过Manifest文件记录了leveldb的管理元信息,这些元信息到底都包含哪些内容呢?下面就来一一列示。
首先是使用的coparator名、log编号、前一个log编号、下一个文件编号、上一个序列号。这些都是日志、sstable文件使用到的重要信息,这些字段不一定必然存在。

Leveldb在写入每个字段之前,都会先写入一个varint型数字来标记后面的字段类型。在读取时,先读取此字段,根据类型解析后面的信息。一共有9种类型:

1
2
3
kComparator = 1, kLogNumber = 2, kNextFileNumber = 3, kLastSequence = 4,
kCompactPointer = 5, kDeletedFile = 6, kNewFile = 7, kPrevLogNumber = 9
// 8 was used for large value refs

其中8另有它用。

其次是compact点,可能有多个,写入格式为{kCompactPointer, level, internal key}。其后是删除文件,可能有多个,格式为{kDeletedFile, level, file number}。最后是新文件,可能有多个,格式为{kNewFile, level, file number, file size, min key, max key}

对于版本间变动它是新加的文件集合,对于MANIFEST快照是该版本包含的所有sstable文件集合。

其中的数字都是varint存储格式,string都是以varint指明其长度,后面跟实际的字符串内容。

9.5 DB接口

9.5.1 接口函数

除了DB类, leveldb还导出了C语言风格的接口:接口和实现在c.h&c.cc,它其实是对leveldb::DB的一层封装。DB是一个持久化的有序map{key, value},它是线程安全的。DB只是一个虚基类,下面来看看其接口:

首先是一个静态函数,打开一个db,成功返回OK,打开的db指针保存在dbptr中,用完后,调用者需要调用`delete dbptr`删除之。

1
static Status Open(const Options& options, const std::string&name, DB** dbptr);

下面几个是纯虚函数,最后还有两个全局函数,为何不像Open一样作为静态函数呢。
注:在几个更新接口中,可考虑设置options.sync = true。另外,虽然是纯虚函数,但是leveldb还是提供了缺省的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 设置db项{key, value}  
virtual Status Put(const WriteOptions& options, const Slice&key, const Slice& value) = 0;
// 在db中删除"key",key不存在依然返回成功
virtual Status Delete(const WriteOptions& options, const Slice&key) = 0;
// 更新操作
virtual Status Write(const WriteOptions& options, WriteBatch*updates) = 0;
// 获取操作,如果db中有”key”项则返回结果,没有就返回Status::IsNotFound()
virtual Status Get(const ReadOptions& options, const Slice& key,std::string* value) = 0;
// 返回heap分配的iterator,访问db的内容,返回的iterator的位置是invalid的
// 在使用之前,调用者必须先调用Seek。
virtual Iterator* NewIterator(const ReadOptions& options) = 0;
// 返回当前db状态的handle,和handle一起创建的Iterator看到的都是
// 当前db状态的稳定快照。不再使用时,应该调用ReleaseSnapshot(result)
virtual const Snapshot* GetSnapshot() = 0;

// 释放获取的db快照
virtual voidReleaseSnapshot(const Snapshot* snapshot) = 0;

// 借此方法DB实现可以展现它们的属性状态. 如果"property" 是合法的,
// 设置"*value"为属性的当前状态值并返回true,否则返回false.
// 合法属性名包括:
//
// >"leveldb.num-files-at-level<N>"– 返回level <N>的文件个数,
// <N> 是level 数的ASCII 值 (e.g. "0").
// >"leveldb.stats" – 返回描述db内部操作统计的多行string
// >"leveldb.sstables" – 返回一个多行string,描述构成db内容的所有sstable
virtual bool GetProperty(constSlice& property, std::string* value) = 0;

//"sizes[i]"保存的是"[range[i].start.. range[i].limit)"中的key使用的文件空间.
// 注:返回的是文件系统的使用空间大概值,
// 如果用户数据以10倍压缩,那么返回值就是对应用户数据的1/10
// 结果可能不包含最近写入的数据大小.
virtual voidGetApproximateSizes(const Range* range, int n, uint64_t* sizes) = 0;

// Compactkey范围[*begin,*end]的底层存储,删除和被覆盖的版本将会被抛弃
// 数据会被重新组织,以减少访问开销
// 注:那些不了解底层实现的用户不应该调用该方法。
//begin==NULL被当作db中所有key之前的key.
//end==NULL被当作db中所有key之后的key.
// 所以下面的调用将会compact整个db:
// db->CompactRange(NULL, NULL);
virtual void CompactRange(constSlice* begin, const Slice* end) = 0;

// 最后是两个全局函数--删除和修复DB
// 要小心,该方法将删除指定db的所有内容
Status DestroyDB(const std::string& name, const Options&options);
// 如果db不能打开了,你可能调用该方法尝试纠正尽可能多的数据
// 可能会丢失数据,所以调用时要小心
Status RepairDB(const std::string& dbname, const Options&options);

9.5.2 类图

这里又会设计到几个功能类,如图9.5-1所示。此外还有前面我们讲过的几大组件:操作日志的读写类、内存MemTable类、InternalFilterPolicy类、Internal Key比较类、以及sstable的读取构建类。如图9.5-2所示。

这里涉及的类很多,snapshot是内存快照,Version和VersionSet类。

9.6 DBImpl类

在向下继续之前,有必要先了解下DBImpl这个具体的实现类。主要是它的成员变量,这说明了它都利用了哪些组件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
  //== 第一组,他们在构造函数中初始化后将不再改变。其中,InternalKeyComparator和InternalFilterPolicy已经分别在Memtable和FilterPolicy中分析过。  
Env* const env_; // 环境,封装了系统相关的文件操作、线程等等
const InternalKeyComparatorinternal_comparator_; // key comparator
const InternalFilterPolicyinternal_filter_policy_; // filter policy
const Options options_; //options_.comparator == &internal_comparator_
bool owns_info_log_;
bool owns_cache_;
const std::string dbname_;

//== 第二组,只有两个。
TableCache* table_cache_; // Table cache,线程安全的
FileLock* db_lock_;// 锁db文件,persistent state,直到leveldb进程结束

//== 第三组,被mutex_包含的状态和成员
port::Mutex mutex_; // 互斥锁
port::AtomicPointershutting_down_;
port::CondVar bg_cv_; // 在background work结束时激发
MemTable* mem_;
MemTable* imm_; // Memtablebeing compacted
port::AtomicPointerhas_imm_; // BGthread 用来检查是否是非NULL的imm_

// 这三个是log相关的
WritableFile* logfile_; // log文件
uint64_t logfile_number_; // log文件编号
log::Writer* log_; // log writer

//== 第四组,没有规律
std::deque<Writer*>writers_; // writers队列.
WriteBatch* tmp_batch_;
SnapshotList snapshots_; //snapshot列表

// Setof table files to protect from deletion because they are
// part ofongoing compactions.
std::set<uint64_t>pending_outputs_; // 待copact的文件列表,保护以防误删
bool bg_compaction_scheduled_; // 是否有后台compaction在调度或者运行?
Status bg_error_; // paranoid mode下是否有后台错误?
ManualCompaction*manual_compaction_; // 手动compaction信息
CompactionStatsstats_[config::kNumLevels]; // compaction状态
VersionSet* versions_; // 多版本DB文件,又一个庞然大物

10.Version分析之一

先来分析leveldb对单版本的sstable文件管理,主要集中在Version类中。前面的10.4节已经说明了Version类的功能和成员,这里分析其函数接口和代码实现。
Version不会修改其管理的sstable文件,只有读取操作。

10.1 Version接口

先来看看Version类的接口函数,接下来再一一分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 追加一系列iterator到 @*iters中,
//将在merge到一起时生成该Version的内容

// 要求: Version已经保存了(见VersionSet::SaveTo)

void AddIterators(constReadOptions&,
std::vector<Iterator*>* iters);


// 给定@key查找value,如果找到保存在@*val并返回OK。
// 否则返回non-OK,设置@ *stats.
// 要求:没有hold lock

struct GetStats {
FileMetaData* seek_file;
int seek_file_level;
};

Status Get(constReadOptions&, const LookupKey& key,
std::string* val,GetStats* stats);

// 把@stats加入到当前状态中,如果需要触发新的compaction返回true

// 要求:hold lock
bool UpdateStats(constGetStats& stats);
void GetOverlappingInputs(intlevel,
const InternalKey*begin, // NULL 指在所有key之前
const InternalKey* end, // NULL指在所有key之后
std::vector<FileMetaData*>* inputs);

// 如果指定level中的某些文件和[*smallest_user_key,*largest_user_key]
//有重合就返回true。

// @smallest_user_key==NULL表示比DB中所有key都小的key.
// @largest_user_key==NULL表示比DB中所有key都大的key.

bool OverlapInLevel(int level,const Slice*smallest_user_key,
const Slice* largest_user_key);

// 返回我们应该在哪个level上放置新的memtable compaction,
// 该compaction覆盖了范围[smallest_user_key,largest_user_key].

int PickLevelForMemTableOutput(const Slice& smallest_user_key,
const Slice& largest_user_key);

// 指定level的sstable个数
int NumFiles(int level) const {return files_[level].size();

10.2 Version::AddIterators()

该函数最终在DB::NewIterators()接口中被调用,调用层次为:
DBImpl::NewIterator()->DBImpl::NewInternalIterator()->Version::AddIterators()

函数功能是为该Version中的所有sstable都创建一个Two Level Iterator,以遍历sstable的内容。

  • 对于level=0级别的sstable文件,直接通过TableCache::NewIterator()接口创建,这会直接载入sstable文件到内存cache中。
  • 对于level>0级别的sstable文件,通过函数NewTwoLevelIterator()创建一个TwoLevelIterator,这就使用了lazy open的机制。

下面来分析函数代码:

S1

对于level=0级别的sstable文件,直接装入cache,level0的sstable文件可能有重合,需要merge。

1
2
3
4
for (size_t i = 0; i <files_[0].size(); i++) {  
iters->push_back(vset_->table_cache_->NewIterator(// versionset::table_cache_
options,files_[0][i]->number, files_[0][i]->file_size));
}

S2

对于level>0级别的sstable文件,lazy open机制,它们不会有重叠。

1
2
3
for (int ll = 1; ll <config::kNumLevels; ll++) {  
if(!files_[ll].empty()) iters->push_back(NewConcatenatingIterator(options,level));
}

函数NewConcatenatingIterator()直接返回一个TwoLevelIterator对象:

1
2
return NewTwoLevelIterator(new LevelFileNumIterator(vset_->icmp_,&files_[level]),
&GetFileIterator,vset_->table_cache_, options);
  • 其第一级iterator是一个LevelFileNumIterator
  • 第二级的迭代函数是GetFileIterator

下面就来分别分析之。
GetFileIterator是一个静态函数,很简单,直接返回TableCache::NewIterator()。函数声明为:

1
2
3
4
5
6
7
8
9
static Iterator* GetFileIterator(void* arg,const ReadOptions& options, constSlice& file_value)
TableCache* cache =reinterpret_cast<TableCache*>(arg);
if (file_value.size() != 16) { // 错误
return NewErrorIterator(Status::Corruption("xxx"));
} else {
return cache->NewIterator(options,
DecodeFixed64(file_value.data()), // filenumber
DecodeFixed64(file_value.data() + 8)); // filesize
}

这里的file_value是取自于LevelFileNumIterator的value,它的value()函数把file number和size以Fixed 8byte的方式压缩成一个Slice对象并返回。

10.3 Version::LevelFileNumIterator类

这也是一个继承者Iterator的子类,一个内部Iterator。

给定一个version/level对,生成该level内的文件信息。

对于给定的entry

  • key()返回的是文件中所包含的最大的key;
  • value()返回的是|file number(8 bytes)|file size(8 bytes)|串;
  • 它的构造函数接受两个参数:InternalKeyComparator&,用于key的比较;
  • vector*,指向version的所有sstable文件列表。
1
2
3
LevelFileNumIterator(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>* flist)
: icmp_(icmp), flist_(flist),index_(flist->size()) {} // Marks as invalid

来看看其接口实现。

Valid函数、SeekToxx和Next/Prev函数都很简单,毕竟容器是一个vector。Seek函数调用了FindFile,这个函数后面会分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
virtual void Seek(constSlice& target) { index_ = FindFile(icmp_, *flist_, target);}  
virtual void SeekToFirst() {index_ = 0; }
virtual void SeekToLast() {index_ = flist_->empty() ? 0 : flist_->size() - 1;}
virtual void Next() {
assert(Valid());
index_++;
}

virtual void Prev() {
assert(Valid());
if (index_ == 0) index_ =flist_->size(); // Marks as invalid
else index_--;
}

Slice key() const {
assert(Valid());
return(*flist_)[index_]->largest.Encode(); // 返回当前sstable包含的largest key
}

Slice value() const { // 根据|number|size|的格式Fixed int压缩
assert(Valid());
EncodeFixed64(value_buf_,(*flist_)[index_]->number);
EncodeFixed64(value_buf_+8,(*flist_)[index_]->file_size);
return Slice(value_buf_,sizeof(value_buf_));
}

来看FindFile,这其实是一个二分查找函数,因为传入的sstable文件列表是有序的,因此可以使用二分查找算法。就不再列出代码了。

10.4 Version::Get()

查找函数,直接在DBImpl::Get()中被调用,函数原型为:

1
Status Version::Get(const ReadOptions& options, constLookupKey& k, std::string* value, GetStats* stats)

如果本次Get不止seek了一个文件(仅会发生在level 0的情况),就将搜索的第一个文件保存在stats中。如果stat有数据返回,表明本次读取在搜索到包含key的sstable文件之前,还做了其它无谓的搜索。这个结果将用在UpdateStats()中。
这个函数逻辑还是有些复杂的,来看看代码。

S1

首先,取得必要的信息,初始化几个临时变量

1
2
3
4
5
6
7
8
9
10
Slice ikey = k.internal_key();  
Slice user_key = k.user_key();
const Comparator* ucmp =vset_->icmp_.user_comparator();
Status s;
stats->seek_file = NULL;
stats->seek_file_level = -1;
FileMetaData* last_file_read =NULL; // 在找到>1个文件时,读取时记录上一个
int last_file_read_level = -1; // 这仅发生在level 0的情况下
std::vector<FileMetaData*>tmp;
FileMetaData* tmp2;

S2

从0开始遍历所有的level,依次查找。因为entry不会跨越level,因此如果在某个level中找到了entry,那么就无需在后面的level中查找了。

1
2
3
4
5
for (int level = 0; level < config::kNumLevels; level++) {  
size_t num_files = files_[level].size();
if (num_files == 0) continue; // 本层没有文件,则直接跳过
// 取得level下的所有sstable文件列表,搜索本层
FileMetaData* const* files = &files_[level][0];

后面的所有逻辑都在for循环体中。

S3

遍历level下的sstable文件列表,搜索,注意对于level=0和>0的sstable文件的处理,由于level 0文件之间的key可能有重叠,因此处理逻辑有别于>0的level。

S3.1

对于level 0,文件可能有重叠,找到所有和user_key有重叠的文件,然后根据时间顺序从最新的文件依次处理。

1
2
3
4
5
6
7
8
9
10
11
12
tmp.reserve(num_files);  

for (uint32_t i = 0; i <num_files; i++) { // 遍历level 0下的所有sstable文件
FileMetaData* f =files[i];
if(ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0)
tmp.push_back(f); // sstable文件有user_key有重叠
}

if (tmp.empty()) continue;
std::sort(tmp.begin(),tmp.end(), NewestFirst); // 排序
files = &tmp[0]; num_files= tmp.size();// 指向tmp指针和大小
S3.2

对于level>0,leveldb保证sstable文件之间不会有重叠,所以处理逻辑有别于level 0,直接根据ikey定位到sstable文件即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
//二分查找,找到第一个largest key >=ikey的file index  
uint32_t index =FindFile(vset_->icmp_, files_[level], ikey);
if (index >= num_files) { // 未找到,文件不存在
files = NULL; num_files = 0;
} else {
tmp2 = files[index];
if(ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
// 找到的文件其所有key都大于user_key,等于文件不存在
files = NULL; num_files = 0;
} else {
files = &tmp2; num_files = 1;
}
}

S4

遍历找到的文件,存在files中,其个数为num_files。

1
for (uint32_t i = 0; i <num_files; ++i) {

后面的逻辑都在这一层循环中,只要在某个文件中找到了k/v对,就跳出for循环。

S4.1

如果本次读取不止搜索了一个文件,记录之,这仅会发生在level 0的情况下。

1
2
3
4
5
6
7
8
9
if(last_file_read != NULL && stats->seek_file == NULL) {  
// 本次读取不止seek了一个文件,记录第一个
stats->seek_file =last_file_read;
stats->seek_file_level= last_file_read_level;
}

FileMetaData* f = files[i];
last_file_read = f; // 记录本次读取的level和file
last_file_read_level =level;
S4.2

调用TableCache::Get()尝试获取{ikey, value},如果返回OK则进入,否则直接返回,传递的回调函数是SaveValue()。

1
2
3
4
5
6
7
8
Saver saver; // 初始化saver  
saver.state = kNotFound;
saver.ucmp = ucmp;
saver.user_key = user_key;
saver.value = value;
s = vset_->table_cache_->Get(options,f->number, f->file_size,
ikey, &saver, SaveValue);
if (!s.ok()) return s;
S4.3

根据saver的状态判断,如果是Not Found则向下搜索下一个更早的sstable文件,其它值则返回。

1
2
3
4
5
6
7
8
9
10
switch (saver.state) {  
case kNotFound: break; // 继续搜索下一个更早的sstable文件
case kFound: return s; // 找到
case kDeleted: // 已删除
s =Status::NotFound(Slice()); // 为了效率,使用空的错误字符串
return s;
case kCorrupt: // 数据损坏
s =Status::Corruption("corrupted key for ", user_key);
return s;
}

以上就是Version::Get()的代码逻辑,如果level 0的sstable文件太多的话,会影响读取速度,这也是为什么进行compaction的原因。
另外,还有一个传递给TableCache::Get()的saver函数,下面就来简单分析下。这是一个静态函数:static void SaveValue(void* arg,const Slice& ikey, const Slice& v)。它内部使用了结构体Saver:

1
2
3
4
5
6
struct Saver {
SaverState state;
const Comparator* ucmp; // user key比较器
Slice user_key;
std::string* value;
};

函数SaveValue的逻辑很简单。首先解析Table传入的InternalKey,然后根据指定的Comparator判断user key是否是要查找的user key。如果是并且type是kTypeValue,则设置到Saver::value中,并*返回kFound,否则返回kDeleted。代码如下:

1
2
3
4
5
6
7
8
9
Saver* s =reinterpret_cast<Saver*>(arg);  
ParsedInternalKey parsed_key; // 解析ikey到ParsedInternalKey
if (!ParseInternalKey(ikey,&parsed_key)) s->state = kCorrupt; // 解析失败
else {
if(s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { // 比较user key
s->state =(parsed_key.type == kTypeValue) ? kFound : kDeleted;
if (s->state == kFound) s->value->assign(v.data(), v.size()); // 找到,保存结果
}
}

下面要分析的几个函数,或多或少都和compaction相关。

10.5 Version::UpdateStats()

Get操作直接搜寻memtable没有命中时,就需要调用Version::Get()函数从磁盘load数据文件并查找。如果此次Get不止seek了一个文件,就记录第一个文件到stat并返回。其后leveldb就会调用UpdateStats(stat)

Stat表明在指定key range查找key时,都要先seek此文件,才能在后续的sstable文件中找到key

该函数是将stat记录的sstable文件的allowed_seeks减1,减到0就执行compaction。也就是说如果文件被seek的次数超过了限制,表明读取效率已经很低,需要执行compaction了。所以说allowed_seeks是对compaction流程的有一个优化。

函数声明:boolVersion::UpdateStats(const GetStats& stats)函数逻辑很简单:

1
2
3
4
5
6
7
8
9
10
FileMetaData* f =stats.seek_file;  
if (f != NULL) {
f->allowed_seeks--;
if (f->allowed_seeks <=0 && file_to_compact_ == NULL) {
file_to_compact_ = f;
file_to_compact_level_ =stats.seek_file_level;
return true;
}
}
return false;

变量allowed_seeks的值在sstable文件加入到version时确定,也就是后面将遇到的VersionSet::Builder::Apply()函数。

10.6 Version::GetOverlappingInputs()

它在指定level中找出和[begin, end]有重合的sstable文件,函数声明为:

1
2
void Version::GetOverlappingInputs(int level,
const InternalKey* begin, constInternalKey* end, std::vector<FileMetaData*>* inputs);

要注意的是,对于level0,由于文件可能有重合,其处理具有特殊性。当在level 0中找到有sstable文件和[begin, end]重合时,会相应的将begin/end扩展到文件的min key/max key,然后重新开始搜索。了解了功能,下面分析函数实现代码,逻辑还是很直观的。

S1 首先根据参数初始化查找变量。

1
2
3
4
5
inputs->clear();  
Slice user_begin, user_end;
if (begin != NULL) user_begin =begin->user_key();
if (end != NULL) user_end = end->user_key();
const Comparator* user_cmp =vset_->icmp_.user_comparator();

S2 遍历该层的sstable文件,比较sstable的{minkey,max key}和传入的[begin, end],如果有重合就记录文件到@inputs中,需要对level 0做特殊处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
for (size_t i = 0; i <files_[level].size(); ) {  
FileMetaData* f =files_[level][i++];
const Slice file_start =f->smallest.user_key();
const Slice file_limit =f->largest.user_key();
if (begin != NULL &&user_cmp->Compare(file_limit, user_begin) < 0) {
//"f" 中的k/v全部在指定范围之前; 跳过
} else if (end != NULL&& user_cmp->Compare(file_start, user_end) > 0) {
//"f" 中的k/v全部在指定范围之后; 跳过
} else {
inputs->push_back(f); // 有重合,记录
if (level == 0) {
// 对于level 0,sstable文件可能相互有重叠,所以要检查新加的文件
// 是否范围更大,如果是则扩展范围重新开始搜索
if (begin != NULL&& user_cmp->Compare(file_start, user_begin) < 0) {
user_begin = file_start;
inputs->clear();
i = 0;
} else if (end != NULL&& user_cmp->Compare(file_limit, user_end) > 0) {
user_end = file_limit;
inputs->clear();
i = 0;
}
}
}
}

10.7 Version::OverlapInLevel()

检查是否和指定level的文件有重合,该函数直接调用了SomeFileOverlapsRange(),这两个函数的声明为:

1
2
3
4
5
6
7
8
9
10
11
bool Version::OverlapInLevel(int level,const Slice*smallest_user_key, 
const Slice* largest_user_key){
return SomeFileOverlapsRange(vset_->icmp_,(level > 0), files_[level],
smallest_user_key, largest_user_key);
}

bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
bool disjoint_sorted_files,
const std::vector<FileMetaData*>& files,const
Slice*smallest_user_key,
const Slice* largest_user_key);

所以下面直接分析SomeFileOverlapsRange()函数的逻辑,代码很直观。
disjoint_sorted_files=true,表明文件集合是互不相交、有序的,对于乱序的、可能有交集的文件集合,需要逐个查找,找到有重合的就返回true;对于有序、互不相交的文件集合,直接执行二分查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// S1 乱序、可能相交的文件集合,依次查找  
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f =files[i];
if(AfterFile(ucmp,smallest_user_key, f) ||
BeforeFile(ucmp, largest_user_key, f)){
} else
return true; // 有重合
}

// S2 有序&互不相交,直接二分查找
uint32_t index = 0;
if (smallest_user_key != NULL) {
// Findthe earliest possible internal key smallest_user_key
InternalKeysmall(*smallest_user_key, kMaxSequenceNumber,kValueTypeForSeek);
index = FindFile(icmp, files,small.Encode());
}
if (index >= files.size())
// 不存在比smallest_user_key小的key
return false;
//保证在largest_user_key之后
return !BeforeFile(ucmp,largest_user_key, files[index]);

上面的逻辑使用到了AfterFile()BeforeFile()两个辅助函数,都很简单。

1
2
3
4
5
6
7
8
9
static bool AfterFile(const Comparator* ucmp,  
const Slice* user_key, constFileMetaData* f) {
return (user_key!=NULL&& ucmp->Compare(*user_key, f->largest.user_key())>0);
}

static bool BeforeFile(const Comparator* ucmp,
constSlice* user_key, const FileMetaData* f) {
return (user_key!=NULL&& ucmp->Compare(*user_key, f->smallest.user_key())<0);
}

10.8 Version::PickLevelForMemTableOutput()

函数返回我们应该在哪个level上放置新的memtable compaction,这个compaction覆盖了范围[smallest_user_key,largest_user_key]
该函数的调用链为:

1
DBImpl::RecoverLogFile/DBImpl::CompactMemTable -> DBImpl:: WriteLevel0Table->Version::PickLevelForMemTableOutput;

函数声明如下:

1
int Version::PickLevelForMemTableOutput(const Slice& smallest_user_key, constSlice& largest_user_key);

如果level 0没有找到重合就向下一层找,最大查找层次为kMaxMemCompactLevel = 2。如果在level 0or1找到了重合,就返回level 0。否则查找level 2,如果level 2有重合就返回level 1,否则返回level 2。
函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int level = 0;  
//level 0无重合
if (!OverlapInLevel(0,&smallest_user_key, &largest_user_key)) {
// 如果下一层没有重叠,就压到下一层,
// andthe #bytes overlapping in the level after that are limited.
InternalKeystart(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
InternalKeylimit(largest_user_key, 0, static_cast<ValueType>(0));
std::vector<FileMetaData*> overlaps;
while (level <config::kMaxMemCompactLevel) {
if (OverlapInLevel(level +1, &smallest_user_key, &largest_user_key))
break; // 检查level + 1层,有重叠就跳出循环
GetOverlappingInputs(level +2, &start, &limit, &overlaps); // 没理解这个调用
const int64_t sum =TotalFileSize(overlaps);
if (sum >kMaxGrandParentOverlapBytes) break;
level++;
}
}
return level;

这个函数在整个compaction逻辑中的作用在分析DBImpl时再来结合整个流程分析,现在只需要了解它找到一个level存放新的compaction就行了。如果返回level = 0,表明在level 0或者1和指定的range有重叠;如果返回1,表明在level2和指定的range有重叠;否则就返回2(kMaxMemCompactLevel)。也就是说在compactmemtable的时候,写入的sstable文件不一定总是在level 0,如果比较顺利,没有重合的,它可能会写到level1或者level2中。

10.9 小结

Version是管理某个版本的所有sstable的类,就其导出接口而言,无非是遍历sstable,查找k/v。以及为compaction做些事情,给定range,检查重叠情况。
而它不会修改它管理的sstable这些文件,对这些文件而言它是只读操作接口。

11 VersionSet分析

Version之后就是VersionSet,它并不是Version的简单集合,还肩负了不少的处理逻辑。这里的分析不涉及到compaction相关的部分,这部分会单独分析。包括log等各种编号计数器,compaction点的管理等等。

11.1 VersionSet接口

1 首先是构造函数,VersionSet会使用到TableCache,这个是调用者传入的。TableCache用于Get k/v操作。

1
2
VersionSet(const std::string& dbname, const Options* options,
TableCache*table_cache, const InternalKeyComparator*);

VersionSet的构造函数很简单,除了根据参数初始化,还有两个地方值得注意:

  • N1 nextfile_number从2开始;
  • N2 创建新的Version并加入到Version链表中,并设置CURRENT=新创建version;
  • 其它的数字初始化为0,指针初始化为NULL。

2 恢复函数,从磁盘恢复最后保存的元信息

1
Status Recover();

3 标记指定的文件编号已经被使用了

1
void MarkFileNumberUsed(uint64_t number);

逻辑很简单,就是根据编号更新文件编号计数器:

1
2
if (next_file_number_ <= number) 
next_file_number_ = number + 1;

4 在current version上应用指定的VersionEdit,生成新的MANIFEST信息,保存到磁盘上,并用作current version。
要求:没有其它线程并发调用;要用于mu;

1
Status LogAndApply(VersionEdit* edit, port::Mutex* mu)EXCLUSIVE_LOCKS_REQUIRED(mu);

5 对于@v中的@key,返回db中的大概位置

1
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);

6 其它一些简单接口,信息获取或者设置,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//返回current version
Version* current() const {
return current_;
}

// 当前的MANIFEST文件号
uint64_t ManifestFileNumber() const {
return manifest_file_number_;
}

// 分配并返回新的文件编号
uint64_t NewFileNumber() {
return next_file_number_++;
}

// 返回当前log文件编号
uint64_t LogNumber() const {
return log_number_;
}

// 返回正在compact的log文件编号,如果没有返回0
uint64_t PrevLogNumber() const {
return prev_log_number_;
}

// 获取、设置last sequence,set时不能后退
uint64_t LastSequence() const {
return last_sequence_;
}

void SetLastSequence(uint64_t s) {
assert(s >=last_sequence_);
last_sequence_ = s;
}

// 返回指定level中所有sstable文件大小的和
int64_t NumLevelBytes(int level) const;

// 返回指定level的文件个数
int NumLevelFiles(int level) const;

// 重用@file_number,限制很严格:@file_number必须是最后分配的那个
// 要求: @file_number是NewFileNumber()返回的.

void ReuseFileNumber(uint64_t file_number) {
if (next_file_number_ ==file_number + 1) next_file_number_ = file_number;
}

// 对于所有level>0,遍历文件,找到和下一层文件的重叠数据的最大值(in bytes)
// 这个就是Version:: GetOverlappingInputs()函数的简单应用
int64_t MaxNextLevelOverlappingBytes();

// 获取函数,把所有version的所有level的文件加入到@live中
void AddLiveFiles(std::set<uint64_t>* live);

// 返回一个可读的单行信息——每个level的文件数,保存在*scratch中
struct LevelSummaryStorage {char buffer[100]; };
const char* LevelSummary(LevelSummaryStorage* scratch) const;

下面就来分析这两个接口RecoverLogAndApply以及ApproximateOffsetOf

11.2 VersionSet::Builder类

Builder是一个内部辅助类,其主要作用是:

  1. 把一个MANIFEST记录的元信息应用到版本管理器VersionSet中;
  2. 把当前的版本状态设置到一个Version对象中。

11.2.1 成员与构造

Builder的vset与base都是调用者传入的,此外它还为FileMetaData定义了一个比较类BySmallestKey,首先依照文件的min key,小的在前;如果min key相等则file number小的在前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedefstd::set<FileMetaData*, BySmallestKey> FileSet;  
// 这个是记录添加和删除的文件
struct LevelState {
std::set<uint64_t>deleted_files;
// 保证添加文件的顺序是有效定义的
FileSet* added_files;
};
VersionSet* vset_;
Version* base_;
LevelStatelevels_[config::kNumLevels];

// 其接口有3个:
void Apply(VersionEdit* edit);
void SaveTo(Version* v);
void MaybeAddFile(Version* v, int level, FileMetaData* f);

构造函数执行简单的初始化操作,在析构时,遍历检查LevelState::added_files,如果文件引用计数为0,则删除文件。

11.2.2 Apply()

函数声明:voidApply(VersionEdit* edit),该函数将edit中的修改应用到当前状态中。注意除了compaction点直接修改了vset_,其它删除和新加文件的变动只是先存储在Builder自己的成员变量中,在调用SaveTo(v)函数时才施加到v上。

S1 把edit记录的compaction点应用到当前状态

1
edit->compact_pointers_ => vset_->compact_pointer_

S2 把edit记录的已删除文件应用到当前状态

1
edit->deleted_files_ => levels_[level].deleted_files

S3把edit记录的新加文件应用到当前状态,这里会初始化文件的allowed_seeks值,以在文件被无谓seek指定次数后自动执行compaction,这里作者阐述了其设置规则。

1
2
3
4
5
6
7
8
9
for (size_t i = 0; i < edit->new_files_.size(); i++) {  
const int level =edit->new_files_[i].first;
FileMetaData* f = newFileMetaData(edit->new_files_[i].second);
f->refs = 1;
f->allowed_seeks = (f->file_size /16384); // 16KB-见下面
if (f->allowed_seeks <100) f->allowed_seeks = 100;
levels_[level].deleted_files.erase(f->number); // 以防万一
levels_[level].added_files->insert(f);
}

值allowed_seeks事关compaction的优化,其计算依据如下,首先假设:

  • 1 一次seek时间为10ms
  • 2 写入10MB数据的时间为10ms(100MB/s)
  • 3 compact 1MB的数据需要执行25MB的IO
    • 从本层读取1MB
    • 从下一层读取10-12MB(文件的key range边界可能是非对齐的)
    • 向下一层写入10-12MB

这意味这25次seek的代价等同于compact 1MB的数据,也就是一次seek花费的时间大约相当于compact 40KB的数据。基于保守的角度考虑,对于每16KB的数据,我们允许它在触发compaction之前能做一次seek。

11.2.3 MaybeAddFile()

函数声明:

1
voidMaybeAddFile(Version* v, int level, FileMetaData* f);

该函数尝试将f加入到levels_[level]文件set中。要满足两个条件:

  1. 文件不能被删除,也就是不能在levels_[level].deleted_files集合中;
  2. 保证文件之间的key是连续的,即基于比较器vset->icmp,f的min key要大于levels_[level]集合中最后一个文件的max key;

11.2.4 SaveTo()

把当前的状态存储到v中返回,函数声明:

1
void SaveTo(Version* v);

函数逻辑:For循环遍历所有的level[0, config::kNumLevels-1],把新加的文件和已存在的文件merge在一起,丢弃已删除的文件,结果保存在v中。对于level> 0,还要确保集合中的文件没有重合。

S1 merge流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 原文件集合
conststd::vector<FileMetaData*>& base_files = base_->files_[level];
std::vector<FileMetaData*>::const_iterator base_iter =base_files.begin();
std::vector<FileMetaData*>::const_iterator base_end =base_files.end();
const FileSet* added =levels_[level].added_files;
v->files_[level].reserve(base_files.size()+ added->size());
for (FileSet::const_iteratoradded_iter = added->begin();
added_iter !=added->end(); ++added_iter) {
//加入base_中小于added_iter的那些文件

for(std::vector<FileMetaData*>::const_iterator bpos = std::upper_bound(base_iter,base_end,*added_iter, cmp);
base_iter != bpos;++base_iter) {
// base_iter逐次向后移到
MaybeAddFile(v, level,*base_iter);
}
// 加入added_iter
MaybeAddFile(v, level,*added_iter);
}

// 添加base_剩余的那些文件
for (; base_iter != base_end;++base_iter)
MaybeAddFile(v, level, *base_iter);

对象cmp就是前面定义的比较仿函数BySmallestKey对象。

S2 检查流程,保证level>0的文件集合无重叠,基于vset->icmp,确保文件i-1的max key < 文件i的min key。

11.3 Recover()

对于VersionSet而言,Recover就是根据CURRENT指定的MANIFEST,读取db元信息。这是9.3介绍的Recovery流程的开始部分。

11.3.1 函数流程

下面就来分析其具体逻辑。

S1 读取CURRENT文件,获得最新的MANIFEST文件名,根据文件名打开MANIFEST文件。CURRENT文件以\n结尾,读取后需要trim下。

1
2
3
4
5
std::string current; // MANIFEST文件名  
ReadFileToString(env_, CurrentFileName(dbname_), ¤t);
std::string dscname = dbname_ + "/" + current;
SequentialFile* file;
env_->NewSequentialFile(dscname, &file);

S2 读取MANIFEST内容,MANIFEST是以log的方式写入的,因此这里调用的是log::Reader来读取。然后调用VersionEdit::DecodeFrom,从内容解析出VersionEdit对象,并将VersionEdit记录的改动应用到versionset中。读取MANIFEST中的log number, prev log number, nextfile number, last sequence。

1
2
3
4
5
6
7
8
9
10
11
12
Builder builder(this, current_);  
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (s.ok())builder.Apply(&edit);
// log number, file number, …逐个判断
if (edit.has_log_number_) {
log_number =edit.log_number_;
have_log_number = true;
}
… …
}

S3 将读取到的log number, prev log number标记为已使用。

1
2
MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);

S4 最后,如果一切顺利就创建新的Version,并应用读取的几个number。

1
2
3
4
5
6
7
8
9
10
11
12
if (s.ok()) {  
Version* v = newVersion(this);
builder.SaveTo(v);
// 安装恢复的version
Finalize(v);
AppendVersion(v);
manifest_file_number_ =next_file;
next_file_number_ = next_file+ 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ =prev_log_number;
}

Finalize(v)AppendVersion(v)用来安装并使用version v,在AppendVersion函数中会将current version设置为v。下面就来分别分析这两个函数。

11.3.2 Finalize()

函数声明:

1
void Finalize(Version*v);

该函数依照规则为下次的compaction计算出最适用的level,对于level 0和>0需要分别对待,逻辑如下。

S1 对于level 0以文件个数计算,kL0_CompactionTrigger默认配置为4。

1
score =v->files_[level].size()/static_cast<double>(config::kL0_CompactionTrigger);

S2 对于level>0,根据level内的文件总大小计算

1
2
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score = static_cast<double>(level_bytes) /MaxBytesForLevel(level);

S3 最后把计算结果保存到v的两个成员compactionlevel和compactionscore中。

其中函数MaxBytesForLevel根据level返回其本层文件总大小的预定最大值。
计算规则为:1048576.0* level^10
这里就有一个问题,为何level0和其它level计算方法不同,原因如下,这也是leveldb为compaction所做的另一个优化。

  1. 对于较大的写缓存(write-buffer),做太多的level 0 compaction并不好
  2. 每次read操作都要merge level 0的所有文件,因此我们不希望level 0有太多的小文件存在(比如写缓存太小,或者压缩比较高,或者覆盖/删除较多导致小文件太多)。
  3. 看起来这里的写缓存应该就是配置的操作log大小。

11.3.3 AppendVersion()

函数声明:

1
void AppendVersion(Version*v);

把v加入到versionset中,并设置为current version。并对老的current version执行Uref()。在双向循环链表中的位置在dummy_versions_之前。

11.4 LogAndApply()

函数声明:

1
Status LogAndApply(VersionEdit*edit, port::Mutex* mu)

前面接口小节中讲过其功能:在currentversion上应用指定的VersionEdit,生成新的MANIFEST信息,保存到磁盘上,并用作current version,故为Log And Apply。
参数edit也会被函数修改。

11.4.1 函数流程

下面就来具体分析函数代码。

S1 为edit设置log number等4个计数器。

1
2
3
4
5
6
7
8
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
}
else edit->SetLogNumber(log_number_);
if (!edit->has_prev_log_number_) edit->SetPrevLogNumber(prev_log_number_);
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);

要保证edit自己的log number是比较大的那个,否则就是致命错误。保证edit的log number小于next file number,否则就是致命错误-见9.1小节。

S2 创建一个新的Version v,并把新的edit变动保存到v中。

1
2
3
4
5
6
7
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v); //如前分析,只是为v计算执行compaction的最佳level

S3 如果MANIFEST文件指针不存在,就创建并初始化一个新的MANIFEST文件。这只会发生在第一次打开数据库时。这个MANIFEST文件保存了current version的快照。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::string new_manifest_file;
Status s;
if (descriptor_log_ == NULL) {
// 这里不需要unlock *mu因为我们只会在第一次调用LogAndApply时
// 才走到这里(打开数据库时).
assert(descriptor_file_ == NULL); // 文件指针和log::Writer都应该是NULL
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_); // 写入快照
}
}

S4 向MANIFEST写入一条新的log,记录current version的信息。在文件写操作时unlock锁,写入完成后,再重新lock,以防止浪费在长时间的IO操作上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
mu->Unlock();
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);// 序列化current version信息
s = descriptor_log_->AddRecord(record); // append到MANIFEST log中
if (s.ok()) s = descriptor_file_->Sync();
if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
if (ManifestContains(record)) { // 返回出错,其实确实写成功了
Log(options_->info_log, "MANIFEST contains log record despiteerror ");
s = Status::OK();
}
}
}
//如果刚才创建了一个MANIFEST文件,通过写一个指向它的CURRENT文件
//安装它;不需要再次检查MANIFEST是否出错,因为如果出错后面会删除它
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}
mu->Lock();

S5 安装这个新的version

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (s.ok()) { // 安装这个version  
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
}
else { // 失败了,删除
delete v;
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = descriptor_file_ = NULL;
env_->DeleteFile(new_manifest_file);
}
}

流程的S4中,函数会检查MANIFEST文件是否已经有了这条record,那么什么时候会有呢?

主函数使用到了几个新的辅助函数WriteSnapshot,ManifestContains和SetCurrentFile,下面就来分析。

11.4.2 WriteSnapshot()

函数声明:

1
Status WriteSnapshot(log::Writer*log)

把currentversion保存到*log中,信息包括comparator名字、compaction点和各级sstable文件,函数逻辑很直观。

  • S1 首先声明一个新的VersionEdit edit
  • S2 设置comparator:edit.SetComparatorName(icmp_.user_comparator()->Name());
  • S3 遍历所有level,根据compactpointer[level],设置compaction点:
    • edit.SetCompactPointer(level, key);
  • S4 遍历所有level,根据current->files,设置sstable文件集合:edit.AddFile(level, xxx)
  • S5 根据序列化并append到log(MANIFEST文件)中;
1
2
3
std::string record;
edit.EncodeTo(&record);
returnlog->AddRecord(record);

11.4.3 ManifestContains()

函数声明:

1
bool ManifestContains(conststd::string& record)

如果当前MANIFEST包含指定的record就返回true,来看看函数逻辑。

  • S1 根据当前的manifestfile_number文件编号打开文件,创建SequentialFile对象
  • S2 根据创建的SequentialFile对象创建log::Reader,以读取文件
  • S3 调用log::Reader的ReadRecord依次读取record,如果和指定的record相同,就返回true,没有相同的record就返回false

SetCurrentFile很简单,就是根据指定manifest文件编号,构造出MANIFEST文件名,并写入到CURRENT即可。

11.5 ApproximateOffsetOf()

函数声明:

1
uint64_tApproximateOffsetOf(Version* v, const InternalKey& ikey)

在指定的version中查找指定key的大概位置。假设version中有n个sstable文件,并且落在了地i个sstable的key空间内,那么返回的位置 = sstable1文件大小+sstable2文件大小 + … + sstable (i-1)文件大小 + key在sstable i中的大概偏移。

可分为两段逻辑。

  • 首先直接和sstable的max key作比较,如果key > max key,直接跳过该文件,还记得sstable文件是有序排列的。
    • 对于level >0的文件集合而言,如果如果key < sstable文件的min key,则直接跳出循环,因为后续的sstable的min key肯定大于key。
  • key在sstable i中的大概偏移使用的是Table:: ApproximateOffsetOf(target)接口,前面分析过,它返回的是Table中>= target的key的位置。

VersionSet的相关函数暂时分析到这里,compaction部分后需单独分析。

12 DB的打开

先分析LevelDB是如何打开db的,万物始于创建。在打开流程中有几个辅助函数:DBImpl(),DBImpl::Recover, DBImpl::DeleteObsoleteFiles, DBImpl::RecoverLogFile, DBImpl::MaybeScheduleCompaction

12.1 DB::Open()

打开一个db,进行PUT、GET操作,就是前面的静态函数DB::Open的工作。如果操作成功,它就返回一个db指针。前面说过DB就是一个接口类,其具体实现在DBImp类中,这是一个DB的子类。
函数声明为:

1
Status DB::Open(const Options& options, const std::string&dbname, DB** dbptr);

分解来看,Open()函数主要有以下5个执行步骤。

  • S1 创建DBImpl对象,其后进入DBImpl::Recover()函数执行S2和S3。
  • S2 从已存在的db文件恢复db数据,根据CURRENT记录的MANIFEST文件读取db元信息;这通过调用VersionSet::Recover()完成。
  • S3 然后过滤出那些最近的更新log,前一个版本可能新加了这些log,但并没有记录在MANIFEST中。然后依次根据时间顺序,调用DBImpl::RecoverLogFile()从旧到新回放这些操作log。回放log时可能会修改db元信息,比如dump了新的level 0文件,因此它将返回一个VersionEdit对象,记录db元信息的变动。
  • S4 如果DBImpl::Recover()返回成功,就执行VersionSet::LogAndApply()应用VersionEdit,并保存当前的DB信息到新的MANIFEST文件中。
  • S5 最后删除一些过期文件,并检查是否需要执行compaction,如果需要,就启动后台线程执行。

下面就来具体分析Open函数的代码,在Open函数中涉及到上面的3个流程。

S1 首先创建DBImpl对象,锁定并试图做Recover操作。Recover操作用来处理创建flag,比如存在就返回失败等等,尝试从已存在的sstable文件恢复db。并返回db元信息的变动信息,一个VersionEdit对象。

1
2
3
4
DBImpl* impl = newDBImpl(options, dbname);  
impl->mutex_.Lock(); // 锁db
VersionEdit edit;
Status s =impl->Recover(&edit); // 处理flag&恢复:create_if_missing,error_if_exists

S2 如果Recover返回成功,则调用VersionSet取得新的log文件编号——实际上是在当前基础上+1,准备新的log文件。如果log文件创建成功,则根据log文件创建log::Writer。然后执行VersionSet::LogAndApply,根据edit记录的增量变动生成新的current version,并写入MANIFEST文件。

函数NewFileNumber(){returnnextfile_number++;},直接返回nextfile_number

1
2
3
4
5
6
7
8
9
10
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), &lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = newlog::Writer(lfile);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}

S3 如果VersionSet::LogAndApply返回成功,则删除过期文件,检查是否需要执行compaction,最终返回创建的DBImpl对象。

1
2
3
4
5
6
7
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock();
if (s.ok()) *dbptr = impl;
return s;

以上就是DB::Open的主题逻辑。

12.2 DBImpl::DBImpl()

构造函数做的都是初始化操作,

1
DBImpl::DBImpl(const Options& options, const std::string&dbname)

首先是初始化列表中,直接根据参数赋值,或者直接初始化。Comparator和filter policy都是参数传入的。在传递option时会首先将option中的参数合法化,logfilenumber初始化为0,指针初始化为NULL。
创建MemTable,并增加引用计数,创建WriteBatch。

1
2
3
4
5
6
7
8
mem_(newMemTable(internal_comparator_)),
tmp_batch_(new WriteBatch),
mem_->Ref();
// 然后在函数体中,创建TableCache和VersionSet。
// 为其他预留10个文件,其余的都给TableCache.
const int table_cache_size = options.max_open_files - 10;
table_cache_ = newTableCache(dbname_, &options_, table_cache_size);
versions_ = newVersionSet(dbname_, &options_, table_cache_, &internal_comparator_);

12.3 DBImp::NewDB()

当外部在调用DB::Open()时设置了option指定如果db不存在就创建,如果db不存在leveldb就会调用函数创建新的db。判断db是否存在的依据是<db name>/CURRENT文件是否存在。其逻辑很简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// S1首先生产DB元信息,设置comparator名,以及log文件编号、文件编号,以及seq no。  
VersionEdit new_db;
new_db.SetComparatorName(user_comparator()->Name());
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
new_db.SetLastSequence(0);
// S2 生产MANIFEST文件,将db元信息写入MANIFEST文件。
const std::string manifest = DescriptorFileName(dbname_, 1);
WritableFile* file;
Status s = env_->NewWritableFile(manifest, &file);
if (!s.ok()) return s;
{
log::Writer log(file);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
if (s.ok()) s = file->Close();
}
delete file;
// S3 如果成功,就把MANIFEST文件名写入到CURRENT文件中
if (s.ok())
s = SetCurrentFile(env_, dbname_, 1);
else
env_->DeleteFile(manifest);
return s;

这就是创建新DB的逻辑,很简单。

12.4 DBImpl::Recover()

函数声明为:

1
StatusDBImpl::Recover(VersionEdit* edit)

如果调用成功则设置VersionEdit。Recover的基本功能是:首先是处理创建flag,比如存在就返回失败等等;然后是尝试从已存在的sstable文件恢复db;最后如果发现有大于原信息记录的log编号的log文件,则需要回放log,更新db数据。回放期间db可能会dump新的level 0文件,因此需要把db元信息的变动记录到edit中返回。函数逻辑如下:

S1 创建目录,目录以db name命名,忽略任何创建错误,然后尝试获取db name/LOCK文件锁,失败则返回。

1
2
3
env_->CreateDir(dbname_);
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok()) return s;

S2 根据CURRENT文件是否存在,以及option参数执行检查。

  • 如果文件不存在 & create_is_missing=true,则调用函数NewDB()创建;否则报错。
  • 如果文件存在 & error_if_exists=true,则报错。

S3 调用VersionSet的Recover()函数,就是从文件中恢复数据。如果出错则打开失败,成功则向下执行S4。

1
s = versions_->Recover();

S4尝试从所有比manifest文件中记录的log要新的log文件中恢复(前一个版本可能会添加新的log文件,却没有记录在manifest中)。另外,函数PrevLogNumber()已经不再用了,仅为了兼容老版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//  S4.1 这里先找出所有满足条件的log文件:比manifest文件记录的log编号更新。  
SequenceNumber max_sequence(0);
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string>filenames;
s = env_->GetChildren(dbname_, &filenames); // 列出目录内的所有文件
uint64_t number;
FileType type;
std::vector<uint64_t>logs;
for (size_t i = 0; i < filenames.size(); i++) { // 检查log文件是否比min log更新
if (ParseFileName(filenames[i], &number, &type) && type == kLogFile
&& ((number >= min_log) || (number == prev_log))) {
logs.push_back(number);
}
}
// S4.2 找到log文件后,首先排序,保证按照生成顺序,依次回放log。并把DB元信息的变动(sstable文件的变动)追加到edit中返回。
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], edit, &max_sequence);
// 前一版可能在生成该log编号后没有记录在MANIFEST中,
//所以这里我们手动更新VersionSet中的文件编号计数器
versions_->MarkFileNumberUsed(logs[i]);
}
// S4.3 更新VersionSet的sequence
if (s.ok()) {
if (versions_->LastSequence() < max_sequence)
versions_->SetLastSequence(max_sequence);
}

上面就是Recover的执行流程。

12.5 DBImpl::DeleteObsoleteFiles()

这个是垃圾回收函数,如前所述,每次compaction和recovery之后都会有文件被废弃。DeleteObsoleteFiles就是删除这些垃圾文件的,它在每次compaction和recovery完成之后被调用。

其调用点包括:DBImpl::CompactMemTable,DBImpl::BackgroundCompaction, 以及DB::Open的recovery步骤之后。它会删除所有过期的log文件,没有被任何level引用到、或不是正在执行的compaction的output的sstable文件。该函数没有参数,其代码逻辑也很直观,就是列出db的所有文件,对不同类型的文件分别判断,如果是过期文件,就删除之,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// S1 首先,确保不会删除pending文件,将versionset正在使用的所有文件加入到live中。  
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live); //该函数其后分析
// S2 列举db的所有文件
std::vector<std::string>filenames;
env_->GetChildren(dbname_, &filenames);
// S3 遍历所有列举的文件,根据文件类型,分别处理;
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
bool keep = true; //false表明是过期文件
// S3.1 kLogFile,log文件,根据log编号判断是否过期
keep = ((number >= versions_->LogNumber()) ||
(number == versions_->PrevLogNumber()));
// S3.2 kDescriptorFile,MANIFEST文件,根据versionset记录的编号判断
keep = (number >= versions_->ManifestFileNumber());
// S3.3 kTableFile,sstable文件,只要在live中就不能删除
// S3.4 kTempFile,如果是正在写的文件,只要在live中就不能删除
keep = (live.find(number) != live.end());
// S3.5 kCurrentFile,kDBLockFile, kInfoLogFile,不能删除
keep = true;
// S3.6 如果keep为false,表明需要删除文件,如果是table还要从cache中删除
if (!keep) {
if (type == kTableFile) table_cache_->Evict(number);
Log(options_.info_log, "Delete type=%d #%lld\n", type, number);
env_->DeleteFile(dbname_ + "/" + filenames[i]);
}
}
}

这就是删除过期文件的逻辑,其中调用到了VersionSet::AddLiveFiles函数,保证不会删除active的文件。

函数DbImpl::MaybeScheduleCompaction()放在Compaction一节分析,基本逻辑就是如果需要compaction,就启动后台线程执行compaction操作。

12.6 DBImpl::RecoverLogFile()

函数声明:

1
StatusRecoverLogFile(uint64_t log_number, VersionEdit* edit,SequenceNumber* max_sequence)

参数说明:

  • @log_number是指定的log文件编号
  • @edit记录db元信息的变化——sstable文件变动
  • @max_sequence 返回max{log记录的最大序号, *max_sequence}

该函数打开指定的log文件,回放日志。期间可能会执行compaction,生产新的level 0sstable文件,记录文件变动到edit中。它声明了一个局部类LogReporter以打印错误日志,没什么好说的,下面来看代码逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// S1 打开log文件返回SequentialFile*file,出错就返回,否则向下执行S2。  
// S2 根据log文件句柄file创建log::Reader,准备读取log。
log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
// S3 依次读取所有的log记录,并插入到新生成的memtable中。这里使用到了批量更新接口WriteBatch,具体后面再分析。
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = NULL;
while (reader.ReadRecord(&record, &scratch) && status.ok()) { // 读取全部log
if (record.size() < 12) { // log数据错误,不满足最小长度12
reporter.Corruption(record.size(), Status::Corruption("log recordtoo small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record); // log内容设置到WriteBatch中
if (mem == NULL) { // 创建memtable
mem = new MemTable(internal_comparator_);
mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem); // 插入到memtable中
MaybeIgnoreError(&status);
if (!status.ok()) break;
const SequenceNumber last_seq =
WriteBatchInternal::Sequence(&batch) + WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) *max_sequence = last_seq; // 更新max sequence
// 如果mem的内存超过设置值,则执行compaction,如果compaction出,
// 立刻返回错误,DB::Open失败
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
status = WriteLevel0Table(mem, edit, NULL);
if (!status.ok()) break;
mem->Unref(); // 释放当前memtable
mem = NULL;
}
}
// S4 扫尾工作,如果mem != NULL,说明还需要dump到新的sstable文件中。
if (status.ok() && mem != NULL) {// 如果compaction出错,立刻返回错误
status = WriteLevel0Table(mem, edit, NULL);
}
if (mem != NULL)mem->Unref();
delete file;
return status;

把MemTabledump到sstable是函数WriteLevel0Table的工作,其实这是compaction的一部分,准备放在compaction一节来分析。

13 DB的关闭&销毁

13.1 DB关闭

外部调用者通过DB::Open()获取一个DB*对象,如果要关闭打开的DB* db对象,则直接delete db即可,这会调用到DBImpl的析构函数。析构依次执行如下的5个逻辑:

  • S1 等待后台compaction任务结束
  • S2 释放db文件锁,/lock文件
  • S3 删除VersionSet对象,并释放MemTable对象
  • S4 删除log相关以及TableCache对象
  • S5 删除options的block_cache以及info_log对象

13.2 DB销毁

函数声明:

1
StatusDestroyDB(const std::string& dbname, const Options& options)

该函数会删除掉db的数据内容,要谨慎使用。函数逻辑为:

  • S1 获取dbname目录的文件列表到filenames中,如果为空则直接返回,否则进入S2。
  • S2 锁文件<dbname>/lock,如果锁成功就执行S3
  • S3 遍历filenames文件列表,过滤掉lock文件,依次调用DeleteFile删除。
  • S4 释放lock文件,并删除之,然后删除文件夹。

Destory就执行完了,如果删除文件出现错误,记录之,依然继续删除下一个。最后返回错误代码。

14 DB的查询与遍历

分析完如何打开和关闭db,本章就继续分析如何从db中根据key查询value,以及遍历整个db

14.1 Get()

函数声明:StatusGet(const ReadOptions& options, const Slice& key, std::string* value)

从DB中查询key 对应的value,参数@options指定读取操作的选项,典型的如snapshot号,从指定的快照中读取。快照本质上就是一个sequence号,后面将单独在快照一章中分析。下面就来分析下函数逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// S1 锁mutex,防止并发,如果指定option则尝试获取snapshot;然后增加MemTable的引用值。  
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != NULL)
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
else snapshot = versions_->LastSequence(); // 取当前版本的最后Sequence
MemTable *mem = mem_, *imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != NULL) imm->Ref();
current->Ref();
// S2 从sstable文件和MemTable中读取时,释放锁mutex;之后再次锁mutex。
bool have_stat_update = false;
Version::GetStats stats;
{
mutex_.Unlock();
// 先从memtable中查询,再从immutable memtable中查询
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
}
else if (imm != NULL && imm->Get(lkey, value, &s)) {
}
else { // 需要从sstable文件中查询
s = current->Get(options, lkey, value, &stats);
have_stat_update = true; // 记录之,用于compaction
}
mutex_.Lock();
}
// S3 如果是从sstable文件查询出来的,检查是否需要做compaction。最后把MemTable的引用计数减1。
if (have_stat_update &¤t->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != NULL)imm->Unref();
current->Unref();

查询是比较简单的操作,UpdateStats在前面Version一节已经分析过。

14.2 NewIterator()

函数声明:Iterator*NewIterator(const ReadOptions& options)。通过该函数生产了一个Iterator对象,调用这就可以基于该对象遍历db内容了。函数很简单,调用两个函数创建了一个二级*Iterator

1
2
3
4
5
6
7
8
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
returnNewDBIterator(&dbname_, env_, user_comparator(), internal_iter,
(options.snapshot != NULL
? reinterpret_cast<constSnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
}

其中,函数NewDBIterator直接返回了一个DBIter指针

1
2
3
4
5
Iterator* NewDBIterator(const std::string* dbname, Env* env,
const Comparator*user_key_comparator, Iterator* internal_iter,
const SequenceNumber& sequence) {
return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence);
}

函数NewInternalIterator有一些处理逻辑,就是收集所有能用到的iterator,生产一个Merging Iterator。这包括MemTable,Immutable MemTable,以及各sstable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber*latest_snapshot) {
IterState* cleanup = newIterState;
mutex_.Lock();
// 根据last sequence设置lastest snapshot,并收集所有的子iterator
*latest_snapshot = versions_->LastSequence();
std::vector<Iterator*>list;
list.push_back(mem_->NewIterator()); // >memtable
mem_->Ref();
if (imm_ != NULL) {
list.push_back(imm_->NewIterator()); // >immutablememtable
imm_->Ref();
}
versions_->current()->AddIterators(options, &list); // >current的所有sstable
Iterator* internal_iter = NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref();
// 注册清理机制
cleanup->mu = &mutex_;
cleanup->mem = mem_;
cleanup->imm = imm_;
cleanup->version = versions_->current();
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
mutex_.Unlock();
return internal_iter;
}

这个清理函数CleanupIteratorState是很简单的,对注册的对象做一下Unref操作即可。

1
2
3
4
5
6
7
8
9
static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1);
state->mu->Lock();
state->mem->Unref();
if (state->imm != NULL)state->imm->Unref();
state->version->Unref();
state->mu->Unlock();
delete state;
}

可见对于db的遍历依赖于DBIter和Merging Iterator这两个迭代器,它们都是Iterator接口的实现子类。

14.3 MergingIterator

MergingIterator是一个合并迭代器,它内部使用了一组自Iterator,保存在其成员数组children_中。如上面的函数NewInternalIterator,包括memtable,immutable memtable,以及各sstable文件;它所做的就是根据调用者指定的key和sequence,从这些Iterator中找到合适的记录。

在分析其Iterator接口之前,先来看看两个辅助函数FindSmallest和FindLargest。FindSmallest从0开始向后遍历内部Iterator数组,找到key最小的Iterator,并设置到current;FindLargest从最后一个向前遍历内部Iterator数组,找到key最大的Iterator,并设置到current

MergingIterator还定义了两个移动方向:kForward,向前移动;kReverse,向后移动。

14.3.1 Get系接口

下面就把其接口拖出来一个一个分析,首先是简单接口,key和value都是返回current的值,current是当前seek到的Iterator位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
virtual Slice key() const {
assert(Valid());
return current_->key();
}

virtual Slice value() const {
assert(Valid());
return current_->value();
}

virtual Status status() const {
Status status;
for (int i = 0; i < n_; i++) { // 只有所有内部Iterator都ok时,才返回ok
status = children_[i].status();
if (!status.ok()) break;
}
return status;
}

14.3.2 Seek系接口

然后是几个seek系的函数,也比较简单,都是依次调用内部Iterator的seek系函数。然后做merge,对于Seek和SeekToFirst都调用FindSmallest;对于SeekToLast调用FindLargest。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
virtual void SeekToFirst() {
for (int i = 0; i < n_; i++) children_[i].SeekToFirst();
FindSmallest();
direction_ = kForward;
}

virtual void SeekToLast() {
for (int i = 0; i < n_; i++) children_[i].SeekToLast();
FindLargest();
direction_ = kReverse;
}

virtual void Seek(constSlice& target) {
for (int i = 0; i < n_; i++) children_[i].Seek(target);
FindSmallest();
direction_ = kForward;
}

14.3.3 逐步移动

最后就是Next和Prev函数,完成迭代遍历。这可能会有点绕。下面分别来说明。

首先,在Next移动时,如果当前direction不是kForward的,也就是上一次调用了Prev或者SeekToLast函数,就需要先调整除current之外的所有iterator,为什么要做这种调整呢?啰嗦一点,考虑如下的场景,如图14.3-1所示。

当前direction为kReverse,并且有:Current = memtable Iterator。各Iterator位置为:{memtable, stable 0, sstable1} ={ key3:1:1, key2:3:1, key2:1:1},这符合prev操作的largest key要求。

注:需要说明下,对于每个update操作,leveldb都会赋予一个全局唯一的sequence号,且是递增的。例子中的sequence号可理解为每个key的相对值,后面也是如此。

接下来我们来分析Prev移动的操作。

  • 第一次Prev,current(memtable iterator)移动到key1:3:0上,3者中最大者变成sstable0;因此current修改为sstable0;
  • 第二次Prev,current(sstable0 Iterator)移动到key1:2:1上,3者中最大者变成sstable1;因此current修改为sstable1:
  • 此时各Iterator的位置为{memtable, sstable 0, sstable1} = { key1:3:0, key1:2:1, key2:2:1},并且current=sstable1。
  • 接下来再调用Next,显然当前Key()为key2:2:1,综合考虑3个iterator,两次Next()的调用结果应该是key2:1:1和key3:1:1。而memtable和sstable0指向的key却是key1:3:0和key1:2:1,这时就需要调整memtable和sstable0了,使他们都定位到Key()之后,也就是key3:1:1和key2:3:1上。

然后current(current1)Next移动到key2:1:1上。这就是Next时的调整逻辑,同理,对于Prev也有相同的调整逻辑。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
virtual void Next() {
assert(Valid());
// 确保所有的子Iterator都定位在key()之后.
// 如果我们在正向移动,对于除current_外的所有子Iterator这点已然成立
// 因为current_是最小的子Iterator,并且key() = current_->key()。
// 否则,我们需要明确设置其它的子Iterator
if (direction_ != kForward) {
for (int i = 0; i < n_; i++) { // 把所有current之外的Iterator定位到key()之后
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
if (child->Valid() && comparator_->Compare(key(), child->key()) == 0)
child->Next(); // key等于current_->key()的,再向后移动一位
}
}
direction_ = kForward;
}
// current也向后移一位,然后再查找key最小的Iterator
current_->Next();
FindSmallest();
}

virtual void Prev() {
assert(Valid());
// 确保所有的子Iterator都定位在key()之前.
// 如果我们在逆向移动,对于除current_外的所有子Iterator这点已然成立
// 因为current_是最大的,并且key() = current_->key()
// 否则,我们需要明确设置其它的子Iterator
if (direction_ != kReverse) {
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
if (child->Valid()) {
// child位于>=key()的第一个entry上,prev移动一位到<key()
child->Prev();
}
else { // child所有的entry都 < key(),直接seek到last即可
child->SeekToLast();
}
}
}
direction_ = kReverse;
}
//current也向前移一位,然后再查找key最大的Iterator
current_->Prev();
FindLargest();
}

这就是MergingIterator的全部代码逻辑了,每次Next或者Prev移动时,都要重新遍历所有的子Iterator以找到key最小或最大的Iterator作为current_。这就是merge的语义所在了。
但是它没有考虑到删除标记等问题,因此直接使用MergingIterator是不能正确的遍历DB的,这些问题留待给DBIter来解决。

14.4 DBIter

Leveldb数据库的MemTablesstable文件的存储格式都是(user key, seq, type) => uservalue。DBIter把同一个userkey在DB中的多条记录合并为一条,综合考虑了userkey的序号、删除标记、和写覆盖等等因素。

从前面函数NewIterator的代码还能看到,DBIter内部使用了MergingIterator,在调用MergingItertor的系列seek函数后,DBIter还要处理key的删除标记。否则,遍历时会把已删除的key列举出来。

DBIter还定义了两个移动方向,默认是kForward:

  1. kForward,向前移动,代码保证此时DBIter的内部迭代器刚好定位在this->key(),this->value()这条记录上;
  2. kReverse,向后移动,代码保证此时DBIter的内部迭代器刚好定位在所有key=this->key()的entry之前。

其成员变量savedkey和saved value保存的是KReverse方向移动时的k/v对,每次seek系调用之后,其值都会跟随iter_而改变。

DBIter的代码开始读来感觉有些绕,主要就是它要处理删除标记,而且其底层的MergingIterator,对于同一个key会有多个不同sequence的entry。导致其Next/Prev操作比较复杂,要考虑到上一次移动的影响,跳过删除标记和重复的key。

DBIter必须导出Iterator定义的几个接口,下面就拖出来挨个分析。

14.4.1 Get系接口

首先是几个简单接口,获取key、value和status的:

1
2
3
4
5
6
7
8
9
10
11
//kForward直接取iter_->value(),否则取saved value
virtual Slice value() const {
assert(valid_);
return (direction_ == kForward) ? iter_->value() : saved_value_;
}

virtual Status status() const {
if (status_.ok())
returniter_->status();
return status_;
}

14.4.2 辅助函数

在分析seek系函数之前,先来理解两个重要的辅助函数:FindNextUserEntryFindPrevUserEntry的功能和逻辑。其功能就是循环跳过下一个/前一个delete的记录,直到遇到kValueType的记录。

先来看看,函数声明为:void DBIter::FindNextUserEntry(bool skipping, std::string* skip)

  • 参数@skipping表明是否要跳过sequence更小的entry;
  • 参数@skip临时存储空间,保存seek时要跳过的key;

在进入FindNextUserEntry时,iter_刚好定位在this->key(), this->value()这条记录上。下面来看函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
virtual Slice key() const { //kForward直接取iter_->key(),否则取saved key  
assert(valid_);
return (direction_ == kForward) ? ExtractUserKey(iter_->key()) : saved_key_;
}

// 循环直到找到合适的entry,direction必须是kForward
assert(iter_->Valid());
assert(direction_ == kForward);
do {
ParsedInternalKey ikey;
// 确保iter_->key()的sequence <= 遍历指定的sequence
if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
switch (ikey.type) {
case kTypeDeletion:
//对于该key,跳过后面遇到的所有entry,它们被这次删除覆盖了
//保存key到skip中,并设置skipping=true
SaveKey(ikey.user_key, skip);
skipping = true;
break;
case kTypeValue:
if (skipping &&
user_comparator_->Compare(ikey.user_key, *skip) <= 0) {
// 这是一个被删除覆盖的entry,或者user key比指定的key小,跳过
}
else { // 找到,清空saved key并返回,iter_已定位到正确的entry
valid_ = true;
saved_key_.clear();
return;
}
break;
}
}
iter_->Next(); // 继续检查下一个entry
} while (iter_->Valid());
// 到这里表明已经找到最后了,没有符合的entry
saved_key_.clear();
valid_ = false;

FindNextUserKey移动方向是kForward,DBIter在向kForward移动时,借用了saved key作为临时缓存。FindNextUserKey确保定位到的entry的sequence不会大于指定的sequence,并跳过被删除标记覆盖的旧记录。

接下来是FindPrevUserKey,函数声明为:void DBIter::FindPrevUserEntry(),在进入FindPrevUserEntry时,iter_刚好位于saved key对应的所有记录之前。源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
assert(direction_ == kReverse); // 确保是kReverse方向  
ValueType value_type =kTypeDeletion; //后面的循环至少执行一次Prev操作
if (iter_->Valid()) {
do { // 循环
// 确保iter_->key()的sequence <= 遍历指定的sequence
ParsedInternalKey ikey;
if (ParseKey(&ikey)&& ikey.sequence <= sequence_) {
if ((value_type !=kTypeDeletion) &&
user_comparator_->Compare(ikey.user_key, saved_key_) < 0) {
break; // 我们遇到了前一个key的一个未被删除的entry,跳出循环
// 此时Key()将返回saved_key,saved key非空;
}
//根据类型,如果是Deletion则清空saved key和saved value
//否则,把iter_的user key和value赋给saved key和saved value
value_type = ikey.type;
if (value_type ==kTypeDeletion) {
saved_key_.clear();
ClearSavedValue();
} else {
Slice raw_value =iter_->value();
if(saved_value_.capacity() > raw_value.size() + 1048576) {
std::string empty;
swap(empty,saved_value_);
}
SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
saved_value_.assign(raw_value.data(), raw_value.size());
}
}
iter_->Prev(); // 前一个
} while (iter_->Valid());
}
if (value_type == kTypeDeletion){ // 表明遍历结束了,将direction设置为kForward
valid_ = false;
saved_key_.clear();
ClearSavedValue();
direction_ = kForward;
} else {
valid_ = true;
}

函数FindPrevUserKey根据指定的sequence,依次检查前一个entry,直到遇到user key小于saved key,并且类型不是Delete的entry。如果entry的类型是Delete,就清空saved key和saved value,这样在依次遍历前一个entry的循环中,只要类型不是Delete,就是要找的entry。这就是Prev的语义。

14.4.3 Seek系函数

了解了这两个重要的辅助函数,可以分析几个Seek接口了,它们需要借助于上面的这两个函数来跳过被delete的记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void DBIter::Seek(const Slice& target) {  
direction_ = kForward; // 向前seek
// 清空saved value和saved key,并根据target设置saved key
ClearSavedValue();
saved_key_.clear();
AppendInternalKey( // kValueTypeForSeek(1) > kDeleteType(0)
&saved_key_,ParsedInternalKey(target, sequence_, kValueTypeForSeek));
iter_->Seek(saved_key_); // iter seek到saved key
//可以定位到合法的iter,还需要跳过Delete的entry
if (iter_->Valid()) FindNextUserEntry(false,&saved_key_);
else valid_ = false;
}

void DBIter::SeekToFirst() {
direction_ = kForward; // 向前seek
// 清空saved value,首先iter_->SeekToFirst,然后跳过Delete的entry
ClearSavedValue();
iter_->SeekToFirst();
if (iter_->Valid()) FindNextUserEntry(false,&saved_key_ /*临时存储*/);
else valid_ = false;
}

void DBIter::SeekToLast() { // 更简单
direction_ = kReverse;
ClearSavedValue();
iter_->SeekToLast();
FindPrevUserEntry();
}

14.4.4 Prev()和Next()

Next和Prev接口,相对复杂一些。和底层的merging iterator不同,DBIter的Prev和Next步进是以key为单位的,而mergingiterator是以一个record为单位的。所以在调用merging Iterator做Prev和Next迭代时,必须循环直到key发生改变。

假设指定读取的sequence为2,当前iter在key4:2:1上,direction为kForward。此时调用Prev(),此图显示了Prev操作执行的5个步骤:

  • S1 首先因为direction为kForward,先调整iter到key3:1:1上。此图也说明了调整的理由,key4:2:1前面还有key4:3:1。然后进入FindPrevUserEntry函数,执行S2到S4。
  • S2 跳到key3:2:0上时,这是一个删除标记,清空saved key(其中保存的是key3:1:1)。
  • S3 循环继续,跳到key2:1:1上,此时key2:1:1 > saved key,设置saved key为key2:1:1,并继续循环。
  • S4 循环继续,跳到key2:2:1上,此时key2:2:1 > saved key,设置saved key为key2:2:1,并继续循环。
  • S5 跳到Key1:1:1上,因为key1:1:1 < saved key,跳出循环。

最终状态iter_位置在key1:1:1上,而saved key保存的则是key2:2:1上,这也就是Prev应该定位到的值。也就是说在Prev操作下,iter_的位置并不是真正的key位置。这就是前面Get系函数中,在direction为kReverse时,返回saved key/value的原因。

同理,在Next时,如果direction是kReverse,根据上面的Prev可以发现,此时iter刚好是saved key的前一个entry。执行iter->Next()就跳到了saved key的dentry范围的sequence最大的那个entry。在前面的例子中,在Prev后执行Next,那么iter首先跳转到key2:3:1上,然后再调用FindNextUserEntry循环,使iter定位在key2:2:1上。

下面首先来分析Next的实现。如果direction是kReverse,表明上一次做的是kReverse跳转,这种情况下,iter位于key是this->key()的所有entry之前,我们需要先把iter跳转到this->key()对应的entries范围内。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void DBIter::Next() {  
assert(valid_);
if (direction_ == kReverse) { //需要预处理,并更改direction=kForward
direction_ = kForward;
// iter_刚好在this->key()的所有entry之前,所以先跳转到this->key()
// 的entries范围之内,然后再做常规的skip
if (!iter_->Valid()) iter_->SeekToFirst();
else iter_->Next();
if (!iter_->Valid()) {
valid_ = false;
saved_key_.clear();
return;
}
}
// 把saved_key_ 用作skip的临时存储空间
std::string* skip =&saved_key_;
SaveKey(ExtractUserKey(iter_->key()), skip);// 设置skip为iter_->key()的user key
FindNextUserEntry(true, skip);
}

接下来是Prev(),其实和Next()逻辑相似,但方向相反。

如果direction是kForward,表明上一次是做的是kForward跳转,这种情况下,iter_指向当前的entry,我们需要调整iter,使其指向到前一个key,iter的位置是这个key所有record序列的最后一个,也就是sequence最小的那个record。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void DBIter::Prev() {  
assert(valid_);
if (direction_ == kForward) { //需要预处理,并更改direction
// iter_指向当前的entry,向后扫描直到key发生改变,然后我们可以做
//常规的reverse扫描
assert(iter_->Valid()); // iter_必须合法,并把saved key设置为iter_->key()
SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
while (true) {
iter_->Prev();
if (!iter_->Valid()) { // 到头了,直接返回
valid_ = false;
saved_key_.clear();
ClearSavedValue();
return;
}
if (user_comparator_->Compare(ExtractUserKey(iter_->key()),
saved_key_) < 0) {
break; // key变化就跳出循环,此时iter_刚好位于saved key对应的所有entry之前
}
}
direction_ = kReverse;
}
FindPrevUserEntry();
}

14.5 小结

查询操作并不复杂,只需要根据seq找到最新的记录即可。知道leveldb的遍历会比较复杂,不过也没想到会这么复杂。这主要是得益于sstable 0的重合性,以及memtable和sstable文件的重合性。