CUBE SUGAR CONTAINER

技術系のこと書きます。

Mac OS X で Apache Spark を触ってみる

最近 Apache Spark について耳にすることが多い。 Apache Spark は、ビッグデータ処理における並列分散処理基盤を提供する OSS の一つ。 似たような用途としては Apache Hadoop も有名だけど、それよりも最大で 100 倍ほど高速に動作するんだとか。 高速に動作する理由としては、各ノードのメモリに乗り切るサイズのデータならディスクを介さずに扱える点が大きいらしい。

今回は、そんな Apache Spark を Mac OS X で軽く触ってみることにする。 本来であれば、用途的には複数のノードを用意して並列分散処理をさせるところだけど使うのは一つのノードだけ。 また Apache Spark を操作するには Java, Scala, Python のインターフェースがある。 その中でも、今回は Python のインターフェース (PySpark) を使ってみることにした。

環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195

インストール

まずは Apache Spark の動作に JRE が必要なのでインストールしておく。 インストールには Homebrew Cask を使うと楽できる。

$ brew cask install java

Apache Spark のインストールは Homebrew を使ってさくっといける。

$ brew install apache-spark

インタラクティブシェルから触ってみる

Apache Spark を Python から扱うには pyspark というコマンドを使う。 このコマンドを起動すると、Python から Apache Spark を扱う上で必要なパッケージなどが自動的にインポートされる。 それ以外については、特に普段使っている REPL と違いはないようだ。

$ pyspark
Python 2.7.10 (default, Feb  6 2017, 23:53:20) 
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/04 18:42:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/04 18:43:13 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/05/04 18:43:13 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/05/04 18:43:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.10 (default, Feb  6 2017 23:53:20)
SparkSession available as 'spark'.
>>> 

サイトパッケージの場所を調べてもシステムの Python の場所になっているし。

>>> from pip import locations
>>> locations.user_site
'/Users/amedama/Library/Python/2.7/lib/python/site-packages'

これは、どうやらデフォルトの python コマンドで起動されるものが使われているだけっぽい? 試しに virtualenv を使ってデフォルトが Python 3.5 になるようにしてみる。

$ python --version
Python 3.5.3

この状態で pyspark コマンドを実行すると、ちゃんと Python 3.5 が使われるようになった。 予想は当たっていたようだ。

$ pyspark    
Python 3.5.3 (default, Feb 26 2017, 01:47:55) 
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/04 19:29:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/04 19:30:00 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 3.5.3 (default, Feb 26 2017 01:47:55)
SparkSession available as 'spark'.
>>> 

インタラクティブシェルを終了するときは通常の Python の REPL と同じように Ctrl-D とか exit() 関数とかで。

>>> exit()

テキストファイルを処理してみる

それでは、次は実際に Apache Spark でテキストファイルを扱ってみることにしよう。 題材は Apache Spark の README ファイルにする。

$ brew install wget
$ wget https://raw.githubusercontent.com/apache/spark/master/README.md

あと、なんか操作していると随所で psutil 入れた方が良いよっていう警告が出るので入れておく。

$ pip install psutil

インタラクティブシェルを起動しよう。

$ pyspark

PySpark のインタラクティブシェルでは sc という SparkContext のインスタンスが処理の取っ掛かりになるみたい。 これは、あらかじめインタラクティブシェルを起動した時点で用意されている。

>>> sc
<pyspark.context.SparkContext object at 0x1068cf810>

まずは SparkContext#textFile() メソッドでテキストファイルを読み込む。

>>> textfile = sc.textFile("README.md")

これで得られるインスタンスは Resilient Distributed Dataset (RDD) と呼ばれる形式になっている。 基本的に Apache Spark では、この RDD という単位でデータを扱うことになる。

>>> textfile
README.md MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

この RDD には、あらかじめ色々な API が用意されている。 例えば RDD に含まれるデータの数は count() というメソッドで得られる。

>>> textfile.count()
103

他にも、データセットの最初の要素を取り出す first() だとか。

>>> textfile.first()
u'# Apache Spark'

データセットに含まれる要素全てを得るには collect() などを使う。

>>> textfile.collect()
[u'# Apache Spark', u'', u'Spark is a fast and general cluster computing system for Big Data. It provides', u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', u'supports general computation graphs for data analysis. It also supports a', u'rich set of higher-level tools including Spark SQL for SQL and DataFrames,', u'MLlib for machine learning, GraphX for graph processing,', u'and Spark Streaming for stream processing.', u'', u'<http://spark.apache.org/>', u'', u'', u'## Online Documentation', u'', u'You can find the latest Spark documentation, including a programming', u'guide, on the [project web page](http://spark.apache.org/documentation.html).', u'This README file only contains basic setup instructions.', u'', u'## Building Spark', u'', u'Spark is built using [Apache Maven](http://maven.apache.org/).', u'To build Spark and its example programs, run:', u'', u'    build/mvn -DskipTests clean package', u'', u'(You do not need to do this if you downloaded a pre-built package.)', u'', u'You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel builds in Maven 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).', u'More detailed documentation is available from the project site, at', u'["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).', u'', u'For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html).', u'', u'## Interactive Scala Shell', u'', u'The easiest way to start using Spark is through the Scala shell:', u'', u'    ./bin/spark-shell', u'', u'Try the following command, which should return 1000:', u'', u'    scala> sc.parallelize(1 to 1000).count()', u'', u'## Interactive Python Shell', u'', u'Alternatively, if you prefer Python, you can use the Python shell:', u'', u'    ./bin/pyspark', u'', u'And run the following command, which should also return 1000:', u'', u'    >>> sc.parallelize(range(1000)).count()', u'', u'## Example Programs', u'', u'Spark also comes with several sample programs in the `examples` directory.', u'To run one of them, use `./bin/run-example <class> [params]`. For example:', u'', u'    ./bin/run-example SparkPi', u'', u'will run the Pi example locally.', u'', u'You can set the MASTER environment variable when running examples to submit', u'examples to a cluster. This can be a mesos:// or spark:// URL,', u'"yarn" to run on YARN, and "local" to run', u'locally with one thread, or "local[N]" to run locally with N threads. You', u'can also use an abbreviated class name if the class is in the `examples`', u'package. For instance:', u'', u'    MASTER=spark://host:7077 ./bin/run-example SparkPi', u'', u'Many of the example programs print usage help if no params are given.', u'', u'## Running Tests', u'', u'Testing first requires [building Spark](#building-spark). Once Spark is built, tests', u'can be run using:', u'', u'    ./dev/run-tests', u'', u'Please see the guidance on how to', u'[run tests for a module, or individual tests](http://spark.apache.org/developer-tools.html#individual-tests).', u'', u'## A Note About Hadoop Versions', u'', u'Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported', u'storage systems. Because the protocols have changed in different versions of', u'Hadoop, you must build Spark against the same version that your cluster runs.', u'', u'Please refer to the build documentation at', u'["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)', u'for detailed guidance on building for a particular distribution of Hadoop, including', u'building for particular Hive and Hive Thriftserver distributions.', u'', u'## Configuration', u'', u'Please refer to the [Configuration Guide](http://spark.apache.org/docs/latest/configuration.html)', u'in the online documentation for an overview on how to configure Spark.', u'', u'## Contributing', u'', u'Please review the [Contribution to Spark guide](http://spark.apache.org/contributing.html)', u'for information on how to get started contributing to the project.']

RDD に用意されている全ての API について知りたいときは、以下の公式ドキュメントを参照する感じで。

spark.apache.org

例として「Spark」という文字列が含まれる行だけを取り出してみよう。 このような要素には filter() メソッドが使える。

>>> filtered_rdd = textfile.filter(lambda line: u'Spark' in line)

処理の結果も、また RDD で得られる。 見たところ 20 行あるようだ。

>>> filtered_rdd.count()
20

要素を見ると、たしかにどの行にも「Spark」の文字列が含まれている。

>>> filtered_rdd.collect()
[u'# Apache Spark', u'Spark is a fast and general cluster computing system for Big Data. It provides', u'rich set of higher-level tools including Spark SQL for SQL and DataFrames,', u'and Spark Streaming for stream processing.', u'You can find the latest Spark documentation, including a programming', u'## Building Spark', u'Spark is built using [Apache Maven](http://maven.apache.org/).', u'To build Spark and its example programs, run:', u'You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel builds in Maven 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).', u'["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).', u'For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html).', u'The easiest way to start using Spark is through the Scala shell:', u'Spark also comes with several sample programs in the `examples` directory.', u'    ./bin/run-example SparkPi', u'    MASTER=spark://host:7077 ./bin/run-example SparkPi', u'Testing first requires [building Spark](#building-spark). Once Spark is built, tests', u'Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported', u'Hadoop, you must build Spark against the same version that your cluster runs.', u'in the online documentation for an overview on how to configure Spark.', u'Please review the [Contribution to Spark guide](http://spark.apache.org/contributing.html)']

次は、ビッグデータ処理のハローワールドとも言えるワードカウントを試してみよう。 まずは、テキストの内容をスペースで区切る。 これは MapReduce アルゴリズムでいう Map に相当する。

>>> words = textfile.flatMap(lambda line: line.split())
>>> words.count()
495

まあ、このままだと空白で区切っただけなので単語ではないものも含まれちゃうけど。

>>> words.first()
u'#'

先ほどテキストを区切るのに flatMap() を使ったのは、ただの map() だとリストが複数含まれるデータセットになってしまうため。 リストをさらに開いた状態 (flat) にしておかないと扱いづらい。

>>> textfile.map(lambda line: line.split()).first()
[u'#', u'Apache', u'Spark']

続いて、各単語に対して出現頻度をカウントするために数字を添えてタプルにする。 これも Map 処理だね。

>>> words_tuple = words.map(lambda word: (word, 1))
>>> words_tuple.first()
(u'#', 1)

あとはキー (各単語) ごとに出現頻度をカウントする。 これが MapReduce でいう Reduce に相当する。 ここの処理では、タプル同士を足し算すると第二要素のカウンタが増えることになる。 この動作は Python の流儀からすると、ちょっと直感に反するね。

>>> words_count = words_tuple.reduceByKey(lambda a, b: a + b)

これで、それぞれの単語の出現頻度がカウントできた。

>>> words_count.collect()[:10]
[('guide,', 1), ('APIs', 1), ('optimized', 1), ('name', 1), ('storage', 1), ('developing', 1), ('It', 2), ('package.', 1), ('particular', 2), ('development', 1)]

一番登場する頻度が多いのはどれなのかを調べるために、カウンタの値にもとづいて降順ソートする。 どうやら「the」が一番多いらしい。

>>> words_count_sorted = words_count.sortBy(lambda t: t[1], False)
>>> words_count_sorted.collect()[:10]
[('the', 24), ('to', 17), ('Spark', 16), ('for', 12), ('and', 9), ('##', 9), ('a', 8), ('run', 7), ('on', 7), ('can', 7)]

まとめ

今回は Apache Spark を Python のインタラクティブシェルを通して軽く触ってみた。