您现在的位置 >> Hadoop教程 >> Hadoop实战 >> 专题  
 

Hadoop权威指南笔记

【作者:Hadoop实战专家】【关键词:序列化 文件系统 客户端 】 【点击:63035次】【2014-02-1】
HDFS是为流式数据访问模式存储超大文件而设计的文件系统,在商用硬件的集群上运行。另一种可行的方法是运行一个二级名称节点,虽然它不能作为名称节点使用,这个二级名称节点的重要作用就是定期的通过编辑日志合并命名空间镜像,以防止编辑日志过大。TA.length]);  

相关热门搜索:文件系统 网络文件系统

大数据标签:hadoop hdfs mapreduce pic hbase hive zookeeper bigdata

第一章:初始Hadoop

Hadoop提供了一个稳定的共享存储和分析系统。存储由HDFS实现,分析由MapReduce实现。

MapReduce很适合处理那些需要分析整个数据集的问题,以批处理的方式,尤其是Ad Hoc(自主或及时)分析。MapReduce适合数据被一次写入和多次读取的应用,对于非结构化或半结构化数据非常有效,它被设计为在处理时间内解释数据。即MapReduce输入的键和值并不是数据固有的属性,是由分析数据的人来选择的。Web服务器日志是记录集的一个很好的非规范化例子,也是MapReduce非常适合用于分析各种日志文件的原因。

MapReduce是一种线性的可伸缩的编程模型。程序员编写两个函数——map函数和Reduce函数,每一个都定义一个键/值对集映射到另一个。这些函数无视数据的大小或者他们正在使用的集群的特性,这样他们就可以原封不动地应用到小规模数据集或者大的数据集上。

MapReduce尝试在计算节点本地存储数据,这种“数据本地化”功能,成为MapReduce的核心功能并且也是它拥有良好性能的原因之一。

在一个大规模分布式计算平台上协调进程是一个很大的挑战。而MapReduce是一个无共享的架构,这意味着各个任务之间彼此并不依赖。

Hadoop的子项目:

Core:一系列分布式文件系统和通用I/O的组件和接口(序列号、java RPC和持久化数据结构)

Avro:一种提供高效、跨语言RPC的数据序列系统,持久化数据存储。

MapReduce:分布式数据处理模式和执行环境,运行于大型商用机集群。

HDFS:分布式文件系统,运行于大型商用机集群。

Pig:一种数据流语言和运行环境,用以检索非常大的数据集。Pig运行在MapReduce和HDFS的集群上。

Hbase:一个分布式的、列存储数据库。HBase使用HDFS作为底层存储,同时支持MapReduce的批量式计算和点查询。

ZooKeeper:一个分布式的、高可用性的协调服务。ZooKeeper提供分布式锁之类的基本服务用于构建分布式应用。 Hive:分布式数据仓库。Hive管理HDFS中存储的数据,并提供基于SQL的查询语言用以查询数据。

Chukwa:分布式数据收集和分析系统。Chukwa运行HDFS中存储数据的收集器,它使用MapReduce来生成报告。

第二章:MapReduce简介

MapReduce的工作过程分为两个阶段:map阶段和reduce阶段。每个阶段都有键/值对作为输入和输出,并且它们的类型可由程序员选择。程序员还具体定义了两个函数:map函数和reduce函数。map函数的输出先由MapReduce框架处理,然后再被发送到reduce函数。这一处理过程根据键来对键/值对进行排序和分组。

明白了MapReduce程序的工作原理之后,就是要用代码来实现它,需要三样东西:一个map函数、一个reduce函数和一些来运行作业的代码。map函数是由一个Mapper接口来实现的,其中声明了一个map()方法。Mapper接口是一个泛型类型,它有4个形式参数类型,由它们来指定map函数的输入键、输入值、输出键和输出值的类型。Hadoop规定了自己的一套可用于网络序列化的基本类型,而不是使用内置的Java类型。这些在org.apache.hadoop.io包中可以找到。

MapReduce作业(job)是客户端执行的单位:它包括输入数据、MapReduce程序和配置信息库。Hadoop通过把作业分成若干个小任务(task)来工作,其中包括两种类型的任务:map任务和reduce任务。有两种类型的节点控制着作业执行过程:jobtracker和多个tasktracker。jobtracker通过调度任务在tasktracker上运行,来协调所有运行在系统上的作业。Tasktracker运行任务的同时,把进度报告传送到jobtracker,jobtracker则记录这每项任务的整体进展情况。如果其中一个任务失败,jobtracker可以重新调度任务到另外一个tasktracker。Hadoop把输入数据划分成等长的小数据发送到Mapreduce,称为输入分片或者分片。Hadoop为每个分片split创建一个map任务,由它来运行用户自定义的map函数来分析每个分片中的记录。对于大多数作业,一个理想的分片大小往往是一个HDFS块的大小,默认是64MB。

map任务的执行阶段和输入数据的存储节点是同一节点,Hadoop的性能达到最佳。这就是所谓的data locality optimization(数据局部性优化)。如果分区跨越两个块,那么对于任何一个HDFS节点而言,基本上不可能同时存储这两数据块,因此此分布的某部分必须通过网络传输到节点,这与使用本地数据运行map任务相比,显然效率更低。map任务把输出写入本地硬盘,而不是HDFS。map的输出作为中间输出,中间输出则被reduce任务处理后产生最终的输出,一旦作业完成,map输出就可以删除了。所以没有把它存储在HDFS上,如果节点上运行的map任务在map输出给reduce任务处理之前崩溃,那么Hadoop将在另一个节点上重新运行map任务以再次创建map的输出。

reduce任务并不具备数据本地读取的优势,一个单一的reduce任务的输入往往来自于所有mapper的输出。因此,有序map的输出必须通过网络传输到reduce任务运行的节点,并在那里进行合并,然后传递到用户定义的reduce函数中。为增加其可靠性,reduce的输出通常存储在HDFS中。

reduce任务的数目并不是由输入的大小来决定,而是单独具体指定的。如果有多个reducer,map任务会对其输出进行分区,为每个reduce任务创建一个分区。每个分区包含许多键机器关联的值,但每个键的记录都在同一个分区中。分区可以通过用户定义的partitioner来控制,但通常是用默认的分区工具,它使用的是hash函数来形成”木桶“键/值。

Hadoop运行用户声明一个combiner,运行在map的输出上——该函数的输出作为reduce函数的输入,由于combiner是一个优化方法,所以Hadoop不博鳌镇对于某个map的输出记录是否调用该方法,调用该方法多少次。换言之,不调用该方法或者调用该方法多次,reducer的输出结果都一样。

Hadoop提供了一个API来运行Mapreduce,并运行用除java以外的语言来编写自己的map函数和reduce函数。Hadoop流使用Unix标准流作为Hadoop和程序之间的接口,所以可以使用任何语言,只要编写的Mapreduce程序能够读取标准输入并写入到标准输出。

Hadoop管道式Hadoop MapReduce的C++接口的代称,于流不同,流使用标准输入和输出让map和reduce节点之间相互交流,管道使用sockets作为tasktracker与C++编写的map或者reduce函数的进程之间的通道。

第三章:Hadoop分布式文件系统

HDFS是为流式数据访问模式存储超大文件而设计的文件系统,在商用硬件的集群上运行。

HDFS建立在这样一个思想上:一次写入、多次读取模式是最高效的。一个数据集通常由数据源生成或复制,接着在此基础上进行各种各样的分析。每个分析至少都会涉及数据集中的大部分数据(甚至全部),因此读取整个数据集的时间比读取第一条记录的延迟更为重要。

HDFS中的文件只有一个写入者,而且写操作总是在文件的末尾。它不支持多个写入者,或是在文件的任意位置修改。HDFS数据库的大小默认为64MB。HDFS的块比磁盘的块大,目的是为了减少寻址开销。通过让一个块足够大,从磁盘转移数据的时间能够远远大于定位这个块开始端的时间,因此,传送一个由多个块组成的文件的时间就取决于磁盘传输率。

在分布式文件系统中使用抽象块会带来很多好处。第一,一个文件可以大于网络中任意一个磁盘的容量。文件的分块不需要存储在同一个磁盘上,因此它们可以利用集群上的任意一个磁盘。第二,使用抽象单元而不是文件会简化存储子系统。另外,块很适合于伟提供容错和实用性而做的复制操作。

HDFS集群有两种节点,以管理者-工作者的模式运行,即一个名称节点(管理者)和多个数据节点(工作者)。名称节点管理文件系统的命名空间。它维护着这个文件系统树及这个树内所有的文件和索引目录。这些信息以两种形式将文件永久保存在本地磁盘上:命名空间镜像和编辑日志。名称节点也记录这米格文件的每个块所在的数据节点,但它并不永久保存块的位置,因为这些信息会在系统启动时由数据节点重建。数据节点是文件系统的工作者,他们存储并提供定位块的服务,并且定时的向名称节点发送它们存储的块的列表。

Hadoop提供了两种机制来确保名称节点因故障而不损失数据,第一种机制就是复制那些组成文件系统元数据持久状态的文件。Hadoop可以通过配置使名称节点在多个文件系统上写入其持久化状态。这些写操作是具同步性和原子性的。一般的配置选择是,在本地磁盘上写入的同时,写入一个远程NFS挂载(mount)。另一种可行的方法是运行一个二级名称节点,虽然它不能作为名称节点使用,这个二级名称节点的重要作用就是定期的通过编辑日志合并命名空间镜像,以防止编辑日志过大。这个二级名称节点一般在其他单独的物理计算机上运行,因为它也需要占用大量CPU和内存来执行合并操作。它会保存合并后的命名空间镜像的副本,在名称节点失效后就可以使用。二级名称节点的状态是比主节点滞后的,所以主节点的数据若全部丢失,损失仍在所难免。

HDFS的配置:fs.default.name用来配置HDFS的默认文件系统,HDFS的守护程序将通过这个属性来决定HDFS名称节点的宿主机和端口。HDFS用户将通过这个属性得知名称节点在哪里运行以便于连接到它。dfs.replication用来设置HDFS文件系统块的复制份数。

第四章 Hadoop的I/O

客户端读取数据节点上的数据时,会验证校验和,将其与数据节点上存储的校验和进行对比。每个数据节点维护一个连续的校验和和验证日志,因此它知道每个数据块最后验证的时间。客户端成功验证数据块之后,便会告诉数据节点,后者便随之更新日志。保持这种统计,它对检测损坏磁盘是很有价值的。

序列化(serialization)指的是将结构化对象转为字节流以便通过网络进行传输或写入持久化存储的过程。反序列化指的是将字节流转为一系列结构化对象的过程。序列化用于分布式数据处理中两个不同的领域:进程间通信和持久存储。

Hadoop中,节点之间的进程通信是用远程过程调用(RPC,remote procedures call)来实现的。RPC协议使用序列化将消息编码为二进制流(将被发送到远程节点),此后,二进制流被反序列化为原始消息。一般情况下,可用的RPC序列化格式特定如下:紧凑性,一个紧凑的格式使网络带宽得到充分利用,带宽是数据中心中最稀缺的资源。快速性,进程间通信是分布式系统的骨干,因此它必须尽量减少序列化和反序列化开销。可扩展性,协议随时间而变以满足新的要求,因此它应该直接演变为客户端和服务器端的控制协议。互操作性,对于某些系统,最好能够支持不同语言编写的客户端被写入服务器端,所以需要为此而精心设计文件格式。

Hadoop使用自己的序列化格式Writables,它紧凑、快速(不容易扩展java之外的语言)Mapreduce程序使用它来序列化键/值对。

W日table接口定义了两个方法:一个用于将其状态写入二进制格式的DataOutput流,另一个用于从二进制格式的DataInput流读取其态。

类型的比较对MapReduce而言至关重要,键和键之间的比较式在排序阶段完成的。Hadoop提供的一个优化方法是从Java Comparator的RawComparator扩展的:

package org.apache.hadoop.io;

import java.util.Comparator;

public interface RawComparator extends Comparator{

public int compare(byte[] b1,int s1,int l1,byte[] b2,int s2,int l2);

}

这个接口运行执行者比较从流中读取的未被发序列化的对象的记录,从而省去了创建对象的所有开销。例如:IntWritables 的comparator使用原始的compare()方法从每个字节数组的指定开始位置(S1和S2)和长度(L1和L2)读取整数b1和b2然后直接进行比较。

WritableComparator是RawComparator对WritableComparable类的一个通用实现。它提供两个主要功能。首先,它提供了一个默认的对原始compare()函数的调用,对从数据流中要比较的对象进行反序列化,然后调用对象的compare()方法。其次,它充当的是RawComparator实例的一个工厂方法(Writable方法已经注册)。例如,为获得IntWritable的comparator,我们只需要使用:

RawComparator compartor=WritableComparator.get(IntWritable.class);

comparator可以用来比较两个IntWritable对象:

IntWritable w1=new IntWritable(163);

IntWritable w2=new Intwritable(67);

assertThat(comparator.compare(w1,w2,),greaterThan(0));

或者他们的序列号描述:

byte【】 b1=serialize(w1);

byte【】 b2=serialize(w2);

assertThat(comparator.compare(b1,0,b1.length,b2,0,b2.length),greaterThan(0));

Hadoop将许多Writable类归入org.apache.hadoop.io包。它们的类层次结构为:

它们都有用于检索和存储封装值得get()和set()方法。

Text类

Text类是一种UTF-8格式的Writable。可以将它理解为是一种与java.lang.String相似的Writable。Text类代替了UTF8类,UTF8类不支持编码大于32767个字节的字符串,使用了java改进的UTF-8。Text在字符串编码中使用int型存储字节数,最大值为2GN。Text使用标准的UTF-8,使其更易于与理解UTF-8的其他工具协同工作。

迭代Text 使用索引的字节偏移对Text中的Unicode字符进行迭代是很复杂的,因为不能只增加索引。迭代的定义有点模糊:将Text对象变成java.io.ByteBuffer,然后对缓冲的Text反复调用bytesToCodePoint()静态方法。这个方法提取下一个代码点作为int然后更新缓冲中的位置。当bytesToCodePoint()返回-1时,检测到字符串结束。

BytesWritable是对二进制数据数组的封装。它的序列化格式为一个用于指定后面数据字节数的整数域(4字节),后跟字节本身。例如,长度为2的字节数组包含数值3和5,序列化形式为一个4字节整数(00000002)和该数组中的两个字节(03)和(05)。

NullWritable是Writable的一个特殊类型。它的序列化长度为0,它并不从数据流中读取数据,也不写入数据。它充当占位符;例如,在MapReduce中,如果不需要使用键或值,就可以将键或值声明为NullWritable——结果是存储常量控制。它是一个可变的单实例类型:通过调用NullWritable.get()方法可以获取这个实例。

ObjectWritable和GenericWritable  ObjectWritable是对java基本类型(string、enum、writable、null或这些类型组成的数组)的一个通用封装,它在Hadoop RPC中用于对方法的参数和返回类型进行封装和解封装。

Writable集合:org.apache.hadoop.io包中有四种Writable集合类型,分别是ArrayWritable,TwoDArrayWritable,MapperWritablehe SortedMapWritable.

ArrayWritable和TwoDArrayWritable是Writable针对数组和二维数据(数组的数组)实例的实现。所有对ArrayWritable或者TwoDArrayWritable的使用都必须实例化相同的类,这是在构造时指定的,如下所示:

ArrayWritable writable=new ArrayWritable(Text.class);

ArrayWritable和TwoDArrayWritable都有get()和set()方法,也有toArray()方法,后者用于创建数组(或者二维数组)的浅拷贝。

MapWritable和SortedMapWritable分别是java.util.Map(Writable,Writable)和java.util.SortedMap(WritableComparable,Writable)的实例。

每个键/值字段的类型都是此字段序列化格式的一部分。

基于文件的数据结构

对于某些应用而言,需要特殊的数据结构来存储自己的数据。对于基于MapReduce的数据处理,将每个二进制数据的大对象融入自己的文件中并不能实现很高的可扩展性,针对上述情况,Hadoop开发了一组更高层次的容器。

SequenceFile

考虑日志文件,其中每一条日志记录是一行文本。如果想记录二进制类型,纯文本是不合适的。这种情况下,Hadoop的SequenceFile类非常合适,因为上述提供了二进制键/值对的永久存储的数据结构。当作为日志文件的存储格式时,可以自己选择键,比如由LongWritable类型表示的时间戳,以及值可以是Writable类型,用于表示日志记录的数量。SequenceFile同样为可以作为小文件的容器。而HDFS和MapReduce是针对大文件进行优化的,所以通过SequenceFile类型将小文件包装起来,可以获得更高效率的存储和处理。

通过createWriter()静态方法可以创建SequenceFile对象,并返回SequenceFile.Writer实例。该静态方法有多个重载版本,但都需要指定待写入的数据流(FSDataOutputStream或FileSystem对象和Path对象),Configuration对象,以及键和值的类型。另外可选参数包括压缩类型以及相应的codec,Progressable回调函数用于通知写入的进度,以及在SequenceFile头文件中存储的Metadata实例。

存储在SequenceFile中的键和值对并不一定是Writable类型。任意可以通过Serialization类实现序列化和反序列化的类型均可被使用。一旦拥有SequenceFile.Writer实例,就可以通过append()方法在文件末尾附件键/值对。写完后,可以调用close()方法(SequenceFile.Writer实现了java.io.Closeable接口)。

写入SequenceFile对象

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {

private static final String[] DATA={
"One,two,buckle my shoe",
"Three,four,shut the door",
"Five,six,pick up sticks",
"Seven,eight,lay them straight",
"Nine,ten,a big fat hen"
};

public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
String uri=args[0];
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(URI.create(uri), conf);
Path path=new Path(uri);
IntWritable key=new IntWritable();
Text value=new Text();
SequenceFile.Writer writer=null;
try{
writer=SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
for(int i=0;i<100;i++){
key.set(100-i);
value.set(DATA[i?TA.length]);
System.out.printf("[%s]\t%s\t%s\n",writer.getLength(),key,value);
writer.append(key, value);
}
}
finally{
IOUtils.closeStream(writer);
}
}
}

读取SequenceFile

从头到尾读取顺序文件的过程是创建SequenceFile.Reader实例后反复调用next()方法迭代读取记录的过程。读取的是哪条记录与你使用的序列化框架相关。如果使用的是Writable类型,那么通过键和值作为参数的Next()方法可以将数据流中的下一条键值对读入变量中:public boolean next(Writable key,Writable val),如果键值对成功读取,则返回true,如果已读到文件末尾,则返回false。

对其他非Writable类型的序列化框架(如Apache Thrift),可使用下面两种方法:

public Object next(Object key) throws IOException

public Object getCurrentValue(Object val) throws IOException

必须确定希望使用的序列化放已经设置了io.serializations属性。

如果next()方法返回一个非null对象,则可以从数据流中读取一个键/值对,并用getCurrentValue()方法来检索当前值。否则,如果next()返回null,则表示已经到达文件末尾。

类型是通过调用getKeyClass()和getValueClass()方法是从SequenceFile.Reader中找到的,之后通过ReflectionUtils来建立一个键的实例和值的实例。

读取一个序列文件

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadDemo {

public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub

String uri=args[0];
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(URI.create(uri), conf);
Path path=new Path(uri);
SequenceFile.Reader reader=null;
try{
reader=new SequenceFile.Reader(fs, path, conf);
Writable key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value=(Writable)ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position=reader.getPosition();
while(reader.next(key, value)){
String syncSeen=reader.syncSeen()?"*":"";
System.out.printf("[%s%s]\t%s\t%s\n",position,syncSeen,key,value);
position=reader.getPosition();
}
}finally{
IOUtils.closeStream(reader);
}

}

}

大数据系列相关文章:

最新评论
不离不弃2014-09-10 08:30:06
无图无真相
迈克ル唐僧2014-09-10 04:12:46
请问各位大神,这是什么情况
Munimunich2014-09-10 01:45:46
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/io/Text
归文胜2014-09-09 10:30:19
一次性可以提交多个相互独立的作业吗?
小丹2014-09-09 05:12:31
最新资讯
 
  • Hadoop生态系统资料推荐