CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Keras で AutoEncoder を書いてみる

今回はニューラルネットワークのフレームワークの Keras を使って AutoEncoder を書いてみる。 AutoEncoder は入力になるべく近い出力をするように学習したネットワークをいう。 AutoEncoder は特徴量の次元圧縮や異常検知など、幅広い用途に用いられている。

使った環境は次の通り。

$ sw_vers        
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G95
$ python -V 
Python 3.7.4

下準備

まずは必要なパッケージをインストールしておく。

$ pip install keras tensorflow matplotlib

中間層が一層の AutoEncoder

Keras の Sequential API を使って実装した最も単純な AutoEncoder のサンプルコードを以下に示す。 データセットには MNIST を使った。 入力と出力が 28 x 28 = 784 次元なのに対し、中間層は一層で 36 次元しかない。 つまり、中間層では次元圧縮に相当する処理をしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from keras import layers
from keras import models
from keras import callbacks
from keras.datasets import mnist
from matplotlib import pyplot as plt
from matplotlib import cm

def main():
    # MNIST データセットを読み込む
    (x_train, train), (x_test, y_test) = mnist.load_data()
    image_height, image_width = 28, 28
    # 中間層で圧縮される次元数
    encoding_dim = 36  # 中間層の出力を 6 x 6 の画像として可視化するため

    # Flatten
    x_train = x_train.reshape(x_train.shape[0], image_height * image_width)
    x_test = x_test.reshape(x_test.shape[0], image_height * image_width)

    # Min-Max Normalization
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train = (x_train - x_train.min()) / (x_train.max() - x_train.min())
    x_test = (x_test - x_test.min()) / (x_test.max() - x_test.min())

    # 中間層が一層だけの単純な AutoEncoder
    model = models.Sequential()
    model.add(layers.Dense(encoding_dim, activation='relu',
                           input_shape=(image_height * image_width,)))
    model.add(layers.Dense(image_height * image_width,
                           activation='sigmoid'))

    # モデルの構造を確認する
    print(model.summary())

    model.compile(optimizer='adam',
                  loss='binary_crossentropy')

    fit_callbacs = [
        callbacks.EarlyStopping(monitor='val_loss',
                                patience=5,
                                mode='min')
    ]

    # モデルを学習させる
    model.fit(x_train, x_train,
              epochs=100,
              batch_size=256,
              shuffle=True,
              validation_data=(x_test, x_test),
              callbacks=fit_callbacs,
              )

    # テストデータの損失を確認しておく
    score = model.evaluate(x_test, x_test, verbose=0)
    print('test xentropy:', score)

    # 学習済みのモデルを元に、次元圧縮だけするモデルを用意する
    encoder = models.clone_model(model)
    encoder.compile(optimizer='adam',
                    loss='binary_crossentropy')
    encoder.set_weights(model.get_weights())
    # 最終段のレイヤーを取り除く
    encoder.pop()

    # テストデータからランダムに 10 点を選び出す
    p = np.random.random_integers(0, len(x_test), 10)
    x_test_sampled = x_test[p]
    # 選びだしたサンプルを AutoEncoder にかける
    x_test_sampled_pred = model.predict_proba(x_test_sampled,
                                              verbose=0)
    # 次元圧縮だけする場合
    x_test_sampled_enc = encoder.predict_proba(x_test_sampled,
                                               verbose=0)

    # 処理結果を可視化する
    fig, axes = plt.subplots(3, 10)
    for i, label in enumerate(y_test[p]):
        # 元画像を上段に表示する
        img = x_test_sampled[i].reshape(image_height, image_width)
        axes[0][i].imshow(img, cmap=cm.gray_r)
        axes[0][i].axis('off')
        axes[0][i].set_title(label, color='red')
        # AutoEncoder で次元圧縮した画像を下段に表示する
        enc_img = x_test_sampled_enc[i].reshape(6, 6)
        axes[1][i].imshow(enc_img, cmap=cm.gray_r)
        axes[1][i].axis('off')
        # AutoEncoder で復元した画像を下段に表示する
        pred_img = x_test_sampled_pred[i].reshape(image_height, image_width)
        axes[2][i].imshow(pred_img, cmap=cm.gray_r)
        axes[2][i].axis('off')

    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。 検証用データに対する損失は約 0.087 だった。

$ python ae.py
...(snip)...
Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
dense_1 (Dense)              (None, 36)                28260
_________________________________________________________________
dense_2 (Dense)              (None, 784)               29008
=================================================================
Total params: 57,268
Trainable params: 57,268
Non-trainable params: 0
_________________________________________________________________
...(snip)...
test xentropy: 0.08722344622612

同時に、以下のグラフが得られる。 上段が入力画像、中段が AutoEncoder の中間層の出力、下段が復元された出力画像になっている。

f:id:momijiame:20190908213044p:plain

上記を見ると多少ボケたりかすれたりはしているものの、ちゃんと入力に近い画像が出力されていることがわかる。 中間層の出力は人間にはよくわからないけど、これでちゃんと元の画像に近いものが復元できるのはなんとも不思議な感じ。

中間層が 5 層の AutoEncoder

試しに先ほどのネットワークに中間層を足して、キャパシティを上げてみよう。 以下のサンプルコードでは次元を 784 -> 128 -> 64 -> 36 -> 64 -> 128 -> 784 と変化させている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from keras import layers
from keras import models
from keras import callbacks
from keras.datasets import mnist
from matplotlib import pyplot as plt
from matplotlib import cm

def main():
    # MNIST データセットを読み込む
    (x_train, train), (x_test, y_test) = mnist.load_data()
    image_height, image_width = 28, 28
    # 中間層で圧縮される次元数
    encoding_dim = 36  # 6 x 6 の画像として可視化してみるため

    # Flatten
    x_train = x_train.reshape(x_train.shape[0], image_height * image_width)
    x_test = x_test.reshape(x_test.shape[0], image_height * image_width)

    # Min-Max Normalization
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train = (x_train - x_train.min()) / (x_train.max() - x_train.min())
    x_test = (x_test - x_test.min()) / (x_test.max() - x_test.min())

    # 中間層を 4 層まで増やしたネットワーク
    model = models.Sequential()
    model.add(layers.Dense(128, activation='relu',
                           input_shape=(image_height * image_width,)))
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(encoding_dim, activation='relu'))
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dense(image_height * image_width,
                           activation='sigmoid'))

    # モデルの構造を確認する
    print(model.summary())

    model.compile(optimizer='adam',
                  loss='binary_crossentropy')

    print(model.summary())

    fit_callbacs = [
        callbacks.EarlyStopping(monitor='val_loss',
                                patience=5,
                                mode='min')
    ]

    # モデルを学習させる
    model.fit(x_train, x_train,
              epochs=100,
              batch_size=256,
              shuffle=True,
              validation_data=(x_test, x_test),
              callbacks=fit_callbacs,
              )

    # テストデータの損失を確認しておく
    score = model.evaluate(x_test, x_test, verbose=0)
    print('test xentropy:', score)

    # 学習済みのモデルを元に、次元圧縮だけするモデルを用意する
    encoder = models.clone_model(model)
    encoder.compile(optimizer='adam',
                    loss='binary_crossentropy')
    encoder.set_weights(model.get_weights())
    # 中間層までのレイヤーを取り除く
    encoder.pop()
    encoder.pop()
    encoder.pop()

    # テストデータからランダムに 10 点を選び出す
    p = np.random.random_integers(0, len(x_test), 10)
    x_test_sampled = x_test[p]
    # 選びだしたサンプルを AutoEncoder にかける
    x_test_sampled_pred = model.predict_proba(x_test_sampled,
                                              verbose=0)
    # 次元圧縮だけする場合
    x_test_sampled_enc = encoder.predict_proba(x_test_sampled,
                                               verbose=0)

    # 処理結果を可視化する
    fig, axes = plt.subplots(3, 10)
    for i, label in enumerate(y_test[p]):
        # 元画像を上段に表示する
        img = x_test_sampled[i].reshape(image_height, image_width)
        axes[0][i].imshow(img, cmap=cm.gray_r)
        axes[0][i].axis('off')
        axes[0][i].set_title(label, color='red')
        # AutoEncoder で次元圧縮した画像を中段に表示する
        enc_img = x_test_sampled_enc[i].reshape(6, 6)
        axes[1][i].imshow(enc_img, cmap=cm.gray_r)
        axes[1][i].axis('off')
        # AutoEncoder で復元した画像を下段に表示する
        pred_img = x_test_sampled_pred[i].reshape(image_height, image_width)
        axes[2][i].imshow(pred_img, cmap=cm.gray_r)
        axes[2][i].axis('off')

    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみる。 テストデータの損失は先ほどより減って約 0.079 となった。

$ python ae.py
...(snip)...
Layer (type)                 Output Shape              Param #   
=================================================================
dense_1 (Dense)              (None, 128)               100480    
_________________________________________________________________
dense_2 (Dense)              (None, 64)                8256      
_________________________________________________________________
dense_3 (Dense)              (None, 36)                2340      
_________________________________________________________________
dense_4 (Dense)              (None, 64)                2368      
_________________________________________________________________
dense_5 (Dense)              (None, 128)               8320      
_________________________________________________________________
dense_6 (Dense)              (None, 784)               101136    
=================================================================
Total params: 222,900
Trainable params: 222,900
Non-trainable params: 0
_________________________________________________________________
...(snip)...
test xentropy: 0.07924877260923385

出力されたグラフは次の通り。 今度の中間層の出力は、先ほどよりもパターンが出ているような気がする。 例えば、左上と右下にどの画像でも白くなっている出力があったりするようだ。

f:id:momijiame:20190908213602p:plain

そんなかんじで。

PythonとKerasによるディープラーニング

PythonとKerasによるディープラーニング