CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: PySpark でサードパーティ製のライブラリを使って分散処理する

今回は PySpark でサードパーティ製のライブラリを使って分散処理をする方法について。

サンプルとして、次のような状況を試した。

  • Apache Spark + Hadoop YARN で構築した分散処理用のクラスタを用いる
  • サードパーティ製のライブラリとして scikit-learn を想定する
  • scikit-learn の学習済みモデルを、あらかじめローカルで用意しておく
  • Iris データセットと学習済みモデルを使った推論を PySpark で分散処理する

使った環境は次の通り。

$ cat /etc/redhat-release
CentOS Linux release 7.6.1810 (Core)
$ uname -r
3.10.0-957.21.3.el7.x86_64
$ python3 -V
Python 3.6.8
$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_222
Branch
Compiled by user  on 2019-05-01T05:08:38Z
Revision
Url
Type --help for more information.
$ hadoop version
Hadoop 2.9.2
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 826afbeae31ca687bc2f8471dc841b66ed2c6704
Compiled by ajisaka on 2018-11-13T12:42Z
Compiled with protoc 2.5.0
From source with checksum 3a9939967262218aa556c684d107985
This command was run using /home/vagrant/hadoop-2.9.2/share/hadoop/common/hadoop-common-2.9.2.jar

Conda (Miniconda) で Python の仮想環境を作る

PySpark でサードパーティ製のライブラリを使う場合、いくつか検討すべき点がある。 その中でも、最初につまづくのは「ライブラリをいかにエグゼキュータのホストに配布するか」という点。 なぜなら、事前にエグゼキュータの各ホストにライブラリをインストールして回ることは現実的ではない。 Apache Spark のエグゼキュータのホストは環境によっては数百や数千台に及ぶ可能性もある。 管理が自動化されていることは前提としても、各アプリケーションごとにライブラリを追加する作業が生じるのは望ましくない。 そのため、あらかじめライブラリをインストールした仮想環境またはコンテナなどを実行時に配布する方が良い。

今回は Conda (Miniconda) で事前に作った仮想環境をエグゼキュータに配布する方法を取った。 これは、Conda で作った仮想環境はポータビリティがあるため。 virtualenv で作った仮想環境は、ベースとなった Python の実行環境に依存するためホストをまたいだポータビリティがない。 実験的なオプションとして提供されている --relocatableをつければできそうではあるけど、ドキュメントを読む限りあまり使う気持ちにはならない。

virtualenv.pypa.io

なので、まずは Miniconda をインストールしていく。

$ sudo yum -y install wget bzip2 zip
$ wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
$ sudo bash Miniconda3-latest-Linux-x86_64.sh -b -p /opt/conda

インストールした Conda で仮想環境を作る。 以下では環境の名前を conda_env としている。

$ /opt/conda/bin/conda create -n conda_env --copy -yq python=3.7

次のように、仮想環境ができた。

$ source /opt/conda/bin/activate conda_env
$ pip list
Package    Version
---------- ---------
certifi    2019.6.16
pip        19.1.1
setuptools 41.0.1
wheel      0.33.4

仮想環境に、PySpark のアプリケーションが動作するのに必要なサードパーティ製のライブラリをインストールする。 今回であれば scikit-learn を入れる。

$ pip install scikit-learn

インストールできたら、次は仮想環境のある場所に移動して ZIP ファイルに圧縮する。 これはエグゼキュータへの配布を考えてのこと。

$ cd ~/.conda/envs
$ zip -r conda_env.zip conda_env

PySpark で動作確認する

さて、仮想環境の準備ができたところで動作確認に移る。

まずは PySpark のインタプリタを起動しよう。

$ pyspark \
  --master yarn \
  --archives "conda_env.zip#defrost" \
  --conf spark.pyspark.driver.python=conda_env/bin/python \
  --conf spark.pyspark.python=defrost/conda_env/bin/python

上記では、クラスタ管理に Hadoop YARN を使っているので --master yarn で起動している。 そして、今回のポイントとなるのがそれ以降のオプションたち。 まず、--archives オプションはエグゼキュータに配布するファイルを指定している。 ここで、先ほど作った ZIP ファイルを指定することでエグゼキュータに Conda の仮想環境をバラまいている。 また、# 以降はファイルを解凍したときのディレクトリ名になる。 続いて --conf spark.pyspark.driver.python では Spark のドライバで使う Python を指定している。 ここでも先ほど作った仮想環境の Python を指定した。 そして、最大のポイントとなるのが --conf spark.pyspark.python で、ここでエグゼキュータのホスト上に解凍されたディレクトリに含まれる仮想環境の Python を指定している。

インタプリタが起動したら動作を確認していこう。 まず、ドライバで起動したインタプリタは次の通り Python 3.7 になっている。 ちゃんと Conda で作った仮想環境が使われているようだ。

>>> import sys
>>> sys.version_info
sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)

しかし、エグゼキュータの方はどうだろうか? こちらも確認する必要がある。 そこで、まずは次のようにダミーの RDD を用意しておく。

>>> rdd = sc.range(2)
>>> rdd.getNumPartitions()
2

そして、次のように Python のバージョンを返す関数を定義しておく。

>>> def python_version(_):
...     """エグゼキュータ上の Python のバージョンを返す"""
...     import sys
...     return str(sys.version_info)
...

上記を先ほど作った RDD に対して実行することで、エグゼキュータ上の Python のバージョンを確認してみよう。

>>> from pprint import pprint
>>> pprint(rdd.map(python_version).collect())
["sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)",
 "sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)"]

上記の通り、ちゃんとエグゼキュータ上でも Python 3.7 が使えている。

とはいえ、まだ油断はできない。 ちゃんと scikit-learn はインポートできるだろうか? これについても確認しておく。

>>> def sklearn_version(_):
...     """エグゼキュータ上で scikit-learn をインポートしてバージョンを返す関数"""
...     import sklearn
...     return sklearn.__version__
...
 >>> pprint(rdd.map(sklearn_version).collect())
['0.21.3', '0.21.3']

どうやら、ちゃんとエグゼキュータ上で scikit-learn が使える状況にあるようだ。

ローカルで学習済みモデルを作る

分散環境で scikit-learn が使えるようになったところで、次にローカルで学習済みモデルを用意する。 とはいえ、めんどくさいのでインタプリタは先ほど起動したものを使い回すことにしよう。 PySpark の環境は、いくつかのインスタンスがグローバルスコープにある以外は通常の Python と何ら変わりがないので。

Iris データセットを読み込む。

>>> from sklearn import datasets
>>> dataset = datasets.load_iris()
>>> X, y = dataset.data, dataset.target

ホールドアウト検証用にデータを分割する。

>>> from sklearn.model_selection import train_test_split
>>> X_train, X_test, y_train, y_test = train_test_split(X, y,
...                                                     shuffle=True,
...                                                     random_state=42)

学習データの方をランダムフォレストに学習させる。

>>> clf = RandomForestClassifier(n_estimators=100)
>>> clf.fit(X_train, y_train)
RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
                       max_depth=None, max_features='auto', max_leaf_nodes=None,
                       min_impurity_decrease=0.0, min_impurity_split=None,
                       min_samples_leaf=1, min_samples_split=2,
                       min_weight_fraction_leaf=0.0, n_estimators=100,
                       n_jobs=None, oob_score=False, random_state=None,
                       verbose=0, warm_start=False)

これで学習済みモデルが手に入った。 通常であれば pickle などを使ってディスクに直列化して別の環境に持ち運ぶことになる。 今回はインタプリタが変わらないので、そのまま使うことにする。

データセットを PySpark のデータ表現に変換する

学習済みモデルができたので、次はこれを PySpark で動かしたい。 ただ、その前にデータがないと始まらないので Iris データセットを PySpark に読み込む。 具体的には NumPy の配列を PySpark の DataFrame に変換したい。

まずは PySpark の DataFrame を作るためにスキーマを定義する。 今回は特徴量が全て浮動小数点型だったので楽だけど、異なるときは型を変えていく必要がある。

>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import DoubleType
>>> df_schema = StructType([
...     StructField(feature_name, DoubleType(), False)
...     for feature_name in dataset.feature_names
... ])

PySpark は Numpy の配列を受け付けてくれないのでプリミティブな型に変換する関数を用意する。 まあ、実際には Numpy の配列をデータセットとして PySpark の環境に持っていくことなんてそうないだろうけど。

>>> def numpy_to_primitive(np_array):
...     """numpy の要素があると受け付けないので Python のネイティブな型に直す"""
...     for np_row in np_array:
...         yield [float(element) for element in np_row]
...

先ほどホールドアウトしておいたテストデータを DataFrame に変換する。

>>> X_test_df = spark.createDataFrame(numpy_to_primitive(X_test), df_schema)
>>> X_test_df.show(truncate=False, n=5)
+-----------------+----------------+-----------------+----------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|
+-----------------+----------------+-----------------+----------------+
|6.1              |2.8             |4.7              |1.2             |
|5.7              |3.8             |1.7              |0.3             |
|7.7              |2.6             |6.9              |2.3             |
|6.0              |2.9             |4.5              |1.5             |
|6.8              |2.8             |4.8              |1.4             |
+-----------------+----------------+-----------------+----------------+
only showing top 5 rows

これでデータができた。

PySpark で学習済みモデルの推論を分散処理する

データセットとモデルが揃ったので、次は本題となる推論の部分を分散処理する。

まずは学習済みモデルを SparkContext#broadcast() を使ってエグゼキュータのインタプリタ上で使えるようにする。 これは要するにオブジェクトを SerDe してリモートのホスト上のインタプリタにロードしている。

>>> broadcasted_clf = sc.broadcast(clf)
>>> broadcasted_clf
<pyspark.broadcast.Broadcast object at 0x7fe46aa4f8d0>

まずはデータを一行ずつ推論させてみることにしよう。 今回であれば row は Iris データセットの一つの花の特徴量に対応する。 SparkContext#broadcast() でバラまいたオブジェクトは Broadcast#value アトリビュートからアクセスできる。

>>> def predict(row):
...     """推論を分散処理する"""
...     y_pred = broadcasted_clf.value.predict([row])
...     # 返り値が numpy の配列なので単なるリストに直したほうが PySpark では扱いやすい
...     return list(y_pred)
...

上記を使って推論してみよう。 DataFrame を RDD に変換した上で、RDD#flatMap() で分散処理を実行している。

>>> X_test_df.rdd.flatMap(predict).collect()
[1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0]

どうやら、それっぽい値が得られた。

RDD#flatMap() にしているのは、返しているのがリストだからで、もし単なる RDD#map() だと、次のようにリストの入れ子になる。

>>> X_test_df.rdd.map(predict).collect()
[[1], [0], [2], [1], [1], [0], [1], [2], [1], [1], [2], [0], [0], [0], [0], [1], [2], [1], [1], [2], [0], [2], [0], [2], [2], [2], [2], [2], [0], [0], [0], [0], [1], [0], [0], [2], [1], [0]]

得られた結果を Accuracy について評価してみよう。

>>> y_pred = X_test_df.rdd.flatMap(predict).collect()
>>> from sklearn.metrics import accuracy_score
>>> accuracy_score(y_test, y_pred)
1.0

どうやら、ちゃんと推論できたようだ。

続いては複数行をまとめて推論させてみることにしよう。 Apache Spark では、パーティションという単位でデータをまとまりとして扱うことができる。 以下の関数は複数行のデータを受け取れるようにしてある。

>>> def predict_partition(map_of_rows):
...     """パーティション単位で推論を分散処理する"""
...     list_of_rows = list(map_of_rows)  # 複数行のデータが入ったリストに直す
...     y_pred = broadcasted_clf.value.predict(list_of_rows)
...     return y_pred
...

上記を実行してみよう。 パーティション単位で処理するときは RDD#mapPartitions() を使う。

>>> X_test_df.rdd.mapPartitions(predict_partition).collect()
[1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0]

こちらも、Accuracy で評価してみよう。 先ほどの結果と一致しているだろうか。

>>> y_pred = X_test_df.rdd.mapPartitions(predict_partition).collect()
>>> accuracy_score(y_test, y_pred)
1.0

どうやら、大丈夫そうだ。

めでたしめでたし。

参考

blog.cloudera.co.jp

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム