• 本笔记内容和部分图片来自Coursera课程Cloud Computing https://class.coursera.org/cloudcomputing-001
  • 公有云:对所有人开放使用。私有云:仅对公司内部开放。
  • 云 = 海量数据存储 + 计算能力
  • 目前说的云计算,通常指在云上以虚拟机等方式按需按量提供计算资源。
  • 新时代的云计算特点
    • 规模大
      • 截止2012年,FaceBook拥有18w台服务器
    • 按量使用
      • HaaS(Hardware as a service)
        • 直接购买机器。不建议使用。
      • IaaS(Infrastructure as a service)
        • 购买灵活的计算和存储能力,可以以虚拟机的形式出现。例如AWS EC2, Azure。
      • PaaS(Platform as a Service)
        • 基于IaaS,但是暴露出来的不是虚拟机而是软件控制台。例如Google/Sina的App Engine。
      • SaaS(Software-as-a-Service)
        • 直接暴露服务。例如企业级的分析服务,云邮箱,云笔记等。
    • 新的编程范式
      • 以Hadoop为首的各种MapReduce框架
  • MapReduce
    • Map:并行的处理相互独立的数据(块),产生中间产物:Key-Value对
    • Reduce:根据Key值,并行的合并所有的Key-Value对。(相同Key值放入同一个Reduce任务中)
    • 应用案例:海量日志中的正则查找、海量KV对的反转(查找被引用的次数)、从日志中统计每个URL的访问次数、排序
    • Terasort(Hadoop自带的排序作业)
      • 不需要特殊的Mapper和Reducer类。用默认的 IdentityMapper和IdentityReducer。
      • 第一步:采样,随机取样并排序,根据reducer的数量n得到n-1个分割点
      • 第二步:Map,将需要排序的字段设为key
      • 第三步:Shuffle,根据分割点构造决策树,将KV对交给对应的reducer,使第i个数据块的所有数据都会比第i+1个中的数据小
      • 第四步:Reduce,读取输入数据进行局部排序(默认已做好,归并排序)
      • 第五步:顺序输出每个Reducer的结果即可
    • 数据存储:
      • Map输入:HDFS
      • Map输出:本机磁盘文件
      • Reduce输入:远程磁盘文件
      • Reduce输出:HDFS
    • YARN的组件
      • ResourceManager:(全局唯一)基于应用程序对资源的需求进行调度
      • NodeManager:(对应每个节点)每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况并且向调度器汇报
      • ApplicationMaster:向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理失败任务
    • 容错性
      • NM和AM向RM发送心跳
      • Speculative execution:由于最慢的task(可能因为机器性能、分配不均等)会拖累整个job,hadoop会跟踪每个task完成的百分比,当发现一个task进度明显落后时(通过某种算法),会在另一个节点上复制出一个新task,二者同时运行,哪个先运行完,则使用它的结果并把另一个task kill掉。
        • 其实他也没speculate什么东西,名字这么起的就这么叫了
      • HDFS默认每份数据存储3个副本
        • 一个map任务优先在数据所在机器上执行,次之在数据所在rack中的机器上执行,再次之随机(网络拓扑临近优先)
  • Gossip算法
    • 多播(组播)Multicast:一个节点传递消息至多个节点
    • 基于树的多播协议:SRM、RMTP
    • Gossip协议:每一个收到了消息的节点,会不间断的周期性的向周围的m个随机节点发送Gossip Message(UDP)
      • 特点:冗余、分布式容错、最终一致、去中心化
      • 三种通讯方式:
        • push:A节点将数据(key,value,version)推送给B节点,B节点更新数据
          • 传播时间复杂度 clog(n)
        • pull:B将数据(key,version)推送给A,A将本地比B新的数据(key,value,version)推送给B,B更新本地数据
          • 传播时间复杂度 log(log(n))
        • push/pull:与pull类似,只是多了一步,B再将本地比A新的数据推送给A,A更新本地数据
      • 对于路由器连接多个子网络的情况,完全随机会增大路由器负载O(n),应采用P(选择外网节点)=1/内网节点数 的方法降低负载。
  • 组员管理(group membership)
    • 在集群中,每个机器维护着一份可用的成员列表(membership list)。当有机器宕机时,集群中的故障检测程序会监测到并广播给集群中的其他成员。
    • 失效检测器
      • 节点失效在分布式系统中是常见的,因此每个分布式系统都需要失效检测器
      • 由于分布式通讯天生的一致性缺陷,监测器只能保证失效一定会被监测到(完备性Completeness),但不能保证没有误判(准确性Accurancy)。
      • 方法一:机器们向中心节点发送心跳包,超时则判断为fail
        • 缺点:中心节点压力大
      • 方法二:将机器连成一个闭环,每台机器向左右两台发送心跳包
        • 缺点:无法解决同时多台机器失效
      • 方法三:gossip-style协议,每个机器周期性的多播它的membership list,每台机器接收时会逐行做merge,以新代旧
        • 当某个节点超时T1时,会标记为fail,时间超过T2(T2>T1)时才会从membership list清除(确保清除时所有机器已经将它标记为fail)
      • 方法四:SWIM协议,失效检测机器在每个协议周期内,随机的向成员列表中的一台机器Pj(或一个进程)发送ping;如果没有收到ack,则会再选择k台机器(或进程)作为代理向Pj发送ping,如果都没有收到ack,则会判定Pj失效(或可疑)并广播。
        • 猜测机制:如果都没有收到ack,会标记Pj为可疑,再过一段时间才会标记为失效。
        • 广播方式——Infection-style Dissemination:在ping和ack消息中附带上成员列表更新信息。
  • P2P(Peer to Peer)
    • Napster,早期的一个著名的P2P软件,提供MP3分享下载。如今napster协议已经开源。
      • 客户端(Peer)和Server建立TCP连接
        • 上传文件:Peer上传文件时,Server保存文件名、IP和端口号,Server不保存文件。
        • 搜索:Server在文件列表中搜索关键字,返回<filename,ip,port>列表。Peer收到列表后对每个ip发送ping,用户选中下载时会P2P直连下载
    • Gnutella协议
      • 与napster不同的是,没有使用中央数据库存储可用文件列表,使用的是分布式查询法
      • 在客户端启动时,会通过自举找到相邻其他节点,并记录下来
      • 搜索文件:客户端会向相邻节点发送查询请求,同时接收到的节点会继续传播查询请求直到TTL失效(每次传播-1,默认值为6~7);每个收到查询请求的节点会搜索本地资源,如果匹配则会原路返回ip、端口、文件名等信息。
        • 为了防止环形重复查询(每一次请求在一个节点上仅会被传播一次),每个节点缓存了最近查询请求列表
        • 每一次传播,查询请求会发送被到除了传入节点以外的所有相邻节点
        • 在查询结果返回的途中,如果原路已经失效,则直接丢弃结果
      • 下载文件:直接使用http协议向资源节点发送get请求,并附带range字段以支持断点续传。
        • 如果第一次http失败,说明资源节点可能有防火墙,此时下载方会通过查询传播路径向资源方发送push消息(包含下载方的ip和端口),资源方收到消息后主动建立tcp连接,然后下载方就可以正常发送http请求了。
      • 更新相邻节点表(overlay graph):每个节点会定期向周围发送ping(类似query,但TTL较小),收到消息的节点会沿原路返回pong。
        • ping/pong可能占用大量网络资源,需要考虑缓存或多路复用以优化
    • FastTrack协议
      • Napster和Gnutella的结合:保留了Gnutella的相邻节点拓扑,其中的一些节点会变成超级节点(通过声望值产生),超级节点类似Napster的Server,记录了附近的<filename,ip,port>列表。
        • 搜索时,如果节点附近有超级节点,会直接去超级节点查询而不是逐层的广播请求。
    • BitTorrent
      • 下载过程:
        • 下载者获取.torrent文件(包含了tracker地址和文件信息)
        • 下载者连接tracker服务器(每个tracker对应一个或多个文件),tracker会告知下载者当前正在下载该文件的节点ip和端口(tracker通过心跳维持该列表)。
        • 返回的列表包含了seed(持有完整文件的节点)和leecher(持有文件片段的节点),下载者通过与他们直连,进行下载,此时下载者也作为一个leecher在共享着文件。
      • 每个文件都会被分成很多分片,如何选择先下载哪一块?
        • 局部最少优先(Local Rarest First):选择附近节点中,被复制的最少的分片。
    • Chord
      • DHT=Distributed Hash Table
        • 每个文件索引存储为(Hash值, 机器IP)的格式。通过把索引拆分到多个节点,实现分布式查找。
        • Hash函数通常使用SHA-1
      • 主要思想:一致性Hash
        • 节点分布:计算每个节点的Hash值(mod 2的m次方,下同),“放置”到Hash环(Hash环代表了整个P2P网络)中
          • 每个节点都存有顺时针下一个节点的地址(successor),逆时针(predecessor)可选
          • 每个节点还维护一个Finger表,保存了Hash值为N+2的m次方的节点地址,用于帮助定位
        • 文件分布
          • 新增文件时,文件会根据文件名(默认唯一)的Hash值,存放在上述环中文件Hash值位置顺时针的下一个节点上。
        • 查找文件所在节点的方式
          • 1. 计算出K=文件名的Hash值,N=当前节点的Hash值
          • 2. 如果K在N和下一个节点的Hash值之间,则下一个节点即为文件所在节点
          • 3. 在当前节点的Finger表中,找出沿顺时针方向的距离(即节点的Hash值要小于K)最接近K的节点,向其发送查询请求(RPC)
          • 4. 收到查询请求的节点重复2-3步,直到找到文件所在节点
          • 该方式的时间复杂度为O(log(n))。可用来查找、新增、删除文件等操作。
      • 节点失效:
        • 在每个节点的前后节点上,复制一个副本。当查询请求跳到含有副本的节点时,该节点直接返回成功。可以防止文件丢失,减少节点负载。
      • 新增节点:
        • 根据查找算法,找到新增节点N0的Hash值对应的后继节点N1,并将新增节点的后继(successor)设置为N1,将N1的前继节点设置为N0。
        • 所有节点都遵守Stabilization protocol:每个节点周期的从其后继节点获取信息,用来更新自己的后继指针和Finger表。使得整个系统正确。(细节略)
        • 拷贝Hash值在N0和N1之间的文件到N0。
    • Kelips
      • 每个节点根据Hash值取模,“放入”对应的Hash桶中
        • 每个节点都知道桶中所有其他节点的地址
        • 每个节点都知道其他每个桶中至少一个节点的地址
      • 每个文件只存在本地机器上,但是会把元数据(文件名, IP)复制到文件Hash值对应的Hash桶中的所有节点上
        • 因此,查找文件只需计算出文件的Hash值然后到相应Hash桶中请求元数据即可。查找的时间复杂度O(1),空间复杂度O(√n)
      • 维护节点列表:采用Gossip算法
      • 维护文件元数据:文件持有节点周期性的向对应Hash桶发送广播,超时则删除元数据
  • Key-Value存储
    • Cassandra
      • 最初由Facebook创建,后进入Apache开源
      • 如何确定一条数据存储在哪一台机器上
        • 通常使用了DHT(见上节的Chord),一个DataCenter对于一个虚拟一致性Hash环,使用partitioner进行key-server映射
      • 分区策略(partitioner)
        • 第一种SimpleStrategy,对应单个DataCenter
          • 1. RandomPartitioner:一致性Hash,类似Chord,通过键值的MD5实现随机分布
          • 2. ByteOrderedPartitioner:按键值的字节顺序划分,适用于范围查询
        • 第二种NetworkTopologyStrategy,对应多DataCenter(一般指分布在不同地理位置的多个机房)
      • Snitches:将IP映射到具体的机架(rack)和DataCenter中
      • 数据写入
        • 数据写入要求无锁(lock-free)且快速
        • 客户端向集群中任意一台(称为coordinator)发送请求,coordinator通过partitioner找到key对应的所有副本节点,然后分别发出写请求,当其中的x个节点返回成功后,coordinator会向客户端返回写成功
          • 这里的x由用户定义,可以是1,可以是全部,也可以是超过一半(QUORUM, 默认)。详细信息见下文
      • 数据写入的容错:Hinted Handoff 机制
        • 如果任意一个副本节点宕机,coordinator向其他副本节点发送写请求,并本地保存一份数据直到宕机的节点恢复
      • 在一个节点中的写入动作
        • 首先写入磁盘的commit log(用于错误恢复)
        • 然后写入memtable(一种内存结构),用作write-back cache。
        • memtable满足一定条件后批量刷新(flush)到磁盘上,存储为SSTable,分为数据文件(key,value)和索引文件(key,postion)
        • SSTable是只读的,因此只有顺序写,没有随机写
        • 对于更新操作,SSTable只会追加(key-新修改的列数据),原有数据和其他列仍在旧文件中
      • 快速判断一个key在SSTable是否存在:Bloom Filter
        • 如果key存在,则Bloom Filter一定返回true。但是如果Bloom Filter返回true,不能保证key一定存在。
        • Bloom Filter是一个大型的bitmap。对于一个key,它会使用k(用户定义)个hash函数将其映射至k个位上。写入时会将映射的k个位全部置为1,查询时会判断k个位是否全部为1。
      • 数据压缩(compaction)
        • 由于SSTable对于更新操作采取的是追加写的方式,因此同一个key的可能会出现在多个SSTable中,因此需要compaction合并SSTable文件。
        • compaction周期性的在机器本地执行。
        • 删除操作会在log或SSTable上做标记,并最终在compaction阶段被回收。
        • 具体的压缩算法此处略去。
      • 读操作
        • 读请求首先发送到coordinator,coordinator向相应最快的x个副本发送读请求(若未响应则会向更多副本发送),当收到x条结果后,coordinator会选取时间戳最近的结果并返回。
        • 若其他副本返回的结果与最新时间戳的结果不同,则coordinator会发起读修复(read repair):将最新的数据更新到这些副本上。
      • Cassandra的成员管理(membership)
        • 由于任意一台机器都可能成为coordinator,因此每台机器都需要维护一套集群内的所有机器列表
        • Cassandra使用了上文所述的gossip-style协议,具体如下
        • 当节点第一次启动,它会从配置文件中获取seeds node,从而获取集群中其他节点的信息
    • CAP定理
      • 在分布式系统中,以下三个特点至多只能满足两点:
        • Consistency 一致性:在任何时间点,所有节点都持有相同的数据版本
        • Availability 可用性:系统总是能够正常操作,并反应迅速
        • Partition-tolerance 分区容错性:即使分区之间的网络消息丢失,系统仍能够正常提供服务
      • Cassandra实现了:最终(弱)一致性、可用性和分区容错性
      • 传统关系型数据库实现了:在一个分区下的 强一致性、可用性
      • 与RDBMS的ACID不同,例如Cassandra的Key-Value存储遵循BASE(Basically Available Soft-state Eventual Consistency)
        • Soft-state指内存的memtable
    • Cassandra的一致性级别(对应上文的x值,下文的n代表数据的副本总数)
      • ZERO/ANY:(只针对插入和删除)coordinator会缓存写入操作并立即返回。不保证一致性但是速度最快。
      • ALL:x=n,coordinator向所有副本节点发出读/写请求,全部成功后返回结果。任何一个节点没有成功,该操作均失败。保证强一致性但速度最慢。
      • ONE:x=1,coordinator收到至少一条读/写请求结果就会返回
      • QUORUM:每次写操作会向w个副本节点发出请求,读操作会从r个副本节点请求,w和r在一定条件下可以保证每次读操作都能读到新数据,做到了强一致性
        • w和r需要满足的条件:w+r>n 并且 w>n/2
        • 如果读操作多,那么w取值要大一点;如果写操作多,那么r取值要大一点;具体使用可以根据应用场景调节w/r的值。
        • 要求w>n/2的原因是保证能够检测出写冲突。
        • 如果不能满足w+r>n,则Cassandra实现的是最终一致性
    • 其他一致性模型:
      • CRDT(Conflict-free replicated data type):一种数据结构,可以保证写操作不会冲突,即写操作的顺序颠倒不会改变最终结果(例如计数器,每次只能+1),从而实现最终一致性。
      • Red-blue Consistency:分为blue(操作顺序无冲突)和red(要求有序执行)两种操作
      • Causal Consistency(因果一致性):如图,Client A的写入随后被Client B读到,则称A→B存在因果关系。如果一个客户端/进程对某个key发起读操作时,有与之相关的因果关系,则读到的结果一定是在因果关系链中的对该key写操作或其之后的结果。
    • 强一致性模型:
      • Linearizability:每个client的每个操作都会即时的对其他client可见。由于需要全局的同步时钟,较难实现。
      • Sequential Consistency:对于系统的所有操作,找到一个“合理”的顺序(不保证遵循物理时间),并使所有client/processor遵守这个顺序执行。
    • HBase
      • HBase是一款分布式的NoSql数据存储,是BigTable的一种开源实现。它在availability和consistency之中选择了consistency。
      • 整体架构如下图所示:
        • 由Client发起读写操作,首先会从zookeeper获取需要连接的HRegionServer地址。
        • HRegionServer(可以视作一台服务器)负责接收client的请求,它包含了若干个HRegion。
        • 每个HRegion对应Table中一个Region,Region是HBase数据存储和管理的基本单位,一个Table会随着数据的增加而按行分割成多个Region。
        • 一个Table的字段需要由用户分割成若干组Column Family。同一个Region下,每个ColumnFamily的所有字段将会共同存储在一个Store中。
        • Store的MemStore是内存缓冲,写满后会写入StoreFile,StoreFile也会定期执行compact进行文件合并。StoreFile实际对应一个存储于HDFS上的一个HFile文件(类似SSTable)。
        • HBase是三维有序存储的,通过rowkey(行键),column key(column family和qualifier)和TimeStamp(时间戳)这个三个维度可以对HBase中的数据进行快速定位。
        • rowkey是按照字典顺序排序存储的,且必须保证rowkey的全局唯一性
      • 维持强一致性:每个key只由一个HRegionServer服务,并在每次写操作写入memStrore前写入HLog。
        • 对于已经持久化的(进入StoreFile)数据,HLog会定期删除
        • 如果HRegionServer宕机,HMaster会通过zookeeper感知,取到它的HLog,并将失效的Region重分配。被分配的节点在加载新Region时会通过历史HLog,将数据恢复至该节点的MemStore中。
          • 失效恢复期间,该region处于不可用状态。因此Hbase是牺牲可用性来维持一致性的。
        • 尽管一条数据只会写入一个regoin。但是HDFS很有可能是多副本的存储,二者不矛盾。
      • HBase也支持多种数据备份方式,此处略
  • 时钟和顺序
    • 时钟偏移(clock skew):两个时钟的时间值不同
    • 时钟漂移(clock drift):两个时钟的时钟频率不同
    • Cristian算法
      • 如图,所有的时钟P定期与时间服务器S进行时间同步,S会在收到请求后返回S的当前时间
      • 为了考虑到网络传输延迟,设P→S最短网络延迟时间min1,S→P最短网络延迟时间min2,P收发消息间隔时间RTT,S返回的时间为t。则P在收到结果一刻的时间为:t+(RTT+min2-min1)/2,即图中红色线段的中点时间。
      • 注意:
        • 为了保证事件的顺序,时钟的时间只能增加不能后退
        • 时钟频率可以调快或调慢
    • NTP(Network Time Protocol)
      • 如上图,绿点为NTP Server,组成了一棵树。客户端为叶节点,每个节点向其父节点同步时间。
      • 在进行两次时间请求后,客户端就可以进行时钟同步了。记录的四个时间点ts1、tr1、ts2、tr2如上图所示。则child的时钟比parent的时钟了(tr1-tr2+ts2-ts1)/2。
        • 此结果基于一个假设:child和parent收发网络消息的时间相等。
    • Lamport(Logical) Ordering
      • 源自 Lamport L. 《Time, Clocks, and the Ordering of Events in a Distributed System》,也是计算机科学史上被引用最多的文献之一
      • 以下三个条件定义了事件间的happen-before关系,用->表示:
        • 在同一个进程中,a在b之前发生,那么:a -> b
        • a=进程P1发送m到P2,b=进程P2收到来自P1的m。那么:a -> b
        • (传递性)如果a -> b, b -> c,那么:a -> c
      • 并非所有事件之间都有->关系,因此它满足的是局部有序
      • Lamport逻辑时间戳的实现
        • 每个进程维护一个单调递增的计数器,充当逻辑时钟
        • 进程每执行一个指令,逻辑时钟+1
        • 每个发送消息指令携带本进程的时间戳
        • 每次接收消息,逻辑时钟会更新为 max(本地时钟,消息时间戳)+1
      • 注意:
        • a->b 可以推导出 a的时间戳小于b的时间戳
        • a的时间戳小于b的时间戳 可以推导出 a->b或者a,b是并发的
    • Vector Clock
      • 假设一个集群中有n个进程,则每个进程都有一个长度为n的vector,记V[1...n]。在进程i中,Vi[j]的值就是该进程有关于进程j的最新时间戳。
      • Vector Clock的实现
        • 进程i在执行指令时,仅对Vi[i]进行自增操作
        • 每个发送消息指令携带本进程的全部vector[1...n]
        • 进程i收到进程m的消息时,令Vi[i]+=1,Vi[j]=max(Vm[j], Vi[j])
  • 全局快照(globle snapshot)
    • globle snapshot = globle state = 分布式系统中每个进程的状态 + 每条网络通道的状态
    • 分布式系统的全局快照的作用:
      • 检查点:用于错误恢复
      • 垃圾回收:回收掉不可达的(没有被引用的)对象
      • 死锁检测:可用于数据库事务系统
    • 实现方案
      • 如果使用全局时钟同步,使所有进程在指定时间点上报状态,很难达到满意的结果。其缺点在于,时钟同步总会有细微的偏差,而且网络通道的状态很难记录。
      • 使用分布式系统的因果性关系,就可以表示出全局快照
    • Chandy-Lamport全局快照算法
      • 系统模型
        • 设系统共n个进程
        • 每对进程都有两条单向的网络通道,如Cij和Cji
        • 每个网络连接都遵守FIFO顺序
        • 网络消息既不会丢失也不会重复
      • 要求
        • 快照不能妨碍正常的应用程序执行
      • 算法内容
        • 首先有一个快照发起进程,会记录本进程的状态,同时向其他n-1个进程发送标记(marker)消息,并开始记录这n-1个进程向本进程的输入消息(设本进程为i,记为C1i...Cni)
        • 当一个进程i收到来自进程j的标记(marker)消息时:
          • 若这是进程i收到的第一个标记消息并且i不是快照发起进程,则
            • 进程i记录自身状态(记Si)
            • 将Cji记录为空
            • 向其他n-1个进程发送标记信息
            • 开始记录除了来自进程j的其他n-2个进程向进程i的输入消息
          • 否则(已经收到过标记消息)
            • 停止记录Cji的消息
        • 当所有进程都收到标记消息(记录自身状态),并且所有进程都收到其他n-1个进程的标记消息时(记录输入网络通道状态),视为快照记录结束。如果需要,中心服务器此时可以收集这些状态信息,形成全局快照。
      • 算法原理
        • Cut(割集):在每个进程或通道的时间边界,若事件发生在时间边界之前,则称该事件在割集内。
        • Consistent Cut(一致割集):在一致割集C中,如果其中任意一个事件e,满足f→e(f happens-before e),则f一定也在一致割集C中。
        • 全局快照算法获得的快照就是一个一致割集。如下图,绿色虚线就是一个一致割集,它与每个进程和通道的交点,就是上述全局快照获取的状态。
        • 证明过程略(可用反证法)
    • 分布式系统的正确性(correctness)
      • 包括活性(liveness)和安全性(safety)
      • livenes:好的事情最终一定会发生(例如:failure一定会被检测到)
      • safety:保证坏的事情一定不会发生(例如:死锁一定不会发生)
      • 在一个异步的分布式系统中,同时保证活性和安全性是很困难的:
        • 上文提到过的失效检测器,Completeness(=liveness)和Accuracy(=safety)不能同时被保证。
        • 下文将提到的Consensus问题中,在时限内做决定(=liveness)和决定的正确性(=safety)不能同时被保证。
        • 可以类比现实的司法系统,同时保证所有罪犯都被抓捕(=liveness)并且 没有无辜的人被抓捕(=safety)是很困难的。
      • 在全局状态中。设Pr为某个全局状态的特性。
        • liveness: 设状态S'满足Pr,若任意一个状态S满足S→S',则S满足Pr
        • safety: 设状态S满足Pr,则任意一个状态S'满足S→S',则S'满足Pr
  • 组播(multicast)问题
    • 组播:一个发送者发送给一组指定的接收者
    • 组播的顺序影响着分布式系统的正确性(correctness)
    • 保证FIFO顺序的组播
      • 在每一个接收者的角度上,接收消息的顺序和发送消息的顺序都要保证一致。不考虑不同接收者相互之间的顺序。
      • 实现方式:
        • 设集群内有n台机器,对于每台机器i都持有一个长度为n的数组(vecotor)Pi。
        • 当机器i发送消息时,Pi[i]+=1,并Pi[i]值作为消息序号组播出去
        • 当机器j收到来自机器i的消息时,设收到的消息序号为S
          • 如果Pj[i]+1=S,则成功接收消息,并设置Pj[i]=S
          • 如果Pj[i]+1<S,则将消息缓存起来,直到后续条件满足时再将消息发送给应用程序
          • 如果Pj[i]+1>S,则丢弃消息,因为这种序号的消息已经成功接收过了
    • 保证逻辑顺序(causal ordering)的组播
      • 在全局中,保证逻辑顺序,即Lamport-ordering的happen-before关系
      • 保证逻辑顺序一定保证了FIFO顺序,实际也更倾向于使用逻辑顺序
      • 实现方式(上述FIFO的加强版):
        • 设集群内有n台机器,对于每台机器i都持有一个长度为n的数组(vecotor)Pi。(同上)
        • 当机器i发送消息时,Pi[i]+=1,并将整个Pi数组组播出去
        • 当机器j收到来自机器i的消息时,设收到的数组为M(M等于Pi),只有同时满足下面两个条件,才会消费消息,并设置Pj[i]=M[i]
          • 当前机器记录的机器i的消息序号是连续的,即M[i]=Pj[i]+1
          • 机器j与i满足happen-before关系,即对于所有的k!=i,满足M[k]<=Pj[k]
    • 保证全序性(total ordering)的组播
      • 在全局中,每一个接收者接收到消息的顺序都是一致的。(与发送顺序无关)
      • 可以与上述两个相互组合
        • FIFO + total ordering
        • causal + total ordering
      • 实现方式:
        • 集群需要选举出一个leader作为sequencer
        • Pi发送消息:
          • Pi将消息M组播(同时组播给其他机器+sequencer)
          • sequencer维持一个全局序列号S,当收到消息M时,令S+=1,并组播<M,S>
        • Pj接收消息:
          • Pj维护着接收到的全局序号Sj,当收到消息M时,缓存M直到收到<M,S>并且Sj+1=S时,才会消费消息。
    • 组播的可靠性(reliable multicast)
      • 除了failure的进程,最终所有接收者收到的消息集都相同。
      • 可靠性与上述组播ordering是垂直(orthogonal)的,即可以实现FIFO可靠组播,也可以实现causal可靠组播 可靠组播的实现方式:
        • 使用可靠的单播传输协议(如TCP)
        • 第一步:发送端向所有接收者依次单播(此步骤不保证可靠性,因为发送过程中发送端可能down掉)
        • 第二步:当一个接收者收到消息时,会再次向所有其他接收者组播这一消息
    • 组播的容错性
      • 实现方式:虚同步Virtual synchrony,可以在保证可靠性的同时保证容错性。(也可以实现ordering)
        • 每个进程维护一个membership list(表示所有在线的线程),对于所有的进程,membership list的更新顺序都是相同的。
        • 对于一次组播,每个接收者接收时,其本地的membership list一定和发送者发送时的membership list相同,并且发送者一定在这个membership list中
        • 设进程Pi收到了一个来自Pj的组播M,如果在Pi的下一次membership list更新之前,Pi没有收到来自Pk的相同组播M,则会把Pk从membership list踢出
          • 虚同步保证可靠性的实现方式,同上述可靠组播的实现,每个接收者需要重复组播
      • 因为可能不准确的failure detection,virtual synchrony不能用来实现consensus
  • Paxos
    • consensus problem
      • 分布式系统的可靠性只能无限接近100%,而无法达到100%,这是因为consensus probelm在异步系统中是不可能完全解决的
        • 异步系统(asynchronous system model):不同进程的处理器速度可能差别很大,时钟偏移可能很大,消息传播延迟可能很大(可能很大意味着没有最大值限制)。例如分布式集群。
          • consensus problem无法解决是因为,很难区分一个进程是fail了,还是只是执行的很慢
        • 同步系统(asynchronous system model):不同进程的处理器速度差异、时钟偏移延迟、消息延迟都有最大值的。例如多处理器的计算机/超级计算机
          • consensus problem在同步系统是可以解决的
      • 分布式系统的常见问题:
        • 所有进程都能以相同的顺序接收到相同的消息(可靠组播)
        • 每个进程维护一个集群进程列表,每当集群的进程有变动时他们会同时更新(membership)
        • 选举一个leader,并使所有进程都感知到
        • 分布式互斥锁
      • 问题描述:(如何让若干进程对同一个变量达成一致的问题)
        • 设集群中有n个进程,每个进程都会被输入一个bit(0或1),然后要求输出一个结果(0或1,一旦输出不能更改)。需要一个协议,使得所有进程输出的结果都一致
        • 如果所有进程输入结果都一样,则需要输出结果等于输入。
        • 系统不能永远只输出0或者只输出1
      • 补充:
        • 整个决策的过程中,没有参与者说谎
      • 上述问题等同于:
        • 完美的failure detection
        • leader选举(所有进程对leader进程的id达成一致)
        • 主从同步的一致性(主机输出的值一定和副本输出的值一致)
    • Paxos算法
      • 没有完全解决consensus problem(=safety+liveness),但是它保证了 safety和eventual liveness
        • safety: 两个正确(correct)的进程不会输出(decide)两个不同的值
        • eventual liveness: 最终所有进程都会输出结果。eventual表示不保证时间限制,可能会花很长时间
      • 应用:zookeeper, google chubby
      • 作者:又是Lamport
    • 简化版paxos算法(basic paxos)
      • paxos是有时间周期(round)的,尽管进程间没有时钟同步,但是通过两种方式保证整体的时钟周期:
        • 1. 如果进程处于周期j中,收到了来自周期 j+1的消息,则周期直接变为 j+1
        • 2. 超时也会自动跳入下一时间周期
      • 每个时间周期分为3段(也有资料分成了2段)
        • 1. (提议阶段)选举leader,选举成功的leader具有第二阶段提案的权力。提案者定义为proposer,接受提案者定义为acceptor,一个进程可以既是proposer又是acceptor。
          • 想要提案的进程(proposer)需要先竞选leader,方式是向其他所有进程(acceptor)发送propose,只包含proposal ID
            • proposal ID全局唯一递增,可以是round id(ballot id)+server id,也可以是时间戳+server id等。proposal ID会被作为优先级被acceptor判断
          • acceptor收到proposal后,做出两个承诺,一个应答
            • 承诺一:不再应答proposal ID小于等于当前请求的propose请求(第一阶段请求)
            • 承诺二:不再应答proposal ID小于当前请求的accept请求(第二阶段请求)
            • 应答:返回已经accept(第二阶段)过的proposal ID最大的那个提案的value和proposal ID,无则返回空值
        • 2. (决策阶段)leader提案一个值,其他进程ack
        • 3. (通知阶段)leader组播最终值,其他进程accept