最新消息:

新人讨论二- Percolator中的两阶段提交协议分析

未分类 admin 2746浏览 0评论

Percolator是Google使用的一款基于Bigtable的、支持事务的增量索引系统,以客户端库的形式提供给用户。使用Percolator后,从页面爬取到新数据到这些数据被用户可见之间的平均延迟被缩小100倍!从技术的角度看主要有两方面改进:
1. 能够增量地处理索引(Percolator之前Google使用MapReduce批量更新索引)
2. 在并发处理的过程中能够保持一些”不变性(invariants)”,如指向同一个页面的链接等。

Percolator基于Bigtable,在实现过程中充分利用了Bigtable的单行事务特性,在Bigtable提供的单行数据原子读写的基础上Percolator实现了跨行跨表的事务特性(ACID)。Percolator实现的事务具有如下特点:

1. 利用了Bigtable的单行事务。因为Bigtable基于GFS提供了可靠的存储和原子写特性,Bigtable被选作用于实现锁服务。
2. Percolator利用了Bigtable的timestamp维来提供snapshot isolation. snapshot isolation为每一个事务提供了一个一致的快照视图。

本文重点考察Percolator中如何实现两阶段提交协议。在Percolator的论文中用银行转账的经典案例来说明Percolator是如何实现两阶段提交的,如下图:

write,lock,data列的含义

 

 

 

上面的例子展示了一次成功的事务,事务执行过程中没有冲突发生,也没出现worker宕机的情形。那么,在Percolator是如何处理各种异常情况的呢?下面从代码的角度来分析整个过程:

class Transaction{
    struct Write { Row row; Column col; string value; };
    vector<Write> writes_;

    // 每一个事务都有一个start timestamp
    // 读事务只关心[0, start_ts_]时间区间之内数据逻辑是否一致
    // 写事务则需要关心[0, infinate)时间区间之内数据逻辑是否一致
    int start_ts_;

    // 初始化当前事务的timestamp,note: 此oracle非彼Oracle
    Transaction() : start_ts_(oracle.GetTimestamp())
    {
    }

    void Set(Write w)
    {
        writes_.push_back(w);
    }

    // 读事务
    void Get(Row row, Column c, string *value)
    {
        while (true)
        {
            // 利用了Google Bigtable的单行事务特性。
            // 单行事务的特征为:
            // 在单行数据的操作上保证事务性(ACID)
            bigtable::Txn T = bigtable::StartRowTransaction(row);
            // 检查在读操作的同时是否有并发的写操作,如果有并发写操作
            // (包括那些没有彻底完成写操作就挂掉的情况)则需要执行比较
            // 复杂的重试/清理操作 - BackoffAndMaybeCleanupLock()
            //
            // 这里需要注意时间区间为[0, start_ts],也就是说Get只关心
            // 在本事务发起前的数据快照是否具有一致性,对start_ts_之后
            // 发起的事务它并不关心。这反映了Percolator表现出来的
            // snapshot isolation特性,Get操作的是start_ts_之前
	    // 的一个快照
            if (T.Read(row, c+"lock", [0, start_ts]))
            {
                // 执行到这里的时候说明有尚未解开的锁
		// (pending lock),可能来自:
                //  1. 在start_ts_之前发起的一个写事务正在进行中
                //  2. 在start_ts_之前发起的一个写事务
		//     没有完全commit就死掉了
                // ps. Back off的意思是后退,滚开,
		// 貌似一群警察踢门的时候常喊?
                BackoffAndMaybeCleanupLock(row, c);
                continue;
            }

            // 执行到这里的时候说明start_ts_之前的数据具有
	    // 一个一致的snapshot
            last_write = T.Read(row, c+"write", [0, start_ts_]);
            // sanity check. 没有找到任何数据可读,返回。
            if (!latest_write.found())
            {
                return false;
            }
            // write列记录了data所在的timestamp,
	    // 为了读到一条数据,需要先得到该数据所在的
	    // timestamp,然后通过timestamp读到最终数据,
	    // 有点间接寻址的味道
            int data_ts = latest_write.start_timestamp();
            *value = T.Read(row, c+"data", [data_ts, data_ts]);

            return true;
        }
    }

    bool Prewrite(Write w, Write primary)
    {
        Column c = w.col;
        bigtable::Txn T = bigtable::StartRowTransaction(w.row);

        // 如果在本事务开始后([start_ts_, inf))也有其他事务执行写操作,
        // 并且已经完成了部分/全部数据写操作,则abort
        //
        // 对于start_ts_之前的写操作,分为两种情况
        //  1. 整个事务都提交完成了得写操作,这是正常情况,结果一致
        //  2. 只写了一半的事务,这由后面的lock检查来处理
        if (T.Read(w.row, c+"write", [start_ts_, inf])
        {
            return false;
        }

        // 如果在当前操作的cell上还有锁的话,则abort
        // 这个检查比较狠,只要有锁,无论timestamp为多少均abort,这是
        // 因为只要有锁,就说明还有一个并发事务(dead or not)在写当前cell
        if (T.Read(w.row, c+"lock", [0, inf]))
        {
            return false;
        }

        // 检查到这里就可以放心地预写入数据和锁了
        // 此时data对Get还不可见,因为write还没有写入
        T.Write(w.row, c+"data", start_ts_, w.value);
        T.Write(w.row, c+"lock", start_ts_, {primary.row, primary.col});

        // 提交bigtable单行事务
        return T.commit();
    }

    bool Commit()
    {
        // 任选一个write作为primary,这里primary的作用类似于一个标志点
        // primary行被提交后,整个事务必须提交
        Write primary = writes_[0];
        vector<Write> secondaries(writes_.begin() + 1, writes_end());

        // 预提交
        // primary和secondarise的预提交如果失败,
        // 则说明还有别的并发事务在写当前cell,当前commit需要abort
        //
        // 我对并发事务的理解:在时间轴上有交集的事务。
        //  Timeline -------------------------------------------->
        //  Trans0:   ^-$
        //  Trans1:       ^-----$
        //  Trans2:       ^----------$
        //  Trans3:          ^------------$
        //  Trans4:                           ^----$
        //  Trans5:  ^----x
        // ^标志事务开始,$标志事务结束,x表示执行事务的进程中途死掉
        //  0,5; 1,2; 1,3; 1,5; 2,3; 2,5均为并发事务
        if (!Prewrite(primary, primary))
        {
            return false;
        }
        for (Write w : secondaries)
        {
            if (!Prewrite(w, primary))
            {
                return false;
            }
        }

        int commit_ts = oracle_.GetTimestamp();

        Write p = primary;
        bigtable::Txn T = bigtable::StartRowTransaction(p.row);
        // 读取Prewrite阶段写入的lock,如果读取失败,则abort
        // 执行这一步的原因在于lock可能由于某种原因被Get操作清理掉了
        // 某种原因包括:
        //  1. 真死了
        //  2. 假死,等下可能活过来的
        //    1) 执行当前事务的线程被调度器调度出去了,执行优先级较低
        //    2) 系统中出现了一些工作特别繁重的线程,把系统暂时性压死
        //    3) 等待IO。等等
        // 另外,这里只读取primary lock,而没有读取其它lock,是Percolator
        // 的一个约定,它相对简化了检查过程,不需要检查secondaries的lock。
        if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_]))
        {
            return false;
        }
        T.Write(p.row, p.col+"write", commit_ts, start_ts_);
        // 成功执行下面的Commit操作后,写操作对Get可见
        T.Erase(p.row, p.col+"lock", commit_ts);

        // *NOTE* 提交点,T.Commit执行成功后一旦系统出现故障,恢复后
        // 只能rollforward,不能rollback
        if(!T.Commit())
        {
            return false;
        }

        // 此时的写操作已经不需要用行事务来保证了,因为这里只有写操作
        // 并且也不可能有两个并发写操作都写同一个commit_ts下的cell
        for (Write w : secondaries)
        {
            bigtable.Write(w.row, w.col+"write", commit_ts, start_ts_);
            bigtable.Erase(w.row, w.col+"lock", commit_ts);
        }

        return true;
    }

    // 确认造成冲突的进程是否已经退出,如果退出则做清理,否则忽略
    void BackoffAndMaybeCleanupLock(Row row, Column c)
    {
        // 判断写这个锁的worker是否还活着(liveness)的方法:
        // 每个worker会写一个token到Chunbby lockservice中,并且定期
        // 更新这个token中的last_update_time,其它worker检查这个worker
        // 是否存活的方法就是去检查这个token是否存在,如果存在,其
        // last_update_time是否太旧,通过这两重检查才判定该worker活着。

        // 如果判定该worker已死,则根据primary lock的状态来决定动作:
        //  1. primary lock不存在: roll-forward, 将所有未提交的
	//     secondary write都提交掉,相应的lock都擦除掉
        //  2. primary lock存在  : roll-back, 将primary的数据清除掉,
        //     write的值也擦除掉。
        // 另外注意,读取到任意版本的lock就执行本函数

        // 如果worker还活着,则不采取什么数据操作,可能小睡眠一下,等待
        // worker主动将锁清除。
        // Get操作会因此等待较长一段时间,这是Percolator需要注意的一个特点。
    }

} // class Transaction

上面的代码注释着重分析了执行过程,和每一条语句的内涵。下面还是同样的代码,注释中着重说明了当出现宕机的情形下,Percolator是如何处理failure case及实现事务的rollback或rollforward的。

class Transaction{
    struct Write { Row row; Column col; string value; };
    vector<Write> writes_;
    int start_ts_;

    Transaction() : start_ts_(oracle.GetTimestamp())
    {
    }

    void Set(Write w)
    {
        writes_.push_back(w);
    }

    // [fail here] 不考虑清理工作的话,Get操作没有副作用,fail safe
    void Get(Row row, Column c, string *value)
    {
        while (true)
        {
            bigtable::Txn T =
		bigtable::StartRowTransaction(row);
            if (T.Read(row, c+"lock", [0, start_ts]))
            {
                BackoffAndMaybeCleanupLock(row, c);
                continue;
            }

            last_write = T.Read(row, c+"write", [0, start_ts_]);
            if (!latest_write.found())
            {
                return false;
            }
            int data_ts = latest_write.start_timestamp();
            *value = T.Read(row, c+"data", [data_ts, data_ts]);

            return true;
        }
    }

    bool Prewrite(Write w, Write primary)
    {
        Column c = w.col;
        bigtable::Txn T = bigtable::StartRowTransaction(w.row);
        // [fail here] 尚未Commit,safe,下面Commit之前都一样safe
        if (T.Read(w.row, c+"write", [start_ts_, inf])
        {
            return false;
        }
        if (T.Read(w.row, c+"lock", [0, inf]))
        {
            return false;
        }
        // 这里可以看到用bigtable transaction的意义:
        // 如果没有使用行事务,data写入后worker fail掉,但lock又还没写入
        // 这个data写入将不能被后继的Get检测到,成为垃圾数据
        T.Write(w.row, c+"data", start_ts_, w.value);
        T.Write(w.row, c+"lock", start_ts_,
		{primary.row, primary.col});

        return T.commit();
        // [fail here] lock写入了,后继的Get会看到这个lock,最终通过
        //       BackoffAndMaybeCleanupLock将这个lock清理(rollback)
        //       data写入了,但没有write索引它,对外不可见,safe
    }

    bool Commit()
    {
        Write primary = writes_[0];
        vector<Write> secondaries(writes_.begin() + 1,
				writes_end());

        // [fail here] 尚无任何实际动作,safe
        if (!Prewrite(primary, primary))
        {
            return false;
        }
        // [fail here] Primary lock存在,
	//		后继的Get将看到这个lock,rollback
        for (Write w : secondaries)
        {
            if (!Prewrite(w, primary))
            {
                return false;
            }
        }
        // [fail here] Primary lock存在,
	//		后继的Get将看到这个lock,rollback

        int commit_ts = oracle_.GetTimestamp();

        Write p = primary;
        bigtable::Txn T = bigtable::StartRowTransaction(p.row);
        // [fail here] 单行事务,Commit前所有操作对外不可见,此事务期间任意
        //             点fail,后继的Get将看到primary lock,rollback
        if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_]))
        {
            return false;
        }
        T.Write(p.row, p.col+"write", commit_ts, start_ts_);
        T.Erase(p.row, p.col+"lock", commit_ts);
        // [FIX ME] 这里为什么要用行事务我还没想明白,
	//	    不用事务会有什么大问题?欢迎补充。
        if(!T.Commit())
        {
            // [fail here] Primary lock存在,后继的Get将看到这个lock,
	    //		   rollback
            return false;
        }
        // [fail here] Primary lock不存在,
	//	       后继的Get将操作将触发rollforward
        for (Write w : secondaries)
        {
            // [fail here] Primary lock不存在,
	    //		   后继的Get将操作将触发rollforward
            bigtable.Write(w.row, w.col+"write",
				commit_ts, start_ts_);
            // [fail here] Primary lock不存在,
	    //		   后继的Get将操作将触发rollforward
            bigtable.Erase(w.row, w.col+"lock", commit_ts);
        }
        // [fail here] 事务已经完成,safe

        return true;
    }

    void BackoffAndMaybeCleanupLock(Row row, Column c)
    {
    }

} // class Transaction

下面再回过头来看看Percolator的两阶段提交和教科书中得两阶段提交之间的异同。在Wikipedia中两阶段提交的过程被描述为:

Success

If the coordinator received an agreement message from all cohorts during the commit-request phase:

  1. The coordinator sends a commit message to all the cohorts.
  2. Each cohort completes the operation, and releases all the locks and resources held during the transaction.
  3. Each cohort sends an acknowledgment to the coordinator.
  4. The coordinator completes the transaction when all acknowledgments have been received.

上面的过程里面有协调者(coordinator)和参与者(cohort)两个角色,而Percolator中仿佛只有客户端一个角色 。实际上,需要灵活地理解协调者和参与者,他们只是两个概念,而不是必须对应两个实体。协调者代表的概念是控制事务进度,参与者代表的概念是执行具体操作,记录事务状态,同时向协调者报告状态。在Percolator中Get逻辑和Commit逻辑体现了协调者的概念,客户端对Bigtable状态的修改和读取体现了参与者的概念。理解了这一点之后再对应看上面的过程就要清晰很多:

  • 客户端调用Prewrite函数,通知所有写操作都预提交
  • Prewrite函数中将数据进行预提交(写data和lock)
  • 预提交均成功,返回true
  • 客户端提交Primary write的commit操作,commit成功后标志着两阶段提交完成,后继secondary writes必须成功。 

Failure

If any cohort votes No during the commit-request phase (or the coordinator’s timeout expires):

  1. The coordinator sends a rollback message to all the cohorts.
  2. Each cohort undoes the transaction using the undo log, and releases the resources and locks held during the transaction.
  3. Each cohort sends an acknowledgement to the coordinator.
  4. The coordinator undoes the transaction when all acknowledgements have been received.

Percolator中对失败(Failure)的处理与这里有较大不同,这里失败处理是立即进行的,而percolator中延迟了失败的处理,由后继的Get操作来完成。

 

参考文献

转载请注明:爱开源 » 新人讨论二- Percolator中的两阶段提交协议分析

您必须 登录 才能发表评论!