hadoop学习

hadoop学习

zookeeper的介绍

zookeeper概述

Zookeeper 是一个分布式协调服务的开源框架。 主要用来解决分布式集群中应用系统的一致性问题,例如怎样避免同时操作同一数据造成脏读的问题。ZooKeeper 本质上是一个分布式的小文件存储系统。 提供基于类似于文件系统的目录树方式的数据存储,并且可以对树中的节点进行有效管理。从而用来维护和监控你存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理。 诸如: 统一命名服(dubbo)、分布式配置管理(solr的配置集中管理)、分布式消息队列(sub/pub)、分布式锁、分布式协调等功能。

zookeeper的架构图

zookeeper1

Leader:

zookeeper集群工作的核心;事务请求(写操作) 的唯一调度和处理者,保证集群事务处理的顺序性;
集群内部各个服务器的调度者。
对于 create, setData, delete 等有写操作的请求,则需要统一转发给leader 处理, leader 需要决定编号、执行操作,这个过程称为一个事务。

Follower:

处理客户端非事务(读操作) 请求,转发事务请求给 Leader;参与集群 Leader 选举投票 2n-1台可以做集群投票。此外,针对访问量比较大的 zookeeper 集群, 还可新增观察者角色。

Observer:

增加并发的读请求

hadoop的介绍

2.x的版本架构模型介绍

第一种:NameNode与ResourceManager单节点架构模型

hadoop1

文件系统核心模块:

  • NameNode:集群当中的主节点,主要用于管理集群当中的各种数据;
  • secondaryNameNode:主要能用于hadoop当中元数据信息的辅助管理;
  • DataNode:集群当中的从节点,主要用于存储集群当中的各种数据;

数据计算核心模块:

  • ResourceManager:接收用户的计算请求任务,并负责集群的资源分配;
  • NodeManager:负责执行主节点APPmaster分配的任务

第二种:NameNode单节点与ResourceManager高可用架构模型

hadoop1

文件系统核心模块:

  • NameNode:集群当中的主节点,主要用于管理集群当中的各种数据
  • secondaryNameNode:主要能用于hadoop当中元数据信息的辅助管理
  • DataNode:集群当中的从节点,主要用于存储集群当中的各种数据

数据计算核心模块:

  • ResourceManager:接收用户的计算请求任务,并负责集群的资源分配,以及计算任务的划分,通过zookeeper实现ResourceManager的高可用
  • NodeManager:负责执行主节点ResourceManager分配的任务

第三种:NameNode高可用与ResourceManager单节点架构模型

hadoop1

文件系统核心模块:

  • NameNode:集群当中的主节点,主要用于管理集群当中的各种数据,其中nameNode可以有两个,形成高可用状态
  • DataNode:集群当中的从节点,主要用于存储集群当中的各种数据
  • JournalNode:文件系统元数据信息管理

数据计算核心模块:

  • ResourceManager:接收用户的计算请求任务,并负责集群的资源分配,以及计算任务的划分
  • NodeManager:负责执行主节点ResourceManager分配的任务

第四种:NameNode与ResourceManager高可用架构模型

hadoop1

文件系统核心模块:

  • NameNode:集群当中的主节点,主要用于管理集群当中的各种数据,一般都是使用两个,实现HA高可用
  • JournalNode:元数据信息管理进程,一般都是奇数个
  • DataNode:从节点,用于数据的存储

数据计算核心模块:

  • ResourceManager:Yarn平台的主节点,主要用于接收各种任务,通过两个,构建成高可用
  • NodeManager:Yarn平台的从节点,主要用于处理ResourceManager分配的任务

HDFS介绍

HDFS 是 Hadoop Distribute File System 的简称,意为:Hadoop 分布式文件系统。是 Hadoop 核心组件之一,作为最底层的分布式存储服务而存在。 分布式文件系统解决的问题就是大数据存储。它们是横跨在多台计算机上的存储系统。分布式文件系统在大数据时代有着广泛的应用前景,它们为存储和处理超大规模数据提供所需的扩展能力。

HDFS的基础架构

hadoop1

hadoop1

  1. NameNode是一个中心服务器,单一节点(简化系统的设计和实现),负责管理文件系统的名字空间(namespace)以及客户端对文件的访问
  2. 文件操作,namenode是负责文件元数据的操作,datanode负责处理文件内容的读写请求,跟文件内容相关的数据流不经过Namenode,只询问它跟哪个dataNode联系,否则NameNode会成为系统的瓶颈
  3. 副本存放在哪些Datanode上由NameNode来控制,根据全局情况作出块放置决定,读取文件时NameNode尽量让用户先读取最近的副本,降低读取网络开销和读取延时
  4. NameNode全权管理数据库的复制,它周期性的从集群中的每个DataNode接收心跳信合和状态报告,接收到心跳信号意味着DataNode节点工作正常,块状态报告包含了一个该DataNode上所有的数据列表

NameNode与Datanode的总结概述

hadoop1

所有的文件都是以block块的方式存放在HDFS文件系统当中,在hadoop1当中,文件的block块默认大小是64M,hadoop2当中,文件的block块大小默认是128M,block块的大小可以通过hdfs-site.xml当中的配置文件进行指定

HDFS的元数据信息FSimage以及edits

  • 客户端对hdfs进行写文件时会首先被记录在edits文件中
  • edits修改时元数据也会更新
  • 每次hdfs更新时edits先更新后客户端才会看到最新信息
  • fsimage:是namenode中关于元数据的镜像,一般称为检查点

一般开始时对namenode的操作都放在edits中,为什么不放在fsimage中呢?

因为fsimage是namenode的完整的镜像,内容很大,如果每次都加载到内存的话生成树状拓扑结构,这是非常耗内存和CPU;fsimage内容包含了namenode管理下的所有datanode中文件及文件block及block所在的datanode的元数据信息。随着edits内容增大,就需要在一定时间点和fsimage合并。

secondarynameNode如何辅助管理FSImage与Edits文件

  1. secnonaryNN通知NameNode切换editlog
  2. secondaryNN从NameNode中获得FSImage和editlog(通过http方式)
  3. secondaryNN将FSImage载入内存,然后开始合并editlog,合并之后成为新的fsimage
  4. secondaryNN将新的fsimage发回给NameNode
  5. NameNode用新的fsimage替换旧的fsimage

hadoop1

  • 完成合并的是secondarynamenode,会请求namenode停止使用edits,暂时将新写操作放入一个新的文件中(edits.new)
  • secondarynamenode从namenode中通过http get获得edits,因为要和fsimage合并,所以也是通过http get 的方式把fsimage加载到内存,然后逐一执行具体对文件系统的操作,与fsimage合并,生成新的fsimage,然后把fsimage发送给namenode,通过http post的方式。namenode从secondarynamenode获得了fsimage后会把原有的fsimage替换为新的fsimage,把edits.new变成edits。同时会更新fstime
  • hadoop进入安全模式时需要管理员使用dfsadmin的save namespace来创建新的检查点
  • secondarynamenode在合并edits和fsimage时需要消耗的内存和namenode差不多,所以一般把namenode和secondarynamenode放在不同的机器上
  • fs.checkpoint.period: 默认是一个小时(3600s)
  • fs.checkpoint.size: edits达到一定大小时也会触发合并(默认64MB)

HDFS的文件写入过程

hadoop1

详细步骤解析:

  1. client发起文件上传请求,通过RPC与NameNode建立通讯,NameNode检查目标文件是否已存在,父目录是否存在,返回是否可以上传
  2. client请求第一个block该传输到哪些DataNode服务器上
  3. NameNode根据配置文件中指定的备份数量及机架感知原理进行文件分配,返回可用的DataNode的地址如:A,B,C. 注:Hadoop在设计时考虑到数据的安全与高效,数据文件默认在HDFS上存放三份,存储策略为本地一份,同机架内其它某一节点上一份,不同机架的某一节点上一份
  4. client请求3台DataNode中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,然后B调用C,将整个pipeline建立完成,后逐级返回client
  5. client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位(默认64K),A收到一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答
  6. 数据被分割成一个个packet数据包在pipeline上依次传输,在pipeline反方向上,逐个发送ack(命令正确应答),最终由pipeline中第一个DataNode节点A将pipelineack发送给client
  7. 当一个block传输完成之后,client再次请求NameNode上传第二个block到服务器

HDFS的读取过程

hadoop1

详细步骤解析

  1. Client向NameNode发起RPC请求,来确定请求文件block所在的位置
  2. NameNode会视情况返回文件的部分或者全部block列表,对于每个block,NameNode都会返回含有该block 副本的 DataNode 地址; 这些返回的 DN地址,会按照集群拓扑结构得出DataNode 与客户端的距离,然后进行排序,排序两个规则:网络拓扑结构中距离Client 近的排靠前;心跳机制中超时汇报的 DN
    状态为STALE,这样的排靠后
  3. Client 选取排序靠前的DataNode 来读取 block,如果客户端本身就是DataNode,那么将从本地直接获取数据(短路读取特性)
  4. 底层上本质是建立 Socket Stream(FSDataInputStream),重复的调用父类DataInputStream 的 read 方法,直到这个块上的数据读取完毕
  5. 当读完列表的 block 后,若文件读取还没有结束,客户端会继续向NameNode获取下一批的block 列表
  6. 读取完一个 block 都会进行checksum 验证,如果读取DataNode 时出现错误,客户端会通NameNode,然后再从下一个拥有该block 副本的DataNode 继续读
  7. read 方法是并行的读取 block 信息,不是一块一块的读取;NameNode只是返回Client请求包含块的DataNode地址,并不是返回请求块的数据
  8. 最终读取来所有的block 会合并成一个完整的最终文件

MapReduce介绍

MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。 Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。 Reduce负责“合”,即对map阶段的结果进行全局汇总。 这两个阶段合起来正是MapReduce思想的体现。

hadoop1

MapTask运行机制详解以及Map任务的并行度

hadoop1

hadoop1

整个Map阶段流程大体如上图所示。简单概述:inputFile通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(默认使用hash分区),然后写入buffer,每个maptask都有一个内存缓冲区,存储map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个maptask结束后再对磁盘中这个maptask产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reducetask来拉数据。

详细步骤:

  1. 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一

  2. 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回。Key表示每行首字符偏移值,value表示这一行文本内容

  3. 读取split返回,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次

  4. map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reducetask处理。默认对key hash后再以reducetask数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上

  5. 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组

    环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size spill percent = 100MB 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响

  6. 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序

    如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果

  7. 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。至此map整个阶段结束

ReduceTask工作机制以及reduceTask的并行度

hadoop1

Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理

详细步骤:

  1. Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件
  2. Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件
  3. 合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序
  4. 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中

MapReduceshuffle过程

map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫shuffle。shuffle: 洗牌、发牌——(核心机制:数据分区,排序,分组,规约,合并等过程)。

hadoop1

shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段。一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle

  1. Collect阶段:将MapTask的结果输出到默认大小为100M的环形缓冲区,保存的是key/value,Partition分区信息等

  2. Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序

  3. Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件

  4. Copy阶段:ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上

  5. Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作

  6. Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可

    Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快

Yarn的主要组件介绍与作用

yarn中各个主要组件的介绍

  • ResourceManager:yarn集群的主节点,主要用于接收客户端提交的任务,并对资源进行分配
  • NodeManager:yarn集群的从节点,主要用于任务的计算
  • ApplicationMaster:当有新的任务提交到ResourceManager的时候,ResourceManager会在某个从节点nodeManager上面启动一个ApplicationMaster进程,负责这个任务执行的资源的分配,任务的生命周期的监控等
  • Container:资源的分配单位,ApplicationMaster启动之后,与ResourceManager进行通信,向ResourceManager提出资源申请的请求,然后ResourceManager将资源分配给ApplicationMaster,这些资源的表示,就是一个个的container
  • JobHistoryServer:这是yarn提供的一个查看已经完成的任务的历史日志记录的服务,我们可以启动jobHistoryServer来观察已经完成的任务的所有详细日志信息
  • TimeLineServer:hadoop2.4.0以后出现的新特性,主要是为了监控所有运行在yarn平台上面的所有任务(例如MR,Storm,Spark,HBase等等)

yarn中各个主要组件的作用

  • resourceManager主要作用:
    • 处理客户端请求
    • 启动/监控ApplicationMaster
    • 监控NodeManager
    • 资源分配与调度
  • NodeManager主要作用:
    • 单个节点上的资源管理和任务管理
    • 接收并处理来自resourceManager的命令
    • 接收并处理来自ApplicationMaster的命令
    • 管理抽象容器container
    • 定时向RM汇报本节点资源使用情况和各个container的运行状态
  • ApplicationMaster主要作用:
    • 数据切分
    • 为应用程序申请资源
    • 任务监控与容错
    • 负责协调来自ResourceManager的资源,开通NodeManager监视容的执行和资源使用(CPU,内存等的资源分配)
  • Container主要作用:
    • 对任务运行环境的抽象
    • 任务运行资源(节点,内存,cpu)
    • 任务启动命令
    • 任务运行环境

yarn的架构

hadoop1

yarn中的调度器

  • 第一种调度器:FIFO Scheduler (队列调度器)
  • 第二种调度器:capacity scheduler(容量调度器,apache版本默认使用的调度器)

    Capacity 调度器允许多个组织共享整个集群,每个组织可以获得集群的一部分计算能力。通过为每个组织分配专门的队列,然后再为每个队列分配一定的集群资源,这样整个集群就可以通过设置多个队列的方式给多个组织提供服务了。除此之外,队列内部又可以垂直划分,这样一个组织内部的多个成员就可以共享这个队列资源了,在一个队列内部,资源的调度是采用的是先进先出(FIFO)策略

    hadoop1

  • 第三种调度器:Fair Scheduler(公平调度器,CDH版本的hadoop默认使用的调度器)

    Fair调度器的设计目标是为所有的应用分配公平的资源(对公平的定义可以通过参数来设置)。公平调度在也可以在多个队列间工作。举个例子,假设有两个用户A和B,他们分别拥有一个队列。当A启动一个job而B没有任务时,A会获得全部集群资源;当B启动一个job后,A的job会继续运行,不过一会儿之后两个任务会各自获得一半的集群资源。如果此时B再启动第二个job并且其它job还在运行,则它将会和B的第一个job共享B这个队列的资源,也就是B的两个job会用于四分之一的集群资源,而A的job仍然用于集群一半的资源,结果就是资源最终在两个用户之间平等的共享

    hadoop1

-------------本文结束感谢您的阅读-------------