以前に Keras で AutoEncoder を実装するエントリを書いた。 このときは AutoEncoder を構成する Neural Network のアーキテクチャとして単純な全結合層から成る MLP (Multi Layer Perceptron) を使っている。
一方で、データとして画像を扱う場合にはアーキテクチャとして CNN (Convolutional Neural Network) が使われることも多い。 そこで、今回は CNN をアーキテクチャとして採用した Convolutional AutoEncoder を書いてみた。
使った環境は次のとおり。 CNN は MLP に比べると計算量が大きいので GPU もしくは TPU が使える環境を用意した方が良い。
$ python -V Python 3.6.9 $ uname -a Linux b5244776fd7d 4.19.104+ #1 SMP Wed Feb 19 05:26:34 PST 2020 x86_64 x86_64 x86_64 GNU/Linux $ pip list | egrep -i "(keras |tensorflow-gpu )" Keras 2.3.1 tensorflow-gpu 2.1.0 $ nvidia-smi Thu Apr 16 09:25:07 2020 +-----------------------------------------------------------------------------+ | NVIDIA-SMI 440.64.00 Driver Version: 418.67 CUDA Version: 10.1 | |-------------------------------+----------------------+----------------------+ | GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC | | Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. | |===============================+======================+======================| | 0 Tesla P100-PCIE... Off | 00000000:00:04.0 Off | 0 | | N/A 54C P0 39W / 250W | 4685MiB / 16280MiB | 0% Default | +-------------------------------+----------------------+----------------------+ +-----------------------------------------------------------------------------+ | Processes: GPU Memory | | GPU PID Type Process name Usage | |=============================================================================| +-----------------------------------------------------------------------------+
下準備
はじめに、必要なパッケージをインストールしておく。
$ pip install keras tensorflow-gpu matplotlib
Convolutional AutoEncoder を Keras の Sequential API で実装する
以下のサンプルコードでは、Keras の Sequential API を使って Convolutional AutoEncoder を実装している。
ポイントは Conv2D
層のパディングで "same" を指定しないと次元をうまく合わせることが難しい。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import numpy as np from keras import layers from keras import models from keras import callbacks from keras import backend as K from keras.datasets import mnist from matplotlib import pyplot as plt from matplotlib import cm def main(): # MNIST データセットを読み込む (x_train, train), (x_test, y_test) = mnist.load_data() image_height, image_width = 28, 28 # バックエンドに依存したチャネルの位置を調整する if K.image_data_format() == 'channels_last': x_train = x_train.reshape(x_train.shape[0], image_height, image_width, 1) x_test = x_test.reshape(x_test.shape[0], image_height, image_width, 1) input_shape = (image_height, image_width, 1) else: x_train = x_train.reshape(x_train.shape[0], 1, image_height, image_width) x_test = x_test.reshape(x_test.shape[0], 1, image_height, image_width) input_shape = (1, image_height, image_width) # Min-Max Normalization (0. ~ 1. の範囲に値を収める) x_train = x_train.astype('float32') x_test = x_test.astype('float32') x_train = (x_train - x_train.min()) / (x_train.max() - x_train.min()) x_test = (x_test - x_test.min()) / (x_test.max() - x_test.min()) # 畳み込み演算を用いた AutoEncoder のネットワーク (Sequential API) model = models.Sequential() # 28 x 28 x 1 model.add(layers.Conv2D(16, kernel_size=(3, 3), activation='relu', padding='same', input_shape=input_shape)) # 28 x 28 x 16 model.add(layers.MaxPooling2D(pool_size=(2, 2), padding='same')) # 14 x 14 x 16 model.add(layers.Conv2D(8, kernel_size=(3, 3), activation='relu', padding='same')) # 14 x 14 x 8 model.add(layers.MaxPooling2D(pool_size=(2, 2), padding='same')) # 7 x 7 x 8 model.add(layers.Conv2D(8, kernel_size=(3, 3), activation='relu', padding='same')) # 7 x 7 x 8 model.add(layers.UpSampling2D(size=(2, 2))) # 14 x 14 x 8 model.add(layers.Conv2D(16, kernel_size=(3, 3), activation='relu', padding='same')) # 14 x 14 x 16 model.add(layers.UpSampling2D(size=(2, 2))) # 28 x 28 x 16 model.add(layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid', padding='same')) # 28 x 28 x 1 model.compile(optimizer='adam', loss='binary_crossentropy') # モデルの構造を確認する print(model.summary()) fit_callbacks = [ callbacks.EarlyStopping(monitor='val_loss', patience=5, mode='min') ] # モデルを学習させる model.fit(x_train, x_train, epochs=1000, batch_size=4096, shuffle=True, validation_data=(x_test, x_test), callbacks=fit_callbacks, ) # テストデータの損失を確認しておく score = model.evaluate(x_test, x_test, verbose=0) print('test xentropy:', score) # 学習済みのモデルを元に、次元圧縮だけするモデルを用意する encoder = models.clone_model(model) encoder.compile(optimizer='adam', loss='binary_crossentropy') encoder.set_weights(model.get_weights()) # 中間層までのレイヤーを取り除く encoder.pop() encoder.pop() encoder.pop() encoder.pop() # テストデータからランダムに 10 点を選び出す p = np.random.randint(0, len(x_test), 10) x_test_sampled = x_test[p] # 選びだしたサンプルを AutoEncoder にかける x_test_sampled_pred = model.predict_proba(x_test_sampled, verbose=0) # 次元圧縮だけする場合 x_test_sampled_enc = encoder.predict_proba(x_test_sampled, verbose=0) # 処理結果を可視化する fig, axes = plt.subplots(3, 10, figsize=(12, 12)) for i, label in enumerate(y_test[p]): # 元画像を上段に表示する img = x_test_sampled[i].reshape(image_height, image_width) axes[0][i].imshow(img, cmap=cm.gray_r) axes[0][i].axis('off') axes[0][i].set_title(label, color='red') # AutoEncoder で次元圧縮した画像を中段に表示する enc_img = x_test_sampled_enc[i].reshape(7, 7 * 8).T axes[1][i].imshow(enc_img, cmap=cm.gray_r) axes[1][i].axis('off') # AutoEncoder で復元した画像を下段に表示する pred_img = x_test_sampled_pred[i].reshape(image_height, image_width) axes[2][i].imshow(pred_img, cmap=cm.gray_r) axes[2][i].axis('off') plt.show() if __name__ == '__main__': main()
上記を実行してみる。
$ python cae.py Model: "sequential_1" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= conv2d_11 (Conv2D) (None, 28, 28, 16) 160 _________________________________________________________________ max_pooling2d_5 (MaxPooling2 (None, 14, 14, 16) 0 _________________________________________________________________ conv2d_12 (Conv2D) (None, 14, 14, 8) 1160 _________________________________________________________________ max_pooling2d_6 (MaxPooling2 (None, 7, 7, 8) 0 _________________________________________________________________ conv2d_13 (Conv2D) (None, 7, 7, 8) 584 _________________________________________________________________ up_sampling2d_5 (UpSampling2 (None, 14, 14, 8) 0 _________________________________________________________________ conv2d_14 (Conv2D) (None, 14, 14, 16) 1168 _________________________________________________________________ up_sampling2d_6 (UpSampling2 (None, 28, 28, 16) 0 _________________________________________________________________ conv2d_15 (Conv2D) (None, 28, 28, 1) 145 ================================================================= Total params: 3,217 Trainable params: 3,217 Non-trainable params: 0 _________________________________________________________________ None Train on 60000 samples, validate on 10000 samples Epoch 1/1000 60000/60000 [==============================] - 1s 18us/step - loss: 0.6363 - val_loss: 0.5522 Epoch 2/1000 60000/60000 [==============================] - 1s 14us/step - loss: 0.5042 - val_loss: 0.4346 Epoch 3/1000 60000/60000 [==============================] - 1s 14us/step - loss: 0.3655 - val_loss: 0.2909 ...(省略)... Epoch 373/1000 60000/60000 [==============================] - 1s 14us/step - loss: 0.0728 - val_loss: 0.0721 Epoch 374/1000 60000/60000 [==============================] - 1s 14us/step - loss: 0.0727 - val_loss: 0.0721 Epoch 375/1000 60000/60000 [==============================] - 1s 14us/step - loss: 0.0727 - val_loss: 0.0721 test xentropy: 0.07211270518302917
検証用データに対するクロスエントロピーは約 0.072 と、前述した MLP の AutoEncoder よりも小さくなっていることがわかる。
以下は、上段が検証用データの画像、中段が AutoEncoder が次元圧縮した特徴の画像表現、下段が復元した画像となっている。
入力に比べれば少しかすれているものの、ちゃんと復元できている。
Functional API を使った場合
おまけとして Functional API を使った例も以下に示す。 学習済みモデルから中間層の出力を取り出すところだけは Functional API を使う方法が分からなかったので Sequential API を使った。
#!/usr/bin/env python # -*- coding: utf-8 -*- import numpy as np from keras import layers from keras import models from keras import callbacks from keras import backend as K from keras.datasets import mnist from matplotlib import pyplot as plt from matplotlib import cm def main(): (x_train, train), (x_test, y_test) = mnist.load_data() image_height, image_width = 28, 28 if K.image_data_format() == 'channels_last': x_train = x_train.reshape(x_train.shape[0], image_height, image_width, 1) x_test = x_test.reshape(x_test.shape[0], image_height, image_width, 1) input_shape = (image_height, image_width, 1) else: x_train = x_train.reshape(x_train.shape[0], 1, image_height, image_width) x_test = x_test.reshape(x_test.shape[0], 1, image_height, image_width) input_shape = (1, image_height, image_width) x_train = x_train.astype('float32') x_test = x_test.astype('float32') x_train = (x_train - x_train.min()) / (x_train.max() - x_train.min()) x_test = (x_test - x_test.min()) / (x_test.max() - x_test.min()) # Functional API を使う場合 input_ = layers.Input(shape=input_shape) # 28 x 28 x 1 x = layers.Conv2D(16, kernel_size=(3, 3), activation='relu', padding='same')(input_) # 28 x 28 x 16 x = layers.MaxPooling2D(pool_size=(2, 2), padding='same')(x) # 14 x 14 x 16 x = layers.Conv2D(8, kernel_size=(3, 3), activation='relu', padding='same')(x) # 14 x 14 x 8 x = layers.MaxPooling2D(pool_size=(2, 2), padding='same')(x) # 7 x 7 x 8 x = layers.Conv2D(8, kernel_size=(3, 3), activation='relu', padding='same')(x) # 7 x 7 x 8 x = layers.UpSampling2D(size=(2, 2))(x) # 14 x 14 x 8 x = layers.Conv2D(16, kernel_size=(3, 3), activation='relu', padding='same')(x) # 14 x 14 x 16 x = layers.UpSampling2D(size=(2, 2))(x) # 28 x 28 x 16 output_ = layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid', padding='same')(x) # 28 x 28 x 1 model = models.Model(inputs=input_, output=output_) model.compile(optimizer='adam', loss='binary_crossentropy') print(model.summary()) fit_callbacks = [ callbacks.EarlyStopping(monitor='val_loss', patience=5, mode='min') ] model.fit(x_train, x_train, epochs=1000, batch_size=4096, shuffle=True, validation_data=(x_test, x_test), callbacks=fit_callbacks, ) score = model.evaluate(x_test, x_test, verbose=0) print('test xentropy:', score) encoder = models.clone_model(model) encoder.compile(optimizer='adam', loss='binary_crossentropy') encoder.set_weights(model.get_weights()) # Sequential API を使ってモデルを構築し直す encoder = models.Sequential() # 真ん中の層までを取り出す for layer in model.layers[:-4]: encoder.add(layer) p = np.random.randint(0, len(x_test), 10) x_test_sampled = x_test[p] x_test_sampled_pred = model.predict(x_test_sampled, verbose=0) x_test_sampled_enc = encoder.predict_proba(x_test_sampled, verbose=0) fig, axes = plt.subplots(3, 10, figsize=(12, 12)) for i, label in enumerate(y_test[p]): img = x_test_sampled[i].reshape(image_height, image_width) axes[0][i].imshow(img, cmap=cm.gray_r) axes[0][i].axis('off') axes[0][i].set_title(label, color='red') enc_img = x_test_sampled_enc[i].reshape(7, 7 * 8).T axes[1][i].imshow(enc_img, cmap=cm.gray_r) axes[1][i].axis('off') pred_img = x_test_sampled_pred[i].reshape(image_height, image_width) axes[2][i].imshow(pred_img, cmap=cm.gray_r) axes[2][i].axis('off') plt.show() if __name__ == '__main__': main()
結果は同じなので省略する。
いじょう。

- 作者:もみじあめ
- 発売日: 2020/02/29
- メディア: Kindle版