CUBE SUGAR CONTAINER

技術系のこと書きます。

Apache Spark を完全分散モードの YARN クラスタで動かす

Apache Spark を使って複数ノードで分散並列処理をする場合、まずは動作させるためのクラスタマネージャを選ぶことになる。 Apache Spark では以下のクラスタマネージャに対応している。

  • Apache Spark 組み込み (これはスタンドアロンモードと呼ばれる)
  • Apache Hadoop YARN
  • Apache Mesos

今回は、その中で二番目の Apache Hadoop の提供する YARN を使ってみる。 また、なるべく実環境に近いものを作りたいので Apache Hadoop は完全分散モードを使うことにした。 そのため、まず前提として次のエントリを元に Hadoop クラスタが組まれていることが前提となる。

blog.amedama.jp

Apache Hadoop を設定する

Apache Spark のクラスタマネージャに YARN を使うときのポイントは次の環境変数が設定されていること。

$ echo $HADOOP_HOME
/home/vagrant/hadoop-2.8.0
$ echo $HADOOP_CONF_DIR
/home/vagrant/hadoop-2.8.0/etc/hadoop

また、上記の手順で構築した場合、メモリが少ないと Apache Spark を動作させたときエラーになってしまう。 これは、実行前のチェックによってマシンに積まれているメモリを一定の割合以上使うことができないため。 そこで、次のようにして設定ファイルを編集することで、そのチェックを無効にしてやる。

$ cat << 'EOF' > /tmp/yarn-site.xml.property
  <property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/yarn-site.xml.property
  /^$/d
' $HADOOP_HOME/etc/hadoop/yarn-site.xml

設定ファイルを編集したので、それを各ノードに配布する。 そして NodeManager のプロセスも設定を読み直すために再起動が必要になるのでプロセスを止める。

$ for node in node1 node2
do
  scp $HADOOP_HOME/etc/hadoop/yarn-site.xml $node:$HADOOP_HOME/etc/hadoop/
  ssh $node 'pkill -f NodeManager'
done

そして、改めて各ノードの NodeManager を起動してやる。

$ $HADOOP_HOME/sbin/start-yarn.sh
starting yarn daemons
resourcemanager running as process 19527. Stop it first.
node1: starting nodemanager, logging to /home/vagrant/hadoop-2.8.0/logs/yarn-vagrant-nodemanager-node1.out
node2: starting nodemanager, logging to /home/vagrant/hadoop-2.8.0/logs/yarn-vagrant-nodemanager-node2.out

あとはユーザのホームディレクトリを作成しておこう。

$ $HADOOP_HOME/bin/hdfs dfs -mkdir -p .

これで Hadoop クラスタ側の準備はできた。

Apache Spark を設定する

次に Apache Spark をインストールした上で設定する。

まずは公式サイトからバイナリをダウンロードして解凍する。

$ wget https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz
$ tar xf spark-2.1.1-bin-hadoop2.7.tgz

上記のディレクトリを環境変数 SPARK_HOME に設定しておこう。

$ cat << 'EOF' >> ~/.bashrc
export SPARK_HOME=~/spark-2.1.1-bin-hadoop2.7
EOF
$ source ~/.bashrc

これだけで Apache Spark を使う準備は整った。

サンプルコードを動かしてみる

準備ができたので、試しに Apache Spark に同梱されているサンプルコードを動かしてみよう。 例えば、次のようにすると円周率を計算するプログラムを起動できる。

$ $SPARK_HOME/bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  $SPARK_HOME/examples/jars/spark-examples_*.jar \
  1000
...(snip)...
Pi is roughly 3.1417053514170536
...(snip)

エラーにならず円周率は大体 3.1417… ということが計算できた。

PySpark を使ってみる

次は Apache Spark を PySpark と呼ばれる Python で使えるインタラクティブシェルから使ってみる。 そして、分散並列処理のハローワールドとも言えるワードカウントを書いてみることにしよう。

ワードカウントの対象としては Apache Spark の README ファイルを使うことにした。 次のようにして、まずは HDFS にファイルをコピーする。

$ $HADOOP_HOME/bin/hdfs dfs -put $SPARK_HOME/README.md .

分散並列処理をするには、対象のファイルが全てのノードから参照できる状態にないといけない。 今回は HDFS にコピーしたけど Amazon S3 とか対応しているファイルシステムは色々とある。

pyspark コマンドを使って PySpark のインタラクティブシェルを起動しよう。 ここでポイントとなるのは --master オプションで yarn を指定すること。 これでローカルではなく YARN を使った分散実行がされるようになる。

$ $SPARK_HOME/bin/pyspark --master yarn
...(snip)...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.5 (default, Nov  6 2016 00:28:07)
SparkSession available as 'spark'.
>>>

HDFS に先ほどコピーしたファイルを読み込もう。 ファイルの読み込みは SparkContext#textFile() を使う。

>>> textfile = sc.textFile('hdfs://master:9000/user/vagrant/README.md')

これで HDFS のファイルのデータが各ノードに読み込まれた。

>>> textfile.first()
u'# Apache Spark'

まずはテキストを RDD#flatMap() を使ってスペースで分割する。 こうすると map した結果から得られたリストを展開してくれる。

>>> words = textfile.flatMap(lambda line: line.split(' '))

すると、こんな風になる。

>>> words.take(5)
[u'#', u'Apache', u'Spark', u'', u'Spark']

上記には空白のように意味のない単語も含まれるので、それを RDD#filter() で取り除く。

>>> valid_words = words.filter(lambda word: word)
>>> valid_words.take(5)
[u'#', u'Apache', u'Spark', u'Spark', u'is']

次に、各単語を RDD#map() を使ってキー・バリュー形式にする。 キーは単語でバリューは登場回数になる。

>>> keyvalues = valid_words.map(lambda word: (word, 1))
>>> keyvalues.first()
(u'#', 1)

最後に、各キーごとに RDD#reduceByKey() を使ってバリューを集計する。 これで各単語の出現回数が集計できる。

>>> word_count = keyvalues.reduceByKey(lambda a, b: a + b)
>>> word_count.take(3)
[(u'storage', 1), (u'"local"', 1), (u'including', 4)]

あとは集計結果を HDFS に書き出そう。

>>> word_count.saveAsTextFile('hdfs://master:9000/user/vagrant/output')

使い終わったらインタラクティブシェルから抜ける。

>>> exit()

HDFS に書き出された内容を確認してみよう。 どうやら、ちゃんと単語ごとの出現回数がカウントできているようだ。

$ $HADOOP_HOME/bin/hdfs dfs -cat output/*
(u'storage', 1)
(u'"local"', 1)
(u'including', 4)
...(snip)...
(u'<class>', 1)
(u'learning,', 1)
(u'latest', 1)

PySpark で Python スクリプトを実行する

先ほどはインタラクティブシェルを使って PySpark を使ったけど、もちろんスクリプトファイルから実行することもできる。

一旦、先ほど書き出した HDFS のディレクトリは削除しておこう。

$ $HADOOP_HOME/bin/hdfs dfs -rm -r -f output

次のスクリプトは、先ほどのワードカウントをスクリプトファイルにしたもの。 ポイントとしては、インタラクティブシェルでは最初からインスタンス化されていた SparkContext を自分で用意しなきゃいけないところ。 インタラクティブシェルではオプションで実行するモードを YARN にしていた代わりに、この場合は SparkConf で設定する必要がある。 とはいえ、他の部分は特に変わらない。

$ cat << 'EOF' > /var/tmp/wc.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkConf
from pyspark import SparkContext


def main():
    conf = SparkConf()
    conf.setMaster('yarn')
    conf.setAppName('WordCount')

    sc = SparkContext(conf=conf)

    textfile = sc.textFile('hdfs://master:9000/user/vagrant/README.md')

    words = textfile.flatMap(lambda line: line.split(' '))
    valid_words = words.filter(lambda word: word)
    keyvalues = valid_words.map(lambda word: (word, 1))
    word_count = keyvalues.reduceByKey(lambda a, b: a + b)

    word_count.saveAsTextFile('hdfs://master:9000/user/vagrant/output')


if __name__ == '__main__':
    main()
EOF

実行する

スクリプトファイルを実行するときは spark-submit コマンドを使う。 このとき --py-files オプションを使って実行に必要なスクリプトファイルを、あらかじめノードに配布しておく。

$ $SPARK_HOME/bin/spark-submit \
  --master yarn \
  --py-files /var/tmp/wc.py \
  /var/tmp/wc.py

上記の実行が上手くいったら結果を出力したディレクトリを確認してみよう。

$ $HADOOP_HOME/bin/hdfs dfs -cat output/*
(u'storage', 1)
(u'"local"', 1)
(u'including', 4)
...(snip)...
(u'<class>', 1)
(u'learning,', 1)
(u'latest', 1)

実行したアプリケーションの確認

ちなみに Apache Spark が YARN 経由で実行したアプリケーションの情報は Hadoop の管理画面から状態を確認できる。

http://192.168.33.10:8088/

上記クラスタの構築を自動化する

ちなみに上記を全て手動で構築するのは手間がかかるので Vagrantfile を書いてみた。 次のようにして実行する。 メモリを 2GB 積んだ仮想マシンを 3 台起動するので、最低でも 8GB できれば 16GB のメモリを積んだ物理マシンで実行してもらいたい。

$ git clone https://gist.github.com/fefb9831e9f032ef264d8d517df57cb4.git spark-on-yarn
$ cd spark-on-yarn
$ sh boot.sh

vagrant ssh コマンドでマスターサーバにログインできる。

$ vagrant ssh

いじょう。

参考

Apache Spark については次の本を読んで勉強してみた。

初めてのSpark

初めてのSpark

  • 作者: Holden Karau,Andy Konwinski,Patrick Wendell,Matei Zaharia,Sky株式会社玉川竜司
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2015/08/22
  • メディア: 大型本
  • この商品を含むブログ (4件) を見る