不均衡データに対する分類問題のアプローチとして、多いクラスのデータを取り除く Under-sampling という手法がある。 さらに、複数の Under-sampling したデータを用いて、複数のモデルを用意する Bagging という手法を組み合わせることがある。 今回は、そんな Under-sampling + Bagging (UnderBagging) なモデルを簡単に作れる KFold を実装してみた。
Under-sampling + Bagging に関する既知の実装としては imbalanced-learn に BalancedBaggingClassifier という分類器がある。 ただ、このアプローチだと、学習させる分類器が scikit-learn の API を備えている必要がある。 そこで、異なるアプローチとしてモデルではなくデータを K-Fold するタイミングで Under-sampling + Bagging してみることにした。
使った環境は次の通り。
$ sw_vers ProductName: Mac OS X ProductVersion: 10.14.6 BuildVersion: 18G87 $ python -V Python 3.7.4
下準備
まずは今回使うパッケージをインストールしておく。
$ pip install scikit-learn imbalanced-learn lightgbm
データを分割するタイミングで Under-sampling する K-Fold の実装
早速だけど以下にサンプルコードを示す。
具体的には UnderBaggingKFold
という名前で scikit-learn の CrossValidation API を実装している。
データを K-Fold 分割するタイミングで imbalanced-learn の RandomUnderSampler を使って Under-sampling している。
なお、このサンプルコードは動作をデモするためのものなので、まだ分類器にデータを渡すことまではしていない。
#!/usr/bin/env python # -*- coding: utf-8 -*- import numpy as np from sklearn.model_selection import BaseCrossValidator from sklearn.model_selection import train_test_split from imblearn.under_sampling import RandomUnderSampler class UnderBaggingKFold(BaseCrossValidator): """CV に使うだけで UnderBagging できる KFold 実装 NOTE: 少ないクラスのデータは各 Fold で重複して選択される""" def __init__(self, n_splits=5, shuffle=True, random_states=None, test_size=0.2, whole_testing=False): """ :param n_splits: Fold の分割数 :param shuffle: 分割時にデータをシャッフルするか :param random_states: 各 Fold の乱数シード :param test_size: Under-sampling された中でテスト用データとして使う割合 :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか """ self.n_splits = n_splits self.shuffle = shuffle self.random_states = random_states self.test_size = test_size self.whole_testing = whole_testing if random_states is not None: # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる self.n_splits = len(random_states) else: # 乱数シードが指定されていないときは分割数だけ None で埋めておく self.random_states = [None] * self.n_splits # 分割数だけ Under-sampling 用のインスタンスを作っておく self.samplers_ = [ RandomUnderSampler(random_state=random_state) for random_state in self.random_states ] def split(self, X, y=None, groups=None): """データを学習用とテスト用に分割する""" if X.ndim < 2: # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う X = np.vstack(X) for i in range(self.n_splits): # データを Under-sampling して均衡データにする sampler = self.samplers_[i] _, y_sampled = sampler.fit_resample(X, y) # 選ばれたデータのインデックスを取り出す sampled_indices = sampler.sample_indices_ # 選ばれたデータを学習用とテスト用に分割する split_data = train_test_split(sampled_indices, shuffle=self.shuffle, test_size=self.test_size, stratify=y_sampled, random_state=self.random_states[i], ) train_indices, test_indices = split_data if self.whole_testing: # Under-sampling で選ばれなかったデータをテスト用に追加する mask = np.ones(len(X), dtype=np.bool) mask[sampled_indices] = False X_indices = np.arange(len(X)) non_sampled_indices = X_indices[mask] test_indices = np.concatenate([test_indices, non_sampled_indices]) yield train_indices, test_indices def get_n_splits(self, X=None, y=None, groups=None): return self.n_splits def main(): # ダミーの不均衡データを用意する X, y = np.arange(1, 21), np.zeros(20, dtype=np.int8) # 先頭の 4 要素だけ陽性 (Positive) データに指定する y[:4] = 1 print('y:', y) # 乱数シードを指定した 5-Fold folds = UnderBaggingKFold(random_states=range(5)) # データの分割され方を出力する for train_indices, test_indices in folds.split(X, y): print('train: X={X}, y={y}'.format(X=train_indices, y=y[train_indices])) print('test: X={X}, y={y}'.format(X=test_indices, y=y[test_indices])) if __name__ == '__main__': main()
上記のサンプルコードを実行してみよう。 デモでは、全体が 20 要素ある中で先頭の 4 要素だけ陽性になった不均衡なデータを分割している。 各 Fold で、学習データとテスト用データがどのように分割されるか観察してみよう。
$ python ubkfold.py y: [1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] train: X=[13 0 2 12 10 1], y=[0 1 1 0 0 1] test: X=[3 5], y=[1 0] train: X=[ 7 0 11 2 6 1], y=[0 1 0 1 0 1] test: X=[17 3], y=[0 1] train: X=[3 1 0 4 8 9], y=[1 1 1 0 0 0] test: X=[16 2], y=[0 1] train: X=[ 0 2 17 11 1 5], y=[1 1 0 0 1 0] test: X=[3 8], y=[1 0] train: X=[ 2 3 0 4 7 16], y=[1 1 1 0 0 0] test: X=[10 1], y=[0 1]
上記を見ると、全ての目的変数 (y) について陽性と陰性が均等に含まれた均衡データになっていることが分かる。
ちなみに whole_testing
というオプションに True
を渡すと、サンプリングされなかったデータが全てテスト用データに追加される。
まあ、ようするに陰性のデータが大量に突っ込まれる。
# サンプリングされなかったデータを全てテスト用に追加する folds = UnderBaggingKFold(random_states=range(5), whole_testing=True)
上記についても動作を確認しておこう。 学習用のデータに関しては先ほどと変化がないものの、テスト用のデータは陰性の要素が増えていることが分かる。
$ python ubkfold.py y: [1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] train: X=[13 0 2 12 10 1], y=[0 1 1 0 0 1] test: X=[ 3 5 4 6 7 8 9 11 14 15 16 17 18 19], y=[1 0 0 0 0 0 0 0 0 0 0 0 0 0] train: X=[ 7 0 11 2 6 1], y=[0 1 0 1 0 1] test: X=[17 3 4 5 8 9 10 12 13 14 15 16 18 19], y=[0 1 0 0 0 0 0 0 0 0 0 0 0 0] train: X=[3 1 0 4 8 9], y=[1 1 1 0 0 0] test: X=[16 2 5 6 7 10 11 12 13 14 15 17 18 19], y=[0 1 0 0 0 0 0 0 0 0 0 0 0 0] train: X=[ 0 2 17 11 1 5], y=[1 1 0 0 1 0] test: X=[ 3 8 4 6 7 9 10 12 13 14 15 16 18 19], y=[1 0 0 0 0 0 0 0 0 0 0 0 0 0] train: X=[ 2 3 0 4 7 16], y=[1 1 1 0 0 0] test: X=[10 1 5 6 8 9 11 12 13 14 15 17 18 19], y=[0 1 0 0 0 0 0 0 0 0 0 0 0 0]
LightGBM で Under-sampling + Bagging してみる
振る舞いの説明ができたので、続いては実際に分類器を学習させてみよう。 とりあえず LightGBM に擬似的に作った不均衡データを与えてみることにする。
#!/usr/bin/env python # -*- coding: utf-8 -*- import time from contextlib import contextmanager import lightgbm as lgb import numpy as np from sklearn.datasets import make_classification from sklearn.model_selection import BaseCrossValidator from sklearn.model_selection import train_test_split from imblearn.under_sampling import RandomUnderSampler class UnderBaggingKFold(BaseCrossValidator): """CV に使うだけで UnderBagging できる KFold 実装 NOTE: 少ないクラスのデータは各 Fold で重複して選択される""" def __init__(self, n_splits=5, shuffle=True, random_states=None, test_size=0.2, whole_testing=False): """ :param n_splits: Fold の分割数 :param shuffle: 分割時にデータをシャッフルするか :param random_states: 各 Fold の乱数シード :param test_size: Under-sampling された中でテスト用データとして使う割合 :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか """ self.n_splits = n_splits self.shuffle = shuffle self.random_states = random_states self.test_size = test_size self.whole_testing = whole_testing if random_states is not None: # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる self.n_splits = len(random_states) else: # 乱数シードが指定されていないときは分割数だけ None で埋めておく self.random_states = [None] * self.n_splits # 分割数だけ Under-sampling 用のインスタンスを作っておく self.samplers_ = [ RandomUnderSampler(random_state=random_state) for random_state in self.random_states ] def split(self, X, y=None, groups=None): """データを学習用とテスト用に分割する""" if X.ndim < 2: # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う X = np.vstack(X) for i in range(self.n_splits): # データを Under-sampling して均衡データにする sampler = self.samplers_[i] _, y_sampled = sampler.fit_resample(X, y) # 選ばれたデータのインデックスを取り出す sampled_indices = sampler.sample_indices_ # 選ばれたデータを学習用とテスト用に分割する split_data = train_test_split(sampled_indices, shuffle=self.shuffle, test_size=self.test_size, stratify=y_sampled, random_state=self.random_states[i], ) train_indices, test_indices = split_data if self.whole_testing: # Under-sampling で選ばれなかったデータをテスト用に追加する mask = np.ones(len(X), dtype=np.bool) mask[sampled_indices] = False X_indices = np.arange(len(X)) non_sampled_indices = X_indices[mask] test_indices = np.concatenate([test_indices, non_sampled_indices]) yield train_indices, test_indices def get_n_splits(self, X=None, y=None, groups=None): return self.n_splits @contextmanager def stopwatch(): """学習にかかる時間を計測するためのコンテキストマネージャ""" before = time.time() yield after = time.time() print(f'elapsed time: {after - before:.2f} sec') def main(): # クラス比 0.99 : 0.01 のダミーデータを用意する args = { 'n_samples': 1_000_000, 'n_features': 80, 'n_informative': 3, 'n_redundant': 0, 'n_repeated': 0, 'n_classes': 2, 'n_clusters_per_class': 1, 'weights': [0.99, 0.01], 'random_state': 42, } X, y = make_classification(**args) # メトリックに ROC AUC を用いた二値分類問題として解く lgbm_params = { 'objective': 'binary', 'metric': 'auc', } lgb_train = lgb.Dataset(X, y) # 5-Fold で乱数シードに 42 ~ 46 を指定している folds = UnderBaggingKFold(random_states=range(42, 42 + 5)) with stopwatch(): # 上記で作った UnderBaggingKFold を folds に指定する result = lgb.cv(lgbm_params, lgb_train, num_boost_round=1000, early_stopping_rounds=10, seed=42, folds=folds, verbose_eval=10, ) print('under-bagging auc:', result['auc-mean'][-1]) if __name__ == '__main__': main()
上記を実行してみよう。
今回使った環境では、学習に約 28 秒かかって ROC AUC では約 0.776
という結果が得られた。
$ python ublgbm.py ...(snip)... elapsed time: 28.62 sec under-bagging auc: 0.7760463716175261
ちなみに LightGBM.cv()
関数から学習済みモデルを取り出す方法については次のエントリを参照してほしい。
比較対象として LightGBM の単なる Bagging も確認する
先ほどの結果を見るだけでは、学習が早いのか遅いのか、性能が良いのか悪いのか判断が難しい。 そのため、比較対象として Under-sampling しない、単なる Bagging の検証もしておく。
#!/usr/bin/env python # -*- coding: utf-8 -*- import time from contextlib import contextmanager import lightgbm as lgb from sklearn.datasets import make_classification from sklearn.model_selection import StratifiedKFold @contextmanager def stopwatch(): before = time.time() yield after = time.time() print(f'elapsed time: {after - before:.2f} sec') def main(): args = { 'n_samples': 1_000_000, 'n_features': 80, 'n_informative': 3, 'n_redundant': 0, 'n_repeated': 0, 'n_classes': 2, 'n_clusters_per_class': 1, 'weights': [0.99, 0.01], 'random_state': 42, } X, y = make_classification(**args) lgbm_params = { 'objective': 'binary', 'metric': 'auc', } lgb_train = lgb.Dataset(X, y) # 一般的な Stratified KFold folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=42) with stopwatch(): # アンダーサンプリングなしで学習する result = lgb.cv(lgbm_params, lgb_train, num_boost_round=1000, early_stopping_rounds=10, seed=42, folds=folds, verbose_eval=10, ) print('base auc:', result['auc-mean'][-1]) if __name__ == '__main__': main()
上記の実行結果は次の通り。
学習にかかった時間は約 38 秒で、ROC AUC は約 0.779
だった。
$ python baselgbm.py ...(snip)... elapsed time: 38.79 sec base auc: 0.7791468600807205
RandomForest でも Under-sampling + Bagging してみる
続いては scikit-learn の RandomForest でも Under-sampling + Bagging を試してみよう。
このケースでは cross_validate()
の cv
オプションに UnderBaggingKFold
のインスタンスを渡せば良い。
ちなみに、学習済みモデルは return_estimator
を True
にすれば受け取れる。
#!/usr/bin/env python # -*- coding: utf-8 -*- import time from contextlib import contextmanager import lightgbm as lgb import numpy as np from sklearn.datasets import make_classification from sklearn.ensemble import RandomForestClassifier from sklearn.model_selection import BaseCrossValidator from sklearn.model_selection import cross_validate from sklearn.model_selection import train_test_split from imblearn.under_sampling import RandomUnderSampler class UnderBaggingKFold(BaseCrossValidator): """CV に使うだけで UnderBagging できる KFold 実装 NOTE: 少ないクラスのデータは各 Fold で重複して選択される""" def __init__(self, n_splits=5, shuffle=True, random_states=None, test_size=0.2, whole_testing=False): """ :param n_splits: Fold の分割数 :param shuffle: 分割時にデータをシャッフルするか :param random_states: 各 Fold の乱数シード :param test_size: Under-sampling された中でテスト用データとして使う割合 :param whole_testing: Under-sampling で選ばれなかった全てのデータをテスト用データに追加するか """ self.n_splits = n_splits self.shuffle = shuffle self.random_states = random_states self.test_size = test_size self.whole_testing = whole_testing if random_states is not None: # 各 Fold の乱数シードが指定されているなら分割数をそれに合わせる self.n_splits = len(random_states) else: # 乱数シードが指定されていないときは分割数だけ None で埋めておく self.random_states = [None] * self.n_splits # 分割数だけ Under-sampling 用のインスタンスを作っておく self.samplers_ = [ RandomUnderSampler(random_state=random_state) for random_state in self.random_states ] def split(self, X, y=None, groups=None): """データを学習用とテスト用に分割する""" if X.ndim < 2: # RandomUnderSampler#fit_resample() は X が 1d-array だと文句を言う X = np.vstack(X) for i in range(self.n_splits): # データを Under-sampling して均衡データにする sampler = self.samplers_[i] _, y_sampled = sampler.fit_resample(X, y) # 選ばれたデータのインデックスを取り出す sampled_indices = sampler.sample_indices_ # 選ばれたデータを学習用とテスト用に分割する split_data = train_test_split(sampled_indices, shuffle=self.shuffle, test_size=self.test_size, stratify=y_sampled, random_state=self.random_states[i], ) train_indices, test_indices = split_data if self.whole_testing: # Under-sampling で選ばれなかったデータをテスト用に追加する mask = np.ones(len(X), dtype=np.bool) mask[sampled_indices] = False X_indices = np.arange(len(X)) non_sampled_indices = X_indices[mask] test_indices = np.concatenate([test_indices, non_sampled_indices]) yield train_indices, test_indices def get_n_splits(self, X=None, y=None, groups=None): return self.n_splits @contextmanager def stopwatch(): before = time.time() yield after = time.time() print(f'elapsed time: {after - before:.2f} sec') def main(): args = { 'n_samples': 1_000_000, 'n_features': 80, 'n_informative': 3, 'n_redundant': 0, 'n_repeated': 0, 'n_classes': 2, 'n_clusters_per_class': 1, 'weights': [0.99, 0.01], 'random_state': 42, } X, y = make_classification(**args) folds = UnderBaggingKFold(random_states=range(42, 42 + 5)) # 分類器としてランダムフォレストを使う clf = RandomForestClassifier(n_estimators=100, n_jobs=-1, verbose=1, random_state=42) folds = UnderBaggingKFold(random_states=range(42, 42 + 5)) with stopwatch(): # cross_validate() 関数の 'cv' オプションに渡す result = cross_validate(clf, X, y, scoring='roc_auc', cv=folds, return_estimator=True) mean_score = np.array(result['test_score']).mean() print('rf auc:', mean_score) if __name__ == '__main__': main()
上記の実行結果は次の通り。 ただし、この実験では学習時間や性能がどうのというより、複数の異なる API に対応しやすいことを示している。
$ python ubrf.py ...(snip)... elapsed time: 110.35 sec rf auc: 0.7572552391326518
注意事項
今回実装した K-Fold は、交差検証でモデルの性能を正しく見積もることよりも、モデルを学習させることに重きを置いている。 特に、少ないラベルを重複して選択するところにその側面が強く現れている。 そのため、モデルの性能を検証するという点では、Nested CV の要領でさらに外側にもう一段交差検証を重ねた方が良いかもしれない。
参考資料
いじょう。

スマートPythonプログラミング: Pythonのより良い書き方を学ぶ
- 作者: もみじあめ
- 発売日: 2016/03/12
- メディア: Kindle版
- この商品を含むブログ (1件) を見る