CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Under-sampling + Bagging なモデルを簡単に作れる K-Fold を実装してみた

不均衡データに対する分類問題のアプローチとして、多いクラスのデータを取り除く Under-sampling という手法がある。 さらに、複数の Under-sampling したデータを用いて、複数のモデルを用意する Bagging という手法を組み合わせることがある。 今回は、そんな Under-sampling + Bagging (UnderBagging) なモデルを簡単に作れる KFold を実装してみた。

Under-sampling + Bagging に関する既知の実装としては imbalanced-learnBalancedBaggingClassifier という分類器がある。 ただ、このアプローチだと、学習させる分類器が scikit-learn の API を備えている必要がある。 そこで、異なるアプローチとしてモデルではなくデータを K-Fold するタイミングで Under-sampling + Bagging してみることにした。

使った環境は次の通り。

$ sw_vers            
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G87
$ python -V
Python 3.7.4

下準備

まずは今回使うパッケージをインストールしておく。

$ pip install scikit-learn imbalanced-learn lightgbm

データを分割するタイミングで Under-sampling する K-Fold の実装

早速だけど以下にサンプルコードを示す。 具体的には UnderBaggingKFold という名前で scikit-learn の CrossValidation API を実装している。 データを K-Fold 分割するタイミングで imbalanced-learn の RandomUnderSampler を使って Under-sampling している。 なお、このサンプルコードは動作をデモするためのものなので、まだ分類器にデータを渡すことまではしていない。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from sklearn.model_selection import BaseCrossValidator
from sklearn.model_selection import train_test_split
from imblearn.under_sampling import RandomUnderSampler


class UnderBaggingKFold(BaseCrossValidator):
    """CV に使うだけで UnderBagging できる KFold 実装

    NOTE: 少ないクラスのデータは各 Fold で重複して選択される"""

    def __init__(self, n_splits=5, shuffle=True, random_states=None,
                 test_size=0.2, whole_testing=False):
        """
        :param n_splits: Fold の分割数
        :param shuffle: 分割時にデータをシャッフルするか
        :param random_states: 各 Fold の乱数シード
        :param test_size: Under-sampling された中でテスト用データとして使う割合
        :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか
        """
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_states = random_states
        self.test_size = test_size
        self.whole_testing = whole_testing

        if random_states is not None:
            # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる
            self.n_splits = len(random_states)
        else:
            # 乱数シードが指定されていないときは分割数だけ None で埋めておく
            self.random_states = [None] * self.n_splits

        # 分割数だけ Under-sampling 用のインスタンスを作っておく
        self.samplers_ = [
            RandomUnderSampler(random_state=random_state)
            for random_state in self.random_states
        ]

    def split(self, X, y=None, groups=None):
        """データを学習用とテスト用に分割する"""
        if X.ndim < 2:
            # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う
            X = np.vstack(X)

        for i in range(self.n_splits):
            # データを Under-sampling して均衡データにする
            sampler = self.samplers_[i]
            _, y_sampled = sampler.fit_resample(X, y)
            # 選ばれたデータのインデックスを取り出す
            sampled_indices = sampler.sample_indices_

            # 選ばれたデータを学習用とテスト用に分割する
            split_data = train_test_split(sampled_indices,
                                          shuffle=self.shuffle,
                                          test_size=self.test_size,
                                          stratify=y_sampled,
                                          random_state=self.random_states[i],
                                          )
            train_indices, test_indices = split_data

            if self.whole_testing:
                # Under-sampling で選ばれなかったデータをテスト用に追加する
                mask = np.ones(len(X), dtype=np.bool)
                mask[sampled_indices] = False
                X_indices = np.arange(len(X))
                non_sampled_indices = X_indices[mask]
                test_indices = np.concatenate([test_indices,
                                               non_sampled_indices])

            yield train_indices, test_indices

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits


def main():
    # ダミーの不均衡データを用意する
    X, y = np.arange(1, 21), np.zeros(20, dtype=np.int8)
    # 先頭の 4 要素だけ陽性 (Positive) データに指定する
    y[:4] = 1

    print('y:', y)

    # 乱数シードを指定した 5-Fold
    folds = UnderBaggingKFold(random_states=range(5))

    # データの分割され方を出力する
    for train_indices, test_indices in folds.split(X, y):
        print('train: X={X}, y={y}'.format(X=train_indices, y=y[train_indices]))
        print('test: X={X}, y={y}'.format(X=test_indices, y=y[test_indices]))


if __name__ == '__main__':
    main()

上記のサンプルコードを実行してみよう。 デモでは、全体が 20 要素ある中で先頭の 4 要素だけ陽性になった不均衡なデータを分割している。 各 Fold で、学習データとテスト用データがどのように分割されるか観察してみよう。

$ python ubkfold.py 
y: [1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[13  0  2 12 10  1], y=[0 1 1 0 0 1]
test: X=[3 5], y=[1 0]
train: X=[ 7  0 11  2  6  1], y=[0 1 0 1 0 1]
test: X=[17  3], y=[0 1]
train: X=[3 1 0 4 8 9], y=[1 1 1 0 0 0]
test: X=[16  2], y=[0 1]
train: X=[ 0  2 17 11  1  5], y=[1 1 0 0 1 0]
test: X=[3 8], y=[1 0]
train: X=[ 2  3  0  4  7 16], y=[1 1 1 0 0 0]
test: X=[10  1], y=[0 1]

上記を見ると、全ての目的変数 (y) について陽性と陰性が均等に含まれた均衡データになっていることが分かる。

ちなみに whole_testing というオプションに True を渡すと、サンプリングされなかったデータが全てテスト用データに追加される。 まあ、ようするに陰性のデータが大量に突っ込まれる。

    # サンプリングされなかったデータを全てテスト用に追加する
    folds = UnderBaggingKFold(random_states=range(5),
                              whole_testing=True)

上記についても動作を確認しておこう。 学習用のデータに関しては先ほどと変化がないものの、テスト用のデータは陰性の要素が増えていることが分かる。

$ python ubkfold.py 
y: [1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[13  0  2 12 10  1], y=[0 1 1 0 0 1]
test: X=[ 3  5  4  6  7  8  9 11 14 15 16 17 18 19], y=[1 0 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[ 7  0 11  2  6  1], y=[0 1 0 1 0 1]
test: X=[17  3  4  5  8  9 10 12 13 14 15 16 18 19], y=[0 1 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[3 1 0 4 8 9], y=[1 1 1 0 0 0]
test: X=[16  2  5  6  7 10 11 12 13 14 15 17 18 19], y=[0 1 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[ 0  2 17 11  1  5], y=[1 1 0 0 1 0]
test: X=[ 3  8  4  6  7  9 10 12 13 14 15 16 18 19], y=[1 0 0 0 0 0 0 0 0 0 0 0 0 0]
train: X=[ 2  3  0  4  7 16], y=[1 1 1 0 0 0]
test: X=[10  1  5  6  8  9 11 12 13 14 15 17 18 19], y=[0 1 0 0 0 0 0 0 0 0 0 0 0 0]

LightGBM で Under-sampling + Bagging してみる

振る舞いの説明ができたので、続いては実際に分類器を学習させてみよう。 とりあえず LightGBM に擬似的に作った不均衡データを与えてみることにする。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from contextlib import contextmanager

import lightgbm as lgb
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import BaseCrossValidator
from sklearn.model_selection import train_test_split
from imblearn.under_sampling import RandomUnderSampler


class UnderBaggingKFold(BaseCrossValidator):
    """CV に使うだけで UnderBagging できる KFold 実装

    NOTE: 少ないクラスのデータは各 Fold で重複して選択される"""

    def __init__(self, n_splits=5, shuffle=True, random_states=None,
                 test_size=0.2, whole_testing=False):
        """
        :param n_splits: Fold の分割数
        :param shuffle: 分割時にデータをシャッフルするか
        :param random_states: 各 Fold の乱数シード
        :param test_size: Under-sampling された中でテスト用データとして使う割合
        :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか
        """
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_states = random_states
        self.test_size = test_size
        self.whole_testing = whole_testing

        if random_states is not None:
            # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる
            self.n_splits = len(random_states)
        else:
            # 乱数シードが指定されていないときは分割数だけ None で埋めておく
            self.random_states = [None] * self.n_splits

        # 分割数だけ Under-sampling 用のインスタンスを作っておく
        self.samplers_ = [
            RandomUnderSampler(random_state=random_state)
            for random_state in self.random_states
        ]

    def split(self, X, y=None, groups=None):
        """データを学習用とテスト用に分割する"""
        if X.ndim < 2:
            # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う
            X = np.vstack(X)

        for i in range(self.n_splits):
            # データを Under-sampling して均衡データにする
            sampler = self.samplers_[i]
            _, y_sampled = sampler.fit_resample(X, y)
            # 選ばれたデータのインデックスを取り出す
            sampled_indices = sampler.sample_indices_

            # 選ばれたデータを学習用とテスト用に分割する
            split_data = train_test_split(sampled_indices,
                                          shuffle=self.shuffle,
                                          test_size=self.test_size,
                                          stratify=y_sampled,
                                          random_state=self.random_states[i],
                                          )
            train_indices, test_indices = split_data

            if self.whole_testing:
                # Under-sampling で選ばれなかったデータをテスト用に追加する
                mask = np.ones(len(X), dtype=np.bool)
                mask[sampled_indices] = False
                X_indices = np.arange(len(X))
                non_sampled_indices = X_indices[mask]
                test_indices = np.concatenate([test_indices,
                                               non_sampled_indices])

            yield train_indices, test_indices

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits


@contextmanager
def stopwatch():
    """学習にかかる時間を計測するためのコンテキストマネージャ"""
    before = time.time()
    yield
    after = time.time()
    print(f'elapsed time: {after - before:.2f} sec')


def main():
    # クラス比 0.99 : 0.01 のダミーデータを用意する
    args = {
        'n_samples': 1_000_000,
        'n_features': 80,
        'n_informative': 3,
        'n_redundant': 0,
        'n_repeated': 0,
        'n_classes': 2,
        'n_clusters_per_class': 1,
        'weights': [0.99, 0.01],
        'random_state': 42,
    }
    X, y = make_classification(**args)

    # メトリックに ROC AUC を用いた二値分類問題として解く
    lgbm_params = {
        'objective': 'binary',
        'metric': 'auc',
    }
    lgb_train = lgb.Dataset(X, y)

    # 5-Fold で乱数シードに 42 ~ 46 を指定している
    folds = UnderBaggingKFold(random_states=range(42, 42 + 5))

    with stopwatch():
        # 上記で作った UnderBaggingKFold を folds に指定する
        result = lgb.cv(lgbm_params,
                        lgb_train,
                        num_boost_round=1000,
                        early_stopping_rounds=10,
                        seed=42,
                        folds=folds,
                        verbose_eval=10,
                        )
    print('under-bagging auc:', result['auc-mean'][-1])


if __name__ == '__main__':
    main()

上記を実行してみよう。 今回使った環境では、学習に約 28 秒かかって ROC AUC では約 0.776 という結果が得られた。

$ python ublgbm.py
...(snip)...
elapsed time: 28.62 sec
under-bagging auc: 0.7760463716175261

ちなみに LightGBM.cv() 関数から学習済みモデルを取り出す方法については次のエントリを参照してほしい。

blog.amedama.jp

比較対象として LightGBM の単なる Bagging も確認する

先ほどの結果を見るだけでは、学習が早いのか遅いのか、性能が良いのか悪いのか判断が難しい。 そのため、比較対象として Under-sampling しない、単なる Bagging の検証もしておく。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from contextlib import contextmanager

import lightgbm as lgb
from sklearn.datasets import make_classification
from sklearn.model_selection import StratifiedKFold


@contextmanager
def stopwatch():
    before = time.time()
    yield
    after = time.time()
    print(f'elapsed time: {after - before:.2f} sec')


def main():
    args = {
        'n_samples': 1_000_000,
        'n_features': 80,
        'n_informative': 3,
        'n_redundant': 0,
        'n_repeated': 0,
        'n_classes': 2,
        'n_clusters_per_class': 1,
        'weights': [0.99, 0.01],
        'random_state': 42,
    }
    X, y = make_classification(**args)

    lgbm_params = {
        'objective': 'binary',
        'metric': 'auc',
    }
    lgb_train = lgb.Dataset(X, y)

    # 一般的な Stratified KFold
    folds = StratifiedKFold(n_splits=5,
                            shuffle=True,
                            random_state=42)

    with stopwatch():
        # アンダーサンプリングなしで学習する
        result = lgb.cv(lgbm_params,
                        lgb_train,
                        num_boost_round=1000,
                        early_stopping_rounds=10,
                        seed=42,
                        folds=folds,
                        verbose_eval=10,
                        )
    print('base auc:', result['auc-mean'][-1])


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 学習にかかった時間は約 38 秒で、ROC AUC は約 0.779 だった。

$ python baselgbm.py
...(snip)...
elapsed time: 38.79 sec
base auc: 0.7791468600807205

RandomForest でも Under-sampling + Bagging してみる

続いては scikit-learn の RandomForest でも Under-sampling + Bagging を試してみよう。 このケースでは cross_validate()cv オプションに UnderBaggingKFold のインスタンスを渡せば良い。 ちなみに、学習済みモデルは return_estimatorTrue にすれば受け取れる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from contextlib import contextmanager

import lightgbm as lgb
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import BaseCrossValidator
from sklearn.model_selection import cross_validate
from sklearn.model_selection import train_test_split
from imblearn.under_sampling import RandomUnderSampler


class UnderBaggingKFold(BaseCrossValidator):
    """CV に使うだけで UnderBagging できる KFold 実装

    NOTE: 少ないクラスのデータは各 Fold で重複して選択される"""

    def __init__(self, n_splits=5, shuffle=True, random_states=None,
                 test_size=0.2, whole_testing=False):
        """
        :param n_splits: Fold の分割数
        :param shuffle: 分割時にデータをシャッフルするか
        :param random_states: 各 Fold の乱数シード
        :param test_size: Under-sampling された中でテスト用データとして使う割合
        :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか
        """
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_states = random_states
        self.test_size = test_size
        self.whole_testing = whole_testing

        if random_states is not None:
            # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる
            self.n_splits = len(random_states)
        else:
            # 乱数シードが指定されていないときは分割数だけ None で埋めておく
            self.random_states = [None] * self.n_splits

        # 分割数だけ Under-sampling 用のインスタンスを作っておく
        self.samplers_ = [
            RandomUnderSampler(random_state=random_state)
            for random_state in self.random_states
        ]

    def split(self, X, y=None, groups=None):
        """データを学習用とテスト用に分割する"""
        if X.ndim < 2:
            # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う
            X = np.vstack(X)

        for i in range(self.n_splits):
            # データを Under-sampling して均衡データにする
            sampler = self.samplers_[i]
            _, y_sampled = sampler.fit_resample(X, y)
            # 選ばれたデータのインデックスを取り出す
            sampled_indices = sampler.sample_indices_

            # 選ばれたデータを学習用とテスト用に分割する
            split_data = train_test_split(sampled_indices,
                                          shuffle=self.shuffle,
                                          test_size=self.test_size,
                                          stratify=y_sampled,
                                          random_state=self.random_states[i],
                                          )
            train_indices, test_indices = split_data

            if self.whole_testing:
                # Under-sampling で選ばれなかったデータをテスト用に追加する
                mask = np.ones(len(X), dtype=np.bool)
                mask[sampled_indices] = False
                X_indices = np.arange(len(X))
                non_sampled_indices = X_indices[mask]
                test_indices = np.concatenate([test_indices,
                                               non_sampled_indices])

            yield train_indices, test_indices

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits


@contextmanager
def stopwatch():
    before = time.time()
    yield
    after = time.time()
    print(f'elapsed time: {after - before:.2f} sec')


def main():
    args = {
        'n_samples': 1_000_000,
        'n_features': 80,
        'n_informative': 3,
        'n_redundant': 0,
        'n_repeated': 0,
        'n_classes': 2,
        'n_clusters_per_class': 1,
        'weights': [0.99, 0.01],
        'random_state': 42,
    }
    X, y = make_classification(**args)

    folds = UnderBaggingKFold(random_states=range(42, 42 + 5))

    # 分類器としてランダムフォレストを使う
    clf = RandomForestClassifier(n_estimators=100,
                                 n_jobs=-1,
                                 verbose=1,
                                 random_state=42)

    folds = UnderBaggingKFold(random_states=range(42, 42 + 5))

    with stopwatch():
        # cross_validate() 関数の 'cv' オプションに渡す
        result = cross_validate(clf, X, y,
                                scoring='roc_auc',
                                cv=folds, return_estimator=True)

    mean_score = np.array(result['test_score']).mean()
    print('rf auc:', mean_score)


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 ただし、この実験では学習時間や性能がどうのというより、複数の異なる API に対応しやすいことを示している。

$ python ubrf.py
...(snip)...
elapsed time: 110.35 sec
rf auc: 0.7572552391326518

注意事項

今回実装した K-Fold は、交差検証でモデルの性能を正しく見積もることよりも、モデルを学習させることに重きを置いている。 特に、少ないラベルを重複して選択するところにその側面が強く現れている。 そのため、モデルの性能を検証するという点では、Nested CV の要領でさらに外側にもう一段交差検証を重ねた方が良いかもしれない。

blog.amedama.jp

参考資料

blog.amedama.jp

upura.hatenablog.com

いじょう。