Trying the Hadoop MapReduce examples: distributed grep, WordCount, randomwriter, sort, join, Sudoku, and Pi estimation
Try
This article is a record of trying out the following things with Hadoop's bundled MapReduce examples (hadoop-examples.jar). It covers very basic material.
- sudoku : Sudoku solver
- Pi Estimator : estimating Pi
- WordCount : counting word occurrences
- grep : string search
- randomwriter : random data generation
- sort : sorting
- join : joining data
I consulted the following sites while writing this article. Among them, I think IBM's "Hadoop入門" is the most detailed and easiest to follow.
I have also written the following Hadoop-related articles; I hope they can serve as a reference.
Beginning
Env
Everything below assumes an environment where Hadoop 0.20.2-cdh3u3 is already installed and configured.
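If you want to confirm which build is installed, the hadoop command itself can report it (the exact output format depends on the distribution):

$ hadoop version   # on this setup it should mention 0.20.2-cdh3u3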
Streaming
Hadoop provides an extension called Streaming that runs MapReduce jobs over standard input and output; with it, jobs can be written in any language that can read stdin and write stdout, not just Java. The org.apache.hadoop.streaming entry in the Hadoop API overview mentioned above describes the stdin/stdout contract. In this article, as a step before using Streaming, I look at how the sample MapReduce programs bundled with Hadoop behave and how they are implemented.
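As a minimal illustration of Streaming (it is not used in the rest of this article), the sketch below runs a job whose mapper is /bin/cat and whose reducer is /usr/bin/wc. The location of the streaming jar is an assumption based on the CDH3 package layout, so adjust the path for your environment.

$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-*.jar \
    -input madmagi_in \
    -output madmagi_out_streaming \
    -mapper /bin/cat \
    -reducer /usr/bin/wc

Any executable that reads records from stdin and writes key/value lines to stdout can be plugged in the same way.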
FilePath & Sources
The jar file is installed at /usr/lib/hadoop-0.20/hadoop-examples.jar and contains the class files for the various sample programs. For the source code, download the archive from the official download site (the mirror below) and extract it.
http://ftp.jaist.ac.jp/pub/apache/hadoop/common/hadoop-0.20.2/
The sources used here are Grep.java, Join.java, Sort.java, WordCount.java, RandomWriter.java, PiEstimator.java, and Sudoku.java.

$ wget http://ftp.jaist.ac.jp/pub/apache/hadoop/common/hadoop-0.20.2/hadoop-0.20.2.tar.gz
$ tar xf hadoop-0.20.2.tar.gz
$ tree hadoop-0.20.2/src/examples/org/apache/hadoop/examples/
hadoop-0.20.2/src/examples/org/apache/hadoop/examples/
|-- AggregateWordCount.java
|-- AggregateWordHistogram.java
|-- DBCountPageView.java
|-- ExampleDriver.java
|-- Grep.java
|-- Join.java
|-- MultiFileWordCount.java
|-- PiEstimator.java
|-- RandomTextWriter.java
|-- RandomWriter.java
|-- SecondarySort.java
|-- SleepJob.java
|-- Sort.java
|-- WordCount.java
|-- dancing
|   |-- DancingLinks.java
|   |-- DistributedPentomino.java
|   |-- OneSidedPentomino.java
|   |-- Pentomino.java
|   |-- Sudoku.java
|   |-- package.html
|   `-- puzzle1.dta
|-- package.html
`-- terasort
    |-- TeraGen.java
    |-- TeraInputFormat.java
    |-- TeraOutputFormat.java
    |-- TeraSort.java
    |-- TeraValidate.java
    |-- job_history_summary.py
    `-- package.html
Sudoku
This is a simple solver for Sudoku puzzles. It has little to do with HDFS or MapReduce, but it is a fun extra that ships with the examples. The source tree extracted above includes a sample board, puzzle1.dta, so let's run the solver against it.
Source
Only an excerpt is shown.
/**
 * Set up a puzzle board to the given size.
 * Boards may be asymmetric, but the squares will always be divided to be
 * more cells wide than they are tall. For example, a 6x6 puzzle will make
 * sub-squares that are 3x2 (3 cells wide, 2 cells tall). Clearly that means
 * the board is made up of 2x3 sub-squares.
 * @param stream The input stream to read the data from
 */
public Sudoku(InputStream stream) throws IOException {
  BufferedReader file = new BufferedReader(new InputStreamReader(stream));
  String line = file.readLine();
  List<int[]> result = new ArrayList<int[]>();
  while (line != null) {
    StringTokenizer tokenizer = new StringTokenizer(line);
    int size = tokenizer.countTokens();
    int[] col = new int[size];
    int y = 0;
    while(tokenizer.hasMoreElements()) {
      String word = tokenizer.nextToken();
      if ("?".equals(word)) {
        col[y] = - 1;
      } else {
        col[y] = Integer.parseInt(word);
      }
      y += 1;
    }
    result.add(col);
    line = file.readLine();
  }
  size = result.size();
  board = (int[][]) result.toArray(new int [size][]);
  squareYSize = (int) Math.sqrt(size);
  squareXSize = size / squareYSize;
  file.close();
}

Execution
$ cat hadoop-0.20.2/src/examples/org/apache/hadoop/examples/dancing/puzzle1.dta
8 5 ? 3 9 ? ? ? ?
? ? 2 ? ? ? ? ? ?
? ? 6 ? 1 ? ? ? 2
? ? 4 ? ? 3 ? 5 9
? ? 8 9 ? 1 4 ? ?
3 2 ? 4 ? ? 8 ? ?
9 ? ? ? 8 ? 5 ? ?
? ? ? ? ? ? 2 ? ?
? ? ? ? 4 5 ? 7 8
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar sudoku hadoop-0.20.2/src/examples/org/apache/hadoop/examples/dancing/puzzle1.dta
Solving hadoop-0.20.2/src/examples/org/apache/hadoop/examples/dancing/puzzle1.dta
8 5 1 3 9 2 6 4 7
4 3 2 6 7 8 1 9 5
7 9 6 5 1 4 3 8 2
6 1 4 8 2 3 7 5 9
5 7 8 9 6 1 4 2 3
3 2 9 4 5 7 8 1 6
9 4 7 2 8 6 5 3 1
1 8 5 7 3 9 2 6 4
2 6 3 1 4 5 9 7 8

Found 1 solutions
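As the constructor above shows, the input format is just whitespace-separated cell values with '?' for unknown cells, so it is easy to feed the solver a board of your own. As a quick sanity check, the sketch below blanks out three cells of the solved grid shown above and lets the solver fill them back in (mypuzzle.dta is a file name I made up):

$ cat > mypuzzle.dta <<'EOF'
8 5 1 3 9 2 ? ? ?
4 3 2 6 7 8 1 9 5
7 9 6 5 1 4 3 8 2
6 1 4 8 2 3 7 5 9
5 7 8 9 6 1 4 2 3
3 2 9 4 5 7 8 1 6
9 4 7 2 8 6 5 3 1
1 8 5 7 3 9 2 6 4
2 6 3 1 4 5 9 7 8
EOF
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar sudoku mypuzzle.dta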
Pi Estimator
Estimates the value of Pi. The mapper scatters sample points over a unit square and counts how many fall inside the inscribed circle, so the estimate is 4 × (points inside) / (total points). Unfortunately, the precision after the decimal point is not very high with a small number of samples.
Source
The excerpt below shows the map method of the Mapper class, the reduce method of the Reducer class, and the run method invoked from main.
hadoop-0.20.2/src/examples/org/apache/hadoop/examples/PiEstimator.java/** * Parse arguments and then runs a map/reduce job. * Print output in standard out. * * @return a non-zero if there is an error. Otherwise, return 0. */ public int run(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>"); ToolRunner.printGenericCommandUsage(System.err); return -1; } final int nMaps = Integer.parseInt(args[0]); final long nSamples = Long.parseLong(args[1]); System.out.println("Number of Maps = " + nMaps); System.out.println("Samples per Map = " + nSamples); final JobConf jobConf = new JobConf(getConf(), getClass()); System.out.println("Estimated value of Pi is " + estimate(nMaps, nSamples, jobConf)); return 0; } /** * Mapper class for Pi estimation. * Generate points in a unit square * and then count points inside/outside of the inscribed circle of the square. */ public static class PiMapper extends MapReduceBase implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> { /** Map method. * @param offset samples starting from the (offset+1)th sample. * @param size the number of samples for this map * @param out output {ture->numInside, false->numOutside} * @param reporter */ public void map(LongWritable offset, LongWritable size, OutputCollector<BooleanWritable, LongWritable> out, Reporter reporter) throws IOException { final HaltonSequence haltonsequence = new HaltonSequence(offset.get()); long numInside = 0L; long numOutside = 0L; for(long i = 0; i < size.get(); ) { //generate points in a unit square final double[] point = haltonsequence.nextPoint(); //count points inside/outside of the inscribed circle of the square final double x = point[0] - 0.5; final double y = point[1] - 0.5; if (x*x + y*y > 0.25) { numOutside++; } else { numInside++; } //report status i++; if (i % 1000 == 0) { reporter.setStatus("Generated " + i + " samples."); } } //output map results out.collect(new BooleanWritable(true), new LongWritable(numInside)); out.collect(new BooleanWritable(false), new LongWritable(numOutside)); } } /** * Reducer class for Pi estimation. * Accumulate points inside/outside results from the mappers. */ public static class PiReducer extends MapReduceBase implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> { private long numInside = 0; private long numOutside = 0; private JobConf conf; //configuration for accessing the file system /** Store job configuration. */ @Override public void configure(JobConf job) { conf = job; } /** * Accumulate number of points inside/outside results from the mappers. * @param isInside Is the points inside? * @param values An iterator to a list of point counts * @param output dummy, not used here. * @param reporter */ public void reduce(BooleanWritable isInside, Iterator<LongWritable> values, OutputCollector<WritableComparable<?>, Writable> output, Reporter reporter) throws IOException { if (isInside.get()) { for(; values.hasNext(); numInside += values.next().get()); } else { for(; values.hasNext(); numOutside += values.next().get()); } }Execution
Give hadoop-examples.jar three arguments: pi (PiEstimator), the number of maps, and the number of samples per map. However, running the command below produced an error. A workaround I had seen somewhere, simply opening the admin web UIs at http://localhost:50070 and http://localhost:50030, had fixed this for me in the past, but this time just accessing them did not help.
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar pi 10 200
Number of Maps = 10
Samples per Map = 200
12/05/08 10:26:58 WARN hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /user/yuta/PiEstimator_TMP_3_141592654/in/part0 could only be replicated to 0 nodes, instead of 1
(snip)
12/05/08 10:26:58 WARN hdfs.DFSClient: Error Recovery for block null bad datanode[0] nodes == null
12/05/08 10:26:58 WARN hdfs.DFSClient: Could not get block locations. Source file "/user/yuta/PiEstimator_TMP_3_141592654/in/part0" - Aborting...

This error apparently also occurs when HDFS is running out of space, so I checked the disk usage, but the amounts turned out to be nothing significant.
$ hdfs -du
Found 4 items
310146 hdfs://localhost/user/yuta/madmagi_in
919862 hdfs://localhost/user/yuta/madmagi_out_ma
2560696 hdfs://localhost/user/yuta/madmagi_out_ma_test
41180 hdfs://localhost/user/yuta/samples

Next I re-format the HDFS namenode and also turn safe mode off. Note: this operation is dangerous; the existing contents of HDFS become unusable.
$ su
$ stop-all.sh
$ hadoop namenode -format
$ start-all.sh
$ hadoop dfsadmin -safemode leave
Safe mode is OFF

I then ran the job again, but got the same error. Worse, apparently because of the re-format, all file operations on HDFS eventually stopped working, so I decided to reset Hadoop entirely. In the end this did fix the problem, but backing up HDFS is a hassle and there is probably a better solution (see also the note after the successful run below), so I cannot recommend it.
$ su
$ rm -rf /var/log/hadoop-0.20
$ rm -rf /usr/lib/hadoop*
$ rm -rf /var/log/hadoop
$ rm -rf /var/lib/alternatives/hadoop-*
$ yum remove hadoop-0.20 -y
$ yum install hadoop-0.20-conf-pseudo hadoop-0.20 -y
$ /etc/init.d/hadoop-0.20-datanode start
$ /etc/init.d/hadoop-0.20-namenode start
$ /etc/init.d/hadoop-0.20-tasktracker start
$ /etc/init.d/hadoop-0.20-jobtracker start

Now run the Pi estimation again. This time it succeeded.
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar pi 10 200 Number of Maps = 10 Samples per Map = 200 Wrote input for Map #0 Wrote input for Map #1 Wrote input for Map #2 Wrote input for Map #3 Wrote input for Map #4 Wrote input for Map #5 Wrote input for Map #6 Wrote input for Map #7 Wrote input for Map #8 Wrote input for Map #9 Starting Job 12/05/09 12:04:28 INFO mapred.FileInputFormat: Total input paths to process : 10 12/05/09 12:04:29 INFO mapred.JobClient: Running job: job_201205091151_0002 12/05/09 12:04:30 INFO mapred.JobClient: map 0% reduce 0% 12/05/09 12:04:44 INFO mapred.JobClient: map 10% reduce 0% 12/05/09 12:04:45 INFO mapred.JobClient: map 20% reduce 0% 12/05/09 12:05:00 INFO mapred.JobClient: map 40% reduce 0% 12/05/09 12:05:08 INFO mapred.JobClient: map 40% reduce 13% 12/05/09 12:05:11 INFO mapred.JobClient: map 50% reduce 13% 12/05/09 12:05:12 INFO mapred.JobClient: map 60% reduce 13% 12/05/09 12:05:20 INFO mapred.JobClient: map 60% reduce 20% 12/05/09 12:05:21 INFO mapred.JobClient: map 70% reduce 20% 12/05/09 12:05:24 INFO mapred.JobClient: map 80% reduce 20% 12/05/09 12:05:30 INFO mapred.JobClient: map 80% reduce 26% 12/05/09 12:05:32 INFO mapred.JobClient: map 90% reduce 26% 12/05/09 12:05:33 INFO mapred.JobClient: map 100% reduce 26% 12/05/09 12:05:36 INFO mapred.JobClient: map 100% reduce 33% 12/05/09 12:05:38 INFO mapred.JobClient: map 100% reduce 100% 12/05/09 12:05:41 INFO mapred.JobClient: Job complete: job_201205091151_0002 12/05/09 12:05:41 INFO mapred.JobClient: Counters: 27 12/05/09 12:05:41 INFO mapred.JobClient: Job Counters 12/05/09 12:05:41 INFO mapred.JobClient: Launched reduce tasks=1 12/05/09 12:05:41 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=119740 12/05/09 12:05:41 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 12/05/09 12:05:41 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 12/05/09 12:05:41 INFO mapred.JobClient: Launched map tasks=10 12/05/09 12:05:41 INFO mapred.JobClient: Data-local map tasks=10 12/05/09 12:05:41 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=53325 12/05/09 12:05:41 INFO mapred.JobClient: FileSystemCounters 12/05/09 12:05:41 INFO mapred.JobClient: FILE_BYTES_READ=226 12/05/09 12:05:41 INFO mapred.JobClient: HDFS_BYTES_READ=2340 12/05/09 12:05:41 INFO mapred.JobClient: FILE_BYTES_WRITTEN=602824 12/05/09 12:05:41 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=215 12/05/09 12:05:41 INFO mapred.JobClient: Map-Reduce Framework 12/05/09 12:05:41 INFO mapred.JobClient: Map input records=10 12/05/09 12:05:41 INFO mapred.JobClient: Reduce shuffle bytes=280 12/05/09 12:05:41 INFO mapred.JobClient: Spilled Records=40 12/05/09 12:05:41 INFO mapred.JobClient: Map output bytes=180 12/05/09 12:05:41 INFO mapred.JobClient: CPU time spent (ms)=7620 12/05/09 12:05:41 INFO mapred.JobClient: Total committed heap usage (bytes)=1309446144 12/05/09 12:05:41 INFO mapred.JobClient: Map input bytes=240 12/05/09 12:05:41 INFO mapred.JobClient: Combine input records=0 12/05/09 12:05:41 INFO mapred.JobClient: SPLIT_RAW_BYTES=1160 12/05/09 12:05:41 INFO mapred.JobClient: Reduce input records=20 12/05/09 12:05:41 INFO mapred.JobClient: Reduce input groups=2 12/05/09 12:05:41 INFO mapred.JobClient: Combine output records=0 12/05/09 12:05:41 INFO mapred.JobClient: Physical memory (bytes) snapshot=1720111104 12/05/09 12:05:41 INFO mapred.JobClient: Reduce output records=0 12/05/09 12:05:41 INFO mapred.JobClient: Virtual memory (bytes) snapshot=5607768064 12/05/09 12:05:41 INFO 
mapred.JobClient:     Map output records=20
Job Finished in 73.637 seconds
Estimated value of Pi is 3.14400000000000000000
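In hindsight, the "could only be replicated to 0 nodes" error usually means that the NameNode cannot see any live DataNode, so before reaching for a re-format it would probably have been worth checking the DataNode first. A sketch of what I would try next time:

$ hadoop dfsadmin -report   # number of live/dead datanodes and their capacity
$ sudo jps                  # is a DataNode process actually running?
$ less /usr/lib/hadoop-0.20/logs/hadoop-hadoop-datanode-localhost.localdomain.log   # why it died, if it did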
WordCount
Counts each word and the number of times it occurs.
Source
package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }Execution
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar wordcount madmagi_in madmagi_out_wordcount
$ hdfs -text madmagi_out_wordcount/part-r-00000
12/05/10 08:07:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/05/10 08:07:31 WARN snappy.LoadSnappy: Snappy native library not loaded
10 2
100 2
25 1
4 1
69 1
― 1
―― 18
… 330
4
、 1014
。 484
々 1
『 3
』 3
〜 1
ぁ 2
ぁぁ 1
ぁぁぁ 1
ぁぁぁん 1
ぁぁっ 1
ぁあぁあ 1
ぁっ 2
ぁはは 1
あ 28
あぁ 1
ああ 11
あいつ 5
あくまで 1
あけみ 1
あげ 4
あげる 4
あげれ 2
あそば 1
あたし 14
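The reducer output is plain tab-separated text (word, count) ordered by key, so the most frequent words are easiest to see by post-processing on the client side; for example:

$ hadoop fs -text madmagi_out_wordcount/part-r-00000 | sort -t$'\t' -k2,2 -nr | head -10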
grep
Searches for a string across the cluster. It works with Japanese text as well.
Source
package org.apache.hadoop.examples; import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.lib.*; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /* Extracts matching regexs from input files and counts them. */ public class Grep extends Configured implements Tool { private Grep() {} // singleton public int run(String[] args) throws Exception { if (args.length < 3) { System.out.println("Grep <inDir> <outDir> <regex> [<group>]"); ToolRunner.printGenericCommandUsage(System.out); return -1; } Path tempDir = new Path("grep-temp-"+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); JobConf grepJob = new JobConf(getConf(), Grep.class); try { grepJob.setJobName("grep-search"); FileInputFormat.setInputPaths(grepJob, args[0]); grepJob.setMapperClass(RegexMapper.class); grepJob.set("mapred.mapper.regex", args[2]); if (args.length == 4) grepJob.set("mapred.mapper.regex.group", args[3]); grepJob.setCombinerClass(LongSumReducer.class); grepJob.setReducerClass(LongSumReducer.class); FileOutputFormat.setOutputPath(grepJob, tempDir); grepJob.setOutputFormat(SequenceFileOutputFormat.class); grepJob.setOutputKeyClass(Text.class); grepJob.setOutputValueClass(LongWritable.class); JobClient.runJob(grepJob); JobConf sortJob = new JobConf(Grep.class); sortJob.setJobName("grep-sort"); FileInputFormat.setInputPaths(sortJob, tempDir); sortJob.setInputFormat(SequenceFileInputFormat.class); sortJob.setMapperClass(InverseMapper.class); sortJob.setNumReduceTasks(1); // write a single file FileOutputFormat.setOutputPath(sortJob, new Path(args[1])); sortJob.setOutputKeyComparatorClass // sort by decreasing freq (LongWritable.DecreasingComparator.class); JobClient.runJob(sortJob); } finally { FileSystem.get(grepJob).delete(tempDir, true); } return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Grep(), args); System.exit(res); } }Execution
The Japanese words 「まどか」 and 「さやか」 are given as a regular expression.
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar grep madmagi_in madmagi_out_grep '(まどか|さやか)'
$ hdfs -text madmagi_out_grep/part-00000
12/05/10 07:54:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/05/10 07:54:47 WARN snappy.LoadSnappy: Snappy native library not loaded
112 さやか
75 まどか
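As the run() method above shows, grep also takes an optional fourth argument <group>: when it is given, RegexMapper counts only the text captured by that group of the regex. A sketch (the output directory name is made up):

$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar grep madmagi_in madmagi_out_grep_group '(まどか|さやか)は' 1

This would count only the occurrences of まどか/さやか that are immediately followed by は, while still using just the name as the key.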
randomwriter
Generates random data.
Source
Only an excerpt is shown.
package org.apache.hadoop.examples; import java.io.IOException; import java.util.Date; import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class RandomWriter extends Configured implements Tool { /** * User counters */ static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN } /** * A custom input format that creates virtual inputs of a single string * for each map. */ static class RandomInputFormat implements InputFormat<Text, Text> { /** * Generate the requested number of file splits, with the filename * set to the filename of the output file. */ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { InputSplit[] result = new InputSplit[numSplits]; Path outDir = FileOutputFormat.getOutputPath(job); for(int i=0; i < result.length; ++i) { result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, (String[])null); } return result; } /** * Return a single record (filename, "") where the filename is taken from * the file split. */ static class RandomRecordReader implements RecordReader<Text, Text> { Path name; public RandomRecordReader(Path p) { name = p; } public boolean next(Text key, Text value) { if (name != null) { key.set(name.getName()); name = null; return true; } return false; } public Text createKey() { return new Text(); } public Text createValue() { return new Text(); } public long getPos() { return 0; } public void close() {} public float getProgress() { return 0.0f; } } public RecordReader<Text, Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { return new RandomRecordReader(((FileSplit) split).getPath()); } } static class Map extends MapReduceBase implements Mapper<WritableComparable, Writable, BytesWritable, BytesWritable> { private long numBytesToWrite; private int minKeySize; private int keySizeRange; private int minValueSize; private int valueSizeRange; private Random random = new Random(); private BytesWritable randomKey = new BytesWritable(); private BytesWritable randomValue = new BytesWritable(); private void randomizeBytes(byte[] data, int offset, int length) { for(int i=offset + length - 1; i >= offset; --i) { data[i] = (byte) random.nextInt(256); } } /** * Given an output filename, write a bunch of random records to it. */ public void map(WritableComparable key, Writable value, OutputCollector<BytesWritable, BytesWritable> output, Reporter reporter) throws IOException { int itemCount = 0; while (numBytesToWrite > 0) { int keyLength = minKeySize + (keySizeRange != 0 ? 
random.nextInt(keySizeRange) : 0); randomKey.setSize(keyLength); randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength()); int valueLength = minValueSize + (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0); randomValue.setSize(valueLength); randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength()); output.collect(randomKey, randomValue); numBytesToWrite -= keyLength + valueLength; reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength); reporter.incrCounter(Counters.RECORDS_WRITTEN, 1); if (++itemCount % 200 == 0) { reporter.setStatus("wrote record " + itemCount + ". " + numBytesToWrite + " bytes left."); } } reporter.setStatus("done with " + itemCount + " records."); } } /** * This is the main routine for launching a distributed random write job. * It runs 10 maps/node and each node writes 1 gig of data to a DFS file. * The reduce doesn't do anything. * * @throws IOException */ public int run(String[] args) throws Exception { if (args.length == 0) { System.out.println("Usage: writer <out-dir>"); ToolRunner.printGenericCommandUsage(System.out); return -1; } Path outDir = new Path(args[0]); JobConf job = new JobConf(getConf()); job.setJarByClass(RandomWriter.class); job.setJobName("random-writer"); FileOutputFormat.setOutputPath(job, outDir); job.setOutputKeyClass(BytesWritable.class); job.setOutputValueClass(BytesWritable.class); job.setInputFormat(RandomInputFormat.class); job.setMapperClass(Map.class); job.setReducerClass(IdentityReducer.class); job.setOutputFormat(SequenceFileOutputFormat.class); JobClient client = new JobClient(job); ClusterStatus cluster = client.getClusterStatus(); int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10); long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map", 1*1024*1024*1024); if (numBytesToWritePerMap == 0) { System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0"); return -2; } long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes", numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers()); int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); if (numMaps == 0 && totalBytesToWrite > 0) { numMaps = 1; job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite); } job.setNumMapTasks(numMaps); System.out.println("Running " + numMaps + " maps."); // reducer NONE job.setNumReduceTasks(0); Date startTime = new Date(); System.out.println("Job started: " + startTime); JobClient.runJob(job); Date endTime = new Date(); System.out.println("Job ended: " + endTime); System.out.println("The job took " + (endTime.getTime() - startTime.getTime()) /1000 + " seconds."); return 0; } }Execution
Create two files of roughly 10 MB of random data each (displayed as hexadecimal below).
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar randomwriter -D test.randomwrite.bytes_per_map=10000000 -D test.randomwriter.maps_per_host=2 random-data
$ hdfs -du random-data
Found 4 items
0 hdfs://localhost/user/yuta/random-data/_SUCCESS
59570 hdfs://localhost/user/yuta/random-data/_logs
10045460 hdfs://localhost/user/yuta/random-data/part-00000
10042286 hdfs://localhost/user/yuta/random-data/part-00001
$ hdfs -text random-data/part-00000
f aa b9 bc 94 4d bc af ac c0 9a 1c c9 24 31 8d 85 a6 80 5b 62 65 56 37 8a 95 6c a5 33 39 5c ac ab 38 53 67 55 92 f9 04 6c 86 cd 39 9d 13 63 8b 00 b7 0c 03 61 38 0a bc 0a af 92 17 63 f1 eb b1 89 57 03 12 56 1a 1c 87 c7 40 2c 4d 94 02 16 73 da 73 9f 72 ee de f1 17 0e b1 f6 cf 76 1d 17 d4 11 5d 65 c1 f0 d6 c3 af 38 fb 24 75 69 f9 01 54 52 c6 fc fa dd 98 18 a7 be e9 4f 66 51 9a b2 46 37 d6 e1 95 f1 2e 8c 02 c9 e1 02 a2 e1 c7 ff 70

There is a similar program called randomtextwriter; this one creates ten files of about 10 MB each.
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar randomtextwriter -D test.randomtextwrite.bytes_per_map=10000000 -D test.randomtextwriter.maps_per_host=2 random-text-data
$ hdfs -du random-text-data
10265269 hdfs://localhost/user/yuta/random-text-data/part-00000
10265294 hdfs://localhost/user/yuta/random-text-data/part-00001
10265824 hdfs://localhost/user/yuta/random-text-data/part-00002
10265560 hdfs://localhost/user/yuta/random-text-data/part-00003
10267562 hdfs://localhost/user/yuta/random-text-data/part-00004
10266170 hdfs://localhost/user/yuta/random-text-data/part-00005
10265334 hdfs://localhost/user/yuta/random-text-data/part-00006
10265731 hdfs://localhost/user/yuta/random-text-data/part-00007
10266359 hdfs://localhost/user/yuta/random-text-data/part-00008
10265977 hdfs://localhost/user/yuta/random-text-data/part-00009
$ hdfs -text random-text-data/part-00000
bromate michigan ploration prospectiveness mendacity pyxie edificator posttraumatic oratorize constitutor silverhead critically schoolmasterism unlapsing Mormyrus chooser licitness undinted sangaree vinegarish precostal uncontradictableness warriorwise embryotic repealableness alen catabaptist comprovincial archididascalian returnability giantly unachievable pope calabazilla topsail epidymides palaeotheriodont hemimelus
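The run() method in RandomWriter.java above also explains where the -D options come from: the number of maps is test.randomwrite.total_bytes divided by test.randomwrite.bytes_per_map, and by default every TaskTracker runs 10 maps writing 1 GB each. So the total output can also be capped directly; a sketch (random-data-2 is a directory name I made up):

$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar randomwriter \
    -D test.randomwrite.total_bytes=20000000 \
    -D test.randomwrite.bytes_per_map=10000000 \
    random-data-2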
sort
Here I sort the random-data generated by randomwriter above. Verifying that the result is actually sorted is a bit awkward (see the note after the output below), but I include the result anyway.
Source
package org.apache.hadoop.examples; import java.io.IOException; import java.net.URI; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapred.lib.InputSampler; import org.apache.hadoop.mapred.lib.TotalOrderPartitioner; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Sort<K,V> extends Configured implements Tool { private RunningJob jobResult = null; static int printUsage() { System.out.println("sort [-m <maps>] [-r <reduces>] " + "[-inFormat <input format class>] " + "[-outFormat <output format class>] " + "[-outKey <output key class>] " + "[-outValue <output value class>] " + "[-totalOrder <pcnt> <num samples> <max splits>] " + "<input> <output>"); ToolRunner.printGenericCommandUsage(System.out); return -1; } /** * The main driver for sort program. * Invoke this method to submit the map/reduce job. * @throws IOException When there is communication problems with the * job tracker. */ public int run(String[] args) throws Exception { JobConf jobConf = new JobConf(getConf(), Sort.class); jobConf.setJobName("sorter"); jobConf.setMapperClass(IdentityMapper.class); jobConf.setReducerClass(IdentityReducer.class); JobClient client = new JobClient(jobConf); ClusterStatus cluster = client.getClusterStatus(); int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9); String sort_reduces = jobConf.get("test.sort.reduces_per_host"); if (sort_reduces != null) { num_reduces = cluster.getTaskTrackers() * Integer.parseInt(sort_reduces); } Class<? extends InputFormat> inputFormatClass = SequenceFileInputFormat.class; Class<? extends OutputFormat> outputFormatClass = SequenceFileOutputFormat.class; Class<? extends WritableComparable> outputKeyClass = BytesWritable.class; Class<? 
extends Writable> outputValueClass = BytesWritable.class; List<String> otherArgs = new ArrayList<String>(); InputSampler.Sampler<K,V> sampler = null; for(int i=0; i < args.length; ++i) { try { if ("-m".equals(args[i])) { jobConf.setNumMapTasks(Integer.parseInt(args[++i])); } else if ("-r".equals(args[i])) { num_reduces = Integer.parseInt(args[++i]); } else if ("-inFormat".equals(args[i])) { inputFormatClass = Class.forName(args[++i]).asSubclass(InputFormat.class); } else if ("-outFormat".equals(args[i])) { outputFormatClass = Class.forName(args[++i]).asSubclass(OutputFormat.class); } else if ("-outKey".equals(args[i])) { outputKeyClass = Class.forName(args[++i]).asSubclass(WritableComparable.class); } else if ("-outValue".equals(args[i])) { outputValueClass = Class.forName(args[++i]).asSubclass(Writable.class); } else if ("-totalOrder".equals(args[i])) { double pcnt = Double.parseDouble(args[++i]); int numSamples = Integer.parseInt(args[++i]); int maxSplits = Integer.parseInt(args[++i]); if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE; sampler = new InputSampler.RandomSampler<K,V>(pcnt, numSamples, maxSplits); } else { otherArgs.add(args[i]); } } catch (NumberFormatException except) { System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); } catch (ArrayIndexOutOfBoundsException except) { System.out.println("ERROR: Required parameter missing from " + args[i-1]); return printUsage(); // exits } } // Set user-supplied (possibly default) job configs jobConf.setNumReduceTasks(num_reduces); jobConf.setInputFormat(inputFormatClass); jobConf.setOutputFormat(outputFormatClass); jobConf.setOutputKeyClass(outputKeyClass); jobConf.setOutputValueClass(outputValueClass); // Make sure there are exactly 2 parameters left. if (otherArgs.size() != 2) { System.out.println("ERROR: Wrong number of parameters: " + otherArgs.size() + " instead of 2."); return printUsage(); } FileInputFormat.setInputPaths(jobConf, otherArgs.get(0)); FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.get(1))); if (sampler != null) { System.out.println("Sampling input to effect total-order sort..."); jobConf.setPartitionerClass(TotalOrderPartitioner.class); Path inputDir = FileInputFormat.getInputPaths(jobConf)[0]; inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf)); Path partitionFile = new Path(inputDir, "_sortPartitioning"); TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile); InputSampler.<K,V>writePartitionFile(jobConf, sampler); URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning"); DistributedCache.addCacheFile(partitionUri, jobConf); DistributedCache.createSymlink(jobConf); } System.out.println("Running on " + cluster.getTaskTrackers() + " nodes to sort from " + FileInputFormat.getInputPaths(jobConf)[0] + " into " + FileOutputFormat.getOutputPath(jobConf) + " with " + num_reduces + " reduces."); Date startTime = new Date(); System.out.println("Job started: " + startTime); jobResult = JobClient.runJob(jobConf); Date end_time = new Date(); System.out.println("Job ended: " + end_time); System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Sort(), args); System.exit(res); } /** * Get the last job that was run using this instance. * @return the results of the last job that was run */ public RunningJob getResult() { return jobResult; } }Execution
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar sort random-data sorted-data
$ hdfs -du sorted-data
Found 3 items
0 hdfs://localhost/user/yuta/sorted-data/_SUCCESS
63175 hdfs://localhost/user/yuta/sorted-data/_logs
20088639 hdfs://localhost/user/yuta/sorted-data/part-00000
$ hdfs -text sorted-data/part-00000
ac 93 31 14 e3 36 6a bb 43 2b 51 2f 4c 50 5e a4 fd 28 7c 34 2b bc 92 b3 f5 0f e2 95 c6 96 3f 0a d1 ac 90 80 ac 4d 4d de 67 9c bd 0b 6b 14 1c ed fb 38 c2 51 c0 d7 42 9d 0b ab ea 2e ae fc c7 aa 6b 17 9a cf 25 b9 74 e2 93 b0 47 c0 18 5a fc 1f 58 3d 3a b7 c8 c8 e5 1d 30 eb 52 b4 f5 9e 80 9c 23 51 a9 7a 37 95 97 6e 1a 76 4a e7 60 67 fe e8 f2 00 3f d3 05 2d e1 57 00 1c 7c b1 9b 20 2b ba 1a 8e 9b 3a 92 e6 65 56 13 f6 cd db b8 ed 24 1d d8 e4 da c5 f5 fb 20 1a cb 3f df a9 fe f4 2b aa f2 87 00 67 02 f6 5f 32 a9 e1 4d 0e a4 05 15 91 38 34 cc a0 a7 43 9d 88 7a 9c ec 33 ad 2a 1e c3 08 18 61 fb 1f 5c 82 ed fb e1 0f da 1d
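Eyeballing a hex dump does not really confirm that the data is sorted. The Hadoop test jar ships a SortValidator driver (testmapredsort) that checks that the sort output is a correctly ordered permutation of the input; assuming the CDH test jar is installed at the path below (a guess on my part), it can be run like this:

$ hadoop jar /usr/lib/hadoop-0.20/hadoop-test.jar testmapredsort \
    -sortInput random-data -sortOutput sorted-data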
join
Joins split data sets.
Source
package org.apache.hadoop.examples; import java.io.IOException; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.join.*; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Join extends Configured implements Tool { static int printUsage() { System.out.println("join [-m <maps>] [-r <reduces>] " + "[-inFormat <input format class>] " + "[-outFormat <output format class>] " + "[-outKey <output key class>] " + "[-outValue <output value class>] " + "[-joinOp <inner|outer|override>] " + "[input]* <input> <output>"); ToolRunner.printGenericCommandUsage(System.out); return -1; } /** * The main driver for sort program. * Invoke this method to submit the map/reduce job. * @throws IOException When there is communication problems with the * job tracker. */ public int run(String[] args) throws Exception { JobConf jobConf = new JobConf(getConf(), Sort.class); jobConf.setJobName("join"); jobConf.setMapperClass(IdentityMapper.class); jobConf.setReducerClass(IdentityReducer.class); JobClient client = new JobClient(jobConf); ClusterStatus cluster = client.getClusterStatus(); int num_maps = cluster.getTaskTrackers() * jobConf.getInt("test.sort.maps_per_host", 10); int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9); String sort_reduces = jobConf.get("test.sort.reduces_per_host"); if (sort_reduces != null) { num_reduces = cluster.getTaskTrackers() * Integer.parseInt(sort_reduces); } Class<? extends InputFormat> inputFormatClass = SequenceFileInputFormat.class; Class<? extends OutputFormat> outputFormatClass = SequenceFileOutputFormat.class; Class<? extends WritableComparable> outputKeyClass = BytesWritable.class; Class<? 
extends Writable> outputValueClass = TupleWritable.class; String op = "inner"; List<String> otherArgs = new ArrayList<String>(); for(int i=0; i < args.length; ++i) { try { if ("-m".equals(args[i])) { num_maps = Integer.parseInt(args[++i]); } else if ("-r".equals(args[i])) { num_reduces = Integer.parseInt(args[++i]); } else if ("-inFormat".equals(args[i])) { inputFormatClass = Class.forName(args[++i]).asSubclass(InputFormat.class); } else if ("-outFormat".equals(args[i])) { outputFormatClass = Class.forName(args[++i]).asSubclass(OutputFormat.class); } else if ("-outKey".equals(args[i])) { outputKeyClass = Class.forName(args[++i]).asSubclass(WritableComparable.class); } else if ("-outValue".equals(args[i])) { outputValueClass = Class.forName(args[++i]).asSubclass(Writable.class); } else if ("-joinOp".equals(args[i])) { op = args[++i]; } else { otherArgs.add(args[i]); } } catch (NumberFormatException except) { System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); } catch (ArrayIndexOutOfBoundsException except) { System.out.println("ERROR: Required parameter missing from " + args[i-1]); return printUsage(); // exits } } // Set user-supplied (possibly default) job configs jobConf.setNumMapTasks(num_maps); jobConf.setNumReduceTasks(num_reduces); if (otherArgs.size() < 2) { System.out.println("ERROR: Wrong number of parameters: "); return printUsage(); } FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.remove(otherArgs.size() - 1))); List<Path> plist = new ArrayList<Path>(otherArgs.size()); for (String s : otherArgs) { plist.add(new Path(s)); } jobConf.setInputFormat(CompositeInputFormat.class); jobConf.set("mapred.join.expr", CompositeInputFormat.compose( op, inputFormatClass, plist.toArray(new Path[0]))); jobConf.setOutputFormat(outputFormatClass); jobConf.setOutputKeyClass(outputKeyClass); jobConf.setOutputValueClass(outputValueClass); Date startTime = new Date(); System.out.println("Job started: " + startTime); JobClient.runJob(jobConf); Date end_time = new Date(); System.out.println("Job ended: " + end_time); System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Join(), args); System.exit(res); } }Execution
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar join random-data joined-data
$ hdfs -du joined-data
Found 3 items
0 hdfs://localhost/user/yuta/joined-data/_SUCCESS
63015 hdfs://localhost/user/yuta/joined-data/_logs
20158803 hdfs://localhost/user/yuta/joined-data/part-00000
$ hdfs -text joined-data/part-00000
[84 0d 6a 97 fc c7 fa f8 3d 32 a3 b9 e7 85 09 e4 43 ca 8a 56 d9 6d 74 7c ba 04 18 71 51 f7 c2 7b 03 89 84 0a ce 4a 27 7a 23 ee 3e 84 ef a7 3d e3 e8 6e 61 15 6a c8 e0 b1 9e 89 78 e9 10 f3 13 c7 a7 19 b4 09 2e 09 a8 f1 8e 9e 50 70 84 8d e2 85 95 27 87 88 16 eb 80 3e 22 40 82 da f3 a4 9f e0 8f 6c e6 82 16 f7 88 06 51 96 ef f2 32 ac e2 c9 7c 9d c2 6e 45 60 db b3 05 a5]
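The run above passes only a single input directory, so the join mostly just wraps each record in a TupleWritable. The more typical invocation gives two or more input directories followed by the output directory, optionally with -joinOp inner/outer/override as in the usage above; note that CompositeInputFormat performs a map-side join and therefore expects all inputs to be sorted and partitioned identically (for example, the outputs of two sort jobs run with the same number of reducers). A sketch with made-up directory names:

$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar join \
    -joinOp inner sorted-data-a sorted-data-b joined-ab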
Tips
Config File
Hadoop's configuration files are installed in the following location.
$ tree /usr/lib/hadoop-0.20/conf/
/usr/lib/hadoop-0.20/conf/
|-- README
|-- capacity-scheduler.xml
|-- configuration.xsl
|-- core-site.xml
|-- fair-scheduler.xml
|-- hadoop-env.sh
|-- hadoop-metrics.properties
|-- hadoop-policy.xml
|-- hdfs-site.xml
|-- log4j.properties
|-- mapred-queue-acls.xml
|-- mapred-site.xml
|-- masters
|-- org-xerial-snappy.properties
|-- slaves
|-- ssl-client.xml.example
|-- ssl-server.xml.example
`-- taskcontroller.cfg

log
Hadoop's system logs are written to the following location.
$ tree /usr/lib/hadoop-0.20/logs
|-- hadoop-hadoop-datanode-localhost.localdomain.log.YYYY-MM-DD
|-- hadoop-hadoop-jobtracker-localhost.localdomain.log.YYYY-MM-DD
|-- hadoop-hadoop-namenode-localhost.localdomain.log.YYYY-MM-DD
|-- hadoop-hadoop-secondarynamenode-localhost.localdomain.log.YYYY-MM-DD
|-- hadoop-hadoop-tasktracker-localhost.localdomain.log.YYYY-MM-DD

What is a SequenceFile?
A SequenceFile is a file that packs binary key-value records. Running hadoop fs -text <SequenceFile> lets you read it as text. Note that SequenceFiles compressed with custom codecs also exist, and in that case you would probably need to write your own decoding. You may get scolded as shown below while running a MapReduce job; that means the input file is not in SequenceFile format.
12/05/10 10:09:58 INFO mapred.JobClient: Task Id : attempt_201205100725_0022_m_000000_0, Status : FAILED
java.io.IOException: hdfs://localhost/user/yuta/madmagi_out_wordcount/part-r-00000 not a SequenceFile
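A quick way to tell whether a file on HDFS really is a SequenceFile is to look at its first bytes: a SequenceFile starts with the 3-byte magic header SEQ followed by a version byte. For example:

$ hadoop fs -cat madmagi_out_wordcount/part-r-00000 | head -c 4 | od -c   # plain text, no SEQ header
$ hadoop fs -cat random-data/part-00000 | head -c 4 | od -c               # S E Q plus a version byte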