insert函数
/* in b_plus_leaf_page.h */
/**
 * Insert a key/value pair into this leaf page, keeping the entries sorted by key.
 *
 * If an entry with an equal key is already stored, the page is left untouched
 * and the current size is returned. (The previous version hit assert(0) for
 * most duplicates, looped forever when assertions were compiled out, and
 * silently inserted a duplicate of the last key.)
 *
 * NOTE(review): no capacity check is performed here; the caller is assumed to
 * guarantee that `array` has room for one more entry -- confirm.
 *
 * @param key         key to insert
 * @param value       value stored alongside the key
 * @param comparator  three-way comparator: <0, 0, >0 for less, equal, greater
 * @return number of entries in the page after the call
 */
INDEX_TEMPLATE_ARGUMENTS
int B_PLUS_TREE_LEAF_PAGE_TYPE::Insert(const KeyType &key, const ValueType &value,
                                       const KeyComparator &comparator) {
  const int size = GetSize();

  // Fast path: empty page, or the key sorts after every stored key -> append.
  int pos = size;
  if (size != 0 && comparator(key, array[size - 1].first) <= 0) {
    // Lower-bound binary search over [0, size): first slot whose key is >= `key`.
    int lo = 0;
    int hi = size - 1;  // array[size - 1] is known to be >= key here
    while (lo < hi) {
      const int mid = lo + (hi - lo) / 2;
      if (comparator(key, array[mid].first) > 0) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    pos = lo;

    // Duplicate key: nothing to insert.
    if (comparator(key, array[pos].first) == 0) {
      return size;
    }

    // Shift the tail one slot to the right to open a gap at `pos`.
    memmove(array + pos + 1, array + pos, static_cast<size_t>(size - pos) * sizeof(MappingType));
  }

  array[pos] = {key, value};
  IncreaseSize(1);
  return GetSize();
}
否则寻找插入元素应该在的叶子结点
a. 如果叶子结点内的关键字个数小于 m-1,则直接插入到叶子结点(insert_into_leaf)
findLeafPage函数有点复杂
要考虑到无论是读还是写,从根节点到叶子结点的路径上都需要加锁,并且要注意及时释放锁,否则会死锁。(这个地方测试的时候卡了我好久)
这里对原来的函数定义做了一些修改。加入了操作类型的判断。
/* Defined in b_plus_tree.h; declared in the same manner as the page-type enum. */
enum class Operation { READ = 0, INSERT, DELETE };

/**
 * Descend from the root to the leaf that should contain `key`, latching pages
 * on the way down.
 *
 * READ takes read latches; INSERT/DELETE take write latches. When a
 * transaction is supplied, every latched page is added to its page set and
 * ancestors are released through UnlockUnpinPages: immediately after the child
 * is latched for READ, or once the child is reported safe by isSafe() for
 * write operations. Without a transaction the parent is unlatched and unpinned
 * directly as soon as the child has been latched.
 *
 * NOTE(review): the returned pointer is the leaf's *page data* (a
 * BPlusTreePage) reinterpreted as Page*, not the buffer-pool Page object;
 * callers are expected to cast it back to a tree-page type.
 *
 * @param key          key to search for (ignored when leftMost is true)
 * @param leftMost     if true, always follow the first child pointer
 * @param op           operation kind; selects read vs. write latching
 * @param transaction  optional transaction whose page set tracks latched pages
 * @return the leaf node (see note above), or nullptr if the tree is empty
 * @throws const char* when a page cannot be fetched from the buffer pool
 */
INDEX_TEMPLATE_ARGUMENTS
Page *BPlusTree<KeyType, ValueType, KeyComparator>::FindLeafPage(const KeyType &key, bool leftMost, Operation op,
                                                                 Transaction *transaction) {
  if (IsEmpty()) {
    return nullptr;
  }

  // `parent` is the most recently latched page that has not been handed to a transaction.
  auto parent = buffer_pool_manager_->FetchPage(root_page_id_);
  if (parent == nullptr) {
    throw "no page can find";
  }
  if (op == Operation::READ) {
    parent->RLatch();
  } else {
    parent->WLatch();
  }
  if (transaction != nullptr) {
    transaction->AddIntoPageSet(parent);
  }

  auto node = reinterpret_cast<BPlusTreePage *>(parent->GetData());
  while (!node->IsLeafPage()) {
    auto internal = reinterpret_cast<BPlusTreeInternalPage<KeyType, page_id_t, KeyComparator> *>(node);
    page_id_t parent_page_id = node->GetPageId();
    page_id_t child_page_id;
    if (leftMost) {
      child_page_id = internal->ValueAt(0);
    } else {
      child_page_id = internal->Lookup(key, comparator_);
    }

    auto child = buffer_pool_manager_->FetchPage(child_page_id);
    if (child == nullptr) {
      throw "not find child in findLeaf";
    }

    if (op == Operation::READ) {
      // The child is latched before the ancestors are released.
      child->RLatch();
      UnlockUnpinPages(op, transaction);
    } else {
      child->WLatch();
    }

    node = reinterpret_cast<BPlusTreePage *>(child->GetData());
    assert(node->GetParentPageId() == parent_page_id);

    // child is locked; if child is safe, release all locks on ancestors.
    if (op != Operation::READ && isSafe(node, op)) {
      UnlockUnpinPages(op, transaction);
    }

    if (transaction != nullptr) {
      transaction->AddIntoPageSet(child);
    } else {
      // No page set to track ancestors: release the parent here, in the same
      // mode it was latched. (Previously this always called RUnlatch(), even
      // after WLatch() for INSERT/DELETE.)
      if (op == Operation::READ) {
        parent->RUnlatch();
      } else {
        parent->WUnlatch();
      }
      buffer_pool_manager_->UnpinPage(parent->GetPageId(), false);
      parent = child;
    }
  }
  return reinterpret_cast<Page *>(node);
}
// Section: the Lookup function
找到key值所在的page---二分查找
/**
 * Return the child pointer (value) of the entry whose key range contains `key`.
 *
 * Slot 0 carries no usable separator key: keys smaller than array[1].first map
 * to array[0].second. For i >= 1, array[i].second is returned for keys that are
 * >= array[i].first and smaller than the next separator (if any). In other
 * words, the result is the last slot i >= 1 with array[i].first <= key, or
 * slot 0 when there is none.
 *
 * Fixes over the previous version: the search no longer compares against the
 * key in slot 0, no longer reads array[1] when the page has a single entry, and
 * no longer returns the child one position too far right for keys that fall
 * strictly between two separators.
 *
 * @param key         key to route
 * @param comparator  three-way comparator: <0, 0, >0 for less, equal, greater
 * @return the value stored in the selected slot
 */
INDEX_TEMPLATE_ARGUMENTS
ValueType B_PLUS_TREE_INTERNAL_PAGE_TYPE::Lookup(const KeyType &key, const KeyComparator &comparator) const {
  int child = 0;  // default: key is smaller than every separator
  int lo = 1;
  int hi = GetSize() - 1;
  while (lo <= hi) {
    const int mid = lo + (hi - lo) / 2;
    if (comparator(key, array[mid].first) < 0) {
      hi = mid - 1;
    } else {
      // array[mid].first <= key: mid is a candidate, look for a later one.
      child = mid;
      lo = mid + 1;
    }
  }
  return array[child].second;
}
// Section: after the leaf page has been found
判断该元素是否已经在树中
b. 进行分裂
/**
 * Insert a key/value pair into the leaf page responsible for `key`.
 *
 * The leaf is located with FindLeafPage in INSERT mode. A key that is already
 * present is rejected. Otherwise the pair is inserted, and if the leaf was
 * already at its maximum size before the insertion it is split and the first
 * key of the new sibling is pushed into the parent.
 *
 * @param key          key to insert
 * @param value        value to store with the key
 * @param transaction  transaction passed through to latching/unpinning helpers
 * @return true if the pair was inserted; false if no leaf was found or the key
 *         already exists
 */
INDEX_TEMPLATE_ARGUMENTS
bool BPLUSTREE_TYPE::InsertIntoLeaf(const KeyType &key, const ValueType &value, Transaction *transaction) {
  using LeafNode = BPlusTreeLeafPage<KeyType, ValueType, KeyComparator>;

  auto *target = reinterpret_cast<LeafNode *>(FindLeafPage(key, false, Operation::INSERT, transaction));
  if (target == nullptr) {
    return false;
  }

  // A key that is already in the tree is not inserted again.
  ValueType existing;
  const bool already_present = target->Lookup(key, &existing, comparator_);

  if (!already_present) {
    // Case 1: the leaf still has room (checked before inserting) -> plain insert.
    const bool has_room = target->GetSize() < target->GetMaxSize();
    target->Insert(key, value, comparator_);

    if (!has_room) {
      // Case 2: the leaf was full, so it has to be split.
      // Steps of the split (from the original notes):
      //   - call Split on the leaf; per the notes this yields a new node holding
      //     m - m/2 keys, and the two leaves must be linked together;
      //   - then call InsertIntoParent.
      auto sibling = Split<LeafNode>(target);
      sibling->SetNextPageId(target->GetNextPageId());
      target->SetNextPageId(sibling->GetPageId());

      // Push the sibling's first key up as the separator.
      InsertIntoParent(target, sibling->KeyAt(0), sibling, transaction);
    }
  }

  UnlockUnpinPages(Operation::INSERT, transaction);
  return !already_present;
}
在InsertIntoParent中
case1 -- 如果当前结点为根节点,则创建一个新的根节点,新根节点的两个子结点就是 split 操作后得到的两个结点
INDEX_TEMPLATE_ARGUMENTS void BPLUSTREE_TYPE::InsertIntoParent(BPlusTreePage *old_node, const KeyType &key, BPlusTreePage *new_node,Transaction *transaction) { //case 1 create new root if (old_node->IsRootPage()) { auto page = buffer_pool_manager_->NewPage(&root_page_id_); if (page == nullptr) { throw "not page can used in InsertIntoParent"; } assert(page->GetPinCount() == 1); auto root =reinterpret_cast<BPlusTreeInternalPage<KeyType, page_id_t,KeyComparator> *>(page->GetData()); root->Init(root_page_id_,INVALID_PAGE_ID,internal_max_size_); root->PopulateNewRoot(old_node->GetPageId(), key, new_node->GetPageId()); old_node->SetParentPageId(root_page_id_); new_node->SetParentPageId(root_page_id_); //TODO update to new root_page_id UpdateRootPageId(false); //TODO unpin buffer_pool_manager_->UnpinPage(new_node->GetPageId(), true); buffer_pool_manager_->UnpinPage(root->GetPageId(), true); }case2 -- 否则要递归上述的过程
a. 先找分裂产生结点的父亲结点。如果可以直接插入则直接插入