Mahout の、と言いつつ今回も Hadoop の話ばかり。
Hadoop は各ノードにアプリケーションを配布する関係から、通常 jar を作らなければならない。そのため、Eclipse で書いたコードを実行するのもデバッグするのも非常にめんどくさい。
でもうまくやれば、スタンドアローンモード限定だが、Eclipse から jar も作らずそのまま Map-Reduce ジョブを起動できる。デバッグ実行もできるので、ブレイクポイント入れてステップ実行とかも可能だ。
今回はそういう環境の作り方の話。ずいぶん苦労したけど、出来るようになったら簡単。
Eclipse のセットアップなどは済んでいるものとする。Mahout の開発環境を Maven+Eclipse で作る (1) - Mi manca qualche giovedi`? 参照。
なお、Hadoop を展開すると contrib/eclipse-plugin という一見良さげなものが転がっているが、おそらくまともにメンテナンスされていないのではないか、と疑っている。Hadoop 0.20 に同梱のものはメニューから Run on Hadoop しても出るはずのダイアログが出ない。0.21 に同梱のものに至っては、Hadoop のネームノードを登録するためのダイアログすら出ないので、インストールすることしかできない(いずれも Eclipse 3.6 SR1 x64 の話)。
というわけで、今回の話ではこのプラグインは使わない。
Hadoop のプロジェクトを作る
まず Hadoop 自身を Eclipse に取り込む。
Hadoop は Maven ではなく Ant なので、プロジェクトのインポートは手作業で行う。が、今回必要としている最小限の範囲なら実はたいしてめんどくさくない。
Eclipse で Java Project を作って、"hadoop-0.20.2" などと名前を付けておく。
次にメニューから File > Import > General > Archive File を選び、Hadoop 0.20.2 のアーカイブファイル hadoop-0.20.2.tar.gz を先ほど作った hadoop-0.20.2 プロジェクトにインポートする。
次に Hadoop ソースフォルダとビルドに必要なライブラリを追加していくのだが、なぜか ant のライブラリだけが Hadoop のアーカイブに欠けているので、 http://ant.apache.org/bindownload.cgi から apache-ant-1.8.2-bin.zip あたりをさっくり落とし、ant.jar を取り出して $WORKSPACE/hadoop-0.20.2/hadoop-0.20.2/lib/ に他のライブラリ達と一緒に放り込んでおこう。
0.21.0 でない理由は Mahout の開発環境を Maven+Eclipse で作る (2) Hadoop セットアップ - Mi manca qualche giovedi`? など参照。
そして $WORKSPACE/hadoop-0.20.2/.classpath ファイルを次のように書き換える。
<?xml version="1.0" encoding="UTF-8"?> <classpath> <classpathentry kind="src" path="hadoop-0.20.2/src/core"/> <classpathentry kind="src" path="hadoop-0.20.2/src/mapred"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/commons-logging-1.0.4.jar"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/xmlenc-0.52.jar"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/commons-net-1.4.1.jar"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/kfs-0.2.2.jar"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/jets3t-0.6.1.jar"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/servlet-api-2.5-6.1.14.jar"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/jetty-6.1.14.jar"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/jetty-util-6.1.14.jar"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/commons-codec-1.3.jar"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/log4j-1.2.15.jar"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/commons-cli-1.2.jar"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/ant.jar"/> <classpathentry kind="lib" path="hadoop-0.20.2/lib/commons-httpclient-3.0.1.jar"/> <classpathentry kind="output" path="bin"/> </classpath>
これが済んだら Eclipse のウィンドウに戻って、hadoop-0.20.2 プロジェクトを右クリック > Refresh する。
Package Explorer が次のようになればOK。すでにバックグラウンドでビルドが走っている。
これで Hadoop の取り込み完了。
今回は後のサンプルを動かすための最小限になっているので、凝ったコードだと不足するかもしれない。その場合は適宜ソースフォルダや参照ライブラリを追加。
黄色アイコンはほとんどが生 generic か、deprecated か、使ってない import か、DEAD CODE の WARNING。
なので、気にしなくても(多分)大丈夫。どうせ 2900 個くらいしかないから……。
単語カウントのサンプル
この環境が本当に Hadoop の開発に使えるのか、よくある単語カウントのサンプルコードを動かしてみよう。
Hadoop 本家のドキュメント Map/Reduce Tutorial に単語カウントのサンプルが載っているが、このコードはなんと deprecated の山。いや、そんなワナは勘弁してください。
というわけで、参考にするなら Hadoop徹底入門 あたりにしよう。この本の5章に載っている単語カウントのサンプルはちゃんと 0.20 対応のものになっている。
まずは Eclipse メニューの File > New > Java Project から、適当な名前で Java プロジェクトを作る。今回は wordcount としておく。
Finish ではなくて Next して、プロジェクトやライブラリの参照を設定しよう。Finish してしまってたら、プロジェクトを右クリックして Properties > Java Build Path で設定ダイアログを呼び出す。
- Projects タブで Add して、上で作った hadoop-0.20.2 を選ぶ
- Libraries タブで Add JARs して、hadoop-0.20.2/hadoop-0.20.2/lib/commons-cli-1.2.jar を選ぶ(GenericOptionsParser を使うのに必要)
さてこれで単語カウントのサンプルコードを入力すればいいのだが、せっかく Eclipse を使うのだから、driver と mapper と reducer のクラスをわけてあげることにしよう。単なるコピペにならないようにせめてワンポイント変えたんだなとか深読みは禁止。
// WordCountDriver.java import java.io.IOException; import java.util.Date; import java.util.Formatter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCountDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); GenericOptionsParser parser = new GenericOptionsParser(conf, args); args = parser.getRemainingArgs(); Job job = new Job(conf, "wordcount"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); // 実行の度に消さなくて済むよう、出力パスに日時を入れる Formatter formatter = new Formatter(); String outpath = "Out" + formatter.format("%1$tm%1$td%1$tH%1$tM%1$tS", new Date()); FileInputFormat.setInputPaths(job, new Path("In")); FileOutputFormat.setOutputPath(job, new Path(outpath)); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); System.out.println(job.waitForCompletion(true)); } }
// WordCountMapper.java import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Text word = new Text(); private final static IntWritable one = new IntWritable(1); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } } }
// WordCountReducer.java import java.io.IOException; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } context.write(key, new IntWritable(sum)); } }
もう一つ、$WORKSPACE/wordcount/bin/log4j.properties を作って、次の内容を書き込んでおこう。
log4j.rootLogger=INFO,console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
サンプルの実行
テスト用データとして、$WORKSPACE/wordcount/In に適当な英文のテキストファイルを作っておく。
メニューの Run > Run (ctrl+F11) で、Hadoop のスタンドアローンモードで単語カウントが実行される。コンソールに次のように出れば成功。結果は Out**********/part-r-00000 とかに入っている。
11/02/18 19:52:39 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 11/02/18 19:52:39 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). 11/02/18 19:52:39 INFO input.FileInputFormat: Total input paths to process : 1 11/02/18 19:52:39 INFO mapred.JobClient: Running job: job_local_0001 11/02/18 19:52:39 INFO input.FileInputFormat: Total input paths to process : 1 11/02/18 19:52:39 INFO mapred.MapTask: io.sort.mb = 100 11/02/18 19:52:39 INFO mapred.MapTask: data buffer = 79691776/99614720 11/02/18 19:52:39 INFO mapred.MapTask: record buffer = 262144/327680 11/02/18 19:52:40 INFO mapred.MapTask: Starting flush of map output 11/02/18 19:52:40 INFO mapred.MapTask: Finished spill 0 11/02/18 19:52:40 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting 11/02/18 19:52:40 INFO mapred.LocalJobRunner: 11/02/18 19:52:40 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done. 11/02/18 19:52:40 INFO mapred.LocalJobRunner: 11/02/18 19:52:40 INFO mapred.Merger: Merging 1 sorted segments 11/02/18 19:52:40 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 23563 bytes 11/02/18 19:52:40 INFO mapred.LocalJobRunner: 11/02/18 19:52:40 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting 11/02/18 19:52:40 INFO mapred.LocalJobRunner: 11/02/18 19:52:40 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now 11/02/18 19:52:40 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to Out0218195239 11/02/18 19:52:40 INFO mapred.LocalJobRunner: reduce > reduce 11/02/18 19:52:40 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done. 11/02/18 19:52:40 INFO mapred.JobClient: map 100% reduce 100% 11/02/18 19:52:40 INFO mapred.JobClient: Job complete: job_local_0001 11/02/18 19:52:40 INFO mapred.JobClient: Counters: 12 11/02/18 19:52:40 INFO mapred.JobClient: FileSystemCounters 11/02/18 19:52:40 INFO mapred.JobClient: FILE_BYTES_READ=73091 11/02/18 19:52:40 INFO mapred.JobClient: FILE_BYTES_WRITTEN=110186 11/02/18 19:52:40 INFO mapred.JobClient: Map-Reduce Framework 11/02/18 19:52:40 INFO mapred.JobClient: Reduce input groups=956 11/02/18 19:52:40 INFO mapred.JobClient: Combine output records=0 11/02/18 19:52:40 INFO mapred.JobClient: Map input records=94 11/02/18 19:52:40 INFO mapred.JobClient: Reduce shuffle bytes=0 11/02/18 19:52:40 INFO mapred.JobClient: Reduce output records=956 11/02/18 19:52:40 INFO mapred.JobClient: Spilled Records=4130 11/02/18 19:52:40 INFO mapred.JobClient: Map output bytes=19431 11/02/18 19:52:40 INFO mapred.JobClient: Combine input records=0 11/02/18 19:52:40 INFO mapred.JobClient: Map output records=2065 11/02/18 19:52:40 INFO mapred.JobClient: Reduce input records=2065 true
Run > Debug (F11) ならデバッグ実行できるので、ブレイクポイントとかだって有効。
これでちょっとした Map-Reduce を書いてみるのもだいぶ楽になるのでは?