Apache Hama安装部署(2)

int in2 = 0;
            for (int i = 0; i < iterations; i++) {
                double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
                if ((Math.sqrt(x * x + y * y) < 1.0)) {
                    in2++;
                }
            }

double data2 = 4.0 * in2 / iterations;

peer.send(masterTask, new DoubleWritable(data2));
            peer.sync();

if (peer.getPeerName().equals(masterTask)) {
                double pi2 = 0.0;
                int numPeers = peer.getNumCurrentMessages();
                DoubleWritable received;
                while ((received = peer.getCurrentMessage()) != null) {
                    pi2 += received.get();
                }

pi2 = pi2 / numPeers;
                peer.write(new Text("Estimated value2 of PI is"),
                        new DoubleWritable(pi2));
            }
            peer.sync();

}

@Override
        public void setup(
                BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
                throws IOException {
            // Choose one as a master

this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
        }

/**
 * Cleanup hook; currently a no-op because its entire body is commented out.
 *
 * @param peer the BSP peer (unused while the body is disabled)
 * @throws IOException declared by the overridden method; nothing here throws it
 */
@Override
        public void cleanup(
                BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
                throws IOException {

            // Disabled code, kept for reference: on the master task it would sum
            // every received DoubleWritable, divide by the number of messages and
            // write the result under the key "Estimated value of PI is".
// if (peer.getPeerName().equals(masterTask)) {
            // double pi = 0.0;
            // int numPeers = peer.getNumCurrentMessages();
            // DoubleWritable received;
            // while ((received = peer.getCurrentMessage()) != null) {
            // pi += received.get();
            // }
            //
            // pi = pi / numPeers;
            // peer.write(new Text("Estimated value of PI is"),
            // new DoubleWritable(pi));
            // }
        }
    }

static void printOutput(HamaConfiguration conf) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] files = fs.listStatus(TMP_OUTPUT);
        for (int i = 0; i < files.length; i++) {
            if (files[i].getLen() > 0) {
                FSDataInputStream in = fs.open(files[i].getPath());
                IOUtils.copyBytes(in, System.out, conf, false);
                in.close();
                break;
            }
        }

fs.delete(TMP_OUTPUT, true);
    }

public static void main(String[] args) throws InterruptedException,
            IOException, ClassNotFoundException {
        // BSP job configuration
        HamaConfiguration conf = new HamaConfiguration();
        BSPJob bsp = new BSPJob(conf, PiEstimator.class);
        // Set the job name
        bsp.setJobName("Pi Estimation Example");
        bsp.setBspClass(MyEstimator.class);
        bsp.setInputFormat(NullInputFormat.class);
        bsp.setOutputKeyClass(Text.class);
        bsp.setOutputValueClass(DoubleWritable.class);
        bsp.setOutputFormat(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);

BSPJobClient jobClient = new BSPJobClient(conf);
        ClusterStatus cluster = jobClient.getClusterStatus(true);

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/b517929e7d0cb94348a8b038071fbf65.html