[MIT6.5840]MapReduce
MapReduce
Lab 地址
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
论文地址
https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf
工作原理
简单来讲,MapReduce是一种分布式框架,可以用来处理大规模数据。该框架抽象了两个接口,分别是Map和Reduce函数:

凡是符合这个模式的算法都可以使用该框架来实现并行化,执行流程如下图所示。

整个框架分为Master和Worker,Master负责分配map和reduce任务,Worker负责向Master申请任务并执行。执行流程如下:
Map阶段:
- 输入是大文件分割后的一组小文件,通常大小为16~64MB。
- Worker向Master申请任务,假设得到map任务in0。
- Worker开始执行map任务,将文件名和文件内容作为参数传入map函数中,得到kv list.
- 最后Worker将kv list分割成reduceNum份(超参数),要求使得具有相同key的kv对在一份中。可以通过hash值%reduceNum实现分割,然后输出到文件中,下图的0-*
Reduce阶段:
- 输入当前reduce的序号id,从map阶段的输出中选出*-id的文件,也就是将hash值%reduceNum值相同的kv对取出,这样可以保证具有相同key的kv对只用一次处理。
- 将所有的kv对根据键值排序,使得相同key的kv对能够连续排列,方便合并。
- 之后合并相同key的kv对,然后将每个key和其对应的value list输入reduce函数,得到合并的结果,再将其输出到文件中。

本文介绍了大致思想,详细内容请参考原论文。
代码详解
rpc.go
package mr//
// RPC definitions.
//
// remember to capitalize all names.
//import ("fmt""os""strconv"
)const (MAP = "MAP"REDUCE = "REDUCE"DONE = "DONE"
)//
// example to show how to declare the arguments
// and reply for an RPC.
//type ApplyArgs struct {WorkerID intLastTaskType stringLastTaskID int
}type ReplyArgs struct {TaskId intTaskType stringInputFile stringMapNum intReduceNum int
}// Add your RPC definitions here.// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {s := "/var/tmp/5840-mr-"s += strconv.Itoa(os.Getuid())return s
}
// 构造文件名
func tmpMapResult(workerID int, taskID int, reduceId int) string {return fmt.Sprintf("tmp-worker-%d-%d-%d", workerID, taskID, reduceId)
}func finalMapResult(taskID int, reduceID int) string {return fmt.Sprintf("mr-%d-%d", taskID, reduceID)
}func tmpReduceResult(workerID int, reduceId int) string {return fmt.Sprintf("tmp-worker-%d-out-%d", workerID, reduceId)
}func finalReduceResult(reduceID int) string {return fmt.Sprintf("mr-out-%d", reduceID)
}
worker.go
package mrimport ("fmt""hash/fnv""io""log""net/rpc""os""sort""strings"
)// Map functions return a slice of KeyValue.
type KeyValue struct {Key stringValue string
}// for sorting by key.
type ByKey []KeyValue// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {h := fnv.New32a()h.Write([]byte(key))return int(h.Sum32() & 0x7fffffff)
}// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.id := os.Getegid()// log.Printf("worker %d start working", id)lastTaskId := -1lastTaskType := ""loop:for {args := ApplyArgs{WorkerID: id,LastTaskType: lastTaskType,LastTaskID: lastTaskId,}reply := ReplyArgs{}ok := call("Coordinator.ApplyForTask", &args, &reply)if !ok {fmt.Printf("call failed!\n")continue}// log.Printf("reply: %v", reply)lastTaskId = reply.TaskIdlastTaskType = reply.TaskTypeswitch reply.TaskType {case "":// log.Println("finished")break loopcase MAP:// log.Printf("worker %d get map task %d", id, reply.TaskId)doMapTask(id, reply.TaskId, reply.InputFile, reply.ReduceNum, mapf)case REDUCE:// log.Printf("worker %d get reduce task %d", id, reply.TaskId)doReduceTask(id, reply.TaskId, reply.MapNum, reducef)}}// uncomment to send the Example RPC to the coordinator.// CallExample()}// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := coordinatorSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer c.Close()err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false
}func doMapTask(id int, taskId int, filename string, reduceNum int, mapf func(string, string) []KeyValue) {file, err := os.Open(filename)if err != nil {log.Fatalf("%s 文件打开失败! ", filename)return}content, err := io.ReadAll(file)if err != nil {log.Fatalf("%s 文件内容读取失败! ", filename)}file.Close()kvList := mapf(filename, string(content)) // kv listhashedKvList := make(map[int]ByKey)for _, kv := range kvList {hashedKey := ihash(kv.Key) % reduceNumhashedKvList[hashedKey] = append(hashedKvList[hashedKey], kv)}for i := 0; i < reduceNum; i++ {outFile, err := os.Create(tmpMapResult(id, taskId, i))if err != nil {log.Fatalf("can not create output file: %e", err)return}for _, kv := range hashedKvList[i] {fmt.Fprintf(outFile, "%v\t%v\n", kv.Key, kv.Value)}outFile.Close()}// log.Printf("worker %d finished map task\n", id)
}func doReduceTask(id int, taskId int, mapNum int, reducef func(string, []string) string) {var kvList ByKeyvar lines []stringfor i := 0; i < mapNum; i++ {mapOutFile := finalMapResult(i, taskId)file, err := os.Open(mapOutFile)if err != nil {log.Fatalf("can not open output file %s: %e", mapOutFile, err)return}content, err := io.ReadAll(file)if err != nil {log.Fatalf("file read failed %s: %e", mapOutFile, err)return}lines = append(lines, strings.Split(string(content), "\n")...)}for _, line := range lines {if strings.TrimSpace(line) == "" {continue}split := strings.Split(line, "\t")kvList = append(kvList, KeyValue{Key: split[0], Value: split[1]})}sort.Sort(kvList)outputFile := tmpReduceResult(id, taskId)file, err := os.Create(outputFile)if err != nil {log.Fatalf("can not create output file: %e", err)return}for i := 0; i < len(kvList); {j := i + 1key := kvList[i].Keyvar values []stringfor j < len(kvList) && kvList[j].Key == key {j++}for k := i; k < j; k++ {values = append(values, kvList[k].Value)}res := reducef(key, values)fmt.Fprintf(file, "%v %v\n", key, res)i = j}file.Close()// log.Printf("worker %d finished reduce task", id)
}
coordinator.go
package mrimport ("fmt""log""math""net""net/http""net/rpc""os""sync""time"
)type Task struct {id intinputFile stringworker inttaskType stringdeadLine time.Time
}type Coordinator struct {// Your definitions here.mtx sync.MutexinputFile []stringreduceNum intmapNum inttaskStates map[string]TasktodoList chan Taskstage string
}// Your code here -- RPC handlers for the worker to call.// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) ApplyForTask(args *ApplyArgs, reply *ReplyArgs) error {// process the last taskif args.LastTaskID != -1 {taskId := createTaskId(args.LastTaskID, args.LastTaskType)c.mtx.Lock()if task, ok := c.taskStates[taskId]; ok && task.worker != -1 { // 排除过期任务// log.Printf("worker %d finish task %d", args.WorkerID, task.id)if args.LastTaskType == MAP {for i := 0; i < c.reduceNum; i++ {err := os.Rename(tmpMapResult(task.worker, task.id, i), finalMapResult(task.id, i))if err != nil {log.Fatalf("can not rename %s: %e", tmpMapResult(task.worker, task.id, i), err)}}} else if args.LastTaskType == REDUCE {err := os.Rename(tmpReduceResult(task.worker, task.id), finalReduceResult(task.id))if err != nil {log.Fatalf("can not rename %s: %e", tmpReduceResult(task.worker, task.id), err)}}delete(c.taskStates, taskId)if len(c.taskStates) == 0 {c.shift()}}c.mtx.Unlock()}// assign the new tasktask, ok := <-c.todoListif !ok {return nil}reply.InputFile = task.inputFilereply.MapNum = c.mapNumreply.ReduceNum = c.reduceNumreply.TaskId = task.idreply.TaskType = task.taskTypetask.worker = args.WorkerIDtask.deadLine = time.Now().Add(10 * time.Second)// log.Printf("assign %s task %d to worker %d", task.taskType, task.id, args.WorkerID)c.mtx.Lock()c.taskStates[createTaskId(task.id, task.taskType)] = taskc.mtx.Unlock()return nil
}// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}
// 改变当前的状态
func (c *Coordinator) shift() {// 加锁状态if c.stage == MAP {// log.Printf("Map Task finished")c.stage = REDUCE// 分配reduce taskfor i := 0; i < c.reduceNum; i++ {task := Task{id: i,worker: -1,taskType: REDUCE,}c.todoList <- taskc.taskStates[createTaskId(i, REDUCE)] = task}} else if c.stage == REDUCE {close(c.todoList)c.stage = DONE}
}// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {// Your code here.c.mtx.Lock()defer c.mtx.Unlock()return c.stage == DONE
}// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{mtx: sync.Mutex{},inputFile: files,reduceNum: nReduce,mapNum: len(files),taskStates: make(map[string]Task),todoList: make(chan Task, int(math.Max(float64(nReduce), float64(len(files))))),stage: MAP,}for i, file := range files {task := Task{id: i,inputFile: file,worker: -1,taskType: MAP,}c.todoList <- taskc.taskStates[createTaskId(i, MAP)] = task}// 回收任务go c.collectTask()c.server()return &c
}func createTaskId(id int, taskType string) string {return fmt.Sprintf("%d-%s", id, taskType)
}
// worker执行过期后回收任务
func (c *Coordinator) collectTask() {for {time.Sleep(500 * time.Millisecond)c.mtx.Lock()if c.stage == DONE {c.mtx.Unlock()return}for _, task := range c.taskStates {if task.worker != -1 && time.Now().After(task.deadLine) {// task is expiredtask.worker = -1// log.Printf("task %d is expired", task.id)c.todoList <- task}}c.mtx.Unlock()}
}
运行说明
mrcoordinator
cd src/main/
go build -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run mrcoordinator.go pg-*.txt
mrworker
cd src/main/
go run mrworker.go wc.so
测试结果
bash test-mr.sh

MIT6.5840 课程Lab完整项目
https://github.com/Joker0x00/MIT-6.5840-Lab/
相关文章:
[MIT6.5840]MapReduce
MapReduce Lab 地址 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html 论文地址 https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf 工作原理 简单来讲,MapReduce是一种分布式框架,可以用来处理…...
【系统架构设计师】计算机组成与体系结构 ⑯ ( 奇偶校验码 | CRC 循环冗余码 | 海明码 | 模 2 除法 )
文章目录 一、校验码1、校验码由来2、奇偶校验码3、CRC 循环冗余码 ( 重点考点 )4、海明码校验 ( 软考不经常考到 ) 二、CRC 循环冗余码 ( 重点考点 )1、模 2 除法概念2、模 2 除法步骤3、模 2 除法示例4、CRC 循环冗余码示例 15、CRC 循环冗余码示例 2 参考之前的博客 : 【计…...
springboot,service 层统一异常抛出时,throws Exception写在接口上还是实现类上
springboot,service 层统一异常抛出时,throws Exception写在实现接口上,不是直接写在实现类上...
深度学习高效性网络
为了减轻Transformer笨重的计算成本,一系列工作重点开发了高效的Vision Transformer,如Swin Transformer、PVT、Twins、CoAtNet和MobileViT。 1、字节TRT-ViT 兼具CNN的速度、Transformer精度的模型 TRT-ViT(Transformer-based Vision Tra…...
PyQt ERROR:ModuleNotFoundError: No module named ‘matplotlib‘
Solution:打开cmd输入指令下载malplotlib pip install matplotlib...
Flutter Geolocator插件使用指南:获取和监听地理位置
Flutter Geolocator插件使用指南:获取和监听地理位置 简介 geolocator 是一个Flutter插件,提供了一个简单易用的API来访问特定平台的地理位置服务。它支持获取设备的最后已知位置、当前位置、连续位置更新、检查设备上是否启用了位置服务,以…...
网站基本布局CSS
代码 <!DOCTYPE html> <html> <head><meta charset"utf-8"><meta name"viewport" content"widthdevice-width, initial-scale1"><title></title><style type"text/css">body {margi…...
ssm框架整合,异常处理器和拦截器(纯注解开发)
目录 ssm框架整合 第一步:指定打包方式和导入所需要的依赖 打包方法:war springMVC所需依赖 解析json依赖 mybatis依赖 数据库驱动依赖 druid数据源依赖 junit依赖 第二步:导入tomcat插件 第三步:编写配置类 SpringCon…...
古籍双层PDF制作教程:保姆级古籍数字化教程
在智慧古籍数字化项目中,很多图书馆要求将古籍导出为双层PDF,并且确保输出双层PDF底层文本与上层图片偏移量控制在1毫米以内。那么本教程带你使用古籍数字化平台,3分钟把一个古籍书籍转化为双侧PDF。 第1步:上传古籍 点批量上传…...
Git 删除 远端的分支
要删除 Git 远端的分支(例如: V3.2.1.13): 可以执行以下命令 git push origin --delete V3.2.1.13这条命令会向远端的仓库删除名为 V3.2.1.13 的分支。如果这个分支只在远端仓库存在而没有对应的本地分支,那么删除后这…...
PrgogressBar实现原理分析
ProgressBar 是 Android 中用于显示进度条的控件,它可以用来表示任务的完成程度或者加载进度等信息。ProgressBar 有两种主要类型:一种是确定性的(determinate),另一种是不确定性的(indeterminateÿ…...
【HarmonyOS】HarmonyOS NEXT学习日记:七、页面与组件的生命周期
【HarmonyOS】HarmonyOS NEXT学习日记:七、页面与组件的生命周期 页面和组件 组件:用Component装饰的代码称为自定义组件页面:Entry装饰的组件即页面的根节点 组件生命周期 aboutToAppear:在创建自定义组件的新实例后…...
【iOS】——Block循环引用
循环引用原因 如果在Block中使用附有_ _strong修饰符的对象类型自动变量,那么当Block从栈复制到堆时,该对象为Block所持有,这样容易引起循环引用。 HPPerson *person [[HPPerson alloc] init];person.block ^{NSLog("person.age--- …...
shell脚本自动化安装启动各种服务
1、自动化配置dns服务器 A主机:vim dns.sh #!/bin/bash# 自动化部署dns# 1、下载bind# 2、修改配置文件# vim /etc/named.conf # listen-on port 53 { 127.0.0.1;any; }; 修改(定位替换)# allow-query { localhost;any; }; 修改&am…...
Python - 开源库 ReportLab 库合并 CVS 和图像生成 PDF 文档
欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/140281680 免责声明:本文来源于个人知识与公开资料,仅用于学术交流,欢迎讨论,不支持转载。 Report…...
Java编写SIP协议
1、编写Server代码 package com.genersoft.iot.vmp.sip; import javax.sip.*; import javax.sip.message.*; import javax.sip.header.*; import java.util.*;public class SimpleSipServer implements SipListener {private SipFactory sipFactory;private SipStack sipStack…...
大型语言模型LLM的核心概念
本文主要介绍了目前主流的,几个大型语言模型LLM的整个训练过程 通常分为下面的几个阶段 1. 预训练 采用互联网上的大量数据进行训练,这一阶段大模型LLM的主体已定,找出共性并且压缩成一个模型。模型的参数量不是越大越好,遵循合理…...
软件测试---网络基础、HTTP
一、网络基础 (1)Web和网络知识 网络基础TCP/IP 使用HTTP协议访问Web WWW万维网的诞生 WWW万维网的构成 (2)IP协议 (3)可靠传输的TCP和三次握手策略 (4)域名解析服务DNS ࿰…...
韩顺平0基础学java——第39天
p820-841 jdbc和连接池 1.JDBC为访问不同的数据库提供了统一的接口,为使用者屏蔽了细节问题。 2.Java程序员使用JDBC,可以连接任何提供了JDBC驱动程序的数据库系统,从而完成对数据库的各种操作。 3.jdbc原理图 JDBC带来的好处 2.JDBC带来的…...
Linux文件恢复
很麻烦 一般还是小心最好 特别恢复的时候 可能不能选择某个文件夹去扫描恢复 所以 删除的时候 用rm -i代替rm 一定小心 以及 探索下linux的垃圾箱机制 注意 一定要恢复到不同文件夹 省的出问题 法1 系统自带工具 debugfs 但是好像不能重启? testdisk 1、安装 …...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
云原生核心技术 (7/12): K8s 核心概念白话解读(上):Pod 和 Deployment 究竟是什么?
大家好,欢迎来到《云原生核心技术》系列的第七篇! 在上一篇,我们成功地使用 Minikube 或 kind 在自己的电脑上搭建起了一个迷你但功能完备的 Kubernetes 集群。现在,我们就像一个拥有了一块崭新数字土地的农场主,是时…...
Ubuntu系统下交叉编译openssl
一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机:Ubuntu 20.04.6 LTSHost:ARM32位交叉编译器:arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...
盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来
一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...
【Go】3、Go语言进阶与依赖管理
前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes࿰…...
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云
目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...
ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...
