您现在的位置 >> Hadoop教程 >> Hadoop实战 >> 专题  
 

hadoop深入研究:(十)——序列化与Writable接口

【作者:Hadoop实战专家】【关键词:序列化 Writable 】 【点击:8446次】【2013-10-1】
hadoop自身的序列化存储格式就是实现了Writable接口的类,他只实现了前面两点,压缩和快速。IntWritable实现了WritableComparable,接口看下源代码知道,WritableComparable是Writable接口和java.lang.Comparable的一个子接口。 byte[] b1 = serialize(w1);  

相关热门搜索:序列化

大数据标签:hadoop mapreduce bigdata

1.序列化的作用是什么?
2.hadoop在节点间的内部通讯使用的是什么?
3.hadoop自身的序列化存储格式有什么优势和劣势?

所有源码在github上,https://github.com/lastsweetop/styhadoop

简介
序列化和反序列化就是结构化对象和字节流之间的转换,主要用在内部进程的通讯和持久化存储方面。

通讯格式需求

hadoop在节点间的内部通讯使用的是RPC,RPC协议把消息翻译成二进制字节流发送到远程节点,远程节点再通过反序列化把二进制流转成原始的信息。RPC的序列化需要实现以下几点:

1.压缩,可以起到压缩的效果,占用的宽带资源要小。
2.快速,内部进程为分布式系统构建了高速链路,因此在序列化和反序列化间必须是快速的,不能让传输速度成为瓶颈。
3.可扩展的,新的服务端为新的客户端增加了一个参数,老客户端照样可以使用。
4.兼容性好,可以支持多个语言的客户端

存储格式需求

表面上看来序列化框架在持久化存储方面可能需要其他的一些特性,但事实上依然是那四点:
1.压缩,占用的空间更小
2.快速,可以快速读写
3.可扩展,可以以老格式读取老数据
4.兼容性好,可以支持多种语言的读写

hadoop的序列化格式

hadoop自身的序列化存储格式就是实现了Writable接口的类,他只实现了前面两点,压缩和快速。但是不容易扩展,也不跨语言。
我们先来看下Writable接口,Writable接口定义了两个方法:
1.将数据写入到二进制流中
2.从二进制数据流中读取数据

1. package org.apache.hadoop.io;

2.

3. public interface Writable {

4.     void write(java.io.DataOutput p1) throws java.io.IOException;

5.

6.     void readFields(java.io.DataInput p1) throws java.io.IOException;

7. }

复制代码

我们再来看下Writable接口与序列化和反序列化是如何关联的:

1. package com.sweetop.styhadoop;

2.

3. import junit.framework.Assert;

4. import org.apache.hadoop.io.IntWritable;

5. import org.apache.hadoop.io.Writable;

6. import org.apache.hadoop.util.StringUtils;

7. import org.junit.Before;

8. import org.junit.Test;

9.

10. import java.io.*;

11.

12. /**

13. * Created with IntelliJ IDEA.

14. * User: lastsweetop

15. * Date: 13-7-4

16. * Time: 下午10:25

17. * To change this template use File | Settings | File Templates.

18. */

19. public class TestWritable {

20.     byte[] bytes=null;

21.

22.     /**

23.      * 初始化一个IntWritable实例,并且调用系列化方法

24.      * @throws IOException

25.      */

26.     @Before

27.     public void init() throws IOException {

28.         IntWritable writable = new IntWritable(163);

29.         bytes = serialize(writable);

30.     }

31.

32.     /**

33.      * 一个IntWritable序列号后的四个字节的字节流

34.      * 并且使用big-endian的队列排列

35.      * @throws IOException

36.      */

37.     @Test

38.     public void testSerialize() throws IOException {

39.         Assert.assertEquals(bytes.length,4);

40.         Assert.assertEquals(StringUtils.byteToHexString(bytes),"000000a3");

41.     }

42.

43.     /**

44.      * 创建一个没有值的IntWritable对象,并且通过调用反序列化方法将bytes的数据读入到它里面

45.      * 通过调用它的get方法,获得原始的值,163

46.      */

47.     @Test

48.     public void testDeserialize() throws IOException {

49.         IntWritable newWritable = new IntWritable();

50.         deserialize(newWritable,bytes);

51.         Assert.assertEquals(newWritable.get(),163);

52.     }

53.

54.     /**

55.      * 将一个实现了Writable接口的对象序列化成字节流

56.      * @param writable

57.      * @return

58.      * @throws IOException

59.      */

60.     public static byte[] serialize(Writable writable) throws IOException {

61.         ByteArrayOutputStream out = new ByteArrayOutputStream();

62.         DataOutputStream dataOut = new DataOutputStream(out);

63.         writable.write(dataOut);

64.         dataOut.close();

65.         return out.toByteArray();

66.     }

67.

68.     /**

69.      * 将字节流转化为实现了Writable接口的对象

70.      * @param writable

71.      * @param bytes

72.      * @return

73.      * @throws IOException

74.      */

75.     public static byte[] deserialize(Writable writable,byte[] bytes) throws IOException {

76.         ByteArrayInputStream in=new ByteArrayInputStream(bytes);

77.         DataInputStream dataIn = new DataInputStream(in);

78.         writable.readFields(dataIn);

79.         dataIn.close();

80.         return bytes;

81.     }

82. }

复制代码

WritableComparable和comparators
IntWritable实现了WritableComparable,接口看下源代码知道,WritableComparable是Writable接口和java.lang.Comparable的一个子接口。

1. package org.apache.hadoop.io;

2.

3. public interface WritableComparable   extends org.apache.hadoop.io.Writable, java.lang.Comparable {

4. }

复制代码

MapReduce在排序部分要根据key值的大小进行排序,因此类型的比较相当重要,RawComparator是Comparator的增强版

1. package org.apache.hadoop.io;

2.

3. public interface RawComparator   extends java.util.Comparator {

4.     int compare(byte[] bytes, int i, int i1, byte[] bytes1, int i2, int i3);

5. }

复制代码

它可以做到,不先反序列化就可以直接比较二进制字节流的大小:

1. package com.sweetop.styhadoop;

2.

3. import org.apache.hadoop.io.IntWritable;

4. import org.apache.hadoop.io.RawComparator;

5. import org.apache.hadoop.io.Writable;

6. import org.apache.hadoop.io.WritableComparator;

7. import org.eclipse.jdt.internal.core.Assert;

8. import org.junit.Before;

9. import org.junit.Test;

10.

11. import java.io.ByteArrayOutputStream;

12. import java.io.DataOutputStream;

13. import java.io.IOException;

14.

15. /**

16. * Created with IntelliJ IDEA.

17. * User: lastsweetop

18. * Date: 13-7-5

19. * Time: 上午1:26

20. * To change this template use File | Settings | File Templates.

21. */

22. public class TestComparator {

23.     RawComparator comparator;

24.     IntWritable w1;

25.     IntWritable w2;

26.

27.     /**

28.      * 获得IntWritable的comparator,并初始化两个IntWritable

29.      */

30.     @Before

31.     public void init() {

32.         comparator = WritableComparator.get(IntWritable.class);

33.         w1 = new IntWritable(163);

34.         w2 = new IntWritable(76);

35.     }

36.

37.     /**

38.      * 比较两个对象大小

39.      */

40.     @Test

41.     public void testComparator() {

42.         Assert.isTrue(comparator.compare(w1, w2) > 0);

43.     }

44.

45.     /**

46.      * 序列号后进行直接比较

47.      * @throws IOException

48.      */

49.     @Test

50.     public void testcompare() throws IOException {

51.         byte[] b1 = serialize(w1);

52.         byte[] b2 = serialize(w2);

53.         Assert.isTrue(comparator.compare(b1, 0, b1.length, b2, 0, b2.length) > 0);

54.     }

55.

56.     /**

57.      * 将一个实现了Writable接口的对象序列化成字节流

58.      *

59.      * @param writable

60.      * @return

61.      * @throws java.io.IOException

62.      */

63.     public static byte[] serialize(Writable writable) throws IOException {

64.         ByteArrayOutputStream out = new ByteArrayOutputStream();

65.         DataOutputStream dataOut = new DataOutputStream(out);

66.         writable.write(dataOut);

67.         dataOut.close();

68.         return out.toByteArray();

69.     }

70. }

复制代码

大数据系列相关文章:

最新评论
unny冰吻2014-09-10 05:34:02
[脉脉职位]百度诚招百度hadoop高级研发工程师加盟,工作地点在北京海淀,要求1-3年工作经验,待遇15000-30000元/月,详情及联系方式请查看 http://t.cn/8sYJcQS
大少2014-09-10 05:01:23
有些50岁都还在玩。
回到起点2014-09-09 10:36:12
想去广州
树先生2014-09-09 07:27:23
保存成文件 上到群里吧。
小笨笨熊2014-09-08 10:37:13
我的hdfs目录是在 /home/hadoop/tmp目录下的
小伙2014-09-08 05:09:45
【 北京神州泰岳软件股份有限公司】Android开发工程师、IOS开发工程师、Hadoop开发工程师、测试工程师、C++开发工程师、运维工程师、产品助理 简历接收邮箱: zhangqingshun@ultrapower.com.cn
凌思儿2014-09-08 09:47:23
分享自LeoWei 《Zookeeper的配置》 - 下载zookeeper-3.5.5稳定版本 解压到/usr目录下,修改所属用户为hadoop $chown –R hadoop:hadoop /usr/zookeeper-3.3... (来自 @头条博客) - http://t.cn/RPjRNQg
亚信科技-田毅2014-09-08 01:00:32
加载失败
3W杨小凯2014-09-07 11:11:54
【中国联通成功部署大数据平台】12月14日消息,中国联通研究院副院长黄文良表示,今年中国联通成功将大数据和Hadoop技术引入到移动通信用户上网记录集中查询与分析支撑系统。截止到目前,我们已经部署了4.5PB的存储空间。其中,4.5PB存储分布在300个数据节点上,即每个节点配备15TB的存储空间。
寂寞332014-09-06 07:17:21
Apache Hadoop 2.3.0 发布: http://t.cn/8FmadWP
 
  • Hadoop生态系统资料推荐