読者です 読者をやめる 読者になる 読者になる

CUBE SUGAR CONTAINER

技術系のこと書きます。

Python から Hadoop Streaming を使ってみる

今回は、任意のプログラミング言語から Apache Hadoop を使うことのできる Hadoop Streaming という機能を使ってみる。

通常、Hadoop を使って MapReduce のジョブを直接扱うときは Java を使ってマッパーとリデューサーを書くことになる。 ただ、ご存知の通り Java のソースコードというのは重厚長大で、なかなか読み書きがしやすいとは言いにくい。 そこで、任意のプログラミング言語、具体的には標準入出力を処理する実行ファイルさえあれば使える機能ができた。 それが Hadoop Streaming というもの。 この機能を使うことで低レイヤーな MapReduce の処理を、使い慣れたプログラミング言語を使うなど好きなやり方で記述できる。

ちなみに、今回のエントリでは事前に Apache Hadoop がセットアップされていることを前提に書いていく。 具体的には、次のエントリのようにして疑似分散モードないし完全分散モードで構築されていること。

blog.amedama.jp

また、使用するプログラミング言語については使い慣れた Python を使うことにした。 そして、題材とする MapReduce のアプリケーションはワードカウントにする。 これは、MapReduce のハローワールド的な位置づけのアプリケーションになっている。 具体的な処理としては、テキストファイルの中に含まれる単語の数をカウントするというもの。

使った環境については前述したエントリにもあるけど、次の通り。

$ cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)
$ uname -r
3.10.0-514.el7.x86_64
$ bin/hadoop version
Hadoop 2.8.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 91f2b7a13d1e97be65db92ddabc627cc29ac0009
Compiled by jdu on 2017-03-17T04:12Z
Compiled with protoc 2.5.0
From source with checksum 60125541c2b3e266cbf3becc5bda666
This command was run using /home/vagrant/hadoop-2.8.0/share/hadoop/common/hadoop-common-2.8.0.jar

Hadoop Streaming でワードカウントを実行する

まずはワードカウントの処理対象となるテキストファイルから用意しておこう。 各単語はシンプルにスペースで区切られている。 見たところHelloHadoop といった単語はたくさん含まれていそうだ。

$ cat << 'EOF' > /tmp/input1.txt
Hello Hadoop World!
This is the first Hadoop Streaming application!
EOF
$ cat << 'EOF' > /tmp/input2.txt
Hello Hello Hello
Hadoop Streaming application!
EOF

続いては上記のファイルを HDFS にコピーする。 ワーキングディレクトリについては、先のエントリでセットアップした Apache Hadoop のディレクトリ内にいることを想定している。

$ bin/hdfs dfs -put /tmp/input*.txt .

もし、次のようなエラーが出るときはユーザのホームディレクトリが HDFS 上に存在していないことを示している。

put: `.': No such file or directory: `hdfs://localhost:9000/user/vagrant'

そのときはホームディレクトリを用意しよう。

$ bin/hdfs dfs -mkdir -p /user/$(whoami)

次のようにファイルがコピーされていることを確認する。

$ bin/hdfs dfs -ls
Found 2 items
-rw-r--r--   1 vagrant supergroup         68 2017-05-20 01:09 input1.txt
-rw-r--r--   1 vagrant supergroup         48 2017-05-20 01:09 input2.txt

続いて、今回の本題である Hadoop Streaming 用のプログラムを用意する。 その前に Hadoop Streaming で使うプログラムの基礎知識について説明しておく。

まず、Hadoop Streaming では標準入出力を通して Apache Hadoop とユーザのプログラムがデータをやり取りする。 このとき、プログラムは標準入出力を通してやり取りする各行をキーバリューの含まれたものとして扱うことになる。 始めに入力から見ると、これは本当にただ行が標準入力から渡ってくるだけに過ぎない。 ユーザのプログラムは、それを適切にパースしてキーバリューに分解する。 そして、分解したキーバリューに対して何らかの処理を施した上で、最後は標準出力に書き出すことになる。 つまり、標準入出力さえ扱うことができれば、どんなプログラムであっても MapReduce の処理を記述できるということになる。 ただし、詳しくは後述するものの、その処理が並列分散実行される点については留意しておく必要がある。

さて、長くなってしまったけどそろそろ実例を見ていこう。 Hadoop Streaming ではマッパーとレデューサーを別々に用意する。 今回はワードカウントを題材にするので、そのプログラムを Python で書いてみた。

まずはマッパーから。 標準入力の内容をパースしてキーバリューにしたら、それを標準出力に書き出している。 キーとバリューのペアはタブを使って区切るのが望ましい。 なぜなら Hadoop Streaming がデフォルトでキーとバリューをタブ区切りとして扱うため。 もちろん、この区切り文字は変更できるんだけど、今回はデフォルトのままでいく。

$ cat << 'EOF' > /var/tmp/mapper.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys


def map_(line):
    # 空白などで単語ごとに行を区切る
    words = re.split(r'\s', line)
    # 単語と出現回数 (ここでは常に1) のタプルにする
    return [(key, 1) for key in words if key]


def write(kvs):
    for key, value in kvs:
        # キーとバリューをタブ区切りで標準出力に書き出す
        message = '{0}\t{1}'.format(key, value)
        print(message)


def main():
    # 標準入力から処理対象の文字列を読み込む
    for line in sys.stdin:
        # タブ区切りのキーとバリューに分解する
        key_and_values = map_(line)
        # 標準出力に書き込んでいく
        write(key_and_values)


if __name__ == '__main__':
    main()
EOF

続いてはリデューサー。 標準入力に含まれるキーバリューをパースしたら、その内容を元に単語の数をカウントしている。

$ cat << 'EOF' > /var/tmp/reducer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from collections import defaultdict


# 集計結果を入れる辞書
results = defaultdict(int)


def reduce_(line):
    # 入力された行をタブで区切る
    key, value = line.split('\t')
    # キーごとに出現回数をカウントする
    results[key] += int(value)


def main():
    # キーの出現回数をカウントする
    for line in sys.stdin:
        reduce_(line)

    # 集計結果をキーとバリューのタブ区切りで標準出力に書き出す
    for key, value in results.items():
        message = '{0}\t{1}'.format(key, value)
        print(message)


if __name__ == '__main__':
    main()
EOF

上記で作ったマッパーとリデューサーに実行権限をつけておく。 これは必須ではない。

$ chmod +x /var/tmp/mapper.py
$ chmod +x /var/tmp/reducer.py

これで準備が整った。 それでは早速 Hadoop Streaming を実行してみよう。 jar には Apache Hadoop に同梱されている Hadoop Streaming 用のものを指定する。 -files オプションで使用する実行ファイルを指定して、それを -mapper-reducer オプションで割り当てていく。 MapReduce ジョブの入出力については -input-output オプションで指定する。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output

無事にジョブが完了すると結果が output ディレクトリ以下に出力される。 内容を確認してみよう。

$ bin/hdfs dfs -cat output/*
application!    2
This    1
is  1
Hadoop  3
Streaming   2
World!  1
the 1
Hello   4
first   1

どうやら、ちゃんとワードカウントが動作しているようだ。

マップだけする (リデュースしない) パターン

場合によっては MapReduce で、マップだけを使ってリデュースはしたくないということもあるはず。 これは、単純に Apache Hadoop クラスタを並列分散実行するためだけに使うパターン。 このときは -D mapred.reduce.tasks=0 というようにリデュースの並列度を 0 に指定する。

実際に試してみよう。 このパターンではリデューサーの指定が必要ない。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -D mapred.reduce.tasks=0 \
    -files /var/tmp/mapper.py \
    -mapper mapper.py \
    -input input*.txt \
    -output output2

結果は次の通り。 マップ処理を通してキーバリュー形式になるだけで終わった。

$ bin/hdfs dfs -cat output2/*
Hello   1
Hadoop  1
World!  1
This    1
is  1
the 1
first   1
Hadoop  1
Streaming   1
application!    1
Hello   1
Hello   1
Hello   1
Hadoop  1
Streaming   1
application!    1

Hadoop Streaming を扱う上での注意点

Hadoop Streaming のプログラムを書く上での注意点として一つ気づいたことがある。 それは、マップ処理は並列分散処理されるのは当たり前としても、リデュース処理も並列分散処理されうるということ。 なので、リデュース処理の中で一つのノードで完結していることを想定したような処理は含めてはいけないはず。

例えば、次のリデューサーは最終的な出力結果が降順ソートされることを期待した処理が含まれている。

$ cat << 'EOF' > /var/tmp/reducer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""リデューサーが並列実行されると上手く動かないので、こんな風には書かないこと"""

import sys
from collections import defaultdict


results = defaultdict(int)


def reduce_(line):
    key, value = line.split('\t')
    results[key] += int(value)


def main():
    for line in sys.stdin:
        reduce_(line)

    # キーの出現回数ごとに降順ソートする
    items = results.items()
    sorted_items = sorted(items, key=lambda x: x[1], reverse=True)

    for key, value in sorted_items:
        message = '{0}\t{1}'.format(key, value)
        print(message)


if __name__ == '__main__':
    main()
EOF

上記を実行してみよう。 まずはリデュース処理の並列度が 1 の場合から。 つまり、一つのノード上で全てのリデュース処理が実行される。

$ chmod +x /var/tmp/reducer.py
$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -D mapred.reduce.tasks=1 \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output3

この場合は結果がちゃんとソートされて出る。

$ bin/hdfs dfs -cat output3/*
Hello   4
Hadoop  3
application!    2
Streaming   2
This    1
is  1
World!  1
the 1
first   1

しかし、並列度を上げてみるとどうだろうか? 例えば、次のようにすると並列度が 10 になる。 つまり、マッパーが処理したデータが 10 のノードに散らばって実行されることになる。 ただし、今回に関しては疑似分散モードなので一つのノード上でリデュースのタスクが 10 できるだけ。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -D mapred.reduce.tasks=10 \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output4

同じことは -numReduceTasks オプションでも指定できる。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output4 \
    -numReduceTasks 10

実行結果を確認してみよう。 今度は見事に出力結果がソートされずバラバラになっている。 これはリデュース処理が並列実行されたことで起こった。 各々のリデュース単位では降順ソートされたものの、全体ではそれを連結したものになるので意味をなさない。

$ bin/hdfs dfs -cat output4/*
Hello   4
is  1
the 1
first   1
Hadoop  3
application!    2
World!  1
Streaming   2
This    1

上記を見て次に気になるのは、どうやってリデュース処理を各タスクに分解して割り振っているか、ということだろう。 リデュース処理に関しては、同じキーは必ず同じノード上に集めて実行しなきゃいけない。 つまり標準入出力に流れる行の中で、どの部分がキーなのかを Apache Hadoop は認識する必要があるということ。 これは前述した区切り文字と関係していて、デフォルトでは区切り文字がタブになっている。 そのため、今回は特に何も指定しなくても Apache Hadoop は標準入出力の中からキーを認識できた。 もし、これが異なる場合には Hadoop Streaming を実行する際に、それをオプションで伝えてやる必要がある。

Hadoop Streaming アプリケーションの開発について

ところで、Hadoop Streaming のアプリケーションはどうやって開発すれば良いだろうか。 上記のように Hadoop Streaming 経由で毎回起動するのは、正直めんどくさい。 そんなときは HDFS を使わずにローカルディスクを、Hadoop Streaming の代わりにパイプを使ってデバッグするやり方がある。

結局のところ Hadoop Streaming がやっている標準入出力同士をつなぎ合わせるという処理は Unix のパイプと同じだ。 HDFS と Hadoop クラスタで、それが並列分散実行されるという点を除けば、だけど。

そのため、並列度が 1 の挙動については次のようにして確かめることができる。

$ cat /tmp/input*.txt | /var/tmp/mapper.py  | /var/tmp/reducer.py
application!    2
This    1
is  1
Hadoop  3
Streaming   2
World!  1
the 1
Hello   4
first   1

リデュースしたくないときはパイプをつながなければいいだけ。

$ cat /tmp/input*.txt | /var/tmp/mapper.py
Hello   1
Hadoop  1
World!  1
This    1
is  1
the 1
first   1
Hadoop  1
Streaming   1
application!    1
Hello   1
Hello   1
Hello   1
Hadoop  1
Streaming   1
application!    1

上記のようにして、まずは手元で動作確認した上で Hadoop Streaming にかければ良い。 この段階で動作しないアプリケーションは Hadoop Streaming に持っていっても絶対に動作はしないだろう。

まとめ

今回は Apache Hadoop を任意のプログラミング言語から扱うことのできる Hadoop Streaming という機能を試してみた。 この機能を使えば Java を使いたくないという人でも Apache Hadoop の低レイヤーな MapReduce 処理を記述できる。

参考文献

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築