今回は PySpark でサードパーティ製のライブラリを使って分散処理をする方法について。
サンプルとして、次のような状況を試した。
- Apache Spark + Hadoop YARN で構築した分散処理用のクラスタを用いる
- サードパーティ製のライブラリとして scikit-learn を想定する
- scikit-learn の学習済みモデルを、あらかじめローカルで用意しておく
- Iris データセットと学習済みモデルを使った推論を PySpark で分散処理する
使った環境は次の通り。
$ cat /etc/redhat-release CentOS Linux release 7.6.1810 (Core) $ uname -r 3.10.0-957.21.3.el7.x86_64 $ python3 -V Python 3.6.8 $ pyspark --version Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.3 /_/ Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_222 Branch Compiled by user on 2019-05-01T05:08:38Z Revision Url Type --help for more information. $ hadoop version Hadoop 2.9.2 Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 826afbeae31ca687bc2f8471dc841b66ed2c6704 Compiled by ajisaka on 2018-11-13T12:42Z Compiled with protoc 2.5.0 From source with checksum 3a9939967262218aa556c684d107985 This command was run using /home/vagrant/hadoop-2.9.2/share/hadoop/common/hadoop-common-2.9.2.jar
Conda (Miniconda) で Python の仮想環境を作る
PySpark でサードパーティ製のライブラリを使う場合、いくつか検討すべき点がある。 その中でも、最初につまづくのは「ライブラリをいかにエグゼキュータのホストに配布するか」という点。 なぜなら、事前にエグゼキュータの各ホストにライブラリをインストールして回ることは現実的ではない。 Apache Spark のエグゼキュータのホストは環境によっては数百や数千台に及ぶ可能性もある。 管理が自動化されていることは前提としても、各アプリケーションごとにライブラリを追加する作業が生じるのは望ましくない。 そのため、あらかじめライブラリをインストールした仮想環境またはコンテナなどを実行時に配布する方が良い。
今回は Conda (Miniconda) で事前に作った仮想環境をエグゼキュータに配布する方法を取った。
これは、Conda で作った仮想環境はポータビリティがあるため。
virtualenv で作った仮想環境は、ベースとなった Python の実行環境に依存するためホストをまたいだポータビリティがない。
実験的なオプションとして提供されている --relocatable
をつければできそうではあるけど、ドキュメントを読む限りあまり使う気持ちにはならない。
なので、まずは Miniconda をインストールしていく。
$ sudo yum -y install wget bzip2 zip $ wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh $ sudo bash Miniconda3-latest-Linux-x86_64.sh -b -p /opt/conda
インストールした Conda で仮想環境を作る。
以下では環境の名前を conda_env
としている。
$ /opt/conda/bin/conda create -n conda_env --copy -yq python=3.7
次のように、仮想環境ができた。
$ source /opt/conda/bin/activate conda_env $ pip list Package Version ---------- --------- certifi 2019.6.16 pip 19.1.1 setuptools 41.0.1 wheel 0.33.4
仮想環境に、PySpark のアプリケーションが動作するのに必要なサードパーティ製のライブラリをインストールする。 今回であれば scikit-learn を入れる。
$ pip install scikit-learn
インストールできたら、次は仮想環境のある場所に移動して ZIP ファイルに圧縮する。 これはエグゼキュータへの配布を考えてのこと。
$ cd ~/.conda/envs
$ zip -r conda_env.zip conda_env
PySpark で動作確認する
さて、仮想環境の準備ができたところで動作確認に移る。
まずは PySpark のインタプリタを起動しよう。
$ pyspark \
--master yarn \
--archives "conda_env.zip#defrost" \
--conf spark.pyspark.driver.python=conda_env/bin/python \
--conf spark.pyspark.python=defrost/conda_env/bin/python
上記では、クラスタ管理に Hadoop YARN を使っているので --master yarn
で起動している。
そして、今回のポイントとなるのがそれ以降のオプションたち。
まず、--archives
オプションはエグゼキュータに配布するファイルを指定している。
ここで、先ほど作った ZIP ファイルを指定することでエグゼキュータに Conda の仮想環境をバラまいている。
また、#
以降はファイルを解凍したときのディレクトリ名になる。
続いて --conf spark.pyspark.driver.python
では Spark のドライバで使う Python を指定している。
ここでも先ほど作った仮想環境の Python を指定した。
そして、最大のポイントとなるのが --conf spark.pyspark.python
で、ここでエグゼキュータのホスト上に解凍されたディレクトリに含まれる仮想環境の Python を指定している。
インタプリタが起動したら動作を確認していこう。 まず、ドライバで起動したインタプリタは次の通り Python 3.7 になっている。 ちゃんと Conda で作った仮想環境が使われているようだ。
>>> import sys >>> sys.version_info sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)
しかし、エグゼキュータの方はどうだろうか? こちらも確認する必要がある。 そこで、まずは次のようにダミーの RDD を用意しておく。
>>> rdd = sc.range(2) >>> rdd.getNumPartitions() 2
そして、次のように Python のバージョンを返す関数を定義しておく。
>>> def python_version(_): ... """エグゼキュータ上の Python のバージョンを返す""" ... import sys ... return str(sys.version_info) ...
上記を先ほど作った RDD に対して実行することで、エグゼキュータ上の Python のバージョンを確認してみよう。
>>> from pprint import pprint >>> pprint(rdd.map(python_version).collect()) ["sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)", "sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)"]
上記の通り、ちゃんとエグゼキュータ上でも Python 3.7 が使えている。
とはいえ、まだ油断はできない。 ちゃんと scikit-learn はインポートできるだろうか? これについても確認しておく。
>>> def sklearn_version(_): ... """エグゼキュータ上で scikit-learn をインポートしてバージョンを返す関数""" ... import sklearn ... return sklearn.__version__ ... >>> pprint(rdd.map(sklearn_version).collect()) ['0.21.3', '0.21.3']
どうやら、ちゃんとエグゼキュータ上で scikit-learn が使える状況にあるようだ。
ローカルで学習済みモデルを作る
分散環境で scikit-learn が使えるようになったところで、次にローカルで学習済みモデルを用意する。 とはいえ、めんどくさいのでインタプリタは先ほど起動したものを使い回すことにしよう。 PySpark の環境は、いくつかのインスタンスがグローバルスコープにある以外は通常の Python と何ら変わりがないので。
Iris データセットを読み込む。
>>> from sklearn import datasets >>> dataset = datasets.load_iris() >>> X, y = dataset.data, dataset.target
ホールドアウト検証用にデータを分割する。
>>> from sklearn.model_selection import train_test_split >>> X_train, X_test, y_train, y_test = train_test_split(X, y, ... shuffle=True, ... random_state=42)
学習データの方をランダムフォレストに学習させる。
>>> clf = RandomForestClassifier(n_estimators=100) >>> clf.fit(X_train, y_train) RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini', max_depth=None, max_features='auto', max_leaf_nodes=None, min_impurity_decrease=0.0, min_impurity_split=None, min_samples_leaf=1, min_samples_split=2, min_weight_fraction_leaf=0.0, n_estimators=100, n_jobs=None, oob_score=False, random_state=None, verbose=0, warm_start=False)
これで学習済みモデルが手に入った。 通常であれば pickle などを使ってディスクに直列化して別の環境に持ち運ぶことになる。 今回はインタプリタが変わらないので、そのまま使うことにする。
データセットを PySpark のデータ表現に変換する
学習済みモデルができたので、次はこれを PySpark で動かしたい。 ただ、その前にデータがないと始まらないので Iris データセットを PySpark に読み込む。 具体的には NumPy の配列を PySpark の DataFrame に変換したい。
まずは PySpark の DataFrame を作るためにスキーマを定義する。 今回は特徴量が全て浮動小数点型だったので楽だけど、異なるときは型を変えていく必要がある。
>>> from pyspark.sql.types import StructType >>> from pyspark.sql.types import StructField >>> from pyspark.sql.types import DoubleType >>> df_schema = StructType([ ... StructField(feature_name, DoubleType(), False) ... for feature_name in dataset.feature_names ... ])
PySpark は Numpy の配列を受け付けてくれないのでプリミティブな型に変換する関数を用意する。 まあ、実際には Numpy の配列をデータセットとして PySpark の環境に持っていくことなんてそうないだろうけど。
>>> def numpy_to_primitive(np_array): ... """numpy の要素があると受け付けないので Python のネイティブな型に直す""" ... for np_row in np_array: ... yield [float(element) for element in np_row] ...
先ほどホールドアウトしておいたテストデータを DataFrame に変換する。
>>> X_test_df = spark.createDataFrame(numpy_to_primitive(X_test), df_schema) >>> X_test_df.show(truncate=False, n=5) +-----------------+----------------+-----------------+----------------+ |sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)| +-----------------+----------------+-----------------+----------------+ |6.1 |2.8 |4.7 |1.2 | |5.7 |3.8 |1.7 |0.3 | |7.7 |2.6 |6.9 |2.3 | |6.0 |2.9 |4.5 |1.5 | |6.8 |2.8 |4.8 |1.4 | +-----------------+----------------+-----------------+----------------+ only showing top 5 rows
これでデータができた。
PySpark で学習済みモデルの推論を分散処理する
データセットとモデルが揃ったので、次は本題となる推論の部分を分散処理する。
まずは学習済みモデルを SparkContext#broadcast()
を使ってエグゼキュータのインタプリタ上で使えるようにする。
これは要するにオブジェクトを SerDe してリモートのホスト上のインタプリタにロードしている。
>>> broadcasted_clf = sc.broadcast(clf) >>> broadcasted_clf <pyspark.broadcast.Broadcast object at 0x7fe46aa4f8d0>
まずはデータを一行ずつ推論させてみることにしよう。
今回であれば row
は Iris データセットの一つの花の特徴量に対応する。
SparkContext#broadcast()
でバラまいたオブジェクトは Broadcast#value
アトリビュートからアクセスできる。
>>> def predict(row): ... """推論を分散処理する""" ... y_pred = broadcasted_clf.value.predict([row]) ... # 返り値が numpy の配列なので単なるリストに直したほうが PySpark では扱いやすい ... return list(y_pred) ...
上記を使って推論してみよう。
DataFrame を RDD に変換した上で、RDD#flatMap()
で分散処理を実行している。
>>> X_test_df.rdd.flatMap(predict).collect() [1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0]
どうやら、それっぽい値が得られた。
RDD#flatMap()
にしているのは、返しているのがリストだからで、もし単なる RDD#map()
だと、次のようにリストの入れ子になる。
>>> X_test_df.rdd.map(predict).collect() [[1], [0], [2], [1], [1], [0], [1], [2], [1], [1], [2], [0], [0], [0], [0], [1], [2], [1], [1], [2], [0], [2], [0], [2], [2], [2], [2], [2], [0], [0], [0], [0], [1], [0], [0], [2], [1], [0]]
得られた結果を Accuracy について評価してみよう。
>>> y_pred = X_test_df.rdd.flatMap(predict).collect() >>> from sklearn.metrics import accuracy_score >>> accuracy_score(y_test, y_pred) 1.0
どうやら、ちゃんと推論できたようだ。
続いては複数行をまとめて推論させてみることにしよう。 Apache Spark では、パーティションという単位でデータをまとまりとして扱うことができる。 以下の関数は複数行のデータを受け取れるようにしてある。
>>> def predict_partition(map_of_rows): ... """パーティション単位で推論を分散処理する""" ... list_of_rows = list(map_of_rows) # 複数行のデータが入ったリストに直す ... y_pred = broadcasted_clf.value.predict(list_of_rows) ... return y_pred ...
上記を実行してみよう。
パーティション単位で処理するときは RDD#mapPartitions()
を使う。
>>> X_test_df.rdd.mapPartitions(predict_partition).collect() [1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0]
こちらも、Accuracy で評価してみよう。 先ほどの結果と一致しているだろうか。
>>> y_pred = X_test_df.rdd.mapPartitions(predict_partition).collect()
>>> accuracy_score(y_test, y_pred)
1.0
どうやら、大丈夫そうだ。
めでたしめでたし。
参考
入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム
- 作者: Tomasz Drabas,Denny Lee,Sky株式会社玉川竜司
- 出版社/メーカー: オライリージャパン
- 発売日: 2017/11/22
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (3件) を見る
スマートPythonプログラミング: Pythonのより良い書き方を学ぶ
- 作者: もみじあめ
- 発売日: 2016/03/12
- メディア: Kindle版
- この商品を含むブログ (1件) を見る