您好,登录后才能下订单哦!
这篇文章主要讲解了“MapReduce编程模型是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“MapReduce编程模型是什么”吧!
MapReduce:大型集群上的简单数据处理
摘要
MapReduce是一个设计模型,也是一个处理和产生海量数据的一个相关实现。用户指定一个用于处理一个键值(key-value)对生成一组key/value对形式的中间结果的map函数,以及一个将中间结果键相同的键值对合并到一起的reduce函数。许多现实世界的任务都能满足这个模型,如这篇文章所示。
使用这个功能形式实现的程序能够在大量的普通机器上并行执行。这个运行程序的系统关心下面的这些细节:输入数据的分区、一组机器上调度程序执行、处理机器失败问题,以及管理所需的机器内部的通信。这使没有任何并行处理和分布式系统经验的程序员能够利用这个大型分布式系统的资源。
我们的MapReduce实现运行在一个由普通机器组成的大规模集群上,具有很高的可扩展性:一个典型的MapReduce计算会在几千台机器上处理许多TB的数据。程序员们发现这个系统很容易使用:目前已经实现了几百个MapReduce程序,在Google的集群上,每天有超过一千个的MapReduce工作在运行。
一、 介绍
在过去的5年中,本文作者和许多Google的程序员已经实现了数百个特定用途的计算程序,处理了海量的原始数据,包括抓取到的文档、网页请求日志等,计算各种衍生出来的数据,如反向索引、网页文档的图形结构的各种表示、每个host下抓取到的页面数量的总计、一个给定日期内的最频繁查询的集合等。大多数这种计算概念明确。然而,输入数据通常都很大,并且计算必须分布到数百或数千台机器上以确保在一个合理的时间内完成。如何并行计算、分布数据、处理错误等问题使这个起初很简单的计算,由于增加了处理这些问题的很多代码而变得十分复杂。
为了解决这个复杂问题,我们设计了一个新的抽象模型,它允许我们将想要执行的计算简单的表示出来,而隐藏其中并行计算、容错、数据分布和负载均衡等很麻烦的细节。我们的抽象概念是受最早出现在lisp和其它结构性语言中的map和reduce启发的。我们认识到,大多数的计算包含对每个在输入数据中的逻辑记录执行一个map操作以获取一组中间key/value对,然后对含有相同key的所有中间值执行一个reduce操作,以此适当的合并之前的衍生数据。由用户指定map和reduce操作的功能模型允许我们能够简单的进行并行海量计算,并使用re-execution作为主要的容错机制。
这项工作的最大贡献是提供了一个简单的、强大的接口,使我们能够自动的进行并行和分布式的大规模计算,通过在由普通PC组成的大规模集群上实现高性能的接口来进行合并。
第二章描述了基本的编程模型,并给出了几个例子。第三章描述了一个为我们的聚类计算环境定制的MapReduce接口实现。第四章描述了我们发现对程序模型很有用的几个优化。第六章探索了MapReduce在Google内部的使用,包括我们在将它作为生产索引系统重写的基础的一些经验。第七章讨论了相关的和未来的工作。
二、 编程模型
这个计算输入一个key/value对集合,产生一组输出key/value对。MapReduce库的用户通过两个函数来标识这个计算:Map和Reduce。
Map,由用户编写,接收一个输入对,产生一组中间key/value对。MapReduce库将具有相同中间key I的聚合到一起,然后将它们发送给Reduce函数。
Reduce,也是由用户编写的,接收中间key I和这个key的值的集合,将这些值合并起来,形成一个尽可能小的集合。通常,每个Reduce调用只产生0或1个输出值。这些中间值经过一个迭代器(iterator)提供给用户的reduce函数。这允许我们可以处理由于数据量过大而无法载入内存的值的链表。
2.1 例子
考虑一个海量文件集中的每个单词出现次数的问题,用户会写出类似于下面的伪码:
Map函数对每个单词增加一个相应的出现次数(在这个例子中仅仅为“1”)。Reduce函数将一个指定单词所有的计数加到一起。
此外,用户使用输入和输出文件的名字、可选的调节参数编写代码,来填充一个mapreduce规格对象,然后调用MapReduce函数,并把这个对象传给它。用户的代码与MapReduce库(C++实现)连接到一起。。附录A包含了这个例子的整个程序。
2.2 类型
尽管之前的伪代码中使用了字符串格式的输入和输出,但是在概念上,用户定义的map和reduce函数需要相关联的类型:
map (k1, v1) --> list(k2, v2)
reduce (k2, list(v2)) --> list(v2)
也就是说,输入的键和值和输出的键和值来自不同的域。此外,中间结果的键和值与输出的键和值有相同的域。
MapReduce的C++实现与用户定义的函数使用字符串类型进行参数传递,将类型转换的工作留给用户的代码来处理。
2.3 更多的例子
这里有几个简单有趣的程序,能够使用MapReduce计算简单的表示出来。
分布式字符串查找(Distributed Grep):map函数将匹配一个模式的行找出来。Reduce函数是一个恒等函数,只是将中间值拷贝到输出上。
URL访问频率计数(Count of URL Access Frequency):map函数处理web页面请求的日志,并输出<URL, 1>。Reduce函数将相同URL的值累加到一起,生成一个<URL, total count>对。
翻转网页连接图(Reverse Web-Link Graph):map函数为在一个名为source的页面中指向目标(target)URL的每个链接输出<target, source>对。Reduce函数将一个给定目标URL相关的所有源(source)URLs连接成一个链表,并生成对:<target, list(source)>。
主机关键向量指标(Term-Vector per Host):一个检索词向量将出现在一个文档或是一组文档中最重要的单词概述为一个<word, frequency>对链表。Map函数为每个输入文档产生一个<hostname, term vector>(hostname来自文档中的URL)。Reduce函数接收一个给定hostname的所有文档检索词向量,它将这些向量累加到一起,将罕见的向量丢掉,然后生成一个最终的<hostname, term vector>对。
倒排索引(Inverted Index):map函数解析每个文档,并生成一个<word, document ID>序列。Reduce函数接收一个给定单词的所有键值对,所有的输出对形成一个简单的倒排索引。可以通过对计算的修改来保持对单词位置的追踪。
分布式排序(Distributed Sort):map函数将每个记录的key抽取出来,并生成一个<key, record>对。Reduce函数不会改变任何的键值对。这个计算依赖了在4.1节提到的分区功能和4.2节提到的排序属性。
三、 实现
MapReduce接口有很多不同的实现,需要根据环境来做出合适的选择。比如,一个实现可能适用于一个小的共享内存机器,而另一个实现则适合一个大的NUMA多处理器机器,再另一个可能适合一个更大的网络机器集合。
这一章主要描述了针对在Google内部广泛使用的计算环境的一个实现:通过交换以太网将大量的普通PC连接到一起的集群。在我们的环境中:
(1) 机器通常是双核x86处理器、运行Linux操作系统、有2-4G的内存。
(2) 使用普通的网络硬件—通常是100Mb/s或者是1Gb/s的机器带宽,但是平均值远小于带宽的一半。
(3) 由数百台或者数千台机器组成的集群,因此机器故障是很平常的事
(4) 存储是由直接装在不同机器上的便宜的IDE磁盘提供。一个内部的分布式文件系统用来管理存储这些磁盘上的数据。文件系统在不可靠的硬件上使用副本机制提供了可用性和可靠性。
(5) 用户将工作提交给一个调度系统,每个工作由一个任务集组成,通过调度者映射到集群中可用机器的集合上。
3.1 执行概述
通过自动的将输入数据分区成M个分片,Map调用被分配到多台机器上运行。数据的分片能够在不同的机器上并行处理。使用分区函数(如,hash(key) mod R)将中间结果的key进行分区成R个分片,Reduce调用也被分配到多台机器上运行。分区的数量(R)和分区函数是由用户指定的。
独立的工作机器的计数器值周期性的传送到master(附在ping的响应上)master将从成功的map和reduce任务上获取的计数器值进行汇总,当MapReduce操作完成时,将它们返回给用户的代码。当前的计数器值也被显示在了master的状态页面上,使人们能够看到当前计算的进度。当汇总计数器值时,master通过去掉同一个map或reduce任务的多次执行所造成的影响来防止重复计数。(重复执行可能会在我们使用备用任务和重新执行失败的任务时出现。)
一些计数器的值是由MapReduce库自动维护的,如已处理的输入key/value对的数量和已生成的输出key/value对的数量。
用户发现计数器对检查MapReduce操作的行为很有用处。例如,在一些MapReduce操作中,用户代码可能想要确保生成的输出对的数量是否精确的等于已处理的输入对的数量,或者已处理的德国的文档数量在已处理的所有文档数量中是否被容忍。
五、 性能
在这章中,我们测试两个运行在一个大规模集群上的MapReduce计算的性能。一个计算在大约1TB的数据中进行特定的模式匹配,另一个计算对大约1TB的数据进行排序。
这两个程序能够代表实际中大量的由用户编写的MapReduce程序,一类程序将数据从一种表示方式转换成另一种形式;另一类程序是从海里的数据集中抽取一小部分感兴趣的数据。
5.1 集群配置
所有的程序运行在一个由将近1800台机器组成的集群上。每个机器有两个2GHz、支持超线程的Intel Xeon处理器、4GB的内存、两个160GB的IDE磁盘和一个1Gbps的以太网链路,这些机器部署在一个两层的树状交换网络中,在根节点处有大约100-200Gbps的带宽。所有的机器都采用相同的部署,因此任意两个机器间的RTT都小于1ms。
在4GB内存里,有接近1-1.5GB用于运行在集群上的其它任务。程序在一个周末的下午开始执行,这时主机的CPU、磁盘和网络基本都是空闲的。
5.2 字符串查找(Grep)
这个grep程序扫描了大概1010个100字节大小的记录,查找出现概率相对较小的3个字符的模式(这个模式出现在92337个记录中)。输入被分割成接近64MB的片(M=15000),整个输出被放到一个文件中(R=1)。
图3:对于排序程序的不同执行过程随时间的数据传输速率
图3(a)显示了排序程序的正常执行过程。左上方的图显示了输入读取的速率,这个速率峰值大约为13GB/s,因为所有的map任务执行完成,速率也在200秒前下降到了0。注意,这里的输入速率比字符串查找的要小,这是因为排序程序的map任务花费了大约一半的处理时间和I/O带宽将终结结果输出到它们的本地磁盘上,字符串查找相应的中间结果输出几乎可以忽略。
左边中间的图显示了数据通过网络从map任务发往reduce任务的速率。这个缓慢的数据移动在第一个map任务完成时会尽快开始。图中的第一个峰值是启动了第一批大概1700个reduce任务(整个MapReduce被分配到大约1700台机器上,每个机器每次最多只执行一个reduce任务)。这个计算执行大概300秒后,第一批reduce任务中的一些执行完成,我们开始执行剩下的reduce任务进行数据处理。所有的处理在计算开始后的大约600秒后完成。
左边下方的图显示了reduce任务就爱那个排序后的数据写到最终的输出文件的速率。在第一个处理周期完成到写入周期开始间有一个延迟,因为机器正在忙于对中间数据进行排序。写入的速率会在2-4GB/s上持续一段时间。所有的写操作会在计算开始后的大约850秒后完成。包括启动的开销,整个计算耗时891秒,这与TeraSort benchmark中的最好记录1057秒相似。
一些事情需要注意:因为我们的位置优化策略,大多数数据从本地磁盘中读取,绕开了网络带宽的显示,所以输入速率比处理速率和输出速率要高。处理速率要高于输出速率,因为输出过程要将排序后的数据写入到两个拷贝中(为了可靠性和可用性,我们将数据写入到两个副本中)。我们将数据写入两个副本,因为我们的底层文件系统为了可靠性和可用性提供了相应的机制。如果底层文件系统使用容错编码(erasure coding)而不是复制,写数据的网络带宽需求会降低。
5.4 备用任务的作用
在图3(b)中,我们显示了一个禁用备用任务的排序程序的执行过程。执行的流程与如3(a)中所显示的相似,除了有一个很长的尾巴,在这期间几乎没有写入行为发生。在960秒后,除了5个reduce任务的所有任务都执行完成。然而,这些落后者只到300秒后才执行完成。整个计算任务耗时1283秒,增加了大约44%的时间。
5.5 机器故障
在图3(c)中,我们显示了一个排序程序的执行过程,在计算过程开始都的几分钟后,我们故意kill掉了1746个工作进程中的200个。底层的调度者会迅速在这些机器上重启新的工作进程(因为只有进程被杀掉,机器本身运行正常)。
工作进程死掉会出现负的输入速率,因为一些之前已经完成的map工作消失了(因为香港的map工作进程被kill掉了),并且需要重新执行。这个map任务会相当快的重新执行。整个计算过程在933秒后完成,包括了启动开销(仅仅比普通情况多花费了5%的时间)。
六、 经验
我们在2003年2月完成了MapReduce库的第一个版本,并在2003年8月做了重大的改进,包括位置优化、任务在工作机器上的动态负载均衡执行等。从那时起,我们惊喜的发现,MapReduce库能够广泛的用于我们工作中的各种问题。它已经被用于Google内部广泛的领域,包括:
大规模机器学习问题
Google新闻和Froogle产品的集群问题
抽取数据用于公众查询的产品报告
从大量新应用和新产品的网页中抽取特性(如,从大量的位置查询页面中抽取地理位置信息)
大规模图形计算
表1: 2004年8月运行的MapReduce任务
在每个工作的最后,MapReduce库统计了工作使用的计算资源。在表1中,我们看到一些2004年8月在Google内部运行的MapReduce工作的一些统计数据。
6.1 大规模索引
目前为止,MapReduce最重要的应用之一就是完成了对生产索引系统的重写,它生成了用于Google网页搜索服务的数据结构。索引系统的输入数据是通过我们的爬取系统检索到的海量文档,存储为就一个GFS文件集合。这些文件的原始内容还有超过20TB的数据。索引程序是一个包含了5-10个MapReduce操作的序列。使用MapReduce(代替了之前版本的索引系统中的adhoc分布式处理)有几个优点:
索引程序代码是一个简单、短小、易于理解的代码,因为容错、分布式和并行处理都隐藏在了MapReduce库中。比如,一个计算程序的大小由接近3800行的C++代码减少到使用MapReduce的大约700行的代码。
MapReduce库性能非常好,以至于能够将概念上不相关的计算分开,来代替将这些计算混合在一起进行,避免额外的数据处理。这会使索引程序易于改变。比如,对之前的索引系统做一个改动大概需要几个月时间,而对新的系统则只需要几天时间。
索引程序变得更易于操作,因为大多数由于机器故障、机器处理速度慢和网络的瞬间阻塞等引起的问题都被MapReduce库自动的处理掉,而无需人为的介入。
七、 相关工作
许多系统都提供了有限的程序模型,并且对自动的并行计算使用了限制。比如,一个结合函数可以在logN时间内在N个处理器上对一个包含N个元素的数组使用并行前缀计算,来获取所有的前缀[6,9,13]。MapReduce被认为是这些模型中基于我们对大规模工作计算的经验的简化和精华。更为重要的是,我们提供了一个在数千个处理器上的容错实现。相反的,大多数并行处理系统只在较小规模下实现,并将机器故障的处理细节交给了程序开发者。
Bulk Synchronous Programming和一些MPI源于提供了更高层次的抽象使它更易于让开发者编写并行程序。这些系统和MapReduce的一个关键不同点是MapReduce开发了一个有限的程序模型来自动的并行执行用户的程序,并提供了透明的容错机制。
我们的位置优化机制的灵感来自于移动磁盘技术,计算用于处理靠近本地磁盘的数据,减少数据在I/O子系统或网络上传输的次数。我们的系统运行在挂载几个磁盘的普通机器上,而不是在磁盘处理器上运行,但是一般方法是类似的。
我们的备用任务机制与Charlotte系统中采用的eager调度机制类似。简单的Eager调度机制有一个缺点,如果一个给定的任务造成反复的失败,整个计算将以失败告终。我们通过跳过损坏计算路的机制,解决了这个问题的一些情况。
MapReduce实现依赖了内部集群管理系统,它负责在一个大规模的共享机器集合中分发和运行用户的任务。尽管不是本篇文章的焦点,但是集群管理系统在本质上与像Condor的其它系统类似。
排序功能是MapReduce库的一部分,与NOW-Sort中的操作类似。源机器(map工作进程)将将要排序的数据分区,并将其发送给R个Reduce工作进程中的一个。每个reduce工作进程在本地对这些数据进行排序(如果可能的话就在内存中进行)。当然NOW-Sort没有使MapReduce库能够广泛使用的用户定义的Map和Reduce函数。
River提供了一个编程模型,处理进程通过在分布式队列上发送数据来进行通信。像MapReduce一样,即使在不均匀的硬件或系统颠簸的情况下,River系统依然试图提供较好的平均性能。River系统通过小心的磁盘和网络传输调度来平衡完成时间。通过限制编程模型,MapReduce框架能够将问题分解成很多细颗粒的任务,这些任务在可用的工作进程上动态的调度,以至于越快的工作进程处理越多的任务。这个受限制的编程模型也允许我们在工作将要结束时调度冗余的任务进行处理,这样可以减少不均匀情况下的完成时间。
BAD-FS与MapReduce有完全不同的编程模型,不像MapReduce,它是用于在广域网下执行工作的。然而,它们有两个基本相似点。(1)两个系统都使用了重新执行的方式来处理因故障而丢失的数据。(2)两个系统都本地有限调度原则来减少网络链路上发送数据的次数。
TASCC是一个用于简化结构的高可用性的网络服务。像MapReduce一样,它依靠重新执行作为一个容错机制。
感谢各位的阅读,以上就是“MapReduce编程模型是什么”的内容了,经过本文的学习后,相信大家对MapReduce编程模型是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。