CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: SQLAlchemy のテーブルに後からインデックスを追加する

今回は、Python の O/R マッパーである SQLAlchemy について。 テーブルを定義した時点のモデルには無かったインデックスを、後から追加する方法についてメモしておく。

なお、実務における RDBMS のスキーマ変更に関しては、Alembic のようなフレームワークを使ってバージョン管理することを強くおすすめしたい。

blog.amedama.jp

使った環境は次のとおり。

$ sw_vers                     
ProductName:    Mac OS X
ProductVersion: 10.15.7
BuildVersion:   19H2
$ python -V
Python 3.8.6
$ python -c "import sqlite3; print(sqlite3.version)"    
2.6.0
$ sqlite3 --version           
3.28.0 2019-04-15 14:49:49 378230ae7f4b721c8b8d83c8ceb891449685cd23b1702a57841f1be40b5daapl

もくじ

下準備

あらかじめ、SQLAlchemy をインストールしておく。

$ pip install sqlalchemy

最初からテーブルの定義にインデックスがある場合

はじめに、最初からテーブルを表すモデルにインデックスの指定がある場合について確認しておく。 以下のサンプルコードには User というクラスが users というテーブルを表している。 そして、age というカラムに対応するアトリビュートに index=True が指定されている。 こうなっていると、テーブルを初期化した時点で、age というカラムにインデックスが有効となる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import Column
from sqlalchemy.types import Integer
from sqlalchemy.types import Text

ModelBase = declarative_base()


LOGGER = logging.getLogger(__name__)


class User(ModelBase):
    """ユーザのデータを模したサンプル用のモデル"""
    __tablename__ = 'users'

    # ユーザ名
    name = Column(Text, primary_key=True)
    # NOTE: 最初からインデックスを張るときは index=True オプションを使う
    age = Column(Integer, nullable=False, index=True)


def init_db(engine: Engine, drop: bool = False):
    """データベースを初期化する"""
    if drop:
        ModelBase.metadata.drop_all(engine)

    ModelBase.metadata.create_all(engine)


def get_engine(db_uri: str) -> Engine:
    engine = create_engine(
        db_uri,
        pool_recycle=3600,
        encoding='utf-8',
    )
    return engine


def main():
    # SQLAlchemy のログを DEBUG レベルで出力する
    logger = logging.getLogger('sqlalchemy')
    logger.setLevel(logging.DEBUG)
    # DEBUG レベル以上のログを出力する
    logging.basicConfig(level=logging.DEBUG)

    # データベースを初期化する
    db_uri = 'sqlite:///example.db?cache=shared'
    engine = get_engine(db_uri)
    init_db(engine, drop=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。 SQLAlchemy のデバッグログを有効にしているため、バックエンドに発行したクエリなどが出力される。 ここでは、それらの出力は省略している。

$ python withindex.py
...

作成されたデータベースのスキーマを確認してみよう。 すると、次のとおり age カラムにインデックスが確認できる。

$ sqlite3 example.db ".schema"            
CREATE TABLE sqlite_sequence(name,seq);
CREATE TABLE users (
    name TEXT NOT NULL, 
    age INTEGER NOT NULL, 
    PRIMARY KEY (name)
);
CREATE INDEX ix_users_age ON users (age);

最初からテーブルの定義にインデックスがない場合

続いてが本題の、最初の定義にはインデックスの指定がない場合について。 以下のサンプルコードでは、先ほど作成したテーブルを DROP して、新たにインデックスの指定がないものを作り直している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import Column
from sqlalchemy.types import Integer
from sqlalchemy.types import Text

ModelBase = declarative_base()


LOGGER = logging.getLogger(__name__)


class User(ModelBase):
    __tablename__ = 'users'

    name = Column(Text, primary_key=True)
    # この時点ではインデックスがない
    age = Column(Integer, nullable=False)


def init_db(engine: Engine, drop: bool = False):
    if drop:
        ModelBase.metadata.drop_all(engine)

    ModelBase.metadata.create_all(engine)


def get_engine(db_uri: str) -> Engine:
    engine = create_engine(
        db_uri,
        pool_recycle=3600,
        encoding='utf-8',
    )
    return engine


def main():
    logger = logging.getLogger('sqlalchemy')
    logger.setLevel(logging.DEBUG)
    logging.basicConfig(level=logging.DEBUG)

    db_uri = 'sqlite:///example.db?cache=shared'
    engine = get_engine(db_uri)
    init_db(engine, drop=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python withoutindex.py
...

すると、次のようにインデックスのないテーブルが作り直される。

$ sqlite3 example.db ".schema"
CREATE TABLE users (
    name TEXT NOT NULL, 
    age INTEGER NOT NULL, 
    PRIMARY KEY (name)
);

この状況から、ただモデルのアトリビュートに index=True を追加して ModelBase.metadata.create_all() しても上手くいかない。

SQLAlchemy でインデックスを後から追加する

SQLAlchemy で後からインデックスを追加するには Index というクラスのインスタンスを作る。 そして、create() というメソッドを呼ぶことでインデックスが作成できるようだ。

文章で説明するよりも実際に試す方が分かりやすいと思うので REPL を使って作業していく。 なお、REPL を起動する場所は、先ほどのサンプルコードを作ったディレクトリにしよう。 あるいは、サンプルコードの場所をシェル変数の PYTHONPATH に指定してもいい。

$ python

以下がインデックスを作ることができるクラス。

>>> from sqlalchemy import Index

既存モデルのカラムに対応したインデックスを作りたいなら、次のようにする。 先ほどサンプルコードとして示したモジュールをインポートして使っている。

>>> from withoutindex import User
>>> UserAgeIndex = Index('ix_users_age', User.age)

動かすのに sqlalchemy.engine.Engine のインスタンスが必要なので用意する。

>>> from withoutindex import get_engine
>>> db_uri = 'sqlite:///example.db?cache=shared'
>>> engine = get_engine(db_uri)

あとは、先ほど作った Index クラスのインスタンスに対して create() メソッドの呼ぶだけ。

>>> UserAgeIndex.create(engine)
Index('ix_users_age', Column('age', Integer(), table=<users>, nullable=False))

テーブルの定義を確認すると、次のようにインデックスが追加されている。

$ sqlite3 example.db ".schema"
CREATE TABLE users (
    name TEXT NOT NULL, 
    age INTEGER NOT NULL, 
    PRIMARY KEY (name)
);
CREATE INDEX ix_users_age ON users (age);

いじょう。

繰り返しになるけど、実際には上記のような手動オペレーションをするのではなく、Alembic などを使ってスキーマをバージョン管理しよう。

Python: OmegaConf を使ってみる

OmegaConf は、Python の Configuration フレームワークのひとつ。 Hydra が低レイヤー API に利用している、という点が有名だと思う。 というより、Hydra を使おうとすると OmegaConf の API が部分的にそのまま露出していることに気づく。 なので、OmegaConf を理解しておかないと、Hydra はちゃんと扱えないくらいまであると思う。 今回は、そんな OmegaConf の使い方を一通り見ていく。

OmegaConf には、ざっくりと次のような特徴がある。

  • 良い (と私が感じた) ところ

    • YAML 形式で設定ファイルを管理できる
    • 設定の Key-Value ペアを階層的に管理できる
    • Data Classes (PEP 557) を使って設定の構造を定義できる
    • Syntax for Variable Annotations (PEP 526) を使ってバリデーションできる
    • ある場所で定義されている Value を、別の Value から利用できる (Interpolation)
    • Interpolation の解決をカスタマイズする仕組みがある (Custom Resolver)
  • 発展途上 (と私が略) なところ

    • Value のバリデーションをカスタマイズする仕組みがない
    • プリミティブ型以外を Value として扱うための仕組みが限られている

検証に使った環境は次のとおり。

$ sw_vers                
ProductName:    Mac OS X
ProductVersion: 10.15.7
BuildVersion:   19H2
$ python -V            
Python 3.8.6
$ pip list | grep -i omegaconf
omegaconf                 2.0.4

下準備

あらかじめ OmegaConf をインストールしておく。

$ pip install omegaconf

設定を表すオブジェクトを作る

OmegaConf では omegaconf.OmegaConf というクラスのオブジェクトが設定を司っている。 まずは、あまり実用性はないサンプルコードを以下に示す。 このサンプルコードでは、設定の元ネタになる辞書から OmegaConf のインスタンスを生成している。 そして、それを YAML 形式で標準出力に書き出している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from omegaconf import OmegaConf


def main():
    # 辞書からコンフィグを読み込む
    dict_conf = {
        'name': 'Alice',
        # リストを含んだり
        'friends': ['Bob', 'Carol'],
        # ネストさせても良い
        'location': {
            'country': 'Japan',
            'prefecture': 'Tokyo',
        }
    }
    conf = OmegaConf.create(dict_conf)
    # 読み込んだ内容を出力する
    print(OmegaConf.to_yaml(conf))


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、辞書から読み込んだ設定が YAML 形式で表示される。

$ python example.py            
name: Alice
friends:
- Bob
- Carol
location:
  country: Japan
  prefecture: Tokyo

読み込んだ設定にアクセスする

先ほどのサンプルコードでは、辞書から読み込んだ内容を、ただ YAML 形式にしているだけだった。 続いては、読み込んだ内容にプログラムからアクセスする方法について示す。 以下のサンプルコードでは、OmegaConf に内包されている設定に、「object style」と「dictionary style」という 2 つの方法でアクセスしている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from omegaconf import OmegaConf


def main():
    dict_conf = {
        'name': 'Alice',
        'friends': ['Bob', 'Carol'],
        'location': {
            'country': 'Japan',
            'prefecture': None,
        }
    }
    conf = OmegaConf.create(dict_conf)

    # 読み込んだコンフィグの使い方
    print(conf.location.country)  # object style
    print(conf['location']['country'])  # dictionary style

    # 取得するタイミングでデフォルト値を設定する場合
    print(conf.location.get('prefecture', 'Tokyo'))


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、辞書に入っていた内容に OmegaConf のインスタンス経由でアクセスできることがわかる。

$ python example.py            
Japan
Japan
Tokyo

ただ、このサンプルも実用上はあまり意味がない。 なぜなら、上記であれば OmegaConf を使うまでもなく辞書をそのまま使っていれば良い。

YAML 形式の設定ファイルから読み出す

続いては、もう少し実用的なサンプルになる。 以下のサンプルコードでは YAML 形式で書かれた設定ファイルから OmegaConf のインスタンスを生成している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import tempfile
import os

from omegaconf import OmegaConf


def main():
    yaml_conf = """
    name: Alice
    friends:
    - Bob
    - Carol
    location:
      country: Japan
      prefecture: Tokyo
    """
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as fp:
        # 一時ファイルに YAML を書き込む
        fp.write(yaml_conf)
    # YAML ファイルから設定を読み込む
    conf_filepath = fp.name
    conf = OmegaConf.load(conf_filepath)
    """
    # あるいは File-like オブジェクトからも読み込める
    with open(conf_filepath, mode='r') as fp:
        conf = OmegaConf.load(fp)
    """
    # 読み込んだ内容を出力する
    print(OmegaConf.to_yaml(conf))
    # 後片付け
    os.remove(conf_filepath)


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、YAML 形式の設定ファイルから読み込まれた内容が、また YAML 形式で出力される。

$ python example.py            
name: Alice
friends:
- Bob
- Carol
location:
  country: Japan
  prefecture: Tokyo

とはいえ、このサンプルも別に OmegaConf を使う必要はない。 なぜなら、YAML 形式を Python の辞書と相互に変換するパッケージさえあれば実現できるため。

コマンドラインの引数から設定を読み込む

ここらへんから、じわりじわりと便利なポイントが見えてくる。 次のサンプルコードでは、スクリプトを実行する際に渡されたコマンドラインの引数から設定を読み込んでいる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from omegaconf import OmegaConf


def main():
    conf = OmegaConf.from_cli()
    print(OmegaConf.to_yaml(conf))


if __name__ == '__main__':
    main()

上記を実行してみよう。 以下では、コマンドラインの引数として server.port=8080server.host=127.0.0.1 を渡した。 すると、コマンドライン経由で仮想構造になった設定がちゃんと読み込まれていることがわかる。

$ python example.py server.port=8080 server.host=127.0.0.1
server:
  port: 8080
  host: 127.0.0.1

コマンドラインで指定するのがめんどくさいときは、次のようにしても良い。 以下では sys.argv の内容をスクリプトの中で上書きしている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import sys

from omegaconf import OmegaConf


def main():
    sys.argv = [__file__,
                'server.port=8080',
                'server.host=127.0.0.1']
    conf = OmegaConf.from_cli()
    print(OmegaConf.to_yaml(conf))


if __name__ == '__main__':
    main()

複数の経路で読み込んだ設定をマージする

また、設定ファイルや辞書から作った OmegaConf をひとつにマージすることもできる。 次のサンプルコードでは、辞書の設定をベースとして、コマンドラインの設定で上書きしている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import sys

from omegaconf import OmegaConf


def main():
    dict_conf = {
        'server': {
            'port': 80,
            'host': '127.0.0.1',
        }
    }
    base_conf = OmegaConf.create(dict_conf)

    sys.argv = [__file__,
                'server.host=0.0.0.0']
    cli_conf = OmegaConf.from_cli()

    # 複数のコンフィグをマージする
    merged_conf = OmegaConf.merge(base_conf, cli_conf)
    print(OmegaConf.to_yaml(merged_conf))


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、ベースとなった辞書の設定では server.host127.0.0.1 だったのに、その内容がコマンドライン経由で渡された 0.0.0.0 に上書きされている。

$ python example.py                                       
server:
  port: 80
  host: 0.0.0.0

Data Classes (PEP 557) で設定の構造を定義する

OmegaConf は、Python 3.7 から導入された Data Classes を使って設定の構造を定義できる。 以下のサンプルコードでは、先ほどの server.portserver.host という要素が含まれる設定を定義している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from dataclasses import dataclass

from omegaconf import OmegaConf


@dataclass
class ServerStructure:
    port: int = 80
    host: str = '127.0.0.1'


@dataclass
class RootStructure:
    server: ServerStructure = ServerStructure()


def main():
    # Data Classes を使って設定を読み込む
    conf = OmegaConf.structured(RootStructure)
    print(OmegaConf.to_yaml(conf))


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、Data Classes を使って設定の構造を定義できていることがわかる。 また、デフォルト値も読み込むことができている。

$ python example.py                                       
server:
  port: 80
  host: 127.0.0.1

Data Classes の有無 (structured / non-structured) の違いについて

続いては、Data Classes を使う場合と、使わない場合で OmegaConf のインスタンスにどのような違いがあるか見ておく。 以下のサンプルコードでは辞書から生成した OmegaConf と Data Classes で生成した OmegaConf を比較している。 コメントで補足してあるけど、Data Classes から生成したものは struct モードになっている。 このモードでは、元々の定義には存在しない Key-Value ペアを設定に追加できない。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging

from dataclasses import dataclass

from omegaconf import OmegaConf
from omegaconf.errors import ConfigAttributeError


LOGGER = logging.getLogger(__name__)


@dataclass
class ConfigStructure:
    message: str = 'Hello, World!'


def main():
    # OmegaConf.create() で作ったコンフィグ
    dict_conf = OmegaConf.create({
        'message': 'Hello, World!'
    })
    # OmegaConf.structured() で作ったコンフィグ
    struct_conf = OmegaConf.structured(ConfigStructure)

    # 後者は struct フラグが有効になっている
    OmegaConf.is_struct(dict_conf)
    OmegaConf.is_struct(struct_conf)

    # struct でない場合は後から存在しない要素を追加できる
    dict_conf.hoge = 'fuga'
    # 一方で struct になっていると存在しない要素を追加できない
    try:
        struct_conf.hoge = 'fuga'
    except ConfigAttributeError as e:
        LOGGER.error(e)

    # この振る舞いは後から変更することもできる
    # たとえば、次のようにすると non struct にできる
    OmegaConf.set_struct(struct_conf, False)
    struct_conf.hoge = 'fuga'


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、struct モードな OmegaConf に存在しない Key-Value を設定しようとすると例外になることが確認できる。

$ python example.py                                       
Key 'hoge' not in 'ConfigStructure'
    full_key: hoge
    reference_type=Optional[ConfigStructure]
    object_type=ConfigStructure

Syntax for Variable Annotations (PEP 526) を使ったバリデーション

先ほどの例にもあったけど Data Classes のアトリビュートには型ヒントがつけられる。 以下のサンプルコードでは、型ヒントと合わない Value をアトリビュートに代入しようとしたときの振る舞いを確認している。 また、設定を後から上書きされたくないときに用いる OmegaConf のインスタンスを読み取り専用にする方法も試している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging

from dataclasses import dataclass

from omegaconf import OmegaConf
from omegaconf.errors import ValidationError
from omegaconf.errors import ReadonlyConfigError


LOGGER = logging.getLogger(__name__)


@dataclass
class ServerStructure:
    port: int = 80
    host: str = '127.0.0.1'


@dataclass
class RootStructure:
    server: ServerStructure = ServerStructure()


def main():
    # DataClass から読み込む
    conf = OmegaConf.structured(RootStructure)

    # 型ヒントが int になっている要素に対して...
    # OK: 当然 int の値は入る
    conf.server.port = 8080
    # OK: int として解釈できるときは自動的に変換される
    conf.server.port = '8080'
    # NG: int として解釈できないときは例外になる
    try:
        conf.server.port = 'Oops!'
    except ValidationError as e:
        LOGGER.error(e)

    # 値を上書きされたくないときは read only にする
    OmegaConf.set_readonly(conf, True)

    try:
        # NG: read only にしたコンフィグには代入できない
        conf.server.port = 8888
    except ReadonlyConfigError as e:
        LOGGER.error(e)


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、型の合わない代入をしたり、読み取り専用のオブジェクトに代入をしたときに例外になることがわかる。

$ python example.py                                       
Value 'Oops!' could not be converted to Integer
    full_key: server.port
    reference_type=ServerStructure
    object_type=ServerStructure
Cannot change read-only config container
    full_key: server.port
    reference_type=ServerStructure
    object_type=ServerStructure

ちなみに Data Classes で読み込んだ OmegaConf のインスタンスを読み取り専用にする方法はもうひとつある。 それは Data Classes を定義するときにデコレータに frozen=True を設定すること。 こうすると、後から変更するまでもなく最初から OmegaConf のインスタンスが読み取り専用になる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging
from typing import List
from dataclasses import dataclass
from dataclasses import field

from omegaconf import OmegaConf
from omegaconf import ReadonlyConfigError


LOGGER = logging.getLogger(__name__)


# デフォルトで上書きさせたくないときは dataclass を frozen にする
@dataclass(frozen=True)
class RootStructure:
    name: str = 'Alice'
    friends: List[str] = field(default_factory=lambda: ['Bob', 'Carol'])


def main():
    conf = OmegaConf.structured(RootStructure)

    # デフォルトで上書きができなくなっている
    try:
        conf.friends.append('Daniel')
    except ReadonlyConfigError as e:
        LOGGER.error(e)


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、読み取り専用なのにリストに要素を追加したことで例外となることがわかる。

$ python example.py                                       
ListConfig is read-only
    full_key: friends[2]
    reference_type=List[str]
    object_type=list

項目ごとに必須かオプションかを指定する

また、項目が必須かオプションかは、次のように指定できる。

  • アトリビュートのデフォルト値を omegaconf.MISSING にすることで必須のパラメータにできる
  • アトリビュートの型ヒントを typing.Optional にすることでオプションのパラメータにできる
    • 言いかえると Value が None でも構わない
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging

from dataclasses import dataclass
from typing import Optional

from omegaconf import OmegaConf
from omegaconf import MISSING
from omegaconf import MissingMandatoryValue
from omegaconf import ValidationError


LOGGER = logging.getLogger(__name__)


@dataclass
class ServerStructure:
    # デフォルト値に MISSING を設定しておくと指定が必須のパラメータになる
    port: int = MISSING
    # 型ヒントが Optional になっていると None を入れても良い
    host: Optional[str] = '127.0.0.1'


@dataclass
class RootStructure:
    server: ServerStructure = ServerStructure()


def main():
    conf = OmegaConf.structured(RootStructure)

    try:
        # 中身が空のままで読み取ろうとすると例外になる
        conf.server.port
    except MissingMandatoryValue as e:
        LOGGER.error(e)

    # ちゃんと中身を入れれば例外は出なくなる
    conf.server.port = 8080
    print(conf.server.port)

    # また、デフォルトでは None を入れようとしても例外になる
    try:
        conf.server.port = None
    except ValidationError as e:
        LOGGER.error(e)

    # 一方、型ヒントが Optional になっていれば None を入れても例外にならない
    conf.server.host = None


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、項目ごとに必須かオプションかが制御できていることがわかる。

$ python example.py                                       
Missing mandatory value: server.port
    full_key: server.port
    reference_type=ServerStructure
    object_type=ServerStructure
8080
child 'server.port' is not Optional
    full_key: server.port
    reference_type=ServerStructure
    object_type=ServerStructure

Interpolation と Custom Resolver について

OmegaConf には Interpolation という機能がある。 この機能を使うと、典型的にはある Value を別の Value を使って定義できる。 文字だと説明しにくいけど Literal String Interpolation (PEP 498 / いわゆる f-string) とかテンプレートの考え方に近い。 また、Interpolation の指定に : (コロン) でプレフィックスをつけると、それを解釈する Resolver が指定できる。 Resolver は自分で書くこともできて、使いこなすとだいぶ便利そうな感じがする。

正直、ここは文字で説明するのが難しいので以下のサンプルコードを見てもらいたい。 たとえば ClientStructureurl というアトリビュートは、別の場所で定義した ServerStructure の内容を参照して組み立てている。 また、UserStructurename は、プレフィックスに env をつけることで環境変数 USER から読み込むようになっている。 そして、SystemStructuredatesome_value には、自分で Resolver を書いて OmegaConf に登録している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging
from datetime import datetime
from dataclasses import dataclass

from omegaconf import OmegaConf


LOGGER = logging.getLogger(__name__)


@dataclass
class ServerStructure:
    port: int = 80
    host: str = '127.0.0.1'


@dataclass
class ClientStructure:
    # Interpolation を使うと別の場所にある値を埋め込むことができる
    url: str = 'http://${server.host}:${server.port}'


@dataclass
class UserStructure:
    # 環境変数を読み込むこともできる
    name: str = '${env:USER}'


# Interpolation の解決をカスタマイズする (引数なし)
def today_resolver() -> str:
    return datetime.now().strftime('%Y-%m-%d')


# Interpolation の解決をカスタマイズする (引数あり)
def double_resolver(n: str) -> int:
    # 解決する時点では文字列で渡される点に注意する
    return int(n) * 2


@dataclass
class SystemStructure:
    date: str = '${today:}'
    some_value: int = '${double:100}'


@dataclass
class RootStructure:
    server: ServerStructure = ServerStructure()
    client: ClientStructure = ClientStructure()
    user: UserStructure = UserStructure()
    system: SystemStructure = SystemStructure()


def main():
    conf = OmegaConf.structured(RootStructure)

    # カスタムリゾルバを登録する
    OmegaConf.register_resolver('today', today_resolver)
    OmegaConf.register_resolver('double', double_resolver)

    print(conf.client.url)
    print(conf.user.name)
    print(conf.system.date)
    print(conf.system.some_value)


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、最初に参照している conf.client.urlconf.server.hostconf.server.port のデフォルト値で組み立てられていることが確認できる。 次に参照している conf.user.name は、実行したシェルの環境変数を読み込んでいる。 その次に参照している conf.system.datetoday_resolver() によって、今日の日付が設定されている。 そして、最後に参照している conf.system.some_value は、元の値である 100double_resolver() が 2 倍にしている。

$ python example.py                                       
http://127.0.0.1:80
amedama
2020-11-05
200
$ echo $USER                                
amedama
$ date +%Y-%m-%d                   
2020-11-05

補足

最初に、発展途上なところとして以下の 2 点を挙げた。

  • Value のバリデーションをカスタマイズする仕組みがない
  • プリミティブ型以外を Value として扱うための仕組みが限られている

このうち、最初の方については Flexible function and variable annotations (PEP 593) を使った実装が検討されているようだ。

github.com

後の方については、まあ Custom Resolver を駆使して何とかする方法もあるかな?

そんな感じで。

Python: MLflow Projects を使ってみる

MLflow は MLOps に関連した OSS のひとつ。 いくつかのコンポーネントに分かれていて、それぞれを必要に応じて独立して使うことができる。 今回は、その中でも MLflow Projects というコンポーネントを使ってみる。 MLflow Projects を使うと、なるべく環境に依存しない形で、ソフトウェアのコードをどのように動かすかを示すことができる。

機械学習に限らず、特定のリポジトリのソースコードをどのように動かすかはプロジェクトによって異なる。 よくあるのは Makefile やシェルスクリプトを用意しておくパターンだと思う。 ただ、そういったスクリプトをプロジェクトごとに、環境依存のできるだけ少ない形で書いていくのはなかなかつらい作業になる。 MLflow Projects を使うと、そういった作業の負担を軽減できる可能性がある。

今回使った環境は次のとおり。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.15.7
BuildVersion:   19H2
$ python -V
Python 3.8.6
$ pip list | grep -i mlflow
mlflow                    1.11.0
$ docker version   
Client: Docker Engine - Community
 Cloud integration  0.1.18
 Version:           19.03.13
 API version:       1.40
 Go version:        go1.13.15
 Git commit:        4484c46d9d
 Built:             Wed Sep 16 16:58:31 2020
 OS/Arch:           darwin/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.13
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.13.15
  Git commit:       4484c46d9d
  Built:            Wed Sep 16 17:07:04 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          v1.3.7
  GitCommit:        8fba4e9a7d01810a393d5d25a3621dc101981175
 runc:
  Version:          1.0.0-rc10
  GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

もくじ

下準備

あらかじめ、今回使うパッケージをインストールしておく。

$ pip install mlflow

また、Python のパッケージ以外にも Docker をインストールしておこう。

$ brew cask install docker

最も単純なサンプル

MLflow Projects を最低限使うのに必要なのは MLproject という YAML ファイルを用意することだけ。 ここにプロジェクトを実行するときのエントリポイントを記述していく。 基本的には、単純にシェルで実行するコマンドを書く。

以下は wget(1) を使って Iris データセットの CSV をダウンロードする MLproject を用意している。 エントリポイントは複数書けるけどデフォルトでは main という名前が使われる。

$ cat << 'EOF' > MLproject               
name: example

entry_points:
  main:
    command: "wget -P data https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv"
EOF

上記のファイルが用意できたら、実際に実行してみよう。 MLproject がある場合には mlflow コマンドの run サブコマンドが使えるようになる。 たとえば、以下のようにして実行する。 .MLproject のある場所を表していて、--no-conda は Conda の仮想環境を使わずに実行することを示す。

$ mlflow run . --no-conda
2020/10/19 19:31:56 INFO mlflow.projects.utils: === Created directory /var/folders/1f/2k50hyvd2xq2p4yg668_ywsh0000gn/T/tmp84mtkoa7 for downloading remote URIs passed to arguments of type 'path' ===
2020/10/19 19:31:56 INFO mlflow.projects.backend.local: === Running command 'wget -P data https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv' in run with ID '96c013ed298f48be81ceafc9e64bb726' === 

...

2020-10-19 19:31:57 (8.16 MB/s) - `data/iris.csv' へ保存完了 [3858/3858]

2020/10/19 19:31:57 INFO mlflow.projects: === Run (ID '96c013ed298f48be81ceafc9e64bb726') succeeded ===

すると、次のようにファイルがダウンロードされることがわかる。

$ ls data
iris.csv
$ head data/iris.csv 
sepal_length,sepal_width,petal_length,petal_width,species
5.1,3.5,1.4,0.2,setosa
4.9,3.0,1.4,0.2,setosa
4.7,3.2,1.3,0.2,setosa
4.6,3.1,1.5,0.2,setosa
5.0,3.6,1.4,0.2,setosa
5.4,3.9,1.7,0.4,setosa
4.6,3.4,1.4,0.3,setosa
5.0,3.4,1.5,0.2,setosa
4.4,2.9,1.4,0.2,setosa

Docker コンテナで実行する

ただ、先ほどの例はシステムに wget(1) がインストールされていることを仮定している。 しかしながら、誰もがシステムに wget(1) をインストールしているとは限らないだろう。 かといって、各プラットフォームごとに wget(1) をインストールするスクリプトや、方法について README には書きたくない。 そこで、続いては MLflow Projects を Docker コンテナ上で実行できるようにしてみよう。

MLflow Projects を Docker コンテナ上で実行する場合には、docker_env という要素を MLproject ファイルに書く。 たとえば、次のサンプルでは example/mlflow-docker というイメージでこのプロジェクトを実行しますよ、という宣言になる。

$ cat << 'EOF' > MLproject               
name: example

docker_env:
  image: example/mlflow-docker

entry_points:
  main:
    command: "wget -P data https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv"
EOF

ただ、上記を実行するにはシステムのローカルないし Docker リポジトリに example/mlflow-docker というイメージが必要になる。 そこで、たとえば次のような Dockerfile を用意してみよう。 これで、wget(1) のインストールされた Ubuntu 20.04 LTS のイメージがビルドできる。

$ cat << 'EOF' > Dockerfile
FROM ubuntu:20.04

RUN apt-get update \
 && apt-get -yq dist-upgrade \
 && apt-get install -yq \
      wget \
 && apt-get clean \
 && rm -rf /var/lib/apt/lists/*
EOF

イメージのビルド手順も設定ファイルに書きたいので Docker Compose を使うことにする。

$ cat << 'EOF' > docker-compose.yaml
version: "3.8"

services:
  mlflow:
    image: example/mlflow-docker
    build:
      context: .
EOF

これで特に何を考えずイメージがビルドできる。

$ docker-compose build
Building mlflow
Step 1/2 : FROM ubuntu:20.04

...

Successfully tagged example/mlflow-docker:latest

システムのローカルに必要なイメージが登録されたことを確認しよう。

$ docker image list | grep example
example/mlflow-docker                latest                                           09c03e33daea        3 minutes ago       111MB

動作確認の邪魔になるので、先ほどダウンロードしたファイルは一旦削除しておこう。

$ rm -rf data

それでは、満を持して MLflow Projects を実行してみよう。 今度は実行環境として Docker を使うことが設定ファイルからわかるので --no-conda オプションをつける必要はない。

$ mlflow run .
2020/10/19 19:50:16 INFO mlflow.projects.docker: === Building docker image example ===

...

2020-10-19 10:50:18 (5.87 MB/s) - 'data/iris.csv' saved [3858/3858]

2020/10/19 19:50:19 INFO mlflow.projects: === Run (ID '18f4068e82434b119dc7cf9979082f9b') succeeded ===

しめしめ、これでカレントディレクトリにファイルがダウンロードされるはず…と思っていると何も見当たらない。

$ ls
Dockerfile      docker-compose.yaml
MLproject       mlruns

なぜ、こうなるか。 MLflow Projects は、Docker コンテナを実行するときに、ホストのディレクトリをコンテナにコピーしている。 あくまで、共有ではなくコピーなので、コンテナ上にファイルをダウンロードしたからといってホストには反映されない。

この問題を回避するには、ホストのボリュームをコンテナにマウントする必要がある。 たとえば、次のようにしてホストのディレクトリをコンテナの /mnt にマウントできる。 そして、実行するコマンドでは、保存先のディレクトリを /mnt 以下にする。

$ cat << 'EOF' > MLproject               
name: example

docker_env:
  image: example/mlflow-docker
  volumes: ["$(pwd):/mnt"]

entry_points:
  main:
    command: "wget -P /mnt/data https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv"
EOF

もう一度実行してみよう。

$ mlflow run .
2020/10/19 19:54:32 INFO mlflow.projects.docker: === Building docker image example ===

...

2020-10-19 10:54:34 (776 KB/s) - '/mnt/data/iris.csv' saved [3858/3858]

2020/10/19 19:54:34 INFO mlflow.projects: === Run (ID '4372f3f53dbf478bbc8ac96b4f7d5811') succeeded ===

すると、次のようにホスト側にディレクトリとファイルができる。

$ ls
Dockerfile      MLproject       data            docker-compose.yaml mlruns
$ head data/iris.csv 
sepal_length,sepal_width,petal_length,petal_width,species
5.1,3.5,1.4,0.2,setosa
4.9,3.0,1.4,0.2,setosa
4.7,3.2,1.3,0.2,setosa
4.6,3.1,1.5,0.2,setosa
5.0,3.6,1.4,0.2,setosa
5.4,3.9,1.7,0.4,setosa
4.6,3.4,1.4,0.3,setosa
5.0,3.4,1.5,0.2,setosa
4.4,2.9,1.4,0.2,setosa

エントリポイントを増やしてみる

続いては、例としてエントリポイントを増やしてみることにしよう。 example という Python のパッケージを用意する。

$ mkdir -p example
$ touch example/__init__.py

そして、その中に preprocess.py というモジュールを追加する。 これは CSV ファイルから読み込んだ Pandas の DataFrame を Pickle にして保存するコードになっている。

$ cat << 'EOF' > example/preprocess.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""CSV ファイルから読み込んだ DataFrame を Pickle として永続化する"""

import os
import logging
import pathlib

import pandas as pd

LOGGER = logging.getLogger(__name__)


def main():
    logging.basicConfig(level=logging.INFO)

    # データの置き場所
    data_dir = pathlib.Path('/mnt/data')

    # CSV ファイルを読み込む
    raw_data_file = data_dir / 'iris.csv'
    LOGGER.info(f'load data from {raw_data_file}')
    df = pd.read_csv(str(raw_data_file))

    LOGGER.info(f'data shape: {df.shape}')

    # Pickle として永続化する
    parsed_data_file = data_dir / 'iris.pickle'
    LOGGER.info(f'save data to {parsed_data_file}')
    df.to_pickle(parsed_data_file)


if __name__ == '__main__':
    main()
EOF

以下のようにエントリポイントを fetchpreprocess に分離してみる。

$ cat << 'EOF' > MLproject
name: example

docker_env:
  image: example/mlflow-docker
  volumes: ["$(pwd):/mnt"]

entry_points:
  fetch:
    command: "wget -P /mnt/data https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv"
  preprocess:
    command: "python3 -m example.preprocess"
EOF

先ほどのモジュールを見るとわかる通り、動作には Pandas が必要になる。 そこで、Docker イメージも次のように更新しておく。

$ cat << 'EOF' > Dockerfile
FROM ubuntu:20.04

RUN apt-get update \
 && apt-get -yq dist-upgrade \
 && apt-get install -yq \
      wget \
      python3-pip \
 && apt-get clean \
 && rm -rf /var/lib/apt/lists/* \
 && pip3 install --user pandas
EOF
$ docker-compose build

新しく追加した preprocess エントリポイントを実行してみよう。 エントリポイントを指定するときは -e オプションを使う。

$ mlflow run -e preprocess .
2020/10/19 20:11:00 INFO mlflow.projects.docker: === Building docker image example ===

...

INFO:__main__:load data from /mnt/data/iris.csv
INFO:__main__:data shape: (150, 5)
INFO:__main__:save data to /mnt/data/iris.pickle
2020/10/19 20:11:02 INFO mlflow.projects: === Run (ID '6bae8e1f7d3e4bde85fa8e9922d2938d') succeeded ===

これで、次のように Pickle フォーマットのファイルができあがる。

$ ls data       
iris.csv    iris.pickle
$ python -m pickle data/iris.pickle
     sepal_length  sepal_width  petal_length  petal_width    species
0             5.1          3.5           1.4          0.2     setosa
1             4.9          3.0           1.4          0.2     setosa
2             4.7          3.2           1.3          0.2     setosa
3             4.6          3.1           1.5          0.2     setosa
4             5.0          3.6           1.4          0.2     setosa
..            ...          ...           ...          ...        ...
145           6.7          3.0           5.2          2.3  virginica
146           6.3          2.5           5.0          1.9  virginica
147           6.5          3.0           5.2          2.0  virginica
148           6.2          3.4           5.4          2.3  virginica
149           5.9          3.0           5.1          1.8  virginica

[150 rows x 5 columns]

エントリポイントをまとめる

ただ、先ほどのように複数のエントリポイントがあると何をいつどのように実行すれば良いのかが不明瞭になる。 それでは MLflow Projects を導入した意味が薄れてしまうかもしれない。 おそらく、エントリポイントは先ほどよりももっと大きなタスクで分けた方が良いと思う。 たとえば、モデルの学習と結果の可視化のような粒度であれば、分けていても混乱は招かないかもしれない。 あまり巨大すぎるエントリポイントを作るのも、後から取り回しが悪くなるだろう。

さて、前置きが長くなったけど、ここではあくまで一例としてエントリポイントを Metaflow を使ってまとめてみよう。 まずはデータのダウンロード部分も Python のコードにしてしまう。 めんどくさければ subprocess モジュールを使って wget(1) をキックするだけでも良いと思う。

$ cat << 'EOF' > example/fetchdata.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""データセットを取得して CSV として保存する"""

import logging
import pathlib
import os
from urllib.parse import urlparse

import requests

LOGGER = logging.getLogger(__name__)

DATASET_URL = 'https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv'


def main():
    logging.basicConfig(level=logging.INFO)

    # データの置き場所
    data_dir = pathlib.Path('/mnt/data')

    # ディレクトリがなければ作る
    os.makedirs(str(data_dir), exist_ok=True)

    # 保存先のファイルパス
    url_path = urlparse(DATASET_URL).path
    save_filepath = data_dir / pathlib.Path(url_path).name

    # データセットとの接続を開く
    LOGGER.info(f'starting download: {DATASET_URL}')
    with requests.get(DATASET_URL, stream=True) as r:
        # 2XX でないときは例外にする
        r.raise_for_status()
        # 書き込み先のファイルを開く
        with open(save_filepath, mode='wb') as w:
            # チャンク単位で書き込んでいく
            for chunk in r.iter_content(chunk_size=8192):
                w.write(chunk)
    LOGGER.info(f'download complete: {save_filepath}')


if __name__ == '__main__':
    main()
EOF

そして、次のようにデータの取得と加工をパイプライン化してみる。 ここでは Metaflow を使ったけど、別に何を使っても良いと思う。 手軽だったので選んだに過ぎない。

$ cat << 'EOF' > entrypoint.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import metaflow

from example import fetchdata
from example import preprocess


class MyFlow(metaflow.FlowSpec):

    @metaflow.step
    def start(self):
        self.next(self.fetch)

    @metaflow.step
    def fetch(self):
        fetchdata.main()
        self.next(self.preprocess)

    @metaflow.step
    def preprocess(self):
        preprocess.main()
        self.next(self.end)

    @metaflow.step
    def end(self):
        pass


if __name__ == '__main__':
    MyFlow()
EOF

あとは、次のようにパイプラインを実行するように MLproject を編集する。 説明が遅くなったけど、次のように可変な部分はパラメータとして与えることができる。

$ cat << 'EOF' > MLproject               
name: example

docker_env:
  image: example/mlflow-docker
  volumes: ["$(pwd):/mnt"]
  environment: [["USERNAME", "$(whoami)"]]

entry_points:
  main:
    parameters:
      command:
        type: string
        default: run
    command: "python3 -m entrypoint {command}"
EOF

動作に必要なパッケージが増えたので、Docker イメージも編集する。

$ cat << 'EOF' > Dockerfile
FROM ubuntu:20.04

RUN apt-get update \
 && apt-get -yq dist-upgrade \
 && apt-get install -yq \
      wget \
      python3-pip \
 && apt-get clean \
 && rm -rf /var/lib/apt/lists/* \
 && pip3 install --user pandas requests metaflow
EOF
$ docker-compose build

先ほどダウンロードして加工したファイルは動作確認の邪魔になるので一旦削除しよう。

$ rm -rf data

ディレクトリ構成は以下のようになる。

$ ls     
Dockerfile      docker-compose.yaml example
MLproject       entrypoint.py       mlruns

それでは、実行してみよう。

$ mlflow run .
2020/10/19 20:24:57 INFO mlflow.projects.docker: === Building docker image example ===

...

2020-10-19 11:25:04.898 Done!
2020/10/19 20:25:05 INFO mlflow.projects: === Run (ID 'd9dd8d4daa264db99004142b60640307') succeeded ===

これで、次のようにファイルができる。

$ ls data 
iris.csv    iris.pickle

Python のコードから実行する

ちなみに、これまでの例は MLflow Projects を mlflow run コマンドから実行していた。 もちろん Python のコードからも実行できる。

たとえば Python の REPL を起動しよう。

$ python

あとは mlflow をインポートして mlflow.projects.run() を呼ぶだけ。

>>> import mlflow
>>> mlflow.projects.run(uri='.')

いじょう。

参考

複数ステップで構成された MLflow Projects のサンプルプロジェクトが以下にある。

github.com

リモートの Docker ホストでコンテナを SSH Port Forward 経由で動かす

今回は、Docker クライアントをリモートの Docker ホストに SSH Port Forward 経由で接続させてコンテナを操作する方法を試してみる。

まず、Docker クライアントの環境は次のとおり。 macOS に Docker for Mac をインストールしてある。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.15.7
BuildVersion:   19H2
$ docker version                  
Client: Docker Engine - Community
 Cloud integration  0.1.18
 Version:           19.03.13
 API version:       1.40
 Go version:        go1.13.15
 Git commit:        4484c46d9d
 Built:             Wed Sep 16 16:58:31 2020
 OS/Arch:           darwin/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.13
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.13.15
  Git commit:       4484c46d9d
  Built:            Wed Sep 16 17:07:04 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          v1.3.7
  GitCommit:        8fba4e9a7d01810a393d5d25a3621dc101981175
 runc:
  Version:          1.0.0-rc10
  GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

リモートの Docker ホストの環境は次のとおり。 Ubuntu 18.04 LTS に Docker をインストールしてある。

$ cat /etc/*-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=18.04
DISTRIB_CODENAME=bionic
DISTRIB_DESCRIPTION="Ubuntu 18.04.4 LTS"
NAME="Ubuntu"
VERSION="18.04.4 LTS (Bionic Beaver)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 18.04.4 LTS"
VERSION_ID="18.04"
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
VERSION_CODENAME=bionic
UBUNTU_CODENAME=bionic
$ sudo docker version
Client:
 Version:           19.03.6
 API version:       1.40
 Go version:        go1.12.17
 Git commit:        369ce74a3c
 Built:             Fri Feb 28 23:45:43 2020
 OS/Arch:           linux/amd64
 Experimental:      false

Server:
 Engine:
  Version:          19.03.6
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.12.17
  Git commit:       369ce74a3c
  Built:            Wed Feb 19 01:06:16 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          1.3.3-0ubuntu1~18.04.2
  GitCommit:        
 runc:
  Version:          spec: 1.0.1-dev
  GitCommit:        
 docker-init:
  Version:          0.18.0
  GitCommit:        

ようするに、上記の macOS を Docker クライアント、Ubuntu を Docker サーバとして動作させたい、ということ。

Docker デーモンをローカルホストの TCP:2376 で待ち受けるようにする

デフォルトでは、Docker デーモンは Unix ドメインソケット経由で制御されるように設定されている。 しかし、これだとリモートから扱う上で都合が悪い。 そこで、まずは Docker ホストのデーモンを TCP で待ち受けるように変更する。

まずは、リモートにある Docker ホストにログインする。 この段階では、シェルの操作ができさえすれば良いので、別にログインの方法は何でも構わない。

$ ssh <hostname>

はじめに、Docker の設定が入った systemd のコンフィグをバックアップしておく。

$ sudo cp /lib/systemd/system/docker.service{,.orig}

そして、次のように dockerd のオプションとしてローカルホストの 2376 ポートで待ち受けるようにオプションを追加する。

$ diff -u /lib/systemd/system/docker.service{.orig,}
--- /lib/systemd/system/docker.service.orig 2020-10-10 14:22:10.482817997 +0000
+++ /lib/systemd/system/docker.service  2020-10-10 14:22:56.297714001 +0000
@@ -11,7 +11,7 @@
 # the default is not to use systemd for cgroups because the delegate issues still
 # exists and systemd currently does not support the cgroup feature set required
 # for containers run by docker
-ExecStart=/usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock
+ExecStart=/usr/bin/dockerd -H fd:// -H tcp://127.0.0.1:2376 --containerd=/run/containerd/containerd.sock
 ExecReload=/bin/kill -s HUP $MAINPID
 TimeoutSec=0
 RestartSec=2

ちなみに、上記は APT でインストールしたパッケージが管理しているファイルなので、直接編集するのはあまりお行儀が良くない。

$ dpkg-query -L docker.io | grep systemd
/lib/systemd
/lib/systemd/system
/lib/systemd/system/docker.service
/lib/systemd/system/docker.socket

編集したらサービスを再起動する。

$ sudo systemctl daemon-reload
$ sudo systemctl restart docker

これで Docker デーモンがローカルホストの TCP:2376 ポートで待ち受けるようになる。

$ ss -tlnp | grep 2376
LISTEN0      128                             127.0.0.1:2376        0.0.0.0:*

これでリモートの Docker ホストの準備は整った。

クライアントからリモートの Docker デーモンを操作する

ここからは Docker クライアントの操作になる。

リモートの Docker ホストに、SSH Port Forward を有効にしてあらためてログインする。

$ ssh -L 2376:localhost:2376 <hostname>

これで、ローカルの TCP:2376 ポートにアクセスすると、リモートの TCP:2376 ポートにつながることになる。

$ lsof -i -P | grep -i listen | grep 2376
ssh       7028 amedama    5u  IPv6 0xda00d64e20c3a761      0t0  TCP localhost:2376 (LISTEN)
ssh       7028 amedama    6u  IPv4 0xda00d64e24baa8d1      0t0  TCP localhost:2376 (LISTEN)

あとは、シェル変数の DOCKER_HOSTtcp://127.0.0.1:2376 を指定して docker コマンドを使うだけ。

$ DOCKER_HOST=tcp://127.0.0.1:2376 docker version
Client: Docker Engine - Community
 Cloud integration  0.1.18
 Version:           19.03.13
 API version:       1.40
 Go version:        go1.13.15
 Git commit:        4484c46d9d
 Built:             Wed Sep 16 16:58:31 2020
 OS/Arch:           darwin/amd64
 Experimental:      false

Server:
 Engine:
  Version:          19.03.6
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.12.17
  Git commit:       369ce74a3c
  Built:            Wed Feb 19 01:06:16 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          1.3.3-0ubuntu1~18.04.2
  GitCommit:        
 runc:
  Version:          spec: 1.0.1-dev
  GitCommit:        
 docker-init:
  Version:          0.18.0
  GitCommit:        

上記から、クライアントとサーバで Docker のバージョンが異なっていることが確認できる。

また、シェル変数を有効にしたときとしないときで uname -a の結果が変わっていることもわかる。 コンテナはホストのカーネルを共有するため、このようなことになる。

$ docker container run -it alpine:latest uname -a
Linux 829155145f04 4.19.76-linuxkit #1 SMP Tue May 26 11:42:35 UTC 2020 x86_64 Linux
$ DOCKER_HOST=tcp://127.0.0.1:2376 docker container run -it alpine:latest uname -a
Linux f4981d8001f3 4.15.0-111-generic #112-Ubuntu SMP Thu Jul 9 20:32:34 UTC 2020 x86_64 Linux

これで、ローカルの docker コマンドを使って、リモートにある Docker ホスト上のコンテナを操作できるようになった。

注意点

ただし、このやり方にはいくつか注意点もある。

たとえば、このやり方ではコンテナが動いているのはあくまでリモートのホストになる。 そのため、ホストのボリュームをコンテナでマウントしようとしたときに使われるディレクトリはリモートのものになる。

確認しておこう。 まずはリモートにある Docker ホストにディレクトリを作ってファイルを用意する。

$ mkdir -p /tmp/mnt
$ echo "Remote" > /tmp/mnt/loc.txt

そして、ローカルの Docker クライアントにも同じようにディレクトリを作って区別できるようにファイルを用意する。

$ mkdir -p /tmp/mnt
$ echo "Local" > /tmp/mnt/loc.txt

上記で作ったディレクトリをマウントしたコンテナを起動してみよう。

$ export DOCKER_HOST=tcp://127.0.0.1:2376
$ docker container run \
  -v /tmp/mnt:/mnt \
  -it ubuntu:latest \
  bash

そして、ファイルの中身を確認する。

# cat /mnt/loc.txt 
Remote

うん、リモートだね。

あとは、受け付けるアドレスをローカルホストに絞っているとはいえ、これだとホストにログインできるユーザは誰でも Docker が使える。 もし、それが好ましくない状況であれば、以下のようにクライアントからのアクセス制御をした方が良いと思われる。

docs.docker.com

いじょう。

Python: LIME (Local Interpretable Model Explanations) を LightGBM と使ってみる

今回は、機械学習モデルの解釈可能性を向上させる手法のひとつである LIME (Local Interpretable Model Explanations) を LightGBM と共に使ってみる。 LIME は、大局的には非線形なモデルを、局所的に線形なモデルを使って近似することで、予測の解釈を試みる手法となっている。

今回使った環境は次のとおり。

$ sw_vers                            
ProductName:    Mac OS X
ProductVersion: 10.15.7
BuildVersion:   19H2
$ python -V                                      
Python 3.8.5

もくじ

下準備

まずは、下準備として使うパッケージをインストールしておく。

$ pip install lime scikit-learn lightgbm jupyterlab

LIME は Jupyter の WebUI に可視化する API を提供している。 そのため、今回は Jupyter Lab 上でインタラクティブに試していくことにしよう。

$ jupyter lab

Boston データセットを LightGBM で学習させる

とりあえず、LIME を使うにしても学習済みモデルがないと話が始まらない。 そこで、まずは scikit-learn から Boston データセットを読み込む。

>>> from sklearn import datasets
>>> dataset = datasets.load_boston()
>>> train_x, train_y = dataset.data, dataset.target
>>> feature_names = dataset.feature_names

続いて LightGBM の Early Stopping で検証用データにするためにデータセットを分割しておく。

>>> from sklearn.model_selection import train_test_split
>>> tr_x, val_x, tr_y, val_y = train_test_split(train_x, train_y,
...                                             shuffle=True,
...                                             random_state=42,
...                                            )

それぞれを LightGBM のデータ表現にする。

>>> import lightgbm as lgb
>>> lgb_train = lgb.Dataset(tr_x, tr_y)
>>> lgb_val = lgb.Dataset(val_x, val_y, reference=lgb_train)

上記のデータセットを LightGBM で回帰タスクとして学習させる。

>>> lgbm_params = {
...     'objective': 'regression',
...     'metric': 'rmse',
...     'verbose': -1,
... }
>>> booster = lgb.train(lgbm_params,
...                     lgb_train,
...                     valid_sets=lgb_val,
...                     num_boost_round=1_000,
...                     early_stopping_rounds=100,
...                     verbose_eval=50,
...                     )
Training until validation scores don't improve for 100 rounds
[50]  valid_0's rmse: 3.41701
[100]  valid_0's rmse: 3.30722
[150] valid_0's rmse: 3.24115
[200]  valid_0's rmse: 3.22073
[250] valid_0's rmse: 3.2121
[300]  valid_0's rmse: 3.21216
[350] valid_0's rmse: 3.21811
Early stopping, best iteration is:
[271]  valid_0's rmse: 3.20437

これで LightGBM の学習済みモデルが手に入った。

LIME を使って局所的な解釈を得る

続いては学習モデルの予測を LIME で解釈してみよう。

今回使ったのは構造化されたテーブルデータなので LimeTabularExplainer を用いる。 このクラスに学習データやタスクの内容といった情報を渡してインスタンスを作る。 なお、LIME 自体はテーブルデータ以外にも自然言語処理や画像認識など幅広い応用が効くらしい。

>>> from lime.lime_tabular import LimeTabularExplainer
>>> explainer = LimeTabularExplainer(training_data=tr_x,
...                                  feature_names=feature_names,
...                                  training_labels=tr_y,
...                                  mode='regression',
...                                  verbose=True,
...                                  )

上記を使うには、ひとつの特徴ベクトルを受け取って予測値を返す関数が必要になる。 そこで、今回は次のように定義しておく。

>>> predict_func = lambda x: booster.predict(x,
...                                          num_iteration=booster.best_iteration)

上記を使って、試しに学習データの先頭要素に対する予測を解釈してみよう。 ここでは、線形モデルを使って近似させたときの切片と予測値、そして本来のモデルの予測値が出力される。

>>> explanation = explainer.explain_instance(tr_x[0], predict_func)
Intercept 20.92723014344181
Prediction_local [37.81548009]
Right: 37.949401123930926

次のようにすると、結果が Notebook 上で視覚的に確認できる。

>>> explanation.show_in_notebook(show_table=False)

結果からは、予測されるレンジの中で要素がどこにあるか、そして各特徴量が線形モデルでどのように作用しているかがわかる。 たとえば、近似した線形モデルでは特徴量の RM6.66 以上なので、予測値を 8.71 押し上げる効果があるようだ。

f:id:momijiame:20201009235425p:plain
LIME による予測の解釈

ちなみに、学習データの先頭要素の中身はこんな感じ。

>>> import prettyprint
>>> pprint(dict(zip(feature_names, tr_x[0])))
{'CRIM': 0.09103,
 'ZN': 0.0,
 'INDUS': 2.46,
 'CHAS': 0.0,
 'NOX': 0.488,
 'RM': 7.155,
 'AGE': 92.2,
 'DIS': 2.7006,
 'RAD': 3.0,
 'TAX': 193.0,
 'PTRATIO': 17.8,
 'B': 394.12,
 'LSTAT': 4.82}

条件と、予測値に対してどのように作用しているかは as_list() メソッドで得られる。

>>> pprint(explanation.as_list())
[('RM > 6.66', 8.711363613654798),
 ('LSTAT <= 6.87', 7.923627881873513),
 ('TAX <= 279.00', 1.018446806095536),
 ('78.10 < AGE <= 93.85', -0.5986522163835298),
 ('0.45 < NOX <= 0.54', 0.5314517960646176),
 ('2.08 < DIS <= 3.11', 0.4316641373927205),
 ('16.60 < PTRATIO <= 18.60', 0.29967804190197933),
 ('INDUS <= 5.13', 0.21697123227933096),
 ('0.08 < CRIM <= 0.27', -0.17672992076030344),
 ('RAD <= 4.00', -0.16478059886664842)]

たとえば切片と上記の要素をすべて足すと、最初に得られた線形モデルで近似した予測値になる。

>>> sum(value for key, value in explanation.as_list()) + explanation.intercept[0]
38.33991784140311

上記はあくまで線形モデルを使った近似なので、本来のモデルのアルゴリズムにもとづいて説明しているわけではないはず。 とはいえ、局所的に特徴量がどのように作用しているか確認できるのはなかなか面白い。

所感としては、大局的な解釈にも応用が効く SHAP の方が使い勝手は良さそうかな。

blog.amedama.jp

参考

github.com

arxiv.org

Python: MLflow Models の Custom Python Models でデータを Pickle 以外に永続化する

以前、このブログでは MLflow Models の使い方について以下のようなエントリを書いた。 この中では、Custom Python Models を作るときに、データを Python の Pickle 形式のファイルとして永続化していた。 今回は、それ以外のファイルにデータを永続化する方法について書いてみる。

blog.amedama.jp

使った環境は次のとおり。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.15.7
BuildVersion:   19H2
$ python -V
Python 3.8.5
$ pip list | grep -i mlflow
mlflow                    1.11.0

もくじ

下準備

まずは、下準備として MLflow をインストールしておく。

$ pip install mlflow

Pickle 形式のファイルにモデルを永続化する

まずは、おさらいとして Custom Python Models を作るときに、モデルのデータを Pickle 形式のファイルに永続化する方法から。

以下にサンプルコードを示す。 この中では、定数を入力に加えるだけのモデルとして AddN というクラスを定義している。 このクラスは mlflow.pyfunc.PythonModel を継承しているため、mlflow.pyfunc.save_model()mlflow.pyfunc.load_model() を使ってファイルに読み書きできる。 サンプルコードでは、実際に定数として 5 を加える設定にしたインスタンスをファイルに永続化している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from mlflow import pyfunc


class AddN(pyfunc.PythonModel):
    """指定した数を入力に加えるモデル"""

    def __init__(self, n):
        self.n = n

    def predict(self, context, model_input):
        return model_input.apply(lambda column: column + self.n)


def main():
    # Python の Pickle ファイルとしてモデルを永続化する
    model_path = 'add-n-pickle'
    add5_model = AddN(5)
    pyfunc.save_model(path=model_path,
                      python_model=add5_model)



if __name__ == '__main__':
    main()

上記のポイントは、mlflow.pyfunc.save_model()python_model にインスタンスを指定しているだけというところ。 この場合、永続化されるのはインスタンスを Pickle 形式で直列化したファイルになる。

上記を実行してみよう。

$ python saveaddnpkl.py

実行すると、以下のように MLflow Models のフォーマットに沿ってディレクトリができる。 この中で python_model.pkl が前述した AddN クラスのインスタンスを表す Pickle 形式のファイルになる。

$ ls add-n-pickle 
MLmodel         conda.yaml      python_model.pkl
$ python -m pickle add-n-pickle/python_model.pkl
<__main__.AddN object at 0x1048cdd90>

上記を Python から読み込んで使うサンプルコードも次のように用意した。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pandas as pd
from mlflow import pyfunc


def main():
    model_path = 'add-n-pickle'
    loaded_model = pyfunc.load_model(model_path)

    x = pd.DataFrame(list(range(10, 21)), columns=['n'])
    y = loaded_model.predict(x)

    print(f'Input: {x.n.values}')
    print(f'Output: {y.n.values}')


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python loadaddnpkl.py
Input: [10 11 12 13 14 15 16 17 18 19 20]
Output: [15 16 17 18 19 20 21 22 23 24 25]

ちゃんと、読み込んだモデルが入力データに定数として 5 を加えていることが確認できる。

テキストファイルにインスタンスのアトリビュートを永続化してみる

それでは、続いて Pickle 以外のフォーマットのファイルも使って永続化するパターンを扱う。 これは、たとえば永続化したいモデルが Pickle 以外のフォーマットでファイルに読み書きする API がある場合などに使い勝手が良い。 実際に MLflow Models と XGBoost や LightGBM のインテグレーションは、フレームワークが提供する永続化用 API を流用して書かれている。

以下にサンプルコードを示す。 今回は、入力に加算する定数をテキストファイルとして、AddN クラスのインスタンスとは別に永続化するやり方を取った。 また、先ほどはファイルへの書き込みと読み込みを別の Python モジュールに分けたのに対して、これはひとつで完結させている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import tempfile
import os

import pandas as pd
from mlflow import pyfunc


class AddN(pyfunc.PythonModel):
    """指定した数を入力に加えるモデル"""

    def __init__(self, n=None):
        self.n = n

    def load_context(self, context: pyfunc.PythonModelContext):
        """インスタンスの状態を復元するのに使われるコールバック"""
        # アーティファクトのパスを取り出す
        artifact_path = context.artifacts['n_file']
        with open(artifact_path, mode='r') as fp:
            # ファイルからモデルのパラメータを復元する
            self.n = int(fp.read())

    def predict(self,
                context: pyfunc.PythonModelContext,
                model_input: pd.DataFrame):
        return model_input.apply(lambda column: column + self.n)

    def __repr__(self):
        """インスタンスの文字列表現を得るときに呼ばれる特殊メソッド"""
        return f'<AddN n:{self.n}>'


def save_model(n: int, path: str):
    """モデルをアーティファクトに永続化するためのユーティリティ関数"""
    # モデルが動作するのに必要なパラメータをテキストファイルなどにアーティファクトとして書き出す
    filename = 'n.txt'
    with tempfile.TemporaryDirectory() as d:
        # 一時ディレクトリを用意して、そこにファイルを作る
        artifact_path = os.path.join(d, filename)
        with open(artifact_path, mode='w') as fp:
            fp.write(str(n))
        # 上記で作ったファイルをアーティファクトとして記録する
        # NOTE: path 以下にファイルがコピーされる
        artifacts = {
            'n_file': artifact_path,
        }
        pyfunc.save_model(path=path,
                          # このインスタンスに load_context() メソッド経由でパラメータが読み込まれる
                          python_model=AddN(),
                          artifacts=artifacts,
                          )


def main():
    # モデルをアーティファクトに永続化する
    model_path = 'add-n-artifacts'
    save_model(5, model_path)

    # モデルをアーティファクトから復元する
    loaded_model = pyfunc.load_model(model_path)

    # 動作を確認する
    x = pd.DataFrame(list(range(10, 21)), columns=['n'])
    y = loaded_model.predict(x)

    print(f'Input: {x.n.values}')
    print(f'Output: {y.n.values}')


if __name__ == '__main__':
    main()

上記にはポイントがいくつかある。 まず、書き込みに関しては mlflow.pyfunc.save_model() を呼ぶ際に artifacts というオプションを指定している。 これには、モデルのデータなどを記録したファイルへのパスを Python の辞書として渡す。 ちなみに、ここに指定するパスは実際には Artifact URI なので、リモートのストレージにあっても構わない。 この点は、おそらく MLflow Tracking と組み合わせて使うことを想定しているのだと思う。 もちろん、ファイルはあらかじめ、そのパス (繰り返しになるけど、実際には Artifact URI) に存在している必要がある。 ここに指定したファイルは、MLflow Models が作成するディレクトリへとコピーされる。 そして、読み込みに関しては mlflow.pyfunc.PythonModel を継承したクラスに load_context() というメソッドを実装する。 このメソッドは、インスタンスを Pickle 形式のファイルから読み込んだ後に呼ばれるようだ。 つまり、Pickle 以外のファイルにデータを保存しているときに、モデルの状態をそれで更新するときに使うことができる。

さて、前置きが長くなったけど実際に上記のサンプルコードを実行してみよう。

$ python addnartifacts.py
Input: [10 11 12 13 14 15 16 17 18 19 20]
Output: [15 16 17 18 19 20 21 22 23 24 25]

ちゃんと想定どおりの入出力になっている。

MLflow Models が作成したディレクトリを確認してみよう。 すると、artifacts というサブディレクトリがあることに気づく。

$ ls add-n-artifacts 
MLmodel         artifacts       conda.yaml      python_model.pkl

中を見ると、インスタンスのアトリビュートがテキストファイルとして書き込まれている。

$ cat add-n-artifacts/artifacts/n.txt   
5

それ以外には、アトリビュートが None の状態の AddN クラスのインスタンスが Pickle 形式で永続化されていることがわかる。

$ python -m pickle add-n-artifacts/python_model.pkl
<AddN n:None>

いじょう。

Python: MLflow Models を使ってみる

MLflow は MLOps に関連した OSS のひとつ。 いくつかのコンポーネントに分かれていて、それぞれを必要に応じて独立して使うことができる。

その中でも、今回扱う MLflow Models は主に学習済みモデルやパイプラインの取り回しに関するコンポーネント。 MLflow Models を使うことで、たとえば学習済みモデルの Serving やシステムへの組み込みが容易になる可能性がある。

使った環境は次のとおり。

$ sw_vers                     
ProductName:    Mac OS X
ProductVersion: 10.15.6
BuildVersion:   19G73
$ python -V                                        
Python 3.8.5
$ pip list | egrep "(mlflow|lightgbm|scikit-learn)"
lightgbm                  3.0.0
mlflow                    1.11.0
scikit-learn              0.23.2

もくじ

下準備

はじめに、必要なパッケージをインストールしておく。

$ pip install mlflow lightgbm scikit-learn seaborn category_encoders

モデルを MLflow Models で永続化する

論よりコードということで、いきなりだけど以下にサンプルコードを示す。 このサンプルコードでは Boston データセットを LightGBM で学習するコードになっている。 そして、学習させたモデルを MLflow Models を使って永続化している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import lightgbm as lgb
from mlflow import lightgbm as mlflow_lgb
from sklearn import datasets
from sklearn.model_selection import train_test_split


def main():
    # Boston データセットを読み込む
    dataset = datasets.load_boston()
    train_x, train_y = dataset.data, dataset.target
    feature_names = list(dataset.feature_names)

    # 学習用データと検証用データに分割する
    x_tr, x_eval, y_tr, y_eval = train_test_split(train_x, train_y,
                                                  test_size=0.33,
                                                  shuffle=True,
                                                  random_state=42,
                                                  )
    # LightGBM のデータ形式に直す
    lgb_train = lgb.Dataset(x_tr, y_tr, feature_name=feature_names)
    lgb_eval = lgb.Dataset(x_eval, y_eval, reference=lgb_train)

    # モデルを学習する
    lgbm_params = {
        'objective': 'regression',
        'metric': 'rmse',
        'first_metric_only': True,
        'verbose': -1,
        'random_state': 42,
    }
    booster = lgb.train(params=lgbm_params,
                        train_set=lgb_train,
                        valid_sets=[lgb_train, lgb_eval],
                        num_boost_round=1_000,
                        early_stopping_rounds=100,
                        verbose_eval=50,
                        )

    # モデルを MLflow Models の形式で永続化する
    mlflow_lgb.save_model(booster, path='mlflow-lgbm')
    """
    # MLflow Tracking に残すならこうする
    with mlflow.start_run():
        mlflow_lgb.log_model(booster,
                             artifact_path='mlflow-lgbm')
    """


if __name__ == '__main__':
    main()

上記のモジュールを実行してみよう。

$ python lgbmlf.py
Training until validation scores don't improve for 100 rounds
[50]  training's rmse: 2.23963    valid_1's rmse: 3.59161
[100] training's rmse: 1.55562    valid_1's rmse: 3.4141
[150] training's rmse: 1.22661    valid_1's rmse: 3.36079
[200] training's rmse: 1.0165 valid_1's rmse: 3.35222
[250] training's rmse: 0.860022   valid_1's rmse: 3.34358
[300] training's rmse: 0.735403   valid_1's rmse: 3.34575
[350] training's rmse: 0.630665   valid_1's rmse: 3.35982
Early stopping, best iteration is:
[254] training's rmse: 0.848352   valid_1's rmse: 3.33877
Evaluated only: rmse

すると、次のようにディレクトリができる。 このディレクトリと中身のファイルが、MLflow Models を使って永続化したモデルを表している。 要するに、決められたフォーマットに沿って学習済みモデルをパッケージングしている。

$ ls mlflow-lgbm 
MLmodel     conda.yaml  model.lgb

この中で特に重要なのが MLmodel という YAML フォーマットで書かれたファイル。 このファイルには、そのモデルがどのように永続化されたかといった情報が記録されている。

$ cat mlflow-lgbm/MLmodel 
flavors:
  lightgbm:
    data: model.lgb
    lgb_version: 3.0.0
  python_function:
    data: model.lgb
    env: conda.yaml
    loader_module: mlflow.lightgbm
    python_version: 3.8.5
utc_time_created: '2020-09-30 09:44:55.890106'

なお、上記のフォーマットの詳細は次のドキュメントに記載されている。

www.mlflow.org

www.mlflow.org

また、conda.yaml というファイルには Conda の仮想環境に関する情報が記録されている。 これはつまり、永続化したモデルを利用するために必要な Conda の環境を構築するたのもの。 MLflow Models では、デフォルトで Conda の仮想環境上に学習済みモデルをデプロイすることを想定している。

たとえば、中身を見ると LightGBM が依存パッケージとして追加されていることがわかる。

$ cat mlflow-lgbm/conda.yaml 
channels:
- defaults
- conda-forge
dependencies:
- python=3.8.5
- pip
- pip:
  - mlflow
  - lightgbm==3.0.0
name: mlflow-env

永続化したモデルを使って推論用の REST API を立ち上げる

ここからは MLflow Models を使うことで得られる嬉しさについて書いていく。 MLflow には、MLflow Models で永続化したモデルを扱うための機能がいくつか用意されている。

たとえば、MLflow には mlflow というコマンドラインが用意されている。 このコマンドの models serve サブコマンドを使うと、学習済みモデルを使った推論用の REST API が気軽に立てられる。

実際に使ってみよう。 コマンドを実行する際に、--model-uri オプションには、先ほど永続化したディレクトリを指定する。 また、今回は Conda を使っていないので --no-conda オプションをつけた。 これで、デフォルトでは localhost の 5000 番ポートで推論用の API が立ち上がる

$ mlflow models serve \
    --no-conda \
    --model-uri mlflow-lgbm
2020/09/30 18:49:46 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
2020/09/30 18:49:46 INFO mlflow.pyfunc.backend: === Running command 'gunicorn --timeout=60 -b 127.0.0.1:5000 -w 1 ${GUNICORN_CMD_ARGS} -- mlflow.pyfunc.scoring_server.wsgi:app'
[2020-09-30 18:49:47 +0900] [22853] [INFO] Starting gunicorn 20.0.4
[2020-09-30 18:49:47 +0900] [22853] [INFO] Listening at: http://127.0.0.1:5000 (22853)
[2020-09-30 18:49:47 +0900] [22853] [INFO] Using worker: sync
[2020-09-30 18:49:47 +0900] [22855] [INFO] Booting worker with pid: 22855

上記に推論させたいデータを HTTP で投げ込んでみよう。 たとえば curl コマンドを使って以下のようにする。

$ curl -X POST \
    -H "Content-Type:application/json" \
    --data '{"columns": ["CRIM", "ZN", "INDUS", "CHAS", "NOX", "RM", "AGE", "DIS", "RAD", "TAX", "PTRATIO", "B", "LSTAT"], "data": [[10.233, 0.0, 18.1, 0.0, 0.614, 6.185, 96.7, 2.1705, 24.0, 666.0, 20.2, 379.7, 18.03]]}' \
    http://localhost:5000/invocations
[15.444706764627714]

すると、推論の結果として 15.44... という結果が得られた。

永続化したモデルを使って CSV ファイルを処理する

また、同様に CSV のファイルを処理することもできる。 さっきと同じ内容を CSV ファイルに記録してみよう。

$ cat << 'EOF' > data.csv
CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
10.233,0.0,18.1,0.0,0.614,6.185,96.7,2.1705,24.0,666.0,20.2,379.7,18.03
EOF

今度は models predict というサブコマンドを使う。 --content-type オプションには csv を指定する。 そして、--input-path オプションに先ほど保存した CSV ファイルを指定する。

$ mlflow models predict \
    --no-conda \
    --model-uri mlflow-lgbm/ \
    --content-type csv \
    --input-path data.csv
2020/09/30 18:51:50 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
[15.444706764627714]

先ほどと同じように、推論結果として 15.44... という値が得られた。

ただ、現状のままだと上手くいかない場面もある。 たとえば、CSV のカラムを一部入れ替えてみよう。 以下では CRIM カラムと ZN カラムの順番が入れ替わっている。

$ cat << 'EOF' > data.csv
ZN,CRIM,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0.0,10.233,18.1,0.0,0.614,6.185,96.7,2.1705,24.0,666.0,20.2,379.7,18.03
EOF

このファイルを使ってもう一度同じことをしてみよう。 ちゃんとカラム名まで認識していれば結果は変わらないはず。

$ mlflow models predict \
    --no-conda \
    --model-uri mlflow-lgbm/ \
    --content-type csv \
    --input-path data.csv
2020/09/30 18:52:33 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
[16.96720478471085]

しかし、残念ながら結果は変わってしまった。 つまり、先ほどのサンプルコードではカラム名の情報までは永続化できていない。

Signature を追加する

カラム名まで認識してほしいときは、モデルを永続化する際に Signature という情報を追加する必要がある。

以下にサンプルコードを示す。 先ほどのサンプルコードに、Pandas の DataFrame から自動的に Signature を認識させるコードを追加している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pandas as pd
import lightgbm as lgb
from mlflow import lightgbm as mlflow_lgb
from mlflow.models.signature import infer_signature
from sklearn import datasets
from sklearn.model_selection import train_test_split


def main():
    dataset = datasets.load_boston()
    train_x, train_y = dataset.data, dataset.target
    feature_names = list(dataset.feature_names)

    x_tr, x_eval, y_tr, y_eval = train_test_split(train_x, train_y,
                                                  test_size=0.33,
                                                  shuffle=True,
                                                  random_state=42,
                                                  )

    lgb_train = lgb.Dataset(x_tr, y_tr, feature_name=feature_names)
    lgb_eval = lgb.Dataset(x_eval, y_eval, reference=lgb_train)

    lgbm_params = {
        'objective': 'regression',
        'metric': 'rmse',
        'first_metric_only': True,
        'verbose': -1,
        'random_state': 42,
    }
    booster = lgb.train(params=lgbm_params,
                        train_set=lgb_train,
                        valid_sets=[lgb_train, lgb_eval],
                        num_boost_round=1_000,
                        early_stopping_rounds=100,
                        verbose_eval=50,
                        )

    y_tr_pred = booster.predict(x_tr, num_iteration=booster.best_iteration)
    # 入力が DataFrame であれば、場合によってはカラム名とデータ型を自動で認識してくれる
    x_tr_df = pd.DataFrame(x_tr, columns=dataset.feature_names)
    signature = infer_signature(x_tr_df, y_tr_pred)
    # 渡すデータと推論の結果を Signature として付与する
    mlflow_lgb.save_model(booster, path='mlflow-lgbm-with-sig', signature=signature)


if __name__ == '__main__':
    main()

上記を実行しよう。

$ python lgbmlfsig.py
Training until validation scores don't improve for 100 rounds
[50]  training's rmse: 2.23963    valid_1's rmse: 3.59161
[100] training's rmse: 1.55562    valid_1's rmse: 3.4141
[150] training's rmse: 1.22661    valid_1's rmse: 3.36079
[200] training's rmse: 1.0165 valid_1's rmse: 3.35222
[250] training's rmse: 0.860022   valid_1's rmse: 3.34358
[300] training's rmse: 0.735403   valid_1's rmse: 3.34575
[350] training's rmse: 0.630665   valid_1's rmse: 3.35982
Early stopping, best iteration is:
[254] training's rmse: 0.848352   valid_1's rmse: 3.33877
Evaluated only: rmse

今度は保存された MLmodel ファイルに signature という情報が付与されている。 中身を見るとカラム名とデータ型が入っている。

$ ls mlflow-lgbm-with-sig
MLmodel     conda.yaml  model.lgb
$ cat mlflow-lgbm-with-sig/MLmodel 
flavors:
  lightgbm:
    data: model.lgb
    lgb_version: 3.0.0
  python_function:
    data: model.lgb
    env: conda.yaml
    loader_module: mlflow.lightgbm
    python_version: 3.8.5
signature:
  inputs: '[{"name": "CRIM", "type": "double"}, {"name": "ZN", "type": "double"},
    {"name": "INDUS", "type": "double"}, {"name": "CHAS", "type": "double"}, {"name":
    "NOX", "type": "double"}, {"name": "RM", "type": "double"}, {"name": "AGE", "type":
    "double"}, {"name": "DIS", "type": "double"}, {"name": "RAD", "type": "double"},
    {"name": "TAX", "type": "double"}, {"name": "PTRATIO", "type": "double"}, {"name":
    "B", "type": "double"}, {"name": "LSTAT", "type": "double"}]'
  outputs: '[{"type": "double"}]'
utc_time_created: '2020-09-30 09:58:18.952375'

それでは、Signature を追加したモデルで推論させてみよう。 CSV ファイルは先ほどと同じものを使う。 つまり、モデルの学習時と推論時でカラムの順番が入れかわっている。

$ cat data.csv                 
ZN,CRIM,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0.0,10.233,18.1,0.0,0.614,6.185,96.7,2.1705,24.0,666.0,20.2,379.7,18.03

永続化したモデルを使って推論させてみる。

$ mlflow models predict \
    --no-conda \
    --model-uri mlflow-lgbm-with-sig/ \
    --content-type csv \
    --input-path data.csv
2020/09/30 19:00:54 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
[15.444706764627714]

今度は、カラムを入れかえているにも関わらず、結果が変わらなくなった。 上手くいっているようだ。

ちなみに手動で Signature の情報を指定するときは次のようにすれば良い。

    # 手動で Signature を構築する場合
    from mlflow.models.signature import ModelSignature
    from mlflow.types.schema import Schema
    from mlflow.types.schema import ColSpec
    input_schema = Schema([
        ColSpec('double', 'CRIM'),
        ColSpec('double', 'ZN'),
        ColSpec('double', 'INDUS'),
        ColSpec('double', 'CHAS'),
        ColSpec('double', 'NOX'),
        ColSpec('double', 'RM'),
        ColSpec('double', 'AGE'),
        ColSpec('double', 'DIS'),
        ColSpec('double', 'RAD'),
        ColSpec('double', 'TAX'),
        ColSpec('double', 'PTRATIO'),
        ColSpec('double', 'B'),
        ColSpec('double', 'LSTAT'),
    ])
    output_schema = Schema([ColSpec('double', 'MEDV')])
    signature = ModelSignature(inputs=input_schema, outputs=output_schema)
    mlflow_lgb.save_model(booster, path='mlflow-lgbm-with-sig', signature=signature)

Input Example を追加する

また、永続化するモデルにはサンプルとなる入力データも Input Example として同梱させることができる。 次は Input Example も追加してみよう。

以下にサンプルコードを示す。 やっていることは簡単で、学習させたデータの先頭の何件かを与えているだけ。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pandas as pd
import lightgbm as lgb
from mlflow import lightgbm as mlflow_lgb
from mlflow.models.signature import infer_signature
from sklearn import datasets
from sklearn.model_selection import train_test_split


def main():
    dataset = datasets.load_boston()
    train_x, train_y = dataset.data, dataset.target
    feature_names = list(dataset.feature_names)

    x_tr, x_eval, y_tr, y_eval = train_test_split(train_x, train_y,
                                                  test_size=0.33,
                                                  shuffle=True,
                                                  random_state=42,
                                                  )

    lgb_train = lgb.Dataset(x_tr, y_tr, feature_name=feature_names)
    lgb_eval = lgb.Dataset(x_eval, y_eval, reference=lgb_train)

    lgbm_params = {
        'objective': 'regression',
        'metric': 'rmse',
        'first_metric_only': True,
        'verbose': -1,
        'random_state': 42,
    }
    booster = lgb.train(params=lgbm_params,
                        train_set=lgb_train,
                        valid_sets=[lgb_train, lgb_eval],
                        num_boost_round=1_000,
                        early_stopping_rounds=100,
                        verbose_eval=50,
                        )

    y_tr_pred = booster.predict(x_tr, num_iteration=booster.best_iteration)
    x_tr_df = pd.DataFrame(x_tr, columns=dataset.feature_names)
    signature = infer_signature(x_tr_df, y_tr_pred)
    # サンプルの入力データをつける
    input_example = x_tr_df.iloc[:5]
    mlflow_lgb.save_model(booster,
                          path='mlflow-lgbm-with-sig-and-example',
                          input_example=input_example,
                          signature=signature)


if __name__ == '__main__':
    main()

上記を実行しよう。

$ python lgbmlfeg.py
Training until validation scores don't improve for 100 rounds
[50]  training's rmse: 2.23963    valid_1's rmse: 3.59161
[100] training's rmse: 1.55562    valid_1's rmse: 3.4141
[150] training's rmse: 1.22661    valid_1's rmse: 3.36079
[200] training's rmse: 1.0165 valid_1's rmse: 3.35222
[250] training's rmse: 0.860022   valid_1's rmse: 3.34358
[300] training's rmse: 0.735403   valid_1's rmse: 3.34575
[350] training's rmse: 0.630665   valid_1's rmse: 3.35982
Early stopping, best iteration is:
[254] training's rmse: 0.848352   valid_1's rmse: 3.33877
Evaluated only: rmse

見ると、今度は input_example.json というファイルがディレクトリに追加されている。

$ ls mlflow-lgbm-with-sig-and-example 
MLmodel         conda.yaml      input_example.json  model.lgb
$ cat mlflow-lgbm-with-sig-and-example/input_example.json 
{"columns": ["CRIM", "ZN", "INDUS", "CHAS", "NOX", "RM", "AGE", "DIS", "RAD", "TAX", "PTRATIO", "B", "LSTAT"], "data": [[10.233, 0.0, 18.1, 0.0, 0.614, 6.185, 96.7, 2.1705, 24.0, 666.0, 20.2, 379.7, 18.03], [0.67191, 0.0, 8.14, 0.0, 0.538, 5.813, 90.3, 4.682, 4.0, 307.0, 21.0, 376.88, 14.81], [0.14455, 12.5, 7.87, 0.0, 0.524, 6.172, 96.1, 5.9505, 5.0, 311.0, 15.2, 396.9, 19.15], [0.11132, 0.0, 27.74, 0.0, 0.609, 5.983, 83.5, 2.1099, 4.0, 711.0, 20.1, 396.9, 13.35], [0.12802, 0.0, 8.56, 0.0, 0.52, 6.474, 97.1, 2.4329, 5.0, 384.0, 20.9, 395.24, 12.27]]}

試しに、このサンプルを推論させてみよう まずは REST API を立ち上げる。

$ mlflow models serve \
    --no-conda \
    --model-uri mlflow-lgbm-with-sig-and-example
2020/09/30 19:05:39 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
2020/09/30 19:05:39 INFO mlflow.pyfunc.backend: === Running command 'gunicorn --timeout=60 -b 127.0.0.1:5000 -w 1 ${GUNICORN_CMD_ARGS} -- mlflow.pyfunc.scoring_server.wsgi:app'
[2020-09-30 19:05:39 +0900] [23035] [INFO] Starting gunicorn 20.0.4
[2020-09-30 19:05:39 +0900] [23035] [INFO] Listening at: http://127.0.0.1:5000 (23035)
[2020-09-30 19:05:39 +0900] [23035] [INFO] Using worker: sync
[2020-09-30 19:05:39 +0900] [23037] [INFO] Booting worker with pid: 23037

サンプルの JSON ファイルを使って REST API を叩く。

$ curl -X POST \
    -H "Content-Type:application/json" \
    --data "$(cat mlflow-lgbm-with-sig-and-example/input_example.json)" \
    http://localhost:5000/invocations
[15.444706764627714, 16.79758862860849, 25.64257218297901, 19.626464010328057, 20.184689951658456]

ちゃんと推論できているようだ。 今のところクライアント側からサンプルの情報は得られないのかな。 とはいえ、モデルがどんな入力を受け取るかソースコードを見て調べることってよくある。 なので、管理する上で助かるといえば助かるのかな。

前処理が必要なデータセットで試す

ところで、ここまでのサンプルコードには前処理が入っていなかった。 しかし、実際には前処理が存在しない機械学習のコードなんて考えられないだろう。 続いては前処理を含んだコードを MLflow Models で扱う方法について考えている。

たとえば、以下のサンプルコードでは前処理と推論の処理を scikit-learn の Pipeline としてまとめている。 Pipeline にまとめるには、関連するオブジェクトが scikit-learn のインターフェースに準拠している必要がある。 そこで LightGBM の分類器としては LGBMClassifier を使った。 また、ラベルエンコードには category_encoders の実装を使っている。 分類するデータには Titanic データセットを使った。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import seaborn as sns
import category_encoders as ce
from mlflow import sklearn as mlflow_sklearn
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema
from mlflow.types.schema import ColSpec
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
import lightgbm as lgb


def main():
    # Titanic データを読み込む
    df = sns.load_dataset('titanic')

    # 使う特徴量
    feature_names = [
        'class',
        'sex',
        'age',
        'sibsp',
        'parch',
        'fare',
        'embark_town',
        'deck',
    ]
    train_x = df[feature_names]
    train_y = df['survived']

    # 前処理 (ラベルエンコード)
    cols = ['class', 'sex', 'embark_town', 'deck']
    encoder = ce.OrdinalEncoder(cols)
    encoded_train_x = encoder.fit_transform(train_x)

    # 学習用データと検証用データに分割する
    x_tr, x_eval, y_tr, y_eval = train_test_split(encoded_train_x,
                                                  train_y,
                                                  test_size=0.33,
                                                  shuffle=True,
                                                  random_state=42,
                                                  )
    # 分類器
    clf = lgb.LGBMClassifier(n_estimators=1_000,
                             first_metric_only=True,
                             random_state=42,
                             )
    # 学習させる
    clf.fit(x_tr, y_tr,
            early_stopping_rounds=100,
            eval_set=[(x_eval, y_eval)],
            verbose=50,
            eval_metric='binary_logloss',
            )

    # 学習させたエンコーダーとモデルをパイプラインにまとめる
    steps = [
        ('preprocessing', encoder),
        ('classification', clf)
    ]
    pipeline = Pipeline(steps)

    # パイプラインを MLflow Models で保存する
    # NOTE: Categorical な型があると MLflow がスキーマをうまく推測できない
    input_schema = Schema([
        ColSpec('string', 'class'),
        ColSpec('string', 'sex'),
        ColSpec('double', 'age'),
        ColSpec('long', 'sibsp'),
        ColSpec('long', 'parch'),
        ColSpec('double', 'fare'),
        ColSpec('string', 'embark_town'),
        ColSpec('string', 'deck'),
    ])
    output_schema = Schema([ColSpec('double', 'survived')])
    signature = ModelSignature(inputs=input_schema,
                               outputs=output_schema)
    input_example = train_x.iloc[:5]
    mlflow_sklearn.save_model(pipeline,
                              path='mlflow-sklearn-pipeline',
                              signature=signature,
                              input_example=input_example,
                              )


if __name__ == '__main__':
    main()

上記を実行しよう。

$ python skpipemlf.py
Training until validation scores don't improve for 100 rounds
[50]  valid_0's binary_logloss: 0.457466
[100]  valid_0's binary_logloss: 0.510931
Early stopping, best iteration is:
[25]  valid_0's binary_logloss: 0.427704
Evaluated only: binary_logloss

永続化したモデルで推論してみよう。 データは次のように CSV のファイルとして記録しておく。 見ると分かるとおり、扱う上ではエンコードが必要となるカラムが複数含まれている。

$ cat << 'EOF' > data.csv 
class,sex,age,sibsp,parch,fare,embark_town,deck
Third,male,22.0,1,0,7.25,Southampton,
First,female,38.0,1,0,71.2833,Cherbourg,C
Third,female,26.0,0,0,7.925,Southampton,
First,female,35.0,1,0,53.1,Southampton,C
Third,male,35.0,0,0,8.05,Southampton,
EOF

しかし、先ほどのサンプルコードでは前処理を含めたパイプラインを MLflow Models で永続化している。 そのため、前処理が必要なデータをそのまま放り込んでも推論できる。 DeprecationWarning は出ているところは愛嬌ということで。

$ mlflow models predict \
    --no-conda \
    --model-uri mlflow-sklearn-pipeline/ \
    --content-type csv \
    --input-path data.csv
2020/09/30 19:19:29 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
/Users/amedama/.virtualenvs/py38/lib/python3.8/site-packages/patsy/constraint.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.9 it will stop working
  from collections import Mapping
[0, 1, 0, 1, 0]

ところで上記を見てわかるとおり、結果はバイナリの整数に丸められてしまっている。 これは MLflow Models の sklearn モジュールでは、モデルの predict() メソッドを呼ぶように作られているため。 scikit-learn のインターフェースでは分類器の predict() が整数に丸めた結果を返してしまう。

モデルが確率 (predict_proba) を返すようにする

ただ、丸めた結果だけでは困るケースが多いはず。 なので、試しに predict_proba() の結果を返すようにしてみよう。 やり方は簡単で LGBMClassifier を継承して predict()predict_proba() にすりかえるクラスを用意する。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import seaborn as sns
import category_encoders as ce
from mlflow import sklearn as mlflow_sklearn
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema
from mlflow.types.schema import ColSpec
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
import lightgbm as lgb


class LGBMClassifierWrapper(lgb.LGBMClassifier):
    """predict() の処理を predict_proba() にリダイレクトするラッパー"""

    def predict(self, *args, **kwargs):
        # 処理をリダイレクトする
        proba = super().predict_proba(*args, **kwargs)
        # Positive の確率を返す
        return proba[:, 1]

def main():
    df = sns.load_dataset('titanic')

    feature_names = [
        'class',
        'sex',
        'age',
        'sibsp',
        'parch',
        'fare',
        'embark_town',
        'deck',
    ]
    train_x = df[feature_names]
    train_y = df['survived']

    cols = ['class', 'sex', 'embark_town', 'deck']
    encoder = ce.OrdinalEncoder(cols)
    encoded_train_x = encoder.fit_transform(train_x)

    x_tr, x_eval, y_tr, y_eval = train_test_split(encoded_train_x,
                                                  train_y,
                                                  test_size=0.33,
                                                  shuffle=True,
                                                  random_state=42,
                                                  )
    # 処理をラップした分類器を使う
    clf = LGBMClassifierWrapper(n_estimators=1_000,
                                first_metric_only=True,
                                random_state=42,
                                )
    clf.fit(x_tr, y_tr,
            early_stopping_rounds=100,
            eval_set=[(x_eval, y_eval)],
            verbose=50,
            eval_metric='binary_logloss',
            )

    steps = [
        ('preprocessing', encoder),
        ('classification', clf)
    ]
    pipeline = Pipeline(steps)

    input_schema = Schema([
        ColSpec('string', 'class'),
        ColSpec('string', 'sex'),
        ColSpec('double', 'age'),
        ColSpec('long', 'sibsp'),
        ColSpec('long', 'parch'),
        ColSpec('double', 'fare'),
        ColSpec('string', 'embark_town'),
        ColSpec('string', 'deck'),
    ])
    output_schema = Schema([ColSpec('double', 'survived')])
    signature = ModelSignature(inputs=input_schema,
                               outputs=output_schema)
    input_example = train_x.iloc[:5]
    mlflow_sklearn.save_model(pipeline,
                              path='mlflow-sklearn-pipeline-with-proba',
                              signature=signature,
                              input_example=input_example,
                              )


if __name__ == '__main__':
    main()

上記を実行しよう。

$ python skpipemlfp.py
Training until validation scores don't improve for 100 rounds
[50]  valid_0's binary_logloss: 0.457466
[100]  valid_0's binary_logloss: 0.510931
Early stopping, best iteration is:
[25]  valid_0's binary_logloss: 0.427704
Evaluated only: binary_logloss

推論させてみると、今度はちゃんと浮動小数点の結果になっている。

$ mlflow models predict \                   
    --no-conda \
    --model-uri mlflow-sklearn-pipeline-with-proba/ \
    --content-type csv \
    --input-path data.csv
2020/09/30 19:24:09 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
/Users/amedama/.virtualenvs/py38/lib/python3.8/site-packages/patsy/constraint.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.9 it will stop working
  from collections import Mapping
[0.20596751692506657, 0.9204031036579741, 0.4137860515635112, 0.9286529085847584, 0.0976711088927368]

永続化した内容を Python から読み込んで使う

ここまでの内容は、永続化した内容を常に mlflow コマンドから読み込んで使ってきた。 しかし、Python のコードから読み込んで使いたいケースも当然あるはず。

以下のサンプルコードでは先ほど永続化したモデルを読み込んで使っている。 具体的には mlflow.pyfunc.load_model() を使えばモデルが読み込める。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import seaborn as sns
from mlflow import pyfunc


def main():
    # Titanic データを読み込む
    df = sns.load_dataset('titanic')

    # Categorical 型は文字列に直す
    df = df.astype({
        'class': str,
        'deck': str,
    })

    # 使う特徴量の名前
    feature_names = [
        'class',
        'sex',
        'age',
        'sibsp',
        'parch',
        'fare',
        'embark_town',
        'deck',
    ]
    train_x = df[feature_names]
    train_y = df['survived']

    # 保存したモデルを読み込む
    model_path = 'mlflow-sklearn-pipeline-with-proba'
    # 汎用な pyfunc モジュールから読み出せる
    loaded_model = pyfunc.load_model(model_path)
    """
    # あるいは sklearn 向けモジュールから読んでも良い
    from mlflow import sklearn as mlflow_sklearn
    loaded_model = mlflow_sklearn.load_model(model_path)
    """

    # 保存したモデルで予測する
    # NOTE: ここで予測しているのはモデルが見たことのあるデータなので、あくまでデモとして
    train_y_pred = loaded_model.predict(train_x)

    # 先頭を表示してみる
    print(f'Inference: {train_y_pred[:5]}')
    # 正解
    print(f'GroundTruth: {train_y.values[:5]}')


if __name__ == '__main__':
    main()

ポイントとしては、永続化に使ったモジュールが何であれ、この統一されたインターフェースから読み出せるということ。 ようするに mlflow.sklearnmlflow.lightgbm などのモジュールを使って永続化したモデルであっても、ひとつの API で読める。 MLmodel ファイルには loader_module という項目に、モデルの復元に使うモジュールが指定されているため、このようなことが実現できる。 復元したモデルには predict() メソッドがあるので、あとはこれを使って推論すれば良い。

上記を実行してみよう。

$ python load.py                      
/Users/amedama/.virtualenvs/py38/lib/python3.8/site-packages/patsy/constraint.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.9 it will stop working
  from collections import Mapping
Inference: [0.20596752 0.9204031  0.41378605 0.92865291 0.09767111]
GroundTruth: [0 1 1 1 0]

ちゃんと推論できている。

Custom Python Models を作る

先ほど扱ったサンプルコードでは、前処理とモデルが scikit-learn のインターフェースを備えていることを前提としていた。 しかし、扱うコードによっては scikit-learn のインターフェースがない場合もあるはず。 続いては、そんな場合にどうすれば良いかを扱う。

最も簡単なやり方は mlflow.pyfunc.PythonModel を継承したクラスを作るというもの。 継承したクラスの predict() メソッドに、生データから推論するまでに必要な処理のパイプラインを詰め込む。 そして、このクラスのインスタンスを mlflow.pyfunc.save_model() で永続化してやれば良い。

以下にサンプルコードを示す。 今度は LightGBM の標準 API を使っているため scikit-learn のインターフェースに準拠していない。 つまり、scikit-learn の Pipeline にまとめる作戦が使えない状況を意図的に作り出している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import seaborn as sns
import category_encoders as ce
from mlflow import pyfunc
from mlflow.models.signature import ModelSignature
from mlflow.pyfunc.model import get_default_conda_env
from mlflow.types.schema import Schema
from mlflow.types.schema import ColSpec
from sklearn.model_selection import train_test_split
import lightgbm as lgb


class InferencePipeline(pyfunc.PythonModel):
    """推論に使うパイプライン"""

    def __init__(self, preprocessor, estimator):
        self.preprocessor = preprocessor
        self.estimator = estimator

    def predict(self, context, model_input):
        """入力を推論結果に変換する過程"""
        transformed_input = self.preprocessor.transform(model_input)
        prediction = self.estimator.predict(transformed_input)
        return prediction


def main():
    df = sns.load_dataset('titanic')

    feature_names = [
        'class',
        'sex',
        'age',
        'sibsp',
        'parch',
        'fare',
        'embark_town',
        'deck',
    ]
    train_x = df[feature_names]
    train_y = df['survived']

    cols = ['class', 'sex', 'embark_town', 'deck']
    encoder = ce.OrdinalEncoder(cols)
    encoded_train_x = encoder.fit_transform(train_x)

    x_tr, x_eval, y_tr, y_eval = train_test_split(encoded_train_x,
                                                  train_y,
                                                  test_size=0.33,
                                                  shuffle=True,
                                                  random_state=42,
                                                  )

    # lightgbm.train() を使う
    # 返ってくる Booster オブジェクトには scikit-learn インターフェースがない
    lgb_train = lgb.Dataset(x_tr, y_tr)
    lgb_eval = lgb.Dataset(x_eval, y_eval, reference=lgb_train)

    lgb_params = {
        'objective': 'binary',
        'metrics': 'binary_logloss',
        'first_metric_only': True,
        'random_state': 42,
        'verbose': -1,
    }
    booster = lgb.train(lgb_params, lgb_train,
                        valid_sets=lgb_eval,
                        num_boost_round=1_000,
                        early_stopping_rounds=100,
                        verbose_eval=50,
                        )

    # 前処理と推論の処理を mlflow.pyfunc.PythonModel を継承したクラスのインスタンスにまとめる
    pipeline = InferencePipeline(encoder, booster)

    input_schema = Schema([
        ColSpec('string', 'class'),
        ColSpec('string', 'sex'),
        ColSpec('double', 'age'),
        ColSpec('long', 'sibsp'),
        ColSpec('long', 'parch'),
        ColSpec('double', 'fare'),
        ColSpec('string', 'embark_town'),
        ColSpec('string', 'deck'),
    ])
    output_schema = Schema([ColSpec('double', 'survived')])
    signature = ModelSignature(inputs=input_schema,
                               outputs=output_schema)

    input_example = train_x.iloc[:5]

    # 動作に必要な依存ライブラリを追加する
    conda_env = get_default_conda_env()
    deps = conda_env['dependencies']
    other_deps = deps[-1]  # XXX: ちょっと決め打ちすぎ
    other_deps['pip'].append('category_encoders')
    other_deps['pip'].append('scikit-learn')
    other_deps['pip'].append('lightgbm')

    # 永続化する
    pyfunc.save_model(path='mlflow-custom-pyfunc-model',
                      python_model=pipeline,
                      signature=signature,
                      input_example=input_example,
                      conda_env=conda_env,
                      )


if __name__ == '__main__':
    main()

MLmodel は次のように記録されている。 モデルの本体は Pickle オブジェクトとして python_model.pkl にある

$ cat mlflow-custom-pyfunc-model/MLmodel   
flavors:
  python_function:
    cloudpickle_version: 1.6.0
    env: conda.yaml
    loader_module: mlflow.pyfunc.model
    python_model: python_model.pkl
    python_version: 3.8.5
saved_input_example_info:
  artifact_path: input_example.json
  pandas_orient: split
  type: dataframe
signature:
  inputs: '[{"name": "class", "type": "string"}, {"name": "sex", "type": "string"},
    {"name": "age", "type": "double"}, {"name": "sibsp", "type": "long"}, {"name":
    "parch", "type": "long"}, {"name": "fare", "type": "double"}, {"name": "embark_town",
    "type": "string"}, {"name": "deck", "type": "string"}]'
  outputs: '[{"name": "survived", "type": "double"}]'
utc_time_created: '2020-09-30 09:11:38.717424'

永続化した内容を使って推論させてみよう。

$ mlflow models predict \
    --no-conda \
    --model-uri mlflow-custom-pyfunc-model/ \
    --content-type csv \
    --input-path data.csv
2020/09/30 20:00:10 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
/Users/amedama/.virtualenvs/py38/lib/python3.8/site-packages/patsy/constraint.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.9 it will stop working
  from collections import Mapping
[0.20596751692506657, 0.9204031036579741, 0.4137860515635112, 0.9286529085847584, 0.0976711088927368]

ちゃんと動いていることがわかる。

ちなみに、今回は扱わなかったけどモデルの情報を Pickle 以外のファイルに artifacts として保存することもできるようだ。 また、さらに複雑なモデルや Python 以外の言語を使う場合には、自分で Custom Flavor を書くこともできる。

とりあえず、そんな感じで。