CUBE SUGAR CONTAINER

技術系のこと書きます。

Apache Spark でクラスタリングすると動かなくなるプログラムについて

今回は Apache Spark をスタンドアロンで使っていると上手くいくのに、クラスタリングした途端に上手くいかなくなる状況がある、ということについて。

スタンドアロンなら上手くいく場合

まずは Apache Spark のコマンドラインシェルを起動する。 この場合はシングルノードのスタンドアロンで動かしている。

$ $SPARK_HOME/bin/spark-shell

複数の値が格納された RDD を作る。

scala> val rdd = sc.parallelize(Array(1, 2, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

そして、それらに対して foreach() メソッドで println() 関数を適用してみる。

scala> rdd.foreach(println)
1
2
3

この場合、直感的にも正しく動作する。

クラスタリングすると動かなくなる

続いて YARN を使ってクラスタリングした状況でコマンドラインシェルを起動する。

$ $SPARK_HOME/bin/spark-shell --master yarn

先程と同じように複数の値が入った RDD を作る。

scala> val rdd = sc.parallelize(Array(1, 2, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

そして、同じように println() 関数を適用してみよう。 しかし、今度は何ら出力されない。

scala> rdd.foreach(println)

シングルノードとクラスタリングしたという違いだけで動作が変わっている。

上記は rdd.foreach(println) という処理がワーカーノードで実行されてしまっているために起きている。 もし、ドライバ上で実行したいときは、次のように一旦 collect() メソッドでドライバに値を集約して実行しなきゃいけない。

scala> rdd.collect().foreach(println)
1
2
3

ワーカーノードに渡されるのは変数のコピー

同じような例をもう一つ紹介する。 今度は RDD に含まれる値の数を数えてみよう。

scala> val rdd = sc.parallelize(Array(1, 2, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

値の数を数え上げるためのカウンタとなる変数を用意する。

scala> var counter = 0
counter: Int = 0

そして、foreach() メソッドで RDD の値を数え上げる。 直感的には counter 関数の値は RDD に含まれる数だけカウントアップされるように感じるはずだ。

scala> rdd.foreach(x => counter += x)

しかし、実際には変数は 0 のままでカウントアップされていない。

scala> println(counter)
0

こんなことが、どうして起こるのだろうか?

これは Apache Spark でクラスタリングしたとき、ワーカーノードに渡される変数が単なるコピーであることに由来している。 カウントアップの処理は、各ワーカーノード上でコピーの変数に対して実行されるためドライバ上のオリジナルには反映されない。

ワーカーと値を共有するにはアキュムレータを用いる

上記を意図通りに動かすにはカウンタとなる変数としてアキュムレータを使う必要があるようだ。

scala> val rdd = sc.parallelize(Array(1, 2, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val counter = sc.longAccumulator
counter: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 1, name: None, value: 0)
scala> rdd.foreach(x => counter.add(x))
scala> counter.value
res4: Long = 6

参考

spark.apache.org

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Sparkによる実践データ解析 ―大規模データのための機械学習事例集