CUBE SUGAR CONTAINER

技術系のこと書きます。

Overlay Filesystem と Docker について

Linux で利用できるファイルシステムの一つに Overlay Filesystem (OverlayFS) がある。 このファイルシステムは、Docker が推奨しているストレージドライバの overlay2 が利用していることで有名。 今回は、そんな OverlayFS を Docker を介さずに扱ってみる。

使った環境は次のとおり。

$ cat /etc/lsb-release 
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=20.04
DISTRIB_CODENAME=focal
DISTRIB_DESCRIPTION="Ubuntu 20.04.3 LTS"
$ uname -rm
5.11.0-1021-gcp x86_64
$ sudo docker version
Client: Docker Engine - Community
 Version:           20.10.9
 API version:       1.41
 Go version:        go1.16.8
 Git commit:        c2ea9bc
 Built:             Mon Oct  4 16:08:29 2021
 OS/Arch:           linux/amd64
 Context:           default
 Experimental:      true

Server: Docker Engine - Community
 Engine:
  Version:          20.10.9
  API version:      1.41 (minimum version 1.12)
  Go version:       go1.16.8
  Git commit:       79ea9d3
  Built:            Mon Oct  4 16:06:37 2021
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          1.4.11
  GitCommit:        5b46e404f6b9f661a205e28d59c982d3634148f8
 runc:
  Version:          1.0.2
  GitCommit:        v1.0.2-0-g52b36a2
 docker-init:
  Version:          0.19.0
  GitCommit:        de40ad0

もくじ

下準備

OverlayFS は Linux カーネルに組み込まれているため、利用する上で特に必要なパッケージはない。 しいていえば、mount(8) がないとユーザ空間から簡単に操作する手段がないくらい。 ただし、今回は最終的に Docker イメージを元に手動で OverlayFS をマウントして unshare(1) でコンテナもどきを作りたい。 そのために、あらかじめ jq と Docker をインストールしておく。

まずは jq を入れる。

$ sudo apt-get update
$ sudo apt-get install jq

続いて Docker を入れる。

$ sudo apt-get remove docker docker-engine docker.io containerd runc
$ sudo apt-get install \
    apt-transport-https \
    ca-certificates \
    curl \
    gnupg \
    lsb-release
$ curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
$ echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu \
  $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
$ sudo apt-get update
$ sudo apt-get install docker-ce docker-ce-cli containerd.io

Docker が動作することを確認しておく。

$ sudo docker run hello-world

... (snip) ...

Hello from Docker!
This message shows that your installation appears to be working correctly.

... (snip) ...

これで準備が整った。

OverlayFS を読み取り専用で使ってみる

OverlayFS は、別のファイルシステムの上で動作する。 たとえば今回であれば ext4 で構築されたファイルシステム上で扱う。 なお、ドキュメントによると NFS 上で扱う場合には制限事項があるようだ。

$ df -T | head -n 3
Filesystem     Type     1K-blocks    Used Available Use% Mounted on
/dev/root      ext4       9983232 4428432   5538416  45% /
devtmpfs       devtmpfs   4068948       0   4068948   0% /dev

OverlayFS では、ファイルシステム上のディレクトリを重ねてマージした状態で扱うことができる。 これは、あまり言葉で説明しても分かりにくいと思うので、以下に例を示す。

まずは重ねるディレクトリとして lower1lower2 を用意する。 そして、マウントポイントとして merged というディレクトリも用意する。

$ mkdir lower1 lower2 merged

lower1lower2 に、同じ名前を持った a というファイルを用意しよう。 それぞれのファイルには、どちらのディレクトリ由来なのかがわかるようにテキストを書き込んでおく。

$ echo "lower1" > lower1/a
$ echo "lower2" > lower2/a

また、lower2 にだけ存在するファイルとして b も用意する。

$ echo "lower2" > lower2/b

上記の lower1lower2 を OverlayFS で重ね合わせて、merged にマウントしてみよう。 このとき lowerdir にコロン区切りでディレクトリを指定する。

$ sudo mount -t overlay overlay -o lowerdir=lower1:lower2 merged

マウント先を確認すると、mergedab というファイルがある。

$ ls merged/
a  b

ファイルの中身を確認すると、alower1 のものが使われている。 ここから、lowerdir は最初 (左側) に登場したディレクトリの内容が優先されることがわかる。 なお、lower2 にしか存在しない b は、当然ながらそれ由来になる。

$ cat merged/a 
lower1
$ cat merged/b
lower2

なお、lowerdir だけを指定した場合には、ファイルシステムは読み取り専用になる。 これは、lowerdir はファイルシステムの元ネタに過ぎないため、変更点を書き込む場所が存在しないため。

$ echo "Hello, World" > merged/c
-bash: merged/c: Read-only file system

基本的な使い方が確認できたところでアンマウントしておこう。

$ sudo umount merged

アンマウントすると中身は空っぽに戻る。

$ ls merged/

OverlayFS を書き込める状態で使ってみる

続いては書き込み可能なファイルシステムを作ってみよう。

書き込み可能にする場合には workdirupperdir というディレクトリを指定する必要がある。 まずはそれに使うディレクトリを作っておこう。

$ mkdir work upper

先ほどと同じ要領でマウントする。 ただし、今回はオプションに workdirupperdir を指定する。

$ sudo mount -t overlay overlay -o lowerdir=lower1:lower2,workdir=work,upperdir=upper merged

すると、今度はマウントしたファイルシステムに書き込みが可能になる。 試しに c というファイルを書き込んでみよう。

$ echo "Hello, World" > merged/c
$ cat merged/c
Hello, World

変更点は upperdir に指定したディレクトリに書き込まれていることがわかる。

$ ls upper
c
$ cat upper/c 
Hello, World

なお、もちろんファイルを削除することもできる。

$ rm merged/b

削除されたファイルの情報はデバイス番号が 0/0 のキャラクタデバイスファイルとして表現される。

$ ls upper/
b  c
$ file upper/b
upper/b: character special (0/0)

動作確認が終わったら、アンマウントして書き込まれた内容を掃除しておこう。

$ sudo umount merged
$ rm -rf upper/*

Docker イメージを元に手動でコンテナもどきを作ってみる

さて、ここまでで OverlayFS の基本的な使い方がわかった。 次は Docker イメージを元に、OverlayFS を使って手動でコンテナもどきの環境を作ってみることにしよう。

まずは適当なコンテナイメージを取得する。 今回は python:3.9-slim を使うことにした。

$ sudo docker image pull python:3.9-slim

試しに取得したイメージを使ってコンテナを立ち上げてみよう。

$ sudo docker container run --rm -it python:3.9-slim bash

次のように、このイメージでは Python 3.9 が利用できる。

# ls /
bin   dev  home  lib64  mnt  proc  run   srv  tmp  var
boot  etc  lib   media  opt  root  sbin  sys  usr
# which python3
/usr/local/bin/python3
# python3 -V
Python 3.9.7

ちなみに、今回使っているシステムにインストールされている Python はバージョンが 3.8 になっている。

$ python3 -V
Python 3.8.10

さて、取得したイメージを docker inspect で確認してみよう。 すると、イメージを構成しているディレクトリの情報が含まれることがわかる。 おや?この名称は OverlayFS で使われているものと似ていないだろうか。

$ sudo docker inspect python:3.9-slim | jq .[0].GraphDriver.Data
{
  "LowerDir": "/var/lib/docker/overlay2/78920b19e12a8ada04d603b0c5565e8e30fc9139c929aa291e8e45118eb1fede/diff:/var/lib/docker/overlay2/9cea339dd2014f814c5b69087ba70aa62df4993f9aa74d6f4aacfd5fca5e156b/diff:/var/lib/docker/overlay2/ea324ac31f37d8a379fec3c132f2684d8928b11db6e83eb3922c93a14a674340/diff:/var/lib/docker/overlay2/4823ba02e2349749afa3a71b55c630b5a90decb0462fc136f1c2c246648ee540/diff",
  "MergedDir": "/var/lib/docker/overlay2/b5e26155a3241b7fc8df4497387d166687c09d3bdbf3ce56fe71899f209d6c87/merged",
  "UpperDir": "/var/lib/docker/overlay2/b5e26155a3241b7fc8df4497387d166687c09d3bdbf3ce56fe71899f209d6c87/diff",
  "WorkDir": "/var/lib/docker/overlay2/b5e26155a3241b7fc8df4497387d166687c09d3bdbf3ce56fe71899f209d6c87/work"
}

LowerDir に含まれるディレクトリを確認すると、これがイメージを構成している「レイヤー」の実体であることがわかる。

$ sudo find /var/lib/docker/overlay2/78920b19e12a8ada04d603b0c5565e8e30fc9139c929aa291e8e45118eb1fede/diff
/var/lib/docker/overlay2/78920b19e12a8ada04d603b0c5565e8e30fc9139c929aa291e8e45118eb1fede/diff
/var/lib/docker/overlay2/78920b19e12a8ada04d603b0c5565e8e30fc9139c929aa291e8e45118eb1fede/diff/usr
/var/lib/docker/overlay2/78920b19e12a8ada04d603b0c5565e8e30fc9139c929aa291e8e45118eb1fede/diff/usr/local
/var/lib/docker/overlay2/78920b19e12a8ada04d603b0c5565e8e30fc9139c929aa291e8e45118eb1fede/diff/usr/local/bin
/var/lib/docker/overlay2/78920b19e12a8ada04d603b0c5565e8e30fc9139c929aa291e8e45118eb1fede/diff/usr/local/bin/pydoc
/var/lib/docker/overlay2/78920b19e12a8ada04d603b0c5565e8e30fc9139c929aa291e8e45118eb1fede/diff/usr/local/bin/python-config
/var/lib/docker/overlay2/78920b19e12a8ada04d603b0c5565e8e30fc9139c929aa291e8e45118eb1fede/diff/usr/local/bin/idle
/var/lib/docker/overlay2/78920b19e12a8ada04d603b0c5565e8e30fc9139c929aa291e8e45118eb1fede/diff/usr/local/bin/python
$ sudo ls /var/lib/docker/overlay2/4823ba02e2349749afa3a71b55c630b5a90decb0462fc136f1c2c246648ee540/diff
bin  boot  dev  etc  home  lib  lib64  media  mnt  opt  proc  root  run  sbin  srv  sys  tmp  usr  var

なんとなく LowerDir の中身は、そのまま OverlayFS のオプションとして渡せそうだ。

$ sudo docker inspect python:3.9-slim | jq -r .[0].GraphDriver.Data.LowerDir
/var/lib/docker/overlay2/78920b19e12a8ada04d603b0c5565e8e30fc9139c929aa291e8e45118eb1fede/diff:/var/lib/docker/overlay2/9cea339dd2014f814c5b69087ba70aa62df4993f9aa74d6f4aacfd5fca5e156b/diff:/var/lib/docker/overlay2/ea324ac31f37d8a379fec3c132f2684d8928b11db6e83eb3922c93a14a674340/diff:/var/lib/docker/overlay2/4823ba02e2349749afa3a71b55c630b5a90decb0462fc136f1c2c246648ee540/diff

物は試しということで、先ほどと同じ要領で OverlayFS をマウントしてみよう。

$ sudo mount -t overlay overlay -o lowerdir=$(sudo docker inspect python:3.9-slim | jq -r .[0].GraphDriver.Data.LowerDir),workdir=work,upperdir=upper merged

何だか上手くいった感じがする。

$ ls merged/
bin  boot  dev  etc  home  lib  lib64  media  mnt  opt  proc  root  run  sbin  srv  sys  tmp  usr  var

ここでおもむろに unshare(1) を使う。 unshare(1) は内部的に unshare(2) を呼んでプロセスのアイソレーションを操作できるコマンド。 以下で指定している -R オプションは、プロセスのルートを指定したディレクトリに変更するというもの。 このとき、起動するプロセスとしては bash などのシェルを指定しよう。

$ sudo unshare -R merged bash

すると、OverlayFS のディレクトリをルートとして持ったシェルのプロセスが誕生する。

# ls /
bin  boot  dev  etc  home  lib  lib64  media  mnt  opt  proc  root  run  sbin  srv  sys  tmp  usr  var

このプロセスはシステムとは違って Python 3.9 が利用できる。

# which python3
/usr/local/bin/python3
# python3 -V
Python 3.9.7
# python3 -c "print('Hello, World')"
Hello, World

ということで、Docker イメージを元ネタに OverlayFS を直接使ってコンテナっぽいものを作ることができた。 もちろん、このプロセスはファイルシステム以外にはシステムとのアイソレーションができていない。 とはいえ、Docker (のコンテナランタイム) が内部的にやっていることは本質的に上記と同じこと。

いじょう。

参考

https://www.kernel.org/doc/Documentation/filesystems/overlayfs.txt

Python: PyTorch の MultiheadAttention を検算してみる

今回は、言わずと知れた Transformer 1 において、処理の中心的な役割を果たしている (とされる) Multi-Head Attention を扱ってみる。 これは、Scaled Dot Product Attention という処理を改良したもの。

PyTorch には Multi-Head Attention の実装として MultiheadAttention というクラスが用意されている。 今回は、これがどういった処理をしているのかを、検算しながら確かめてみる。

pytorch.org

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.6
BuildVersion:   20G165
$ python -V          
Python 3.9.7
$ pip list | grep -i torch
torch             1.9.1

もくじ

下準備

あらかじめ PyTorch と NumPy をインストールしておく。

$ pip install torch numpy

続いて、Python のインタプリタを起動しておこう。

$ python

PyTorch 関連のモジュールをインポートしておく。

>>> import torch
>>> from torch import nn
>>> from torch.nn import functional as F

検算する

Multi-Head Attention は、Query と Key と Value (以下、Q, K, V) という 3 つのパラメータを入力として受け取る。 それぞれのパラメータは同じ次元数で、返す値は Query と同一の形状になるという特徴がある。 なお、Attention 自体の説明は以下のブログが詳しい。

deeplearning.hatenablog.com

はじめに、返り値の次元数を定義する。 この次元数は、前述のとおり Q の次元数と同じになる。

>>> embed_dim = 4

続いてヘッド数を定義する。 ヘッドというのは、入力をいくつかに分割して処理するそれぞれの Scaled Dot Product Attention のこと。 元の入力データの次元数が \displaystyle{d _ {model}} とすると、各ヘッドに入力されるデータの次元数は  \frac{d_{model}}{h} になる。 ここで  h がヘッド数を表す。 ヘッド数はハイパーパラメータで、多すぎても少なすぎても良くないらしい。 まずは単純なケースとしてヘッド数が 1 の場合を確かめよう。

>>> num_heads = 1

定義した次元数とヘッド数を使って MultiheadAttention をインスタンス化する。 今回は単純にするためバイアス項をなくし、入力データの形状としてバッチが最初にくるようにした。

>>> model = nn.MultiheadAttention(embed_dim=embed_dim,
...                               num_heads=num_heads,
...                               bias=False,
...                               batch_first=True)

つづいて、上記に入力するダミーデータを用意する。 今回は 2 x 5 x 4 という形状のデータにした。 仮に自然言語を想定するなら、最初がバッチ、2 番目が文章の系列長、最後が単語の分散表現の次元数を表すことになる。 それぞれ batch_sizeLembed_dim という変数名で用意している。

>>> batch_size = 2  # 一度に処理するデータの数
>>> L = 5  # 入力する系列データの長さ
>>> X = torch.randn(batch_size, L, embed_dim)  # ダミーの入力データ

今回は Transformer でも用いられている Self Attention を想定するので Q, K, V すべてに同じ X を入れる。

>>> Q = K = V = X

モデルとデータが用意できたので順伝搬させて返り値を得よう。 返り値としては、変換された Q と同一形状のテンソルと、モデルが何に注目したかを表す Attention Weights (Map とも) が返ってくる。

>>> attn_output, attn_weights = model(Q, K, V)

もちろん、上記は初期値の重みとダミーデータを使って得たものなので、中身自体には何の意味があるわけでもない。 しかし、どういった計算を経てこれが得られるのかを確かめる分には十分だ。

以下を見てわかるとおり、出力は入力した Query と同じ形状になっている。

>>> attn_output
tensor([[[-0.1453, -0.1466, -0.0371,  0.3497],
         [-0.1822, -0.1085, -0.0900,  0.3613],
         [-0.0751, -0.1506,  0.0799,  0.2830],
         [-0.1552, -0.1184, -0.0450,  0.3429],
         [-0.2029, -0.0971, -0.1205,  0.3772]],

        [[-0.2694, -0.3017, -0.4528,  0.5128],
         [-0.3347, -0.3734, -0.4705,  0.6975],
         [-0.2867, -0.3250, -0.4454,  0.5810],
         [-0.2401, -0.2732, -0.4402,  0.4353],
         [-0.2539, -0.2964, -0.4241,  0.5060]]], grad_fn=<TransposeBackward0>)
>>> attn_output.shape
torch.Size([2, 5, 4])

Attention Weights は、先頭がバッチで、2 番目と 3 番目が系列長 L と同じ形状になる。

>>> attn_weights
tensor([[[0.2358, 0.2097, 0.2365, 0.1587, 0.1592],
         [0.1823, 0.1938, 0.1635, 0.2238, 0.2366],
         [0.2124, 0.1384, 0.3916, 0.1622, 0.0953],
         [0.1892, 0.1843, 0.2213, 0.2131, 0.1921],
         [0.1687, 0.2126, 0.1319, 0.2141, 0.2727]],

        [[0.2454, 0.1563, 0.1772, 0.2531, 0.1680],
         [0.2078, 0.2083, 0.2640, 0.1586, 0.1612],
         [0.2298, 0.2132, 0.1977, 0.1992, 0.1602],
         [0.2408, 0.1406, 0.1461, 0.2921, 0.1804],
         [0.2226, 0.2197, 0.1611, 0.2224, 0.1742]]], grad_fn=<DivBackward0>)
>>> attn_weights.shape
torch.Size([2, 5, 5])

さて、それでは上記の結果がどのように得られるのかを検算で確かめてみよう。

まず、PyTorch のドキュメントを見ると MultiheadAttention の数式は以下のように定義されている。

\displaystyle{

\text{MultiHead}(Q, K, V) = \text{Concat}(\text{head}_1, \dots, \text{head}_h)W^O \\

\text{head}_i = \text{Attention}(QW_i^Q, KW_i^K, VW_i^V)

}

Q, K, V それぞれを、パラメータ行列 \displaystyle{W_i} と積をとって Attention に入れたものが、各ヘッドの出力になる。 各ヘッドの出力は連結した上で、さらにパラメータ行列 \displaystyle{W^ O} とかけたものが MultiheadAttention の出力だ。

ここで   \text{Attention} は Scaled Dot Product Attention なので以下になる。

\displaystyle{
\text{Attention}(Q, K, V) = \text{softmax}(\frac{Q K^T}{d_q}) V 
}

数式がわかったので、まずはモデルが持っているパラメータを確認してみよう。 どうやら in_proj_weightsout_proj.weight というパラメータがあるらしい。 これらは、上記の式において  W_i と [tex: WO] に対応している。

>>> from pprint import pprint
>>> pprint(list(model.named_parameters()))
[('in_proj_weight',
  Parameter containing:
tensor([[-0.5443,  0.3884, -0.1312, -0.1092],
        [ 0.1386, -0.3444,  0.3273,  0.1445],
        [-0.2816,  0.0416, -0.4813,  0.1620],
        [-0.4794, -0.0049, -0.5191, -0.3294],
        [-0.3429,  0.4189, -0.0930,  0.2866],
        [ 0.5036, -0.2311,  0.2426,  0.0193],
        [ 0.5196, -0.0979, -0.4762, -0.3478],
        [-0.3660, -0.3218, -0.2310, -0.2840],
        [-0.4351,  0.1184, -0.3720, -0.2419],
        [-0.2723, -0.5269,  0.2075, -0.4505],
        [ 0.0627,  0.0975,  0.5494, -0.2860],
        [ 0.4284,  0.5447, -0.1266,  0.2931]], requires_grad=True)),
 ('out_proj.weight',
  Parameter containing:
tensor([[-0.0758,  0.0238, -0.4159,  0.4350],
        [ 0.1650, -0.2046,  0.4133,  0.2710],
        [ 0.4356, -0.0973, -0.1273,  0.3115],
        [ 0.3645,  0.4667,  0.4714, -0.4997]], requires_grad=True))]

それぞれの行列をモデルから取り出しておこう。

>>> model_weights = {name: param.data for name, param
...                  in model.named_parameters()}
>>> 
>>> Wi = model_weights['in_proj_weight']
>>> Wo = model_weights['out_proj.weight']

Wi は Q, K, V それぞれにかける部位が分かれているので取り出す。

>>> Wi_q, Wi_k, Wi_v = Wi.chunk(3)

取り出したら次のようにして Attention に入力する部分を計算する。

>>> QW = torch.matmul(Q, Wi_q.T)
>>> KW = torch.matmul(K, Wi_k.T)
>>> VW = torch.matmul(V, Wi_v.T)

数式で対応するのは、以下の Attention に渡す前の部分。

\displaystyle{

 \text{Attention}(QW_i^Q, KW_i^K, VW_i^V)

}

なお、今回は Self Attention なので、次のように計算しても結果は変わらない。

>>> QW, KW, VW = torch.matmul(Q, Wi.T).chunk(3, dim=-1)

続いては Scaled Dot Product Attention の中の処理に入る。 まずは \displaystyle{
Q K^ T
} を計算する。

>>> KW_t = KW.transpose(-2, -1)
>>> QK_t = torch.bmm(QW, KW_t)

次にスケールを調整する。 数式だと \displaystyle{
\frac{Q K^ T}{d _ q}
} の部分。

>>> import math
>>> QK_t_scaled = QK_t / math.sqrt(embed_dim)

Softmax を通して足して 1 になるようにする。 \displaystyle{
 \text{softmax}(\frac{Q K^ T}{d _ q})
} の部分。 これが Attention Weights と呼ばれるもの。

>>> attn_weights_ = F.softmax(QK_t_scaled, dim=-1)

モデルから得られた値と比較すると、ちゃんと一致していることがわかる。

>>> attn_weights
tensor([[[0.2358, 0.2097, 0.2365, 0.1587, 0.1592],
         [0.1823, 0.1938, 0.1635, 0.2238, 0.2366],
         [0.2124, 0.1384, 0.3916, 0.1622, 0.0953],
         [0.1892, 0.1843, 0.2213, 0.2131, 0.1921],
         [0.1687, 0.2126, 0.1319, 0.2141, 0.2727]],

        [[0.2454, 0.1563, 0.1772, 0.2531, 0.1680],
         [0.2078, 0.2083, 0.2640, 0.1586, 0.1612],
         [0.2298, 0.2132, 0.1977, 0.1992, 0.1602],
         [0.2408, 0.1406, 0.1461, 0.2921, 0.1804],
         [0.2226, 0.2197, 0.1611, 0.2224, 0.1742]]], grad_fn=<DivBackward0>)
>>> attn_weights_
tensor([[[0.2358, 0.2097, 0.2365, 0.1587, 0.1592],
         [0.1823, 0.1938, 0.1635, 0.2238, 0.2366],
         [0.2124, 0.1384, 0.3916, 0.1622, 0.0953],
         [0.1892, 0.1843, 0.2213, 0.2131, 0.1921],
         [0.1687, 0.2126, 0.1319, 0.2141, 0.2727]],

        [[0.2454, 0.1563, 0.1772, 0.2531, 0.1680],
         [0.2078, 0.2083, 0.2640, 0.1586, 0.1612],
         [0.2298, 0.2132, 0.1977, 0.1992, 0.1602],
         [0.2408, 0.1406, 0.1461, 0.2921, 0.1804],
         [0.2226, 0.2197, 0.1611, 0.2224, 0.1742]]])

続いては Attention Weights を重みとした、V の重みつき和を得る。 これで \displaystyle{
\text{head} _ i = \text{Attention}(QW _ i^ Q, KW _ i^ K, VW _ i^ V)
} に対応するヘッドの出力が得られた。

>>> AV = torch.matmul(attn_weights_, VW)

今回、ヘッド数が 1 なので連結処理は必要ない。 あとはヘッドの出力を \displaystyle{
W^ O
} とかけるだけ。

>>> attn_output_ = torch.matmul(AV, Wo.T)

最初にモデルから得られた値と比較すると、ちゃんと一致していることがわかる。

>>> attn_output
tensor([[[-0.1453, -0.1466, -0.0371,  0.3497],
         [-0.1822, -0.1085, -0.0900,  0.3613],
         [-0.0751, -0.1506,  0.0799,  0.2830],
         [-0.1552, -0.1184, -0.0450,  0.3429],
         [-0.2029, -0.0971, -0.1205,  0.3772]],

        [[-0.2694, -0.3017, -0.4528,  0.5128],
         [-0.3347, -0.3734, -0.4705,  0.6975],
         [-0.2867, -0.3250, -0.4454,  0.5810],
         [-0.2401, -0.2732, -0.4402,  0.4353],
         [-0.2539, -0.2964, -0.4241,  0.5060]]], grad_fn=<TransposeBackward0>)
>>> attn_output_
tensor([[[-0.1453, -0.1466, -0.0371,  0.3497],
         [-0.1822, -0.1085, -0.0900,  0.3613],
         [-0.0751, -0.1506,  0.0799,  0.2830],
         [-0.1552, -0.1184, -0.0450,  0.3429],
         [-0.2029, -0.0971, -0.1205,  0.3772]],

        [[-0.2694, -0.3017, -0.4528,  0.5128],
         [-0.3347, -0.3734, -0.4705,  0.6975],
         [-0.2867, -0.3250, -0.4454,  0.5810],
         [-0.2401, -0.2732, -0.4402,  0.4353],
         [-0.2539, -0.2964, -0.4241,  0.5060]]])

これで、出力と Attention Weights の両方について、モデルから得られたものと計算した値が一致した。

ヘッド数を増やしてみる

さて、先ほどの検算ではヘッド数が 1 の単純なパターンを試した。 続いてはヘッド数を 2 に増やして同様のことをやってみよう。 ちゃんとできるだろうか。

まずはヘッド数を 2 に増やした上で MultiheadAttention をインスタンス化し直す。

>>> num_heads = 2
>>> model = nn.MultiheadAttention(embed_dim=embed_dim,
...                               num_heads=num_heads,
...                               bias=False,
...                               batch_first=True)

ダミーデータはそのまま流用して、また返り値を得よう。

>>> attn_output, attn_weights = model(Q, K, V)

モデルからパラメータを取り出す。

>>> model_weights = {name: param.data for name, param
...                  in model.named_parameters()}
>>> Wi = model_weights['in_proj_weight']
>>> Wo = model_weights['out_proj.weight']

今回、ヘッド数が 1 から 2 に増えるので、ヘッドに入力されるデータの次元数は半分になる。 そこを計算しておこう。

>>> embed_dim_per_head = embed_dim // num_heads
>>> embed_dim_per_head
2

まずは \displaystyle{
W _ i
} の方のパラメータ行列を先ほどと同じように取り出そう。

>>> Wi_q, Wi_k, Wi_v = Wi.chunk(3)

上記の行列は、ヘッド毎に使う部分が分かれている。 そこで、0 番目のヘッド用と 1 番目のヘッド用に分割して取り出す。

>>> Wi0_q, Wi1_q = Wi_q.chunk(num_heads)
>>> Wi0_k, Wi1_k = Wi_k.chunk(num_heads)
>>> Wi0_v, Wi1_v = Wi_v.chunk(num_heads)

Scaled Dot Product Attention の計算はやることが多いので一旦関数にまとめてしまおう。

>>> def scaled_dot_product_self_attention(X, Wi_q, Wi_k, Wi_v):
...     QW = torch.matmul(Q, Wi_q.T)
...     KW = torch.matmul(K, Wi_k.T)
...     VW = torch.matmul(V, Wi_v.T)
...     KW_t = KW.transpose(-2, -1)
...     QK_t = torch.bmm(QW, KW_t)
...     import math
...     QK_t_scaled = QK_t / math.sqrt(embed_dim_per_head)
...     attn_weights_ = F.softmax(QK_t_scaled, dim=-1)
...     AV = torch.matmul(attn_weights_, VW)
...     return AV, attn_weights_
... 

次のようにヘッドごとの計算結果を得る。

>>> AV0, attn_weights0_ = scaled_dot_product_self_attention(Q, Wi0_q, Wi0_k, Wi0_v)
>>> AV1, attn_weights1_ = scaled_dot_product_self_attention(Q, Wi1_q, Wi1_k, Wi1_v)

ヘッドごとの計算結果を連結する。 数式では \displaystyle{
\text{Concat}(\text{head} _ 1, \dots, \text{head} _ h)
} に対応する。

>>> AV_concat = torch.cat([AV0, AV1], dim=-1)

連結したらあとはさっきと同じ。

>>> attn_output_ = torch.matmul(AV_concat, Wo.T)

結果を確認すると、ちゃんとモデルの出力と一致している。

>>> attn_output
tensor([[[ 0.0637, -0.1713,  0.1646,  0.1171],
         [ 0.2022, -0.2627,  0.1944,  0.0561],
         [ 0.1051, -0.1979,  0.1749,  0.0978],
         [ 0.1367, -0.2148,  0.1865,  0.0759],
         [ 0.1614, -0.2310,  0.1923,  0.0644]],

        [[-0.3877, -0.1922, -0.1522,  0.6143],
         [-0.4006, -0.1432, -0.1958,  0.5701],
         [-0.3928, -0.2885, -0.1149,  0.7077],
         [-0.3871, -0.1448, -0.1641,  0.5711],
         [-0.4203, -0.1682, -0.1782,  0.6106]]], grad_fn=<TransposeBackward0>)
>>> attn_output_
tensor([[[ 0.0637, -0.1713,  0.1646,  0.1171],
         [ 0.2022, -0.2627,  0.1944,  0.0561],
         [ 0.1051, -0.1979,  0.1749,  0.0978],
         [ 0.1367, -0.2148,  0.1865,  0.0759],
         [ 0.1614, -0.2310,  0.1923,  0.0644]],

        [[-0.3877, -0.1922, -0.1522,  0.6143],
         [-0.4006, -0.1432, -0.1958,  0.5701],
         [-0.3928, -0.2885, -0.1149,  0.7077],
         [-0.3871, -0.1448, -0.1641,  0.5711],
         [-0.4203, -0.1682, -0.1782,  0.6106]]])

ちなみに Attention Weights はどうなっているかというと、すべてのヘッドの平均を計算しているようだ。

>>> attn_weights_ = (attn_weights0_ + attn_weights1_) / num_heads

平均をとったものが、モデルの出力と一致している。

>>> attn_weights
tensor([[[0.2021, 0.1750, 0.1170, 0.2130, 0.2929],
         [0.2060, 0.1956, 0.2399, 0.1871, 0.1713],
         [0.2028, 0.1883, 0.1492, 0.2120, 0.2476],
         [0.1966, 0.2012, 0.1703, 0.2065, 0.2254],
         [0.1972, 0.2058, 0.1914, 0.2004, 0.2052]],

        [[0.1916, 0.2228, 0.1979, 0.1893, 0.1984],
         [0.1783, 0.2872, 0.1692, 0.1739, 0.1914],
         [0.1891, 0.0948, 0.2491, 0.2291, 0.2379],
         [0.1943, 0.2597, 0.1843, 0.1796, 0.1822],
         [0.2045, 0.1544, 0.2107, 0.2196, 0.2110]]], grad_fn=<DivBackward0>)
>>> attn_weights_
tensor([[[0.2021, 0.1750, 0.1170, 0.2130, 0.2929],
         [0.2060, 0.1956, 0.2399, 0.1871, 0.1713],
         [0.2028, 0.1883, 0.1492, 0.2120, 0.2476],
         [0.1966, 0.2012, 0.1703, 0.2065, 0.2254],
         [0.1972, 0.2058, 0.1914, 0.2004, 0.2052]],

        [[0.1916, 0.2228, 0.1979, 0.1893, 0.1984],
         [0.1783, 0.2872, 0.1692, 0.1739, 0.1914],
         [0.1891, 0.0948, 0.2491, 0.2291, 0.2379],
         [0.1943, 0.2597, 0.1843, 0.1796, 0.1822],
         [0.2045, 0.1544, 0.2107, 0.2196, 0.2110]]])

これで、複数のヘッドがあるパターンについてもモデルと計算した値が一致することがわかった。

まとめ

今回は Transformer の中心的な処理である Multi-Head Attention について、PyTorch の実装を例に検算してみた。

Python: Luigi の DateIntervalParameter について

バッチ処理に特化した Python のデータパイプライン構築用のフレームワークに Luigi がある。 今回は、特定の時系列的な範囲を Task が受け取るのに使える DateIntervalParameter というパラメータを紹介する。 これは、たとえば一週間とか一ヶ月あるいは特定の日付から日付といった範囲で、何らかの集計をする処理を書くときに便利に使える。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.6
BuildVersion:   20G165
$ python -V        
Python 3.9.7
$ pip list | grep -i luigi
luigi                    3.0.3

もくじ

下準備

あらかじめ、Luigi をインストールしておく。

$ pip install luigi

DateIntervalParameter について

早速だけど以下にサンプルコードを示す。

以下では ExampleTask というタスクを定義している。 このタスクは dt_interval という名前で DateIntervalParameter 型のパラメータを受け取る。 タスクが実行されると DateIntervalParameter#dates() メソッドを呼んで、範囲に含まれる日付を標準出力に書き出す。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import luigi


class ExampleTask(luigi.Task):
    # 期間を指定できるパラメータ
    dt_interval = luigi.DateIntervalParameter()

    def run(self):
        for date in self.dt_interval.dates():
            # 各日付に適用する擬似的な処理
            print(f'Processing: {date}')

    def complete(self):
        # 動作確認用に output() メソッドを定義しないで常にタスクが実行されるようにする
        return False

上記を実行してみよう。 まずは "2021-W01" という文字列を渡してみる。 これは 2021 年の第 01 週を表している。

$ python -m luigi \
    --local-scheduler \
    --module example \
    ExampleTask \
    --dt-interval "2021-W01"

... (snip) ...

Processing: 2021-01-04
Processing: 2021-01-05
Processing: 2021-01-06
Processing: 2021-01-07
Processing: 2021-01-08
Processing: 2021-01-09
Processing: 2021-01-10

... (snip)

上記を見ると 2021-01-04 から 2021-01-10 が、指定した 2021-W01 に含まれる日付ということがわかる。

同様に "2021-01" という文字列を渡してみよう。 これは 2021 年の 1 月を表している。

$ python -m luigi \
    --local-scheduler \
    --module example \
    ExampleTask \
    --dt-interval "2021-01"

... (snip) ...

Processing: 2021-01-01
Processing: 2021-01-02
Processing: 2021-01-03
... (snip) ...
Processing: 2021-01-29
Processing: 2021-01-30
Processing: 2021-01-31

... (snip)

数が多いので省略しているけど、2021-01-01 から 2021-01-31 が範囲に含まれることがわかる。

また、ISO 8601 形式の日付をハイフン (-) でつなぐと、任意の日付の範囲が指定できる。 たとえば 2021-09-01 から 2021-09-07 を指定してみよう。

$ python -m luigi \
    --local-scheduler \
    --module example \
    ExampleTask \
    --dt-interval "2021-09-01-2021-09-07"

... (snip) ...

Processing: 2021-09-01
Processing: 2021-09-02
Processing: 2021-09-03
Processing: 2021-09-04
Processing: 2021-09-05
Processing: 2021-09-06

... (snip)

末尾の日付は含まれずに、2021-09-01 から 2021-09-06 が範囲に含まれていることがわかる。

このように DateIntervalParameter を使うと、特定の日付の範囲を受け取る処理が書きやすい。 典型的には、開始日と終了日を個別に取っていたような処理を置き換えることができる。

動作原理

使い方はわかったので、この DateIntervalParameter というパラメータが、どのように実現されているのか見ていこう。

以下のサンプルコードでは、受け取った dt_interval の型を表示している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import luigi


class ExampleTask(luigi.Task):
    dt_interval = luigi.DateIntervalParameter()

    def run(self):
        # dt_interval の型を表示する
        print(f'*** Type of dt_interval: {type(self.dt_interval)} ***')

    def complete(self):
        return False

上記に "2021-W01" という文字列を渡すと luigi.date_interval.Week という型になっていることが確認できる。

$ python -m luigi \
    --local-scheduler \
    --module example \
    ExampleTask \
    --dt-interval "2021-W01"

... (snip) ...

*** Type of dt_interval: <class 'luigi.date_interval.Week'> ***

... (snip)

同様に、"2021-01" を渡したときでは luigi.date_interval.Month になる。

$ python -m luigi \
    --local-scheduler \
    --module example \
    ExampleTask \
    --dt-interval "2021-01"

... (snip) ...

*** Type of dt_interval: <class 'luigi.date_interval.Month'> ***

... (snip)

以下、実行については省略しつつ、以下のように対応している。

  • <年-月-日>
    • luigi.date_interval.Date
  • <年-W週>
    • luigi.date_interval.Week
  • <年-月>
    • luigi.date_interval.Month
  • <年-月-日>-<年-月-日>
    • luigi.date_interval.Custom

ここからは Python のインタプリタを使って確認していこう。

$ python

luigi.date_interval をインポートする。

>>> from luigi import date_interval

たとえば luigi.date_interval.Week をインスタンス化してみよう。 このクラスには年と週数を渡す必要がある。

>>> from pprint import pprint
>>> week_interval = date_interval.Week(2021, 1)

ちなみに、実行時と同じように文字列を使ってインスタンス化するときは parse() メソッドを使えば良い。

>>> date_interval.Week.parse('2021-W01')
2021-W01

このオブジェクトには、先ほどのサンプルコードでも登場したように dates() というメソッドがある。 このメソッドは、指定された期間に含まれる日付を返す。

>>> pprint(week_interval.dates())
[datetime.date(2021, 1, 4),
 datetime.date(2021, 1, 5),
 datetime.date(2021, 1, 6),
 datetime.date(2021, 1, 7),
 datetime.date(2021, 1, 8),
 datetime.date(2021, 1, 9),
 datetime.date(2021, 1, 10)]

また、next() メソッドを使うと次の期間が、prev() メソッドを使うと前の期間が得られる。

>>> week_interval.next()
2021-W02
>>> week_interval.prev()
2020-W53

これらのメソッドは、Week 以外にも MonthCustom などでそれぞれ実装されている。

ちなみに DateIntervalParameter 自体は、受け取った文字列をそれぞれのクラスの parse() に順番に渡す実装になっている。

luigi.readthedocs.io

いじょう。

Python: Luigi の RangeDaily 系の使い方と注意点について

Python の Luigi はバッチ処理に特化したデータパイプライン構築用のフレームワーク。 バッチ処理に特化しているとあって、定期的に実行する系のユーティリティも色々と用意されている。 今回は、その中でも特定の期間に実行すべきバッチ処理をまとめて扱うことのできる、RangeDaily を代表としたクラス群について書いてみる。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.6
BuildVersion:   20G165
$ python -V
Python 3.9.7
$ pip list | grep -i luigi    
luigi                    3.0.3

もくじ

下準備

まずは Luigi をインストールしておく。

$ pip install luigi

RangeDaily について

RangeDaily は、日付を受け取って何らかの処理をする Task をまとめて扱うことのできるラッパータスク。 これを使うと、一日ずつ手動でタスクを実行する代わりに、開始期間 (と終了期間) を指定して一気に処理が実行できる。 とはいえ、文章で説明してもあんまりイメージできないと思うのでサンプルコードを使って説明していく。

以下のサンプルコードでは MyDailyTask というタスクを定義している。 このタスクは date というパラメータ名で日付を受け取る。 実行すると、/tmp 以下に受け取った日付を名前に含んだファイルを作成する。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class MyDailyTask(luigi.Task):
    """日付を受け取って、名前に日付を含むファイルを生成するタスク"""
    date = luigi.DateParameter()

    def run(self):
        with self.output().open(mode='w') as fp:
            print('Hello, World!', file=fp)

    def output(self):
        path = '/tmp/luigi-{date:%Y-%m-%d}'.format(date=self.date)
        return luigi.LocalTarget(path)

上記を example.py という名前で保存しよう。

通常なら、上記のタスクを実行するには次のようにする。 Luigi のスケジューラに、タスクを表したクラスと、実行したい日付を --date パラメータで指定する。

$ python -m luigi \
    --local-scheduler \
    --module example \
    MyDailyTask \
    --date 2021-09-01

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 MyDailyTask(date=2021-09-01)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行すると、次のようにファイルが作られる。

$ ls /tmp/luigi-2021-09-01  
/tmp/luigi-2021-09-01
$ cat /tmp/luigi-2021-09-01 
Hello, World!

上記は一日ずつ処理する場合の例になる。

続いては、今回の主題である RangeDaily を使ってみよう。 RangeDaily を使うと、この日付以降をまとめて実行する、といったことができる。 以下は --start オプションを使って 2021-09-01 以降を一気に実行する場合の例。 実行するタスクとしては RangeDaily を指定して、--of パラメータで自分で定義したタスクを指定する。

$ python -m luigi \
    --local-scheduler \
    RangeDaily \
    --module example \
    --of MyDailyTask \
    --start 2021-09-01

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 19 tasks of which:
* 19 ran successfully:
    - 18 MyDailyTask(date=2021-09-02...2021-09-19)
    - 1 RangeDaily(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記の実行結果を見ると、指定した日付以降を一括で実行できていることがわかる。 なお、これを実行しているシステムの日付は 2021-09-19 である。

$ date "+%Y-%m-%d"
2021-09-19

ディレクトリを確認すると 2021-09-01 から 2021-09-19 の範囲でファイルができている。 なお、2021-09-01 については最初に単発で実行したもの。

$ ls /tmp/luigi-* | head -n 1
/tmp/luigi-2021-09-01
$ ls /tmp/luigi-* | tail -n 1
/tmp/luigi-2021-09-19

試しにもう一度同じコマンドを実行してみよう。 すると、いずれのタスクも実行されないことがわかる。 これは、既にタスクが成果物としているファイルが存在するため。

$ python -m luigi \
    --local-scheduler \
    RangeDaily \
    --module example \
    --of MyDailyTask \
    --start 2021-09-01

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 RangeDaily(...)

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

この特性はバッチ処理を扱う上でなかなか便利だったりする。 というのも、何らかの理由で成果物のファイルが日付として歯抜けになっている場合、自動で存在しない日付を検出して実行してくれる。 また、日付の最後も特に指定しない限り今日になるので、バッチ処理として仕込むコマンドに RangeDaily を指定することもできる。

注意点について

続いては RangeDaily の注意点について。 端的に言うと、現時刻から遡れる日数とタスク数に上限が設定されている。

たとえば開始の日付として年初を指定して実行してみよう。

$ python -m luigi \
    --local-scheduler \
    RangeDaily \
    --module example \
    --of MyDailyTask \
    --start 2021-01-01

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 51 tasks of which:
* 51 ran successfully:
    - 50 MyDailyTask(date=2021-06-12...2021-07-31)
    - 1 RangeDaily(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記の実行結果 (Luigi Execution Summary) を見て違和感を覚えたかもしれない。 というのも、作成されたファイルの日付が 2021-06-12 から始まっているため。

$ ls /tmp/luigi-* | sort | head -n 5
/tmp/luigi-2021-06-12
/tmp/luigi-2021-06-13
/tmp/luigi-2021-06-14
/tmp/luigi-2021-06-15
/tmp/luigi-2021-06-16

また、末尾についても 2021-07-31 で終わっている。

$ ls /tmp/luigi-* | sort | tail -n 25 | head -n 10
/tmp/luigi-2021-07-26
/tmp/luigi-2021-07-27
/tmp/luigi-2021-07-28
/tmp/luigi-2021-07-29
/tmp/luigi-2021-07-30
/tmp/luigi-2021-07-31
/tmp/luigi-2021-09-01
/tmp/luigi-2021-09-02
/tmp/luigi-2021-09-03
/tmp/luigi-2021-09-04

本来なら 2021-01-01 から歯抜けなくファイルができてほしかった。 どうして、こんなことが起こるのか。

この問題はドキュメントを見るとわかる。

luigi.readthedocs.io

遡れる日数の上限 (--days-back)

まず、RangeDailyBase という、RangeDaily の基底クラスには days_back というパラメータがある。 このパラメータは、現在の日時から遡る日付の上限を定めていて、デフォルトで 100 に設定されている。

確認すると、今日から 100 日前は 2021-06-11 だった。 つまり、その日付を含む過去は遡って実行されないようになっている。

$ brew install coreutils
$ gdate --iso-8601 --date '100 days ago'
2021-06-11

試しに --days-back パラメータに 365 を指定して、上限を 1 年まで広げてみよう。

$ python -m luigi \
    --local-scheduler \
    RangeDaily \
    --module example \
    --of MyDailyTask \
    --start 2021-01-01 \
    --days-back 365

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 51 tasks of which:
* 51 ran successfully:
    - 50 MyDailyTask(date=2021-01-01...2021-02-19)
    - 1 RangeDaily(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

すると、上記を見てわかるとおり年初から実行されるようになった。

実行できるタスク数の上限 (--task-limit)

同様に、一度に実行できるタスクの数にも上限がある。 こちらは --task-limit というパラメータで指定できる。 デフォルトでは 50 に設定されている。

試しに、タスク数の上限も 365 に引きあげて実行してみよう。

$ python -m luigi \
    --local-scheduler \
    RangeDaily \
    --module example \
    --of MyDailyTask \
    --start 2021-01-01 \
    --days-back 365 \
    --task-limit 365

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 144 tasks of which:
* 144 ran successfully:
    - 143 MyDailyTask(date=2021-02-20,2021-02-21,2021-02-22,...)
    - 1 RangeDaily(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記を見てわかるとおり、これまでの上限だった 50 を越えてタスクが実行されている。

ファイルの数を検算しても、年初から今日までの日数と一致した。

$ ls /tmp/luigi-* | wc -l
     262
$ gdate --iso-8601 --date '2021-01-01 261 days'
2021-09-19

確認がおわったら、一旦作られたファイルをすべて削除しておこう。

$ rm /tmp/luigi-*

一ヶ月毎の処理 (RangeMonthly)

ちなみに、タイトルに RangeDaily と書いたように、日次のバッチ以外を扱うためのクラスも用意されている。

以下のサンプルコードでは特定の月に対して実行することを想定した MyMonthlyTask というタスクを定義している。 パラメータの名前は date だけど、型が luigi.DateParameter ではなく luigi.MonthParameter になっている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class MyMonthlyTask(luigi.Task):
    """月を受け取って、名前に月を含むファイルを生成するタスク"""
    date = luigi.MonthParameter()

    def run(self):
        with self.output().open(mode='w') as fp:
            print('Hello, World!', file=fp)

    def output(self):
        path = '/tmp/luigi-{month:%Y-%m}'.format(month=self.date)
        return luigi.LocalTarget(path)

先ほどと同じように example.py という名前で保存しておこう。

上記を RangeMonthly 経由で実行する。

$ python -m luigi \
    --local-scheduler \
    RangeMonthly \
    --module example \
    --of MyMonthlyTask \
    --start 2021-01

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 9 tasks of which:
* 9 ran successfully:
    - 8 MyMonthlyTask(date=2021-01...2021-08)
    - 1 RangeMonthly(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記を見ると、今月を含まない形でタスクが実行されていることがわかる。 これは典型的には日付が揃わないうちに集計処理を実行することは少ないことが関係しているんだろう。

先ほどと同じように、確認できたらファイルを一旦きれいにしておく。

$ rm /tmp/luigi-*

一時間毎の処理 (RangeHourly)

同じように一時間毎の処理も扱うことができる。 以下では一時間毎に実行することを期待した処理を MyHourlyTask という名前で定義している。 パラメータの名前は date だけど、クラスは luigi.DateHourParameter を用いる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class MyHourlyTask(luigi.Task):
    """時間 (Hour) を受け取って、名前に時間を含むファイルを生成するタスク"""
    date = luigi.DateHourParameter()

    def run(self):
        with self.output().open(mode='w') as fp:
            print('Hello, World!', file=fp)

    def output(self):
        path = '/tmp/luigi-{month:%Y-%m-%dT%H}'.format(month=self.date)
        return luigi.LocalTarget(path)

一時間ごとの処理になるとファイルが増えるので、今回は明示的に終了の時刻も指定しておこう。 以下では --start2021-09-15T00 を、--stop2021-09-16T12 を指定している。 T 以降が時間を表している。

$ python -m luigi \
    --local-scheduler \
    RangeHourly \
    --module example \
    --of MyHourlyTask \
    --start 2021-09-15T00 \
    --stop 2021-09-16T12

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 37 tasks of which:
* 37 ran successfully:
    - 36 MyHourlyTask(date=2021-09-15T00...2021-09-16T11)
    - 1 RangeHourly(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

結果を見てわかるとおり、--start の時刻を含んで --stop を含まない形でタスクが実行されていることがわかる。

$ ls /tmp/luigi-*  
/tmp/luigi-2021-09-15T00    /tmp/luigi-2021-09-15T12    /tmp/luigi-2021-09-16T00
/tmp/luigi-2021-09-15T01    /tmp/luigi-2021-09-15T13    /tmp/luigi-2021-09-16T01
/tmp/luigi-2021-09-15T02    /tmp/luigi-2021-09-15T14    /tmp/luigi-2021-09-16T02
/tmp/luigi-2021-09-15T03    /tmp/luigi-2021-09-15T15    /tmp/luigi-2021-09-16T03
/tmp/luigi-2021-09-15T04    /tmp/luigi-2021-09-15T16    /tmp/luigi-2021-09-16T04
/tmp/luigi-2021-09-15T05    /tmp/luigi-2021-09-15T17    /tmp/luigi-2021-09-16T05
/tmp/luigi-2021-09-15T06    /tmp/luigi-2021-09-15T18    /tmp/luigi-2021-09-16T06
/tmp/luigi-2021-09-15T07    /tmp/luigi-2021-09-15T19    /tmp/luigi-2021-09-16T07
/tmp/luigi-2021-09-15T08    /tmp/luigi-2021-09-15T20    /tmp/luigi-2021-09-16T08
/tmp/luigi-2021-09-15T09    /tmp/luigi-2021-09-15T21    /tmp/luigi-2021-09-16T09
/tmp/luigi-2021-09-15T10    /tmp/luigi-2021-09-15T22    /tmp/luigi-2021-09-16T10
/tmp/luigi-2021-09-15T11    /tmp/luigi-2021-09-15T23    /tmp/luigi-2021-09-16T11

まとめ

今回は Luigi で特定の期間毎に実行するバッチ処理を、まとめて扱う RangeDaily 系の使い方と注意点について紹介した。 注意点としては、遡る日付や実行するタスクの数に上限がある点や、期間によって開始・終了を含む・含まないが微妙に異なる点が挙げられる。

Python: PyTorch の GRU / LSTM を検算してみる

以前のエントリで扱った Simple RNN の検算は、個人的になかなか良い勉強になった。

blog.amedama.jp

そこで、今回は Simple RNN の改良版となる GRU (Gated Recurrent Unit) と LSTM (Long Short Term Memory) についても検算してみる。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.5.2
BuildVersion:   20G95
$ python -V
Python 3.9.6
$ pip list | grep torch
torch                    1.9.0

もくじ

下準備

下準備として、あらかじめ PyTorch をインストールしておく。

$ pip install torch

続いて Python のインタプリタを起動する。

$ python

そして、PyTorch のパッケージをインポートする。

>>> import torch
>>> from torch import nn

GRU を検算する

Simple RNN は、仕組みが単純な一方で隠れ状態が入力によって無条件に更新されてしまう。 そのため、隠れ状態に昔の情報が残りにくいことから、長期的な記憶を保つことが難しいという問題があった。 GRU では、それをゲートという仕組みを導入することで改善を試みている。

まずは次のように GRU クラスをインスタンス化する。 Simple RNN のときと同じように、モデルの初期状態の重みをそのまま使って検算する。

>>> input_dim = 3  # モデルの入力ベクトルの次元数
>>> hidden_dim = 4  # モデルの出力 (隠れ状態) ベクトルの次元数
>>> model = nn.GRU(input_size=input_dim, hidden_size=hidden_dim)

インスタンス化するときに必要な引数は RNN クラスと変わらない。 つまり、入力と出力のサイズを渡すだけ。

インスタンス化できたら、モデルのパラメータを確認しよう。 どうやら、モデルのパラメータが持っている名前は RNN クラスと同じようだ。 ただし、重みを保持している行列のサイズは増している。

>>> from pprint import pprint
>>> pprint(list(model.named_parameters()))
[('weight_ih_l0',
  Parameter containing:
tensor([[-0.3619, -0.1291, -0.0647],
        [-0.4406, -0.2705, -0.3480],
        [ 0.0360,  0.3222,  0.2494],
        [-0.0738, -0.3214,  0.4445],
        [-0.3551,  0.3078, -0.0846],
        [-0.4367,  0.4282, -0.1521],
        [-0.4895,  0.0713,  0.0217],
        [-0.2439,  0.4704, -0.2078],
        [ 0.0460,  0.2528,  0.3555],
        [-0.3008, -0.0595,  0.0586],
        [-0.3535,  0.2088, -0.2179],
        [ 0.2923,  0.0291,  0.4044]], requires_grad=True)),
 ('weight_hh_l0',
  Parameter containing:
tensor([[ 0.0406,  0.3097, -0.2765, -0.2359],
        [ 0.4449,  0.3376,  0.3715, -0.3207],
        [ 0.0157,  0.0347, -0.0091, -0.0438],
        [ 0.1630,  0.3619,  0.3797, -0.0845],
        [ 0.1729, -0.1405,  0.0844, -0.3560],
        [ 0.0711, -0.3750, -0.0721, -0.4998],
        [-0.4140, -0.1105, -0.1611,  0.1338],
        [-0.0574, -0.1216,  0.2439, -0.2021],
        [ 0.1568,  0.2177,  0.4511,  0.4009],
        [-0.4453, -0.0780, -0.1764,  0.3598],
        [ 0.1704,  0.3918, -0.0727,  0.2112],
        [ 0.3841,  0.0154,  0.2495,  0.1840]], requires_grad=True)),
 ('bias_ih_l0',
  Parameter containing:
tensor([-0.3642, -0.2804,  0.3874, -0.0016, -0.0540, -0.3060, -0.0446, -0.0145,
         0.1529, -0.4700,  0.3887,  0.1273], requires_grad=True)),
 ('bias_hh_l0',
  Parameter containing:
tensor([-0.0260, -0.0787, -0.3992,  0.4587,  0.3522,  0.0618,  0.0865, -0.2561,
         0.0439, -0.4722,  0.2414, -0.2022], requires_grad=True))]

続いて、ダミーの入力データを用意しよう。 ダミーの入力データの形状は RNN を使った場合と変わらない。

>>> T = 5  # 入力する系列データの長さ
>>> batch_size = 2  # 一度に処理するデータの数
>>> X = torch.randn(T, batch_size, input_dim)  # ダミーの入力データ
>>> X.shape
torch.Size([5, 2, 3])

ダミーの入力データをモデルに与えて出力を得る。

>>> H, hn = model(X)

出力は入力の系列データに対応する隠れ状態と、最後の隠れ状態になっている。 この形状も RNN と変わらない。 つまり、PyTorch において GRU は単純に名前を変えるだけで RNN から差し替えて使うことができる。

>>> H.shape, hn.shape
(torch.Size([5, 2, 4]), torch.Size([1, 2, 4]))
>>> H[-1]
tensor([[ 0.5352, -0.5132,  0.2607,  0.5642],
        [-0.0264, -0.6124,  0.5123, -0.2023]], grad_fn=<SelectBackward>)
>>> hn
tensor([[[ 0.5352, -0.5132,  0.2607,  0.5642],
         [-0.0264, -0.6124,  0.5123, -0.2023]]], grad_fn=<StackBackward>)

それでは、ここからは実際に検算に入ろう。 PyTorch で使われている GRU の数式は以下のドキュメントで確認できる。

pytorch.org

数式は以下のとおり。 Simple RNN のときは 1 つだった式が 4 つに増えている。 なお、最終的に求めたいのは一番下にある「入力  x_t に対応した隠れ状態  h_t」になる。

\displaystyle{
        \begin{array}{ll}
            r_t = \sigma(W_{ir} x_t + b_{ir} + W_{hr} h_{(t-1)} + b_{hr}) \\
            z_t = \sigma(W_{iz} x_t + b_{iz} + W_{hz} h_{(t-1)} + b_{hz}) \\
            n_t = \tanh(W_{in} x_t + b_{in} + r_t * (W_{hn} h_{(t-1)}+ b_{hn})) \\
            h_t = (1 - z_t) * n_t + z_t * h_{(t-1)}
        \end{array}
}

ここで  \sigma はシグモイド関数を表す。  r_t z_t n_t は、活性化関数の違いはあるものの、基本的にはいずれも  W_i x_t + b_i + W_h h_{(t-1)} + b_h の形になっていることがわかる。

数式が確認できたところでモデルのパラメータから重みを取り出していこう。

>>> model_weights = {name: param.data for name, param
...                  in model.named_parameters()}
>>> 
>>> W_i = model_weights['weight_ih_l0']
>>> W_h = model_weights['weight_hh_l0']
>>> b_i = model_weights['bias_ih_l0']
>>> b_h = model_weights['bias_hh_l0']

上記は、部分ごとに  r_t 用と  z_t 用と  n_t 用に分かれている。 本来は一気に行列計算した上で後から取り出すわけだけど、今回は数式をなぞるために先に取り出しておこう。

>>> W_ir = W_i[:hidden_dim]
>>> W_iz = W_i[hidden_dim: hidden_dim * 2]
>>> W_in = W_i[hidden_dim * 2:]
>>> 
>>> W_hr = W_h[:hidden_dim]
>>> W_hz = W_h[hidden_dim: hidden_dim * 2]
>>> W_hn = W_h[hidden_dim * 2:]
>>> 
>>> b_ir = b_i[:hidden_dim]
>>> b_iz = b_i[hidden_dim: hidden_dim * 2]
>>> b_in = b_i[hidden_dim * 2:]
>>> 
>>> b_hr = b_h[:hidden_dim]
>>> b_hz = b_h[hidden_dim: hidden_dim * 2]
>>> b_hn = b_h[hidden_dim * 2:]

あとは定義どおりに計算していく。

まずは t = 0 の状態から。 つまり、X[0] に対応する隠れ状態を計算してみよう。 t = 0 かつ、初期の隠れ状態を渡していないので  W_h h_{(t-1)} の項が存在しない。

>>> r_t = torch.sigmoid(torch.matmul(W_ir, X[0].T).T + b_ir + b_hr)
>>> z_t = torch.sigmoid(torch.matmul(W_iz, X[0].T).T + b_iz + b_hz)
>>> n_t = torch.tanh(torch.matmul(W_in, X[0].T).T + b_in + r_t * b_hn)
>>> h_t = (1 - z_t) * n_t

確認すると、モデルから返ってきた隠れ状態と、検算した値が一致していることがわかる。

>>> H[0]
tensor([[-0.1412, -0.2934,  0.3071, -0.2858],
        [ 0.1785, -0.1226,  0.2666,  0.0485]], grad_fn=<SelectBackward>)
>>> h_t
tensor([[-0.1412, -0.2934,  0.3071, -0.2858],
        [ 0.1785, -0.1226,  0.2666,  0.0485]])

次は t = 1 に対する計算を取り上げつつ、それぞれの式が意味するところを考えてみる。

まず、以下の  r_t はリセットゲート (reset gate) と呼ばれている。 リセットゲートの式は、活性化関数がシグモイド関数なので、成分は 0 ~ 1 の範囲になる。

>>> r_t = torch.sigmoid(torch.matmul(W_ir, X[1].T).T + b_ir + torch.matmul(W_hr, H[0].T).T + b_hr)

リセットゲートは、後ほど新しい隠れ状態の候補を作るときに、一つ前の隠れ状態と積を取る。 それによって、次の隠れ状態に、一つ前の隠れ状態をどれくらい反映するか制御する役目を担っている。

ゲートは成分の値が 0 のときに「閉じている」、1 のときに「開いている」と表現するらしい。 もちろん、ゲートの値は人間が明示的に与えるのではなく、学習するデータによって最適化される。

数式で対応しているのは、この部分。

\displaystyle{
        \begin{array}{ll}
            r_t = \sigma(W_{ir} x_t + b_{ir} + W_{hr} h_{(t-1)} + b_{hr}) \\
        \end{array}
}

以下の  n_t は、元の論文には名前付きで登場しないものの、PyTorch の中ではニューゲート (new gate) と呼ばれているようだ 1。 これは、言うなれば次の隠れ状態の候補となるもの。 式は RNN の隠れ状態を作るときのものに近いけど、みると一つ前の隠れ状態に先ほどのリセットゲートがかけられている。 これによって、次の隠れ状態に一つ前の隠れ状態をどれくらい混ぜるか、つまり影響を与えるかを制御している。 たとえば、リセットゲートの成分がすべてゼロなら、次の隠れ状態の候補を作るときに、一つ前の隠れ状態をまったく考慮しないことになる。

>>> n_t = torch.tanh(torch.matmul(W_in, X[1].T).T + b_in + r_t * (torch.matmul(W_hn, H[0].T).T + b_hn))

数式で対応しているのは、この部分。

\displaystyle{
        \begin{array}{ll}
            n_t = \tanh(W_{in} x_t + b_{in} + r_t * (W_{hn} h_{(t-1)}+ b_{hn})) \\
        \end{array}
}

次に、以下の  z_t はアップデートゲート (update gate) と呼ばれている。 このゲートは、次の隠れ状態を作るときに、どれくらい一つ前の隠れ状態を引き継ぐかを制御している。

>>> z_t = torch.sigmoid(torch.matmul(W_iz, X[1].T).T + b_iz + torch.matmul(W_hz, H[0].T).T + b_hz)

数式で対応しているのは、この部分。

\displaystyle{
        \begin{array}{ll}
            z_t = \sigma(W_{iz} x_t + b_{iz} + W_{hz} h_{(t-1)} + b_{hz}) \\
        \end{array}
}

最後に、以下で次の隠れ状態を求めている。 式では、先ほど計算したニューゲートとアップデートゲートが登場している。 次の隠れ状態は、基本的にニューゲートと一つ前の隠れ状態が混ぜられていることがわかる。 そして、混ぜる比率をアップデートゲートが制御している。 もしアップデートゲートの成分がすべてゼロなら、一つ前の隠れ状態はまったく考慮されず、すべてニューゲートのものになる。

>>> h_t = (1 - z_t) * n_t + z_t * H[0]

数式で対応しているのは、この部分。

\displaystyle{
        \begin{array}{ll}
            h_t = (1 - z_t) * n_t + z_t * h_{(t-1)}
        \end{array}
}

計算した隠れ状態を、最初に得られたものと比較してみよう。

>>> H[1]
tensor([[-0.0039, -0.3424,  0.4580, -0.2490],
        [ 0.3869, -0.4714,  0.1700,  0.4022]], grad_fn=<SelectBackward>)
>>> h_t
tensor([[-0.0039, -0.3424,  0.4580, -0.2490],
        [ 0.3869, -0.4714,  0.1700,  0.4022]], grad_fn=<AddBackward0>)

モデルから返ってきた隠れ状態と、検算した値が一致していることがわかる。

LSTM を検算する

続いては LTSM についても同様に検算してみる。

LSTM では、Simple RNN や GRU で扱っていた隠れ状態が「長期記憶」と「短期記憶」に分かれている。 これによって、長いスパンで記憶しておく必要のある情報と、特定のタイミングでのみ必要な情報を扱いやすくしているらしい。 ちなみに LSTM は前述の GRU よりも歴史のあるアーキテクチャで、GRU は LSTM の特殊形と捉えることもできるようだ。

LSTM も、PyTorch ではクラスの名前を LSTM に変更するだけで使うことができる。

>>> model = nn.LSTM(input_size=input_dim, hidden_size=hidden_dim)

モデルに含まれるパラメータを確認してみよう。 パラメータの名前は同じだけど、先ほどの GRU よりも、さらに行列のサイズが増えている。

>>> pprint(list(model.named_parameters()))
[('weight_ih_l0',
  Parameter containing:
tensor([[ 0.3498, -0.0745,  0.0339],
        [-0.0537, -0.4582, -0.0305],
        [-0.1209, -0.1292,  0.0014],
        [-0.4880,  0.4027,  0.2235],
        [-0.3940, -0.4997, -0.4360],
        [ 0.4677, -0.2913,  0.3161],
        [-0.4162, -0.4060, -0.0483],
        [ 0.0281,  0.0586, -0.4602],
        [ 0.0145,  0.3151, -0.0132],
        [ 0.2642,  0.0724, -0.1972],
        [-0.1406,  0.2249, -0.0125],
        [-0.1339, -0.1570, -0.4393],
        [-0.1411, -0.1534,  0.4226],
        [-0.3554,  0.0628,  0.3336],
        [-0.3037, -0.4630, -0.0022],
        [-0.4711,  0.4282,  0.4648]], requires_grad=True)),
 ('weight_hh_l0',
  Parameter containing:
tensor([[ 0.1409,  0.2027,  0.4179,  0.2062],
        [ 0.0182,  0.1814, -0.0826,  0.0193],
        [-0.3766, -0.4391,  0.0336, -0.0875],
        [-0.3921,  0.0581,  0.3184, -0.4362],
        [ 0.0616, -0.0611, -0.0350,  0.2251],
        [-0.1458, -0.2994, -0.4362, -0.0643],
        [ 0.1637, -0.1193,  0.4780, -0.0938],
        [-0.0130,  0.1613,  0.2988, -0.2142],
        [-0.1978,  0.3739, -0.4704,  0.3770],
        [ 0.4956, -0.3259,  0.0976,  0.1588],
        [ 0.2641, -0.2511, -0.3984,  0.2107],
        [ 0.4604,  0.1646, -0.0299,  0.4243],
        [ 0.4658, -0.1663, -0.0066, -0.2386],
        [ 0.2184,  0.3376, -0.2343,  0.2853],
        [-0.2000, -0.4610, -0.2787, -0.2990],
        [ 0.3782, -0.1738, -0.1492, -0.2577]], requires_grad=True)),
 ('bias_ih_l0',
  Parameter containing:
tensor([-0.1566,  0.4039,  0.2361,  0.1422,  0.1875,  0.0293, -0.2778,  0.4168,
        -0.4732,  0.0960,  0.1191,  0.1664,  0.1017,  0.1526,  0.4041,  0.0643],
       requires_grad=True)),
 ('bias_hh_l0',
  Parameter containing:
tensor([-0.2511,  0.2747, -0.0801, -0.1251,  0.0565, -0.3207,  0.0877,  0.2105,
        -0.3742, -0.3953, -0.3199, -0.1545, -0.1276, -0.4406, -0.3679,  0.4121],
       requires_grad=True))]

モデルにダミーデータを与えてみよう。 このとき、LSTM では返り値が RNNGRU よりも増えている。

>>> H, (hn, cn) = model(X)

上記で、HhnRNNGRU と同じ隠れ状態を表している。 ただし、LSTM においては隠れ状態が「短期記憶」に対応する。

>>> H[-1]
tensor([[-0.2198, -0.1965, -0.0670, -0.5722],
        [-0.1991, -0.0771, -0.1617, -0.0441]], grad_fn=<SelectBackward>)
>>> 
>>> hn
tensor([[[-0.2198, -0.1965, -0.0670, -0.5722],
         [-0.1991, -0.0771, -0.1617, -0.0441]]], grad_fn=<StackBackward>)

返り値で増えているのは、前述した「長期記憶」になる。 詳しくは後述するけど、LSTM の「短期記憶」はこの「長期記憶」から抜き出して作る。

>>> cn
tensor([[[-0.3814, -0.5555, -0.1285, -0.9944],
         [-0.6473, -0.2559, -0.2589, -0.1025]]], grad_fn=<StackBackward>)

使う上で理解すべき概念の説明が終わったところで、検算に移る。 PyTorch で使われている LSTM の数式は以下のドキュメントで確認できる。

pytorch.org

数式は次のとおり。 GRU のときよりも、さらに増えている。

\displaystyle{
        \begin{array}{ll} \\
            i_t = \sigma(W_{ii} x_t + b_{ii} + W_{hi} h_{t-1} + b_{hi}) \\
            f_t = \sigma(W_{if} x_t + b_{if} + W_{hf} h_{t-1} + b_{hf}) \\
            g_t = \tanh(W_{ig} x_t + b_{ig} + W_{hg} h_{t-1} + b_{hg}) \\
            o_t = \sigma(W_{io} x_t + b_{io} + W_{ho} h_{t-1} + b_{ho}) \\
            c_t = f_t \odot c_{t-1} + i_t \odot g_t \\
            h_t = o_t \odot \tanh(c_t) \\
        \end{array}
}

上記で  \odot はアダマール積を表している。

モデルからパラメータを取り出そう。 先ほどと同じように、数式をなぞるために行列から必要な箇所を取り出して名前をつけていく。

>>> model_weights = {name: param.data for name, param
...                  in model.named_parameters()}
>>> 
>>> W_i = model_weights['weight_ih_l0']
>>> W_h = model_weights['weight_hh_l0']
>>> b_i = model_weights['bias_ih_l0']
>>> b_h = model_weights['bias_hh_l0']
>>> 
>>> W_ii = W_i[:hidden_dim]
>>> W_if = W_i[hidden_dim: hidden_dim * 2]
>>> W_ig = W_i[hidden_dim * 2: hidden_dim * 3]
>>> W_io = W_i[hidden_dim * 3:]
>>> 
>>> W_hi = W_h[:hidden_dim]
>>> W_hf = W_h[hidden_dim: hidden_dim * 2]
>>> W_hg = W_h[hidden_dim * 2: hidden_dim * 3]
>>> W_ho = W_h[hidden_dim * 3:]
>>> 
>>> b_ii = b_i[:hidden_dim]
>>> b_if = b_i[hidden_dim: hidden_dim * 2]
>>> b_ig = b_i[hidden_dim * 2: hidden_dim * 3]
>>> b_io = b_i[hidden_dim * 3:]
>>> 
>>> b_hi = b_h[:hidden_dim]
>>> b_hf = b_h[hidden_dim: hidden_dim * 2]
>>> b_hg = b_h[hidden_dim * 2: hidden_dim * 3]
>>> b_ho = b_h[hidden_dim * 3:]

とりあえず、t = 0 の時点の隠れ状態 (短期記憶) を数式のとおりに計算してみよう。 t = 0 かつ、初期の隠れ状態と長期記憶を渡していないので存在しない項がある点に注意する。

>>> i_t = torch.sigmoid(torch.matmul(W_ii, X[0].T).T + b_ii + b_hi)
>>> f_t = torch.sigmoid(torch.matmul(W_if, X[0].T).T + b_if + b_hf)
>>> g_t = torch.tanh(torch.matmul(W_ig, X[0].T).T + b_ig + b_hg)
>>> o_t = torch.sigmoid(torch.matmul(W_io, X[0].T).T + b_io + b_ho)
>>> c_t = i_t * g_t
>>> h_t = o_t * torch.tanh(c_t)

計算した値と、モデルから返ってきた隠れ状態を比較してみよう。

>>> H[0]
tensor([[-0.1018, -0.0494, -0.0653,  0.1273],
        [-0.0617, -0.1598,  0.0546, -0.3024]], grad_fn=<SelectBackward>)
>>> 
>>> h_t
tensor([[-0.1018, -0.0494, -0.0653,  0.1273],
        [-0.0617, -0.1598,  0.0546, -0.3024]])

ちゃんと一致している。

続いては数式の意味を確認しながら t = 1 も計算してみよう。 計算する上で、一つ前の長期記憶が必要になるので c_0 という名前で記録しておく。

>>> c_0 = c_t

まず計算するのは、入力ゲート (input gate) で、新しい入力  x_t を、どれくらい次の長期記憶に反映するかを司っている。

>>> i_t = torch.sigmoid(torch.matmul(W_ii, X[1].T).T + b_ii + torch.matmul(W_hi, H[0].T).T + b_hi)

次に計算しているのは忘却ゲート (forget gate) で、一つ前の長期記憶を、次にどれだけ引き継ぐかを担っている。

>>> f_t = torch.sigmoid(torch.matmul(W_if, X[1].T).T + b_if + torch.matmul(W_hf, H[0].T).T + b_hf)

以下の式は、論文では名前がついていないけど、PyTorch ではセルゲート (cell gate) と呼んでいる。 これは Simple RNN で隠れ状態を計算していた式と同じ。 入力と、一つ前の隠れ状態 (短期記憶) を混ぜている。

>>> g_t = torch.tanh(torch.matmul(W_ig, X[1].T).T + b_ig + torch.matmul(W_hg, H[0].T).T + b_hg)

以下は出力ゲート (output gate) で、長期記憶から短期記憶をどのように抜き出すかを司っている。

>>> o_t = torch.sigmoid(torch.matmul(W_io, X[1].T).T + b_io + torch.matmul(W_ho, H[0].T).T + b_ho)

以下で、一つ前の長期記憶と出力ゲートを混ぜて、次の長期記憶を作っている。 どんな風に混ぜるかは、忘却ゲートと入力ゲートの値に依存する。

>>> c_t = f_t * c_0 + i_t * g_t

そして、最後に長期記憶から出力ゲートを使って短期記憶を抜き出している。

>>> h_t = o_t * torch.tanh(c_t)

隠れ状態を比べてみると、ちゃんと値が一致していることがわかる。

>>> H[1]
tensor([[-0.1224, -0.1573,  0.0294,  0.0794],
        [-0.1954, -0.1273, -0.0442, -0.3626]], grad_fn=<SelectBackward>)
>>> 
>>> h_t
tensor([[-0.1224, -0.1573,  0.0294,  0.0794],
        [-0.1954, -0.1273, -0.0442, -0.3626]], grad_fn=<MulBackward0>)

いじょう。

参考

arxiv.org

(PDF) Long Short-term Memory

youtu.be

youtu.be


  1. 役目的にはゲートではないので何だか変な気もする

Python: PyTorch の RNN を検算してみる

今回は、PyTorch の RNN (Recurrent Neural Network) が内部的にどんな処理をしているのか確認してみる。 なお、ここでいう RNN は、再起的な構造をもったニューラルネットワークの総称ではなく、いわゆる古典的な Simple RNN を指している。

これを書いている人は、ニューラルネットワークが何もわからないので、再帰的な構造があったりすると尚更わからなくなる。 そこで、中身について知っておきたいと考えたのがモチベーションになっている。

使った環境は次のとおり。

$ sw_vers 
ProductName:    macOS
ProductVersion: 11.5.2
BuildVersion:   20G95
$ python -V         
Python 3.9.6
$ pip list | grep torch       
torch                    1.9.0

もくじ

下準備

まずは PyTorch をインストールしておく。

$ pip install torch

そして、Python のインタプリタを起動する。

$ python

起動できたら PyTorch のモジュールをインポートしておく。

>>> import torch
>>> from torch import nn

モデルを用意する

PyTorch には nn モジュール以下に RNN というクラスがある。 このクラスが、ミニバッチに対応した Simple RNN を実装している。 このクラスは、最低限 input_sizehidden_size という引数を指定すればインスタンス化できる。

>>> input_dim = 3  # モデルの入力ベクトルの次元数
>>> hidden_dim = 4  # モデルの出力ベクトルの次元数
>>> model = nn.RNN(input_size=input_dim, hidden_size=hidden_dim)

input_size はモデルに入力するデータの次元数で、hidden_size はモデルが出力するデータの次元数になる。 Simple RNN が出力するデータには隠れ状態 (Hidden State) ベクトルという名前がついていて、それが引数の名前に反映されている。

インスタンス化できたら、モデルに含まれるパラメータを確認してみよう。 これは何も学習していない状態の初期値だけど、ダミーのデータを使って検算する分にはそれで問題ない。

>>> from pprint import pprint
>>> pprint(list(model.named_parameters()))
[('weight_ih_l0',
  Parameter containing:
tensor([[ 0.4349,  0.2858, -0.3802],
        [ 0.3035,  0.4744, -0.4774],
        [ 0.4553,  0.1563, -0.0048],
        [-0.4107, -0.4734,  0.3651]], requires_grad=True)),
 ('weight_hh_l0',
  Parameter containing:
tensor([[-0.4045,  0.4994, -0.3950,  0.3627],
        [-0.4304,  0.2032,  0.2878,  0.0923],
        [ 0.0641, -0.0405, -0.2965, -0.3422],
        [ 0.3323, -0.2716, -0.1380,  0.2079]], requires_grad=True)),
 ('bias_ih_l0',
  Parameter containing:
tensor([-0.2928,  0.2330,  0.1649, -0.2679], requires_grad=True)),
 ('bias_hh_l0',
  Parameter containing:
tensor([-0.0034, -0.0927,  0.0520, -0.0646], requires_grad=True))]

モデルには 4 つの名前つきパラメータが確認できる。 これらのパラメータが何を意味しているかは、以下のドキュメントをみるとわかる。

pytorch.org

上記には、RNN の具体的な計算式が記載されている。

\displaystyle{
h_t = \tanh(W_{ih} x_t + b_{ih} + W_{hh} h_{(t-1)} + b_{hh})
}

ここで、 x_t は入力となる系列データにおいて t 番目 (時点) の要素を表していて、 h_tt 番目の隠れ状態ベクトルになる。  h_{(t-1)}t - 1 番目の隠れ状態ベクトルなので、1 つ前の状態の出力を入力として使っていることがわかる。

それ以外は、先ほどのパラメータと次のように対応している。

  •  W_{ih}

    • weight_ih_l0
  •  W_{hh}

    • weight_hh_l0
  •  b_{ih}

    • bias_ih_l0
  •  b_{hh}

    • bias_hh_l0

ダミーデータを用意する

式がわかったところで、検算するための出力を適当に用意したダミーデータを使って得よう。 次のようにランダムな入力データを用意する。

>>> T = 5  # 入力する系列データの長さ
>>> batch_size = 2  # 一度に処理するデータの数
>>> X = torch.randn(T, batch_size, input_dim)  # ダミーの入力データ
>>> X.shape
torch.Size([5, 2, 3])

上記のダミーデータをモデルに入力として与える。 すると、タプルで 2 つの返り値が得られる。

>>> H, hn = model(X)

このうち、タプルの最初の要素は各時点 (0 ~ T) での隠れ状態ベクトルが入っている。 つまり、X[0] に対応した隠れ状態ベクトルが H[0] で、X[1] に対応した隠れ状態ベクトルが H[1] で...ということ。

>>> H.shape
torch.Size([5, 2, 4])
>>> H
tensor([[[-0.0096,  0.3380,  0.4147, -0.5187],
         [-0.5797,  0.0438, -0.3449, -0.0454]],

        [[-0.3769,  0.5505,  0.1542, -0.6927],
         [ 0.1021,  0.4838,  0.0174, -0.5226]],

        [[ 0.5723,  0.8306,  0.5878, -0.9012],
         [-0.5423, -0.3730,  0.1816,  0.0130]],

        [[-0.2641,  0.0466,  0.7226, -0.6048],
         [-0.6680, -0.4764,  0.2837,  0.2118]],

        [[-0.8623, -0.3724, -0.4284,  0.2948],
         [-0.2464,  0.4500, -0.4194, -0.1977]]], grad_fn=<StackBackward>)

そして、タプルで 2 番目に返ってきた値は最後の時点 (T) での隠れ状態ベクトルになる。 ようするに、上記の最後尾と同じもの。

>>> hn
tensor([[[-0.8623, -0.3724, -0.4284,  0.2948],
         [-0.2464,  0.4500, -0.4194, -0.1977]]], grad_fn=<StackBackward>)
>>> H[-1]
tensor([[-0.8623, -0.3724, -0.4284,  0.2948],
        [-0.2464,  0.4500, -0.4194, -0.1977]], grad_fn=<SelectBackward>)

検算する

次に、実際の検算に入る。 まずは、次のようにして各パラメータの Tensor オブジェクトを得る。

>>> model_weights = {name: param.data for name, param
...                  in model.named_parameters()}
>>> 
>>> W_ih = model_weights['weight_ih_l0']
>>> W_hh = model_weights['weight_hh_l0']
>>> b_ih = model_weights['bias_ih_l0']
>>> b_hh = model_weights['bias_hh_l0']

まずは系列データの一番最初の t = 0X[0] に対応する隠れ状態ベクトルから求める。 ターゲットはこれ。

>>> H[0]
tensor([[-0.0096,  0.3380,  0.4147, -0.5187],
        [-0.5797,  0.0438, -0.3449, -0.0454]], grad_fn=<SelectBackward>)

やることは単純で、先ほどの式を PyTorch で表現すれば良い。 なお、t = 0 の時点では  h_{(t-1)} がないので、その項は消える。

>>> torch.tanh(torch.matmul(W_ih, X[0].T).T + b_ih + b_hh)
tensor([[-0.0096,  0.3380,  0.4147, -0.5187],
        [-0.5797,  0.0438, -0.3449, -0.0454]])

Tensor の値が一致していることがわかる。

次は t = 1X[1] に対応する隠れ状態ベクトルを求める。 ターゲットは以下。

>>> H[1]
tensor([[-0.3769,  0.5505,  0.1542, -0.6927],
        [ 0.1021,  0.4838,  0.0174, -0.5226]], grad_fn=<SelectBackward>)

t = 1 では  h_{(t-1)} h_0 になる。 とはいえ項が増えるだけで、やることは先ほどと変わらない。

>>> torch.tanh(torch.matmul(W_ih, X[1].T).T + b_ih + torch.matmul(W_hh, H[0].T).T + b_hh)
tensor([[-0.3769,  0.5505,  0.1542, -0.6927],
        [ 0.1021,  0.4838,  0.0174, -0.5226]], grad_fn=<TanhBackward>)

こちらも値が一致している。

あとは添字が増えるだけなので省略する。

初期 (t = 0) の隠れ状態ベクトルを渡す場合

先ほどの例では、初期 (t = 0) のときに  h_{(t-1)} に相当する隠れ状態ベクトルが存在しなかった。 これは自分で用意して渡すこともできるので、その場合の挙動も確認しておこう。

次のようにしてランダムな値で初期の隠れ状態ベクトルを h0 として用意する。 なお、先頭の次元は Simple RNN を重ねる段数を表している。 というのも、(総称としての) RNN は縦に積み重ねることで性能向上が望める場合があるらしい 1。 PyTorch のRNN も、インスタンス化するときに num_layers という引数で重ねる数が指定できる。 なお、デフォルト値は 1 になっている。

>>> rnn_layers = 1  # Simple RNN を重ねる数 (num_layers の値)
>>> h0 = torch.randn(rnn_layers, batch_size, hidden_dim)
>>> h0.shape
torch.Size([1, 2, 4])

初期の隠れ状態ベクトルをモデルに渡すには、次のように 2 番目の引数として渡せば良い。

>>> H, hn = model(X, h0)

先ほどと同じように検算してみよう。

>>> H[0]
tensor([[-0.1925,  0.6594, -0.2041, -0.4893],
        [ 0.5740,  0.8465, -0.5979, -0.8112]], grad_fn=<SelectBackward>)

といっても、最初の  h_{(t-1)} として h0 を使うだけ。

>>> torch.tanh(torch.matmul(W_ih, X[0].T).T + b_ih + torch.matmul(W_hh, h0[0].T).T + b_hh)
tensor([[-0.1925,  0.6594, -0.2041, -0.4893],
        [ 0.5740,  0.8465, -0.5979, -0.8112]])

残りは変わらない。

>>> H[1]
tensor([[ 0.0929,  0.5283,  0.2951, -0.7210],
        [-0.1403,  0.0510,  0.3764, -0.4921]], grad_fn=<SelectBackward>)
>>> torch.tanh(torch.matmul(W_ih, X[1].T).T + b_ih + torch.matmul(W_hh, H[0].T).T + b_hh)
tensor([[ 0.0929,  0.5283,  0.2951, -0.7210],
        [-0.1403,  0.0510,  0.3764, -0.4921]], grad_fn=<TanhBackward>)

Simple RNN を重ねた場合

先ほど述べたとおり RNN は層を重ねることで性能向上が望める場合がある。 その場合についても確認しておく。

まずは RNN を 2 層重ねたモデルを用意する。

>>> rnn_layers = 2
>>> model = nn.RNN(input_size=input_dim, hidden_size=hidden_dim, num_layers=rnn_layers)

次のようにモデルのパラメータが増えている。 具体的には名前の末尾が l0 になったものと l1 になったものがある。 これはつまりl0 の上に l1 が重なっていることを示す。

>>> pprint(list(model.named_parameters()))
[('weight_ih_l0',
  Parameter containing:
tensor([[-0.3591,  0.0948, -0.0500],
        [ 0.1963, -0.1717, -0.3551],
        [ 0.0313,  0.0495, -0.0878],
        [ 0.3109,  0.3728,  0.2577]], requires_grad=True)),
 ('weight_hh_l0',
  Parameter containing:
tensor([[-0.3050, -0.0269,  0.1772,  0.0081],
        [-0.0770,  0.3563, -0.1209,  0.0126],
        [-0.3534,  0.0264,  0.2649,  0.2235],
        [ 0.3338, -0.0708,  0.4314, -0.0149]], requires_grad=True)),
 ('bias_ih_l0',
  Parameter containing:
tensor([ 0.3767,  0.3653, -0.1024,  0.3425], requires_grad=True)),
 ('bias_hh_l0',
  Parameter containing:
tensor([-0.1083, -0.1802, -0.2972,  0.1099], requires_grad=True)),
 ('weight_ih_l1',
  Parameter containing:
tensor([[ 0.2279, -0.4886,  0.4573,  0.2441],
        [-0.0949, -0.2300,  0.1320, -0.2643],
        [ 0.0720,  0.4727,  0.2005, -0.0784],
        [-0.0784,  0.3208,  0.4977, -0.0190]], requires_grad=True)),
 ('weight_hh_l1',
  Parameter containing:
tensor([[-0.0565,  0.1433,  0.0810,  0.1619],
        [ 0.2734,  0.3270, -0.2813,  0.1076],
        [ 0.2989,  0.0412, -0.1173,  0.1614],
        [-0.0805, -0.1851, -0.1254,  0.0713]], requires_grad=True)),
 ('bias_ih_l1',
  Parameter containing:
tensor([-0.3898, -0.1349, -0.2269, -0.1637], requires_grad=True)),
 ('bias_hh_l1',
  Parameter containing:
tensor([ 0.4969,  0.3327,  0.4548, -0.3809], requires_grad=True))]

それぞれのパラメータの重みを取得しておく。

>>> model_weights = {name: param.data for name, param
...                  in model.named_parameters()}
>>> 
>>> W_ih_l0 = model_weights['weight_ih_l0']
>>> W_hh_l0 = model_weights['weight_hh_l0']
>>> b_ih_l0 = model_weights['bias_ih_l0']
>>> b_hh_l0 = model_weights['bias_hh_l0']
>>> 
>>> W_ih_l1 = model_weights['weight_ih_l1']
>>> W_hh_l1 = model_weights['weight_hh_l1']
>>> b_ih_l1 = model_weights['bias_ih_l1']
>>> b_hh_l1 = model_weights['bias_hh_l1']

入力のダミーデータはそのままに、モデルからあらためて隠れ状態ベクトルを取得する。

>>> H, hn = model(X)

初期状態 (t = 0) をターゲットにする。

>>> H[0]
tensor([[-0.1115, -0.0662,  0.2981, -0.5452],
        [ 0.0896,  0.0750,  0.2003, -0.6533]], grad_fn=<SelectBackward>)

まずは、これまでの要領で隠れ状態ベクトルを得る。 ただし、これはあくまで 1 層目の出力にすぎない。 使っているパラメータの名前も末尾が _l0 になっている。

>>> h0_l0 = torch.tanh(torch.matmul(W_ih_l0, X[0].T).T + b_ih_l0 + b_hh_l0)
>>> h0_l0
tensor([[ 0.0724,  0.3836, -0.3525,  0.4635],
        [ 0.6664,  0.0096, -0.3751,  0.0292]])

続いて、1 層目の出力を 2 層目に入力して計算する。

>>> torch.tanh(torch.matmul(W_ih_l1, h0_l0.T).T + b_ih_l1 + b_hh_l1)
tensor([[-0.1115, -0.0662,  0.2981, -0.5452],
        [ 0.0896,  0.0750,  0.2003, -0.6533]])

これで値が一致した。

そんなかんじで。

参考書籍


  1. 詳しくは「ゼロから作るDeep Learning ❷ ―自然言語処理編」を参照のこと

Python: Google Colaboratory で Cloud TPU を TensorFlow から試してみる

Google Colaboratory では、ランタイムのタイプを変更することで Cloud TPU (Tensor Processing Unit) を利用できる。 Cloud TPU は、Google が開発しているハードウェアアクセラレータの一種。 利用することで、行列計算のパフォーマンス向上が期待できる。 ただ、Cloud TPU は CPU や GPU に比べると扱う上でのクセがそれなりにつよい。 今回は、そんな Cloud TPU を使ってみることにする。

使った環境は次のとおり。

# pip list | grep "^tensorflow "
tensorflow                         2.1.0

もくじ

下準備

あらかじめ、メニューの「ランタイム」から「ランタイムのタイプを変更」を選択して、ハードウェアアクセラレータに TPU を指定しておく。

そして、TensorFlow をインポートしておく。

>>> import tensorflow as tf

TPU に接続する

まず、TPU を利用するには、最初に TPU クラスタへ接続する必要がある。 というのも、TPU のデバイスは実行中のホストで動作しているわけではない。 専用のホストに搭載されていて、それを gRPC 経由で制御するらしい。

はじめに、Google Colaboratory の環境であれば tf.distribute.cluster_resolver.TPUClusterResolver() を引数なしで実行する。 これで、利用可能な TPU クラスタの情報が得られる。

>>> tpu = tf.distribute.cluster_resolver.TPUClusterResolver()

あとは、tf.config.experimental_connect_to_cluster() を使って TPU クラスタに接続する。

>>> tf.config.experimental_connect_to_cluster(tpu)

接続できたら、TPU を初期化する。

>>> tf.tpu.experimental.initialize_tpu_system(tpu)

これで TPU を利用する準備ができた。 tf.config.list_logical_devices() を使って、タイプが TPU のデバイスを調べると、認識しているデバイスの一覧が確認できる。 下記を見て分かるとおり、複数のデバイスが確認できる。

>>> devices = tf.config.list_logical_devices('TPU')
>>> devices
[LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU')]

現在利用できる TPU v2 / v3 には、最小構成単位である TPU ボード 1 枚につき 4 つの TPU チップが載っている。 そして、それぞれのチップには 2 つの TPU コアがあるため、合計で 8 つのコアがある。 上記は、各コアがデバイスとして TensorFlow から認識されていることを示している。

これは、GPU であれば筐体に複数枚のグラフィックカードを差していたり、あるいは複数のコアが載った GPU チップを利用している状態に近い。 つまり、TPU のパフォーマンスを最大限に活用しようとすると、必然的に複数のデバイスを使った分散学習をすることになる。

ちなみに、Google Colaboratory で利用できるのは単一の TPU ボードだけっぽい。 Google Cloud 経由で利用する場合には、それ以外に TPU Pod や TPU スライスといった、複数の TPU ボードから成るシステムも利用できる。 その場合も、おそらく見え方としては上記のデバイスが増えるだけなんだろう。

単一のデバイスで演算する

さて、デバイスを認識できるようになったので、早速その中の一つを使って行列演算を試してみよう。

まずは、適当に (2, 3) な形状の行列と (3, 2) な形状の行列を作る。

>>> tf.random.set_seed(42)
>>> x = tf.random.normal(shape=(2, 3))
>>> y = tf.random.normal(shape=(3, 2))

TensorFlow では、tf.device() 関数にデバイスの情報を渡してコンテキストマネージャとして使うと、そのデバイス上で演算を実行できる。 試しに先頭の TPU デバイスを使って行列の積を求めてみよう。

>>> with tf.device(devices[0]):
...      z = tf.matmul(x, y)

次のように、ちゃんと計算できた。

>>> z
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[0.5277252 , 4.685486  ],
       [0.8692589 , 0.21500015]], dtype=float32)>

複数のデバイスで演算する

さて、単一のデバイスで計算できることは分かったので、続いては複数のデバイスで分散処理してみよう。

その前に、一旦 TPU の状態を初期化しておく。 TPU で何か新しい処理を始めるときは、初期化しておかないと上手く動作しないことがある。

>>> tf.tpu.experimental.initialize_tpu_system(tpu)

TensorFlow で複数のデバイスを使った分散処理をするときは、tf.distribute.Strategy というオブジェクト (以下、ストラテジオブジェクト) を使うことになる。 このオブジェクトには具体的な実装がいくつかあって、何を使うかによってどのように分散処理を進めるかが決まる。 ただし、TPU を使うときは tf.distribute.TPUStrategy を使うことに決まっているので選択の余地はない。

>>> strategy = tf.distribute.TPUStrategy(tpu)

試しに、先ほどと同じように行列の積を分散処理でやらせてみよう。 そのためには、まず行列の積を計算するためのヘルパー関数を次のように定義しておく。 生の tf.matmul() をそのまま使えないの?と思うけど、どうやら今のところ使えなさそう。

>>> @tf.function
... def matmul_fn(x, y):
...   """行列の積を計算する関数"""
...   z = tf.matmul(x, y)
...   return z
... 

あとは、上記の関数を先ほどのストラテジオブジェクトの run() メソッド経由で呼び出すだけ。

>>> zs = strategy.run(matmul_fn, args=(x, y))

結果を確認してみよう。 PerReplica というオブジェクトで、コアと同じ数の計算結果が得られていることがわかる。 それぞれのコアで同じ計算がされたようだ。

>>> zs
PerReplica:{
  0: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  1: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  2: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  3: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  4: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  5: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  6: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  7: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>
}

さて、上記はすべての処理に同じデータを与えているので、結果もすべて同じになっている。 なるほどって感じだけど、これでは複数のデバイスを使っている意味がない。 そこで、続いてはデバイス毎に与えるデータを変えてみよう。

まずは、以下のようにして整数を順番に返す Dataset オブジェクトを作る。

>>> range_dataset = tf.data.Dataset.range(16)
>>> list(range_dataset.as_numpy_iterator())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

続いて、上記を分散処理に使うデバイスの数に合わせてミニバッチへ分割する。 以下は Strategy#num_replicas_in_sync に 1 をかけているので、各デバイスに 1 つずつサンプルを与える場合の設定。

>>> batch_size = 1 * strategy.num_replicas_in_sync
>>> batch_dataset = range_dataset.batch(batch_size)
>>> list(batch_dataset.as_numpy_iterator())
[array([0, 1, 2, 3, 4, 5, 6, 7]), array([ 8,  9, 10, 11, 12, 13, 14, 15])]

そして、上記の Dataset オブジェクトを Strategy#experimental_distribute_dataset() メソッドに渡す。 すると、DistributedDataset というオブジェクトが得られる。

>>> dist_dataset = strategy.experimental_distribute_dataset(batch_dataset)
>>> dist_dataset
<tensorflow.python.distribute.input_lib.DistributedDataset at 0x7f546167d110>

この DistributedDataset オブジェクトからは、先ほど分散処理の結果として返ってきた PerReplica というオブジェクトが得られる。

>>> ite = iter(dist_dataset)
>>> x = next(ite)
>>> x
PerReplica:{
  0: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([0])>,
  1: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([1])>,
  2: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([2])>,
  3: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([3])>,
  4: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([4])>,
  5: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([5])>,
  6: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([6])>,
  7: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([7])>
}

上記の PerReplica オブジェクトを使うと、それぞれのデバイスに対して異なる入力データを与えることができる。 以下の、引数を 2 倍する関数で試してみよう。

>>> @tf.function
... def double_fn(x):
...     """引数を 2 倍する関数"""
...     return x * 2
... 

DistributedDataset オブジェクトから得られる PerReplica オブジェクトを、ストラテジオブジェクト経由で上記の関数に渡す。 すると、以下のように返り値として各要素が 2 倍になった PerReplica オブジェクトが得られることがわかる。

>>> for x in dist_dataset:
...     result = strategy.run(double_fn, args=(x, ))
...     print(result)
PerReplica:{
  0: tf.Tensor([0], shape=(1,), dtype=int64),
  1: tf.Tensor([2], shape=(1,), dtype=int64),
  2: tf.Tensor([4], shape=(1,), dtype=int64),
  3: tf.Tensor([6], shape=(1,), dtype=int64),
  4: tf.Tensor([8], shape=(1,), dtype=int64),
  5: tf.Tensor([10], shape=(1,), dtype=int64),
  6: tf.Tensor([12], shape=(1,), dtype=int64),
  7: tf.Tensor([14], shape=(1,), dtype=int64)
}
PerReplica:{
  0: tf.Tensor([16], shape=(1,), dtype=int64),
  1: tf.Tensor([18], shape=(1,), dtype=int64),
  2: tf.Tensor([20], shape=(1,), dtype=int64),
  3: tf.Tensor([22], shape=(1,), dtype=int64),
  4: tf.Tensor([24], shape=(1,), dtype=int64),
  5: tf.Tensor([26], shape=(1,), dtype=int64),
  6: tf.Tensor([28], shape=(1,), dtype=int64),
  7: tf.Tensor([30], shape=(1,), dtype=int64)
}

上記から、複数のデバイスで、異なる入力データを使った分散処理をできることがわかった。

単一のデバイスで勾配降下法を試す

次は、また単一のデバイスに戻って、自動微分を使った勾配降下法が機能することを確認してみよう。 要するに、ニューラルネットワークが最適化できる本質的な部分の動作を見ておく。

以下のサンプルコードでは、最小化したい関数 objective() を定義している。 そして、それに適当な初期値を与えて、SGD をオプティマイザに最小化している。 実際に損失と勾配を計算してパラメータを更新しているのは training_step() という関数。

# -*- coding: utf-8 -*-

from __future__ import annotations

from pprint import pprint

import tensorflow as tf


def objective(params: tf.Variable) -> tf.Tensor:
    """最小化したい関数"""
    # x_0^2 + x_1^2
    loss = params[0] ** 2 + params[1] ** 2
    return loss


def main():
    # TPU クラスタに接続する
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    devices = tf.config.list_logical_devices('TPU')
    print('TPU devices:', end='')
    pprint(devices)

    # 使用するオプティマイザ
    optimizer = tf.keras.optimizers.SGD(learning_rate=1e-1)

    @tf.function
    def training_step(params: tf.Variable) -> None:
        """勾配降下法を使った最適化の 1 ステップ"""
        with tf.GradientTape() as t:
            # 損失を計算する
            loss = objective(params)
        # 勾配を計算する
        grads = t.gradient(loss, params)
        # パラメータを更新する
        optimizer.apply_gradients([(grads, params)])
        # tf.print(params)  # 少なくとも今の TPU では利用できない...

    # 初期値を用意する
    tensor = tf.constant([1., 4.], dtype=tf.float32)

    # 先頭の TPU デバイスで計算する
    with tf.device(devices[0]):
        # TPU デバイス上に Variable を用意する
        params = tf.Variable(tensor, trainable=True)
        # 最適化のループ
        for _ in range(20):  # 回数は適当
            training_step(params)

    # 結果を出力する
    print(f'{objective(params)} @ {params.numpy()}')


if __name__ == '__main__':
    main()

上記の実行結果は次のとおり。

(snip) ...
TPU devices:[LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU')]
0.0022596875205636024 @ [0.01152921 0.04611686]

最適化によって、最終的な objective(params) の結果が小さくなっていることが確認できる。

複数のデバイスで CNN を tf.keras で学習する

次は、これまでのサンプルよりも少し実用性が高めのコードを試す。 具体的には CNN のモデルを tf.keras を使って組んで、CIFAR-10 のデータを学習させてみる。

ポイントとしては、モデルやメトリックなどをストラテジオブジェクトのスコープで組み立てるところ。 こうすると、たとえば Variable オブジェクトは内部的にデバイス間で値が同期できる MirroredVariable になったりするらしい。

CIFAR-10 のデータはメモリに収まるサイズなので、オンメモリのデータから Dataset オブジェクトを生成している。 これが、もしメモリに収まりきらないときは TFRecord フォーマットで GCS に保存する必要がある。

TPU を使う際には、CPU や GPU の環境で動作したコードを転用するのがベストプラクティスらしい。 以下のサンプルコードでは、それがやりやすいように環境毎のストラテジオブジェクトを取得できる detect_strategy() という関数を定義した。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import tensorflow as tf


def detect_strategy():
    """利用できるハードウェアアクセラレータ毎に適した tf.distribute.Strategy を返す関数"""
    try:
        tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
        tf.config.experimental_connect_to_cluster(tpu)
        tf.tpu.experimental.initialize_tpu_system(tpu)
        devices = tf.config.list_logical_devices('TPU')
        if len(devices) > 0:
            # TPU が利用できる
            return tf.distribute.TPUStrategy(tpu)
    except ValueError:
        pass

    devices = tf.config.list_logical_devices('GPU')
    if len(devices) > 0:
        # GPU が利用できる
        return tf.distribute.MirroredStrategy()

    # Default
    return tf.distribute.get_strategy()


def normalize(element):
    """画像データを浮動小数点型にキャストして正規化する処理"""
    image = element['image']
    normalized_image = tf.cast(image, tf.float32) / 255.
    label = element['label']
    return normalized_image, label


def datafeed_pipeline(x, y, batch_size):
    """オンメモリのテンソルからデータを読み出す Dataset パイプライン"""
    mappings = {
        'image': x,
        'label': y,
    }
    ds = tf.data.Dataset.from_tensor_slices(mappings)
    ds = ds.map(normalize)
    ds = ds.prefetch(tf.data.AUTOTUNE)
    ds = ds.batch(batch_size)
    ds = ds.cache()
    return ds


def main():
    # データセットをオンメモリに読み込む
    (train_x, train_y), (test_x, test_y) = tf.keras.datasets.cifar10.load_data()

    # データセットの仕様
    image_shape = train_x.shape[1:]
    num_classes = 10

    # 乱数シードを設定しておく
    tf.random.set_seed(42)

    # 環境に応じたストラテジオブジェクトを取得する
    strategy = detect_strategy()

    # データ供給のパイプラインを Dataset API で構築する
    device_batch_size = 512  # デバイス単位で見たバッチサイズ
    global_batch_size = strategy.num_replicas_in_sync * device_batch_size
    ds_train = datafeed_pipeline(train_x, train_y, global_batch_size)
    ds_test = datafeed_pipeline(test_x, test_y, global_batch_size)

    with strategy.scope():
        # ストラテジオブジェクトのスコープでモデルを組み立てる
        # これによって内部で使われる Variable の型などが変わる
        model = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=image_shape),
            tf.keras.layers.Conv2D(32, (3, 3), activation='relu'),
            tf.keras.layers.MaxPooling2D((2, 2)),
            tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
            tf.keras.layers.MaxPooling2D((2, 2)),
            tf.keras.layers.Conv2D(128, (3, 3), activation='relu'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(num_classes, activation='softmax')
        ])
        model.compile(
            loss='sparse_categorical_crossentropy',
            optimizer='adam',
            metrics=['sparse_categorical_accuracy'],
        )

    # モデルの概要
    print(model.summary())

    # モデルを学習させる
    fit_callbacs = [
        tf.keras.callbacks.EarlyStopping(monitor='val_loss',
                                         patience=5,
                                         mode='min'),
    ]
    model.fit(ds_train,
              epochs=100,
              validation_data=ds_test,
              callbacks=fit_callbacs,
              )

    # テストデータを評価する
    scr, sca = model.evaluate(ds_test)
    print(f'Loss: {scr}, Accuracy: {sca}')


if __name__ == '__main__':
    main()

上記を実行してみよう。 今回は、精度とかは横に置いておく。

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d_3 (Conv2D)            (None, 30, 30, 32)        896       
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 15, 15, 32)        0         
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 13, 13, 64)        18496     
_________________________________________________________________
max_pooling2d_3 (MaxPooling2 (None, 6, 6, 64)          0         
_________________________________________________________________
conv2d_5 (Conv2D)            (None, 4, 4, 128)         73856     
_________________________________________________________________
flatten_1 (Flatten)          (None, 2048)              0         
_________________________________________________________________
dense_2 (Dense)              (None, 64)                131136    
_________________________________________________________________
dense_3 (Dense)              (None, 10)                650       
=================================================================
Total params: 225,034
Trainable params: 225,034
Non-trainable params: 0
_________________________________________________________________
None
Epoch 1/100
13/13 [==============================] - 10s 431ms/step - loss: 2.1851 - sparse_categorical_accuracy: 0.1976 - val_loss: 1.9839 - val_sparse_categorical_accuracy: 0.2979
Epoch 2/100
13/13 [==============================] - 1s 88ms/step - loss: 1.9094 - sparse_categorical_accuracy: 0.3103 - val_loss: 1.8427 - val_sparse_categorical_accuracy: 0.3334
Epoch 3/100
13/13 [==============================] - 1s 85ms/step - loss: 1.7798 - sparse_categorical_accuracy: 0.3575 - val_loss: 1.7185 - val_sparse_categorical_accuracy: 0.3849

...(snip)...

Epoch 71/100
13/13 [==============================] - 1s 87ms/step - loss: 0.8315 - sparse_categorical_accuracy: 0.7118 - val_loss: 0.9328 - val_sparse_categorical_accuracy: 0.6744
Epoch 72/100
13/13 [==============================] - 1s 89ms/step - loss: 0.8223 - sparse_categorical_accuracy: 0.7148 - val_loss: 0.9292 - val_sparse_categorical_accuracy: 0.6737
Epoch 73/100
13/13 [==============================] - 1s 88ms/step - loss: 0.8054 - sparse_categorical_accuracy: 0.7224 - val_loss: 0.9340 - val_sparse_categorical_accuracy: 0.6756
3/3 [==============================] - 1s 16ms/step - loss: 0.9340 - sparse_categorical_accuracy: 0.6756
Loss: 0.9339648485183716, Accuracy: 0.675599992275238

ちゃんと動いているようだ。 カスタムトレーニングループを使うときは、また気にするところがあるみたいだけど、今回は取り扱わない。

参考

cloud.google.com

cloud.google.com

cloud.google.com

www.tensorflow.org

www.tensorflow.org

www.tensorflow.org

www.tensorflow.org