CUBE SUGAR CONTAINER

技術系のこと書きます。

kubectl の複数の設定ファイルを一つにマージする

何度も調べることになりそうなのでメモしておく。 kubectl で複数の設定ファイルがあるときに、ひとつにまとめる方法について。

使った環境は次のとおり。

$ sw_vers                  
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G5033
$ kubectl version --short=true
Client Version: v1.18.3
Server Version: v1.18.2

まとめ方

はじめに、まとめたいコンフィグのファイルパスを、コロン (:) 区切りで KUBECONFIG というシェル変数に列挙する。

$ KUBECONFIG=<config-src1>:<config-src2>:...

あとは kubectl config view --flatten コマンドを使えばひとつのコンフィグにまとめることができる。 リダイレクトでファイルに書き出して、使っているコンフィグと入れ替えればオッケー。

$ kubectl config view --flatten > <config-dst>

いじょう。

4K ディスプレイを導入するときは HDMI ケーブルに注意が必要

今回は、自宅で 4K ディスプレイを導入した際に HDMI ケーブルを起因とするトラブルが起きた話について。

生じた問題について

我が家では、自宅の作業環境を改善する一環として半年ほど前に 4K ディスプレイを導入した。 そして、普段使っているパソコンとは HDMI ケーブルを介して接続することにした。 その際に、自宅にあった適当な HDMI ケーブルを使ってつないでしまったのがまずかった。

具体的には、次のような問題が発生した。 普段はちゃんと映るのに、たまに何の前触れもなく画面が真っ暗になる (ブラックアウトする) 。 問題が発生する間隔はまちまちで、数十分発生しないかと思いきや、ひどいときは数分と待たずに連続して起こる。

結局、問題を切り分けながら HDMI ケーブルに当たりをつけて 4K 対応を謳っている製品に入れ替えたところ問題は解消した。 そして、この問題はどうやら HDMI を扱う上で意外と有名な現象らしかった。 具体的には、ケーブルの相性問題がある。

HDMI ケーブルには準拠する仕様としてのデータ転送レートがある上で、信号の減衰やノイズがのったりするとエラーが生じて画面が写りにくくなるらしい。 そして、ディスプレイの解像度やリフレッシュレートが上がると、必要とするデータ転送レートも上がるため、よりエラーの発生にシビアになる。 今回も、解像度がフル HD のディスプレイでは何の問題もなく使えるのに、4K にしたところ発生するようになった。

それまで使っていた機材を流用してディスプレイを入れ替えるときは気をつけよう。

Python: mlflow.start_run(nested=True) は使い方に注意しよう

今回は MLflow Tracking のすごーく細かい話。 ソースコードを読んでいて、ハマる人もいるかもなと思ったので書いておく。 結論から先に書くと、MLflow Tracking には次のような注意点がある。

  • MLflow Tracking で標準的に使う API はマルチスレッドで Run が同時並行に作られることを想定していない
  • 同時並行に作れそうなmlflow.start_run(nested=True) は、あくまで Run を入れ子にするときだけ使える
  • この点に気をつけないと MLflow Tracking で記録されるデータが壊れる

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G5033
$ python -V
Python 3.7.7
$ pip list | grep -i mlflow              
mlflow                    1.8.0

もくじ

下準備

あらかじめ MLflow をインストールして Python のインタプリタを起動しておく。

$ pip install mlflow
$ python

そして MLflow をインポートしておく。

>>> import mlflow

注意点の解説

MLflow Tracking では、新しく Run を作るときに mlflow.start_run() という関数を使う。

>>> mlflow.start_run()
<ActiveRun: >

この関数は、すでに同じ Python プロセスで呼ばれていると、再度呼び出したときに例外となるよう作られている。

>>> mlflow.start_run()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/amedama/.virtualenvs/py37/lib/python3.7/site-packages/mlflow/tracking/fluent.py", line 112, in start_run
    _active_run_stack[0].info.run_id))
Exception: Run with UUID a14a8f9bf213473aa4a7a437ac811077 is already active. To start a new run, first end the current run with mlflow.end_run(). To start a nested run, call start_run with nested=True

ただし、オプションに nested=True を指定すると、エラーにならず新たに Run を作ることができる。

>>> mlflow.start_run(nested=True)
<ActiveRun: >

ただ、上記で作られた Run は MLflow のモジュールの中でグローバル変数として用意されたスタック構造で管理されているに過ぎない。 スタック構造は隠しオブジェクトだけど、あえて中身を見せるとこんな感じ。

>>> from mlflow.tracking import fluent
>>> fluent._active_run_stack
[<ActiveRun: >, <ActiveRun: >]

この状況で mlflow.end_run() を呼び出すと、スタック構造からひとつずつ Run を表すオブジェクトが POP されるという寸法。

>>> mlflow.end_run()
>>> fluent._active_run_stack
[<ActiveRun: >]
>>> mlflow.end_run()
>>> fluent._active_run_stack
[]

つまり何が言いたいかというと、mlflow.start_run(nested=True) は、あくまで Run の中で別の Run を「入れ子」にすることだけを想定している。 要するに、以下のようなコード。

>>> # この使い方はセーフ (Run を入れ子にする)
>>> with mlflow.start_run():
...     with mlflow.start_run(nested=True):
...         ...  # 何か時間のかかる実験
... 

「入れ子」ではなく、同時並行に Run を作る用途で使ってしまうとスタック構造が壊れる。 たとえば、次のようなコードを書くとレースコンディションを生む。 具体的には、今実行しているのとは関係ない Run にメトリックやパラメータが記録される。

>>> # この使い方はアウト (スタックが壊れる)
>>> import threading
>>> def f():
...     with mlflow.start_run(nested=True):
...         ...  # 何か時間のかかる実験
... 
>>> for _ in range(10):
...     t = threading.Thread(target=f, daemon=True)
...     t.start()
... 

それでも同時に Run を実行したい!というときはプロセスを分けよう。 プロセスが違えばグローバル変数のいるメモリ空間も分かれるので問題ないはず。 そもそも、一般的な Python の処理系には GIL (Global Interpreter Lock) があるので、I/O を並行処理するときしかマルチスレッドが意味を成さない。

ちなみに、ここらへんの実装は以下にある。

github.com

上記を読むと mlflow.tracking.client.MlflowClient を直接使えばマルチスレッドで複数の Run を同時に使うこともできそう。 ただ、得られる嬉しさが手間に見合わないだろうなという感じがする。

いじょう。

Python: MLflow Tracking を使ってみる

MLflow は MLOps に関連した OSS のひとつ。 いくつかのコンポーネントに分かれていて、それぞれを必要に応じて独立して使うことができる。 今回は、その中でも実験の管理と可視化を司る MLflow Tracking を試してみることにした。

機械学習のプロジェクトでは試行錯誤することが多い。 その際には、パラメータやモデルの構成などを変えながら何度も実験を繰り返すことになる。 すると、回数が増えるごとに使ったパラメータや得られた結果、モデルなどの管理が煩雑になってくる。 MLflow Tracking を使うことで、その煩雑さが軽減できる可能性がある。

使った環境は次のとおり。

$ sw_vers          
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G5033
$ python -V       
Python 3.7.7
$ mlflow --version
mlflow, version 1.8.0

もくじ

登場する概念について

まず、MLflow Tracking では ExperimentRun というモノを作っていく。 これらは、特定の目的を持った実験と、それに 1:N で紐付いた各試行を表している。 つまり、試行錯誤の度に Run が増えることになる。 そして、それぞれの Run には次のような情報が、またもや 1:N で紐づく。

  • Parameter

    • データの前処理やモデルの学習に使ったパラメータ
  • Metric

    • 各ラウンドの損失や、交差検証のスコアといったメトリック
  • Artifact

    • 学習済みモデルや、特徴量の重要度など実験した結果として得られる成果物
  • Tag

    • 後から実験を探しやすくしたりするためのメタな情報

要するに Experiment > Run > Parameter, Metric, ... ということ。 これらの情報は、典型的には Python のスクリプトから記録される。 そして、記録された情報は後からスクリプトや WebUI 経由で確認できる。

データを保存する場所について

具体的な使い方を紹介する前に、前述した情報がどのように管理されるのか解説しておく。 まず、MLflow Tracking はクライアントとサーバに分かれてる。 そして、データは基本的にサーバに保存される。 クライアントは、データを保存するサーバの場所を Tracking URI と Artifact URI という 2 つの URI で指定する。

ただし、クライアントとサーバは 1 台のマシンで兼ねることもできる。 また、データを記録する方法も、追加でソフトウェアなどを必要としないローカルファイルで完結させることができる。 そのため、クライアント・サーバ方式といっても 1 人で使い始める分には環境構築などの作業は全く必要ない。 あくまで、複数人のチームで記録されたデータを共用・共有したいときに専用のサーバが必要となる。

Tracking URI について

Tracking URI は、概ね Run の中で Artifact 以外の情報を記録するところ。 記録される情報は、基本的には Key-Value 形式になっている。

利用できるバックエンドには次のようなものがある。

  • ローカルファイル

    • マウントされているブロックストレージにファイルとして記録される
      • 手っ取り早く使うならこれ (デフォルト)
    • 形式: file:<path>
  • リレーショナルデータベース

    • SQLAlchemy 経由で RDB にテーブルのエントリとして記録される
    • 形式: <dialect>+<driver>://<username>:<password>@<host>:<port>/<database>
  • REST API

    • MLflow Server というサーバを立ち上げて、そこに記録される
      • 後述するが、記録先の実体はローカルファイルだったりリレーショナルデータベースだったり選べる
    • 形式: http(s)://<host>:<port>

その他、詳細は以下に記載されている。

www.mlflow.org

Artifact URI について

Artifact URI は、その名の通り Artifact を記録するところ。 記録される情報はファイル (バイト列) になる。

利用できるバックエンドには次のようなものがある。

  • ローカルファイル

    • マウントされているブロックストレージにファイルとして保存される
      • 手っ取り早く使うならこれ (デフォルト)
    • 形式: file:<path>
  • 各種クラウド (オブジェクト) ストレージ

    • Amazon S3, Azure Blob Storage, Google Cloud Storage などにファイルとして保存される
    • 形式: s3://<bucket> など
  • (S)FTP サーバ

    • (S)FTP サーバにファイルとして保存される
    • 形式: (s)ftp://<user>@<host>/<path>
  • HDFS

    • HDFS (Hadoop Distributed File System) にファイルとして保存される
    • 形式: hdfs://<path>

その他、詳細は以下に記載されている。

www.mlflow.org

色々とあるけど、結局のところ 1 人で使い始めるなら両方ともローカルファイルにすれば良い。 デフォルトでは、どちらもクライアントを実行した場所の mlruns というディレクトリが使われる。 これは、Traking URI と Artifact URI の両方に file:./mlruns を指定した状態ということ。

ちなみに、チームで使いたいけど自分で構築とか運用したくないってときは、開発の中心となっている Databricks がサーバ部分のマネージドサービスを提供している。

下準備

前置きが長くなったけど、ここからやっと実際に試していく。

はじめに、必要なパッケージをインストールしておこう。 なお、mlflow 以外は、後ほど登場するサンプルコードを動かすためだけに必要なもの。

$ pip install mlflow scikit-learn lightgbm matplotlib

インストールすると mlflow コマンドが使えるようになる。

$ mlflow --version
mlflow, version 1.8.0

基本的な使い方

とりあえず Python の REPL を使って、基本的な使い方を紹介する。 まずは REPL を起動しておく。

$ python

MLflow のパッケージをインポートする。

>>> import mlflow

実験の試行を開始する。 これには start_run() 関数を使う。 なお、デフォルトでは Experiment として Default という領域が使われる。

>>> mlflow.start_run()
<ActiveRun: >

実験に使ったパラメータを log_param() 関数で記録する。

>>> mlflow.log_param(key='foo', value='bar')

得られたメトリックなどの情報は log_metric() 関数で記録する。

>>> mlflow.log_metric(key='logloss', value=1.0)

実験には set_tag() 関数でタグが付与できる。

>>> mlflow.set_tag(key='hoge', value='fuga')

尚、これらは関数名を複数形にすると辞書型で複数の Key-Value を一度に記録できる。

アーティファクトについては、ちょっと面倒くさい。 アーティファクトはバイト列のファイルなので、まずはローカルにファイルを用意して、それをコピー (転送) することになる。 典型的には、一時ディレクトリを tempfile モジュールで作って、中身のファイルを作ったらそれをコピーすれば良い。 一時ディレクトリ以下のファイルは、処理が終わったら自動的に削除される。

>>> import tempfile
>>> import pathlib
>>> with tempfile.TemporaryDirectory() as d:
...     filename = 'test-artifact'
...     artifact_path = pathlib.Path(d) / filename
...     with open(artifact_path, 'w') as fp:
...         print('Hello, World!', file=fp)
...     mlflow.log_artifact(artifact_path)
... 

あとは実験の試行を終了するだけ。

>>> mlflow.end_run()

Python の REPL を終了して、カレントディレクトリを確認してみよう。 デフォルトの Tracking URI / Artifact URI として file:./mlruns が使われるため mlruns というディレクトリが作られている。

$ find mlruns 
mlruns
mlruns/0
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/metrics
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/metrics/logloss
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/artifacts
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/artifacts/test-artifact
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/tags
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/tags/hoge
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/tags/mlflow.user
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/tags/mlflow.source.name
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/tags/mlflow.source.type
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/params
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/params/foo
mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/meta.yaml
mlruns/0/meta.yaml
mlruns/.trash

最初の階層は、それぞれの Experiment を表している。 DefaultExperiment には ID として 0 が付与されていることがわかる。

$ cat mlruns/0/meta.yaml 
artifact_location: file:///Users/amedama/Documents/temporary/helloworld/mlruns/0
experiment_id: '0'
lifecycle_stage: active
name: Default

その下の階層は、それぞれの Run を表している。

$ head mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/meta.yaml
artifact_uri: file:///Users/amedama/Documents/temporary/helloworld/mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/artifacts
end_time: 1591267604170
entry_point_name: ''
experiment_id: '0'
lifecycle_stage: active
name: ''
run_id: 3de33a8f39294c4f8bd404a5d5bccf39
run_uuid: 3de33a8f39294c4f8bd404a5d5bccf39
source_name: ''
source_type: 4

ここにパラメータやメトリックなどが記録される。 メトリックに記録されている各列は、実行時刻、値、ステップ数を表している。 ステップ数というのは、たとえばニューラルネットワークのエポックだったり、ブースティングマシンのラウンドだったりする。 先ほど実行した log_metric() 関数の引数として、実は指定できた。

$ head mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/params/foo 
bar
$ head mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/metrics/logloss 
1591267315737 1.0 0
$ cat mlruns/0/3de33a8f39294c4f8bd404a5d5bccf39/artifacts/test-artifact 
Hello, World!

時刻には 1,000 倍した UNIX time が記録されている。

$ date -r $((1591267315737 / 1000))
202064日 木曜日 194155秒 JST

記録された情報を WebUI で確認する

今のところ「ふーん」という感じだと思うので、記録された情報を WebUI からも確認してみよう。 確認用の WebUI を立ち上げるために、mlruns ディレクトリのある場所で mlflow ui コマンドを実行する。

$ mlflow ui

そして、ブラウザで localhost:5000 を閲覧する。

$ open http://localhost:5000

すると、こんな感じで過去に記録された情報が確認できる。 メトリックで試行を並べ替えたり、ステップ毎の値を可視化する機能も備わっている。

f:id:momijiame:20200604200638p:plain
MLflow Tracking WebUI

ちなみに WebUI を使う以外にも、ちょっと面倒だけどスクリプトから確認することもできる。

$ python

たとえば、メトリックの logloss が最も良いものを絞り込んでみる。 まあ、まだ 1 回しか実行してないんだけどね。

>>> import mlflow
>>> tracking_uri = 'file:./mlruns'
>>> client = mlflow.tracking.MlflowClient(tracking_uri=tracking_uri)
>>> experiment = client.get_experiment_by_name('Default')
>>> run = client.search_runs(experiment.experiment_id,
...                          order_by=['metric.logloss asc'],
...                          max_results=1)[0]
>>> run.data.params
{'foo': 'bar'}
>>> run.data.metrics
{'logloss': 1.0}

複数人のチームで使いたいとき

複数人のチームで使うときは、いくつかの選択肢があるけど基本的には MLflow Server を用意して、そこに皆でアクセスする。 MLflow Server というのは Tracking URI として使える REST API と、先ほど確認した WebUI が一緒になったもの。

注意点として、MLflow Server はあくまで Tracking URI と WebUI のエンドポイントを提供するもの。 なので、Artifact URI の実体については別に用意しなければいけない。 たとえばクラウドストレージが使えるならそれを使っても良し、自分たちで FTP サーバを立ち上げたり HDFS のクラスタを組むことも考えられるだろう。

ここでは MLflow Server について軽く解説しておく。 まず、MLflow Server は mlflow server コマンドで起動できる。 起動するときに、Tracking URI (--backend-store-uri) と Artifact URI (--default-artifact-root) を指定する。

$ mlflow server \
    --backend-store-uri sqlite:///tracking.db \
    --default-artifact-root file:/tmp/artifacts \
    --host 0.0.0.0

--backend-store-uri は、MLflow サーバがクライアントから REST API 経由で受け取ったデータを記録する場所。 一方で、--default-artifact-root はサーバに接続してきたクライアントに「Artifact はここに保存してね」と伝えられる場所に過ぎない。 つまり、MLflow Server がプロキシしてくれるわけではないのでクライアントから接続性のある URI を指定する必要がある。 ここでは横着して /tmp 以下を指定してしまっている。 しかし、こんな風にするなら本来は NFS などでクライアントがすべて /tmp 以下に共有ディレクトリをマウントする必要がある。

起動したサーバを使って実際にデータを記録してみよう。 まずは別のターミナルで Python の REPL を起動する。

$ python

set_tracking_uri() 関数で Tracking URI として MLflow Server のエンドポイントを指定する。

>>> import mlflow
>>> tracking_uri = 'http://localhost:5000'
>>> mlflow.set_tracking_uri(tracking_uri)

すると、データの記録先が次のように設定される。

>>> mlflow.get_tracking_uri()
'http://localhost:5000'
>>> mlflow.get_artifact_uri()
'file:///tmp/artifacts/0/26d9e4204c20401eb7d2807a93be8b75/artifacts'

何か適当にデータを記録してみよう。

>>> mlflow.start_run()
>>> mlflow.log_param(key='foo', value='bar')
>>> mlflow.log_metric(key='logloss', value=0.5)
>>> import tempfile
>>> import pathlib
>>> with tempfile.TemporaryDirectory() as d:
...     filename = 'test-artifact'
...     artifact_path = pathlib.Path(d) / filename
...     with open(artifact_path, 'w') as fp:
...         print('Hello, World!', file=fp)
...     mlflow.log_artifact(artifact_path)
... 
>>> mlflow.end_run()

これで、アーティファクトについては前述したとおり /tmp 以下に保存される。

$ cat /tmp/artifacts/0/26d9e4204c20401eb7d2807a93be8b75/artifacts/test-artifact 
Hello, World!

アーティファクト以外の情報は MLflow Server の方に記録される。 今回であれば MLflow Server を実行したディレクトリにある SQLite3 のデータベースに入っている。

$ sqlite3 tracking.db 'SELECT * FROM experiments'
0|Default|file:///tmp/artifacts/0|active
$ sqlite3 tracking.db 'SELECT * FROM runs'       
26d9e4204c20401eb7d2807a93be8b75||UNKNOWN|||amedama|FINISHED|1591272053874|1591272141536||active|file:///tmp/artifacts/0/26d9e4204c20401eb7d2807a93be8b75/artifacts|0

ちなみに mlflow server コマンドの実装は mlflow.server:app にある Flask の WSGI アプリケーションを gunicorn でホストするコマンドをキックしてるだけ。 なので、自分で WSGI サーバを立ててアプリケーションをホストしても構わないだろう。 認証が必要なら前段にリバースプロキシを置いて好きにやれば良いと思う。 MLflow Server の解説については、ここまでで一旦おわり。

スクリプトに組み込んでみる

続いては、実際に MLflow Tracking を機械学習を扱うコードに組み込んでみよう。 そんなに良い例でもないけど乳がんデータセットを RandomForest で分類する過程を MLflow Tracking で記録してみる。 ここでは set_tracking_uri()set_experiment() していないのでデフォルト値 (mlruns / Default) が使われる。 ちなみに、コードに書かなくても、環境変数の MLFLOW_TRACKING_URIMLFLOW_EXPERIMENT_NAME を使う方法もある。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pickle
import json
import tempfile
import pathlib

import mlflow
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import cross_validate


def main():
    # データセットの読み込み
    dataset = datasets.load_breast_cancer()
    X, y = dataset.data, dataset.target
    feature_names = dataset.feature_names

    # 5-Fold Stratified CV でスコアを確認する
    clf = RandomForestClassifier(n_estimators=100,
                                 random_state=42,
                                 verbose=1)
    folds = StratifiedKFold(shuffle=True, random_state=42)
    # モデルの性能を示すメトリック
    result = cross_validate(clf, X, y,
                            cv=folds,
                            return_estimator=True,
                            n_jobs=-1)
    # 学習済みモデル
    estimators = result.pop('estimator')

    # 結果を記録する Run を始める
    with mlflow.start_run():

        # 実験に使った Parameter を記録する
        mlflow.log_params({
            'n_estimators': clf.n_estimators,
            'random_state': clf.random_state,
        })

        # 実験で得られた Metric を記録する
        for key, value in result.items():
            # 数値なら int/float な必要があるので平均値に直して書き込む
            mlflow.log_metric(key=key,
                              value=value.mean())

        # 実験で得られた Artifact を記録する
        for index, estimator in enumerate(estimators):

            # 一時ディレクトリの中に成果物を書き出す
            with tempfile.TemporaryDirectory() as d:

                # 学習済みモデル
                clf_filename = f'sklearn.ensemble.RandomForestClassifier.{index}'
                clf_artifact_path = pathlib.Path(d) / clf_filename
                with open(clf_artifact_path, 'wb') as fp:
                    pickle.dump(estimator, fp)

                # 特徴量の重要度
                imp_filename = f'sklearn.ensemble.RandomForestClassifier.{index}.feature_importances_.json'  # noqa
                imp_artifact_path = pathlib.Path(d) / imp_filename
                with open(imp_artifact_path, 'w') as fp:
                    importances = dict(zip(feature_names, estimator.feature_importances_))
                    json.dump(importances, fp, indent=2)

                # ディレクトリにあるファイルを Artifact として登録する
                mlflow.log_artifacts(d)


if __name__ == '__main__':
    main()

上記を実行する。

$ python bcrf.py

これで、各モデルの学習に使ったパラメータやメトリック、特徴量の重要度などが記録される。

$ cat mlruns/0/15d6ba2c32e240a382ff1efab8814e47/params/n_estimators 
100
$ cat mlruns/0/15d6ba2c32e240a382ff1efab8814e47/metrics/test_score 
1591273430658 0.9560937742586555 0
$ head mlruns/0/15d6ba2c32e240a382ff1efab8814e47/artifacts/sklearn.ensemble.RandomForestClassifier.0.feature_importances_.json 
{
  "mean radius": 0.04924964271136709,
  "mean texture": 0.01737644949893605,
  "mean perimeter": 0.07531323166620862,
  "mean area": 0.05398223096565245,
  "mean smoothness": 0.007976941962365291,
  "mean compactness": 0.01205471060787445,
  "mean concavity": 0.04868260289112485,
  "mean concave points": 0.08748636077564236,
  "mean symmetry": 0.0037492071919759205,

各種フレームワークの自動ロギング

先ほどのコードを見て分かる通り、MLflow Tracking のコードを組み込むのは結構めんどくさい。 そこで、MLflow Tracking は各種フレームワークの学習を自動で記録するインテグレーションを提供している。 ただし、この機能は今のところ Experimental な点に注意が必要。

以下のサンプルコードは LightGBM の学習を自動で記録するもの。 MLflow Tracking を動作させている部分は mlflow.lightgbm.autolog() を呼び出している一行だけ。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from mlflow import lightgbm as mlflow_lgb
import numpy as np
import lightgbm as lgb
from sklearn import datasets
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split


def main():
    # LightGBM の学習を自動でトラッキングする
    mlflow_lgb.autolog()

    # データセットを読み込む
    dataset = datasets.load_breast_cancer()
    X, y = dataset.data, dataset.target
    feature_names = list(dataset.feature_names)

    # 訓練データと検証データに分割する
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        shuffle=True,
                                                        random_state=42)

    # データセットを生成する
    lgb_train = lgb.Dataset(X_train, y_train,
                            feature_name=feature_names)
    lgb_eval = lgb.Dataset(X_test, y_test,
                           reference=lgb_train,
                           feature_name=feature_names)

    lgbm_params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
        'verbosity': -1,
    }
    # ここでは MLflow Tracking がパッチした train() 関数が呼ばれる
    booster = lgb.train(lgbm_params,
                        lgb_train,
                        valid_sets=lgb_eval,
                        num_boost_round=1000,
                        early_stopping_rounds=100,
                        verbose_eval=10,
                        )

    # 学習済みモデルを使って検証データを予測する
    y_pred_proba = booster.predict(X_test,
                                   num_iteration=booster.best_iteration)

    # 検証データのスコアを確認する
    y_pred = np.where(y_pred_proba > 0.5, 1, 0)
    test_score = accuracy_score(y_test, y_pred)
    print(test_score)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python bclgb.py

...

[180]  valid_0's binary_logloss: 0.14104
[190] valid_0's binary_logloss: 0.143199
Early stopping, best iteration is:
[97]   valid_0's binary_logloss: 0.10515
0.958041958041958

すると、train() 関数を呼ぶときに使われたパラメータやメトリックが記録される。

$ find mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb 
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/metrics
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/metrics/valid_0-binary_logloss
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/metrics/stopped_iteration
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/metrics/best_iteration
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/artifacts
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/artifacts/feature_importance_gain.json
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/artifacts/feature_importance_gain.png
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/artifacts/feature_importance_split.json
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/artifacts/model
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/artifacts/model/MLmodel
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/artifacts/model/conda.yaml
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/artifacts/model/model.lgb
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/artifacts/feature_importance_split.png
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/tags
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/tags/mlflow.user
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/tags/mlflow.source.name
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/tags/mlflow.log-model.history
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/tags/mlflow.source.type
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/params
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/params/categorical_feature
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/params/feature_name
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/params/keep_training_booster
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/params/num_boost_round
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/params/early_stopping_rounds
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/params/objective
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/params/verbosity
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/params/verbose_eval
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/params/metric
mlruns/0/aae1e01ffa04406db4eaa6031d1c1ffb/meta.yaml

ただし、当たり前だけど自分で計算した内容については自分で記録しなければ残らない。 今回であれば自分でホールドアウト検証で計算している test_score は記録されていないことがわかる。

自分で計算したメトリックも記録するように修正してみよう。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import mlflow
from mlflow import lightgbm as mlflow_lgb
import numpy as np
import lightgbm as lgb
from sklearn import datasets
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split


def main():
    # LightGBM の学習を自動でトラッキングする
    mlflow_lgb.autolog()

    # データセットを読み込む
    dataset = datasets.load_breast_cancer()
    X, y = dataset.data, dataset.target
    feature_names = list(dataset.feature_names)

    # 訓練データと検証データに分割する
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        shuffle=True,
                                                        random_state=42)

    # データセットを生成する
    lgb_train = lgb.Dataset(X_train, y_train,
                            feature_name=feature_names)
    lgb_eval = lgb.Dataset(X_test, y_test,
                           reference=lgb_train,
                           feature_name=feature_names)

    lgbm_params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
        'verbosity': -1,
    }

    # Run を開始する
    active_run = mlflow.start_run()

    # ここでは MLflow Tracking がパッチした train() 関数が呼ばれる
    booster = lgb.train(lgbm_params,
                        lgb_train,
                        valid_sets=lgb_eval,
                        num_boost_round=1000,
                        early_stopping_rounds=100,
                        verbose_eval=10,
                        )

    # 学習済みモデルを使って検証データを予測する
    y_pred_proba = booster.predict(X_test,
                                   num_iteration=booster.best_iteration)

    # 検証データのスコアを確認する
    y_pred = np.where(y_pred_proba > 0.5, 1, 0)
    test_score = accuracy_score(y_test, y_pred)
    print(test_score)

    # 自分で計算したメトリックを記録する
    mlflow.log_metric(key='test_score', value=test_score)

    # Run を終了する
    mlflow.end_run()


if __name__ == '__main__':
    main()

上記をもう一度実行する。

$ python bclgb.py

すると、今度は test_score も記録されていることがわかる。

$ cat mlruns/0//fd0e67c47819488089431813e2028986/metrics/test_score
1591274826005 0.958041958041958 0

自作の autolog() 相当を作ってみる

ところで、先ほどの autolog() 関数がどのように実現されているのか気にならないだろうか。 これは、モンキーパッチを使うことで対象モジュールのコードを動的に書きかえている。 先ほどの LightGBM であれば lightgbm.train() が MLflow Tracking を使うものに書きかえられた。

そこで、試しに自分でも autolog() 相当のものを書いてみることにした。 以下は scikit-learn の RandomForestClassifier#fit() を書きかえたもの。 MLflow Tracking のモンキーパッチには gorilla というフレームワークが使われている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import json
import tempfile
import pathlib

import numpy as np
import gorilla
import mlflow
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split


def autolog_sklearn_random_forest():
    """scikit-learn の RandomForestClassifier#fit() をモンキーパッチする

    モンキーパッチ後は、モデルの学習に関する情報が自動で MLflow Tracking に残る"""

    @gorilla.patch(RandomForestClassifier)
    def fit(self, *args, **kwargs):
        # 実行中の Run が存在するか確認する
        if not mlflow.active_run():
            # 無ければ新しく Run を作る
            mlflow.start_run()
            # 関数内で end_run() を呼ぶ必要があるか
            auto_end_run = True
        else:
            auto_end_run = False

        # 学習に使われたパラメータを記録する
        attr_names = ['n_estimators',
                      'max_depth',
                      'min_samples_split',
                      'min_samples_leaf',
                      'random_state',
                      ]
        for attr_name in attr_names:
            # インスタンスからアトリビュートの値を取り出して記録する
            attr_value = getattr(self, attr_name)
            mlflow.log_param(key=f'sklearn.ensemble.RandomForestClassifier.{attr_name}',
                             value=attr_value)

        # パッチ前のオブジェクトを取得する
        original = gorilla.get_original_attribute(RandomForestClassifier, 'fit')

        # パッチ前のオブジェクトの呼び出し
        result = original(self, *args, **kwargs)

        # メトリックを記録する (ここでは特に何もない)
        # NOTE: validation set の損失などを取得する手段があれば残す

        # アーティファクトを記録する
        # Gini Importance を記録する (XXX: feature names を渡す良い方法が思いつかない...)
        tmpdir = tempfile.mkdtemp()
        filename = 'sklearn.ensemble.RandomForestClassifier.feature_importances_.json'
        artifact_path = pathlib.Path(tmpdir) / filename
        with open(artifact_path, 'w') as fp:
            json.dump(list(self.feature_importances_), fp, indent=2)
        mlflow.log_artifact(artifact_path)

        # 関数内で Run を作っていたら終了する
        if not auto_end_run:
            mlflow.end_run()

        # 結果を返す
        return result

    # 既にパッチされているときは上書きしない
    settings = gorilla.Settings(allow_hit=True, store_hit=True)
    # RandomForestClassifier#fit() をモンキーパッチする
    monkey_patch = gorilla.Patch(RandomForestClassifier, 'fit', fit, settings=settings)
    gorilla.apply(monkey_patch)


def main():
    # RandomForestClassifier をパッチする
    autolog_sklearn_random_forest()

    # データセットを読み込む
    dataset = datasets.load_breast_cancer()
    X, y = dataset.data, dataset.target

    # 訓練データと検証データに分割する
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        shuffle=True,
                                                        random_state=42)

    # RandomForest 分類器を用意する
    clf = RandomForestClassifier(n_estimators=100,
                                 random_state=42,
                                 verbose=1)
    # ここで呼び出されるのはパッチされたオブジェクトになる
    clf.fit(X_train, y_train)

    # 学習済みモデルを使って検証データを予測する
    y_pred_proba = clf.predict(X_test)

    # 検証データのスコアを確認する
    y_pred = np.where(y_pred_proba > 0.5, 1, 0)
    test_score = accuracy_score(y_test, y_pred)
    print(test_score)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ myautolog.py

すると、次のようにパラメータなどが記録される。

$ find mlruns/0/6c2abd9a92344534a45965005f7dfcc6  
mlruns/0/6c2abd9a92344534a45965005f7dfcc6
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/metrics
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/artifacts
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/artifacts/sklearn.ensemble.RandomForestClassifier.feature_importances_.json
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/tags
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/tags/mlflow.user
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/tags/mlflow.source.name
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/tags/mlflow.source.type
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/params
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/params/sklearn.ensemble.RandomForestClassifier.max_depth
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/params/sklearn.ensemble.RandomForestClassifier.random_state
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/params/sklearn.ensemble.RandomForestClassifier.n_estimators
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/params/sklearn.ensemble.RandomForestClassifier.min_samples_split
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/params/sklearn.ensemble.RandomForestClassifier.min_samples_leaf
mlruns/0/6c2abd9a92344534a45965005f7dfcc6/meta.yaml
$ cat mlruns/0/6c2abd9a92344534a45965005f7dfcc6/params/sklearn.ensemble.RandomForestClassifier.n_estimators 
100

モンキーパッチを使うと、共通で何度も使われるようなコードをトラッキングする手間がだいぶ減りそうだ。 ただ、対象のモジュールのインターフェースに追従しなければならない副作用もあるため、使い所は吟味しなければいけないだろう。

まとめ

今回は MLflow の Tracking というコンポーネントを試してみた。

総じて、使い勝手や設計などに筋の良さを感じた。 一人でもチームでも同じコードが使える点や、インテグレーション以外で他に何らかの機械学習フレームワークへの依存がないのはとても良い。 一方で、多少は仕方ないにせよ自分のコードに組み込むときに、モンキーパッチを書かない限りは似たようなコードを何度も書くハメになってつらそう。 あと、チームで使うときにサーバの運用とか Artifact の置き場所どうしよってところは悩みそうだと思った。

そんな感じで。

Python: Optuna の LightGBMTunerCV から学習済みモデルを取り出す

Optuna v1.5.0 では、LightGBM インテグレーションの一環として LightGBMTunerCV という API が追加された。 これは LightGBM の cv() 関数を Step-wise algorithm で最適化するラッパーになっている。 つまり、重要ないくつかのパラメータを Step-wise で調整することで、最も高い交差検証スコアが得られるパラメータを探索できる。 今回は、追加された LightGBMTunerCV の使い方を紹介すると共に学習済みモデルを取り出す方法について書いてみる。 ただし、今のところ Experimental な機能という位置づけなので、今後インターフェースなどが変わる可能性もある。

尚、LightGBM の素の cv() 関数から学習済みモデルを取り出す方法は次のエントリに書いた。

blog.amedama.jp

使った環境は次のとおり。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G5033
$ python -V
Python 3.7.7
$ pip list | egrep -i "(optuna|lightgbm)"
lightgbm        2.3.1
optuna          1.5.0

もくじ

下準備

あらかじめ、使用するパッケージをインストールしておく。

$ brew install libomp
$ pip install lightgbm optuna scikit-learn

乳がんデータセットを交差検証するサンプルコード

早速だけど、以下に LightGBMTunerCV の基本的な使い方を紹介するサンプルコードを示す。 これまでと同様、optuna.integration.lightgbm パッケージをインポートすることで Optuna 経由で LightGBM を使うことになる。 LightGBMTunerCV はクラスとして実装されていて、インスタンス化するときに cv() 関数を実行するときに使うオプションを渡す。 そして、LightGBMTunerCV#run() メソッドを実行すると重要なパラメータが順番に探索されていって、終了するとインスタンスから最も優れたスコアやパラメータが得られる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pprint import pprint

from optuna.integration import lightgbm as lgb
from sklearn import datasets
from sklearn.model_selection import StratifiedKFold


def main():
    # データセットを読み込む
    dataset = datasets.load_breast_cancer()
    X, y = dataset.data, dataset.target

    # LightGBM のデータセット表現にする
    lgb_train = lgb.Dataset(X, y)

    # データセットの分割方法
    folds = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

    # 最適化するときの条件
    lgbm_params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
        'verbosity': -1,
    }
    # 基本的には cv() 関数のオプションがそのまま渡せる
    tuner_cv = lgb.LightGBMTunerCV(
        lgbm_params, lgb_train,
        num_boost_round=1000,
        early_stopping_rounds=100,
        verbose_eval=20,
        folds=folds,
    )

    # 最適なパラメータを探索する
    tuner_cv.run()

    # 最も良かったスコアとパラメータを書き出す
    print(f'Best score: {tuner_cv.best_score}')
    print('Best params:')
    pprint(tuner_cv.best_params)


if __name__ == '__main__':
    main()

それでは、上記をファイルに保存して実行してみよう。

$ python tunercv.py
/Users/amedama/.virtualenvs/py37/lib/python3.7/site-packages/optuna/_experimental.py:83: ExperimentalWarning: LightGBMTunerCV is experimental (supported from v1.5.0). The interface can change in the future.
  ExperimentalWarning,
feature_fraction, val_score: inf:   0%|                   | 0/7 [00:00<?, ?it/s][20] cv_agg's binary_logloss: 0.194777 + 0.0385719
[40]  cv_agg's binary_logloss: 0.141999 + 0.056194
[60]   cv_agg's binary_logloss: 0.140204 + 0.0759544

...

min_data_in_leaf, val_score: 0.094602: 100%|######| 5/5 [00:01<00:00,  3.55it/s]
Best score: 0.09460200054481203
Best params:
{'bagging_fraction': 0.5636995994183087,
 'bagging_freq': 4,
 'feature_fraction': 0.44800000000000006,
 'lambda_l1': 0.015234706690217153,
 'lambda_l2': 2.2227989818062668e-07,
 'metric': 'binary_logloss',
 'min_child_samples': 20,
 'num_leaves': 3,
 'objective': 'binary',
 'verbosity': -1}

探索した中で、最も優れた交差検証のスコアを出したパラメータが確認できた。

LightGBMTunerCV から学習済みモデルを取り出す

基本的な使い方が分かったところで今回の本題に入る。 別に LightGBM に限った話ではないけど、交差検証するときに学習させたモデル群を使って Averaging などをするのは一般的な手法となっている。 とはいえ、交差検証をして得られたパラメータを使って、もう一度同じデータを学習させるのは効率が悪い。 そこで、LightGBMTunerCV を使ってパラメータを探索すると同時に、最適なパラメータで学習したモデルを取り出してみた。

以下がそのサンプルコードになる。 学習済みモデルの取り出しは TunerCVCheckpointCallback というコールバックで実装している。 探索の対象となったパラメータごとにモデルへの参照をインスタンスに保持しておいて、後から取り出せるように作った。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
from optuna.integration import lightgbm as lgb
from sklearn import datasets
from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedKFold, train_test_split


class TunerCVCheckpointCallback(object):
    """Optuna の LightGBMTunerCV から学習済みモデルを取り出すためのコールバック"""

    def __init__(self):
        # オンメモリでモデルを記録しておく辞書
        self.cv_boosters = {}

    @staticmethod
    def params_to_hash(params):
        """パラメータを元に辞書のキーとなるハッシュを計算する"""
        params_hash = hash(frozenset(params.items()))
        return params_hash

    def get_trained_model(self, params):
        """パラメータをキーとして学習済みモデルを取り出す"""
        params_hash = self.params_to_hash(params)
        return self.cv_boosters[params_hash]

    def __call__(self, env):
        """LightGBM の各ラウンドで呼ばれるコールバック"""
        # 学習に使うパラメータをハッシュ値に変換する
        params_hash = self.params_to_hash(env.params)
        # 初登場のパラメータならモデルへの参照を保持する
        if params_hash not in self.cv_boosters:
            self.cv_boosters[params_hash] = env.model


def main():
    dataset = datasets.load_breast_cancer()
    X, y = dataset.data, dataset.target

    # デモ用にデータセットを分割する
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        shuffle=True,
                                                        random_state=42)

    lgb_train = lgb.Dataset(X_train, y_train)

    folds = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

    # 学習済みモデルへの参照を保持するためのコールバック
    checkpoint_cb = TunerCVCheckpointCallback()
    callbacks = [
        checkpoint_cb,
    ]

    lgbm_params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
        'verbosity': -1,
    }
    tuner_cv = lgb.LightGBMTunerCV(
        lgbm_params, lgb_train,
        num_boost_round=1000,
        early_stopping_rounds=100,
        verbose_eval=20,
        folds=folds,
        callbacks=callbacks,
    )

    tuner_cv.run()

    # NOTE: 念のためハッシュの衝突に備えて Trial の数と学習済みモデルの数を比較しておく
    assert len(checkpoint_cb.cv_boosters) == len(tuner_cv.study.trials) - 1

    # 最も良かったパラメータをキーにして学習済みモデルを取り出す
    cv_booster = checkpoint_cb.get_trained_model(tuner_cv.best_params)
    # Averaging でホールドアウト検証データを予測する
    y_pred_proba_list = cv_booster.predict(X_test,
                                           num_iteration=cv_booster.best_iteration)
    y_pred_proba_avg = np.array(y_pred_proba_list).mean(axis=0)
    y_pred = np.where(y_pred_proba_avg > 0.5, 1, 0)
    accuracy = accuracy_score(y_test, y_pred)
    print('Averaging accuracy:', accuracy)


if __name__ == '__main__':
    main()

ちなみに、学習に使ったパラメータをハッシュにしてモデルへの参照のキーにしている。 そのため、非常に低い確率だけどハッシュが衝突する可能性を考えて assert 文を入れた。

上記をファイルに保存して実行してみよう。 動作確認のために、ホールドアウトしておいたデータを、取り出したモデル (実体は _CVBooster) を使って推論させている。

$ python tunercv2.py 
/Users/amedama/.virtualenvs/py37/lib/python3.7/site-packages/optuna/_experimental.py:83: ExperimentalWarning: LightGBMTunerCV is experimental (supported from v1.5.0). The interface can change in the future.
  ExperimentalWarning,
feature_fraction, val_score: inf:   0%|                   | 0/7 [00:00<?, ?it/s][20] cv_agg's binary_logloss: 0.191389 + 0.0261761
[40]  cv_agg's binary_logloss: 0.133308 + 0.0423785
[60]   cv_agg's binary_logloss: 0.119175 + 0.0400074

...

min_data_in_leaf, val_score: 0.092142: 100%|######| 5/5 [00:01<00:00,  2.51it/s]
Averaging accuracy: 0.972027972027972

交差検証で最も良いスコアを出したモデルたちの推論結果を Averaging した結果として 0.972 という Accuracy が得られた。

いじょう。

kind (Kubernetes IN Docker) を使ってみる

今回は Kubernetes の開発で使われている公式ツールの kind を使ってみる。 このツールを使うと Docker のコンテナを使って Kubernetes のクラスタが素早く簡単に構築できる。 OpenStack でいうところの DevStack に相当するものかな。

使った環境は次のとおり。

$ sw_vers     
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G4032
$ $ docker version
Client: Docker Engine - Community
 Version:           19.03.8
 API version:       1.40
 Go version:        go1.12.17
 Git commit:        afacb8b
 Built:             Wed Mar 11 01:21:11 2020
 OS/Arch:           darwin/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.8
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.12.17
  Git commit:       afacb8b
  Built:            Wed Mar 11 01:29:16 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          v1.2.13
  GitCommit:        7ad184331fa3e55e52b890ea95e65ba581ae3429
 runc:
  Version:          1.0.0-rc10
  GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683
$ kind version
kind v0.8.1 go1.14.2 darwin/amd64
$ kubectl version
Client Version: version.Info{Major:"1", Minor:"16+", GitVersion:"v1.16.6-beta.0", GitCommit:"e7f962ba86f4ce7033828210ca3556393c377bcc", GitTreeState:"clean", BuildDate:"2020-01-15T08:26:26Z", GoVersion:"go1.13.5", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"18", GitVersion:"v1.18.2", GitCommit:"52c56ce7a8272c798dbc29846288d7cd9fbae032", GitTreeState:"clean", BuildDate:"2020-04-30T20:19:45Z", GoVersion:"go1.13.9", Compiler:"gc", Platform:"linux/amd64"}

もくじ

下準備

はじめに Docker をインストールしておく。

$ brew cask install docker

docker version コマンドが正常に使えることを確認する。

$ docker version
Client: Docker Engine - Community
 Version:           19.03.8
 API version:       1.40
 Go version:        go1.12.17
 Git commit:        afacb8b
 Built:             Wed Mar 11 01:21:11 2020
 OS/Arch:           darwin/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.8
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.12.17
  Git commit:       afacb8b
  Built:            Wed Mar 11 01:29:16 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          v1.2.13
  GitCommit:        7ad184331fa3e55e52b890ea95e65ba581ae3429
 runc:
  Version:          1.0.0-rc10
  GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

続いて kind をインストールする。 macOS なら Homebrew で入る。

$ brew install kind

これで kind コマンドが使えるようになる。

$ kind version
kind v0.8.1 go1.14.2 darwin/amd64

Kubernetes クラスタを作る

kind を使って Kubernetes のクラスタを作るには kind create cluster コマンドを使う。 最初に実行したときは Kubernetes のノード用のイメージファイルをダウンロードするので時間がかかる。

$ kind create cluster
Creating cluster "kind" ...
 ✓ Ensuring node image (kindest/node:v1.18.2) 🖼 
 ✓ Preparing nodes 📦  
 ✓ Writing configuration 📜 
 ✓ Starting control-plane 🕹️ 
 ✓ Installing CNI 🔌 
 ✓ Installing StorageClass 💾 
Set kubectl context to "kind-kind"
You can now use your cluster with:

kubectl cluster-info --context kind-kind

Thanks for using kind! 😊

コマンドの実行が完了すると、デフォルトの名前 (kind) でクラスタができる。

$ kind get clusters
kind

kubectl コマンドの設定ファイルも自動で作られるので、すぐ使える状態になってる。

$ kubectl config get-contexts
CURRENT   NAME          CLUSTER       AUTHINFO      NAMESPACE
*         kind-kind     kind-kind     kind-kind     
$ kubectl config get-contexts
CURRENT   NAME        CLUSTER     AUTHINFO    NAMESPACE
*         kind-kind   kind-kind   kind-kind
$ kubectl config view                 
apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: DATA+OMITTED
    server: https://127.0.0.1:58588
  name: kind-kind
contexts:
- context:
    cluster: kind-kind
    user: kind-kind
  name: kind-kind
current-context: kind-kind
kind: Config
preferences: {}
users:
- name: kind-kind
  user:
    client-certificate-data: REDACTED
    client-key-data: REDACTED

デフォルトではシングルノード構成でクラスタができる。

$ kubectl get nodes
NAME                 STATUS   ROLES    AGE   VERSION
kind-control-plane   Ready    master   69s   v1.18.2

docker container list コマンドを使って稼働しているコンテナを見ると Kubernetes のコントロールプレーンのコンテナが見える。

$ docker container list
CONTAINER ID        IMAGE                  COMMAND                  CREATED             STATUS              PORTS                       NAMES
bddde119ccb7        kindest/node:v1.18.2   "/usr/local/bin/entr…"   8 minutes ago       Up 8 minutes        127.0.0.1:58588->6443/tcp   kind-control-plane

コンテナの中身を覗くと、必要なサービスが色々と立ち上がっているようだ。

$ docker exec -it bddde119ccb7 ss -tlnp
State      Recv-Q     Send-Q         Local Address:Port            Peer Address:Port                                                   
LISTEN     0          128               127.0.0.11:33945                0.0.0.0:*                                                      
LISTEN     0          128                127.0.0.1:37275                0.0.0.0:*         users:(("containerd",pid=130,fd=6))          
LISTEN     0          128                127.0.0.1:10248                0.0.0.0:*         users:(("kubelet",pid=743,fd=22))            
LISTEN     0          128                127.0.0.1:10249                0.0.0.0:*         users:(("kube-proxy",pid=992,fd=17))         
LISTEN     0          128               172.18.0.2:2379                 0.0.0.0:*         users:(("etcd",pid=691,fd=6))                
LISTEN     0          128                127.0.0.1:2379                 0.0.0.0:*         users:(("etcd",pid=691,fd=5))                
LISTEN     0          128               172.18.0.2:2380                 0.0.0.0:*         users:(("etcd",pid=691,fd=3))                
LISTEN     0          128                127.0.0.1:2381                 0.0.0.0:*         users:(("etcd",pid=691,fd=10))               
LISTEN     0          128                127.0.0.1:10257                0.0.0.0:*         users:(("kube-controller",pid=553,fd=6))     
LISTEN     0          128                127.0.0.1:10259                0.0.0.0:*         users:(("kube-scheduler",pid=598,fd=6))      
LISTEN     0          128                        *:10250                      *:*         users:(("kubelet",pid=743,fd=21))            
LISTEN     0          128                        *:10251                      *:*         users:(("kube-scheduler",pid=598,fd=5))      
LISTEN     0          128                        *:6443                       *:*         users:(("kube-apiserver",pid=644,fd=5))      
LISTEN     0          128                        *:10252                      *:*         users:(("kube-controller",pid=553,fd=5))     
LISTEN     0          128                        *:10256                      *:*         users:(("kube-proxy",pid=992,fd=15))      

ちなみに、推奨される環境としては Docker のランタイムに 6 ~ 8GB 以上のメモリを割り当てることが望ましいらしい。

以下のコマンドで、今のランタイムがどれくらい使えるか確認できる。

$ docker stats --no-stream

複数のクラスタを作る・壊す

特定の名前でクラスタを作りたいときは --name オプションを指定する。

$ kind create cluster --name kind-2

これで、2 つのクラスタができた。

$ kind get clusters
kind
kind-2

kubectl にも複数のクラスタが登録されている。

$ kubectl config get-contexts
CURRENT   NAME          CLUSTER       AUTHINFO      NAMESPACE
          kind-kind     kind-kind     kind-kind     
*         kind-kind-2   kind-kind-2   kind-kind-2   

クラスタを消すときは kind delete cluster コマンドを使う。

$ kind delete cluster --name kind-2

消したら、コンテキストを元のクラスタに切り替えておく。

$ kubectl config use-context kind-kind
Switched to context "kind-kind".
$ kubectl config current-context      
kind-kind

自前のコンテナイメージを使って Pod を立ち上げてみる

次は、試しに自前のコンテナイメージを使って Pod を立ち上げてみよう。

とりあえず、適当に WSGI でホスト名を返すアプリケーションを用意する。

$ cat << 'EOF' > server.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
from wsgiref.simple_server import make_server


def application(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    uname = os.uname()
    nodename = uname.nodename
    msg = f'Hello, World! by {nodename=}\n'
    return [msg.encode('ascii')]


def main():
    with make_server('', 8080, application) as httpd:
        print("Serving on port 8080...")
        httpd.serve_forever()

if __name__ == '__main__':
    main()
EOF

上記のアプリケーションを組み込んだ Dockerfile を用意する。

$ cat << 'EOF' > Dockerfile
FROM python:3.8

EXPOSE 8080

COPY server.py .

CMD python3 server.py
EOF

Docker イメージをビルドする。

$ docker build -t example/helloworld:0.1 .

手元のイメージは kind load docker-image コマンドを使ってクラスタに登録できる。

$ kind load docker-image example/helloworld:0.1

次のように自前のイメージを使って Pod を立ち上げるマニフェストファイルを用意する。 ポイントは imagePullPolicy: Never で、これで必ずローカルのイメージが使われるようになる。

$ cat << 'EOF' > example-pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: hello-pod
spec:
  containers:
  - name: hello-server
    image: example/helloworld:0.1
    imagePullPolicy: Never
EOF

マニフェストファイルを適用する。

$ kubectl apply -f example-pod.yaml

すると、次のように Pod が立ち上がる。

$ kubectl get pods
NAME        READY   STATUS    RESTARTS   AGE
hello-pod   1/1     Running   0          7s

ポートフォワーディングで Pod の TCP:8080 ポートを引き出してみよう。

$ kubectl port-forward hello-pod 8080:8080 
Forwarding from 127.0.0.1:8080 -> 8080
Forwarding from [::1]:8080 -> 8080

別のターミナルからリクエストを送ると、ちゃんと HTTP のレスポンスが得られる。

$ curl http://localhost:8080
Hello, World! by nodename='hello-pod'

kubectl exec コマンドで Pod に入っても、ちゃんとプロセスが稼働していることが確認できる。

$ kubectl exec -it hello-pod /bin/bash
root@hello-pod:/# uname -a
Linux hello-pod 4.19.76-linuxkit #1 SMP Tue May 26 11:42:35 UTC 2020 x86_64 GNU/Linux
root@hello-pod:/# ss -tlnp
State     Recv-Q    Send-Q       Local Address:Port        Peer Address:Port    
LISTEN    0         5                  0.0.0.0:8080             0.0.0.0:*        users:(("python3",pid=8,fd=3))

マルチノードクラスタを作ってみる

kind の良いところは手軽にマルチノードクラスタも作れるところ。

一旦、シングルノードのクラスタは削除しておく。

$ kind delete cluster

そして、以下のような kind 用の設定ファイルを用意する。 これでコントロールプレーンとワーカーノード x2 のクラスタが作れる。

$ cat << 'EOF' >> multi-node-cluster.yaml
kind: Cluster
apiVersion: kind.sigs.k8s.io/v1alpha3
nodes:
- role: control-plane
- role: worker
- role: worker
EOF

設定ファイルを使ってクラスタを作る。

$ kind create cluster --name kind-multi --config multi-node-cluster.yaml

すると、次のように 3 つのノードから成るクラスタができる。

$ kubectl get nodes
NAME                       STATUS   ROLES    AGE   VERSION
kind-multi-control-plane   Ready    master   89s   v1.18.2
kind-multi-worker          Ready    <none>   54s   v1.18.2
kind-multi-worker2         Ready    <none>   56s   v1.18.2

確認すると、Docker コンテナも 3 つ稼働している。

$ docker container list
CONTAINER ID        IMAGE                  COMMAND                  CREATED              STATUS              PORTS                       NAMES
2fb405206d26        kindest/node:v1.18.2   "/usr/local/bin/entr…"   About a minute ago   Up About a minute                               kind-multi-worker
b657e6562c59        kindest/node:v1.18.2   "/usr/local/bin/entr…"   About a minute ago   Up About a minute                               kind-multi-worker2
9792c6353e76        kindest/node:v1.18.2   "/usr/local/bin/entr…"   About a minute ago   Up About a minute   127.0.0.1:61270->6443/tcp   kind-multi-control-plane

自前のイメージで Deployment を立ち上げてみる

マルチノードのクラスタを使って遊んでいこう。 試しに、複数の Pod を持った Deployment を作ってみる。

まずは先ほどビルドした Docker イメージをマルチノードのクラスタに登録する。

$ kind load docker-image example/helloworld:0.1 \
    --name kind-multi

そして、次のように Deployment 用のマニフェストファイルを用意する。 これで、Deployment x1 / ReplicaSet x1 / Pod x2 ができる。

$ cat << 'EOF' > example-deploy.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: example-deploy
spec:
  replicas: 2
  selector:
    matchLabels:
      app: hello-pod
  template:
    metadata:
      labels:
        app: hello-pod
    spec:
      containers:
      - name: hello-server
        image: example/helloworld:0.1
        imagePullPolicy: Never
        ports:
        - containerPort: 8080
EOF

マニフェストファイルを適用する。

$ kubectl apply -f example-deploy.yaml

すると、次のようにそれぞれのオブジェクトができた。 Pod を見ると、ちゃんとそれぞれのワーカーで動作している様子が確認できる。

$ kubectl get deployments                       
NAME             READY   UP-TO-DATE   AVAILABLE   AGE
example-deploy   2/2     2            2           14s
$ kubectl get replicaset
NAME                        DESIRED   CURRENT   READY   AGE
example-deploy-7d746dc7f9   2         2         2       61s
$ kubectl get pods -o wide
NAME                              READY   STATUS    RESTARTS   AGE   IP           NODE                 NOMINATED NODE   READINESS GATES
example-deploy-7d746dc7f9-2qg9t   1/1     Running   0          91s   10.244.2.2   kind-multi-worker2   <none>           <none>
example-deploy-7d746dc7f9-7csnf   1/1     Running   0          91s   10.244.1.2   kind-multi-worker    <none>           <none>

Pod に HTTP でアクセスする

続いては上記の Pod に HTTP でアクセスしてみる。 kubectl expose deployment コマンドで Service オブジェクトを作る。

$ kubectl expose deployment example-deploy
service/example-deploy exposed

これでアクセスするための IP アドレス (CLUSTER-IP) が割り当てられた。

$ kubectl get services
NAME             TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)    AGE
example-deploy   ClusterIP   10.98.227.119   <none>        8080/TCP   6s
kubernetes       ClusterIP   10.96.0.1       <none>        443/TCP    21m

この IP アドレスはクラスタ内部向けなので、コンテナ経由でアクセスしてみよう。

$ docker exec -it kind-multi-control-plane curl http://10.98.227.119:8080
Hello, World! by nodename='example-deploy-7d746dc7f9-dkcwh'
$ docker exec -it kind-multi-control-plane curl http://10.98.227.119:8080
Hello, World! by nodename='example-deploy-7d746dc7f9-7csnf'

ちゃんと、それぞれの Pod がリクエストを捌いていることが確認できた。

ちなみにデフォルトでは --type=LoadBalancer な Service は作れないようだ。

$ kubectl delete service example-deploy
$ kubectl expose deployment example-deploy \
    --type=LoadBalancer
$ kubectl get services
NAME             TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE
example-deploy   LoadBalancer   10.111.111.250   <pending>     8080:32633/TCP   9s
kubernetes       ClusterIP      10.96.0.1        <none>        443/TCP          42m

オートヒーリングを確認する

オートヒーリングの様子を観察するために Pod を壊してみよう。

今ある Pod を片方削除してみる。

$ kubectl delete pod example-deploy-7d746dc7f9-2qg9t
pod "example-deploy-7d746dc7f9-2qg9t" deleted

ちょっと待つと消した分の Pod が別の識別子で増えている。

$ kubectl get pods -o wide                          
NAME                              READY   STATUS    RESTARTS   AGE     IP           NODE                NOMINATED NODE   READINESS GATES
example-deploy-7d746dc7f9-7csnf   1/1     Running   0          4m33s   10.244.1.2   kind-multi-worker   <none>           <none>
example-deploy-7d746dc7f9-dkcwh   1/1     Running   0          109s    10.244.1.3   kind-multi-worker   <none>           <none>

ログを確認すると Pod が作り直されていることがわかる。

$ kubectl describe replicaset example-deploy | tail -n 7
Events:
  Type    Reason            Age    From                   Message
  ----    ------            ----   ----                   -------
  Normal  SuccessfulCreate  9m10s  replicaset-controller  Created pod: example-deploy-7d746dc7f9-2qg9t
  Normal  SuccessfulCreate  9m10s  replicaset-controller  Created pod: example-deploy-7d746dc7f9-7csnf
  Normal  SuccessfulCreate  6m26s  replicaset-controller  Created pod: example-deploy-7d746dc7f9-dkcwh

そんな感じで。

Python: Keras でカスタムメトリックを扱う

今回は Keras に組み込みで用意されていない独自の評価指標 (カスタムメトリック) を扱う方法について書いてみる。

なお、Keras でカスタムメトリックを定義する方法については、以下の公式ドキュメントに記載がある。

keras.io

使った環境は次のとおり。 Keras にはスタンドアロン版ではなく TensorFlow 組み込みのもの (tf.keras) を使った。

$ sw_vers  
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G4032
$ python -V                                                       
Python 3.7.7
$ python -c "import tensorflow as tf; print(tf.__version__)"
2.2.0
$ python -c "import tensorflow as tf; print(tf.keras.__version__)"
2.3.0-tf

下準備

まずは TensorFlow をインストールしておく

$ pip install tensorflow

注意点について

Keras のメトリックを定義したオブジェクトには、正解と予測した内容が TensorFlow の Tensor オブジェクトとして渡される。 つまり、一般的な機械学習でおなじみの NumPy 配列のようには扱えない点に注意が必要となる。 そのような事情があるので、カスタムメトリックを計算するときは、はじめに REPL を使ってインタラクティブに動作を確認した方がわかりやすい。

ここでは、そのやり方について書いてみる。 はじめに、Python のインタプリタ (REPL) を起動しよう。

$ python

そして、TensorFlow のパッケージをインポートする。

>>> import tensorflow as tf
>>> from tensorflow.keras import backend as K

正解ラベルと、モデルが出力する予測を模したオブジェクトを次のように用意する。 今回の例は、多値分類問題のラベルを模している。

>>> y_true = tf.constant([[0., 0., 1.], [0., 1., 0.]])
>>> y_pred = tf.constant([[0., 0., 1.], [1., 0., 0.]])

このようにすると、TensorFlow 2 では NumPy の配列として中身が確認できる。

>>> y_true
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[0., 0., 1.],
       [0., 1., 0.]], dtype=float32)>
>>> y_pred
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[0., 0., 1.],
       [1., 0., 0.]], dtype=float32)>

このオブジェクトを使って計算方法の動作確認をしていく。

Recall を計算してみる

試しに Recall を計算してみよう。

正解と予測の Tensor で積を取って、両者が一致している部分だけ残す。

>>> y_true * y_pred
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[0., 0., 1.],
       [0., 0., 0.]], dtype=float32)>

残っている部分を足し合わせれば True Positive の要素数になる。

>>> true_positives = K.sum(y_true * y_pred)
>>> true_positives
<tf.Tensor: shape=(), dtype=float32, numpy=1.0>

正解ラベル部分を足し合わせれば、すべての Positive な要素数が得られる。

>>> total_positives = K.sum(y_true)
>>> total_positives
<tf.Tensor: shape=(), dtype=float32, numpy=2.0>

あとは、 割ってやれば Recall のスコアが得られるという寸法。

>>> recall = true_positives / total_positives
>>> recall
<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

Accuracy を計算してみる

もうひとつの例として Accuracy も計算してみる。

はじめに、正解ラベルと予測ラベルのインデックスを取り出す。

>>> y_true_argmax = K.argmax(y_true)
>>> y_pred_argmax = K.argmax(y_pred)
>>> y_true_argmax
<tf.Tensor: shape=(2,), dtype=int64, numpy=array([2, 1])>
>>> y_pred_argmax
<tf.Tensor: shape=(2,), dtype=int64, numpy=array([2, 0])>

両者が一致しているものを調べる。

>>> y_matched = K.equal(y_true_argmax, y_pred_argmax)
>>> y_matched
<tf.Tensor: shape=(2,), dtype=bool, numpy=array([ True, False])>

あとは平均を計算すれば Accuracy になる。

>>> accuracy = K.mean(y_matched)
>>> accuracy
<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

もちろん、ここで示したのはあくまで一例で、色々な計算方法が考えられる。

カスタムメトリックを定義して組み込む (Stateless)

それでは、実際に前述した処理を使ってカスタムメトリックを定義してみよう。 とはいえ、やることは正解と予測の Tensor を受け取ってメトリックをまた Tensor として返す関数を用意するだけ。 あとは、それを tf.keras.models.Model#compile() メソッドで metrics 引数に渡してやれば良い。

以下のサンプルコードでは MNIST データセットを MLP で予測するときに、カスタムメトリックとして Recall と Accuracy を定義して使っている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from tensorflow.keras import backend as K
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping


def custom_recall(y_true, y_pred):
    """正解と予測の Tensor から Recall を計算する関数"""
    true_positives = K.sum(y_true * y_pred)
    total_positives = K.sum(y_true)
    return true_positives / (total_positives + K.epsilon())  # ゼロ除算対策


def custom_accuracy(y_true, y_pred):
    """正解と予測の Tensor から Accuracy を計算する関数"""
    y_true_argmax = K.argmax(y_true)
    y_pred_argmax = K.argmax(y_pred)
    y_matched = K.equal(y_true_argmax, y_pred_argmax)
    return K.mean(y_matched)


def main():
    # load dataset
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # one-hot encode
    y_train = to_categorical(y_train)
    y_test = to_categorical(y_test)

    # flatten
    image_height, image_width = x_train.shape[1:]
    x_train = x_train.reshape(x_train.shape[0], image_height * image_width)
    x_test = x_test.reshape(x_test.shape[0], image_height * image_width)

    # min-max normalize
    x_train = (x_train - x_train.min()) / (x_train.max() - x_train.min())
    x_test = (x_test - x_test.min()) / (x_test.max() - x_test.min())

    # Multi Layer Perceptron
    inputs = Input(shape=(image_height * image_width,))
    x = Dense(64, activation='relu')(inputs)
    x = Dense(64, activation='relu')(x)
    num_of_classes = y_train.shape[1]
    outputs = Dense(num_of_classes, activation='softmax')(x)

    callbacks = [
        # 検証データに対する Recall が 10 エポック改善しないときは学習を打ち切る
        EarlyStopping(monitor='val_custom_recall',
                      patience=10,
                      verbose=1,
                      mode='max'),
    ]
    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  # カスタムメトリックを計算する関数を登録する
                  metrics=['accuracy', custom_recall, custom_accuracy],
                  )

    model.fit(x_train, y_train,
              batch_size=8192,
              epochs=1000,
              verbose=1,
              # ホールドアウトデータを検証データとして用いる
              validation_data=(x_test, y_test),
              callbacks=callbacks)


if __name__ == '__main__':
    main()

上記を保存して実行してみよう。 ちゃんと、custom_ から始まる名前でカスタムメトリックが学習過程のログに登場していることがわかる。

$ python stateless.py
...
Epoch 1/1000
8/8 [==============================] - 0s 46ms/step - loss: 2.1424 - accuracy: 0.3151 - custom_recall: 0.1247 - custom_accuracy: 0.3321 - val_loss: 1.8506 - val_accuracy: 0.5677 - val_custom_recall: 0.1703 - val_custom_accuracy: 0.5829
Epoch 2/1000
8/8 [==============================] - 0s 25ms/step - loss: 1.6449 - accuracy: 0.6475 - custom_recall: 0.2220 - custom_accuracy: 0.6544 - val_loss: 1.3144 - val_accuracy: 0.7322 - val_custom_recall: 0.3227 - val_custom_accuracy: 0.7484
Epoch 3/1000
8/8 [==============================] - 0s 31ms/step - loss: 1.1343 - accuracy: 0.7647 - custom_recall: 0.3867 - custom_accuracy: 0.7668 - val_loss: 0.8659 - val_accuracy: 0.8126 - val_custom_recall: 0.5142 - val_custom_accuracy: 0.8298
...
Epoch 207/1000
8/8 [==============================] - 0s 28ms/step - loss: 0.0086 - accuracy: 0.9991 - custom_recall: 0.9927 - custom_accuracy: 0.9991 - val_loss: 0.1126 - val_accuracy: 0.9738 - val_custom_recall: 0.9718 - val_custom_accuracy: 0.9769
Epoch 208/1000
8/8 [==============================] - 0s 28ms/step - loss: 0.0087 - accuracy: 0.9991 - custom_recall: 0.9927 - custom_accuracy: 0.9992 - val_loss: 0.1131 - val_accuracy: 0.9736 - val_custom_recall: 0.9716 - val_custom_accuracy: 0.9766
Epoch 209/1000
8/8 [==============================] - 0s 26ms/step - loss: 0.0086 - accuracy: 0.9992 - custom_recall: 0.9929 - custom_accuracy: 0.9992 - val_loss: 0.1140 - val_accuracy: 0.9743 - val_custom_recall: 0.9717 - val_custom_accuracy: 0.9774
Epoch 00209: early stopping

ただ、上記を見ると組み込みの accuracy と、自分で定義した custom_accuracy の値が一致していない。

カスタムメトリックを定義して組み込む (Stateful)

先ほどの例で組み込みのメトリックと自前のメトリックが一致しなかった理由は、カスタムメトリックを定義する方法に Stateless と Stateful という 2 つのやり方があるため。 組み込みの accuracy は Stateful なやり方で定義されている一方で、先ほど自分で定義した custom_accuracy は Stateless だったので値がズレてしまった。 あらかじめ断っておくと、値がズレているからといって計算が間違っているわけではない。

それでは、次は Stateful なやり方でカスタムメトリックを定義する方法を試してみよう。 Stateful なやり方では、tensorflow.keras.metrics.Metric を継承して必要なメソッドを実装することでメトリックを計算する。

以下のサンプルコードでは Stateful なやり方で Recall と Accuracy を計算している。 Stateful という名のとおり、tensorflow.keras.metrics.Metric では累積的に与えられる正解と予測のラベルからメトリックを計算することになる。 具体的には、update_state() メソッドで正解と予測ラベルが与えられて、結果を result() メソッドから得る。 そして、状態をリセットしたいときには reset_states() メソッドが呼ばれる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import tensorflow as tf
from tensorflow.keras.metrics import Metric
from tensorflow.keras import backend as K
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping


class RecallMetric(Metric):
    """ステートフルに Recall を計算するクラス"""

    def __init__(self, name='custom_recall', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        # 状態を貯めておく変数を用意する
        self.true_positives = tf.Variable(0.)
        self.total_positives = tf.Variable(0.)

    def update_state(self, y_true, y_pred, sample_weight=None):
        """新しく正解と予測が追加で与えられたときの処理"""
        true_positives = K.sum(y_true * y_pred)
        total_positives = K.sum(y_true)

        self.true_positives.assign_add(true_positives)
        self.total_positives.assign_add(total_positives)

    def result(self):
        """現時点の状態から計算されるメトリックを返す"""
        return self.true_positives / (self.total_positives + K.epsilon())

    def reset_states(self):
        """状態をリセットするときに呼ばれるコールバック"""
        self.true_positives.assign(0.)
        self.total_positives.assign(0.)


class AccuracyMetric(Metric):
    """ステートフルに Accuracy を計算するクラス"""

    def __init__(self, name='custom_accuracy', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)

        self.matched = tf.Variable(0.)
        self.unmatched = tf.Variable(0.)

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true_argmax = K.argmax(y_true)
        y_pred_argmax = K.argmax(y_pred)

        y_matched = K.sum(K.cast(K.equal(y_true_argmax, y_pred_argmax), dtype='float32'))
        y_unmatched = K.sum(K.cast(K.not_equal(y_true_argmax, y_pred_argmax), dtype='float32'))

        self.matched.assign_add(y_matched)
        self.unmatched.assign_add(y_unmatched)

    def result(self):
        return self.matched / (self.matched + self.unmatched)

    def reset_states(self):
        self.matched.assign(0.)
        self.unmatched.assign(0.)


def main():
    # load dataset
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # one-hot encode
    y_train = to_categorical(y_train)
    y_test = to_categorical(y_test)

    # flatten
    image_height, image_width = x_train.shape[1:]
    x_train = x_train.reshape(x_train.shape[0], image_height * image_width)
    x_test = x_test.reshape(x_test.shape[0], image_height * image_width)

    # min-max normalize
    x_train = (x_train - x_train.min()) / (x_train.max() - x_train.min())
    x_test = (x_test - x_test.min()) / (x_test.max() - x_test.min())

    # Multi Layer Perceptron
    inputs = Input(shape=(image_height * image_width,))
    x = Dense(64, activation='relu')(inputs)
    x = Dense(64, activation='relu')(x)
    num_of_classes = y_train.shape[1]
    outputs = Dense(num_of_classes, activation='softmax')(x)

    callbacks = [
        # 検証データに対する Recall が 10 エポック改善しないときは学習を打ち切る
        EarlyStopping(monitor='val_custom_recall',
                      patience=10,
                      verbose=1,
                      mode='max'),
    ]
    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  # カスタムメトリックを計算するオブジェクトを登録する
                  metrics=['accuracy', RecallMetric(), AccuracyMetric()],
                  )

    model.fit(x_train, y_train,
              batch_size=8192,
              epochs=1000,
              verbose=1,
              # ホールドアウトデータを検証データとして用いる
              validation_data=(x_test, y_test),
              callbacks=callbacks)


if __name__ == '__main__':
    main()

実際に、上記を実行してみよう。

$ python stateful.py
...
Epoch 1/1000
8/8 [==============================] - 0s 46ms/step - loss: 2.2027 - accuracy: 0.2119 - custom_recall: 0.1161 - custom_accuracy: 0.2119 - val_loss: 1.9262 - val_accuracy: 0.4457 - val_custom_recall: 0.1544 - val_custom_accuracy: 0.4457
Epoch 2/1000
8/8 [==============================] - 0s 25ms/step - loss: 1.7486 - accuracy: 0.5403 - custom_recall: 0.1948 - custom_accuracy: 0.5403 - val_loss: 1.4170 - val_accuracy: 0.6929 - val_custom_recall: 0.2810 - val_custom_accuracy: 0.6929
Epoch 3/1000
8/8 [==============================] - 0s 31ms/step - loss: 1.2274 - accuracy: 0.7467 - custom_recall: 0.3455 - custom_accuracy: 0.7467 - val_loss: 0.9200 - val_accuracy: 0.8056 - val_custom_recall: 0.4659 - val_custom_accuracy: 0.8056
...
Epoch 178/1000
8/8 [==============================] - 0s 26ms/step - loss: 0.0147 - accuracy: 0.9979 - custom_recall: 0.9888 - custom_accuracy: 0.9979 - val_loss: 0.1057 - val_accuracy: 0.9717 - val_custom_recall: 0.9662 - val_custom_accuracy: 0.9717
Epoch 179/1000
8/8 [==============================] - 0s 27ms/step - loss: 0.0145 - accuracy: 0.9980 - custom_recall: 0.9889 - custom_accuracy: 0.9980 - val_loss: 0.1055 - val_accuracy: 0.9714 - val_custom_recall: 0.9664 - val_custom_accuracy: 0.9714
Epoch 180/1000
8/8 [==============================] - 0s 27ms/step - loss: 0.0143 - accuracy: 0.9980 - custom_recall: 0.9891 - custom_accuracy: 0.9980 - val_loss: 0.1063 - val_accuracy: 0.9728 - val_custom_recall: 0.9663 - val_custom_accuracy: 0.9728
Epoch 00180: early stopping

今度は組み込みで用意されている accuracy の値と、自前で定義した custom_accuracy の値が一致していることがわかる。 ちなみに、Stateless と Stateful ではメトリックを計算するデータの区間が異なるために値が微妙にズレる。

まとめ

  • Keras のカスタムメトリックには正解と予測のラベルが Tensor オブジェクトとして渡される
  • カスタムメトリックを定義するには Stateless と Stateful なやり方がある
  • Stateless では正解と予測のラベルを受け取る関数を定義する
  • Stateful では tensorflow.keras.metrics.Metric クラスを継承して正解と予測のラベルからの計算結果をためこむ

PythonとKerasによるディープラーニング

PythonとKerasによるディープラーニング

  • 作者:Francois Chollet
  • 発売日: 2018/05/28
  • メディア: 単行本(ソフトカバー)