CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Keras で Convolutional AutoEncoder を書いてみる

以前に Keras で AutoEncoder を実装するエントリを書いた。 このときは AutoEncoder を構成する Neural Network のアーキテクチャとして単純な全結合層から成る MLP (Multi Layer Perceptron) を使っている。

blog.amedama.jp

一方で、データとして画像を扱う場合にはアーキテクチャとして CNN (Convolutional Neural Network) が使われることも多い。 そこで、今回は CNN をアーキテクチャとして採用した Convolutional AutoEncoder を書いてみた。

使った環境は次のとおり。 CNN は MLP に比べると計算量が大きいので GPU もしくは TPU が使える環境を用意した方が良い。

$ python -V
Python 3.6.9
$ uname -a
Linux b5244776fd7d 4.19.104+ #1 SMP Wed Feb 19 05:26:34 PST 2020 x86_64 x86_64 x86_64 GNU/Linux
$ pip list | egrep -i "(keras |tensorflow-gpu )"
Keras                    2.3.1          
tensorflow-gpu           2.1.0
$ nvidia-smi
Thu Apr 16 09:25:07 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.64.00    Driver Version: 418.67       CUDA Version: 10.1     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   54C    P0    39W / 250W |   4685MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
+-----------------------------------------------------------------------------+

下準備

はじめに、必要なパッケージをインストールしておく。

$ pip install keras tensorflow-gpu matplotlib

Convolutional AutoEncoder を Keras の Sequential API で実装する

以下のサンプルコードでは、Keras の Sequential API を使って Convolutional AutoEncoder を実装している。 ポイントは Conv2D 層のパディングで "same" を指定しないと次元をうまく合わせることが難しい。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
from keras import layers
from keras import models
from keras import callbacks
from keras import backend as K
from keras.datasets import mnist
from matplotlib import pyplot as plt
from matplotlib import cm


def main():
    # MNIST データセットを読み込む
    (x_train, train), (x_test, y_test) = mnist.load_data()
    image_height, image_width = 28, 28

    # バックエンドに依存したチャネルの位置を調整する
    if K.image_data_format() == 'channels_last':
        x_train = x_train.reshape(x_train.shape[0],
                                  image_height, image_width, 1)
        x_test = x_test.reshape(x_test.shape[0],
                                image_height, image_width, 1)
        input_shape = (image_height, image_width, 1)
    else:
        x_train = x_train.reshape(x_train.shape[0],
                                  1, image_height, image_width)
        x_test = x_test.reshape(x_test.shape[0],
                                1, image_height, image_width)
        input_shape = (1, image_height, image_width)

    # Min-Max Normalization (0. ~ 1. の範囲に値を収める)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train = (x_train - x_train.min()) / (x_train.max() - x_train.min())
    x_test = (x_test - x_test.min()) / (x_test.max() - x_test.min())

    # 畳み込み演算を用いた AutoEncoder のネットワーク (Sequential API)
    model = models.Sequential()
    # 28 x 28 x 1
    model.add(layers.Conv2D(16, kernel_size=(3, 3),
                            activation='relu', padding='same',
                            input_shape=input_shape))
    # 28 x 28 x 16
    model.add(layers.MaxPooling2D(pool_size=(2, 2), padding='same'))
    # 14 x 14 x 16
    model.add(layers.Conv2D(8, kernel_size=(3, 3),
                            activation='relu', padding='same'))
    # 14 x 14 x 8
    model.add(layers.MaxPooling2D(pool_size=(2, 2), padding='same'))
    # 7 x 7 x 8
    model.add(layers.Conv2D(8, kernel_size=(3, 3),
                            activation='relu', padding='same'))
    # 7 x 7 x 8
    model.add(layers.UpSampling2D(size=(2, 2)))
    # 14 x 14 x 8
    model.add(layers.Conv2D(16, kernel_size=(3, 3),
                            activation='relu', padding='same'))
    # 14 x 14 x 16
    model.add(layers.UpSampling2D(size=(2, 2)))
    # 28 x 28 x 16
    model.add(layers.Conv2D(1, kernel_size=(3, 3),
                            activation='sigmoid', padding='same'))
    # 28 x 28 x 1
    model.compile(optimizer='adam',
                  loss='binary_crossentropy')

    # モデルの構造を確認する
    print(model.summary())

    fit_callbacks = [
        callbacks.EarlyStopping(monitor='val_loss',
                                patience=5,
                                mode='min')
    ]

    # モデルを学習させる
    model.fit(x_train, x_train,
              epochs=1000,
              batch_size=4096,
              shuffle=True,
              validation_data=(x_test, x_test),
              callbacks=fit_callbacks,
              )

    # テストデータの損失を確認しておく
    score = model.evaluate(x_test, x_test, verbose=0)
    print('test xentropy:', score)

    # 学習済みのモデルを元に、次元圧縮だけするモデルを用意する
    encoder = models.clone_model(model)
    encoder.compile(optimizer='adam',
                    loss='binary_crossentropy')
    encoder.set_weights(model.get_weights())

    # 中間層までのレイヤーを取り除く
    encoder.pop()
    encoder.pop()
    encoder.pop()
    encoder.pop()

    # テストデータからランダムに 10 点を選び出す
    p = np.random.randint(0, len(x_test), 10)
    x_test_sampled = x_test[p]
    # 選びだしたサンプルを AutoEncoder にかける
    x_test_sampled_pred = model.predict_proba(x_test_sampled,
                                              verbose=0)
    # 次元圧縮だけする場合
    x_test_sampled_enc = encoder.predict_proba(x_test_sampled,
                                               verbose=0)

    # 処理結果を可視化する
    fig, axes = plt.subplots(3, 10, figsize=(12, 12))
    for i, label in enumerate(y_test[p]):
        # 元画像を上段に表示する
        img = x_test_sampled[i].reshape(image_height, image_width)
        axes[0][i].imshow(img, cmap=cm.gray_r)
        axes[0][i].axis('off')
        axes[0][i].set_title(label, color='red')
        # AutoEncoder で次元圧縮した画像を中段に表示する
        enc_img = x_test_sampled_enc[i].reshape(7, 7 * 8).T
        axes[1][i].imshow(enc_img, cmap=cm.gray_r)
        axes[1][i].axis('off')
        # AutoEncoder で復元した画像を下段に表示する
        pred_img = x_test_sampled_pred[i].reshape(image_height, image_width)
        axes[2][i].imshow(pred_img, cmap=cm.gray_r)
        axes[2][i].axis('off')

    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみる。

$ python cae.py
Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d_11 (Conv2D)           (None, 28, 28, 16)        160       
_________________________________________________________________
max_pooling2d_5 (MaxPooling2 (None, 14, 14, 16)        0         
_________________________________________________________________
conv2d_12 (Conv2D)           (None, 14, 14, 8)         1160      
_________________________________________________________________
max_pooling2d_6 (MaxPooling2 (None, 7, 7, 8)           0         
_________________________________________________________________
conv2d_13 (Conv2D)           (None, 7, 7, 8)           584       
_________________________________________________________________
up_sampling2d_5 (UpSampling2 (None, 14, 14, 8)         0         
_________________________________________________________________
conv2d_14 (Conv2D)           (None, 14, 14, 16)        1168      
_________________________________________________________________
up_sampling2d_6 (UpSampling2 (None, 28, 28, 16)        0         
_________________________________________________________________
conv2d_15 (Conv2D)           (None, 28, 28, 1)         145       
=================================================================
Total params: 3,217
Trainable params: 3,217
Non-trainable params: 0
_________________________________________________________________
None
Train on 60000 samples, validate on 10000 samples
Epoch 1/1000
60000/60000 [==============================] - 1s 18us/step - loss: 0.6363 - val_loss: 0.5522
Epoch 2/1000
60000/60000 [==============================] - 1s 14us/step - loss: 0.5042 - val_loss: 0.4346
Epoch 3/1000
60000/60000 [==============================] - 1s 14us/step - loss: 0.3655 - val_loss: 0.2909

...(省略)...

Epoch 373/1000
60000/60000 [==============================] - 1s 14us/step - loss: 0.0728 - val_loss: 0.0721
Epoch 374/1000
60000/60000 [==============================] - 1s 14us/step - loss: 0.0727 - val_loss: 0.0721
Epoch 375/1000
60000/60000 [==============================] - 1s 14us/step - loss: 0.0727 - val_loss: 0.0721
test xentropy: 0.07211270518302917

検証用データに対するクロスエントロピーは約 0.072 と、前述した MLP の AutoEncoder よりも小さくなっていることがわかる。

以下は、上段が検証用データの画像、中段が AutoEncoder が次元圧縮した特徴の画像表現、下段が復元した画像となっている。

f:id:momijiame:20200416183423p:plain
Convolutional AutoEncoder の入出力の画像表現

入力に比べれば少しかすれているものの、ちゃんと復元できている。

Functional API を使った場合

おまけとして Functional API を使った例も以下に示す。 学習済みモデルから中間層の出力を取り出すところだけは Functional API を使う方法が分からなかったので Sequential API を使った。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from keras import layers
from keras import models
from keras import callbacks
from keras import backend as K
from keras.datasets import mnist
from matplotlib import pyplot as plt
from matplotlib import cm


def main():
    (x_train, train), (x_test, y_test) = mnist.load_data()
    image_height, image_width = 28, 28

    if K.image_data_format() == 'channels_last':
        x_train = x_train.reshape(x_train.shape[0],
                                  image_height, image_width, 1)
        x_test = x_test.reshape(x_test.shape[0],
                                image_height, image_width, 1)
        input_shape = (image_height, image_width, 1)
    else:
        x_train = x_train.reshape(x_train.shape[0],
                                  1, image_height, image_width)
        x_test = x_test.reshape(x_test.shape[0],
                                1, image_height, image_width)
        input_shape = (1, image_height, image_width)

    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train = (x_train - x_train.min()) / (x_train.max() - x_train.min())
    x_test = (x_test - x_test.min()) / (x_test.max() - x_test.min())

    # Functional API を使う場合
    input_ = layers.Input(shape=input_shape)
    # 28 x 28 x 1
    x = layers.Conv2D(16, kernel_size=(3, 3),
                      activation='relu', padding='same')(input_)
    # 28 x 28 x 16
    x = layers.MaxPooling2D(pool_size=(2, 2), padding='same')(x)
    # 14 x 14 x 16
    x = layers.Conv2D(8, kernel_size=(3, 3),
                      activation='relu', padding='same')(x)
    # 14 x 14 x 8
    x = layers.MaxPooling2D(pool_size=(2, 2), padding='same')(x)
    # 7 x 7 x 8
    x = layers.Conv2D(8, kernel_size=(3, 3),
                      activation='relu', padding='same')(x)
    # 7 x 7 x 8
    x = layers.UpSampling2D(size=(2, 2))(x)
    # 14 x 14 x 8
    x = layers.Conv2D(16, kernel_size=(3, 3),
                      activation='relu', padding='same')(x)
    # 14 x 14 x 16
    x = layers.UpSampling2D(size=(2, 2))(x)
    # 28 x 28 x 16
    output_ = layers.Conv2D(1, kernel_size=(3, 3),
                            activation='sigmoid', padding='same')(x)
    # 28 x 28 x 1
    model = models.Model(inputs=input_, output=output_)
    model.compile(optimizer='adam',
                  loss='binary_crossentropy')

    print(model.summary())

    fit_callbacks = [
        callbacks.EarlyStopping(monitor='val_loss',
                                patience=5,
                                mode='min')
    ]

    model.fit(x_train, x_train,
              epochs=1000,
              batch_size=4096,
              shuffle=True,
              validation_data=(x_test, x_test),
              callbacks=fit_callbacks,
              )

    score = model.evaluate(x_test, x_test, verbose=0)
    print('test xentropy:', score)

    encoder = models.clone_model(model)
    encoder.compile(optimizer='adam',
                    loss='binary_crossentropy')
    encoder.set_weights(model.get_weights())

    # Sequential API を使ってモデルを構築し直す
    encoder = models.Sequential()
    # 真ん中の層までを取り出す
    for layer in model.layers[:-4]:
        encoder.add(layer)

    p = np.random.randint(0, len(x_test), 10)
    x_test_sampled = x_test[p]
    x_test_sampled_pred = model.predict(x_test_sampled,
                                        verbose=0)
    x_test_sampled_enc = encoder.predict_proba(x_test_sampled,
                                               verbose=0)

    fig, axes = plt.subplots(3, 10, figsize=(12, 12))
    for i, label in enumerate(y_test[p]):
        img = x_test_sampled[i].reshape(image_height, image_width)
        axes[0][i].imshow(img, cmap=cm.gray_r)
        axes[0][i].axis('off')
        axes[0][i].set_title(label, color='red')

        enc_img = x_test_sampled_enc[i].reshape(7, 7 * 8).T
        axes[1][i].imshow(enc_img, cmap=cm.gray_r)
        axes[1][i].axis('off')

        pred_img = x_test_sampled_pred[i].reshape(image_height, image_width)
        axes[2][i].imshow(pred_img, cmap=cm.gray_r)
        axes[2][i].axis('off')

    plt.show()


if __name__ == '__main__':
    main()

結果は同じなので省略する。

いじょう。