用Hadoop实现KMeans算法
Mapper和Reducer线程获得了MyJob类静态变量的初始拷贝(这份拷贝是指MyJob执行完静态块之后静态变量的模样)。这里的问题是:如果确定要把质心放在文件中,那Mapper就需要从2个文件中读取数据--质心文件和样本数据文件。
为样本数据建立一个类Sample.java。
相关热门搜索:
大数据标签:hadoop mapreduce mahout bigdata
问题导读: 什么是质心文件? mapreduce的从哪里读取质心文件? Driver驱动程序如何比较两次的质心是否相同? 在我们阅读的时候,我们首先知道什么是KMeans: K-means算法是最为经典的基于划分的聚类方法,是十大经典数据挖掘算法之一。K-means算法的基本思想是:以空间中k个点为中心进行聚类,对最靠近他们的对象归类。通过迭代的方法,逐次更新各聚类中心的值,直至得到最好的聚类结果。 虽然已经发展到了hadoop2.4,但是对于一些算法只要明白其中的含义,是和语言无关的,无论是使用Java、C++、python等, 本文以Hadoop1.0.3为例。 从理论上来讲用MapReduce技术实现KMeans算法是很Natural的想法:在Mapper中逐个计算样本点离哪个中心最近,然后Emit(样本点所属的簇编号,样本点);在Reducer中属于同一个质心的样本点在一个链表中,方便我们计算新的中心,然后Emit(质心编号,质心)。但是技术上的事并没有理论层面那么简单。 Mapper和Reducer都要用到K个中心(我习惯称之为质心),Mapper要读这些质心,Reducer要写这些质心。另外Mapper还要读存储样本点的数据文件。我先后尝试以下3种方法,只有第3种是可行的,如果你不想被我误导,请直接跳过前两种。 一、用一个共享变量在存储K个质心 由于K很小,所以我们认为用一个Vector来存储K个质心是没有问题的。以下代码是错误的: 1. class MyJob extends Tool{ 2. static Vector centers=new Vector (K); 3. static class MyMapper extends Mapper{ 4. //read centers 5. } 6. static class MyMapper extends Reducer{ 7. //update centers 8. } 9. void run(){ 10. until ( convergence ){ 11. map(); 12. reduce(); 13. } 14. } 复制代码 发生这种错误是因为对hadoop执行流程不清楚,对数据流不清楚。简单地说Mapper和Reducer作为MyJob的内部静态类,它们应该是独立的--它们不应该与MyJob有任何交互,因为Mapper和Reducer分别在Task Tracker的不同JVM中运行,而MyJob以及MyJob的内部其他类都在客户端上运行,自然不能在不同的JVM中共享一个变量。 详细的流程是这样的: 首先在客户端上,JVM加载MyJob时先初始化静态变量,执行static块。然后提交作业到Job Tracker。 在Job Tracker上,分配Mapper和Reducer到不同的Task Tracker上。Mapper和Reducer线程获得了MyJob类静态变量的初始拷贝(这份拷贝是指MyJob执行完静态块之后静态变量的模样)。 在Task Tracker上,Mapper和Reducer分别地读写MyJob的静态变量的本地拷贝,但是并不影响原始的MyJob中的静态变量的值。 二、用分布式缓存文件存储K个质心 既然不能通过共享外部类变量的方式,那我们通过文件在map和reduce之间传递数据总可以吧,Mapper从文件中读取质心,Reducer把更新后的质心再写入这个文件。这里的问题是:如果确定要把质心放在文件中,那Mapper就需要从2个文件中读取数据--质心文件和样本数据文件。虽然有MutipleInputs可以指定map()的输入文件有多个,并可以为每个输入文件分别指定解析方式,但是MutipleInputs不能保证每条记录从不同文件中传给map()的顺序。在我们的KMeans中,我们希望质心文件全部被读入后再逐条读入样本数据。 于是乎就想到了DistributedCache,它主要用于Mapper和Reducer之间共享数据。DistributedCacheFile是缓存在本地文件,在Mapper和Reducer中都可使用本地Java I/O的方式读取它。于是我又有了一个错误的思路: 1. class MyMaper{ 2. Vector centers=new Vector (K); 3. void setup(){ 4. //读取cacheFile,给centers赋值 5. } 6. void map(){ 7. //计算样本离哪个质心最近 8. } 9. } 10. class MyReducer{ 11. Vector centers=new Vector (K); 12. void reduce(){ 13. //更新centers 14. } 15. void cleanup(){ 16. //把centers写回cacheFile 17. } 18. } 复制代码 错因:DistributedCacheFile是只读的,在任务运行前,TaskTracker从JobTracker文件系统复制文件到本地磁盘作为缓存,这是单向的复制,是不能写回的。试想在分布式环境下,如果不同的mapper和reducer可以把缓存文件写回的话,那岂不又需要一套复杂的文件共享机制,严重地影响hadoop执行效率。 三、用分布式缓存文件存储样本数据 其实DistributedCache还有一个特点,它更适合于“大文件”(各节点内存容不下)缓存在本地。仅存储了K个质心的文件显然是小文件,与之相比样本数据文件才是大文件。 此时我们需要2个质心文件:一个存放上一次的质心prevCenterFile,一个存放reducer更新后的质心currCenterFile。Mapper从prevCenterFile中读取质心,Reducer把更新后有质心写入currCenterFile。在Driver中读入prevCenterFile和currCenterFile,比较前后两次的质心是否相同(或足够地接近),如果相同则停止迭代,否则就用currCenterFile覆盖prevCenterFile(使用fs.rename),进入下一次的迭代。 这时候Mapper就是这样的: 1. class MyMaper{ 2. Vector centers=new Vector (K); 3. void map(){ 4. //逐条读取质心,给centers赋值 5. } 6. void cleanup(){ 7. //逐行读取cacheFile,计算每个样本点离哪个质心最近 8. //然后Emit(样本点所属的簇编号,样本点) 9. } 10. } 复制代码 源代码 试验数据是在Mahout项目中作为example提供的,600个样本点,每个样本是一个60维的浮点向量。 synthetic_control.data.zip (118.04 KB, 下载次数: 2) 2014-6-28 11:58 上传 点击文件名 为样本数据建立一个类Sample.java。 1. package kmeans; 2. 3. import java.io.DataInput; 4. import java.io.DataOutput; 5. import java.io.IOException; 6. 7. import org.apache.commons.logging.Log; 8. import org.apache.commons.logging.LogFactory; 9. import org.apache.hadoop.io.Writable; 10. 11. public class Sample implements Writable{ 12. private static final Log log=LogFactory.getLog(Sample.class); 13. public static final int DIMENTION=60; 14. public double arr[]; 15. 16. public Sample(){ 17. arr=new double[DIMENTION]; 18. } 19. 20. public static double getEulerDist(Sample vec1,Sample vec2){ 21. if(!(vec1.arr.length==DIMENTION && vec2.arr.length==DIMENTION)){ 22. log.error("vector's dimention is not "+DIMENTION); 23. System.exit(1); 24. } 25. double dist=0.0; 26. for(int i=0;i THRESHOLD){ 66. stop=false; 67. break; 68. } 69. } 70. //如果还要进行下一次迭代,就用当前质心替代上一次的质心 71. if(stop==false){ 72. fs.delete(pervCenterFile,true); 73. if(fs.rename(currentCenterFile, pervCenterFile)==false){ 74. log.error("质心文件替换失败"); 75. System.exit(1); 76. } 77. } 78. return stop; 79. } 80. 81. public static class ClusterMapper extends Mapper { 82. Vector centers = new Vector (); 83. @Override 84. //清空centers 85. public void setup(Context context){ 86. for (int i = 0; i < K; i++) { 87. centers.add(new Sample()); 88. } 89. } 90. @Override 91. //从输入文件读入centers 92. public void map(LongWritable key, Text value, Context context) 93. throws IOException, InterruptedException { 94. String []str=value.toString().split("\\s+"); 95. if(str.length!=Sample.DIMENTION+1){ 96. log.error("读入centers时维度不对"); 97. System.exit(1); 98. } 99. int index=Integer.parseInt(str[0]); 100. for(int i=1;i { 135. int prev=-1; 136. Sample center=new Sample();; 137. int count=0; 138. @Override 139. //更新每个质心(除最后一个) 140. public void reduce(IntWritable key,Iterable values,Context context) throws IOException,InterruptedException{ 141. while(values.iterator().hasNext()){ 142. Sample value=values.iterator().next(); 143. if(key.get()!=prev){ 144. if(prev!=-1){ 145. for(int i=0;i 大数据系列相关文章:
- Hadoop生态系统资料推荐
用Hadoop实现KMeans算法 百度2014校园招聘深度学习算法研发工程师笔试题 Mahout学习之Mahout简介、安装、配置、程序测试 InfoQ: 大规模SNS中兴趣圈子的自动挖掘 原 实战Mahout聚类算法Canopy+K Hbase数据备份和恢复 HDFS体系结构简介及优缺点 使用FileSystem类进行文件读写及查看文件信息 分布式文件系统HDFS中Block的好处 如何检测Linux NFS服务器状态 hadoop如何分发本地的jar文件 hadoop job工作机制 Hadoop Map/Reduce编程模型实现海量数据处理—数字求和-Hadoop学习 -... [Hadoop源码解读](二)MapReduce篇之Mapper类 C# Hadoop学习笔记(一) Hadoop 如何提交Job Hadoop 文件输入和文件输出 | 学步园 在云中使用 MapReduce 和负载平衡 hadoop开发需要注意的几个问题 模拟namenode宕机:数据块损坏,该如何修复 hadoop术语解释 linux的grep是什么及命令实例 openstack的网络方面很依赖iptables吗? hadoop安装前准备工作 翻译:Keystone用户管理 Hadoop 2.4 完全分布式环境安装与配置 全球著名互联网企业背后的开源力量
最新评论