外贸网站建设教程,景安安装wordpress提示错误,注册公司需要什么条件太原,学校网站模板html下载MIT 6.824 -- MapReduce -- 01 引言抽象和实现可扩展性可用性(容错性)一致性MapReduceMap函数和Reduce函数疑问 课程b站视频地址: MIT 6.824 Distributed Systems Spring 2020 分布式系统
推荐伴读读物:
极客时间 – 大数据经典论文解读DDIA – 数据密集型应用大数据相关论文… MIT 6.824 -- MapReduce -- 01 引言抽象和实现可扩展性可用性(容错性)一致性MapReduceMap函数和Reduce函数疑问 课程b站视频地址: MIT 6.824 Distributed Systems Spring 2020 分布式系统
推荐伴读读物:
极客时间 – 大数据经典论文解读DDIA – 数据密集型应用大数据相关论文中译版本
本节预习作业:
MapReduce 论文(原版 - 英译)MapReduce 论文(中译) 引言
为什么我们需要使用分布式系统:
为了更高的计算性能 , 大量的并行运算大量的CPU内存和磁盘都在并行运行更好的容错率(tolerate faults) 同时有多台计算机执行一个任务就算其中一台挂掉了任务也可以切换到另一台继续执行一些问题天然在空间上是分布的如银行间的转账操作出于安全考虑进行隔离当需要和一些不被信任的代码进行交互时可以将代码分散在多处运行通过特定的网络协议进行通信这样可以限制出错域
分布式系统不是银弹它会使简单的系统变得复杂“如无必要勿增实体” 。
本课程的重点讨论在: 性能和容错下面我们来看看实现分布式系统的挑战在哪里呢
多服务并发执行带来的并发问题和时间依赖问题(同步异步)局部故障的难以预料如网络中断或不稳定如果合理设计让分布式系统达到我们期望的性能 抽象和实现
分布式系统由三大基础架构组成:
存储通信(网络)计算
其中存储是我们最为关注的因为其定义明确且直观我们晓得如何构建和使用存储系统也晓得如何利用它来构建多副本高容错高性能的分布式系统。
关于通信这里更多只是作为建立分布式系统的工具之一大部分情况下都是指通过网络进行通信关于如何确保网络通信的可靠性可以学习MIT 6.829这门课程。
对于存储和计算我们期望能够对外提供一些抽象过的简单接口让第三方应用能够快速接入使用并且借助这些抽象的接口将分布式特性隐藏在整个系统内。站在应用程序的角度来看整个系统是一个非分布式的系统就像一个文件系统或者一个普通的单体系统对外提供一个简单的模型语句。
因此我们最终的目标就是构建一个接口使其看起来就像一个非分布式存储和计算系统一样但是实际又是一个有极高性能和容错性的分布式系统。
关于抽象接口的落地实现就不得不提到人们在构建分布式系统时使用到的很多工具了
RPC(remote procedure call) : rpc的目标计算掩盖我们正在不可靠网络上通信的事实线程 : 使用线程来充分利用多核心计算机同时线程提供了一种结构化并发操作方式可以简化并发操作线程会导致并发问题因此我们需要花费一些时间来考虑并发控制比如锁 可扩展性
我们构建分布式系统的初衷是为了追求可扩展性这里可扩展性指的是我用一台计算机解决了一些问题那么当我增加一台计算机后我只需一半时间就可以解决这些问题。也就是说我只需要通过增加计算机的数量系统性能和吞吐量就可以得到对应的提高而非通过重构系统这种高昂花费且复杂的做法。
无脑堆机器也未必能解决问题呦 ! 请看下面这个场景: 在上面的场景中系统一开始的瓶颈在Web服务器端产生但是随着我们沉迷于堆Web server的快乐中时系统的瓶颈已经悄咪咪转移到了DB端当我们尝试旧计重施的时候会发现DB的拆分扩容似乎没那么容易
因此传统的单体数据库已经没有办法满足我们的需求了我们需要一种能够通过堆机器实现扩展的分布式存储机制。 可用性(容错性)
大型分布式系统有一个很大的问题就是一些罕见的问题会被放大例如1000台计算机组成的集群中总是会有故障发生要么是机器故障要么是运行出错要么是运行缓慢要么是执行错误的任务要么是网络问题。在一个大型分布式系统中总是会有各种小问题出现所以大型系统会将一些几乎不可能发生的问题变成一个持续不断的问题。
所以因为错误总会发生必须要在设计时就考虑系统能够屏蔽错误或者说能够在出错时继续运行。同时因为我们需要为第三方应用开发人员提供方便的抽象接口我们的确也需要构建这样一种基础架构它能够尽可能多的对应用开发人员屏蔽和掩盖错误。这样应用开发人员就不需要处理各种各样的可能发生的错误。
对于容错有很多不同的概念可以表述。这些表述中有一个共同的思想就是可用性Availability。某些系统经过精心的设计这样在特定的错误类型下系统仍然能够正常运行仍然可以像没有出现错误一样为你提供完整的服务。
某些系统通过这种方式提供可用性。比如你构建了一个有两个拷贝的多副本系统其中一个故障了另一个还能运行。当然如果两个副本都故障了你的系统就不再有可用性。所以可用系统通常是指在特定的故障范围内系统仍然能够提供服务系统仍然是可用的。如果出现了更多的故障系统将不再可用。
除了可用性之外另一种容错特性是自我可恢复性recoverability。这里的意思是如果出现了问题服务会停止工作不再响应请求之后有人来修复并且在修复之后系统仍然可以正常运行就像没有出现过问题一样。这是一个比可用性更弱的需求因为在出现故障到故障组件被修复期间系统将会完全停止工作。但是修复之后系统又可以完全正确的重新运行所以可恢复性是一个重要的需求。
对于一个可恢复的系统通常需要做一些操作例如将最新的数据存放在磁盘中这样在供电恢复之后假设故障就是断电才能将这些数据取回来。甚至说对于一个具备可用性的系统为了让系统在实际中具备应用意义也需要具备可恢复性。因为可用的系统仅仅是在一定的故障范围内才可用如果故障太多可用系统也会停止工作停止一切响应。但是当足够的故障被修复之后系统还是需要能继续工作。所以一个好的可用的系统某种程度上应该也是可恢复的。当出现太多故障时系统会停止响应但是修复之后依然能正确运行。这是我们期望看到的。
为了实现这些特性有很多工具。其中最重要的有两个 一个是非易失存储non-volatile storage类似于硬盘。这样当出现类似电源故障甚至整个机房的电源都故障时我们可以使用非易失存储比如硬盘闪存SSD之类的。我们可以存放一些checkpoint或者系统状态的log在这些存储中这样当备用电源恢复或者某人修好了电力供给我们还是可以从硬盘中读出系统最新的状态并从那个状态继续运行。所以这里的一个工具是非易失存储。因为更新非易失存储是代价很高的操作所以相应的出现了很多非易失存储的管理工具。同时构建一个高性能容错的系统聪明的做法是避免频繁的写入非易失存储。在过去甚至对于今天的一个3GHZ的处理器写入一个非易失存储意味着移动磁盘臂并等待磁碟旋转这两个过程都非常缓慢。有了闪存会好很多但是为了获取好的性能仍然需要许多思考。 对于容错的另一个重要工具是复制replication不过管理复制的多副本系统会有些棘手。任何一个多副本系统中都会有一个关键的问题比如说我们有两台服务器它们本来应该是有着相同的系统状态现在的关键问题在于这两个副本总是会意外的偏离同步的状态而不再互为副本。对于任何一种使用复制实现容错的系统我们都面临这个问题。lab2和lab3都是通过管理多副本来实现容错的系统你将会看到这里究竟有多复杂。 一致性
我们通过一个例子来理解一致性假设我们在构建一个分布式存储系统并且这是一个KV服务。这个KV服务只支持两种操作
其中一个是put操作会将一个value存入一个key另一个是get操作会取出key对应的value
整体表现就像是一个大的key-value表单。当我需要对一个分布式系统举例时我总是会想到KV服务因为它们也很基础可以算是某种基础简单版本的存储系统。
现在如果你是程序员如果这两个操作有特定的意义或者说操作满足一致性那么对于你是有帮助的。你可以去查看手册手册会向你解释如果你调用get你会获取到什么如果你调用put会有什么效果。如果有这样的手册那是极好的。否则如果你不知道put/get的实际行为你又该如何写你的应用程序呢
一致性就是用来定义操作行为的概念。之所以一致性是分布式系统中一个有趣的话题是因为从性能和容错的角度来说我们通常会有多个副本。在一个非分布式系统中你通常只有一个服务器一个表单。虽然不是绝对但是通常来说对于put/get的行为不会有歧义。直观上来说put就是更新这个表单get就是从表单中获取当前表单中存储的数据。但是在一个分布式系统中由于复制或者缓存数据可能存在于多个副本当中于是就有了多个不同版本的key-value对。假设服务器有两个副本那么他们都有一个key-value表单两个表单中key 1对应的值都是20。 现在某个客户端发送了一个put请求并希望将key 1改成值21。这里或许是KV服务里面的一个计数器。这个put请求发送给了第一台服务器: 之后会发送给第二台服务器因为相同的put请求需要发送给两个副本这样这两个副本才能保持同步。但是就在客户端准备给第二台服务器发送相同请求时这个客户端故障了可能是电源故障或者操作系统的bug之类的。所以现在我们处于一个不好的状态我们发送了一个put请求更新了一个副本的值是21但是另一个副本的值仍然是20。 如果现在某人通过get读取key为1的值那么他可能获得21也可能获得20取决于get请求发送到了哪个服务器。即使规定了总是把请求先发送给第一个服务器那么我们在构建容错系统时如果第一台服务器故障了请求也会发给第二台服务器。所以不管怎么样总有一天你会面临暴露旧数据的风险。很可能是这样最开始许多get请求都得到了21之后过了一周突然一些get请求得到了一周之前的旧数据20。所以这里不是很一致。并且如果我们不小心的话这个场景是可能发生的。所以我们需要确定put/get操作的一些规则。
实际上对于一致性有很多不同的定义。有一些非常直观比如说get请求可以得到最近一次完成的put请求写入的值。这种一般也被称为强一致Strong Consistency。但是事实上构建一个弱一致的系统也是非常有用的。弱一致是指不保证get请求可以得到最近一次完成的put请求写入的值。尽管有很多细节的工作要处理强一致可以保证get得到的是put写入的最新的数据而很多的弱一致系统不会做出类似的保证。所以在一个弱一致系统中某人通过put请求写入了一个数据但是你通过get看到的可能仍然是一个旧数据而这个旧数据可能是很久之前写入的。
人们对于弱一致感兴趣的原因是虽然强一致可以确保get获取的是最新的数据但是实现这一点的代价非常高。几乎可以确定的是分布式系统的各个组件需要做大量的通信才能实现强一致性。如果你有多个副本那么不管get还是put都需要询问每一个副本。在之前的例子中客户端在更新的过程中故障了导致一个副本更新了而另一个副本没有更新。如果我们要实现强一致简单的方法就是同时读两个副本如果有多个副本就读取所有的副本并使用最近一次写入的数据。但是这样的代价很高因为需要大量的通信才能得到一个数据。所以为了尽可能的避免通信尤其当副本相隔的很远的时候人们会构建弱一致系统并允许读取出旧的数据。当然为了让弱一致更有实际意义人们还会定义更多的规则。
强一致带来的昂贵的通信问题会把你带入这样的困境当我们使用多副本来完成容错时我们的确需要每个副本都有独立的出错概率这样故障才不会关联。例如将两个副本放在一个机房的一个机架上是一个非常糟糕的主意。如果有谁踢到了机架的电源线那我们数据的两个副本都没了因为它们都连在同一个机架的同一根电线上。所以为了使副本的错误域尽可能独立为了获得良好的容错特性人们希望将不同的副本放置在尽可能远的位置例如在不同的城市或者在大陆的两端。这样如果地震摧毁了一个数据中心另一个数据中心中的副本有很大可能还能保留。我们期望这样的效果。但是如果我们这么做了另一个副本可能在数千英里之外按照光速来算也需要花费几毫秒到几十毫秒才能完成横跨洲际的数据通信而这只是为了更新数据的另一个副本。所以为了保持强一致的通信代价可能会非常高。因为每次你执行put或者get请求你都需要等待几十毫秒来与数据的两个副本通信以确保它们都被更新了或者都被检查了以获得最新的数据。现在的处理器每秒可以执行数十亿条指令等待几十毫秒会大大影响系统的处理速度。
所以人们常常会使用弱一致系统你只需要更新最近的数据副本并且只需要从最近的副本获取数据。在学术界和现实世界工业界有大量关于构建弱一致性保证的研究。所以弱一致对于应用程序来说很有用并且它可以用来获取高的性能。 MapReduce
MapReduce是由Google设计开发和使用的一个系统相关的论文在2004年发表。Google当时面临的问题是他们需要在TB级别的数据上进行大量的计算。比如说为所有的网页创建索引分析整个互联网的链接路径并得出最重要或者最权威的网页。如你所知在当时整个互联网的数据也有数十TB。构建索引基本上等同于对整个数据做排序而排序比较费时。如果用一台计算机对整个互联网数据进行排序要花费多长时间呢可能要几周几个月甚至几年。所以当时Google非常希望能将对大量数据的大量运算并行跑在几千台计算机上这样才能快速完成计算。对Google来说购买大量的计算机是没问题的这样Google的工程师就不用花大量时间来看报纸来等他们的大型计算任务完成。所以有段时间Google买了大量的计算机并让它的聪明的工程师在这些计算机上编写分布式软件这样工程师们可以将手头的问题分包到大量计算机上去完成管理这些运算并将数据取回。
如果你只雇佣熟练的分布式系统专家作为工程师尽管可能会有些浪费也是可以的。但是Google想雇用的是各方面有特长的人不一定是想把所有时间都花在编写分布式软件上的工程师。所以Google需要一种框架可以让它的工程师能够进行任意的数据分析例如排序网络索引器链接分析器以及任何的运算。工程师只需要实现应用程序的核心就能将应用程序运行在数千台计算机上而不用考虑如何将运算工作分发到数千台计算机如何组织这些计算机如何移动数据如何处理故障等等这些细节。所以当时Google需要一种框架使得普通工程师也可以很容易的完成并运行大规模的分布式运算。这就是MapReduce出现的背景。
MapReduce的思想是应用程序设计人员和分布式运算的使用者只需要写简单的Map函数和Reduce函数而不需要知道任何有关分布式的事情MapReduce框架会处理剩下的事情。
抽象来看MapReduce假设有一些输入这些输入被分割成大量的不同的文件或者数据块。所以我们假设现在有输入文件1输入文件2和输入文件3这些输入可能是从网上抓取的网页更可能是包含了大量网页的文件。
MapReduce启动时会查找Map函数。之后MapReduce框架会为每个输入文件运行Map函数。这里很明显有一些可以并行运算的地方比如说可以并行运行多个只关注输入和输出的Map函数。 Map函数以文件作为输入文件又是整个输入数据的一部分。Map函数的输出是一个key-value对的列表。假设我们在实现一个最简单的MapReduce Job单词计数器。它会统计每个单词出现的次数。在这个例子中Map函数会输出key-value对其中key是单词而value是1。Map函数会将输入中的每个单词拆分并输出一个key-value对key是该单词value是1。最后需要对所有的key-value进行计数以获得最终的输出。所以假设输入文件1包含了单词a和单词bMap函数的输出将会是keyavalue1和keybvalue1。第二个Map函数只从输入文件2看到了b那么输出将会是keybvalue1。第三个输入文件有一个a和一个c。 我们对所有的输入文件都运行了Map函数并得到了论文中称之为中间输出intermediate output也就是每个Map函数输出的key-value对。
运算的第二阶段是运行Reduce函数。MapReduce框架会收集所有Map函数输出的每一个单词的统计。比如说MapReduce框架会先收集每一个Map函数输出的key为a的key-value对。收集了之后会将它们提交给Reduce函数。 之后会收集所有的b。这里的收集是真正意义上的收集因为b是由不同计算机上的不同Map函数生成所以不仅仅是数据从一台计算机移动到另一台如果Map只在一台计算机的一个实例里可以直接通过一个RPC将数据从Map移到Reduce。我们收集所有的b并将它们提交给另一个Reduce函数。这个Reduce函数的入参是所有的key为b的key-value对。对c也是一样。所以MapReduce框架会为所有Map函数输出的每一个key调用一次Reduce函数。 在我们这个简单的单词计数器的例子中Reduce函数只需要统计传入参数的长度甚至都不用查看传入参数的具体内容因为每一个传入参数代表对单词加1而我们只需要统计个数。最后每个Reduce都输出与其关联的单词和这个单词的数量。所以第一个Reduce输出a2第二个Reduce输出b2第三个Reduce输出c1。 这就是一个典型的MapReduce Job。从整体来看为了保证完整性有一些术语要介绍一下
Job。整个MapReduce计算称为Job。Task。每一次MapReduce调用称为Task。
所以对于一个完整的MapReduce Job它由一些Map Task和一些Reduce Task组成。所以这是一个单词计数器的例子它解释了MapReduce的基本工作方式。 Map函数和Reduce函数
Map函数使用一个key和一个value作为参数。我们这里说的函数是由普通编程语言编写例如CJava等所以这里的函数任何人都可以写出来。入参中key是输入文件的名字通常会被忽略因为我们不太关心文件名是什么value是输入文件的内容。所以对于一个单词计数器来说value包含了要统计的文本我们会将这个文本拆分成单词。之后对于每一个单词我们都会调用emit。emit由MapReduce框架提供并且这里的emit属于Map函数。emit会接收两个参数其中一个是key另一个是value。在单词计数器的例子中emit入参的key是单词value是字符串“1”。这就是一个Map函数。在一个单词计数器的MapReduce Job中Map函数实际就可以这么简单。而这个Map函数不需要知道任何分布式相关的信息不需要知道有多台计算机不需要知道实际会通过网络来移动数据。这里非常直观。
def map_function(key, value):words split_text_into_words(value)for word in words:emit(word, 1) # 将每个单词作为key固定的值1作为value生成键值对Reduce函数的入参是某个特定key的所有实例Map输出中的key-value对中出现了一次特定的key就可以算作一个实例。所以Reduce函数也是使用一个key和一个value作为参数其中value是一个数组里面每一个元素是Map函数输出的key的一个实例的value。对于单词计数器来说key就是单词value就是由字符串“1”组成的数组所以我们不需要关心value的内容是什么我们只需要关心value数组的长度。Reduce函数也有一个属于自己的emit函数。这里的emit函数只会接受一个参数value这个value会作为Reduce函数入参的key的最终输出。所以对于单词计数器我们会给emit传入数组的长度。这就是一个最简单的Reduce函数。并且Reduce也不需要知道任何有关容错或者其他有关分布式相关的信息。
def reduce_function(key, values):count sum(values) # 对数组中的值1进行累加emit(key, count) # 输出单词及其出现的总次数疑问 可以将Reduce函数的输出再传递给Map函数吗 在现实中这是很常见的。MapReduce用户定义了一个MapReduce Job接收一些输入生成一些输出。之后可能会有第二个MapReduce Job来消费前一个Job的输出。对于一些非常复杂的多阶段分析或者迭代算法比如说Google用来评价网页的重要性和影响力的PageRank算法这些算法是逐渐向答案收敛的。我认为Google最初就是这么使用MapReduce的他们运行MapReduce Job多次每一次的输出都是一个网页的列表其中包含了网页的价值权重或者重要性。所以将MapReduce的输出作为另一个MapReduce Job的输入这很正常。 如果可以将Reduce的输出作为Map的输入在生成Reduce函数的输出时需要有什么注意吗 是的你需要设置一些内容。比如你需要这么写Reduce函数使其在某种程度上知道应该按照下一个MapReduce Job需要的格式生成数据。这里实际上带出了一些MapReduce框架的缺点。如果你的算法可以很简单的由Map函数、Map函数的中间输出以及Reduce函数来表达那是极好的。MapReduce对于能够套用这种形式的算法是极好的。并且Map函数必须是完全独立的它们是一些只关心入参的函数。这里就有一些限制了。事实上很多人想要的更长的运算流程这涉及到不同的处理。使用MapReduce的话你不得不将多个MapReduce Job拼装在一起。而在本课程后面会介绍的一些更高级的系统中会让你指定完整的计算流程然后这些系统会做优化。这些系统会发现所有你想完成的工作然后有效的组织更复杂的计算。 MapReduce框架更重要还是Map/Reduce函数更重要 从程序员的角度来看只需要关心Map函数和Reduce函数。从我们的角度来看我们需要关心的是worker进程和worker服务器。这些是MapReduce框架的一部分它们与其它很多组件一起调用了Map函数和Reduce函数。所以是的从我们的角度来看我们更关心框架是如何组成的。从程序员的角度来看所有的分布式的内容都被剥离了。 当你调用emit时数据会发生什么变化emit函数在哪运行 首先看这些函数在哪运行。如MapReduce论文的图1所示: 现实中MapReduce运行在大量的服务器之上我们称之为worker服务器或者worker。同时也会有一个Master节点来组织整个计算过程。这里实际发生的是Master服务器知道有多少输入文件例如5000个输入文件之后它将Map函数分发到不同的worker。所以它会向worker服务器发送一条消息说请对这个输入文件执行Map函数吧。之后MapReduce框架中的worker进程会读取文件的内容调用Map函数并将文件名和文件内容作为参数传给Map函数。worker进程还需要实现emit这样每次Map函数调用emitworker进程就会将数据写入到本地磁盘的文件中。所以Map函数中调用emit的效果是在worker的本地磁盘上创建文件这些文件包含了当前worker的Map函数生成的所有的key和value。 所以Map阶段结束时我们看到的就是Map函数在worker上生成的一些文件。之后MapReduce的worker会将这些数据移动到Reduce所需要的位置。对于一个典型的大型运算Reduce的入参包含了所有Map函数对于特定key的输出。通常来说每个Map函数都可能生成大量key。所以通常来说在运行Reduce函数之前。运行在MapReduce的worker服务器上的进程需要与集群中每一个其他服务器交互来询问说看我需要对keya运行Reduce请看一下你本地磁盘中存储的Map函数的中间输出找出所有keya并通过网络将它们发给我。所以Reduce worker需要从每一个worker获取特定key的实例。这是通过由Master通知到Reduce worker的一条指令来触发。一旦worker收集完所有的数据它会调用Reduce函数Reduce函数运算完了会调用自己的emit这个emit与Map函数中的emit不一样它会将输出写入到一个Google使用的共享文件服务中。 有关输入和输出文件的存放位置这是我之前没有提到的它们都存放在文件中但是因为我们想要灵活的在任意的worker上读取任意的数据这意味着我们需要某种网络文件系统network file system来存放输入数据。所以实际上MapReduce论文谈到了GFSGoogle File System。GFS是一个共享文件服务并且它也运行在MapReduce的worker集群的物理服务器上。GFS会自动拆分你存储的任何大文件并且以64MB的块存储在多个服务器之上。所以如果你有了10TB的网页数据你只需要将它们写入到GFS甚至你写入的时候是作为一个大文件写入的GFS会自动将这个大文件拆分成64MB的块并将这些块平均的分布在所有的GFS服务器之上而这是极好的这正是我们所需要的。如果我们接下来想要对刚刚那10TB的网页数据运行MapReduce Job数据已经均匀的分割存储在所有的服务器上了。如果我们有1000台服务器我们会启动1000个Map worker每个Map worker会读取1/1000输入数据。这些Map worker可以并行的从1000个GFS文件服务器读取数据并获取巨大的读取吞吐量也就是1000台服务器能提供的吞吐量。 这里的箭头代表什么意思 随着Google这些年对MapReduce系统的改进答案也略有不同。通常情况下如果我们在一个例如GFS的文件系统中存储大的文件你的数据分散在大量服务器之上你需要通过网络与这些服务器通信以获取你的数据。在这种情况下这个箭头表示MapReduce的worker需要通过网络与存储了输入文件的GFS服务器通信并通过网络将数据读取到MapReduce的worker节点进而将数据传递给Map函数。这是最常见的情况。并且这是MapReduce论文中介绍的工作方式。但是如果你这么做了这里就有很多网络通信。 如果数据总共是10TB那么相应的就需要在数据中心网络上移动10TB的数据。而数据中心网络通常是GB级别的带宽所以移动10TB的数据需要大量的时间。在论文发表的2004年MapReduce系统最大的限制瓶颈是网络吞吐。如果你读到了论文的评估部分你会发现当时运行在一个有数千台机器的网络上每台计算机都接入到一个机架机架上有以太网交换机机架之间通过root交换机连接最上面那个交换机。 如果随机的选择MapReduce的worker服务器和GFS服务器那么至少有一半的机会它们之间的通信需要经过root交换机而这个root交换机的吞吐量总是固定的。如果做一个除法root交换机的总吞吐除以2000那么每台机器只能分到50Mb/S的网络容量。这个网络容量相比磁盘或者CPU的速度来说要小得多。所以50Mb/S是一个巨大的限制。在MapReduce论文中讨论了大量的避免使用网络的技巧。其中一个是将GFS和MapReduce混合运行在一组服务器上。所以如果有1000台服务器那么GFS和MapReduce都运行在那1000台服务器之上。当MapReduce的Master节点拆分Map任务并分包到不同的worker服务器上时Master节点会找出输入文件具体存在哪台GFS服务器上并把对应于那个输入文件的Map Task调度到同一台服务器上。所以默认情况下这里的箭头是指读取本地文件而不会涉及网络。虽然由于故障负载或者其他原因不能总是让Map函数都读取本地文件但是几乎所有的Map函数都会运行在存储了数据的相同机器上并因此节省了大量的时间否则通过网络来读取输入数据将会耗费大量的时间。我之前提过Map函数会将输出存储到机器的本地磁盘所以存储Map函数的输出不需要网络通信至少不需要实时的网络通信。但是我们可以确定的是为了收集所有特定key的输出并将它们传递给某个机器的Reduce函数还是需要网络通信。假设现在我们想要读取所有的相关数据并通过网络将这些数据传递给单台机器数据最开始在运行Map Task的机器上按照行存储例如第一行代表第一个Map函数输出a1b1 论文里称这种数据转换之为洗牌shuffle。所以这里确实需要将每一份数据都通过网络从创建它的Map节点传输到需要它的Reduce节点。所以这也是MapReduce中代价较大的一部分。 是否可以通过Streaming的方式加速Reduce的读取 你是对的。你可以设想一个不同的定义其中Reduce通过streaming方式读取数据。我没有仔细想过这个方法我也不知道这是否可行。作为一个程序接口MapReduce的第一目标就是让人们能够简单的编程人们不需要知道MapReduce里面发生了什么。对于一个streaming方式的Reduce函数或许就没有之前的定义那么简单了。不过或许可以这么做。实际上很多现代的系统中会按照streaming的方式处理数据而不是像MapReduce那样通过批量的方式处理Reduce函数。在MapReduce中需要一直要等到所有的数据都获取到了才会进行Reduce处理所以这是一种批量处理。现代系统通常会使用streaming并且效率会高一些。
所以这里的shuffle的重点是这里实际上可能会有大量的网络通信。假设你在进行排序排序的输入输出会有相同的大小。这样如果你的输入是10TB为了能排序你需要将10TB的数据在网络上移动并且输出也会是10TB所以这里有大量的数据。这可能发生在任何MapReduce job中尽管有一些MapReduce job在不同阶段的数据没有那么大。
之前有人提过想将Reduce的输出传给另一个MapReduce job而这也是人们常做的事情。在一些场景中Reduce的输出可能会非常巨大比如排序比如网页索引器。10TB的输入对应的是10TB的输出。所以Reduce的输出也会存储在GFS上。但是Reduce只会生成key-value对MapReduce框架会收集这些数据并将它们写入到GFS的大文件中。所以这里有需要一大轮的网络通信将每个Reduce的输出传输到相应的GFS服务器上。你或许会认为这里会使用相同的技巧就将Reduce的输出存储在运行了Reduce Task的同一个GFS服务器上因为是混部的。或许Google这么做了但是因为GFS会将数据做拆分并且为了提高性能并保留容错性数据会有2-3份副本。这意味着不论你写什么你总是需要通过网络将一份数据拷贝写到2-3台服务器上。所以这里会有大量的网络通信。这里的网络通信是2004年限制MapReduce的瓶颈。在2020年因为之前的网络架构成为了人们想在数据中心中做的很多事情的限制因素现代数据中心中root交换机比过去快了很多。并且你或许已经见过一个典型的现代数据中心网络会有很多的root交换机而不是一个交换机spine-leaf架构。每个机架交换机都与每个root交换机相连网络流量在多个root交换机之间做负载分担。所以现代数据中心网络的吞吐大多了。 我认为Google几年前就不再使用MapReduce了不过在那之前现代的MapReduce已经不再尝试在GFS数据存储的服务器上运行Map函数了它乐意从任何地方加载数据因为网络已经足够快了。