3ヶ月間Hadoopを使ってみて学んだ事
- 作者: Tom White,玉川竜司,兼田聖士
- 出版社/メーカー: オライリージャパン
- 発売日: 2011/07/23
- メディア: 大型本
- 購入: 9人 クリック: 182回
- この商品を含むブログ (24件) を見る
Overture
BigData解析という仕事をやり始めて半年、Hadoopを業務で使い始めて3ヶ月以上が経過したのでここで今までの業務での知識をまとめてみたいと思います。先日参加したWebDBForum2012でも各種企業がBigData(主にログ)からユーザの趣味思考や特徴などを解析して表示システムへのFeedBackや企業戦略などに活かしている報告があり、Hadoopなどの分散処理技術や今後は更にリアルタイムでBigDataを使うためのミドルウェアが出てくることが予想され、そこに精通した人間が求められるようになってくると思います。
第5回 Webとデータベースに関するフォーラム (WebDB Forum 2012)
Hadoop
Hadoop - Wikipedia
Hadoopの説明を簡単に。大量のデータを扱うために従来のスタンドアロンで処理を行うのではなく、大量のマシンに処理を分散させて解析スピードを劇的に改善することを目的としたフレームワークです。例えば200GほどのApacheのAccessLogからUserのCookie、Refererや利用しているUserAgent/Deviceなどをデータを1日毎に集計しようとした場合、1台スクリプトで計算していたら翌日の集計に間に合いません。しかしこれをHadoopの分散処理を使えば1-2時間で処理を完了させることが出来ると思います。ServerStructure
Hadoopのクラスタはmaster/slaveの二つに分けられます。masterとなるのは1台のサーバでそこを起点にslaveノードに対してTask,Dataが割り当てられます。master/slaveのサーバの内部でもMapReduce処理を行うサーバ、HDFSのファイルを管理するサーバに分けられ、それぞれに対してJobTracker/TaskTracker/NameNode/DataNodeと名前が定義されています。
- JobTracker
- HadoopClientからjobを受け取る。Slaveサーバに対してTaskを分割する。
- TaskTracker
- Masterサーバから受け取ったTaskを実行する。
- NameNode
- 実際のデータがDataNodeのどこに格納されているのかを管理するメタデータサーバ。
- DataNode
- NameNodeによって管理されている実際のデータ。ブロックと呼ばれる単位で格納されている。ブロックはデフォルトで64MByte。ファイルは設定に応じてreplication管理されている。
以下は概要図となりますが、Clientから投げられたHadoopのJobをMasterのJobTrackerが受け付けてSlaveのTaskTrackerに流します。Clientから投げられたHadoopのJobのINPUT_PATHはNameNodeに問い合わせを行い、どのDataNodeにデータが格納されているのかを取得します。JobTrackerはTaskTrackerに対してDataNodeのファイルを指定し、TaskTrackerが実際のファイルを取りに行くような仕組みです。
Map / Sort & Shuffle/ Reduce
MapReduce流れについて説明します。MapReduceを一言で書くとKey/Value形式のテキストデータを標準入出力を用いた分散処理でまとめる事です。処理の流れを以下で書きます。HadoopClientからJobを流し込む時にInputPath,OutputPath,mapper,reducerを指定します。InputPath/OutputPathはHDFS上のディレクトリを設定し、Mapper/ReducerはHadoopのユーザが定義したプログラムを設定します。JobTrackerはInputPathのデータをInputSplitと呼ばれる単位のファイルに分割し、それぞれのTaskTrackerに分散されます。TaskTrackerではユーザが定義したMapperを実行するためにInputSplitからデータを抽出します。map処理では通常、1行の分解と目的データの抽出を行います。map処理で出力するデータもKeyとValue形式になっている必要があり、通常はそれらをTabで区切りで出力します。出力されたデータはTaskTrackerのローカルに一時的に書き出しされます。出力したmapの結果からKeyを基にSortを行い、Shuffle処理で同じKeyのデータを束ねます。Reducerが受け取るデータの規則はKeyによって決まります。よってShuffleで束ねられたデータはKeyを基に必ず特定のReducerに渡るようになっています。Reducerにデータを渡す方法はmap処理を行ったTaskTrackerからファイルのコピーにて行われます。Reducerの目的はMapperが出力したデータの集計する処理になります。コピーが完了し、Reducerの初期化が完了したTaskから処理に移りユーザが定義したReducerが呼び出され、最終的な結果を最初に指定したOutputPathのディレクトリにファイルとして保存します。
MapReduce Technique
MapReduceを使っていて気づいたTechniqueをまとめます。
- 実装したMapper/ReducerをHadoopClientだけで動かして正常に動作する事を確認する。Mapper/Reducerは標準入出力プログラミングなので一度HDFS上のファイルの一部をClientサーバにコピーし、コマンドラインで結果をパイプで連結して目的とする結果が出力されることを確認する。
- 大量のTaskTrackerサーバのプログラム更新を抑える。StreamingのfileオプションやDistributedCacheを利用してTaskTrackerに対して共通のパッケージをinstallするなどの手間を省く。
- Mapperで出力するKeyがReducer側で分散し偏りが無いように定義する。例えば指定するReducerの数を大きくしてもMapperで吐き出すKeyのバリエーションが少なければ特定のReducerにデータが転送されてしまい、処理が軽いReducerと重いReducerが存在し十分に並列処理を活かす事ができない。Keyの指定にはバリエーション豊富なデータ項目として沢山のReducerに散らせるようにする。
- Reducerへのデータ転送容量を減らす。MapperのTaskTrackerからReducerへはデータが転送コピーされる。このデータ容量を出来る限りコンパクトにするため、Mapperで無駄なデータ項目は出力しない。
- Combinerを使ってReducerへのデータ転送を減らす。MapperとReducerの中間のCombinearを定義して一度簡易集計を行う。簡易集計した結果をReducer側に渡す事によってデータ転送量を減らす事ができる。
- 1つのMapperに与えるInputSplitのデータサイズが大きい場合はMapper処理が途中で失敗することがある。Hadoop側の設定ファイルを修正する方法もありそうだが、InputFileのデータを細かく分割してHDFSに上げ直すとうまくいくことがある。
- 1ヶ月間等の大量のログデータを一度のMapperにInputするとHeapSizeを超えて処理できない可能性がある。その場合は一度1日や1週間単位でコンパクトなデータを作り、コンパクトしたデータを1ヶ月分のデータとして再集計を行う。
- Reducerでの出力を数値の集計としてまとめたい場合、1度のMapReducerでは出来ない可能性がある。その場合は2回に分けてMapReduceを行う必要がある。1回目は複数のReducerで大量のデータから部分集計を行う。2回目のReducerでは1個起動するように定義し、1回目の出力を全てまとめて出力する。
Practice
CentOS release 6.3 (Final)でHadoop Streamingを動かしてみます。
WordCount
http://flash.sonypictures.com/video/movies/thesocialnetwork/awards/thesocialnetwork_screenplay.pdf
映画socialnetworkの台詞を元にWordCountをHadoopStreamingで行います。まずはテキストをword.txtというファイルにコピーしてHDFSにアップします。$ alias hdfs="/usr/lib/hadoop-0.20/bin/hadoop fs" $ hdfs -mkdir socialnetwork/input $ hdfs -put word.txt socialnetwork/input $ hdfs -ls socialnetwork/input -rw-r--r-- 1 yuta supergroup 167091 2012-11-23 21:42 /user/yuta/socialnetwork/input/word.txt英単語はピリオド、スペース、カンマ、クエッションで区切られているのでそれを元に単語に分割します。Mapperで単語
1というデータ形式で出力し、Reducer側で単語のHashを作成し、カウントの合計値を計算します。ここでは簡単な処理のためにMapper/Reducerともに1で実行します。またMapper/ReducerともにPerlで記述しています。 #!/usr/bin/perl #WordCouのMapperです use strict; use warnings; while( <> ) { chomp $_; my @tokens = split( /\s|\.|,|\?/, $_ ); foreach( @tokens ) { $_ =~ s/(“|”|-)//g; if( $_ ne "" && $_ ne " " ) { print $_ . "\t" . 1 . "\n"; } } }#!/usr/bin/perl #WordCountのReducerです use strict; use warnings; my %word_hash = (); while( <> ) { chomp $_; my @tokens = split( /\t/, $_ ); my $word = $tokens[0]; if( !defined( $word_hash{$word} ) ) { $word_hash{$word} = 0; } $word_hash{$word}++; } foreach my $word ( sort keys %word_hash ) { print $word . "\t" . $word_hash{$word} . "\n"; }自作のMapper/Reducerが完成したので一度ローカルで実行してみます。Mapperのデータ形式とReducerの最終出力結果に間違いが無いかを確認します。
$ cat word.txt | perl mapper.pl the 1 world 1 MARK 1 waits 1 And 1 waits 1 And 1 we 1 SNAP 1 TO 1 BLACK 1 ROLL 1 MAIN 1 TITLE 1 $ cat word.txt | perl mapper.pl | perl reducer.pl FACEBOOK 6 Facebook 40 Facebookwhich 1 Google 2 MARK 583 MARK) 15 MARKDUSTIN 1 MARK’S 26 MARK’s 24 Mark 77 Mark! 1 Mark’I 1 Mark’s 9 TheFacebook 8 facebook 6 facebooks 1 mark 1 marker 1 resentedMark 1 theFacebook 17Hadoop上でMapReduceを実行してみます。以下のようなスクリプトを作成し、コマンドの確認と実行を行います。
#!/bin/sh job_name='"SocialNetwrok WordCount"' hadoop='/usr/lib/hadoop-0.20/bin/hadoop' streaming_jar='/usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u5.jar' input_path='socialnetwork/input' output_path='socialnetwork/ouput' mapper='wordcount_mapper.pl' reducer='wordcount_reducer.pl' base_path=`pwd` reducer_num=1 echo $hadoop fs -rmr $output_path echo $hadoop jar $streaming_jar -D mapred.job.name=$job_name -D mapred.reduce.task=$reducer_num -input $input_path -output $output_path -mapper $mapper -reducer $reducer -file $base_path/$mapper -file $base_path/$reducer$ ./exec.sh /usr/lib/hadoop-0.20/bin/hadoop fs -rmr socialnetwork/ouput /usr/lib/hadoop-0.20/bin/hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u5.jar -D mapred.job.name="SocialNetwrok WordCount" -D mapred.reduce.task=1 -input socialnetwork/input -output socialnetwork/ouput -mapper wordcount_mapper.pl -reducer wordcount_reducer.pl -file /home/yuta/work/hadoop/socialnetwork/wordcount_mapper.pl -file /home/yuta/work/hadoop/socialnetwork/wordcount_reducer.pl $ ./exec.sh | sh 2/11/23 23:23:09 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201211231323_0005 12/11/23 23:23:09 ERROR streaming.StreamJob: Job not successful. Error: NA 12/11/23 23:23:09 INFO streaming.StreamJob: killJob...Hadoopを実行してみましたがINFO streaming.StreamJob: killJob...というエラーがコンソールに表示されJobが動きませんでした。http://localhost:50030/jobdetails.jspのツールでエラーを確認すると以下が吐き出されていました。原因を調査してみると、Scriptの先頭に記述するShebangが正しいパスになっていないと動かないようです。shell script not found in hadoop - Stack Overflow 本来#!/usr/bin/perlと書くべきところを#!/usr/local/bin/perlとしていた事が原因でした。
Cannot run program "/var/lib/hadoop-0.20/cache/mapred/mapred/local/taskTracker/yuta/jobcache/job_201211231323_0006/attempt_201211231323_0006_m_000001_3/work/./wordcount_mapper.pl": error=2, No such file or directory at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)shebangを書き直して再度同じshellを実行します。今度はちゃんとJobが動き出して指定したoutputのディレクトリに結果が格納されます。outputのディレクトリには成功したかどうかの検知用の_SUCCESSと実行ログの_logsと結果出力用のpart-0000が作成されます。作成されたpart-0000の中身を見てみると先ほどlocalで実行した結果と同じものが出力されていて、問題なくHadoopが実行できた事が確認できます。
$ ./exec.sh | sh 12/11/24 00:54:59 INFO mapred.FileInputFormat: Total input paths to process : 1 12/11/24 00:55:00 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/yuta/mapred/local] 12/11/24 00:55:00 INFO streaming.StreamJob: Running job: job_201211231323_0007 12/11/24 00:55:00 INFO streaming.StreamJob: To kill this job, run: 12/11/24 00:55:00 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201211231323_0007 12/11/24 00:55:00 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201211231323_0007 12/11/24 00:55:01 INFO streaming.StreamJob: map 0% reduce 0% 12/11/24 00:55:26 INFO streaming.StreamJob: map 50% reduce 0% 12/11/24 00:55:27 INFO streaming.StreamJob: map 100% reduce 0% 12/11/24 00:55:39 INFO streaming.StreamJob: map 100% reduce 17% 12/11/24 00:55:41 INFO streaming.StreamJob: map 100% reduce 100% 12/11/24 00:55:44 INFO streaming.StreamJob: Job complete: job_201211231323_0007 12/11/24 00:55:44 INFO streaming.StreamJob: Output: socialnetwork/output $ hdfs -ls socialnetwork/output -rw-r--r-- 1 yuta supergroup 0 2012-11-24 00:55 /user/yuta/socialnetwork/output/_SUCCESS drwxr-xr-x - yuta supergroup 0 2012-11-24 00:55 /user/yuta/socialnetwork/output/_logs -rw-r--r-- 1 yuta supergroup 44607 2012-11-24 00:55 /user/yuta/socialnetwork/output/part-0000 $ hdfs -text socialnetwork/output/part-00000 FACEBOOK 6 Facebook 40 Facebookwhich 1 Google 2 MARK 583 MARK) 15 MARKDUSTIN 1 MARK’S 26 MARK’s 24 Mark 77 Mark! 1 Mark’I 1 Mark’s 9 TheFacebook 8 facebook 6 facebooks 1 mark 1 marker 1 resentedMark 1 theFacebook 17DistributedCache
HDFS上に設置したテキストファイルやプログラムをDistributedCacheの形式で読み込めるようにします。ここでは簡単なサンプルとしてMapperへのinputを通常のinputpath以外にもDistributedCacheさせたファイルから読み込めるような設定を行います。例としてWordCountさせたくないNGWordを定義してinputの内容と照らし合わせてMapperで弾くといった処理をやってみます。まずは集計の対象外としたいng_word.txtをHDFSに上げます。またHDFSに上げたng_word.txtをMapperで読み込めるように修正します。実行するshellスクリプトも-cacheFileというオプションでHDFS上のng_word.txtを参照するようにします。HDFSのパスはFullPathで記載し、最後に読み込みたいフィアルのsymlinkを付けます。 hdfs://host:port/absolute-path#link-name
$ hdfs -put ng_word.txt socialnetwork/input/ng_word.txt $ hdfs -text hdfs://localhost/user/yuta/socialnetwork/input/ng_word.txt Facebook Mark Zuckerberg#!/usr/bin/perl #WordCout Mapperです use strict; use warnings; use Data::Dumper; open( FH, "./ng_word.txt" ); my @nglist = <FH>; while( my $data = <> ) { chomp $data; my @tokens = split( /\s|\.|,|\?/, $data ); foreach my $node ( @tokens ) { $node =~ s/(“|”|-)//g; if( $node ne "" && $node ne " " ) { my $flag = 1; foreach my $list( @nglist ) { $list =~ s/\n//g; if( $node =~ /$list/i ) { $flag = 0; last; } } if( $flag == 1 ) { print $node . "\t" . 1 . "\n"; } } } }MapReduceを実行してみるとわかりますが、ng_word.txtを読み込んでFacebook,Mark,Zuckerbergの文字列を含む単語を弾いている事が分かります。
$ cat exec.sh #!/bin/sh job_name='"SocialNetwrok WordCount"' hadoop='/usr/lib/hadoop-0.20/bin/hadoop' streaming_jar='/usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u5.jar' input_path='socialnetwork/input' output_path='socialnetwork/output' mapper='wordcount_mapper.pl' reducer='wordcount_reducer.pl' base_path=`pwd` reducer_num=1 cachefile='hdfs://localhost/user/yuta/socialnetwork/input/ng_word.txt#ng_word.txt' echo $hadoop fs -rmr $output_path echo $hadoop jar $streaming_jar -D mapred.job.name=$job_name -D mapred.reduce.task=$reducer_num -input $input_path -output $output_path -mapper $mapper -reducer $reducer -file $base_path/$mapper -file $base_path/$reducer -cacheFile $cachefile $ ./exec.sh | sh $ hdfs -text socialnetwork/output/part-00000 | egrep -i "(FaceBook|Mark|Zuckerberg)" (結果なし)DistributedCacheの実行で-cacheFileのオプションについてdeprecatedのエラーが出ていました。今後は-filesオプションを使って欲しいとの事ですが -filesは現在hadoop jobコマンドのみしか対応していないようでhadoop jarには適用できません。Commands Guide よってこのエラーは無視してcacheFIleオプションを使うと良いと思います。
WARN streaming.StreamJob: -cacheFile option is deprecated, please use -files instead上はテキストファイルのDistributedCacheを説明しましたがJARファイルなども読み込む事ができます。JARファイルをHDFSから読み込む時のオプションは-cacheArchiveで読み込む事ができます。
FutureWorks
冒頭でも述べましたが今後はRealtimeでBigDataを扱うケースが増えてくると思います。(まぁRealtimeで扱う時点でBigDataなのか?という議論はあると思いますが。)そんな中で活躍しそうなMiddleWareについてリンクをいくつか紹介しておきたいと思います。
- S4: Distributed Stream Computing Platform
- nathanmarz/storm · GitHub
- Jubatus : オンライン機械学習向け分散処理フレームワーク ― Jubatus 0.3.3 documentation
残りはリンクだけ紹介しておきます。