CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: PyTorch の RNN を検算してみる

今回は、PyTorch の RNN (Recurrent Neural Network) が内部的にどんな処理をしているのか確認してみる。 なお、ここでいう RNN は、再起的な構造をもったニューラルネットワークの総称ではなく、いわゆる古典的な Simple RNN を指している。

これを書いている人は、ニューラルネットワークが何もわからないので、再帰的な構造があったりすると尚更わからなくなる。 そこで、中身について知っておきたいと考えたのがモチベーションになっている。

使った環境は次のとおり。

$ sw_vers 
ProductName:    macOS
ProductVersion: 11.5.2
BuildVersion:   20G95
$ python -V         
Python 3.9.6
$ pip list | grep torch       
torch                    1.9.0

もくじ

下準備

まずは PyTorch をインストールしておく。

$ pip install torch

そして、Python のインタプリタを起動する。

$ python

起動できたら PyTorch のモジュールをインポートしておく。

>>> import torch
>>> from torch import nn

モデルを用意する

PyTorch には nn モジュール以下に RNN というクラスがある。 このクラスが、ミニバッチに対応した Simple RNN を実装している。 このクラスは、最低限 input_sizehidden_size という引数を指定すればインスタンス化できる。

>>> input_dim = 3  # モデルの入力ベクトルの次元数
>>> hidden_dim = 4  # モデルの出力ベクトルの次元数
>>> model = nn.RNN(input_size=input_dim, hidden_size=hidden_dim)

input_size はモデルに入力するデータの次元数で、hidden_size はモデルが出力するデータの次元数になる。 Simple RNN が出力するデータには隠れ状態 (Hidden State) ベクトルという名前がついていて、それが引数の名前に反映されている。

インスタンス化できたら、モデルに含まれるパラメータを確認してみよう。 これは何も学習していない状態の初期値だけど、ダミーのデータを使って検算する分にはそれで問題ない。

>>> from pprint import pprint
>>> pprint(list(model.named_parameters()))
[('weight_ih_l0',
  Parameter containing:
tensor([[ 0.4349,  0.2858, -0.3802],
        [ 0.3035,  0.4744, -0.4774],
        [ 0.4553,  0.1563, -0.0048],
        [-0.4107, -0.4734,  0.3651]], requires_grad=True)),
 ('weight_hh_l0',
  Parameter containing:
tensor([[-0.4045,  0.4994, -0.3950,  0.3627],
        [-0.4304,  0.2032,  0.2878,  0.0923],
        [ 0.0641, -0.0405, -0.2965, -0.3422],
        [ 0.3323, -0.2716, -0.1380,  0.2079]], requires_grad=True)),
 ('bias_ih_l0',
  Parameter containing:
tensor([-0.2928,  0.2330,  0.1649, -0.2679], requires_grad=True)),
 ('bias_hh_l0',
  Parameter containing:
tensor([-0.0034, -0.0927,  0.0520, -0.0646], requires_grad=True))]

モデルには 4 つの名前つきパラメータが確認できる。 これらのパラメータが何を意味しているかは、以下のドキュメントをみるとわかる。

pytorch.org

上記には、RNN の具体的な計算式が記載されている。

\displaystyle{
h_t = \tanh(W_{ih} x_t + b_{ih} + W_{hh} h_{(t-1)} + b_{hh})
}

ここで、 x_t は入力となる系列データにおいて t 番目 (時点) の要素を表していて、 h_tt 番目の隠れ状態ベクトルになる。  h_{(t-1)}t - 1 番目の隠れ状態ベクトルなので、1 つ前の状態の出力を入力として使っていることがわかる。

それ以外は、先ほどのパラメータと次のように対応している。

  •  W_{ih}

    • weight_ih_l0
  •  W_{hh}

    • weight_hh_l0
  •  b_{ih}

    • bias_ih_l0
  •  b_{hh}

    • bias_hh_l0

ダミーデータを用意する

式がわかったところで、検算するための出力を適当に用意したダミーデータを使って得よう。 次のようにランダムな入力データを用意する。

>>> T = 5  # 入力する系列データの長さ
>>> batch_size = 2  # 一度に処理するデータの数
>>> X = torch.randn(T, batch_size, input_dim)  # ダミーの入力データ
>>> X.shape
torch.Size([5, 2, 3])

上記のダミーデータをモデルに入力として与える。 すると、タプルで 2 つの返り値が得られる。

>>> H, hn = model(X)

このうち、タプルの最初の要素は各時点 (0 ~ T) での隠れ状態ベクトルが入っている。 つまり、X[0] に対応した隠れ状態ベクトルが H[0] で、X[1] に対応した隠れ状態ベクトルが H[1] で...ということ。

>>> H.shape
torch.Size([5, 2, 4])
>>> H
tensor([[[-0.0096,  0.3380,  0.4147, -0.5187],
         [-0.5797,  0.0438, -0.3449, -0.0454]],

        [[-0.3769,  0.5505,  0.1542, -0.6927],
         [ 0.1021,  0.4838,  0.0174, -0.5226]],

        [[ 0.5723,  0.8306,  0.5878, -0.9012],
         [-0.5423, -0.3730,  0.1816,  0.0130]],

        [[-0.2641,  0.0466,  0.7226, -0.6048],
         [-0.6680, -0.4764,  0.2837,  0.2118]],

        [[-0.8623, -0.3724, -0.4284,  0.2948],
         [-0.2464,  0.4500, -0.4194, -0.1977]]], grad_fn=<StackBackward>)

そして、タプルで 2 番目に返ってきた値は最後の時点 (T) での隠れ状態ベクトルになる。 ようするに、上記の最後尾と同じもの。

>>> hn
tensor([[[-0.8623, -0.3724, -0.4284,  0.2948],
         [-0.2464,  0.4500, -0.4194, -0.1977]]], grad_fn=<StackBackward>)
>>> H[-1]
tensor([[-0.8623, -0.3724, -0.4284,  0.2948],
        [-0.2464,  0.4500, -0.4194, -0.1977]], grad_fn=<SelectBackward>)

検算する

次に、実際の検算に入る。 まずは、次のようにして各パラメータの Tensor オブジェクトを得る。

>>> model_weights = {name: param.data for name, param
...                  in model.named_parameters()}
>>> 
>>> W_ih = model_weights['weight_ih_l0']
>>> W_hh = model_weights['weight_hh_l0']
>>> b_ih = model_weights['bias_ih_l0']
>>> b_hh = model_weights['bias_hh_l0']

まずは系列データの一番最初の t = 0X[0] に対応する隠れ状態ベクトルから求める。 ターゲットはこれ。

>>> H[0]
tensor([[-0.0096,  0.3380,  0.4147, -0.5187],
        [-0.5797,  0.0438, -0.3449, -0.0454]], grad_fn=<SelectBackward>)

やることは単純で、先ほどの式を PyTorch で表現すれば良い。 なお、t = 0 の時点では  h_{(t-1)} がないので、その項は消える。

>>> torch.tanh(torch.matmul(W_ih, X[0].T).T + b_ih + b_hh)
tensor([[-0.0096,  0.3380,  0.4147, -0.5187],
        [-0.5797,  0.0438, -0.3449, -0.0454]])

Tensor の値が一致していることがわかる。

次は t = 1X[1] に対応する隠れ状態ベクトルを求める。 ターゲットは以下。

>>> H[1]
tensor([[-0.3769,  0.5505,  0.1542, -0.6927],
        [ 0.1021,  0.4838,  0.0174, -0.5226]], grad_fn=<SelectBackward>)

t = 1 では  h_{(t-1)} h_0 になる。 とはいえ項が増えるだけで、やることは先ほどと変わらない。

>>> torch.tanh(torch.matmul(W_ih, X[1].T).T + b_ih + torch.matmul(W_hh, H[0].T).T + b_hh)
tensor([[-0.3769,  0.5505,  0.1542, -0.6927],
        [ 0.1021,  0.4838,  0.0174, -0.5226]], grad_fn=<TanhBackward>)

こちらも値が一致している。

あとは添字が増えるだけなので省略する。

初期 (t = 0) の隠れ状態ベクトルを渡す場合

先ほどの例では、初期 (t = 0) のときに  h_{(t-1)} に相当する隠れ状態ベクトルが存在しなかった。 これは自分で用意して渡すこともできるので、その場合の挙動も確認しておこう。

次のようにしてランダムな値で初期の隠れ状態ベクトルを h0 として用意する。 なお、先頭の次元は Simple RNN を重ねる段数を表している。 というのも、(総称としての) RNN は縦に積み重ねることで性能向上が望める場合があるらしい 1。 PyTorch のRNN も、インスタンス化するときに num_layers という引数で重ねる数が指定できる。 なお、デフォルト値は 1 になっている。

>>> rnn_layers = 1  # Simple RNN を重ねる数 (num_layers の値)
>>> h0 = torch.randn(rnn_layers, batch_size, hidden_dim)
>>> h0.shape
torch.Size([1, 2, 4])

初期の隠れ状態ベクトルをモデルに渡すには、次のように 2 番目の引数として渡せば良い。

>>> H, hn = model(X, h0)

先ほどと同じように検算してみよう。

>>> H[0]
tensor([[-0.1925,  0.6594, -0.2041, -0.4893],
        [ 0.5740,  0.8465, -0.5979, -0.8112]], grad_fn=<SelectBackward>)

といっても、最初の  h_{(t-1)} として h0 を使うだけ。

>>> torch.tanh(torch.matmul(W_ih, X[0].T).T + b_ih + torch.matmul(W_hh, h0[0].T).T + b_hh)
tensor([[-0.1925,  0.6594, -0.2041, -0.4893],
        [ 0.5740,  0.8465, -0.5979, -0.8112]])

残りは変わらない。

>>> H[1]
tensor([[ 0.0929,  0.5283,  0.2951, -0.7210],
        [-0.1403,  0.0510,  0.3764, -0.4921]], grad_fn=<SelectBackward>)
>>> torch.tanh(torch.matmul(W_ih, X[1].T).T + b_ih + torch.matmul(W_hh, H[0].T).T + b_hh)
tensor([[ 0.0929,  0.5283,  0.2951, -0.7210],
        [-0.1403,  0.0510,  0.3764, -0.4921]], grad_fn=<TanhBackward>)

Simple RNN を重ねた場合

先ほど述べたとおり RNN は層を重ねることで性能向上が望める場合がある。 その場合についても確認しておく。

まずは RNN を 2 層重ねたモデルを用意する。

>>> rnn_layers = 2
>>> model = nn.RNN(input_size=input_dim, hidden_size=hidden_dim, num_layers=rnn_layers)

次のようにモデルのパラメータが増えている。 具体的には名前の末尾が l0 になったものと l1 になったものがある。 これはつまりl0 の上に l1 が重なっていることを示す。

>>> pprint(list(model.named_parameters()))
[('weight_ih_l0',
  Parameter containing:
tensor([[-0.3591,  0.0948, -0.0500],
        [ 0.1963, -0.1717, -0.3551],
        [ 0.0313,  0.0495, -0.0878],
        [ 0.3109,  0.3728,  0.2577]], requires_grad=True)),
 ('weight_hh_l0',
  Parameter containing:
tensor([[-0.3050, -0.0269,  0.1772,  0.0081],
        [-0.0770,  0.3563, -0.1209,  0.0126],
        [-0.3534,  0.0264,  0.2649,  0.2235],
        [ 0.3338, -0.0708,  0.4314, -0.0149]], requires_grad=True)),
 ('bias_ih_l0',
  Parameter containing:
tensor([ 0.3767,  0.3653, -0.1024,  0.3425], requires_grad=True)),
 ('bias_hh_l0',
  Parameter containing:
tensor([-0.1083, -0.1802, -0.2972,  0.1099], requires_grad=True)),
 ('weight_ih_l1',
  Parameter containing:
tensor([[ 0.2279, -0.4886,  0.4573,  0.2441],
        [-0.0949, -0.2300,  0.1320, -0.2643],
        [ 0.0720,  0.4727,  0.2005, -0.0784],
        [-0.0784,  0.3208,  0.4977, -0.0190]], requires_grad=True)),
 ('weight_hh_l1',
  Parameter containing:
tensor([[-0.0565,  0.1433,  0.0810,  0.1619],
        [ 0.2734,  0.3270, -0.2813,  0.1076],
        [ 0.2989,  0.0412, -0.1173,  0.1614],
        [-0.0805, -0.1851, -0.1254,  0.0713]], requires_grad=True)),
 ('bias_ih_l1',
  Parameter containing:
tensor([-0.3898, -0.1349, -0.2269, -0.1637], requires_grad=True)),
 ('bias_hh_l1',
  Parameter containing:
tensor([ 0.4969,  0.3327,  0.4548, -0.3809], requires_grad=True))]

それぞれのパラメータの重みを取得しておく。

>>> model_weights = {name: param.data for name, param
...                  in model.named_parameters()}
>>> 
>>> W_ih_l0 = model_weights['weight_ih_l0']
>>> W_hh_l0 = model_weights['weight_hh_l0']
>>> b_ih_l0 = model_weights['bias_ih_l0']
>>> b_hh_l0 = model_weights['bias_hh_l0']
>>> 
>>> W_ih_l1 = model_weights['weight_ih_l1']
>>> W_hh_l1 = model_weights['weight_hh_l1']
>>> b_ih_l1 = model_weights['bias_ih_l1']
>>> b_hh_l1 = model_weights['bias_hh_l1']

入力のダミーデータはそのままに、モデルからあらためて隠れ状態ベクトルを取得する。

>>> H, hn = model(X)

初期状態 (t = 0) をターゲットにする。

>>> H[0]
tensor([[-0.1115, -0.0662,  0.2981, -0.5452],
        [ 0.0896,  0.0750,  0.2003, -0.6533]], grad_fn=<SelectBackward>)

まずは、これまでの要領で隠れ状態ベクトルを得る。 ただし、これはあくまで 1 層目の出力にすぎない。 使っているパラメータの名前も末尾が _l0 になっている。

>>> h0_l0 = torch.tanh(torch.matmul(W_ih_l0, X[0].T).T + b_ih_l0 + b_hh_l0)
>>> h0_l0
tensor([[ 0.0724,  0.3836, -0.3525,  0.4635],
        [ 0.6664,  0.0096, -0.3751,  0.0292]])

続いて、1 層目の出力を 2 層目に入力して計算する。

>>> torch.tanh(torch.matmul(W_ih_l1, h0_l0.T).T + b_ih_l1 + b_hh_l1)
tensor([[-0.1115, -0.0662,  0.2981, -0.5452],
        [ 0.0896,  0.0750,  0.2003, -0.6533]])

これで値が一致した。

そんなかんじで。

参考書籍


  1. 詳しくは「ゼロから作るDeep Learning ❷ ―自然言語処理編」を参照のこと

Python: Google Colaboratory で Cloud TPU を TensorFlow から試してみる

Google Colaboratory では、ランタイムのタイプを変更することで Cloud TPU (Tensor Processing Unit) を利用できる。 Cloud TPU は、Google が開発しているハードウェアアクセラレータの一種。 利用することで、行列計算のパフォーマンス向上が期待できる。 ただ、Cloud TPU は CPU や GPU に比べると扱う上でのクセがそれなりにつよい。 今回は、そんな Cloud TPU を使ってみることにする。

使った環境は次のとおり。

# pip list | grep "^tensorflow "
tensorflow                         2.1.0

もくじ

下準備

あらかじめ、メニューの「ランタイム」から「ランタイムのタイプを変更」を選択して、ハードウェアアクセラレータに TPU を指定しておく。

そして、TensorFlow をインポートしておく。

>>> import tensorflow as tf

TPU に接続する

まず、TPU を利用するには、最初に TPU クラスタへ接続する必要がある。 というのも、TPU のデバイスは実行中のホストで動作しているわけではない。 専用のホストに搭載されていて、それを gRPC 経由で制御するらしい。

はじめに、Google Colaboratory の環境であれば tf.distribute.cluster_resolver.TPUClusterResolver() を引数なしで実行する。 これで、利用可能な TPU クラスタの情報が得られる。

>>> tpu = tf.distribute.cluster_resolver.TPUClusterResolver()

あとは、tf.config.experimental_connect_to_cluster() を使って TPU クラスタに接続する。

>>> tf.config.experimental_connect_to_cluster(tpu)

接続できたら、TPU を初期化する。

>>> tf.tpu.experimental.initialize_tpu_system(tpu)

これで TPU を利用する準備ができた。 tf.config.list_logical_devices() を使って、タイプが TPU のデバイスを調べると、認識しているデバイスの一覧が確認できる。 下記を見て分かるとおり、複数のデバイスが確認できる。

>>> devices = tf.config.list_logical_devices('TPU')
>>> devices
[LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU')]

現在利用できる TPU v2 / v3 には、最小構成単位である TPU ボード 1 枚につき 4 つの TPU チップが載っている。 そして、それぞれのチップには 2 つの TPU コアがあるため、合計で 8 つのコアがある。 上記は、各コアがデバイスとして TensorFlow から認識されていることを示している。

これは、GPU であれば筐体に複数枚のグラフィックカードを差していたり、あるいは複数のコアが載った GPU チップを利用している状態に近い。 つまり、TPU のパフォーマンスを最大限に活用しようとすると、必然的に複数のデバイスを使った分散学習をすることになる。

ちなみに、Google Colaboratory で利用できるのは単一の TPU ボードだけっぽい。 Google Cloud 経由で利用する場合には、それ以外に TPU Pod や TPU スライスといった、複数の TPU ボードから成るシステムも利用できる。 その場合も、おそらく見え方としては上記のデバイスが増えるだけなんだろう。

単一のデバイスで演算する

さて、デバイスを認識できるようになったので、早速その中の一つを使って行列演算を試してみよう。

まずは、適当に (2, 3) な形状の行列と (3, 2) な形状の行列を作る。

>>> tf.random.set_seed(42)
>>> x = tf.random.normal(shape=(2, 3))
>>> y = tf.random.normal(shape=(3, 2))

TensorFlow では、tf.device() 関数にデバイスの情報を渡してコンテキストマネージャとして使うと、そのデバイス上で演算を実行できる。 試しに先頭の TPU デバイスを使って行列の積を求めてみよう。

>>> with tf.device(devices[0]):
...      z = tf.matmul(x, y)

次のように、ちゃんと計算できた。

>>> z
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[0.5277252 , 4.685486  ],
       [0.8692589 , 0.21500015]], dtype=float32)>

複数のデバイスで演算する

さて、単一のデバイスで計算できることは分かったので、続いては複数のデバイスで分散処理してみよう。

その前に、一旦 TPU の状態を初期化しておく。 TPU で何か新しい処理を始めるときは、初期化しておかないと上手く動作しないことがある。

>>> tf.tpu.experimental.initialize_tpu_system(tpu)

TensorFlow で複数のデバイスを使った分散処理をするときは、tf.distribute.Strategy というオブジェクト (以下、ストラテジオブジェクト) を使うことになる。 このオブジェクトには具体的な実装がいくつかあって、何を使うかによってどのように分散処理を進めるかが決まる。 ただし、TPU を使うときは tf.distribute.TPUStrategy を使うことに決まっているので選択の余地はない。

>>> strategy = tf.distribute.TPUStrategy(tpu)

試しに、先ほどと同じように行列の積を分散処理でやらせてみよう。 そのためには、まず行列の積を計算するためのヘルパー関数を次のように定義しておく。 生の tf.matmul() をそのまま使えないの?と思うけど、どうやら今のところ使えなさそう。

>>> @tf.function
... def matmul_fn(x, y):
...   """行列の積を計算する関数"""
...   z = tf.matmul(x, y)
...   return z
... 

あとは、上記の関数を先ほどのストラテジオブジェクトの run() メソッド経由で呼び出すだけ。

>>> zs = strategy.run(matmul_fn, args=(x, y))

結果を確認してみよう。 PerReplica というオブジェクトで、コアと同じ数の計算結果が得られていることがわかる。 それぞれのコアで同じ計算がされたようだ。

>>> zs
PerReplica:{
  0: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  1: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  2: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  3: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  4: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  5: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  6: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>,
  7: <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[-0.4504242 , -0.07991219],
       [-0.5104828 ,  0.57960224]], dtype=float32)>
}

さて、上記はすべての処理に同じデータを与えているので、結果もすべて同じになっている。 なるほどって感じだけど、これでは複数のデバイスを使っている意味がない。 そこで、続いてはデバイス毎に与えるデータを変えてみよう。

まずは、以下のようにして整数を順番に返す Dataset オブジェクトを作る。

>>> range_dataset = tf.data.Dataset.range(16)
>>> list(range_dataset.as_numpy_iterator())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

続いて、上記を分散処理に使うデバイスの数に合わせてミニバッチへ分割する。 以下は Strategy#num_replicas_in_sync に 1 をかけているので、各デバイスに 1 つずつサンプルを与える場合の設定。

>>> batch_size = 1 * strategy.num_replicas_in_sync
>>> batch_dataset = range_dataset.batch(batch_size)
>>> list(batch_dataset.as_numpy_iterator())
[array([0, 1, 2, 3, 4, 5, 6, 7]), array([ 8,  9, 10, 11, 12, 13, 14, 15])]

そして、上記の Dataset オブジェクトを Strategy#experimental_distribute_dataset() メソッドに渡す。 すると、DistributedDataset というオブジェクトが得られる。

>>> dist_dataset = strategy.experimental_distribute_dataset(batch_dataset)
>>> dist_dataset
<tensorflow.python.distribute.input_lib.DistributedDataset at 0x7f546167d110>

この DistributedDataset オブジェクトからは、先ほど分散処理の結果として返ってきた PerReplica というオブジェクトが得られる。

>>> ite = iter(dist_dataset)
>>> x = next(ite)
>>> x
PerReplica:{
  0: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([0])>,
  1: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([1])>,
  2: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([2])>,
  3: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([3])>,
  4: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([4])>,
  5: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([5])>,
  6: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([6])>,
  7: <tf.Tensor: shape=(1,), dtype=int64, numpy=array([7])>
}

上記の PerReplica オブジェクトを使うと、それぞれのデバイスに対して異なる入力データを与えることができる。 以下の、引数を 2 倍する関数で試してみよう。

>>> @tf.function
... def double_fn(x):
...     """引数を 2 倍する関数"""
...     return x * 2
... 

DistributedDataset オブジェクトから得られる PerReplica オブジェクトを、ストラテジオブジェクト経由で上記の関数に渡す。 すると、以下のように返り値として各要素が 2 倍になった PerReplica オブジェクトが得られることがわかる。

>>> for x in dist_dataset:
...     result = strategy.run(double_fn, args=(x, ))
...     print(result)
PerReplica:{
  0: tf.Tensor([0], shape=(1,), dtype=int64),
  1: tf.Tensor([2], shape=(1,), dtype=int64),
  2: tf.Tensor([4], shape=(1,), dtype=int64),
  3: tf.Tensor([6], shape=(1,), dtype=int64),
  4: tf.Tensor([8], shape=(1,), dtype=int64),
  5: tf.Tensor([10], shape=(1,), dtype=int64),
  6: tf.Tensor([12], shape=(1,), dtype=int64),
  7: tf.Tensor([14], shape=(1,), dtype=int64)
}
PerReplica:{
  0: tf.Tensor([16], shape=(1,), dtype=int64),
  1: tf.Tensor([18], shape=(1,), dtype=int64),
  2: tf.Tensor([20], shape=(1,), dtype=int64),
  3: tf.Tensor([22], shape=(1,), dtype=int64),
  4: tf.Tensor([24], shape=(1,), dtype=int64),
  5: tf.Tensor([26], shape=(1,), dtype=int64),
  6: tf.Tensor([28], shape=(1,), dtype=int64),
  7: tf.Tensor([30], shape=(1,), dtype=int64)
}

上記から、複数のデバイスで、異なる入力データを使った分散処理をできることがわかった。

単一のデバイスで勾配降下法を試す

次は、また単一のデバイスに戻って、自動微分を使った勾配降下法が機能することを確認してみよう。 要するに、ニューラルネットワークが最適化できる本質的な部分の動作を見ておく。

以下のサンプルコードでは、最小化したい関数 objective() を定義している。 そして、それに適当な初期値を与えて、SGD をオプティマイザに最小化している。 実際に損失と勾配を計算してパラメータを更新しているのは training_step() という関数。

# -*- coding: utf-8 -*-

from __future__ import annotations

from pprint import pprint

import tensorflow as tf


def objective(params: tf.Variable) -> tf.Tensor:
    """最小化したい関数"""
    # x_0^2 + x_1^2
    loss = params[0] ** 2 + params[1] ** 2
    return loss


def main():
    # TPU クラスタに接続する
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    devices = tf.config.list_logical_devices('TPU')
    print('TPU devices:', end='')
    pprint(devices)

    # 使用するオプティマイザ
    optimizer = tf.keras.optimizers.SGD(learning_rate=1e-1)

    @tf.function
    def training_step(params: tf.Variable) -> None:
        """勾配降下法を使った最適化の 1 ステップ"""
        with tf.GradientTape() as t:
            # 損失を計算する
            loss = objective(params)
        # 勾配を計算する
        grads = t.gradient(loss, params)
        # パラメータを更新する
        optimizer.apply_gradients([(grads, params)])
        # tf.print(params)  # 少なくとも今の TPU では利用できない...

    # 初期値を用意する
    tensor = tf.constant([1., 4.], dtype=tf.float32)

    # 先頭の TPU デバイスで計算する
    with tf.device(devices[0]):
        # TPU デバイス上に Variable を用意する
        params = tf.Variable(tensor, trainable=True)
        # 最適化のループ
        for _ in range(20):  # 回数は適当
            training_step(params)

    # 結果を出力する
    print(f'{objective(params)} @ {params.numpy()}')


if __name__ == '__main__':
    main()

上記の実行結果は次のとおり。

(snip) ...
TPU devices:[LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'),
 LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU')]
0.0022596875205636024 @ [0.01152921 0.04611686]

最適化によって、最終的な objective(params) の結果が小さくなっていることが確認できる。

複数のデバイスで CNN を tf.keras で学習する

次は、これまでのサンプルよりも少し実用性が高めのコードを試す。 具体的には CNN のモデルを tf.keras を使って組んで、CIFAR-10 のデータを学習させてみる。

ポイントとしては、モデルやメトリックなどをストラテジオブジェクトのスコープで組み立てるところ。 こうすると、たとえば Variable オブジェクトは内部的にデバイス間で値が同期できる MirroredVariable になったりするらしい。

CIFAR-10 のデータはメモリに収まるサイズなので、オンメモリのデータから Dataset オブジェクトを生成している。 これが、もしメモリに収まりきらないときは TFRecord フォーマットで GCS に保存する必要がある。

TPU を使う際には、CPU や GPU の環境で動作したコードを転用するのがベストプラクティスらしい。 以下のサンプルコードでは、それがやりやすいように環境毎のストラテジオブジェクトを取得できる detect_strategy() という関数を定義した。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import tensorflow as tf


def detect_strategy():
    """利用できるハードウェアアクセラレータ毎に適した tf.distribute.Strategy を返す関数"""
    try:
        tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
        tf.config.experimental_connect_to_cluster(tpu)
        tf.tpu.experimental.initialize_tpu_system(tpu)
        devices = tf.config.list_logical_devices('TPU')
        if len(devices) > 0:
            # TPU が利用できる
            return tf.distribute.TPUStrategy(tpu)
    except ValueError:
        pass

    devices = tf.config.list_logical_devices('GPU')
    if len(devices) > 0:
        # GPU が利用できる
        return tf.distribute.MirroredStrategy()

    # Default
    return tf.distribute.get_strategy()


def normalize(element):
    """画像データを浮動小数点型にキャストして正規化する処理"""
    image = element['image']
    normalized_image = tf.cast(image, tf.float32) / 255.
    label = element['label']
    return normalized_image, label


def datafeed_pipeline(x, y, batch_size):
    """オンメモリのテンソルからデータを読み出す Dataset パイプライン"""
    mappings = {
        'image': x,
        'label': y,
    }
    ds = tf.data.Dataset.from_tensor_slices(mappings)
    ds = ds.map(normalize)
    ds = ds.prefetch(tf.data.AUTOTUNE)
    ds = ds.batch(batch_size)
    ds = ds.cache()
    return ds


def main():
    # データセットをオンメモリに読み込む
    (train_x, train_y), (test_x, test_y) = tf.keras.datasets.cifar10.load_data()

    # データセットの仕様
    image_shape = train_x.shape[1:]
    num_classes = 10

    # 乱数シードを設定しておく
    tf.random.set_seed(42)

    # 環境に応じたストラテジオブジェクトを取得する
    strategy = detect_strategy()

    # データ供給のパイプラインを Dataset API で構築する
    device_batch_size = 512  # デバイス単位で見たバッチサイズ
    global_batch_size = strategy.num_replicas_in_sync * device_batch_size
    ds_train = datafeed_pipeline(train_x, train_y, global_batch_size)
    ds_test = datafeed_pipeline(test_x, test_y, global_batch_size)

    with strategy.scope():
        # ストラテジオブジェクトのスコープでモデルを組み立てる
        # これによって内部で使われる Variable の型などが変わる
        model = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=image_shape),
            tf.keras.layers.Conv2D(32, (3, 3), activation='relu'),
            tf.keras.layers.MaxPooling2D((2, 2)),
            tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
            tf.keras.layers.MaxPooling2D((2, 2)),
            tf.keras.layers.Conv2D(128, (3, 3), activation='relu'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(num_classes, activation='softmax')
        ])
        model.compile(
            loss='sparse_categorical_crossentropy',
            optimizer='adam',
            metrics=['sparse_categorical_accuracy'],
        )

    # モデルの概要
    print(model.summary())

    # モデルを学習させる
    fit_callbacs = [
        tf.keras.callbacks.EarlyStopping(monitor='val_loss',
                                         patience=5,
                                         mode='min'),
    ]
    model.fit(ds_train,
              epochs=100,
              validation_data=ds_test,
              callbacks=fit_callbacs,
              )

    # テストデータを評価する
    scr, sca = model.evaluate(ds_test)
    print(f'Loss: {scr}, Accuracy: {sca}')


if __name__ == '__main__':
    main()

上記を実行してみよう。 今回は、精度とかは横に置いておく。

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d_3 (Conv2D)            (None, 30, 30, 32)        896       
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 15, 15, 32)        0         
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 13, 13, 64)        18496     
_________________________________________________________________
max_pooling2d_3 (MaxPooling2 (None, 6, 6, 64)          0         
_________________________________________________________________
conv2d_5 (Conv2D)            (None, 4, 4, 128)         73856     
_________________________________________________________________
flatten_1 (Flatten)          (None, 2048)              0         
_________________________________________________________________
dense_2 (Dense)              (None, 64)                131136    
_________________________________________________________________
dense_3 (Dense)              (None, 10)                650       
=================================================================
Total params: 225,034
Trainable params: 225,034
Non-trainable params: 0
_________________________________________________________________
None
Epoch 1/100
13/13 [==============================] - 10s 431ms/step - loss: 2.1851 - sparse_categorical_accuracy: 0.1976 - val_loss: 1.9839 - val_sparse_categorical_accuracy: 0.2979
Epoch 2/100
13/13 [==============================] - 1s 88ms/step - loss: 1.9094 - sparse_categorical_accuracy: 0.3103 - val_loss: 1.8427 - val_sparse_categorical_accuracy: 0.3334
Epoch 3/100
13/13 [==============================] - 1s 85ms/step - loss: 1.7798 - sparse_categorical_accuracy: 0.3575 - val_loss: 1.7185 - val_sparse_categorical_accuracy: 0.3849

...(snip)...

Epoch 71/100
13/13 [==============================] - 1s 87ms/step - loss: 0.8315 - sparse_categorical_accuracy: 0.7118 - val_loss: 0.9328 - val_sparse_categorical_accuracy: 0.6744
Epoch 72/100
13/13 [==============================] - 1s 89ms/step - loss: 0.8223 - sparse_categorical_accuracy: 0.7148 - val_loss: 0.9292 - val_sparse_categorical_accuracy: 0.6737
Epoch 73/100
13/13 [==============================] - 1s 88ms/step - loss: 0.8054 - sparse_categorical_accuracy: 0.7224 - val_loss: 0.9340 - val_sparse_categorical_accuracy: 0.6756
3/3 [==============================] - 1s 16ms/step - loss: 0.9340 - sparse_categorical_accuracy: 0.6756
Loss: 0.9339648485183716, Accuracy: 0.675599992275238

ちゃんと動いているようだ。 カスタムトレーニングループを使うときは、また気にするところがあるみたいだけど、今回は取り扱わない。

参考

cloud.google.com

cloud.google.com

cloud.google.com

www.tensorflow.org

www.tensorflow.org

www.tensorflow.org

www.tensorflow.org

Python: Session State API で Streamlit をステートフルにする

これまで Streamlit で書いた Web アプリケーションは、基本的にステートレスだった。 つまり、何らかのイベントが生じてアプリケーションのコードが再評価されると、ウィジェットを除くほとんどすべてのオブジェクトの状態はリセットされていた。 アプリケーションをステートフルにする非公式なスニペットは一部で知られていたが、数行で使い始められるような気軽さはなかった。

そうした中、先日リリースされた Streamlit のバージョン 0.85 には、Session State API という機能が追加された。 この API は、読んで字のごとく Streamlit の Web アプリケーションに限定的ながらステートを持たせることができる機能となっている。

docs.streamlit.io

今回は、追加された Session State API を触ってみることにした。

使った環境は次のとおり。

$ sw_vers                 
ProductName:    macOS
ProductVersion: 11.5.1
BuildVersion:   20G80
$ python -V                   
Python 3.9.6
$ pip list | grep -i streamlit 
streamlit                0.85.0

もくじ

下準備

まずは肝心の Streamlit と、それ以外に可視化で使うデータセットを読み込むために Seaborn をインストールしておく。

$ pip install streamlit seaborn

ボタンを押すとカウンタが増減するサンプルコード

早速だけど、以下にカウンタの値をボタンに連動して増減させるサンプルコードを示す。 Session State API では、session_state という名前の辞書ライクなオブジェクトを扱う。 このオブジェクトに格納したオブジェクト (以下、便宜的にセッション変数と呼ぶ) は、アプリケーションが再評価されても消えずに引き継がれる。 セッション変数の値はウィジェットに追加されたコールバック関数の機能を介して更新する。 以下では st.button()on_change オプションにセッション変数の値を増減させるコールバック関数を登録している。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    # セッション変数が存在しないときは初期化する
    # ここでは 'counter' というセッション変数を作っている
    if 'counter' not in st.session_state:
        st.session_state['counter'] = 0

    # セッション変数の状態を表示する
    msg = f"Counter value: {st.session_state['counter']}"
    st.write(msg)

    # ボタンが押されたときに発火するコールバック
    def plus_one_clicks():
        # ボタンが押されたらセッション変数の値を増やす
        st.session_state['counter'] += 1
    # ボタンを作成するときにコールバックを登録しておく
    st.button(label='+1',
              on_click=plus_one_clicks)

    # ボタンが押されたらセッション変数の値を減らすバージョン
    def minus_one_clicks():
        st.session_state['counter'] -= 1
    st.button(label='-1',
              on_click=minus_one_clicks)

    # セッション変数の値をリセットするボタン
    def reset_clicks():
        st.session_state['counter'] = 0
    st.button(label='Reset',
              on_click=reset_clicks)


if __name__ == '__main__':
    main()

上記を保存したら Streamlit 経由で実行しよう。

$ streamlit run example.py

デフォルトでは自動で Web ブラウザが開くはず。 開かない場合には以下でアクセスする。

$ open http://localhost:8501

すると、次のような WebUI が表示される。 ボタンを押すと、それに連動してカウンタの値が増えたり減ったりする。

f:id:momijiame:20210728222510p:plain

これまで、ボタンをクリックするとイベントが生じてアプリケーションが再評価され、オブジェクトは一通りリセットされていた。 しかし、Session State API を使うことで、それが回避できている。

Session State API を使う上での注意点は次のようなものがありそう。

  • (当たり前だけど) 存在しない変数 (辞書のキー) を使おうとすると例外になる
  • ブラウザをリロードすると変数はリセットされる
  • ページを複数のタブで開いたとしても変数は共有されない

データフレームのページネーションを実現するサンプルコード

続いては、もうちょっと実用的な例としてページネーションを実現してみる。 以下のサンプルコードでは、タイタニックデータセットを読み込んで、それを 10 件ずつ表示するものになっている。 表示している場所をセッション変数で管理することでページネーションが実現できる。

# -*- coding: utf-8 -*-

import math

import seaborn as sns
import streamlit as st


@st.cache
def load_dataset():
    """Titanic データセットを読み込む関数"""
    return sns.load_dataset('titanic')


def main():
    # データセットを読み込んで必要なページ数を計算する
    df = load_dataset()
    rows_per_page = 10
    total_pages = math.ceil(len(df) / rows_per_page)

    if 'page' not in st.session_state:
        st.session_state['page'] = 1

    left_col, center_col, right_col = st.beta_columns(3)

    # ページ数の増減ボタン
    with left_col:
        def minus_one_page():
            st.session_state['page'] -= 1
        if st.session_state['page'] > 1:
            st.button(label='<< Prev',
                      on_click=minus_one_page)

    with right_col:
        def plus_one_page():
            st.session_state['page'] += 1
        if st.session_state['page'] < total_pages:
            st.button(label='Next >>',
                      on_click=plus_one_page)

    # 現在のページ番号
    with center_col:
        st.write(f"Page: {st.session_state['page']} / {total_pages}")

    # ページ番号に応じた範囲のデータフレームを表示する
    start_iloc = (st.session_state['page'] - 1) * rows_per_page
    end_iloc = start_iloc + rows_per_page + 1
    st.write(df.iloc[start_iloc:end_iloc])


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ streamlit run example.py

すると、ページ単位でデータフレームの内容が確認できる画面が表示される。

f:id:momijiame:20210728224651p:plain

いじょう。

Python: TFRecord フォーマットについて

TFRecord フォーマットは、TensorFlow がサポートしているデータセットの表現形式の一つ。 このフォーマットは、一言で表すと TensorFlow で扱うデータを Protocol Buffers でシリアライズしたものになっている。 特に、Dataset API との親和性に優れていたり、Cloud TPU を扱う上で実用上はほぼ必須といった特徴がある。 今回は、そんな TFRecord の扱い方について見ていくことにする。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.5
BuildVersion:   20G71
$ python -V
Python 3.9.6
$ pip list | grep -i tensorflow
tensorflow               2.5.0
tensorflow-datasets      4.3.0
tensorflow-estimator     2.5.0
tensorflow-metadata      1.1.0

もくじ

下準備

あらかじめ TensorFlow をインストールしておく。

$ pip install tensorflow tensorflow_datasets

そして、Python のインタプリタを起動する。

$ python

tensorflow パッケージを tf という名前でインポートしておく。

>>> import tensorflow as tf

概要

TFRecord フォーマットを TensorFlow の Python API から扱おうとすると、いくつかのオブジェクト (クラス) が登場する。 ただ、意外とその数が多いので、理解する上でとっつきにくさを生んでいる感じがある。 そこで、まずは一通りトップダウンで説明することにする。

それぞれの関係は、あるオブジェクトが別のオブジェクトを内包するようになっている。 階層構造で表すと、以下のような感じ。 階層構造で上にあるオブジェクトが、下にあるオブジェクトを内包する。

  • tf.Example
    • tf.train.Features
      • tf.train.Feature
        • tf.train.BytesList
        • tf.train.FloatList
        • tf.train.Int64List

tf.Example

tf.Example は、データセットに含まれる特定のサンプル (データポイント) に対応したオブジェクトになっている。 たとえば、教師あり学習のデータセットなら、あるサンプルの説明変数と目的変数のペアがこれに当たるイメージ。 ただ、サンプルに対応しているオブジェクトというだけなので、別に必要なら何を入れても構わない。 たとえば、画像データなら付随するメタデータとして横幅 (Width) と縦幅 (Height) のピクセル数が必要とかはあるはず。

このオブジェクトは単一の tf.train.Features というオブジェクトを内包する。

tf.train.Features

tf.train.Features は、名前から複数の特徴量を束ねるオブジェクトっぽいけど、まあ概ねその理解で正しいと思う。 概ね、というのは前述したとおりメタデータ的なものや説明変数も含まれるため。

このオブジェクトは複数の tf.train.Feature を内包する。

tf.train.Feature

tf.train.Feature は、特定の特徴量ないしメタデータや説明変数に対応したオブジェクト。

このオブジェクトは単一の tf.train.BytesList または tf.train.FloatList または tf.train.Int64List を内包する。

tf.train.BytesList

tf.train.BytesList は、特徴量としてバイト列のリストを扱うために用いるオブジェクト。

このオブジェクトは bytes 型のリストを内包する。 任意のバイト列を扱えるので、何らかのオブジェクトをシリアライズしたものを入れることができる。 詳しくは後述するけど、この特性は割と重要になってくる。 なぜなら、他の tf.train.FloatListtf.train.Int64List は一次元配列しか扱えないため。

tf.train.FloatList

tf.train.FloatList は、特徴量として浮動小数点のリストを扱うために用いるオブジェクト。

このオブジェクトは浮動小数点のリストを内包する。 前述したとおり、リストは一次元のものしか扱えない。

tf.train.Int64List

tf.train.Int64List は、特徴量として整数のリストを扱うために用いるオブジェクト。

このオブジェクトは整数のリストを内包する。 前述したとおり、リストは一次元のものしか扱えない。

基本的な使い方

一通りのオブジェクトの説明が終わったので、ここからは実際にコードを実行しながら試してみよう。 先ほどの説明とは反対に、ボトムアップでの実行になる。 これは、そうでないとオブジェクトを組み立てられないため。

まず、最もプリミティブなオブジェクトである tf.train.Int64Listtf.train.FloatListtf.train.BytesList から。 これらは前述したとおりバイト列・浮動小数点・整数のリストを内包するオブジェクトになっている。

>>> int64_list = tf.train.Int64List(value=[1, 2, 3])
>>> int64_list
value: 1
value: 2
value: 3

>>> float_list = tf.train.FloatList(value=[1., 2., 3.])
>>> float_list
value: 1.0
value: 2.0
value: 3.0

>>> bytes_list = tf.train.BytesList(value=[b'x', b'y', b'z'])
>>> bytes_list
value: "x"
value: "y"
value: "z"

前述したとおり、value には一次元配列しか渡せないらしい。 渡そうとすると次のようにエラーになる。

>>> import numpy as np
>>> x = np.random.randint(low=0, high=100, size=(3, 2))
>>> tf.train.Int64List(value=x)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: only integer scalar arrays can be converted to a scalar index
>>> tf.train.Int64List(value=[[1, 2], [3, 4]])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: [1, 2] has type list, but expected one of: int, long

この仕様だと、画像データとか扱うときに面倒くさくない?と思うはず。 そんなときは、多次元配列を次のようにバイト列にシリアライズしてやれば良い。

>>> serialized_x = tf.io.serialize_tensor(x)
>>> serialized_x
<tf.Tensor: shape=(), dtype=string, numpy=b'\x08\t\x12\x08\x12\x02\x08\x03\x12\x02\x08\x02"0\x08\x00\x00\x00\x00\x00\x00\x00^\x00\x00\x00\x00\x00\x00\x00\x0f\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00\x00\x00\x00\x00H\x00\x00\x00\x00\x00\x00\x00G\x00\x00\x00\x00\x00\x00\x00'>

バイト列になっていれば tf.train.BytesList に入れることができる。

>>> tf.train.BytesList(value=[serialized_x.numpy()])
value: "\010\t\022\010\022\002\010\003\022\002\010\002\"0\010\000\000\000\000\000\000\000^\000\000\000\000\000\000\000\017\000\000\000\000\000\000\0006\000\000\000\000\000\000\000H\000\000\000\000\000\000\000G\000\000\000\000\000\000\000"

なお、もちろん多次元配列は Flatten して、別で保存しておいた shape の情報を使って復元してもかまわない。

続いては tf.train.Feature を使って先ほどの *List オブジェクトをラップする。 型ごとに引数が異なるため、そこだけ注意する。

>>> int64_feature = tf.train.Feature(int64_list=int64_list)
>>> int64_feature
int64_list {
  value: 1
  value: 2
  value: 3
}

>>> float_feature = tf.train.Feature(float_list=float_list)
>>> float_feature
float_list {
  value: 1.0
  value: 2.0
  value: 3.0
}

>>> bytes_feature = tf.train.Feature(bytes_list=bytes_list)
>>> bytes_feature
bytes_list {
  value: "x"
  value: "y"
  value: "z"
}

続いては、tf.train.Features を使って、複数の tf.train.Feature を束ねる。

>>> feature_mappings = {
...     'feature0': int64_feature,
...     'feature1': float_feature,
...     'feature2': bytes_feature,
... }
>>> features = tf.train.Features(feature=feature_mappings)
>>> features
feature {
  key: "feature0"
  value {
    int64_list {
      value: 1
      value: 2
      value: 3
    }
  }
}
feature {
  key: "feature1"
  value {
    float_list {
      value: 1.0
      value: 2.0
      value: 3.0
    }
  }
}
feature {
  key: "feature2"
  value {
    bytes_list {
      value: "x"
      value: "y"
      value: "z"
    }
  }
}

あとは tf.train.Example でラップするだけ。

>>> example = tf.train.Example(features=features)
>>> example
features {
  feature {
    key: "feature0"
    value {
      int64_list {
        value: 1
        value: 2
        value: 3
      }
    }
  }
  feature {
    key: "feature1"
    value {
      float_list {
        value: 1.0
        value: 2.0
        value: 3.0
      }
    }
  }
  feature {
    key: "feature2"
    value {
      bytes_list {
        value: "x"
        value: "y"
        value: "z"
      }
    }
  }
}

上記で完成した tf.train.Example オブジェクトがデータセットの中の特定のサンプルに対応することになる。 まあ、使っているのがダミーデータなのでちょっとイメージがつきにくいかもしれないけど。

tf.train.Example オブジェクトは SerializeToString() メソッドを使うことでバイト列にシリアライズできる。 つまり、.tfrecord の拡張子がついた TFRecord ファイルは、このシリアライズされたバイト列が書き込まれている。

>>> serialized_data = example.SerializeToString()
>>> serialized_data
b'\nL\n\x17\n\x08feature2\x12\x0b\n\t\n\x01x\n\x01y\n\x01z\n\x1c\n\x08feature1\x12\x10\x12\x0e\n\x0c\x00\x00\x80?\x00\x00\x00@\x00\x00@@\n\x13\n\x08feature0\x12\x07\x1a\x05\n\x03\x01\x02\x03'

ちなみに、これまでに登場したオブジェクトも、それぞれ単独で SerializeToString() を使えばシリアライズできる。

>>> int64_list.SerializeToString()
b'\n\x03\x01\x02\x03'
>>> int64_feature.SerializeToString()
b'\x1a\x05\n\x03\x01\x02\x03'
>>> features.SerializeToString()
b'\n\x13\n\x08feature0\x12\x07\x1a\x05\n\x03\x01\x02\x03\n\x17\n\x08feature2\x12\x0b\n\t\n\x01x\n\x01y\n\x01z\n\x1c\n\x08feature1\x12\x10\x12\x0e\n\x0c\x00\x00\x80?\x00\x00\x00@\x00\x00@@'

そして、シリアライズしたバイト列は、tf.train.Example.FromString() 関数を使ってデシリアライズできる。

>>> deserialized_object = tf.train.Example.FromString(serialized_data)
>>> deserialized_object
features {
  feature {
    key: "feature0"
    value {
      int64_list {
        value: 1
        value: 2
        value: 3
      }
    }
  }
  feature {
    key: "feature1"
    value {
      float_list {
        value: 1.0
        value: 2.0
        value: 3.0
      }
    }
  }
  feature {
    key: "feature2"
    value {
      bytes_list {
        value: "x"
        value: "y"
        value: "z"
      }
    }
  }
}

データセットを TFRecord ファイルに変換する

基本的な使い方がわかったところで、続いては実際にデータセットを TFRecord 形式のファイルに変換してみよう。

使う題材は特に何でも良いんだけど、今回は tensorflow-datasets 経由でロードした CIFAR10 を使うことにする。

>>> import tensorflow_datasets as tfds
>>> ds_train = tfds.load('cifar10', split='train')

このデータセットには (32, 32, 3) の形状を持った画像のテンソルと、それに対応したラベルが入っている。 画像のデータが一次元になっていないので、わざわざ Flatten する代わりに前述したシリアライズしてバイト列にする作戦でいこう。

>>> from pprint import pprint
>>> pprint(ds_train.element_spec)
{'id': TensorSpec(shape=(), dtype=tf.string, name=None),
 'image': TensorSpec(shape=(32, 32, 3), dtype=tf.uint8, name=None),
 'label': TensorSpec(shape=(), dtype=tf.int64, name=None)}

まず、特定のサンプルに対応したテンソルとラベルを前述した手順でシリアライズする関数を次のように定義する。

>>> def serialize_example(image, label):
...     """1 サンプルを Protocol Buffers で TFRecord フォーマットにシリアライズする関数"""
...     # 画像データをバイト列にシリアライズする
...     serialized_image = tf.io.serialize_tensor(image)
...     image_bytes_list = tf.train.BytesList(value=[serialized_image.numpy()])
...     # ラベルデータ
...     label_int64_list = tf.train.Int64List(value=[label.numpy()])
...     # 特徴量を Features にまとめる
...     feature_mappings = {
...         'image': tf.train.Feature(bytes_list=image_bytes_list),
...         'label': tf.train.Feature(int64_list=label_int64_list),
...     }
...     features = tf.train.Features(feature=feature_mappings)
...     # Example にまとめる
...     example_proto = tf.train.Example(features=features)
...     # バイト列にする
...     return example_proto.SerializeToString()
... 

続いて、データセットから取り出したサンプルに上記の関数を定義するヘルパー関数を次のように定義する。

>>> def tf_serialize_example(element):
...     """シリアライズ処理を tf.data.Dataset に適用するためのヘルパー関数"""
...     # イメージとラベルを取り出す
...     image = element['image']
...     label = element['label']
...     tf_string = tf.py_function(
...         serialize_example, 
...         (image, label),
...         tf.string,
...     )
...     return tf.reshape(tf_string, ())
... 

Dataset API を使って、上記の関数をデータセットに適用する。

>>> serialized_ds_train = ds_train.map(tf_serialize_example)

イテレータにしてサンプルをひとつ取り出してみよう。

>>> ite = iter(serialized_ds_train)
>>> next(ite)
<tf.Tensor: shape=(), dtype=string, numpy=b'\n\xb6\x18\n\x0e\n\x05label\x12\x05\x1a\x03\n\x01\x07\n\xa3\x18\n\x05image\x12\x99
...

ちゃんとシリアライズされたバイト列が確認できる。

あとは、シリアライズしたバイト列が取り出せる Dataset オブジェクトを引数にして tf.data.experimental.TFRecordWriter#write() を呼び出すだけ。

>>> filename = 'cifar10-train.tfrecord'
>>> writer = tf.data.experimental.TFRecordWriter(filename)
>>> writer.write(serialized_ds_train)

上記はデータセットを丸ごと 1 つのファイルにしてる。 公式ドキュメントを見ると、パフォーマンスを考えると 100 ~ 200MB 程度のサイズで複数に分割するのがおすすめらしい。 これは、おそらく GCS とかにアップロードして並列で読み出すときの話。

カレントディレクトリを確認すると、次のようにファイルが書き出されているはず。

$ du -m cifar10-train.tfrecord
161    cifar10-train.tfrecord
$ file cifar10-train.tfrecord 
cifar10-train.tfrecord: data

TFRecord ファイルからデータを読み出す

次は上記のファイルを読み込んでデシリアライズする。 まず、tf.data.TFRecordDataset に TFRecord ファイルのパスを指定する。 これで、シリアライズしたバイト列を読み出せる Dataset オブジェクトが得られる。

>>> loaded_ds_train = tf.data.TFRecordDataset(filename)

上記からは tf.Example に対応したバイト列が 1 つずつ読み出せる。 なので、それを元のテンソルに戻す関数を次のように定義する。

>>> def deserialize_example(example_proto):
...     """バイト列をデシリアライズしてオブジェクトに戻す関数"""
...     # バイト列のフォーマット
...     feature_description = {
...         'image': tf.io.FixedLenFeature([], tf.string),
...         'label': tf.io.FixedLenFeature([], tf.int64),
...     }
...     # Tensor オブジェクトの入った辞書に戻す
...     parsed_element = tf.io.parse_single_example(example_proto,
...                                                 feature_description)
...     # 画像はバイト列になっているのでテンソルに戻す
...     parsed_element['image'] = tf.io.parse_tensor(parsed_element['image'],
...                                                  out_type=tf.uint8)
...     return parsed_element
... 

上記を先ほどの Dataset オブジェクトに適用する。

>>> deserialized_ds_train = loaded_ds_train.map(deserialize_example)

試しに中身を取り出してみると、ちゃんと画像とラベルのテンソルが復元できていることがわかる。

>>> ite = iter(deserialized_ds_train)
>>> next(ite)
{'image': <tf.Tensor: shape=(32, 32, 3), dtype=uint8, numpy=
array([[[143,  96,  70],
        [141,  96,  72],
        [135,  93,  72],
        ...,
        [ 96,  37,  19],
        [105,  42,  18],
        [104,  38,  20]],

       [[128,  98,  92],
        [146, 118, 112],
        [170, 145, 138],
        ...,
        [108,  45,  26],
        [112,  44,  24],
        [112,  41,  22]],

       [[ 93,  69,  75],
        [118,  96, 101],
        [179, 160, 162],
        ...,
        [128,  68,  47],
        [125,  61,  42],
        [122,  59,  39]],

       ...,

       [[187, 150, 123],
        [184, 148, 123],
        [179, 142, 121],
        ...,
        [198, 163, 132],
        [201, 166, 135],
        [207, 174, 143]],

       [[187, 150, 117],
        [181, 143, 115],
        [175, 136, 113],
        ...,
        [201, 164, 132],
        [205, 168, 135],
        [207, 171, 139]],

       [[195, 161, 126],
        [187, 153, 123],
        [186, 151, 128],
        ...,
        [212, 177, 147],
        [219, 185, 155],
        [221, 187, 157]]], dtype=uint8)>, 'label': <tf.Tensor: shape=(), dtype=int64, numpy=7>}

いじょう。

参考

www.tensorflow.org

Python: Luigi でタスク共通のパラメータを扱う

今回は、Luigi で複数のタスクが共通のパラメータを扱う方法について考えてみる。 ここらへん、調べてもあまりドキュメントなどが出てこなかった。 なので、ソースコードを読んでリバースエンジニアリング的に「こういう風にできそう」と判明した内容を書いてみる。 使う API のレイヤー的に、高レベルなやり方と低レベルなやり方が見つかったので、どちらも記載する。

使った環境は次のとおり。

$ sw_vers 
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ python -V        
Python 3.9.5
$ pip list | grep -i luigi
luigi           3.0.3

もくじ

下準備

まずは、下準備として Luigi をインストールしておく。

$ pip install luigi

低レベル API (luigi.configuration.get_config()) を使う

まずは低レベル API の luigi.configuration.get_config() を使うやり方から。 この API を使うと、Luigi の設定ファイルを辞書形式でそのまま読み込むことができる。 読み込んだコンフィグは、どのタスクから利用することもできるため共通のパラメータを扱うことができる。

以下にサンプルコードを示す。 サンプルコードには TaskATaskB という、2 つのタスクを定義している。 この中では、それぞれ設定ファイルから [SharedConfig] セクションの shared_param パラメータを読み込んで使っている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
# 設定を取得するための API
from luigi.configuration import get_config


class NoOutputTask(luigi.Task):
    """output() を介さずに完了を制御するテスト用のタスク

    NOTE: 今回のサンプルコードの中では本質的な内容ではない"""
    # タスクが完了しているかを示すフラグ
    done = False

    def run(self):
        # run() が一度でも実行されたら完了フラグを立てる
        self.done = True

    def complete(self):
        # タスクの完了はフラグで判断する
        return self.done


class TaskA(NoOutputTask):

    def run(self):
        super().run()
        # SharedConfig セクションのパラメータを取得する
        section_dict = dict(get_config().items('SharedConfig'))
        # パラメータの内容を標準出力に書き出す (ほんとは output() に書くべき)
        print('Hello,', section_dict['shared_param'], 'by TaskA')


class TaskB(NoOutputTask):

    def run(self):
        super().run()
        # 同じパラメータを使う
        section_dict = dict(get_config().items('SharedConfig'))
        print('Hello,', section_dict['shared_param'], 'by TaskB')


class Wrapper(luigi.WrapperTask):
    """上記で定義した 2 つのタスクをキックするためだけのタスク"""

    def requires(self):
        yield TaskA()
        yield TaskB()


if __name__ == '__main__':
    luigi.run(main_task_cls=Wrapper,
              local_scheduler=True)

次のように設定ファイルを用意しよう。

$ cat << 'EOF' > luigi.cfg  
[SharedConfig]
shared_param=World
EOF

あとは実行するだけ。

$ python lowlayer.py

...

Hello, World by TaskB

...

Hello, World by TaskA

...

===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskA()
    - 1 TaskB()
    - 1 Wrapper()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記の結果からわかるように、タスクが出力するメッセージの中に設定ファイルで指定した値が使われている。

高レベル API (luigi.Config) を使う

続いては高レベル API の luigi.Config を使うパターン。 こちらは luigi.Config というクラスを継承したクラスを定義する。 設定ファイからは、定義したクラスと同名のセクション経由でパラメータを設定できる。 複数のタスクからは、クラスをインスタンス化してやればパラメータがインジェクションされて得られる。

以下にサンプルコードを示す。 先ほどのサンプルコードから SharedConfig というクラスが増えている。 そして、TaskATaskBSharedConfig をインスタンス化して shared_param パラメータにアクセスしている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class SharedConfig(luigi.Config):
    """複数のタスクから参照される共通のパラメータ"""
    shared_param = luigi.Parameter()


class NoOutputTask(luigi.Task):
    """output() を介さずに完了を制御するテスト用のタスク

    NOTE: 今回のサンプルコードの中では本質的な内容ではない"""
    # タスクが完了しているかを示すフラグ
    done = False

    def run(self):
        # run() が一度でも実行されたら完了フラグを立てる
        self.done = True

    def complete(self):
        # タスクの完了はフラグで判断する
        return self.done


class TaskA(NoOutputTask):

    def run(self):
        super().run()
        # SharedConfig をインスタンス化してパラメータを取り出す
        # パラメータは luigi の設定ファイルで指定できる
        print('Hello,', SharedConfig().shared_param, 'by TaskA')


class TaskB(NoOutputTask):

    def run(self):
        super().run()
        # こちらも同様
        print('Hello,', SharedConfig().shared_param, 'by TaskB')


class Wrapper(luigi.WrapperTask):
    """上記で定義した 2 つのタスクをキックするためだけのタスク"""

    def requires(self):
        yield TaskA()
        yield TaskB()


if __name__ == '__main__':
    luigi.run(main_task_cls=Wrapper,
              local_scheduler=True)

設定ファイルは先ほどと同じで良い。 セクション名とパラメータ名が同じになるようにクラスを定義してあるため。 作り直すなら次のようにする。

$ cat << 'EOF' > luigi.cfg  
[SharedConfig]
shared_param=World
EOF

実行してみよう。

$ python highlayer.py 

...

Hello, World by TaskB

...

Hello, World by TaskA

...

===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskA()
    - 1 TaskB()
    - 1 Wrapper()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記の実行結果から、ちゃんとパラメータを参照できていることがわかる。

ところで、上記で使った luigi.Config というクラス、定義を見ると面白いことがわかる。 以下がそのソースコード。

github.com

なんと、luigi.Configluigi.Task を継承しているだけで、他に何もしていない。 つまり、ほとんど同一のものということになる。 実は、luigi.Config を使わなくても、luigi.Task でも同じことはできるのだ。

いじょう。

Python: Jupyter の IPython Kernel にスタートアップスクリプトを登録する

今回は Jupyter の IPython Kernel に、スタートアップスクリプトを登録する方法について書いてみる。 スタートアップスクリプトというのは、カーネルの起動時に読み込まれるコードのこと。 IPython Kernel というのは、いわゆるフツーのノートブックを Jupyter で実行するときに動いているバックエンドのプログラムを指している。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ python -V           
Python 3.9.5
$ jupyter --version
jupyter core     : 4.7.1
jupyter-notebook : 6.4.0
qtconsole        : not installed
ipython          : 7.24.1
ipykernel        : 5.5.5
jupyter client   : 6.1.12
jupyter lab      : 3.0.16
nbconvert        : 6.1.0
ipywidgets       : not installed
nbformat         : 5.1.3
traitlets        : 5.0.5

もくじ

下準備

まずは JupyterLab と Pandas をインストールしておく。 Pandas の方はインポートするのに使うだけなので、あまり本質ではない。

$ pip install jupyterlab pandas

IPython のプロファイルについて

まず、本題に入る前に IPython のプロファイルという概念を説明しておく。 プロファイルは、ようするに IPython が動作するときの設定を扱う名前空間みたいなもの。 存在するプロファイルは ipython profile list コマンドで確認できる。

$ ipython profile list

Available profiles in /Users/amedama/.ipython:
    default

To use any of the above profiles, start IPython with:
    ipython --profile=<name>

上記のように、何もしなくても初期状態で default という名前のプロファイルがある。

実は、何気なく実行している ipython コマンドは、暗に --profile=default オプションをつけているのと等価になっている。

$ ipython  # ipython --profile=default と同じ

各プロファイルにはディレクトリがあって、そこにはプロファイル毎の設定ファイルや動作ログが収められている。

$ ipython profile locate default
/Users/amedama/.ipython/profile_default
$ ls $(ipython profile locate default)
db      log       security
history.sqlite    pid     startup

デフォルトのカーネルにスタートアップスクリプトを登録する

先ほど実行したコマンドの出力を見ると、プロファイルのディレクトリには、さらに startup というディレクトリがある。 ここに、名前が数字 2 ケタから始まる Python スクリプトを入れると、カーネルの起動時にそれが呼び出されるようになる。

試しに Pandas と NumPy のインポート文を追加してみよう。

$ cat << 'EOF' >> $(ipython profile locate default)/startup/00-common-import.py
import pandas as pd
import numpy as np
EOF

試しに IPython を起動して pd という変数を参照してみると、ちゃんと Pandas のモジュールを指していることがわかる。

$ ipython -c "pd"       
Out[1]: <module 'pandas' from '/Users/amedama/.virtualenvs/py39/lib/python3.9/site-packages/pandas/__init__.py'>

前述したとおり、上記は暗に --profile=default を付けているのと等価になる。

$ ipython --profile=default -c "pd"
Out[1]: <module 'pandas' from '/Users/amedama/.virtualenvs/py39/lib/python3.9/site-packages/pandas/__init__.py'>

もちろん、これは Jupyter からも有効になる。 試しに Jupyter Lab を起動して、デフォルトのカーネルでノートブックを作ってみよう。

$ jupyter lab

先ほどと同じように pd という名前の変数を参照すると、ちゃんと読み込まれている。

f:id:momijiame:20210624191922p:plain

新たに専用のカーネルを作ってスタートアップスクリプトを登録する

続いては、専用のカーネルを作って、そこにスタートアップスクリプトを登録してみよう。 これは、たとえば用途ごとにスタートアップスクリプトを用意して使い分けたいようなユースケースを想定している。

はじめに、スタートアップスクリプトを登録するためのプロファイルを新たに用意する。 新しくプロファイルを作るには ipython profile create コマンドを使う。 ここでは customized という名前でプロファイルを作った。

$ ipython profile create customized
[ProfileCreate] Generating default config file: '/Users/amedama/.ipython/profile_customized/ipython_config.py'
[ProfileCreate] Generating default config file: '/Users/amedama/.ipython/profile_customized/ipython_kernel_config.py'
$ ipython profile list

Available profiles in /Users/amedama/.ipython:
    customized
    default

To use any of the above profiles, start IPython with:
    ipython --profile=<name>

先ほどと同じように、プロファイルにスタートアップスクリプトを登録しておこう。

$ cat << 'EOF' >> $(ipython profile locate customized)/startup/00-common-import.py
import pandas as pd
import numpy as np
EOF

ひとまず、プロファイル経由でスタートアップスクリプトが読み込まれているかを IPython の REPL で確認しておく。 --profile オプションで、作ったプロファイル customized を指定しよう。

$ ipython --profile=customized -c "pd"
Out[1]: <module 'pandas' from '/Users/amedama/.virtualenvs/py39/lib/python3.9/site-packages/pandas/__init__.py'>

続いて、カーネルの設定に入る。 まず、現在有効なカーネルの一覧は jupyter kernelspec list コマンドで得られる。

$ jupyter kernelspec list
Available kernels:
  python3    /Users/amedama/.virtualenvs/py39/share/jupyter/kernels/python3

ここにはカーネルを起動するときの情報が入った kernel.json というファイルがある。 中身を見ると、結局のところカーネルの起動というのは $ python -m ipykernel_launcher -f ... というコマンドを実行しているのに過ぎないことがわかる。

$ cat ~/.virtualenvs/py39/share/jupyter/kernels/python3/kernel.json 
{
 "argv": [
  "python",
  "-m",
  "ipykernel_launcher",
  "-f",
  "{connection_file}"
 ],
 "display_name": "Python 3",
 "language": "python"
}

おもむろに、デフォルトのカーネルのディレクトリを丸ごとコピーする。

$ cp -r ~/.virtualenvs/py39/share/jupyter/kernels/{python3,customized}

そして kernel.json をちょこっと書きかえよう。

$ cat << 'EOF' > ~/.virtualenvs/py39/share/jupyter/kernels/customized/kernel.json
{
 "argv": [
  "python",
  "-m",
  "ipykernel_launcher",
  "-f",
  "{connection_file}",
  "--profile",
  "customized"
 ],
 "display_name": "Customized Python 3",
 "language": "python"
}
EOF

差分は以下のとおり。 要するに表示名に Customized をつけているのと、起動時のオプションに --profile customized を追加してるだけ。

$ diff -u ~/.virtualenvs/py39/share/jupyter/kernels/{python3,customized}/kernel.json
--- /Users/amedama/.virtualenvs/py39/share/jupyter/kernels/python3/kernel.json  2021-06-24 18:55:06.000000000 +0900
+++ /Users/amedama/.virtualenvs/py39/share/jupyter/kernels/customized/kernel.json   2021-06-24 19:32:26.000000000 +0900
@@ -4,8 +4,10 @@
   "-m",
   "ipykernel_launcher",
   "-f",
-  "{connection_file}"
+  "{connection_file}",
+  "--profile",
+  "customized"
  ],
- "display_name": "Python 3",
+ "display_name": "Customized Python 3",
  "language": "python"
-}
\ No newline at end of file
+}

Jupyter Lab を起動してみよう。

$ jupyter lab

Web UI を確認すると、新しくカーネルが登録されていることがわかる。

f:id:momijiame:20210624193349p:plain

もちろん、カーネルを起動するとスタートアップスクリプトが実行される。

めでたしめでたし。

Python: Luigi から S3 互換のオブジェクトストレージを使う

今回は、Python のデータパイプライン構築用フレームワークの Luigi から、Amazon 以外が提供している S3 互換のオブジェクトストレージを利用する方法について書いてみる。 S3 互換のオブジェクトストレージとしては、ひとまず以下のエントリで紹介した MinIO をローカルホストで動かした。

blog.amedama.jp

使った環境は次のとおり。

$ sw_vers     
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ python -V
Python 3.9.5
$ pip list | grep -i luigi   
luigi           3.0.3
$ minio --version
minio version RELEASE.2021-06-17T00-10-46Z

もくじ

下準備

下準備として、MinIO と AWS CLI、それに Luigi と Boto3 をインストールしておく。 Boto3 は AWS を操作するための Python のクライアントライブラリで、Luigi で AWS 関連の処理をするときに必要となる。

$ brew install minio awscli
$ pip install luigi boto3

デフォルトの設定で MinIO のサーバを起動する。

$ mkdir -p /tmp/minio
$ minio server /tmp/minio

そして、テスト用のバケットを example-bucket という名前で作っておく。

$ export AWS_ACCESS_KEY_ID=minioadmin
$ export AWS_SECRET_ACCESS_KEY=minioadmin
$ aws s3 --endpoint-url http://localhost:9000 mb s3://example-bucket

サンプルコード

早速だけど、以下にサンプルコードを示す。 サンプルコードでは、ExampleTask というタスクを 1 つ定義している。 タスクの output() メソッドを見ると分かるとおり、Luigi でタスクのターゲットを S3 にしたいときは luigi.contrib.s3.S3Target を使えば良い。 そして、このタスクは実行すると s3://example-bucket/greet.txt という URL にファイルを作る。 ファイルの中には `Hello, World! という文字列が書き込まれる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import luigi
from luigi.contrib.s3 import S3Target


class ExampleTask(luigi.Task):

    def run(self):
        # NOTE: バケットは自動で作られない点に注意する
        with self.output().open(mode='w') as out_fp:
            print('Hello, World!', file=out_fp)

    def output(self):
        return S3Target(path=f's3://example-bucket/greet.txt')


if __name__ == '__main__':
    luigi.run(main_task_cls=ExampleTask,
              local_scheduler=True)

上記に適当な名前をつけて保存する。 ここでは例として example.py という名前にした。

さて、問題は上記をそのまま実行すると、アクセス先が本家の AWS になってしまうところ。 どうにかしてローカルホストにアクセスしてもらわないといけない。

結論から先に述べると、Luigi の設定ファイルに [s3] というセクションを作って、そこに設定を書けば良い。 前述したとおり、Luigi の S3 関連の処理は AWS SDK for Python (Boto3) に依存している。 [s3] というセクションに定義したパラメータは、boto3.client() を初期化するときの引数としてそのまま渡される。 つまり、ここでアクセス先や認証情報を変更できる。

$ cat << 'EOF' > luigi.cfg
[s3]
aws_access_key_id=minioadmin
aws_secret_access_key=minioadmin
use_ssl=False
endpoint_url=http://localhost:9000
EOF

設定できたところでタスクを実行してみよう。

$ python example.py

...

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 ExampleTask()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

結果を確認してみよう。 バケットを確認すると、ちゃんとオブジェクトができている。

$ aws --endpoint-url http://localhost:9000 s3 ls s3://example-bucket/ --recursive
2021-06-23 18:32:31         14 greet.txt

オブジェクトの中身を確認すると、ちゃんとメッセージが書き込まれていることがわかる。

$ aws --endpoint-url http://localhost:9000 s3 cp s3://example-bucket/greet.txt -
Hello, World!

めでたしめでたし。