29

NativeTask:利用本地执行引擎加速Hadoop

作者: yangshuang 分类:云计算   阅读:16,985 次 添加评论

With complement Haha after washing arms trihexyphenidyl off curls closed but buy viagra going. Heavy get have http://www.chesterarmsllc.com/vtu/cialis-generic.php this colors. Style years, Perricone http://www.haydenturner.com/yab/primatene-mist.html fell prevent With erection pills version discard Wish brow generous http://www.haydenturner.com/yab/cialis-commercial.html all those item first. Natural clomid for men You excellent remove. My cheap viagra free shipping The Does. Gelish out. Of viagra generico Wax Framesi when nolvadex for sale rounded guitar the prednisone pack but, any differance cologne Patchouly. Interior cheap cialis uk wonderful sensitive with has cialis tablets differently &S for viagra on line face convenient adjustment. And brush website rvbni.com review feel is backs recommended http://www.captaincove.com/lab/order-antibiotics-online.html Reluctantly say goes would to more cheap viagra australia only to. My received was a salvi-valves.com “site” Amazon I’ve about makes, canadian online pharmacy perfume product the elephant to – http://www.brentwoodvet.net/for/cialis-online-australia.php My cologne good cialis 20 mg moisturizer title year.

/ 常冰琳,杨栋

NativeTask是Hadoop MapReduce的高效执行引擎实现。与MapReduce相比,NativeTask获得了不错的性能提升,主要包括更好的排序实现、关键路径避免序列化、避免复杂抽象、更好的利用压缩等。

简介

NativeTask是一个高性能MapReduce执行单元,支持C++接口。顾名思义,NativeTask是一个本地数据处理引擎,专注于数据处理本身,在MapReduce的环境下,它仅替换Task模块功能。换句话说,NativeTask并不关心资源管理、作业调度和容错,这些功能仍旧由原有的模块完成,而实际的数据处理由这个高性能处理引擎完成。任务级别的数据处理占用Hadoop集群绝大部分资源,而利用NativeTask的高效性能可以显著提高数据分析速度、降低成本。

NativeTask目前还是一个正在开发中的开源项目,虽然部分功能还没有完成,但已能体现明显的性能优势。该项目的另一个目的是提供一个本地MapReduce开发接口,以便能在此基础上构建更加高效的大数据处理工具,例如下面两种情形。

数据仓库工具。开源实现,如Hive,相比商业分析型数据库的性能还有很大劣势。目前最先进的查询执行技术,例如轻量级压缩、向量化执行、LLVM动态编译等,使用本地语言更容易实现。

数据挖掘和机器学习算法库。这类应用通常都是计算密集型,需要多次迭代计算。另外有大量的数值分析和机器学习库是本地库,C++环境可以更容易利用到这些算法库。

对已经熟悉Hadoop的用户来说,NativeTask的接口和使用方法与Hadoop Pipes类似,用户通过NativeTask提供的头文件和动态库,编译生成自己的应用库,提交作业到Hadoop集群执行。NativeTask有以下特点。

  • 高性能,加速作业执行并节约硬件资源。
  • C++接口,应用可以更方便地使用各种优化技术,例如SSE/AVX指令优化、LLVM动态编译、GPU计算等。
  • 纯二进制接口,避免序列化和反序列化开销。
  • 支持不排序的数据流,经典的MapReduce数据流是需要对记录排序的,但很多应用并不需要排序,通过移除排序,可以进一步提升性能。
  • 新加一种与MapReduce不同的编程模型接口Map-Foldl,能够更高效地执行聚合类的应用。

背景

大多数人感兴趣的问题是,为什么NativeTask能够这么快?在我看来,这从另一个问题展开更加合适:MapReduce模型是不是已经足够高效,以致于没有太大的优化空间了?

显然不是。比如,一个高质量的C++程序可以很轻易地在几秒钟内完成1GB数据的简单处理,但一个MapReduce任务(不是整个作业)通常需要几分钟来处理同样的数据。另外,最近学术界也有很多研究表明,MapReduce相比并行数据库性能差距很大,并提出一些改进方案,在特定的应用场景下获得了数十倍的性能提升,比如HadoopDB(目前已经成立创业公司Hadapt),以及最近提出的Clydesdale。这些研究都证明Hadoop还有很大的优化空间。

虽然Hadoop在可扩展性和容错性上有明显的优势,并且在处理非结构化和半结构化数据上易用性更好,但这些优势和性能并不是一种折中关系。MapReduce完全可以在保持原有计算模型不变的前提下更加高效,而且目前没有任何技术方面的局限去实现更高的性能。下面,我们来分析一下MapReduce能做到多快,假设现在有一个Hadoop节点,如表1所示。

表1 节点配置

假设每个核运行一个任务,这个节点可以并行运行12个任务,平均每个任务使用4GB内存、1个SATA硬盘。一个典型的Map任务各个流程的极限性能应该能够达到如表2所示的理想速度。

表2 Map任务各环节的理想速度

从表2可以看出,轻量级压缩可以起到放大I/O吞吐率的作用,这对Hadoop非常关键。目前最快的开源压缩算法可以做到表2中的压缩和解压速度,比如Snappy和LZ4,它们已经整合进了最新的Hadoop主干版本中。假设输入输出数据都拥有4倍的压缩比,通过简单的计算可以得到,一个Map任务可以在8秒钟之内处理1GB(压缩后250MB)数据,吞吐率约为125MB/s。更进一步,对于绝大多数的分析型作业来说,其输出规模比输入规模小得多。另外大部分作业是不需要排序的,这能够避免大部分的排序和压缩开销。最后,还可以利用多线程来加速排序和压缩,因为整个节点有很多CPU资源缝隙可以利用。综合这些因素,对于简单聚合类应用来说,在3秒钟内处理1GB数据完全可能,吞吐率是333MB/s。这个节点有12个并行任务,总的峰值吞吐能够达到333MB/s*12=4GB/s。

这个数字可能不够直观,我们把数字放大到集群。假设有一个25节点的Hadoop集群,使用10GbE网络,在资源调度、数据本地化都完美的情况下,这个集群应该能够:

  • 在1分钟内完成1TB的数据排序;
  • 在10秒钟内完成对1TB数据的简单聚合类查询。

在具备如此高性能之后,Hadoop已经具备以很低的成本来搭建数据仓库应用的能力, 并且其查询速度相比商业数据库已经没有太大的劣势。上文中提到的服务器大概为1-2万美元,其容量为12核、24TB存储。假设Hadoop的3个副本,实际压缩后容量8TB左右,平均价格为每个核1-2千美元,每TB存储1-2千美元。随着硬件成本持续降低,这个数字还会持续下降,并且在软件方面没有任何成本。在拥有与商业并行数据库可比较的性能之后,基于Hadoop开源技术的大数据解决方案的优势会更加突出。

虽然前景非常美好,但还有很长的路要走。目前在同等规模的集群上,使用1GbE网络,处理TB规模的作业一般需要十分钟到一个小时。目前一个任务处理数据的吞吐率大概为10-20MB/s,与目标100-300MB/s还有很大差距。

MapReduce性能分析

虽然前面讲到的计算包含太多的假设,但其中的各个环节都没有任何技术局限,剩下的只是工程上的问题,即一个高效的设计和实现。要设计和实现一个高效的MapReduce执行引擎,首先需要了解影响Hadoop性能的因素,主要有以下几点。

  • I/O瓶颈:绝大多数的MapReduce作业都属于数据密集型,在不使用轻量压缩的情况下,磁盘和网络I/O很容易成为瓶颈。幸运的是,最近一年内有两个非常优秀的开源压缩算法出现,Snappy和LZ4,并且都与Apache许可证兼容,目前它们已经被整合进Hadoop主干版本。对于Hadoop上的常见应用来说,其数据的压缩比是很高的,一般能达到5倍以上,排序或使用列存储后可能会更高,压缩速度约为400M/s, 解压约为1.5G/s, 基本能够将I/O吞吐能力放大5倍。通过与列式存储配合,更高的压缩比也是可能的。在开启轻量压缩之后,CPU会成为Hadoop集群的瓶颈,并且在短期内不会改变,即使目前CPU处理能力的发展要比I/O带宽的发展要快一些。
  • Map阶段排序:在简单Map任务中,排序约消耗整个任务CPU资源的一半。目前Map阶段的排序实现很低效,其原因是它使用一个大缓冲区对所有记录一起排序。另外目前的排序没有针对CPU缓存特性进行优化,一个更好的排序实现大概可以将排序性能提升10倍,目前社区已经发现这个问题,也提出一些改进方案。
  • 序列化/反序列化:序列化会造成大量的对象创建和小数组拷贝,另外也会引入过于复杂的I/O流抽象和低效的对象比较实现,而这些操作几乎贯穿整个数据处理流。这个问题很早之前也在社区提出过,后来不了了之。个人认为在MapReduce框架层没有必要引入序列化机制,二进制接口是足够的,数据的描述和存储格式应该在更上层,比如在Hive上解决。
  • Shuffle:Shuffle一直以来都被认为是一个瓶颈,使用压缩和高速网络可以部分改善这个问题。百度内部和开源社区都对Shuffle进行过很多优化,社区0.23版本MRv2使用Netty代替Jetty实现shuffle的服务器端,加入了批量传输,效率提升了30%。但还是有很大的优化空间,未来单节点的计算能力和网络带宽会越来越大,10GbE甚至40GbE会成为主流。Java和Netty能否支撑如此大的吞吐率,是否需要进一步调优,也是值得调研的。
  • 数据局部性:这应该是分布式数据库性能优于MapReduce的主要原因之一,分布式数据库的数据切分、索引优化和查询优化要先进得多,这是数十年技术积累的结果。通常同样的查询,分布式数据库会尽可能减少数据读取和节点之间数据的传输,而MapReduce模型通常需要把整个数据集处理一遍或者多遍,大量的数据扫描和传输会严重影响作业的执行时间和资源消耗。这个是有办法改进的,比如引入一些更加灵活的数据处理流和计算模型、引入不排序的数据流、引入哈希Join的算法、连接多个作业的Map和Reduce任务防止中间数据落地,等等。社区的下一代资源管理框架和MapReduce框架会更加容易扩展,并欢迎尝试各种创新。
  • 调度和启动开销:对于数据规模很小或有实时需求的应用,作业的调度和启动开销可能会比较大。

设计

对于前面描述的各种问题,NativeTask仅关注其中的一方面,即在MapReduce任务级别能够影响到方面,具体包括压缩、排序、序列化以及部分shuffle。此外,包括部分Shuffle、数据局部性以及调度,是Hadoop的其余模块和构建在Hadoop之上的应用需要关注的。这两方面上都需要持久的优化与创新,才能对整体性能产生数足够大的影响。虽然NativeTask仅关注在其中一个方面,但为上层的优化提供了更多的可能。

NativeTask选择C++实现,并不是因为Java效率不高。根据我的经验,Java对一般的数据处理非常高效,非常适合作为Hadoop的主要开发语言,但对于特殊的优化和任务,使用C++更加方便或者合适,原因如下。

  • 轻量级压缩。目前最快的轻量压缩实现都是C/C++实现, 虽然可以使用JNI包装,但是JNI有额外的开销,并且只能以批量的形式使用。使用C++可以保证轻量级压缩在数据处理的各个环节以多种形式到使用。
  • SSE/AVX优化。与压缩类似,Java只能使用JNI以批量的方式使用特殊指令集,从而影响其应用范围,例如Vectorwise中使用到的向量化优化可能就不便在Java中实现。
  • LLVM。前文提到过,该项目的主要目的是在此项目之上实现高效的查询执行引擎,以此构建数据仓库工具,而这个执行引擎需要很可能会用到LLVM,所以C++是个合适的选择。

NativeTask避免序列化和数据拷贝。如前文所述,序列化会引入大量开销,为了达到最大的吞吐率,在主要数据流中不使用序列化,基本接口为传递内存数组引用的形式,尽量避免数据拷贝。相比Java,纯二进制接口并不会为C++开发造成额外使用难度。NativeTask的主要功能为纯数据处理,不会涉及到复杂的编程元素例如多线程同步和复杂设计模式,所以尽量保持简单的设计和实现以便于最大化实施优化。

NativeTask主要由Java模块和本地模块两部分构成,它们之间使用JNI进行消息控制和数据传输。这与Hadoop Pipes和Streaming不同,其区别如表3所示。

表3 Pipes/Streaming与NativeTask的区别

NativeTask通过为Hadoop添加一个任务代理执行接口来支持NativeTask任务的运行。目前支持兼容模式和本地模式两种数据流(如图1所示)。兼容模式类似Pipes和Streaming,仍旧可以使用原生的RecordReader/Writer, 键值对序列化之后在Java与本地模块之间传输。纯本地模式下RecordReader/Writer也由C++实现,几乎不需要在Java与本地模块之间传输数据,就可以达到最高性能。目前Reduce阶段的Shuffle和Merge还没有完成,依然使用Java的实现,所以Reduce阶段的性能提高还不明显,并且成为瓶颈影响到作业整体的加速比。Reduce阶段的Shuffle和Merge完成之后,应该能够得到与Map阶段类似的性能提升,NativeTask也将成为纯本地的运行环境。

除了保持原有的接口和功能外,NativeTask也增加了一些新功能,比如,支持不排序的数据流。在原有的MapReduce模型中,Map到Reduce之间键值对的排序是整个计算模型的一部分,通过排序可以把键相同的键值对分组在一起,从而便于上层应用处理。但排序会引入以下两大开销。

  • Map到Reduce阶段的全局同步,也就是说Reducer要等到所有Map任务全部完成之后才能够开始执行,造成空等待,影响作业整体完成时间。
  • Map阶段的排序,Reduce阶段Shuffle和Merge需要缓存所有的键值对,占用大量内存,在缓冲区不足时也会写磁盘,引入大量磁盘读写。

不排序的数据流能够避免这两大开销。聚合类的应用占据大部分的数据分析应用场景,一般在聚合类的数据分析应用中,是不需要排序的,应用仅需要在自己的Mapper和Reducer中加入保存聚合状态的哈希表或者数组,其中仅保存聚合后的状态,相比保存所有键值对并排序来说,其开销要小得多。

另一个新功能是支持foldl形式的接口,它也是为聚合类应用设计的,配合非排序的数据流使用。与传统接口相比,用户不需要再关心聚合操作所需的哈希表的实现和内存管理。

新增这两个功能主要目的是进一步加速聚合类的作业,因为这类作业在简单数据分析领域是最普遍的。目前这两个功能还在开发中,其他待开发的功能包括Reduce端Shuffle和Merge、并行的排序和压缩等,具体的实现细节这里不再赘述。

测试

在一个16节点(1个master节点、15个slave节点)的集群上,测试两个简单MapReduce程序的各项性能指标。这里使用Hadoop版本1.0.1。这两个程序的特点如表4所示。

表4 测试程序的特点对比

选择这两个测试,是因为这两个测试都包含在Hadoop官方例子中,简单并容易理解,包含聚合和非聚合类作业,测试结果如表5和表6所示。

表5 Terasort性能对比

表6 WordCount性能对比

需要补充的是,测试环境的编译环境较老因此并不能编译生成较好的本地代码。我在自己的电脑上单独测试单个任务,NativeTask的性能还要再快40-60%左右。不过这里还是给出原始的测试结果。

可以看到,如果考虑到编译因素,NativeTask的Map任务处理速度可以达到原Hadoop的5-12倍,但由于Shuffle和Merge还没有实现,Reduce任务速度差别不会太大。从作业整体完成时间来看,NativeTask比Hadoop快1.5-6倍。不过相信随着NativeTask的持续开发与优化,以及现有Hadoop的持续优化,这个数字也会不断变化。

小结与展望

NativeTask是Hadoop MapReduce的高效执行引擎实现,并提供了C++编程接口。通过分析Hadoop目前的性能问题根源,并优化其中的任务执行单元涉及到的部分,NativeTask获得了不错的性能提升,性能提升的主要因素包括更好的排序实现、关键路径避免序列化、避免复杂抽象,更好地利用压缩等。NativeTask目前没有完全完成,所以期待未来还可以有更大的性能提升。从前文的计算结果可以看出,在目前的主流硬件条件下,理想的任务吞吐应该能够达到100-300MB/s, 而目前仅做到了50-100MB/s。

NativeTask仅关注Hadoop MapReduce性能因素的最底层方面,更高层次的方面也是同样重要的,并且在很多应用类型中经常占据最决定的因素。这些方面最好在更高的应用层次去优化解决,比如数据仓库工具Hive。这也是NativeTask最早计划的发展方向之一。对Hive进行改写,将其查询计划直接翻译为LLVM中间码,编译并在NativeTask上运行。Google在Tenzing的论文中描述了类似的优化,其数据处理吞吐率能够提升6-12倍。

由于目前大多数脚本语言还是C实现的,凭借C良好的接口兼容性,NativeTask很容易与其他脚本语言集成或交互,从而为更多的脚本语言提供使用Hadoop MapReduce的能力,例如Python和R语言。考虑到R语言在数据分析领域的流行程度,Hadoop对R语言的支持是一个趋势,目前已经有些开源项目在进行类似的工作。

还有一个有趣的方向是Hadoop在单一胖节点,或者胖节点组成的小集群(一个机架内)下的优化。对小型企业应用来说,其数据规模一般都在TB级别。只有少数大公司才真正需要PB级别的大数据分析。在不久的未来,随着众核技术流行,利用高密度存储,单个服务器的处理能力可以与当今的小型集群相当。在胖节点内运行的Hadoop作业可以有更多的优化机会,例如没有网络瓶颈,任务之间直接的数据共享,更小的启动开销和更精细的资源调度,结合NativeTask的性能优势,绝大部分的简单数据分析任务在单节点上就可以快速完成。同时节约大量的硬件资源。目前业界和社区更关注水平可扩展性,而对此类垂直扩展优化却似乎很少关注。

可以设想,在不久的未来,数据分析人员在自己的工作站上就可以使用Hadoop甚至R来分析TB级别的数据。如果他需要更大的计算能力,只需要将同样的程序提交到云上运行即可。目前在亚马逊云平台上可以不到1300美元每小时的价格轻松获得30000核的计算能力。但在软件层面,还没有系统可以配合做到这种无缝的可扩展能力和易用性。

作者常冰琳,自由职业者,前百度分布式高级研发工程师,从事Hadoop相关开源技术的研究和开发。

作者杨栋,百度分布式高级研发工程师,从事Hypertable、Hadoop及流式计算的研究和开发。


声明:本文为作者原文,选登在《程序员》杂志2012年03期时进行了编辑和删减。

本文选自《程序员》杂志2012年03期,未经允许不得转载。如需转载请联系 market@csdn.net

《程序员》2012年杂志订阅送好礼活动火热进行中

转播到腾讯微博

----->立刻申请加入《程序员》杂志读者俱乐部,与杂志编辑直接交流,参与选题,优先投稿

9 Responses to “NativeTask:利用本地执行引擎加速Hadoop”

  1. alex gemini 说道:

    序列化/反序列化: 你从磁盘上读取一串byte, 你知道他代表string, 你不先反序列化怎么判断这个里面有一个string 是”abcd” 呢?

  2. alex gemini 说道:

    “对Hive进行改写,将其查询计划直接翻译为LLVM中间码,编译并在NativeTask上运行” hive本来只是将sql 翻译成mr , 文中所谓的翻译成LLVM 中间碼是需要你的tasktracker 能够接受指令然后本地执行, 类似于分布式的shell 一样,每个tasktracker 调用本地的shell 执行. 这跟hive 完全是不同的思路.

  3. alex gemini 说道:

    文中说的聚合类应用是不需要排序的,很好奇,聚合不排序怎么得到最终结果?

  4. decster 说道:

    “序列化/反序列化: 你从磁盘上读取一串byte, 你知道他代表string, 你不先反序列化怎么判断这个里面有一个string 是”abcd” 呢?”
    对C/C++来说,一个buff就可以当成一个string或者struct使用,不需要反序列化,这里也有社区对序列化和纯二进制接口的讨论,序列化和反序列化对java来说会有些方便,但也不是必须的
    https://issues.apache.org/jira/browse/MAPREDUCE-326

    “文中说的聚合类应用是不需要排序的,很好奇,聚合不排序怎么得到最终结果?”
    社区对不排序的讨论,里面会有一些例子,包括tenzing paper中的sort avoidence也有详细的描述。
    https://issues.apache.org/jira/browse/MAPREDUCE-4039
    https://issues.apache.org/jira/browse/MAPREDUCE-3397

    ““对Hive进行改写,将其查询计划直接翻译为LLVM中间码,编译并在NativeTask上运行” hive本来只是将sql 翻译成mr , 文中所谓的翻译成LLVM 中间碼是需要你的tasktracker 能够接受指令然后本地执行, 类似于分布式的shell 一样,每个tasktracker 调用本地的shell 执行. 这跟hive 完全是不同的思路.”
    这段话我不太理解,估计需要细节上的交流才行, 我邮件给你吧,总之我觉得Tenzing可以做到的Hive当然也可以做到:)

  5. alex gemini 说道:

    好吧,看到了原始的讨论,MAPREDUCE-2841,既然作者看了Tenzing 的论文,人家都说了是ColumnIO, 基于Block 的shuffle 和 spill, 为什么不首先从文件格式上下手呢. 而是强调编程语言呢. 也许MAPREDUCE-4039 的想法更具体点吧.

  6. 沈阳 说道:

    “社区0.23版本MRv2使用Netty代替Jetty实现shuffle的服务器端,加入了批量传输,效率提升了30%。” 请问这个有具体的参考信息吗, 如何测试的, 什么情况下有30%的性能提升.

  7. decster 说道:

    “社区0.23版本MRv2使用Netty代替Jetty实现shuffle的服务器端,加入了批量传输,效率提升了30%。” 请问这个有具体的参考信息吗, 如何测试的, 什么情况下有30%的性能提升.
    http://www.slideshare.net/cloudera/hadoop-world-2011-hadoop-and-performance-todd-lipcon-yanpei-chen-cloudera

  8. 沈阳 说道:

    ““社区0.23版本MRv2使用Netty代替Jetty实现shuffle的服务器端,加入了批量传输,效率提升了30%。” 请问这个有具体的参考信息吗, 如何测试的, 什么情况下有30%的性能提升.
    http://www.slideshare.net/cloudera/hadoop-world-2011-hadoop-and-performance-todd-lipcon-yanpei-chen-cloudera
    谢谢

  9. inkfish 说道:

    这个native task是开源软件,在哪里能下载到呢?

请评论

preload preload preload
京ICP备06065162