CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: LightGBM で Under-sampling + Bagging したモデルを Probability Calibration してみる

クラス間の要素数に偏りのある不均衡なデータに対する分類問題のアプローチとして、多いクラスのデータを減らすアンダーサンプリングという手法がある。 データをアンダーサンプリングしてモデルに学習させることで、評価指標が改善したりモデルの学習時間を削減できる。

ただし、アンダーサンプリングしたデータで学習させたモデルから得られる推論結果の確率にはバイアスが生じるらしい。 以下のブログでは、生じたバイアスを補正 (Probability Calibration) する方法が紹介されている。

pompom168.hatenablog.com

今回は、上記の手法をロジスティック回帰以外で試したときの結果が気になったので、やってみることにした。 モデルには LightGBM + Under-sampling + Bagging を選んだ。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G87
$ python -V
Python 3.7.4

下準備

まずは下準備として必要なパッケージをインストールしておく。

$ pip install scikit-learn imbalanced-learn matplotlib lightgbm

ロジスティック回帰 + Under-sampling の場合

LightGBM で試す前に、前述したブログを追試してみる。 ちょっと長いけどサンプルコードは以下の通り。 各種評価指標と、グラフとしてはキャリブレーションカーブを出力してみた。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from sklearn.calibration import calibration_curve
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn import metrics
from imblearn.under_sampling import RandomUnderSampler
from matplotlib import pyplot as plt


def probability_calibration(y_proba, beta):
    """サンプリングレートを元に確率を補正する"""
    calibrated_proba = y_proba / (y_proba + (1 - y_proba) / beta)
    return calibrated_proba


def main():
    # クラス比が 0.9 : 0.1 の不均衡な二値分類データを作る
    X, y = make_classification(n_samples=100_000,
                               n_features=5,
                               n_classes=2,
                               weights=[0.9, 0.1],
                               random_state=42)

    # 学習用データと検証用データに分割する (Hold-out)
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        test_size=0.2,
                                                        random_state=42)

    # まずは不均衡データのままロジスティック回帰モデルを学習させる
    clf = LogisticRegression(random_state=42, solver='lbfgs')
    clf.fit(X_train, y_train)
    # 検証用データを予測させる
    y_pred_proba_base = clf.predict_proba(X_test)[:, 1]
    y_pred_base = np.where(y_pred_proba_base > 0.5, 1, 0)

    # Under-sampling で均衡データにする
    sampler = RandomUnderSampler(random_state=42)
    X_train_sampled, y_train_sampled = sampler.fit_sample(X_train, y_train)

    # 均衡データでロジスティック回帰モデルを学習させる
    clf = LogisticRegression(random_state=42, solver='lbfgs')
    clf.fit(X_train_sampled, y_train_sampled)
    # 検証用データを予測させる
    y_pred_proba_us = clf.predict_proba(X_test)[:, 1]
    y_pred_us = np.where(y_pred_proba_us > 0.5, 1, 0)

    # サンプリングレートを元に確率を補正する
    y_train_zero_len = np.count_nonzero(y_train_sampled == 0)
    beta = y_train_zero_len / len(y_train)
    y_pred_proba_cb = probability_calibration(y_pred_proba_us,
                                              beta)
    y_pred_cb = np.where(y_pred_proba_cb > 0.5, 1, 0)

    # 各種評価指標を出力する
    print('precision (base): ',
          metrics.precision_score(y_test, y_pred_base))
    print('precision (under-sampling): ',
          metrics.precision_score(y_test, y_pred_us))
    print('precision (calibrated): ',
          metrics.precision_score(y_test, y_pred_cb))
    print('recall (base): ',
          metrics.recall_score(y_test, y_pred_base))
    print('recall (under-sampling): ',
          metrics.recall_score(y_test, y_pred_us))
    print('recall (calibrated): ',
          metrics.recall_score(y_test, y_pred_cb))
    print('F1 (base): ',
          metrics.f1_score(y_test, y_pred_base))
    print('F1 (under-sampling): ',
          metrics.f1_score(y_test, y_pred_us))
    print('F1 (calibrated): ',
          metrics.f1_score(y_test, y_pred_cb))
    print('ROC AUC (base): ',
          metrics.roc_auc_score(y_test, y_pred_base))
    print('ROC AUC (under-sampling): ',
          metrics.roc_auc_score(y_test, y_pred_us))
    print('ROC AUC (calibrated): ',
          metrics.roc_auc_score(y_test, y_pred_cb))

    # 各モデルが予測した内容の統計量
    print('y_test mean', y_test.mean())
    print('y_proba mean (base)', y_pred_proba_base.mean())
    print('y_proba mean (under-sampling)', y_pred_proba_us.mean())
    print('y_proba mean (calibrated)', y_pred_proba_cb.mean())

    # キャリブレーションカーブを計算する
    base_curve = calibration_curve(y_test, y_pred_proba_base,
                                   n_bins=10)
    undersampling_curve = calibration_curve(y_test, y_pred_proba_us,
                                           n_bins=10)
    calibrated_curve = calibration_curve(y_test, y_pred_proba_cb,
                                         n_bins=10)

    # プロットする
    fig, axes = plt.subplots(2, 1, figsize=(8, 7))

    ax1 = axes[0]
    ax1.plot([0, 1], [0, 1], 'k:', label='Perfectly calibrated')
    ax1.plot(base_curve[0], base_curve[1],
             label='base')
    ax1.plot(undersampling_curve[0], undersampling_curve[1],
             label='under-sampling')
    ax1.plot(calibrated_curve[0], calibrated_curve[1],
             label='calibrated')

    ax1.grid()
    ax1.set_ylabel('Fraction of positives')
    ax1.set_xlabel('Prediction probability')
    ax1.set_ylim([-0.05, 1.05])
    ax1.legend(loc='lower right')

    ax2 = axes[1]
    ax2.hist(y_pred_proba_base, bins=144, alpha=0.5,
             color='red', label='y_pred_proba (base)')
    ax2.hist(y_pred_proba_us, bins=144, alpha=0.5,
             color='orange', label='y_pred_proba (under-sampling)')
    ax2.hist(y_pred_proba_cb, bins=144, alpha=0.5,
             color='green', label='y_pred_proba (calibrated)')
    ax2.axvline(x=y_test.mean(),
                color='blue', label='y_test mean')
    ax2.axvline(x=y_pred_proba_base.mean(),
                color='red', label='y_proba mean (base)')
    ax2.axvline(x=y_pred_proba_us.mean(),
                color='orange', label='y_proba mean (under-sampling)')
    ax2.axvline(x=y_pred_proba_cb.mean(),
                color='green', label='y_proba mean (calibrated)')

    ax2.set_xlabel('Prediction Probability')
    ax2.set_ylabel('Frequency')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。 標準出力の前半は、各種評価指標を以下の条件で計算している。

  • アンダーサンプリングしていないデータを学習させたモデルが出力した確率値 (base)
  • アンダーサンプリングしたデータを学習させたモデルが出力した確率値 (under-sampling)
  • アンダーサンプリングしたデータを学習させたモデルが出力した確率値を補正した確率値 (calibrated)

後半は、それぞれが出力した確率値の平均になる。

precision (base):  0.7960526315789473
precision (under-sampling):  0.4034117104656524
precision (calibrated):  0.8048582995951417
recall (base):  0.5203057811753464
recall (under-sampling):  0.8361204013377926
recall (calibrated):  0.47491638795986624
F1 (base):  0.6292978907830107
F1 (under-sampling):  0.5442388431037164
F1 (calibrated):  0.5973557692307693
ROC AUC (base):  0.7523626409646208
ROC AUC (under-sampling):  0.8457979568536285
ROC AUC (calibrated):  0.7307289819399487
y_test mean 0.10465
y_proba mean (base) 0.10308716245156492
y_proba mean (under-sampling) 0.2733953199075095
y_proba mean (calibrated) 0.09521502104917157

上記を見ると、評価指標によってはアンダーサンプリングが有効に作用している様子が伺える。 また、補正済みの確率値を使った評価指標では、アンダーサンプリングしなかった場合の結果に近づいていることが分かる。 そして、モデルの出力した確率値の平均も興味深い。 真のテスト用データの平均が 0.10465 なのに対して、アンダーサンプリングなしでは 0.10308 と、割と良い線いってる。 それに比べてアンダーサンプリングすると 0.27339 と大きくバイアスがかかっていることが分かる。 そして、補正したものでは 0.0952 と、真の平均に近づけられている。

より詳しくグラフにプロットしたものが以下の通り。 上の図は Q-Q プロットのようなもので、陽性データの真の出現頻度とモデルの予測を比較している。 理想的な状況では斜めに一直線になる。 下の図はモデルが出力した確率値のヒストグラムと、確率の平均値になっている。

f:id:momijiame:20190825010912p:plain

上記を見ると、アンダーサンプリングしたデータを使って学習させたモデルの出力には大きなバイアスが生じていることが分かる。 それに対して補正したデータではバイアスが減少している。

上記の結果より、確率を補正するとアンダーサンプリングしないデータを使って学習させたモデルの出力を再生するような印象を受けた。

LightGBM + Undersampling + Bagging の場合

それでは、先ほどと同じことを LightGBM でアンダーサンプリングしつつ、モデルを 5-Fold で Bagging した場合でも試してみよう。

サンプルコードは次の通り。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
import lightgbm as lgb
from sklearn.calibration import calibration_curve
from sklearn.model_selection import BaseCrossValidator
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
from sklearn import metrics
from imblearn.under_sampling import RandomUnderSampler
from matplotlib import pyplot as plt


def probability_calibration(y_proba, beta):
    '''サンプリングレートを元に確率を補正する'''
    calibrated_proba = y_proba / (y_proba + (1 - y_proba) / beta)
    return calibrated_proba


class UnderBaggingKFold(BaseCrossValidator):
    '''CV に使うだけで UnderBagging できる KFold 実装

    NOTE: 少ないクラスのデータは各 Fold で重複して選択される'''

    def __init__(self, n_splits=5, shuffle=True, random_states=None,
                 test_size=0.2, whole_testing=False):
        '''
        :param n_splits: Fold の分割数
        :param shuffle: 分割時にデータをシャッフルするか
        :param random_states: 各 Fold の乱数シード
        :param test_size: Under-sampling された中でテスト用データとして使う割合
        :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか
        '''
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_states = random_states
        self.test_size = test_size
        self.whole_testing = whole_testing

        self.sample_indices_ = []

        if random_states is not None:
            # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる
            self.n_splits = len(random_states)
        else:
            # 乱数シードが指定されていないときは分割数だけ None で埋めておく
            self.random_states = [None] * self.n_splits

        # 分割数だけ Under-sampling 用のインスタンスを作っておく
        self.samplers_ = [
            RandomUnderSampler(random_state=random_state)
            for random_state in self.random_states
        ]

    def split(self, X, y=None, groups=None):
        '''データを学習用とテスト用に分割する'''
        if X.ndim < 2:
            # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う
            X = np.vstack(X)

        for i in range(self.n_splits):
            # データを Under-sampling して均衡データにする
            sampler = self.samplers_[i]
            _, y_sampled = sampler.fit_resample(X, y)
            # 選ばれたデータのインデックスを取り出す
            sampled_indices = sampler.sample_indices_

            # 選ばれたデータのインデックスを記録しておく
            self.sample_indices_ = sampled_indices

            # 選ばれたデータを学習用とテスト用に分割する
            split_data = train_test_split(sampled_indices,
                                          shuffle=self.shuffle,
                                          test_size=self.test_size,
                                          stratify=y_sampled,
                                          random_state=self.random_states[i],
                                          )
            train_indices, test_indices = split_data

            if self.whole_testing:
                # Under-sampling で選ばれなかったデータをテスト用に追加する
                mask = np.ones(len(X), dtype=np.bool)
                mask[sampled_indices] = False
                X_indices = np.arange(len(X))
                non_sampled_indices = X_indices[mask]
                test_indices = np.concatenate([test_indices,
                                               non_sampled_indices])

            yield train_indices, test_indices

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits


class ModelExtractionCallback(object):
    '''lightgbm.cv() から学習済みモデルを取り出すためのコールバックに使うクラス

    NOTE: 非公開クラス '_CVBooster' に依存しているため将来的に動かなく恐れがある
    '''

    def __init__(self):
        self._model = None

    def __call__(self, env):
        # _CVBooster の参照を保持する
        self._model = env.model

    def _assert_called_cb(self):
        if self._model is None:
            # コールバックが呼ばれていないときは例外にする
            raise RuntimeError('callback has not called yet')

    @property
    def boosters_proxy(self):
        self._assert_called_cb()
        # Booster へのプロキシオブジェクトを返す
        return self._model

    @property
    def raw_boosters(self):
        self._assert_called_cb()
        # Booster のリストを返す
        return self._model.boosters

    @property
    def best_iteration(self):
        self._assert_called_cb()
        # Early stop したときの boosting round を返す
        return self._model.best_iteration


def main():
    # 不均衡な二値分類データを作る
    X, y = make_classification(n_samples=100_000,
                               n_features=5,
                               n_classes=2,
                               weights=[0.9, 0.1],
                               random_state=42)

    # 学習用データと検証用データに分割する (Hold-out)
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        test_size=0.2,
                                                        random_state=42)

    # まずは不均衡データのまま学習させる
    lgb_params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
    }
    X_tr, X_ev, y_tr, y_ev = train_test_split(X_train, y_train,
                                              test_size=0.2,
                                              random_state=42)
    lgb_train = lgb.Dataset(X_tr, y_tr)
    lgb_eval = lgb.Dataset(X_ev, y_ev, reference=lgb_train)
    booster = lgb.train(lgb_params, lgb_train, valid_sets=lgb_eval,
                        num_boost_round=1000,
                        early_stopping_rounds=10,
                        verbose_eval=10)
    # 検証用データを予測させる
    y_pred_proba_base = booster.predict(X_test)
    y_pred_base = np.where(y_pred_proba_base > 0.5, 1, 0)

    # 均衡データで学習させる (UnderBagging)
    folds = UnderBaggingKFold(random_states=range(42, 42 + 5))
    lgb_train = lgb.Dataset(X_train, y_train)
    extraction_cb = ModelExtractionCallback()
    callbacks = [
        extraction_cb,
    ]
    lgb.cv(lgb_params, lgb_train,
           num_boost_round=1000,
           early_stopping_rounds=10,
           folds=folds,
           callbacks=callbacks,
           verbose_eval=10)
    cv_booster = extraction_cb.boosters_proxy
    # 検証用データを予測させる
    y_pred_proba_ub_list = cv_booster.predict(X_test)
    y_pred_proba_ub = np.array(y_pred_proba_ub_list).mean(axis=0)
    y_pred_ub = np.where(y_pred_proba_ub > 0.5, 1, 0)

    # サンプリングレートを元に確率を補正する
    y_sampled = y_train[folds.sample_indices_]
    y_sampled_negative_len = np.count_nonzero(y_sampled == 0)
    beta = y_sampled_negative_len / len(y_train)
    y_pred_proba_cb = probability_calibration(y_pred_proba_ub,
                                              beta)
    y_pred_cb = np.where(y_pred_proba_cb > 0.5, 1, 0)

    # 各種評価指標を出力する
    print('precision (base): ',
          metrics.precision_score(y_test, y_pred_base))
    print('precision (under-bagging): ',
          metrics.precision_score(y_test, y_pred_ub))
    print('precision (calibrated): ',
          metrics.precision_score(y_test, y_pred_cb))
    print('recall (base): ',
          metrics.recall_score(y_test, y_pred_base))
    print('recall (under-bagging): ',
          metrics.recall_score(y_test, y_pred_ub))
    print('recall (calibrated): ',
          metrics.recall_score(y_test, y_pred_cb))
    print('F1 (base): ',
          metrics.f1_score(y_test, y_pred_base))
    print('F1 (under-bagging): ',
          metrics.f1_score(y_test, y_pred_ub))
    print('F1 (calibrated): ',
          metrics.f1_score(y_test, y_pred_cb))
    print('ROC AUC (base): ',
          metrics.roc_auc_score(y_test, y_pred_base))
    print('ROC AUC (under-bagging): ',
          metrics.roc_auc_score(y_test, y_pred_ub))
    print('ROC AUC (calibrated): ',
          metrics.roc_auc_score(y_test, y_pred_cb))

    # 各モデルが予測した内容の統計量
    print('y_test mean', y_test.mean())
    print('y_proba mean (base)', y_pred_proba_base.mean())
    print('y_proba mean (under-sampling)', y_pred_proba_ub.mean())
    print('y_proba mean (calibrated)', y_pred_proba_cb.mean())

    # キャリブレーションカーブを計算する
    base_curve = calibration_curve(y_test, y_pred_proba_base,
                                   n_bins=10)
    underbagging_curve = calibration_curve(y_test, y_pred_proba_ub,
                                           n_bins=10)
    calibrated_curve = calibration_curve(y_test, y_pred_proba_cb,
                                         n_bins=10)

    # プロットする
    fig, axes = plt.subplots(2, 1, figsize=(8, 7))

    ax1 = axes[0]
    ax1.plot([0, 1], [0, 1], 'k:', label='Perfectly calibrated')
    ax1.plot(base_curve[0], base_curve[1],
             label='base')
    ax1.plot(underbagging_curve[0], underbagging_curve[1],
             label='under-bagging')
    ax1.plot(calibrated_curve[0], calibrated_curve[1],
             label='calibrated')

    ax1.grid()
    ax1.set_ylabel('Fraction of positives')
    ax1.set_xlabel('Prediction probability')
    ax1.set_ylim([-0.05, 1.05])
    ax1.legend(loc='lower right')

    ax2 = axes[1]
    ax2.hist(y_pred_proba_base, bins=144, alpha=0.5,
             color='red', label='y_pred_proba (base)')
    ax2.hist(y_pred_proba_ub, bins=144, alpha=0.5,
             color='orange', label='y_pred_proba (under-bagging)')
    ax2.hist(y_pred_proba_cb, bins=144, alpha=0.5,
             color='green', label='y_pred_proba (calibrated)')
    ax2.axvline(x=y_test.mean(),
                color='blue', label='y_test mean')
    ax2.axvline(x=y_pred_proba_base.mean(),
                color='red', label='y_proba mean (base)')
    ax2.axvline(x=y_pred_proba_ub.mean(),
                color='orange', label='y_proba mean (under-bagging)')
    ax2.axvline(x=y_pred_proba_cb.mean(),
                color='green', label='y_proba mean (calibrated)')

    ax2.set_xlabel('Prediction Probability')
    ax2.set_ylabel('Frequency')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。

$ python lgbm.py
...(snip)...
precision (base):  0.8451915559030493
precision (under-bagging):  0.39481204660364916
precision (calibrated):  0.8634105960264901
recall (base):  0.5164835164835165
recall (under-bagging):  0.8580984233158147
recall (calibrated):  0.4983277591973244
F1 (base):  0.6411625148279954
F1 (under-bagging):  0.5408009635651913
F1 (calibrated):  0.6319297182671918
ROC AUC (base):  0.7527131939931404
ROC AUC (under-bagging):  0.8521798309687914
ROC AUC (calibrated):  0.7445567427248139
y_test mean 0.10465
y_proba mean (base) 0.10305398392930754
y_proba mean (under-sampling) 0.26559621708983305
y_proba mean (calibrated) 0.09724228389477375

各種評価指標と確率の平均値については、先ほどと同じような振る舞いを見せている。

グラフについては次の通り。

f:id:momijiame:20190825012519p:plain

こちらも、ロジスティック回帰と同様にアンダーサンプリングで生じたバイアスが低減できていることが分かる。

いじょう。