Y's note

On web technology, product management, and business management

This blog is no longer being updated; future posts will appear on the Note below.
https://note.com/yutakikuchi/

Trying Out Hadoop on CentOS

            __  __          __
           / / / /___ _____/ /___  ____  ____
          / /_/ / __ `/ __  / __ \/ __ \/ __ \
         / __  / /_/ / /_/ / /_/ / /_/ / /_/ /
        /_/ /_/\__,_/\__,_/\____/\____/ .___/
                                     /_/

Installation

Installing the JDK
$ wget http://download.oracle.com/otn-pub/java/jdk/7u1-b08/jdk-7u1-linux-x64.rpm
$ sudo rpm -ivh jdk-7u1-linux-x64.rpm
準備中...                ########################################### [100%]
   1:jdk                    ########################################### [100%]
Unpacking JAR files...
	rt.jar...
	jsse.jar...
	charsets.jar...
	tools.jar...
	localedata.jar...
Setting environment variables
  • Since we want this available to every user, add the JAVA_HOME setting to /etc/profile.
export JAVA_HOME=/usr/java/default
  • Apply the change and confirm it.
$ source /etc/profile
$ echo $JAVA_HOME
/usr/java/default
Registering the repo
  • The usual repo file so that yum can download and install the packages. Fetch it from Cloudera's domain.
$ wget http://archive.cloudera.com/redhat/cdh/cloudera-cdh3.repo
$ sudo mv cloudera-cdh3.repo /etc/yum.repos.d/
$ sudo yum update yum
$ yum search hadoop
Loaded plugins: downloadonly, fastestmirror
cloudera-cdh3                                                                                                         81/81
===================================================== Matched: hadoop ======================================================
flume.noarch : Flume is a reliable, scalable, and manageable distributed log collection application for collecting data such
             : as logs and delivering it to data stores such as Hadoop's HDFS.
flume-master.noarch : The flume master daemon is the central administration and data path control point for flume nodes.
flume-node.noarch : The flume node daemon is a core element of flume's data path and is responsible for generating,
                  : processing, and delivering data.
hadoop-0.20.noarch : Hadoop is a software platform for processing vast amounts of data
hadoop-0.20-conf-pseudo.noarch : Hadoop installation in pseudo-distributed mode
hadoop-0.20-datanode.noarch : Hadoop Data Node
hadoop-0.20-debuginfo.i386 : Debug information for package hadoop-0.20
hadoop-0.20-debuginfo.x86_64 : Debug information for package hadoop-0.20
hadoop-0.20-doc.noarch : Hadoop Documentation
hadoop-0.20-fuse.i386 : Mountable HDFS
hadoop-0.20-fuse.x86_64 : Mountable HDFS
hadoop-0.20-jobtracker.noarch : Hadoop Job Tracker
hadoop-0.20-libhdfs.i386 : Hadoop Filesystem Library
hadoop-0.20-libhdfs.x86_64 : Hadoop Filesystem Library
hadoop-0.20-namenode.noarch : The Hadoop namenode manages the block locations of HDFS files
hadoop-0.20-native.i386 : Native libraries for Hadoop Compression
hadoop-0.20-native.x86_64 : Native libraries for Hadoop Compression
hadoop-0.20-pipes.i386 : Hadoop Pipes Library
hadoop-0.20-pipes.x86_64 : Hadoop Pipes Library
hadoop-0.20-sbin.i386 : Binaries for secured Hadoop clusters
hadoop-0.20-sbin.x86_64 : Binaries for secured Hadoop clusters
hadoop-0.20-secondarynamenode.noarch : Hadoop Secondary namenode
hadoop-0.20-source.noarch : Source code for Hadoop
hadoop-0.20-tasktracker.noarch : Hadoop Task Tracker
hadoop-hbase.noarch : HBase is the Hadoop database. Use it when you need random, realtime read/write access to your Big
                    : Data. This project's goal is the hosting of very large tables -- billions of rows X millions of
                    : columns -- atop clusters of commodity hardware.
hadoop-hbase-doc.noarch : Hbase Documentation
hadoop-hbase-master.noarch : The Hadoop HBase master Server.
hadoop-hbase-regionserver.noarch : The Hadoop HBase RegionServer server.
hadoop-hbase-thrift.noarch : The Hadoop HBase Thrift Interface
hadoop-hive.noarch : Hive is a data warehouse infrastructure built on top of Hadoop
hadoop-hive-metastore.noarch : Shared metadata repository for Hive.
hadoop-hive-server.noarch : Provides a Hive Thrift service.
hadoop-pig.noarch : Pig is a platform for analyzing large data sets
hadoop-zookeeper.noarch : A high-performance coordination service for distributed applications.
hadoop-zookeeper-server.noarch : The Hadoop Zookeeper server
hue.noarch : The hue metapackage
hue-common.i386 : A browser-based desktop interface for Hadoop
hue-common.x86_64 : A browser-based desktop interface for Hadoop
hue-filebrowser.noarch : A UI for the Hadoop Distributed File System (HDFS)
hue-jobbrowser.noarch : A UI for viewing Hadoop map-reduce jobs
hue-jobsub.noarch : A UI for designing and submitting map-reduce jobs to Hadoop
hue-plugins.noarch : Hadoop plugins for Hue
hue-shell.i386 : A shell for console based Hadoop applications
hue-shell.x86_64 : A shell for console based Hadoop applications
mahout.noarch : A set of Java libraries for scalable machine learning.
oozie.noarch : Oozie is a system that runs workflows of Hadoop jobs.
sqoop.noarch : Sqoop allows easy imports and exports of data sets between databases and the Hadoop Distributed File System
             : (HDFS).
yum install
  • For now, install just the core package.
$ su
$ yum install hadoop-0.20 -y
  • Install the configuration for running everything on a single machine (pseudo-distributed mode).
$ yum install hadoop-0.20-conf-pseudo -y
  • Running the command above pulls in dependent packages, which are needed to start the daemon processes.
  • Install Hive, Pig, and HBase.
$ yum install hadoop-hive -y
$ yum install hadoop-pig -y
$ yum install hadoop-hbase -y
Starting Hadoop
  • Start the daemon processes provided by the packages that came in as dependencies.
$ /etc/init.d/hadoop-0.20-datanode start
$ /etc/init.d/hadoop-0.20-namenode start
$ /etc/init.d/hadoop-0.20-tasktracker start
$ /etc/init.d/hadoop-0.20-jobtracker start
$ /etc/init.d/hadoop-0.20-secondarynamenode start # not needed at first

HDFS (Hadoop Distributed File System)

  • In a word, HDFS is the file system for managing files on Hadoop efficiently and safely.
  • Using the Map/Reduce mechanism, computation on the data runs on distributed servers and the results are retrieved over the network.
  • It consists of two kinds of servers: the NameNode and the DataNodes.
    • The NameNode server stores the file metadata (which file sits in which directory on which server, with what access permissions, and so on).
    • The DataNode servers store the data files themselves. (Files are split into fixed-size chunks called blocks; see the toy sketch after this list.)
  • Configuration file directory: /etc/hadoop-0.20/conf.pseudo/
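To make the block idea concrete, here is a toy Python sketch. It is not HDFS itself, just an illustration under a few assumptions: it splits a local file (the hdfs_input/input file used later in this post) into fixed-size chunks and keeps a NameNode-style table recording which made-up DataNode would hold each chunk. 64MB is the HDFS default block size.
#!/usr/bin/env python
# Toy illustration of the HDFS block idea (NOT real HDFS code):
# split a file into fixed-size blocks and keep a NameNode-style
# table mapping each block to the DataNode that would hold it.

BLOCK_SIZE = 64 * 1024 * 1024           # HDFS default block size (64MB)
DATANODES = ['datanode1', 'datanode2']  # made-up node names

def split_into_blocks(path, block_size=BLOCK_SIZE):
    blocks = []
    f = open(path, 'rb')
    while True:
        chunk = f.read(block_size)
        if not chunk:
            break
        blocks.append(chunk)
    f.close()
    return blocks

# "NameNode" metadata: block index -> (datanode, block length in bytes)
blocks = split_into_blocks('hdfs_input/input')
metadata = dict((i, (DATANODES[i % len(DATANODES)], len(b)))
                for i, b in enumerate(blocks))
print metadata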
Setting an alias
  • The HDFS commands get long, so it is handy to add the following line as an alias in $HOME/.bashrc or similar.
alias hdfs='/usr/bin/hadoop dfs'
Creating a directory on HDFS
$ hdfs -mkdir HDFS_TEST01
$ hdfs -ls
Found 1 items
drwxr-xr-x   - yuta supergroup          0 2011-12-04 22:36 /user/yuta/HDFS_TEST01
Copying a file to HDFS
  • Put a locally created file onto HDFS.
$ cat hdfs_input/input 
hadoop1
hadoop2
hadoop3
hadoop4
hadoop5
$ hdfs -put hdfs_input/input HDFS_TEST01
Checking the file on HDFS
$ hdfs -cat HDFS_TEST01/input
hadoop1
hadoop2
hadoop3
hadoop4
hadoop5
Getting the file from HDFS
$ hdfs -get HDFS_TEST01/input output 
$ cat output/input
hadoop1
hadoop2
hadoop3
hadoop4
hadoop5

Map/Reduce

  • Strictly speaking, Map/Reduce breaks down into Map/Shuffle/Reduce phases; the Shuffle happens automatically inside the framework.
    • Think of Map as building a hash. For example, to count the words in an English text, split the text into words and assign the value 1 to each one, holding the data as {key, value} pairs.
    • Shuffle sorts by key and gathers the values under each unique key, so keys are no longer duplicated. At this stage only the duplicate keys are consolidated; no Reduce processing happens yet.
    • Reduce then aggregates the values for each key. In the word-count case, it sums the shuffled values (a toy Python sketch follows below).
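To make the three phases concrete, here is a toy in-memory word count in Python. It is only an illustration of the flow; Hadoop does the same steps spread across many machines, and the sample sentence is just the one used for testing later in this post.
#!/usr/bin/env python
# Toy in-memory walk-through of Map -> Shuffle -> Reduce for word counting.

text = 'c c++ java python perl php javascript java python'

# Map: emit a {key, value} pair of (word, 1) for every word
mapped = [(word, 1) for word in text.split()]

# Shuffle: sort by key and gather the values under each unique key
shuffled = {}
for key, value in sorted(mapped):
    shuffled.setdefault(key, []).append(value)

# Reduce: aggregate the values per key (for word counting, sum them)
for key in sorted(shuffled):
    print '%s\t%d' % (key, sum(shuffled[key]))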
Calculating pi
  • Run the pi calculation that ships with Hadoop as a Map/Reduce example. The 5 is the number of Maps.
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar pi 5 2000
Number of Maps  = 5
Samples per Map = 2000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Starting Job
11/12/04 23:08:40 INFO mapred.FileInputFormat: Total input paths to process : 5
11/12/04 23:08:41 INFO mapred.JobClient: Running job: job_201112042137_0001
11/12/04 23:08:42 INFO mapred.JobClient:  map 0% reduce 0%
11/12/04 23:09:00 INFO mapred.JobClient:  map 20% reduce 0%
11/12/04 23:09:01 INFO mapred.JobClient:  map 40% reduce 0%
11/12/04 23:09:14 INFO mapred.JobClient:  map 60% reduce 0%
11/12/04 23:09:17 INFO mapred.JobClient:  map 80% reduce 0%
11/12/04 23:09:23 INFO mapred.JobClient:  map 100% reduce 0%
11/12/04 23:09:45 INFO mapred.JobClient:  map 100% reduce 100%
11/12/04 23:09:51 INFO mapred.JobClient: Job complete: job_201112042137_0001
11/12/04 23:09:51 INFO mapred.JobClient: Counters: 23
11/12/04 23:09:51 INFO mapred.JobClient:   Job Counters 
11/12/04 23:09:51 INFO mapred.JobClient:     Launched reduce tasks=1
11/12/04 23:09:51 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=67682
11/12/04 23:09:51 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
11/12/04 23:09:51 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
11/12/04 23:09:51 INFO mapred.JobClient:     Launched map tasks=5
11/12/04 23:09:51 INFO mapred.JobClient:     Data-local map tasks=5
11/12/04 23:09:51 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=46591
11/12/04 23:09:51 INFO mapred.JobClient:   FileSystemCounters
11/12/04 23:09:51 INFO mapred.JobClient:     FILE_BYTES_READ=116
11/12/04 23:09:51 INFO mapred.JobClient:     HDFS_BYTES_READ=1170
11/12/04 23:09:51 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=324508
11/12/04 23:09:51 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=215
11/12/04 23:09:51 INFO mapred.JobClient:   Map-Reduce Framework
11/12/04 23:09:51 INFO mapred.JobClient:     Reduce input groups=2
11/12/04 23:09:51 INFO mapred.JobClient:     Combine output records=0
11/12/04 23:09:51 INFO mapred.JobClient:     Map input records=5
11/12/04 23:09:51 INFO mapred.JobClient:     Reduce shuffle bytes=140
11/12/04 23:09:51 INFO mapred.JobClient:     Reduce output records=0
11/12/04 23:09:51 INFO mapred.JobClient:     Spilled Records=20
11/12/04 23:09:51 INFO mapred.JobClient:     Map output bytes=90
11/12/04 23:09:51 INFO mapred.JobClient:     Map input bytes=120
11/12/04 23:09:51 INFO mapred.JobClient:     Combine input records=0
11/12/04 23:09:51 INFO mapred.JobClient:     Map output records=10
11/12/04 23:09:51 INFO mapred.JobClient:     SPLIT_RAW_BYTES=580
11/12/04 23:09:51 INFO mapred.JobClient:     Reduce input records=10
Job Finished in 72.311 seconds
Estimated value of Pi is 3.14080000000000000000

72.311 seconds for only this much precision...
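
For reference, the idea behind the example is roughly this: scatter points in the unit square and count how many fall inside the quarter circle; the fraction approaches pi/4. Below is a minimal plain Monte Carlo sketch in Python (as far as I know the bundled PiEstimator uses a quasi-Monte Carlo variant, but the point is the same): with only 5 x 2000 samples you cannot expect many digits of precision.

#!/usr/bin/env python
# Minimal Monte Carlo sketch of the pi estimation idea:
# sample random points in the unit square and count the fraction
# that lands inside the quarter circle of radius 1.

import random

def estimate_pi(samples):
    inside = 0
    for _ in xrange(samples):
        x, y = random.random(), random.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return 4.0 * inside / samples

print estimate_pi(5 * 2000)   # same total sample count as the job above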

Map/Reduceを自分で書く

  • Below, we use Hadoop Streaming, a Hadoop extension, to write the processing as programs that talk over standard input/output.
  • We write the word-count job ourselves, using Python.
  • Two scripts are needed: map.py and reduce.py.
Creating map.py/reduce.py
  • map.py
#!/usr/bin/env python
# Mapper: read lines from stdin and emit a "word<TAB>1" pair for every word.

import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t%s' % (word, 1)
  • reduce.py
#!/usr/bin/env python
# Reducer: read "word<TAB>count" lines from stdin, sum the counts per word,
# and print the totals sorted by word.

from operator import itemgetter
import sys

word2count = {}
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so skip this line
        pass

sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
for word, count in sorted_word2count:
    print '%s\t%s' % (word, count)
  • Test map.py. A value is emitted for each key.
$ echo 'c c++ java python perl php javascript java python' | /home/yuta/work/dev/hadoop/map_reduce/map.py      
c	1
c++	1
java	1
python	1
perl	1
php	1
javascript	1
java	1
python	1
  • Test reduce.py. The values are bundled together per key.
$ echo 'c c++ java python perl php javascript java python' | /home/yuta/work/dev/hadoop/map_reduce/map.py | sort | /home/yuta/work/dev/hadoop/map_reduce/reduce.py
c	1
c++	1
java	2
javascript	1
perl	1
php	1
python	2
Copying text data onto HDFS
$ mkdir gutenberg
$ cd gutenberg
$ wget http://www.gutenberg.org/cache/epub/20417/pg20417.txt
$ wget http://www.gutenberg.org/cache/epub/5000/pg5000.txt
$ wget http://www.gutenberg.org/cache/epub/4300/pg4300.txt
$ cd ..
$ hdfs -copyFromLocal gutenberg gutenberg
$ hdfs -ls gutenberg
Found 3 items
-rw-r--r--   1 yuta supergroup     674566 2011-12-04 23:56 /user/yuta/gutenberg/pg20417.txt
-rw-r--r--   1 yuta supergroup    1573150 2011-12-04 23:56 /user/yuta/gutenberg/pg4300.txt
-rw-r--r--   1 yuta supergroup    1423801 2011-12-04 23:56 /user/yuta/gutenberg/pg5000.txt
Running map.py/reduce.py on Hadoop
$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -mapper /home/yuta/work/dev/hadoop/map_reduce/map.py -reducer /home/yuta/work/dev/hadoop/map_reduce/reduce.py -input gutenberg/* -output gutenberg-output
packageJobJar: [/var/lib/hadoop-0.20/cache/yuta/hadoop-unjar3531972439212126610/] [] /tmp/streamjob4995151306966509636.jar tmpDir=null
11/12/04 23:57:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/12/04 23:57:14 WARN snappy.LoadSnappy: Snappy native library not loaded
11/12/04 23:57:14 INFO mapred.FileInputFormat: Total input paths to process : 3
11/12/04 23:57:15 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/yuta/mapred/local]
11/12/04 23:57:15 INFO streaming.StreamJob: Running job: job_201112042137_0003
11/12/04 23:57:15 INFO streaming.StreamJob: To kill this job, run:
11/12/04 23:57:15 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201112042137_0003
11/12/04 23:57:15 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201112042137_0003
11/12/04 23:57:16 INFO streaming.StreamJob:  map 0%  reduce 0%
11/12/04 23:58:20 INFO streaming.StreamJob:  map 100%  reduce 100%
11/12/04 23:58:20 INFO streaming.StreamJob: To kill this job, run:
11/12/04 23:58:20 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201112042137_0003
11/12/04 23:58:20 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201112042137_0003
11/12/04 23:58:20 ERROR streaming.StreamJob: Job not successful. Error: NA
11/12/04 23:58:20 INFO streaming.StreamJob: killJob...
Streaming Command Failed!

The job failed... After looking into it, it seems the -file option is required, so that the mapper and reducer scripts get shipped to the task nodes along with the job.

  • Retry with the -file option.
$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -mapper /home/yuta/work/dev/hadoop/map_reduce/map.py -reducer /home/yuta/work/dev/hadoop/map_reduce/reduce.py -input gutenberg/* -output gutenberg-output -file /home/yuta/work/dev/hadoop/map_reduce/map.py -file /home/yuta/work/dev/hadoop/map_reduce/reduce.py
packageJobJar: [/home/yuta/work/dev/hadoop/map_reduce/map.py, /home/yuta/work/dev/hadoop/map_reduce/reduce.py, /var/lib/hadoop-0.20/cache/yuta/hadoop-unjar8731079524882793743/] [] /tmp/streamjob400379298028508245.jar tmpDir=null
11/12/05 00:26:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/12/05 00:26:29 WARN snappy.LoadSnappy: Snappy native library not loaded
11/12/05 00:26:29 INFO mapred.FileInputFormat: Total input paths to process : 3
11/12/05 00:26:29 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/yuta/mapred/local]
11/12/05 00:26:29 INFO streaming.StreamJob: Running job: job_201112042137_0007
11/12/05 00:26:29 INFO streaming.StreamJob: To kill this job, run:
11/12/05 00:26:29 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201112042137_0007
11/12/05 00:26:29 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201112042137_0007
11/12/05 00:26:31 INFO streaming.StreamJob:  map 0%  reduce 0%
11/12/05 00:26:47 INFO streaming.StreamJob:  map 33%  reduce 0%
11/12/05 00:26:51 INFO streaming.StreamJob:  map 67%  reduce 0%
11/12/05 00:27:02 INFO streaming.StreamJob:  map 100%  reduce 0%
11/12/05 00:27:11 INFO streaming.StreamJob:  map 100%  reduce 100%
11/12/05 00:27:18 INFO streaming.StreamJob: Job complete: job_201112042137_0007
11/12/05 00:27:18 INFO streaming.StreamJob: Output: gutenberg-output

Warnings were emitted, but for now the job succeeded.

  • Check the output file.
$ hdfs -cat gutenberg-output/part-00000  | head -n 20
"(Lo)cra"	1
"1490	1
"1498,"	1
"35"	1
"40,"	1
"A	2
"AS-IS".	1
"A_	1
"Absoluti	1
"Alack!	1
"Alack!"	1
"Alla	1
"Allegorical	1
"Alpha	1
"Alpha,"	1
"Alpine-glow"	1
"An	2
"And	3
"Antoni	1
"At	1

Map/Reduce succeeded, and the occurrence count of each word has been recorded.

Optimizing Map/Reduce
  • The versions below restructure map.py/reduce.py around generators and itertools.groupby, so the reducer streams its input instead of building an in-memory dict (see the note after the code).
  • map.py
#!/usr/bin/env python

import sys
 
def read_input(file):
    for line in file:
        yield line.split()
 
def main(separator='\t'):
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            print '%s%s%d' % (word, separator, 1)
 
if __name__ == "__main__":
    main()
  • reduce.py
#!/usr/bin/env python

from itertools import groupby
from operator import itemgetter
import sys
 
def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)
 
def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass
 
if __name__ == "__main__":
    main()
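
One caveat with this version: itertools.groupby only merges adjacent identical keys, so the reducer assumes its input is already sorted by key. Hadoop's shuffle phase guarantees that ordering; when testing locally you have to pipe the mapper output through sort yourself, just like in the earlier test. Here is a toy check of the grouping logic, with a made-up pair list:

#!/usr/bin/env python
# Toy check of the groupby-based reducer logic: groupby only groups
# adjacent keys, so the pairs must be sorted by key first.

from itertools import groupby
from operator import itemgetter

pairs = [('java', 1), ('python', 1), ('java', 1), ('c', 1)]
for key, group in groupby(sorted(pairs, key=itemgetter(0)), itemgetter(0)):
    print '%s\t%d' % (key, sum(count for _, count in group))
# prints: c 1, java 2, python 1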

Hadoop errors

Name node is in safe mode.Resources are low on NN. Safe mode must be turned off manually.

It seems some error has switched Hadoop into safe mode. The following command turns safe mode off.

$ hadoop dfsadmin -safemode leave
Safe mode is OFF
$ hadoop dfsadmin -safemode get
Safe mode is OFF
WARN hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.RemoteException: java.io.IOException:could only be replicated to 0 nodes, instead of 1

In my case, this error went away after accessing http://localhost:50070/.

Things I still need to learn

When I have time, I plan to write up the following.

  • Distributed grep
  • Distributed sort
  • Distributed cache