CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Keras でカスタムメトリックを扱う

今回は Keras に組み込みで用意されていない独自の評価指標 (カスタムメトリック) を扱う方法について書いてみる。

なお、Keras でカスタムメトリックを定義する方法については、以下の公式ドキュメントに記載がある。

keras.io

使った環境は次のとおり。 Keras にはスタンドアロン版ではなく TensorFlow 組み込みのもの (tf.keras) を使った。

$ sw_vers  
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G4032
$ python -V                                                       
Python 3.7.7
$ python -c "import tensorflow as tf; print(tf.__version__)"
2.2.0
$ python -c "import tensorflow as tf; print(tf.keras.__version__)"
2.3.0-tf

下準備

まずは TensorFlow をインストールしておく

$ pip install tensorflow

注意点について

Keras のメトリックを定義したオブジェクトには、正解と予測した内容が TensorFlow の Tensor オブジェクトとして渡される。 つまり、一般的な機械学習でおなじみの NumPy 配列のようには扱えない点に注意が必要となる。 そのような事情があるので、カスタムメトリックを計算するときは、はじめに REPL を使ってインタラクティブに動作を確認した方がわかりやすい。

ここでは、そのやり方について書いてみる。 はじめに、Python のインタプリタ (REPL) を起動しよう。

$ python

そして、TensorFlow のパッケージをインポートする。

>>> import tensorflow as tf
>>> from tensorflow.keras import backend as K

正解ラベルと、モデルが出力する予測を模したオブジェクトを次のように用意する。 今回の例は、多値分類問題のラベルを模している。

>>> y_true = tf.constant([[0., 0., 1.], [0., 1., 0.]])
>>> y_pred = tf.constant([[0., 0., 1.], [1., 0., 0.]])

このようにすると、TensorFlow 2 では NumPy の配列として中身が確認できる。

>>> y_true
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[0., 0., 1.],
       [0., 1., 0.]], dtype=float32)>
>>> y_pred
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[0., 0., 1.],
       [1., 0., 0.]], dtype=float32)>

このオブジェクトを使って計算方法の動作確認をしていく。

Recall を計算してみる

試しに Recall を計算してみよう。

正解と予測の Tensor で積を取って、両者が一致している部分だけ残す。

>>> y_true * y_pred
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[0., 0., 1.],
       [0., 0., 0.]], dtype=float32)>

残っている部分を足し合わせれば True Positive の要素数になる。

>>> true_positives = K.sum(y_true * y_pred)
>>> true_positives
<tf.Tensor: shape=(), dtype=float32, numpy=1.0>

正解ラベル部分を足し合わせれば、すべての Positive な要素数が得られる。

>>> total_positives = K.sum(y_true)
>>> total_positives
<tf.Tensor: shape=(), dtype=float32, numpy=2.0>

あとは、 割ってやれば Recall のスコアが得られるという寸法。

>>> recall = true_positives / total_positives
>>> recall
<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

Accuracy を計算してみる

もうひとつの例として Accuracy も計算してみる。

はじめに、正解ラベルと予測ラベルのインデックスを取り出す。

>>> y_true_argmax = K.argmax(y_true)
>>> y_pred_argmax = K.argmax(y_pred)
>>> y_true_argmax
<tf.Tensor: shape=(2,), dtype=int64, numpy=array([2, 1])>
>>> y_pred_argmax
<tf.Tensor: shape=(2,), dtype=int64, numpy=array([2, 0])>

両者が一致しているものを調べる。

>>> y_matched = K.equal(y_true_argmax, y_pred_argmax)
>>> y_matched
<tf.Tensor: shape=(2,), dtype=bool, numpy=array([ True, False])>

あとは平均を計算すれば Accuracy になる。

>>> accuracy = K.mean(y_matched)
>>> accuracy
<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

もちろん、ここで示したのはあくまで一例で、色々な計算方法が考えられる。

カスタムメトリックを定義して組み込む (Stateless)

それでは、実際に前述した処理を使ってカスタムメトリックを定義してみよう。 とはいえ、やることは正解と予測の Tensor を受け取ってメトリックをまた Tensor として返す関数を用意するだけ。 あとは、それを tf.keras.models.Model#compile() メソッドで metrics 引数に渡してやれば良い。

以下のサンプルコードでは MNIST データセットを MLP で予測するときに、カスタムメトリックとして Recall と Accuracy を定義して使っている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from tensorflow.keras import backend as K
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping


def custom_recall(y_true, y_pred):
    """正解と予測の Tensor から Recall を計算する関数"""
    true_positives = K.sum(y_true * y_pred)
    total_positives = K.sum(y_true)
    return true_positives / (total_positives + K.epsilon())  # ゼロ除算対策


def custom_accuracy(y_true, y_pred):
    """正解と予測の Tensor から Accuracy を計算する関数"""
    y_true_argmax = K.argmax(y_true)
    y_pred_argmax = K.argmax(y_pred)
    y_matched = K.equal(y_true_argmax, y_pred_argmax)
    return K.mean(y_matched)


def main():
    # load dataset
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # one-hot encode
    y_train = to_categorical(y_train)
    y_test = to_categorical(y_test)

    # flatten
    image_height, image_width = x_train.shape[1:]
    x_train = x_train.reshape(x_train.shape[0], image_height * image_width)
    x_test = x_test.reshape(x_test.shape[0], image_height * image_width)

    # min-max normalize
    x_train = (x_train - x_train.min()) / (x_train.max() - x_train.min())
    x_test = (x_test - x_test.min()) / (x_test.max() - x_test.min())

    # Multi Layer Perceptron
    inputs = Input(shape=(image_height * image_width,))
    x = Dense(64, activation='relu')(inputs)
    x = Dense(64, activation='relu')(x)
    num_of_classes = y_train.shape[1]
    outputs = Dense(num_of_classes, activation='softmax')(x)

    callbacks = [
        # 検証データに対する Recall が 10 エポック改善しないときは学習を打ち切る
        EarlyStopping(monitor='val_custom_recall',
                      patience=10,
                      verbose=1,
                      mode='max'),
    ]
    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  # カスタムメトリックを計算する関数を登録する
                  metrics=['accuracy', custom_recall, custom_accuracy],
                  )

    model.fit(x_train, y_train,
              batch_size=8192,
              epochs=1000,
              verbose=1,
              # ホールドアウトデータを検証データとして用いる
              validation_data=(x_test, y_test),
              callbacks=callbacks)


if __name__ == '__main__':
    main()

上記を保存して実行してみよう。 ちゃんと、custom_ から始まる名前でカスタムメトリックが学習過程のログに登場していることがわかる。

$ python stateless.py
...
Epoch 1/1000
8/8 [==============================] - 0s 46ms/step - loss: 2.1424 - accuracy: 0.3151 - custom_recall: 0.1247 - custom_accuracy: 0.3321 - val_loss: 1.8506 - val_accuracy: 0.5677 - val_custom_recall: 0.1703 - val_custom_accuracy: 0.5829
Epoch 2/1000
8/8 [==============================] - 0s 25ms/step - loss: 1.6449 - accuracy: 0.6475 - custom_recall: 0.2220 - custom_accuracy: 0.6544 - val_loss: 1.3144 - val_accuracy: 0.7322 - val_custom_recall: 0.3227 - val_custom_accuracy: 0.7484
Epoch 3/1000
8/8 [==============================] - 0s 31ms/step - loss: 1.1343 - accuracy: 0.7647 - custom_recall: 0.3867 - custom_accuracy: 0.7668 - val_loss: 0.8659 - val_accuracy: 0.8126 - val_custom_recall: 0.5142 - val_custom_accuracy: 0.8298
...
Epoch 207/1000
8/8 [==============================] - 0s 28ms/step - loss: 0.0086 - accuracy: 0.9991 - custom_recall: 0.9927 - custom_accuracy: 0.9991 - val_loss: 0.1126 - val_accuracy: 0.9738 - val_custom_recall: 0.9718 - val_custom_accuracy: 0.9769
Epoch 208/1000
8/8 [==============================] - 0s 28ms/step - loss: 0.0087 - accuracy: 0.9991 - custom_recall: 0.9927 - custom_accuracy: 0.9992 - val_loss: 0.1131 - val_accuracy: 0.9736 - val_custom_recall: 0.9716 - val_custom_accuracy: 0.9766
Epoch 209/1000
8/8 [==============================] - 0s 26ms/step - loss: 0.0086 - accuracy: 0.9992 - custom_recall: 0.9929 - custom_accuracy: 0.9992 - val_loss: 0.1140 - val_accuracy: 0.9743 - val_custom_recall: 0.9717 - val_custom_accuracy: 0.9774
Epoch 00209: early stopping

ただ、上記を見ると組み込みの accuracy と、自分で定義した custom_accuracy の値が一致していない。

カスタムメトリックを定義して組み込む (Stateful)

先ほどの例で組み込みのメトリックと自前のメトリックが一致しなかった理由は、カスタムメトリックを定義する方法に Stateless と Stateful という 2 つのやり方があるため。 組み込みの accuracy は Stateful なやり方で定義されている一方で、先ほど自分で定義した custom_accuracy は Stateless だったので値がズレてしまった。 あらかじめ断っておくと、値がズレているからといって計算が間違っているわけではない。

それでは、次は Stateful なやり方でカスタムメトリックを定義する方法を試してみよう。 Stateful なやり方では、tensorflow.keras.metrics.Metric を継承して必要なメソッドを実装することでメトリックを計算する。

以下のサンプルコードでは Stateful なやり方で Recall と Accuracy を計算している。 Stateful という名のとおり、tensorflow.keras.metrics.Metric では累積的に与えられる正解と予測のラベルからメトリックを計算することになる。 具体的には、update_state() メソッドで正解と予測ラベルが与えられて、結果を result() メソッドから得る。 そして、状態をリセットしたいときには reset_states() メソッドが呼ばれる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import tensorflow as tf
from tensorflow.keras.metrics import Metric
from tensorflow.keras import backend as K
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping


class RecallMetric(Metric):
    """ステートフルに Recall を計算するクラス"""

    def __init__(self, name='custom_recall', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        # 状態を貯めておく変数を用意する
        self.true_positives = tf.Variable(0.)
        self.total_positives = tf.Variable(0.)

    def update_state(self, y_true, y_pred, sample_weight=None):
        """新しく正解と予測が追加で与えられたときの処理"""
        true_positives = K.sum(y_true * y_pred)
        total_positives = K.sum(y_true)

        self.true_positives.assign_add(true_positives)
        self.total_positives.assign_add(total_positives)

    def result(self):
        """現時点の状態から計算されるメトリックを返す"""
        return self.true_positives / (self.total_positives + K.epsilon())

    def reset_states(self):
        """状態をリセットするときに呼ばれるコールバック"""
        self.true_positives.assign(0.)
        self.total_positives.assign(0.)


class AccuracyMetric(Metric):
    """ステートフルに Accuracy を計算するクラス"""

    def __init__(self, name='custom_accuracy', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)

        self.matched = tf.Variable(0.)
        self.unmatched = tf.Variable(0.)

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true_argmax = K.argmax(y_true)
        y_pred_argmax = K.argmax(y_pred)

        y_matched = K.sum(K.cast(K.equal(y_true_argmax, y_pred_argmax), dtype='float32'))
        y_unmatched = K.sum(K.cast(K.not_equal(y_true_argmax, y_pred_argmax), dtype='float32'))

        self.matched.assign_add(y_matched)
        self.unmatched.assign_add(y_unmatched)

    def result(self):
        return self.matched / (self.matched + self.unmatched)

    def reset_states(self):
        self.matched.assign(0.)
        self.unmatched.assign(0.)


def main():
    # load dataset
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # one-hot encode
    y_train = to_categorical(y_train)
    y_test = to_categorical(y_test)

    # flatten
    image_height, image_width = x_train.shape[1:]
    x_train = x_train.reshape(x_train.shape[0], image_height * image_width)
    x_test = x_test.reshape(x_test.shape[0], image_height * image_width)

    # min-max normalize
    x_train = (x_train - x_train.min()) / (x_train.max() - x_train.min())
    x_test = (x_test - x_test.min()) / (x_test.max() - x_test.min())

    # Multi Layer Perceptron
    inputs = Input(shape=(image_height * image_width,))
    x = Dense(64, activation='relu')(inputs)
    x = Dense(64, activation='relu')(x)
    num_of_classes = y_train.shape[1]
    outputs = Dense(num_of_classes, activation='softmax')(x)

    callbacks = [
        # 検証データに対する Recall が 10 エポック改善しないときは学習を打ち切る
        EarlyStopping(monitor='val_custom_recall',
                      patience=10,
                      verbose=1,
                      mode='max'),
    ]
    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  # カスタムメトリックを計算するオブジェクトを登録する
                  metrics=['accuracy', RecallMetric(), AccuracyMetric()],
                  )

    model.fit(x_train, y_train,
              batch_size=8192,
              epochs=1000,
              verbose=1,
              # ホールドアウトデータを検証データとして用いる
              validation_data=(x_test, y_test),
              callbacks=callbacks)


if __name__ == '__main__':
    main()

実際に、上記を実行してみよう。

$ python stateful.py
...
Epoch 1/1000
8/8 [==============================] - 0s 46ms/step - loss: 2.2027 - accuracy: 0.2119 - custom_recall: 0.1161 - custom_accuracy: 0.2119 - val_loss: 1.9262 - val_accuracy: 0.4457 - val_custom_recall: 0.1544 - val_custom_accuracy: 0.4457
Epoch 2/1000
8/8 [==============================] - 0s 25ms/step - loss: 1.7486 - accuracy: 0.5403 - custom_recall: 0.1948 - custom_accuracy: 0.5403 - val_loss: 1.4170 - val_accuracy: 0.6929 - val_custom_recall: 0.2810 - val_custom_accuracy: 0.6929
Epoch 3/1000
8/8 [==============================] - 0s 31ms/step - loss: 1.2274 - accuracy: 0.7467 - custom_recall: 0.3455 - custom_accuracy: 0.7467 - val_loss: 0.9200 - val_accuracy: 0.8056 - val_custom_recall: 0.4659 - val_custom_accuracy: 0.8056
...
Epoch 178/1000
8/8 [==============================] - 0s 26ms/step - loss: 0.0147 - accuracy: 0.9979 - custom_recall: 0.9888 - custom_accuracy: 0.9979 - val_loss: 0.1057 - val_accuracy: 0.9717 - val_custom_recall: 0.9662 - val_custom_accuracy: 0.9717
Epoch 179/1000
8/8 [==============================] - 0s 27ms/step - loss: 0.0145 - accuracy: 0.9980 - custom_recall: 0.9889 - custom_accuracy: 0.9980 - val_loss: 0.1055 - val_accuracy: 0.9714 - val_custom_recall: 0.9664 - val_custom_accuracy: 0.9714
Epoch 180/1000
8/8 [==============================] - 0s 27ms/step - loss: 0.0143 - accuracy: 0.9980 - custom_recall: 0.9891 - custom_accuracy: 0.9980 - val_loss: 0.1063 - val_accuracy: 0.9728 - val_custom_recall: 0.9663 - val_custom_accuracy: 0.9728
Epoch 00180: early stopping

今度は組み込みで用意されている accuracy の値と、自前で定義した custom_accuracy の値が一致していることがわかる。 ちなみに、Stateless と Stateful ではメトリックを計算するデータの区間が異なるために値が微妙にズレる。

まとめ

  • Keras のカスタムメトリックには正解と予測のラベルが Tensor オブジェクトとして渡される
  • カスタムメトリックを定義するには Stateless と Stateful なやり方がある
  • Stateless では正解と予測のラベルを受け取る関数を定義する
  • Stateful では tensorflow.keras.metrics.Metric クラスを継承して正解と予測のラベルからの計算結果をためこむ

PythonとKerasによるディープラーニング

PythonとKerasによるディープラーニング

  • 作者:Francois Chollet
  • 発売日: 2018/05/28
  • メディア: 単行本(ソフトカバー)