読者です 読者をやめる 読者になる 読者になる

Web就活日記

愛と夢と人生について書きます

Hadoop MapReduceのExamplesで分散grep、WordCount、randomwriter、sort、join、数独、円周率計算を試してみる

Hadoop徹底入門

Hadoop徹底入門

Try

この記事はHadoopMapReduce examples.tarで以下の事を試した記録です。とても初歩的な事を書いています。

  • sudoku数独
  • Pi Estimator : 円周率計算
  • WordCount : 単語数カウント
  • grep : 文字列検索
  • randomwriter : 文字列生成
  • sort : sorting
  • join : データ結合

この記事では以下のサイトを参考にしました。中でもHadoop入門 IBMが一番詳しくて分かりやすいと思います。

また今までHadoopに関連した以下の記事を書きました。何かの参考にしていただければと思います。

Beginning

Env

以下はHadoop 0.20.2-cdh3u3が設定されている事を前提に記述しています。

Streaming

Hadoopは通常Streamingという拡張機能を利用して標準入出力形式のMapReduceを行います。StreamingによりJava以外の標準入出力が可能な言語であれば記述ができます。上のHadoop API OverViewのorg.apache.hadoop.streamingという項目を参照すると標準入出力の仕様が読み取れると思います。この記事ではStreamingを使う前のステップとして、HadoopMapReduceのSampleプログラムの動作と実装を見てみます。

FilePath & Sources

/usr/lib/hadoop-0.20/hadoop-examples.jar jarファイルは以下のパスに設置されており、jarの中に各種サンプルclassファイルが設定されていると思います。Sourceは本家のDownloadサイトから圧縮ファイルを取ってきて解凍してみます。
http://ftp.jaist.ac.jp/pub/apache/hadoop/common/hadoop-0.20.2/
SourceのGrep、Join、Sort、WordCount、RandomWriter、PiEstimator、Sudoku.javaを利用します。

$ wget http://ftp.jaist.ac.jp/pub/apache/hadoop/common/hadoop-0.20.2/hadoop-0.20.2.tar.gz
$ tar xf hadoop-0.20.2.tar
$ tree hadoop-0.20.2/src/examples/org/apache/hadoop/examples/ 
hadoop-0.20.2/src/examples/org/apache/hadoop/examples/
|-- AggregateWordCount.java
|-- AggregateWordHistogram.java
|-- DBCountPageView.java
|-- ExampleDriver.java
|-- Grep.java
|-- Join.java
|-- MultiFileWordCount.java
|-- PiEstimator.java
|-- RandomTextWriter.java
|-- RandomWriter.java
|-- SecondarySort.java
|-- SleepJob.java
|-- Sort.java
|-- WordCount.java
|-- dancing
|   |-- DancingLinks.java
|   |-- DistributedPentomino.java
|   |-- OneSidedPentomino.java
|   |-- Pentomino.java
|   |-- Sudoku.java
|   |-- package.html
|   `-- puzzle1.dta
|-- package.html
`-- terasort
    |-- TeraGen.java
    |-- TeraInputFormat.java
    |-- TeraOutputFormat.java
    |-- TeraSort.java
    |-- TeraValidate.java
    |-- job_history_summary.py
    `-- package.html

Sudoku

数独問題を解く単純なアルゴリズムです。HDFSMapReduceとはあまり関係無いですがExampleに設置されている面白いネタです。上で展開したソースコードにpuzzle1.dtaというサンプルデータが入っているのでそれを基に実行します。

Source

一部を貼っています。

  /**
   * Set up a puzzle board to the given size.
   * Boards may be asymmetric, but the squares will always be divided to be
   * more cells wide than they are tall. For example, a 6x6 puzzle will make 
   * sub-squares that are 3x2 (3 cells wide, 2 cells tall). Clearly that means
   * the board is made up of 2x3 sub-squares.
   * @param stream The input stream to read the data from
   */
  public Sudoku(InputStream stream) throws IOException {
    BufferedReader file = new BufferedReader(new InputStreamReader(stream));
    String line = file.readLine();
    List<int[]> result = new ArrayList<int[]>();
    while (line != null) {
      StringTokenizer tokenizer = new StringTokenizer(line);
      int size = tokenizer.countTokens();
      int[] col = new int[size];
      int y = 0;
      while(tokenizer.hasMoreElements()) {
        String word = tokenizer.nextToken();
        if ("?".equals(word)) {
          col[y] = - 1;
        } else {
          col[y] = Integer.parseInt(word);
        }   
        y += 1;
      }   
      result.add(col);
      line = file.readLine();
    }   
    size = result.size();
    board = (int[][]) result.toArray(new int [size][]);
    squareYSize = (int) Math.sqrt(size);
    squareXSize = size / squareYSize;
    file.close();
  }
Execution
$ cat hadoop-0.20.2/src/examples/org/apache/hadoop/examples/dancing/puzzle1.dta
8 5 ? 3 9 ? ? ? ?
? ? 2 ? ? ? ? ? ?
? ? 6 ? 1 ? ? ? 2
? ? 4 ? ? 3 ? 5 9
? ? 8 9 ? 1 4 ? ?
3 2 ? 4 ? ? 8 ? ?
9 ? ? ? 8 ? 5 ? ?
? ? ? ? ? ? 2 ? ?
? ? ? ? 4 5 ? 7 8

$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar sudoku hadoop-0.20.2/src/examples/org/apache/hadoop/examples/dancing/puzzle1.dta
Solving hadoop-0.20.2/src/examples/org/apache/hadoop/examples/dancing/puzzle1.dta
8 5 1 3 9 2 6 4 7 
4 3 2 6 7 8 1 9 5 
7 9 6 5 1 4 3 8 2 
6 1 4 8 2 3 7 5 9 
5 7 8 9 6 1 4 2 3 
3 2 9 4 5 7 8 1 6 
9 4 7 2 8 6 5 3 1 
1 8 5 7 3 9 2 6 4 
2 6 3 1 4 5 9 7 8 

Found 1 solutions

Pi Estimator

円周率を計算します。残念ながら小数点以下の精度があまり高くありません。

Source

MapperClassのmapとReducerClassのreduce処理とmainで動かすrunメソッドについて抜き出しています。
hadoop-0.20.2/src/examples/org/apache/hadoop/examples/PiEstimator.java

  /** 
   * Parse arguments and then runs a map/reduce job.
   * Print output in standard out.
   * 
   * @return a non-zero if there is an error.  Otherwise, return 0.  
   */
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");
      ToolRunner.printGenericCommandUsage(System.err);
      return -1; 
    }   
    
    final int nMaps = Integer.parseInt(args[0]);
    final long nSamples = Long.parseLong(args[1]);
    
    System.out.println("Number of Maps  = " + nMaps);
    System.out.println("Samples per Map = " + nSamples);
    
    final JobConf jobConf = new JobConf(getConf(), getClass());
    System.out.println("Estimated value of Pi is "
        + estimate(nMaps, nSamples, jobConf));
    return 0;
  }

  /**
   * Mapper class for Pi estimation.
   * Generate points in a unit square
   * and then count points inside/outside of the inscribed circle of the square.
   */
  public static class PiMapper extends MapReduceBase
    implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {

    /** Map method.
     * @param offset samples starting from the (offset+1)th sample.
     * @param size the number of samples for this map
     * @param out output {ture->numInside, false->numOutside}
     * @param reporter
     */
    public void map(LongWritable offset,
                    LongWritable size,
                    OutputCollector<BooleanWritable, LongWritable> out,
                    Reporter reporter) throws IOException {

      final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
      long numInside = 0L;
      long numOutside = 0L;

      for(long i = 0; i < size.get(); ) {
        //generate points in a unit square
        final double[] point = haltonsequence.nextPoint();

        //count points inside/outside of the inscribed circle of the square
        final double x = point[0] - 0.5;
        final double y = point[1] - 0.5;
        if (x*x + y*y > 0.25) {
          numOutside++;
        } else {
          numInside++;
        }

        //report status
        i++;
        if (i % 1000 == 0) {
          reporter.setStatus("Generated " + i + " samples.");
        }
      }

      //output map results
      out.collect(new BooleanWritable(true), new LongWritable(numInside));
      out.collect(new BooleanWritable(false), new LongWritable(numOutside));
    }
  }


   /**
   * Reducer class for Pi estimation.
   * Accumulate points inside/outside results from the mappers.
   */
    public static class PiReducer extends MapReduceBase
    implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
    
    private long numInside = 0;
    private long numOutside = 0;
    private JobConf conf; //configuration for accessing the file system
      
    /** Store job configuration. */
    @Override
    public void configure(JobConf job) {
      conf = job;
    }

    /**
     * Accumulate number of points inside/outside results from the mappers.
     * @param isInside Is the points inside? 
     * @param values An iterator to a list of point counts
     * @param output dummy, not used here.
     * @param reporter
     */
    public void reduce(BooleanWritable isInside,
                       Iterator<LongWritable> values,
                       OutputCollector<WritableComparable<?>, Writable> output,
                       Reporter reporter) throws IOException {
      if (isInside.get()) {
        for(; values.hasNext(); numInside += values.next().get());
      } else {
        for(; values.hasNext(); numOutside += values.next().get());
      }
    }
Execution

hadoop-examples.jarに対してpi(PiEstimator)、map数、sample数の3つの引数を与えます。しかし以下を実行するとエラーが出てしまいました。どこかに掲載されていた方法で http://localhost:50070 http://localhost:50030 の管理ツールにアクセスすると以前は直ったことは確認したのですが、今回はアクセスしただけでは直りませんでした。

$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar pi 10 200
Number of Maps  = 10
Samples per Map = 200
12/05/08 10:26:58 WARN hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /user/yuta/PiEstimator_TMP_3_141592654/in/part0 could only be replicated to 0 nodes, instead of 1
(略)
12/05/08 10:26:58 WARN hdfs.DFSClient: Error Recovery for block null bad datanode[0] nodes == null
12/05/08 10:26:58 WARN hdfs.DFSClient: Could not get block locations. Source file "/user/yuta/PiEstimator_TMP_3_141592654/in/part0" - Aborting...

HDFSのファイル容量が膨らんでいる場合も上記のエラーが発生するようなのでデータ容量を見てみましたが、結果として容量は対した事ありませんでした。

$ hdfs -du   
Found 4 items
310146      hdfs://localhost/user/yuta/madmagi_in
919862      hdfs://localhost/user/yuta/madmagi_out_ma
2560696     hdfs://localhost/user/yuta/madmagi_out_ma_test
41180       hdfs://localhost/user/yuta/samples

hadoop namenodeの再formatを実行します。またsafemodeをOFFにします。※この作業は危険です。HDFSが使えなくなります。

$ su
$ stop-all.sh
$ hadoop namenode -format
$ start-all.sh
$ hadoop dfsadmin -safemode leave
Safe mode is OFF

ここで再度実行してみましたが、同じエラーがでました。またどうやら再formatが原因でついにHDFS上のファイル操作が全てできなくなりHadoopを一度resetすることにしました。結果としてはこの作業で直りましたが、HDFSのバックアップが面倒なのともっと良い解決方法がおそらくありそうなのでこれはおすすめできません。

$ su
$ rm -rf /var/log/hadoop-0.20
$ rm -rf /usr/lib/hadoop*
$ rm -rf /var/log/hadoop
$ rm -rf /var/lib/alternatives/hadoop-*
$ yum remove yum install hadoop-0.20 -y
$ yum install hadoop-0.20-conf-pseudo hadoop-0.20 -y
$ /etc/init.d/hadoop-0.20-datanode start
$ /etc/init.d/hadoop-0.20-namenode start
$ /etc/init.d/hadoop-0.20-tasktracker start
$ /etc/init.d/hadoop-0.20-jobtracker start

再度円周率計算を実行します。今度は成功しました。

$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar pi 10 200
Number of Maps  = 10
Samples per Map = 200
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
12/05/09 12:04:28 INFO mapred.FileInputFormat: Total input paths to process : 10
12/05/09 12:04:29 INFO mapred.JobClient: Running job: job_201205091151_0002
12/05/09 12:04:30 INFO mapred.JobClient:  map 0% reduce 0%
12/05/09 12:04:44 INFO mapred.JobClient:  map 10% reduce 0%
12/05/09 12:04:45 INFO mapred.JobClient:  map 20% reduce 0%
12/05/09 12:05:00 INFO mapred.JobClient:  map 40% reduce 0%
12/05/09 12:05:08 INFO mapred.JobClient:  map 40% reduce 13%
12/05/09 12:05:11 INFO mapred.JobClient:  map 50% reduce 13%
12/05/09 12:05:12 INFO mapred.JobClient:  map 60% reduce 13%
12/05/09 12:05:20 INFO mapred.JobClient:  map 60% reduce 20%
12/05/09 12:05:21 INFO mapred.JobClient:  map 70% reduce 20%
12/05/09 12:05:24 INFO mapred.JobClient:  map 80% reduce 20%
12/05/09 12:05:30 INFO mapred.JobClient:  map 80% reduce 26%
12/05/09 12:05:32 INFO mapred.JobClient:  map 90% reduce 26%
12/05/09 12:05:33 INFO mapred.JobClient:  map 100% reduce 26%
12/05/09 12:05:36 INFO mapred.JobClient:  map 100% reduce 33%
12/05/09 12:05:38 INFO mapred.JobClient:  map 100% reduce 100%
12/05/09 12:05:41 INFO mapred.JobClient: Job complete: job_201205091151_0002
12/05/09 12:05:41 INFO mapred.JobClient: Counters: 27
12/05/09 12:05:41 INFO mapred.JobClient:   Job Counters 
12/05/09 12:05:41 INFO mapred.JobClient:     Launched reduce tasks=1
12/05/09 12:05:41 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=119740
12/05/09 12:05:41 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
12/05/09 12:05:41 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
12/05/09 12:05:41 INFO mapred.JobClient:     Launched map tasks=10
12/05/09 12:05:41 INFO mapred.JobClient:     Data-local map tasks=10
12/05/09 12:05:41 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=53325
12/05/09 12:05:41 INFO mapred.JobClient:   FileSystemCounters
12/05/09 12:05:41 INFO mapred.JobClient:     FILE_BYTES_READ=226
12/05/09 12:05:41 INFO mapred.JobClient:     HDFS_BYTES_READ=2340
12/05/09 12:05:41 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=602824
12/05/09 12:05:41 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=215
12/05/09 12:05:41 INFO mapred.JobClient:   Map-Reduce Framework
12/05/09 12:05:41 INFO mapred.JobClient:     Map input records=10
12/05/09 12:05:41 INFO mapred.JobClient:     Reduce shuffle bytes=280
12/05/09 12:05:41 INFO mapred.JobClient:     Spilled Records=40
12/05/09 12:05:41 INFO mapred.JobClient:     Map output bytes=180
12/05/09 12:05:41 INFO mapred.JobClient:     CPU time spent (ms)=7620
12/05/09 12:05:41 INFO mapred.JobClient:     Total committed heap usage (bytes)=1309446144
12/05/09 12:05:41 INFO mapred.JobClient:     Map input bytes=240
12/05/09 12:05:41 INFO mapred.JobClient:     Combine input records=0
12/05/09 12:05:41 INFO mapred.JobClient:     SPLIT_RAW_BYTES=1160
12/05/09 12:05:41 INFO mapred.JobClient:     Reduce input records=20
12/05/09 12:05:41 INFO mapred.JobClient:     Reduce input groups=2
12/05/09 12:05:41 INFO mapred.JobClient:     Combine output records=0
12/05/09 12:05:41 INFO mapred.JobClient:     Physical memory (bytes) snapshot=1720111104
12/05/09 12:05:41 INFO mapred.JobClient:     Reduce output records=0
12/05/09 12:05:41 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=5607768064
12/05/09 12:05:41 INFO mapred.JobClient:     Map output records=20
Job Finished in 73.637 seconds
Estimated value of Pi is 3.14400000000000000000

WordCount

単語と単語の出現数を記録します。

Source
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
Execution
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar wordcount madmagi_in madmagi_out_wordcount    

$ hdfs -text madmagi_out_wordcount/part-r-00000
12/05/10 08:07:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/05/10 08:07:31 WARN snappy.LoadSnappy: Snappy native library not loaded
10	2
100	2
25	1
4	1
69	1
―	1
――	18
…	330
 	4
、	1014
。	484
々	1
『	3
』	3
〜	1
ぁ	2
ぁぁ	1
ぁぁぁ	1
ぁぁぁん	1
ぁぁっ	1
ぁあぁあ	1
ぁっ	2
ぁはは	1
あ	28
あぁ	1
ああ	11
あいつ	5
あくまで	1
あけみ	1
あげ	4
あげる	4
あげれ	2
あそば	1
あたし	14

grep

分散環境で文字列を検索します。日本語検索もできます。

Source
package org.apache.hadoop.examples;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* Extracts matching regexs from input files and counts them. */
public class Grep extends Configured implements Tool {
  private Grep() {}                               // singleton

  public int run(String[] args) throws Exception {
    if (args.length < 3) {
      System.out.println("Grep <inDir> <outDir> <regex> [<group>]");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    Path tempDir =
      new Path("grep-temp-"+
          Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf grepJob = new JobConf(getConf(), Grep.class);
    
    try {
      
      grepJob.setJobName("grep-search");

      FileInputFormat.setInputPaths(grepJob, args[0]);

      grepJob.setMapperClass(RegexMapper.class);
      grepJob.set("mapred.mapper.regex", args[2]);
      if (args.length == 4)
        grepJob.set("mapred.mapper.regex.group", args[3]);

      grepJob.setCombinerClass(LongSumReducer.class);
      grepJob.setReducerClass(LongSumReducer.class);

      FileOutputFormat.setOutputPath(grepJob, tempDir);
      grepJob.setOutputFormat(SequenceFileOutputFormat.class);
      grepJob.setOutputKeyClass(Text.class);
      grepJob.setOutputValueClass(LongWritable.class);

      JobClient.runJob(grepJob);

      JobConf sortJob = new JobConf(Grep.class);
      sortJob.setJobName("grep-sort");

      FileInputFormat.setInputPaths(sortJob, tempDir);
      sortJob.setInputFormat(SequenceFileInputFormat.class);

      sortJob.setMapperClass(InverseMapper.class);

      sortJob.setNumReduceTasks(1);                 // write a single file
      FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
      sortJob.setOutputKeyComparatorClass           // sort by decreasing freq
      (LongWritable.DecreasingComparator.class);

      JobClient.runJob(sortJob);
    }
    finally {
      FileSystem.get(grepJob).delete(tempDir, true);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Grep(), args);
    System.exit(res);
  }
}
Execution

日本語の「まどか」と「さやか」を正規表現で指定します。

$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar grep madmagi_in madmagi_out_grep '(まどか|さやか)'

$ hdfs -text madmagi_out_grep/part-00000
12/05/10 07:54:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/05/10 07:54:47 WARN snappy.LoadSnappy: Snappy native library not loaded
112	さやか
75	まどか

randomwriter

ランダムなテキストデータを生成します。

Source

一部を抜き出して載せています。

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Date;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class RandomWriter extends Configured implements Tool {
  
  /**
   * User counters
   */
  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
  
  /**
   * A custom input format that creates virtual inputs of a single string
   * for each map.
   */
  static class RandomInputFormat implements InputFormat<Text, Text> {

    /** 
     * Generate the requested number of file splits, with the filename
     * set to the filename of the output file.
     */
    public InputSplit[] getSplits(JobConf job, 
                                  int numSplits) throws IOException {
      InputSplit[] result = new InputSplit[numSplits];
      Path outDir = FileOutputFormat.getOutputPath(job);
      for(int i=0; i < result.length; ++i) {
        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
                                  (String[])null);
      }
      return result;
    }

    /**
     * Return a single record (filename, "") where the filename is taken from
     * the file split.
     */
    static class RandomRecordReader implements RecordReader<Text, Text> {
      Path name;
      public RandomRecordReader(Path p) {
        name = p;
      }
      public boolean next(Text key, Text value) {
        if (name != null) {
          key.set(name.getName());
          name = null;
          return true;
        }
        return false;
      }
      public Text createKey() {
        return new Text();
      }
      public Text createValue() {
        return new Text();
      }
      public long getPos() {
        return 0;
      }
      public void close() {}
      public float getProgress() {
        return 0.0f;
      }
    }

    public RecordReader<Text, Text> getRecordReader(InputSplit split,
                                        JobConf job, 
                                        Reporter reporter) throws IOException {
      return new RandomRecordReader(((FileSplit) split).getPath());
    }
  }

  static class Map extends MapReduceBase
    implements Mapper<WritableComparable, Writable,
                      BytesWritable, BytesWritable> {
    
    private long numBytesToWrite;
    private int minKeySize;
    private int keySizeRange;
    private int minValueSize;
    private int valueSizeRange;
    private Random random = new Random();
    private BytesWritable randomKey = new BytesWritable();
    private BytesWritable randomValue = new BytesWritable();
    
    private void randomizeBytes(byte[] data, int offset, int length) {
      for(int i=offset + length - 1; i >= offset; --i) {
        data[i] = (byte) random.nextInt(256);
      }
    }
    
    /**
     * Given an output filename, write a bunch of random records to it.
     */
    public void map(WritableComparable key, 
                    Writable value,
                    OutputCollector<BytesWritable, BytesWritable> output, 
                    Reporter reporter) throws IOException {
      int itemCount = 0;
      while (numBytesToWrite > 0) {
        int keyLength = minKeySize + 
          (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize +
          (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        output.collect(randomKey, randomValue);
        numBytesToWrite -= keyLength + valueLength;
        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
        if (++itemCount % 200 == 0) {
          reporter.setStatus("wrote record " + itemCount + ". " + 
                             numBytesToWrite + " bytes left.");
        }
      }
      reporter.setStatus("done with " + itemCount + " records.");
    }
  }
  
  /**
   * This is the main routine for launching a distributed random write job.
   * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
   * The reduce doesn't do anything.
   * 
   * @throws IOException 
   */
  public int run(String[] args) throws Exception {    
    if (args.length == 0) {
      System.out.println("Usage: writer <out-dir>");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }
    
    Path outDir = new Path(args[0]);
    JobConf job = new JobConf(getConf());
    
    job.setJarByClass(RandomWriter.class);
    job.setJobName("random-writer");
    FileOutputFormat.setOutputPath(job, outDir);
    
    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    
    job.setInputFormat(RandomInputFormat.class);
    job.setMapperClass(Map.class);        
    job.setReducerClass(IdentityReducer.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    
    JobClient client = new JobClient(job);
    ClusterStatus cluster = client.getClusterStatus();
    int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
    long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
                                             1*1024*1024*1024);
    if (numBytesToWritePerMap == 0) {
      System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
      return -2;
    }
    long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes", 
         numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
    if (numMaps == 0 && totalBytesToWrite > 0) {
      numMaps = 1;
      job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
    }
    
    job.setNumMapTasks(numMaps);
    System.out.println("Running " + numMaps + " maps.");
    
    // reducer NONE
    job.setNumReduceTasks(0);
    
    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    JobClient.runJob(job);
    Date endTime = new Date();
    System.out.println("Job ended: " + endTime);
    System.out.println("The job took " + 
                       (endTime.getTime() - startTime.getTime()) /1000 + 
                       " seconds.");
    
    return 0;
  }
 
}
Execution

10MByteのランダムな16進数データを2つ作ります。

$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar randomwriter -D test.randomwrite.bytes_per_map=10000000 -D test.randomwriter.maps_per_host=2 random-data

$ hdfs -du random-data
Found 4 items
0           hdfs://localhost/user/yuta/random-data/_SUCCESS
59570       hdfs://localhost/user/yuta/random-data/_logs
10045460    hdfs://localhost/user/yuta/random-data/part-00000
10042286    hdfs://localhost/user/yuta/random-data/part-00001

$ hdfs -text random-data/part-00000
f aa b9 bc 94 4d bc af ac c0 9a 1c c9 24 31 8d 85 a6 80 5b 62 65 56 37 8a 95 6c a5 33 39 5c ac ab 38 53 67 55 92
f9 04 6c 86 cd 39 9d 13 63 8b 00 b7 0c 03 61 38 0a bc 0a af 92 17 63 f1 eb b1 89 57 03 12 56 1a 1c 87 c7 40 2c
4d 94 02 16 73 da 73 9f 72 ee de f1 17 0e b1 f6 cf 76 1d 17 d4 11 5d 65 c1 f0 d6 c3 af 38 fb 24 75 69 f9 01 54 52
c6 fc fa dd 98 18 a7 be e9 4f 66 51 9a b2 46 37 d6 e1 95 f1 2e 8c 02 c9 e1 02 a2 e1 c7 ff 70

同じようなプログラムでrandomtextwriterというものがあり、これは10MByteのデータを10個作成します。

$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar randomtextwriter -D test.randomtextwrite.bytes_per_map=10000000 -D test.randomtextwriter.maps_per_host=2 random-text-data

$ hdfs -du random-text-data
10265269    hdfs://localhost/user/yuta/random-text-data/part-00000
10265294    hdfs://localhost/user/yuta/random-text-data/part-00001
10265824    hdfs://localhost/user/yuta/random-text-data/part-00002
10265560    hdfs://localhost/user/yuta/random-text-data/part-00003
10267562    hdfs://localhost/user/yuta/random-text-data/part-00004
10266170    hdfs://localhost/user/yuta/random-text-data/part-00005
10265334    hdfs://localhost/user/yuta/random-text-data/part-00006
10265731    hdfs://localhost/user/yuta/random-text-data/part-00007
10266359    hdfs://localhost/user/yuta/random-text-data/part-00008
10265977    hdfs://localhost/user/yuta/random-text-data/part-00009

$ hdfs -text random-text-data/part-00000

bromate michigan ploration prospectiveness mendacity pyxie edificator posttraumatic oratorize
constitutor silverhead critically schoolmasterism unlapsing Mormyrus chooser licitness undinted
sangaree vinegarish precostal uncontradictableness warriorwise embryotic repealableness alen
catabaptist comprovincial archididascalian returnability giantly unachievable pope calabazilla
topsail epidymides palaeotheriodont hemimelus

sort

上randomwriterで生成したrandom-dataを利用してsortします。sort結果の確認が微妙ですが、一応結果を載せておきます。

Source
package org.apache.hadoop.examples;

import java.io.IOException;
import java.net.URI;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Sort<K,V> extends Configured implements Tool {
  private RunningJob jobResult = null;

  static int printUsage() {
    System.out.println("sort [-m <maps>] [-r <reduces>] " +
                       "[-inFormat <input format class>] " +
                       "[-outFormat <output format class>] " + 
                       "[-outKey <output key class>] " +
                       "[-outValue <output value class>] " +
                       "[-totalOrder <pcnt> <num samples> <max splits>] " +
                       "<input> <output>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }

  /**
   * The main driver for sort program.
   * Invoke this method to submit the map/reduce job.
   * @throws IOException When there is communication problems with the 
   *                     job tracker.
   */
  public int run(String[] args) throws Exception {

    JobConf jobConf = new JobConf(getConf(), Sort.class);
    jobConf.setJobName("sorter");

    jobConf.setMapperClass(IdentityMapper.class);        
    jobConf.setReducerClass(IdentityReducer.class);

    JobClient client = new JobClient(jobConf);
    ClusterStatus cluster = client.getClusterStatus();
    int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
    String sort_reduces = jobConf.get("test.sort.reduces_per_host");
    if (sort_reduces != null) {
       num_reduces = cluster.getTaskTrackers() * 
                       Integer.parseInt(sort_reduces);
    }
    Class<? extends InputFormat> inputFormatClass = 
      SequenceFileInputFormat.class;
    Class<? extends OutputFormat> outputFormatClass = 
      SequenceFileOutputFormat.class;
    Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
    Class<? extends Writable> outputValueClass = BytesWritable.class;
    List<String> otherArgs = new ArrayList<String>();
    InputSampler.Sampler<K,V> sampler = null;
    for(int i=0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          jobConf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          num_reduces = Integer.parseInt(args[++i]);
        } else if ("-inFormat".equals(args[i])) {
          inputFormatClass = 
            Class.forName(args[++i]).asSubclass(InputFormat.class);
        } else if ("-outFormat".equals(args[i])) {
          outputFormatClass = 
            Class.forName(args[++i]).asSubclass(OutputFormat.class);
        } else if ("-outKey".equals(args[i])) {
          outputKeyClass = 
            Class.forName(args[++i]).asSubclass(WritableComparable.class);
        } else if ("-outValue".equals(args[i])) {
          outputValueClass = 
            Class.forName(args[++i]).asSubclass(Writable.class);
        } else if ("-totalOrder".equals(args[i])) {
          double pcnt = Double.parseDouble(args[++i]);
          int numSamples = Integer.parseInt(args[++i]);
          int maxSplits = Integer.parseInt(args[++i]);
          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
          sampler =
            new InputSampler.RandomSampler<K,V>(pcnt, numSamples, maxSplits);
        } else {
          otherArgs.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from " +
            args[i-1]);
        return printUsage(); // exits
      }
    }

    // Set user-supplied (possibly default) job configs
    jobConf.setNumReduceTasks(num_reduces);

    jobConf.setInputFormat(inputFormatClass);
    jobConf.setOutputFormat(outputFormatClass);

    jobConf.setOutputKeyClass(outputKeyClass);
    jobConf.setOutputValueClass(outputValueClass);

    // Make sure there are exactly 2 parameters left.
    if (otherArgs.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
          otherArgs.size() + " instead of 2.");
      return printUsage();
    }
    FileInputFormat.setInputPaths(jobConf, otherArgs.get(0));
    FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.get(1)));

    if (sampler != null) {
      System.out.println("Sampling input to effect total-order sort...");
      jobConf.setPartitionerClass(TotalOrderPartitioner.class);
      Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
      inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
      Path partitionFile = new Path(inputDir, "_sortPartitioning");
      TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
      InputSampler.<K,V>writePartitionFile(jobConf, sampler);
      URI partitionUri = new URI(partitionFile.toString() +
                                 "#" + "_sortPartitioning");
      DistributedCache.addCacheFile(partitionUri, jobConf);
      DistributedCache.createSymlink(jobConf);
    }

    System.out.println("Running on " +
        cluster.getTaskTrackers() +
        " nodes to sort from " + 
        FileInputFormat.getInputPaths(jobConf)[0] + " into " +
        FileOutputFormat.getOutputPath(jobConf) +
        " with " + num_reduces + " reduces.");
    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    jobResult = JobClient.runJob(jobConf);
    Date end_time = new Date();
    System.out.println("Job ended: " + end_time);
    System.out.println("The job took " + 
        (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Sort(), args);
    System.exit(res);
  }

  /**
   * Get the last job that was run using this instance.
   * @return the results of the last job that was run
   */
  public RunningJob getResult() {
    return jobResult;
  }
}
Execution
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar sort random-data sorted-data

$ hdfs -du sorted-data
Found 3 items
0           hdfs://localhost/user/yuta/sorted-data/_SUCCESS
63175       hdfs://localhost/user/yuta/sorted-data/_logs
20088639    hdfs://localhost/user/yuta/sorted-data/part-00000

$ hdfs -text sorted-data/part-00000
ac 93 31 14 e3 36 6a bb 43 2b 51 2f 4c 50 5e a4 fd 28 7c 34 2b bc 92 b3 f5 0f e2 95 c6 96 3f 0a d1 
ac 90 80 ac 4d 4d de 67 9c bd 0b 6b 14 1c ed fb 38 c2 51 c0 d7 42 9d 0b ab ea 2e ae fc c7 aa 6b 17 
9a cf 25 b9 74 e2 93 b0 47 c0 18 5a fc 1f 58 3d 3a b7 c8 c8 e5 1d 30 eb 52 b4 f5 9e 80 9c 23 51 a9 
7a 37 95 97 6e 1a 76 4a e7 60 67 fe e8 f2 00 3f d3 05 2d e1 57 00 1c 7c b1 9b 20 2b ba 1a 8e 9b 3a 
92 e6 65 56 13 f6 cd db b8 ed 24 1d d8 e4 da c5 f5 fb 20 1a cb 3f df a9 fe f4 2b aa f2 87 00 67 02 f6 
5f 32 a9 e1 4d 0e a4 05 15 91 38 34 cc a0 a7 43 9d 88 7a 9c ec 33 ad 2a 1e c3 08 18 61 fb 1f 5c 82 ed fb e1 0f da 1d

join

分割データの結合を行います。

Source
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.join.*;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Join extends Configured implements Tool {

  static int printUsage() {
    System.out.println("join [-m <maps>] [-r <reduces>] " +
                       "[-inFormat <input format class>] " +
                       "[-outFormat <output format class>] " + 
                       "[-outKey <output key class>] " +
                       "[-outValue <output value class>] " +
                       "[-joinOp <inner|outer|override>] " +
                       "[input]* <input> <output>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }

  /**
   * The main driver for sort program.
   * Invoke this method to submit the map/reduce job.
   * @throws IOException When there is communication problems with the 
   *                     job tracker.
   */
  public int run(String[] args) throws Exception {
    JobConf jobConf = new JobConf(getConf(), Sort.class);
    jobConf.setJobName("join");

    jobConf.setMapperClass(IdentityMapper.class);        
    jobConf.setReducerClass(IdentityReducer.class);

    JobClient client = new JobClient(jobConf);
    ClusterStatus cluster = client.getClusterStatus();
    int num_maps = cluster.getTaskTrackers() * 
                   jobConf.getInt("test.sort.maps_per_host", 10);
    int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
    String sort_reduces = jobConf.get("test.sort.reduces_per_host");
    if (sort_reduces != null) {
       num_reduces = cluster.getTaskTrackers() * 
                       Integer.parseInt(sort_reduces);
    }
    Class<? extends InputFormat> inputFormatClass = 
      SequenceFileInputFormat.class;
    Class<? extends OutputFormat> outputFormatClass = 
      SequenceFileOutputFormat.class;
    Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
    Class<? extends Writable> outputValueClass = TupleWritable.class;
    String op = "inner";
    List<String> otherArgs = new ArrayList<String>();
    for(int i=0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          num_maps = Integer.parseInt(args[++i]);
        } else if ("-r".equals(args[i])) {
          num_reduces = Integer.parseInt(args[++i]);
        } else if ("-inFormat".equals(args[i])) {
          inputFormatClass = 
            Class.forName(args[++i]).asSubclass(InputFormat.class);
        } else if ("-outFormat".equals(args[i])) {
          outputFormatClass = 
            Class.forName(args[++i]).asSubclass(OutputFormat.class);
        } else if ("-outKey".equals(args[i])) {
          outputKeyClass = 
            Class.forName(args[++i]).asSubclass(WritableComparable.class);
        } else if ("-outValue".equals(args[i])) {
          outputValueClass = 
            Class.forName(args[++i]).asSubclass(Writable.class);
        } else if ("-joinOp".equals(args[i])) {
          op = args[++i];
        } else {
          otherArgs.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from " +
            args[i-1]);
        return printUsage(); // exits
      }
    }

    // Set user-supplied (possibly default) job configs
    jobConf.setNumMapTasks(num_maps);
    jobConf.setNumReduceTasks(num_reduces);

    if (otherArgs.size() < 2) {
      System.out.println("ERROR: Wrong number of parameters: ");
      return printUsage();
    }

    FileOutputFormat.setOutputPath(jobConf, 
      new Path(otherArgs.remove(otherArgs.size() - 1)));
    List<Path> plist = new ArrayList<Path>(otherArgs.size());
    for (String s : otherArgs) {
      plist.add(new Path(s));
    }

    jobConf.setInputFormat(CompositeInputFormat.class);
    jobConf.set("mapred.join.expr", CompositeInputFormat.compose(
          op, inputFormatClass, plist.toArray(new Path[0])));
    jobConf.setOutputFormat(outputFormatClass);

    jobConf.setOutputKeyClass(outputKeyClass);
    jobConf.setOutputValueClass(outputValueClass);

    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    JobClient.runJob(jobConf);
    Date end_time = new Date();
    System.out.println("Job ended: " + end_time);
    System.out.println("The job took " + 
        (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Join(), args);
    System.exit(res);
  }

}
Execution
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar join random-data joined-data

$ hdfs -du joined-data                                                            
Found 3 items
0           hdfs://localhost/user/yuta/joined-data/_SUCCESS
63015       hdfs://localhost/user/yuta/joined-data/_logs
20158803    hdfs://localhost/user/yuta/joined-data/part-00000

$ hdfs -text joined-data/part-00000

[84 0d 6a 97 fc c7 fa f8 3d 32 a3 b9 e7 85 09 e4 43 ca 8a 56 d9 6d 74 7c ba 04 18
71 51 f7 c2 7b 03 89 84 0a ce 4a 27 7a 23 ee 3e 84 ef a7 3d e3 e8 6e 61 15 6a c8 e0 b1 9e 89 78 e9 10 f3 13 c7 a7 19
b4 09 2e 09 a8 f1 8e 9e 50 70 84 8d e2 85 95 27 87 88 16 eb 80 3e 22 40 82 da f3 a4 9f e0 8f 6c e6 82 16 f7 88 06 51
96 ef f2 32 ac e2 c9 7c 9d c2 6e 45 60 db b3 05 a5]

Tips

Config File

Hadoopの設定ファイルは以下の箇所に設置されます。

$ ree /usr/lib/hadoop-0.20/conf/
/usr/lib/hadoop-0.20/conf/
|-- README
|-- capacity-scheduler.xml
|-- configuration.xsl
|-- core-site.xml
|-- fair-scheduler.xml
|-- hadoop-env.sh
|-- hadoop-metrics.properties
|-- hadoop-policy.xml
|-- hdfs-site.xml
|-- log4j.properties
|-- mapred-queue-acls.xml
|-- mapred-site.xml
|-- masters
|-- org-xerial-snappy.properties
|-- slaves
|-- ssl-client.xml.example
|-- ssl-server.xml.example
`-- taskcontroller.cfg
log

Hadoopシステムのログは以下の箇所に記録されます。

$ tree /usr/lib/hadoop-0.20/logs
|-- hadoop-hadoop-datanode-localhost.localdomain.log.YYYY-MM-DD
|-- hadoop-hadoop-jobtracker-localhost.localdomain.log.YYYY-MM-DD
|-- hadoop-hadoop-namenode-localhost.localdomain.log.YYYY-MM-DD
|-- hadoop-hadoop-secondarynamenode-localhost.localdomain.log.YYYY-MM-DD
|-- hadoop-hadoop-tasktracker-localhost.localdomain.log.YYYY-MM-DD
SequenceFileとは

Binary型のKey-Valueレコードをまとめたファイルです。hadoop fs -text SequenceFile と実行するとテキストとして読めます。ただし独自に圧縮をかけたSequenceFileなども存在するため、その場合はDecode処理書く必要がありそうです。MapReduce実行中に以下のように怒られる時がありますが、それはinputFileがSequenceFile形式になっていないためです。

12/05/10 10:09:58 INFO mapred.JobClient: Task Id : attempt_201205100725_0022_m_000000_0, Status : FAILED
java.io.IOException: hdfs://localhost/user/yuta/madmagi_out_wordcount/part-r-00000 not a SequenceFile
Tool

何か困った時は以下のツールでチェックします。

スポンサーリンク