CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: IsolationForest で教師なし学習の外れ値検知を試す

今回は教師なし学習で外れ値の検知に使える IsolationForest というアルゴリズムを試してみる。 このアルゴリズムの興味深いところは、教師データの中にある程度外れ値が含まれていても構わないという点。 つまり、アノテーションしていないデータをそのまま突っ込むことが許容されている。

IsolationForest のアルゴリズムでは、決定木を使った分類しやすさにもとづいてデータが正常か外れ値かを判断する。 外れ値は正常なデータに比べると数が少なく、特徴が大きく異なると仮定する。 だとすると、外れ値は正常なデータに比べて分類するのに木の深さがより多く必要と考える。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.4
BuildVersion:   18E226
$ python -V            
Python 3.7.3

下準備

まずは scikit-learn と matplotlib をインストールしておく。

$ pip install scikit-learn matplotlib

ダミデータを用意する

IsolationForest を試すのに、以下のようなダミーデータを生成した。 特徴量は二次元で、ひとまず Blob でクラスタが 1 つだけある正常値の周囲に一様分布で外れ値を散らしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from sklearn.datasets import make_blobs
from matplotlib import pyplot as plt


def main():
    # 正常値のデータを作る
    NORMAL_DATA_N = 1000
    args = {
        # 正常値のデータ点数
        'n_samples': NORMAL_DATA_N,
        # 特徴量の次元数
        'n_features': 2,
        # クラスタの数
        'centers': 1,
        # データを生成するための乱数
        'random_state': 42,
    }
    X_normal, y_normal = make_blobs(**args)

    # 外れ値のデータを作る
    OUTLIER_DATA_N = 200
    DISTRIBUTION_RANGE = 6
    X_outlier = np.random.uniform(low=-DISTRIBUTION_RANGE,
                                  high=DISTRIBUTION_RANGE,
                                  size=(OUTLIER_DATA_N, 2))
    y_outlier = np.ones((OUTLIER_DATA_N, ))

    # 正常値を中心に外れ値を分布させる
    X_outlier += X_normal.mean(axis=0)

    # くっつけて一つのデータセットにする
    X = np.concatenate([X_normal, X_outlier], axis=0)
    y = np.concatenate([y_normal, y_outlier], axis=0)

    plt.scatter(X[y == 0, 0],
                X[y == 0, 1],
                label='negative')
    plt.scatter(X[y == 1, 0],
                X[y == 1, 1],
                label='positive')
    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみる。

$ python blob.py

すると、以下のような散布図が得られる。

f:id:momijiame:20190420122548p:plain

正常値の領域にも外れ値が含まれてしまっているけど、まあ完全に分離可能なデータなんてそうそうないので。

IsolationForest を適用してみる

それでは、先ほどのデータに IsolationForest を適用してみよう。

以下のサンプルコードでは、ホールドアウト検証でどのように分類されるか可視化すると共に評価指標を計算している。 IsolationForest のインスタンスで fit() メソッドを呼ぶときに目的変数 (y_train) に関する情報を渡していないのがポイント。 また、渡している説明変数 (X_train) の中には約 20% の外れ値が含まれている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from scipy import stats
from sklearn.datasets import make_blobs
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import matthews_corrcoef
from matplotlib import pyplot as plt


def main():
    # 正常値のデータを作る
    NORMAL_DATA_N = 1000
    args = {
        # 正常値のデータ点数
        'n_samples': NORMAL_DATA_N,
        # 特徴量の次元数
        'n_features': 2,
        # クラスタの数
        'centers': 1,
        # データを生成するための乱数
        'random_state': 42,
    }
    X_normal, y_normal = make_blobs(**args)

    # 外れ値のデータを作る
    OUTLIER_DATA_N = 200
    DIST_RANGE = 6
    X_outlier = np.random.uniform(low=-DIST_RANGE,
                                  high=DIST_RANGE,
                                  size=(OUTLIER_DATA_N, 2))
    y_outlier = np.ones((OUTLIER_DATA_N, ))

    # 正常値を中心に外れ値を分布させる
    X_outlier += X_normal.mean(axis=0)

    # くっつけて一つのデータセットにする
    X = np.concatenate([X_normal, X_outlier], axis=0)
    y = np.concatenate([y_normal, y_outlier], axis=0)

    # ホールドアウト検証するためにデータを分割する
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        test_size=0.3,
                                                        shuffle=True,
                                                        random_state=42,
                                                        stratify=y)

    # IsolationForest
    clf = IsolationForest(n_estimators=100,
                          contamination='auto',
                          behaviour='new',
                          random_state=42)

    # 学習用データを学習させる
    clf.fit(X_train)

    # 検証用データを分類する
    y_pred = clf.predict(X_test)

    # IsolationForest は 正常=1 異常=-1 として結果を返す
    y_pred[y_pred == 1] = 0
    y_pred[y_pred == -1] = 1

    # 評価指標を計算する
    print('precision:', precision_score(y_test, y_pred))
    print('recall:', recall_score(y_test, y_pred))
    print('f1:', f1_score(y_test, y_pred))
    print('mcc:', matthews_corrcoef(y_test, y_pred))

    xx, yy = np.meshgrid(np.linspace(X_normal[:, 0].mean() - DIST_RANGE * 1.2,
                                     X_normal[:, 0].mean() + DIST_RANGE * 1.2,
                                     100),
                         np.linspace(X_normal[:, 1].mean() - DIST_RANGE * 1.2,
                                     X_normal[:, 1].mean() + DIST_RANGE * 1.2,
                                     100))
    Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape(xx.shape)

    plt.contourf(xx, yy, Z, cmap=plt.cm.Blues_r)
    threshold = stats.scoreatpercentile(y_pred,
                                        100 * (OUTLIER_DATA_N / NORMAL_DATA_N))
    plt.contour(xx, yy, Z, levels=[threshold],
                    linewidths=2, colors='red')

    plt.scatter(X_train[y_train == 0, 0],
                X_train[y_train == 0, 1],
                label='train negative',
                c='b')
    plt.scatter(X_train[y_train == 1, 0],
                X_train[y_train == 1, 1],
                label='train positive',
                c='r')

    plt.scatter(X_test[y_pred == 0, 0],
                X_test[y_pred == 0, 1],
                label='test negative (prediction)',
                c='g')
    plt.scatter(X_test[y_pred == 1, 0],
                X_test[y_pred == 1, 1],
                label='test positive (prediction)',
                c='y')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。

$ python isoforest.py 
precision: 0.9215686274509803
recall: 0.7833333333333333
f1: 0.8468468468468469
mcc: 0.8229295359450786

同時に、以下のグラフが得られる。 正常値のクラスタの周囲に識別境界ができて、外れ値検出ができていることが分かる。

f:id:momijiame:20190420122902p:plain

正常値のクラスタを増やしてみる

正常値のクラスタが一つのときは、まあなんとなく上手くいきそうなことが分かった。 続いては二つに増やしてみることにしよう。

以下のサンプルコードでは、先ほどの内容を少し修正してダミーデータのクラスタを二つに増やしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from scipy import stats
from sklearn.datasets import make_blobs
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import matthews_corrcoef
from matplotlib import pyplot as plt


def main():
    NORMAL_DATA_N = 1000
    args = {
        'n_samples': NORMAL_DATA_N,
        'n_features': 2,
        # クラスタの数を増やしてみる
        'centers': 2,
        'random_state': 42,
    }
    X_normal, _ = make_blobs(**args)
    y_normal = np.zeros((NORMAL_DATA_N, ))

    OUTLIER_DATA_N = 200
    DIST_RANGE = 6
    X_outlier = np.random.uniform(low=-DIST_RANGE,
                                  high=DIST_RANGE,
                                  size=(OUTLIER_DATA_N, 2))
    y_outlier = np.ones((OUTLIER_DATA_N, ))

    X_outlier += X_normal.mean(axis=0)

    X = np.concatenate([X_normal, X_outlier], axis=0)
    y = np.concatenate([y_normal, y_outlier], axis=0)

    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        test_size=0.3,
                                                        shuffle=True,
                                                        random_state=42,
                                                        stratify=y)

    clf = IsolationForest(n_estimators=100,
                          contamination='auto',
                          behaviour='new',
                          random_state=42)

    clf.fit(X_train)

    y_pred = clf.predict(X_test)

    y_pred[y_pred == 1] = 0
    y_pred[y_pred == -1] = 1

    print('precision:', precision_score(y_test, y_pred))
    print('recall:', recall_score(y_test, y_pred))
    print('f1:', f1_score(y_test, y_pred))
    print('mcc:', matthews_corrcoef(y_test, y_pred))

    xx, yy = np.meshgrid(np.linspace(X_normal[:, 0].mean() - DIST_RANGE * 1.2,
                                     X_normal[:, 0].mean() + DIST_RANGE * 1.2,
                                     100),
                         np.linspace(X_normal[:, 1].mean() - DIST_RANGE * 1.2,
                                     X_normal[:, 1].mean() + DIST_RANGE * 1.2,
                                     100))
    Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape(xx.shape)

    plt.contourf(xx, yy, Z, cmap=plt.cm.Blues_r)
    threshold = stats.scoreatpercentile(y_pred,
                                        100 * (OUTLIER_DATA_N / NORMAL_DATA_N))
    plt.contour(xx, yy, Z, levels=[threshold],
                    linewidths=2, colors='red')

    plt.scatter(X_train[y_train == 0, 0],
                X_train[y_train == 0, 1],
                label='train negative',
                c='b')
    plt.scatter(X_train[y_train == 1, 0],
                X_train[y_train == 1, 1],
                label='train positive',
                c='r')

    plt.scatter(X_test[y_pred == 0, 0],
                X_test[y_pred == 0, 1],
                label='test negative (prediction)',
                c='g')
    plt.scatter(X_test[y_pred == 1, 0],
                X_test[y_pred == 1, 1],
                label='test positive (prediction)',
                c='y')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

実行結果は次の通り。

$ python twoclusters.py 
precision: 0.5238095238095238
recall: 0.7333333333333333
f1: 0.611111111111111
mcc: 0.528680532637681

また、以下のグラフが得られる。

f:id:momijiame:20190420123453p:plain

評価指標は悪化しているものの、可視化したものを見るとちゃんと二つのクラスタの周囲に分離境界ができていることがわかる。

いじょう。