CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Under-sampling + Bagging なモデルを簡単に作れる K-Fold を実装してみた

不均衡データに対する分類問題のアプローチとして、多いクラスのデータを取り除く Under-sampling という手法がある。 さらに、複数の Under-sampling したデータを用いて、複数のモデルを用意する Bagging という手法を組み合わせることがある。 今回は、そんな Under-sampling + Bagging (UnderBagging) なモデルを簡単に作れる KFold を実装してみた。

Under-sampling + Bagging に関する既知の実装としては imbalanced-learnBalancedBaggingClassifier という分類器がある。 ただ、このアプローチだと、学習させる分類器が scikit-learn の API を備えている必要がある。 そこで、異なるアプローチとしてモデルではなくデータを K-Fold するタイミングで Under-sampling + Bagging してみることにした。

使った環境は次の通り。

$ sw_vers            
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G87
$ python -V
Python 3.7.4

下準備

まずは今回使うパッケージをインストールしておく。

$ pip install scikit-learn imbalanced-learn lightgbm

データを分割するタイミングで Under-sampling する K-Fold の実装

早速だけど以下にサンプルコードを示す。 具体的には UnderBaggingKFold という名前で scikit-learn の CrossValidation API を実装している。 データを K-Fold 分割するタイミングで imbalanced-learn の RandomUnderSampler を使って Under-sampling している。 なお、このサンプルコードは動作をデモするためのものなので、まだ分類器にデータを渡すことまではしていない。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from sklearn.model_selection import BaseCrossValidator
from sklearn.model_selection import train_test_split
from imblearn.under_sampling import RandomUnderSampler


class UnderBaggingKFold(BaseCrossValidator):
    """CV に使うだけで UnderBagging できる KFold 実装

    NOTE: 少ないクラスのデータは各 Fold で重複して選択される"""

    def __init__(self, n_splits=5, shuffle=True, random_states=None,
                 test_size=0.2, whole_testing=False):
        """
        :param n_splits: Fold の分割数
        :param shuffle: 分割時にデータをシャッフルするか
        :param random_states: 各 Fold の乱数シード
        :param test_size: Under-sampling された中でテスト用データとして使う割合
        :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか
        """
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_states = random_states
        self.test_size = test_size
        self.whole_testing = whole_testing

        if random_states is not None:
            # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる
            self.n_splits = len(random_states)
        else:
            # 乱数シードが指定されていないときは分割数だけ None で埋めておく
            self.random_states = [None] * self.n_splits

        # 分割数だけ Under-sampling 用のインスタンスを作っておく
        self.samplers_ = [
            RandomUnderSampler(random_state=random_state)
            for random_state in self.random_states
        ]

    def split(self, X, y=None, groups=None):
        """データを学習用とテスト用に分割する"""
        if X.ndim < 2:
            # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う
            X = np.vstack(X)

        for i in range(self.n_splits):
            # データを Under-sampling して均衡データにする
            sampler = self.samplers_[i]
            _, y_sampled = sampler.fit_resample(X, y)
            # 選ばれたデータのインデックスを取り出す
            sampled_indices = sampler.sample_indices_

            # 選ばれたデータを学習用とテスト用に分割する
            split_data = train_test_split(sampled_indices,
                                          shuffle=self.shuffle,
                                          test_size=self.test_size,
                                          stratify=y_sampled,
                                          random_state=self.random_states[i],
                                          )
            train_indices, test_indices = split_data

            if self.whole_testing:
                # Under-sampling で選ばれなかったデータをテスト用に追加する
                mask = np.ones(len(X), dtype=np.bool)
                mask[sampled_indices] = False
                X_indices = np.arange(len(X))
                non_sampled_indices = X_indices[mask]
                test_indices = np.concatenate([test_indices,
                                               non_sampled_indices])

            yield train_indices, test_indices

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits


def main():
    # ダミーの不均衡データを用意する
    X, y = np.arange(1, 21), np.zeros(20, dtype=np.int8)
    # 先頭の 4 要素だけ陽性 (Positive) データに指定する
    y[:4] = 1

    print('y:', y)

    # 乱数シードを指定した 5-Fold
    folds = UnderBaggingKFold(random_states=range(5))

    # データの分割され方を出力する
    for train_indices, test_indices in folds.split(X, y):
        print('train: X={X}, y={y}'.format(X=train_indices, y=y[train_indices]))
        print('test: X={X}, y={y}'.format(X=test_indices, y=y[test_indices]))


if __name__ == '__main__':
    main()

上記のサンプルコードを実行してみよう。 デモでは、全体が 20 要素ある中で先頭の 4 要素だけ陽性になった不均衡なデータを分割している。 各 Fold で、学習データとテスト用データがどのように分割されるか観察してみよう。

$ python ubkfold.py 
y: [1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[13  0  2 12 10  1], y=[0 1 1 0 0 1]
test: X=[3 5], y=[1 0]
train: X=[ 7  0 11  2  6  1], y=[0 1 0 1 0 1]
test: X=[17  3], y=[0 1]
train: X=[3 1 0 4 8 9], y=[1 1 1 0 0 0]
test: X=[16  2], y=[0 1]
train: X=[ 0  2 17 11  1  5], y=[1 1 0 0 1 0]
test: X=[3 8], y=[1 0]
train: X=[ 2  3  0  4  7 16], y=[1 1 1 0 0 0]
test: X=[10  1], y=[0 1]

上記を見ると、全ての目的変数 (y) について陽性と陰性が均等に含まれた均衡データになっていることが分かる。

ちなみに whole_testing というオプションに True を渡すと、サンプリングされなかったデータが全てテスト用データに追加される。 まあ、ようするに陰性のデータが大量に突っ込まれる。

    # サンプリングされなかったデータを全てテスト用に追加する
    folds = UnderBaggingKFold(random_states=range(5),
                              whole_testing=True)

上記についても動作を確認しておこう。 学習用のデータに関しては先ほどと変化がないものの、テスト用のデータは陰性の要素が増えていることが分かる。

$ python ubkfold.py 
y: [1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[13  0  2 12 10  1], y=[0 1 1 0 0 1]
test: X=[ 3  5  4  6  7  8  9 11 14 15 16 17 18 19], y=[1 0 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[ 7  0 11  2  6  1], y=[0 1 0 1 0 1]
test: X=[17  3  4  5  8  9 10 12 13 14 15 16 18 19], y=[0 1 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[3 1 0 4 8 9], y=[1 1 1 0 0 0]
test: X=[16  2  5  6  7 10 11 12 13 14 15 17 18 19], y=[0 1 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[ 0  2 17 11  1  5], y=[1 1 0 0 1 0]
test: X=[ 3  8  4  6  7  9 10 12 13 14 15 16 18 19], y=[1 0 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[ 2  3  0  4  7 16], y=[1 1 1 0 0 0]
test: X=[10  1  5  6  8  9 11 12 13 14 15 17 18 19], y=[0 1 0 0 0 0 0 0 0 0 0 0 0 0]

LightGBM で Under-sampling + Bagging してみる

振る舞いの説明ができたので、続いては実際に分類器を学習させてみよう。 とりあえず LightGBM に擬似的に作った不均衡データを与えてみることにする。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from contextlib import contextmanager

import lightgbm as lgb
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import BaseCrossValidator
from sklearn.model_selection import train_test_split
from imblearn.under_sampling import RandomUnderSampler


class UnderBaggingKFold(BaseCrossValidator):
    """CV に使うだけで UnderBagging できる KFold 実装

    NOTE: 少ないクラスのデータは各 Fold で重複して選択される"""

    def __init__(self, n_splits=5, shuffle=True, random_states=None,
                 test_size=0.2, whole_testing=False):
        """
        :param n_splits: Fold の分割数
        :param shuffle: 分割時にデータをシャッフルするか
        :param random_states: 各 Fold の乱数シード
        :param test_size: Under-sampling された中でテスト用データとして使う割合
        :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか
        """
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_states = random_states
        self.test_size = test_size
        self.whole_testing = whole_testing

        if random_states is not None:
            # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる
            self.n_splits = len(random_states)
        else:
            # 乱数シードが指定されていないときは分割数だけ None で埋めておく
            self.random_states = [None] * self.n_splits

        # 分割数だけ Under-sampling 用のインスタンスを作っておく
        self.samplers_ = [
            RandomUnderSampler(random_state=random_state)
            for random_state in self.random_states
        ]

    def split(self, X, y=None, groups=None):
        """データを学習用とテスト用に分割する"""
        if X.ndim < 2:
            # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う
            X = np.vstack(X)

        for i in range(self.n_splits):
            # データを Under-sampling して均衡データにする
            sampler = self.samplers_[i]
            _, y_sampled = sampler.fit_resample(X, y)
            # 選ばれたデータのインデックスを取り出す
            sampled_indices = sampler.sample_indices_

            # 選ばれたデータを学習用とテスト用に分割する
            split_data = train_test_split(sampled_indices,
                                          shuffle=self.shuffle,
                                          test_size=self.test_size,
                                          stratify=y_sampled,
                                          random_state=self.random_states[i],
                                          )
            train_indices, test_indices = split_data

            if self.whole_testing:
                # Under-sampling で選ばれなかったデータをテスト用に追加する
                mask = np.ones(len(X), dtype=np.bool)
                mask[sampled_indices] = False
                X_indices = np.arange(len(X))
                non_sampled_indices = X_indices[mask]
                test_indices = np.concatenate([test_indices,
                                               non_sampled_indices])

            yield train_indices, test_indices

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits


@contextmanager
def stopwatch():
    """学習にかかる時間を計測するためのコンテキストマネージャ"""
    before = time.time()
    yield
    after = time.time()
    print(f'elapsed time: {after - before:.2f} sec')


def main():
    # クラス比 0.99 : 0.01 のダミーデータを用意する
    args = {
        'n_samples': 1_000_000,
        'n_features': 80,
        'n_informative': 3,
        'n_redundant': 0,
        'n_repeated': 0,
        'n_classes': 2,
        'n_clusters_per_class': 1,
        'weights': [0.99, 0.01],
        'random_state': 42,
    }
    X, y = make_classification(**args)

    # メトリックに ROC AUC を用いた二値分類問題として解く
    lgbm_params = {
        'objective': 'binary',
        'metric': 'auc',
    }
    lgb_train = lgb.Dataset(X, y)

    # 5-Fold で乱数シードに 42 ~ 46 を指定している
    folds = UnderBaggingKFold(random_states=range(42, 42 + 5))

    with stopwatch():
        # 上記で作った UnderBaggingKFold を folds に指定する
        result = lgb.cv(lgbm_params,
                        lgb_train,
                        num_boost_round=1000,
                        early_stopping_rounds=10,
                        seed=42,
                        folds=folds,
                        verbose_eval=10,
                        )
    print('under-bagging auc:', result['auc-mean'][-1])


if __name__ == '__main__':
    main()

上記を実行してみよう。 今回使った環境では、学習に約 28 秒かかって ROC AUC では約 0.776 という結果が得られた。

$ python ublgbm.py
...(snip)...
elapsed time: 28.62 sec
under-bagging auc: 0.7760463716175261

ちなみに LightGBM.cv() 関数から学習済みモデルを取り出す方法については次のエントリを参照してほしい。

blog.amedama.jp

比較対象として LightGBM の単なる Bagging も確認する

先ほどの結果を見るだけでは、学習が早いのか遅いのか、性能が良いのか悪いのか判断が難しい。 そのため、比較対象として Under-sampling しない、単なる Bagging の検証もしておく。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from contextlib import contextmanager

import lightgbm as lgb
from sklearn.datasets import make_classification
from sklearn.model_selection import StratifiedKFold


@contextmanager
def stopwatch():
    before = time.time()
    yield
    after = time.time()
    print(f'elapsed time: {after - before:.2f} sec')


def main():
    args = {
        'n_samples': 1_000_000,
        'n_features': 80,
        'n_informative': 3,
        'n_redundant': 0,
        'n_repeated': 0,
        'n_classes': 2,
        'n_clusters_per_class': 1,
        'weights': [0.99, 0.01],
        'random_state': 42,
    }
    X, y = make_classification(**args)

    lgbm_params = {
        'objective': 'binary',
        'metric': 'auc',
    }
    lgb_train = lgb.Dataset(X, y)

    # 一般的な Stratified KFold
    folds = StratifiedKFold(n_splits=5,
                            shuffle=True,
                            random_state=42)

    with stopwatch():
        # アンダーサンプリングなしで学習する
        result = lgb.cv(lgbm_params,
                        lgb_train,
                        num_boost_round=1000,
                        early_stopping_rounds=10,
                        seed=42,
                        folds=folds,
                        verbose_eval=10,
                        )
    print('base auc:', result['auc-mean'][-1])


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 学習にかかった時間は約 38 秒で、ROC AUC は約 0.779 だった。

$ python baselgbm.py
...(snip)...
elapsed time: 38.79 sec
base auc: 0.7791468600807205

RandomForest でも Under-sampling + Bagging してみる

続いては scikit-learn の RandomForest でも Under-sampling + Bagging を試してみよう。 このケースでは cross_validate()cv オプションに UnderBaggingKFold のインスタンスを渡せば良い。 ちなみに、学習済みモデルは return_estimatorTrue にすれば受け取れる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from contextlib import contextmanager

import lightgbm as lgb
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import BaseCrossValidator
from sklearn.model_selection import cross_validate
from sklearn.model_selection import train_test_split
from imblearn.under_sampling import RandomUnderSampler


class UnderBaggingKFold(BaseCrossValidator):
    """CV に使うだけで UnderBagging できる KFold 実装

    NOTE: 少ないクラスのデータは各 Fold で重複して選択される"""

    def __init__(self, n_splits=5, shuffle=True, random_states=None,
                 test_size=0.2, whole_testing=False):
        """
        :param n_splits: Fold の分割数
        :param shuffle: 分割時にデータをシャッフルするか
        :param random_states: 各 Fold の乱数シード
        :param test_size: Under-sampling された中でテスト用データとして使う割合
        :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか
        """
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_states = random_states
        self.test_size = test_size
        self.whole_testing = whole_testing

        if random_states is not None:
            # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる
            self.n_splits = len(random_states)
        else:
            # 乱数シードが指定されていないときは分割数だけ None で埋めておく
            self.random_states = [None] * self.n_splits

        # 分割数だけ Under-sampling 用のインスタンスを作っておく
        self.samplers_ = [
            RandomUnderSampler(random_state=random_state)
            for random_state in self.random_states
        ]

    def split(self, X, y=None, groups=None):
        """データを学習用とテスト用に分割する"""
        if X.ndim < 2:
            # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う
            X = np.vstack(X)

        for i in range(self.n_splits):
            # データを Under-sampling して均衡データにする
            sampler = self.samplers_[i]
            _, y_sampled = sampler.fit_resample(X, y)
            # 選ばれたデータのインデックスを取り出す
            sampled_indices = sampler.sample_indices_

            # 選ばれたデータを学習用とテスト用に分割する
            split_data = train_test_split(sampled_indices,
                                          shuffle=self.shuffle,
                                          test_size=self.test_size,
                                          stratify=y_sampled,
                                          random_state=self.random_states[i],
                                          )
            train_indices, test_indices = split_data

            if self.whole_testing:
                # Under-sampling で選ばれなかったデータをテスト用に追加する
                mask = np.ones(len(X), dtype=np.bool)
                mask[sampled_indices] = False
                X_indices = np.arange(len(X))
                non_sampled_indices = X_indices[mask]
                test_indices = np.concatenate([test_indices,
                                               non_sampled_indices])

            yield train_indices, test_indices

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits


@contextmanager
def stopwatch():
    before = time.time()
    yield
    after = time.time()
    print(f'elapsed time: {after - before:.2f} sec')


def main():
    args = {
        'n_samples': 1_000_000,
        'n_features': 80,
        'n_informative': 3,
        'n_redundant': 0,
        'n_repeated': 0,
        'n_classes': 2,
        'n_clusters_per_class': 1,
        'weights': [0.99, 0.01],
        'random_state': 42,
    }
    X, y = make_classification(**args)

    folds = UnderBaggingKFold(random_states=range(42, 42 + 5))

    # 分類器としてランダムフォレストを使う
    clf = RandomForestClassifier(n_estimators=100,
                                 n_jobs=-1,
                                 verbose=1,
                                 random_state=42)

    folds = UnderBaggingKFold(random_states=range(42, 42 + 5))

    with stopwatch():
        # cross_validate() 関数の 'cv' オプションに渡す
        result = cross_validate(clf, X, y,
                                scoring='roc_auc',
                                cv=folds, return_estimator=True)

    mean_score = np.array(result['test_score']).mean()
    print('rf auc:', mean_score)


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 ただし、この実験では学習時間や性能がどうのというより、複数の異なる API に対応しやすいことを示している。

$ python ubrf.py
...(snip)...
elapsed time: 110.35 sec
rf auc: 0.7572552391326518

注意事項

今回実装した K-Fold は、交差検証でモデルの性能を正しく見積もることよりも、モデルを学習させることに重きを置いている。 特に、少ないラベルを重複して選択するところにその側面が強く現れている。 そのため、モデルの性能を検証するという点では、Nested CV の要領でさらに外側にもう一段交差検証を重ねた方が良いかもしれない。

blog.amedama.jp

参考資料

blog.amedama.jp

upura.hatenablog.com

いじょう。

Python: PySpark でサードパーティ製のライブラリを使って分散処理する

今回は PySpark でサードパーティ製のライブラリを使って分散処理をする方法について。

サンプルとして、次のような状況を試した。

  • Apache Spark + Hadoop YARN で構築した分散処理用のクラスタを用いる
  • サードパーティ製のライブラリとして scikit-learn を想定する
  • scikit-learn の学習済みモデルを、あらかじめローカルで用意しておく
  • Iris データセットと学習済みモデルを使った推論を PySpark で分散処理する

使った環境は次の通り。

$ cat /etc/redhat-release
CentOS Linux release 7.6.1810 (Core)
$ uname -r
3.10.0-957.21.3.el7.x86_64
$ python3 -V
Python 3.6.8
$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_222
Branch
Compiled by user  on 2019-05-01T05:08:38Z
Revision
Url
Type --help for more information.
$ hadoop version
Hadoop 2.9.2
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 826afbeae31ca687bc2f8471dc841b66ed2c6704
Compiled by ajisaka on 2018-11-13T12:42Z
Compiled with protoc 2.5.0
From source with checksum 3a9939967262218aa556c684d107985
This command was run using /home/vagrant/hadoop-2.9.2/share/hadoop/common/hadoop-common-2.9.2.jar

Conda (Miniconda) で Python の仮想環境を作る

PySpark でサードパーティ製のライブラリを使う場合、いくつか検討すべき点がある。 その中でも、最初につまづくのは「ライブラリをいかにエグゼキュータのホストに配布するか」という点。 なぜなら、事前にエグゼキュータの各ホストにライブラリをインストールして回ることは現実的ではない。 Apache Spark のエグゼキュータのホストは環境によっては数百や数千台に及ぶ可能性もある。 管理が自動化されていることは前提としても、各アプリケーションごとにライブラリを追加する作業が生じるのは望ましくない。 そのため、あらかじめライブラリをインストールした仮想環境またはコンテナなどを実行時に配布する方が良い。

今回は Conda (Miniconda) で事前に作った仮想環境をエグゼキュータに配布する方法を取った。 これは、Conda で作った仮想環境はポータビリティがあるため。 virtualenv で作った仮想環境は、ベースとなった Python の実行環境に依存するためホストをまたいだポータビリティがない。 実験的なオプションとして提供されている --relocatableをつければできそうではあるけど、ドキュメントを読む限りあまり使う気持ちにはならない。

virtualenv.pypa.io

なので、まずは Miniconda をインストールしていく。

$ sudo yum -y install wget bzip2 zip
$ wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
$ sudo bash Miniconda3-latest-Linux-x86_64.sh -b -p /opt/conda

インストールした Conda で仮想環境を作る。 以下では環境の名前を conda_env としている。

$ /opt/conda/bin/conda create -n conda_env --copy -yq python=3.7

次のように、仮想環境ができた。

$ source /opt/conda/bin/activate conda_env
$ pip list
Package    Version
---------- ---------
certifi    2019.6.16
pip        19.1.1
setuptools 41.0.1
wheel      0.33.4

仮想環境に、PySpark のアプリケーションが動作するのに必要なサードパーティ製のライブラリをインストールする。 今回であれば scikit-learn を入れる。

$ pip install scikit-learn

インストールできたら、次は仮想環境のある場所に移動して ZIP ファイルに圧縮する。 これはエグゼキュータへの配布を考えてのこと。

$ cd ~/.conda/envs
$ zip -r conda_env.zip conda_env

PySpark で動作確認する

さて、仮想環境の準備ができたところで動作確認に移る。

まずは PySpark のインタプリタを起動しよう。

$ pyspark \
  --master yarn \
  --archives "conda_env.zip#defrost" \
  --conf spark.pyspark.driver.python=conda_env/bin/python \
  --conf spark.pyspark.python=defrost/conda_env/bin/python

上記では、クラスタ管理に Hadoop YARN を使っているので --master yarn で起動している。 そして、今回のポイントとなるのがそれ以降のオプションたち。 まず、--archives オプションはエグゼキュータに配布するファイルを指定している。 ここで、先ほど作った ZIP ファイルを指定することでエグゼキュータに Conda の仮想環境をバラまいている。 また、# 以降はファイルを解凍したときのディレクトリ名になる。 続いて --conf spark.pyspark.driver.python では Spark のドライバで使う Python を指定している。 ここでも先ほど作った仮想環境の Python を指定した。 そして、最大のポイントとなるのが --conf spark.pyspark.python で、ここでエグゼキュータのホスト上に解凍されたディレクトリに含まれる仮想環境の Python を指定している。

インタプリタが起動したら動作を確認していこう。 まず、ドライバで起動したインタプリタは次の通り Python 3.7 になっている。 ちゃんと Conda で作った仮想環境が使われているようだ。

>>> import sys
>>> sys.version_info
sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)

しかし、エグゼキュータの方はどうだろうか? こちらも確認する必要がある。 そこで、まずは次のようにダミーの RDD を用意しておく。

>>> rdd = sc.range(2)
>>> rdd.getNumPartitions()
2

そして、次のように Python のバージョンを返す関数を定義しておく。

>>> def python_version(_):
...     """エグゼキュータ上の Python のバージョンを返す"""
...     import sys
...     return str(sys.version_info)
...

上記を先ほど作った RDD に対して実行することで、エグゼキュータ上の Python のバージョンを確認してみよう。

>>> from pprint import pprint
>>> pprint(rdd.map(python_version).collect())
["sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)",
 "sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)"]

上記の通り、ちゃんとエグゼキュータ上でも Python 3.7 が使えている。

とはいえ、まだ油断はできない。 ちゃんと scikit-learn はインポートできるだろうか? これについても確認しておく。

>>> def sklearn_version(_):
...     """エグゼキュータ上で scikit-learn をインポートしてバージョンを返す関数"""
...     import sklearn
...     return sklearn.__version__
...
 >>> pprint(rdd.map(sklearn_version).collect())
['0.21.3', '0.21.3']

どうやら、ちゃんとエグゼキュータ上で scikit-learn が使える状況にあるようだ。

ローカルで学習済みモデルを作る

分散環境で scikit-learn が使えるようになったところで、次にローカルで学習済みモデルを用意する。 とはいえ、めんどくさいのでインタプリタは先ほど起動したものを使い回すことにしよう。 PySpark の環境は、いくつかのインスタンスがグローバルスコープにある以外は通常の Python と何ら変わりがないので。

Iris データセットを読み込む。

>>> from sklearn import datasets
>>> dataset = datasets.load_iris()
>>> X, y = dataset.data, dataset.target

ホールドアウト検証用にデータを分割する。

>>> from sklearn.model_selection import train_test_split
>>> X_train, X_test, y_train, y_test = train_test_split(X, y,
...                                                     shuffle=True,
...                                                     random_state=42)

学習データの方をランダムフォレストに学習させる。

>>> clf = RandomForestClassifier(n_estimators=100)
>>> clf.fit(X_train, y_train)
RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
                       max_depth=None, max_features='auto', max_leaf_nodes=None,
                       min_impurity_decrease=0.0, min_impurity_split=None,
                       min_samples_leaf=1, min_samples_split=2,
                       min_weight_fraction_leaf=0.0, n_estimators=100,
                       n_jobs=None, oob_score=False, random_state=None,
                       verbose=0, warm_start=False)

これで学習済みモデルが手に入った。 通常であれば pickle などを使ってディスクに直列化して別の環境に持ち運ぶことになる。 今回はインタプリタが変わらないので、そのまま使うことにする。

データセットを PySpark のデータ表現に変換する

学習済みモデルができたので、次はこれを PySpark で動かしたい。 ただ、その前にデータがないと始まらないので Iris データセットを PySpark に読み込む。 具体的には NumPy の配列を PySpark の DataFrame に変換したい。

まずは PySpark の DataFrame を作るためにスキーマを定義する。 今回は特徴量が全て浮動小数点型だったので楽だけど、異なるときは型を変えていく必要がある。

>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import DoubleType
>>> df_schema = StructType([
...     StructField(feature_name, DoubleType(), False)
...     for feature_name in dataset.feature_names
... ])

PySpark は Numpy の配列を受け付けてくれないのでプリミティブな型に変換する関数を用意する。 まあ、実際には Numpy の配列をデータセットとして PySpark の環境に持っていくことなんてそうないだろうけど。

>>> def numpy_to_primitive(np_array):
...     """numpy の要素があると受け付けないので Python のネイティブな型に直す"""
...     for np_row in np_array:
...         yield [float(element) for element in np_row]
...

先ほどホールドアウトしておいたテストデータを DataFrame に変換する。

>>> X_test_df = spark.createDataFrame(numpy_to_primitive(X_test), df_schema)
>>> X_test_df.show(truncate=False, n=5)
+-----------------+----------------+-----------------+----------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|
+-----------------+----------------+-----------------+----------------+
|6.1              |2.8             |4.7              |1.2             |
|5.7              |3.8             |1.7              |0.3             |
|7.7              |2.6             |6.9              |2.3             |
|6.0              |2.9             |4.5              |1.5             |
|6.8              |2.8             |4.8              |1.4             |
+-----------------+----------------+-----------------+----------------+
only showing top 5 rows

これでデータができた。

PySpark で学習済みモデルの推論を分散処理する

データセットとモデルが揃ったので、次は本題となる推論の部分を分散処理する。

まずは学習済みモデルを SparkContext#broadcast() を使ってエグゼキュータのインタプリタ上で使えるようにする。 これは要するにオブジェクトを SerDe してリモートのホスト上のインタプリタにロードしている。

>>> broadcasted_clf = sc.broadcast(clf)
>>> broadcasted_clf
<pyspark.broadcast.Broadcast object at 0x7fe46aa4f8d0>

まずはデータを一行ずつ推論させてみることにしよう。 今回であれば row は Iris データセットの一つの花の特徴量に対応する。 SparkContext#broadcast() でバラまいたオブジェクトは Broadcast#value アトリビュートからアクセスできる。

>>> def predict(row):
...     """推論を分散処理する"""
...     y_pred = broadcasted_clf.value.predict([row])
...     # 返り値が numpy の配列なので単なるリストに直したほうが PySpark では扱いやすい
...     return list(y_pred)
...

上記を使って推論してみよう。 DataFrame を RDD に変換した上で、RDD#flatMap() で分散処理を実行している。

>>> X_test_df.rdd.flatMap(predict).collect()
[1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0]

どうやら、それっぽい値が得られた。

RDD#flatMap() にしているのは、返しているのがリストだからで、もし単なる RDD#map() だと、次のようにリストの入れ子になる。

>>> X_test_df.rdd.map(predict).collect()
[[1], [0], [2], [1], [1], [0], [1], [2], [1], [1], [2], [0], [0], [0], [0], [1], [2], [1], [1], [2], [0], [2], [0], [2], [2], [2], [2], [2], [0], [0], [0], [0], [1], [0], [0], [2], [1], [0]]

得られた結果を Accuracy について評価してみよう。

>>> y_pred = X_test_df.rdd.flatMap(predict).collect()
>>> from sklearn.metrics import accuracy_score
>>> accuracy_score(y_test, y_pred)
1.0

どうやら、ちゃんと推論できたようだ。

続いては複数行をまとめて推論させてみることにしよう。 Apache Spark では、パーティションという単位でデータをまとまりとして扱うことができる。 以下の関数は複数行のデータを受け取れるようにしてある。

>>> def predict_partition(map_of_rows):
...     """パーティション単位で推論を分散処理する"""
...     list_of_rows = list(map_of_rows)  # 複数行のデータが入ったリストに直す
...     y_pred = broadcasted_clf.value.predict(list_of_rows)
...     return y_pred
...

上記を実行してみよう。 パーティション単位で処理するときは RDD#mapPartitions() を使う。

>>> X_test_df.rdd.mapPartitions(predict_partition).collect()
[1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0]

こちらも、Accuracy で評価してみよう。 先ほどの結果と一致しているだろうか。

>>> y_pred = X_test_df.rdd.mapPartitions(predict_partition).collect()
>>> accuracy_score(y_test, y_pred)
1.0

どうやら、大丈夫そうだ。

めでたしめでたし。

参考

blog.cloudera.co.jp

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Python: PySpark で DataFrame にカラムを追加する

Apache Spark の Python 版インターフェースである PySpark で DataFrame オブジェクトにカラムを追加する方法について。 いくつかやり方があるので見ていく。 ちなみに DataFrame や、それを支える内部的な RDD はイミュータブル (不変) なオブジェクトになっている。 そのため、カラムを追加するときは既存のオブジェクトを変更するのではなく、新たなオブジェクトを作ることになる。

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G87
$ python -V
Python 3.7.4
$ pip list | grep -i pyspark
pyspark            2.4.3

下準備

まずは PySpark のインタプリタを起動しておく。 今回に関しては分散処理はしないのでローカルモードで構わない。

$ pyspark

サンプルとしてユーザ情報を模したデータを作ってみる。 まずは、次のように RDD (Resilient Distributed Dataset) を作る。

>>> users_rdd = sc.parallelize([
...   ('Alice', 20),
...   ('Bob', 25),
...   ('Carol', 30),
...   ('Daniel', 30),
... ])

上記にスキーマを定義して DataFrame に変換する。

>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
>>> from pyspark.sql.types import IntegerType
>>> df_schema = StructType([
...   StructField('name', StringType(), False),
...   StructField('age', IntegerType(), False),
... ])
>>> users_df = spark.createDataFrame(users_rdd, df_schema)

上手くいけば、次のようになる。

>>> users_df.show(truncate=False)
+------+---+
|name  |age|
+------+---+
|Alice |20 |
|Bob   |25 |
|Carol |30 |
|Daniel|30 |
+------+---+

今回はここに、年齢を倍にした double_age というカラムを追加してみる。

SparkSQL を使ってカラムを追加する

まずは SparkSQL を使ってカラムを追加してみる。

先ほどの DataFrame を SparkSQL から操作できるように登録しておく。

>>> users_df.registerTempTable('users')

あるいは、以下のようにしても良い。

>>> users_df.createOrReplaceTempView('users')

既存のカラムに加えて年齢を倍にしたカラムを追加するように SQL を用意する。

>>> query = """
... SELECT
...   name,
...   age,
...   age * 2 AS double_age
... FROM users
... """

そして SparkSession#sql() で実行する。

>>> new_users_df = spark.sql(query)

得られた DataFrame を見ると、ちゃんとカラムが新たに追加されている。

>>> new_users_df.show(truncate=False)
+------+---+----------+
|name  |age|double_age|
+------+---+----------+
|Alice |20 |40        |
|Bob   |25 |50        |
|Carol |30 |60        |
|Daniel|30 |60        |
+------+---+----------+

DataFrame API を使ってカラムを追加する

DataFrame に生えたメソッドを使ってカラムを追加する方法もある。 見栄えはだいぶ変わるけど、先ほどとやっていることは基本的に変わらない。

>>> new_users_df = users_df.withColumn('double_age', users_df.age * 2)

DataFrame API は、使っていくと「これ SQL 書いてるのと変わらなくね?」ってなってくる。 なので、個人的にはあまり出番がない。

>>> new_users_df.show(truncate=False)
+------+---+----------+
|name  |age|double_age|
+------+---+----------+
|Alice |20 |40        |
|Bob   |25 |50        |
|Carol |30 |60        |
|Daniel|30 |60        |
+------+---+----------+

RDD API を使ってカラムを追加する

最後に、Apache Spark の最もプリミティブなデータ表現である RDD の API を使って追加する方法について。 ただし、このやり方は UDF (User Defined Function) を使うので遅いはず。

まずは、次のように RDD を行単位で処理してカラムを追加する関数を用意する。

>>> def double_age(row):
...     """年齢を倍にしたカラムを追加する関数"""
...     return list(row) + [row['age'] * 2]
...

DataFrame の RDD に適用すると、次のようになる。

>>> new_users_rdd = users_df.rdd.map(double_age)
>>> new_users_rdd.collect()
[['Alice', 20, 40], ['Bob', 25, 50], ['Carol', 30, 60], ['Daniel', 30, 60]]

元の DataFrame に戻したいけど、そのままだとカラム名や型の情報がない。

>>> new_users_rdd.toDF().show(truncate=False)
+------+---+---+
|_1    |_2 |_3 |
+------+---+---+
|Alice |20 |40 |
|Bob   |25 |50 |
|Carol |30 |60 |
|Daniel|30 |60 |
+------+---+---+

そこで、元あった DataFrame のスキーマを改変する形で新たな DataFrame のスキーマを定義する。

>>> new_schema_fields = users_df.schema.fields + [StructField('double_age', IntegerType(), False)]
>>> new_schema = StructType(new_schema_fields)

用意したスキーマを使って DataFrame に変換する。

>>> new_user_df = new_users_rdd.toDF(new_schema)

これでカラム名や型の情報がちゃんとした DataFrame になった。

>>> new_user_df.show(truncate=False)
+------+---+----------+
|name  |age|double_age|
+------+---+----------+
|Alice |20 |40        |
|Bob   |25 |50        |
|Carol |30 |60        |
|Daniel|30 |60        |
+------+---+----------+

補足

ちなみにカラムを削除したいときは、次のように DataFrame API で DataFrame#drop() を呼び出せば良い。

>>> new_user_df.drop('age').show(truncate=False)
+------+----------+
|name  |double_age|
+------+----------+
|Alice |40        |
|Bob   |50        |
|Carol |60        |
|Daniel|60        |
+------+----------+

いじょう。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Python: Apache Spark のパーティションは要素が空になるときがある

PySpark とたわむれていて、なんかたまにエラーになるなーと思って原因を調べて分かった話。 最初、パーティションの中身は空になる場合があるとは思っていなかったので、結構おどろいた。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G87
$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_121
Branch
Compiled by user  on 2019-05-01T05:08:38Z
Revision
Url
Type --help for more information.
$ python -V
Python 3.7.4
$ java -version
openjdk version "12.0.1" 2019-04-16
OpenJDK Runtime Environment (build 12.0.1+12)
OpenJDK 64-Bit Server VM (build 12.0.1+12, mixed mode, sharing)

下準備

下準備として PySpark をインストールしたら REPL を起動しておく。 今回の検証に関しては分散処理をしないローカルモードでも再現できる。

$ pip install pyspark
$ pyspark

サンプルデータを用意する

例えば SparkSession#range() を使ってサンプルの DataFrame オブジェクトを作る。

>>> df = spark.range(10)

中身は bigint 型の連番が格納されている。

>>> df
DataFrame[id: bigint]
>>> df.show(truncate=False)
+---+
|id |
+---+
|0  |
|1  |
|2  |
|3  |
|4  |
|5  |
|6  |
|7  |
|8  |
|9  |
+---+

今回使う環境ではこの DataFrame は 4 つのパーティションに分けて処理されることが分かる。 パーティションというのは Apache Spark が内部的に RDD (Resilient Distributed Dataset) を処理する際の分割数を指している。 RDD は Apache Spark の最も低レイヤなデータ表現で、DataFrame も最終的には RDD に変換されて処理される。

>>> df.rdd.getNumPartitions()
4

試しにパーティションに入っている要素の数をカウントしてみることにしよう。 次のような関数を用意する。

>>> def size_of_partition(map_of_rows):
...     """パーティションの要素の数を計算する関数"""
...     list_of_rows = list(map_of_rows)
...     size_of_list = len(list_of_rows)
...     return [size_of_list]
...

これを RDD#mapPartitions() 経由で呼び出す。 これでパーティションの中の要素の数をカウントできる。

>>> df.rdd.mapPartitions(size_of_partition).collect()
[2, 3, 2, 3]

各パーティションには 2 ないし 3 の要素が入っているようだ。

意図的にパーティションを空にしてみる

続いては、意図的にパーティションの中身をスカスカにするためにパーティションの分割数を増やしてみよう。 先ほど 4 だった分割数を 20 まで増やしてみる。 パーティションの分割数を増やすには RDD#repartition() が使える。

>>> reparted_rdd = df.rdd.repartition(20)
>>> reparted_rdd.getNumPartitions()
20

この状態でパーティションの要素の数をカウントすると、次のようになった。 要素の数として 0 が登場していることから、パーティションによっては中身が空なことが分かる。

>>> reparted_rdd.mapPartitions(size_of_partition).collect()
[0, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 3, 0, 0, 0, 0, 0, 0]

意外だったのは、要素数を均すリバランスがされていないこと。

ちなみに先ほど使った RDD#repartition()RDD#coalesce() をオプション shuffle=True で呼び出した場合と等価なようだ。 オプション shuffle=True は要素の順序を保持しないことを表している。

>>> df.rdd.coalesce(20, shuffle=True)  # df.rdd.repartition(20) と等価

ちなみに、要素の順序を保持したままパーティションを拡張することはできない。

>>> unshuffled_reparted_rdd = df.rdd.coalesce(20, shuffle=False)
>>> unshuffled_reparted_rdd.getNumPartitions()
4

オプションの shuffleFalse だとパーティションの分割数が増えていないことが分かる。

ようするにパーティションの分割数を増やしたいときは、要素の順序が必ず入れ替わると考えた方が良い。 先ほどパーティションを増やした RDD も、確認すると順番が入れ替わっている。

>>> reparted_rdd.map(lambda x: x).collect()
[Row(id=0), Row(id=1), Row(id=7), Row(id=8), Row(id=9), Row(id=5), Row(id=6), Row(id=2), Row(id=3), Row(id=4)]

RDD のままでもいいけど、ちょっと分かりにくいかもしれないので DataFrame に直すとこんな感じ。

>>> reparted_rdd.map(lambda x: x).toDF(df.schema).show(truncate=False)
+---+
|id |
+---+
|0  |
|1  |
|7  |
|8  |
|9  |
|5  |
|6  |
|2  |
|3  |
|4  |
+---+

いじょう。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Python: LightGBM の cv() 関数から取得した学習済みモデルを SerDe する

今回は、前回のエントリを書くきっかけになったネタについて。

blog.amedama.jp

上記は今回扱う LightGBM の cv() 関数から取得した _CVBooster のインスタンスで起きた問題だった。 このインスタンスは、そのままでは pickle で直列化・非直列化 (SerDe) できずエラーになってしまう。

ちなみに LightGBM の cv() 関数から学習済みモデルを取得する件については以下のエントリに書いてある。

blog.amedama.jp

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G84
$ python -V            
Python 3.7.4

下準備

準備として LightGBM と Scikit-learn をインストールしておく。

$ pip install lightgbm scikit-learn

問題が生じるコード

まずは件の問題が生じるコードから。 以下のサンプルコードでは、取得した _CVBooster のインスタンスを pickle で直列化しようとしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pickle

import numpy as np
import lightgbm as lgb
from sklearn import datasets
from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split


class ModelExtractionCallback(object):
    """lightgbm.cv() 関数からモデルを取り出すコールバック"""

    def __init__(self):
        self._model = None

    def __call__(self, env):
        self._model = env.model

    def _assert_called_cb(self):
        if self._model is None:
            raise RuntimeError('callback has not called yet')

    @property
    def boosters_proxy(self):
        self._assert_called_cb()
        return self._model

    @property
    def raw_boosters(self):
        self._assert_called_cb()
        return self._model.boosters

    @property
    def best_iteration(self):
        self._assert_called_cb()
        return self._model.best_iteration


def main():
    # データセットを読み込む
    dataset = datasets.load_breast_cancer()
    X, y = dataset.data, dataset.target

    # デモ用にデータセットを分割する
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        shuffle=True,
                                                        test_size=0.2,
                                                        random_state=42)


    # LightGBM のデータセット表現にラップする
    lgb_train = lgb.Dataset(X_train, y_train)

    # モデルを学習する
    extraction_cb = ModelExtractionCallback()
    callbacks = [
        extraction_cb,
    ]
    lgb_params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
    }
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    result = lgb.cv(lgb_params,
                    lgb_train,
                    num_boost_round=1000,
                    early_stopping_rounds=10,
                    folds=skf,
                    seed=42,
                    callbacks=callbacks,
                    verbose_eval=10)

    print('cv logloss:', result['binary_logloss-mean'][-1])

    # モデルを取り出す
    proxy = extraction_cb.boosters_proxy

    # モデルを SerDe する
    serialized_model = pickle.dumps(proxy)
    restored_model = pickle.loads(serialized_model)

    # Deserialize したオブジェクト
    print(restored_model)

    # Hold-out しておいたデータを予測させる
    y_pred_probas = restored_model.predict(X_test)
    y_pred_proba = np.array(y_pred_probas).mean(axis=0)
    y_pred = np.where(y_pred_proba > 0.5, 1, 0)
    # Accuracy について評価する
    acc = accuracy_score(y_test, y_pred)
    print('test accuracy:', acc)


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、次のように例外になる。

$ python lgbcvbserde.py
...
cv logloss: 0.12616399920831986
Traceback (most recent call last):
  File "lgbcvbserde.py", line 99, in <module>
    main()
  File "lgbcvbserde.py", line 84, in main
    restored_model = pickle.loads(serialized_model)
  File "/Users/amedama/.virtualenvs/py37/lib/python3.7/site-packages/lightgbm/engine.py", line 262, in handler_function
    for booster in self.boosters:
TypeError: 'function' object is not iterable

これは、先のエントリに記述した通り以下の条件が重なることで生じている。

  • ラッパーとなる _CVBooster__getattr__() が実装されており __getstate__()__setstate() をトラップする
  • ラップされるオブジェクトに __getstate__()__setstate__() が実装されておりラッパー経由で呼ばれている

問題を修正するコード

問題の修正方法は先のエントリに記述した通り。 ラッパーとして動作するオブジェクト、今回であれば _CVBooster のインスタンスに __getstate__()__setstate__() が必要になる。 ただし、_CVBooster は LightGBM のパッケージなので直接ソースコードを修正することは望ましくない。 そのためモンキーパッチを駆使して解決する。

以下のサンプルコードではクラスに動的にメソッドを追加することで問題を修正している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pickle

import numpy as np
import lightgbm as lgb
from sklearn import datasets
from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split


class ModelExtractionCallback(object):
    """lightgbm.cv() 関数からモデルを取り出すコールバック"""

    def __init__(self):
        self._model = None

    def __call__(self, env):
        self._model = env.model

    def _assert_called_cb(self):
        if self._model is None:
            raise RuntimeError('callback has not called yet')

    @property
    def boosters_proxy(self):
        self._assert_called_cb()
        return self._model

    @property
    def raw_boosters(self):
        self._assert_called_cb()
        return self._model.boosters

    @property
    def best_iteration(self):
        self._assert_called_cb()
        return self._model.best_iteration


def main():
    dataset = datasets.load_breast_cancer()
    X, y = dataset.data, dataset.target

    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        shuffle=True,
                                                        test_size=0.2,
                                                        random_state=42)


    lgb_train = lgb.Dataset(X_train, y_train)

    extraction_cb = ModelExtractionCallback()
    callbacks = [
        extraction_cb,
    ]
    lgb_params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
    }
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    result = lgb.cv(lgb_params,
                    lgb_train,
                    num_boost_round=1000,
                    early_stopping_rounds=10,
                    folds=skf,
                    seed=42,
                    callbacks=callbacks,
                    verbose_eval=10)

    print('cv logloss:', result['binary_logloss-mean'][-1])

    proxy = extraction_cb.boosters_proxy

    # lightgbm.engine._CVBooster のクラスに
    # __getstate__() と __setstate__() を動的に追加する
    def __getstate__(self):
        return self.__dict__.copy()
    setattr(lgb.engine._CVBooster, '__getstate__', __getstate__)

    def __setstate__(self, state):
        self.__dict__.update(state)
    setattr(lgb.engine._CVBooster, '__setstate__', __setstate__)

    serialized_model = pickle.dumps(proxy)
    restored_model = pickle.loads(serialized_model)

    print(restored_model)

    y_pred_probas = restored_model.predict(X_test)
    y_pred_proba = np.array(y_pred_probas).mean(axis=0)
    y_pred = np.where(y_pred_proba > 0.5, 1, 0)
    acc = accuracy_score(y_test, y_pred)
    print('test accuracy:', acc)


if __name__ == '__main__':
    main()

上記を実行してみよう。 SerDe の部分は全く修正していないけど、今度は例外にならず実行できている。

$ python lgbcvbserde.py
...
cv logloss: 0.12616399920831986
<lightgbm.engine._CVBooster object at 0x114704090>
test accuracy: 0.9736842105263158

ちなみに、上記のように _CVBooster ごと直列化しようとするから今回のような問題になるのであって、中身の Booster を格納したリストを直列化するという選択肢もある。

Python: __getattr__() のあるオブジェクトを直列化しようとしてハマった話

今回は特殊メソッドの __getattr__() があるオブジェクトを pickle で直列化・非直列化 (SerDe) しようとしたらハマった話について。

まず、特殊メソッドの __getattr__() をクラスに実装してあると、そのインスタンスは未定義のアトリビュートにアクセスが生じたとき呼び出しがトラップされる。 そして、この __getattr__() を実装したクラスのインスタンスを pickle で SerDe しようとしたとき思わぬ挙動となった。 結論から先に述べると __getattr__() を実装してあると __getstate__()__setstate__() の呼び出しまでトラップされてしまう。 これらのメソッドは SerDe の振る舞いをオーバーライドするための特殊メソッドとなっている。 この問題の対策としては __getattr__() のある SerDe が必要なクラスには __getstate__()__setstate__() を実装しておくことが考えられる。

なお、pickle を使ったオブジェクトの SerDe の概要については、以下のエントリを参照のこと。

blog.amedama.jp

使った環境は次の通り。

$ sw_vers           
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G84
$ python -V
Python 3.7.4

特殊メソッド __getattr__() がないときの振る舞いについて

まずは __getattr__() を実装していないクラスを直列化・非直列化 (SerDe) してみる。 以下のサンプルコードでは Example というクラスのインスタンスをバイト列にしてから元のオブジェクトに戻している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pickle


class Example(object):
    """SerDe されるクラス"""

    def __init__(self, message):
        self.message = message

    def greet(self):
        print('Hello, {msg}!'.format(msg=self.message))


def main():
    # Example クラスをインスタンス化する
    o = Example('World')
    # メソッドを呼び出す
    o.greet()

    # オブジェクトをバイト列にシリアライズする
    # このときオブジェクトに __getstate__() があれば呼ばれる
    # このサンプルコードにはないためデフォルトの振る舞いになる
    s = pickle.dumps(o)

    # バイト列からオブジェクトをデシリアライズする
    # このときオブジェクトに __setstate__() があれば呼ばれる
    # このサンプルコードにはないためデフォルトの振る舞いになる
    restored_o = pickle.loads(s)

    # 復元したオブジェクトのメソッドを呼び出す
    restored_o.greet()


if __name__ == '__main__':
    main()

上記を実行した結果が次の通り。 ちゃんとオブジェクトをバイト列にして、また元のオブジェクトに戻せていることがわかる。

$ python serde1.py     
Hello, World!
Hello, World!

特殊メソッド __getattr__() があるときの振る舞いについて

続いては __getattr__() のあるオブジェクトを SerDe してみる。 ただ、先ほどの Example クラスに直接 __getattr__() を追加するのはユースケースとして考えにくいので、ちょっとアレンジを加えてある。 Example クラスはそのままに、そのラッパーとして動作する Wrapper クラスを用意して、そこに __getattr__() メソッドを実装した。 こういったプロキシのようなクラスは、プロキシする先のオブジェクトの呼び出しを中継するために __getattr__() を使うことが多い。 このような状況で Wrapper クラスのインスタンスを SerDe すると上手くいかない、というのが今回の本題となる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pickle


class Wrapper(object):
    """別のオブジェクトへのラッパーとして動作するクラス (SerDe される)"""

    def __init__(self, wrap_target):
        self.wrap_target = wrap_target

    def __getattr__(self, item):
        """未定義のアトリビュートへのアクセスをトラップする"""
        def _wrapper(*args, **kwargs):
            print('trapped undefined access:', item)
            # ラップするオブジェクトのアトリビュートを取得して呼び出す
            func = getattr(self.wrap_target, item)
            return func(*args, **kwargs)
        return _wrapper


class Example(object):
    """Wrapper 経由で呼び出されるクラス"""

    def __init__(self, message):
        self.message = message

    def greet(self):
        print('Hello, {msg}!'.format(msg=self.message))


def main():
    o = Example('World')

    # Wrapper でオブジェクトをラップする
    w = Wrapper(o)
    # ラッパー経由でメソッドを呼び出す
    w.greet()

    # XXX: __getstate__() が __getattr__() 経由で呼ばれようとする
    s = pickle.dumps(w)

    # XXX: __setstate__() が __getattr__() 経由で呼ばれようとする
    restored_w = pickle.loads(s)

    restored_w.greet()


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、次のように直列化するタイミングでエラーになる。 見ると __getstate__()Example のオブジェクトにない、という内容のようだ。

$ python serde2.py 
trapped undefined access: greet
Hello, World!
trapped undefined access: __getstate__
Traceback (most recent call last):
  File "serde2.py", line 50, in <module>
    main()
  File "serde2.py", line 41, in main
    s = pickle.dumps(w)
  File "serde2.py", line 17, in _wrapper
    func = getattr(self.wrap_target, item)
AttributeError: 'Example' object has no attribute '__getstate__'

Example クラスに __*state__() を実装すれば解決...しない

では、エラーメッセージに習って Example クラスに __getstate__()__setstate__() を実装すれば解決するだろうか? 試してみよう。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pickle


class Wrapper(object):
    """別のオブジェクトへのラッパーとして動作するクラス (SerDe される)"""

    def __init__(self, wrap_target):
        self.wrap_target = wrap_target

    def __getattr__(self, item):
        """未定義のアトリビュートへのアクセスをトラップする"""
        def _wrapper(*args, **kwargs):
            print('trapped undefined access:', item)
            func = getattr(self.wrap_target, item)
            return func(*args, **kwargs)
        return _wrapper


class Example(object):
    """Wrapper 経由で呼び出されるクラス"""

    def __init__(self, message):
        self.message = message

    def greet(self):
        print('Hello, {msg}!'.format(msg=self.message))

    def __getstate__(self):
        """__getstate__() を明示的に定義する"""
        return self.__dict__.copy()

    def __setstate__(self, state):
        """__setstate__() を明示的に定義する"""
        self.__dict__.update(state)


def main():
    o = Example('World')

    w = Wrapper(o)
    w.greet()

    # XXX: __getstate__() が __getattr__() 経由で呼ばれようとする
    s = pickle.dumps(w)

    # XXX: __setstate__() が __getattr__() 経由で呼ばれようとする
    restored_w = pickle.loads(s)

    restored_w.greet()


if __name__ == '__main__':
    main()

残念ながら、今度は以下のようなエラーになる。 そもそも SerDe したいのは Wrapper クラスのインスタンスなので Example クラスに実装しても解決できない。

$ python serde3.py 
trapped undefined access: greet
Hello, World!
trapped undefined access: __getstate__
trapped undefined access: __setstate__
Traceback (most recent call last):
  File "serde3.py", line 56, in <module>
    main()
  File "serde3.py", line 50, in main
    restored_w = pickle.loads(s)
  File "serde3.py", line 17, in _wrapper
    func = getattr(self.wrap_target, item)
AttributeError: 'function' object has no attribute '__setstate__'

このときのエラーメッセージがまた分かりにくくて、どうして function オブジェクトでエラーになるんだ、となる。

Wrapper クラスに __*state__() を実装してみる

ということで、今度は Wrapper クラスの方に __getstate__()__setstate__() を実装してみよう。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pickle


class Wrapper(object):
    """別のオブジェクトへのラッパーとして動作するクラス (SerDe される)"""

    def __init__(self, wrap_target):
        self.wrap_target = wrap_target

    def __getattr__(self, item):
        """未定義のアトリビュートへのアクセスをトラップする"""
        def _wrapper(*args, **kwargs):
            print('trapped undefined access:', item)
            func = getattr(self.wrap_target, item)
            return func(*args, **kwargs)
        return _wrapper

    def __getstate__(self):
        """__getstate__() を明示的に定義する"""
        return self.__dict__.copy()

    def __setstate__(self, state):
        """__setstate__() を明示的に定義する"""
        self.__dict__.update(state)


class Example(object):
    """Wrapper 経由で呼び出されるクラス"""

    def __init__(self, message):
        self.message = message

    def greet(self):
        print('Hello, {msg}!'.format(msg=self.message))


def main():
    o = Example('World')

    w = Wrapper(o)
    w.greet()

    # __getstate__() が明示的に定義されているため __getattr__() は呼ばれない
    s = pickle.dumps(w)

    # __setstate__() が明示的に定義されているため __getattr__() は呼ばれない
    restored_w = pickle.loads(s)

    restored_w.greet()


if __name__ == '__main__':
    main()

今度は次のようにエラーにならず SerDe できた。 Wrapper クラスに __getstate__()__setstate__() が定義されているため、呼び出しが __getattr__() にトラップされることがない。

$ python serde4.py
trapped undefined access: greet
Hello, World!
trapped undefined access: greet
Hello, World!

サードパーティーのライブラリで問題が発生しているとき

先ほどのようにクラスにメソッドを定義して救えるのは、自分で定義したクラスで問題が発生している場合に限られる。 もし、サードパーティ製のライブラリで同様の問題が生じた場合には、どのような解決策があるだろうか。 幸いなことに Python は既存のクラスにも動的にメソッドを追加できる。

以下のサンプルコードでは SerDe する直前で対象のクラスに __getstate__()__setstate__() を動的に追加している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pickle


class Wrapper(object):
    """別のオブジェクトへのラッパーとして動作するクラス (SerDe される)"""

    def __init__(self, wrap_target):
        self.wrap_target = wrap_target

    def __getattr__(self, item):
        """未定義のアトリビュートへのアクセスをトラップする"""
        def _wrapper(*args, **kwargs):
            print('trapped undefined access:', item)
            func = getattr(self.wrap_target, item)
            return func(*args, **kwargs)
        return _wrapper


class Example(object):
    """Wrapper 経由で呼び出されるクラス"""

    def __init__(self, message):
        self.message = message

    def greet(self):
        print('Hello, {msg}!'.format(msg=self.message))


def main():
    o = Example('World')

    w = Wrapper(o)
    w.greet()

    # オブジェクトに __getstate__() を動的に追加する
    def __getstate__(self):
        return self.__dict__.copy()
    setattr(Wrapper, '__getstate__', __getstate__)

    # オブジェクトに __setstate__() を動的に追加する
    def __setstate__(self, state):
        self.__dict__.update(state)
    setattr(Wrapper, '__setstate__', __setstate__)

    s = pickle.dumps(w)

    restored_w = pickle.loads(s)

    restored_w.greet()


if __name__ == '__main__':
    main()

上記を実行してみよう。 ちゃんと SerDe できていることがわかる。

$ python serde5.py 
trapped undefined access: greet
Hello, World!
trapped undefined access: greet
Hello, World!

macOS で CH34x のシリアルコンソールを使う

Arduino などで使われていることがある CH34x のチップを macOS から使う方法について。

基本的には以下のリポジトリに詳細が載っている。

github.com

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G84

インストール

もし過去に古いドライバを手動でインストールしたことがあるときは下部に記載したアンインストールを先に実行する。

Homebrew Cask を使ってドライバをインストールする。

$ brew cask reinstall wch-ch34x-usb-serial-driver 

マシンを再起動するか、あるいは以下のコマンドを実行してカーネルモジュールを読み込む。

$ sudo kextload /Library/Extensions/usbserial.kext

これで tty.wchusbserial から始まるデバイスファイルが見えるようになるはず。

$ ls /dev/tty.wchusbserial*
tty.wchusbserial141120

あとは一般的なシリアルデバイスとして screen なり pyserial などから使えば良い。

$ screen /dev/tty.wchusbserial141120 9600

手動で古いドライバを削除する

過去に古いドライバを手動でインストールしたことがあるときは、以下の手順にもとづいてアンインストールする。

まず、カーネルモジュールをアンロードする。

$ sudo kextunload /Library/Extensions/usbserial.kext
$ sudo kextunload /System/Library/Extensions/usb.kext

そして、カーネルモジュールのファイルを削除する。

$ sudo rm -rf /System/Library/Extensions/usb.kext
$ sudo rm -rf /Library/Extensions/usbserial.kext

いじょう。