CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Keras の学習曲線をコールバックで動的にプロットする

Keras でニューラルネットワークの学習が進む様子は一般的にコンソールの出力で確認できる。 しかし、もっと視覚的にリアルタイムで確認したいと考えて、今回はコールバックと Matplotlib を駆使して可視化してみることにした。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G95
$ python -V         
Python 3.7.4
$ pip list | egrep -i "(keras|tensorflow)"
Keras                2.2.5  
Keras-Applications   1.0.8  
Keras-Preprocessing  1.1.0  
tensorflow           1.14.0 
tensorflow-estimator 1.14.0 

下準備

まずは今回使うパッケージをインストールしておく。

$ pip install keras tensorflow matplotlib

学習曲線を動的にプロットする

Keras で学習曲線を動的にプロットするサンプルコードが次の通り。 データセットには MNIST を使って、ニューラルネットワークは単純な MLP (Multi Layer Perceptron) にした。 可視化は LearningVisualizationCallback というクラスで実装している。 このクラスは keras.callbacks.Callback を継承していて、各エポックごとに呼ばれる on_epoch_end() メソッド内で学習曲線をプロットしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from collections import defaultdict

import numpy as np
from keras import callbacks
from keras.datasets import mnist
from keras.layers import Dense
from keras.layers import Dropout
from keras.losses import categorical_crossentropy
from keras.models import Sequential
from keras.utils import to_categorical
from keras import backend as K
import tensorflow as tf
from matplotlib import pyplot as plt
from tensorflow.python.client import device_lib


class LearningVisualizationCallback(callbacks.Callback):
    """学習曲線を可視化するためのコールバッククラス"""

    def __init__(self, higher_better_metrics, fig=None, ax=None):
        self._metric_histories = defaultdict(list)
        self._metric_history_lines = {}
        self._higher_better_metrics = set(higher_better_metrics)
        self._metric_type_higher_better = defaultdict(bool)
        self._best_score_vlines = {}
        self._best_score_texts = {}

        # 描画領域を初期化する
        self._fig = fig
        self._ax = ax
        if self._fig is None and self._ax is None:
            self._fig, self._ax = plt.subplots()
        self._ax.set_title('learning curve')
        self._ax.set_xlabel('epoch')
        self._ax.set_ylabel('score')
        self._fig.canvas.draw()
        self._fig.show()

    def on_epoch_end(self, epoch, logs=None):
        """各エポック毎に呼ばれるコールバック"""

        # 各メトリックのスコアを保存する
        for metric, score in logs.items():
            self._metric_histories[metric].append(score)

            # 初回だけの設定
            if epoch == 0:
                # メトリックの種別を保存する
                for higher_better_metric in self._higher_better_metrics:
                    if higher_better_metric in metric:
                        self._metric_type_higher_better[metric] = True
                        break
                # スコアの履歴を描画するオブジェクトを生成する
                history_line, = self._ax.plot([], [])
                self._metric_history_lines[metric] = history_line
                history_line.set_label(metric)
                if 'val' not in metric:
                    # 学習データのメトリックは検証データに比べると重要度が落ちるので点線
                    history_line.set_linestyle('--')
                else:
                    # ベストスコアの線を描画するオブジェクトを生成する
                    best_vline = self._ax.axvline(0)
                    best_vline.set_color(history_line.get_color())
                    best_vline.set_linestyle(':')
                    self._best_score_vlines[metric] = best_vline
                    # ベストスコアの文字列を描画するオブジェクトを生成する
                    vpos = 'top' if self._metric_type_higher_better[metric] else 'bottom'
                    best_text = self._ax.text(0, 0, '',
                                              va=vpos, ha='right', weight='bold')
                    best_text.set_color(history_line.get_color())
                    self._best_score_texts[metric] = best_text

        # 描画内容を更新する
        for metric, scores in self._metric_histories.items():
            # グラフデータを更新する
            history_line = self._metric_history_lines[metric]
            history_line.set_data(np.arange(len(scores)), scores)
            if 'val' in metric:
                if self._metric_type_higher_better[metric]:
                    best_score_find_func = np.max
                    best_epoch_find_func = np.argmax
                else:
                    best_score_find_func = np.min
                    best_epoch_find_func = np.argmin
                best_score = best_score_find_func(scores)
                # 縦線
                best_epoch = best_epoch_find_func(scores)
                best_vline = self._best_score_vlines[metric]
                best_vline.set_xdata(best_epoch)
                # テキスト
                best_text = self._best_score_texts[metric]
                best_text.set_text('epoch:{}, score:{:.6f}'.format(best_epoch, best_score))
                best_text.set_x(best_epoch)
                best_text.set_y(best_score)

        # グラフの見栄えを調整する
        self._ax.legend()
        self._ax.relim()
        self._ax.autoscale_view()

        # 再描画する
        plt.pause(0.001)

    def show_until_close(self):
        """ウィンドウを閉じるまで表示し続けるためのメソッド"""
        plt.show()


def main():
    # MNIST データセットを読み込む
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # 入力画像の情報
    image_width, image_height = 28, 28
    num_classes = 10

    # Flatten
    x_train = x_train.reshape(x_train.shape[0], (image_height * image_width))
    x_test = x_test.reshape(x_test.shape[0], (image_height * image_width))

    # Min-Max Normalization
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train = (x_train - x_train.min()) / (x_train.max() - x_train.min())
    x_test = (x_test - x_test.min()) / (x_test.max() - x_test.min())

    # ラベル情報を One-Hot エンコードする
    y_train = to_categorical(y_train, num_classes)
    y_test = to_categorical(y_test, num_classes)

    # MLP (Multi Layer Perceptron) のネットワークを組む
    model = Sequential()
    model.add(Dense(512, activation='relu', input_shape=(784,)))
    model.add(Dropout(0.2))
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(num_classes, activation='softmax'))

    # モデルをコンパイルする
    model.compile(loss=categorical_crossentropy,
                  optimizer='Adam',
                  metrics=['accuracy'])

    # 学習曲線を可視化するコールバックを用意する
    higher_better_metrics = ['acc']
    visualize_cb = LearningVisualizationCallback(higher_better_metrics)
    callbacks = [
        visualize_cb,
    ]

    # モデルを学習する
    model.fit(x_train, y_train,
              batch_size=128,
              epochs=20,
              verbose=1,
              validation_data=(x_test, y_test),
              # コールバックを登録する
              callbacks=callbacks,
              )

    # テストデータで評価する
    score = model.evaluate(x_test, y_test, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

    # ウィンドウを閉じるまで表示し続ける
    visualize_cb.show_until_close()


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python kerasviz.py
...(snip)...
Test loss: 0.08890371433906939
Test accuracy: 0.9824

すると、次のように最初のエポック以降の学習状況がグラフにプロットされる。

f:id:momijiame:20190904224802g:plain

いじょう。

Python: LightGBM で Under-sampling + Bagging したモデルを Probability Calibration してみる

クラス間の要素数に偏りのある不均衡なデータに対する分類問題のアプローチとして、多いクラスのデータを減らすアンダーサンプリングという手法がある。 データをアンダーサンプリングしてモデルに学習させることで、評価指標が改善したりモデルの学習時間を削減できる。

ただし、アンダーサンプリングしたデータで学習させたモデルから得られる推論結果の確率にはバイアスが生じるらしい。 以下のブログでは、生じたバイアスを補正 (Probability Calibration) する方法が紹介されている。

pompom168.hatenablog.com

今回は、上記の手法をロジスティック回帰以外で試したときの結果が気になったので、やってみることにした。 モデルには LightGBM + Under-sampling + Bagging を選んだ。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G87
$ python -V
Python 3.7.4

下準備

まずは下準備として必要なパッケージをインストールしておく。

$ pip install scikit-learn imbalanced-learn matplotlib lightgbm

ロジスティック回帰 + Under-sampling の場合

LightGBM で試す前に、前述したブログを追試してみる。 ちょっと長いけどサンプルコードは以下の通り。 各種評価指標と、グラフとしてはキャリブレーションカーブを出力してみた。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from sklearn.calibration import calibration_curve
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn import metrics
from imblearn.under_sampling import RandomUnderSampler
from matplotlib import pyplot as plt


def probability_calibration(y_proba, beta):
    """サンプリングレートを元に確率を補正する"""
    calibrated_proba = y_proba / (y_proba + (1 - y_proba) / beta)
    return calibrated_proba


def main():
    # クラス比が 0.9 : 0.1 の不均衡な二値分類データを作る
    X, y = make_classification(n_samples=100_000,
                               n_features=5,
                               n_classes=2,
                               weights=[0.9, 0.1],
                               random_state=42)

    # 学習用データと検証用データに分割する (Hold-out)
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        test_size=0.2,
                                                        random_state=42)

    # まずは不均衡データのままロジスティック回帰モデルを学習させる
    clf = LogisticRegression(random_state=42, solver='lbfgs')
    clf.fit(X_train, y_train)
    # 検証用データを予測させる
    y_pred_proba_base = clf.predict_proba(X_test)[:, 1]
    y_pred_base = np.where(y_pred_proba_base > 0.5, 1, 0)

    # Under-sampling で均衡データにする
    sampler = RandomUnderSampler(random_state=42)
    X_train_sampled, y_train_sampled = sampler.fit_sample(X_train, y_train)

    # 均衡データでロジスティック回帰モデルを学習させる
    clf = LogisticRegression(random_state=42, solver='lbfgs')
    clf.fit(X_train_sampled, y_train_sampled)
    # 検証用データを予測させる
    y_pred_proba_us = clf.predict_proba(X_test)[:, 1]
    y_pred_us = np.where(y_pred_proba_us > 0.5, 1, 0)

    # サンプリングレートを元に確率を補正する
    y_train_zero_len = np.count_nonzero(y_train_sampled == 0)
    beta = y_train_zero_len / len(y_train)
    y_pred_proba_cb = probability_calibration(y_pred_proba_us,
                                              beta)
    y_pred_cb = np.where(y_pred_proba_cb > 0.5, 1, 0)

    # 各種評価指標を出力する
    print('precision (base): ',
          metrics.precision_score(y_test, y_pred_base))
    print('precision (under-sampling): ',
          metrics.precision_score(y_test, y_pred_us))
    print('precision (calibrated): ',
          metrics.precision_score(y_test, y_pred_cb))
    print('recall (base): ',
          metrics.recall_score(y_test, y_pred_base))
    print('recall (under-sampling): ',
          metrics.recall_score(y_test, y_pred_us))
    print('recall (calibrated): ',
          metrics.recall_score(y_test, y_pred_cb))
    print('F1 (base): ',
          metrics.f1_score(y_test, y_pred_base))
    print('F1 (under-sampling): ',
          metrics.f1_score(y_test, y_pred_us))
    print('F1 (calibrated): ',
          metrics.f1_score(y_test, y_pred_cb))
    print('ROC AUC (base): ',
          metrics.roc_auc_score(y_test, y_pred_base))
    print('ROC AUC (under-sampling): ',
          metrics.roc_auc_score(y_test, y_pred_us))
    print('ROC AUC (calibrated): ',
          metrics.roc_auc_score(y_test, y_pred_cb))

    # 各モデルが予測した内容の統計量
    print('y_test mean', y_test.mean())
    print('y_proba mean (base)', y_pred_proba_base.mean())
    print('y_proba mean (under-sampling)', y_pred_proba_us.mean())
    print('y_proba mean (calibrated)', y_pred_proba_cb.mean())

    # キャリブレーションカーブを計算する
    base_curve = calibration_curve(y_test, y_pred_proba_base,
                                   n_bins=10)
    undersampling_curve = calibration_curve(y_test, y_pred_proba_us,
                                           n_bins=10)
    calibrated_curve = calibration_curve(y_test, y_pred_proba_cb,
                                         n_bins=10)

    # プロットする
    fig, axes = plt.subplots(2, 1, figsize=(8, 7))

    ax1 = axes[0]
    ax1.plot([0, 1], [0, 1], 'k:', label='Perfectly calibrated')
    ax1.plot(base_curve[0], base_curve[1],
             label='base')
    ax1.plot(undersampling_curve[0], undersampling_curve[1],
             label='under-sampling')
    ax1.plot(calibrated_curve[0], calibrated_curve[1],
             label='calibrated')

    ax1.grid()
    ax1.set_ylabel('Fraction of positives')
    ax1.set_xlabel('Prediction probability')
    ax1.set_ylim([-0.05, 1.05])
    ax1.legend(loc='lower right')

    ax2 = axes[1]
    ax2.hist(y_pred_proba_base, bins=144, alpha=0.5,
             color='red', label='y_pred_proba (base)')
    ax2.hist(y_pred_proba_us, bins=144, alpha=0.5,
             color='orange', label='y_pred_proba (under-sampling)')
    ax2.hist(y_pred_proba_cb, bins=144, alpha=0.5,
             color='green', label='y_pred_proba (calibrated)')
    ax2.axvline(x=y_test.mean(),
                color='blue', label='y_test mean')
    ax2.axvline(x=y_pred_proba_base.mean(),
                color='red', label='y_proba mean (base)')
    ax2.axvline(x=y_pred_proba_us.mean(),
                color='orange', label='y_proba mean (under-sampling)')
    ax2.axvline(x=y_pred_proba_cb.mean(),
                color='green', label='y_proba mean (calibrated)')

    ax2.set_xlabel('Prediction Probability')
    ax2.set_ylabel('Frequency')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。 標準出力の前半は、各種評価指標を以下の条件で計算している。

  • アンダーサンプリングしていないデータを学習させたモデルが出力した確率値 (base)
  • アンダーサンプリングしたデータを学習させたモデルが出力した確率値 (under-sampling)
  • アンダーサンプリングしたデータを学習させたモデルが出力した確率値を補正した確率値 (calibrated)

後半は、それぞれが出力した確率値の平均になる。

precision (base):  0.7960526315789473
precision (under-sampling):  0.4034117104656524
precision (calibrated):  0.8048582995951417
recall (base):  0.5203057811753464
recall (under-sampling):  0.8361204013377926
recall (calibrated):  0.47491638795986624
F1 (base):  0.6292978907830107
F1 (under-sampling):  0.5442388431037164
F1 (calibrated):  0.5973557692307693
ROC AUC (base):  0.7523626409646208
ROC AUC (under-sampling):  0.8457979568536285
ROC AUC (calibrated):  0.7307289819399487
y_test mean 0.10465
y_proba mean (base) 0.10308716245156492
y_proba mean (under-sampling) 0.2733953199075095
y_proba mean (calibrated) 0.09521502104917157

上記を見ると、評価指標によってはアンダーサンプリングが有効に作用している様子が伺える。 また、補正済みの確率値を使った評価指標では、アンダーサンプリングしなかった場合の結果に近づいていることが分かる。 そして、モデルの出力した確率値の平均も興味深い。 真のテスト用データの平均が 0.10465 なのに対して、アンダーサンプリングなしでは 0.10308 と、割と良い線いってる。 それに比べてアンダーサンプリングすると 0.27339 と大きくバイアスがかかっていることが分かる。 そして、補正したものでは 0.0952 と、真の平均に近づけられている。

より詳しくグラフにプロットしたものが以下の通り。 上の図は Q-Q プロットのようなもので、陽性データの真の出現頻度とモデルの予測を比較している。 理想的な状況では斜めに一直線になる。 下の図はモデルが出力した確率値のヒストグラムと、確率の平均値になっている。

f:id:momijiame:20190825010912p:plain

上記を見ると、アンダーサンプリングしたデータを使って学習させたモデルの出力には大きなバイアスが生じていることが分かる。 それに対して補正したデータではバイアスが減少している。

上記の結果より、確率を補正するとアンダーサンプリングしないデータを使って学習させたモデルの出力を再生するような印象を受けた。

LightGBM + Undersampling + Bagging の場合

それでは、先ほどと同じことを LightGBM でアンダーサンプリングしつつ、モデルを 5-Fold で Bagging した場合でも試してみよう。

サンプルコードは次の通り。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
import lightgbm as lgb
from sklearn.calibration import calibration_curve
from sklearn.model_selection import BaseCrossValidator
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
from sklearn import metrics
from imblearn.under_sampling import RandomUnderSampler
from matplotlib import pyplot as plt


def probability_calibration(y_proba, beta):
    '''サンプリングレートを元に確率を補正する'''
    calibrated_proba = y_proba / (y_proba + (1 - y_proba) / beta)
    return calibrated_proba


class UnderBaggingKFold(BaseCrossValidator):
    '''CV に使うだけで UnderBagging できる KFold 実装

    NOTE: 少ないクラスのデータは各 Fold で重複して選択される'''

    def __init__(self, n_splits=5, shuffle=True, random_states=None,
                 test_size=0.2, whole_testing=False):
        '''
        :param n_splits: Fold の分割数
        :param shuffle: 分割時にデータをシャッフルするか
        :param random_states: 各 Fold の乱数シード
        :param test_size: Under-sampling された中でテスト用データとして使う割合
        :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか
        '''
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_states = random_states
        self.test_size = test_size
        self.whole_testing = whole_testing

        self.sample_indices_ = []

        if random_states is not None:
            # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる
            self.n_splits = len(random_states)
        else:
            # 乱数シードが指定されていないときは分割数だけ None で埋めておく
            self.random_states = [None] * self.n_splits

        # 分割数だけ Under-sampling 用のインスタンスを作っておく
        self.samplers_ = [
            RandomUnderSampler(random_state=random_state)
            for random_state in self.random_states
        ]

    def split(self, X, y=None, groups=None):
        '''データを学習用とテスト用に分割する'''
        if X.ndim < 2:
            # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う
            X = np.vstack(X)

        for i in range(self.n_splits):
            # データを Under-sampling して均衡データにする
            sampler = self.samplers_[i]
            _, y_sampled = sampler.fit_resample(X, y)
            # 選ばれたデータのインデックスを取り出す
            sampled_indices = sampler.sample_indices_

            # 選ばれたデータのインデックスを記録しておく
            self.sample_indices_ = sampled_indices

            # 選ばれたデータを学習用とテスト用に分割する
            split_data = train_test_split(sampled_indices,
                                          shuffle=self.shuffle,
                                          test_size=self.test_size,
                                          stratify=y_sampled,
                                          random_state=self.random_states[i],
                                          )
            train_indices, test_indices = split_data

            if self.whole_testing:
                # Under-sampling で選ばれなかったデータをテスト用に追加する
                mask = np.ones(len(X), dtype=np.bool)
                mask[sampled_indices] = False
                X_indices = np.arange(len(X))
                non_sampled_indices = X_indices[mask]
                test_indices = np.concatenate([test_indices,
                                               non_sampled_indices])

            yield train_indices, test_indices

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits


class ModelExtractionCallback(object):
    '''lightgbm.cv() から学習済みモデルを取り出すためのコールバックに使うクラス

    NOTE: 非公開クラス '_CVBooster' に依存しているため将来的に動かなく恐れがある
    '''

    def __init__(self):
        self._model = None

    def __call__(self, env):
        # _CVBooster の参照を保持する
        self._model = env.model

    def _assert_called_cb(self):
        if self._model is None:
            # コールバックが呼ばれていないときは例外にする
            raise RuntimeError('callback has not called yet')

    @property
    def boosters_proxy(self):
        self._assert_called_cb()
        # Booster へのプロキシオブジェクトを返す
        return self._model

    @property
    def raw_boosters(self):
        self._assert_called_cb()
        # Booster のリストを返す
        return self._model.boosters

    @property
    def best_iteration(self):
        self._assert_called_cb()
        # Early stop したときの boosting round を返す
        return self._model.best_iteration


def main():
    # 不均衡な二値分類データを作る
    X, y = make_classification(n_samples=100_000,
                               n_features=5,
                               n_classes=2,
                               weights=[0.9, 0.1],
                               random_state=42)

    # 学習用データと検証用データに分割する (Hold-out)
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        test_size=0.2,
                                                        random_state=42)

    # まずは不均衡データのまま学習させる
    lgb_params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
    }
    X_tr, X_ev, y_tr, y_ev = train_test_split(X_train, y_train,
                                              test_size=0.2,
                                              random_state=42)
    lgb_train = lgb.Dataset(X_tr, y_tr)
    lgb_eval = lgb.Dataset(X_ev, y_ev, reference=lgb_train)
    booster = lgb.train(lgb_params, lgb_train, valid_sets=lgb_eval,
                        num_boost_round=1000,
                        early_stopping_rounds=10,
                        verbose_eval=10)
    # 検証用データを予測させる
    y_pred_proba_base = booster.predict(X_test)
    y_pred_base = np.where(y_pred_proba_base > 0.5, 1, 0)

    # 均衡データで学習させる (UnderBagging)
    folds = UnderBaggingKFold(random_states=range(42, 42 + 5))
    lgb_train = lgb.Dataset(X_train, y_train)
    extraction_cb = ModelExtractionCallback()
    callbacks = [
        extraction_cb,
    ]
    lgb.cv(lgb_params, lgb_train,
           num_boost_round=1000,
           early_stopping_rounds=10,
           folds=folds,
           callbacks=callbacks,
           verbose_eval=10)
    cv_booster = extraction_cb.boosters_proxy
    # 検証用データを予測させる
    y_pred_proba_ub_list = cv_booster.predict(X_test)
    y_pred_proba_ub = np.array(y_pred_proba_ub_list).mean(axis=0)
    y_pred_ub = np.where(y_pred_proba_ub > 0.5, 1, 0)

    # サンプリングレートを元に確率を補正する
    y_sampled = y_train[folds.sample_indices_]
    y_sampled_negative_len = np.count_nonzero(y_sampled == 0)
    beta = y_sampled_negative_len / len(y_train)
    y_pred_proba_cb = probability_calibration(y_pred_proba_ub,
                                              beta)
    y_pred_cb = np.where(y_pred_proba_cb > 0.5, 1, 0)

    # 各種評価指標を出力する
    print('precision (base): ',
          metrics.precision_score(y_test, y_pred_base))
    print('precision (under-bagging): ',
          metrics.precision_score(y_test, y_pred_ub))
    print('precision (calibrated): ',
          metrics.precision_score(y_test, y_pred_cb))
    print('recall (base): ',
          metrics.recall_score(y_test, y_pred_base))
    print('recall (under-bagging): ',
          metrics.recall_score(y_test, y_pred_ub))
    print('recall (calibrated): ',
          metrics.recall_score(y_test, y_pred_cb))
    print('F1 (base): ',
          metrics.f1_score(y_test, y_pred_base))
    print('F1 (under-bagging): ',
          metrics.f1_score(y_test, y_pred_ub))
    print('F1 (calibrated): ',
          metrics.f1_score(y_test, y_pred_cb))
    print('ROC AUC (base): ',
          metrics.roc_auc_score(y_test, y_pred_base))
    print('ROC AUC (under-bagging): ',
          metrics.roc_auc_score(y_test, y_pred_ub))
    print('ROC AUC (calibrated): ',
          metrics.roc_auc_score(y_test, y_pred_cb))

    # 各モデルが予測した内容の統計量
    print('y_test mean', y_test.mean())
    print('y_proba mean (base)', y_pred_proba_base.mean())
    print('y_proba mean (under-sampling)', y_pred_proba_ub.mean())
    print('y_proba mean (calibrated)', y_pred_proba_cb.mean())

    # キャリブレーションカーブを計算する
    base_curve = calibration_curve(y_test, y_pred_proba_base,
                                   n_bins=10)
    underbagging_curve = calibration_curve(y_test, y_pred_proba_ub,
                                           n_bins=10)
    calibrated_curve = calibration_curve(y_test, y_pred_proba_cb,
                                         n_bins=10)

    # プロットする
    fig, axes = plt.subplots(2, 1, figsize=(8, 7))

    ax1 = axes[0]
    ax1.plot([0, 1], [0, 1], 'k:', label='Perfectly calibrated')
    ax1.plot(base_curve[0], base_curve[1],
             label='base')
    ax1.plot(underbagging_curve[0], underbagging_curve[1],
             label='under-bagging')
    ax1.plot(calibrated_curve[0], calibrated_curve[1],
             label='calibrated')

    ax1.grid()
    ax1.set_ylabel('Fraction of positives')
    ax1.set_xlabel('Prediction probability')
    ax1.set_ylim([-0.05, 1.05])
    ax1.legend(loc='lower right')

    ax2 = axes[1]
    ax2.hist(y_pred_proba_base, bins=144, alpha=0.5,
             color='red', label='y_pred_proba (base)')
    ax2.hist(y_pred_proba_ub, bins=144, alpha=0.5,
             color='orange', label='y_pred_proba (under-bagging)')
    ax2.hist(y_pred_proba_cb, bins=144, alpha=0.5,
             color='green', label='y_pred_proba (calibrated)')
    ax2.axvline(x=y_test.mean(),
                color='blue', label='y_test mean')
    ax2.axvline(x=y_pred_proba_base.mean(),
                color='red', label='y_proba mean (base)')
    ax2.axvline(x=y_pred_proba_ub.mean(),
                color='orange', label='y_proba mean (under-bagging)')
    ax2.axvline(x=y_pred_proba_cb.mean(),
                color='green', label='y_proba mean (calibrated)')

    ax2.set_xlabel('Prediction Probability')
    ax2.set_ylabel('Frequency')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。

$ python lgbm.py
...(snip)...
precision (base):  0.8451915559030493
precision (under-bagging):  0.39481204660364916
precision (calibrated):  0.8634105960264901
recall (base):  0.5164835164835165
recall (under-bagging):  0.8580984233158147
recall (calibrated):  0.4983277591973244
F1 (base):  0.6411625148279954
F1 (under-bagging):  0.5408009635651913
F1 (calibrated):  0.6319297182671918
ROC AUC (base):  0.7527131939931404
ROC AUC (under-bagging):  0.8521798309687914
ROC AUC (calibrated):  0.7445567427248139
y_test mean 0.10465
y_proba mean (base) 0.10305398392930754
y_proba mean (under-sampling) 0.26559621708983305
y_proba mean (calibrated) 0.09724228389477375

各種評価指標と確率の平均値については、先ほどと同じような振る舞いを見せている。

グラフについては次の通り。

f:id:momijiame:20190825012519p:plain

こちらも、ロジスティック回帰と同様にアンダーサンプリングで生じたバイアスが低減できていることが分かる。

いじょう。

Python: Under-sampling + Bagging なモデルを簡単に作れる K-Fold を実装してみた

不均衡データに対する分類問題のアプローチとして、多いクラスのデータを取り除く Under-sampling という手法がある。 さらに、複数の Under-sampling したデータを用いて、複数のモデルを用意する Bagging という手法を組み合わせることがある。 今回は、そんな Under-sampling + Bagging (UnderBagging) なモデルを簡単に作れる KFold を実装してみた。

Under-sampling + Bagging に関する既知の実装としては imbalanced-learnBalancedBaggingClassifier という分類器がある。 ただ、このアプローチだと、学習させる分類器が scikit-learn の API を備えている必要がある。 そこで、異なるアプローチとしてモデルではなくデータを K-Fold するタイミングで Under-sampling + Bagging してみることにした。

使った環境は次の通り。

$ sw_vers            
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G87
$ python -V
Python 3.7.4

下準備

まずは今回使うパッケージをインストールしておく。

$ pip install scikit-learn imbalanced-learn lightgbm

データを分割するタイミングで Under-sampling する K-Fold の実装

早速だけど以下にサンプルコードを示す。 具体的には UnderBaggingKFold という名前で scikit-learn の CrossValidation API を実装している。 データを K-Fold 分割するタイミングで imbalanced-learn の RandomUnderSampler を使って Under-sampling している。 なお、このサンプルコードは動作をデモするためのものなので、まだ分類器にデータを渡すことまではしていない。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from sklearn.model_selection import BaseCrossValidator
from sklearn.model_selection import train_test_split
from imblearn.under_sampling import RandomUnderSampler


class UnderBaggingKFold(BaseCrossValidator):
    """CV に使うだけで UnderBagging できる KFold 実装

    NOTE: 少ないクラスのデータは各 Fold で重複して選択される"""

    def __init__(self, n_splits=5, shuffle=True, random_states=None,
                 test_size=0.2, whole_testing=False):
        """
        :param n_splits: Fold の分割数
        :param shuffle: 分割時にデータをシャッフルするか
        :param random_states: 各 Fold の乱数シード
        :param test_size: Under-sampling された中でテスト用データとして使う割合
        :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか
        """
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_states = random_states
        self.test_size = test_size
        self.whole_testing = whole_testing

        if random_states is not None:
            # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる
            self.n_splits = len(random_states)
        else:
            # 乱数シードが指定されていないときは分割数だけ None で埋めておく
            self.random_states = [None] * self.n_splits

        # 分割数だけ Under-sampling 用のインスタンスを作っておく
        self.samplers_ = [
            RandomUnderSampler(random_state=random_state)
            for random_state in self.random_states
        ]

    def split(self, X, y=None, groups=None):
        """データを学習用とテスト用に分割する"""
        if X.ndim < 2:
            # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う
            X = np.vstack(X)

        for i in range(self.n_splits):
            # データを Under-sampling して均衡データにする
            sampler = self.samplers_[i]
            _, y_sampled = sampler.fit_resample(X, y)
            # 選ばれたデータのインデックスを取り出す
            sampled_indices = sampler.sample_indices_

            # 選ばれたデータを学習用とテスト用に分割する
            split_data = train_test_split(sampled_indices,
                                          shuffle=self.shuffle,
                                          test_size=self.test_size,
                                          stratify=y_sampled,
                                          random_state=self.random_states[i],
                                          )
            train_indices, test_indices = split_data

            if self.whole_testing:
                # Under-sampling で選ばれなかったデータをテスト用に追加する
                mask = np.ones(len(X), dtype=np.bool)
                mask[sampled_indices] = False
                X_indices = np.arange(len(X))
                non_sampled_indices = X_indices[mask]
                test_indices = np.concatenate([test_indices,
                                               non_sampled_indices])

            yield train_indices, test_indices

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits


def main():
    # ダミーの不均衡データを用意する
    X, y = np.arange(1, 21), np.zeros(20, dtype=np.int8)
    # 先頭の 4 要素だけ陽性 (Positive) データに指定する
    y[:4] = 1

    print('y:', y)

    # 乱数シードを指定した 5-Fold
    folds = UnderBaggingKFold(random_states=range(5))

    # データの分割され方を出力する
    for train_indices, test_indices in folds.split(X, y):
        print('train: X={X}, y={y}'.format(X=train_indices, y=y[train_indices]))
        print('test: X={X}, y={y}'.format(X=test_indices, y=y[test_indices]))


if __name__ == '__main__':
    main()

上記のサンプルコードを実行してみよう。 デモでは、全体が 20 要素ある中で先頭の 4 要素だけ陽性になった不均衡なデータを分割している。 各 Fold で、学習データとテスト用データがどのように分割されるか観察してみよう。

$ python ubkfold.py 
y: [1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[13  0  2 12 10  1], y=[0 1 1 0 0 1]
test: X=[3 5], y=[1 0]
train: X=[ 7  0 11  2  6  1], y=[0 1 0 1 0 1]
test: X=[17  3], y=[0 1]
train: X=[3 1 0 4 8 9], y=[1 1 1 0 0 0]
test: X=[16  2], y=[0 1]
train: X=[ 0  2 17 11  1  5], y=[1 1 0 0 1 0]
test: X=[3 8], y=[1 0]
train: X=[ 2  3  0  4  7 16], y=[1 1 1 0 0 0]
test: X=[10  1], y=[0 1]

上記を見ると、全ての目的変数 (y) について陽性と陰性が均等に含まれた均衡データになっていることが分かる。

ちなみに whole_testing というオプションに True を渡すと、サンプリングされなかったデータが全てテスト用データに追加される。 まあ、ようするに陰性のデータが大量に突っ込まれる。

    # サンプリングされなかったデータを全てテスト用に追加する
    folds = UnderBaggingKFold(random_states=range(5),
                              whole_testing=True)

上記についても動作を確認しておこう。 学習用のデータに関しては先ほどと変化がないものの、テスト用のデータは陰性の要素が増えていることが分かる。

$ python ubkfold.py 
y: [1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[13  0  2 12 10  1], y=[0 1 1 0 0 1]
test: X=[ 3  5  4  6  7  8  9 11 14 15 16 17 18 19], y=[1 0 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[ 7  0 11  2  6  1], y=[0 1 0 1 0 1]
test: X=[17  3  4  5  8  9 10 12 13 14 15 16 18 19], y=[0 1 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[3 1 0 4 8 9], y=[1 1 1 0 0 0]
test: X=[16  2  5  6  7 10 11 12 13 14 15 17 18 19], y=[0 1 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[ 0  2 17 11  1  5], y=[1 1 0 0 1 0]
test: X=[ 3  8  4  6  7  9 10 12 13 14 15 16 18 19], y=[1 0 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[ 2  3  0  4  7 16], y=[1 1 1 0 0 0]
test: X=[10  1  5  6  8  9 11 12 13 14 15 17 18 19], y=[0 1 0 0 0 0 0 0 0 0 0 0 0 0]

LightGBM で Under-sampling + Bagging してみる

振る舞いの説明ができたので、続いては実際に分類器を学習させてみよう。 とりあえず LightGBM に擬似的に作った不均衡データを与えてみることにする。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from contextlib import contextmanager

import lightgbm as lgb
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import BaseCrossValidator
from sklearn.model_selection import train_test_split
from imblearn.under_sampling import RandomUnderSampler


class UnderBaggingKFold(BaseCrossValidator):
    """CV に使うだけで UnderBagging できる KFold 実装

    NOTE: 少ないクラスのデータは各 Fold で重複して選択される"""

    def __init__(self, n_splits=5, shuffle=True, random_states=None,
                 test_size=0.2, whole_testing=False):
        """
        :param n_splits: Fold の分割数
        :param shuffle: 分割時にデータをシャッフルするか
        :param random_states: 各 Fold の乱数シード
        :param test_size: Under-sampling された中でテスト用データとして使う割合
        :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか
        """
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_states = random_states
        self.test_size = test_size
        self.whole_testing = whole_testing

        if random_states is not None:
            # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる
            self.n_splits = len(random_states)
        else:
            # 乱数シードが指定されていないときは分割数だけ None で埋めておく
            self.random_states = [None] * self.n_splits

        # 分割数だけ Under-sampling 用のインスタンスを作っておく
        self.samplers_ = [
            RandomUnderSampler(random_state=random_state)
            for random_state in self.random_states
        ]

    def split(self, X, y=None, groups=None):
        """データを学習用とテスト用に分割する"""
        if X.ndim < 2:
            # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う
            X = np.vstack(X)

        for i in range(self.n_splits):
            # データを Under-sampling して均衡データにする
            sampler = self.samplers_[i]
            _, y_sampled = sampler.fit_resample(X, y)
            # 選ばれたデータのインデックスを取り出す
            sampled_indices = sampler.sample_indices_

            # 選ばれたデータを学習用とテスト用に分割する
            split_data = train_test_split(sampled_indices,
                                          shuffle=self.shuffle,
                                          test_size=self.test_size,
                                          stratify=y_sampled,
                                          random_state=self.random_states[i],
                                          )
            train_indices, test_indices = split_data

            if self.whole_testing:
                # Under-sampling で選ばれなかったデータをテスト用に追加する
                mask = np.ones(len(X), dtype=np.bool)
                mask[sampled_indices] = False
                X_indices = np.arange(len(X))
                non_sampled_indices = X_indices[mask]
                test_indices = np.concatenate([test_indices,
                                               non_sampled_indices])

            yield train_indices, test_indices

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits


@contextmanager
def stopwatch():
    """学習にかかる時間を計測するためのコンテキストマネージャ"""
    before = time.time()
    yield
    after = time.time()
    print(f'elapsed time: {after - before:.2f} sec')


def main():
    # クラス比 0.99 : 0.01 のダミーデータを用意する
    args = {
        'n_samples': 1_000_000,
        'n_features': 80,
        'n_informative': 3,
        'n_redundant': 0,
        'n_repeated': 0,
        'n_classes': 2,
        'n_clusters_per_class': 1,
        'weights': [0.99, 0.01],
        'random_state': 42,
    }
    X, y = make_classification(**args)

    # メトリックに ROC AUC を用いた二値分類問題として解く
    lgbm_params = {
        'objective': 'binary',
        'metric': 'auc',
    }
    lgb_train = lgb.Dataset(X, y)

    # 5-Fold で乱数シードに 42 ~ 46 を指定している
    folds = UnderBaggingKFold(random_states=range(42, 42 + 5))

    with stopwatch():
        # 上記で作った UnderBaggingKFold を folds に指定する
        result = lgb.cv(lgbm_params,
                        lgb_train,
                        num_boost_round=1000,
                        early_stopping_rounds=10,
                        seed=42,
                        folds=folds,
                        verbose_eval=10,
                        )
    print('under-bagging auc:', result['auc-mean'][-1])


if __name__ == '__main__':
    main()

上記を実行してみよう。 今回使った環境では、学習に約 28 秒かかって ROC AUC では約 0.776 という結果が得られた。

$ python ublgbm.py
...(snip)...
elapsed time: 28.62 sec
under-bagging auc: 0.7760463716175261

ちなみに LightGBM.cv() 関数から学習済みモデルを取り出す方法については次のエントリを参照してほしい。

blog.amedama.jp

比較対象として LightGBM の単なる Bagging も確認する

先ほどの結果を見るだけでは、学習が早いのか遅いのか、性能が良いのか悪いのか判断が難しい。 そのため、比較対象として Under-sampling しない、単なる Bagging の検証もしておく。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from contextlib import contextmanager

import lightgbm as lgb
from sklearn.datasets import make_classification
from sklearn.model_selection import StratifiedKFold


@contextmanager
def stopwatch():
    before = time.time()
    yield
    after = time.time()
    print(f'elapsed time: {after - before:.2f} sec')


def main():
    args = {
        'n_samples': 1_000_000,
        'n_features': 80,
        'n_informative': 3,
        'n_redundant': 0,
        'n_repeated': 0,
        'n_classes': 2,
        'n_clusters_per_class': 1,
        'weights': [0.99, 0.01],
        'random_state': 42,
    }
    X, y = make_classification(**args)

    lgbm_params = {
        'objective': 'binary',
        'metric': 'auc',
    }
    lgb_train = lgb.Dataset(X, y)

    # 一般的な Stratified KFold
    folds = StratifiedKFold(n_splits=5,
                            shuffle=True,
                            random_state=42)

    with stopwatch():
        # アンダーサンプリングなしで学習する
        result = lgb.cv(lgbm_params,
                        lgb_train,
                        num_boost_round=1000,
                        early_stopping_rounds=10,
                        seed=42,
                        folds=folds,
                        verbose_eval=10,
                        )
    print('base auc:', result['auc-mean'][-1])


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 学習にかかった時間は約 38 秒で、ROC AUC は約 0.779 だった。

$ python baselgbm.py
...(snip)...
elapsed time: 38.79 sec
base auc: 0.7791468600807205

RandomForest でも Under-sampling + Bagging してみる

続いては scikit-learn の RandomForest でも Under-sampling + Bagging を試してみよう。 このケースでは cross_validate()cv オプションに UnderBaggingKFold のインスタンスを渡せば良い。 ちなみに、学習済みモデルは return_estimatorTrue にすれば受け取れる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from contextlib import contextmanager

import lightgbm as lgb
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import BaseCrossValidator
from sklearn.model_selection import cross_validate
from sklearn.model_selection import train_test_split
from imblearn.under_sampling import RandomUnderSampler


class UnderBaggingKFold(BaseCrossValidator):
    """CV に使うだけで UnderBagging できる KFold 実装

    NOTE: 少ないクラスのデータは各 Fold で重複して選択される"""

    def __init__(self, n_splits=5, shuffle=True, random_states=None,
                 test_size=0.2, whole_testing=False):
        """
        :param n_splits: Fold の分割数
        :param shuffle: 分割時にデータをシャッフルするか
        :param random_states: 各 Fold の乱数シード
        :param test_size: Under-sampling された中でテスト用データとして使う割合
        :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか
        """
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_states = random_states
        self.test_size = test_size
        self.whole_testing = whole_testing

        if random_states is not None:
            # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる
            self.n_splits = len(random_states)
        else:
            # 乱数シードが指定されていないときは分割数だけ None で埋めておく
            self.random_states = [None] * self.n_splits

        # 分割数だけ Under-sampling 用のインスタンスを作っておく
        self.samplers_ = [
            RandomUnderSampler(random_state=random_state)
            for random_state in self.random_states
        ]

    def split(self, X, y=None, groups=None):
        """データを学習用とテスト用に分割する"""
        if X.ndim < 2:
            # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う
            X = np.vstack(X)

        for i in range(self.n_splits):
            # データを Under-sampling して均衡データにする
            sampler = self.samplers_[i]
            _, y_sampled = sampler.fit_resample(X, y)
            # 選ばれたデータのインデックスを取り出す
            sampled_indices = sampler.sample_indices_

            # 選ばれたデータを学習用とテスト用に分割する
            split_data = train_test_split(sampled_indices,
                                          shuffle=self.shuffle,
                                          test_size=self.test_size,
                                          stratify=y_sampled,
                                          random_state=self.random_states[i],
                                          )
            train_indices, test_indices = split_data

            if self.whole_testing:
                # Under-sampling で選ばれなかったデータをテスト用に追加する
                mask = np.ones(len(X), dtype=np.bool)
                mask[sampled_indices] = False
                X_indices = np.arange(len(X))
                non_sampled_indices = X_indices[mask]
                test_indices = np.concatenate([test_indices,
                                               non_sampled_indices])

            yield train_indices, test_indices

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits


@contextmanager
def stopwatch():
    before = time.time()
    yield
    after = time.time()
    print(f'elapsed time: {after - before:.2f} sec')


def main():
    args = {
        'n_samples': 1_000_000,
        'n_features': 80,
        'n_informative': 3,
        'n_redundant': 0,
        'n_repeated': 0,
        'n_classes': 2,
        'n_clusters_per_class': 1,
        'weights': [0.99, 0.01],
        'random_state': 42,
    }
    X, y = make_classification(**args)

    folds = UnderBaggingKFold(random_states=range(42, 42 + 5))

    # 分類器としてランダムフォレストを使う
    clf = RandomForestClassifier(n_estimators=100,
                                 n_jobs=-1,
                                 verbose=1,
                                 random_state=42)

    folds = UnderBaggingKFold(random_states=range(42, 42 + 5))

    with stopwatch():
        # cross_validate() 関数の 'cv' オプションに渡す
        result = cross_validate(clf, X, y,
                                scoring='roc_auc',
                                cv=folds, return_estimator=True)

    mean_score = np.array(result['test_score']).mean()
    print('rf auc:', mean_score)


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 ただし、この実験では学習時間や性能がどうのというより、複数の異なる API に対応しやすいことを示している。

$ python ubrf.py
...(snip)...
elapsed time: 110.35 sec
rf auc: 0.7572552391326518

注意事項

今回実装した K-Fold は、交差検証でモデルの性能を正しく見積もることよりも、モデルを学習させることに重きを置いている。 特に、少ないラベルを重複して選択するところにその側面が強く現れている。 そのため、モデルの性能を検証するという点では、Nested CV の要領でさらに外側にもう一段交差検証を重ねた方が良いかもしれない。

blog.amedama.jp

参考資料

blog.amedama.jp

upura.hatenablog.com

いじょう。

Python: PySpark でサードパーティ製のライブラリを使って分散処理する

今回は PySpark でサードパーティ製のライブラリを使って分散処理をする方法について。

サンプルとして、次のような状況を試した。

  • Apache Spark + Hadoop YARN で構築した分散処理用のクラスタを用いる
  • サードパーティ製のライブラリとして scikit-learn を想定する
  • scikit-learn の学習済みモデルを、あらかじめローカルで用意しておく
  • Iris データセットと学習済みモデルを使った推論を PySpark で分散処理する

使った環境は次の通り。

$ cat /etc/redhat-release
CentOS Linux release 7.6.1810 (Core)
$ uname -r
3.10.0-957.21.3.el7.x86_64
$ python3 -V
Python 3.6.8
$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_222
Branch
Compiled by user  on 2019-05-01T05:08:38Z
Revision
Url
Type --help for more information.
$ hadoop version
Hadoop 2.9.2
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 826afbeae31ca687bc2f8471dc841b66ed2c6704
Compiled by ajisaka on 2018-11-13T12:42Z
Compiled with protoc 2.5.0
From source with checksum 3a9939967262218aa556c684d107985
This command was run using /home/vagrant/hadoop-2.9.2/share/hadoop/common/hadoop-common-2.9.2.jar

Conda (Miniconda) で Python の仮想環境を作る

PySpark でサードパーティ製のライブラリを使う場合、いくつか検討すべき点がある。 その中でも、最初につまづくのは「ライブラリをいかにエグゼキュータのホストに配布するか」という点。 なぜなら、事前にエグゼキュータの各ホストにライブラリをインストールして回ることは現実的ではない。 Apache Spark のエグゼキュータのホストは環境によっては数百や数千台に及ぶ可能性もある。 管理が自動化されていることは前提としても、各アプリケーションごとにライブラリを追加する作業が生じるのは望ましくない。 そのため、あらかじめライブラリをインストールした仮想環境またはコンテナなどを実行時に配布する方が良い。

今回は Conda (Miniconda) で事前に作った仮想環境をエグゼキュータに配布する方法を取った。 これは、Conda で作った仮想環境はポータビリティがあるため。 virtualenv で作った仮想環境は、ベースとなった Python の実行環境に依存するためホストをまたいだポータビリティがない。 実験的なオプションとして提供されている --relocatableをつければできそうではあるけど、ドキュメントを読む限りあまり使う気持ちにはならない。

virtualenv.pypa.io

なので、まずは Miniconda をインストールしていく。

$ sudo yum -y install wget bzip2 zip
$ wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
$ sudo bash Miniconda3-latest-Linux-x86_64.sh -b -p /opt/conda

インストールした Conda で仮想環境を作る。 以下では環境の名前を conda_env としている。

$ /opt/conda/bin/conda create -n conda_env --copy -yq python=3.7

次のように、仮想環境ができた。

$ source /opt/conda/bin/activate conda_env
$ pip list
Package    Version
---------- ---------
certifi    2019.6.16
pip        19.1.1
setuptools 41.0.1
wheel      0.33.4

仮想環境に、PySpark のアプリケーションが動作するのに必要なサードパーティ製のライブラリをインストールする。 今回であれば scikit-learn を入れる。

$ pip install scikit-learn

インストールできたら、次は仮想環境のある場所に移動して ZIP ファイルに圧縮する。 これはエグゼキュータへの配布を考えてのこと。

$ cd ~/.conda/envs
$ zip -r conda_env.zip conda_env

PySpark で動作確認する

さて、仮想環境の準備ができたところで動作確認に移る。

まずは PySpark のインタプリタを起動しよう。

$ pyspark \
  --master yarn \
  --archives "conda_env.zip#defrost" \
  --conf spark.pyspark.driver.python=conda_env/bin/python \
  --conf spark.pyspark.python=defrost/conda_env/bin/python

上記では、クラスタ管理に Hadoop YARN を使っているので --master yarn で起動している。 そして、今回のポイントとなるのがそれ以降のオプションたち。 まず、--archives オプションはエグゼキュータに配布するファイルを指定している。 ここで、先ほど作った ZIP ファイルを指定することでエグゼキュータに Conda の仮想環境をバラまいている。 また、# 以降はファイルを解凍したときのディレクトリ名になる。 続いて --conf spark.pyspark.driver.python では Spark のドライバで使う Python を指定している。 ここでも先ほど作った仮想環境の Python を指定した。 そして、最大のポイントとなるのが --conf spark.pyspark.python で、ここでエグゼキュータのホスト上に解凍されたディレクトリに含まれる仮想環境の Python を指定している。

インタプリタが起動したら動作を確認していこう。 まず、ドライバで起動したインタプリタは次の通り Python 3.7 になっている。 ちゃんと Conda で作った仮想環境が使われているようだ。

>>> import sys
>>> sys.version_info
sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)

しかし、エグゼキュータの方はどうだろうか? こちらも確認する必要がある。 そこで、まずは次のようにダミーの RDD を用意しておく。

>>> rdd = sc.range(2)
>>> rdd.getNumPartitions()
2

そして、次のように Python のバージョンを返す関数を定義しておく。

>>> def python_version(_):
...     """エグゼキュータ上の Python のバージョンを返す"""
...     import sys
...     return str(sys.version_info)
...

上記を先ほど作った RDD に対して実行することで、エグゼキュータ上の Python のバージョンを確認してみよう。

>>> from pprint import pprint
>>> pprint(rdd.map(python_version).collect())
["sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)",
 "sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)"]

上記の通り、ちゃんとエグゼキュータ上でも Python 3.7 が使えている。

とはいえ、まだ油断はできない。 ちゃんと scikit-learn はインポートできるだろうか? これについても確認しておく。

>>> def sklearn_version(_):
...     """エグゼキュータ上で scikit-learn をインポートしてバージョンを返す関数"""
...     import sklearn
...     return sklearn.__version__
...
 >>> pprint(rdd.map(sklearn_version).collect())
['0.21.3', '0.21.3']

どうやら、ちゃんとエグゼキュータ上で scikit-learn が使える状況にあるようだ。

ローカルで学習済みモデルを作る

分散環境で scikit-learn が使えるようになったところで、次にローカルで学習済みモデルを用意する。 とはいえ、めんどくさいのでインタプリタは先ほど起動したものを使い回すことにしよう。 PySpark の環境は、いくつかのインスタンスがグローバルスコープにある以外は通常の Python と何ら変わりがないので。

Iris データセットを読み込む。

>>> from sklearn import datasets
>>> dataset = datasets.load_iris()
>>> X, y = dataset.data, dataset.target

ホールドアウト検証用にデータを分割する。

>>> from sklearn.model_selection import train_test_split
>>> X_train, X_test, y_train, y_test = train_test_split(X, y,
...                                                     shuffle=True,
...                                                     random_state=42)

学習データの方をランダムフォレストに学習させる。

>>> clf = RandomForestClassifier(n_estimators=100)
>>> clf.fit(X_train, y_train)
RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
                       max_depth=None, max_features='auto', max_leaf_nodes=None,
                       min_impurity_decrease=0.0, min_impurity_split=None,
                       min_samples_leaf=1, min_samples_split=2,
                       min_weight_fraction_leaf=0.0, n_estimators=100,
                       n_jobs=None, oob_score=False, random_state=None,
                       verbose=0, warm_start=False)

これで学習済みモデルが手に入った。 通常であれば pickle などを使ってディスクに直列化して別の環境に持ち運ぶことになる。 今回はインタプリタが変わらないので、そのまま使うことにする。

データセットを PySpark のデータ表現に変換する

学習済みモデルができたので、次はこれを PySpark で動かしたい。 ただ、その前にデータがないと始まらないので Iris データセットを PySpark に読み込む。 具体的には NumPy の配列を PySpark の DataFrame に変換したい。

まずは PySpark の DataFrame を作るためにスキーマを定義する。 今回は特徴量が全て浮動小数点型だったので楽だけど、異なるときは型を変えていく必要がある。

>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import DoubleType
>>> df_schema = StructType([
...     StructField(feature_name, DoubleType(), False)
...     for feature_name in dataset.feature_names
... ])

PySpark は Numpy の配列を受け付けてくれないのでプリミティブな型に変換する関数を用意する。 まあ、実際には Numpy の配列をデータセットとして PySpark の環境に持っていくことなんてそうないだろうけど。

>>> def numpy_to_primitive(np_array):
...     """numpy の要素があると受け付けないので Python のネイティブな型に直す"""
...     for np_row in np_array:
...         yield [float(element) for element in np_row]
...

先ほどホールドアウトしておいたテストデータを DataFrame に変換する。

>>> X_test_df = spark.createDataFrame(numpy_to_primitive(X_test), df_schema)
>>> X_test_df.show(truncate=False, n=5)
+-----------------+----------------+-----------------+----------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|
+-----------------+----------------+-----------------+----------------+
|6.1              |2.8             |4.7              |1.2             |
|5.7              |3.8             |1.7              |0.3             |
|7.7              |2.6             |6.9              |2.3             |
|6.0              |2.9             |4.5              |1.5             |
|6.8              |2.8             |4.8              |1.4             |
+-----------------+----------------+-----------------+----------------+
only showing top 5 rows

これでデータができた。

PySpark で学習済みモデルの推論を分散処理する

データセットとモデルが揃ったので、次は本題となる推論の部分を分散処理する。

まずは学習済みモデルを SparkContext#broadcast() を使ってエグゼキュータのインタプリタ上で使えるようにする。 これは要するにオブジェクトを SerDe してリモートのホスト上のインタプリタにロードしている。

>>> broadcasted_clf = sc.broadcast(clf)
>>> broadcasted_clf
<pyspark.broadcast.Broadcast object at 0x7fe46aa4f8d0>

まずはデータを一行ずつ推論させてみることにしよう。 今回であれば row は Iris データセットの一つの花の特徴量に対応する。 SparkContext#broadcast() でバラまいたオブジェクトは Broadcast#value アトリビュートからアクセスできる。

>>> def predict(row):
...     """推論を分散処理する"""
...     y_pred = broadcasted_clf.value.predict([row])
...     # 返り値が numpy の配列なので単なるリストに直したほうが PySpark では扱いやすい
...     return list(y_pred)
...

上記を使って推論してみよう。 DataFrame を RDD に変換した上で、RDD#flatMap() で分散処理を実行している。

>>> X_test_df.rdd.flatMap(predict).collect()
[1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0]

どうやら、それっぽい値が得られた。

RDD#flatMap() にしているのは、返しているのがリストだからで、もし単なる RDD#map() だと、次のようにリストの入れ子になる。

>>> X_test_df.rdd.map(predict).collect()
[[1], [0], [2], [1], [1], [0], [1], [2], [1], [1], [2], [0], [0], [0], [0], [1], [2], [1], [1], [2], [0], [2], [0], [2], [2], [2], [2], [2], [0], [0], [0], [0], [1], [0], [0], [2], [1], [0]]

得られた結果を Accuracy について評価してみよう。

>>> y_pred = X_test_df.rdd.flatMap(predict).collect()
>>> from sklearn.metrics import accuracy_score
>>> accuracy_score(y_test, y_pred)
1.0

どうやら、ちゃんと推論できたようだ。

続いては複数行をまとめて推論させてみることにしよう。 Apache Spark では、パーティションという単位でデータをまとまりとして扱うことができる。 以下の関数は複数行のデータを受け取れるようにしてある。

>>> def predict_partition(map_of_rows):
...     """パーティション単位で推論を分散処理する"""
...     list_of_rows = list(map_of_rows)  # 複数行のデータが入ったリストに直す
...     y_pred = broadcasted_clf.value.predict(list_of_rows)
...     return y_pred
...

上記を実行してみよう。 パーティション単位で処理するときは RDD#mapPartitions() を使う。

>>> X_test_df.rdd.mapPartitions(predict_partition).collect()
[1, 0, 2, 1, 1, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 1, 0, 0, 2, 1, 0]

こちらも、Accuracy で評価してみよう。 先ほどの結果と一致しているだろうか。

>>> y_pred = X_test_df.rdd.mapPartitions(predict_partition).collect()
>>> accuracy_score(y_test, y_pred)
1.0

どうやら、大丈夫そうだ。

めでたしめでたし。

参考

blog.cloudera.co.jp

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Python: PySpark で DataFrame にカラムを追加する

Apache Spark の Python 版インターフェースである PySpark で DataFrame オブジェクトにカラムを追加する方法について。 いくつかやり方があるので見ていく。 ちなみに DataFrame や、それを支える内部的な RDD はイミュータブル (不変) なオブジェクトになっている。 そのため、カラムを追加するときは既存のオブジェクトを変更するのではなく、新たなオブジェクトを作ることになる。

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G87
$ python -V
Python 3.7.4
$ pip list | grep -i pyspark
pyspark            2.4.3

下準備

まずは PySpark のインタプリタを起動しておく。 今回に関しては分散処理はしないのでローカルモードで構わない。

$ pyspark

サンプルとしてユーザ情報を模したデータを作ってみる。 まずは、次のように RDD (Resilient Distributed Dataset) を作る。

>>> users_rdd = sc.parallelize([
...   ('Alice', 20),
...   ('Bob', 25),
...   ('Carol', 30),
...   ('Daniel', 30),
... ])

上記にスキーマを定義して DataFrame に変換する。

>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
>>> from pyspark.sql.types import IntegerType
>>> df_schema = StructType([
...   StructField('name', StringType(), False),
...   StructField('age', IntegerType(), False),
... ])
>>> users_df = spark.createDataFrame(users_rdd, df_schema)

上手くいけば、次のようになる。

>>> users_df.show(truncate=False)
+------+---+
|name  |age|
+------+---+
|Alice |20 |
|Bob   |25 |
|Carol |30 |
|Daniel|30 |
+------+---+

今回はここに、年齢を倍にした double_age というカラムを追加してみる。

SparkSQL を使ってカラムを追加する

まずは SparkSQL を使ってカラムを追加してみる。

先ほどの DataFrame を SparkSQL から操作できるように登録しておく。

>>> users_df.registerTempTable('users')

あるいは、以下のようにしても良い。

>>> users_df.createOrReplaceTempView('users')

既存のカラムに加えて年齢を倍にしたカラムを追加するように SQL を用意する。

>>> query = """
... SELECT
...   name,
...   age,
...   age * 2 AS double_age
... FROM users
... """

そして SparkSession#sql() で実行する。

>>> new_users_df = spark.sql(query)

得られた DataFrame を見ると、ちゃんとカラムが新たに追加されている。

>>> new_users_df.show(truncate=False)
+------+---+----------+
|name  |age|double_age|
+------+---+----------+
|Alice |20 |40        |
|Bob   |25 |50        |
|Carol |30 |60        |
|Daniel|30 |60        |
+------+---+----------+

DataFrame API を使ってカラムを追加する

DataFrame に生えたメソッドを使ってカラムを追加する方法もある。 見栄えはだいぶ変わるけど、先ほどとやっていることは基本的に変わらない。

>>> new_users_df = users_df.withColumn('double_age', users_df.age * 2)

DataFrame API は、使っていくと「これ SQL 書いてるのと変わらなくね?」ってなってくる。 なので、個人的にはあまり出番がない。

>>> new_users_df.show(truncate=False)
+------+---+----------+
|name  |age|double_age|
+------+---+----------+
|Alice |20 |40        |
|Bob   |25 |50        |
|Carol |30 |60        |
|Daniel|30 |60        |
+------+---+----------+

RDD API を使ってカラムを追加する

最後に、Apache Spark の最もプリミティブなデータ表現である RDD の API を使って追加する方法について。 ただし、このやり方は UDF (User Defined Function) を使うので遅いはず。

まずは、次のように RDD を行単位で処理してカラムを追加する関数を用意する。

>>> def double_age(row):
...     """年齢を倍にしたカラムを追加する関数"""
...     return list(row) + [row['age'] * 2]
...

DataFrame の RDD に適用すると、次のようになる。

>>> new_users_rdd = users_df.rdd.map(double_age)
>>> new_users_rdd.collect()
[['Alice', 20, 40], ['Bob', 25, 50], ['Carol', 30, 60], ['Daniel', 30, 60]]

元の DataFrame に戻したいけど、そのままだとカラム名や型の情報がない。

>>> new_users_rdd.toDF().show(truncate=False)
+------+---+---+
|_1    |_2 |_3 |
+------+---+---+
|Alice |20 |40 |
|Bob   |25 |50 |
|Carol |30 |60 |
|Daniel|30 |60 |
+------+---+---+

そこで、元あった DataFrame のスキーマを改変する形で新たな DataFrame のスキーマを定義する。

>>> new_schema_fields = users_df.schema.fields + [StructField('double_age', IntegerType(), False)]
>>> new_schema = StructType(new_schema_fields)

用意したスキーマを使って DataFrame に変換する。

>>> new_user_df = new_users_rdd.toDF(new_schema)

これでカラム名や型の情報がちゃんとした DataFrame になった。

>>> new_user_df.show(truncate=False)
+------+---+----------+
|name  |age|double_age|
+------+---+----------+
|Alice |20 |40        |
|Bob   |25 |50        |
|Carol |30 |60        |
|Daniel|30 |60        |
+------+---+----------+

補足

ちなみにカラムを削除したいときは、次のように DataFrame API で DataFrame#drop() を呼び出せば良い。

>>> new_user_df.drop('age').show(truncate=False)
+------+----------+
|name  |double_age|
+------+----------+
|Alice |40        |
|Bob   |50        |
|Carol |60        |
|Daniel|60        |
+------+----------+

いじょう。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Python: Apache Spark のパーティションは要素が空になるときがある

PySpark とたわむれていて、なんかたまにエラーになるなーと思って原因を調べて分かった話。 最初、パーティションの中身は空になる場合があるとは思っていなかったので、結構おどろいた。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G87
$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_121
Branch
Compiled by user  on 2019-05-01T05:08:38Z
Revision
Url
Type --help for more information.
$ python -V
Python 3.7.4
$ java -version
openjdk version "12.0.1" 2019-04-16
OpenJDK Runtime Environment (build 12.0.1+12)
OpenJDK 64-Bit Server VM (build 12.0.1+12, mixed mode, sharing)

下準備

下準備として PySpark をインストールしたら REPL を起動しておく。 今回の検証に関しては分散処理をしないローカルモードでも再現できる。

$ pip install pyspark
$ pyspark

サンプルデータを用意する

例えば SparkSession#range() を使ってサンプルの DataFrame オブジェクトを作る。

>>> df = spark.range(10)

中身は bigint 型の連番が格納されている。

>>> df
DataFrame[id: bigint]
>>> df.show(truncate=False)
+---+
|id |
+---+
|0  |
|1  |
|2  |
|3  |
|4  |
|5  |
|6  |
|7  |
|8  |
|9  |
+---+

今回使う環境ではこの DataFrame は 4 つのパーティションに分けて処理されることが分かる。 パーティションというのは Apache Spark が内部的に RDD (Resilient Distributed Dataset) を処理する際の分割数を指している。 RDD は Apache Spark の最も低レイヤなデータ表現で、DataFrame も最終的には RDD に変換されて処理される。

>>> df.rdd.getNumPartitions()
4

試しにパーティションに入っている要素の数をカウントしてみることにしよう。 次のような関数を用意する。

>>> def size_of_partition(map_of_rows):
...     """パーティションの要素の数を計算する関数"""
...     list_of_rows = list(map_of_rows)
...     size_of_list = len(list_of_rows)
...     return [size_of_list]
...

これを RDD#mapPartitions() 経由で呼び出す。 これでパーティションの中の要素の数をカウントできる。

>>> df.rdd.mapPartitions(size_of_partition).collect()
[2, 3, 2, 3]

各パーティションには 2 ないし 3 の要素が入っているようだ。

意図的にパーティションを空にしてみる

続いては、意図的にパーティションの中身をスカスカにするためにパーティションの分割数を増やしてみよう。 先ほど 4 だった分割数を 20 まで増やしてみる。 パーティションの分割数を増やすには RDD#repartition() が使える。

>>> reparted_rdd = df.rdd.repartition(20)
>>> reparted_rdd.getNumPartitions()
20

この状態でパーティションの要素の数をカウントすると、次のようになった。 要素の数として 0 が登場していることから、パーティションによっては中身が空なことが分かる。

>>> reparted_rdd.mapPartitions(size_of_partition).collect()
[0, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 3, 0, 0, 0, 0, 0, 0]

意外だったのは、要素数を均すリバランスがされていないこと。

ちなみに先ほど使った RDD#repartition()RDD#coalesce() をオプション shuffle=True で呼び出した場合と等価なようだ。 オプション shuffle=True は要素の順序を保持しないことを表している。

>>> df.rdd.coalesce(20, shuffle=True)  # df.rdd.repartition(20) と等価

ちなみに、要素の順序を保持したままパーティションを拡張することはできない。

>>> unshuffled_reparted_rdd = df.rdd.coalesce(20, shuffle=False)
>>> unshuffled_reparted_rdd.getNumPartitions()
4

オプションの shuffleFalse だとパーティションの分割数が増えていないことが分かる。

ようするにパーティションの分割数を増やしたいときは、要素の順序が必ず入れ替わると考えた方が良い。 先ほどパーティションを増やした RDD も、確認すると順番が入れ替わっている。

>>> reparted_rdd.map(lambda x: x).collect()
[Row(id=0), Row(id=1), Row(id=7), Row(id=8), Row(id=9), Row(id=5), Row(id=6), Row(id=2), Row(id=3), Row(id=4)]

RDD のままでもいいけど、ちょっと分かりにくいかもしれないので DataFrame に直すとこんな感じ。

>>> reparted_rdd.map(lambda x: x).toDF(df.schema).show(truncate=False)
+---+
|id |
+---+
|0  |
|1  |
|7  |
|8  |
|9  |
|5  |
|6  |
|2  |
|3  |
|4  |
+---+

いじょう。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Python: LightGBM の cv() 関数から取得した学習済みモデルを SerDe する

今回は、前回のエントリを書くきっかけになったネタについて。

blog.amedama.jp

上記は今回扱う LightGBM の cv() 関数から取得した _CVBooster のインスタンスで起きた問題だった。 このインスタンスは、そのままでは pickle で直列化・非直列化 (SerDe) できずエラーになってしまう。

ちなみに LightGBM の cv() 関数から学習済みモデルを取得する件については以下のエントリに書いてある。

blog.amedama.jp

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G84
$ python -V            
Python 3.7.4

下準備

準備として LightGBM と Scikit-learn をインストールしておく。

$ pip install lightgbm scikit-learn

問題が生じるコード

まずは件の問題が生じるコードから。 以下のサンプルコードでは、取得した _CVBooster のインスタンスを pickle で直列化しようとしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pickle

import numpy as np
import lightgbm as lgb
from sklearn import datasets
from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split


class ModelExtractionCallback(object):
    """lightgbm.cv() 関数からモデルを取り出すコールバック"""

    def __init__(self):
        self._model = None

    def __call__(self, env):
        self._model = env.model

    def _assert_called_cb(self):
        if self._model is None:
            raise RuntimeError('callback has not called yet')

    @property
    def boosters_proxy(self):
        self._assert_called_cb()
        return self._model

    @property
    def raw_boosters(self):
        self._assert_called_cb()
        return self._model.boosters

    @property
    def best_iteration(self):
        self._assert_called_cb()
        return self._model.best_iteration


def main():
    # データセットを読み込む
    dataset = datasets.load_breast_cancer()
    X, y = dataset.data, dataset.target

    # デモ用にデータセットを分割する
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        shuffle=True,
                                                        test_size=0.2,
                                                        random_state=42)


    # LightGBM のデータセット表現にラップする
    lgb_train = lgb.Dataset(X_train, y_train)

    # モデルを学習する
    extraction_cb = ModelExtractionCallback()
    callbacks = [
        extraction_cb,
    ]
    lgb_params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
    }
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    result = lgb.cv(lgb_params,
                    lgb_train,
                    num_boost_round=1000,
                    early_stopping_rounds=10,
                    folds=skf,
                    seed=42,
                    callbacks=callbacks,
                    verbose_eval=10)

    print('cv logloss:', result['binary_logloss-mean'][-1])

    # モデルを取り出す
    proxy = extraction_cb.boosters_proxy

    # モデルを SerDe する
    serialized_model = pickle.dumps(proxy)
    restored_model = pickle.loads(serialized_model)

    # Deserialize したオブジェクト
    print(restored_model)

    # Hold-out しておいたデータを予測させる
    y_pred_probas = restored_model.predict(X_test)
    y_pred_proba = np.array(y_pred_probas).mean(axis=0)
    y_pred = np.where(y_pred_proba > 0.5, 1, 0)
    # Accuracy について評価する
    acc = accuracy_score(y_test, y_pred)
    print('test accuracy:', acc)


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、次のように例外になる。

$ python lgbcvbserde.py
...
cv logloss: 0.12616399920831986
Traceback (most recent call last):
  File "lgbcvbserde.py", line 99, in <module>
    main()
  File "lgbcvbserde.py", line 84, in main
    restored_model = pickle.loads(serialized_model)
  File "/Users/amedama/.virtualenvs/py37/lib/python3.7/site-packages/lightgbm/engine.py", line 262, in handler_function
    for booster in self.boosters:
TypeError: 'function' object is not iterable

これは、先のエントリに記述した通り以下の条件が重なることで生じている。

  • ラッパーとなる _CVBooster__getattr__() が実装されており __getstate__()__setstate() をトラップする
  • ラップされるオブジェクトに __getstate__()__setstate__() が実装されておりラッパー経由で呼ばれている

問題を修正するコード

問題の修正方法は先のエントリに記述した通り。 ラッパーとして動作するオブジェクト、今回であれば _CVBooster のインスタンスに __getstate__()__setstate__() が必要になる。 ただし、_CVBooster は LightGBM のパッケージなので直接ソースコードを修正することは望ましくない。 そのためモンキーパッチを駆使して解決する。

以下のサンプルコードではクラスに動的にメソッドを追加することで問題を修正している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pickle

import numpy as np
import lightgbm as lgb
from sklearn import datasets
from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split


class ModelExtractionCallback(object):
    """lightgbm.cv() 関数からモデルを取り出すコールバック"""

    def __init__(self):
        self._model = None

    def __call__(self, env):
        self._model = env.model

    def _assert_called_cb(self):
        if self._model is None:
            raise RuntimeError('callback has not called yet')

    @property
    def boosters_proxy(self):
        self._assert_called_cb()
        return self._model

    @property
    def raw_boosters(self):
        self._assert_called_cb()
        return self._model.boosters

    @property
    def best_iteration(self):
        self._assert_called_cb()
        return self._model.best_iteration


def main():
    dataset = datasets.load_breast_cancer()
    X, y = dataset.data, dataset.target

    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        shuffle=True,
                                                        test_size=0.2,
                                                        random_state=42)


    lgb_train = lgb.Dataset(X_train, y_train)

    extraction_cb = ModelExtractionCallback()
    callbacks = [
        extraction_cb,
    ]
    lgb_params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
    }
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    result = lgb.cv(lgb_params,
                    lgb_train,
                    num_boost_round=1000,
                    early_stopping_rounds=10,
                    folds=skf,
                    seed=42,
                    callbacks=callbacks,
                    verbose_eval=10)

    print('cv logloss:', result['binary_logloss-mean'][-1])

    proxy = extraction_cb.boosters_proxy

    # lightgbm.engine._CVBooster のクラスに
    # __getstate__() と __setstate__() を動的に追加する
    def __getstate__(self):
        return self.__dict__.copy()
    setattr(lgb.engine._CVBooster, '__getstate__', __getstate__)

    def __setstate__(self, state):
        self.__dict__.update(state)
    setattr(lgb.engine._CVBooster, '__setstate__', __setstate__)

    serialized_model = pickle.dumps(proxy)
    restored_model = pickle.loads(serialized_model)

    print(restored_model)

    y_pred_probas = restored_model.predict(X_test)
    y_pred_proba = np.array(y_pred_probas).mean(axis=0)
    y_pred = np.where(y_pred_proba > 0.5, 1, 0)
    acc = accuracy_score(y_test, y_pred)
    print('test accuracy:', acc)


if __name__ == '__main__':
    main()

上記を実行してみよう。 SerDe の部分は全く修正していないけど、今度は例外にならず実行できている。

$ python lgbcvbserde.py
...
cv logloss: 0.12616399920831986
<lightgbm.engine._CVBooster object at 0x114704090>
test accuracy: 0.9736842105263158

ちなみに、上記のように _CVBooster ごと直列化しようとするから今回のような問題になるのであって、中身の Booster を格納したリストを直列化するという選択肢もある。