Hadoop HA

什么是HA?

HA的意思是High Availability高可用,指当当前工作中的机器宕机后,会自动处理这个异常,并将工作无缝地转移到其他备用机器上去,以来保证服务的高可用。

HA方式安装部署才是最常见的生产环境上的安装部署方式。Hadoop HA是Hadoop 2.x中新添加的特性,包括NameNode HA 和 ResourceManager HA。因为DataNode和NodeManager本身就是被设计为高可用的,所以不用对他们进行特殊的高可用处理。

NameNode HA

​ 在Hadoop2.0之前,NameNode只有一个,存在单点问题(虽然Hadoop1.0有SecondaryNameNode,CheckPointNode,BackupNode这些,但是单点问题依然存在),在hadoop2.0引入了HA机制。Hadoop2.0的HA机制官方介绍了有2种方式,一种是NFS(Network File System)方式,另外一种是QJM(Quorum Journal Manager)方式。

​ Hadoop2.0的HA 机制有两个NameNode,一个是Active状态,另一个是Standby状态。两者的状态可以切换,但同时最多只有1个是Active状态。只有Active Namenode提供对外的服务。Active NameNode和Standby NameNode之间通过NFS或者JN(JournalNode,QJM方式)来同步数据。

​ Active NameNode会把最近的操作记录写到本地的一个edits文件中(edits file),并传输到NFS或者JN中。Standby NameNode定期的检查,从NFS或者JN把最近的edit文件读过来,然后把edits文件和fsimage文件合并成一个新的fsimage,合并完成之后会通知Active NameNode获取这个新fsimage。Active NameNode获得这个新的fsimage文件之后,替换原来旧的fsimage文件。

​ 这样,保持了Active NameNode和Standby NameNode的数据实时同步,Standby NameNode可以随时切换成Active NameNode(譬如Active NameNode挂了)。而且还有一个原来Hadoop1.0的SecondaryNameNode,CheckpointNode,BackupNode的功能:合并edits文件和fsimage文件,使fsimage文件一直保持更新。所以启动了hadoop2.0的HA机制之后,SecondaryNameNode,CheckpointNode,BackupNode这些都不需要了。

数据同步方式:NFS与 QJM(Quorum Journal Manager )

NFS

img

NFS作为Active NameNode和Standby NameNode之间数据共享的存储。Active NameNode会把最近的edits文件写到NFS,而Standby NameNode从NFS中把数据读过来。这个方式的缺点是,如果Active NameNode或者Standby Namenode有一个和NFS之间网络有问题,则会造成他们之前数据的同步出问题。

QJM(Quorum Journal Manager )

img

QJM的方式可以解决上述NFS容错机制不足的问题。Active NameNode和Standby NameNode之间是通过一组JournalNode(数量是奇数,可以是3,5,7…,2n+1)来共享数据。Active NameNode把最近的edits文件写到2n+1个JournalNode上,只要有n+1个写入成功就认为这次写入操作成功了,然后Standby NameNode就可以从JournalNode上读取了。可以看到,QJM方式有容错机制,可以容忍n个JournalNode的失败。

Active和Standby两个NameNode之间的数据交互流程为:

1)NameNode在启动后,会先加载FSImage文件和共享目录上的EditLog Segment文件;

2)Standby NameNode会启动EditLogTailer线程和StandbyCheckpointer线程,正式进入Standby模式;

3)Active NameNode把EditLog提交到JournalNode集群;

4)Standby NameNode上的EditLogTailer 线程定时从JournalNode集群上同步EditLog;

5)Standby NameNode上的StandbyCheckpointer线程定时进行Checkpoint,并将Checkpoint之后的FSImage文件上传到Active NameNode。(在Hadoop 2.0中不再有Secondary NameNode这个角色了,StandbyCheckpointer线程的作用其实是为了替代 Hadoop 1.0版本中的Secondary NameNode的功能。)

QJM方式有明显的优点,一是本身就有fencing的功能,二是通过多个Journal节点增强了系统的健壮性,所以建议在生产环境中采用QJM的方式。JournalNode消耗的资源很少,不需要额外的机器专门来启动JournalNode,可以从Hadoop集群中选几台机器同时作为JournalNode。

主备NameNode切换

img

Active NameNode和Standby NameNode可以随时切换,可以人工和自动。人工切换是通过执行HA管理命令来改变NameNode的状态,从Standby到Active,或从Active到Standby。自动切换则在Active NameNode挂掉的时候,Standby NameNode自动切换成Active状态。

主备NameNode的自动切换需要配置Zookeeper。Active NameNode和Standby NameNode把他们的状态实时记录到Zookeeper中,Zookeeper监视他们的状态变化。当Zookeeper发现Active NameNode挂掉后,会自动把Standby NameNode切换成Active NameNode。

HDFS HA 原理

单NameNode的缺陷存在单点故障的问题,如果NameNode不可用,则会导致整个HDFS文件系统不可用。所以需要设计高可用的HDFS(Hadoop HA)来解决NameNode单点故障的问题。解决的方法是在HDFS集群中设置多个NameNode节点。但是一旦引入多个NameNode,就有一些问题需要解决。

HDFS HA需要保证的四个问题:

  1. 保证NameNode内存中元数据数据一致,并保证编辑日志文件的安全性;
  2. 多个NameNode如何协作;
  3. 客户端如何能正确地访问到可用的那个NameNode;
  4. 怎么保证任意时刻只能有一个NameNode处于对外服务状态。

解决方法

对于保证NameNode元数据的一致性和编辑日志的安全性,采用Zookeeper来存储编辑日志文件。

两个NameNode一个是Active状态的,一个是Standby状态的,一个时间点只能有一个Active状态的。

NameNode提供服务,两个NameNode上存储的元数据是实时同步的,当Active的NameNode出现问题时,通过Zookeeper实时切换到Standby的NameNode上,并将Standby改为Active状态。

客户端通过连接一个Zookeeper的代理来确定当时哪个NameNode处于服务状态。

HDFS HA架构

img

  1. Active NameNode 和 Standby NameNode:两台 NameNode 形成互备,一台处于 Active 状态,为主 NameNode,另外一台处于 Standby 状态,为备 NameNode,只有主 NameNode 才能对外提供读写服务;
  2. ZKFailoverController(主备切换控制器,FC):ZKFailoverController 作为独立的进程运行,对 NameNode 的主备切换进行总体控制。ZKFailoverController 能及时检测到 NameNode 的健康状况,在主 NameNode 故障时借助 Zookeeper 实现自动的主备选举和切换(当然 NameNode 目前也支持不依赖于 Zookeeper 的手动主备切换);
  3. Zookeeper 集群:为主备切换控制器提供主备选举支持;
  4. 共享存储系统:共享存储系统是实现 NameNode 的高可用最为关键的部分,共享存储系统保存了 NameNode 在运行过程中所产生的 HDFS 的元数据。主 NameNode 和备 NameNode 通过共享存储系统实现元数据同步。在进行主备切换的时候,新的主 NameNode 在确认元数据完全同步之后才能继续对外提供服务
  5. DataNode 节点:因为主 NameNode 和备 NameNode 需要共享 HDFS 的数据块和 DataNode 之间的映射关系,为了使故障切换能够快速进行,DataNode 会同时向主 NameNode 和备 NameNode 上报数据块的位置信息。

FailoverController

FC 最初的目的是为了实现 SNN 和 ANN 之间故障自动切换,FC 是独立与 NN 之外的故障切换控制器,ZKFC 作为 NameNode 机器上一个独立的进程启动 ,它启动的时候会创建 HealthMonitor 和 ActiveStandbyElector 这两个主要的内部组件,其中:

  1. HealthMonitor:主要负责检测 NameNode 的健康状态,如果检测到 NameNode 的状态发生变化,会回调 ZKFailoverController 的相应方法进行自动的主备选举;
  2. ActiveStandbyElector:主要负责完成自动的主备选举,内部封装了 Zookeeper 的处理逻辑,一旦 Zookeeper 主备选举完成,会回调 ZKFailoverController 的相应方法来进行 NameNode 的主备状态切换。

自动触发主备选举

NameNode 在选举成功后,会在 zk 上创建了一个 /hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 节点,而没有选举成功的备 NameNode 会监控这个节点,通过 Watcher 来监听这个节点的状态变化事件,ZKFC 的 ActiveStandbyElector 主要关注这个节点的 NodeDeleted 事件(这部分实现跟 Kafka 中 Controller 的选举一样)。

如果 Active NameNode 对应的 HealthMonitor 检测到 NameNode 的状态异常时, ZKFailoverController 会主动删除当前在 Zookeeper 上建立的临时节点 /hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock,这样处于 Standby 状态的 NameNode 的 ActiveStandbyElector 注册的监听器就会收到这个节点的 NodeDeleted 事件。收到这个事件之后,会马上再次进入到创建 /hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 节点的流程,如果创建成功,这个本来处于 Standby 状态的 NameNode 就选举为主 NameNode 并随后开始切换为 Active 状态。

当然,如果是 Active 状态的 NameNode 所在的机器整个宕掉的话,那么根据 Zookeeper 的临时节点特性,/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 节点会自动被删除,从而也会自动进行一次主备切换。

HDFS 脑裂问题

在实际中,NameNode 可能会出现这种情况,NameNode 在垃圾回收(GC)时,可能会在长时间内整个系统无响应,因此,也就无法向 zk 写入心跳信息,这样的话可能会导致临时节点掉线,备 NameNode 会切换到 Active 状态,这种情况,可能会导致整个集群会有同时有两个 NameNode,这就是脑裂问题。

脑裂问题的解决方案是隔离(Fencing),主要是在以下三处采用隔离措施:

  1. 第三方共享存储:任一时刻,只有一个 NN 可以写入;
  2. DataNode:需要保证只有一个 NN 发出与管理数据副本有关的删除命令;
  3. Client:需要保证同一时刻只有一个 NN 能够对 Client 的请求发出正确的响应。

关于这个问题目前解决方案的实现如下:

  1. ActiveStandbyElector 为了实现 fencing,会在成功创建 Zookeeper 节点 hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 从而成为 Active NameNode 之后,创建另外一个路径为 /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb 的持久节点,这个节点里面保存了这个 Active NameNode 的地址信息;
  2. Active NameNode 的 ActiveStandbyElector 在正常的状态下关闭 Zookeeper Session 的时候,会一起删除这个持久节点;
  3. 但如果 ActiveStandbyElector 在异常的状态下 Zookeeper Session 关闭 (比如前述的 Zookeeper 假死),那么由于 /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb 是持久节点,会一直保留下来,后面当另一个 NameNode 选主成功之后,会注意到上一个 Active NameNode 遗留下来的这个节点,从而会回调 ZKFailoverController 的方法对旧的 Active NameNode 进行 fencing。

在进行 fencing 的时候,会执行以下的操作:

  1. 首先尝试调用这个旧 Active NameNode 的 HAServiceProtocol RPC 接口的 transitionToStandby 方法,看能不能把它转换为 Standby 状态;
  2. 如果 transitionToStandby 方法调用失败,那么就执行 Hadoop 配置文件之中预定义的隔离措施。

Hadoop 目前主要提供两种隔离措施,通常会选择第一种:

  1. sshfence:通过 SSH 登录到目标机器上,执行命令 fuser 将对应的进程杀死;
  2. shellfence:执行一个用户自定义的 shell 脚本来将对应的进程隔离。

只有在成功地执行完成 fencing 之后,选主成功的 ActiveStandbyElector 才会回调 ZKFailoverController 的 becomeActive 方法将对应的 NameNode 转换为 Active 状态,开始对外提供服务。

NameNode 选举的实现机制与 Kafka 的 Controller 类似,那么 Kafka 是如何避免脑裂问题的呢?

  1. Controller 给 Broker 发送的请求中,都会携带 controller epoch 信息,如果 broker 发现当前请求的 epoch 小于缓存中的值,那么就证明这是来自旧 Controller 的请求,就会决绝这个请求,正常情况下是没什么问题的;
  2. 但是异常情况下呢?如果 Broker 先收到异常 Controller 的请求进行处理呢?现在看 Kafka 在这一部分并没有适合的方案;
  3. 正常情况下,Kafka 新的 Controller 选举出来之后,Controller 会向全局所有 broker 发送一个 metadata 请求,这样全局所有 Broker 都可以知道当前最新的 controller epoch,但是并不能保证可以完全避免上面这个问题,还是有出现这个问题的几率的,只不过非常小,而且即使出现了由于 Kafka 的高可靠架构,影响也非常有限,至少从目前看,这个问题并不是严重的问题。

第三方存储(共享存储)

上述 HA 方案还有一个明显缺点,那就是第三方存储节点有可能失效,之前有很多共享存储的实现方案,目前社区已经把由 Clouderea 公司实现的基于 QJM 的方案合并到 HDFS 的 trunk 之中并且作为默认的共享存储实现,本部分只针对基于 QJM 的共享存储方案的内部实现原理进行分析。

QJM(Quorum Journal Manager)本质上是利用 Paxos 协议来实现的,QJM 在 2F+1 个 JournalNode 上存储 NN 的 editlog,每次写入操作都通过 Paxos 保证写入的一致性,它最多可以允许有 F 个 JournalNode 节点同时故障,其实现如下:

基于 QJM 的共享存储的数据同步机制

基于 QJM 的共享存储的数据同步机制

Active NameNode 首先把 EditLog 提交到 JournalNode 集群,然后 Standby NameNode 再从 JournalNode 集群定时同步 EditLog。

还有一点需要注意的是,在 2.0 中不再有 SNN 这个角色了,NameNode 在启动后,会先加载 FSImage 文件和共享目录上的 EditLog Segment 文件,之后 NameNode 会启动 EditLogTailer 线程和 StandbyCheckpointer 线程,正式进入 Standby 模式,其中:

  1. EditLogTailer 线程的作用是定时从 JournalNode 集群上同步 EditLog;
  2. StandbyCheckpointer 线程的作用其实是为了替代 Hadoop 1.x 版本之中的 Secondary NameNode 的功能,StandbyCheckpointer 线程会在 Standby NameNode 节点上定期进行 Checkpoint,将 Checkpoint 之后的 FSImage 文件上传到 Active NameNode 节点。

YARN HA(ResourceManager HA)

Hadoop2.4版本之前,ResourceManager也存在单点故障的问题,也需要实现HA来保证ResourceManger的高可也用性。

ResouceManager从记录着当前集群的资源分配情况和JOB的运行状态,YRAN HA 利用Zookeeper等共享存储介质来存储这些信息来达到高可用。另外利用Zookeeper来实现ResourceManager自动故障转移。

img

如果大家理解HDFS的HA,那么ResourceManager的HA与之是相同道理的:也是Active/Standby架构,任意时刻,都一个是Active,其余处于Standby状态的ResourceManager可以随时转换成Active状态。状态转换可以手工完成,也可以自动完成。手工完成时通过命令行的管理命令(命令是“yarn rmadmin”)。自动完成是通过配置自动故障转移(automatic-failover),使用集成的failover-controller完成状态的自动切换。

自动故障转移是依赖于ZooKeeper集群,依赖ZooKeeper的ActiveStandbyElector会嵌入到ResourceManager中,当Active状态的ResourceManager失效时,处于 Standby状态的ResourceManager就会被选举为Active状态的,实现切换。注意:这里没有ZooKeeperFailoverController进程,这点和HDFS的HA不同。

对于客户端而言,必须知道所有的ResourceManager中。因此,需要在yarn-site.xml中配置所有的ResourceManager。那么,当一个Active状态的ResourceManager失效时,客户端怎么办哪?客户端会采用轮询机制,轮询配置在yarn-site.xml中的ResourceManager,直到找到一个active状态的ResourceManager。如果我们想修改这种寻找ResourceManager的机制,可以继承类org.apache.hadoop.yarn.client.RMFailoverProxyProvider,实现自己的逻辑。然后把类的名字配置到yarn-site.xml的配置项yarn.client.failover-proxy-provider中。

配置

在yarn-site.xml中配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>cluster1</value>
</property>
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>master1</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>master2</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>master1:8088</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>master2:8088</value>
</property>
<property>
<name>hadoop.zk.address</name>
<value>zk1:2181,zk2:2181,zk3:2181</value>
</property>

命令

查看状态的命令

1
yarn rmadmin –getServiceState rm1

状态切换的命令

1
yarn rmadmin –transitionToStandby rm1

原文链接:

http://matt33.com/2018/07/15/hdfs-architecture-learn/

https://blog.csdn.net/andyguan01_2/article/details/88696239

https://www.cnblogs.com/mlj5288/p/4449848.html