MapReduce: Principles and a Simple Implementation (2)

The Coordinator keeps track of every job and its execution state. Workers call AcquireJob to commit finished jobs and request new ones, and reduce jobs are only handed out after all map jobs have completed. For simplicity, each input file is treated as a single map job.
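The Coordinator's own code is not shown in this part. As a rough illustration of the protocol described above, here is a minimal sketch of what its AcquireJob handler could look like. It assumes the Job, AcquireJobArgs, and AcquireJobReply types used by the worker code below; the REDUCE constant, the pending/running bookkeeping, and all field names not visible in the worker code are hypothetical, not the author's actual implementation.

```go
import "sync"

// Hypothetical Coordinator state: a queue of pending jobs for the
// current phase, plus a count of jobs handed out but not yet committed.
type Coordinator struct {
	mu       sync.Mutex
	status   int        // current phase: MAP, REDUCE, or FINISH (REDUCE assumed)
	pending  []Job      // jobs not yet handed out in this phase
	running  int        // jobs handed out but not yet committed
	reduceIn [][]string // reduceIn[r] = intermediate files for reduce job r
	nMap     int
	nReduce  int
}

func (c *Coordinator) AcquireJob(args *AcquireJobArgs, reply *AcquireJobReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// A worker reporting Index >= 0 is committing a finished job.
	// Ignore stale commits from a previous phase.
	if args.Job.Index >= 0 && args.Status == c.status {
		c.running--
		if c.status == MAP {
			// doMap returns files in partition order: file r feeds reduce job r.
			for r, f := range args.Job.Files {
				c.reduceIn[r] = append(c.reduceIn[r], f)
			}
		}
	}

	// All map jobs committed: build and enter the reduce phase.
	if c.status == MAP && len(c.pending) == 0 && c.running == 0 {
		c.status = REDUCE
		for r := 0; r < c.nReduce; r++ {
			c.pending = append(c.pending, Job{Index: r, Files: c.reduceIn[r]})
		}
	}
	// All reduce jobs committed: the whole run is finished.
	if c.status == REDUCE && len(c.pending) == 0 && c.running == 0 {
		c.status = FINISH
	}

	reply.Status = c.status
	reply.Job = Job{Index: -1} // default: no job available, worker should wait
	if len(c.pending) > 0 {
		reply.Job, c.pending = c.pending[0], c.pending[1:]
		c.running++
	}
	if c.status == MAP {
		reply.NOther = c.nReduce // map needs nReduce to partition its output
	} else {
		reply.NOther = c.nMap
	}
	return nil
}
```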

```go
func doMap(mapf func(string, string) []KeyValue, job *Job, nReduce int) (files []string) {
	// One temp file per reduce partition, renamed to its final name at the end.
	outFiles := make([]*os.File, nReduce)
	for idx := range outFiles {
		outFile, err := ioutil.TempFile("./", "mr-tmp-*")
		if err != nil {
			log.Fatalf("create tmp file failed: %v", err)
		}
		defer outFile.Close()
		outFiles[idx] = outFile
	}
	for _, filename := range job.Files {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		content, err := ioutil.ReadAll(file)
		if err != nil {
			log.Fatalf("cannot read %v", filename)
		}
		file.Close()
		// Partition the map output by key hash, one JSON-encoded pair per line.
		kva := mapf(filename, string(content))
		for _, kv := range kva {
			hash := ihash(kv.Key) % nReduce
			js, _ := json.Marshal(kv)
			outFiles[hash].Write(js)
			outFiles[hash].WriteString("\n")
		}
	}
	// Rename the temp files to their final mr-<map>-<reduce> names.
	for idx := range outFiles {
		filename := fmt.Sprintf("mr-%d-%d", job.Index, idx)
		os.Rename(outFiles[idx].Name(), filename)
		files = append(files, filename)
	}
	return
}

func doReduce(reducef func(string, []string) string, job *Job, nMap int) {
	log.Printf("Start reduce %d", job.Index)
	outFile, err := ioutil.TempFile("./", "mr-out-tmp-*")
	if err != nil {
		log.Fatalf("create tmp file failed: %v", err)
	}
	defer outFile.Close()
	// Group values by key across all intermediate files of this partition.
	m := make(map[string][]string)
	for _, filename := range job.Files {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			kv := KeyValue{}
			if err := json.Unmarshal(scanner.Bytes(), &kv); err != nil {
				log.Fatalf("read kv failed: %v", err)
			}
			m[kv.Key] = append(m[kv.Key], kv.Value)
		}
		if err := scanner.Err(); err != nil {
			log.Fatal(err)
		}
		file.Close()
	}
	for key, value := range m {
		output := reducef(key, value)
		fmt.Fprintf(outFile, "%v %v\n", key, output)
	}
	os.Rename(outFile.Name(), fmt.Sprintf("mr-out-%d", job.Index))
	log.Printf("End reduce %d", job.Index)
}

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	CallExample()

	var status int = MAP
	args := AcquireJobArgs{Job{Index: -1}, MAP}
	for {
		args.Status = status
		reply := AcquireJobReply{}
		call("Coordinator.AcquireJob", &args, &reply)
		fmt.Printf("AcReply: %+v\n", reply)
		if reply.Status == FINISH {
			break
		}
		status = reply.Status
		if reply.Job.Index >= 0 {
			// Got a job: run it, then commit it on the next AcquireJob call.
			commitJob := reply.Job
			if status == MAP {
				commitJob.Files = doMap(mapf, &reply.Job, reply.NOther)
			} else {
				doReduce(reducef, &reply.Job, reply.NOther)
				commitJob.Files = make([]string, 0)
			}
			args = AcquireJobArgs{commitJob, status}
		} else {
			// No job available yet: sleep and poll again.
			time.Sleep(time.Second)
			args = AcquireJobArgs{Job{Index: -1}, status}
		}
	}
}
```

The worker requests and commits jobs through the Coordinator.AcquireJob RPC, then runs doMap or doReduce depending on the job type in the reply.
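The call helper is not shown above; it is roughly the one shipped with the MIT 6.824 lab skeleton, a thin wrapper over Go's net/rpc that dials the coordinator's Unix-domain socket and issues a single call. coordinatorSock() is the skeleton's helper that derives a per-user socket path:

```go
import (
	"fmt"
	"log"
	"net/rpc"
)

// call sends one RPC to the coordinator over the lab's Unix-domain socket
// and reports whether it succeeded.
func call(rpcname string, args interface{}, reply interface{}) bool {
	c, err := rpc.DialHTTP("unix", coordinatorSock())
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	if err := c.Call(rpcname, args, reply); err != nil {
		fmt.Println(err)
		return false
	}
	return true
}
```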

doMap reads each input file and hands <filename, content> to the map function, then writes every returned key/value pair to the intermediate file selected by hash(key) % R.
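The ihash used for partitioning comes with the lab skeleton; it is an FNV-1a hash masked to a non-negative int, so ihash(key) % nReduce deterministically picks one of the R partitions and all pairs with the same key land in the same reduce job:

```go
import "hash/fnv"

// ihash(key) % nReduce chooses the reduce partition for a key.
// The mask clears the sign bit so the result is always non-negative.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}
```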

doReduce reads the KV pairs from the intermediate files into memory and groups values that share a key (I used a map here, but rereading the paper, it does this by sorting, which has the advantage that the keys in each output file come out in sorted order). Each <key, list(value)> group is then handed to the reduce function, and the result is written to the output file.
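For comparison, here is a sketch of the paper's sort-based grouping, in the style of the lab's mrsequential.go: sort all pairs by key, then sweep over each run of equal keys. The kva slice is assumed to hold every pair loaded from the intermediate files, and the output format matches doReduce above; this helper and its signature are illustrative, not part of the original code.

```go
import (
	"fmt"
	"os"
	"sort"
)

// reduceSorted groups values by sorting instead of a hash map, so the
// keys in each mr-out-* file are emitted in sorted order.
func reduceSorted(reducef func(string, []string) string, kva []KeyValue, outFile *os.File) {
	sort.Slice(kva, func(i, j int) bool { return kva[i].Key < kva[j].Key })
	for i := 0; i < len(kva); {
		// Find the run [i, j) of pairs that share the same key.
		j := i
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		fmt.Fprintf(outFile, "%v %v\n", kva[i].Key, reducef(kva[i].Key, values))
		i = j
	}
}
```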
