CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Luigi でタスク共通のパラメータを扱う

今回は、Luigi で複数のタスクが共通のパラメータを扱う方法について考えてみる。 ここらへん、調べてもあまりドキュメントなどが出てこなかった。 なので、ソースコードを読んでリバースエンジニアリング的に「こういう風にできそう」と判明した内容を書いてみる。 使う API のレイヤー的に、高レベルなやり方と低レベルなやり方が見つかったので、どちらも記載する。

使った環境は次のとおり。

$ sw_vers 
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ python -V        
Python 3.9.5
$ pip list | grep -i luigi
luigi           3.0.3

もくじ

下準備

まずは、下準備として Luigi をインストールしておく。

$ pip install luigi

低レベル API (luigi.configuration.get_config()) を使う

まずは低レベル API の luigi.configuration.get_config() を使うやり方から。 この API を使うと、Luigi の設定ファイルを辞書形式でそのまま読み込むことができる。 読み込んだコンフィグは、どのタスクから利用することもできるため共通のパラメータを扱うことができる。

以下にサンプルコードを示す。 サンプルコードには TaskATaskB という、2 つのタスクを定義している。 この中では、それぞれ設定ファイルから [SharedConfig] セクションの shared_param パラメータを読み込んで使っている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
# 設定を取得するための API
from luigi.configuration import get_config


class NoOutputTask(luigi.Task):
    """output() を介さずに完了を制御するテスト用のタスク

    NOTE: 今回のサンプルコードの中では本質的な内容ではない"""
    # タスクが完了しているかを示すフラグ
    done = False

    def run(self):
        # run() が一度でも実行されたら完了フラグを立てる
        self.done = True

    def complete(self):
        # タスクの完了はフラグで判断する
        return self.done


class TaskA(NoOutputTask):

    def run(self):
        super().run()
        # SharedConfig セクションのパラメータを取得する
        section_dict = dict(get_config().items('SharedConfig'))
        # パラメータの内容を標準出力に書き出す (ほんとは output() に書くべき)
        print('Hello,', section_dict['shared_param'], 'by TaskA')


class TaskB(NoOutputTask):

    def run(self):
        super().run()
        # 同じパラメータを使う
        section_dict = dict(get_config().items('SharedConfig'))
        print('Hello,', section_dict['shared_param'], 'by TaskB')


class Wrapper(luigi.WrapperTask):
    """上記で定義した 2 つのタスクをキックするためだけのタスク"""

    def requires(self):
        yield TaskA()
        yield TaskB()


if __name__ == '__main__':
    luigi.run(main_task_cls=Wrapper,
              local_scheduler=True)

次のように設定ファイルを用意しよう。

$ cat << 'EOF' > luigi.cfg  
[SharedConfig]
shared_param=World
EOF

あとは実行するだけ。

$ python lowlayer.py

...

Hello, World by TaskB

...

Hello, World by TaskA

...

===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskA()
    - 1 TaskB()
    - 1 Wrapper()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記の結果からわかるように、タスクが出力するメッセージの中に設定ファイルで指定した値が使われている。

高レベル API (luigi.Config) を使う

続いては高レベル API の luigi.Config を使うパターン。 こちらは luigi.Config というクラスを継承したクラスを定義する。 設定ファイからは、定義したクラスと同名のセクション経由でパラメータを設定できる。 複数のタスクからは、クラスをインスタンス化してやればパラメータがインジェクションされて得られる。

以下にサンプルコードを示す。 先ほどのサンプルコードから SharedConfig というクラスが増えている。 そして、TaskATaskBSharedConfig をインスタンス化して shared_param パラメータにアクセスしている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class SharedConfig(luigi.Config):
    """複数のタスクから参照される共通のパラメータ"""
    shared_param = luigi.Parameter()


class NoOutputTask(luigi.Task):
    """output() を介さずに完了を制御するテスト用のタスク

    NOTE: 今回のサンプルコードの中では本質的な内容ではない"""
    # タスクが完了しているかを示すフラグ
    done = False

    def run(self):
        # run() が一度でも実行されたら完了フラグを立てる
        self.done = True

    def complete(self):
        # タスクの完了はフラグで判断する
        return self.done


class TaskA(NoOutputTask):

    def run(self):
        super().run()
        # SharedConfig をインスタンス化してパラメータを取り出す
        # パラメータは luigi の設定ファイルで指定できる
        print('Hello,', SharedConfig().shared_param, 'by TaskA')


class TaskB(NoOutputTask):

    def run(self):
        super().run()
        # こちらも同様
        print('Hello,', SharedConfig().shared_param, 'by TaskB')


class Wrapper(luigi.WrapperTask):
    """上記で定義した 2 つのタスクをキックするためだけのタスク"""

    def requires(self):
        yield TaskA()
        yield TaskB()


if __name__ == '__main__':
    luigi.run(main_task_cls=Wrapper,
              local_scheduler=True)

設定ファイルは先ほどと同じで良い。 セクション名とパラメータ名が同じになるようにクラスを定義してあるため。 作り直すなら次のようにする。

$ cat << 'EOF' > luigi.cfg  
[SharedConfig]
shared_param=World
EOF

実行してみよう。

$ python highlayer.py 

...

Hello, World by TaskB

...

Hello, World by TaskA

...

===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskA()
    - 1 TaskB()
    - 1 Wrapper()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記の実行結果から、ちゃんとパラメータを参照できていることがわかる。

ところで、上記で使った luigi.Config というクラス、定義を見ると面白いことがわかる。 以下がそのソースコード。

github.com

なんと、luigi.Configluigi.Task を継承しているだけで、他に何もしていない。 つまり、ほとんど同一のものということになる。 実は、luigi.Config を使わなくても、luigi.Task でも同じことはできるのだ。

いじょう。

Python: Jupyter の IPython Kernel にスタートアップスクリプトを登録する

今回は Jupyter の IPython Kernel に、スタートアップスクリプトを登録する方法について書いてみる。 スタートアップスクリプトというのは、カーネルの起動時に読み込まれるコードのこと。 IPython Kernel というのは、いわゆるフツーのノートブックを Jupyter で実行するときに動いているバックエンドのプログラムを指している。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ python -V           
Python 3.9.5
$ jupyter --version
jupyter core     : 4.7.1
jupyter-notebook : 6.4.0
qtconsole        : not installed
ipython          : 7.24.1
ipykernel        : 5.5.5
jupyter client   : 6.1.12
jupyter lab      : 3.0.16
nbconvert        : 6.1.0
ipywidgets       : not installed
nbformat         : 5.1.3
traitlets        : 5.0.5

もくじ

下準備

まずは JupyterLab と Pandas をインストールしておく。 Pandas の方はインポートするのに使うだけなので、あまり本質ではない。

$ pip install jupyterlab pandas

IPython のプロファイルについて

まず、本題に入る前に IPython のプロファイルという概念を説明しておく。 プロファイルは、ようするに IPython が動作するときの設定を扱う名前空間みたいなもの。 存在するプロファイルは ipython profile list コマンドで確認できる。

$ ipython profile list

Available profiles in /Users/amedama/.ipython:
    default

To use any of the above profiles, start IPython with:
    ipython --profile=<name>

上記のように、何もしなくても初期状態で default という名前のプロファイルがある。

実は、何気なく実行している ipython コマンドは、暗に --profile=default オプションをつけているのと等価になっている。

$ ipython  # ipython --profile=default と同じ

各プロファイルにはディレクトリがあって、そこにはプロファイル毎の設定ファイルや動作ログが収められている。

$ ipython profile locate default
/Users/amedama/.ipython/profile_default
$ ls $(ipython profile locate default)
db      log       security
history.sqlite    pid     startup

デフォルトのカーネルにスタートアップスクリプトを登録する

先ほど実行したコマンドの出力を見ると、プロファイルのディレクトリには、さらに startup というディレクトリがある。 ここに、名前が数字 2 ケタから始まる Python スクリプトを入れると、カーネルの起動時にそれが呼び出されるようになる。

試しに Pandas と NumPy のインポート文を追加してみよう。

$ cat << 'EOF' >> $(ipython profile locate default)/startup/00-common-import.py
import pandas as pd
import numpy as np
EOF

試しに IPython を起動して pd という変数を参照してみると、ちゃんと Pandas のモジュールを指していることがわかる。

$ ipython -c "pd"       
Out[1]: <module 'pandas' from '/Users/amedama/.virtualenvs/py39/lib/python3.9/site-packages/pandas/__init__.py'>

前述したとおり、上記は暗に --profile=default を付けているのと等価になる。

$ ipython --profile=default -c "pd"
Out[1]: <module 'pandas' from '/Users/amedama/.virtualenvs/py39/lib/python3.9/site-packages/pandas/__init__.py'>

もちろん、これは Jupyter からも有効になる。 試しに Jupyter Lab を起動して、デフォルトのカーネルでノートブックを作ってみよう。

$ jupyter lab

先ほどと同じように pd という名前の変数を参照すると、ちゃんと読み込まれている。

f:id:momijiame:20210624191922p:plain

新たに専用のカーネルを作ってスタートアップスクリプトを登録する

続いては、専用のカーネルを作って、そこにスタートアップスクリプトを登録してみよう。 これは、たとえば用途ごとにスタートアップスクリプトを用意して使い分けたいようなユースケースを想定している。

はじめに、スタートアップスクリプトを登録するためのプロファイルを新たに用意する。 新しくプロファイルを作るには ipython profile create コマンドを使う。 ここでは customized という名前でプロファイルを作った。

$ ipython profile create customized
[ProfileCreate] Generating default config file: '/Users/amedama/.ipython/profile_customized/ipython_config.py'
[ProfileCreate] Generating default config file: '/Users/amedama/.ipython/profile_customized/ipython_kernel_config.py'
$ ipython profile list

Available profiles in /Users/amedama/.ipython:
    customized
    default

To use any of the above profiles, start IPython with:
    ipython --profile=<name>

先ほどと同じように、プロファイルにスタートアップスクリプトを登録しておこう。

$ cat << 'EOF' >> $(ipython profile locate customized)/startup/00-common-import.py
import pandas as pd
import numpy as np
EOF

ひとまず、プロファイル経由でスタートアップスクリプトが読み込まれているかを IPython の REPL で確認しておく。 --profile オプションで、作ったプロファイル customized を指定しよう。

$ ipython --profile=customized -c "pd"
Out[1]: <module 'pandas' from '/Users/amedama/.virtualenvs/py39/lib/python3.9/site-packages/pandas/__init__.py'>

続いて、カーネルの設定に入る。 まず、現在有効なカーネルの一覧は jupyter kernelspec list コマンドで得られる。

$ jupyter kernelspec list
Available kernels:
  python3    /Users/amedama/.virtualenvs/py39/share/jupyter/kernels/python3

ここにはカーネルを起動するときの情報が入った kernel.json というファイルがある。 中身を見ると、結局のところカーネルの起動というのは $ python -m ipykernel_launcher -f ... というコマンドを実行しているのに過ぎないことがわかる。

$ cat ~/.virtualenvs/py39/share/jupyter/kernels/python3/kernel.json 
{
 "argv": [
  "python",
  "-m",
  "ipykernel_launcher",
  "-f",
  "{connection_file}"
 ],
 "display_name": "Python 3",
 "language": "python"
}

おもむろに、デフォルトのカーネルのディレクトリを丸ごとコピーする。

$ cp -r ~/.virtualenvs/py39/share/jupyter/kernels/{python3,customized}

そして kernel.json をちょこっと書きかえよう。

$ cat << 'EOF' > ~/.virtualenvs/py39/share/jupyter/kernels/customized/kernel.json
{
 "argv": [
  "python",
  "-m",
  "ipykernel_launcher",
  "-f",
  "{connection_file}",
  "--profile",
  "customized"
 ],
 "display_name": "Customized Python 3",
 "language": "python"
}
EOF

差分は以下のとおり。 要するに表示名に Customized をつけているのと、起動時のオプションに --profile customized を追加してるだけ。

$ diff -u ~/.virtualenvs/py39/share/jupyter/kernels/{python3,customized}/kernel.json
--- /Users/amedama/.virtualenvs/py39/share/jupyter/kernels/python3/kernel.json  2021-06-24 18:55:06.000000000 +0900
+++ /Users/amedama/.virtualenvs/py39/share/jupyter/kernels/customized/kernel.json   2021-06-24 19:32:26.000000000 +0900
@@ -4,8 +4,10 @@
   "-m",
   "ipykernel_launcher",
   "-f",
-  "{connection_file}"
+  "{connection_file}",
+  "--profile",
+  "customized"
  ],
- "display_name": "Python 3",
+ "display_name": "Customized Python 3",
  "language": "python"
-}
\ No newline at end of file
+}

Jupyter Lab を起動してみよう。

$ jupyter lab

Web UI を確認すると、新しくカーネルが登録されていることがわかる。

f:id:momijiame:20210624193349p:plain

もちろん、カーネルを起動するとスタートアップスクリプトが実行される。

めでたしめでたし。

Python: Luigi から S3 互換のオブジェクトストレージを使う

今回は、Python のデータパイプライン構築用フレームワークの Luigi から、Amazon 以外が提供している S3 互換のオブジェクトストレージを利用する方法について書いてみる。 S3 互換のオブジェクトストレージとしては、ひとまず以下のエントリで紹介した MinIO をローカルホストで動かした。

blog.amedama.jp

使った環境は次のとおり。

$ sw_vers     
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ python -V
Python 3.9.5
$ pip list | grep -i luigi   
luigi           3.0.3
$ minio --version
minio version RELEASE.2021-06-17T00-10-46Z

もくじ

下準備

下準備として、MinIO と AWS CLI、それに Luigi と Boto3 をインストールしておく。 Boto3 は AWS を操作するための Python のクライアントライブラリで、Luigi で AWS 関連の処理をするときに必要となる。

$ brew install minio awscli
$ pip install luigi boto3

デフォルトの設定で MinIO のサーバを起動する。

$ mkdir -p /tmp/minio
$ minio server /tmp/minio

そして、テスト用のバケットを example-bucket という名前で作っておく。

$ export AWS_ACCESS_KEY_ID=minioadmin
$ export AWS_SECRET_ACCESS_KEY=minioadmin
$ aws s3 --endpoint-url http://localhost:9000 mb s3://example-bucket

サンプルコード

早速だけど、以下にサンプルコードを示す。 サンプルコードでは、ExampleTask というタスクを 1 つ定義している。 タスクの output() メソッドを見ると分かるとおり、Luigi でタスクのターゲットを S3 にしたいときは luigi.contrib.s3.S3Target を使えば良い。 そして、このタスクは実行すると s3://example-bucket/greet.txt という URL にファイルを作る。 ファイルの中には `Hello, World! という文字列が書き込まれる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import luigi
from luigi.contrib.s3 import S3Target


class ExampleTask(luigi.Task):

    def run(self):
        # NOTE: バケットは自動で作られない点に注意する
        with self.output().open(mode='w') as out_fp:
            print('Hello, World!', file=out_fp)

    def output(self):
        return S3Target(path=f's3://example-bucket/greet.txt')


if __name__ == '__main__':
    luigi.run(main_task_cls=ExampleTask,
              local_scheduler=True)

上記に適当な名前をつけて保存する。 ここでは例として example.py という名前にした。

さて、問題は上記をそのまま実行すると、アクセス先が本家の AWS になってしまうところ。 どうにかしてローカルホストにアクセスしてもらわないといけない。

結論から先に述べると、Luigi の設定ファイルに [s3] というセクションを作って、そこに設定を書けば良い。 前述したとおり、Luigi の S3 関連の処理は AWS SDK for Python (Boto3) に依存している。 [s3] というセクションに定義したパラメータは、boto3.client() を初期化するときの引数としてそのまま渡される。 つまり、ここでアクセス先や認証情報を変更できる。

$ cat << 'EOF' > luigi.cfg
[s3]
aws_access_key_id=minioadmin
aws_secret_access_key=minioadmin
use_ssl=False
endpoint_url=http://localhost:9000
EOF

設定できたところでタスクを実行してみよう。

$ python example.py

...

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 ExampleTask()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

結果を確認してみよう。 バケットを確認すると、ちゃんとオブジェクトができている。

$ aws --endpoint-url http://localhost:9000 s3 ls s3://example-bucket/ --recursive
2021-06-23 18:32:31         14 greet.txt

オブジェクトの中身を確認すると、ちゃんとメッセージが書き込まれていることがわかる。

$ aws --endpoint-url http://localhost:9000 s3 cp s3://example-bucket/greet.txt -
Hello, World!

めでたしめでたし。

S3 互換オブジェクトストレージの OSS - MinIO を試す

MinIO は Amazon S3 互換のオブジェクトストレージを提供する OSS のひとつ。 たとえばオンプレ環境でオブジェクトストレージを構築したいときや、手元で S3 を扱うアプリケーションの動作確認をするときなんかに使える。 今回はそんな MinIO を AWS CLI と Python クライアントの boto3 から使ってみる。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ minio -v    
minio version RELEASE.2021-05-26T00-22-46Z
$ python -V
Python 3.9.5
$ aws --version
aws-cli/2.2.6 Python/3.9.5 Darwin/20.5.0 source/x86_64 prompt/off
$ pip list | grep -i boto3  
boto3                     1.17.83

もくじ

下準備

今回は Homebrew から MinIO をインストールして使う。 クライアントとして awscli と boto3 も入れておく。

$ brew install minio awscli
$ pip install boto3

インストールできたら作業用のディレクトリを指定して minio server コマンドを実行する。 これで MinIO のサーバが立ち上がる。

$ mkdir -p /tmp/minio
$ minio server /tmp/minio

立ち上がると 9000 番ポートを Listen し始める。

$ lsof -i:9000
COMMAND   PID    USER   FD   TYPE             DEVICE SIZE/OFF NODE NAME
minio   13739 amedama   14u  IPv6 0x62631bfc177de1b3      0t0  TCP *:cslistener (LISTEN)

ブラウザでローカルホストの 9000 番ポートにアクセスすると管理用の Web UI が見える。

$ open http://localhost:9000/

f:id:momijiame:20210529145812p:plain
MinIO の管理用 Web UI

アカウントはデフォルトだと Access Key と Secret Key がどちらも minioadmin でログインできる。 デフォルトのアカウントを変更したいときはサーバを立ち上げるときに以下の環境変数で指定する。

  • Access Key

    • MINIO_ROOT_USER または MINIO_ACCESS_KEY
  • Secret Key

    • MINIO_ROOT_PASSWORD または MINIO_SECRET_KEY

AWS CLI から操作する

はじめに AWS CLI から操作してみよう。 まずは認証情報を環境変数で設定しておく。

$ export AWS_ACCESS_KEY_ID=minioadmin
$ export AWS_SECRET_ACCESS_KEY=minioadmin

あとは aws コマンドのオプションとして --endpoint-url に MinIO が動作してる http://localhost:9000 を指定するだけ。

$ aws --endpoint-url http://localhost:9000 s3 ls

特にエラーにならず上記が実行できれば大丈夫。

サンプルとなるバケットを example-bucket という名前で作成してみる。

$ aws s3 --endpoint-url http://localhost:9000 mb s3://example-bucket
make_bucket: example-bucket

作成すると、ちゃんと ls でバケットが見えるようになった。

$ aws --endpoint-url http://localhost:9000 s3 ls
2021-05-29 15:25:34 example-bucket

続いてはファイルをバケットにコピーしてみる。

$ echo "Hello, World" > /tmp/greet.txt
$ aws --endpoint-url http://localhost:9000 s3 cp /tmp/greet.txt s3://example-bucket
upload: ../../tmp/greet.txt to s3://example-bucket/greet.txt

ちゃんとアップロードできた。

$ aws --endpoint-url http://localhost:9000 s3 ls s3://example-bucket
2021-05-29 15:26:29         13 greet.txt

ファイルに深いプレフィックスをつけてコピーしたいときは ls--recursive オプションをつけると再帰的に確認できる。

$ aws --endpoint-url http://localhost:9000 s3 cp /tmp/greet.txt s3://example-bucket/folder/subfolder/ 
upload: ../../tmp/greet.txt to s3://example-bucket/folder/subfolder/greet.txt
$ aws --endpoint-url http://localhost:9000 s3 ls s3://example-bucket --recursive
2021-05-29 15:29:37         13 folder/subfolder/greet.txt
2021-05-29 15:26:29         13 greet.txt

上記は / を区切りにした階層構造があるように見えるけど、これはあくまでファイル名に / 区切りのプレフィックスがついているに過ぎない。 つまり、インタフェース的に階層構造があるように見せているだけ、という点には留意する必要がある。 階層構造のように見えたとしても、バケット以下の構造はあくまでもフラットな名前空間になっている。

標準入出力経由でファイルをコピーすることもできる。

$ echo "Hello, World" | aws --endpoint-url http://localhost:9000 s3 cp - s3://example-bucket/stdin/greet.txt
$ aws --endpoint-url http://localhost:9000 s3 cp s3://example-bucket/stdin/greet.txt -
Hello, World

ファイルを削除するときは rm コマンドを使う。

$ aws --endpoint-url http://localhost:9000 s3 rm s3://example-bucket/stdin/greet.txt
delete: s3://example-bucket/stdin/greet.txt

バケットの削除は、入っているファイルをすべて削除すれば rb コマンドからできる。 ただし、今回は後段の boto3 が残っているので省略する。

boto3 から操作する

続いては Python クライアントの boto3 からアクセスしてみる。

まずは Python のインタプリタを起動する。

$ python

boto3 パッケージをインポートする。

>>> import boto3

エンドポイントや認証情報を与えてクライアントを作る。

>>> s3_client = boto3.client('s3',
...                          use_ssl=False,
...                          endpoint_url='http://localhost:9000',
...                          aws_access_key_id='minioadmin',
...                          aws_secret_access_key='minioadmin')

バケットのリストを確認すると、先ほど AWS CLI で作成したものが確認できる。

>>> response = s3_client.list_buckets()
>>> response['Buckets']
[{'Name': 'example-bucket', 'CreationDate': datetime.datetime(2021, 5, 29, 6, 25, 34, 96000, tzinfo=tzutc())}]

試しに新しくバケットを作ってみよう。

>>> s3_client.create_bucket(Bucket='boto3-bucket')
{'ResponseMetadata': {'RequestId': '168376A910A8A588', 'HostId': '', 'HTTPStatusCode': 200, 'HTTPHeaders': {'accept-ranges': 'bytes', 'content-length': '0', 'content-security-policy': 'block-all-mixed-content', 'location': '/boto3-bucket', 'server': 'MinIO', 'vary': 'Origin', 'x-amz-request-id': '168376A910A8A588', 'x-xss-protection': '1; mode=block', 'date': 'Sat, 29 May 2021 06:45:59 GMT'}, 'RetryAttempts': 0}, 'Location': '/boto3-bucket'}

確認すると、新しくバケットができている。

>>> response = s3_client.list_buckets()
>>> from pprint import pprint
>>> pprint(response['Buckets'])
[{'CreationDate': datetime.datetime(2021, 5, 29, 6, 45, 59, 285000, tzinfo=tzutc()),
  'Name': 'boto3-bucket'},
 {'CreationDate': datetime.datetime(2021, 5, 29, 6, 25, 34, 96000, tzinfo=tzutc()),
  'Name': 'example-bucket'}]

いくつかやり方はあるけど、ここでは upload_fileobj() 関数を使ってファイルをアップロードしてみる。

>>> import io
>>> f = io.BytesIO(b'Hello, World')
>>> s3_client.upload_fileobj(f, 'boto3-bucket', 'greet.txt')

ちゃんとアップロードできた。

>>> response = s3_client.list_objects(Bucket='boto3-bucket')
>>> pprint(response['Contents'])
[{'ETag': '"82bb413746aee42f89dea2b59614f9ef"',
  'Key': 'greet.txt',
  'LastModified': datetime.datetime(2021, 5, 29, 6, 47, 55, 783000, tzinfo=tzutc()),
  'Owner': {'DisplayName': 'minio',
            'ID': '02d6176db174dc93cb1b899f7c6078f08654445fe8cf1b6ce98d8855f66bdbf4'},
  'Size': 12,
  'StorageClass': 'STANDARD'}]

今度は download_fileobj() 関数を使ってファイルをダウンロードしてみよう。

>>> f = io.BytesIO()
>>> s3_client.download_fileobj(Bucket='example-bucket', Key='greet.txt', Fileobj=f)

ちゃんと中身が確認できた。

>>> f.seek(0)
0
>>> f.read()
b'Hello, World\n'

いじょう。

iproute2 の ip-netns(8) を使わずに Network Namespace を操作する

今回は、iproute2 の ip-netns(8) を使わずに、Linux の Network Namespace を操作する方法について書いてみる。 目的は、namespaces(7) について、より深い理解を得ること。

使った環境は次のとおり。

$ cat /etc/lsb-release 
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=20.04
DISTRIB_CODENAME=focal
DISTRIB_DESCRIPTION="Ubuntu 20.04.2 LTS"
$ uname -r
5.4.0-1043-gcp

もくじ

下準備

あらかじめ、必要なパッケージをインストールしておく。

$ sudo apt-get update
$ sudo apt-get -y install iproute2 util-linux gcc

前提知識

Linux の namespaces(7) は、プロセスが利用するリソースを分離するための仕組み。 典型的には、Linux のコンテナ仮想化を実現するために用いられている。 今回はタイトルに Network Namespace と入れたものの、分離できるのは何も Network に限らない。

プロセスが利用している Namespace の情報は procfs から /proc/<pid>/ns で確認できる。 現在のプロセスであれば、自身の pid を確認するまでもなく /proc/self/ns を見れば良い。

$ ls -alF /proc/self/ns
total 0
dr-x--x--x 2 amedama amedama 0 May 21 12:41 ./
dr-xr-xr-x 9 amedama amedama 0 May 21 12:41 ../
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 cgroup -> 'cgroup:[4026531835]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 ipc -> 'ipc:[4026531839]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 mnt -> 'mnt:[4026531840]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 net -> 'net:[4026531992]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 pid -> 'pid:[4026531836]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 pid_for_children -> 'pid:[4026531836]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 user -> 'user:[4026531837]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 uts -> 'uts:[4026531838]'

これらのファイルの実体はシンボリックリンクで、参照先として表示されている謎の数字は inode 番号を示している。 つまり、Namespace は inode 番号が識別子になっている。 上記であれば、/proc/self/ns/net がプロセスが利用している Network Namespace の識別子を表している。

$ file /proc/self/ns/net
/proc/self/ns/net: symbolic link to net:[4026531992]
$ stat -L /proc/self/ns/net
  File: /proc/self/ns/net
  Size: 0          Blocks: 0          IO Block: 4096   regular empty file
Device: 4h/4d   Inode: 4026531992  Links: 1
Access: (0444/-r--r--r--)  Uid: (    0/    root)   Gid: (    0/    root)
Access: 2021-05-21 12:42:07.565311760 +0000
Modify: 2021-05-21 12:42:07.565311760 +0000
Change: 2021-05-21 12:42:07.565311760 +0000
 Birth: -

unshare(1) / nsenter(1) / mount(8) を使って操作する

さて、前提知識の確認が終わったところで、実際に ip-netns(8) を使わずに Network Namespace を操作してみよう。 まずは、ip-netns(8) 以外のコマンドラインツールで操作する方法を試す。

新しく namespaces(7) を作るコマンドとしては unshare(1) が使える。 --net オプションを指定すると、コマンドで新たに起動するプロセスが利用する Network Namespace を確保できる。 以下では新しい Network Namespace を使って bash(1) を起動している。

$ sudo unshare --net bash

起動したシェルで確認すると、たしかに /proc/<pid>/ns 以下のファイルの inode 番号が変わっていることが分かる。

# file /proc/self/ns/net
/proc/self/ns/net: symbolic link to net:[4026532254]

ip-link(8) を使ってみるデバイスの状況を確認すると、DOWN したループバックデバイスしか無いことが分かる。 どうやら、ちゃんと Network Namespace が新しく作られたようだ。

# ip link show
1: lo: <LOOPBACK> mtu 65536 qdisc noop state DOWN mode DEFAULT group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00

ただ、この状況で ip-netns(8) を使ってみても何も表示されない。 新しく Network Namespace ができたというのに、どうしてだろう。

# ip netns list

というのも、実は ip-netns(8) の list サブコマンドは、/var/run/netns 以下にあるファイルを見ているだけに過ぎない。 上記で何も表示されないということは、ここに何もファイルがないということ。

# ls /var/run/netns

たしかに何も表示されない。 そもそも、ip-netns(8) を使ったことがない環境であれば、ディレクトリすらできていないことだろう。

ここでおもむろに /var/run/netns 以下にファイルを作って、/proc/self/ns/net--bind オプションつきでマウントしてみよう。

# touch /var/run/netns/example
# mount --bind /proc/self/ns/net /var/run/netns/example

すると、ip-netns(8) の list サブコマンドに、作ったファイルと同じ内容が見られる。

# ip netns list
example

上記は、ちゃんと ip-netns(8) から使うことができる。 一旦、unshare(1) で作ったシェルのプロセスから抜けて、ip-netns(8) の exec サブコマンドを実行してみよう。

# exit
$ sudo ip netns exec example bash -c 'ls -alF /proc/self/ns/net'
lrwxrwxrwx 1 root root 0 May 21 12:49 /proc/self/ns/net -> 'net:[4026532254]'

上記から、ちゃんと使えることがわかる。 というのも、これは実のところ ip-netns(8) が内部的にやっているのとほぼ同じことをやっているため。

先ほど /var/run/netns 以下に作ったファイルは nsenter(1) から利用することもできる。 このコマンドは既存の Namespace に切り替えるために用いる。 --net オプションにファイルを指定して、シェルを起動してみよう。

$ sudo nsenter --net=/var/run/netns/example bash

起動したシェルから確認すると、ちゃんと Namespace が切り替わっていることがわかる。

# ls -alF /proc/self/ns/net
lrwxrwxrwx 1 root root 0 May 21 12:53 /proc/self/ns/net -> 'net:[4026532254]'

ちなみに、ip-netns(8) から利用するときには mount(8) を使わなくてもシンボリックリンクを張るだけで代用できる。 次のように、$$ を使って自身の pid を置換しつつ、Namespace を表したファイルからシンボリックリンクを張ってみよう。

# ln -s /proc/$$/ns/net /var/run/netns/symlink

起動したシェルから抜けた上で確認すると、ちゃんと ip-netns(8) のリストに表示されると共に、使えることがわかる。

# exit
$ ip netns list
symlink
example
$ sudo ip netns exec example bash -c 'ls -alF /proc/self/ns/net'
lrwxrwxrwx 1 root root 0 May 21 12:58 /proc/self/ns/net -> 'net:[4026532254]'

このテクニックは Docker や Mininet などが作る Network Namespace を ip-netns(8) から操作したいときにも有効。

unshare(2) / setns(2) / mount(2) を使って操作する

さて、ip-netns(8) 以外のコマンドラインツールから操作できることがわかったところで、続いてはシステムコールを使ってみる。 というか、先ほど使った一連のコマンドラインツールも、内部的にはこれらの API を叩いていた。

早速だけど、以下にサンプルコードを示す。 このサンプルコードでは、次のような処理をしている。

  • unshare(2) で Network Namespace を新しく作る
  • mount(2) で /proc/self/ns/net/var/run/netns 以下に syscall-example という名前でマウントする
  • /proc/self/ns/net の中身を表示する
#define _GNU_SOURCE
#include <sched.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mount.h>

int main(int argc, char *argv[]) {
    if (unshare(CLONE_NEWNET) < 0) {
        fprintf(stderr, "Failed to create a new network namespace: %s\n", strerror(errno));
        exit(-1);
    }

    const char *netns_path = "/var/run/netns/syscall-example";
    const int fd = open(netns_path, O_RDONLY | O_CREAT | O_EXCL, 0);
    if (fd < 0) {
        fprintf(stderr, "Cannot create namespace file \"%s\": %s\n",
            netns_path, strerror(errno));
        return EXIT_FAILURE;
    }
    close(fd);

    const char *proc_path = "/proc/self/ns/net";
    if (mount(proc_path, netns_path, "none", MS_BIND, NULL) < 0) {
        fprintf(stderr, "Failed to bind %s -> %s: %s\n",
            proc_path, netns_path, strerror(errno));
    }

    const char *cmd = "file";
    char* const args[] = {"file", "/proc/self/ns/net", NULL};
    if (execvp(cmd, args) < 0) {
        fprintf(stderr, "Failed to exec \"%s\": %s\n", cmd, strerror(errno));
        exit(-1);
    }
    return EXIT_SUCCESS;
}

上記に nsadd.c という名前をつけてビルドする。

$ gcc -o nsadd.o nsadd.c

実行すると、/proc/self/ns/net が新しい識別子になっていることがわかる。

$ sudo ./nsadd.o 
/proc/self/ns/net: symbolic link to net:[4026532315]

ip-netns(8) からも、ちゃんと使える。

$ ip netns list
syscall-example
symlink
example
$ sudo ip netns exec syscall-example bash -c 'ls -alF /proc/self/ns/net'
lrwxrwxrwx 1 root root 0 May 21 13:10 /proc/self/ns/net -> 'net:[4026532315]'

続いては、上記で作った Network Namespace を示すファイルを利用するサンプルコード。 次のような処理をしている。

  • /var/run/netns 以下のファイルを open(2) で開く
  • 上記で得られたファイルディスクリプタを setns(2) に渡して Namespace を切り替える
  • /proc/self/ns/net の中身を表示する
#define _GNU_SOURCE
#include <sched.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>

int main(int argc, char *argv[]) {
    const char *mounted_path = "/var/run/netns/syscall-example";
    const int fd = open(mounted_path, O_RDONLY | O_CLOEXEC);
    if (fd < 0) {
        fprintf(stderr, "Cannot open mounted path\"%s\": %s\n",
            mounted_path, strerror(errno));
        return EXIT_FAILURE;
    }

    if (setns(fd, CLONE_NEWNET) < 0) {
        fprintf(stderr, "failed to setup the network namespace \"%s\": %s\n",
            mounted_path, strerror(errno));
        close(fd);
        return EXIT_FAILURE;
    }

    const char *cmd = "file";
    char* const args[] = {"file", "/proc/self/ns/net", NULL};
    if (execvp(cmd, args) < 0) {
        fprintf(stderr, "Failed to exec \"%s\": %s\n", cmd, strerror(errno));
        exit(-1);
    }
    return EXIT_SUCCESS;
}

上記に nsexec.c という名前をつけてビルドする。

$ gcc -o nsexec.o nsexec.c 

実行すると、ちゃんと Network Namespace が切り替わっていることがわかる。

$ sudo ./nsexec.o 
/proc/self/ns/net: symbolic link to net:[4026532315]

いじょう。

参考

git.kernel.org

Python: Streamlit を使って手早く WebUI 付きのプロトタイプを作る

Streamlit は、ざっくり言うと主にデータサイエンス領域において WebUI 付きのアプリケーションを手早く作るためのソフトウェア。 使い所としては、ひとまず動くものを見せたかったり、少人数で試しに使うレベルのプロトタイプを作るフェーズに適していると思う。 たとえば、Jupyter で提供すると複数人で使うのに難があるし、かといって Flask や Django を使って真面目に作るほどではない、くらいのとき。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.3.1
BuildVersion:   20E241
$ python -V
Python 3.8.9

もくじ

下準備

まずは必要なパッケージをインストールしておく。 本当に必要なのは streamlit のみ。 watchdog はパフォーマンスのために入れる。 matplotlib についてはグラフを可視化するときに使うため入れておく。 click はスクリプトに引数を渡すサンプルのため。

$ pip install streamlit watchdog matplotlib click

インストールすると streamlit コマンドが使えるようになる。

$ streamlit version
Streamlit, version 0.81.0

必要に応じて Streamlit の設定ファイルを用意する。 以下は、初回の実行時に確認される e-mail アドレスのスキップと、利用に関する統計情報を送信しない場合の設定。 なお、これは別にやらなくても初回の実行時に案内が出る。

$ mkdir -p ~/.streamlit
$ cat << 'EOF' > ~/.streamlit/credentials.toml 
[general]
email = ""
EOF
$ cat << 'EOF' > ~/.streamlit/config.toml
[browser]
    gatherUsageStats = false
EOF

基本的な使い方

まずはもっとも基本的な使い方から見ていく。 以下は streamlit.write() 関数を使って任意のオブジェクトを WebUI に表示するサンプルコード。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    # Streamlit が対応している任意のオブジェクトを可視化する (ここでは文字列)
    st.write('Hello, World!')


if __name__ == '__main__':
    main()

上記を適当な名前で保存したら streamlit run サブコマンドで指定して実行する。

$ streamlit run example.py

すると、デフォルトでは 8501 ポートで Streamlit のアプリケーションサーバが起動する。 ブラウザで開いて結果を確認しよう。

$ open http://localhost:8501

すると、次のように「Hello, World!」という表示のある Web ページが確認できる。

f:id:momijiame:20210505001417p:plain

やっていることは静的な文字列を表示しているだけとはいえ、Pure Python なスクリプトをちょっと書くだけで Web ページが表示できた。

なお、Streamlit はデフォルトだと実行するホストの全 IP アドレスを Listen するので注意しよう。 ループバックアドレスだけに絞りたいときは以下のようにする。

$ streamlit run --server.address localhost example.py

ちなみに先ほど使った streamlit.write() 関数は色々なオブジェクトを可視化するのに使うことができる。 現時点で対応しているものをざっと書き出してみると次のとおり。

  • サードパーティー製パッケージ関連
    • Pandas の DataFrame オブジェクト
    • Keras の Model オブジェクト
    • SymPy の表現式 (LaTeX)
    • グラフ描画系
      • Matplotlib
      • Altair
      • Vega Lite
      • Plotly
      • Bokeh
      • PyDeck
      • Graphviz
  • 標準的な Python のオブジェクト
    • 例外オブジェクト
    • 関数オブジェクト
    • モジュールオブジェクト
    • 辞書オブジェクト

その他、任意のオブジェクトは str() 関数に渡したのと等価な結果が得られる。

基本的な書式

続いて、Streamlit に備わっている基本的な書式をいくつか試してみる。 アプリケーションのタイトルやヘッダ、マークダウンテキストや数式など。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    # タイトル
    st.title('Application title')
    # ヘッダ
    st.header('Header')
    # 純粋なテキスト
    st.text('Some text')
    # サブレベルヘッダ
    st.subheader('Sub header')
    # マークダウンテキスト
    st.markdown('**Markdown is available **')
    # LaTeX テキスト
    st.latex(r'\bar{X} = \frac{1}{N} \sum_{n=1}^{N} x_i')
    # コードスニペット
    st.code('print(\'Hello, World!\')')
    # エラーメッセージ
    st.error('Error message')
    # 警告メッセージ
    st.warning('Warning message')
    # 情報メッセージ
    st.info('Information message')
    # 成功メッセージ
    st.success('Success message')
    # 例外の出力
    st.exception(Exception('Oops!'))
    # 辞書の出力
    d = {
        'foo': 'bar',
        'users': [
            'alice',
            'bob',
        ],
    }
    st.json(d)


if __name__ == '__main__':
    main()

先ほどの Python ファイルに上書きすると、Streamlit はファイルの変更を検知して自動的に読み込み直してくれる。 アプリケーションを表示しているブラウザはリロードするか、変更が生じた際に自動で読み込むか問うボタンが右上に出てくる。

f:id:momijiame:20210506011250p:plain

プレースホルダー

続いて扱うのはプレースホルダーという機能。 かなり地味なので、この時点で紹介する点に違和感があるかもしれない。 とはいえ、地味なりに多用する機能なので先に説明しておく。

プレースホルダーは、任意のオブジェクトを表示するための入れ物みたいなオブジェクト。 言葉よりも実際に使った方が分かりやすいと思うので以下にサンプルを示す。 プレースホルダーを用意して、後からそこにオブジェクトを書き込む、みたいな使い方をする。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    # プレースホルダーを用意する
    placeholder1 = st.empty()
    # プレースホルダーに文字列を書き込む
    placeholder1.write('Hello, World')

    placeholder2 = st.empty()
    # コンテキストマネージャとして使えば出力先をプレースホルダーにできる
    with placeholder2:
        # 複数回書き込むと上書きされる
        st.write(1)
        st.write(2)
        st.write(3)  # この場合は最後に書き込んだものだけ見える


if __name__ == '__main__':
    main()

上記を実行した結果は次のとおり。 プレースホルダーの内容は上書きされるので、特に何もしなければ最後に書きこまれた内容が見える。

f:id:momijiame:20210506013310p:plain

プレースホルダーを応用するとアニメーション的なこともできる。 以下のサンプルコードではスリープを挟みながらプレースホルダーの内容を書きかえることで動きのあるページを作っている。

# -*- coding: utf-8 -*-

import time

import streamlit as st


def main():
    status_area = st.empty()

    # カウントダウン
    count_down_sec = 5
    for i in range(count_down_sec):
        # プレースホルダーに残り秒数を書き込む
        status_area.write(f'{count_down_sec - i} sec left')
        # スリープ処理を入れる
        time.sleep(1)

    # 完了したときの表示
    status_area.write('Done!')
    # 風船飛ばす
    st.balloons()


if __name__ == '__main__':
    main()

上記を実行すると秒数のカウントダウンが確認できる。

f:id:momijiame:20210506013737p:plain

プログレスバーを使った処理の進捗の可視化

ちなみに先ほどのようなカウントダウンをするような処理だとプログレスバーを使うこともできる。 以下のサンプルコードでは 0.1 秒ごとにプログレスバーの数値を増やしていくページができる。

# -*- coding: utf-8 -*-

import time

import streamlit as st


def main():
    status_text = st.empty()
    # プログレスバー
    progress_bar = st.progress(0)

    for i in range(100):
        status_text.text(f'Progress: {i}%')
        # for ループ内でプログレスバーの状態を更新する
        progress_bar.progress(i + 1)
        time.sleep(0.1)

    status_text.text('Done!')
    st.balloons()


if __name__ == '__main__':
    main()

上記を実行すると、以下のようにプログレスバーが表示される。

f:id:momijiame:20210506014133p:plain

基本的な可視化

ここまでの内容だと、面白いけど何が便利なのかイマイチよく分からないと思う。 そこで、ここからはもう少し実用的な話に入っていく。 具体的には、いくつかグラフなどを可視化する方法について見ていこう。

組み込みのグラフ描画機能

Streamlit には組み込みのグラフ描画機能がある。 この機能を使うと NumPy の配列や Pandas のデータフレームなどをサクッとグラフにできる。 以下のサンプルコードでは折れ線グラフ、エリアチャート、バーチャートの 3 種類を試している。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


def main():
    # ランダムな値でデータフレームを初期化する
    data = {
        'x': np.random.random(20),
        'y': np.random.random(20) - 0.5,
        'z': np.random.random(20) - 1.0,
    }
    df = pd.DataFrame(data)
    # 折れ線グラフ
    st.subheader('Line Chart')
    st.line_chart(df)
    # エリアチャート
    st.subheader('Area Chart')
    st.area_chart(df)
    # バーチャート
    st.subheader('Bar Chart')
    st.bar_chart(df)


if __name__ == '__main__':
    main()

上記からは次のようなグラフが得られる。

f:id:momijiame:20210506012655p:plain

グラフにデータを動的に追加することもできる。 これにはグラフを描画する関数を実行して得られるオブジェクトに add_rows() メソッドを使えば良い。 以下のサンプルコードでは、折れ線グラフに 0.5 秒間隔で 10 回までデータを追加している。

# -*- coding: utf-8 -*-

import time

import streamlit as st
import numpy as np


def main():
    # 折れ線グラフ (初期状態)
    x = np.random.random(size=(10, 2))
    line_chart = st.line_chart(x)

    for i in range(10):
        # 折れ線グラフに 0.5 秒間隔で 10 回データを追加する
        additional_data = np.random.random(size=(5, 2))
        line_chart.add_rows(additional_data)
        time.sleep(0.5)


if __name__ == '__main__':
    main()

上記を確認すると、0.5 秒間隔でグラフにデータが追加されていく様子が確認できる。 こういったアニメーション効果を手軽に導入できるのは Streamlit の強みだと思う。

f:id:momijiame:20210506012934p:plain

ちなみに気づいたかもしれないけどブラウザをリロードするごとにプロットされる結果は変わる。 これは Streamlit がページを表示するときに、スクリプトを上から順に実行するように処理しているため。 つまり、ブラウザをリロードする毎にスクリプトのコードが評価され直しているように考えれば良い。

Matplotlib

続いては Matplotlib のグラフを描画してみよう。 Streamlit では Matplotlib の Figure オブジェクトを書き出すことでグラフを描画できる。 以下のサンプルコードではランダムに生成した値をヒストグラムにプロットしている。

# -*- coding: utf-8 -*-

import streamlit as st
import numpy as np
from matplotlib import pyplot as plt


def main():
    # 描画領域を用意する
    fig = plt.figure()
    ax = fig.add_subplot()
    # ランダムな値をヒストグラムとしてプロットする
    x = np.random.normal(loc=.0, scale=1., size=(100,))
    ax.hist(x, bins=20)
    # Matplotlib の Figure を指定して可視化する
    st.pyplot(fig)


if __name__ == '__main__':
    main()

上記からは次のような画面が得られる。

f:id:momijiame:20210509171628p:plain

先ほどと同じように、データを更新しながらグラフを描画し直すサンプルも書いてみる。 以下のサンプルコードでは、プレースホルダを使って描画されるグラフの内容を更新している。

# -*- coding: utf-8 -*-

import time

import streamlit as st
import numpy as np
from matplotlib import pyplot as plt


def main():
    # グラフを書き出すためのプレースホルダを用意する
    plot_area = st.empty()
    fig = plt.figure()
    ax = fig.add_subplot()
    x = np.random.normal(loc=.0, scale=1., size=(100,))
    ax.plot(x)
    # プレースホルダにグラフを書き込む
    plot_area.pyplot(fig)

    # 折れ線グラフに 0.5 秒間隔で 10 回データを追加する
    for i in range(10):
        # グラフを消去する
        ax.clear()
        # データを追加する
        additional_data = np.random.normal(loc=.0, scale=1., size=(10,))
        x = np.concatenate([x, additional_data])
        # グラフを描画し直す
        ax.plot(x)
        # プレースホルダに書き出す
        plot_area.pyplot(fig)
        time.sleep(0.5)


if __name__ == '__main__':
    main()

上記を実行すると、一定間隔でデータが追加されながらグラフの描画も更新される。

f:id:momijiame:20210509171822p:plain

Pandas

グラフではないけど Pandas のデータフレームを Jupyter で可視化するときと同じように表示できる。 データフレームを出力するときは streamlit.dataframe()streamlit.table() という 2 種類の関数がある。 前者は行や列の要素が多いときにスクロールバーを使って表示する一方で、後者はすべてをいっぺんに表示する。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


def main():
    # Pandas のデータフレームを可視化してみる
    data = {
        # ランダムな値で初期化する
        'x': np.random.random(20),
        'y': np.random.random(20),
    }
    df = pd.DataFrame(data)
    # データフレームを書き出す
    st.dataframe(df)
    # st.write(df)  でも良い
    # スクロールバーを使わず一度に表示したいとき
    st.table(df)


if __name__ == '__main__':
    main()

上記からは以下のような表示が得られる。

f:id:momijiame:20210509172138p:plain

画像

画像を表示するときは streamlit.image() 関数を使う。 以下のサンプルコードではランダムに生成した NumPy 配列を、カラー画像として可視化している。

# -*- coding: utf-8 -*-

import streamlit as st
import numpy as np


def main():
    x = np.random.random(size=(400, 400, 3))
    # NumPy 配列をカラー画像として可視化する
    st.image(x)


if __name__ == '__main__':
    main()

上記からは以下のような表示が得られる。

f:id:momijiame:20210509172356p:plain

地図

地図上にプロットすることもできる。 地図に散布図を描きたいときは streamlit.map() 関数を使えば良い。 以下のサンプルコードでは、東京を中心とした地図にランダムな点をプロットしている。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


def main():
    # 東京のランダムな経度・緯度を生成する
    data = {
        'lat': np.random.randn(100) / 100 + 35.68,
        'lon': np.random.randn(100) / 100 + 139.75,
    }
    map_data = pd.DataFrame(data)
    # 地図に散布図を描く
    st.map(map_data)


if __name__ == '__main__':
    main()

上記からは以下のような表示が得られる。

f:id:momijiame:20210509172513p:plain

Streamlit がサポートしている可視化の機能は他にも色々とあるけど、とりあえず一旦はここまでで切り上げる。

キャッシュ機構

ここまでのサンプルコードは、ブラウザをリロードすると表示される内容が変わるものが多かった。 それはスクリプトの内容が毎回、評価し直されているのと同じ状態のため。 ただ、それだと困る場面も多い。 たとえば、時間のかかる処理が毎回評価され直すと、パフォーマンスに深刻な影響がある。 そんなときは Streamlit のキャッシュ機構を使うと良い。

キャッシュ機構を使うには streamlit.cache デコレータを使えば良い。 以下のサンプルコードでは、cached_data() 関数をデコレータで修飾している。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


# 関数の出力をキャッシュする
@st.cache
def cached_data():
    data = {
        'x': np.random.random(20),
        'y': np.random.random(20),
    }
    df = pd.DataFrame(data)
    return df


def main():
    # リロードしても同じ結果が得られる
    df = cached_data()
    st.dataframe(df)


if __name__ == '__main__':
    main()

上記はオンメモリで結果がキャッシュされるため、ブラウザをリロードしても表示が変わることがない。 その他、キャッシュ機構の詳しい解説は以下のドキュメントに記載されている。

docs.streamlit.io

ウィジェット

ここまでのサンプルには、ユーザからの入力を受け付けるものがなかった。 ここからは、ウィジェットを使ってインタラクティブなページを作る方法について書く。

ボタン

まずは最も基本的なウィジェットとしてボタンを扱う。 このボタン、Streamlit のウィジェットの考え方が、他の UI フレームワークと違うことがよく分かって面白い。

ボタンは streamlit.button() 関数を使って配置できる。 以下のサンプルコードは、ボタンを押すことで表示される内容が変わるものとなっている。 興味深いのは、ボタンにイベントハンドラなどの類が一切設定されていないこと。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


def main():
    # データフレームを書き出す
    data = np.random.randn(20, 3)
    df = pd.DataFrame(data, columns=['x', 'y', 'z'])
    st.dataframe(df)
    # リロードボタン
    st.button('Reload')


if __name__ == '__main__':
    main()

上記を実行すると以下のような表示が得られる。 実際、ボタンを押すと表示内容が変わるはず。

f:id:momijiame:20210509173746p:plain

ポイントは、Streamlit は毎回スクリプトを評価し直すように動作するところ。 つまり、ウィジェットで何らかのイベントが起こったら、Streamlit はページの内容を丸ごと評価し直すと考えれば良い。 上記のサンプルコードは、ボタンが押されるイベントによって、表示が丸ごと変わったわけだ。

ウィジェットは、一番最後の試行 (評価) のときに、ウィジェットがどのような状態になったかを返す場合がある。 ボタンも同様で、最後の試行でボタンが押されたか・押されていないかを真偽値 (bool) で返す。

ウィジェットの特性を利用すると、ウィジェットを設置する関数から返ってくる値を使ってインタラクティブな画面が作れる。 以下のサンプルコードでは、2 つのボタンを設置して、押されたボタンに対応するメッセージを表示している。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    if st.button('Top button'):
        # 最後の試行で上のボタンがクリックされた
        st.write('Clicked')
    else:
        # クリックされなかった
        st.write('Not clicked')

    if st.button('Bottom button'):
        # 最後の試行で下のボタンがクリックされた
        st.write('Clicked')
    else:
        # クリックされなかった
        st.write('Not clicked')


if __name__ == '__main__':
    main()

上記を実行すると、以下のような表示が得られる。 ボタンを押すと、表示が更新されて、押されたボタンに対応するメッセージが表示されるはず。

f:id:momijiame:20210509174659p:plain

チェックボックス

チェックボックスは、最後の試行でチェックされたか・されなかったかを元に処理を分岐できる。 以下のサンプルコードでは、チェックされたときだけデータフレームを表示している。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


def main():
    # チェックボックスにチェックが入っているかで処理を分岐する
    if st.checkbox('Show'):
        # チェックが入っているときはデータフレームを書き出す
        data = np.random.randn(20, 3)
        df = pd.DataFrame(data, columns=['x', 'y', 'z'])
        st.dataframe(df)


if __name__ == '__main__':
    main()

上記を実行すると、以下のような表示が得られる。 チェックボックスをチェックしたときだけデータフレームが表示される。

f:id:momijiame:20210509175353p:plain

ラジオボタン

同様に、最後の試行でチェックされたアイテムを元に処理をできるラジオボタン。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    selected_item = st.radio('Which do you like?',
                             ['Dog', 'Cat'])
    if selected_item == 'Dog':
        st.write('Wan wan')
    else:
        st.write('Nya- nya-')


if __name__ == '__main__':
    main()

上記を実行して得られる表示は以下のとおり。

f:id:momijiame:20210511182308p:plain

セレクトボックス

できることは基本的にラジオボタンと変わらないセレクトボックス。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    selected_item = st.selectbox('Which do you like?',
                                 ['Dog', 'Cat'])
    st.write(f'Selected: {selected_item}')


if __name__ == '__main__':
    main()

上記を実行して得られる表示は以下のとおり。

f:id:momijiame:20210511182423p:plain

単一のアイテムを選択するセレクトボックスの他に、複数のアイテムを選択できるマルチセレクトもある。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    selected_items = st.multiselect('What are your favorite characters?',
                                    ['Miho Nishizumi',
                                     'Saori Takebe',
                                     'Hana Isuzu',
                                     'Yukari Akiyama',
                                     'Mako Reizen',
                                     ])
    st.write(f'Selected: {selected_items}')


if __name__ == '__main__':
    main()

上記から得られる表示は以下のとおり。

f:id:momijiame:20210511182536p:plain

スライダー

スライダーは特定の範囲の中から値を選択するのに使える。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    age = st.slider(label='Your age',
                    min_value=0,
                    max_value=130,
                    value=30,
                    )
    st.write(f'Selected: {age}')


if __name__ == '__main__':
    main()

f:id:momijiame:20210511182710p:plain

デフォルトの値にタプルなどで 2 つの要素を指定すると、レンジを入力できるようになる。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    min_value, max_value = st.slider(label='Range selected',
                                     min_value=0,
                                     max_value=100,
                                     value=(40, 60),
                                     )
    st.write(f'Selected: {min_value} ~ {max_value}')


if __name__ == '__main__':
    main()

f:id:momijiame:20210511182804p:plain

ちなみに整数以外にも日付とかを指定するのにも使える。 ただ、そんなに使いやすいとは思えない。 日付とか時間は後述する専用のウィジェットを使った方が良いと思う。

# -*- coding: utf-8 -*-

from datetime import date

import streamlit as st


def main():
    birthday = st.slider('When is your birthday?',
                         min_value=date(1900, 1, 1),
                         max_value=date.today(),
                         value=date(2000, 1, 1),
                         format='YYYY-MM-DD',
                         )
    st.write('Birthday: ', birthday)


if __name__ == '__main__':
    main()

f:id:momijiame:20210511182923p:plain

Date / Time インプット

日付や時間を扱う専用のウィジェットが続いて紹介する Date / Time インプット。

まずは Date インプットから。

# -*- coding: utf-8 -*-

from datetime import date

import streamlit as st


def main():
    birthday = st.date_input('When is your birthday?',
                             min_value=date(1900, 1, 1),
                             max_value=date.today(),
                             value=date(2000, 1, 1),
                             )
    st.write('Birthday: ', birthday)


if __name__ == '__main__':
    main()

ウィジェットをクリックするとカレンダーで日付を指定できるので使いやすい。

f:id:momijiame:20210511183139p:plain

Time インプットは一日の中の時間を指定できる。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    time = st.time_input(label='Your input:')
    st.write('input: ', time)


if __name__ == '__main__':
    main()

こちらもウィジェットをクリックすると時間のセレクタが表示されて使いやすい。

f:id:momijiame:20210511183244p:plain

文字列入力

一行の文字列の入力にはテキストインプットが使える。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    text = st.text_input(label='Message', value='Hello, World!')
    st.write('input: ', text)


if __name__ == '__main__':
    main()

f:id:momijiame:20210511183447p:plain

同様に、複数行に渡る文字列を入力するときはテキストエリアを用いる。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    text = st.text_area(label='Multi-line message', value='Hello, World!')
    st.write('input: ', text)


if __name__ == '__main__':
    main()

f:id:momijiame:20210511183536p:plain

数字入力

数字を入力するときはナンバーインプットを使う。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    n = st.number_input(label='What is your favorite number?',
                        value=42,
                        )
    st.write('input: ', n)


if __name__ == '__main__':
    main()

f:id:momijiame:20210511183634p:plain

デフォルト値を浮動小数点型にすれば、小数を入力できる。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    n = st.number_input(label='What is your favorite number?',
                        value=3.14,
                        )
    st.write('input: ', n)


if __name__ == '__main__':
    main()

f:id:momijiame:20210511183726p:plain

ファイルアップローダ

ファイルアップローダを使うと、クライアントのファイルをアプリケーションに渡すことができる。 以下のサンプルコードでは、渡されたファイルに含まれるテキストを UTF-8 として表示している。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    f = st.file_uploader(label='Upload file:')
    st.write('input: ', f)

    if f is not None:
        # XXX: 信頼できないファイルは安易に評価しないこと
        data = f.getvalue()
        text = data.decode('utf-8')
        st.write('contents: ', text)


if __name__ == '__main__':
    main()

適当なテキストファイルを使って動作確認してみよう。

$ echo "Hello, World" > ~/Downloads/greet.txt

ウィジェットをクリックしてファイルを選択すると、以下のように中身が表示される。

f:id:momijiame:20210511183950p:plain

受け取れるオブジェクトは streamlit.UploadedFile という、オープン済みのファイルライクオブジェクトになる。

カラーピッカー

ちょっと変わり種だけどカラーピッカーも用意されている。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    c = st.color_picker(label='Select color:')
    st.write('input: ', c)


if __name__ == '__main__':
    main()

f:id:momijiame:20210511184147p:plain

フロー制御

ウィジェットが色々とあると、ユーザの入力のバリデーションも考えることになる。 ここではフロー制御をするための機能を紹介する。

特定の条件に満たないときに処理を停止するサンプルコードを以下に示す。 このサンプルではテキストインプットに何か文字列が入っていないときに警告メッセージを出して処理を停止している。 処理の停止には streamlit.stop() 関数を使う。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    name = st.text_input(label='your name:')

    # バリデーション処理
    if len(name) < 1:
        st.warning('Please input your name')
        # 条件を満たないときは処理を停止する
        st.stop()

    st.write('Hello,', name, '!')


if __name__ == '__main__':
    main()

テキストインプットに何も入力されていない状態では、以下のように警告メッセージだけが表示されることになる。

f:id:momijiame:20210511184433p:plain

テキストインプットに文字列を入力すると、警告メッセージが消えて正常系の表示に切り替わる。

f:id:momijiame:20210511184601p:plain

レイアウトを調整する

ここからは画面のレイアウトを調整するための機能を見ていく。

カラム

はじめに紹介するのはカラム。 これは、ようするに画面を縦方向に分割して異なる内容を表示できるもの。

カラムを作るには streamlit.beta_columns() 関数を使う。 以下のサンプルコードでは画面を 3 列に分割している。 関数の返り値をコンテキストマネージャとして使うとデフォルトの出力先として使うこともできるし、オブジェクトに直接書き込むこともできる。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    # カラムを追加する
    col1, col2, col3 = st.beta_columns(3)

    # コンテキストマネージャとして使う
    with col1:
        st.header('col1')

    with col2:
        st.header('col2')

    with col3:
        st.header('col3')

    # カラムに直接書き込むこともできる
    col1.write('This is column 1')
    col2.write('This is column 2')
    col3.write('This is column 3')


if __name__ == '__main__':
    main()

上記を実行して得られる表示は以下のとおり。

f:id:momijiame:20210511184758p:plain

コンテナ

続いて扱うのはコンテナ。 これは、不可視な仕切りみたいなもの。

以下のサンプルコードではコンテナの内と外にオブジェクトを書き込んで、結果を確認している。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    # コンテナを追加する
    container = st.beta_container()

    # コンテキストマネージャとして使うことで出力先になる
    with container:
        st.write('This is inside the container')
    # これはコンテナの外への書き込み
    st.write('This is outside the container')

    # コンテナに直接書き込むこともできる
    container = st.beta_container()
    container.write('1')
    st.write('2')
    # 出力順は後だがレイアウト的にはこちらが先に現れる
    container.write('3')


if __name__ == '__main__':
    main()

f:id:momijiame:20210511185155p:plain

入れ子にすることもできて、たとえば以下のサンプルコードではプレースホルダにコンテナを追加して、さらにそこにカラムを追加している。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    placeholder = st.empty()
    # プレースホルダにコンテナを追加する
    container = placeholder.beta_container()
    # コンテナにカラムを追加する
    col1, col2 = container.beta_columns(2)
    # それぞれのカラムに書き込む
    with col1:
        st.write('Hello, World')
    with col2:
        st.write('Konnichiwa, Sekai')


if __name__ == '__main__':
    main()

f:id:momijiame:20210511185307p:plain

エキスパンダ

デフォルトでは折りたたまれて非表示な領域を作るのにエキスパンダが使える。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    with st.beta_expander('See details'):
        st.write('Hidden item')


if __name__ == '__main__':
    main()

上記を実行して、以下はエキスパンダを展開した状態。

f:id:momijiame:20210511185400p:plain

サイドバー

ウィジェットやオブジェクトの表示をサイドバーに配置することもできる。 使い方は単純で、サイドバーに置きたいなと思ったら sidebar をつけて API を呼び出す。

以下のサンプルコードでは、サイドバーにボタンを配置している。 前述したとおり、streamlit.button()streamlit.sidebar.button() に変えるだけ。 同様に、streamlit.sidebar.dataframe() のように間に sidebar をはさむことで大体の要素はサイドバーに置ける。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


def main():
    # サイドバーにリロードボタンをつける
    st.sidebar.button('Reload')
    # サイドバーにデータフレームを書き込む
    data = np.random.randn(20, 3)
    df = pd.DataFrame(data, columns=['x', 'y', 'z'])
    st.sidebar.dataframe(df)


if __name__ == '__main__':
    main()

上記を実行すると、以下のようにサイドバーに要素が設置されることが確認できる。

f:id:momijiame:20210512222116p:plain

オブジェクトの docstring を表示する

Streamlit はスクリプトの変更を検出して自動でリロードしてくれるため、基本的には WebUI を見ながら開発していくことになる。 そんなとき、この関数またはメソッドの使い方なんだっけ?みたいな場面では streamlit.help() を使うと良い。 オブジェクトの docstring を表示してくれる。

# -*- coding: utf-8 -*-

import pandas as pd

import streamlit as st


def main():
    st.help(pd.DataFrame)


if __name__ == '__main__':
    main()

まあ自動補完とかドキュメント表示をサポートしてる IDE なんかで開発するときは、あんまり使わないかもしれないけど。

f:id:momijiame:20210512222544p:plain

単一のスクリプトで複数のアプリケーションを扱う

Streamlit は基本的に複数のページから成るアプリケーションを作ることができない。 では、複数のアプリケーションを単一のスクリプトで扱うことができないか、というとそうではない。 これは、ウィジェットの状態に応じて表示するアプリケーションを切り替えてやることで実現できる。

以下のサンプルコードでは、セレクトボックスの状態に応じて実行する関数を切り替えている。 それぞれの関数が、それぞれのアプリケーションになっていると考えてもらえれば良い。

# -*- coding: utf-8 -*-

import streamlit as st


def render_gup():
    """GuP のアプリケーションを処理する関数"""
    character_and_quotes = {
        'Miho Nishizumi': 'パンツァーフォー',
        'Saori Takebe': 'やだもー',
        'Hana Isuzu': '私この試合絶対勝ちたいです',
        'Yukari Akiyama': '最高だぜ!',
        'Mako Reizen': '以上だ',
    }
    selected_items = st.multiselect('What are your favorite characters?',
                                    list(character_and_quotes.keys()))
    for selected_item in selected_items:
        st.write(character_and_quotes[selected_item])


def render_aim_for_the_top():
    """トップ!のアプリケーションを処理する関数"""
    selected_item = st.selectbox('Which do you like more in the series?',
                                 [1, 2])
    if selected_item == 1:
        st.write('me too!')
    else:
        st.write('2 mo ii yo ne =)')


def main():
    # アプリケーション名と対応する関数のマッピング
    apps = {
        '-': None,
        'GIRLS und PANZER': render_gup,
        'Aim for the Top! GunBuster': render_aim_for_the_top,
    }
    selected_app_name = st.sidebar.selectbox(label='apps',
                                             options=list(apps.keys()))

    if selected_app_name == '-':
        st.info('Please select the app')
        st.stop()

    # 選択されたアプリケーションを処理する関数を呼び出す
    render_func = apps[selected_app_name]
    render_func()


if __name__ == '__main__':
    main()

上記を実行して得られる表示を以下に示す。

f:id:momijiame:20210512223221p:plain

f:id:momijiame:20210512223230p:plain

f:id:momijiame:20210512223239p:plain

ちなみに、呼び出す関数も 1 つのスクリプトにまとまっている必要はない。 別のモジュールに切り出して、スクリプトではそれをインポートして使うこともできる。 それならコードの見通しもさほど悪くはならないはず。

スクリプトでコマンドライン引数を受け取る

Streamlit のスクリプトにコマンドライン引数を渡したいときもある。 ここでは、そのやり方を紹介する。

Argparse

まずは Python の標準ライブラリにある Argparse を使う場合。 スクリプトを書く時点では特に Streamlit かどうかを意識する必要はない。 一般的な使い方と同じように引数を設定してパースして使うだけ。

# -*- coding: utf-8 -*-

import argparse

import streamlit as st


def main():
    parser = argparse.ArgumentParser(description='parse argument example')
    # --message または -m オプションで文字列を受け取る
    parser.add_argument('--message', '-m', type=str, default='World')
    # 引数をパースする
    args = parser.parse_args()
    # パースした引数を表示する
    st.write(f'Hello, {args.message}!')


if __name__ == '__main__':
    main()

ただ、使う時点ではちょっと注意点がある。 スクリプトの後ろにオプションをつけると Streamlit の引数として認識されてしまう。

$ streamlit run example.py -m Sekai
Usage: streamlit run [OPTIONS] TARGET [ARGS]...
Try 'streamlit run --help' for help.

Error: no such option: -m

そこで -- を使って区切って、スクリプトに対する引数であることを明示的に示す。

$ streamlit run example.py -- -m Sekai

Click

続いてサードパーティ製のパッケージである Click を使う場合。 Click は純粋なコマンドラインパーサ以外の機能もあることから、スクリプトを記述する時点から注意点がある。 具体的には、デコレータで修飾したオブジェクトを呼び出すときに standalone_modeFalse に指定する。 こうすると、デフォルトでは実行が完了したときに exit() してしまう振る舞いを抑制できる。

# -*- coding: utf-8 -*-

import streamlit as st
import click


@click.command()
@click.option('--message', '-m', type=str, default='World')
def main(message):
    # パースした引数を表示する
    st.write(f'Hello, {message}!')


if __name__ == '__main__':
    # click.BaseCommand.main() メソッドが呼ばれる
    # デフォルトの動作では返り値を戻さずに exit してしまう
    # スタンドアロンモードを無効にすることで純粋なコマンドラインパーサとして動作する
    main(standalone_mode=False)

実行するときに Streamlit のオプションとの間に -- で区切りが必要なのは Argparse のときと同じ。

$ streamlit run example.py -- -m Sekai

参考

docs.streamlit.io

click.palletsprojects.com

Python: LightGBM の学習に使うデータ量と最適なイテレーション数の関係性について

XGBoost は同じデータセットとパラメータを用いた場合、学習に使うデータの量 (行数) と最適なイテレーション数が線形な関係にあることが経験的に知られている 1。 今回は、それが同じ GBDT (Gradient Boosting Decision Tree) の一手法である LightGBM にも適用できる経験則なのかを実験で確認する。

使った環境は次のとおり。

$ sw_vers          
ProductName:    macOS
ProductVersion: 11.2.3
BuildVersion:   20D91
$ python -V           
Python 3.9.2
$ pip list | grep -i lightgbm
lightgbm        3.2.0

もくじ

下準備

あらかじめ、必要なパッケージをインストールしておく。

$ pip install lightgbm scikit-learn seaborn

実験

以下に、実験用のサンプルコードを示す。 サンプルコードでは、sklearn.datasets.make_classification() を使って生成した擬似的な二値分類用のデータセットを使っている。 生成したデータセットから、一定の割合で学習用のデータを無作為抽出して、LightGBM のモデルを学習したときの特性を確認している。 なお、性能の評価は念のため Nested Validation (outer: stratified hold-out, inner: stratified 5-fold cv) にしている。 outer の予測には inner で学習させたモデルで averaging している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import time

import numpy as np
import pandas as pd
import lightgbm as lgb
import seaborn as sns
from matplotlib import pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import log_loss


def main():
    # 疑似的な教師信号を作るためのパラメータ
    dist_args = {
        # データ点数
        'n_samples': 100_000,
        # 次元数
        'n_features': 100,
        # その中で意味のあるもの
        'n_informative': 20,
        # 重複や繰り返しはなし
        'n_redundant': 0,
        'n_repeated': 0,
        # タスクの難易度
        'class_sep': 0.65,
        # 二値分類問題
        'n_classes': 2,
        # 生成に用いる乱数
        'random_state': 42,
        # 特徴の順序をシャッフルしない (先頭の次元が informative になる)
        'shuffle': False,
    }
    # 教師データを作る
    x, y = make_classification(**dist_args)
    # Nested Validation (stratified hold-out -> stratified 5 fold cv)
    train_x, test_x, train_y, test_y = train_test_split(x, y,
                                                        test_size=0.3,
                                                        stratify=y,
                                                        shuffle=True,
                                                        random_state=42,
                                                        )
    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    # 学習用のパラメータ
    lgb_params = {
        # タスク設定
        'objective': 'binary',
        # メトリック
        'metric': 'binary_logloss',
        # 乱数シード
        'seed': 42,
    }

    # 乱数シードを設定する
    np.random.seed(42)

    sampled_rows = []
    best_iterations = []
    test_metrics = []
    learning_times = []
    sampling_rates = np.arange(0.1, 1.0 + 1e-2, 0.1)
    for sampling_rate in sampling_rates:
        train_len = len(train_x)
        sampled_len = int(train_len * sampling_rate)
        sampled_rows.append(sampled_len)

        # 重複なしで無作為抽出する (本当はここも Stratified にした方が良い)
        sampled_indices = np.random.choice(np.arange(train_len),
                                           size=sampled_len,
                                           replace=False)
        sampled_train_x = train_x[sampled_indices]
        sampled_train_y = train_y[sampled_indices]
        train_dataset = lgb.Dataset(sampled_train_x, sampled_train_y)

        # 交差検証
        start_time = time.time()
        cv_result = lgb.cv(params=lgb_params,
                           train_set=train_dataset,
                           num_boost_round=10_000,
                           early_stopping_rounds=100,
                           verbose_eval=100,
                           folds=folds,
                           return_cvbooster=True,
                           )
        end_time = time.time()
        learning_time = end_time - start_time
        learning_times.append(learning_time)

        cvbooster = cv_result['cvbooster']
        best_iterations.append(cvbooster.best_iteration)

        # Fold Averaging でテストデータのメトリックを計算する
        pred_y_folds = cvbooster.predict(test_x)
        pred_y_avg = np.array(pred_y_folds).mean(axis=0)
        test_metric = log_loss(test_y, pred_y_avg)
        test_metrics.append(test_metric)

    # 生の値
    data = {
        'sampling_rates': sampling_rates,
        'sampled_rows': sampled_rows,
        'best_iterations': best_iterations,
        'learning_times': learning_times,
        'test_metrics': test_metrics,
    }
    df = pd.DataFrame(data)
    print(df)

    # グラフにプロットする
    fig = plt.figure(figsize=(8, 12))
    ax1 = fig.add_subplot(3, 1, 1)
    sns.lineplot(data=df,
                 x='sampling_rates',
                 y='best_iterations',
                 label='best iteration',
                 ax=ax1,
                 )
    ax1.grid()
    ax1.legend()
    ax2 = fig.add_subplot(3, 1, 2)
    sns.lineplot(data=df,
                 x='sampling_rates',
                 y='learning_times',
                 label='learning time (sec)',
                 ax=ax2,
                 )
    ax2.grid()
    ax2.legend()
    ax3 = fig.add_subplot(3, 1, 3)
    sns.lineplot(data=df,
                 x='sampling_rates',
                 y='test_metrics',
                 label='test metric (logloss)',
                 ax=ax3,
                 )
    ax3.grid()
    ax3.legend()

    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。 計算リソースにもよるけど、それなりに時間がかかるはず。

$ python lgbiter.py 
[LightGBM] [Info] Number of positive: 2764, number of negative: 2836
[LightGBM] [Warning] Auto-choosing col-wise multi-threading, the overhead of testing was 0.003497 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 25500

...

[1500] cv_agg's binary_logloss: 0.118727 + 0.00302704
[1600]    cv_agg's binary_logloss: 0.118301 + 0.00281247
[1700] cv_agg's binary_logloss: 0.117938 + 0.00278925
   sampling_rates  sampled_rows  best_iterations  learning_times  test_metrics
0             0.1          7000              342        6.734761      0.189618
1             0.2         14000              634       12.412657      0.157727
2             0.3         21000              849       18.421927      0.134406
3             0.4         28000             1018       22.645187      0.129939
4             0.5         35000             1162       27.784236      0.122941
5             0.6         42000             1327       33.731716      0.115750
6             0.7         49000             1567       42.821615      0.113003
7             0.8         56000             1614       48.171218      0.109459
8             0.9         63000             1650       60.064258      0.107337
9             1.0         70000             1681       63.199017      0.104814

完了すると、以下のようなグラフが得られる。

f:id:momijiame:20210403001951p:plain
学習に使うデータ量と最適なイテレーション数の関係性

グラフから、LightGBM においても学習に使うデータ量と最適なイテレーション数は概ね線形な関係にあることが確認できた。 また、学習に使うデータ量と学習にかかる時間に関しても概ね線形な関係にあることが見て取れる。 一方で、学習に使うデータが増えても予測精度は非線形な改善にとどまっており、この点も直感には反していない。

いじょう。

Kaggleコンペティション チャレンジブック

Kaggleコンペティション チャレンジブック