CUBE SUGAR CONTAINER

技術系のこと書きます。

Ubuntu 16.04 LTS のランレベルを変更して CUI で動かす

Ubuntu はデスクトップ環境が入ると自動的にランレベルが変更されて X Window System が立ち上がるようになる。 ただ、場合によっては依存パッケージの関係で意図せずそうなってしまうこともあるので元に戻すやり方について。

環境は次の通り。

$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=16.04
DISTRIB_CODENAME=xenial
DISTRIB_DESCRIPTION="Ubuntu 16.04.2 LTS"
$ uname -r
4.4.0-78-generic

このとき systemctl get-default コマンドを実行すると graphical.target になっているはず。 これがランレベルの 5 に対応している。

$ systemctl get-default
graphical.target

これをランレベル 3 のマルチユーザモードに直すには multi-user.target にしなきゃいけない。 systemctl set-default コマンドで変更しよう。

$ sudo systemctl set-default  multi-user.target
Created symlink from /etc/systemd/system/default.target to /lib/systemd/system/multi-user.target.

デフォルトのランレベルが変更されたことを確認する。

$ sudo systemctl get-default
multi-user.target

あとは再起動して X Window System が立ち上がってこないことを確認するだけ。

$ sudo shutdown -r now

めでたしめでたし。

Python: Keras/TensorFlow で GPU のメモリを必要な分だけ確保する

Keras のバックエンドに TensorFlow を使う場合、デフォルトでは一つのプロセスが GPU のメモリを全て使ってしまう。 今回は、その挙動を変更して使う分だけ確保させるように改めるやり方を書く。

環境には次のようにしてセットアップした Ubuntu 16.04 LTS を使っている。 blog.amedama.jp

サンプルとして動作させるアプリケーションには Keras が提供している MNIST データセットを CNN で認識するものを使う。 まずはこれをダウンロードしておこう。 同時に、セッションをクリアするパッチも追加しておく。

$ wget https://raw.githubusercontent.com/fchollet/keras/master/examples/mnist_cnn.py
$ echo 'K.clear_session()' >> mnist_cnn.py

上記を実行すると GPU を使ったニューラルネットワークの学習が始まる。

$ python mnist_cnn.py

学習している最中に、別のターミナルから nvidia-smi コマンドを実行してみよう。 すると、ビデオカードに載っているメモリのほとんど全てを上記のプロセスが使っていることが分かる。

$ nvidia-smi
Wed Jun  7 21:28:52 2017
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 375.66                 Driver Version: 375.66                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  GeForce GTX 105...  Off  | 0000:01:00.0     Off |                  N/A |
| 49%   64C    P0    63W /  75W |   3863MiB /  4038MiB |     87%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID  Type  Process name                               Usage      |
|=============================================================================|
|    0      1874    C   python                                        3861MiB |
+-----------------------------------------------------------------------------+

全部で 4GB しかないメモリのうち 3.8GB を一つのプロセスが使っている。

この状況で、別のターミナルからもう一つ Keras のプロセスを動かしてみよう。 当たり前だけど、これは残りのメモリが少なすぎて実行に失敗する。

$ python mnist_cnn.py
...(snip)...
2017-06-07 21:46:15.514867: E tensorflow/stream_executor/cuda/cuda_driver.cc:893] failed to allocate 134.44M (140967936 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY
2017-06-07 21:46:15.835973: E tensorflow/stream_executor/cuda/cuda_dnn.cc:359] could not create cudnn handle: CUDNN_STATUS_INTERNAL_ERROR
2017-06-07 21:46:15.836015: E tensorflow/stream_executor/cuda/cuda_dnn.cc:326] could not destroy cudnn handle: CUDNN_STATUS_BAD_PARAM
2017-06-07 21:46:15.836032: F tensorflow/core/kernels/conv_ops.cc:659] Check failed: stream->parent()->GetConvolveAlgorithms(&algorithms) 
Aborted (core dumped)

これでは一つのマシンで同時に学習させられるモデルが一つだけになってしまう。

この挙動を変更するには Keras が使う TensorFlow のセッションの設定を変更する必要がある。 TensorFlow には GPU のオプションとして allow_growth というものがあり、これを有効にすると必要な分だけ確保するようになる。 あとは、そう設定した TensorFlow のセッションを Keras で使うようにできれば上手くいく。 これには keras.backend.tensorflow_backend モジュールにある set_session() という関数を使う。

その部分だけをスニペットにすると、こんな感じ。

import tensorflow as tf
from keras.backend import tensorflow_backend

config = tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=True))
session = tf.Session(config=config)
tensorflow_backend.set_session(session)

試しに、さっきの MNIST サンプルのコードに上記を組み込んで動作を確認してみよう。

$ cat << 'EOF' > /tmp/keras-mnist.patch
import tensorflow as tf
from keras.backend import tensorflow_backend

config = tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=True))
session = tf.Session(config=config)
tensorflow_backend.set_session(session)
EOF
$ sed -i -e '
/^from keras import backend as K$/r /tmp/keras-mnist.patch
' mnist_cnn.py

上記を実行すると、こんな感じで挿入される。 もちろん自分でエディタを使って編集しても構わない。

$ head -n 22 mnist_cnn.py | tail -n 10
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K
import tensorflow as tf
from keras.backend import tensorflow_backend

config = tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=True))
session = tf.Session(config=config)
tensorflow_backend.set_session(session)

batch_size = 128

再度実行してみよう。

$ python mnist_cnn.py

そして別のターミナルから nvidia-smi コマンドを叩いてメモリの消費量を確認する。

$ nvidia-smi
Wed Jun  7 21:32:04 2017
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 375.66                 Driver Version: 375.66                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  GeForce GTX 105...  Off  | 0000:01:00.0     Off |                  N/A |
| 44%   59C    P0    63W /  75W |    425MiB /  4038MiB |     87%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID  Type  Process name                               Usage      |
|=============================================================================|
|    0      1914    C   python                                         423MiB |
+-----------------------------------------------------------------------------+

さっきは 3.8GB も使っていたけど、今度は 423MB しか使っていない!

これなら複数のモデルを同時に学習させることができる。

$ nvidia-smi 
Wed Jun  7 21:58:02 2017       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 375.66                 Driver Version: 375.66                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  GeForce GTX 105...  Off  | 0000:01:00.0     Off |                  N/A |
| 41%   55C    P0    66W /  75W |    848MiB /  4038MiB |     98%      Default |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID  Type  Process name                               Usage      |
|=============================================================================|
|    0      2315    C   python                                         423MiB |
|    0      2348    C   python                                         423MiB |
+-----------------------------------------------------------------------------+

めでたしめでたし。

ところで、逐次的にメモリを確保するとなるとパフォーマンスに影響がないかが気になる。 そこで、編集前と編集後で学習にかかる時間に変化があるかについても調べてみた。

まずは編集前から。

$ time python mnist_cnn.py
...(snip)...
real    2m4.539s
user    1m59.228s
sys 0m11.508s

2 分 4 秒で終わっている。

次は編集後を。

$ time python mnist_cnn.py
...(snip)...
real    2m4.666s
user    1m59.800s
sys 0m11.480s

こちらも 2 分 4 秒で終わった。 どうやらパフォーマンスに大きな影響は無さそうだ。

まとめ

今回は Keras のバックエンドを TensorFlow で動かすときに必要な分だけメモリを確保するやり方について書いた。

CentOS7 で Apache Hadoop の完全分散モードを使ってみる

以前、このブログでは OSS 版の Apache Hadoop を疑似分散モードでセットアップする方法を試した。 疑似分散モードというのは、一つのホスト上に必要なデーモンを全て立ち上げる方法を指す。 このモードを使うと HDFS が使えるような、なるべく本番に近い環境が手軽に作れる。

blog.amedama.jp

ただ、疑似分散モードでは本当にちゃんと動作するのかが確認しづらい箇所もある。 それは、主にホストを分割してネットワーク越しにやり取りをする部分で、例えばファイアウォールの設定など。

そこで、今回は Apache Hadoop を完全分散モードでセットアップしてみることにした。 完全分散モードというのは本番運用されるのと同じ環境で、それぞれのデーモンを異なるホストで動かすやり方。 完全分散モードのセットアップ方法については次のドキュメントを参照する。

Apache Hadoop 2.7.3 –

構成について

今回は、なるべく少ないホストで作りたいので三台のホストを使ってセットアップする。 マスターノード一台とスレーブノード二台という構成になる。 それぞれのホストには、次のように IP アドレスを振った。

  • マスターノード
    • 192.168.33.10
  • スレーブノード1
    • 192.168.33.11
  • スレーブノード2
    • 192.168.33.12

ここからは、それぞれのホストでどういった機能が動作するかを説明する。 まずは、Apache Hadoop で動作させるサービス (デーモン) について説明しておく。

  • HDFS 関連

    • NameNode
      • どのノードに目当てのファイルがあるかといったメタデータを管理するサービス
    • DataNode
      • 細かく分割したファイルを実際に保持して必要に応じて操作できるようにするサービス
    • Secondary NameNode
      • メタデータなどの情報をメモリからディスクに永続化するためのサービス
  • YARN 関連

    • ResourceManager
      • 今どのノードで、どういったタスクが実行されているか管理するサービス
    • NodeManager
      • タスクを実行するためのサービス
  • MRv2 関連

    • MapReduce Job History Server
      • MapReduce ジョブの履歴を管理するサービス

上記のサービスを、それぞれ次のように割り当てる。

  • マスターノード

    • NameNode
    • ResourceManager
    • Secondary NameNode
    • MapReduce Job History Server
  • スレーブノード

    • DataNode
    • NodeManager

使った環境については次の通り。 疑似分散モードのときと同じように CentOS7 上に OSS 版の Apache Hadoop をセットアップする。

$ cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)
$ uname -r
3.10.0-514.21.1.el7.x86_64

全ホスト共通

まずは全ホスト共通の作業から実施する。

hostname

ホスト名を設定しておく。 これはやらなくても大丈夫だけど、操作対象のホストを間違えないようにやっておいた方が良いかも。 それぞれのホストで実施する。

マスターノードには “master” というホスト名をつけておく。

master $ sudo hostname master
master $ echo "master" | sudo tee /etc/hostname > /dev/null

一つ目のスレーブノードには “node1” というホスト名をつけておく。

node1 $ sudo hostname node1
node1 $ echo "node1" | sudo tee /etc/hostname > /dev/null

二つ目のスレーブノードには “node2” というホスト名をつけておく。

node2 $ sudo hostname node2
node2 $ echo "node2" | sudo tee /etc/hostname > /dev/null

/etc/hosts

次に /etc/hosts ファイルを編集する。 これは IP アドレスの代わりにホスト名を使って設定ファイルを書きたいので。

$ cat << 'EOF' | sudo tee -a /etc/hosts > /dev/null
192.168.33.10 master
192.168.33.11 node1
192.168.33.12 node2
EOF

ちゃんと設定ファイルが書き換わったことを確認しておく。

$ tail -n 3 /etc/hosts
192.168.33.10 master
192.168.33.11 node1
192.168.33.12 node2

依存パッケージ

次に必要なパッケージをインストールする。 epel-releasesshpass はスレーブノードには必要ないんだけど、まあ入れておいても問題はない。

$ sudo yum -y install epel-release
$ sudo yum -y install openssh-clients rsync wget java-1.8.0-openjdk-devel sshpass

Hadoop

次に Apache Hadoop をダウンロードする。

$ wget http://ftp.riken.jp/net/apache/hadoop/common/hadoop-2.8.0/hadoop-2.8.0.tar.gz
$ tar xf hadoop-2.8.0.tar.gz

シェルの設定ファイルに、いくつかの環境変数を指定しておこう。

$ cat << 'EOF' >> ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk
export HADOOP_HOME=~/hadoop-2.8.0
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$JAVA_HOME/bin:$PATH
EOF
$ source ~/.bashrc

これで hadoop コマンドが動作することを確認しておく。

$ hadoop version
Hadoop 2.8.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 91f2b7a13d1e97be65db92ddabc627cc29ac0009
Compiled by jdu on 2017-03-17T04:12Z
Compiled with protoc 2.5.0
From source with checksum 60125541c2b3e266cbf3becc5bda666
This command was run using /home/vagrant/hadoop-2.8.0/share/hadoop/common/hadoop-common-2.8.0.jar

マスターノード

ここからはマスターノードで作業をする。

SSH

Apache Hadoop のセットアップでは、マスターノードから各ホストに SSH でパスフレーズを使わずにログインできる必要がある。 そこで、まずはパスフレーズを空にした公開鍵ペアを用意しておく。

$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa

作成した公開鍵を各ホストに設置する。 これでマスターノードから各ホストにログインできるようになる。

$ for node in master node1 node2; do sshpass -p "vagrant" ssh-copy-id -i ~/.ssh/id_rsa.pub -o "StrictHostKeyChecking no" $node; done;

設定ファイルを編集する

ここからは Apache Hadoop の動作に必要な設定ファイルを編集していく。

etc/hadoop/slaves

まずはスレーブノードがどのホストなのかを示す設定ファイルを編集する。

$ cat << 'EOF' > $HADOOP_HOME/etc/hadoop/slaves
node1
node2
EOF

今回使うスレーブノードは node1node2 の二台になる。

$ cat $HADOOP_HOME/etc/hadoop/slaves
node1
node2
etc/hadoop/core-site.xml

次は HDFS の設定ファイルを編集する。 HDFS の接続エンドポイントがマスターノードであることを指定しておく。

$ cat << 'EOF' > /tmp/core-site.xml.property
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://192.168.33.10:9000</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/core-site.xml.property
  /^$/d
' $HADOOP_HOME/etc/hadoop/core-site.xml

こんな感じになる。

$ tail -n 6 $HADOOP_HOME/etc/hadoop/core-site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://192.168.33.10:9000</value>
  </property>
</configuration>
etc/hadoop/hdfs-site.xml

同様に HDFS のレプリケーション数 (ファイルのコピーをいくつ作るか) やセカンダリネームノードのエンドポイントを指定していく。

$ cat << 'EOF' > /tmp/hdfs-site.xml.property
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>192.168.33.10:50090</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/hdfs-site.xml.property
  /^$/d
' $HADOOP_HOME/etc/hadoop/hdfs-site.xml

こんな感じになる。

$ tail -n 6 $HADOOP_HOME/etc/hadoop/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
</configuration>
etc/hadoop/mapred-site.xml

次は分散処理フレームワークの YARN を設定していく。

分散処理フレームワークとして YARN を使うことを指定する。

$ cp $HADOOP_HOME/etc/hadoop/mapred-site.xml{.template,}
$ cat << 'EOF' > /tmp/mapred-site.xml.property
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/mapred-site.xml.property
  /^$/d
' $HADOOP_HOME/etc/hadoop/mapred-site.xml

こんな感じになる。

$ tail -n 6 $HADOOP_HOME/etc/hadoop/mapred-site.xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>
etc/hadoop/yarn-site.xml

同様に YARN が動作するための設定を行う。

$ cat << 'EOF' > /tmp/yarn-site.xml.property
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>master</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/yarn-site.xml.property
  /^$/d
' $HADOOP_HOME/etc/hadoop/yarn-site.xml

こんあ感じになる。

$ tail -n 11 $HADOOP_HOME/etc/hadoop/yarn-site.xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>master</value>
  </property>
<!-- Site specific YARN configuration properties -->
</configuration>
設定ファイルをスレーブノードにコピーする

マスターノードで作った設定ファイルをスレーブノードにも設置する。 ここは SCP を使って一気にバラまくことにした。

$ for node in node1 node2; do scp $HADOOP_HOME/etc/hadoop/* $node:$HADOOP_HOME/etc/hadoop/; done;

クラスタを構築する

ここまでで必要な設定ファイルは用意できた。 あとはクラスタを構築していくだけ。 以下の操作もマスターノードのみで実施する。

まずは HDFS をフォーマットする。

$ $HADOOP_HOME/bin/hdfs namenode -format

続いて HDFS 関連のデーモンを起動する。

$ $HADOOP_HOME/sbin/start-dfs.sh

マスターノードでは、次のようにネームノードとセカンダリネームノードのサービスが起動する。

$ jps
9062 Jps
8951 SecondaryNameNode
8473 NameNode

続いて YARN 関連のデーモンを起動する。

$ $HADOOP_HOME/sbin/start-yarn.sh

マスターノードでは、次のようにリソースマネージャのサービスが起動する。

$ jps
9648 Jps
8951 SecondaryNameNode
8473 NameNode
9165 ResourceManager

続いて MapReduce のジョブ履歴を管理するサービスを起動する。

$ $HADOOP_HOME/sbin/mr-jobhistory-daemon.sh --config $HADOOP_CONF_DIR start historyserver

マスターノードでは、次のようにジョブヒストリサーバが起動する。

$ jps
9648 Jps
8951 SecondaryNameNode
8473 NameNode
9610 JobHistoryServer
9165 ResourceManager

ここまで実行するとスレーブノードでもネームノードとデータノードのサービスが立ち上がっている。

$ jps
4560 NodeManager
5090 Jps
4456 DataNode

使ってみよう

これで Apache Hadoop のクラスタを完全分散モードで組むことができた。 実際に軽く使ってみることにしよう。

まずはサンプルに入っているプログラムを使って円周率を計算させてみる。

$ $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar pi 10 10000
Number of Maps  = 10
Samples per Map = 10000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
...(snip)...
Job Finished in 52.727 seconds
Estimated value of Pi is 3.14120000000000000000

上手く実行できた。

次は、たくさんのファイルの中から特定の文言が何回出現するかを調べる grep も動かしてみよう。 先ほどの円周率は単なる数値計算だったけど、こちらのプログラムは HDFS との連携も必要になる。

まずは検索対象のファイルを置くために HDFS のホームディレクトリを作っておく。

$ $HADOOP_HOME/bin/hdfs dfs -mkdir -p .
$ $HADOOP_HOME/bin/hdfs dfs -ls -R /user
drwxr-xr-x   - vagrant supergroup          0 2017-06-05 19:58 /user/vagrant

grep の検索対象は Apache Hadoop の設定ファイル群を使ってみよう。 設定ファイルの入ったディレクトリを input という名前で HDFS にコピーする。

$ $HADOOP_HOME/bin/hdfs dfs -put $HADOOP_HOME/etc/hadoop input

こんな感じでコピーされるはず。

$ $HADOOP_HOME/bin/hdfs dfs -ls input
Found 30 items
-rw-r--r--   2 vagrant supergroup       4942 2017-06-05 19:59 input/capacity-scheduler.xml
-rw-r--r--   2 vagrant supergroup       1335 2017-06-05 19:59 input/configuration.xsl
-rw-r--r--   2 vagrant supergroup        318 2017-06-05 19:59 input/container-executor.cfg
...(snip)...
-rw-r--r--   2 vagrant supergroup       2250 2017-06-05 19:59 input/yarn-env.cmd
-rw-r--r--   2 vagrant supergroup       4567 2017-06-05 19:59 input/yarn-env.sh
-rw-r--r--   2 vagrant supergroup        897 2017-06-05 19:59 input/yarn-site.xml

ちなみに、ファイルの実体はデータノードのサービスが動いているスレーブノードに分散して設置されている。

/tmp/hadoop-$(whoami)/dfs/data/current

サンプルにある grep のプログラムを動かしてみよう。 設定ファイルの中から dfs で始まる言葉を探している。

$ $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar grep input output 'dfs[a-z.]+'
17/06/05 20:00:35 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.33.10:8032
17/06/05 20:00:36 INFO input.FileInputFormat: Total input files to process : 30
17/06/05 20:00:36 INFO mapreduce.JobSubmitter: number of splits:30
17/06/05 20:00:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1496645818704_0002
17/06/05 20:00:37 INFO impl.YarnClientImpl: Submitted application application_1496645818704_0002
17/06/05 20:00:37 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1496645818704_0002/
17/06/05 20:00:37 INFO mapreduce.Job: Running job: job_1496645818704_0002
17/06/05 20:00:46 INFO mapreduce.Job: Job job_1496645818704_0002 running in uber mode : false
17/06/05 20:00:46 INFO mapreduce.Job:  map 0% reduce 0%
17/06/05 20:01:14 INFO mapreduce.Job:  map 13% reduce 0%
17/06/05 20:01:15 INFO mapreduce.Job:  map 20% reduce 0%
...(snip)

実行が終わったら HDFS の output ディレクトリに結果が出力される。

$ $HADOOP_HOME/bin/hdfs dfs -cat output/*
6  dfs.audit.logger
4  dfs.class
3  dfs.logger
3  dfs.server.namenode.
2  dfs.audit.log.maxbackupindex
2  dfs.period
2  dfs.audit.log.maxfilesize
1  dfs.log
1  dfs.file
1  dfs.servers
1  dfsadmin
1  dfsmetrics.log
1  dfs.replication

ばっちりだね。

Vagrantfile

上記を手作業で毎回やるのは大変なので Vagrantfile を書いて自動化してみた。

Vagrantfile for Hadoop Cluster with CentOS 7 and Hadoop 2.8.0 (3 hosts) · GitHub

Vagrant と Git をインストールした Unix 系のシステムなら、次のようにして自動でクラスタを構築できる。 デフォルトで、それぞれのホストに 2GB のメモリを割り当てるので最低でも 8GB できれば 16GB 以上のメモリを積んだマシンで実行した方が良い。

$ git clone https://gist.github.com/0ff814ce4c3aa659723c6b5b0fc85557.git hadoop-cluster
$ cd hadoop-cluster
$ vagrant provision node1 node2 master

あとはそれぞれのホストに入って色々使ってみるだけ。

$ vagrant ssh master

いじょう。

まとめ

今回は Apache Hadoop のクラスタを完全分散モードで構築してみた。 完全分散モードであればマスターとスレーブがネットワークで分かれているので、よりプロダクション環境に近い状況で検証ができる。

Apache Hadoop については次の本がとても分かりやすい。

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

SSH/SCP のログイン自動化に sshpass が便利すぎた

手元で検証環境の構築なんかをするときは、何らかの形で自動化したくなる。 そんなとき、よく障壁となるのが SSH/SCP でパスワードの入力を求められるところだった。 例えば、複数のホストをまたいで操作したいときや、ソフトウェアが要件として公開鍵の設置を求めてくるときに必要となる。 そういった場面で SSH/SCP でログインするためのパスワード入力を自動化するところが、なかなか面倒くさい。 今回は、そんな折に sshpass の存在を知って使ってみたところ便利だった、という話。

操作の題材としてはローカルホストに SSH でログインすることを考えてみよう。 尚、あくまでこれはセキュアな環境で検証用の構築などを自動化するために使うことを想定している。 使い方を誤ればセキュリティ上のリスクとなるので注意してほしい。

使った環境は次の通り。

$ cat /etc/redhat-release 
CentOS Linux release 7.3.1611 (Core)
$ uname -r
3.10.0-514.21.1.el7.x86_64

SSH/SCP のログインを自動化する上での問題点

まず、当たり前だけど SSH デーモンでパスワードログインを有効にしている場合には、こんな感じでプロンプトが出る。

$ ssh localhost
vagrant@localhost's password:

あるいは、フィンガープリントが known_hosts ファイルに載っていない状態では、次のように確認される。

$ ssh localhost
The authenticity of host 'localhost (::1)' can't be established.
ECDSA key fingerprint is 2a:dd:72:1a:f5:01:be:03:cc:d2:3c:33:11:3b:77:f0.
Are you sure you want to continue connecting (yes/no)?

上記のような状況で、ログインを自動化するにはどうしたら良いか?というのが今回のポイント。 ひとまず、フィンガープリントの検証については -o オプションを使ってスキップできる。

$ ssh -o "StrictHostKeyChecking no" localhost
Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.
vagrant@localhost's password: 

しかし肝心のログインパスワードの入力はというと…標準入力なんかで入れば楽ちんなんだけど、そうは上手くいかない。

$ echo "vagrant" | ssh -o "StrictHostKeyChecking no" localhost
Pseudo-terminal will not be allocated because stdin is not a terminal.
vagrant@localhost's password: 

ターミナル上での対話の自動化といえば古典的には expect がよく使われるけど、これもまた面倒くさい。 やりたいことに対してオーバースペック感がある。

sshpass を使ったログインパスワード入力の自動化

そんなとき使うと便利なのが sshpass だった。 CentOS7 なら EPEL にパッケージがあるのでインストールする。

$ sudo yum -y install epel-release
$ sudo yum -y install sshpass

あとは sshpass コマンドに -p オプションでパスワードを指定しつつ ssh コマンドを実行するだけ。

$ sshpass -p "vagrant" ssh -o "StrictHostKeyChecking no" localhost
Last login: Sat Jun  3 03:05:17 2017 from 10.0.2.2
[vagrant@localhost ~]$ 

あっさりログインできた。

もちろん SCP だってこの通り。

$ echo 'Hello, World!' > greeting.txt
$ sshpass -p "vagrant" scp -o "StrictHostKeyChecking no" greeting.txt localhost:/tmp
$ cat /tmp/greeting.txt 
Hello, World!

ばっちりだね。

ssh-copy-id のパスワード入力も自動化できる

ちなみに sshpass は公開鍵の設置の自動化にも役に立つ。 公開鍵を設置には ssh-copy-id を使うのが定石なんだけど、これのパスワード入力も代わりにやってくれる。 これも試してみることにしよう。

まずは公開鍵ペアを用意する。 ここでのポイントは -P オプションでパスフレーズを指定したり -f オプションで鍵の場所を指定すること。 こうすればインタラクティブモードにならないからワンライナーで作れる。

$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa

あとは、この作った公開鍵をどうやって設置するか、というのが問題になる。 ssh-copy-id を使えばパーミッションがどうとかいうのを考えずに公開鍵が設置できるのでめっちゃ楽できる。 とはいえ設置するのには SSH でのログインが必要なのでパスワードを聞かれる。

$ ssh-copy-id -i ~/.ssh/id_rsa.pub -o "StrictHostKeyChecking no" localhost
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
vagrant@localhost's password:

なんと、ここの部分も sshpass で自動化できる!やったー! さっきと同じように sshpass コマンド経由で ssh-copy-id コマンドを呼び出すだけ。

$ sshpass -p "vagrant" ssh-copy-id -i ~/.ssh/id_rsa.pub -o "StrictHostKeyChecking no" localhost
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys

Number of key(s) added: 1

Now try logging into the machine, with:   "ssh -o 'StrictHostKeyChecking no' 'localhost'"
and check to make sure that only the key(s) you wanted were added.

これでもういくらでも SSH/SCP でのログインが自動化できるね。

$ ssh localhost
Last login: Sat Jun  3 03:15:14 2017 from ::1
[vagrant@localhost ~]$

実際に Vagrant で自動化してみる

次は実際に Vagrant を使ったユースケースを試してみることにする。

まずは Shell Provisioner で provision.sh を実行するようにした Vagrantfile を用意する。 ポイントは privileged: false を指定しているところ。 こうすることで、プロビジョニングの実行ユーザが root ではなく一般ユーザの vagrant になる。 これをやらないとファイルの権限とかパスの指定を後から付け替えることになって色々とめんどくさいことになる。

$ cat << 'EOF' > Vagrantfile
# -*- mode: ruby -*-
# vi: set ft=ruby :

Vagrant.configure("2") do |config|
  config.vm.box = "bento/centos-7.3"
  config.vm.provision "shell", privileged: false do |shell|
    shell.path = "provision.sh"
  end
end
EOF

ちなみに、パスワードログインを SSHD の設定ファイルで無効にしてある Vagrant Box もたまにあるので注意しよう。

続いて上記の Vagrantfile で実行する provision.sh を用意する。 この中では localhostssh-copy-id で公開鍵を設置するところまで自動化している。

$ cat << 'EOF' > provision.sh
#!/bin/sh

set -x
set -e

: "Install sshpass(1)" && {
  sudo yum -y install epel-release
  sudo yum -y install sshpass
}

: "Generate SSH keypair" && {
  ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
}

: "Install public key" && {
  sshpass -p "vagrant" ssh-copy-id -i ~/.ssh/id_rsa.pub -o "StrictHostKeyChecking no" localhost
}

EOF

上記の設定ファイルを元にして仮想マシンを立ち上げよう。

$ vagrant up
Bringing machine 'default' up with 'virtualbox' provider...
==> default: Importing base box 'bento/centos-7.3'...
==> default: Matching MAC address for NAT networking...
...(snip)...
==> default: 
==> default: Number of key(s) added: 1
==> default: 
==> default: Now try logging into the machine, with:   "ssh -o 'StrictHostKeyChecking no' 'vagrant@localhost'"
==> default: and check to make sure that only the key(s) you wanted were added.

仮想マシンが完成したらログインする。

$ vagrant ssh

すると、この状態で既に localhost にパスフレーズなしでログインできるようになっている!

$ ssh localhost
Last login: Sat Jun  3 04:10:29 2017 from 10.0.2.2
[vagrant@localhost ~]$

めでたしめでたし。

まとめ

  • sshpass を使うと SSH/SCP のログインパスワードの入力を自動化できる
  • ssh-copy-id と組み合わせて使うことで公開鍵の設置も自動化できる
  • 注意点としては、あくまでセキュアな環境で検証用にのみ使うこと

CentOS7 で Apache HBase を使ってみる

今回は分散データベースの一つである Apache HBase を使ってみる。 これは、いわゆる NoSQL と呼ばれるものの一つ。 Hadoop ファミリーの一員だけど MapReduce などは使わず直接 HDFS を触るアーキテクチャになっている。

HBase は、分散データベースの性質を紹介するときによく使われる CAP 定理でいうと C (一貫性) と P (分断耐性) を重視している。 このことから、何かの拍子に古いデータが見えたりするようなことがなく、ネットワーク障害が起きてもサービス提供は継続できる。 ただし、(冗長化できるとはいえ) マスターサーバがあることから、そこに障害が起きるとサービス提供ができなくなってしまう。

また、一口に NoSQL といってもどのようなデータ構造を持つかはソフトウェアによって全く異なる。 (これは NoSQL がリレーショナル・データベース以外のデータベースをおおまかに分類するための語なので当然といえる) HBase においては論理的にリレーショナル・データベースのようなテーブル構造を持つものの、その表現方法がなかなか面白い。 通常、リレーショナル・データベースであればデータはレコード (行) 単位で管理している。 それが HBase では、さらに細かくデータをセル (リレーショナル・データベースでいえばレコードの中の一つのカラム) 単位で持っている。 どのように実現しているかといえば、セルごとにキー・バリューのペアを一つ持っているイメージだ。 リレーショナル・データベースでいうところの主キーといえる行キーに対して、セルを表現するキー・バリューのペアがたくさん紐づく。 ここらへんは、次のスライドが分かりやすかった。

www.slideshare.net

今回は、そんな HBase を CentOS7 で使ってみることにする。 試すのは疑似分散モードにしたので、動作環境として前述した HDFS が必要になる。 なので、例えば以前このブログで書いたエントリなどを参考にして HDFS が使えるようになっていることが前提になる。 次のエントリでは Hadoop を擬似分散モードで立ち上げていて、そのとき HDFS も一緒に使えるようになる。

blog.amedama.jp

インストール手順書については、次の公式ドキュメントを参考にした。 ちなみに、一つ上にあるローカルディスクを使う手順なら HDFS をセットアップする必要がない。 ただ、手元で検証するのになるべく本番運用に近い形を、となると HDFS を用意して疑似分散モードで使った方が良さそう。

Apache HBase ™ Reference Guide

使った環境は次の通り。

$ cat /etc/redhat-release 
CentOS Linux release 7.3.1611 (Core)
$ uname -r
3.10.0-514.el7.x86_64

インストールする

まずは Apache のミラーサイトから HBase のバイナリをダウンロードする。 現在 (2017/5/25) の安定版リリースは 1.2.5 らしい。

www.apache.org

ダウンロードしたら解凍する。

$ wget http://ftp.riken.jp/net/apache/hbase/1.2.5/hbase-1.2.5-bin.tar.gz
$ tar xf hbase-1.2.5-bin.tar.gz

ここからは HBase のディレクトリ内で作業する。

$ cd hbase-1.2.5

まずは HBase の設定ファイルを編集する。 一番上の hbase.rootdir では、作業ディレクトリとして使う場所を HDFS 上のパスに指定している。 真ん中の hbase.cluster.distributed では動作モードを分散モードに設定している。 今回使うのは一つのホストだけど分散モードに設定することで擬似分散モードとして使えるようになる。 そして、最後の hbase.zookeeper.property.clientPort は、公式の手順書には無かったけど無いとエラーになるので追加した。 ここでは Apache ZooKeeper のクライアントが接続するポートを指定している。 これは、HBase がマスターサーバの選出を ZooKeeper で行っているためらしい。

$ cat << "EOF" > /tmp/hbase-site.xml.property
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:9000/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/hbase-site.xml.property
' conf/hbase-site.xml

上記の操作で、設定ファイルはこんな感じになる。

$ tail -n 14 conf/hbase-site.xml
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:9000/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>

ちなみに、上記で指定した HDFS のデータディレクトリは勝手に作られる。 なので、あえて自分で作る必要はないようだ。

次に HBase の動作に必要なので環境変数 JAVA_HOME を設定する。

$ export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk

これは HBase の環境設定を行うスクリプト hbase-env.sh に書いても良い。 こちらに書いておくと毎回呼ばなくて良いので楽ちん。

$ sed -i -e '
  s:^# export JAVA_HOME=.*$:export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk:
' conf/hbase-env.sh

起動する

これで下準備は整ったので HBase を起動してみよう。

$ bin/start-hbase.sh
localhost: starting zookeeper, logging to /home/vagrant/hbase-1.2.5/bin/../logs/hbase-vagrant-zookeeper-localhost.localdomain.out
starting master, logging to /home/vagrant/hbase-1.2.5/bin/../logs/hbase-vagrant-master-localhost.localdomain.out
OpenJDK 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
starting regionserver, logging to /home/vagrant/hbase-1.2.5/bin/../logs/hbase-vagrant-1-regionserver-localhost.localdomain.out

Java で動いているプロセスを jps コマンドで確認してみる。 この中の HMasterHQuorumPeerHRegionServer が HBase の動作に直接関わっているプロセスだ。

$ jps
32496 Jps
31505 ResourceManager
31330 SecondaryNameNode
31607 NodeManager
32281 HMaster
32219 HQuorumPeer
32365 HRegionServer
31038 NameNode
31167 DataNode

また、HBase 用の作業ディレクリが HDFS 上にできていることも確認してみよう。

$ export HADOOP_HOME=~/hadoop-2.8.0
$ $HADOOP_HOME/bin/hdfs dfs -ls /hbase
Found 7 items
drwxr-xr-x   - vagrant supergroup          0 2017-05-25 18:47 /hbase/.tmp
drwxr-xr-x   - vagrant supergroup          0 2017-05-25 18:47 /hbase/MasterProcWALs
drwxr-xr-x   - vagrant supergroup          0 2017-05-25 18:47 /hbase/WALs
drwxr-xr-x   - vagrant supergroup          0 2017-05-25 18:47 /hbase/data
-rw-r--r--   3 vagrant supergroup         42 2017-05-25 18:47 /hbase/hbase.id
-rw-r--r--   3 vagrant supergroup          7 2017-05-25 18:47 /hbase/hbase.version
drwxr-xr-x   - vagrant supergroup          0 2017-05-25 18:47 /hbase/oldWALs

ちゃんとできているみたい。

使ってみる

ここからは HBase を実際に使ってみることにする。 HBase には各種プログラミング言語で実装されたクライアントがあるけど、同梱されているシェルを使うのが一番簡単かな。

$ bin/hbase shell

HBase のシェルを起動すると、こんな感じでコマンドが入力できるようになる。

$ bin/hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/vagrant/hbase-1.2.5/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/vagrant/hadoop-2.8.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.2.5, rd7b05f79dee10e0ada614765bb354b93d615a157, Wed Mar  1 00:34:48 CST 2017

hbase(main):001:0> 

例えばクラスタの状態を確認するには status コマンドを使う。 疑似分散モードでは一つのホストしか使わないので、あんまり意味ないけど。

> status
1 active master, 0 backup masters, 1 servers, 0 dead, 2.0000 average load

テーブルの状態を確認するには list を使う。

> list
TABLE                                                                           
0 row(s) in 0.0580 seconds

=> []

試しに users テーブルを作ってみよう。 このとき、テーブル名と一緒にカラムファミリーの名前も指定する。 カラムファミリーというのは、セルを表現するキー・バリューペアのキーをグループ化したもの。 例えば、今回作った profile というカラムファミリーにはキーとして age だの bloodtype だのが入ることになる。

> create 'users', 'profile'
0 row(s) in 2.3290 seconds

=> Hbase::Table - users

これで users テーブルができた。

> list
TABLE                                                                           
users                                                                           
1 row(s) in 0.0140 seconds

=> ["users"]

テーブルに関する情報は describe コマンドで確認できる。

> describe 'users'
Table users is ENABLED                                                          
users                                                                           
COLUMN FAMILIES DESCRIPTION                                                     
{NAME => 'profile', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false',
 KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER',
 COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => 
'65536', REPLICATION_SCOPE => '0'}                                              
1 row(s) in 0.1230 seconds

それでは users テーブルにセルを追加してみよう。 HBase ではデータをリレーショナル・データベースのようなレコード単位ではなくセル単位で追加していく。 以下の例では users テーブルに主キー相当として Alice という行キーを追加している。 その行キーに対応するセルとしては profile カラムファミリーに age というキーで 20 という値を追加した。

> put 'users', 'Alice', 'profile:age', '20'
0 row(s) in 0.1030 seconds

ちなみに HBase にはデータ型という概念がない。 代わりに、すべてバイト配列として扱われる。 つまり、どのカラムをどのような表現で扱うかはアプリケーション側でケアしなければならない。

テーブル全体の状況は scan コマンドで確認できる。 ちゃんと先ほど入れた Alice のセルが格納されている。

> scan 'users'
ROW                   COLUMN+CELL                                               
 Alice                column=profile:age, timestamp=1495716559615, value=20     
1 row(s) in 0.0330 seconds

ピンポイントで一つの行データを手に入れるには get コマンドを使う。

> get 'users', 'Alice'
COLUMN                CELL                                                      
 profile:age          timestamp=1495716559615, value=20                         
1 row(s) in 0.0350 seconds

また、カラムファミリーはテーブルの定義時に指定しなきゃいけないけど、その中のキーは後から自由に定義できる。 血液型の bloodtype キーを追加してみよう。

> put 'users', 'Alice', 'profile:bloodtype', 'A'
0 row(s) in 0.0200 seconds

こんな感じで動的に追加できるようになっている。

> get 'users', 'Alice'
COLUMN                CELL                                                      
 profile:age          timestamp=1495716559615, value=20                         
 profile:bloodtype    timestamp=1495716597831, value=A                          
2 row(s) in 0.0120 seconds

行の中で特定のカラムだけを取得したいときは get コマンドでカラムファミリーやキーを指定する。

> get 'users', 'Alice', 'profile:age'
COLUMN                CELL                                                      
 profile:age          timestamp=1495716559615, value=20                         
1 row(s) in 0.0140 seconds
> get 'users', 'Alice', 'profile:age', 'profile:bloodtype'
COLUMN                CELL                                                      
 profile:age          timestamp=1495716559615, value=20                         
 profile:bloodtype    timestamp=1495716597831, value=A                          
2 row(s) in 0.0150 seconds

セルを削除するときは delete コマンドを指定する。

> delete 'users', 'Alice', 'profile:bloodtype'
0 row(s) in 0.0370 seconds
> get 'users', 'Alice'
COLUMN                CELL                                                      
 profile:age          timestamp=1495716559615, value=20                         
1 row(s) in 0.0140 seconds

ちなみに、なんとなく雰囲気で分かるだろうけどテーブルのカラムは行ごとにあったりなかったりしても構わない。 試しに Bob という行を追加してみよう。 この行には血液型だけを追加する。

> put 'users', 'Bob', 'profile:bloodtype', 'B'
0 row(s) in 0.0150 seconds

テーブルの状態を確認すると Alice には profile:age だけがあるのに対し Bob には profile:bloodtype だけがある。 HBase では、こうした歯抜けの状態も許されている。

> scan 'users'
ROW                   COLUMN+CELL                                               
 Alice                column=profile:age, timestamp=1495716559615, value=20     
 Bob                  column=profile:bloodtype, timestamp=1495716657793, value=B
2 row(s) in 0.0260 seconds

テーブル自体を削除するには、まずテーブルを disable コマンドで無効にする。

> disable 'users'
0 row(s) in 2.2960 seconds

無効にしただけなら、まだテーブルとしては確認できる。

> list
TABLE                                                                           
users                                                                           
1 row(s) in 0.0110 seconds

=> ["users"]

ただし get コマンドなど、実際の操作は受け付けられずエラーになる。

> get 'users', 'Alice'
COLUMN                CELL                                                      

ERROR: users is disabled.
...(snip)...

本当に削除するときは disable にした状態で drop コマンドを使う。

> drop 'users'
0 row(s) in 1.2790 seconds

これでテーブル自体がなくなった。

> list
TABLE                                                                           
0 row(s) in 0.0100 seconds

=> []

ちなみに、この HBase のシェルは JRuby で実装されたインタプリタがそのまま使われている。 なので、こんなこともできる。

> RUBY_VERSION
=> "1.8.7"
> 10.times { |i| p i }
0
1
2
3
4
5
6
7
8
9
=> 10

ちなみに、注意点として HBase では複数行のアトミックな変更ができない。 もしトランザクションのようなものが欲しいときは Apache Omid などを使う必要があるようだ。

Apache Omid – What is Omid?

まとめ

今回は分散データベースの一つである Apache HBase を使ってみた。 性質をよく把握した上でスケールする KVS の一種として扱う分には良さそうだ。

今回も、この本が参考になった。

Hadoop徹底入門 第2版

Hadoop徹底入門 第2版

CentOS7 で Apache Hive を使ってみる

今回は Apache Hadoop 上で動作する MapReduce アプリケーションの一つ Apache Hive を使ってみる。 Apache Hive を使うと Hadoop/HDFS の上で HiveQL という SQL のサブセットが使えるようになる。 実行したクエリは MapReduce のジョブに変換されて Hadoop クラスタで分散並列処理されることから高スループットが得られる。

ただし、MapReduce アプリケーションのご多分に漏れずレイテンシーはでかい。 ようするに一つ一つのクエリの実行自体には時間がかかってしまう。 また、一度追加したレコードについては基本的に更新したり削除することができない。 それらの特性から、オンライントランザクション処理 (OLTP) のような用途には全く向いていない。 代わりに、どんどん一方的にデータが蓄積されていくような状況で後からバッチとかで集計するような用途には向いていると思う。

そんな Apache Hive を今回は CentOS7 上に構築して使ってみる。 今回も Hadoop ディストリビューションは使わず、OSS のリリースパッケージをそのままインストールする。 その方が、おそらく挙動や設定について多くの知見が得られると思うので。 そして、Apache Hive は Apache Hadoop/HDFS 上で動作する。 そのため、今回使う環境は以下のエントリにもとづいて Apache Hadoop が疑似分散モードでセットアップ済みなことを想定する。

blog.amedama.jp

上記のエントリにもあるけど、今回使った環境は次の通り。

$ cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)
$ uname -r
3.10.0-514.16.1.el7.x86_64
$ bin/hadoop version
Hadoop 2.8.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 91f2b7a13d1e97be65db92ddabc627cc29ac0009
Compiled by jdu on 2017-03-17T04:12Z
Compiled with protoc 2.5.0
From source with checksum 60125541c2b3e266cbf3becc5bda666
This command was run using /home/vagrant/hadoop-2.8.0/share/hadoop/common/hadoop-common-2.8.0.jar

インストールする

インストール方法については次のドキュメントを参照した。 ただし、Apache Hive 2.x 系についてはここに書かれている内容だけではエラーになってしまった。 そのため公式サイト以外にも調べて必要な手順を足している。

GettingStarted - Apache Hive - Apache Software Foundation

AdminManual Installation - Apache Hive - Apache Software Foundation

下準備

まずはパッケージをダウンロードするために wget をインストールしておく。

$ sudo yum -y install wget

続いては環境変数 HADOOP_HOME に Apache Hadoop のインストール先ディレクトリを指定しておく。

$ export HADOOP_HOME=~/hadoop-2.8.0

続いては Apache Hive のパッケージをダウンロードする。 ミラーサイトは次の通りなので、好きなところを選ぼう。

www.apache.org

ただし、現状で Apache Hive には 1.x 系と 2.x 系という二つのメジャーバージョンが併用されている。 軽く調べた感じ、まだ 1.x 系の方が使われているっぽいかな? ただ、2.x 系には 1.x 系にない機能が色々とあるみたい。 ちなみに、どうやら Apache Hive は最新の安定版リリース以外ダウンロードサイトに置いていないようだ。

バージョン 1.x を使う場合

もしバージョン 1.x 系を使うときは、こうする。

$ wget http://ftp.riken.jp/net/apache/hive/hive-1.2.2/apache-hive-1.2.2-bin.tar.gz
$ tar xf apache-hive-1.2.2-bin.tar.gz
$ cd apache-hive-1.2.2-bin
バージョン 2.x を使う場合

バージョン 2.x 系を使うときは、こう。

$ wget http://ftp.riken.jp/net/apache/hive/hive-2.1.1/apache-hive-2.1.1-bin.tar.gz
$ tar xf apache-hive-2.1.1-bin.tar.gz
$ cd apache-hive-2.1.1-bin

セットアップする

どちらのメジャーバージョンを使うときも同じ手順でセットアップできるようだ。 ただし 2.x 系については 1.x 系で不要な操作も必ずしなければいけないようになっている。

まずは HDFS に Hive のデータ置き場を作る。

$ $HADOOP_HOME/bin/hadoop fs -mkdir -p /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse

同様にテンポラリディレクトリも必要なので作っておこう。

$ $HADOOP_HOME/bin/hadoop fs -mkdir -p /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp

続いては Apache Hive がメタデータを入れておく RDBMS を準備する。 これには MySQL とか PostgreSQL とかも使えるけど、デフォルトでは Apache Derby という Java で書かれた組み込みのものが使われる。

$ bin/schematool -dbType derby -initSchema --verbose
...(snip)...
Initialization script completed
schemaTool completed

これで下準備はおわり。

使ってみる

続いては、実際に Apache Hive を使ってみることにしよう。 それにはまず hive というコマンドを起動する。

$ bin/hive

すると、次のように HiveQL を入力するためのシェルが起動する。

hive>

ちなみに、このとき 2.x 系を使っていると次のような警告が出る。 どうやら 2.x 系では MapReduce で動作させるのは非推奨となっていて、代わりに Spark とか Tez の上で動かせってことらしい。 ここらへんはまだよく分かっていないけど、おそらくレイテンシーの大きさを改善するための試みな気がしている。

Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.

シェルの使い方

基本的には MySQL なんかのシェルを使うのと似たような感じで操作できる。 例えばデータベースの一覧を得るには SHOW DATABASES コマンドが使える。

hive> SHOW DATABASES;
OK
default
Time taken: 0.011 seconds, Fetched: 1 row(s)

今操作しているデータベースを表示したいときは、こうする。

hive> set hive.cli.print.current.db=true;

シェルの中に表示されるようになった。

hive (default)>

データベースを作る

データベースを新しく作るには CREATE DATABASE コマンドを使う。

hive (default)> CREATE DATABASE mydb;
OK
Time taken: 0.106 seconds

使うには USE コマンドを。 ここらへんは MySQL とほとんど同じ。

hive (default)> USE mydb;
OK
Time taken: 0.038 seconds

hive (mydb)>

テーブルを作る

続いてはテーブルを作ってみる。 ここも至って普通の SQL が使える。 代理キーを入れてみたけど、特に自動生成してくれるような機能はなさそうかな・・・?

hive (mydb)> CREATE TABLE users (
           >   id INT,
           >   name STRING,
           >   age INT
           > );
OK
Time taken: 0.068 seconds

SHOW TABLES で確認すると、ちゃんとテーブルができている。

hive (mydb)> SHOW TABLES;
OK
users
Time taken: 0.022 seconds, Fetched: 1 row(s)

続いてはレコードを追加してみる。 さっきまでの処理は単に RDB にメタデータを追加するだけで済んでいたのかすぐに終わったけど、この処理には結構かかる。 がっつり MapReduce のジョブに変換されているみたいだ。

hive (mydb)> INSERT INTO users VALUES (1, 'Alice', 20);
...(snip)...
Total MapReduce CPU Time Spent: 1 seconds 860 msec
OK
Time taken: 31.601 seconds

SELECT で追加されたレコードを確認してみよう。

hive (mydb)> SELECT * FROM users;
OK
1  Alice   20
Time taken: 0.22 seconds, Fetched: 1 row(s)

ちなみに、クエリがどう処理されているかを詳しく見るには EXPLAIN を先頭につけるらしい。

hive (mydb)> EXPLAIN SELECT * FROM users;
OK
STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        TableScan
          alias: users
          Statistics: Num rows: 1 Data size: 10 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: id (type: int), name (type: string), age (type: int)
            outputColumnNames: _col0, _col1, _col2
            Statistics: Num rows: 1 Data size: 10 Basic stats: COMPLETE Column stats: NONE
            ListSink

Time taken: 0.086 seconds, Fetched: 17 row(s)

続いては複数のテーブルを連結してみる。 試しにユーザのメールアドレスを想定したテーブルを作ってみよう。

hive (mydb)> CREATE TABLE emails (
           >   id INT,
           >   address STRING
           > );
OK
Time taken: 0.062 seconds

hive (mydb)> INSERT INTO emails VALUES (1, 'alice@example.jp');
...(snip)...
OK
Time taken: 18.881 seconds

テーブル同士を連結 (JOIN) するときも一般的な RDBMS と同じようにできる。 ただし、この処理も実行には時間がかかる。

hive (mydb)> SELECT *
           > FROM users
           > JOIN emails ON users.id = emails.id
           > WHERE name like 'Alice';
...(snip)...
OK
1  Alice   20 1  alice@example.jp
Time taken: 31.934 seconds, Fetched: 1 row(s)

ところで、これまで見てきたテーブルなどのデータは HDFS 上にどのようにして格納されているのだろう? 見ると、次のようにテーブルごとでディレクトリになってその中にファイルが入っている。

$ $HADOOP_HOME/bin/hdfs dfs -ls -R /user/hive/warehouse
drwxrwxr-x   - vagrant supergroup          0 2017-05-22 21:35 /user/hive/warehouse/mydb.db
drwxrwxr-x   - vagrant supergroup          0 2017-05-22 21:35 /user/hive/warehouse/mydb.db/emails
-rwxrwxr-x   1 vagrant supergroup         19 2017-05-22 21:35 /user/hive/warehouse/mydb.db/emails/000000_0
drwxrwxr-x   - vagrant supergroup          0 2017-05-22 21:33 /user/hive/warehouse/mydb.db/users
-rwxrwxr-x   1 vagrant supergroup         11 2017-05-22 21:33 /user/hive/warehouse/mydb.db/users/000000_0

さらにファイルの中身を見てみると・・・なんと、ただのテキストファイルになっている。 こんなデータ構造でも上手くいくのは Hadoop クラスタと HDFS の賜物といえるだろう。 HDFS で各ノードに分散配置された細切れのファイルを Hadoop クラスタが分散並列実行するために高いスループットが得られる。

$ $HADOOP_HOME/bin/hdfs dfs -cat /user/hive/warehouse/users/000000_0
1Alice20

テーブルを削除するには DROP TABLE を使う。

hive (mydb)> DROP TABLE users;
OK
Time taken: 0.946 seconds
hive (mydb)> DROP TABLE emails;
OK
Time taken: 0.113 seconds

テーブルを削除すればディレクトリごとファイルもなくなる。

$ $HADOOP_HOME/bin/hdfs dfs -ls -R /user/hive/warehouse
drwxrwxr-x   - vagrant supergroup          0 2017-05-22 12:39 /user/hive/warehouse/mydb.db

データ構造を CSV にする

先ほど見た通り Apache Hive が扱うデータ構造は、デフォルトではただのテキストファイルになっている。 このことから、データ構造を例えば CSV にすることもできる。

テーブルを作るときのオプションとしてカンマ区切りになるよう指定しよう。

hive (mydb)> CREATE TABLE users (
           >   id INT,
           >   name STRING,
           >   age INT
           > )
           > ROW FORMAT DELIMITED
           > FIELDS TERMINATED BY ','
           > STORED AS TEXTFILE;
OK
Time taken: 0.079 seconds

もちろん、こうしておけば外部から CSV を読み込むこともできる。 例えば、次のようなファイルを用意しよう。

$ cat << 'EOF' > /tmp/users.csv
1,Alice,20
2,Bob,30
3,Carol,10
EOF

そして Hive で LOAD DATA を使って、それを読み込む。

hive (mydb)> LOAD DATA LOCAL INPATH '/tmp/users.csv' INTO TABLE users;
Loading data to table mydb.users
Table mydb.users stats: [numFiles=1, totalSize=31]
OK
Time taken: 0.223 seconds

データがちゃんと読み込まれている。

hive (mydb)> SELECT * FROM users WHERE age < 25;
OK
1  Alice   20
3  Carol   10
Time taken: 0.141 seconds, Fetched: 2 row(s)

そして、HDFS 上のデータの中身もこの通り。

$ $HADOOP_HOME/bin/hdfs dfs -ls -R /user/hive/warehouse | grep users.csv
-rwxrwxr-x   1 vagrant supergroup         31 2017-05-22 21:40 /user/hive/warehouse/mydb.db/users/users.csv
$ $HADOOP_HOME/bin/hdfs dfs -cat /user/hive/warehouse/mydb.db/users/users.csv
1,Alice,20
2,Bob,30
3,Carol,10

Vagrantfile

上記のセットアップを手動でやるのは大変なので Vagrant で自動化してみた。 使い方は次の通り。

$ git clone https://gist.github.com/d89890c1e9874f5e15f8cff9311544a5.git hadoop-cluster-with-hive
$ cd hadoop-cluster-with-hive
$ sh boot.sh

あとはマスターノードにログインして色々とやる。

$ vagrant ssh master

まとめ

今回は CentOS7 に Apache Hive をインストールして使ってみた。 Apache Hive を使うことで Hadoop/HDFS 上でスケーラブルなデータストアを構築できる。 構築したデータストアは HiveQL という SQL のサブセットを通して操作できる。 HiveQL は、実行されると MapReduce のジョブに変換された上で Hadoop クラスタ上で並列分散実行される。 そのためレイテンシーは大きいものの高いスループットが期待できるようだ。

今回も次の本が理解の役に立った。

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Python から Hadoop Streaming を使ってみる

今回は、任意のプログラミング言語から Apache Hadoop を使うことのできる Hadoop Streaming という機能を使ってみる。

通常、Hadoop を使って MapReduce のジョブを直接扱うときは Java を使ってマッパーとリデューサーを書くことになる。 ただ、ご存知の通り Java のソースコードというのは重厚長大で、なかなか読み書きがしやすいとは言いにくい。 そこで、任意のプログラミング言語、具体的には標準入出力を処理する実行ファイルさえあれば使える機能ができた。 それが Hadoop Streaming というもの。 この機能を使うことで低レイヤーな MapReduce の処理を、使い慣れたプログラミング言語を使うなど好きなやり方で記述できる。

ちなみに、今回のエントリでは事前に Apache Hadoop がセットアップされていることを前提に書いていく。 具体的には、次のエントリのようにして疑似分散モードないし完全分散モードで構築されていること。

blog.amedama.jp

また、使用するプログラミング言語については使い慣れた Python を使うことにした。 そして、題材とする MapReduce のアプリケーションはワードカウントにする。 これは、MapReduce のハローワールド的な位置づけのアプリケーションになっている。 具体的な処理としては、テキストファイルの中に含まれる単語の数をカウントするというもの。

使った環境については前述したエントリにもあるけど、次の通り。

$ cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)
$ uname -r
3.10.0-514.el7.x86_64
$ bin/hadoop version
Hadoop 2.8.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 91f2b7a13d1e97be65db92ddabc627cc29ac0009
Compiled by jdu on 2017-03-17T04:12Z
Compiled with protoc 2.5.0
From source with checksum 60125541c2b3e266cbf3becc5bda666
This command was run using /home/vagrant/hadoop-2.8.0/share/hadoop/common/hadoop-common-2.8.0.jar

Hadoop Streaming でワードカウントを実行する

まずはワードカウントの処理対象となるテキストファイルから用意しておこう。 各単語はシンプルにスペースで区切られている。 見たところHelloHadoop といった単語はたくさん含まれていそうだ。

$ cat << 'EOF' > /tmp/input1.txt
Hello Hadoop World!
This is the first Hadoop Streaming application!
EOF
$ cat << 'EOF' > /tmp/input2.txt
Hello Hello Hello
Hadoop Streaming application!
EOF

続いては上記のファイルを HDFS にコピーする。 ワーキングディレクトリについては、先のエントリでセットアップした Apache Hadoop のディレクトリ内にいることを想定している。

$ bin/hdfs dfs -put /tmp/input*.txt .

もし、次のようなエラーが出るときはユーザのホームディレクトリが HDFS 上に存在していないことを示している。

put: `.': No such file or directory: `hdfs://localhost:9000/user/vagrant'

そのときはホームディレクトリを用意しよう。

$ bin/hdfs dfs -mkdir -p /user/$(whoami)

次のようにファイルがコピーされていることを確認する。

$ bin/hdfs dfs -ls
Found 2 items
-rw-r--r--   1 vagrant supergroup         68 2017-05-20 01:09 input1.txt
-rw-r--r--   1 vagrant supergroup         48 2017-05-20 01:09 input2.txt

続いて、今回の本題である Hadoop Streaming 用のプログラムを用意する。 その前に Hadoop Streaming で使うプログラムの基礎知識について説明しておく。

まず、Hadoop Streaming では標準入出力を通して Apache Hadoop とユーザのプログラムがデータをやり取りする。 このとき、プログラムは標準入出力を通してやり取りする各行をキーバリューの含まれたものとして扱うことになる。 始めに入力から見ると、これは本当にただ行が標準入力から渡ってくるだけに過ぎない。 ユーザのプログラムは、それを適切にパースしてキーバリューに分解する。 そして、分解したキーバリューに対して何らかの処理を施した上で、最後は標準出力に書き出すことになる。 つまり、標準入出力さえ扱うことができれば、どんなプログラムであっても MapReduce の処理を記述できるということになる。 ただし、詳しくは後述するものの、その処理が並列分散実行される点については留意しておく必要がある。

さて、長くなってしまったけどそろそろ実例を見ていこう。 Hadoop Streaming ではマッパーとレデューサーを別々に用意する。 今回はワードカウントを題材にするので、そのプログラムを Python で書いてみた。

まずはマッパーから。 標準入力の内容をパースしてキーバリューにしたら、それを標準出力に書き出している。 キーとバリューのペアはタブを使って区切るのが望ましい。 なぜなら Hadoop Streaming がデフォルトでキーとバリューをタブ区切りとして扱うため。 もちろん、この区切り文字は変更できるんだけど、今回はデフォルトのままでいく。

$ cat << 'EOF' > /var/tmp/mapper.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys


def map_(line):
    # 空白などで単語ごとに行を区切る
    words = re.split(r'\s', line)
    # 単語と出現回数 (ここでは常に1) のタプルにする
    return [(key, 1) for key in words if key]


def write(kvs):
    for key, value in kvs:
        # キーとバリューをタブ区切りで標準出力に書き出す
        message = '{0}\t{1}'.format(key, value)
        print(message)


def main():
    # 標準入力から処理対象の文字列を読み込む
    for line in sys.stdin:
        # タブ区切りのキーとバリューに分解する
        key_and_values = map_(line)
        # 標準出力に書き込んでいく
        write(key_and_values)


if __name__ == '__main__':
    main()
EOF

続いてはリデューサー。 標準入力に含まれるキーバリューをパースしたら、その内容を元に単語の数をカウントしている。

$ cat << 'EOF' > /var/tmp/reducer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from collections import defaultdict


# 集計結果を入れる辞書
results = defaultdict(int)


def reduce_(line):
    # 入力された行をタブで区切る
    key, value = line.split('\t')
    # キーごとに出現回数をカウントする
    results[key] += int(value)


def main():
    # キーの出現回数をカウントする
    for line in sys.stdin:
        reduce_(line)

    # 集計結果をキーとバリューのタブ区切りで標準出力に書き出す
    for key, value in results.items():
        message = '{0}\t{1}'.format(key, value)
        print(message)


if __name__ == '__main__':
    main()
EOF

上記で作ったマッパーとリデューサーに実行権限をつけておく。 これは必須ではない。

$ chmod +x /var/tmp/mapper.py
$ chmod +x /var/tmp/reducer.py

これで準備が整った。 それでは早速 Hadoop Streaming を実行してみよう。 jar には Apache Hadoop に同梱されている Hadoop Streaming 用のものを指定する。 -files オプションで使用する実行ファイルを指定して、それを -mapper-reducer オプションで割り当てていく。 MapReduce ジョブの入出力については -input-output オプションで指定する。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output

無事にジョブが完了すると結果が output ディレクトリ以下に出力される。 内容を確認してみよう。

$ bin/hdfs dfs -cat output/*
application!    2
This    1
is  1
Hadoop  3
Streaming   2
World!  1
the 1
Hello   4
first   1

どうやら、ちゃんとワードカウントが動作しているようだ。

マップだけする (リデュースしない) パターン

場合によっては MapReduce で、マップだけを使ってリデュースはしたくないということもあるはず。 これは、単純に Apache Hadoop クラスタを並列分散実行するためだけに使うパターン。 このときは -D mapred.reduce.tasks=0 というようにリデュースの並列度を 0 に指定する。

実際に試してみよう。 このパターンではリデューサーの指定が必要ない。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -D mapred.reduce.tasks=0 \
    -files /var/tmp/mapper.py \
    -mapper mapper.py \
    -input input*.txt \
    -output output2

結果は次の通り。 マップ処理を通してキーバリュー形式になるだけで終わった。

$ bin/hdfs dfs -cat output2/*
Hello   1
Hadoop  1
World!  1
This    1
is  1
the 1
first   1
Hadoop  1
Streaming   1
application!    1
Hello   1
Hello   1
Hello   1
Hadoop  1
Streaming   1
application!    1

Hadoop Streaming を扱う上での注意点

Hadoop Streaming のプログラムを書く上での注意点として一つ気づいたことがある。 それは、マップ処理は並列分散処理されるのは当たり前としても、リデュース処理も並列分散処理されうるということ。 なので、リデュース処理の中で一つのノードで完結していることを想定したような処理は含めてはいけないはず。

例えば、次のリデューサーは最終的な出力結果が降順ソートされることを期待した処理が含まれている。

$ cat << 'EOF' > /var/tmp/reducer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""リデューサーが並列実行されると上手く動かないので、こんな風には書かないこと"""

import sys
from collections import defaultdict


results = defaultdict(int)


def reduce_(line):
    key, value = line.split('\t')
    results[key] += int(value)


def main():
    for line in sys.stdin:
        reduce_(line)

    # キーの出現回数ごとに降順ソートする
    items = results.items()
    sorted_items = sorted(items, key=lambda x: x[1], reverse=True)

    for key, value in sorted_items:
        message = '{0}\t{1}'.format(key, value)
        print(message)


if __name__ == '__main__':
    main()
EOF

上記を実行してみよう。 まずはリデュース処理の並列度が 1 の場合から。 つまり、一つのノード上で全てのリデュース処理が実行される。

$ chmod +x /var/tmp/reducer.py
$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -D mapred.reduce.tasks=1 \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output3

この場合は結果がちゃんとソートされて出る。

$ bin/hdfs dfs -cat output3/*
Hello   4
Hadoop  3
application!    2
Streaming   2
This    1
is  1
World!  1
the 1
first   1

しかし、並列度を上げてみるとどうだろうか? 例えば、次のようにすると並列度が 10 になる。 つまり、マッパーが処理したデータが 10 のノードに散らばって実行されることになる。 ただし、今回に関しては疑似分散モードなので一つのノード上でリデュースのタスクが 10 できるだけ。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -D mapred.reduce.tasks=10 \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output4

同じことは -numReduceTasks オプションでも指定できる。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output4 \
    -numReduceTasks 10

実行結果を確認してみよう。 今度は見事に出力結果がソートされずバラバラになっている。 これはリデュース処理が並列実行されたことで起こった。 各々のリデュース単位では降順ソートされたものの、全体ではそれを連結したものになるので意味をなさない。

$ bin/hdfs dfs -cat output4/*
Hello   4
is  1
the 1
first   1
Hadoop  3
application!    2
World!  1
Streaming   2
This    1

上記を見て次に気になるのは、どうやってリデュース処理を各タスクに分解して割り振っているか、ということだろう。 リデュース処理に関しては、同じキーは必ず同じノード上に集めて実行しなきゃいけない。 つまり標準入出力に流れる行の中で、どの部分がキーなのかを Apache Hadoop は認識する必要があるということ。 これは前述した区切り文字と関係していて、デフォルトでは区切り文字がタブになっている。 そのため、今回は特に何も指定しなくても Apache Hadoop は標準入出力の中からキーを認識できた。 もし、これが異なる場合には Hadoop Streaming を実行する際に、それをオプションで伝えてやる必要がある。

Hadoop Streaming アプリケーションの開発について

ところで、Hadoop Streaming のアプリケーションはどうやって開発すれば良いだろうか。 上記のように Hadoop Streaming 経由で毎回起動するのは、正直めんどくさい。 そんなときは HDFS を使わずにローカルディスクを、Hadoop Streaming の代わりにパイプを使ってデバッグするやり方がある。

結局のところ Hadoop Streaming がやっている標準入出力同士をつなぎ合わせるという処理は Unix のパイプと同じだ。 HDFS と Hadoop クラスタで、それが並列分散実行されるという点を除けば、だけど。

そのため、並列度が 1 の挙動については次のようにして確かめることができる。

$ cat /tmp/input*.txt | /var/tmp/mapper.py  | /var/tmp/reducer.py
application!    2
This    1
is  1
Hadoop  3
Streaming   2
World!  1
the 1
Hello   4
first   1

リデュースしたくないときはパイプをつながなければいいだけ。

$ cat /tmp/input*.txt | /var/tmp/mapper.py
Hello   1
Hadoop  1
World!  1
This    1
is  1
the 1
first   1
Hadoop  1
Streaming   1
application!    1
Hello   1
Hello   1
Hello   1
Hadoop  1
Streaming   1
application!    1

上記のようにして、まずは手元で動作確認した上で Hadoop Streaming にかければ良い。 この段階で動作しないアプリケーションは Hadoop Streaming に持っていっても絶対に動作はしないだろう。

まとめ

今回は Apache Hadoop を任意のプログラミング言語から扱うことのできる Hadoop Streaming という機能を試してみた。 この機能を使えば Java を使いたくないという人でも Apache Hadoop の低レイヤーな MapReduce 処理を記述できる。

参考文献

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築