CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: MLflow Models を使ってみる

MLflow は MLOps に関連した OSS のひとつ。 いくつかのコンポーネントに分かれていて、それぞれを必要に応じて独立して使うことができる。

その中でも、今回扱う MLflow Models は主に学習済みモデルやパイプラインの取り回しに関するコンポーネント。 MLflow Models を使うことで、たとえば学習済みモデルの Serving やシステムへの組み込みが容易になる可能性がある。

使った環境は次のとおり。

$ sw_vers                     
ProductName:    Mac OS X
ProductVersion: 10.15.6
BuildVersion:   19G73
$ python -V                                        
Python 3.8.5
$ pip list | egrep "(mlflow|lightgbm|scikit-learn)"
lightgbm                  3.0.0
mlflow                    1.11.0
scikit-learn              0.23.2

もくじ

下準備

はじめに、必要なパッケージをインストールしておく。

$ pip install mlflow lightgbm scikit-learn seaborn category_encoders

モデルを MLflow Models で永続化する

論よりコードということで、いきなりだけど以下にサンプルコードを示す。 このサンプルコードでは Boston データセットを LightGBM で学習するコードになっている。 そして、学習させたモデルを MLflow Models を使って永続化している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import lightgbm as lgb
from mlflow import lightgbm as mlflow_lgb
from sklearn import datasets
from sklearn.model_selection import train_test_split


def main():
    # Boston データセットを読み込む
    dataset = datasets.load_boston()
    train_x, train_y = dataset.data, dataset.target
    feature_names = list(dataset.feature_names)

    # 学習用データと検証用データに分割する
    x_tr, x_eval, y_tr, y_eval = train_test_split(train_x, train_y,
                                                  test_size=0.33,
                                                  shuffle=True,
                                                  random_state=42,
                                                  )
    # LightGBM のデータ形式に直す
    lgb_train = lgb.Dataset(x_tr, y_tr, feature_name=feature_names)
    lgb_eval = lgb.Dataset(x_eval, y_eval, reference=lgb_train)

    # モデルを学習する
    lgbm_params = {
        'objective': 'regression',
        'metric': 'rmse',
        'first_metric_only': True,
        'verbose': -1,
        'random_state': 42,
    }
    booster = lgb.train(params=lgbm_params,
                        train_set=lgb_train,
                        valid_sets=[lgb_train, lgb_eval],
                        num_boost_round=1_000,
                        early_stopping_rounds=100,
                        verbose_eval=50,
                        )

    # モデルを MLflow Models の形式で永続化する
    mlflow_lgb.save_model(booster, path='mlflow-lgbm')
    """
    # MLflow Tracking に残すならこうする
    with mlflow.start_run():
        mlflow_lgb.log_model(booster,
                             artifact_path='mlflow-lgbm')
    """


if __name__ == '__main__':
    main()

上記のモジュールを実行してみよう。

$ python lgbmlf.py
Training until validation scores don't improve for 100 rounds
[50]  training's rmse: 2.23963    valid_1's rmse: 3.59161
[100] training's rmse: 1.55562    valid_1's rmse: 3.4141
[150] training's rmse: 1.22661    valid_1's rmse: 3.36079
[200] training's rmse: 1.0165 valid_1's rmse: 3.35222
[250] training's rmse: 0.860022   valid_1's rmse: 3.34358
[300] training's rmse: 0.735403   valid_1's rmse: 3.34575
[350] training's rmse: 0.630665   valid_1's rmse: 3.35982
Early stopping, best iteration is:
[254] training's rmse: 0.848352   valid_1's rmse: 3.33877
Evaluated only: rmse

すると、次のようにディレクトリができる。 このディレクトリと中身のファイルが、MLflow Models を使って永続化したモデルを表している。 要するに、決められたフォーマットに沿って学習済みモデルをパッケージングしている。

$ ls mlflow-lgbm 
MLmodel     conda.yaml  model.lgb

この中で特に重要なのが MLmodel という YAML フォーマットで書かれたファイル。 このファイルには、そのモデルがどのように永続化されたかといった情報が記録されている。

$ cat mlflow-lgbm/MLmodel 
flavors:
  lightgbm:
    data: model.lgb
    lgb_version: 3.0.0
  python_function:
    data: model.lgb
    env: conda.yaml
    loader_module: mlflow.lightgbm
    python_version: 3.8.5
utc_time_created: '2020-09-30 09:44:55.890106'

なお、上記のフォーマットの詳細は次のドキュメントに記載されている。

www.mlflow.org

www.mlflow.org

また、conda.yaml というファイルには Conda の仮想環境に関する情報が記録されている。 これはつまり、永続化したモデルを利用するために必要な Conda の環境を構築するたのもの。 MLflow Models では、デフォルトで Conda の仮想環境上に学習済みモデルをデプロイすることを想定している。

たとえば、中身を見ると LightGBM が依存パッケージとして追加されていることがわかる。

$ cat mlflow-lgbm/conda.yaml 
channels:
- defaults
- conda-forge
dependencies:
- python=3.8.5
- pip
- pip:
  - mlflow
  - lightgbm==3.0.0
name: mlflow-env

永続化したモデルを使って推論用の REST API を立ち上げる

ここからは MLflow Models を使うことで得られる嬉しさについて書いていく。 MLflow には、MLflow Models で永続化したモデルを扱うための機能がいくつか用意されている。

たとえば、MLflow には mlflow というコマンドラインが用意されている。 このコマンドの models serve サブコマンドを使うと、学習済みモデルを使った推論用の REST API が気軽に立てられる。

実際に使ってみよう。 コマンドを実行する際に、--model-uri オプションには、先ほど永続化したディレクトリを指定する。 また、今回は Conda を使っていないので --no-conda オプションをつけた。 これで、デフォルトでは localhost の 5000 番ポートで推論用の API が立ち上がる

$ mlflow models serve \
    --no-conda \
    --model-uri mlflow-lgbm
2020/09/30 18:49:46 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
2020/09/30 18:49:46 INFO mlflow.pyfunc.backend: === Running command 'gunicorn --timeout=60 -b 127.0.0.1:5000 -w 1 ${GUNICORN_CMD_ARGS} -- mlflow.pyfunc.scoring_server.wsgi:app'
[2020-09-30 18:49:47 +0900] [22853] [INFO] Starting gunicorn 20.0.4
[2020-09-30 18:49:47 +0900] [22853] [INFO] Listening at: http://127.0.0.1:5000 (22853)
[2020-09-30 18:49:47 +0900] [22853] [INFO] Using worker: sync
[2020-09-30 18:49:47 +0900] [22855] [INFO] Booting worker with pid: 22855

上記に推論させたいデータを HTTP で投げ込んでみよう。 たとえば curl コマンドを使って以下のようにする。

$ curl -X POST \
    -H "Content-Type:application/json" \
    --data '{"columns": ["CRIM", "ZN", "INDUS", "CHAS", "NOX", "RM", "AGE", "DIS", "RAD", "TAX", "PTRATIO", "B", "LSTAT"], "data": [[10.233, 0.0, 18.1, 0.0, 0.614, 6.185, 96.7, 2.1705, 24.0, 666.0, 20.2, 379.7, 18.03]]}' \
    http://localhost:5000/invocations
[15.444706764627714]

すると、推論の結果として 15.44... という結果が得られた。

永続化したモデルを使って CSV ファイルを処理する

また、同様に CSV のファイルを処理することもできる。 さっきと同じ内容を CSV ファイルに記録してみよう。

$ cat << 'EOF' > data.csv
CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
10.233,0.0,18.1,0.0,0.614,6.185,96.7,2.1705,24.0,666.0,20.2,379.7,18.03
EOF

今度は models predict というサブコマンドを使う。 --content-type オプションには csv を指定する。 そして、--input-path オプションに先ほど保存した CSV ファイルを指定する。

$ mlflow models predict \
    --no-conda \
    --model-uri mlflow-lgbm/ \
    --content-type csv \
    --input-path data.csv
2020/09/30 18:51:50 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
[15.444706764627714]

先ほどと同じように、推論結果として 15.44... という値が得られた。

ただ、現状のままだと上手くいかない場面もある。 たとえば、CSV のカラムを一部入れ替えてみよう。 以下では CRIM カラムと ZN カラムの順番が入れ替わっている。

$ cat << 'EOF' > data.csv
ZN,CRIM,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0.0,10.233,18.1,0.0,0.614,6.185,96.7,2.1705,24.0,666.0,20.2,379.7,18.03
EOF

このファイルを使ってもう一度同じことをしてみよう。 ちゃんとカラム名まで認識していれば結果は変わらないはず。

$ mlflow models predict \
    --no-conda \
    --model-uri mlflow-lgbm/ \
    --content-type csv \
    --input-path data.csv
2020/09/30 18:52:33 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
[16.96720478471085]

しかし、残念ながら結果は変わってしまった。 つまり、先ほどのサンプルコードではカラム名の情報までは永続化できていない。

Signature を追加する

カラム名まで認識してほしいときは、モデルを永続化する際に Signature という情報を追加する必要がある。

以下にサンプルコードを示す。 先ほどのサンプルコードに、Pandas の DataFrame から自動的に Signature を認識させるコードを追加している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pandas as pd
import lightgbm as lgb
from mlflow import lightgbm as mlflow_lgb
from mlflow.models.signature import infer_signature
from sklearn import datasets
from sklearn.model_selection import train_test_split


def main():
    dataset = datasets.load_boston()
    train_x, train_y = dataset.data, dataset.target
    feature_names = list(dataset.feature_names)

    x_tr, x_eval, y_tr, y_eval = train_test_split(train_x, train_y,
                                                  test_size=0.33,
                                                  shuffle=True,
                                                  random_state=42,
                                                  )

    lgb_train = lgb.Dataset(x_tr, y_tr, feature_name=feature_names)
    lgb_eval = lgb.Dataset(x_eval, y_eval, reference=lgb_train)

    lgbm_params = {
        'objective': 'regression',
        'metric': 'rmse',
        'first_metric_only': True,
        'verbose': -1,
        'random_state': 42,
    }
    booster = lgb.train(params=lgbm_params,
                        train_set=lgb_train,
                        valid_sets=[lgb_train, lgb_eval],
                        num_boost_round=1_000,
                        early_stopping_rounds=100,
                        verbose_eval=50,
                        )

    y_tr_pred = booster.predict(x_tr, num_iteration=booster.best_iteration)
    # 入力が DataFrame であれば、場合によってはカラム名とデータ型を自動で認識してくれる
    x_tr_df = pd.DataFrame(x_tr, columns=dataset.feature_names)
    signature = infer_signature(x_tr_df, y_tr_pred)
    # 渡すデータと推論の結果を Signature として付与する
    mlflow_lgb.save_model(booster, path='mlflow-lgbm-with-sig', signature=signature)


if __name__ == '__main__':
    main()

上記を実行しよう。

$ python lgbmlfsig.py
Training until validation scores don't improve for 100 rounds
[50]  training's rmse: 2.23963    valid_1's rmse: 3.59161
[100] training's rmse: 1.55562    valid_1's rmse: 3.4141
[150] training's rmse: 1.22661    valid_1's rmse: 3.36079
[200] training's rmse: 1.0165 valid_1's rmse: 3.35222
[250] training's rmse: 0.860022   valid_1's rmse: 3.34358
[300] training's rmse: 0.735403   valid_1's rmse: 3.34575
[350] training's rmse: 0.630665   valid_1's rmse: 3.35982
Early stopping, best iteration is:
[254] training's rmse: 0.848352   valid_1's rmse: 3.33877
Evaluated only: rmse

今度は保存された MLmodel ファイルに signature という情報が付与されている。 中身を見るとカラム名とデータ型が入っている。

$ ls mlflow-lgbm-with-sig
MLmodel     conda.yaml  model.lgb
$ cat mlflow-lgbm-with-sig/MLmodel 
flavors:
  lightgbm:
    data: model.lgb
    lgb_version: 3.0.0
  python_function:
    data: model.lgb
    env: conda.yaml
    loader_module: mlflow.lightgbm
    python_version: 3.8.5
signature:
  inputs: '[{"name": "CRIM", "type": "double"}, {"name": "ZN", "type": "double"},
    {"name": "INDUS", "type": "double"}, {"name": "CHAS", "type": "double"}, {"name":
    "NOX", "type": "double"}, {"name": "RM", "type": "double"}, {"name": "AGE", "type":
    "double"}, {"name": "DIS", "type": "double"}, {"name": "RAD", "type": "double"},
    {"name": "TAX", "type": "double"}, {"name": "PTRATIO", "type": "double"}, {"name":
    "B", "type": "double"}, {"name": "LSTAT", "type": "double"}]'
  outputs: '[{"type": "double"}]'
utc_time_created: '2020-09-30 09:58:18.952375'

それでは、Signature を追加したモデルで推論させてみよう。 CSV ファイルは先ほどと同じものを使う。 つまり、モデルの学習時と推論時でカラムの順番が入れかわっている。

$ cat data.csv                 
ZN,CRIM,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0.0,10.233,18.1,0.0,0.614,6.185,96.7,2.1705,24.0,666.0,20.2,379.7,18.03

永続化したモデルを使って推論させてみる。

$ mlflow models predict \
    --no-conda \
    --model-uri mlflow-lgbm-with-sig/ \
    --content-type csv \
    --input-path data.csv
2020/09/30 19:00:54 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
[15.444706764627714]

今度は、カラムを入れかえているにも関わらず、結果が変わらなくなった。 上手くいっているようだ。

ちなみに手動で Signature の情報を指定するときは次のようにすれば良い。

    # 手動で Signature を構築する場合
    from mlflow.models.signature import ModelSignature
    from mlflow.types.schema import Schema
    from mlflow.types.schema import ColSpec
    input_schema = Schema([
        ColSpec('double', 'CRIM'),
        ColSpec('double', 'ZN'),
        ColSpec('double', 'INDUS'),
        ColSpec('double', 'CHAS'),
        ColSpec('double', 'NOX'),
        ColSpec('double', 'RM'),
        ColSpec('double', 'AGE'),
        ColSpec('double', 'DIS'),
        ColSpec('double', 'RAD'),
        ColSpec('double', 'TAX'),
        ColSpec('double', 'PTRATIO'),
        ColSpec('double', 'B'),
        ColSpec('double', 'LSTAT'),
    ])
    output_schema = Schema([ColSpec('double', 'MEDV')])
    signature = ModelSignature(inputs=input_schema, outputs=output_schema)
    mlflow_lgb.save_model(booster, path='mlflow-lgbm-with-sig', signature=signature)

Input Example を追加する

また、永続化するモデルにはサンプルとなる入力データも Input Example として同梱させることができる。 次は Input Example も追加してみよう。

以下にサンプルコードを示す。 やっていることは簡単で、学習させたデータの先頭の何件かを与えているだけ。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pandas as pd
import lightgbm as lgb
from mlflow import lightgbm as mlflow_lgb
from mlflow.models.signature import infer_signature
from sklearn import datasets
from sklearn.model_selection import train_test_split


def main():
    dataset = datasets.load_boston()
    train_x, train_y = dataset.data, dataset.target
    feature_names = list(dataset.feature_names)

    x_tr, x_eval, y_tr, y_eval = train_test_split(train_x, train_y,
                                                  test_size=0.33,
                                                  shuffle=True,
                                                  random_state=42,
                                                  )

    lgb_train = lgb.Dataset(x_tr, y_tr, feature_name=feature_names)
    lgb_eval = lgb.Dataset(x_eval, y_eval, reference=lgb_train)

    lgbm_params = {
        'objective': 'regression',
        'metric': 'rmse',
        'first_metric_only': True,
        'verbose': -1,
        'random_state': 42,
    }
    booster = lgb.train(params=lgbm_params,
                        train_set=lgb_train,
                        valid_sets=[lgb_train, lgb_eval],
                        num_boost_round=1_000,
                        early_stopping_rounds=100,
                        verbose_eval=50,
                        )

    y_tr_pred = booster.predict(x_tr, num_iteration=booster.best_iteration)
    x_tr_df = pd.DataFrame(x_tr, columns=dataset.feature_names)
    signature = infer_signature(x_tr_df, y_tr_pred)
    # サンプルの入力データをつける
    input_example = x_tr_df.iloc[:5]
    mlflow_lgb.save_model(booster,
                          path='mlflow-lgbm-with-sig-and-example',
                          input_example=input_example,
                          signature=signature)


if __name__ == '__main__':
    main()

上記を実行しよう。

$ python lgbmlfeg.py
Training until validation scores don't improve for 100 rounds
[50]  training's rmse: 2.23963    valid_1's rmse: 3.59161
[100] training's rmse: 1.55562    valid_1's rmse: 3.4141
[150] training's rmse: 1.22661    valid_1's rmse: 3.36079
[200] training's rmse: 1.0165 valid_1's rmse: 3.35222
[250] training's rmse: 0.860022   valid_1's rmse: 3.34358
[300] training's rmse: 0.735403   valid_1's rmse: 3.34575
[350] training's rmse: 0.630665   valid_1's rmse: 3.35982
Early stopping, best iteration is:
[254] training's rmse: 0.848352   valid_1's rmse: 3.33877
Evaluated only: rmse

見ると、今度は input_example.json というファイルがディレクトリに追加されている。

$ ls mlflow-lgbm-with-sig-and-example 
MLmodel         conda.yaml      input_example.json  model.lgb
$ cat mlflow-lgbm-with-sig-and-example/input_example.json 
{"columns": ["CRIM", "ZN", "INDUS", "CHAS", "NOX", "RM", "AGE", "DIS", "RAD", "TAX", "PTRATIO", "B", "LSTAT"], "data": [[10.233, 0.0, 18.1, 0.0, 0.614, 6.185, 96.7, 2.1705, 24.0, 666.0, 20.2, 379.7, 18.03], [0.67191, 0.0, 8.14, 0.0, 0.538, 5.813, 90.3, 4.682, 4.0, 307.0, 21.0, 376.88, 14.81], [0.14455, 12.5, 7.87, 0.0, 0.524, 6.172, 96.1, 5.9505, 5.0, 311.0, 15.2, 396.9, 19.15], [0.11132, 0.0, 27.74, 0.0, 0.609, 5.983, 83.5, 2.1099, 4.0, 711.0, 20.1, 396.9, 13.35], [0.12802, 0.0, 8.56, 0.0, 0.52, 6.474, 97.1, 2.4329, 5.0, 384.0, 20.9, 395.24, 12.27]]}

試しに、このサンプルを推論させてみよう まずは REST API を立ち上げる。

$ mlflow models serve \
    --no-conda \
    --model-uri mlflow-lgbm-with-sig-and-example
2020/09/30 19:05:39 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
2020/09/30 19:05:39 INFO mlflow.pyfunc.backend: === Running command 'gunicorn --timeout=60 -b 127.0.0.1:5000 -w 1 ${GUNICORN_CMD_ARGS} -- mlflow.pyfunc.scoring_server.wsgi:app'
[2020-09-30 19:05:39 +0900] [23035] [INFO] Starting gunicorn 20.0.4
[2020-09-30 19:05:39 +0900] [23035] [INFO] Listening at: http://127.0.0.1:5000 (23035)
[2020-09-30 19:05:39 +0900] [23035] [INFO] Using worker: sync
[2020-09-30 19:05:39 +0900] [23037] [INFO] Booting worker with pid: 23037

サンプルの JSON ファイルを使って REST API を叩く。

$ curl -X POST \
    -H "Content-Type:application/json" \
    --data "$(cat mlflow-lgbm-with-sig-and-example/input_example.json)" \
    http://localhost:5000/invocations
[15.444706764627714, 16.79758862860849, 25.64257218297901, 19.626464010328057, 20.184689951658456]

ちゃんと推論できているようだ。 今のところクライアント側からサンプルの情報は得られないのかな。 とはいえ、モデルがどんな入力を受け取るかソースコードを見て調べることってよくある。 なので、管理する上で助かるといえば助かるのかな。

前処理が必要なデータセットで試す

ところで、ここまでのサンプルコードには前処理が入っていなかった。 しかし、実際には前処理が存在しない機械学習のコードなんて考えられないだろう。 続いては前処理を含んだコードを MLflow Models で扱う方法について考えている。

たとえば、以下のサンプルコードでは前処理と推論の処理を scikit-learn の Pipeline としてまとめている。 Pipeline にまとめるには、関連するオブジェクトが scikit-learn のインターフェースに準拠している必要がある。 そこで LightGBM の分類器としては LGBMClassifier を使った。 また、ラベルエンコードには category_encoders の実装を使っている。 分類するデータには Titanic データセットを使った。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import seaborn as sns
import category_encoders as ce
from mlflow import sklearn as mlflow_sklearn
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema
from mlflow.types.schema import ColSpec
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
import lightgbm as lgb


def main():
    # Titanic データを読み込む
    df = sns.load_dataset('titanic')

    # 使う特徴量
    feature_names = [
        'class',
        'sex',
        'age',
        'sibsp',
        'parch',
        'fare',
        'embark_town',
        'deck',
    ]
    train_x = df[feature_names]
    train_y = df['survived']

    # 前処理 (ラベルエンコード)
    cols = ['class', 'sex', 'embark_town', 'deck']
    encoder = ce.OrdinalEncoder(cols)
    encoded_train_x = encoder.fit_transform(train_x)

    # 学習用データと検証用データに分割する
    x_tr, x_eval, y_tr, y_eval = train_test_split(encoded_train_x,
                                                  train_y,
                                                  test_size=0.33,
                                                  shuffle=True,
                                                  random_state=42,
                                                  )
    # 分類器
    clf = lgb.LGBMClassifier(n_estimators=1_000,
                             first_metric_only=True,
                             random_state=42,
                             )
    # 学習させる
    clf.fit(x_tr, y_tr,
            early_stopping_rounds=100,
            eval_set=[(x_eval, y_eval)],
            verbose=50,
            eval_metric='binary_logloss',
            )

    # 学習させたエンコーダーとモデルをパイプラインにまとめる
    steps = [
        ('preprocessing', encoder),
        ('classification', clf)
    ]
    pipeline = Pipeline(steps)

    # パイプラインを MLflow Models で保存する
    # NOTE: Categorical な型があると MLflow がスキーマをうまく推測できない
    input_schema = Schema([
        ColSpec('string', 'class'),
        ColSpec('string', 'sex'),
        ColSpec('double', 'age'),
        ColSpec('long', 'sibsp'),
        ColSpec('long', 'parch'),
        ColSpec('double', 'fare'),
        ColSpec('string', 'embark_town'),
        ColSpec('string', 'deck'),
    ])
    output_schema = Schema([ColSpec('double', 'survived')])
    signature = ModelSignature(inputs=input_schema,
                               outputs=output_schema)
    input_example = train_x.iloc[:5]
    mlflow_sklearn.save_model(pipeline,
                              path='mlflow-sklearn-pipeline',
                              signature=signature,
                              input_example=input_example,
                              )


if __name__ == '__main__':
    main()

上記を実行しよう。

$ python skpipemlf.py
Training until validation scores don't improve for 100 rounds
[50]  valid_0's binary_logloss: 0.457466
[100]  valid_0's binary_logloss: 0.510931
Early stopping, best iteration is:
[25]  valid_0's binary_logloss: 0.427704
Evaluated only: binary_logloss

永続化したモデルで推論してみよう。 データは次のように CSV のファイルとして記録しておく。 見ると分かるとおり、扱う上ではエンコードが必要となるカラムが複数含まれている。

$ cat << 'EOF' > data.csv 
class,sex,age,sibsp,parch,fare,embark_town,deck
Third,male,22.0,1,0,7.25,Southampton,
First,female,38.0,1,0,71.2833,Cherbourg,C
Third,female,26.0,0,0,7.925,Southampton,
First,female,35.0,1,0,53.1,Southampton,C
Third,male,35.0,0,0,8.05,Southampton,
EOF

しかし、先ほどのサンプルコードでは前処理を含めたパイプラインを MLflow Models で永続化している。 そのため、前処理が必要なデータをそのまま放り込んでも推論できる。 DeprecationWarning は出ているところは愛嬌ということで。

$ mlflow models predict \
    --no-conda \
    --model-uri mlflow-sklearn-pipeline/ \
    --content-type csv \
    --input-path data.csv
2020/09/30 19:19:29 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
/Users/amedama/.virtualenvs/py38/lib/python3.8/site-packages/patsy/constraint.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.9 it will stop working
  from collections import Mapping
[0, 1, 0, 1, 0]

ところで上記を見てわかるとおり、結果はバイナリの整数に丸められてしまっている。 これは MLflow Models の sklearn モジュールでは、モデルの predict() メソッドを呼ぶように作られているため。 scikit-learn のインターフェースでは分類器の predict() が整数に丸めた結果を返してしまう。

モデルが確率 (predict_proba) を返すようにする

ただ、丸めた結果だけでは困るケースが多いはず。 なので、試しに predict_proba() の結果を返すようにしてみよう。 やり方は簡単で LGBMClassifier を継承して predict()predict_proba() にすりかえるクラスを用意する。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import seaborn as sns
import category_encoders as ce
from mlflow import sklearn as mlflow_sklearn
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema
from mlflow.types.schema import ColSpec
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
import lightgbm as lgb


class LGBMClassifierWrapper(lgb.LGBMClassifier):
    """predict() の処理を predict_proba() にリダイレクトするラッパー"""

    def predict(self, *args, **kwargs):
        # 処理をリダイレクトする
        proba = super().predict_proba(*args, **kwargs)
        # Positive の確率を返す
        return proba[:, 1]

def main():
    df = sns.load_dataset('titanic')

    feature_names = [
        'class',
        'sex',
        'age',
        'sibsp',
        'parch',
        'fare',
        'embark_town',
        'deck',
    ]
    train_x = df[feature_names]
    train_y = df['survived']

    cols = ['class', 'sex', 'embark_town', 'deck']
    encoder = ce.OrdinalEncoder(cols)
    encoded_train_x = encoder.fit_transform(train_x)

    x_tr, x_eval, y_tr, y_eval = train_test_split(encoded_train_x,
                                                  train_y,
                                                  test_size=0.33,
                                                  shuffle=True,
                                                  random_state=42,
                                                  )
    # 処理をラップした分類器を使う
    clf = LGBMClassifierWrapper(n_estimators=1_000,
                                first_metric_only=True,
                                random_state=42,
                                )
    clf.fit(x_tr, y_tr,
            early_stopping_rounds=100,
            eval_set=[(x_eval, y_eval)],
            verbose=50,
            eval_metric='binary_logloss',
            )

    steps = [
        ('preprocessing', encoder),
        ('classification', clf)
    ]
    pipeline = Pipeline(steps)

    input_schema = Schema([
        ColSpec('string', 'class'),
        ColSpec('string', 'sex'),
        ColSpec('double', 'age'),
        ColSpec('long', 'sibsp'),
        ColSpec('long', 'parch'),
        ColSpec('double', 'fare'),
        ColSpec('string', 'embark_town'),
        ColSpec('string', 'deck'),
    ])
    output_schema = Schema([ColSpec('double', 'survived')])
    signature = ModelSignature(inputs=input_schema,
                               outputs=output_schema)
    input_example = train_x.iloc[:5]
    mlflow_sklearn.save_model(pipeline,
                              path='mlflow-sklearn-pipeline-with-proba',
                              signature=signature,
                              input_example=input_example,
                              )


if __name__ == '__main__':
    main()

上記を実行しよう。

$ python skpipemlfp.py
Training until validation scores don't improve for 100 rounds
[50]  valid_0's binary_logloss: 0.457466
[100]  valid_0's binary_logloss: 0.510931
Early stopping, best iteration is:
[25]  valid_0's binary_logloss: 0.427704
Evaluated only: binary_logloss

推論させてみると、今度はちゃんと浮動小数点の結果になっている。

$ mlflow models predict \                   
    --no-conda \
    --model-uri mlflow-sklearn-pipeline-with-proba/ \
    --content-type csv \
    --input-path data.csv
2020/09/30 19:24:09 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
/Users/amedama/.virtualenvs/py38/lib/python3.8/site-packages/patsy/constraint.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.9 it will stop working
  from collections import Mapping
[0.20596751692506657, 0.9204031036579741, 0.4137860515635112, 0.9286529085847584, 0.0976711088927368]

永続化した内容を Python から読み込んで使う

ここまでの内容は、永続化した内容を常に mlflow コマンドから読み込んで使ってきた。 しかし、Python のコードから読み込んで使いたいケースも当然あるはず。

以下のサンプルコードでは先ほど永続化したモデルを読み込んで使っている。 具体的には mlflow.pyfunc.load_model() を使えばモデルが読み込める。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import seaborn as sns
from mlflow import pyfunc


def main():
    # Titanic データを読み込む
    df = sns.load_dataset('titanic')

    # Categorical 型は文字列に直す
    df = df.astype({
        'class': str,
        'deck': str,
    })

    # 使う特徴量の名前
    feature_names = [
        'class',
        'sex',
        'age',
        'sibsp',
        'parch',
        'fare',
        'embark_town',
        'deck',
    ]
    train_x = df[feature_names]
    train_y = df['survived']

    # 保存したモデルを読み込む
    model_path = 'mlflow-sklearn-pipeline-with-proba'
    # 汎用な pyfunc モジュールから読み出せる
    loaded_model = pyfunc.load_model(model_path)
    """
    # あるいは sklearn 向けモジュールから読んでも良い
    from mlflow import sklearn as mlflow_sklearn
    loaded_model = mlflow_sklearn.load_model(model_path)
    """

    # 保存したモデルで予測する
    # NOTE: ここで予測しているのはモデルが見たことのあるデータなので、あくまでデモとして
    train_y_pred = loaded_model.predict(train_x)

    # 先頭を表示してみる
    print(f'Inference: {train_y_pred[:5]}')
    # 正解
    print(f'GroundTruth: {train_y.values[:5]}')


if __name__ == '__main__':
    main()

ポイントとしては、永続化に使ったモジュールが何であれ、この統一されたインターフェースから読み出せるということ。 ようするに mlflow.sklearnmlflow.lightgbm などのモジュールを使って永続化したモデルであっても、ひとつの API で読める。 MLmodel ファイルには loader_module という項目に、モデルの復元に使うモジュールが指定されているため、このようなことが実現できる。 復元したモデルには predict() メソッドがあるので、あとはこれを使って推論すれば良い。

上記を実行してみよう。

$ python load.py                      
/Users/amedama/.virtualenvs/py38/lib/python3.8/site-packages/patsy/constraint.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.9 it will stop working
  from collections import Mapping
Inference: [0.20596752 0.9204031  0.41378605 0.92865291 0.09767111]
GroundTruth: [0 1 1 1 0]

ちゃんと推論できている。

Custom Python Models を作る

先ほど扱ったサンプルコードでは、前処理とモデルが scikit-learn のインターフェースを備えていることを前提としていた。 しかし、扱うコードによっては scikit-learn のインターフェースがない場合もあるはず。 続いては、そんな場合にどうすれば良いかを扱う。

最も簡単なやり方は mlflow.pyfunc.PythonModel を継承したクラスを作るというもの。 継承したクラスの predict() メソッドに、生データから推論するまでに必要な処理のパイプラインを詰め込む。 そして、このクラスのインスタンスを mlflow.pyfunc.save_model() で永続化してやれば良い。

以下にサンプルコードを示す。 今度は LightGBM の標準 API を使っているため scikit-learn のインターフェースに準拠していない。 つまり、scikit-learn の Pipeline にまとめる作戦が使えない状況を意図的に作り出している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import seaborn as sns
import category_encoders as ce
from mlflow import pyfunc
from mlflow.models.signature import ModelSignature
from mlflow.pyfunc.model import get_default_conda_env
from mlflow.types.schema import Schema
from mlflow.types.schema import ColSpec
from sklearn.model_selection import train_test_split
import lightgbm as lgb


class InferencePipeline(pyfunc.PythonModel):
    """推論に使うパイプライン"""

    def __init__(self, preprocessor, estimator):
        self.preprocessor = preprocessor
        self.estimator = estimator

    def predict(self, context, model_input):
        """入力を推論結果に変換する過程"""
        transformed_input = self.preprocessor.transform(model_input)
        prediction = self.estimator.predict(transformed_input)
        return prediction


def main():
    df = sns.load_dataset('titanic')

    feature_names = [
        'class',
        'sex',
        'age',
        'sibsp',
        'parch',
        'fare',
        'embark_town',
        'deck',
    ]
    train_x = df[feature_names]
    train_y = df['survived']

    cols = ['class', 'sex', 'embark_town', 'deck']
    encoder = ce.OrdinalEncoder(cols)
    encoded_train_x = encoder.fit_transform(train_x)

    x_tr, x_eval, y_tr, y_eval = train_test_split(encoded_train_x,
                                                  train_y,
                                                  test_size=0.33,
                                                  shuffle=True,
                                                  random_state=42,
                                                  )

    # lightgbm.train() を使う
    # 返ってくる Booster オブジェクトには scikit-learn インターフェースがない
    lgb_train = lgb.Dataset(x_tr, y_tr)
    lgb_eval = lgb.Dataset(x_eval, y_eval, reference=lgb_train)

    lgb_params = {
        'objective': 'binary',
        'metrics': 'binary_logloss',
        'first_metric_only': True,
        'random_state': 42,
        'verbose': -1,
    }
    booster = lgb.train(lgb_params, lgb_train,
                        valid_sets=lgb_eval,
                        num_boost_round=1_000,
                        early_stopping_rounds=100,
                        verbose_eval=50,
                        )

    # 前処理と推論の処理を mlflow.pyfunc.PythonModel を継承したクラスのインスタンスにまとめる
    pipeline = InferencePipeline(encoder, booster)

    input_schema = Schema([
        ColSpec('string', 'class'),
        ColSpec('string', 'sex'),
        ColSpec('double', 'age'),
        ColSpec('long', 'sibsp'),
        ColSpec('long', 'parch'),
        ColSpec('double', 'fare'),
        ColSpec('string', 'embark_town'),
        ColSpec('string', 'deck'),
    ])
    output_schema = Schema([ColSpec('double', 'survived')])
    signature = ModelSignature(inputs=input_schema,
                               outputs=output_schema)

    input_example = train_x.iloc[:5]

    # 動作に必要な依存ライブラリを追加する
    conda_env = get_default_conda_env()
    deps = conda_env['dependencies']
    other_deps = deps[-1]  # XXX: ちょっと決め打ちすぎ
    other_deps['pip'].append('category_encoders')
    other_deps['pip'].append('scikit-learn')
    other_deps['pip'].append('lightgbm')

    # 永続化する
    pyfunc.save_model(path='mlflow-custom-pyfunc-model',
                      python_model=pipeline,
                      signature=signature,
                      input_example=input_example,
                      conda_env=conda_env,
                      )


if __name__ == '__main__':
    main()

MLmodel は次のように記録されている。 モデルの本体は Pickle オブジェクトとして python_model.pkl にある

$ cat mlflow-custom-pyfunc-model/MLmodel   
flavors:
  python_function:
    cloudpickle_version: 1.6.0
    env: conda.yaml
    loader_module: mlflow.pyfunc.model
    python_model: python_model.pkl
    python_version: 3.8.5
saved_input_example_info:
  artifact_path: input_example.json
  pandas_orient: split
  type: dataframe
signature:
  inputs: '[{"name": "class", "type": "string"}, {"name": "sex", "type": "string"},
    {"name": "age", "type": "double"}, {"name": "sibsp", "type": "long"}, {"name":
    "parch", "type": "long"}, {"name": "fare", "type": "double"}, {"name": "embark_town",
    "type": "string"}, {"name": "deck", "type": "string"}]'
  outputs: '[{"name": "survived", "type": "double"}]'
utc_time_created: '2020-09-30 09:11:38.717424'

永続化した内容を使って推論させてみよう。

$ mlflow models predict \
    --no-conda \
    --model-uri mlflow-custom-pyfunc-model/ \
    --content-type csv \
    --input-path data.csv
2020/09/30 20:00:10 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
/Users/amedama/.virtualenvs/py38/lib/python3.8/site-packages/patsy/constraint.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.9 it will stop working
  from collections import Mapping
[0.20596751692506657, 0.9204031036579741, 0.4137860515635112, 0.9286529085847584, 0.0976711088927368]

ちゃんと動いていることがわかる。

ちなみに、今回は扱わなかったけどモデルの情報を Pickle 以外のファイルに artifacts として保存することもできるようだ。 また、さらに複雑なモデルや Python 以外の言語を使う場合には、自分で Custom Flavor を書くこともできる。

とりあえず、そんな感じで。