Y's note

Web技術・プロダクトマネジメント・そして経営について

本ブログの更新を停止しており、今後は下記Noteに記載していきます。
https://note.com/yutakikuchi/

JavaでMapReduceを書くことが出来ない問題児がPigのデータ構造を調査しました

Programming Pig

Programming Pig

本当はJavaで書きたい。けどコンパイルや多段MapReduceは面倒なので

まずは僕の面倒くさがりな性格とプログラミング言語の話。10年前はJavaでWebアプリを書いていましたが、就職してScript言語をばりばり使っていた時期が長く続いたのでJavaから遠ざかってしまいました。もともとJavaコンパイルが嫌いで、環境を整えたり直ぐに動作確認ができなかったり。スピードを求められる単純作業がその面倒な事によって時間が削られることを嫌っています。(自分でも良くないことだと思っていますので、今後は時間が有るときにJavaを書いてみます)JavaMapReduceは柔軟であり速度的にも速いことは知っています。ただMapReduceで複雑なデータのJoinやデータ集計を行うためには多段のMapReduceを書かざるを得ないこともあり、単純集計のコードを複数管理することもまた面倒に感じられてしまいます。
という事で、僕はHadoop集計ではPigを使うことにしました(笑) PigはSQLチックなLatinでデータをパズル的に組み合わせてデータ集計が可能です。速度的には当然JavaMapReduceよりは遅くなってしまいますが(たしか平均で1.5倍ぐらい遅くなる)、一つのScriptで柔軟なデータ組み合わせができます。
Pigの良いところはたったの数行で強力なデータの組み合わせが表現できること、悪いところは1行の表現が難しく自分の直感とは全く異なる挙動をすることがあるので細かくDESCRIBEを用いてデータの形式を確認する必要があります。以前に自分のメモとしてPigの内容を紹介をしました。今日はデータの格納と組み合わせのTipsについて紹介したいと思います。

Index

  1. Pigのデータ構造調査
    1. Latinの日本語ドキュメント
    2. DataFormatの確認にDESCRIBEを利用する
    3. tuple,bagの中身を参照する
    4. InputからOutputしたいDataFormatを考える
    5. JOINとCOGROUPでは参照の仕方が異なる
    6. JOIN/COGROUPのBY属性にnull項目があるときは気をつける
    7. STOREした時のtuple/bag/mapは文字列として保存される。再度LOADする時はPig形式のデータで読み込みたい時はASで指定する
  2. その他
    1. 文法をCheckしてlocalで実行し、問題なければHDFS上でMapReduceさせる
    2. Reducerの数を調節したい
    3. STOREするDataをgzip圧縮したい

Pig データ構造調査

Latinの日本語ドキュメント

Pigに関する日本語のドキュメントは次のページを見ると詳しく載っています。
Pig Latin の基本 はてなブックマーク - Pig Latin の基本

DataFormatの確認にDESCRIBEを利用する
/* input */
John 17 Men youtube
John 17 Men yahoo
Kate 18 Women facebook
Kate 18 Women ebay
Kate 18 Women ebay
Tom 19 Men google

まずはDESCRIBEを覚えると良いです。DESCRIBEは変数(alias)がどういったDataFormatなのかをコンソールに出力してくれます。Pig内部のData操作はDESCRIBEで出力してみないと良くわからないことが多いです。変数の中身を確認するにはDUMPで結果を出力するか、STOREで結果ファイルパスに格納することができますが、LOADのデータサイズが大きいと実行に時間がかかってしまうのと複雑なPigは変数にどの変数の一部が格納されているのかが分かりづらくなってしまうので、開発中はDESCRIBEで変数のDataFormatだけをDebug出力して、コードをどんどん書いていくと良いでしょう。下のInputとSampleのPigコードに対してDESCRIBEさせた結果を見てみます。group_dataは名前、年齢、性別、閲覧Domainで共通GROUP化しています。下の説明だけで奥が深くなってしまうのですが、group_dataには共通項として設定したname/age/gen/domainがtuple形式のgroupとして、グルーピングされたdata部分がbag形式として保存されます。groupというデータ形式はGROUPを使った場合に付与されるデータ群、tupleは( )で表現される単純なデータの入れ物、Bagは{ } で表現される色々な種類のデータの入れ物としてまずは覚えておくといいと思います。

data = LOAD 'student.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data = GROUP data BY ( name, age, gen, domain );
DESCRIBE group_data;
DUMP group_data;

/* DESCRIBEの結果 */
group_data: {group: (name: chararray,age: int,gen: chararray,domain: chararray),data: {name: chararray,age: int,gen: chararray,domain: chararray}} 

/* 出力結果 */
((Tom,19,Men,google),{(Tom,19,Men,google)})
((John,17,Men,yahoo),{(John,17,Men,yahoo)})
((John,17,Men,youtube),{(John,17,Men,youtube)})
((Kate,18,Women,ebay),{(Kate,18,Women,ebay),(Kate,18,Women,ebay)})
((Kate,18,Women,facebook),{(Kate,18,Women,facebook)})


tuple,bagの中身を参照する

tuple,bagともに中身を参照したい場合は.(ドット)を利用します。(tuple/bag名.名前)ドットを参照外し演算子と読んでいるようです。hash型のmapデータ型の場合はmap名#keyと指定します。tupleとbagの参照では少しデータの取り方に違いがあります。下のコードの例を見てみます。group_dataの中身であるtupleのgroupからname,ageをbagのdataからそれぞれデータを参照しようとしていますが、bagの方は出力がまだbag形式になってしまっています。

data = LOAD 'student.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data = GROUP data BY ( name, age, gen, domain );
ref_data = FOREACH group_data GENERATE group.name AS gname, group.age AS gage, data.gen AS dgen, data.domain AS ddomain;
DUMP ref_data;

/* 出力結果 */
(Tom,19,{(Men)},{(google)})
(John,17,{(Men)},{(yahoo)})
(John,17,{(Men)},{(youtube)})
(Kate,18,{(Women),(Women)},{(ebay),(ebay)})
(Kate,18,{(Women)},{(facebook)})

bagの中身をtupleと同様に参照したい場合はFLATTENでbagを強制的に解除することができますが、その場合は折角のGROUPが外れて独立組み合わせが出力されてしまいますのでFLATTENを使う場合は注意が必要です。下の例では試しにdata.genを平坦化させてみました。そうするとbagが外れています。

data = LOAD 'student.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data = GROUP data BY ( name, age, gen, domain );
ref_data = FOREACH group_data GENERATE group.name AS gname, group.age AS gage, FLATTEN( data.gen ) AS dgen, data.domain AS ddomain;
DUMP ref_data;

/* 出力結果 */
(Tom,19,Men,{(google)})
(John,17,Men,{(yahoo)})
(John,17,Men,{(youtube)})
(Kate,18,Women,{(ebay),(ebay)})
(Kate,18,Women,{(ebay),(ebay)})
(Kate,18,Women,{(facebook)})
InputからOutputしたいDataFormatを考える

下のようなInputに対してどのようなOutputを出したいのかをまずは考えます。例では名前、年齢、性別、どのサイトを見ているかという履歴リストから重複行をカウントすることを考えます。前提条件として名前、年齢、性別の組み合わせはユニークで、サイト名だけ重複する可能性がある例です。手段としては1.名前、年齢、性別、サイト名でデータをGROUPさせて、2.GROUPされたデータ行をCOUNTするという方法です。

/* input */
John 17 Men youtube
John 17 Men yahoo
Kate 18 Women facebook
Kate 18 Women ebay
Kate 18 Women ebay
Tom 19 Men google

/* outputしたい内容 */
John 17 Men youtube 1
John 17 Men yahoo 1
Kate 18 Women facebook 1
Kate 18 Women ebay 2
Tom 19 Men google 1
data = LOAD 'student.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data = GROUP data BY ( name, age, gen, domain );
count_data = FOREACH group_data GENERATE group.name, group.age, group.gen, group.domain, COUNT( data.domain );
DUMP count_data;

/* 出力結果 */
(Tom,19,Men,google,1)
(John,17,Men,yahoo,1)
(John,17,Men,youtube,1)
(Kate,18,Women,ebay,2)
(Kate,18,Women,facebook,1)

Pigの3行目を少し変えた記述もできます。groupを直接参照しないでFLATTENを用いて解除させてあげる方法です。tupleを平坦化したデータなので展開後の名前をASで付与しています。結果としては上と同じなのでどちらの記述でも構わないと思います。

data = LOAD 'student.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data = GROUP data BY ( name, age, gen, domain );
count_data = FOREACH group_data GENERATE FLATTEN( group ) AS ( name, age, gen, domain ), COUNT( data.domain );
-- count_data = FOREACH group_data GENERATE group.name, group.age, group.gen, group.domain, COUNT( data.domain );
DUMP count_data;

/* 上と同じ結果が出力される */
(Tom,19,Men,google,1)
(John,17,Men,yahoo,1)
(John,17,Men,youtube,1)
(Kate,18,Women,ebay,2)
(Kate,18,Women,facebook,1)

上の結果を更に名前、年齢、性別のIndexをユニーク化して、サイト名とカウント数をIndexの後に追記したいとします。この場合は一つ上の例から更に名前、年齢、性別でGROUP化します。サイト名とカウント数をtupleで表現し、更にそれをbagで囲むようにします。ここでのポイントは2回目のGROUP後のFOREACHでbag名.( カラム名1, カラム名2 )のようにtupleを参照しているところです。ここでTOTUPLE関数を利用してTOTUPLE( bag名.カラム名1, bag名.カラム名2 )とすると全く異なる結果になってしまうので注意が必要です。同様にTOBAG関数を使う時も注意が必要です。不用意に利用してしまうと意図しないデータが生成されてしまいます。データ形式としてlist_data1.domainとlist_data1.cntのそれぞれが複数のデータを持つbag形式になっていることが分かれば理解できるかと思います。

/* input */
John 17 Men youtube
John 17 Men yahoo
Kate 18 Women facebook
Kate 18 Women ebay
Kate 18 Women ebay
Tom 19 Men google

/* outputしたい内容 */
John 17 Men {( youtube, 1), (yahoo, 1)}
Kate 18 Women {(facebook, 1), (ebay, 2)}
Tom 19 Men {(google, 1)}
data1 = LOAD 'student1.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data1 = GROUP data1 BY ( name, age, gen, domain );
list_data1 = FOREACH group_data1 GENERATE group.name, group.age, group.gen, group.domain AS domain, COUNT( data1.domain ) AS cnt; 

list_data1_group = GROUP list_data1 BY( name, age, gen );
list_data1_uniq = FOREACH list_data1_group GENERATE group.name, group.age, group.gen, list_data1.(domain, cnt );
DUMP list_data1_uniq;

/* 出力したかった内容が表現できた */
(Tom,19,Men,{(google,1)})
(John,17,Men,{(yahoo,1),(youtube,1)})
(Kate,18,Women,{(ebay,2),(facebook,1)})
data1 = LOAD 'student1.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data1 = GROUP data1 BY ( name, age, gen, domain );
list_data1 = FOREACH group_data1 GENERATE group.name, group.age, group.gen, group.domain AS domain, COUNT( data1.domain ) AS cnt;

list_data1_group = GROUP list_data1 BY( name, age, gen );
list_data1_uniq = FOREACH list_data1_group GENERATE group.name, group.age, group.gen, TOTUPLE( list_data1.domain, list_data1.cnt );
DUMP list_data1_uniq;

/* サイト名、カウントがそれぞれまとめられて、最終的に全部の項目がtupleとして出力される */
(Tom,19,Men,({(google)},{(1)}))
(John,17,Men,({(yahoo),(youtube)},{(1),(1)}))
(Kate,18,Women,({(ebay),(facebook)},{(2),(1)}))

もう一つTOTUPLEを使った時の誤った例を紹介します。一つ目のGROUP後のFOREACHでTOTUPLEを利用してdomainとCOUNTの組みをtuple化しようとします。こうしてしまうと2回目のGROUP後のFOREACHで余計にtupleが生成されてしまいます。どうやらFOREACHでは対象データが複数存在する場合には自動的にbagが生成され、更にbag内をtuple化してくれるようなので、あえて自分でTOTUPLEを使わなくても良いということが分かります。

data1 = LOAD 'student1.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data1 = GROUP data1 BY ( name, age, gen, domain );
list_data1 = FOREACH group_data1 GENERATE group.name, group.age, group.gen, TOTUPLE( group.domain, COUNT( data1.domain )) AS cnt;

list_data1_group = GROUP list_data1 BY( name, age, gen );
list_data1_uniq = FOREACH list_data1_group GENERATE group.name, group.age, group.gen, list_data1.cnt;
DUMP list_data1_uniq;

/* tuple( domain, count )の外に更にtupleが付与されてしまっている */
(Tom,19,Men,{((google,1))})
(John,17,Men,{((yahoo,1)),((youtube,1))})
(Kate,18,Women,{((ebay,2)),((facebook,1))})
JOINとCOGROUPでは参照の仕方が異なる

2つ以上のデータを結合するJOINとCOGROUPですが、それらの結果に対するデータの参照方法が異なります。ここも良く陥るところなので注意が必要です。例としてinput1は今までと同じもの、input2では趣味のデータを追加しました。これらのデータをOUTER JOIN(外部結合)をしたいと考えます。外部結合にはJOINかCOGROUPが必要で、まずはJOINの例から紹介します。JOINのDESCRIBEを見るとlist_data1_uniq::nameのようにタブルコロンが使用されており、JOINのデータ参照にはダブルコロンを指定しないといけません。(GROUPはドットだったりややこしいですね)

/* input1 */
John 17 Men youtube
John 17 Men yahoo
Kate 18 Women facebook
Kate 18 Women ebay
Kate 18 Women ebay
Tom 19 Men google

/* input2 */
John 17 Men soccer 
Ken 17 Men game
Kate 18 Women shopping 
Tom 19 Men skydiving
Tom 19 Men fishing 
data1 = LOAD 'student1.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data1 = GROUP data1 BY ( name, age, gen, domain );
list_data1 = FOREACH group_data1 GENERATE group.name, group.age, group.gen, group.domain, COUNT( data1.domain ) AS cnt;

list_data1_group = GROUP list_data1 BY( name, age, gen );
list_data1_uniq = FOREACH list_data1_group GENERATE group.name, group.age, group.gen, list_data1.( domain, cnt );

data2 = LOAD 'student2.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, hobby:chararray );
group_data2 = GROUP data2 BY ( name, age, gen );
list_data2 = FOREACH group_data2 GENERATE FLATTEN( group ) AS ( name, age, gen ), data2.hobby;

join_list = JOIN list_data1_uniq BY (name, age, gen ) FULL OUTER, list_data2 BY (name, age, gen );

DUMP join_list;

/* DESCRIBEの結果 */
join_list: {list_data1_uniq::name: chararray,list_data1_uniq::age: int,list_data1_uniq::gen: chararray,list_data1_uniq::list_data1: {domain: chararray,cnt: long},list_data2::name: chararray,list_data2::age: int,list_data2::gen: chararray,list_data2::hobby: {hobby: chararray}}

/*出力結果*/
(,,,,Ken,17,Men,{(game)})
(Tom,19,Men,{(google,1)},Tom,19,Men,{(skydiving),(fishing)})
(John,17,Men,{(yahoo,1),(youtube,1)},John,17,Men,{(soccer)})
(Kate,18,Women,{(ebay,2),(facebook,1)},Kate,18,Women,{(shopping)})

次にGOCROUPの結果を見てみます。上のコードのJOIN〜FULL OUTERの箇所を書き換えます。結果を出力してみるとデータの形式が少しJOINと異なっています。JOINはBYで指定したデータ項目が空だと空として出力していますが、COGROUPの場合はBYで指定したデータが片方で存在しなくても、もう片方のデータを使って出力しています。またJOINはデータを単純に結合したデータとして表示しますが、COGROUPはデータ項目ごとにbag形式に変換して出力します。出力の比較としてはCOGROUPの方が分かりやすいと思うのでOUTER JOINをしたい場合はCOGROUPを使うことを薦めます。さてDESCRIBEの結果はどうなっているかというと、list_data1_uniq.nameのようにドットで参照する必要があります。COGROUPの動作はGROUPを元に作られているようなのでGROUP/COGROUPのデータ参照はドット、JOINはダブルコロンと覚えておくと良いと思います。

data1 = LOAD 'student1.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data1 = GROUP data1 BY ( name, age, gen, domain );
list_data1 = FOREACH group_data1 GENERATE group.name, group.age, group.gen, group.domain, COUNT( data1.domain ) AS cnt;

list_data1_group = GROUP list_data1 BY( name, age, gen );
list_data1_uniq = FOREACH list_data1_group GENERATE group.name, group.age, group.gen, list_data1.( domain, cnt );

data2 = LOAD 'student2.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, hobby:chararray );
group_data2 = GROUP data2 BY ( name, age, gen );
list_data2 = FOREACH group_data2 GENERATE FLATTEN( group ) AS ( name, age, gen ), data2.hobby;

join_list = COGROUP list_data1_uniq BY (name, age, gen ), list_data2 BY (name, age, gen );
DESCRIBE join_list;
DUMP join_list;

/* DESCRIBEの結果 */ 
join_list: {group: (name: chararray,age: int,gen: chararray),list_data1_uniq: {name: chararray,age: int,gen: chararray,list_data1: {domain: chararray,cnt: long}},list_data2: {name: chararray,age: int,gen: chararray,hobby: {hobby: chararray}}}

/*出力結果*/
((Ken,17,Men),{},{(Ken,17,Men,{(game)})})
((Tom,19,Men),{(Tom,19,Men,{(google,1)})},{(Tom,19,Men,{(skydiving),(fishing)})})
((John,17,Men),{(John,17,Men,{(yahoo,1),(youtube,1)})},{(John,17,Men,{(soccer)})})
((Kate,18,Women),{(Kate,18,Women,{(ebay,2),(facebook,1)})},{(Kate,18,Women,{(shopping)})})

JOINの例でデータをCOGROUPに近い形式に変換する例を紹介します。FOREACH GENERATEでname,age,genがそれぞれ空かどうかの判定を利用してinput1,input2のどちらか片方からデータを引っ張ってくるような記述をします。嵌ったところとしてはageがint型で定義されているので == '' での空判定を行おうとするとintとchararrayの比較になるのでerrorが表示されます。今回はnullかどうかの判定がしたいのでis nullまたはis not nullを指定します。ちなみにIsEmptyというPigの標準関数で空かどうかをチェックするものがありますが、bagかmapにしか適用できないため今回のようなintやchararrayのケースでは利用できません。

data1 = LOAD 'student1.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data1 = GROUP data1 BY ( name, age, gen, domain );
list_data1 = FOREACH group_data1 GENERATE group.name, group.age, group.gen, group.domain, COUNT( data1.domain ) AS cnt;

list_data1_group = GROUP list_data1 BY( name, age, gen );
list_data1_uniq = FOREACH list_data1_group GENERATE group.name, group.age, group.gen, list_data1.( domain, cnt );

data2 = LOAD 'student2.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, hobby:chararray );
group_data2 = GROUP data2 BY ( name, age, gen );
list_data2 = FOREACH group_data2 GENERATE FLATTEN( group ) AS ( name, age, gen ), data2.hobby;

join_list = JOIN list_data1_uniq BY (name, age, gen ) FULL OUTER, list_data2 BY (name, age, gen );
fix_list = FOREACH join_list GENERATE ( list_data2::name is null ? list_data1_uniq::name : list_data2::name ),( list_data2::age is null ? list_data1_uniq::age : list_data2::age ), ( list_data2::gen is null ? list_data1_uniq::gen : list_data2::gen ), list_data1_uniq::list_data1, list_data2::hobby;
DUMP fix_list;

/* 出力結果 */
(Ken,17,Men,,{(game)})
(Tom,19,Men,{(google,1)},{(skydiving),(fishing)})
(John,17,Men,{(yahoo,1),(youtube,1)},{(soccer)})
(Kate,18,Women,{(ebay,2),(facebook,1)},{(shopping)})
JOIN/COGROUPのBY属性にnull項目があるときは気をつける

JOIN/COGROUPのBY属性にnullとなっているデータ項目が存在する時は気をつけましょう。意図しない結合結果になります。1行でまとめたい出力が2行になったりします。対応策としてはnullの項目に何かしら異常を示すデータを入れてしまうことです。ここではintがnullの場合には0を、chararrayがnullの場合は'N/A'を入れるようにしました。まずは異常値を入れない例です。異常値を入れないとJohnの項目が何故か2行出力されてしまいます。

/* input1 */
John   youtube
John   yahoo
Kate 18 Women facebook
Kate 18 Women ebay
Kate 18 Women ebay
Tom 19 Men google

/* input2 */
John   soccer
Ken 17 Men game
Kate 18 Women shopping 
Tom 19 Men skydiving
Tom 19 Men fishing
data1 = LOAD 'student1.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data1 = GROUP data1 BY ( name, age, gen, domain );
list_data1 = FOREACH group_data1 GENERATE group.name, group.age, group.gen, group.domain, COUNT( data1.domain ) AS cnt;

list_data1_group = GROUP list_data1 BY( name, age, gen );
list_data1_uniq = FOREACH list_data1_group GENERATE group.name, group.age, group.gen, list_data1.( domain, cnt );

data2 = LOAD 'student2.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, hobby:chararray );
group_data2 = GROUP data2 BY ( name, age, gen );
list_data2 = FOREACH group_data2 GENERATE group.name, group.age, group.gen, data2.hobby;

join_list = JOIN list_data1_uniq BY (name, age, gen ) FULL OUTER, list_data2 BY (name, age, gen );
fix_list = FOREACH join_list GENERATE ( list_data2::name is null ? list_data1_uniq::name : list_data2::name ),( list_data2::age is null ? list_data1_uniq::age : list_data2::age ), ( list_data2::gen is null ? list_data1_uniq::gen : list_data2::gen ), list_data1_uniq::list_data1, list_data2::hobby;
DUMP fix_list;

/* 出力結果 */
(Ken,17,Men,,{(game)})
(Tom,19,Men,{(google,1)},{(skydiving),(fishing)})
(John,,,{(yahoo,1),(youtube,1)},)
(John,,,,{(soccer)})
(Kate,18,Women,{(ebay,2),(facebook,1)},{(shopping)})

次に異常値を入れた例です。Johnの項目が1行で出力されていて意図した結果を得ることが出来ました。

data1 = LOAD 'student1.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
group_data1 = GROUP data1 BY ( name, age, gen, domain );
list_data1 = FOREACH group_data1 GENERATE group.name, ( group.age is null ? 0 : group.age ) AS age, ( group.gen is null ? 'N/A' : group.gen ) AS gen, group.domain, COUNT( data1.domain ) AS cnt;

list_data1_group = GROUP list_data1 BY( name, age, gen );
list_data1_uniq = FOREACH list_data1_group GENERATE group.name, group.age, group.gen, list_data1.( domain, cnt );

data2 = LOAD 'student2.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, hobby:chararray );
group_data2 = GROUP data2 BY ( name, age, gen );
list_data2 = FOREACH group_data2 GENERATE group.name, ( group.age is null ? 0 : group.age ) AS age, ( group.gen is null ? 'N/A' : group.gen ) AS gen, data2.hobby;

join_list = JOIN list_data1_uniq BY (name, age, gen ) FULL OUTER, list_data2 BY (name, age, gen );
fix_list = FOREACH join_list GENERATE ( list_data2::name is null ? list_data1_uniq::name : list_data2::name ),( list_data2::age is null ? list_data1_uniq::age : list_data2::age ), ( list_data2::gen is null ? list_data1_uniq::gen : list_data2::gen ), list_data1_uniq::list_data1, list_data2::hobby;
DUMP fix_list;

/* 出力結果 */
Ken	17	Men		{(game)}
Tom	19	Men	{(google,1)}	{(skydiving),(fishing)}
John	0	N/A	{(yahoo,1),(youtube,1)}	{(soccer)}
Kate	18	Women	{(ebay,2),(facebook,1)}	{(shopping)}
STOREした時のtuple/bag/mapは文字列として保存される。再度LOADする時はPig形式のデータで読み込みたい時はASで指定する

上のJOINの結果をresultというディレクトリにSTOREします。STORE alias INTO 'ディレクトリ名' USING PigStorage();という行をDUMPの代わりに記述してあげるだけで保存ができます。PigStorageはdefaultでtab区切りに保存してくれます。実行するとresult/part-r-00000というファイルが生成されているので中身を確認します。確かにtab区切りで保存されていて、tupleやbagはそのまま文字列として記録されています。

cat result/part-r-00000 
Ken	17	Men		{(game)}
Tom	19	Men	{(google,1)}	{(skydiving),(fishing)}
John	17	Men	{(yahoo,1),(youtube,1)}	{(soccer)}
Kate	18	Women	{(ebay,2),(facebook,1)}	{(shopping)}

上のデータを再度bag/tuple形式で呼び出したい時はLOAD時にASの中でbagやtupleであることを明示します。定義は他のデータ型と同じでカラム名:bagやカラム名:tupleとして宣言します。当然bagやtupleとして宣言せずにchararrayとしてしまうと文字列として扱われてしまうので、()や{}が意味を持たないデータになってしまいます。

data = LOAD 'result/part-r-00000' USING PigStorage() AS ( name:chararray, age:int, gen:chararray, domains:bag{ t:tuple( domain ,cnt ) }, hobbys:bag{ t:tuple( hobby ) } );
DESCRIBE data;
DUMP data;

/* DESCRIBE結果 */
data: {name: chararray,age: int,gen: chararray,domains: {t: (domain: bytearray,cnt: bytearray)},hobbys: {t: (hobby: bytearray)}}

/* 出力結果 */
(Ken,17,Men,,{(game)})
(Tom,19,Men,{(google,1)},{(skydiving),(fishing)})
(John,17,Men,{(yahoo,1),(youtube,1)},{(soccer)})
(Kate,18,Women,{(ebay,2),(facebook,1)},{(shopping)})

その他 Tips

データ構造とは直接関係ないですが、Pigを利用することでのTipsを書いておきます。

文法をCheckしてlocalで実行し、問題なければHDFS上でMapReduceさせる

書いたPIgの文法が正しいかどうかのCheckは-cオプションで確認できます。syntax OKと出れば文法は問題無いように見えるのですが、残念なことに実際動かしていみると動かないケースは多々あります。-cオプションでは本当に簡単な文法しか確認できないようです。PigはLocalModeとHDFSModeがありDefaultはHDFSModeで、LocalModeで実行したい場合はpig -x localと指定します。

$ pig -c test.pig                <!-- 文法チェック -->
pig -c test.pig 
(略)
2013-01-26 11:59:02,272 [main] WARN  org.apache.pig.tools.grunt.GruntParser - 'dump' statement is ignored while processing 'explain -script' or '-check'
test.pig syntax OK

$ pig -x local test.pig       <!-- LocalModeで実行 -->
(略)
2013-01-26 12:04:51,460 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(John,17,Men,youtube)
(John,17,Men,yahoo)
(Kate,18,Women,facebook)
(Kate,18,Women,ebay)
(Kate,18,Women,ebay)
(Tom,19,Men,google)

$ pig test.pig                    <!-- HDFSModeで実行 -->
Reducerの数を調節したい

pigスクリプト内部で行う場合はSET default_parallelでReducerの数値を設定します。

SET default_parallel 2000;
data = LOAD 'student.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
STOREするDataをgzip圧縮したい

pigスクリプト内部で行う場合はSET output.compression.enabled true;と SET output.compression.codec org.apache.hadoop.io.compress.GzipCodec; を指定します。またpigスクリプトに渡すパラメータとして設定する場合も-Dmapredとして設定することもできます。

SET default_parallel 2000;
SET output.compression.enabled true;
SET output.compression.codec org.apache.hadoop.io.compress.GzipCodec;
data = LOAD 'student.txt' USING PigStorage( ' ' ) AS ( name:chararray, age:int, gen:chararray, domain:chararray );
$ pig -Dmapred.output.compression.enabled=true -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec test.pig