CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Google Colaboratory で Cloud TPU を TensorFlow から試してみる

Google Colaboratory では、ランタイムのタイプを変更することで Cloud TPU (Tensor Processing Unit) を利用できる。 Cloud TPU は、Google が開発しているハードウェアアクセラレータの一種。 利用することで、行列計算のパフォーマンス向上が期待できる。 ただ、Cloud TPU は CPU や GPU に比べると扱う上でのクセがそれなりにつよい。 今回は、そんな Cloud TPU を使ってみることにする。

使った環境は次のとおり。

# pip list | grep "^tensorflow "
tensorflow                         2.1.0

もくじ

下準備

あらかじめ、メニューの「ランタイム」から「ランタイムのタイプを変更」を選択して、ハードウェアアクセラレータに TPU を指定しておく。

そして、TensorFlow をインポートしておく。

>>> import tensorflow as tf

TPU に接続する

まず、TPU を利用するには、最初に TPU クラスタへ接続する必要がある。 というのも、TPU のデバイスは実行中のホストで動作しているわけではない。 専用のホストに搭載されていて、それを gRPC 経由で制御するらしい。

はじめに、Google Colaboratory の環境であれば tf.distribute.cluster_resolver.TPUClusterResolver() を引数なしで実行する。 これで、利用可能な TPU クラスタの情報が得られる。

>>> tpu = tf.distribute.cluster_resolver.TPUClusterResolver()

あとは、tf.config.experimental_connect_to_cluster() を使って TPU クラスタに接続する。

>>> tf.config.experimental_connect_to_cluster(tpu)

接続できたら、TPU を初期化する。

>>> tf.tpu.experimental.initialize_tpu_system(tpu)

これで TPU を利用する準備ができた。 tf.config.list_logical_devices() を使って、タイプが TPU のデバイスを調べると、認識しているデバイスの一覧が確認できる。 下記を見て分かるとおり、複数のデバイスが確認できる。

>>> devices = tf.config.list_logical_devices('TPU')
>>> devices
[LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU')]

現在利用できる TPU v2 / v3 には、最小構成単位である TPU ボード 1 枚につき 4 つの TPU チップが載っている。 そして、それぞれのチップには 2 つの TPU コアがあるため、合計で 8 つのコアがある。 上記は、各コアがデバイスとして TensorFlow から認識されていることを示している。

これは、GPU であれば筐体に複数枚のグラフィックカードを差していたり、あるいは複数のコアが載った GPU チップを利用している状態に近い。 つまり、TPU のパフォーマンスを最大限に活用しようとすると、必然的に複数のデバイスを使った分散学習をすることになる。

ちなみに、Google Colaboratory で利用できるのは単一の TPU ボードだけっぽい。 Google Cloud 経由で利用する場合には、それ以外に TPU Pod や TPU スライスといった、複数の TPU ボードから成るシステムも利用できる。 その場合も、おそらく見え方としては上記のデバイスが増えるだけなんだろう。

単一のデバイスで演算する

さて、デバイスを認識できるようになったので、早速その中の一つを使って行列演算を試してみよう。

まずは、適当に (2, 3) な形状の行列と (3, 2) な形状の行列を作る。

>>> tf.random.set_seed(42)
>>> x = tf.random.normal(shape=(2, 3))
>>> y = tf.random.normal(shape=(3, 2))

TensorFlow では、tf.device() 関数にデバイスの情報を渡してコンテキストマネージャとして使うと、そのデバイス上で演算を実行できる。 試しに先頭の TPU デバイスを使って行列の積を求めてみよう。

>>> with tf.device(devices[0]):
...      z = tf.matmul(x, y)

次のように、ちゃんと計算できた。

>>> z
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[0.5277252 , 4.685486  ],
       [0.8692589 , 0.21500015]], dtype=float32)>

複数のデバイスで演算する

さて、単一のデバイスで計算できることは分かったので、続いては複数のデバイスで分散処理してみよう。

その前に、一旦 TPU の状態を初期化しておく。 TPU で何か新しい処理を始めるときは、初期化しておかないと上手く動作しないことがある。

>>> tf.tpu.experimental.initialize_tpu_system(tpu)

TensorFlow で複数のデバイスを使った分散処理をするときは、tf.distribute.Strategy というオブジェクト (以下、ストラテジオブジェクト) を使うことになる。 このオブジェクトには具体的な実装がいくつかあって、何を使うかによってどのように分散処理を進めるかが決まる。 ただし、TPU を使うときは tf.distribute.TPUStrategy を使うことに決まっているので選択の余地はない。

>>> strategy = tf.distribute.TPUStrategy(tpu)

試しに、先ほどと同じように行列の積を分散処理でやらせてみよう。 そのためには、まず行列の積を計算するためのヘルパー関数を次のように定義しておく。 生の tf.matmul() をそのまま使えないの?と思うけど、どうやら今のところ使えなさそう。

>>> @tf.function
... def matmul_fn(x, y):
...   """行列の積を計算する関数"""
...   z = tf.matmul(x, y)
...   return z
... 

あとは、上記の関数を先ほどのストラテジオブジェクトの run() メソッド経由で呼び出すだけ。

>>> zs = strategy.run(matmul_fn, args=(x, y))

結果を確認してみよう。 PerReplica というオブジェクトで、コアと同じ数の計算結果が得られていることがわかる。 それぞれのコアで同じ計算がされたようだ。

>>> zs
PerReplica:{
  0: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  1: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  2: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  3: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  4: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  5: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  6: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  7: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>
}

さて、上記はすべての処理に同じデータを与えているので、結果もすべて同じになっている。 なるほどって感じだけど、これでは複数のデバイスを使っている意味がない。 そこで、続いてはデバイス毎に与えるデータを変えてみよう。

まずは、以下のようにして整数を順番に返す Dataset オブジェクトを作る。

>>> range_dataset = tf.data.Dataset.range(16)
>>> list(range_dataset.as_numpy_iterator())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

続いて、上記を分散処理に使うデバイスの数に合わせてミニバッチへ分割する。 以下は Strategy#num_replicas_in_sync に 1 をかけているので、各デバイスに 1 つずつサンプルを与える場合の設定。

>>> batch_size = 1 * strategy.num_replicas_in_sync
>>> batch_dataset = range_dataset.batch(batch_size)
>>> list(batch_dataset.as_numpy_iterator())
[array([0, 1, 2, 3, 4, 5, 6, 7]), array([ 8,  9, 10, 11, 12, 13, 14, 15])]

そして、上記の Dataset オブジェクトを Strategy#experimental_distribute_dataset() メソッドに渡す。 すると、DistributedDataset というオブジェクトが得られる。

>>> dist_dataset = strategy.experimental_distribute_dataset(batch_dataset)
>>> dist_dataset
<tensorflow.python.distribute.input_lib.DistributedDataset at 0x7f546167d110>

この DistributedDataset オブジェクトからは、先ほど分散処理の結果として返ってきた PerReplica というオブジェクトが得られる。

>>> ite = iter(dist_dataset)
>>> x = next(ite)
>>> x
PerReplica:{
  0: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([0])>,
  1: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([1])>,
  2: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([2])>,
  3: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([3])>,
  4: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([4])>,
  5: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([5])>,
  6: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([6])>,
  7: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([7])>
}

上記の PerReplica オブジェクトを使うと、それぞれのデバイスに対して異なる入力データを与えることができる。 以下の、引数を 2 倍する関数で試してみよう。

>>> @tf.function
... def double_fn(x):
...     """引数を 2 倍する関数"""
...     return x * 2
... 

DistributedDataset オブジェクトから得られる PerReplica オブジェクトを、ストラテジオブジェクト経由で上記の関数に渡す。 すると、以下のように返り値として各要素が 2 倍になった PerReplica オブジェクトが得られることがわかる。

>>> for x in dist_dataset:
...     result = strategy.run(double_fn, args=(x, ))
...     print(result)
PerReplica:{
  0: tf.Tensor([0], shape=(1,), dtype=int64),
  1: tf.Tensor([2], shape=(1,), dtype=int64),
  2: tf.Tensor([4], shape=(1,), dtype=int64),
  3: tf.Tensor([6], shape=(1,), dtype=int64),
  4: tf.Tensor([8], shape=(1,), dtype=int64),
  5: tf.Tensor([10], shape=(1,), dtype=int64),
  6: tf.Tensor([12], shape=(1,), dtype=int64),
  7: tf.Tensor([14], shape=(1,), dtype=int64)
}
PerReplica:{
  0: tf.Tensor([16], shape=(1,), dtype=int64),
  1: tf.Tensor([18], shape=(1,), dtype=int64),
  2: tf.Tensor([20], shape=(1,), dtype=int64),
  3: tf.Tensor([22], shape=(1,), dtype=int64),
  4: tf.Tensor([24], shape=(1,), dtype=int64),
  5: tf.Tensor([26], shape=(1,), dtype=int64),
  6: tf.Tensor([28], shape=(1,), dtype=int64),
  7: tf.Tensor([30], shape=(1,), dtype=int64)
}

上記から、複数のデバイスで、異なる入力データを使った分散処理をできることがわかった。

単一のデバイスで勾配降下法を試す

次は、また単一のデバイスに戻って、自動微分を使った勾配降下法が機能することを確認してみよう。 要するに、ニューラルネットワークが最適化できる本質的な部分の動作を見ておく。

以下のサンプルコードでは、最小化したい関数 objective() を定義している。 そして、それに適当な初期値を与えて、SGD をオプティマイザに最小化している。 実際に損失と勾配を計算してパラメータを更新しているのは training_step() という関数。

# -*- coding: utf-8 -*-

from __future__ import annotations

from pprint import pprint

import tensorflow as tf


def objective(params: tf.Variable) -> tf.Tensor:
    """最小化したい関数"""
    # x_0^2 + x_1^2
    loss = params[0] ** 2 + params[1] ** 2
    return loss


def main():
    # TPU クラスタに接続する
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    devices = tf.config.list_logical_devices('TPU')
    print('TPU devices:', end='')
    pprint(devices)

    # 使用するオプティマイザ
    optimizer = tf.keras.optimizers.SGD(learning_rate=1e-1)

    @tf.function
    def training_step(params: tf.Variable) -> None:
        """勾配降下法を使った最適化の 1 ステップ"""
        with tf.GradientTape() as t:
            # 損失を計算する
            loss = objective(params)
        # 勾配を計算する
        grads = t.gradient(loss, params)
        # パラメータを更新する
        optimizer.apply_gradients([(grads, params)])
        # tf.print(params)  # 少なくとも今の TPU では利用できない...

    # 初期値を用意する
    tensor = tf.constant([1., 4.], dtype=tf.float32)

    # 先頭の TPU デバイスで計算する
    with tf.device(devices[0]):
        # TPU デバイス上に Variable を用意する
        params = tf.Variable(tensor, trainable=True)
        # 最適化のループ
        for _ in range(20):  # 回数は適当
            training_step(params)

    # 結果を出力する
    print(f'{objective(params)} @ {params.numpy()}')


if __name__ == '__main__':
    main()

上記の実行結果は次のとおり。

(snip) ...
TPU devices:[LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU')]
0.0022596875205636024 @ [0.01152921 0.04611686]

最適化によって、最終的な objective(params) の結果が小さくなっていることが確認できる。

複数のデバイスで CNN を tf.keras で学習する

次は、これまでのサンプルよりも少し実用性が高めのコードを試す。 具体的には CNN のモデルを tf.keras を使って組んで、CIFAR-10 のデータを学習させてみる。

ポイントとしては、モデルやメトリックなどをストラテジオブジェクトのスコープで組み立てるところ。 こうすると、たとえば Variable オブジェクトは内部的にデバイス間で値が同期できる MirroredVariable になったりするらしい。

CIFAR-10 のデータはメモリに収まるサイズなので、オンメモリのデータから Dataset オブジェクトを生成している。 これが、もしメモリに収まりきらないときは TFRecord フォーマットで GCS に保存する必要がある。

TPU を使う際には、CPU や GPU の環境で動作したコードを転用するのがベストプラクティスらしい。 以下のサンプルコードでは、それがやりやすいように環境毎のストラテジオブジェクトを取得できる detect_strategy() という関数を定義した。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import tensorflow as tf


def detect_strategy():
    """利用できるハードウェアアクセラレータ毎に適した tf.distribute.Strategy を返す関数"""
    try:
        tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
        tf.config.experimental_connect_to_cluster(tpu)
        tf.tpu.experimental.initialize_tpu_system(tpu)
        devices = tf.config.list_logical_devices('TPU')
        if len(devices) > 0:
            # TPU が利用できる
            return tf.distribute.TPUStrategy(tpu)
    except ValueError:
        pass

    devices = tf.config.list_logical_devices('GPU')
    if len(devices) > 0:
        # GPU が利用できる
        return tf.distribute.MirroredStrategy()

    # Default
    return tf.distribute.get_strategy()


def normalize(element):
    """画像データを浮動小数点型にキャストして正規化する処理"""
    image = element['image']
    normalized_image = tf.cast(image, tf.float32) / 255.
    label = element['label']
    return normalized_image, label


def datafeed_pipeline(x, y, batch_size):
    """オンメモリのテンソルからデータを読み出す Dataset パイプライン"""
    mappings = {
        'image': x,
        'label': y,
    }
    ds = tf.data.Dataset.from_tensor_slices(mappings)
    ds = ds.map(normalize)
    ds = ds.prefetch(tf.data.AUTOTUNE)
    ds = ds.batch(batch_size)
    ds = ds.cache()
    return ds


def main():
    # データセットをオンメモリに読み込む
    (train_x, train_y), (test_x, test_y) = tf.keras.datasets.cifar10.load_data()

    # データセットの仕様
    image_shape = train_x.shape[1:]
    num_classes = 10

    # 乱数シードを設定しておく
    tf.random.set_seed(42)

    # 環境に応じたストラテジオブジェクトを取得する
    strategy = detect_strategy()

    # データ供給のパイプラインを Dataset API で構築する
    device_batch_size = 512  # デバイス単位で見たバッチサイズ
    global_batch_size = strategy.num_replicas_in_sync * device_batch_size
    ds_train = datafeed_pipeline(train_x, train_y, global_batch_size)
    ds_test = datafeed_pipeline(test_x, test_y, global_batch_size)

    with strategy.scope():
        # ストラテジオブジェクトのスコープでモデルを組み立てる
        # これによって内部で使われる Variable の型などが変わる
        model = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=image_shape),
            tf.keras.layers.Conv2D(32, (3, 3), activation='relu'),
            tf.keras.layers.MaxPooling2D((2, 2)),
            tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
            tf.keras.layers.MaxPooling2D((2, 2)),
            tf.keras.layers.Conv2D(128, (3, 3), activation='relu'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(num_classes, activation='softmax')
        ])
        model.compile(
            loss='sparse_categorical_crossentropy',
            optimizer='adam',
            metrics=['sparse_categorical_accuracy'],
        )

    # モデルの概要
    print(model.summary())

    # モデルを学習させる
    fit_callbacs = [
        tf.keras.callbacks.EarlyStopping(monitor='val_loss',
                                         patience=5,
                                         mode='min'),
    ]
    model.fit(ds_train,
              epochs=100,
              validation_data=ds_test,
              callbacks=fit_callbacs,
              )

    # テストデータを評価する
    scr, sca = model.evaluate(ds_test)
    print(f'Loss: {scr}, Accuracy: {sca}')


if __name__ == '__main__':
    main()

上記を実行してみよう。 今回は、精度とかは横に置いておく。

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d_3 (Conv2D)            (None, 30, 30, 32)        896       
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 15, 15, 32)        0         
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 13, 13, 64)        18496     
_________________________________________________________________
max_pooling2d_3 (MaxPooling2 (None, 6, 6, 64)          0         
_________________________________________________________________
conv2d_5 (Conv2D)            (None, 4, 4, 128)         73856     
_________________________________________________________________
flatten_1 (Flatten)          (None, 2048)              0         
_________________________________________________________________
dense_2 (Dense)              (None, 64)                131136    
_________________________________________________________________
dense_3 (Dense)              (None, 10)                650       
=================================================================
Total params: 225,034
Trainable params: 225,034
Non-trainable params: 0
_________________________________________________________________
None
Epoch 1/100
13/13 [==============================] - 10s 431ms/step - loss: 2.1851 - sparse_categorical_accuracy: 0.1976 - val_loss: 1.9839 - val_sparse_categorical_accuracy: 0.2979
Epoch 2/100
13/13 [==============================] - 1s 88ms/step - loss: 1.9094 - sparse_categorical_accuracy: 0.3103 - val_loss: 1.8427 - val_sparse_categorical_accuracy: 0.3334
Epoch 3/100
13/13 [==============================] - 1s 85ms/step - loss: 1.7798 - sparse_categorical_accuracy: 0.3575 - val_loss: 1.7185 - val_sparse_categorical_accuracy: 0.3849

...(snip)...

Epoch 71/100
13/13 [==============================] - 1s 87ms/step - loss: 0.8315 - sparse_categorical_accuracy: 0.7118 - val_loss: 0.9328 - val_sparse_categorical_accuracy: 0.6744
Epoch 72/100
13/13 [==============================] - 1s 89ms/step - loss: 0.8223 - sparse_categorical_accuracy: 0.7148 - val_loss: 0.9292 - val_sparse_categorical_accuracy: 0.6737
Epoch 73/100
13/13 [==============================] - 1s 88ms/step - loss: 0.8054 - sparse_categorical_accuracy: 0.7224 - val_loss: 0.9340 - val_sparse_categorical_accuracy: 0.6756
3/3 [==============================] - 1s 16ms/step - loss: 0.9340 - sparse_categorical_accuracy: 0.6756
Loss: 0.9339648485183716, Accuracy: 0.675599992275238

ちゃんと動いているようだ。 カスタムトレーニングループを使うときは、また気にするところがあるみたいだけど、今回は取り扱わない。

参考

cloud.google.com

cloud.google.com

cloud.google.com

www.tensorflow.org

www.tensorflow.org

www.tensorflow.org

www.tensorflow.org