Y's note

Web技術・プロダクトマネジメント・そして経営について

本ブログの更新を停止しており、今後は下記Noteに記載していきます。
https://note.com/yutakikuchi/

MongoDBのAggregation Framework/MapReduceを使ってより賢く集計を行うためのまとめ


Mogodb集計

MongoDBの集計機能が便利過ぎて泣けてくるお話し - Yuta.Kikuchiの日記 はてなブックマーク - MongoDBの集計機能が便利過ぎて泣けてくるお話し - Yuta.Kikuchiの日記
1月程前にMongoDBを使った集計機能の紹介をさせていただいた@yutakikucです。内容は全く大した事無かったのですが、タイトルで誘導を引っ張って200近いbookmarkを集める事ができました笑。みなさんの参考にしていただけたこと、大変嬉しく思います。今日はMongoDBの集計をもう一歩踏み込んだ内容を紹介して行きたいと思います。題材としてはAggregation FrameworkとMapReduceについてです。因に今回試してみたMongoDB-Versionは2.2.3です。Versionによって挙動が変わると思うので注意してください。

$ mongo --version
MongoDB shell version: 2.2.3

Aggregation Framework

Aggregation ― MongoDB Manual 2.4.4 はてなブックマーク - Aggregation ― MongoDB Manual 2.4.4
SQL to Aggregation Framework Mapping Chart ― MongoDB Manual 2.4.4 はてなブックマーク - SQL to Aggregation Framework Mapping Chart ― MongoDB Manual 2.4.4
Aggregation FrameworkはRDBMSSQLに備わっている便利機能をMongoDBに付与したFrameworkという説明が分かりやすいと思います。MongodbのVerson2.2以降に備わった機能です。以下はMongodbのAggregation OperatorとRDBMSSQLの対応表になります。一つずつのOperatorはFilterに近いイメージで、Chainをしてパイプライン処理とすることができます。最終的にはChainした結果を取得します。

MongoDB Aggregation Operator SQL Term,Function,Concepts
$match WHERE
$group GROUP BY
$project SELECT
$sort ORDER BY
$limit LIMIT
$sum SUM(),COUNT()
$avg AVERAGE()

CentOSでNginxのログをFluentdを使ってMongodbにリアルタイムで格納する - Yuta.Kikuchiの日記 はてなブックマーク - CentOSでNginxのログをFluentdを使ってMongodbにリアルタイムで格納する - Yuta.Kikuchiの日記
では少し例を出してみます。MongoDBへのDataInsertはNginxAccessLogを使います。NginxからMongoDBへの格納にはFluentdを使っています。設定の詳細は上のエントリーを見てください。以下はMongoDBのサンプル出力です。

> db.nginx_access.find()
{ "_id" : ObjectId("5123476fe138231086000001"), "host" : "127.0.0.1", "user" : "-", "method" : "GET", "path" : "/", "code" : "200", "size" : "612", "referer" : "-", "agent" : "w3m/0.5.2", "time" : ISODate("2013-02-19T09:35:35Z") }
{ "_id" : ObjectId("51234790e138231086000002"), "host" : "127.0.0.1", "user" : "-", "method" : "GET", "path" : "/", "code" : "200", "size" : "612", "referer" : "-", "agent" : "w3m/0.5.2", "time" : ISODate("2013-02-19T09:36:14Z") }
{ "_id" : ObjectId("5177c10ee138231b1b000001"), "host" : "127.0.0.1", "user" : "-", "method" : "GET", "path" : "/api/user/test", "code" : "404", "size" : "570", "referer" : "-", "agent" : "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.4 (KHTML, like Gecko) Chrome/22.0.1229.94 Safari/537.4", "time" : ISODate("2013-04-24T11:24:53Z") }
{ "_id" : ObjectId("5177c10ee138231b1b000002"), "host" : "127.0.0.1", "user" : "-", "method" : "GET", "path" : "/favicon.ico", "code" : "404", "size" : "570", "referer" : "-", "agent" : "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.4 (KHTML, like Gecko) Chrome/22.0.1229.94 Safari/537.4", "time" : ISODate("2013-04-24T11:24:53Z") }
{ "_id" : ObjectId("5177c119e138231b1b000003"), "host" : "127.0.0.1", "user" : "-", "method" : "-", "code" : "400", "size" : "0", "referer" : "-", "agent" : "-", "time" : ISODate("2013-04-24T11:25:06Z") }

ここから条件指定をしてみます。通常のMongoDBではfind()に設定します。StatusCodeを400だけを抽出するとします。Aggregation Frameworkとの両方の例を見てみましょう。find()の場合はdb.nginx_access.find({code:"400"})としていするとJSON形式がそのまま結果として得る事ができます。それに対してAggregation Framewrokではresultというkeyに対して結果を格納、結果が正常に取得できたのかどうかをokというkeyのJSON形式で結果を返します。

> db.nginx_access.find({code:"400"})
{ "_id" : ObjectId("5177c119e138231b1b000003"), "host" : "127.0.0.1", "user" : "-", "method" : "-", "code" : "400", "size" : "0", "referer" : "-", "agent" : "-", "time" : ISODate("2013-04-24T11:25:06Z") }
{ "_id" : ObjectId("5177c150e138231b1b000007"), "host" : "127.0.0.1", "user" : "-", "method" : "-", "code" : "400", "size" : "0", "referer" : "-", "agent" : "-", "time" : ISODate("2013-04-24T11:26:06Z") }
{ "_id" : ObjectId("5177c187e138231b1b00000f"), "host" : "127.0.0.1", "user" : "-", "method" : "-", "code" : "400", "size" : "0", "referer" : "-", "agent" : "-", "time" : ISODate("2013-04-24T11:26:56Z") }
> db.nginx_access.aggregate([{$match : {code:"400"}}]);
{
	"result" : [
		{
			"_id" : ObjectId("5177c119e138231b1b000003"),
			"host" : "127.0.0.1",
			"user" : "-",
			"method" : "-",
			"code" : "400",
			"size" : "0",
			"referer" : "-",
			"agent" : "-",
			"time" : ISODate("2013-04-24T11:25:06Z")
		},
		{
			"_id" : ObjectId("5177c150e138231b1b000007"),
			"host" : "127.0.0.1",
			"user" : "-",
			"method" : "-",
			"code" : "400",
			"size" : "0",
			"referer" : "-",
			"agent" : "-",
			"time" : ISODate("2013-04-24T11:26:06Z")
		},
		{
			"_id" : ObjectId("5177c187e138231b1b00000f"),
			"host" : "127.0.0.1",
			"user" : "-",
			"method" : "-",
			"code" : "400",
			"size" : "0",
			"referer" : "-",
			"agent" : "-",
			"time" : ISODate("2013-04-24T11:26:56Z")
		}
	],
	"ok" : 1
}

これだけだとAggregation Frameworkを使う意味は薄れてしまうので、groupを使ってそれぞれのStatusCodeのAccessを集計します。通常のMongoDBではfindとcountを使ってそれぞれのStatusCode毎に集計をすると思いますが、Aggregation Frameworkでは1回で可能になります。Aggregation Frameworkを使う上で気をつける必要があるのが、JSONのValueに指定する"$code"のような記述です。"$code"とすると元々データが格納されているMongoDBのKeyカラム名になります。ダブルクォートが付かないとOperator、付くとKeyカラム名になると覚えます。

> db.nginx_access.find( {code : "200"} ).count()
2
> db.nginx_access.find( {code : "304"} ).count()
3
> db.nginx_access.find( {code : "400"} ).count()
3
> db.nginx_access.find( {code : "404"} ).count()
9
> db.nginx_access.aggregate([{ $group : { "_id" : "$code" , "count" : { $sum : 1} }}]);
{
	"result" : [
		{
			"_id" : "304",
			"count" : 3
		},
		{
			"_id" : "400",
			"count" : 3
		},
		{
			"_id" : "404",
			"count" : 9
		},
		{
			"_id" : "200",
			"count" : 2
		}
	],
	"ok" : 1
}

AccessLogでは良くdailyでUniqueUserの集計を行うと思います。そこでMongDBのdistinctを使った例とAggregation Frameworkでのcount方法について記述します。distinctはデータサイズが肥大すると使えなくなるのでそこは注意してください。distinctではcount()の代わりにlengthプロパティを指定します。Aggregation Frameworkでは2回の$groupをChainさせて最終的には$sumでカウント数を出力します。

> db.nginx_access.distinct( "user", { time : { $gte :ISODate("2013-02-19T00:00:00Z"), $lte : ISODate("2013-02-19T23:59:59Z") } } ).length;
1
> db.nginx_access.aggregate([ { $match : { time : { $gte :ISODate("2013-02-19T00:00:00Z"), $lte : ISODate("2013-02-19T23:59:59Z") } } }, 
                                                { $group : { _id : "$user" } }, 
                                                { $group : { _id : "uu", count : { $sum :1 } } } ]);
{ "result" : [ { "_id" : "uu", "count" : 1 } ], "ok" : 1 }

上のAggregation Frameworkの集計を365日間繰り返しで回す場合は以下のようなJavasciptでQueryを書くとやり易いと思います。外部ファイルとして実行する場合はコマンドラインから$mongo db名 < jsファイル のように実行してください。

for( var month=1; month<=12; month++ ) { 
   for( var day=1; day<=31; day++ ) { 
      var iso_month = month;
      var iso_day = day;
      if( month < 10 ) { iso_month = "0" + month }
      if( day < 10 ) { iso_day = "0" + day }
      var iso_date = "2013-" + iso_month + "-" + iso_day;
      var start_time = iso_date + "T00:00:00Z";
      var end_time   = iso_date + "T23:59:59Z";
      var res = db.nginx_access.aggregate([ { $match : { time : { $gte :ISODate( start_time ), $lte : ISODate( end_time) } } }, { $group : { _id : "$user" } }, { $group : { _id : "uu", count : { $sum :1 } } } ]); 
      if( typeof res.result[0] != 'undefined' ) { 
         print( "Date = " + iso_date + ", UU = " + res.result[0].count );
      }   
   }   
}

/*出力結果
Date = 2013-02-19, UU = 1
Date = 2013-04-24, UU = 1
*/

MapReduce

Map-Reduce ― MongoDB Manual 2.4.4 はてなブックマーク - Map-Reduce ― MongoDB Manual 2.4.4
Map-Reduce Examples ― MongoDB Manual 2.4.4 はてなブックマーク - Map-Reduce Examples ― MongoDB Manual 2.4.4
MapRudeceはHadoopでも有名なようにKey/Valueの値をMapperが作成し、ReducerでKey毎の集計を行う仕組みです。Keyの値によって処理を複数台のサーバに分散させる事ができる優れものでMongoDBにも備わっています。MongoDBでのMapReduceはMapper関数の中でKey/Value形式のデータをemitにて作成します。それをReducer側で呼び出す事ができるので、ReducerにてKey毎の集計処理を作れば処理ができます。特徴的な処理としてはMongoDBのMapReduce処理中にはKey/Valueデータの保持に一次的なCollectionが生成されます。
私は1台のMongoDBで利用していますが、ShardingModeの場合はMapReduce処理を複数台で並列処理をします。Keyの値によって処理されるMongoDBサーバは自動的に割り当てられるようです。そもそもShardingって何だよって人はググってください笑。簡単に説明しておくと大きなデータを複数サーバに分散して保存する仕組みです。メリットとしては負荷の軽減とストレージ使用領域が分散できます。ShardingModeでのInputは分散され、Outputの取得はDefaultでは1台となるようです。Outputの出力もShardingさせることはできます。この内容についてはMongoDB本家のDocumentに書いてありましたので以下に転載します。
Sharded Input When using sharded collection as the input for a map-reduce operation, mongos will automatically dispatch the map-reduce job to each shard in parallel. There is no special option required. mongos will wait for jobs on all shards to finish.Sharded OutputBy default the output collection is not sharded.


MongoDBのMapReduceはかなり癖が強いです。僕が実験的に試して見た結果、以下の条件を守ることをお勧めします。

  • MapperでemitするKeyに紐づくValueの数はできるだけ少なくなるような集計をする。
  • MapperでemitしたKey/Valueのデータ構造をReducerで変更してはいけない。
  • ReducerでValueに配列形式のデータを繰り返しでpushしようとすると思うように動作しない時がある。
  • Reducerではインクリメントの集計処理だけを記載する方が良い。
  • Mapper/Reducerを多段で実行する際は一次的にCollectionに書き込むよう指定し、2度目のMapReduceで一次Collectionからデータを読み込むようにする。

それでは少しMapReduceのコードを書いてみます。Javascriptのコードが書き易いように外部ファイルに保存して実行してください。例題は上のAggregation Frameworkで書いたUniqueUserを求めるためのScriptです。処理としては1段階目のMapReduceで日付とUserをKey、ValueをCountとしてemitし、2段階目のMapReduceで日付のKeyをCountします。MongoDBが自動で生成する一次Collectionとは別に、2段階のMapReduceをそれぞれの処理で1次Collectionに格納しています。分かり易いように2段階処理の一次Collectionの中身も出力してみます。データの受け渡し方法が少しややこしいですがreducerの引数に指定するkey,valueにはmapperでemitしたkeyの重複するvalueがあればそれらの全てが格納されます。

var date_user_mapper = function(){
  var date_format = function(x) { return (x < 10) ? '0' + x : '' + x }; 
  var getDay = function (date) {
    var year = date.getFullYear();  
    var month = date_format(date.getMonth() + 1);
    var day = date_format(date.getDate());  
    return year + month + day;
  };
  emit({timestamp: getDay(this.time), user:this.user}, 1);
};

var date_mapper = function(){
  emit(this._id.timestamp, 1);
};

var reducer = function(key, value){
  var sum = 0;
  value.forEach(function(v){  sum += v });
  return sum;
};

db.nginx_access.mapReduce(date_user_mapper, reducer, {out:{replace : "tmp_daily"}});
db.tmp_daily.mapReduce(date_mapper, reducer, {out:{replace : "daily_uu"}});
var uu = db.daily_uu.find();
uu.forEach( function( uu ) { print( "Date = " + uu._id + ", UU = " + uu.value );});

/*出力結果
Date = 20130219, UU = 1
Date = 20130424, UU = 1
*/
> db.tmp_daily.find()
{ "_id" : { "timestamp" : "20130219", "user" : "-" }, "value" : 2 }
{ "_id" : { "timestamp" : "20130424", "user" : "-" }, "value" : 15 }

> db.daily_uu.find()
{ "_id" : "20130219", "value" : 1 }
{ "_id" : "20130424", "value" : 1 }

上の2段階MapReduceが面倒だと思った方もいると思うので、1段階のMapReduce版もソースを上げておきます。mapper/reducer/finalizerという流れで1段階のMapReduceにしています。mapperではkeyに日付、valueでuser名を出力し、reducerでvalueを人数分のuserの連想配列を持ちます。上の例と異なりfinalizerを定義しています。finalizerはReducerの後処理ですね。finalizerではkeyに指定された日付と、valueの連想配列の大きさをカウントしてUniqueUserの計算をしています。但しこの方法は先に書いたお勧め条件に反してValueに配列データを入れてしまっているので、もしかしたらうまくいかないケースがあるかもしれないです。

var mapper = function() {
 var date_format = function(x) {return (x < 10) ? '0' + x : '' + x ;};
 var getDay = function (date) {
   var year = date.getFullYear();  
   var month = date_format(date.getMonth() + 1);
   var day = date_format(date.getDate());  
   return year + month + day;
 };
 var value = {users:[]};
 value.users.push(this.user); 
 emit(getDay(this.time), value );
};

var reducer = function(key,value) {
  var res = {users:[]};
  value.forEach(function(v){
    res.users.push(v.users[0]);
  });
  return res;
};

var finalizer = function(key,value) {
 var sum = 0;
 var hash = {};
 for( var i=0; i<value.users.length; i++ ) {
   if( typeof hash[value.users[i]] == "undefined" ){
     hash[value.users[i]] = 1;
   }
 }
 for( var j in hash) {
  sum++;
 }
 return sum;
}

db.nginx_access.mapReduce(mapper, reducer,{finalize:finalizer,out:{replace:"daily_uu"}});
var uu = db.daily_uu.find();
uu.forEach( function( uu ) { print( "Date = " + uu._id + ", UU = " + uu.value ); } );

/*出力結果
Date = 20130219, UU = 1
Date = 20130424, UU = 1
*/

MongoDB: The Definitive Guide

MongoDB: The Definitive Guide