Percolator是Google使用的一款基于Bigtable的、支持事务的增量索引系统,以客户端库的形式提供给用户。使用Percolator后,从页面爬取到新数据到这些数据被用户可见之间的平均延迟被缩小100倍!从技术的角度看主要有两方面改进:
1. 能够增量地处理索引(Percolator之前Google使用MapReduce批量更新索引)
2. 在并发处理的过程中能够保持一些”不变性(invariants)”,如指向同一个页面的链接等。
Percolator基于Bigtable,在实现过程中充分利用了Bigtable的单行事务特性,在Bigtable提供的单行数据原子读写的基础上Percolator实现了跨行跨表的事务特性(ACID)。Percolator实现的事务具有如下特点:
1. 利用了Bigtable的单行事务。因为Bigtable基于GFS提供了可靠的存储和原子写特性,Bigtable被选作用于实现锁服务。
2. Percolator利用了Bigtable的timestamp维来提供snapshot isolation. snapshot isolation为每一个事务提供了一个一致的快照视图。
本文重点考察Percolator中如何实现两阶段提交协议。在Percolator的论文中用银行转账的经典案例来说明Percolator是如何实现两阶段提交的,如下图:
上面的例子展示了一次成功的事务,事务执行过程中没有冲突发生,也没出现worker宕机的情形。那么,在Percolator是如何处理各种异常情况的呢?下面从代码的角度来分析整个过程:
class Transaction{ struct Write { Row row; Column col; string value; }; vector<Write> writes_; // 每一个事务都有一个start timestamp // 读事务只关心[0, start_ts_]时间区间之内数据逻辑是否一致 // 写事务则需要关心[0, infinate)时间区间之内数据逻辑是否一致 int start_ts_; // 初始化当前事务的timestamp,note: 此oracle非彼Oracle Transaction() : start_ts_(oracle.GetTimestamp()) { } void Set(Write w) { writes_.push_back(w); } // 读事务 void Get(Row row, Column c, string *value) { while (true) { // 利用了Google Bigtable的单行事务特性。 // 单行事务的特征为: // 在单行数据的操作上保证事务性(ACID) bigtable::Txn T = bigtable::StartRowTransaction(row); // 检查在读操作的同时是否有并发的写操作,如果有并发写操作 // (包括那些没有彻底完成写操作就挂掉的情况)则需要执行比较 // 复杂的重试/清理操作 - BackoffAndMaybeCleanupLock() // // 这里需要注意时间区间为[0, start_ts],也就是说Get只关心 // 在本事务发起前的数据快照是否具有一致性,对start_ts_之后 // 发起的事务它并不关心。这反映了Percolator表现出来的 // snapshot isolation特性,Get操作的是start_ts_之前 // 的一个快照 if (T.Read(row, c+"lock", [0, start_ts])) { // 执行到这里的时候说明有尚未解开的锁 // (pending lock),可能来自: // 1. 在start_ts_之前发起的一个写事务正在进行中 // 2. 在start_ts_之前发起的一个写事务 // 没有完全commit就死掉了 // ps. Back off的意思是后退,滚开, // 貌似一群警察踢门的时候常喊? BackoffAndMaybeCleanupLock(row, c); continue; } // 执行到这里的时候说明start_ts_之前的数据具有 // 一个一致的snapshot last_write = T.Read(row, c+"write", [0, start_ts_]); // sanity check. 没有找到任何数据可读,返回。 if (!latest_write.found()) { return false; } // write列记录了data所在的timestamp, // 为了读到一条数据,需要先得到该数据所在的 // timestamp,然后通过timestamp读到最终数据, // 有点间接寻址的味道 int data_ts = latest_write.start_timestamp(); *value = T.Read(row, c+"data", [data_ts, data_ts]); return true; } } bool Prewrite(Write w, Write primary) { Column c = w.col; bigtable::Txn T = bigtable::StartRowTransaction(w.row); // 如果在本事务开始后([start_ts_, inf))也有其他事务执行写操作, // 并且已经完成了部分/全部数据写操作,则abort // // 对于start_ts_之前的写操作,分为两种情况 // 1. 整个事务都提交完成了得写操作,这是正常情况,结果一致 // 2. 只写了一半的事务,这由后面的lock检查来处理 if (T.Read(w.row, c+"write", [start_ts_, inf]) { return false; } // 如果在当前操作的cell上还有锁的话,则abort // 这个检查比较狠,只要有锁,无论timestamp为多少均abort,这是 // 因为只要有锁,就说明还有一个并发事务(dead or not)在写当前cell if (T.Read(w.row, c+"lock", [0, inf])) { return false; } // 检查到这里就可以放心地预写入数据和锁了 // 此时data对Get还不可见,因为write还没有写入 T.Write(w.row, c+"data", start_ts_, w.value); T.Write(w.row, c+"lock", start_ts_, {primary.row, primary.col}); // 提交bigtable单行事务 return T.commit(); } bool Commit() { // 任选一个write作为primary,这里primary的作用类似于一个标志点 // primary行被提交后,整个事务必须提交 Write primary = writes_[0]; vector<Write> secondaries(writes_.begin() + 1, writes_end()); // 预提交 // primary和secondarise的预提交如果失败, // 则说明还有别的并发事务在写当前cell,当前commit需要abort // // 我对并发事务的理解:在时间轴上有交集的事务。 // Timeline --------------------------------------------> // Trans0: ^-$ // Trans1: ^-----$ // Trans2: ^----------$ // Trans3: ^------------$ // Trans4: ^----$ // Trans5: ^----x // ^标志事务开始,$标志事务结束,x表示执行事务的进程中途死掉 // 0,5; 1,2; 1,3; 1,5; 2,3; 2,5均为并发事务 if (!Prewrite(primary, primary)) { return false; } for (Write w : secondaries) { if (!Prewrite(w, primary)) { return false; } } int commit_ts = oracle_.GetTimestamp(); Write p = primary; bigtable::Txn T = bigtable::StartRowTransaction(p.row); // 读取Prewrite阶段写入的lock,如果读取失败,则abort // 执行这一步的原因在于lock可能由于某种原因被Get操作清理掉了 // 某种原因包括: // 1. 真死了 // 2. 假死,等下可能活过来的 // 1) 执行当前事务的线程被调度器调度出去了,执行优先级较低 // 2) 系统中出现了一些工作特别繁重的线程,把系统暂时性压死 // 3) 等待IO。等等 // 另外,这里只读取primary lock,而没有读取其它lock,是Percolator // 的一个约定,它相对简化了检查过程,不需要检查secondaries的lock。 if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_])) { return false; } T.Write(p.row, p.col+"write", commit_ts, start_ts_); // 成功执行下面的Commit操作后,写操作对Get可见 T.Erase(p.row, p.col+"lock", commit_ts); // *NOTE* 提交点,T.Commit执行成功后一旦系统出现故障,恢复后 // 只能rollforward,不能rollback if(!T.Commit()) { return false; } // 此时的写操作已经不需要用行事务来保证了,因为这里只有写操作 // 并且也不可能有两个并发写操作都写同一个commit_ts下的cell for (Write w : secondaries) { bigtable.Write(w.row, w.col+"write", commit_ts, start_ts_); bigtable.Erase(w.row, w.col+"lock", commit_ts); } return true; } // 确认造成冲突的进程是否已经退出,如果退出则做清理,否则忽略 void BackoffAndMaybeCleanupLock(Row row, Column c) { // 判断写这个锁的worker是否还活着(liveness)的方法: // 每个worker会写一个token到Chunbby lockservice中,并且定期 // 更新这个token中的last_update_time,其它worker检查这个worker // 是否存活的方法就是去检查这个token是否存在,如果存在,其 // last_update_time是否太旧,通过这两重检查才判定该worker活着。 // 如果判定该worker已死,则根据primary lock的状态来决定动作: // 1. primary lock不存在: roll-forward, 将所有未提交的 // secondary write都提交掉,相应的lock都擦除掉 // 2. primary lock存在 : roll-back, 将primary的数据清除掉, // write的值也擦除掉。 // 另外注意,读取到任意版本的lock就执行本函数 // 如果worker还活着,则不采取什么数据操作,可能小睡眠一下,等待 // worker主动将锁清除。 // Get操作会因此等待较长一段时间,这是Percolator需要注意的一个特点。 } } // class Transaction
上面的代码注释着重分析了执行过程,和每一条语句的内涵。下面还是同样的代码,注释中着重说明了当出现宕机的情形下,Percolator是如何处理failure case及实现事务的rollback或rollforward的。
class Transaction{ struct Write { Row row; Column col; string value; }; vector<Write> writes_; int start_ts_; Transaction() : start_ts_(oracle.GetTimestamp()) { } void Set(Write w) { writes_.push_back(w); } // [fail here] 不考虑清理工作的话,Get操作没有副作用,fail safe void Get(Row row, Column c, string *value) { while (true) { bigtable::Txn T = bigtable::StartRowTransaction(row); if (T.Read(row, c+"lock", [0, start_ts])) { BackoffAndMaybeCleanupLock(row, c); continue; } last_write = T.Read(row, c+"write", [0, start_ts_]); if (!latest_write.found()) { return false; } int data_ts = latest_write.start_timestamp(); *value = T.Read(row, c+"data", [data_ts, data_ts]); return true; } } bool Prewrite(Write w, Write primary) { Column c = w.col; bigtable::Txn T = bigtable::StartRowTransaction(w.row); // [fail here] 尚未Commit,safe,下面Commit之前都一样safe if (T.Read(w.row, c+"write", [start_ts_, inf]) { return false; } if (T.Read(w.row, c+"lock", [0, inf])) { return false; } // 这里可以看到用bigtable transaction的意义: // 如果没有使用行事务,data写入后worker fail掉,但lock又还没写入 // 这个data写入将不能被后继的Get检测到,成为垃圾数据 T.Write(w.row, c+"data", start_ts_, w.value); T.Write(w.row, c+"lock", start_ts_, {primary.row, primary.col}); return T.commit(); // [fail here] lock写入了,后继的Get会看到这个lock,最终通过 // BackoffAndMaybeCleanupLock将这个lock清理(rollback) // data写入了,但没有write索引它,对外不可见,safe } bool Commit() { Write primary = writes_[0]; vector<Write> secondaries(writes_.begin() + 1, writes_end()); // [fail here] 尚无任何实际动作,safe if (!Prewrite(primary, primary)) { return false; } // [fail here] Primary lock存在, // 后继的Get将看到这个lock,rollback for (Write w : secondaries) { if (!Prewrite(w, primary)) { return false; } } // [fail here] Primary lock存在, // 后继的Get将看到这个lock,rollback int commit_ts = oracle_.GetTimestamp(); Write p = primary; bigtable::Txn T = bigtable::StartRowTransaction(p.row); // [fail here] 单行事务,Commit前所有操作对外不可见,此事务期间任意 // 点fail,后继的Get将看到primary lock,rollback if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_])) { return false; } T.Write(p.row, p.col+"write", commit_ts, start_ts_); T.Erase(p.row, p.col+"lock", commit_ts); // [FIX ME] 这里为什么要用行事务我还没想明白, // 不用事务会有什么大问题?欢迎补充。 if(!T.Commit()) { // [fail here] Primary lock存在,后继的Get将看到这个lock, // rollback return false; } // [fail here] Primary lock不存在, // 后继的Get将操作将触发rollforward for (Write w : secondaries) { // [fail here] Primary lock不存在, // 后继的Get将操作将触发rollforward bigtable.Write(w.row, w.col+"write", commit_ts, start_ts_); // [fail here] Primary lock不存在, // 后继的Get将操作将触发rollforward bigtable.Erase(w.row, w.col+"lock", commit_ts); } // [fail here] 事务已经完成,safe return true; } void BackoffAndMaybeCleanupLock(Row row, Column c) { } } // class Transaction
下面再回过头来看看Percolator的两阶段提交和教科书中得两阶段提交之间的异同。在Wikipedia中两阶段提交的过程被描述为:
Success
If the coordinator received an agreement message from all cohorts during the commit-request phase:
- The coordinator sends a commit message to all the cohorts.
- Each cohort completes the operation, and releases all the locks and resources held during the transaction.
- Each cohort sends an acknowledgment to the coordinator.
- The coordinator completes the transaction when all acknowledgments have been received.
上面的过程里面有协调者(coordinator)和参与者(cohort)两个角色,而Percolator中仿佛只有客户端一个角色 。实际上,需要灵活地理解协调者和参与者,他们只是两个概念,而不是必须对应两个实体。协调者代表的概念是控制事务进度,参与者代表的概念是执行具体操作,记录事务状态,同时向协调者报告状态。在Percolator中Get逻辑和Commit逻辑体现了协调者的概念,客户端对Bigtable状态的修改和读取体现了参与者的概念。理解了这一点之后再对应看上面的过程就要清晰很多:
- 客户端调用Prewrite函数,通知所有写操作都预提交
- Prewrite函数中将数据进行预提交(写data和lock)
- 预提交均成功,返回true
- 客户端提交Primary write的commit操作,commit成功后标志着两阶段提交完成,后继secondary writes必须成功。
Failure
If any cohort votes No during the commit-request phase (or the coordinator’s timeout expires):
- The coordinator sends a rollback message to all the cohorts.
- Each cohort undoes the transaction using the undo log, and releases the resources and locks held during the transaction.
- Each cohort sends an acknowledgement to the coordinator.
- The coordinator undoes the transaction when all acknowledgements have been received.
Percolator中对失败(Failure)的处理与这里有较大不同,这里失败处理是立即进行的,而percolator中延迟了失败的处理,由后继的Get操作来完成。
参考文献
- Google Percolator Research paper:“Large-scale Incremental Processing Using Distributed Transactions and Notifications”, Daniel Peng, Frank Dabek, Proceedings of the 9th USENIX Symposium on Operating Systems Design and Implementation, 2010.
- Google research papers on GFS, BigTable, MapReduce
- My reading of Percolator architecture: a Google search engine component
转载请注明:爱开源 » 新人讨论二- Percolator中的两阶段提交协议分析