読者です 読者をやめる 読者になる 読者になる

Web就活日記

愛と夢と人生について書きます

3ヶ月間Hadoopを使ってみて学んだ事

Hadoop

Hadoop 第2版

Hadoop 第2版

Overture

BigData解析という仕事をやり始めて半年、Hadoopを業務で使い始めて3ヶ月以上が経過したのでここで今までの業務での知識をまとめてみたいと思います。先日参加したWebDBForum2012でも各種企業がBigData(主にログ)からユーザの趣味思考や特徴などを解析して表示システムへのFeedBackや企業戦略などに活かしている報告があり、Hadoopなどの分散処理技術や今後は更にリアルタイムでBigDataを使うためのミドルウェアが出てくることが予想され、そこに精通した人間が求められるようになってくると思います。
第5回 Webとデータベースに関するフォーラム (WebDB Forum 2012) はてなブックマーク - 第5回 Webとデータベースに関するフォーラム (WebDB Forum 2012)

Hadoop

Hadoop - Wikipedia はてなブックマーク - Hadoop - Wikipedia
Hadoopの説明を簡単に。大量のデータを扱うために従来のスタンドアロンで処理を行うのではなく、大量のマシンに処理を分散させて解析スピードを劇的に改善することを目的としたフレームワークです。例えば200GほどのApacheのAccessLogからUserのCookieRefererや利用しているUserAgent/Deviceなどをデータを1日毎に集計しようとした場合、1台スクリプトで計算していたら翌日の集計に間に合いません。しかしこれをHadoopの分散処理を使えば1-2時間で処理を完了させることが出来ると思います。

merit / demerit

Hadoopを利用するmerit/demeritを箇条書きで書きます。

  • merit
    • 大量のデータを複数台で分散し、解析処理を高速化。
    • スケールアウトが比較的楽にできる。
    • 面倒な分散処理は全てHadoopが行ってくれる。Hadoopユーザは解析処理に専念できる。
    • 導入実績が豊富で信頼ができる。
    • JAVAが苦手なユーザでもStreamingを使う事でPerl/Python/Rubyなどの言語でMapReduceを記述する事が可能。
  • demerit
    • 逐次処理を行う場合には不向き。
    • リアルタイム性を求める処理には不向き。
    • 多段のMapReduce処理を重ねないと集計が出来ないケースがある。
    • MapReduceのテクニックを学ばなければならない。
ServerStructure

Hadoopクラスタはmaster/slaveの二つに分けられます。masterとなるのは1台のサーバでそこを起点にslaveノードに対してTask,Dataが割り当てられます。master/slaveのサーバの内部でもMapReduce処理を行うサーバ、HDFSのファイルを管理するサーバに分けられ、それぞれに対してJobTracker/TaskTracker/NameNode/DataNodeと名前が定義されています。

  • JobTracker
    • HadoopClientからjobを受け取る。Slaveサーバに対してTaskを分割する。
  • TaskTracker
    • Masterサーバから受け取ったTaskを実行する。
  • NameNode
    • 実際のデータがDataNodeのどこに格納されているのかを管理するメタデータサーバ。
  • DataNode
    • NameNodeによって管理されている実際のデータ。ブロックと呼ばれる単位で格納されている。ブロックはデフォルトで64MByte。ファイルは設定に応じてreplication管理されている。

以下は概要図となりますが、Clientから投げられたHadoopのJobをMasterのJobTrackerが受け付けてSlaveのTaskTrackerに流します。Clientから投げられたHadoopのJobのINPUT_PATHはNameNodeに問い合わせを行い、どのDataNodeにデータが格納されているのかを取得します。JobTrackerはTaskTrackerに対してDataNodeのファイルを指定し、TaskTrackerが実際のファイルを取りに行くような仕組みです。

Map / Sort & Shuffle/ Reduce

MapReduce流れについて説明します。MapReduceを一言で書くとKey/Value形式のテキストデータを標準入出力を用いた分散処理でまとめる事です。処理の流れを以下で書きます。HadoopClientからJobを流し込む時にInputPath,OutputPath,mapper,reducerを指定します。InputPath/OutputPathはHDFS上のディレクトリを設定し、Mapper/ReducerはHadoopのユーザが定義したプログラムを設定します。JobTrackerはInputPathのデータをInputSplitと呼ばれる単位のファイルに分割し、それぞれのTaskTrackerに分散されます。TaskTrackerではユーザが定義したMapperを実行するためにInputSplitからデータを抽出します。map処理では通常、1行の分解と目的データの抽出を行います。map処理で出力するデータもKeyとValue形式になっている必要があり、通常はそれらをTabで区切りで出力します。出力されたデータはTaskTrackerのローカルに一時的に書き出しされます。出力したmapの結果からKeyを基にSortを行い、Shuffle処理で同じKeyのデータを束ねます。Reducerが受け取るデータの規則はKeyによって決まります。よってShuffleで束ねられたデータはKeyを基に必ず特定のReducerに渡るようになっています。Reducerにデータを渡す方法はmap処理を行ったTaskTrackerからファイルのコピーにて行われます。Reducerの目的はMapperが出力したデータの集計する処理になります。コピーが完了し、Reducerの初期化が完了したTaskから処理に移りユーザが定義したReducerが呼び出され、最終的な結果を最初に指定したOutputPathのディレクトリにファイルとして保存します。


DistributedCache

DistributedCacheはHDFS上に設置した比較的大きなファイルをMapperやReducerで読み込む仕組みです。例えば大量のテキストファイルをMapperで参照したい場合であったり、Jar等のライブラリファイルやsoファイル等の実行ファイルでも可能です。DistributedCacheがJobTrackerから渡されるfileオプションのプログラムと異なるのはコピー回数を減らし、TaskTracker側でデータをキャッシュできるところです。

MapReduce Technique

MapReduceを使っていて気づいたTechniqueをまとめます。

  • 実装したMapper/ReducerをHadoopClientだけで動かして正常に動作する事を確認する。Mapper/Reducerは標準入出力プログラミングなので一度HDFS上のファイルの一部をClientサーバにコピーし、コマンドラインで結果をパイプで連結して目的とする結果が出力されることを確認する。
  • 大量のTaskTrackerサーバのプログラム更新を抑える。StreamingのfileオプションやDistributedCacheを利用してTaskTrackerに対して共通のパッケージをinstallするなどの手間を省く。
  • Mapperで出力するKeyがReducer側で分散し偏りが無いように定義する。例えば指定するReducerの数を大きくしてもMapperで吐き出すKeyのバリエーションが少なければ特定のReducerにデータが転送されてしまい、処理が軽いReducerと重いReducerが存在し十分に並列処理を活かす事ができない。Keyの指定にはバリエーション豊富なデータ項目として沢山のReducerに散らせるようにする。
  • Reducerへのデータ転送容量を減らす。MapperのTaskTrackerからReducerへはデータが転送コピーされる。このデータ容量を出来る限りコンパクトにするため、Mapperで無駄なデータ項目は出力しない。
  • Combinerを使ってReducerへのデータ転送を減らす。MapperとReducerの中間のCombinearを定義して一度簡易集計を行う。簡易集計した結果をReducer側に渡す事によってデータ転送量を減らす事ができる。
  • 1つのMapperに与えるInputSplitのデータサイズが大きい場合はMapper処理が途中で失敗することがある。Hadoop側の設定ファイルを修正する方法もありそうだが、InputFileのデータを細かく分割してHDFSに上げ直すとうまくいくことがある。
  • 1ヶ月間等の大量のログデータを一度のMapperにInputするとHeapSizeを超えて処理できない可能性がある。その場合は一度1日や1週間単位でコンパクトなデータを作り、コンパクトしたデータを1ヶ月分のデータとして再集計を行う。
  • Reducerでの出力を数値の集計としてまとめたい場合、1度のMapReducerでは出来ない可能性がある。その場合は2回に分けてMapReduceを行う必要がある。1回目は複数のReducerで大量のデータから部分集計を行う。2回目のReducerでは1個起動するように定義し、1回目の出力を全てまとめて出力する。

Practice

CentOS release 6.3 (Final)でHadoop Streamingを動かしてみます。

WordCount

http://flash.sonypictures.com/video/movies/thesocialnetwork/awards/thesocialnetwork_screenplay.pdf
映画socialnetworkの台詞を元にWordCountをHadoopStreamingで行います。まずはテキストをword.txtというファイルにコピーしてHDFSにアップします。

$ alias hdfs="/usr/lib/hadoop-0.20/bin/hadoop fs"
$ hdfs -mkdir socialnetwork/input
$ hdfs -put word.txt socialnetwork/input
$ hdfs -ls socialnetwork/input
-rw-r--r--   1 yuta supergroup     167091 2012-11-23 21:42 /user/yuta/socialnetwork/input/word.txt

英単語はピリオド、スペース、カンマ、クエッションで区切られているのでそれを元に単語に分割します。Mapperで単語 1というデータ形式で出力し、Reducer側で単語のHashを作成し、カウントの合計値を計算します。ここでは簡単な処理のためにMapper/Reducerともに1で実行します。またMapper/ReducerともにPerlで記述しています。

#!/usr/bin/perl
#WordCouのMapperです

use strict;
use warnings;

while( <> ) {
   chomp $_;
   my @tokens = split( /\s|\.|,|\?/, $_ );
   foreach( @tokens ) {
      $_ =~ s/(“|”|-)//g; 
      if( $_ ne "" && $_ ne " " ) {
         print $_ . "\t" . 1 . "\n";
      }
   }
}
#!/usr/bin/perl
#WordCountのReducerです

use strict;
use warnings;
my %word_hash = ();
while( <> ) {
   chomp $_;
   my @tokens = split( /\t/, $_ );
   my $word = $tokens[0];
   if( !defined( $word_hash{$word} ) ) {
      $word_hash{$word} = 0;
   }
   $word_hash{$word}++;
}

foreach my $word ( sort keys %word_hash ) {
   print $word . "\t" . $word_hash{$word} . "\n";
}

自作のMapper/Reducerが完成したので一度ローカルで実行してみます。Mapperのデータ形式とReducerの最終出力結果に間違いが無いかを確認します。

$ cat word.txt | perl mapper.pl
the	1
world	1
MARK	1
waits	1
And	1
waits	1
And	1
we	1
SNAP	1
TO	1
BLACK	1
ROLL	1
MAIN	1
TITLE	1

$ cat word.txt | perl mapper.pl | perl reducer.pl
FACEBOOK	6
Facebook	40
Facebookwhich	1
Google	2
MARK	583
MARK)	15
MARKDUSTIN	1
MARK’S	26
MARK’s	24
Mark	77
Mark!	1
Mark’I	1
Mark’s	9
TheFacebook	8
facebook	6
facebooks	1
mark	1
marker	1
resentedMark	1
theFacebook	17

Hadoop上でMapReduceを実行してみます。以下のようなスクリプトを作成し、コマンドの確認と実行を行います。

#!/bin/sh

job_name='"SocialNetwrok WordCount"'
hadoop='/usr/lib/hadoop-0.20/bin/hadoop'
streaming_jar='/usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u5.jar'
input_path='socialnetwork/input'
output_path='socialnetwork/ouput'
mapper='wordcount_mapper.pl'
reducer='wordcount_reducer.pl'
base_path=`pwd`
reducer_num=1

echo $hadoop fs -rmr $output_path
echo $hadoop jar $streaming_jar -D mapred.job.name=$job_name -D mapred.reduce.task=$reducer_num -input $input_path -output $output_path -mapper $mapper -reducer $reducer -file $base_path/$mapper -file $base_path/$reducer
$ ./exec.sh 
/usr/lib/hadoop-0.20/bin/hadoop fs -rmr socialnetwork/ouput
/usr/lib/hadoop-0.20/bin/hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u5.jar -D mapred.job.name="SocialNetwrok WordCount" -D mapred.reduce.task=1 -input socialnetwork/input -output socialnetwork/ouput -mapper wordcount_mapper.pl -reducer wordcount_reducer.pl -file /home/yuta/work/hadoop/socialnetwork/wordcount_mapper.pl -file /home/yuta/work/hadoop/socialnetwork/wordcount_reducer.pl
$ ./exec.sh | sh
2/11/23 23:23:09 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201211231323_0005
12/11/23 23:23:09 ERROR streaming.StreamJob: Job not successful. Error: NA
12/11/23 23:23:09 INFO streaming.StreamJob: killJob...

Hadoopを実行してみましたがINFO streaming.StreamJob: killJob...というエラーがコンソールに表示されJobが動きませんでした。http://localhost:50030/jobdetails.jspツールでエラーを確認すると以下が吐き出されていました。原因を調査してみると、Scriptの先頭に記述するShebangが正しいパスになっていないと動かないようです。shell script not found in hadoop - Stack Overflow はてなブックマーク - shell script not found in hadoop - Stack Overflow本来#!/usr/bin/perlと書くべきところを#!/usr/local/bin/perlとしていた事が原因でした。

Cannot run program "/var/lib/hadoop-0.20/cache/mapred/mapred/local/taskTracker/yuta/jobcache/job_201211231323_0006/attempt_201211231323_0006_m_000001_3/work/./wordcount_mapper.pl": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)

shebangを書き直して再度同じshellを実行します。今度はちゃんとJobが動き出して指定したoutputのディレクトリに結果が格納されます。outputのディレクトリには成功したかどうかの検知用の_SUCCESSと実行ログの_logsと結果出力用のpart-0000が作成されます。作成されたpart-0000の中身を見てみると先ほどlocalで実行した結果と同じものが出力されていて、問題なくHadoopが実行できた事が確認できます。

$ ./exec.sh | sh
12/11/24 00:54:59 INFO mapred.FileInputFormat: Total input paths to process : 1
12/11/24 00:55:00 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/yuta/mapred/local]
12/11/24 00:55:00 INFO streaming.StreamJob: Running job: job_201211231323_0007
12/11/24 00:55:00 INFO streaming.StreamJob: To kill this job, run:
12/11/24 00:55:00 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201211231323_0007
12/11/24 00:55:00 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201211231323_0007
12/11/24 00:55:01 INFO streaming.StreamJob:  map 0%  reduce 0%
12/11/24 00:55:26 INFO streaming.StreamJob:  map 50%  reduce 0%
12/11/24 00:55:27 INFO streaming.StreamJob:  map 100%  reduce 0%
12/11/24 00:55:39 INFO streaming.StreamJob:  map 100%  reduce 17%
12/11/24 00:55:41 INFO streaming.StreamJob:  map 100%  reduce 100%
12/11/24 00:55:44 INFO streaming.StreamJob: Job complete: job_201211231323_0007
12/11/24 00:55:44 INFO streaming.StreamJob: Output: socialnetwork/output

$ hdfs -ls socialnetwork/output
-rw-r--r--   1 yuta supergroup          0 2012-11-24 00:55 /user/yuta/socialnetwork/output/_SUCCESS
drwxr-xr-x   - yuta supergroup          0 2012-11-24 00:55 /user/yuta/socialnetwork/output/_logs
-rw-r--r--   1 yuta supergroup      44607 2012-11-24 00:55 /user/yuta/socialnetwork/output/part-0000

$ hdfs -text socialnetwork/output/part-00000
FACEBOOK	6
Facebook	40
Facebookwhich	1
Google	2
MARK	583
MARK)	15
MARKDUSTIN	1
MARK’S	26
MARK’s	24
Mark	77
Mark!	1
Mark’I	1
Mark’s	9
TheFacebook	8
facebook	6
facebooks	1
mark	1
marker	1
resentedMark	1
theFacebook	17
DistributedCache

HDFS上に設置したテキストファイルやプログラムをDistributedCacheの形式で読み込めるようにします。ここでは簡単なサンプルとしてMapperへのinputを通常のinputpath以外にもDistributedCacheさせたファイルから読み込めるような設定を行います。例としてWordCountさせたくないNGWordを定義してinputの内容と照らし合わせてMapperで弾くといった処理をやってみます。まずは集計の対象外としたいng_word.txtをHDFSに上げます。またHDFSに上げたng_word.txtをMapperで読み込めるように修正します。実行するshellスクリプトも-cacheFileというオプションでHDFS上のng_word.txtを参照するようにします。HDFSのパスはFullPathで記載し、最後に読み込みたいフィアルのsymlinkを付けます。 hdfs://host:port/absolute-path#link-name

$ hdfs -put ng_word.txt socialnetwork/input/ng_word.txt
$ hdfs -text hdfs://localhost/user/yuta/socialnetwork/input/ng_word.txt
Facebook 
Mark
Zuckerberg
#!/usr/bin/perl
#WordCout Mapperです

use strict;
use warnings;
use Data::Dumper;

open( FH, "./ng_word.txt" );
my @nglist = <FH>;

while( my $data = <> ) {
   chomp $data;
   my @tokens = split( /\s|\.|,|\?/, $data );
   foreach my $node ( @tokens ) {
      $node =~ s/(“|”|-)//g;
      if( $node ne "" && $node ne " "  ) {
         my $flag = 1;
         foreach my $list( @nglist ) {
            $list =~ s/\n//g;
            if( $node =~ /$list/i ) {
               $flag = 0;
               last;
            } 
         }
         if( $flag == 1 ) { 
            print $node . "\t" . 1 . "\n";
         }
      }
   }
}

MapReduceを実行してみるとわかりますが、ng_word.txtを読み込んでFacebook,Mark,Zuckerbergの文字列を含む単語を弾いている事が分かります。

$ cat exec.sh
#!/bin/sh

job_name='"SocialNetwrok WordCount"'
hadoop='/usr/lib/hadoop-0.20/bin/hadoop'
streaming_jar='/usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u5.jar'
input_path='socialnetwork/input'
output_path='socialnetwork/output'
mapper='wordcount_mapper.pl'
reducer='wordcount_reducer.pl'
base_path=`pwd`
reducer_num=1
cachefile='hdfs://localhost/user/yuta/socialnetwork/input/ng_word.txt#ng_word.txt'

echo $hadoop fs -rmr $output_path
echo $hadoop jar $streaming_jar -D mapred.job.name=$job_name -D mapred.reduce.task=$reducer_num -input $input_path -output $output_path -mapper $mapper -reducer $reducer -file $base_path/$mapper -file $base_path/$reducer -cacheFile $cachefile 

$ ./exec.sh | sh
$ hdfs -text socialnetwork/output/part-00000 | egrep -i "(FaceBook|Mark|Zuckerberg)"
(結果なし)

DistributedCacheの実行で-cacheFileのオプションについてdeprecatedのエラーが出ていました。今後は-filesオプションを使って欲しいとの事ですが -filesは現在hadoop jobコマンドのみしか対応していないようでhadoop jarには適用できません。Commands Guide はてなブックマーク - Commands Guideよってこのエラーは無視してcacheFIleオプションを使うと良いと思います。

WARN streaming.StreamJob: -cacheFile option is deprecated, please use -files instead

上はテキストファイルのDistributedCacheを説明しましたがJARファイルなども読み込む事ができます。JARファイルをHDFSから読み込む時のオプションは-cacheArchiveで読み込む事ができます。

FutureWorks

冒頭でも述べましたが今後はRealtimeでBigDataを扱うケースが増えてくると思います。(まぁRealtimeで扱う時点でBigDataなのか?という議論はあると思いますが。)そんな中で活躍しそうなMiddleWareについてリンクをいくつか紹介しておきたいと思います。

残りはリンクだけ紹介しておきます。

スポンサーリンク