您现在的位置 >> Hadoop教程 >> Hadoop实战 >> 专题  
 

MongoDB MapReduce 性能提升20倍的优化方法

【作者:Hadoop实战专家】【关键词:线程 使用 进行分析 MongoDB 】 【点击:85992次】【2013-09-1】
1.MongoDB提供了哪2种方式来对数据进行分析?本文就来看看,有哪些方法可以让MR的速度有所提升。 在MongoDB中,一个单一的MR任务并不能使用多线程——只有在多个任务中才能使用多线程。splitVector命令可以帮助你非常迅速地找到分割点,如果你有更简单的分割方法更好。  

相关热门搜索:

大数据标签:hadoop mapreduce bigdata

问题导读:
1.MongoDB提供了哪2种方式来对数据进行分析?
2.如何优化MongoDB MapReduce 性能?
3.优化MongoDB MapReduce 性能的方法有哪些?

自从MongoDB被越来越多的大型关键项目采用后,数据分析也成为了越来越重要的话题。人们似乎已经厌倦了使用不同的软件来进行分析(这都利用到了Hadoop),因为这些方法往往需要大规模的数据传输,而这些成本相当昂贵。

MongoDB提供了2种方式来对数据进行分析:Map Reduce(以下简称MR)和聚合框架(Aggregation Framework)。MR非常灵活且易于使用,它可以很好地与分片(sharding)结合使用,并允许大规模输出。尽管在MongoDB v2.4版本中,由于JavaScript引擎从Spider切换到了V8,使得MR的性能有了大幅改进,但是与Agg Framework(使用C++)相比,MR的速度还是显得比较慢。本文就来看看,有哪些方法可以让MR的速度有所提升。

测试

首先我们来做个测试,插入1000万文档,这些文档中包含了介于0和100万之间的单一整数值,这意味着,平均每10个文档具有相同的值。

1. > for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });}

2. > db.uniques.findOne()

3. { "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 }

4. > db.uniques.ensureIndex({dim0: 1})

5. > db.uniques.stats()

6. {

7.         "ns" : "test.uniques",

8.         "count" : 10000000,

9.         "size" : 360000052,

10.         "avgObjSize" : 36.0000052,

11.         "storageSize" : 582864896,

12.         "numExtents" : 18,

13.         "nindexes" : 2,

14.         "lastExtentSize" : 153874432,

15.         "paddingFactor" : 1,

16.         "systemFlags" : 1,

17.         "userFlags" : 0,

18.         "totalIndexSize" : 576040080,

19.         "indexSizes" : {

20.                 "_id_" : 324456384,

21.                 "dim0_1" : 251583696

22.         },

23.         "ok" : 1

24. }

复制代码
这里我们想要得到文档中唯一值的计数,可以通过下面的MR任务来轻松完成:

1. > db.runCommand(

2. { mapreduce: "uniques",

3. map: function () { emit(this.dim0, 1); },

4. reduce: function (key, values) { return Array.sum(values); },

5. out: "mrout" })

6. {

7.         "result" : "mrout",

8.         "timeMillis" : 1161960,

9.         "counts" : {

10.                 "input" : 10000000,

11.                 "emit" : 10000000,

12.                 "reduce" : 1059138,

13.                 "output" : 999961

14.         },

15.         "ok" : 1

16. }

复制代码

正如你看到的,输出结果大约需要1200秒(在EC2 M3实例上测试),共输出了1千万maps、100万reduces、999961个文档。结果类似于:

1. > db.mrout.find()

2. { "_id" : 1, "value" : 10 }

3. { "_id" : 2, "value" : 5 }

4. { "_id" : 3, "value" : 6 }

5. { "_id" : 4, "value" : 10 }

6. { "_id" : 5, "value" : 9 }

7. { "_id" : 6, "value" : 12 }

8. { "_id" : 7, "value" : 5 }

9. { "_id" : 8, "value" : 16 }

10. { "_id" : 9, "value" : 10 }

11. { "_id" : 10, "value" : 13 }

12. ...

复制代码

下面就来看看如何进行优化。

使用排序

我在之前的这篇文章中简要说明了使用排序对于MR的好处,这是一个鲜为人知的特性。在这种情况下,如果处理未排序的输入,意味着MR引擎将得到随机排序的值,基本上没有机会在RAM中进行reduce,相反,它将不得不通过一个临时collection来将数据写回磁盘,然后按顺序读取并进行reduce。

下面来看看如果使用排序,会有什么帮助:

1. > db.runCommand(

2. { mapreduce: "uniques",

3. map: function () { emit(this.dim0, 1); },

4. reduce: function (key, values) { return Array.sum(values); },

5. out: "mrout",

6. sort: {dim0: 1} })

7. {

8.         "result" : "mrout",

9.         "timeMillis" : 192589,

10.         "counts" : {

11.                 "input" : 10000000,

12.                 "emit" : 10000000,

13.                 "reduce" : 1000372,

14.                 "output" : 999961

15.         },

16.         "ok" : 1

17. }

复制代码

现在时间降到了192秒,速度提升了6倍。其实reduces的数量是差不多的,但是它们在被写入磁盘之前已经在RAM中完成了。

使用多线程

在MongoDB中,一个单一的MR任务并不能使用多线程——只有在多个任务中才能使用多线程。但是目前的多核CPU非常有利于在单一服务器上进行并行化工作,就像Hadoop。我们需要做的是,将输入数据分割成若干块,并为每个块分配一个MR任务。splitVector命令可以帮助你非常迅速地找到分割点,如果你有更简单的分割方法更好。

1. > db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32000000})

2. {

3.     "timeMillis" : 6006,

4.     "splitKeys" : [

5.         {

6.             "dim0" : 18171

7.         },

8.         {

9.             "dim0" : 36378

10.         },

11.         {

12.             "dim0" : 54528

13.         },

14.         {

15.             "dim0" : 72717

16.         },

17. …

18.         {

19.             "dim0" : 963598

20.         },

21.         {

22.             "dim0" : 981805

23.         }

24.     ],

25.     "ok" : 1

26. }

复制代码

从1千万文档中找出分割点,使用splitVector命令只需要大约5秒,这已经相当快了。所以,下面我们需要做的是找到一种方式来创建多个MR任务。从应用服务器方面来说,使用多线程和$gt / $lt查询命令会非常方便。从shell方面来说,可以使用ScopedThread对象,它的工作原理如下:

1. > var t = new ScopedThread(mapred, 963598, 981805)

2. > t.start()

3. > t.join()

复制代码

现在我们可以放入一些JS代码,这些代码可以产生4个线程,下面来等待结果显示:

1. > var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 })

2. > var keys = res.splitKeys

3. > keys.length

4. 39

5. > var mapred = function(min, max) {

6. return db.runCommand({ mapreduce: "uniques",

7. map: function () { emit(this.dim0, 1); },

8. reduce: function (key, values) { return Array.sum(values); },

9. out: "mrout" + min,

10. sort: {dim0: 1},

11. query: { dim0: { $gte: min, $lt: max } } }) }

12. > var numThreads = 4

13. > var inc = Math.floor(keys.length / numThreads) + 1

14. > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }

15. min:0 max:274736

16. min:274736 max:524997

17. min:524997 max:775025

18. min:775025 max:{ "$maxKey" : 1 }

19. connecting to: test

20. connecting to: test

21. connecting to: test

22. connecting to: test

23. > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }

24. {

25.         "result" : "mrout0",

26.         "timeMillis" : 205790,

27.         "counts" : {

28.                 "input" : 2750002,

29.                 "emit" : 2750002,

30.                 "reduce" : 274828,

31.                 "output" : 274723

32.         },

33.         "ok" : 1

34. }

35. {

36.         "result" : "mrout274736",

37.         "timeMillis" : 189868,

38.         "counts" : {

39.                 "input" : 2500013,

40.                 "emit" : 2500013,

41.                 "reduce" : 250364,

42.                 "output" : 250255

43.         },

44.         "ok" : 1

45. }

46. {

47.         "result" : "mrout524997",

48.         "timeMillis" : 191449,

49.         "counts" : {

50.                 "input" : 2500014,

51.                 "emit" : 2500014,

52.                 "reduce" : 250120,

53.                 "output" : 250019

54.         },

55.         "ok" : 1

56. }

57. {

58.         "result" : "mrout775025",

59.         "timeMillis" : 184945,

60.         "counts" : {

61.                 "input" : 2249971,

62.                 "emit" : 2249971,

63.                 "reduce" : 225057,

64.                 "output" : 224964

65.         },

66.         "ok" : 1

67. }

复制代码

第1个线程所做的工作比其他的要多一点,但时间仍达到了190秒,这意味着多线程并没有比单线程快!

使用多个数据库

这里的问题是,线程之间存在太多锁争用。当锁时,MR不是非常无私(每1000次读取会进行yield)。由于MR任务做了大量写操作,线程之间结束时会等待彼此。由于MongoDB的每个数据库都有独立的锁,那么让我们来尝试为每个线程使用不同的输出数据库:

1. > var mapred = function(min, max) {

2. return db.runCommand({ mapreduce: "uniques",

3. map: function () { emit(this.dim0, 1); },

4. reduce: function (key, values) { return Array.sum(values); },

5. out: { replace: "mrout" + min, db: "mrdb" + min },

6. sort: {dim0: 1},

7. query: { dim0: { $gte: min, $lt: max } } }) }

8. > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }

9. min:0 max:274736

10. min:274736 max:524997

11. min:524997 max:775025

12. min:775025 max:{ "$maxKey" : 1 }

13. connecting to: test

14. connecting to: test

15. connecting to: test

16. connecting to: test

17. > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }

18. ...

19. {

20.         "result" : {

21.                 "db" : "mrdb274736",

22.                 "collection" : "mrout274736"

23.         },

24.         "timeMillis" : 105821,

25.         "counts" : {

26.                 "input" : 2500013,

27.                 "emit" : 2500013,

28.                 "reduce" : 250364,

29.                 "output" : 250255

30.         },

31.         "ok" : 1

32. }

33. ...

复制代码

所需时间减少到了100秒,这意味着与一个单独的线程相比,速度约提高2倍。尽管不如预期,但已经很不错了。在这里,我使用了4个核心,只提升了2倍,如果使用8核CPU,大约会提升4倍。

使用纯JavaScript模式

在线程之间分割输入数据时,有一些非常有趣的东西:每个线程只拥有约25万主键来输出,而不是100万。这意味着我们可以使用“纯JS模式”——通过jsMode:true来启用。开启后,MongoDB不会在JS和BSON之间反复转换,相反,它会从内部的一个50万主键的JS字典来reduces所有对象。下面来看看该操作是否对速度提升有帮助。

1. > var mapred = function(min, max) {

2. return db.runCommand({ mapreduce: "uniques",

3. map: function () { emit(this.dim0, 1); },

4. reduce: function (key, values) { return Array.sum(values); },

5. out: { replace: "mrout" + min, db: "mrdb" + min },

6. sort: {dim0: 1},

7. query: { dim0: { $gte: min, $lt: max } },

8. jsMode: true }) }

9. > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }

10. min:0 max:274736

11. min:274736 max:524997

12. min:524997 max:775025

13. min:775025 max:{ "$maxKey" : 1 }

14. connecting to: test

15. connecting to: test

16. connecting to: test

17. connecting to: test

18. > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }

19. ...

20. {

21.         "result" : {

22.                 "db" : "mrdb274736",

23.                 "collection" : "mrout274736"

24.         },

25.         "timeMillis" : 70507,

26.         "counts" : {

27.                 "input" : 2500013,

28.                 "emit" : 2500013,

29.                 "reduce" : 250156,

30.                 "output" : 250255

31.         },

32.         "ok" : 1

33. }

34. ...

35.

复制代码

现在时间降低到70秒。看来jsMode确实有帮助,尤其是当对象有很多字段时。该示例中是一个单一的数字字段,不过仍然提升了30%。

MongoDB v2.6版本中的改进

在MongoDB v2.6版本的开发中,移除了一段关于在JS函数调用时的一个可选“args”参数的代码。该参数是不标准的,也不建议使用,它由于历史原因遗留了下来(见SERVER-4654)。让我们从Git库中pull最新的MongoDB并编译,然后再次运行测试用例:

1. ...

2. {

3.         "result" : {

4.                 "db" : "mrdb274736",

5.                 "collection" : "mrout274736"

6.         },

7.         "timeMillis" : 62785,

8.         "counts" : {

9.                 "input" : 2500013,

10.                 "emit" : 2500013,

11.                 "reduce" : 250156,

12.                 "output" : 250255

13.         },

14.         "ok" : 1

15. }

16. ...

复制代码

从结果来看,时间降低到了60秒,速度大约提升了10-15%。同时,这种更改也改善了JS引擎的整体堆消耗量。

结论

回头来看,对于同样的MR任务,与最开始时的1200秒相比,速度已经提升了20倍。这种优化应该适用于大多数情况,即使一些技巧效果不那么理想(比如使用多个输出dbs /集合)。但是这些技巧可以帮助人们来提升MR任务的速度,未来这些特性也许会更加易用——比如,这个ticket 将会使splitVector命令更加可用,这个ticket将会改进同一数据库中的多个MR任务。

大数据系列相关文章:

最新评论
YU霜2014-09-10 03:36:27
启动了
王道2014-09-10 04:14:13
你要看她喜欢啥了 打了一个月的电话 知道她喜欢啥不
个人听歌2014-09-09 08:14:22
我们学校忽略
那些美到cry的图片2014-09-08 03:37:11
发表了博文 《Hadoop平台优化综述(一)》 - 1. 概述 随着企业要处理的数据量越来越大,MapReduce思想越来越受到重视。Hadoop是MapReduce的一个开源实现,由于其良好的扩展 http://t.cn/RvPqXqW
仔仔2014-09-08 12:56:25
当一个软件系统成熟之后,安全问题会成为影响它的信誉的下一个关键因素。了解Hadoop Security的最新进展 RPXi8zy 给出了slides和videos
Mr_Johnny20112014-09-07 11:42:54
10000元/月 http://t.cn/RPuJdbs Hadoop工程师 10000
2014-09-07 03:37:18
http://t.cn/RPsLX5M
平湖洋洋2014-09-07 04:53:19
第一次,吃饭带钱就中,还要干啥你
乐业文文2014-09-07 03:38:00
DTCC2014今天下午14:20-15:10在Hadoop技术实战和应用专场,连城会介绍《Spark运行时模型剖析》,对Spark感兴趣的同学不可错过。 看了连神提前给我的silde,内容绝对有料! 可惜我不能到现场啊,求直播!
幸运星至尊2014-09-05 11:06:19
#大数据#【为什么很多公司的大数据都基于Hadoop 设计】数据的来源途径非常的多,数据的格式也越来越多越来越复杂,随着时间的推移数据量也越来越大。因此在数据的存储和基于数据之上的计算上传统数据库很快趋于瓶颈。http://t.cn/8FFJa5e
 
  • Hadoop生态系统资料推荐