Y's note

Web技術・プロダクトマネジメント・そして経営について

本ブログの更新を停止しており、今後は下記Noteに記載していきます。
https://note.com/yutakikuchi/

ログ集計システムを自前で作る


Index

  1. ログ集計システムの要件
  2. DB設計
    • データ保存方針
    • table設計
    • サーバ構成
  3. Fluentd
    • fluentd,fluent-plugin-mysql-bulk install
    • td-agent.conf
    • mysqlにデータが格納される事を確認する
  4. 集計用のバッチ
  5. その他
    • Table肥大化防止
    • 可視化

ログ集計システムの要件

爆弾ログ処理班の@yutakikuchi_です。
ログ集計システムというものを作る時に皆さんはどのように対応していますか? 以下の候補から要件のレベルで使い分けをしている人が多いと予想しています。ざっくりの評価ですが、導入難易度、正確性、可視化、リアルタイム、長期集計、スケール、運用費用という点で評価を書いています。

ツール 導入難易度 正確性 可視化 リアルタイム 長期集計 スケール 運用費用 リンク
GA(スタンダード) × Google アナリティクス公式サイト - ウェブ解析とレポート機能 – Google アナリティクス はてなブックマーク - Google アナリティクス公式サイト - ウェブ解析とレポート機能 – Google アナリティクス
自前Hadoop × × × NTTデータのHadoop報告書がすごかった - 科学と非科学の迷宮 はてなブックマーク - NTTデータのHadoop報告書がすごかった - 科学と非科学の迷宮
Kibana × × Kibana入門 // Speaker Deck はてなブックマーク - Kibana入門 // Speaker Deck
TresureData × 数百億件のデータを30秒で解析――クラウド型DWH「Treasure Data」に新サービス - ITmedia エンタープライズ はてなブックマーク - 数百億件のデータを30秒で解析――クラウド型DWH「Treasure Data」に新サービス - ITmedia エンタープライズ
Redshift × Amazon Redshiftではじめるビッグデータ処理入門:連載|gihyo.jp … 技術評論社 はてなブックマーク - Amazon Redshiftではじめるビッグデータ処理入門:連載|gihyo.jp … 技術評論社
GoogleBigQuery × (2/6)システム部が担うデジタルマーケティング - 第1回 ビッグデータチームの結成を──Google BigQueryがデー...:ITpro はてなブックマーク - (2/6)システム部が担うデジタルマーケティング - 第1回 ビッグデータチームの結成を──Google BigQueryがデー...:ITpro
mysql × × × ソーシャルゲームのためのMySQL入門 - Technology of DeNA はてなブックマーク - ソーシャルゲームのためのMySQL入門 - Technology of DeNA

集計要件の切り口はリアルタイム性を求めるか、集計結果に正確性を求めるのか、大量データを後からスケールさせるか、長期保存が必要なログなのかのという点を考えればある程度は使えるツールが見えてくると思います。世間の動きとしてはTresureData、RedShift、GoogleBigQueryのようにクラウド型DWHにデータを転送して、BIツールで集計結果を見るという流れにある気がします。クラウド型DWHを使う事で開発コストを抑えれる、大量データを突っ込んでもスケール面で安心感があります。Kibanaのように検索エンジンを裏側で持つ仕組みを見た時はなるほどと思ったのですが、大量データを長期間保存するのには不向きな面があります。GAも素晴らしいツールですがスタンダード会員だと集計できる数に制限がでてきてしまいます。僕の今の仕事はクラウド型DWHは使わないという条件があり... 非効率だと叩かれそうですが安定と実績があるmysqlを利用してログを貯めるような設計を考えています。

DB設計

データ保存方針

上に貼った画像のように「食べたパンの枚数」を忘れてしまうようではディオ様にも怒られてしまいます。システム開発の現場でも過去の履歴を辿って数を抽出することはとても重要な話で、生ログに近いデータをDBに保存しておくのはシステム障害の場合等にも有効です。(当然生ログファイルも残します。mysqlで無くてMongodbでも良いです。) この生ログに近いデータをtableに保存しつつ、バッチ処理で定期的に集計値を算出し別tableに記録しBIツールはそちらを参照するような形がとれれば良いと思います。以前は生ログに近いデータを保存するのに反対で集計tableだけを用意し1行の集計カラムだけを都度updateする方針でやっていたのですが、集計値がずれた場合にはログファイルから再集計をやる必要があったので、今後は生ログtableと集計tableを分けて使いたいと思っています。その他生ログをDBに保存する用途としてはDBはAND条件指定が得意なのでURL×UAなどのクロス集計も簡単に抽出できます。

table設計

mysqlでログ集計を考える時に重要な要素は2つあってログ保存期間の仕様を決める、用途毎のtableを用意するという内容があるかと思います。大量の生ログに近いデータをmysqlに延々と貯め続けるのは非効率なのである程度のところでデータを削除しなければなりません。(削除は仕様として割り切るしかないかなと考えています。mysqlのパージコストは大きいですが)それに関連してログを保存するテーブルと集計結果を格納するtableを分けBIツールからは集計用のテーブルを参照するような仕組みが良いと思います。そこで以下のようなテーブルを設計します。パージし易いようにPARTITIONを使います。
tableを3つ用意します。

table名 用途 書き込み元
log リアルタイムでログを書き込む。1行で1アクセスを管理 fluentd
summary_realtime 短時間でlogから集計結果をsummaryする。 batch
summary_date デイリーでlogから集計結果をsummaryする。 batch
batch_history バッチが正常終了とsummary_realtimeで何分迄集計したかを記録。 batch


SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0;
SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0;
SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='TRADITIONAL';

CREATE SCHEMA IF NOT EXISTS `analytics` DEFAULT CHARACTER SET utf8 ;
USE `analytics` ;

-- -----------------------------------------------------
-- Table `analytics`.`log`
-- -----------------------------------------------------
CREATE  TABLE IF NOT EXISTS `analytics`.`log` (
  `log_id` INT NOT NULL AUTO_INCREMENT ,
  `path` VARCHAR(255) NOT NULL ,
  `code` INT NOT NULL ,
  `ua` VARCHAR(512) NOT NULL ,
  `referer` VARCHAR(255) NOT NULL ,
  `created_at` DATETIME NOT NULL ,
  PRIMARY KEY (`log_id`, `created_at`) ,
  INDEX `PATH` (`path` ASC, `code` ASC, `created_at` ASC) )
ENGINE = InnoDB PARTITION BY RANGE(TO_DAYS(created_at)) PARTITIONS 5( PARTITION p1 VALUES LESS THAN (TO_DAYS('2014-02-01 00:00:00')),  PARTITION p2 VALUES LESS THAN (TO_DAYS('2014-03-01 00:00:00')),  PARTITION p3 VALUES LESS THAN (TO_DAYS('2014-04-01 00:00:00')),  PARTITION p4 VALUES LESS THAN (TO_DAYS('2014-05-01 00:00:00')),  PARTITION p5 VALUES LESS THAN (TO_DAYS('2014-06-01 00:00:00')));

-- -----------------------------------------------------
-- Table `analytics`.`realcreated_at_summary`
-- -----------------------------------------------------
CREATE  TABLE IF NOT EXISTS `analytics`.`summary_realtime` (
  `summary_realtime_id` INT NOT NULL AUTO_INCREMENT ,
  `path` VARCHAR(255) NOT NULL ,
  `count` INT NOT NULL ,
  `start_at` DATETIME NOT NULL ,
  `end_at` DATETIME NOT NULL ,
  `created_at` DATETIME NOT NULL ,
  `modified_at` DATETIME NOT NULL ,
  PRIMARY KEY (`summary_realtime_id`) ,
  UNIQUE(`path`, `start_at`, `end_at`),
  INDEX `PATH1` (`path` ASC, `created_at` ASC) )
ENGINE = InnoDB;

-- -----------------------------------------------------
-- Table `analytics`.`summary_date`
-- -----------------------------------------------------
CREATE  TABLE IF NOT EXISTS `analytics`.`summary_date` (
  `summary_date_id` INT NOT NULL AUTO_INCREMENT ,
  `path` VARCHAR(255) NOT NULL ,
  `count` INT NOT NULL ,
  `date` DATE NOT NULL ,
  `created_at` DATETIME NOT NULL ,
  `modified_at` DATETIME NOT NULL ,
  PRIMARY KEY (`summary_date_id`) ,
  INDEX `PATH` (`path` ASC, `date` ASC) )
ENGINE = InnoDB;

-- -----------------------------------------------------
-- Table `analytics`.`batch_history`
-- -----------------------------------------------------

CREATE  TABLE IF NOT EXISTS `analytics`.`batch_history` (
  `batch_history_id` INT NOT NULL AUTO_INCREMENT ,
  `type` CHAR(1) NOT NULL ,
  `date` DATE NOT NULL ,
  `count` INT NOT NULL ,
  PRIMARY KEY (`batch_history_id`) ,
  UNIQUE(`type`, `date`),
  INDEX `DATE` (`type` ASC, `date` ASC))
ENGINE = InnoDB;


SET SQL_MODE=@OLD_SQL_MODE;
SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY
サーバ構成

サーバ構成は次のようなものを想定しています。一応半年前ぐらいにもmysqlとinfinidbでログ集計を扱ったエントリーを書いていますので参考になれば。
【進撃の巨大データ】Log集計用DBとシステム構成の美しい設計を考える - Yuta.Kikuchiの日記 はてなブックマーク - 【進撃の巨大データ】Log集計用DBとシステム構成の美しい設計を考える - Yuta.Kikuchiの日記

Fluentd

fluentd,fluent-plugin-mysql-bulk install

Apacheのログをリアルタイムでmysqlに格納する為の準備をします。Fluentd本体とfluent-plugin-mysql-bulkを設定します。※fluentdのmysqlpluginは他にもありますが、1レコードずつinsertするとmysqlへの接続負荷が高くなってしまうのでbulkinsertできるpluignを選びました。blukinsertとはINSERT INTO (A,B,C) VALUES(aaa,bbb,ccc), (ddd,eee,fff)のように1つのSQL文でINSERTが出来る為に高速と言われています。
Fluentd: Open Source Log Management はてなブックマーク - Fluentd: Open Source Log Management
toyama0919/fluent-plugin-mysql-bulk · GitHub はてなブックマーク - toyama0919/fluent-plugin-mysql-bulk · GitHub

$ curl -L http://toolbelt.treasuredata.com/sh/install-redhat.sh | sh
$ sudo /usr/lib64/fluent/ruby/bin/fluent-gem install fluent-plugin-mysql-bulk
td-agent.conf

上のサーバ構成のLogAggregator Masterの設定を想定して記述します。次の内容を/etc/td-agent/td-agent.confに設定するとmysqlに接続してbulkinsertしてくれます。

<source>
  type forward
  port 24224
</source>

<match apache.access>
  type file
  path /var/log/apache/access.log
</match>

<source>
  type tail
  path /var/log/httpd/access_log
  format apache
  tag apache.log
  pos_file /var/log/td-agent/apache.pos
</source>

<match apache.log>
  type mysql_bulk
  host localhost
  database analytics
  username root
  column_names path,code,ua,referer,created_at
  key_names path,code,agent,referer,${time}
  table log
  flush_interval 5s
</match>
$ sudo /etc/init.d/td-agent restart 
mysqlにデータが格納される事を確認する

apacheへのRequest => apache logへの書き込み => fluent-plugin-mysql-bulk => mysqlの流れを確認します。

$ for i in {1..100}
do              
  curl "http://localhost/index.html"
done

$ tail -n 3 /var/log/httpd/access_log
::1 - - [08/Feb/2014:23:10:40 +0900] "GET /index.html HTTP/1.1" 200 - "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2"
::1 - - [08/Feb/2014:23:10:40 +0900] "GET /index.html HTTP/1.1" 200 - "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2"
::1 - - [08/Feb/2014:23:10:40 +0900] "GET /index.html HTTP/1.1" 200 - "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2"

$ tail -n 3 /var/log/td-agent/td-agent.log
2014-02-08 23:10:42 +0900 [info]: bulk insert sql => [INSERT INTO log (path,code,ua,referer,created_at) VALUES ('/index.html','200','curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2','-','2014-02-08 23:10:37'),('/index.html','200','curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2','-','2014-02-08 23:10:37'),....
mysql> SELECT * FROM analytics.log ORDER BY log_id DESC LIMIT 3 ;
+--------+-------------+------+--------------------------------------------------------------------------------------------------------+---------+---------------------+
| log_id | path        | code | ua                                                                                                     | referer | created_at          |
+--------+-------------+------+--------------------------------------------------------------------------------------------------------+---------+---------------------+
|    100 | /index.html |  200 | curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2 | -       | 2014-02-08 23:10:40 |
|     99 | /index.html |  200 | curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2 | -       | 2014-02-08 23:10:40 |
|     98 | /index.html |  200 | curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2 | -       | 2014-02-08 23:10:40 |
+--------+-------------+------+--------------------------------------------------------------------------------------------------------+---------+---------------------+

集計用のバッチ

以下のpythonコードでlogtableから5分毎に集計結果をsummary_realtimeに格納、1日毎にsummary_dateに格納する事ができます。それぞれのプログラムをcronで回せばOKですね。実行した結果mysqlにデータが格納されている事も確認します。logtableの格納件数とsummarytableの集計値が一致しているので問題無さそうでした。

#!/usr/bin/env python
#coding:utf-8
import sys,mysql.connector
from datetime import *
import time
date =  datetime.now().strftime( '%Y-%m-%d' )
base_timestamp = int( time.mktime( datetime.strptime( date + ' 00:00:00', "%Y-%m-%d %H:%M:%S" ).timetuple() ) )

# mysql connector
con = mysql.connector.connect(
    host='localhost',
    db='analytics',
    user='root',
    passwd='',
    buffered=True)
cur = con.cursor()

# get batch_count
cur.execute('SELECT count FROM %s WHERE date = "%s" AND type = "%s"' % ('batch_history', date, 'R') )
res = cur.fetchone()
try:
    batch_count = res[0]
except TypeError:
    batch_count = 0
start_timestamp = base_timestamp +  300 * ( batch_count -1 );
end_timestamp = base_timestamp + 300 * batch_count; 

# get logdata
cur.execute('SELECT COUNT(path),path,ua FROM %s WHERE code = %d AND created_at BETWEEN FROM_UNIXTIME("%s") AND FROM_UNIXTIME("%s") GROUP BY path' % ('log', 200, start_timestamp, end_timestamp))
res = cur.fetchall()

# insert into summary_realtime
for row in res:
    cur.execute('INSERT INTO %s (path, count, start_at, end_at, created_at, modified_at) VALUES("%s", %d, FROM_UNIXTIME("%s"), FROM_UNIXTIME("%s"), NOW(), NOW()) ON DUPLICATE KEY UPDATE count = %d, start_at = FROM_UNIXTIME("%s"), end_at = FROM_UNIXTIME("%s"), modified_at = NOW(), summary_realtime_id = LAST_INSERT_ID(summary_realtime_id)' %('summary_realtime', row[1], row[0], start_timestamp, end_timestamp, row[0], start_timestamp, end_timestamp ))

# history
cur.execute('INSERT INTO %s (type,date,count) VALUES("%s", "%s", 1) ON DUPLICATE KEY UPDATE batch_history_id = LAST_INSERT_ID(batch_history_id), count = count + 1' % ('batch_history', 'R', date))

con.commit()
cur.close()
con.close()
#!/usr/bin/env python
#coding:utf-8
import sys,mysql.connector
from datetime import *
import time
yesterday = datetime.now() - timedelta(0,3600*24)
start_date =  yesterday.strftime( '%Y-%m-%d 00:00:00' )
end_date =  datetime.now().strftime( '%Y-%m-%d 00:00:00' ) 
date =  yesterday.strftime( '%Y-%m-%d' )

# mysql connector
con = mysql.connector.connect(
    host='localhost',
    db='analytics',
    user='root',
    passwd='',
    buffered=True)
cur = con.cursor()

# get batch_count
cur.execute('SELECT count FROM %s WHERE date = "%s" AND type = "%s"' % ('batch_history', date, 'D') )
res = cur.fetchone()
try:
    batch_count = res[0]
except TypeError:
    batch_count = 0

# get logdata
cur.execute('SELECT COUNT(path),path,ua FROM %s WHERE code = %d AND created_at BETWEEN "%s" AND "%s" GROUP BY path' % ('log', 200, start_date, end_date))
res = cur.fetchall()

# insert into summary_date
for row in res:
    cur.execute('INSERT INTO %s (path, count, date, created_at, modified_at) VALUES("%s", %d, "%s", NOW(), NOW()) ON DUPLICATE KEY UPDATE count = %d, date = "%s", modified_at = NOW(), summary_date_id = LAST_INSERT_ID(summary_date_id)' %('summary_date', row[1], row[0], end_date, row[0], end_date))

# history
cur.execute('INSERT INTO %s (type,date,count) VALUES("%s", "%s", 1) ON DUPLICATE KEY UPDATE batch_history_id = LAST_INSERT_ID(batch_history_id), count = count + 1' % ('batch_history', 'D', date))

con.commit()
cur.close()
con.close()
mysql> SELECT COUNT(*) FROM log WHERE created_at BETWEEN '2014-02-08' AND '2014-02-09';
+----------+
| COUNT(*) |
+----------+
|      100 |
+----------+
1 row in set (0.00 sec)

mysql> SELECT COUNT(*) FROM log WHERE created_at BETWEEN '2014-02-09' AND '2014-02-10';
+----------+
| COUNT(*) |
+----------+
|     1288 |
+----------+
1 row in set (0.00 sec)

mysql> SELECT * FROM summary_date;
+-----------------+-------------+-------+------------+---------------------+---------------------+
| summary_date_id | path        | count | date       | created_at          | modified_at         |
+-----------------+-------------+-------+------------+---------------------+---------------------+
|               1 | /index.html |   100 | 2014-02-09 | 2014-02-09 15:48:37 | 2014-02-09 15:48:37 |
+-----------------+-------------+-------+------------+---------------------+---------------------+
1 row in set (0.00 sec)

mysql> SELECT SUM(count) FROM summary_date;
+------------+
| SUM(count) |
+------------+
|        100 |
+------------+
1 row in set (0.00 sec)

mysql> SELECT * FROM summary_realtime;
+---------------------+-------------+-------+---------------------+---------------------+---------------------+---------------------+
| summary_realtime_id | path        | count | start_at            | end_at              | created_at          | modified_at         |
+---------------------+-------------+-------+---------------------+---------------------+---------------------+---------------------+
|                   1 | /index.html |   144 | 2014-02-09 01:00:00 | 2014-02-09 01:05:00 | 2014-02-09 15:45:21 | 2014-02-09 15:45:21 |
|                   2 | /index.html |   144 | 2014-02-09 01:10:00 | 2014-02-09 01:15:00 | 2014-02-09 15:45:21 | 2014-02-09 15:45:21 |
|                   3 | /index.html |  1000 | 2014-02-09 01:50:00 | 2014-02-09 01:55:00 | 2014-02-09 15:45:23 | 2014-02-09 15:45:23 |
+---------------------+-------------+-------+---------------------+---------------------+---------------------+---------------------+
3 rows in set (0.00 sec)

mysql> SELECT SUM(count) FROM summary_realtime;
+------------+
| SUM(count) |
+------------+
|       1288 |
+------------+
1 row in set (0.00 sec)

その他

Table肥大化防止

logtableが肥大化して来た時はパージを行います。予めpartitionを月毎に切っているので仕様で決めた蓄積期間を経過したらサヨナラします。summary_dateのtableは容量を食わないはずなのでそのまま残しておいても良いと思います。

mysql> ALTER TABLE log DROP PARTITION p1;
可視化

Highcharts - Interactive JavaScript charts for your webpage はてなブックマーク - Highcharts - Interactive JavaScript charts for your webpage
morris.js はてなブックマーク - morris.js
グラフ描画のJSで注目したいのはHighchartsとmorris.jsぐらいでしょうか。Highchartsは商用での利用はライセンス契約しないとだめなので、その辺を気にせず使いたい方はmorris.jsが良いかなと思います。