しろかい!

アプリ開発や機械学習などの開発Tips.

HadoopStreaming で MapReduce を Python で動かす

HadoopStreaming を 使って PythonHadoop を動かす方法です.
Mapper と Reducer は通常 Java で記述しなければいけませんが,HadoopStreaming を使えば Python などの他の言語で書けるので実装がとても楽になります!

HadoopStreaming とは?

Mapper と Reducer のやり取りを標準入出力を介することで,他のプログラミング言語からも Hadoop (MapReduce) を利用できるようにする仕組みです.
標準入出力さえ扱えれば基本的にどんな言語でも構わないので,Python だけでなく Ruby, Perl, シェルスクリプト等を使って Mapper と Reducer を実装することができます.

詳しい説明は以下をどうぞ!
Hadoop Streamingメモ(Hishidama's Hadoop Streaming Memo)

HadoopStreaming の入手

HadoopStreaming を利用するには,「hadoop-streaming-***.jar」という jar ファイルが必要です(*** には Hadoop のバージョンが入る).

以下,入手方法です.

  1. Hadoop の DL ページにアクセス.
    http://ftp.jaist.ac.jp/pub/apache/hadoop/common/
  2. 使用しているバージョンの Hadoopディレクトリに入り,その中にある「hadoop-***.tar.gz」を DL する.
  3. DL したファイルを解凍.
  4. 解凍してできたディレクトリの以下に「hadoop-streaming-***.jar」がある.
    hadoop-***/share/hadoop/tools/lib/hadoop-streaming-***.jar

HadoopStreaming の使い方

Hadoop のサンプルとして定番の「Word Count」を例に説明します.
以下の記事を参考にさせて頂きました.

入力ファイルの用意

単語を数え上げるファイルを用意します.

$ echo "a a a b b c" > input.txt

別に自前で用意しても構いません.

Mapper と Reducer を実装

今回は Python で実装します.

mapper.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys

# 入力テキストを標準入力から受け取り1行ずつ処理
for line in sys.stdin:
    # テキストをスペースで区切って単語に分割
    for word in line.strip().split():
        # 各単語を「word        1」という形式で標準出力に出力
        # タブで区切られた左が key, 右が value に相当
        print '{0}\t1'.format(word)
reducer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from collections import Counter

# 単語のカウントを行う Counter オブジェクト
counter = Counter()

# Mapper の出力を標準入力から受け取り1行ずつ処理
for line in sys.stdin:
    # key(=word), value(=count) に分割する
    word, count = line.strip().split('\t')
    # カウント
    counter[word] += int(count)

# カウント結果を1単語ずつ「word count」という形式で標準出力に出力
for word, count in counter.most_common():
    print '{0}\t{1}'.format(word, count)

collections.Counter モジュールは Python2.7 以降じゃないと動きません.2.6 以下の場合は参考記事のコードを使って下さい(reducer.py も同様).

実行

HDFS に入力ファイルを置く

先程作成した input.txt を HDFS に転送します.

$ hadoop fs -put input.txt
HadoopStreaming を実行

以下のコマンドで実行できます.
オプションの意味は 下記 を参照して下さい.

$ hadoop jar hadoop-streaming-***.jar -mapper mapper.py -reducer reducer.py -file mapper.py reducer.py -input input.txt -output wc.out

あとは MapReduce が終了するのを待つだけです!

結果を確認

結果の確認方法は普段と同じです.

$ hadoop fs -cat 'wc.out/part-*'
a   3
b   2
c   1

その他

ローカルで HadoopStreaming のテストを行う

下記のようにシェルコマンドをパイプで繋ぐことで,ローカル環境で簡単に HadoopStreaming の動作テストができます.

$ cat input.txt | python mapper.py | sort | python reducer.py
a   3
b   2
c   1

このコマンドは以下のような仕組みで動作しています.

  1. cat で入力ファイルを標準出力に出力し,パイプで mapper.py の標準入力に渡す.
  2. mapper.py の出力を sort に渡す.(MapReduce の Shuffle に相当)
  3. sort の結果を reducer.py に渡し,結果を出力.

標準入出力を使ってデータをやり取りする HadoopStreaming の性質をうまく利用していますね!

HadoopStreaming のオプション

代表的なオプションを以下に記載しておきます.

  • -mapper [file]: Mapper の実行ファイルを指定
  • -reducer [file]: Reducer の実行ファイルを指定
  • -file [file1] [file2] ...: Mapper や Reducer で実行するファイルをリモートサーバに配布
  • -input [file or dir]: HDFS上の入力ファイルを指定
  • -output [dir]: 結果を出力するHDFS上のディレクトリを指定
  • -numReduceTasks [num]: 実行する Reducer の数を指定
  • -combiner [file]: Combiner の実行ファイルを指定
  • -cacheFile [file]#[symlink]: DistributedCache に渡すHDFS上のファイルを指定.また,それを Mapper や Reducer で読み込むために利用する symlink も指定します.

以下のコマンドですべてのオプションを参照できます.

$ hadoop jar hadoop-streaming-***.jar -info
HadoopStreaming で Combiner を使う

HadoopStreaming でも Combiner が使えます.
使い方は簡単で,HadoopStreaming 実行時に -combiner [file] オプションで Combiner の実行ファイルを指定するだけです.

例えば先程の Word Count は次のようにすれば Combiner が使えます.

$ hadoop jar hadoop-streaming-***.jar -mapper mapper.py -reducer reducer.py -combiner reducer.py -file mapper.py reducer.py -input input.txt -output wc.out

今回は Reducer と Combiner は全く同じファイルなので問題ありませんが,異なる場合は -file オプションに Combier のファイルを含めるようにして下さい.

数GBぐらいの大きなテキストファイルを入力にすると,Combier を使うことによる実行時間の短縮が実感できるかと思います.

HadoopStreaming で DistributedCache を使う

DistributedCache ももちろん使えます.-cacheFile [file]#[symlink] というオプションを付けるだけです!

例として,特定のワードはカウントしない Word Count を実装します.
カウントしないワード (= ストップワード) のリストを DistributedCache を使って読み込みます.

まずはストップワードのファイルを作成し,HDFS に置きます.
今回は 'b' をストップワードに指定します.

$ echo b > stopwords.txt
$ hadoop fs -put stopwords.txt

次に mapper.py にストップワードを省く処理を書き足します.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys

# ストップワードを読み込んでセットに格納
stopwords = set()
for line in open('stopwords.txt'):
    stopwords.add(line.rstrip())

# 入力テキストを標準入力から受け取り1行ずつ処理
for line in sys.stdin:
    # テキストをスペースで区切って単語に分割
    for word in line.strip().split():
        # ストップワードなら処理しない
        if word in stopwords:
            continue
        # 各単語を「word        1」という形式で標準出力に出力
        # タブで区切られた左が key, 右が value に相当
        print '{0}\t1'.format(word)

これらができたら実行します.-cacheFile 'stopwords.txt#stopwords.txt' を忘れずに付けるようにします.

$ hadoop jar hadoop-streaming-***.jar -mapper mapper.py -reducer reducer.py -file mapper.py reducer.py -input input.txt -output wc2.out -cacheFile 'stopwords.txt#stopwords.txt'

出力ディレクトリも先程と別の場所を指定するのを忘れないようにして下さい.エラー出ます…

実行が終わったら結果を見てみます.

$ hadoop fs -cat 'wc2.out/*'
a   3
c   1

確かに 'b' がカウントされていませんね!

まとめ

HadoopStreaming を 使って PythonHadoop を動かす方法を解説しました. Python に限らず,簡単に好きな言語から Hadoop を動かすことができ,自分の得意な言語が使えるのは便利ですね!
ただ1点注意として,Javaで実装した場合に比べて処理時間は少し長くなってしまうようです*1.けど実装にかかる時間を大幅に短縮できるので,実装から処理までトータルで考えれば時短になるとは思います笑
Javaだから…とHadoopを敬遠されていたも,この機会に是非チャレンジしてみてはいかがでしょうか.