PigでHadoopをより便利に使う!PigでのMapReduceまとめ
Hadoop Hacks ―プロフェッショナルが使う実践テクニック
- 作者: 中野猛,山下真一,猿田浩輔,上新卓也,小林隆
- 出版社/メーカー: オライリージャパン
- 発売日: 2012/04/25
- メディア: 単行本(ソフトカバー)
- 購入: 3人 クリック: 156回
- この商品を含むブログ (8件) を見る
Pig
HadoopのMapReduceを独自で記述するのは手間が掛かります。それらの手間を出来るだけ緩和させるための便利なツールとしてDSL形式の処理フローを定義する事でMapReduceを実行するHiveやPIgというものが存在します。HiveとPigはライバルブロジェクトのようで、本日紹介するPigはYahoo!が開発しているミドルウェアになります。Hiveについては以前簡単に紹介をしたので以下のリンクを参考にしてください。PigLatinという手続き型の文法でDataのload/filter/join/sort/group/join/limit/storeなどの処理を組み合わせ、inputからoutputまでの一連のMapReduceを定義する事ができます。JavaやStreamingでのMapReduceでは目的のデータを抽出するために多段MapReduce処理を記述する事もありますが、PigやHiveを使うと1回の実行で済んだりします。
Hadoopをより便利に使う!HiveでのMapReduceまとめ - Yuta.Kikuchiの日記
Pigの実行
Install
CentosでInstallする内容については以前まとめたので、詳しくはそちらを参照してください。
CentOSでHadoopを使ってみる - Yuta.Kikuchiの日記
必要なyum installは以下のものになります。$ yum install hadoop-0.20 -y $ yum install hadoop-0.20-conf-pseudo -y $ yum install hadoop-pig -yPigLatin
Pig Latin Basics
PigLatinとはPigの便利な文法のことです。ここでは代表的なLatinをいくつか紹介します。
文法 役割 LOAD データのファイルシステムから読み込み STORE データをファイルシステムに保存する DUMP 結果をコンソールに出力する FILTER 条件を指定しデータのフィルタリングを行う MATCHES データを正規表現の条件でフィルタリングする FOREACH 繰り返しデータ変換を行う SPLIT データのを分割する JOIN データの結合を行う GROUP データのグループ化を行う ORDER データをソートする DISTINCT データの重複を排除する LIMIT データの出力件数を制限する SAMPLE データのサンプリングを行う LocalModeで実行
Pigはlocalmodeとmapreducemodeをそれぞれ実行で切り分ける事が出来ます。localmodeはjobを投げるサーバの1台で、データのLoad/Storeもローカルサーバのものに対して行います。mapreducemodeはHDFS上のデータを利用し、全Hadoopクラスタに対して分散処理を行うモードになります。localで実行したい場合は-x localというオプションを起動時に設定します。pigコマンドを実行すると対話コンソールが出力されます。
$ man pig -x, -exectype=[local|mapreduce] execution type; mapreduce is default $ pig -x local 2012-12-15 02:28:51,942 [main] INFO org.apache.pig.Main - Logging error messages to: /home/yuta/pig_1355506131872.log 2012-12-15 02:28:52,375 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:/// grunt>Pig Practice
テストとして以下のようなTab区切りのデータをinput.txtとして用意し、正規表現の条件にマッチした行を抽出し、outputとして保存するような処理を実行します。
$ cat intput.txt Sample1 1981 1/1 11:00 Sample2 1982 1/2 12:00 Sample3 1983 1/3 13:00 $ pig -x local grunt> input_data = LOAD 'input.txt'; (Sample1 1981 1/1 11:00) (Sample2 1982 1/2 12:00) (Sample3 1983 1/3 13:00) grunt> filter_data = FILTER input_data BY $0 MATCHES '.*1983.*'; grunt> DUMP filter_data; (Sample3 1983 1/3 13:00) grunt> STORE filter_data INTO 'output'; grunt> quit $ cat output/part-m-00000 Sample3 1983 1/3 13:00PigStorage関数を使う事によって指定したDelimiterによるデータの分離と、分離したデータをリレーションに格納してくれます。分離後のデータはAS(リレーション名:データ型)のように定義し、データ型はchararayが文字列、intが整数、floatが実数です。
grunt> input_data = LOAD 'input.txt' USING PigStorage() AS (label:chararray,year:int,day:chararray,time:chararray); grunt> filter_data = FILTER input_data BY year != 1981 AND date != '1/2' AND time == '13:00'; grunt> DUMP filter_data; (Sample3,1983,1/3,13:00)Filterした結果をGroup化させます。
grunt> input_data = LOAD 'input.txt' USING PigStorage() AS (label:chararray,year:int,day:chararray,time:chararray); grunt> filter_data = FILTER input_data BY label MATCHES '.*Sample.*'; grunt> DUMP filter_data; (Sample1,1981,1/1,11:00) (Sample2,1982,1/2,12:00) (Sample3,1983,1/3,13:00) (Sample4,1983,1/4,14:00) (Sample5,1983,1/5,15:00) grunt> group_data = GROUP filter_data BY year; grunt> DUMP group_data; (1981,{(Sample1,1981,1/1,11:00)}) (1982,{(Sample2,1982,1/2,12:00)}) (1983,{(Sample3,1983,1/3,13:00),(Sample4,1983,1/4,14:00),(Sample5,1983,1/5,15:00)})Group化した結果を繰り返しのFOREACHを利用する事でGROUP内の最大値を求めます。
grunt> input_data = LOAD 'input.txt' USING PigStorage() AS (label:chararray,year:int,day:chararray,time:chararray); grunt> group_data = GROUP input_data BY year; grunt> max_data = FOREACH group_data GENERATE group, MAX(input_data.time); grunt> DUMP max_data; (1981,11:00) (1982,12:00) (1983,15:00)Filterした結果を降順にソートします。
grunt> input_data = LOAD 'input.txt' USING PigStorage() AS (label:chararray,year:int,day:chararray,time:chararray); grunt> filter_data = FILTER input_data BY label MATCHES '.*Sample.*'; grunt> order_data = ORDER filter_data BY year DESC; grunt> DUMP order_data; (Sample3,1983,1/3,13:00) (Sample4,1983,1/4,14:00) (Sample5,1983,1/5,15:00) (Sample2,1982,1/2,12:00) (Sample1,1981,1/1,11:00)外部スクリプトに保存して実行
Pigの実行は対話的なコンソールだけでなく、スクリプトを外部ファイルとして保存してコマンドに流すこともできます。
$ cat file.pig input_data = LOAD 'input.txt' USING PigStorage() AS (label:chararray,year:int,day:chararray,time:chararray); filter_data = FILTER input_data BY label MATCHES '.*Sample.*'; group_data = GROUP filter_data BY year; DUMP group_data; $ pig -x local file.pig (1981,{(Sample1,1981,1/1,11:00)}) (1982,{(Sample2,1982,1/2,12:00)}) (1983,{(Sample3,1983,1/3,13:00),(Sample4,1983,1/4,14:00),(Sample5,1983,1/5,15:00)})外部スクリプトに対して必要なパラメータを設定したり、デフォルトの変数を設定する事ができます。パラメータの設定には-paramオプションを利用、デフォルト変数を利用するためには%defaultを使います。
$ cat test.pig %default INPUT input.txt %default OUTPUT /home/yuta/work/pig/result input_data = LOAD '$INPUT' USING PigStorage() AS (label:chararray,year:int,day:chararray,time:chararray); group_data = GROUP input_data BY year; max_data = FOREACH group_data GENERATE group, MAX(input_data.time); STORE max_data INTO '$OUTPUT'; $ pig -x local -param INPUT=input.txt -param OUTPUT=/home/yuta/work/pig/result test.pig $ cat /home/yuta/work/pig/result/part-r-00000 1981 11:00 1982 12:00 1983 15:00