Mit分布式笔记

Published: by Creative Commons Licence

  • Tags:

线性一致性

线性一致性用来描述我们的分布式系统多么接近一台高性能单机应用,其正式定义如下:

  • 操作具有全序性
  • 顺序符合实时顺序
  • 读操作返回最后写入值

zookeeper

要实现线性一致性,raft我们需要通过leader节点来执行读操作,但是这就导致了写入和读取不能水平扩容的问题。事实上由于写入必须经由leader节点,因此随着机器的增加,写入的吞吐量反而会下降(需要更多节点组成多数派)。

zookeeper通过违背线性一致性,提供了一种新的一致性,从而得到了读水平扩容的能力。

  • 线性化写入(所有写入都通过leader节点并以相同的顺序复制到不同机器上)
  • 抛弃读取的线性一致性,提供FIFO客户端序(相同客户端提交的请求,较晚的请求可以看到较早的请求的结果,即在日志中的相对顺序是有序的)
    • 读取操作可以看到同一客户端之前的写操作结果
    • 读取操作看到的已提交日志的前缀

其具体原理是,zookeeper客户端执行写入操作,leader只有在写入提交后才会响应客户端,同时返回写入的日志索引zxid。之后客户端执行读取操作时,可以路由到任意的节点上进行,不过读取操作会带上最后一次写入操作返回的zxid,而节点直到接收并提交了zxid的日志项之后才会执行这个读取操作。

在zookeeper中,提供了watch机制,客户端可以在读某个key的同时在key上放置watcher,watcher会在之后对这个key的修改发生后立即发送(在任何后续写入前发生)。我们可以根据watch实现原子性,客户端A在写入key1,key2,而客户端B需要读取key1,key2。为了保证原子性,客户端A的代码为

del("ready")
write(key1, ...)
write(key2, ...)
create("ready")

客户端B的代码为

if(exist("ready", watcher)) {
    read(key1)
    read(key2)

    if watcher get notified {
        restart the whole process
    }
}

链式复制

实现复制状态机有两种方式,一种是通过raft/paxos等一致性算法,由leader节点向follower节点拷贝日志。这种方式的缺点在于不适合存储大量的数据,以及提供高吞吐的读写操作。第二种方式就是通过协调服务(zookeeper或类似的一致性服务)+主从备份。第二种方法在业界得到广泛的应用。

具体来说,就是假设有三台服务构成主从关系,由协调服务为每个服务分配一个唯一递增编号,最小编号的服务即使master,分别记作S1,S2,S3。拷贝关系见下图

S1 --> S2 --> S3

且所有写操作发生在S1,而所有读操作发生在S3。一旦某个日志被复制到S3,则认为该日志提交成功。

链式复制能很简单处理宕机的问题,任意机器的下线,只需要简单地把它从链中剔除,由原来的机器按序组成新的链即可。

对于新加入节点,我们不能直接将它作为尾部节点,因为它的数据过度落后,这会导致检查点前移的问题。因此新加入节点应该从当前尾节点进行复制,直到得到了所有的更新,之后才能成为尾节点。

链式复制的性质:

  • 实现了读写分离。
  • 每个节点最多只需要向一个节点拷贝数据。
  • N个节点之间只需要建立N个TCP连接。
  • 简单的宕机和恢复操作
  • 任意节点下线都会导致链条的重建,这段期间写操作需要被block,导致短时间的整体服务不可用。

链式复制存在一个扩展,允许随着机器的增加实现水平扩容。具体就是构建多条chain,每条chain的首尾节点尽量不同,之后对数据进行切片,让数据落在某一条chain上。这样读写流量就能均分到所有机器上。这种情况下,每条单独的链表都能保证各自的线性一致性。

由于所有写操作必须等待尾部节点接受到对应的日志后才能返回,因此写操作会有额外的延迟。但是一般链式复制只会引入少数的节点,因此延迟一般是可以接受的。(实际上超过2个节点后,只能提高容错能力,并不能提高性能)

缓存一致性

Frangipani是一个分布式文件系统,和常规的CS架构不同,它采用文件存储和文件读写分离的方式,即客户端负责IO的逻辑,而文件存储只负责存储块数据。

设计这样的文件系统存在以下挑战

  • 缓存一致性
  • 写入的原子性
  • 操作过程中宕机后能恢复

实现缓存一致性,Frangipani通过一个分布式服务维护一张lock表(锁服务),其中记录每个文件和文件的所有者。并且每个工作站维护一张文件表,其中包含每个缓存在本地的文件的状态,busy或者idle。工作站在修改一个文件之前,需要获得这个文件对应的锁。在一个工作站WS1需要修改或读取某个文件的时候,需要请求锁服务,如果此时文件被另外一台工作站WS2所获得,锁服务会请求WS2要求取回锁,WS2需要将自己的变动写入到分布式文件存储后再释放锁。

原子性同样通过锁互斥来保证。

宕机后恢复通过使用write-ahead logging来保证,将每一步操作记录到日志中(记录发生在执行操作之前)。至于如何保证日志的写入是原子的,基本方式就是写入的同时将校验和也一同写入。在宕机重启后,demon服务会读取WAL并应用所有的更改。

日志的内容为,块号,版本号,和新的数据,以及校验和。版本号用来避免宕机重演日志时覆盖宕机后发生的变更,对同一个文件块的修改会递增文件块的版本号。

在缓存回写的过程中,首先将日志发送到文件存储,之后发送更新的文件块。日志重做时可能是发生在一台不同的机器上。

分布式事务

分布式事务追求的一致性目标称为可序列化。可序列化是指多个事务的执行结果一定等同于按某种顺序逐一执行这些事务(不过多个并发的事务之间的执行顺序不保证)。可序列化不保证实时性,因此弱于线性一致性。

要保证可序列化的方式称为并发控制。并发控制的方案有

  • 悲观(锁)
  • 乐观(如果不可序列化则终止事务)

悲观锁分为

  • 两阶段锁
  • 两阶段提交

两阶段锁为每一条记录建立一把锁。两条标准

  • 线程在使用记录前获得对应的锁。
  • 直到提交,线程才释放锁。

两阶段锁是细粒度锁,它并不一次性获得所有需要的锁,而是在事务执行过程中按需获得锁,因此它的并发度会更高。也由于这个原因,它可能会导致死锁。判断死锁的方式有两种

  • 超时机制
  • 等待图是否有环

两阶段提交是指客户端将要执行的分布式事务提交给协调器,由协调器控制不同的服务完成事务。协调者发送操作要求服务将操作写入到日志中,之后对所有服务调用prepare请求,判断它们是否有能力提交日志(这时候服务要获取所有所需的锁)。如果所有服务都prepare好了,则发送commit请求给服务来提交事务。

为了能保证宕机恢复,参与者和协调器必须所有操作写入日志。由于协调者的宕机都会导致其余所有参与者的等待,因此常见的方式是用raft实现协调者的高可用。

如果任意一个参与服务没有prepare好,则放弃提交,终止事务。如果所有某个参与服务prepare好了但是随后宕机,则重启后必须恢复原来的状态(获得所需的锁,重新进入prepare状态),等待协调者重试。

由于两阶段提交要求参与者必须将日志落盘,而磁盘的WPS往往只有1000,这也导致一台机器的事务上限只能达到1000/s。

Spanner通过Paxos实现副本,在多副本集合中要直接通过调用leader来实现写入,为了避免leader过期但是依旧认为自己是leader,需要追加租约机制,就是leader必须定时重新获得选票,而在租约期间其它follower不能成为leader。

Spanner中只读事务比读写事务要快10倍,原因是只读事务不适用2PL和2PC,只从本地分片中进行读取。

Spanner提供了以下两种一致性

  • 外部一致性,即如果T1在T2开始之前就提交了,那么T2可以看到T1的变更。
  • 可序列化

Spanner的只读事务读取的并不是最新提交的数据,而是使用了快照隔离的技术,具体就是为每个事务分配一个时间戳。对于读写事务,时间戳记录的是事务发送到协调者的时间,对于只读事务,则记录事务开始的时间。之后按照时间戳依次执行事务。每个分片存储记录的数据和时间戳,存储同一条数据可以有多个不同时间戳的版本。

但是由于Spanner的只读事务只从本地副本中读取,如果本地副本不是leader而是follower,则数据可能存在滞后,Spanner通过安全时间的技术来保证只读事务读取到了最新的提交。具体来说就是事务的写入在raft日志中按照时间戳有序发送,而只读事务必须等待副本的日志中出现比自己时间戳更大的事务写入(并且还需要等待那些时间戳小于自己,但是完成了prepare操作,还没开始提交的事务的完成),才执行读取操作。

安全时间的机制要求所有事务的时间戳是正确的,否则就会破坏外部一致性。为了保证时间戳的正确性,Spanner使用原子时钟而非机器内置的时钟(在每个数据中心布置一个原子时钟以获取正确的时间),误差仅在微秒和毫秒级别。由于请求时间戳也可能存在不确定的延迟,这些延迟也会被估算在内,实际返回的时间戳是一个可能的时间区间(earliest,latest)。

引入时间区间,需要稍微修改只读事务的安全时间机制。具体就是等待直到出现earliest大于当前事务时间戳的日志出现。而读写事务和只读事务的时间戳直接取latest,读写事务的提交必须等待直到确定时间绝对超过了latest之后才允许进行提交(由协调者负责等待操作)。

乐观并发控制

FaRM是一个分布式内存数据库,它的特点是通过RDMA来实现高性能的读写操作。FaRM的并发控制是通过乐观并发控制来实现的。FaRM通过90台机器支持了每秒一亿次的事务。Fa(fast)R(remote)M(memory),所有机器都处于一个数据中心,通过分布式内存,从而提交日志不需要刷盘而是直接保留在内存中。

内存分成若干region,每个region有一个备份,单个region大小为2GB。

为了避免数据中心断电,每个机器都有一个备用电源。备用电源的电量足够将内存中的数据写入到SSD中。

内存中的region用于存储对象,每个对象都有一个oid(即区域号+区域内偏移),对象头部包含一个64位的数字,其中一位表示锁,另外63位表示版本号。

为了提高CPU利用率,传统上网络交互需要通过内核的驱动程序去访问机器上的网卡(对网卡上的寄存器进行读写),而FaRM采用了kernel-bypass的技术,它将网卡的存储分成若干个接受队列和发送队列,并且将队列映射到应用程序的地址空间,之后应用程序对网络的使用可以不经过内核。并且为了避免网卡发送中断和处理中断的开销,FaRM采用了polling的方式,即应用程序不断地轮询网卡的队列。

通过这些方式FaRM可以直接访问不同机器内存中的对象,而不需要经过内核的网络栈。这种方式称为R(remote)D(direct)M(memory)A(access)。这种方式发送包的延迟只有5us,远小于磁盘IO。

FaRM采用乐观锁的方式,读取对象时不加锁,只有在提交事务之前的验证步骤检查对象的版本号是否一致,如果不一致则终止事务。FaRM实现了可序列化。

为了实现严格可序列化,事务的处理和提交分为

  • 执行阶段:协调者读取对象到本地内存,并执行修改操作
  • 提交阶段
    • 锁阶段:从primary获得所有修改的对象的锁,这里取锁并不等待(test-and-set),而是一旦无法取到,就认为事务失败(因为使用了旧值)。这个阶段还会把所有写操作记入内存日志。
    • 校验阶段:对于读取但是不修改的对象,需要通过primay查询其版本号,如果不一致则终止事务,或者如果锁位被设置,则终止事务。
    • 提交backup阶段:在backup节点执行提交操作(写入内存日志)
    • 提交primary阶段:在primary节点执行提交操作(写入内存日志)

Spark

Spark中的操作分为Transformation和Action,前者惰性执行,而后者会触发真正的执行。Spark中的所有操作都是函数式的,意味着对相同数据执行相同操作得到的结果总是不变的。

在spark中,对于中间的变量,可以通过persist将它固化,这样之后对相同变量的多次消费则可以只计算一次。

errors = input.filter(_.startsWith("ERROR"));
errors.pesist();
errors.filter(_.contains("MySQL")).count();
errors.filter(_.contains("HDFS")).count();

这里如果没有persist,则两次count的action会导致errors被重复计算两次。而加了persist后,errors则只会被执行一次。

默认情况下,整个流都发生在内存中,不会涉及到磁盘操作。在persist的时候可以执行reliable参数,这样spark会把数据存储在HDFS上,这个称为检查点。

Spark中的scheduler负责调度worker到不同的分区,从而实现并行计算。一般worker数量会少于分区数,从而保证负载均衡(worker可以处理多个较小的分区或少数较大分区)。

Spark中的Driver负责收集用户的输入(代码),并将其翻译生成执行图,交给scheduler进行调度。

Spark会为执行流程生成数据的Lineage(血缘图),血缘图中有多个上游的结点称为宽依赖,只有一个上游的结点为窄依赖。窄依赖应该被调度到和父结点相同的结点,来避免数据拷贝。而宽依赖一般需要涉及机器之间的通讯。

如果在执行过程中,某个Worker宕机了,scheduler会重新计算Worker负责的部分。某个结点的失败需要血缘图上游所有结点都重新计算,可能会非常昂贵,作为程序员我们可以在代码中插入一些存储点(将结点数据存储到HDFS上)。

数据库和缓存一致性

服务扩展

  • 第一阶段:单个DB存储数据,多个前端服务负责计算
  • 第二阶段:DB进行分片,多个前端服务负责计算
  • 第三阶段:DB和前端之间架构缓存,DB写入key时异步从缓存删除key

数据架构

  • 容量问题:通过对数据分区解决
  • 容错问题:通过使用多机房,一个主机房和多个从机房,所有写入都写入到主机房的DB,从机房仅备份主机房DB。
  • 热点数据问题:使用多个cluster来复制热点数据,每个cluster都有独立的前端和缓存,非热点数据用一个单独的regional pool来存储
  • 集群预热:当增加cluster的时候,新的cluster缺少数据(冷集群),这时候不能直接去DB查询,应该在预热阶段(几小时内)都从暖集群的缓存中查询数据,并写入到自己的缓存中。
  • 惊群问题:使用租约和锁来解决
  • 单点故障问题:某台缓存服务宕机,这时候需要访问备份用的gutter pool,这是一个临时pool,没有数据删除。

竞争情况

  • Stale set: 对于同一个键,A读取v1数据,之后B写入v2数据并失效缓存,A将v1数据写入到缓存。这种情况用lease解决,A放入缓存后检查lease是否失效,如果失效,则需要失效数据。A和B失效数据的同时需要失效lease。
  • Cold cluster: 对于一个冷集群,一个客户端向DB写入关键字k->v2,同时另外一个客户端向暖集群请求k,得到了v1,并写入当前集群。解决方案是当一个key被写入DB后,所有冷集群在2s内都不允许向缓存写入key,以保证失效信息能被暖集群处理。
  • 主从区域:从区域客户端写入数据到主机房DB,之后从区域客户端向缓存执行get操作拿到老版本数据,因为数据是通过squeel从主机房的DB拷贝到从机房的DB,这需要时间,这样就不能保证写入即可见。解决方案是从机房会给key加上标记,标记后的key都从主机房的缓存中获取。