CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: LightGBM の決定木を可視化して分岐を追ってみる

今回は、LightGBM が構築するブースターに含まれる決定木を可視化した上で、その分岐を追いかけてみよう。 その過程を通して、LightGBM の最終的な出力がどのように得られているのかを確認してみよう。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.2.3
BuildVersion:   20D91
$ python -V
Python 3.9.2

もくじ

下準備

まずは動作に必要なパッケージをインストールする。

決定木の可視化のために graphviz を、並列計算のために OpenMP を入れておく。

$ brew install graphviz libomp

そして、Python のパッケージを入れる。

$ pip install lightgbm scikit-learn graphviz matplotlib

二値分類問題 (乳がんデータセット)

まずは乳がんデータセットを使って二値分類問題を扱ってみよう。

LightGBM には、lightgbm.Booster オブジェクトに含まれる決定木を可視化する API として lightgbm.plot_tree() という関数が用意されている。 使うときは、tree_index オプションにイテレーション番号を指定することで、そのイテレーションで作成された決定木がグラフとして得られる。 以下のサンプルコードでは、学習させた lightgbm.Booster の先頭にある決定木をグラフにプロットした。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

from pprint import pprint

import lightgbm as lgb
from sklearn import datasets
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt


def main():
    # 乳がんデータセットを使う
    dataset = datasets.load_breast_cancer()
    x, y = dataset.data, dataset.target
    # ホールドアウト
    train_x, eval_x, train_y, eval_y = train_test_split(x, y,
                                                        stratify=y,
                                                        shuffle=True,
                                                        random_state=42)
    # LightGBM のデータセット表現にする
    lgb_train = lgb.Dataset(train_x, train_y,
                            feature_name=list(dataset.feature_names))
    lgb_eval = lgb.Dataset(eval_x, eval_y, reference=lgb_train)
    # 学習パラメータ
    lgb_params = {
        'objective': 'binary',
        'metric': "binary_logloss",
        'verbose': -1,
        'seed': 42,
        'deterministic': True,
    }
    # 学習する
    booster = lgb.train(params=lgb_params,
                        train_set=lgb_train,
                        valid_sets=[lgb_train, lgb_eval],
                        num_boost_round=1_000,
                        early_stopping_rounds=50,
                        verbose_eval=10,
                        )

    # 検証用データの先頭の情報を出力する
    head_row = eval_x[0]
    pprint(dict(zip(dataset.feature_names, head_row)))
    # 1 本目の決定木だけを使って予測してみる
    single_tree_pred = booster.predict(data=[head_row],
                                       num_iteration=1)
    print(f'single tree pred: {single_tree_pred}')
    # 2 本目までの決定木を使って予測してみる
    double_tree_pred = booster.predict(data=[head_row],
                                       num_iteration=2)
    print(f'double tree pred: {double_tree_pred}')

    # 先頭の決定木を可視化してみる
    rows = 2
    cols = 1
    # 表示する領域を準備する
    fig = plt.figure(figsize=(12, 6))
    # 一本ずつプロットしていく
    for i in range(rows * cols):
        ax = fig.add_subplot(rows, cols, i + 1)
        ax.set_title(f'Booster index: {i}')
        lgb.plot_tree(booster=booster,
                      tree_index=i,
                      show_info='internal_value',
                      ax=ax,
                      )
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。 実行すると、検証用データの先頭行の説明変数と、それを学習済みモデルで予測させたときのスコアが出力される。 なお、以下は一例であり環境が変わると出力は異なる可能性がある。 (LightGBM の seeddeterministic オプションは、学習に CPU を使った上で完全に同一の環境でのみ結果が同じになることを仮定できる)

$ python binary.py 
Training until validation scores don't improve for 50 rounds
[10]  training's binary_logloss: 0.247243 valid_1's binary_logloss: 0.266197
[20]  training's binary_logloss: 0.116632 valid_1's binary_logloss: 0.158874
[30]  training's binary_logloss: 0.0581821    valid_1's binary_logloss: 0.113181
[40]  training's binary_logloss: 0.0286961    valid_1's binary_logloss: 0.0965949
[50]  training's binary_logloss: 0.0140411    valid_1's binary_logloss: 0.0985209
[60]  training's binary_logloss: 0.00667688   valid_1's binary_logloss: 0.10083
[70]  training's binary_logloss: 0.00317889   valid_1's binary_logloss: 0.104945
[80]  training's binary_logloss: 0.00160051   valid_1's binary_logloss: 0.115742
[90]  training's binary_logloss: 0.00082228   valid_1's binary_logloss: 0.129502
Early stopping, best iteration is:
[45]  training's binary_logloss: 0.0201096    valid_1's binary_logloss: 0.0943608
{'area error': 28.62,
 'compactness error': 0.01561,
 'concave points error': 0.009199,
 'concavity error': 0.01977,
 'fractal dimension error': 0.003629,
 'mean area': 493.8,
 'mean compactness': 0.1117,
 'mean concave points': 0.02995,
 'mean concavity': 0.0388,
 'mean fractal dimension': 0.06623,
 'mean perimeter': 82.51,
 'mean radius': 12.75,
 'mean smoothness': 0.1125,
 'mean symmetry': 0.212,
 'mean texture': 16.7,
 'perimeter error': 2.495,
 'radius error': 0.3834,
 'smoothness error': 0.007509,
 'symmetry error': 0.01805,
 'texture error': 1.003,
 'worst area': 624.1,
 'worst compactness': 0.1979,
 'worst concave points': 0.08045,
 'worst concavity': 0.1423,
 'worst fractal dimension': 0.08557,
 'worst perimeter': 93.63,
 'worst radius': 14.45,
 'worst smoothness': 0.1475,
 'worst symmetry': 0.3071,
 'worst texture': 21.74}
single tree pred: [0.66326872]
double tree pred: [0.69607225]

今回は、以下のようなグラフが得られた。

f:id:momijiame:20210319004235p:plain
乳がんデータセットを学習したモデルの先頭にある決定木

検証用データの先頭行が、どのリーフに落ちるのかを決定木から確認してみよう。 まずは、Booster index: 0 から。

最初の条件は worst perimeter <= 112.800 になっていて、データは 93.63 なので yes に分岐する。 次は worst concave points <= 0.146 で、0.08045なので yes に分岐する。 以下、同様に area error <= 34.75028.62 なので yesworst texture <= 30.04521.74 なので yesmean radius <= 13.87512.75 なので yesmean radius <= 12.31012.75 なので no。 最終的に leaf 7 に落ちて、内部的なスコアは 0.678 になった。

さて、この 0.678 という値は、先頭の決定木だけを使って予測した 0.66326872 というスコアとは少し乖離がある。 これは当然のことで、実際には内部的なスコアにシグモイド関数がかかるため。

Python のインタプリタを別で起動して確認してみよう。

$ python

次のようにシグモイド関数を定義する。

>>> import numpy as np
>>> def sigmoid(x):
...     return 1. / (1. + np.exp(-x))
... 

leaf 7 の内部的なスコアをシグモイド関数にかけると、最終的な予測とほぼ同じ値が得られる。 微妙にズレているのはグラフに出力するときの値がデフォルトだと小数点 3 桁で丸められているから。

>>> sigmoid(0.678)
0.6632921720482895

同じように 2 本 (イテレーション) 目の決定木も確認してみよう。 2 本目は分岐の詳細は省略するけど、最終的に leaf 0 に落ちて 0.151 というスコアになる。 2 本目までの決定木を使った予測は、各決定木から得られる内部的なスコアを足してシグモイド関数にかければ良い。

>>> sigmoid(0.678 + 0.151)
0.6961434435735563

サンプルコードから得られた 0.69607225 というスコアと、ほぼ同じ結果が得られることがわかる。 以下、同様にすべてのイテレーションで作られた決定木のスコアを足し合わせていくことで最終的な結果が得られる。

回帰問題 (ボストンデータセット)

続いてはボストンデータセットを使って回帰問題を扱ってみる。 以下のサンプルコードは先ほどとやっていることはほとんど同じ。 問題が二値分類から回帰になっているだけ。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

from pprint import pprint

import lightgbm as lgb
from sklearn import datasets
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt


def main():
    # ボストンデータセットを使う
    dataset = datasets.load_boston()
    x, y = dataset.data, dataset.target
    # ホールドアウト
    train_x, eval_x, train_y, eval_y = train_test_split(x, y,
                                                        shuffle=True,
                                                        random_state=42)
    # LightGBM のデータセット表現にする
    lgb_train = lgb.Dataset(train_x, train_y,
                            feature_name=list(dataset.feature_names))
    lgb_eval = lgb.Dataset(eval_x, eval_y, reference=lgb_train)
    # 学習パラメータ
    lgb_params = {
        'objective': 'regression',
        'metric': "rmse",
        'verbose': -1,
        'seed': 42,
        'deterministic': True,
    }
    # 学習する
    booster = lgb.train(params=lgb_params,
                        train_set=lgb_train,
                        valid_sets=[lgb_train, lgb_eval],
                        num_boost_round=1_000,
                        early_stopping_rounds=50,
                        verbose_eval=10,
                        )

    # 検証用データの先頭の情報を出力する
    head_row = eval_x[0]
    pprint(dict(zip(dataset.feature_names, head_row)))
    # 1 本目の決定木だけを使って予測してみる
    single_tree_pred = booster.predict(data=[head_row],
                                       num_iteration=1)
    print(f'single tree pred: {single_tree_pred}')
    # 2 本目までの決定木を使って予測してみる
    double_tree_pred = booster.predict(data=[head_row],
                                       num_iteration=2)
    print(f'double tree pred: {double_tree_pred}')

    # 先頭の決定木を可視化してみる
    rows = 2
    cols = 1
    # 表示する領域を準備する
    fig = plt.figure(figsize=(12, 6))
    # 一本ずつプロットしていく
    for i in range(rows * cols):
        ax = fig.add_subplot(rows, cols, i + 1)
        ax.set_title(f'Booster index: {i}')
        lgb.plot_tree(booster=booster,
                      tree_index=i,
                      show_info='internal_value',
                      ax=ax,
                      )
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python regression.py 
Training until validation scores don't improve for 50 rounds
[10]  training's rmse: 4.71003    valid_1's rmse: 4.87178
[20]  training's rmse: 3.18599    valid_1's rmse: 3.9085
[30]  training's rmse: 2.68799    valid_1's rmse: 3.66692
[40]  training's rmse: 2.3669 valid_1's rmse: 3.55354
[50]  training's rmse: 2.13835    valid_1's rmse: 3.41701
[60]  training's rmse: 1.96456    valid_1's rmse: 3.38303
[70]  training's rmse: 1.81511    valid_1's rmse: 3.35055
[80]  training's rmse: 1.68986    valid_1's rmse: 3.34631
[90]  training's rmse: 1.59394    valid_1's rmse: 3.34022
[100] training's rmse: 1.49454    valid_1's rmse: 3.30722
[110] training's rmse: 1.41423    valid_1's rmse: 3.30035
[120] training's rmse: 1.33056    valid_1's rmse: 3.28779
[130] training's rmse: 1.25246    valid_1's rmse: 3.26555
[140] training's rmse: 1.19406    valid_1's rmse: 3.25197
[150] training's rmse: 1.13264    valid_1's rmse: 3.24115
[160] training's rmse: 1.07332    valid_1's rmse: 3.23656
[170] training's rmse: 1.02584    valid_1's rmse: 3.22715
[180] training's rmse: 0.983137   valid_1's rmse: 3.2189
[190] training's rmse: 0.940608   valid_1's rmse: 3.22034
[200] training's rmse: 0.898673   valid_1's rmse: 3.22073
[210] training's rmse: 0.862312   valid_1's rmse: 3.22325
[220] training's rmse: 0.827644   valid_1's rmse: 3.22175
[230] training's rmse: 0.795422   valid_1's rmse: 3.22017
Early stopping, best iteration is:
[184] training's rmse: 0.966589   valid_1's rmse: 3.21349
{'AGE': 84.1,
 'B': 395.5,
 'CHAS': 0.0,
 'CRIM': 0.09178,
 'DIS': 2.6463,
 'INDUS': 4.05,
 'LSTAT': 9.04,
 'NOX': 0.51,
 'PTRATIO': 16.6,
 'RAD': 5.0,
 'RM': 6.416,
 'TAX': 296.0,
 'ZN': 0.0}
single tree pred: [23.08757859]
double tree pred: [23.24927528]

以下のようなグラフが得られる。

f:id:momijiame:20210319010222p:plain
ボストンデータセットを学習したモデルの先頭にある決定木

先ほどと同じように決定木の分岐を追いかけてみる。 分岐を辿ると、1 本目の決定木は leaf 10 に落ちて 23.088 というスコアが得られる。 これは先頭 1 本だけを使った予測と同じ値になっており、回帰では内部的なスコアがそのまま最終的な出力となることがわかる。

同様に 2 本目の分岐を辿ると leaf 11 に落ちて 0.162 というスコアになる。 2 本目までを使った予測は、両方の決定木のスコアを足し合わせることで得られる。

>>> 23.088 + 0.162
23.25

多値分類問題 (あやめデータセット)

続いてはあやめデータセットを使って多値分類問題を扱う。 基本的にはこれまでと変わらないけど、多値分類問題は内部的に作られる決定木の数が多い。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

from pprint import pprint

import lightgbm as lgb
from sklearn import datasets
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt


def main():
    # あやめデータセットを使う
    dataset = datasets.load_iris()
    x, y = dataset.data, dataset.target
    # ホールドアウト
    train_x, eval_x, train_y, eval_y = train_test_split(x, y,
                                                        stratify=y,
                                                        shuffle=True,
                                                        random_state=42)
    # LightGBM のデータセット表現にする
    lgb_train = lgb.Dataset(train_x, train_y,
                            feature_name=list(dataset.feature_names))
    lgb_eval = lgb.Dataset(eval_x, eval_y, reference=lgb_train)
    # 学習パラメータ
    lgb_params = {
        'objective': 'multiclass',
        'metric': 'softmax',
        'num_class': 3,
        'verbose': -1,
        'seed': 42,
        'deterministic': True,
    }
    # 学習する
    booster = lgb.train(params=lgb_params,
                        train_set=lgb_train,
                        valid_sets=[lgb_train, lgb_eval],
                        num_boost_round=1_000,
                        early_stopping_rounds=50,
                        verbose_eval=10,
                        )

    # 検証用データの先頭の情報を出力する
    head_row = eval_x[0]
    pprint(dict(zip(dataset.feature_names, head_row)))
    # 1 本目の決定木だけを使って予測してみる
    single_tree_pred = booster.predict(data=[head_row],
                                       num_iteration=1)
    print(f'single tree pred: {single_tree_pred}')
    # 2 本目までの決定木を使って予測してみる
    double_tree_pred = booster.predict(data=[head_row],
                                       num_iteration=2)
    print(f'double tree pred: {double_tree_pred}')

    # 先頭の決定木を可視化してみる
    rows = 2
    cols = 3
    # 表示する領域を準備する
    fig = plt.figure(figsize=(14, 6))
    # 一本ずつプロットしていく
    for i in range(rows * cols):
        ax = fig.add_subplot(rows, cols, i + 1)
        ax.set_title(f'Booster index: {i}')
        lgb.plot_tree(booster=booster,
                      tree_index=i,
                      show_info='internal_value',
                      ax=ax,
                      )
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python multiclass.py
Training until validation scores don't improve for 50 rounds
[10]  training's multi_logloss: 0.287552  valid_1's multi_logloss: 0.366785
[20]  training's multi_logloss: 0.119475  valid_1's multi_logloss: 0.232667
[30]  training's multi_logloss: 0.0678466 valid_1's multi_logloss: 0.234263
[40]  training's multi_logloss: 0.0346539 valid_1's multi_logloss: 0.270086
[50]  training's multi_logloss: 0.016588  valid_1's multi_logloss: 0.350929
[60]  training's multi_logloss: 0.00939384    valid_1's multi_logloss: 0.384578
[70]  training's multi_logloss: 0.00567975    valid_1's multi_logloss: 0.414652
Early stopping, best iteration is:
[26]  training's multi_logloss: 0.0854995 valid_1's multi_logloss: 0.216513
{'petal length (cm)': 1.3,
 'petal width (cm)': 0.2,
 'sepal length (cm)': 4.4,
 'sepal width (cm)': 3.2}
single tree pred: [[0.4084366 0.2957817 0.2957817]]
double tree pred: [[0.47189453 0.26405274 0.26405274]]

以下のようなグラフが得られる。

f:id:momijiame:20210319010334p:plain
あやめデータセットを学習したモデルの先頭にある決定木

LightGBM では、多値分類問題を扱う際に「クラス数 x イテレーション数」本の決定木が作られる。 先頭にある「クラス数」本の決定木が、各クラスの出力を得るのに使われる。 今回の例でいえばあやめの品種は 3 種類なので、先頭の 3 本が 1 イテレーション目のそれぞれの品種に対応することになる。 同様に、4 ~ 6 本目が 2 イテレーション目のそれぞれの品種に対応する。 ようするに、上記のグラフでいうと縦に並んでいる決定木がそれぞれの品種 (クラス) に対応しているということ。

今回も決定木の分岐を追いかけてみよう。 1 本目の決定木は leaf 0 に落ちて -0.884 というスコアになる。 同様に、2 本目は leaf 0 に落ちて -1.207 になる。 3 本目は leaf 2 に落ちて -1.207 になる。

さて、1 イテレーション目の内部的なスコアは [-0.884, -1.207, -1.207] になった。 これを 1 イテレーション目の最終的な出力である [0.4084366 0.2957817 0.2957817] にするにはソフトマックス関数にかける。

以下のように、ソフトマックス関数を定義する。

>>> def softmax(x):
...     return np.exp(x) / np.sum(np.exp(x))
...

内部的なスコアをソフトマックス関数にかけてみよう。

>>> softmax([-0.884, -1.207, -1.207])
array([0.40850546, 0.29574727, 0.29574727])

すると、最終的な出力とほぼ同じ値になった。 2 イテレーション目以降の処理は、これまでと同じなので省略する。 足すだけ。

まとめ

LightGBM の決定木を可視化して分岐を追いかけることで最終的な予測がどのように得られるのかを確認できた。

Python: ipywidgets で Jupyter に簡単な UI を作る

Jupyter を使ってデータを可視化していると、似たようなグラフを何度も描くことがある。 そんなとき、変数の値を変更しながらグラフを描画するセルを実行しまくるのは効率があまりよくない。 そこで、今回は ipywidgets を使って簡単な UI を作ることで、Jupyter でインタラクティブな操作ができるようにしてみる。 グラフの描画には、今回は主に Matplotlib を使うことを想定している。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.2.2
BuildVersion:   20D80
$ python -V
Python 3.9.2
$ pip list | grep widgets   
ipywidgets          7.6.3
jupyterlab-widgets  1.0.0
widgetsnbextension  3.5.1

下準備

下準備として、必要なパッケージをインストールしておく。

$ pip install ipywidgets jupyterlab matplotlib

そして、JupyterLab を起動する。

$ jupyter-lab

ウィジェットを作る

ここからは、コードを JupyterLab のセル内で実行することを想定する。

まずは、ipywidgets パッケージを widgets という名前でインポートしておこう。 名前を変更しているのは、公式のサンプルコードがこのやり方をしているため。

import ipywidgets as widgets

たとえば、もっとも単純なサンプルとしてボタンを作ってみよう。 はじめに、widgets.Button クラスをインスタンス化する。

button = widgets.Button(description='Click me')

上記でインスタンス化したボタンを表示するには、たとえばセルの最後で評価させる方法がある。

button

f:id:momijiame:20210302194334p:plain
widgets.Button

一方で、上記のやり方はウィジェットが複数あったり、複雑なパターンを扱いにくい。 そのため IPython.display.display() 関数の引数にウィジェットを渡していくやり方がおそらく分かりやすいと思う。 JupyterLab だとインポートしないで使えるっぽいけど、念のためインポートした上でボタンを可視化するサンプルコードを以下に示す。

from IPython.display import display
display(button)

もちろん、見え方は先ほどと同じ。

ちなみに、ボタン以外にもウィジェットはたくさんある。 一覧はとても紹介しきれないので、以下の公式ページを見てもらいたい。

ipywidgets.readthedocs.io

イベントハンドラと標準出力

さて、先ほど作ったボタンは、押しても何も起こらない。 何も起こらない UI を作っても意味がないので、次はウィジェットにイベントハンドラを登録しよう。

たとえば、widgets.Button なら widgets.Button#on_click() というメソッドで、クリックされた時に発火するイベントハンドラを登録できる。 試しに、イベントハンドラの中で print() 関数を呼んでみよう。

button = widgets.Button(description='Click me')

def on_click_callback(clicked_button: widgets.Button) -> None:
    """ボタンが押されたときに発火するイベントハンドラ"""
    print('Clicked')  # イベントハンドラ内で print() 関数を呼んでみる

# ボタンにイベントハンドラを登録する
button.on_click(on_click_callback)
display(button)

さて、上記を実行して表示されたボタンをクリックしてみても、実は何も表示されない。 おや?と思いながらブラウザの下方に目を移すと、"Log" というペインに通知が出てくるはず。 このペインを開くと、print() 関数で出力した内容がそこに表示されていることが分かる。

f:id:momijiame:20210302195653p:plain
イベントハンドラ内で print() するとデフォルトではログに残る

これはこれで悪くはないけど、毎回ログのペインを確認しながら作業するのも微妙な感じ。 できればセルの出力に表示させたいので `widgets.Output`` というウィジェットを使う。 このウィジェットは ipywidgets を使っていると、かなり登場機会が多い。 後ほどグラフを描画するときにもお世話になる。

widgets.Output はいくつかの使い方がある。 以下では、コンテキストマネージャとして使っている。 コンテキストマネージャのスコープ内で print() 関数を呼ぶと、出力先が widgets.Output の描画エリアに向く。

button = widgets.Button(description='Click me')
# 標準出力を表示するエリアを用意する
output = widgets.Output(layour={'border': '1px solid black'})

def on_click_callback(clicked_button: widgets.Button) -> None:
    # コンテキストマネージャとして使う
    with output:
        # スコープ内の標準出力は Output に書き出される
        print('Clicked')

button.on_click(on_click_callback)
# Output も表示対象に入れる
display(button, output)

f:id:momijiame:20210302200533p:plain
widgets.Output

もう一つの使い方は、デコレータとしてコールバック関数をラップするやり方。 これだと、そのコールバック関数内でのデフォルトの標準出力が widgets.Output に向く。

button = widgets.Button(description='Click me')
output = widgets.Output(layout={'border': '1px solid black'})

# デコレータとして使うとデフォルトの向け先になる
@output.capture()
def on_click_callback(b: widgets.Button) -> None:
    print('Clicked')

button.on_click(on_click_callback)
display(button, output)

得られる結果は同じ。

ちなみに widgets.Output は、そのままだと内容が追記されていく。 もし内容を消去したいときは widgets.Output#clear_output() メソッドを呼び出せば良い。 以下のサンプルコードでは、ボタンをクリックしたタイミングで前回の内容を消去しつつ、時刻を表示している。

from datetime import datetime

button = widgets.Button(description='Click me')
output = widgets.Output(layour={'border': '1px solid black'})

def on_click_callback(clicked_button: widgets.Button) -> None:
    with output:
        # 表示エリアの内容を消去する
        output.clear_output()
        print(f'Clicked at {datetime.now()}')

button.on_click(on_click_callback)
display(button, output)

# 手動でイベントを発生させる
button.click()

f:id:momijiame:20210302200635p:plain
widgets.Output#clear_output()

デコレータの使い方のときは Output#capture() の引数で clear_output オプションを有効にすると良い。 これで、コールバック関数が呼ばれる毎に出力内容がクリアされる。

from datetime import datetime

button = widgets.Button(description='Click me')
output = widgets.Output(layout={'border': '1px solid black'})

# 関数が呼ばれる度に出力をクリアする
@output.capture(clear_output=True)
def on_click_callback(b: widgets.Button) -> None:
    print(f'Clicked at {datetime.now()}')

button.on_click(on_click_callback)
display(button, output)

button.click()

複数のウィジェットを連携させる

さて、実際に UI を書いていくと、複数のウィジェットを連携させることが多い。 多くのウィジェットは、選択されている値を value というアトリビュートで読み出すことができる。

以下のサンプルコードでは、widgets.Button を押したタイミングで widgets.Select で選択されているアイテムを widgets.Output に表示させている。

button = widgets.Button(description='Click me')
# 値を選択するセレクタ
select = widgets.Select(options=['Apple', 'Banana', 'Cherry'])
output = widgets.Output(layour={'border': '1px solid black'})

@output.capture()
def on_click_callback(clicked_button: widgets.Button) -> None:
    # セレクタで選択されているアイテムを使う
    print(f'Selected item: {select.value}')

button.on_click(on_click_callback)
display(select, button, output)

f:id:momijiame:20210302201325p:plain
別のウィジェットの内容を読み取る

ウィジェットをグローバルスコープに置かない

複数のウィジェットを扱うようになると、それらがグローバルスコープにあるとコードがどんどんスパゲッティになっていく。 複数のセルに複数のウィジェットを置くと特にやばい。 そのため、以下のように一連のウィジェットは関数スコープの中で作るようにした方が良いと思う。

def show_widgets():
    """ウィジェットを設定する関数"""
    button = widgets.Button(description='Click me')
    select = widgets.Select(options=['Apple', 'Banana', 'Cherry'])
    output = widgets.Output(layour={'border': '1px solid black'})

    @output.capture()
    def on_click_callback(clicked_button: widgets.Button) -> None:
        print(f'Selected item: {select.value}')

    button.on_click(on_click_callback)
    display(select, button, output)

# ウィジェットを表示する
show_widgets()

ただ、これだと関数スコープ内にあるウィジェットが GC に拾われないか心配だったけど、とりあえず大丈夫そう。 どこに参照が生き残るのかはちょっと気になるね。

もしどうしても気になるようなら、以下のように widgets.VBox という複数のウィジェットをまとめるウィジェットを使うのはどうだろう。 これなら、少なくともグローバルスコープにウィジェットの参照が残るので、GC に拾われないことは担保できるはず。

def show_widgets() -> widgets.VBox:
    """ウィジェットを設定する関数"""
    button = widgets.Button(description='Click me')
    select = widgets.Select(options=['Apple', 'Banana', 'Cherry'])
    output = widgets.Output(layour={'border': '1px solid black'})

    @output.capture()
    def on_click_callback(clicked_button: widgets.Button) -> None:
        print(f'Selected item: {select.value}')

    button.on_click(on_click_callback)
    # 一連のウィジェットを VBox にまとめて返す
    return widgets.VBox([button, select, output])

# ウィジェットを表示する
box = show_widgets()
display(box)

値の変更を監視する

先ほど使ったセレクタのように、値を選択したり入力する系のウィジェットは、入力値が変更されたタイミングでイベントを発火させたいことが多い。 そのような場合、ウィジェットによっては observe() というメソッドでイベントハンドラを登録できる。 以下のサンプルコードでは、widgets.Select で選んだ内容を widgets.Output に表示している。

from traitlets.utils.bunch import Bunch

def show_widgets():
    select = widgets.Select(options=['Apple', 'Banana', 'Cherry'])
    output = widgets.Output(layour={'border': '1px solid black'})

    @output.capture()
    def on_value_change(change: Bunch) -> None:
        # 値が変更されたイベントを扱う
        if change['name'] == 'value':
            output.clear_output()
            # 変更前と変更後の値を出力する
            old_value = change['old']
            new_value = change['new']
            print(f'value changed: {old_value} -> {new_value}')

    # 値の変更を監視する
    select.observe(on_value_change)
    display(select, output)

show_widgets()

f:id:momijiame:20210302204328p:plain
値の変更を契機にイベントを発火する

ただ、実際に UI を作ってみると複数のウィジェットが連携することも多い。 その場合は、個別に observe() すると煩雑になりがち。 そういったときは ipywidgets.interactive() を使った方がコードの見通しが良くなると思う。 以下では、IntSliderSelect の両方の値の変更を監視している。

def show_widgets():
    slider = widgets.IntSlider(value=50, min=1, max=100, description='slider:')
    select = widgets.Select(options=['Apple', 'Banana', 'Cherry'])
    output = widgets.Output(layour={'border': '1px solid black'})

    @output.capture(clear_output=True)
    def on_value_change(select_value: str, slider_value: int) -> None:
        print(f'value changed: {select_value=}, {slider_value=}')

    # 複数のウィジェットの変更を一度に監視できる
    widgets.interactive(on_value_change, select_value=select, slider_value=slider)
    display(select, slider, output)

show_widgets()

f:id:momijiame:20210302205749p:plain
ipywidgets.interactive() で複数のウィジェットを監視する

ウィジェットの配置を工夫する

デフォルトでは display() 関数に渡された順序で、垂直にウィジェットが配置されていく。 しかし、それだと操作が分かりにくいこともあるので配置を工夫する方法について書く。

たとえば、ウィジェットを横に並べたいときは widgets.Box または widgets.HBox でウィジェットをまとめると良い。 以下のサンプルコードではスライダーとセレクタを widgets.Box を使って横に並べている。 なお、既に登場しているとおり widgets.VBox を使うと縦に並べることができる。

def show_widgets():
    slider = widgets.IntSlider(value=50, min=1, max=100, description='slider:')
    select = widgets.Select(options=['Apple', 'Banana', 'Cherry'])
    output = widgets.Output(layour={'border': '1px solid black'})

    @output.capture(clear_output=True)
    def on_value_change(select_value: str, slider_value: int) -> None:
        print(f'value changed: {select_value=}, {slider_value=}')

    widgets.interactive(on_value_change, select_value=select, slider_value=slider)

    # 横に並べるときはウィジェットを Box や HBox にまとめる
    box = widgets.Box([slider, select])
    display(box, output)

show_widgets()

f:id:momijiame:20210302210339p:plain
widgets.Box でウィジェットを横に並べる

他にも widgets.GridBoxwidgets.Layout を組み合わせてグリッドレイアウトを作ったり。

def show_widgets():
    labels = [widgets.Label(str(i)) for i in range(8)]
    # グリッドレイアウト
    grid_box = widgets.GridBox(labels,
                               layout=widgets.Layout(grid_template_columns="repeat(3, 100px)"))
    display(grid_box)

show_widgets()

f:id:momijiame:20210302210505p:plain
グリッドレイアウト

widgets.Tab を使えばタブを使った UI も作れる。

def show_widgets(num_of_tabs: int = 5):
    # タブ毎のウィジェット
    contents = [widgets.Label(f'This is tab {i}') for i in range(num_of_tabs)] 
    tab = widgets.Tab(children=contents)
    # タブのタイトルを設定する
    for i in range(num_of_tabs):
        tab.set_title(i, f'tab {i}')
    display(tab)

show_widgets()

f:id:momijiame:20210302210541p:plain
widgets.Tab

Matplotlib と連携させる

さて、やっとかって感じだけど Matplotlib との連携について書いていく。 基本的にはこれまでの延長線上にある。 ポイントは、display() 関数で Matplotlib の Figure オブジェクトを描画するところ。 このとき、描画先が widgets.Output オブジェクトになるようにスコープ内で呼んでやれば良い。

以下のサンプルコードではボタンを押す度にグラフの描画を更新している。

from matplotlib import pyplot as plt
import numpy as np


def show_widgets():
    button = widgets.Button(description='Refresh')
    # グラフの描画領域としての Output を用意する
    output = widgets.Output()
    # アクティブな Axes オブジェクトを取得する
    ax = plt.gca()

    # NOTE: デコレータを使っても問題はない
    # @output.capture(clear_output=True, wait=True)
    def on_click(b: widgets.Button) -> None:
        # 前回の描画内容をクリアする
        ax.clear()
        # 描画し直す
        rand_x = np.random.randn(100)
        rand_y = np.random.randn(100)
        ax.plot(rand_x, rand_y, '+')
        #  Output に書き出す
        with output:
            output.clear_output(wait=True)
            display(ax.figure)

    button.on_click(on_click)
    display(button, output)

    # セルの出力に描画されるのを抑制するために一旦アクティブな Figure を破棄する
    plt.close()

    # 最初の 1 回目の描画を手動でトリガーする
    button.click()
    
show_widgets()

f:id:momijiame:20210304222745p:plain
Matplotlib の描画をウィジェットのイベントで更新する

もうひとつサンプルコードを示す。 こちらでは、widgets.IntRangeSlider の値が変更されたタイミングでグラフを描画し直している。 また、先ほどとの違いとしてプロットされるグラフのサイズを大きくしている。

from __future__ import annotations

def show_widgets():
    MIN, MAX, STEPS = 1, 11, 1000
    range_slider = widgets.IntRangeSlider(value=[2, 4], min=MIN, max=MAX, step=1, description='plot range:')
    output = widgets.Output()
    # サイズを指定する場合
    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot()

    @output.capture(clear_output=True, wait=True)
    def on_value_change(selected_range: tuple(int, int)) -> None:
        # サイン波を作る
        x = np.linspace(MIN, MAX, num=STEPS)
        y = np.sin(x)
        # 選択範囲を取り出す
        selected_lower, selected_upper = selected_range
        lower = (selected_lower - MIN) * (STEPS // (MAX - MIN))
        upper = (selected_upper - MIN) * (STEPS // (MAX - MIN))
        # 前回の描画内容をクリアする
        ax.clear()
        # 描画する
        ax.plot(x[lower:upper], y[lower:upper])
        # Output に書き出す
        display(ax.figure)

    widgets.interactive(on_value_change, selected_range=range_slider)
    display(range_slider, output)

    plt.close()

show_widgets()

f:id:momijiame:20210304224020p:plain
ウィジェットの変更を元にグラフを描画する

別のウィジェットのイベントでウィジェットの値を更新する

あとはもはや蛇足っぽいけど、あるウィジェットのイベントを契機に別のウィジェットの値を変更するのもよくあるよねってことで。 以下のサンプルコードでは widgets.Button をクリックすると widgets.Text に入力された内容が widgets.Select に追加されていく。

def show_widgets():
    text = widgets.Text()
    select = widgets.Select(options=[])
    output = widgets.Output(layour={'border': '1px solid black'})
    button = widgets.Button(description='Add')

    def on_click_callback(b: widgets.Button) -> None:
        # テキストの入力を選択肢として追加する
        select.options = list(select.options) + [text.value]

    button.on_click(on_click_callback)
    display(text, button, select)

show_widgets()

f:id:momijiame:20210302211057p:plain
別のウィジェットの変更を元にウィジェットを変更する

複数のウィジェットで値を同期する

あとはあるウィジェットと別のウィジェットの値を同期させる、みたいなことも widgets.jslink() 関数を使ってできる。 以下は時系列の情報を表示させるのに便利な widgets.Playwidgets.IntSlider の値を同期させている。

def show_widgets():
    # アニメーション制御
    play = widgets.Play(
        value=50,
        min=1,
        max=100,
        step=1,
        interval=500,  # 更新間隔 (ミリ秒)
        description="play:",
    )
    slider = widgets.IntSlider(value=50, min=1, max=100, description='slider:')
    output = widgets.Output(layour={'border': '1px solid black'})

    # ウィジェットの値を連動させる
    widgets.jslink((play, 'value'), (slider, 'value'))

    @output.capture(clear_output=True)
    def on_value_change(slider_value: int) -> None:
        print(f'value changed: {slider_value=}')

    widgets.interactive(on_value_change, slider_value=slider)
    display(play, slider, output)

show_widgets()

f:id:momijiame:20210302211230p:plain
ウィジェット同士の値を同期させる

とりあえず、そんな感じで。

Python: TensorFlow/Keras で Word2Vec の SGNS を実装してみる

以前のエントリで、Word2Vec の CBOW (ContinuousBagOfWords) モデルを TensorFlow/Keras で実装した。 CBOW は、コンテキスト (周辺語) からターゲット (入力語) を推定する多値分類のタスクが考え方のベースになっている。

blog.amedama.jp

今回扱うのは、CBOW と対を成すモデルの Skip Gram をベースにした SGNS (Skip Gram with Negative Sampling) になる。 Skip Gram では、CBOW とは反対にターゲット (入力語) からコンテキスト (周辺語) を推定する多値分類のタスクを扱う。 ただし、with Negative Sampling と付くことで、タスクを多値分類から二値分類にして計算量を削減している。 SGNS では、ターゲットとコンテキストを入力にして、それらが共起 (Co-occurrence) するか否かを推定することになる。 コーパスを処理して実際に共起する単語ペアを正例、出現頻度を元にランダムにサンプルした単語ペアを共起していない負例としてモデルに与える。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.2.1
BuildVersion:   20D74
$ python -V  
Python 3.8.7

下準備

まずは、必要なパッケージをインストールする。

$ pip install tensorflow gensim scipy tqdm

そして、コーパスとして PTB (Penn Treebank) データセットをダウンロードしておく。

$ wget https://raw.githubusercontent.com/tomsercu/lstm/master/data/ptb.train.txt

サンプルコード

早速だけど、サンプルコードを以下に示す。 いくらかマジックナンバーがコードに残ってしまっていて、あんまりキレイではないかも。 各エポックの終了時には、WordSim353 データセットを使って単語間類似度で単語埋め込みを評価している。 また、学習が終わった後には、いくつかの単語で類似する単語や類推語の結果を確認している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import re
from itertools import count
from typing import Iterable
from typing import Iterator
from functools import reduce
from functools import partial
from collections import Counter

import numpy as np  # type: ignore
import tensorflow as tf  # type: ignore
from tensorflow.keras import Model  # type: ignore
from tensorflow.keras.layers import Embedding  # type: ignore
from tensorflow import Tensor  # type: ignore
from tensorflow.data import Dataset  # type: ignore
from tensorflow.keras.optimizers import Adam  # type: ignore
from tensorflow.keras.layers import Dense  # type: ignore
from tensorflow.keras.models import Sequential  # type: ignore
from tensorflow.keras.callbacks import Callback  # type: ignore
from tensorflow.keras.layers import Dot  # type: ignore
from tensorflow.keras.layers import Flatten  # type: ignore
from tensorflow.keras.losses import BinaryCrossentropy  # type: ignore
from gensim.test.utils import datapath  # type: ignore
from scipy.stats import pearsonr  # type: ignore
from tqdm import tqdm  # type: ignore


class SkipGramWithNegativeSampling(Model):
    """Word2Vec の SGNS モデルを実装したクラス"""

    def __init__(self, vocab_size: int, embedding_size: int):
        super().__init__()

        # ターゲット (入力語) の埋め込み
        self.target_embedding = Embedding(input_dim=vocab_size,
                                          input_shape=(1, ),
                                          output_dim=embedding_size,
                                          name='word_embedding',
                                          )
        # コンテキスト (周辺語) の埋め込み
        self.context_embedding = Embedding(input_dim=vocab_size,
                                           input_shape=(1, ),
                                           output_dim=embedding_size,
                                           name='context_embedding',
                                           )

        self.dot = Dot(axes=1)
        self.output_layer = Sequential([
            Flatten(),
            Dense(1, activation='sigmoid'),
        ])

    def call(self, inputs: Tensor) -> Tensor:
        # ターゲットのベクトルを取り出す
        target_label = inputs[:, 0]
        target_vector = self.target_embedding(target_label)
        # コンテキストのベクトルを取り出す
        context_label = inputs[:, 1]
        context_vector = self.context_embedding(context_label)
        # ターゲットとコンテキストの内積を計算する
        x = self.dot([target_vector, context_vector])
        # 共起したか・していないかを二値の確率にする
        prediction = self.output_layer(x)
        return prediction


def cosine_similarity_one_to_one(x, y):
    """1:1 のコサイン類似度"""
    nx = x / np.sqrt(np.sum(x ** 2))
    ny = y / np.sqrt(np.sum(y ** 2))
    return np.dot(nx, ny)


class WordSimilarity353Callback(Callback):
    """WordSim353 データセットを使って単語間の類似度を評価するコールバック"""

    def __init__(self, word_id_table: dict[str, int]):
        super().__init__()

        self.word_id_table = word_id_table
        self.model = None

        # 評価用データを読み込む
        self.eval_data = []
        wordsim_filepath = datapath('wordsim353.tsv')
        with open(wordsim_filepath, mode='r') as fp:
            # 最初の 2 行はヘッダなので読み飛ばす
            fp.readline()
            fp.readline()
            for line in fp:
                word1, word2, sim_score = line.strip().split('\t')
                self.eval_data.append((word1, word2, float(sim_score)))

    def set_model(self, model):
        self.model = model

    def on_epoch_end(self, epoch, logs=None):
        # モデルから学習させたレイヤーの重みを取り出す
        model_layers = {layer.name: layer for layer in self.model.layers}
        embedding_layer = model_layers['word_embedding']
        word_vectors = embedding_layer.weights[0].numpy()

        # 評価用データセットに含まれる単語間の類似度を計算する
        labels = []
        preds = []
        for word1, word2, sim_score in self.eval_data:
            # Out-of-Vocabulary な単語はスキップ
            if word1 not in self.word_id_table or word2 not in self.word_id_table:
                continue

            # コサイン類似度を計算する
            word1_vec = word_vectors[self.word_id_table[word1]]
            word2_vec = word_vectors[self.word_id_table[word2]]
            pred = cosine_similarity_one_to_one(word1_vec, word2_vec)
            preds.append(pred)
            # 正解ラベル
            labels.append(sim_score)

        # ピアソンの相関係数を求める
        r_score = pearsonr(labels, preds)[0]
        print(f'Pearson\'s r score with WordSim353: {r_score}')


def load_corpus(filepath: str) -> Iterator[str]:
    """テキストファイルからコーパスを読み出す"""
    with open(filepath, mode='r') as fp:
        for line in fp:
            # 改行コードは取り除く
            yield line.rstrip()


def sentences_to_words(sentences: Iterable[str], lower: bool = True) -> Iterator[list[str]]:
    """文章を単語に分割する"""
    for sentence in sentences:
        if lower:
            sentence = sentence.lower()
        words = re.split('\\W+', sentence)
        yield [word for word in words if len(word) > 0]  # 空文字は取り除く


def word_id_mappings(sentences: Iterable[Iterable[str]]) -> dict[str, int]:
    """単語を ID に変換する対応テーブルを作る"""
    counter = count(start=0)

    word_to_id = {}
    for sentence in sentences:
        for word in sentence:

            if word in word_to_id:
                # 登録済みの単語はスキップする
                continue

            # 単語の識別子を採番する
            word_id = next(counter)
            word_to_id[word] = word_id

    return word_to_id


def words_to_ids(sentences: Iterable[list[str]], word_to_id: dict[str, int]) -> Iterator[list[int]]:
    # 単語を対応するインデックスに変換する
    for words in sentences:
        # NOTE: Out-of-Vocabulary への対応がない
        yield [word_to_id[word] for word in words]


def extract_contexts(word_ids: Tensor, window_size: int) -> Tensor:
    """コンテキストの単語をラベル形式で得る"""
    target_ids = word_ids[:-window_size]
    context_ids = word_ids[window_size:]
    # ウィンドウサイズ分ずらした Tensor 同士をくっつける
    co_occurrences = tf.transpose([target_ids, context_ids])
    # 逆順でも共起したのは同じ
    reversed_co_occurrences = tf.transpose([context_ids, target_ids])
    concat_co_occurrences = tf.concat([co_occurrences,
                                       reversed_co_occurrences],
                                      axis=0)
    # ラベル (正例なので 1)
    labels = tf.ones_like(concat_co_occurrences[:, 0],
                          dtype=tf.int8)
    return concat_co_occurrences, labels


def positive_pipeline(ds: Dataset, window_size: int) -> Dataset:
    """正例を供給するパイプライン"""

    ctx_ds_list = []
    for window in range(1, window_size + 1):
        partialed = partial(extract_contexts, window_size=window)
        # ウィンドウサイズごとに共起した単語を抽出する
        mapped_ds = ds.map(partialed,
                           num_parallel_calls=tf.data.AUTOTUNE,
                           deterministic=False)
        ctx_ds_list.append(mapped_ds)

    # すべての Dataset をつなげる
    context_ds = reduce(lambda l_ds, r_ds: l_ds.concatenate(r_ds),
                        ctx_ds_list)
    return context_ds


def word_frequency(sentences: Iterable[Iterable[str]], word_id_table: dict[str, int]) -> dict[str, int]:
    """単語の出現頻度を調べる"""
    counter = Counter(word for words in sentences for word in words)
    id_count = {word_id_table[word]: count for word, count in counter.items()}
    # ID 順でソートされた出現頻度
    sorted_freq = np.array([count for _, count in sorted(id_count.items(), key=lambda x: x[0])],
                           dtype=np.int32)
    return sorted_freq


def noisy_word_pairs(word_proba: list[float], eps: float = 1e-6) -> Iterator[Tensor]:
    """単語の出現頻度を元にネガティブサンプルの単語ペアを生成するジェネレータ関数"""
    p = tf.constant(word_proba) + eps
    logits = tf.math.log([p, p])
    while True:
        word_pair = tf.random.categorical(logits, num_samples=2**12)
        word_pair_t = tf.transpose(word_pair)
        # ラベル (負例なので 0)
        labels = tf.zeros_like(word_pair_t[:, 0], dtype=tf.int8)
        yield word_pair_t, labels


def negative_pipeline(sentences: Iterable[Iterable[str]], word_id_table: dict[str, int]) -> Dataset:
    """負例を供給するパイプライン"""
    # 単語の出現頻度からサンプリングテーブルを求める
    word_freq = word_frequency(sentences, word_id_table)
    word_proba = word_freq / np.sum(word_freq)
    # 0.75 乗することで、出現頻度の低い単語をちょっとだけ選ばれやすくする
    ADJUST_FACTOR = 0.75
    adjusted_word_proba = np.power(word_proba, ADJUST_FACTOR)
    adjusted_word_proba /= np.sum(adjusted_word_proba)
    # 単語の出現頻度を元にノイジーワードペアを生成する
    negative_ds = Dataset.from_generator(lambda: noisy_word_pairs(adjusted_word_proba),
                                         (tf.int32, tf.int8))

    return negative_ds


def batched_concat(pos_tensor: Tensor, neg_tensor: Tensor) -> Tensor:
    """正例と負例を直列に結合する関数"""
    pos_word_pairs, pos_labels = pos_tensor
    neg_word_pairs, neg_labels = neg_tensor
    word_pairs = tf.concat((pos_word_pairs, neg_word_pairs), axis=0)
    labels = tf.concat((pos_labels, neg_labels), axis=0)
    return word_pairs, labels


def skip_grams_with_negative_sampling_dataset(positive_ds: Dataset,
                                              negative_ds: Dataset,
                                              negative_sampling_ratio: int):
    """データセットで共起した単語ペアを正例、出現頻度を元にランダムに選んだ単語ペアを負例として供給するパイプライン"""
    positive_batch_size = 1024  # 正例の供給単位
    batched_pos_ds = positive_ds.unbatch().batch(positive_batch_size)
    batched_neg_ds = negative_ds.unbatch().batch(positive_batch_size * negative_sampling_ratio)
    zipped_ds = tf.data.Dataset.zip((batched_pos_ds, batched_neg_ds))
    concat_ds = zipped_ds.map(batched_concat,
                              num_parallel_calls=tf.data.AUTOTUNE,
                              deterministic=False).unbatch()
    # バッチサイズ単位でシャッフルする
    shuffle_buffer_size = positive_batch_size * (negative_sampling_ratio + 1)
    shuffled_ds = concat_ds.shuffle(buffer_size=shuffle_buffer_size)
    return shuffled_ds


def cosine_similarity_matrix(word_vectors: np.ndarray, eps: float = 1e-8) -> np.ndarray:
    """N:N のコサイン類似度を計算する"""
    word_norm = np.sqrt(np.sum(word_vectors ** 2, axis=1)).reshape(word_vectors.shape[0], -1)
    normalized_word_vectors = word_vectors / (word_norm + eps)
    cs_matrix = np.dot(normalized_word_vectors, normalized_word_vectors.T)
    return cs_matrix


def most_similar_words(similarities: np.ndarray, top_n: int = 5):
    """コサイン類似度が最も高い単語の ID を得る"""
    similar_word_ids = np.argsort(similarities)[::-1]
    top_n_word_ids = similar_word_ids[:top_n]
    top_n_word_sims = similarities[similar_word_ids][:top_n]
    return zip(top_n_word_ids, top_n_word_sims)


def cosine_similarity_one_to_many(word_vector: np.ndarray,
                                  word_vectors: np.ndarray,
                                  eps: float = 1e-8):
    """1:N のコサイン類似度"""
    normalized_word_vector = word_vector / np.sqrt(np.sum(word_vector ** 2))
    word_norm = np.sqrt(np.sum(word_vectors ** 2, axis=1)).reshape(word_vectors.shape[0], -1)
    normalized_word_vectors = word_vectors / (word_norm + eps)
    return np.dot(normalized_word_vector, normalized_word_vectors.T)


def main():
    # Penn Treebank コーパスを読み込む
    train_sentences = load_corpus('ptb.train.txt')

    # コーパスを単語に分割する
    train_corpus_words = list(sentences_to_words(train_sentences))

    # 単語に ID を振る
    word_to_id = word_id_mappings(train_corpus_words)

    # コーパスの語彙数
    vocab_size = len(word_to_id.keys())

    # データセットを準備する
    # ID に変換したコーパスを行ごとに読み出せるデータセット
    train_word_ids_ds = Dataset.from_generator(lambda: words_to_ids(train_corpus_words, word_to_id),
                                               tf.int32,
                                               output_shapes=[None])

    # 共起したと判断する単語の距離
    CONTEXT_WINDOW_SIZE = 5
    positive_ds = positive_pipeline(train_word_ids_ds, window_size=CONTEXT_WINDOW_SIZE)
    negative_ds = negative_pipeline(train_corpus_words, word_to_id)

    # 正例に対する負例の比率 (一般的に 5 ~ 10)
    NEGATIVE_SAMPLING_RATIO = 5
    train_ds = skip_grams_with_negative_sampling_dataset(positive_ds,
                                                         negative_ds,
                                                         NEGATIVE_SAMPLING_RATIO)

    # モデルとタスクを定義する
    EMBEDDING_SIZE = 100  # 埋め込み次元数
    criterion = BinaryCrossentropy()
    optimizer = Adam(learning_rate=1e-2)
    model = SkipGramWithNegativeSampling(vocab_size, EMBEDDING_SIZE)
    model.compile(optimizer=optimizer,
                  loss=criterion,
                  )

    # データセットを準備する
    TRAIN_BATCH_SIZE = 2 ** 14
    train_ds = train_ds.batch(TRAIN_BATCH_SIZE)
    train_ds = train_ds.prefetch(buffer_size=tf.data.AUTOTUNE)
    train_ds = train_ds.cache()

    print('caching train data...')
    num_of_steps_per_epoch = sum(1 for _ in tqdm(train_ds))
    print(f'{num_of_steps_per_epoch=}')
    train_ds = train_ds.repeat()

    callbacks = [
        # WordSim353 データセットを使って単語間の類似度を相関係数で確認する
        WordSimilarity353Callback(word_to_id),
    ]
    # 学習する
    model.fit(train_ds,
              steps_per_epoch=num_of_steps_per_epoch,
              epochs=5,
              callbacks=callbacks,
              verbose=1,
              )

    # モデルから学習させたレイヤーの重みを取り出す
    model_layers = {layer.name: layer for layer in model.layers}
    embedding_layer = model_layers['word_embedding']
    word_vectors = embedding_layer.weights[0].numpy()

    # 単語を表すベクトル間のコサイン類似度を計算する
    cs_matrix = cosine_similarity_matrix(word_vectors)
    # ID -> 単語
    id_to_word = {value: key for key, value in word_to_id.items()}

    # いくつか似ているベクトルを持った単語を確認してみる
    example_words = ['you', 'year', 'car', 'toyota']
    for target_word in example_words:
        # ID に変換した上で最も似ている単語とそのベクトルを取り出す
        print(f'The most similar words of "{target_word}"')
        target_word_id = word_to_id[target_word]
        similarities = cs_matrix[target_word_id, :]
        top_n_most_similars = most_similar_words(similarities, top_n=6)
        # 先頭は自分自身になるので取り除く
        next(top_n_most_similars)
        # 単語と類似度を表示する
        for rank, (similar_word_id, similarity) in enumerate(top_n_most_similars, start=1):
            similar_word = id_to_word[similar_word_id]
            print(f'TOP {rank}: {similar_word} = {similarity}')
        print('-' * 50)

    # いくつか類推語を確認してみる
    analogies = [
        ('king', 'man', 'woman'),
        ('took', 'take', 'go'),
        ('cars', 'car', 'child'),
        ('better', 'good', 'bad'),
    ]
    for word1, word2, word3 in analogies:
        print(f'The most similar words of "{word1}" - "{word2}" + "{word3}"')
        word1_vec = word_vectors[word_to_id[word1]]
        word2_vec = word_vectors[word_to_id[word2]]
        word3_vec = word_vectors[word_to_id[word3]]
        new_vec = word1_vec - word2_vec + word3_vec
        similarities = cosine_similarity_one_to_many(new_vec, word_vectors)
        top_n_most_similars = most_similar_words(similarities)
        # 単語と類似度を表示する
        for rank, (similar_word_id, similarity) in enumerate(top_n_most_similars, start=1):
            similar_word = id_to_word[similar_word_id]
            print(f'TOP {rank}: {similar_word} = {similarity}')
        print('-' * 50)


if __name__ == '__main__':
    main()

上記を実行してみよう。 コーパスの前処理に結構時間がかかる。 今回使った環境では 1 分 47 秒かかった。 学習に関しては、WordSim353 データセットを使った内省的評価で 4 エポック目にはサチる感じ。

$ python sgns.py
caching train data...
2802it [01:47, 26.01it/s]
num_of_steps_per_epoch=2802
Epoch 1/5
2802/2802 [==============================] - 50s 18ms/step - loss: 0.3845
Pearson's r score with WordSim353: 0.2879142572631919
Epoch 2/5
2802/2802 [==============================] - 47s 17ms/step - loss: 0.3412
Pearson's r score with WordSim353: 0.367370159567898
Epoch 3/5
2802/2802 [==============================] - 48s 17ms/step - loss: 0.3307
Pearson's r score with WordSim353: 0.3898624474454972
Epoch 4/5
2802/2802 [==============================] - 48s 17ms/step - loss: 0.3248
Pearson's r score with WordSim353: 0.39416965929977094
Epoch 5/5
2802/2802 [==============================] - 48s 17ms/step - loss: 0.3211
Pearson's r score with WordSim353: 0.39503500234447125
The most similar words of "you"
TOP 1: your = 0.6509251594543457
TOP 2: i = 0.6414255499839783
TOP 3: we = 0.569475531578064
TOP 4: re = 0.5692735314369202
TOP 5: someone = 0.5565952658653259
--------------------------------------------------
The most similar words of "year"
TOP 1: earlier = 0.5841510891914368
TOP 2: month = 0.5817509293556213
TOP 3: period = 0.572060763835907
TOP 4: ago = 0.5633273720741272
TOP 5: last = 0.5298276543617249
--------------------------------------------------
The most similar words of "car"
TOP 1: cars = 0.6105561852455139
TOP 2: luxury = 0.5986034870147705
TOP 3: truck = 0.563898503780365
TOP 4: ford = 0.5133273005485535
TOP 5: auto = 0.5039612054824829
--------------------------------------------------
The most similar words of "toyota"
TOP 1: infiniti = 0.6949869394302368
TOP 2: honda = 0.6433103084564209
TOP 3: mazda = 0.6296555995941162
TOP 4: lexus = 0.6275536417961121
TOP 5: motor = 0.6175971627235413
--------------------------------------------------
The most similar words of "king" - "man" + "woman"
TOP 1: king = 0.7013309001922607
TOP 2: woman = 0.6033017039299011
TOP 3: burger = 0.4819037914276123
TOP 4: md = 0.46715104579925537
TOP 5: egg = 0.45565301179885864
--------------------------------------------------
The most similar words of "took" - "take" + "go"
TOP 1: took = 0.6121359467506409
TOP 2: go = 0.6096295714378357
TOP 3: stands = 0.4905664920806885
TOP 4: hammack = 0.45641931891441345
TOP 5: refuge = 0.4478893578052521
--------------------------------------------------
The most similar words of "cars" - "car" + "child"
TOP 1: child = 0.8112377524375916
TOP 2: women = 0.5026379823684692
TOP 3: cars = 0.4889577627182007
TOP 4: patients = 0.4796864092350006
TOP 5: custody = 0.47176921367645264
--------------------------------------------------
The most similar words of "better" - "good" + "bad"
TOP 1: bad = 0.6880872249603271
TOP 2: better = 0.510699987411499
TOP 3: involved = 0.45395123958587646
TOP 4: serious = 0.4192639887332916
TOP 5: hardest = 0.41672468185424805
--------------------------------------------------

CBOW のときは WordSim353 の評価が 0.25 前後だったことを考えると、今回の 0.39 前後という結果はかなり良く見える。 ただし、CBOW の実験ではコンテキストウィンドウサイズが 1 だったのに対し、上記の SGNS では 5 を使っている。 同じコンテキストウィンドウサイズの 1 に揃えると、評価指標は 0.28 前後まで落ちる。

学習が終わってから確認している類似語は、CBOW のときと同じでなかなか良い感じに見える。 一方で、類推語の方はほとんど上手くいっていない。 類推語を解ける位の単語埋め込みを学習するには、もっと大きなコーパスが必要なのだろうか?

ゼロから作るDeep Learning ❷ ―自然言語処理編

ゼロから作るDeep Learning ❷ ―自然言語処理編

  • 作者:斎藤 康毅
  • 発売日: 2018/07/21
  • メディア: 単行本(ソフトカバー)

Python: TensorFlow の Dataset API を試す

ニューラルネットワークの並列計算には、今や GPU や TPU を使うのが一般的になっている。 一方で、それらのデバイスにデータを供給する部分がボトルネックにならないよう気をつけなければいけない。 具体的には、デバイスが計算している最中に、次に計算するデータを用意しておく必要がある。 今回は、TensorFlow で効率的なデータ供給を実現するために用意された Dataset API を試してみる。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.2.1
BuildVersion:   20D74
$ python -V             
Python 3.8.7
$ pip list | grep "tensorflow "
tensorflow             2.4.1

もくじ

下準備

あらかじめ、TensorFlow をインストールしておく。

$ pip install tensorflow

インストールできたら Python の REPL を起動する。

$ python

そして、TensorFlow をインポートする。

>>> import tensorflow as tf

Dataset API について

Dataset API を使うと、TensorFlow/Keras で定義したモデルに対して、効率的に Tensor を供給するためのパイプライン処理が記述できる。 たとえば、tensorflow.keras.Modelfit() メソッドには NumPy 配列や Tensor オブジェクト以外にも、Dataset というオブジェクトを渡すことができる。

www.tensorflow.org

Dataset オブジェクトの作り方

Dataset オブジェクトはいくつかの作り方がある。

Dataset.range()

一番シンプルなのが Dataset.range() を使うやり方かな。 これはおそらくパイプラインをデバッグするときに活躍する。

>>> ds = tf.data.Dataset.range(1, 6)
>>> ds
<RangeDataset shapes: (), types: tf.int64>

Dataset オブジェクトはイテラブルなので iter() 関数を使うとイテレータが返ってくる。

>>> it = iter(ds)
>>> it
<tensorflow.python.data.ops.iterator_ops.OwnedIterator object at 0x14cd26d30>

イテレータに対して next() を使うと、Tensor オブジェクトが得られる。

>>> next(it)
<tf.Tensor: shape=(), dtype=int64, numpy=1>
>>> next(it)
<tf.Tensor: shape=(), dtype=int64, numpy=2>
>>> next(it)
<tf.Tensor: shape=(), dtype=int64, numpy=3>

中身を一通り確認したいときは as_numpy_iterator() メソッドを使うのが楽かな。

>>> list(ds.as_numpy_iterator())
[1, 2, 3, 4, 5]

Dataset.from_tensor_slices()

既存の配列や辞書なんかから作りたいときは Dataset.from_tensor_slices() を使うと良い。 たとえば、配列から作るときはこんな感じ。

>>> slices = tf.data.Dataset.from_tensor_slices([2, 3, 5, 7, 11])
>>> list(slices.as_numpy_iterator())
[2, 3, 5, 7, 11]

多次元のときはタプルか配列でネストする。

>>> slices = tf.data.Dataset.from_tensor_slices([(1, 1), (2, 3), (5, 8), (13, 21)])
>>> list(slices.as_numpy_iterator())
[array([1, 1], dtype=int32), array([2, 3], dtype=int32), array([5, 8], dtype=int32), array([13, 21], dtype=int32)]

単純な Tensor だけでなく、辞書にネストした Tensor を要素として返すこともできるらしい。

>>> elements = {
...     'x': ([1, 4, 7], [2, 5, 8]),
...     'y': [3, 6, 9],
... }
>>> dict_ds = tf.data.Dataset.from_tensor_slices(elements)
>>> list(dict_ds.as_numpy_iterator())
[{'x': (1, 2), 'y': 3}, {'x': (4, 5), 'y': 6}, {'x': (7, 8), 'y': 9}]

Dataset.from_tensor()

既存の Tensor を流用する API としては Dataset.from_tensor() もある。 こちらは各 Tensor の形状が一致していなくてもエラーにならない。

>>> t1 = tf.constant([1, 2, 3])
>>> t2 = tf.constant([4, 5])
>>> ts_ds = tf.data.Dataset.from_tensors((t1, t2))
>>> list(ts_ds.as_numpy_iterator())
[(array([1, 2, 3], dtype=int32), array([4, 5], dtype=int32))]

Dataset.from_generator()

割とよく使いそうなのが Dataset.from_generator() で、これはジェネレータを元に Dataset オブジェクトが作れる。

何か適当にジェネレータ関数を用意する。

>>> def g():
...     yield 1
...     yield 2
...     yield 3
... 

あとはそれを元に作れる。 ジェネレータをそのまま渡すと、一回読み出して終わりになってしまうので lambda 式と組み合わせるのが良いと思う。

>>> g_ds = tf.data.Dataset.from_generator(lambda: g(), output_types=tf.uint32)
>>> list(g_ds.as_numpy_iterator())
[1, 2, 3]

多次元だったりデータ型が複合的なときはこんな感じ。

>>> def g():
...     yield ('a', 1)
...     yield ('b', 2)
...     yield ('c', 3)
... 
>>> g_ds = tf.data.Dataset.from_generator(lambda: g(), output_types=(tf.string, tf.uint32))
>>> list(g_ds.as_numpy_iterator())
[(b'a', 1), (b'b', 2), (b'c', 3)]

Dataset.list_files()

ファイルを元に Dataset オブジェクトを作りたいときは Dataset.list_files() を使うのが便利そう。

たとえばカレンとワーキングディレクトリに、次のようにテキストファイルを用意しておく。

$ echo "Hello, World" > greet.txt
$ echo "Konnichiwa, Sekai" > aisatsu.txt

Dataset.list_files() にファイル名のパターンを入れると、それに該当するファイル名が抽出される。

>>> files_ds = tf.data.Dataset.list_files('*.txt')
>>> list(files_ds.as_numpy_iterator())
[b'./aisatsu.txt', b'./greet.txt']

例えば、これを後述する map() とかで処理すると並列化が楽にできる。

基本的な情報を得る

Dataset オブジェクトには、基本的な情報を得るためのアトリビュートがいくつかある。

Dataset#elementspec

たとえば、得られるデータの形状や型は elementspec というプロパティで確認できる。

>>> ds.element_spec
TensorSpec(shape=(), dtype=tf.int64, name=None)

Dataset#cardinality()

サイズが固定だったり、いくつかの条件を満たすときは cardinality() というメソッドで要素の種類が得られる。

>>> ds.cardinality()
<tf.Tensor: shape=(), dtype=int64, numpy=5>

len()

Dataset オブジェクトのサイズも、あらかじめ分かっているときに関しては得られる。

>>> len(ds)
5

とはいえ、加工するとすぐに得られなくなるので使い所はなかなか難しい。

>>> len(ds.repeat())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/amedama/.virtualenvs/py38/lib/python3.8/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 452, in __len__
    raise TypeError("dataset length is infinite.")
TypeError: dataset length is infinite.

データセットを加工する

ここでは、Dataset オブジェクトを加工する方法について書く。 加工には、関数型プログラミング的なインターフェースが用意されている。

Dataset#filter()

特定の条件を満たす要素だけを残したいときには filter() メソッドを使う。 以下は偶数の要素だけを抽出する例。

>>> even_ds = ds.filter(lambda x: x % 2 == 0)
>>> list(even_ds.as_numpy_iterator())
[2, 4]

Dataset#reduce()

要素を集約したいときは reduce() メソッドが使える。 以下は要素をすべて足していく例。

>>> ds.reduce(initial_state=tf.Variable(0, dtype=tf.int64),
...           reduce_func=lambda cumulo, current: cumulo + current)
<tf.Tensor: shape=(), dtype=int64, numpy=15>

Dataset#map()

すべての要素に処理を適用したいときは map() メソッドを使う。 以下はすべての要素を 2 倍する例。

>>> double_ds = ds.map(lambda x: x * 2)
>>> list(double_ds.as_numpy_iterator())
[2, 4, 6, 8, 10]

map() メソッドは処理を並列化できる。 並列度は num_parallel_calls オプションで指定する。 また、順序を保持する必要があるかを deterministic オプションで指定する。 以下は、4 並列で順序を保持しない場合の例。

>>> double_ds = ds.map(lambda x: x * 2, num_parallel_calls=4, deterministic=False)
>>> list(double_ds.as_numpy_iterator())
[4, 2, 6, 8, 10]

24 の順番が入れ替わっていることがわかる。 パフォーマンスを追い求めるときは deterministic=False が推奨されている。

また、並列度をランタイムで良い感じに決めてほしいときは tf.data.AUTOTUNE という定数を指定すれば良いっぽい。

>>> ds.map(lambda x: x * 2, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

Dataset#flat_map()

要素が入れ子構造になっているときに、開きながら処理したいときは flat_map() を使うと良い。 例えば、次のような 2 次元の要素を返す Dataset があったとする。

>>> nested_ds = tf.data.Dataset.from_tensor_slices([[1, 2], [3, 4], [5, 6]])
>>> list(nested_ds.as_numpy_iterator())
[array([1, 2], dtype=int32), array([3, 4], dtype=int32), array([5, 6], dtype=int32)]

ただ単に flatten したいなら、次のように関数の中で Dataset オブジェクトを返すようにする。

>>> flatten_ds = nested_ds.flat_map(lambda x: tf.data.Dataset.from_tensor_slices(x))
>>> list(flatten_ds.as_numpy_iterator())
[1, 2, 3, 4, 5, 6]

flatten して、さらに各要素に処理をしたいなら、メソッドチェーンしてこんな感じ。 もちろん、flatten した Dataset オブジェクトに map() メソッドを呼んでも良いんだけど。

>> flatten_double_ds = nested_ds.flat_map(lambda x: tf.data.Dataset.from_tensor_slices(x).map(lambda x: x * 2))
>>> list(flatten_double_ds.as_numpy_iterator())
[2, 4, 6, 8, 10, 12]

Dataset#interleave()

flat_map() の並列化版とでもいえるようなのが interleave() メソッド。

先ほどと同じように、ただ flatten するような処理をしてみよう。

>>> flatten_ds = nested_ds.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x))
>>> list(flatten_ds.as_numpy_iterator())
[1, 3, 5, 2, 4, 6]

すると、要素の順番が入れ替わっていることがわかる。 これは、デフォルトで処理が並列化されているため。

interleave() と同じ結果にしたいときは cycle_length1 に指定する必要がある。

>>> flatten_ds = nested_ds.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x), cycle_length=1)
>>> list(flatten_ds.as_numpy_iterator())
[1, 2, 3, 4, 5, 6]

cycle_length って何よ?って話になるんだけど、これは Dataset オブジェクトをキューのように処理する (仮想的な) Consumer の数を指している。 例えば、cycle_length2 のときは、キューが交互に処理されていくということ。 もちろん、新しい Dataset オブジェクトに要素が積まれる順番は早いもの勝ち。 試しに cycle_length=2 で処理してみよう。

>>> flatten_ds = nested_ds.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x), cycle_length=2)
>>> list(flatten_ds.as_numpy_iterator())
[1, 3, 2, 4, 5, 6]

要素の順番が入れ替わった。

一度に処理する要素の数も block_length で指定できる。 試しに 2 を指定してみよう。

>>> flatten_ds = nested_ds.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x), cycle_length=2, block_length=2)
>>> list(flatten_ds.as_numpy_iterator())
[1, 2, 3, 4, 5, 6]

結果からは分かりにくいけど、上記は [1, 2] -> [3, 4] -> [5, 6] という単位で処理されているはず。

ちょっとややこしいのは num_parallel_calls っていうオプションもあるところ。 ドキュメントによると、こちらのオプションに tf.data.AUTOTUNE を指定しておけば cycle_length は自動的に最大の並列度になるらしい。

>>> flatten_ds = nested_ds.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x), num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)
>>> list(flatten_ds.as_numpy_iterator())
[1, 3, 5, 2, 4, 6]

要するに、以下のように使えば複数のファイルを並列で処理できて良い感じってことみたい。 複数のテキストファイルに書かれている文章を、一つの Dataset オブジェクトとしてまとめる例。

>>> files_ds = tf.data.Dataset.list_files('*.txt')
>>> lines_ds = files_ds.interleave(lambda filepath: tf.data.TextLineDataset(filepath), num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)
>>> list(lines_ds.as_numpy_iterator())
[b'Konnichiwa, Sekai', b'Hello, World']

いきなり tf.data.TextLineDataset が登場してるけど、これはテキストファイルを処理するために用意された Dataset オブジェクトの子クラス。 ファイルに書かれたテキストを一行ずつ読み出せる。 こういった、特定の処理に特化したクラスもいくつか用意されているが、ここでは詳しく扱わない。

Dataset#shuffle()

要素の順番を意図的に入れ替えたいときは shuffle() メソッドを使う。 ただし、ストリーミング的にデータを処理するので、バッファ単位でシャッフルすることになる。

バッファというのは、いわばキューのようなもので、そのキューの中で要素がランダムにシャッフルされる。 たとえば buffer_size1 なら、要素の順番は決して入れ替わらない。

>>> list(ds.shuffle(buffer_size=1).as_numpy_iterator())
[1, 2, 3, 4, 5]

buffer_size2 なら、要素の順番が入れ替わる。 ただし、最初の方にある要素が最後の方までいくのには、相当な運が必要になる。 なぜならシャッフルの処理で、そのままの位置にあっては取り出されてしまうため、連続で後ろに移動する必要がある。

>>> list(ds.shuffle(buffer_size=2).as_numpy_iterator())
[2, 1, 3, 5, 4]

そのため、バッファのサイズが大きいほど、より広い範囲で値が入れ替わりやすくなる。

>>> list(ds.shuffle(buffer_size=10).as_numpy_iterator())
[2, 1, 4, 3, 5]

Dataset#enumerate()

組み込み関数の enumerate() と同じ概念だけど、enumerate() メソッドを使うと連番の添字を付与できる。

>> enum_ds = ds.enumerate(start=100)
>>> list(enum_ds.as_numpy_iterator())
[(100, 1), (101, 2), (102, 3), (103, 4), (104, 5)]

Dataset#window()

window() メソッドを使うと、指定した数だけずらした要素が取れる。 活躍するとしたら、自然言語処理の分布仮説に関わる処理をするところかな。

例えば、1 つずらした要素を 2 つずつ得たい場合は次のようにする。

>>> shifted_ds = ds.window(size=2, shift=1)

window() メソッドはちょっとクセがあって、そのままだと as_numpy_iterator() メソッドが実行できない。

>>> shifted_ds.as_numpy_iterator()
Traceback (most recent call last):
...
TypeError: Dataset.as_numpy_iterator() does not support datasets containing <class 'tensorflow.python.data.ops.dataset_ops.DatasetV2'>

これは、Dataset オブジェクトから返る要素自体が Dataset オブジェクトになっているため。 なので、こんな感じで確認することになる。

>>> for window in shifted_ds:
...     print(list(window.as_numpy_iterator()))
... 
[1, 2]
[2, 3]
[3, 4]
[4, 5]
[5]

上記を見てわかるとおり、そのままだと末尾に size に満たない要素が得られてしまう。 これを避けるためには drop_remainder を有効にする。

>>> shifted_ds = ds.window(size=2, shift=1, drop_remainder=True)
>>> for window in shifted_ds:
...     print(list(window.as_numpy_iterator()))
... 
[1, 2]
[2, 3]
[3, 4]
[4, 5]

データ供給の効率を向上させる

ここでは Dataset を扱う上で、データ供給を効率的に行うために必要な処理を書いていく。 なお、処理を並列化する num_parallel_calls みたいなオプションについては、既にいくつかのメソッドで紹介した。

Dataset#prefetch()

Dataset オブジェクトの各要素は、イテレータから要素を取り出そうとするタイミングで評価される。 パイプラインが長大だとレイテンシが増加するので、prefetch() メソッドを使うことで先に読み込んでおくことができる。

>>> prefetched_ds = ds.prefetch(buffer_size=5)
>>> list(prefetched_ds.as_numpy_iterator())
[1, 2, 3, 4, 5]

Dataset#cache()

cache() メソッドを使うと、一度呼び出した Dataset オブジェクトの要素をファイルにキャッシュできる。

要素をシャッフルする加工が入ったパイプラインを使って動作を確認してみよう。 そのままだと、毎回シャッフルの処理が評価されるため得られる結果が異なる。

>>> shuffled_ds = ds.shuffle(buffer_size=10)
>>> list(shuffled_ds.as_numpy_iterator())
[4, 1, 2, 5, 3]
>>> list(shuffled_ds.as_numpy_iterator())
[5, 1, 4, 3, 2]

一方で cache() すると、一旦要素をすべて読み出し尽くした Dataset オブジェクトからは何度読んでも同じ順番で要素が返ってくる。

>>> cached_ds = shuffled_ds.cache()
>>> list(cached_ds.as_numpy_iterator())
[5, 1, 3, 4, 2]
>>> list(cached_ds.as_numpy_iterator())
[5, 1, 3, 4, 2]

ランダムな処理ではデータの多様性が失われる恐れはあるものの、時間のかかる処理がパイプラインに含まれるときには有効なはず。 cache() メソッドはお手軽な一方で、もうちょっと真面目にやるなら TFRecord 形式で永続化した方が良いかもしれない。

www.tensorflow.org

Dataset#shard()

データベースの負荷分散技術としてもよく知られているシャーディング。 shard() メソッドを使うと、データセットを複数に分割できる。 たぶん、分散学習とか複数のデバイスで学習するときに使われるんだと思う。

例えば、以下はシャード数に 2 を指定してデータセットを分割した場合。

>>> ds1of2 = ds.shard(num_shards=2, index=0)
>>> ds2of2 = ds.shard(num_shards=2, index=1)

確認すると、データが交互に分けられている。

>>> list(ds1of2.as_numpy_iterator())
[1, 3, 5]
>>> list(ds2of2.as_numpy_iterator())
[2, 4]

Dataset#batch()

現在のニューラルネットワークの学習で一般的に用いられるミニバッチ勾配降下法では、複数の要素をミニバッチという単位で読み込む。 batch() メソッドを使うと、Dataset オブジェクトからミニバッチの単位毎にデータを読み込めるようになる。

>>> batched_ds = ds.batch(3)
>>> list(batched_ds.as_numpy_iterator())
[array([1, 2, 3]), array([4, 5])]

drop_remainder を有効にすると、バッチサイズに満たない残りの要素は読み出されなくなる。

>>> batched_ds = ds.batch(3, drop_remainder=True)
>>> list(batched_ds.as_numpy_iterator())
[array([1, 2, 3])]

ちなみに batch() メソッドを適用した Dataset オブジェクトは、要素がミニバッチの単位で処理される点に注意が必要となる。 たとえば、prefetch() メソッドを呼んだときはバッファが 1 つ消費される毎にミニバッチが 1 回読み込まれることになる。

Dataset#padded_batch()

Dataset オブジェクトが要素毎に shape が異なる Tensor を返す場合には、padded_batch() を使うと揃えることができる。

例えば、以下のような感じで各要素の次元が異なる場合を考える。 自然言語処理とかで文の長さが違うとかかな。

>>> mixed_shape_ds = ds.map(lambda x: tf.fill([x], x))
>>> list(mixed_shape_ds.as_numpy_iterator())
[array([1]), array([2, 2]), array([3, 3, 3]), array([4, 4, 4, 4]), array([5, 5, 5, 5, 5])]

例えば 2 つずつ要素を取り出す場合で試してみる。

>>> padded_ds = mixed_shape_ds.padded_batch(2)

取り出される要素を確認すると、各バッチ毎に次元数が揃っていることがわかる。

>>> from pprint import pprint
>>> pprint(list(padded_ds.as_numpy_iterator()))
[array([[1, 0],
       [2, 2]]),
 array([[3, 3, 3, 0],
       [4, 4, 4, 4]]),
 array([[5, 5, 5, 5, 5]])]

padded_shapes オプションを使えば、明示的にパディングするサイズを指定することもできる。

>>> padded_ds = mixed_shape_ds.padded_batch(2, padded_shapes=(10))
>>> pprint(list(padded_ds.as_numpy_iterator()))
[array([[1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [2, 2, 0, 0, 0, 0, 0, 0, 0, 0]]),
 array([[3, 3, 3, 0, 0, 0, 0, 0, 0, 0],
       [4, 4, 4, 4, 0, 0, 0, 0, 0, 0]]),
 array([[5, 5, 5, 5, 5, 0, 0, 0, 0, 0]])]

パディングに使う値は padding_values オプションで指定できる。

>>> padded_ds = mixed_shape_ds.padded_batch(2, padded_shapes=(10), padding_values=tf.constant(-1, dtype=tf.int64))
>>> pprint(list(padded_ds.as_numpy_iterator()))
[array([[ 1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
       [ 2,  2, -1, -1, -1, -1, -1, -1, -1, -1]]),
 array([[ 3,  3,  3, -1, -1, -1, -1, -1, -1, -1],
       [ 4,  4,  4,  4, -1, -1, -1, -1, -1, -1]]),
 array([[ 5,  5,  5,  5,  5, -1, -1, -1, -1, -1]])]

バッチサイズに満たない要素を切り捨てる場合に drop_remainder オプションを使うのは batch() メソッドと同様。

>>> padded_ds = mixed_shape_ds.padded_batch(2, padded_shapes=(10), padding_values=tf.constant(-1, dtype=tf.int64), drop_remainder=True)
>>> pprint(list(padded_ds.as_numpy_iterator()))
[array([[ 1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
       [ 2,  2, -1, -1, -1, -1, -1, -1, -1, -1]]),
 array([[ 3,  3,  3, -1, -1, -1, -1, -1, -1, -1],
       [ 4,  4,  4,  4, -1, -1, -1, -1, -1, -1]])]

Dataset#unbatch()

batch() メソッドとは反対に、各要素単位で読み込めるように戻すのが unbatch() メソッドになる。 パフォーマンス向上などを目的として、パイプラインの中にバッチ単位で処理する箇所があると、意外と使うことになる。

>>> batched_ds = ds.batch(3)
>>> unbatched_ds = batched_ds.unbatch()
>>> list(unbatched_ds.as_numpy_iterator())
[1, 2, 3, 4, 5]

ちなみに入れ子になった構造を flatten するときにも使えたりする。

>>> nested_ds = tf.data.Dataset.from_tensor_slices([[1, 2], [3, 4], [5, 6]])
>>> list(nested_ds.unbatch().as_numpy_iterator())
[1, 2, 3, 4, 5, 6]

複数のデータセットをまとめる

パイプラインによっては、複数のデータセットをひとまとめにして扱いたいことがある。

Dataset#concatenate()

複数の Dataset オブジェクトを直列につなげたいときは concatenate() メソッドを使う。

>>> ds1 = tf.data.Dataset.range(0, 5)
>>> ds2 = tf.data.Dataset.range(5, 10)
>>> concat_ds = ds1.concatenate(ds2)
>>> list(concat_ds.as_numpy_iterator())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Dataset.zip()

concatenate() とは反対に、並列にまとめたいときは zip() が使える。

>>> ds1 = tf.data.Dataset.range(0, 5)
>>> ds2 = tf.data.Dataset.range(5, 10)
>>> zip_ds = tf.data.Dataset.zip((ds1, ds2))
>>> list(zip_ds.as_numpy_iterator())
[(0, 5), (1, 6), (2, 7), (3, 8), (4, 9)]

3 つ以上をまとめることもできる。 また、要素数に違いがあるときは短いものに合わせられる。

>>> ds3 = tf.data.Dataset.range(10, 19)
>>> zip_ds = tf.data.Dataset.zip((ds1, ds2, ds3))
>>> list(zip_ds.as_numpy_iterator())
[(0, 5, 10), (1, 6, 11), (2, 7, 12), (3, 8, 13), (4, 9, 14)]

まとめる Dataset オブジェクトは shape が異なっていても問題ない。

>>> zip_ds = tf.data.Dataset.zip((ds1, ds2, ds3.batch(2)))
>>> list(zip_ds.as_numpy_iterator())
[(0, 5, array([10, 11])), (1, 6, array([12, 13])), (2, 7, array([14, 15])), (3, 8, array([16, 17])), (4, 9, array([18]))]

その他の処理

その他の分類が難しい処理たち。

Dataset#repeat()

repeat() メソッドを使うと、Dataset オブジェクトが返す内容を繰り返すようにできる。

例えば、2 回繰り返すようにしてみる。

>>> repeat2_ds = ds.repeat(2)
>>> list(repeat2_ds.as_numpy_iterator())
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]

繰り返しの回数を明示的に指定しない場合には無限ループになる。

>>> inf_loop_ds = ds.repeat()
>>> [next(it).numpy() for _ in range(20)]
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5]

長さやカーディナリティは取れなくなる。

>>> len(inf_loop_ds)
Traceback (most recent call last):
...
TypeError: dataset length is infinite.
>>> inf_loop_ds.cardinality().numpy()
-1

Dataset#apply()

Dataset オブジェクトを加工するための一連の処理を、一度に適用するのに Dataset#apply() メソッドが使える。

例えば、偶数の要素だけを残して値を 2 倍にする処理を考える。 そのためには、次のように Dataset オブジェクトを受け取って、処理を適用した Dataset オブジェクトを返す関数を定義する。

>>> def pipeline(ds: tf.data.Dataset) -> tf.data.Dataset:
...     odd_ds = ds.filter(lambda x: x % 2 != 0)
...     double_ds = odd_ds.map(lambda x: x * 2)
...     return double_ds
... 

あとは、関数を引数にして apply() メソッドを呼ぶだけ。

>>> applied_ds = ds.apply(pipeline)
>>> list(applied_ds.as_numpy_iterator())
[2, 6, 10]

ただ、これってメソッドチェーンの形で処理が書けるだけなので、どこに嬉しさがあるのかはあんまりよくわからない。

>>> list(pipeline(ds).as_numpy_iterator())
[2, 6, 10]

Dataset#take()

take() メソッドを使うと、先頭の要素だけを返す Dataset オブジェクトを作れる。

>>> head_ds = ds.take(2)
>>> list(head_ds.as_numpy_iterator())
[1, 2]

Dataset#skip()

take() メソッドとは反対に skip() メソッドを使うと、先頭の要素を読み飛ばす Dataset オブジェクトが作れる。

>>> tail_ds = ds.skip(2)
>>> list(tail_ds.as_numpy_iterator())
[3, 4, 5]

参考

www.tensorflow.org

www.tensorflow.org

www.tensorflow.org

Python: NumPy の empty() / zeros() を呼び出した直後は物理メモリの使用量が増えない

表題のとおりなんだけど、NumPy の empty()zeros() は呼び出した直後はメモリの RSS (Resident Set Size) が増えない。 ようするに、呼び出した直後は配列に物理メモリが割り当てられていない、ということ。 今回は、そのせいでちょっとハマったのでメモとして詳細について書き残しておく。 ただし、この事象が起こるのは仮想記憶に COW (Copy-On-Write) の機構を備えた OS、という条件はある。

使った環境は次のとおり。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.2 LTS
Release:    20.04
Codename:   focal
$ uname -r
5.4.0-1036-gcp

TL; DR

  • NumPy の empty() / zeros() は内部的に calloc(3) / malloc(3) を呼び出している
    • 仮想記憶の COW (Copy-On-Write) が働くので、実際にメモリの書き込みが生じないと物理メモリが割り当てられない
  • OS がメモリの管理にページング方式を採用していると、使った領域の分だけ物理メモリが増えていく
    • そのため zeros() で作ったスパースな配列を操作していると突然物理メモリが爆発して戸惑う
  • 一方で ones()zeros_like()malloc(3) した上でメモリに書き込みが生じるため最初から物理メモリが割り当てられる
  • 最初から必要な物理メモリを見積もりたいときは zeros() は使わず empty() した上で 0 を書き込むと良い

もくじ

下準備

今回は NumPy のソースコードをデバッガで追いかけるので、ちょっと準備の手数が多い。

まずは必要なパッケージをインストールしておく。

$ sudo apt-get -y install git python3-venv gdb

Python の仮想環境を作ってアクティベートする。

$ python3 -m venv example
$ source example/bin/activate

Python の REPL からメモリの情報を取りたいので psutil をインストールしておく。

$ pip3 install psutil

NumPy のリポジトリをクローンして、リリース版のタグをチェックアウトする。

$ git clone https://github.com/numpy/numpy.git
$ cd numpy/
$ git checkout -b v1.20.1 tags/v1.20.1

依存パッケージをインストールしたら、デバッグ用の情報を残して拡張モジュールをビルドする。

$ pip3 install -r test_requirements.txt
$ CFLAGS='-Wall -O0 -g' python3 setup.py build_ext -i

あとは開発モードでインストールする。

$ pip3 install -e .

Python の REPL を起動したら準備完了。

$ python3

実験してみる

それでは、実際に NumPy の empty()zeros() を呼び出して仮想メモリと物理メモリの使用状況を確認してみよう。

まずは、仮想メモリと物理メモリの使用状況を表示する関数を定義する。

>>> import psutil
>>> def print_memory_usage():
...     process = psutil.Process()
...     mem_info = process.memory_info()
...     print(f'vms={mem_info.vms // 1024 // 1024}MB rss={mem_info.rss // 1024 // 1024}MB')
...

定義したら NumPy をインポートする。

>>> import numpy as np

メモリの初期状態はこんな感じ。

>>> print_memory_usage()
vms=44MB rss=26MB

zeros() を使って配列を作ってみよう。

>>> x = np.zeros((10_000, 10_000))
>>> print_memory_usage()
vms=807MB rss=26MB

すると、仮想メモリの使用量は増えているが、物理メモリの使用量は増えていないことがわかる。

一旦、配列を削除しよう。 すると、仮想メモリの使用量も減る。

>>> del x
>>> print_memory_usage()
vms=44MB rss=26MB

empty() も同じように試してみる。

>>> x = np.empty((10_000, 10_000))
>>> print_memory_usage()
vms=807MB rss=26MB

こちらも、同じように仮想メモリの使用量しか増えない。

では、ones() はどうか?

>>> del x
>>> print_memory_usage()
vms=44MB rss=26MB
>>> x = np.ones((10_000, 10_000))
>>> print_memory_usage()
vms=807MB rss=789MB

なんと、これは物理メモリまで増えるのだ。

同じように zeros_like() も、呼び出した直後に物理メモリの割り当てが生じる。

>>> del x
>>> x = np.empty((10_000, 10_000))
>>> print_memory_usage()
vms=807MB rss=26MB
>>> y = np.zeros_like(x)
>>> print_memory_usage()
vms=1570MB rss=789MB

ハマった状況について

この振る舞いによって、実際に自分がハマったパターンについて説明しておく。 zeros() を使って初期化したスパースな配列を操作していたところ、途中までは何も問題がなかった。 しかし、配列に対して np.sum() を使ったところ物理メモリが突如として大量に必要となってプロセスが落ちた。

もうお分かりだと思うけど、そもそも初期化した時点で物理メモリに収まらないサイズの配列だった。 しかし、スパースだったが故に、途中まで必要とする物理メモリが少なかった。 今回であれば np.sum() という、およそメモリを大量に消費することなどなさそうな関数でプロセスが死んでデバッグに苦労した。

問題の回避策

問題を回避するには物理メモリを割り当ててやれば良い。 つまり、初期化した後に自分で値を書き込む。 例えば、次のように empty() で初期化した上で自分で 0 を書き込んでやる。

>>> del x
>>> del y
>>> print_memory_usage()
vms=44MB rss=26MB
>>> x = np.empty((10_000, 10_000))
>>> print_memory_usage()
vms=807MB rss=26MB
>>> x[...] = 0
>>> print_memory_usage()
vms=807MB rss=789MB

ソースコードを追いかけてみる

ここからは、実際に NumPy のソースコードを追いかけてみよう。

一旦、Python のインタプリタのプロセスは立ち上げ直しておく。

$ python3

そして、改めて NumPy をインポートしよう。

>>> import numpy as np

プロセス ID を確認する。

>>> import os
>>> os.getpid()
14592

別のターミナルで gdb を起動する。

$ sudo gdb

Python の REPL プロセスをアタッチする。

(gdb) attach 14592

NumPy で配列のメモリを確保している箇所にブレークポイントを打つ。

(gdb) break PyDataMem_NEW_ZEROED
Breakpoint 1 at 0x7f4c524c9010: file numpy/core/src/multiarray/alloc.c, line 265.

REPL に制御を戻す。

(gdb) c
Continuing.

REPL で zeros() を呼び出す。

>>> x = np.zeros((100, 100))

ブレークポイントに到達するのでバックトレースを表示する。

Breakpoint 1, PyDataMem_NEW_ZEROED (size=size@entry=80000, elsize=elsize@entry=1) at numpy/core/src/multiarray/alloc.c:265
265    {
(gdb) bt
#0  PyDataMem_NEW_ZEROED (size=size@entry=80000, elsize=elsize@entry=1) at numpy/core/src/multiarray/alloc.c:265
#1  0x00007f4c524c90ba in npy_alloc_cache_zero (sz=80000) at numpy/core/src/multiarray/alloc.c:155
#2  0x00007f4c52500486 in PyArray_NewFromDescr_int (subtype=<optimized out>, descr=0x7f4c527c1940 <DOUBLE_Descr>, nd=nd@entry=2, dims=dims@entry=0x13c1da0, 
    strides=strides@entry=0x0, data=data@entry=0x0, flags=<optimized out>, obj=<optimized out>, base=<optimized out>, zeroed=<optimized out>, allow_emptystring=<optimized out>)
    at numpy/core/src/multiarray/ctors.c:826
#3  0x00007f4c525047ee in PyArray_Zeros (nd=2, dims=0x13c1da0, type=<optimized out>, is_f_order=0) at numpy/core/src/multiarray/ctors.c:2833
#4  0x00007f4c5258dc33 in array_zeros (__NPY_UNUSED_TAGGEDignored=<optimized out>, args=0x7f4c52a894c0, kwds=0x0) at numpy/core/src/multiarray/multiarraymodule.c:2045
#5  0x00000000005f4249 in PyCFunction_Call ()
#6  0x00000000005f46d6 in _PyObject_MakeTpCall ()
#7  0x0000000000570936 in _PyEval_EvalFrameDefault ()
#8  0x000000000056955a in _PyEval_EvalCodeWithName ()
#9  0x000000000068c4a7 in PyEval_EvalCode ()
#10 0x000000000067bc91 in ?? ()
#11 0x000000000067bd0f in ?? ()
#12 0x00000000004a3291 in ?? ()
#13 0x00000000004a4305 in PyRun_InteractiveLoopFlags ()
#14 0x000000000067e059 in PyRun_AnyFileExFlags ()
#15 0x00000000004ee716 in ?? ()
#16 0x00000000006b63bd in Py_BytesMain ()
#17 0x00007f4c530000b3 in __libc_start_main (main=0x4eea30 <main>, argc=1, argv=0x7ffdff0db088, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, 
    stack_end=0x7ffdff0db078) at ../csu/libc-start.c:308
#18 0x00000000005fa4de in _start ()

元は numpy/core/src/multiarray/multiarraymodule.c::array_zeros() から呼び出されていることがわかる。

github.com

行き着く先がブレークポイントを打った関数で、中で calloc(3) している。

github.com

同様に empty() に関しては PyDataMem_NEW() 経由で malloc(3) が呼ばれるっぽい。

github.com

じゃあ何で ones()zeros_like() は物理メモリが増えるかというと、これはデバッガを使うまでもなくわかる。 まず、ones() に関しては empty() で初期化した配列に 1 をコピーしているから。

github.com

同じように zeros_like()empty_like() で作った配列に 0 をコピーしている。

github.com

理由さえ分かってしまえば何ということはない話だった。

Python: TensorFlow2 の自動微分を試してみる

今回は、TensorFlow2 のプリミティブな API を使って、自動微分と勾配法で計算グラフを最適化する方法が気になったので試してみた。 普段は Keras (tf.keras) を使ったミニバッチ学習をすることが多いけど、データのサイズが小さければバッチ学習で解く選択肢もあると思うので。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.2
BuildVersion:   20D64
$ python -V     
Python 3.8.7
$ pip list | grep "tensorflow "
tensorflow             2.4.1

もくじ

下準備

あらかじめ、TensorFlow をインストールしておく。

$ pip install tensorflow

サンプルコード

早速だけど以下にサンプルコードを示す。 今回は単純すぎる例だけど x0^2 + x_1^2 を最小化してみた。 TensorFlow2 では GradientTape というコンテキストマネージャの中で Variable を操作すると、計算グラフが構築されて自動微分ができるようだ。 勾配さえ計算できてしまえば、あとはそれをオプティマイザに放り込んでパラメータを更新するだけ。 オプティマイザは特に何を使っても良いけどシンプルな例なので SGD を使った。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import tensorflow as tf  # type: ignore


def objective(params: tf.Variable) -> tf.Tensor:
    """最小化したい目的関数"""
    # x_0^2 + x_1^2
    loss = params[0] ** 2 + params[1] ** 2
    return loss


@tf.function
def training_step(params: tf.Variable, optimizer: tf.keras.optimizers.Optimizer) -> None:
    # 自動微分する
    with tf.GradientTape(watch_accessed_variables=False) as t:
        # 勾配を計算するために追跡が必要なパラメータを登録する
        t.watch(params)  # watch_accessed_variables=True のときは不要
        # 計算グラフを構築する
        loss = objective(params)
    # 勾配を計算する
    grads = t.gradient(loss, params)
    # 勾配を使ってパラメータを更新する
    optimizer.apply_gradients([(grads, params)])
    # 現在のパラメータを出力する
    tf.print(params)


def main():
    # 定数で初期化した変数を用意する
    tensor = tf.constant([1., 4.], dtype=tf.float32)
    params = tf.Variable(tensor, trainable=True)

    # SGD で最適化する
    optimizer = tf.keras.optimizers.SGD(learning_rate=1e-1)

    # イテレーションを回す
    for _ in range(20):
        training_step(params, optimizer)

    # 結果を出力する
    print(objective(params))


if __name__ == '__main__':
    main()

上記を保存して実行してみる。 20 回イテレーションを回して、それぞれでパラメータの値を出力している。

$ python tfautograd.py 

...

[0.8 3.2]
[0.64 2.56]
[0.511999965 2.04799986]
[0.40959996 1.63839984]
[0.327679962 1.31071985]
[0.26214397 1.04857588]
[0.209715173 0.838860691]
[0.167772144 0.671088576]
[0.134217709 0.536870837]
[0.107374169 0.429496676]
[0.0858993381 0.343597353]
[0.068719469 0.274877876]
[0.0549755767 0.219902307]
[0.0439804606 0.175921842]
[0.0351843685 0.140737474]
[0.0281474944 0.112589978]
[0.0225179959 0.0900719836]
[0.0180143975 0.0720575899]
[0.0144115184 0.0576460734]
[0.0115292147 0.0461168587]
tf.Tensor(0.0022596875, shape=(), dtype=float32)

上記をみると、ちゃんとパラメータの値が更新されて、最終的な結果が約 0.002 と小さくなっていることがわかる。

いじょう。

参考

www.tensorflow.org

Python: TensorFlow/Keras で Word2Vec の CBOW を実装してみる

(2021-02-04 追記): ニューラルネットワークのアーキテクチャで、出力側の Embedding が誤って Dense になっていた部分を修正した。

Word2Vec の CBOW (Continuous Bag-of-Words) は、単語の分散表現 (Word Embedding) を得るために用いられるニューラルネットワークのアーキテクチャのひとつ。 今回は、それを TensorFlow/Keras を使って実装してみた。 なお、ニューラルネットワークのアーキテクチャは、オライリーの「ゼロから作るDeep Learning ❷ ――自然言語処理編」を参考にしている。

ゼロから作るDeep Learning ❷ ―自然言語処理編

ゼロから作るDeep Learning ❷ ―自然言語処理編

  • 作者:斎藤 康毅
  • 発売日: 2018/07/21
  • メディア: 単行本(ソフトカバー)

ただし、あらかじめ断っておくと実用性はそれほどない。 実用上は gensim とかを使った方が学習は速いし、得られる分散表現も良い性能になるはず。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.2
BuildVersion:   20D64
$ python -V       
Python 3.8.7
$ pip list | grep "tensorflow "
tensorflow             2.4.1

もくじ

下準備

あらかじめ、必要なパッケージをインストールしておく。

$ pip install tensorflow gensim scipy

また、学習するデータセットをダウンロードしておく。 今回は PTB (Penn Treebank) をコーパスに用いる。

$ wget https://raw.githubusercontent.com/tomsercu/lstm/master/data/ptb.train.txt
$ wget https://raw.githubusercontent.com/tomsercu/lstm/master/data/ptb.valid.txt

サンプルコード

早速だけど、以下にサンプルコードを示す。 ニューラルネットワークは SimpleContinuousBagOfWords という名前で実装した。 CBOW は、ある単語の左右に出現する単語から、その単語を推定するアーキテクチャになっている。 そのため、ネットワークに供給するデータは左右に出現する 2 つの単語になる。 そして、どんな単語が出るかを One-Hot 形式の予測値として出力している。 肝心の単語埋め込みは、ネットワーク内にある Embedding レイヤーの重み (Weights) として得られる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import re
from itertools import count
from typing import Iterable
from typing import Iterator
from functools import partial

import numpy as np  # type: ignore
import tensorflow as tf  # type: ignore
from tensorflow.keras import Model  # type: ignore
from tensorflow.keras.layers import Embedding  # type: ignore
from tensorflow import Tensor  # type: ignore
from tensorflow.data import Dataset  # type: ignore
from tensorflow.keras.losses import CategoricalCrossentropy  # type: ignore
from tensorflow.keras.optimizers import Adam  # type: ignore
from tensorflow.keras.layers import Dense  # type: ignore
from tensorflow.keras.layers import Softmax  # type: ignore
from tensorflow.keras.layers import Layer # type: ignore
from tensorflow.keras.models import Sequential  # type: ignore
from tensorflow.keras.callbacks import Callback  # type: ignore
from gensim.test.utils import datapath  # type: ignore
from scipy.stats import pearsonr  # type: ignore


class OutputEmbedding(Layer):
    """出力側の単語埋め込み層"""

    def __init__(self, vocab_size: int):
        super().__init__()
        self.vocab_size = vocab_size

    def build(self, input_shape):
        # 埋め込み次元数
        self.embedding_size = input_shape[1]
        # 出力部分に使うので、通常の Embedding とは shape が転置している
        self.embedding = self.add_weight(shape=(self.embedding_size, self.vocab_size))

    def call(self, inputs, **kwargs):
        # 重みは最初から転置してあるので transpose する必要はない
        # (NxD) * (DxV) = (NxV)
        return tf.tensordot(inputs, self.embedding, axes=1)


class SimpleContinuousBagOfWords(Model):
    """Word2Vec の CBOW モデルを実装したクラス"""

    def __init__(self, vocab_size: int, embedding_size: int):
        super().__init__()

        # 入力側の単語埋め込み層
        self.embedding_layer = Embedding(input_dim=vocab_size,
                                         input_shape=(1, ),
                                         output_dim=embedding_size,
                                         name='word_embedding',
                                         )
        self.output_layer = Sequential([
            OutputEmbedding(vocab_size),
            Softmax(),
        ])

    def call(self, inputs: Tensor) -> Tensor:
        # 推定したい単語の左側にある単語のベクトルを取り出す
        left_context = inputs[:, 0]
        left_vector = self.embedding_layer(left_context)
        # 推定したい単語の右側にある単語のベクトルを取り出す
        right_context = inputs[:, 1]
        right_vector = self.embedding_layer(right_context)
        # 両方の単語に対応するベクトルを足して 2 で割る
        mixed_vector = (left_vector + right_vector) / 2
        # 出力側の Embedding と積を取ってから単語の出現確率にする
        prediction = self.output_layer(mixed_vector)
        return prediction


class WordSimilarity353Callback(Callback):
    """WordSim353 データセットを使って単語間の類似度を評価するコールバック"""

    def __init__(self, word_id_table: dict[str, int]):
        super().__init__()

        self.word_id_table = word_id_table
        self.model = None

        # 評価用データを読み込む
        self.eval_data = []
        wordsim_filepath = datapath('wordsim353.tsv')
        with open(wordsim_filepath, mode='r') as fp:
            # 最初の 2 行はヘッダなので読み飛ばす
            fp.readline()
            fp.readline()
            for line in fp:
                word1, word2, sim_score = line.strip().split('\t')
                self.eval_data.append((word1, word2, float(sim_score)))

    def set_model(self, model):
        self.model = model

    def _cosine_similarity(self, x, y):
        nx = x / np.sqrt(np.sum(x ** 2))
        ny = y / np.sqrt(np.sum(y ** 2))
        return np.dot(nx, ny)

    def on_epoch_end(self, epoch, logs=None):
        # モデルから学習させたレイヤーの重みを取り出す
        model_layers = {layer.name: layer for layer in self.model.layers}
        embedding_layer = model_layers['word_embedding']
        word_vectors = embedding_layer.weights[0].numpy()

        # 評価用データセットに含まれる単語間の類似度を計算する
        labels = []
        preds = []
        for word1, word2, sim_score in self.eval_data:
            # Out-of-Vocabulary な単語はスキップ
            if word1 not in self.word_id_table or word2 not in self.word_id_table:
                continue

            # コサイン類似度を計算する
            word1_vec = word_vectors[self.word_id_table[word1]]
            word2_vec = word_vectors[self.word_id_table[word2]]
            pred = self._cosine_similarity(word1_vec, word2_vec)
            preds.append(pred)
            # 正解ラベル
            labels.append(sim_score)

        # ピアソンの相関係数を求める
        r_score = pearsonr(labels, preds)[0]
        print(f'Pearson\'s r score with WordSim353: {r_score}')


def load_corpus(filepath: str) -> Iterator[str]:
    """テキストファイルからコーパスを読み出す"""
    tag_replace = re.compile('<.*?>')
    with open(filepath, mode='r') as fp:
        for line in fp:
            # コーパスに含まれる <unk> や <eos> は取り除きたい
            yield re.sub(tag_replace, '', line)


def sentences_to_words(sentences: Iterable[str], lower: bool = True) -> Iterator[list[str]]:
    """文章を単語に分割する"""
    for sentence in sentences:
        if lower:
            sentence = sentence.lower()
        words = re.split('\\W+', sentence)
        yield [word for word in words if len(word) > 0]  # 空文字は取り除く


def word_id_mappings(sentences: Iterable[Iterable[str]]) -> dict[str, int]:
    """単語を ID に変換する対応テーブルを作る"""
    counter = count(start=0)

    word_to_id = {}
    for sentence in sentences:
        for word in sentence:

            if word in word_to_id:
                # 登録済みの単語
                continue

            # 単語の識別子を採番する
            word_id = next(counter)
            word_to_id[word] = word_id

    return word_to_id


def words_to_ids(sentences: Iterable[list[str]], word_to_id: dict[str, int]) -> Iterator[list[int]]:
    # 単語を対応する ID に変換する
    for words in sentences:
        # NOTE: Out-of-Vocabulary への対応がない
        yield [word_to_id[word] for word in words]


def cosine_similarity_matrix(word_vectors: np.ndarray, eps: float = 1e-8) -> np.ndarray:
    """単語ベクトル間のコサイン類似度を計算する"""
    word_norm = np.sqrt(np.sum(word_vectors ** 2, axis=1)).reshape(word_vectors.shape[0], -1)
    normalized_word_vectors = word_vectors / (word_norm + eps)
    cs_matrix = np.dot(normalized_word_vectors, normalized_word_vectors.T)
    return cs_matrix


def most_similar_words(word_id: int, cs_matrix: np.ndarray, top_n: int = 5):
    """コサイン類似度が最も高い単語の ID を取り出す"""
    similarities = cs_matrix[word_id, :]
    similar_word_ids = np.argsort(similarities)[::-1]
    # 基本的にコサイン類似度が最も高い ID は自分自身になるはずなので先頭は取り除く
    return similar_word_ids[1: top_n + 1]


def extract_contexts(word_ids: Tensor, window_size: int = 1) -> Tensor:
    """コンテキストの単語をラベル形式で得る"""
    right_contexts = word_ids[window_size + 1:]
    left_contexts = word_ids[:-window_size - 1]
    contexts = tf.transpose([left_contexts, right_contexts])
    return contexts


def extract_targets(word_ids: Tensor, vocab_size: int, window_size: int = 1) -> Tensor:
    """ターゲットの単語を One-Hot 形式にする"""
    targets = word_ids[window_size:-window_size]
    # 正解との損失を計算するために One-Hot 表現にする
    one_hot_targets = tf.one_hot(targets, depth=vocab_size)
    return one_hot_targets


def dataset_pipeline(corpus_words: Iterator[list[str]], word_id_table: dict[str, int]) -> Dataset:
    """ターゲットとコンテキストを供給するパイプライン"""

    # ID に変換したコーパスを行ごとに読み出せるデータセット
    word_ids_ds = Dataset.from_generator(lambda: words_to_ids(corpus_words, word_id_table),
                                         tf.int32,
                                         output_shapes=[None])

    # コンテキストを抽出するパイプライン
    contexts_ds = word_ids_ds.map(extract_contexts,
                                  num_parallel_calls=tf.data.AUTOTUNE,
                                  # ターゲットと対応関係を作る必要があるので決定論的に動作させる
                                  deterministic=True)

    # ターゲットを抽出するパイプライン
    vocab_size = len(word_id_table.keys())
    f = partial(extract_targets, vocab_size=vocab_size)
    targets_ds = word_ids_ds.map(f,
                                 num_parallel_calls=tf.data.AUTOTUNE,
                                 # コンテキストと対応関係を作る必要があるので決定論的に動作させる
                                 deterministic=True)

    # 行単位で処理されているので flatten して結合する
    zipped_ds = Dataset.zip((contexts_ds.unbatch(), targets_ds.unbatch()))
    return zipped_ds


def count_words(corpus_words: Iterator[list[str]]) -> int:
    """コーパスに含まれる単語の数をカウントする"""
    return sum(len(words) for words in corpus_words)


def main():
    # Penn Treebank コーパスを読み込む
    train_sentences = load_corpus('ptb.train.txt')
    valid_sentences = load_corpus('ptb.valid.txt')

    # コーパスを単語に分割する
    train_corpus_words = list(sentences_to_words(train_sentences))
    valid_corpus_words = list(sentences_to_words(valid_sentences))

    # 単語 -> ID
    word_to_id = word_id_mappings(train_corpus_words)
    # コーパスの語彙数
    vocab_size = len(word_to_id.keys())

    # モデルとタスクを定義する
    EMBEDDING_SIZE = 50
    criterion = CategoricalCrossentropy()
    optimizer = Adam(learning_rate=1e-2)
    model = SimpleContinuousBagOfWords(vocab_size, EMBEDDING_SIZE)
    model.compile(optimizer=optimizer,
                  loss=criterion)

    # データセットを準備する
    TRAIN_BATCH_SIZE = 1024
    train_ds = dataset_pipeline(train_corpus_words, word_to_id)
    train_ds = train_ds.shuffle(buffer_size=512)
    train_ds = train_ds.repeat()
    train_ds = train_ds.batch(TRAIN_BATCH_SIZE)
    train_ds = train_ds.prefetch(buffer_size=tf.data.AUTOTUNE)
    # 検証用データはリピートしない
    VALID_BATCH_SIZE = 1024
    valid_ds = dataset_pipeline(valid_corpus_words, word_to_id)
    valid_ds = valid_ds.batch(VALID_BATCH_SIZE)
    valid_ds = valid_ds.prefetch(buffer_size=tf.data.AUTOTUNE)

    # 厳密に要素数を計算するのは面倒なので、ざっくり単語数の 2 倍をエポックということにしておく
    num_of_train_samples = count_words(train_corpus_words) * 2

    callbacks = [
        # WordSim353 データセットを使って単語間の類似度を相関係数で確認する
        WordSimilarity353Callback(word_to_id),
    ]
    # 学習する
    model.fit(train_ds,
              steps_per_epoch=num_of_train_samples // TRAIN_BATCH_SIZE,
              validation_data=valid_ds,
              epochs=5,
              callbacks=callbacks,
              verbose=1,
              )

    # モデルから学習させたレイヤーの重みを取り出す
    model_layers = {layer.name: layer for layer in model.layers}
    embedding_layer = model_layers['word_embedding']
    word_vectors = embedding_layer.weights[0].numpy()

    # 単語を表すベクトル間のコサイン類似度を計算する
    cs_matrix = cosine_similarity_matrix(word_vectors)
    # ID -> 単語
    id_to_word = {value: key for key, value in word_to_id.items()}

    # いくつか似ているベクトルを持った単語を確認してみる
    example_words = ['you', 'year', 'car', 'toyota']
    for target_word in example_words:
        # ID に変換した上で最も似ている単語とそのベクトルを取り出す
        print(f'The most similar words of "{target_word}"')
        target_word_id = word_to_id[target_word]
        most_similar_word_ids = most_similar_words(target_word_id, cs_matrix)
        # 単語とベクトルを表示する
        for rank, similar_word_id in enumerate(most_similar_word_ids, start=1):
            similar_word = id_to_word[similar_word_id]
            similarity = cs_matrix[target_word_id, similar_word_id]
            print(f'TOP {rank}: {similar_word} = {similarity}')
        print('-' * 50)


if __name__ == '__main__':
    main()

上記に名前をつけて保存したら実行してみよう。 CPU を使って学習すると、1 エポックにそれなりの時間がかかるけど、まあなんとかなるレベルのはず。 エポック間には、WordSim353 データセットを使って評価した、単語間の類似度がピアソンの相関係数として表示される。 学習が終わったら、いくつかの単語について、類似したベクトルを持つ単語を上位 5 件について表示させている。

$ python cbow.py

...

1651/1651 [==============================] - 181s 109ms/step - loss: 6.1526 - val_loss: 5.1804
Pearson's r score with WordSim353: 0.21794273571470427
Epoch 2/5
1651/1651 [==============================] - 182s 110ms/step - loss: 4.7424 - val_loss: 5.2353
Pearson's r score with WordSim353: 0.2428972583779604
Epoch 3/5
1651/1651 [==============================] - 172s 104ms/step - loss: 4.4591 - val_loss: 5.3129
Pearson's r score with WordSim353: 0.2583752228928214
Epoch 4/5
1651/1651 [==============================] - 160s 97ms/step - loss: 4.3708 - val_loss: 5.3941
Pearson's r score with WordSim353: 0.25454681209868246
Epoch 5/5
1651/1651 [==============================] - 4038s 2s/step - loss: 4.3027 - val_loss: 5.4816
Pearson's r score with WordSim353: 0.25160892813703517
The most similar words of "you"
TOP 1: we = 0.8296732306480408
TOP 2: they = 0.7543421983718872
TOP 3: i = 0.668086051940918
TOP 4: she = 0.5567612051963806
TOP 5: imbalances = 0.5095677375793457
--------------------------------------------------
The most similar words of "year"
TOP 1: week = 0.7451192140579224
TOP 2: month = 0.7027692794799805
TOP 3: day = 0.6059724688529968
TOP 4: spring = 0.5649460554122925
TOP 5: decade = 0.5465914011001587
--------------------------------------------------
The most similar words of "car"
TOP 1: seed = 0.6106906533241272
TOP 2: taxi = 0.6024419665336609
TOP 3: auto = 0.574679434299469
TOP 4: siemens = 0.5651793479919434
TOP 5: subscriber = 0.5451180934906006
--------------------------------------------------
The most similar words of "toyota"
TOP 1: ford = 0.6328699588775635
TOP 2: celebrity = 0.5823256373405457
TOP 3: humana = 0.5597572922706604
TOP 4: supermarkets = 0.5554667115211487
TOP 5: honda = 0.5467281937599182
--------------------------------------------------

上記をみると、学習データの損失はエポックを重ねるごとに小さくなっているが、検証データの損失は 2 エポック目以降から悪化している。 これは一般的には過学習している状態だが、WordSim353 を使った単語間類似度の評価は 2 エポック目も向上している。 また、ベストなエポックの結果ではないものの、最後に出力している類似の単語については、そこそこ納得感がある結果に思える。

ゼロから作るDeep Learning ❷ ―自然言語処理編

ゼロから作るDeep Learning ❷ ―自然言語処理編

  • 作者:斎藤 康毅
  • 発売日: 2018/07/21
  • メディア: 単行本(ソフトカバー)