用Hadoop1.0.3实现KMeans算法

从理论上来讲用MapReduce技术实现KMeans算法是很Natural的想法:在Mapper中逐个计算样本点离哪个中心最近,然后Emit(样本点所属的簇编号,样本点);在Reducer中属于同一个质心的样本点在一个链表中,方便我们计算新的中心,然后Emit(质心编号,质心)。但是技术上的事并没有理论层面那么简单。

Mapper和Reducer都要用到K个中心(我习惯称之为质心),Mapper要读这些质心,Reducer要写这些质心。另外Mapper还要读存储样本点的数据文件。我先后尝试以下3种方法,只有第3种是可行的,如果你不想被我误导,请直接跳过前两种。

一、用一个共享变量在存储K个质心

由于K很小,所以我们认为用一个Vector<Sample>来存储K个质心是没有问题的。以下代码是错误的:

class MyJob extends Tool{

  static Vector<Sample> centers=new Vector<Sample>(K);

  static class MyMapper extends Mapper{

    //read centers

  } 

  static class MyMapper extends Reducer{

    //update centers

  }

  void run(){

    until ( convergence ){

      map();

      reduce();

    }

}

发生这种错误是因为对Hadoop执行流程不清楚,对数据流不清楚。简单地说Mapper和Reducer作为MyJob的内部静态类,它们应该是独立的--它们不应该与MyJob有任何交互,因为Mapper和Reducer分别在Task Tracker的不同JVM中运行,而MyJob以及MyJob的内部其他类都在客户端上运行,自然不能在不同的JVM中共享一个变量。

详细的流程是这样的:

首先在客户端上,JVM加载MyJob时先初始化静态变量,执行static块。然后提交作业到Job Tracker。

在Job Tracker上,分配Mapper和Reducer到不同的Task Tracker上。Mapper和Reducer线程获得了MyJob类静态变量的初始拷贝(这份拷贝是指MyJob执行完静态块之后静态变量的模样)。

在Task Tracker上,Mapper和Reducer分别地读写MyJob的静态变量的本地拷贝,但是并不影响原始的MyJob中的静态变量的值。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/103e90ef9f10a21de2f918e05bffabb8.html