Y's note

Web技術・プロダクトマネジメント・そして経営について

本ブログの更新を停止しており、今後は下記Noteに記載していきます。
https://note.com/yutakikuchi/

PigでHadoopをより便利に使う!PigでのMapReduceまとめ

Hadoop Hacks ―プロフェッショナルが使う実践テクニック

Hadoop Hacks ―プロフェッショナルが使う実践テクニック

Pig

HadoopMapReduceを独自で記述するのは手間が掛かります。それらの手間を出来るだけ緩和させるための便利なツールとしてDSL形式の処理フローを定義する事でMapReduceを実行するHiveやPIgというものが存在します。HiveとPigはライバルブロジェクトのようで、本日紹介するPigはYahoo!が開発しているミドルウェアになります。Hiveについては以前簡単に紹介をしたので以下のリンクを参考にしてください。PigLatinという手続き型の文法でDataのload/filter/join/sort/group/join/limit/storeなどの処理を組み合わせ、inputからoutputまでの一連のMapReduceを定義する事ができます。JavaやStreamingでのMapReduceでは目的のデータを抽出するために多段MapReduce処理を記述する事もありますが、PigやHiveを使うと1回の実行で済んだりします。
Hadoopをより便利に使う!HiveでのMapReduceまとめ - Yuta.Kikuchiの日記 はてなブックマーク - Hadoopをより便利に使う!HiveでのMapReduceまとめ - Yuta.Kikuchiの日記

Pigの実行

Install

CentosでInstallする内容については以前まとめたので、詳しくはそちらを参照してください。
CentOSでHadoopを使ってみる - Yuta.Kikuchiの日記 はてなブックマーク - CentOSでHadoopを使ってみる - Yuta.Kikuchiの日記
必要なyum installは以下のものになります。

$ yum install hadoop-0.20 -y
$ yum install hadoop-0.20-conf-pseudo -y
$ yum install hadoop-pig -y
PigLatin

Pig Latin Basics はてなブックマーク - Pig Latin Basics
PigLatinとはPigの便利な文法のことです。ここでは代表的なLatinをいくつか紹介します。

文法 役割
LOAD データのファイルシステムから読み込み
STORE データをファイルシステムに保存する
DUMP 結果をコンソールに出力する
FILTER 条件を指定しデータのフィルタリングを行う
MATCHES データを正規表現の条件でフィルタリングする
FOREACH 繰り返しデータ変換を行う
SPLIT データのを分割する
JOIN データの結合を行う
GROUP データのグループ化を行う
ORDER データをソートする
DISTINCT データの重複を排除する
LIMIT データの出力件数を制限する
SAMPLE データのサンプリングを行う
LocalModeで実行

Pigはlocalmodeとmapreducemodeをそれぞれ実行で切り分ける事が出来ます。localmodeはjobを投げるサーバの1台で、データのLoad/Storeもローカルサーバのものに対して行います。mapreducemodeはHDFS上のデータを利用し、全Hadoopクラスタに対して分散処理を行うモードになります。localで実行したい場合は-x localというオプションを起動時に設定します。pigコマンドを実行すると対話コンソールが出力されます。

$ man pig
       -x, -exectype=[local|mapreduce]
              execution type; mapreduce is default

$ pig -x local
2012-12-15 02:28:51,942 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/yuta/pig_1355506131872.log
2012-12-15 02:28:52,375 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
grunt> 
Pig Practice

テストとして以下のようなTab区切りのデータをinput.txtとして用意し、正規表現の条件にマッチした行を抽出し、outputとして保存するような処理を実行します。

$ cat intput.txt
Sample1  1981  1/1   11:00
Sample2  1982  1/2   12:00
Sample3  1983  1/3   13:00

$ pig -x local
grunt> input_data = LOAD 'input.txt';
(Sample1  1981  1/1   11:00)
(Sample2  1982  1/2   12:00)
(Sample3  1983  1/3   13:00)

grunt> filter_data = FILTER input_data BY $0 MATCHES '.*1983.*';
grunt> DUMP filter_data;
(Sample3  1983  1/3   13:00)
grunt> STORE filter_data INTO 'output';  
grunt> quit

$ cat output/part-m-00000 
Sample3  1983  1/3   13:00

PigStorage関数を使う事によって指定したDelimiterによるデータの分離と、分離したデータをリレーションに格納してくれます。分離後のデータはAS(リレーション名:データ型)のように定義し、データ型はchararayが文字列、intが整数、floatが実数です。

grunt> input_data = LOAD 'input.txt' USING PigStorage() AS (label:chararray,year:int,day:chararray,time:chararray);
grunt> filter_data = FILTER input_data BY year != 1981 AND date != '1/2' AND time == '13:00';
grunt> DUMP filter_data; 
(Sample3,1983,1/3,13:00)

Filterした結果をGroup化させます。

grunt> input_data = LOAD 'input.txt' USING PigStorage() AS (label:chararray,year:int,day:chararray,time:chararray);
grunt> filter_data = FILTER input_data BY label MATCHES '.*Sample.*';
grunt> DUMP filter_data;
(Sample1,1981,1/1,11:00)
(Sample2,1982,1/2,12:00)
(Sample3,1983,1/3,13:00)
(Sample4,1983,1/4,14:00)
(Sample5,1983,1/5,15:00)
grunt> group_data = GROUP filter_data BY year;
grunt> DUMP group_data;
(1981,{(Sample1,1981,1/1,11:00)})
(1982,{(Sample2,1982,1/2,12:00)})
(1983,{(Sample3,1983,1/3,13:00),(Sample4,1983,1/4,14:00),(Sample5,1983,1/5,15:00)})

Group化した結果を繰り返しのFOREACHを利用する事でGROUP内の最大値を求めます。

grunt> input_data = LOAD 'input.txt' USING PigStorage() AS (label:chararray,year:int,day:chararray,time:chararray);
grunt> group_data = GROUP input_data BY year;
grunt> max_data = FOREACH group_data GENERATE group, MAX(input_data.time);
grunt> DUMP max_data;
(1981,11:00)
(1982,12:00)
(1983,15:00)

Filterした結果を降順にソートします。

grunt> input_data = LOAD 'input.txt' USING PigStorage() AS (label:chararray,year:int,day:chararray,time:chararray);
grunt> filter_data = FILTER input_data BY label MATCHES '.*Sample.*';
grunt> order_data = ORDER filter_data BY year DESC;
grunt> DUMP order_data;
(Sample3,1983,1/3,13:00)
(Sample4,1983,1/4,14:00)
(Sample5,1983,1/5,15:00)
(Sample2,1982,1/2,12:00)
(Sample1,1981,1/1,11:00)
外部スクリプトに保存して実行

Pigの実行は対話的なコンソールだけでなく、スクリプトを外部ファイルとして保存してコマンドに流すこともできます。

$ cat file.pig
input_data = LOAD 'input.txt' USING PigStorage() AS (label:chararray,year:int,day:chararray,time:chararray);
filter_data = FILTER input_data BY label MATCHES '.*Sample.*';
group_data = GROUP filter_data BY year;
DUMP group_data;

$ pig -x local file.pig
(1981,{(Sample1,1981,1/1,11:00)})
(1982,{(Sample2,1982,1/2,12:00)})
(1983,{(Sample3,1983,1/3,13:00),(Sample4,1983,1/4,14:00),(Sample5,1983,1/5,15:00)})

外部スクリプトに対して必要なパラメータを設定したり、デフォルトの変数を設定する事ができます。パラメータの設定には-paramオプションを利用、デフォルト変数を利用するためには%defaultを使います。

$ cat test.pig
%default INPUT input.txt 
%default OUTPUT /home/yuta/work/pig/result  

input_data = LOAD '$INPUT' USING PigStorage() AS (label:chararray,year:int,day:chararray,time:chararray);
group_data = GROUP input_data BY year;
max_data = FOREACH group_data GENERATE group, MAX(input_data.time);
STORE max_data INTO '$OUTPUT';

$ pig -x local -param INPUT=input.txt -param OUTPUT=/home/yuta/work/pig/result test.pig
$ cat /home/yuta/work/pig/result/part-r-00000
1981 	11:00
1982 	12:00
1983 	15:00