CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: TFRecord フォーマットについて

TFRecord フォーマットは、TensorFlow がサポートしているデータセットの表現形式の一つ。 このフォーマットは、一言で表すと TensorFlow で扱うデータを Protocol Buffers でシリアライズしたものになっている。 特に、Dataset API との親和性に優れていたり、Cloud TPU を扱う上で実用上はほぼ必須といった特徴がある。 今回は、そんな TFRecord の扱い方について見ていくことにする。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.5
BuildVersion:   20G71
$ python -V
Python 3.9.6
$ pip list | grep -i tensorflow
tensorflow               2.5.0
tensorflow-datasets      4.3.0
tensorflow-estimator     2.5.0
tensorflow-metadata      1.1.0

もくじ

下準備

あらかじめ TensorFlow をインストールしておく。

$ pip install tensorflow tensorflow_datasets

そして、Python のインタプリタを起動する。

$ python

tensorflow パッケージを tf という名前でインポートしておく。

>>> import tensorflow as tf

概要

TFRecord フォーマットを TensorFlow の Python API から扱おうとすると、いくつかのオブジェクト (クラス) が登場する。 ただ、意外とその数が多いので、理解する上でとっつきにくさを生んでいる感じがある。 そこで、まずは一通りトップダウンで説明することにする。

それぞれの関係は、あるオブジェクトが別のオブジェクトを内包するようになっている。 階層構造で表すと、以下のような感じ。 階層構造で上にあるオブジェクトが、下にあるオブジェクトを内包する。

  • tf.Example
    • tf.train.Features
      • tf.train.Feature
        • tf.train.BytesList
        • tf.train.FloatList
        • tf.train.Int64List

tf.Example

tf.Example は、データセットに含まれる特定のサンプル (データポイント) に対応したオブジェクトになっている。 たとえば、教師あり学習のデータセットなら、あるサンプルの説明変数と目的変数のペアがこれに当たるイメージ。 ただ、サンプルに対応しているオブジェクトというだけなので、別に必要なら何を入れても構わない。 たとえば、画像データなら付随するメタデータとして横幅 (Width) と縦幅 (Height) のピクセル数が必要とかはあるはず。

このオブジェクトは単一の tf.train.Features というオブジェクトを内包する。

tf.train.Features

tf.train.Features は、名前から複数の特徴量を束ねるオブジェクトっぽいけど、まあ概ねその理解で正しいと思う。 概ね、というのは前述したとおりメタデータ的なものや説明変数も含まれるため。

このオブジェクトは複数の tf.train.Feature を内包する。

tf.train.Feature

tf.train.Feature は、特定の特徴量ないしメタデータや説明変数に対応したオブジェクト。

このオブジェクトは単一の tf.train.BytesList または tf.train.FloatList または tf.train.Int64List を内包する。

tf.train.BytesList

tf.train.BytesList は、特徴量としてバイト列のリストを扱うために用いるオブジェクト。

このオブジェクトは bytes 型のリストを内包する。 任意のバイト列を扱えるので、何らかのオブジェクトをシリアライズしたものを入れることができる。 詳しくは後述するけど、この特性は割と重要になってくる。 なぜなら、他の tf.train.FloatListtf.train.Int64List は一次元配列しか扱えないため。

tf.train.FloatList

tf.train.FloatList は、特徴量として浮動小数点のリストを扱うために用いるオブジェクト。

このオブジェクトは浮動小数点のリストを内包する。 前述したとおり、リストは一次元のものしか扱えない。

tf.train.Int64List

tf.train.Int64List は、特徴量として整数のリストを扱うために用いるオブジェクト。

このオブジェクトは整数のリストを内包する。 前述したとおり、リストは一次元のものしか扱えない。

基本的な使い方

一通りのオブジェクトの説明が終わったので、ここからは実際にコードを実行しながら試してみよう。 先ほどの説明とは反対に、ボトムアップでの実行になる。 これは、そうでないとオブジェクトを組み立てられないため。

まず、最もプリミティブなオブジェクトである tf.train.Int64Listtf.train.FloatListtf.train.BytesList から。 これらは前述したとおりバイト列・浮動小数点・整数のリストを内包するオブジェクトになっている。

>>> int64_list = tf.train.Int64List(value=[1, 2, 3])
>>> int64_list
value: 1
value: 2
value: 3

>>> float_list = tf.train.FloatList(value=[1., 2., 3.])
>>> float_list
value: 1.0
value: 2.0
value: 3.0

>>> bytes_list = tf.train.BytesList(value=[b'x', b'y', b'z'])
>>> bytes_list
value: "x"
value: "y"
value: "z"

前述したとおり、value には一次元配列しか渡せないらしい。 渡そうとすると次のようにエラーになる。

>>> import numpy as np
>>> x = np.random.randint(low=0, high=100, size=(3, 2))
>>> tf.train.Int64List(value=x)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: only integer scalar arrays can be converted to a scalar index
>>> tf.train.Int64List(value=[[1, 2], [3, 4]])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: [1, 2] has type list, but expected one of: int, long

この仕様だと、画像データとか扱うときに面倒くさくない?と思うはず。 そんなときは、多次元配列を次のようにバイト列にシリアライズしてやれば良い。

>>> serialized_x = tf.io.serialize_tensor(x)
>>> serialized_x
<tf.Tensor: shape=(), dtype=string, numpy=b'\x08\t\x12\x08\x12\x02\x08\x03\x12\x02\x08\x02"0\x08\x00\x00\x00\x00\x00\x00\x00^\x00\x00\x00\x00\x00\x00\x00\x0f\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00\x00\x00\x00\x00H\x00\x00\x00\x00\x00\x00\x00G\x00\x00\x00\x00\x00\x00\x00'>

バイト列になっていれば tf.train.BytesList に入れることができる。

>>> tf.train.BytesList(value=[serialized_x.numpy()])
value: "\010\t\022\010\022\002\010\003\022\002\010\002\"0\010\000\000\000\000\000\000\000^\000\000\000\000\000\000\000\017\000\000\000\000\000\000\0006\000\000\000\000\000\000\000H\000\000\000\000\000\000\000G\000\000\000\000\000\000\000"

なお、もちろん多次元配列は Flatten して、別で保存しておいた shape の情報を使って復元してもかまわない。

続いては tf.train.Feature を使って先ほどの *List オブジェクトをラップする。 型ごとに引数が異なるため、そこだけ注意する。

>>> int64_feature = tf.train.Feature(int64_list=int64_list)
>>> int64_feature
int64_list {
  value: 1
  value: 2
  value: 3
}

>>> float_feature = tf.train.Feature(float_list=float_list)
>>> float_feature
float_list {
  value: 1.0
  value: 2.0
  value: 3.0
}

>>> bytes_feature = tf.train.Feature(bytes_list=bytes_list)
>>> bytes_feature
bytes_list {
  value: "x"
  value: "y"
  value: "z"
}

続いては、tf.train.Features を使って、複数の tf.train.Feature を束ねる。

>>> feature_mappings = {
...     'feature0': int64_feature,
...     'feature1': float_feature,
...     'feature2': bytes_feature,
... }
>>> features = tf.train.Features(feature=feature_mappings)
>>> features
feature {
  key: "feature0"
  value {
    int64_list {
      value: 1
      value: 2
      value: 3
    }
  }
}
feature {
  key: "feature1"
  value {
    float_list {
      value: 1.0
      value: 2.0
      value: 3.0
    }
  }
}
feature {
  key: "feature2"
  value {
    bytes_list {
      value: "x"
      value: "y"
      value: "z"
    }
  }
}

あとは tf.train.Example でラップするだけ。

>>> example = tf.train.Example(features=features)
>>> example
features {
  feature {
    key: "feature0"
    value {
      int64_list {
        value: 1
        value: 2
        value: 3
      }
    }
  }
  feature {
    key: "feature1"
    value {
      float_list {
        value: 1.0
        value: 2.0
        value: 3.0
      }
    }
  }
  feature {
    key: "feature2"
    value {
      bytes_list {
        value: "x"
        value: "y"
        value: "z"
      }
    }
  }
}

上記で完成した tf.train.Example オブジェクトがデータセットの中の特定のサンプルに対応することになる。 まあ、使っているのがダミーデータなのでちょっとイメージがつきにくいかもしれないけど。

tf.train.Example オブジェクトは SerializeToString() メソッドを使うことでバイト列にシリアライズできる。 つまり、.tfrecord の拡張子がついた TFRecord ファイルは、このシリアライズされたバイト列が書き込まれている。

>>> serialized_data = example.SerializeToString()
>>> serialized_data
b'\nL\n\x17\n\x08feature2\x12\x0b\n\t\n\x01x\n\x01y\n\x01z\n\x1c\n\x08feature1\x12\x10\x12\x0e\n\x0c\x00\x00\x80?\x00\x00\x00@\x00\x00@@\n\x13\n\x08feature0\x12\x07\x1a\x05\n\x03\x01\x02\x03'

ちなみに、これまでに登場したオブジェクトも、それぞれ単独で SerializeToString() を使えばシリアライズできる。

>>> int64_list.SerializeToString()
b'\n\x03\x01\x02\x03'
>>> int64_feature.SerializeToString()
b'\x1a\x05\n\x03\x01\x02\x03'
>>> features.SerializeToString()
b'\n\x13\n\x08feature0\x12\x07\x1a\x05\n\x03\x01\x02\x03\n\x17\n\x08feature2\x12\x0b\n\t\n\x01x\n\x01y\n\x01z\n\x1c\n\x08feature1\x12\x10\x12\x0e\n\x0c\x00\x00\x80?\x00\x00\x00@\x00\x00@@'

そして、シリアライズしたバイト列は、tf.train.Example.FromString() 関数を使ってデシリアライズできる。

>>> deserialized_object = tf.train.Example.FromString(serialized_data)
>>> deserialized_object
features {
  feature {
    key: "feature0"
    value {
      int64_list {
        value: 1
        value: 2
        value: 3
      }
    }
  }
  feature {
    key: "feature1"
    value {
      float_list {
        value: 1.0
        value: 2.0
        value: 3.0
      }
    }
  }
  feature {
    key: "feature2"
    value {
      bytes_list {
        value: "x"
        value: "y"
        value: "z"
      }
    }
  }
}

データセットを TFRecord ファイルに変換する

基本的な使い方がわかったところで、続いては実際にデータセットを TFRecord 形式のファイルに変換してみよう。

使う題材は特に何でも良いんだけど、今回は tensorflow-datasets 経由でロードした CIFAR10 を使うことにする。

>>> import tensorflow_datasets as tfds
>>> ds_train = tfds.load('cifar10', split='train')

このデータセットには (32, 32, 3) の形状を持った画像のテンソルと、それに対応したラベルが入っている。 画像のデータが一次元になっていないので、わざわざ Flatten する代わりに前述したシリアライズしてバイト列にする作戦でいこう。

>>> from pprint import pprint
>>> pprint(ds_train.element_spec)
{'id': TensorSpec(shape=(), dtype=tf.string, name=None),
 'image': TensorSpec(shape=(32, 32, 3), dtype=tf.uint8, name=None),
 'label': TensorSpec(shape=(), dtype=tf.int64, name=None)}

まず、特定のサンプルに対応したテンソルとラベルを前述した手順でシリアライズする関数を次のように定義する。

>>> def serialize_example(image, label):
...     """1 サンプルを Protocol Buffers で TFRecord フォーマットにシリアライズする関数"""
...     # 画像データをバイト列にシリアライズする
...     serialized_image = tf.io.serialize_tensor(image)
...     image_bytes_list = tf.train.BytesList(value=[serialized_image.numpy()])
...     # ラベルデータ
...     label_int64_list = tf.train.Int64List(value=[label.numpy()])
...     # 特徴量を Features にまとめる
...     feature_mappings = {
...         'image': tf.train.Feature(bytes_list=image_bytes_list),
...         'label': tf.train.Feature(int64_list=label_int64_list),
...     }
...     features = tf.train.Features(feature=feature_mappings)
...     # Example にまとめる
...     example_proto = tf.train.Example(features=features)
...     # バイト列にする
...     return example_proto.SerializeToString()
... 

続いて、データセットから取り出したサンプルに上記の関数を定義するヘルパー関数を次のように定義する。

>>> def tf_serialize_example(element):
...     """シリアライズ処理を tf.data.Dataset に適用するためのヘルパー関数"""
...     # イメージとラベルを取り出す
...     image = element['image']
...     label = element['label']
...     tf_string = tf.py_function(
...         serialize_example, 
...         (image, label),
...         tf.string,
...     )
...     return tf.reshape(tf_string, ())
... 

Dataset API を使って、上記の関数をデータセットに適用する。

>>> serialized_ds_train = ds_train.map(tf_serialize_example)

イテレータにしてサンプルをひとつ取り出してみよう。

>>> ite = iter(serialized_ds_train)
>>> next(ite)
<tf.Tensor: shape=(), dtype=string, numpy=b'\n\xb6\x18\n\x0e\n\x05label\x12\x05\x1a\x03\n\x01\x07\n\xa3\x18\n\x05image\x12\x99
...

ちゃんとシリアライズされたバイト列が確認できる。

あとは、シリアライズしたバイト列が取り出せる Dataset オブジェクトを引数にして tf.data.experimental.TFRecordWriter#write() を呼び出すだけ。

>>> filename = 'cifar10-train.tfrecord'
>>> writer = tf.data.experimental.TFRecordWriter(filename)
>>> writer.write(serialized_ds_train)

上記はデータセットを丸ごと 1 つのファイルにしてる。 公式ドキュメントを見ると、パフォーマンスを考えると 100 ~ 200MB 程度のサイズで複数に分割するのがおすすめらしい。 これは、おそらく GCS とかにアップロードして並列で読み出すときの話。

カレントディレクトリを確認すると、次のようにファイルが書き出されているはず。

$ du -m cifar10-train.tfrecord
161    cifar10-train.tfrecord
$ file cifar10-train.tfrecord 
cifar10-train.tfrecord: data

TFRecord ファイルからデータを読み出す

次は上記のファイルを読み込んでデシリアライズする。 まず、tf.data.TFRecordDataset に TFRecord ファイルのパスを指定する。 これで、シリアライズしたバイト列を読み出せる Dataset オブジェクトが得られる。

>>> loaded_ds_train = tf.data.TFRecordDataset(filename)

上記からは tf.Example に対応したバイト列が 1 つずつ読み出せる。 なので、それを元のテンソルに戻す関数を次のように定義する。

>>> def deserialize_example(example_proto):
...     """バイト列をデシリアライズしてオブジェクトに戻す関数"""
...     # バイト列のフォーマット
...     feature_description = {
...         'image': tf.io.FixedLenFeature([], tf.string),
...         'label': tf.io.FixedLenFeature([], tf.int64),
...     }
...     # Tensor オブジェクトの入った辞書に戻す
...     parsed_element = tf.io.parse_single_example(example_proto,
...                                                 feature_description)
...     # 画像はバイト列になっているのでテンソルに戻す
...     parsed_element['image'] = tf.io.parse_tensor(parsed_element['image'],
...                                                  out_type=tf.uint8)
...     return parsed_element
... 

上記を先ほどの Dataset オブジェクトに適用する。

>>> deserialized_ds_train = loaded_ds_train.map(deserialize_example)

試しに中身を取り出してみると、ちゃんと画像とラベルのテンソルが復元できていることがわかる。

>>> ite = iter(deserialized_ds_train)
>>> next(ite)
{'image': <tf.Tensor: shape=(32, 32, 3), dtype=uint8, numpy=
array([[[143,  96,  70],
        [141,  96,  72],
        [135,  93,  72],
        ...,
        [ 96,  37,  19],
        [105,  42,  18],
        [104,  38,  20]],

       [[128,  98,  92],
        [146, 118, 112],
        [170, 145, 138],
        ...,
        [108,  45,  26],
        [112,  44,  24],
        [112,  41,  22]],

       [[ 93,  69,  75],
        [118,  96, 101],
        [179, 160, 162],
        ...,
        [128,  68,  47],
        [125,  61,  42],
        [122,  59,  39]],

       ...,

       [[187, 150, 123],
        [184, 148, 123],
        [179, 142, 121],
        ...,
        [198, 163, 132],
        [201, 166, 135],
        [207, 174, 143]],

       [[187, 150, 117],
        [181, 143, 115],
        [175, 136, 113],
        ...,
        [201, 164, 132],
        [205, 168, 135],
        [207, 171, 139]],

       [[195, 161, 126],
        [187, 153, 123],
        [186, 151, 128],
        ...,
        [212, 177, 147],
        [219, 185, 155],
        [221, 187, 157]]], dtype=uint8)>, 'label': <tf.Tensor: shape=(), dtype=int64, numpy=7>}

いじょう。

参考

www.tensorflow.org

Python: Luigi でタスク共通のパラメータを扱う

今回は、Luigi で複数のタスクが共通のパラメータを扱う方法について考えてみる。 ここらへん、調べてもあまりドキュメントなどが出てこなかった。 なので、ソースコードを読んでリバースエンジニアリング的に「こういう風にできそう」と判明した内容を書いてみる。 使う API のレイヤー的に、高レベルなやり方と低レベルなやり方が見つかったので、どちらも記載する。

使った環境は次のとおり。

$ sw_vers 
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ python -V        
Python 3.9.5
$ pip list | grep -i luigi
luigi           3.0.3

もくじ

下準備

まずは、下準備として Luigi をインストールしておく。

$ pip install luigi

低レベル API (luigi.configuration.get_config()) を使う

まずは低レベル API の luigi.configuration.get_config() を使うやり方から。 この API を使うと、Luigi の設定ファイルを辞書形式でそのまま読み込むことができる。 読み込んだコンフィグは、どのタスクから利用することもできるため共通のパラメータを扱うことができる。

以下にサンプルコードを示す。 サンプルコードには TaskATaskB という、2 つのタスクを定義している。 この中では、それぞれ設定ファイルから [SharedConfig] セクションの shared_param パラメータを読み込んで使っている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
# 設定を取得するための API
from luigi.configuration import get_config


class NoOutputTask(luigi.Task):
    """output() を介さずに完了を制御するテスト用のタスク

    NOTE: 今回のサンプルコードの中では本質的な内容ではない"""
    # タスクが完了しているかを示すフラグ
    done = False

    def run(self):
        # run() が一度でも実行されたら完了フラグを立てる
        self.done = True

    def complete(self):
        # タスクの完了はフラグで判断する
        return self.done


class TaskA(NoOutputTask):

    def run(self):
        super().run()
        # SharedConfig セクションのパラメータを取得する
        section_dict = dict(get_config().items('SharedConfig'))
        # パラメータの内容を標準出力に書き出す (ほんとは output() に書くべき)
        print('Hello,', section_dict['shared_param'], 'by TaskA')


class TaskB(NoOutputTask):

    def run(self):
        super().run()
        # 同じパラメータを使う
        section_dict = dict(get_config().items('SharedConfig'))
        print('Hello,', section_dict['shared_param'], 'by TaskB')


class Wrapper(luigi.WrapperTask):
    """上記で定義した 2 つのタスクをキックするためだけのタスク"""

    def requires(self):
        yield TaskA()
        yield TaskB()


if __name__ == '__main__':
    luigi.run(main_task_cls=Wrapper,
              local_scheduler=True)

次のように設定ファイルを用意しよう。

$ cat << 'EOF' > luigi.cfg  
[SharedConfig]
shared_param=World
EOF

あとは実行するだけ。

$ python lowlayer.py

...

Hello, World by TaskB

...

Hello, World by TaskA

...

===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskA()
    - 1 TaskB()
    - 1 Wrapper()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記の結果からわかるように、タスクが出力するメッセージの中に設定ファイルで指定した値が使われている。

高レベル API (luigi.Config) を使う

続いては高レベル API の luigi.Config を使うパターン。 こちらは luigi.Config というクラスを継承したクラスを定義する。 設定ファイからは、定義したクラスと同名のセクション経由でパラメータを設定できる。 複数のタスクからは、クラスをインスタンス化してやればパラメータがインジェクションされて得られる。

以下にサンプルコードを示す。 先ほどのサンプルコードから SharedConfig というクラスが増えている。 そして、TaskATaskBSharedConfig をインスタンス化して shared_param パラメータにアクセスしている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class SharedConfig(luigi.Config):
    """複数のタスクから参照される共通のパラメータ"""
    shared_param = luigi.Parameter()


class NoOutputTask(luigi.Task):
    """output() を介さずに完了を制御するテスト用のタスク

    NOTE: 今回のサンプルコードの中では本質的な内容ではない"""
    # タスクが完了しているかを示すフラグ
    done = False

    def run(self):
        # run() が一度でも実行されたら完了フラグを立てる
        self.done = True

    def complete(self):
        # タスクの完了はフラグで判断する
        return self.done


class TaskA(NoOutputTask):

    def run(self):
        super().run()
        # SharedConfig をインスタンス化してパラメータを取り出す
        # パラメータは luigi の設定ファイルで指定できる
        print('Hello,', SharedConfig().shared_param, 'by TaskA')


class TaskB(NoOutputTask):

    def run(self):
        super().run()
        # こちらも同様
        print('Hello,', SharedConfig().shared_param, 'by TaskB')


class Wrapper(luigi.WrapperTask):
    """上記で定義した 2 つのタスクをキックするためだけのタスク"""

    def requires(self):
        yield TaskA()
        yield TaskB()


if __name__ == '__main__':
    luigi.run(main_task_cls=Wrapper,
              local_scheduler=True)

設定ファイルは先ほどと同じで良い。 セクション名とパラメータ名が同じになるようにクラスを定義してあるため。 作り直すなら次のようにする。

$ cat << 'EOF' > luigi.cfg  
[SharedConfig]
shared_param=World
EOF

実行してみよう。

$ python highlayer.py 

...

Hello, World by TaskB

...

Hello, World by TaskA

...

===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskA()
    - 1 TaskB()
    - 1 Wrapper()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記の実行結果から、ちゃんとパラメータを参照できていることがわかる。

ところで、上記で使った luigi.Config というクラス、定義を見ると面白いことがわかる。 以下がそのソースコード。

github.com

なんと、luigi.Configluigi.Task を継承しているだけで、他に何もしていない。 つまり、ほとんど同一のものということになる。 実は、luigi.Config を使わなくても、luigi.Task でも同じことはできるのだ。

いじょう。

Python: Jupyter の IPython Kernel にスタートアップスクリプトを登録する

今回は Jupyter の IPython Kernel に、スタートアップスクリプトを登録する方法について書いてみる。 スタートアップスクリプトというのは、カーネルの起動時に読み込まれるコードのこと。 IPython Kernel というのは、いわゆるフツーのノートブックを Jupyter で実行するときに動いているバックエンドのプログラムを指している。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ python -V           
Python 3.9.5
$ jupyter --version
jupyter core     : 4.7.1
jupyter-notebook : 6.4.0
qtconsole        : not installed
ipython          : 7.24.1
ipykernel        : 5.5.5
jupyter client   : 6.1.12
jupyter lab      : 3.0.16
nbconvert        : 6.1.0
ipywidgets       : not installed
nbformat         : 5.1.3
traitlets        : 5.0.5

もくじ

下準備

まずは JupyterLab と Pandas をインストールしておく。 Pandas の方はインポートするのに使うだけなので、あまり本質ではない。

$ pip install jupyterlab pandas

IPython のプロファイルについて

まず、本題に入る前に IPython のプロファイルという概念を説明しておく。 プロファイルは、ようするに IPython が動作するときの設定を扱う名前空間みたいなもの。 存在するプロファイルは ipython profile list コマンドで確認できる。

$ ipython profile list

Available profiles in /Users/amedama/.ipython:
    default

To use any of the above profiles, start IPython with:
    ipython --profile=<name>

上記のように、何もしなくても初期状態で default という名前のプロファイルがある。

実は、何気なく実行している ipython コマンドは、暗に --profile=default オプションをつけているのと等価になっている。

$ ipython  # ipython --profile=default と同じ

各プロファイルにはディレクトリがあって、そこにはプロファイル毎の設定ファイルや動作ログが収められている。

$ ipython profile locate default
/Users/amedama/.ipython/profile_default
$ ls $(ipython profile locate default)
db      log       security
history.sqlite    pid     startup

デフォルトのカーネルにスタートアップスクリプトを登録する

先ほど実行したコマンドの出力を見ると、プロファイルのディレクトリには、さらに startup というディレクトリがある。 ここに、名前が数字 2 ケタから始まる Python スクリプトを入れると、カーネルの起動時にそれが呼び出されるようになる。

試しに Pandas と NumPy のインポート文を追加してみよう。

$ cat << 'EOF' >> $(ipython profile locate default)/startup/00-common-import.py
import pandas as pd
import numpy as np
EOF

試しに IPython を起動して pd という変数を参照してみると、ちゃんと Pandas のモジュールを指していることがわかる。

$ ipython -c "pd"       
Out[1]: <module 'pandas' from '/Users/amedama/.virtualenvs/py39/lib/python3.9/site-packages/pandas/__init__.py'>

前述したとおり、上記は暗に --profile=default を付けているのと等価になる。

$ ipython --profile=default -c "pd"
Out[1]: <module 'pandas' from '/Users/amedama/.virtualenvs/py39/lib/python3.9/site-packages/pandas/__init__.py'>

もちろん、これは Jupyter からも有効になる。 試しに Jupyter Lab を起動して、デフォルトのカーネルでノートブックを作ってみよう。

$ jupyter lab

先ほどと同じように pd という名前の変数を参照すると、ちゃんと読み込まれている。

f:id:momijiame:20210624191922p:plain

新たに専用のカーネルを作ってスタートアップスクリプトを登録する

続いては、専用のカーネルを作って、そこにスタートアップスクリプトを登録してみよう。 これは、たとえば用途ごとにスタートアップスクリプトを用意して使い分けたいようなユースケースを想定している。

はじめに、スタートアップスクリプトを登録するためのプロファイルを新たに用意する。 新しくプロファイルを作るには ipython profile create コマンドを使う。 ここでは customized という名前でプロファイルを作った。

$ ipython profile create customized
[ProfileCreate] Generating default config file: '/Users/amedama/.ipython/profile_customized/ipython_config.py'
[ProfileCreate] Generating default config file: '/Users/amedama/.ipython/profile_customized/ipython_kernel_config.py'
$ ipython profile list

Available profiles in /Users/amedama/.ipython:
    customized
    default

To use any of the above profiles, start IPython with:
    ipython --profile=<name>

先ほどと同じように、プロファイルにスタートアップスクリプトを登録しておこう。

$ cat << 'EOF' >> $(ipython profile locate customized)/startup/00-common-import.py
import pandas as pd
import numpy as np
EOF

ひとまず、プロファイル経由でスタートアップスクリプトが読み込まれているかを IPython の REPL で確認しておく。 --profile オプションで、作ったプロファイル customized を指定しよう。

$ ipython --profile=customized -c "pd"
Out[1]: <module 'pandas' from '/Users/amedama/.virtualenvs/py39/lib/python3.9/site-packages/pandas/__init__.py'>

続いて、カーネルの設定に入る。 まず、現在有効なカーネルの一覧は jupyter kernelspec list コマンドで得られる。

$ jupyter kernelspec list
Available kernels:
  python3    /Users/amedama/.virtualenvs/py39/share/jupyter/kernels/python3

ここにはカーネルを起動するときの情報が入った kernel.json というファイルがある。 中身を見ると、結局のところカーネルの起動というのは $ python -m ipykernel_launcher -f ... というコマンドを実行しているのに過ぎないことがわかる。

$ cat ~/.virtualenvs/py39/share/jupyter/kernels/python3/kernel.json 
{
 "argv": [
  "python",
  "-m",
  "ipykernel_launcher",
  "-f",
  "{connection_file}"
 ],
 "display_name": "Python 3",
 "language": "python"
}

おもむろに、デフォルトのカーネルのディレクトリを丸ごとコピーする。

$ cp -r ~/.virtualenvs/py39/share/jupyter/kernels/{python3,customized}

そして kernel.json をちょこっと書きかえよう。

$ cat << 'EOF' > ~/.virtualenvs/py39/share/jupyter/kernels/customized/kernel.json
{
 "argv": [
  "python",
  "-m",
  "ipykernel_launcher",
  "-f",
  "{connection_file}",
  "--profile",
  "customized"
 ],
 "display_name": "Customized Python 3",
 "language": "python"
}
EOF

差分は以下のとおり。 要するに表示名に Customized をつけているのと、起動時のオプションに --profile customized を追加してるだけ。

$ diff -u ~/.virtualenvs/py39/share/jupyter/kernels/{python3,customized}/kernel.json
--- /Users/amedama/.virtualenvs/py39/share/jupyter/kernels/python3/kernel.json  2021-06-24 18:55:06.000000000 +0900
+++ /Users/amedama/.virtualenvs/py39/share/jupyter/kernels/customized/kernel.json   2021-06-24 19:32:26.000000000 +0900
@@ -4,8 +4,10 @@
   "-m",
   "ipykernel_launcher",
   "-f",
-  "{connection_file}"
+  "{connection_file}",
+  "--profile",
+  "customized"
  ],
- "display_name": "Python 3",
+ "display_name": "Customized Python 3",
  "language": "python"
-}
\ No newline at end of file
+}

Jupyter Lab を起動してみよう。

$ jupyter lab

Web UI を確認すると、新しくカーネルが登録されていることがわかる。

f:id:momijiame:20210624193349p:plain

もちろん、カーネルを起動するとスタートアップスクリプトが実行される。

めでたしめでたし。

Python: Luigi から S3 互換のオブジェクトストレージを使う

今回は、Python のデータパイプライン構築用フレームワークの Luigi から、Amazon 以外が提供している S3 互換のオブジェクトストレージを利用する方法について書いてみる。 S3 互換のオブジェクトストレージとしては、ひとまず以下のエントリで紹介した MinIO をローカルホストで動かした。

blog.amedama.jp

使った環境は次のとおり。

$ sw_vers     
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ python -V
Python 3.9.5
$ pip list | grep -i luigi   
luigi           3.0.3
$ minio --version
minio version RELEASE.2021-06-17T00-10-46Z

もくじ

下準備

下準備として、MinIO と AWS CLI、それに Luigi と Boto3 をインストールしておく。 Boto3 は AWS を操作するための Python のクライアントライブラリで、Luigi で AWS 関連の処理をするときに必要となる。

$ brew install minio awscli
$ pip install luigi boto3

デフォルトの設定で MinIO のサーバを起動する。

$ mkdir -p /tmp/minio
$ minio server /tmp/minio

そして、テスト用のバケットを example-bucket という名前で作っておく。

$ export AWS_ACCESS_KEY_ID=minioadmin
$ export AWS_SECRET_ACCESS_KEY=minioadmin
$ aws s3 --endpoint-url http://localhost:9000 mb s3://example-bucket

サンプルコード

早速だけど、以下にサンプルコードを示す。 サンプルコードでは、ExampleTask というタスクを 1 つ定義している。 タスクの output() メソッドを見ると分かるとおり、Luigi でタスクのターゲットを S3 にしたいときは luigi.contrib.s3.S3Target を使えば良い。 そして、このタスクは実行すると s3://example-bucket/greet.txt という URL にファイルを作る。 ファイルの中には `Hello, World! という文字列が書き込まれる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import luigi
from luigi.contrib.s3 import S3Target


class ExampleTask(luigi.Task):

    def run(self):
        # NOTE: バケットは自動で作られない点に注意する
        with self.output().open(mode='w') as out_fp:
            print('Hello, World!', file=out_fp)

    def output(self):
        return S3Target(path=f's3://example-bucket/greet.txt')


if __name__ == '__main__':
    luigi.run(main_task_cls=ExampleTask,
              local_scheduler=True)

上記に適当な名前をつけて保存する。 ここでは例として example.py という名前にした。

さて、問題は上記をそのまま実行すると、アクセス先が本家の AWS になってしまうところ。 どうにかしてローカルホストにアクセスしてもらわないといけない。

結論から先に述べると、Luigi の設定ファイルに [s3] というセクションを作って、そこに設定を書けば良い。 前述したとおり、Luigi の S3 関連の処理は AWS SDK for Python (Boto3) に依存している。 [s3] というセクションに定義したパラメータは、boto3.client() を初期化するときの引数としてそのまま渡される。 つまり、ここでアクセス先や認証情報を変更できる。

$ cat << 'EOF' > luigi.cfg
[s3]
aws_access_key_id=minioadmin
aws_secret_access_key=minioadmin
use_ssl=False
endpoint_url=http://localhost:9000
EOF

設定できたところでタスクを実行してみよう。

$ python example.py

...

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 ExampleTask()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

結果を確認してみよう。 バケットを確認すると、ちゃんとオブジェクトができている。

$ aws --endpoint-url http://localhost:9000 s3 ls s3://example-bucket/ --recursive
2021-06-23 18:32:31         14 greet.txt

オブジェクトの中身を確認すると、ちゃんとメッセージが書き込まれていることがわかる。

$ aws --endpoint-url http://localhost:9000 s3 cp s3://example-bucket/greet.txt -
Hello, World!

めでたしめでたし。

S3 互換オブジェクトストレージの OSS - MinIO を試す

MinIO は Amazon S3 互換のオブジェクトストレージを提供する OSS のひとつ。 たとえばオンプレ環境でオブジェクトストレージを構築したいときや、手元で S3 を扱うアプリケーションの動作確認をするときなんかに使える。 今回はそんな MinIO を AWS CLI と Python クライアントの boto3 から使ってみる。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ minio -v    
minio version RELEASE.2021-05-26T00-22-46Z
$ python -V
Python 3.9.5
$ aws --version
aws-cli/2.2.6 Python/3.9.5 Darwin/20.5.0 source/x86_64 prompt/off
$ pip list | grep -i boto3  
boto3                     1.17.83

もくじ

下準備

今回は Homebrew から MinIO をインストールして使う。 クライアントとして awscli と boto3 も入れておく。

$ brew install minio awscli
$ pip install boto3

インストールできたら作業用のディレクトリを指定して minio server コマンドを実行する。 これで MinIO のサーバが立ち上がる。

$ mkdir -p /tmp/minio
$ minio server /tmp/minio

立ち上がると 9000 番ポートを Listen し始める。

$ lsof -i:9000
COMMAND   PID    USER   FD   TYPE             DEVICE SIZE/OFF NODE NAME
minio   13739 amedama   14u  IPv6 0x62631bfc177de1b3      0t0  TCP *:cslistener (LISTEN)

ブラウザでローカルホストの 9000 番ポートにアクセスすると管理用の Web UI が見える。

$ open http://localhost:9000/

f:id:momijiame:20210529145812p:plain
MinIO の管理用 Web UI

アカウントはデフォルトだと Access Key と Secret Key がどちらも minioadmin でログインできる。 デフォルトのアカウントを変更したいときはサーバを立ち上げるときに以下の環境変数で指定する。

  • Access Key

    • MINIO_ROOT_USER または MINIO_ACCESS_KEY
  • Secret Key

    • MINIO_ROOT_PASSWORD または MINIO_SECRET_KEY

AWS CLI から操作する

はじめに AWS CLI から操作してみよう。 まずは認証情報を環境変数で設定しておく。

$ export AWS_ACCESS_KEY_ID=minioadmin
$ export AWS_SECRET_ACCESS_KEY=minioadmin

あとは aws コマンドのオプションとして --endpoint-url に MinIO が動作してる http://localhost:9000 を指定するだけ。

$ aws --endpoint-url http://localhost:9000 s3 ls

特にエラーにならず上記が実行できれば大丈夫。

サンプルとなるバケットを example-bucket という名前で作成してみる。

$ aws s3 --endpoint-url http://localhost:9000 mb s3://example-bucket
make_bucket: example-bucket

作成すると、ちゃんと ls でバケットが見えるようになった。

$ aws --endpoint-url http://localhost:9000 s3 ls
2021-05-29 15:25:34 example-bucket

続いてはファイルをバケットにコピーしてみる。

$ echo "Hello, World" > /tmp/greet.txt
$ aws --endpoint-url http://localhost:9000 s3 cp /tmp/greet.txt s3://example-bucket
upload: ../../tmp/greet.txt to s3://example-bucket/greet.txt

ちゃんとアップロードできた。

$ aws --endpoint-url http://localhost:9000 s3 ls s3://example-bucket
2021-05-29 15:26:29         13 greet.txt

ファイルに深いプレフィックスをつけてコピーしたいときは ls--recursive オプションをつけると再帰的に確認できる。

$ aws --endpoint-url http://localhost:9000 s3 cp /tmp/greet.txt s3://example-bucket/folder/subfolder/ 
upload: ../../tmp/greet.txt to s3://example-bucket/folder/subfolder/greet.txt
$ aws --endpoint-url http://localhost:9000 s3 ls s3://example-bucket --recursive
2021-05-29 15:29:37         13 folder/subfolder/greet.txt
2021-05-29 15:26:29         13 greet.txt

上記は / を区切りにした階層構造があるように見えるけど、これはあくまでファイル名に / 区切りのプレフィックスがついているに過ぎない。 つまり、インタフェース的に階層構造があるように見せているだけ、という点には留意する必要がある。 階層構造のように見えたとしても、バケット以下の構造はあくまでもフラットな名前空間になっている。

標準入出力経由でファイルをコピーすることもできる。

$ echo "Hello, World" | aws --endpoint-url http://localhost:9000 s3 cp - s3://example-bucket/stdin/greet.txt
$ aws --endpoint-url http://localhost:9000 s3 cp s3://example-bucket/stdin/greet.txt -
Hello, World

ファイルを削除するときは rm コマンドを使う。

$ aws --endpoint-url http://localhost:9000 s3 rm s3://example-bucket/stdin/greet.txt
delete: s3://example-bucket/stdin/greet.txt

バケットの削除は、入っているファイルをすべて削除すれば rb コマンドからできる。 ただし、今回は後段の boto3 が残っているので省略する。

boto3 から操作する

続いては Python クライアントの boto3 からアクセスしてみる。

まずは Python のインタプリタを起動する。

$ python

boto3 パッケージをインポートする。

>>> import boto3

エンドポイントや認証情報を与えてクライアントを作る。

>>> s3_client = boto3.client('s3',
...                          use_ssl=False,
...                          endpoint_url='http://localhost:9000',
...                          aws_access_key_id='minioadmin',
...                          aws_secret_access_key='minioadmin')

バケットのリストを確認すると、先ほど AWS CLI で作成したものが確認できる。

>>> response = s3_client.list_buckets()
>>> response['Buckets']
[{'Name': 'example-bucket', 'CreationDate': datetime.datetime(2021, 5, 29, 6, 25, 34, 96000, tzinfo=tzutc())}]

試しに新しくバケットを作ってみよう。

>>> s3_client.create_bucket(Bucket='boto3-bucket')
{'ResponseMetadata': {'RequestId': '168376A910A8A588', 'HostId': '', 'HTTPStatusCode': 200, 'HTTPHeaders': {'accept-ranges': 'bytes', 'content-length': '0', 'content-security-policy': 'block-all-mixed-content', 'location': '/boto3-bucket', 'server': 'MinIO', 'vary': 'Origin', 'x-amz-request-id': '168376A910A8A588', 'x-xss-protection': '1; mode=block', 'date': 'Sat, 29 May 2021 06:45:59 GMT'}, 'RetryAttempts': 0}, 'Location': '/boto3-bucket'}

確認すると、新しくバケットができている。

>>> response = s3_client.list_buckets()
>>> from pprint import pprint
>>> pprint(response['Buckets'])
[{'CreationDate': datetime.datetime(2021, 5, 29, 6, 45, 59, 285000, tzinfo=tzutc()),
  'Name': 'boto3-bucket'},
 {'CreationDate': datetime.datetime(2021, 5, 29, 6, 25, 34, 96000, tzinfo=tzutc()),
  'Name': 'example-bucket'}]

いくつかやり方はあるけど、ここでは upload_fileobj() 関数を使ってファイルをアップロードしてみる。

>>> import io
>>> f = io.BytesIO(b'Hello, World')
>>> s3_client.upload_fileobj(f, 'boto3-bucket', 'greet.txt')

ちゃんとアップロードできた。

>>> response = s3_client.list_objects(Bucket='boto3-bucket')
>>> pprint(response['Contents'])
[{'ETag': '"82bb413746aee42f89dea2b59614f9ef"',
  'Key': 'greet.txt',
  'LastModified': datetime.datetime(2021, 5, 29, 6, 47, 55, 783000, tzinfo=tzutc()),
  'Owner': {'DisplayName': 'minio',
            'ID': '02d6176db174dc93cb1b899f7c6078f08654445fe8cf1b6ce98d8855f66bdbf4'},
  'Size': 12,
  'StorageClass': 'STANDARD'}]

今度は download_fileobj() 関数を使ってファイルをダウンロードしてみよう。

>>> f = io.BytesIO()
>>> s3_client.download_fileobj(Bucket='example-bucket', Key='greet.txt', Fileobj=f)

ちゃんと中身が確認できた。

>>> f.seek(0)
0
>>> f.read()
b'Hello, World\n'

いじょう。

iproute2 の ip-netns(8) を使わずに Network Namespace を操作する

今回は、iproute2 の ip-netns(8) を使わずに、Linux の Network Namespace を操作する方法について書いてみる。 目的は、namespaces(7) について、より深い理解を得ること。

使った環境は次のとおり。

$ cat /etc/lsb-release 
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=20.04
DISTRIB_CODENAME=focal
DISTRIB_DESCRIPTION="Ubuntu 20.04.2 LTS"
$ uname -r
5.4.0-1043-gcp

もくじ

下準備

あらかじめ、必要なパッケージをインストールしておく。

$ sudo apt-get update
$ sudo apt-get -y install iproute2 util-linux gcc

前提知識

Linux の namespaces(7) は、プロセスが利用するリソースを分離するための仕組み。 典型的には、Linux のコンテナ仮想化を実現するために用いられている。 今回はタイトルに Network Namespace と入れたものの、分離できるのは何も Network に限らない。

プロセスが利用している Namespace の情報は procfs から /proc/<pid>/ns で確認できる。 現在のプロセスであれば、自身の pid を確認するまでもなく /proc/self/ns を見れば良い。

$ ls -alF /proc/self/ns
total 0
dr-x--x--x 2 amedama amedama 0 May 21 12:41 ./
dr-xr-xr-x 9 amedama amedama 0 May 21 12:41 ../
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 cgroup -> 'cgroup:[4026531835]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 ipc -> 'ipc:[4026531839]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 mnt -> 'mnt:[4026531840]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 net -> 'net:[4026531992]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 pid -> 'pid:[4026531836]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 pid_for_children -> 'pid:[4026531836]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 user -> 'user:[4026531837]'
lrwxrwxrwx 1 amedama amedama 0 May 21 12:41 uts -> 'uts:[4026531838]'

これらのファイルの実体はシンボリックリンクで、参照先として表示されている謎の数字は inode 番号を示している。 つまり、Namespace は inode 番号が識別子になっている。 上記であれば、/proc/self/ns/net がプロセスが利用している Network Namespace の識別子を表している。

$ file /proc/self/ns/net
/proc/self/ns/net: symbolic link to net:[4026531992]
$ stat -L /proc/self/ns/net
  File: /proc/self/ns/net
  Size: 0          Blocks: 0          IO Block: 4096   regular empty file
Device: 4h/4d   Inode: 4026531992  Links: 1
Access: (0444/-r--r--r--)  Uid: (    0/    root)   Gid: (    0/    root)
Access: 2021-05-21 12:42:07.565311760 +0000
Modify: 2021-05-21 12:42:07.565311760 +0000
Change: 2021-05-21 12:42:07.565311760 +0000
 Birth: -

unshare(1) / nsenter(1) / mount(8) を使って操作する

さて、前提知識の確認が終わったところで、実際に ip-netns(8) を使わずに Network Namespace を操作してみよう。 まずは、ip-netns(8) 以外のコマンドラインツールで操作する方法を試す。

新しく namespaces(7) を作るコマンドとしては unshare(1) が使える。 --net オプションを指定すると、コマンドで新たに起動するプロセスが利用する Network Namespace を確保できる。 以下では新しい Network Namespace を使って bash(1) を起動している。

$ sudo unshare --net bash

起動したシェルで確認すると、たしかに /proc/<pid>/ns 以下のファイルの inode 番号が変わっていることが分かる。

# file /proc/self/ns/net
/proc/self/ns/net: symbolic link to net:[4026532254]

ip-link(8) を使ってみるデバイスの状況を確認すると、DOWN したループバックデバイスしか無いことが分かる。 どうやら、ちゃんと Network Namespace が新しく作られたようだ。

# ip link show
1: lo: <LOOPBACK> mtu 65536 qdisc noop state DOWN mode DEFAULT group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00

ただ、この状況で ip-netns(8) を使ってみても何も表示されない。 新しく Network Namespace ができたというのに、どうしてだろう。

# ip netns list

というのも、実は ip-netns(8) の list サブコマンドは、/var/run/netns 以下にあるファイルを見ているだけに過ぎない。 上記で何も表示されないということは、ここに何もファイルがないということ。

# ls /var/run/netns

たしかに何も表示されない。 そもそも、ip-netns(8) を使ったことがない環境であれば、ディレクトリすらできていないことだろう。

ここでおもむろに /var/run/netns 以下にファイルを作って、/proc/self/ns/net--bind オプションつきでマウントしてみよう。

# touch /var/run/netns/example
# mount --bind /proc/self/ns/net /var/run/netns/example

すると、ip-netns(8) の list サブコマンドに、作ったファイルと同じ内容が見られる。

# ip netns list
example

上記は、ちゃんと ip-netns(8) から使うことができる。 一旦、unshare(1) で作ったシェルのプロセスから抜けて、ip-netns(8) の exec サブコマンドを実行してみよう。

# exit
$ sudo ip netns exec example bash -c 'ls -alF /proc/self/ns/net'
lrwxrwxrwx 1 root root 0 May 21 12:49 /proc/self/ns/net -> 'net:[4026532254]'

上記から、ちゃんと使えることがわかる。 というのも、これは実のところ ip-netns(8) が内部的にやっているのとほぼ同じことをやっているため。

先ほど /var/run/netns 以下に作ったファイルは nsenter(1) から利用することもできる。 このコマンドは既存の Namespace に切り替えるために用いる。 --net オプションにファイルを指定して、シェルを起動してみよう。

$ sudo nsenter --net=/var/run/netns/example bash

起動したシェルから確認すると、ちゃんと Namespace が切り替わっていることがわかる。

# ls -alF /proc/self/ns/net
lrwxrwxrwx 1 root root 0 May 21 12:53 /proc/self/ns/net -> 'net:[4026532254]'

ちなみに、ip-netns(8) から利用するときには mount(8) を使わなくてもシンボリックリンクを張るだけで代用できる。 次のように、$$ を使って自身の pid を置換しつつ、Namespace を表したファイルからシンボリックリンクを張ってみよう。

# ln -s /proc/$$/ns/net /var/run/netns/symlink

起動したシェルから抜けた上で確認すると、ちゃんと ip-netns(8) のリストに表示されると共に、使えることがわかる。

# exit
$ ip netns list
symlink
example
$ sudo ip netns exec example bash -c 'ls -alF /proc/self/ns/net'
lrwxrwxrwx 1 root root 0 May 21 12:58 /proc/self/ns/net -> 'net:[4026532254]'

このテクニックは Docker や Mininet などが作る Network Namespace を ip-netns(8) から操作したいときにも有効。

unshare(2) / setns(2) / mount(2) を使って操作する

さて、ip-netns(8) 以外のコマンドラインツールから操作できることがわかったところで、続いてはシステムコールを使ってみる。 というか、先ほど使った一連のコマンドラインツールも、内部的にはこれらの API を叩いていた。

早速だけど、以下にサンプルコードを示す。 このサンプルコードでは、次のような処理をしている。

  • unshare(2) で Network Namespace を新しく作る
  • mount(2) で /proc/self/ns/net/var/run/netns 以下に syscall-example という名前でマウントする
  • /proc/self/ns/net の中身を表示する
#define _GNU_SOURCE
#include <sched.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mount.h>

int main(int argc, char *argv[]) {
    if (unshare(CLONE_NEWNET) < 0) {
        fprintf(stderr, "Failed to create a new network namespace: %s\n", strerror(errno));
        exit(-1);
    }

    const char *netns_path = "/var/run/netns/syscall-example";
    const int fd = open(netns_path, O_RDONLY | O_CREAT | O_EXCL, 0);
    if (fd < 0) {
        fprintf(stderr, "Cannot create namespace file \"%s\": %s\n",
            netns_path, strerror(errno));
        return EXIT_FAILURE;
    }
    close(fd);

    const char *proc_path = "/proc/self/ns/net";
    if (mount(proc_path, netns_path, "none", MS_BIND, NULL) < 0) {
        fprintf(stderr, "Failed to bind %s -> %s: %s\n",
            proc_path, netns_path, strerror(errno));
    }

    const char *cmd = "file";
    char* const args[] = {"file", "/proc/self/ns/net", NULL};
    if (execvp(cmd, args) < 0) {
        fprintf(stderr, "Failed to exec \"%s\": %s\n", cmd, strerror(errno));
        exit(-1);
    }
    return EXIT_SUCCESS;
}

上記に nsadd.c という名前をつけてビルドする。

$ gcc -o nsadd.o nsadd.c

実行すると、/proc/self/ns/net が新しい識別子になっていることがわかる。

$ sudo ./nsadd.o 
/proc/self/ns/net: symbolic link to net:[4026532315]

ip-netns(8) からも、ちゃんと使える。

$ ip netns list
syscall-example
symlink
example
$ sudo ip netns exec syscall-example bash -c 'ls -alF /proc/self/ns/net'
lrwxrwxrwx 1 root root 0 May 21 13:10 /proc/self/ns/net -> 'net:[4026532315]'

続いては、上記で作った Network Namespace を示すファイルを利用するサンプルコード。 次のような処理をしている。

  • /var/run/netns 以下のファイルを open(2) で開く
  • 上記で得られたファイルディスクリプタを setns(2) に渡して Namespace を切り替える
  • /proc/self/ns/net の中身を表示する
#define _GNU_SOURCE
#include <sched.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>

int main(int argc, char *argv[]) {
    const char *mounted_path = "/var/run/netns/syscall-example";
    const int fd = open(mounted_path, O_RDONLY | O_CLOEXEC);
    if (fd < 0) {
        fprintf(stderr, "Cannot open mounted path\"%s\": %s\n",
            mounted_path, strerror(errno));
        return EXIT_FAILURE;
    }

    if (setns(fd, CLONE_NEWNET) < 0) {
        fprintf(stderr, "failed to setup the network namespace \"%s\": %s\n",
            mounted_path, strerror(errno));
        close(fd);
        return EXIT_FAILURE;
    }

    const char *cmd = "file";
    char* const args[] = {"file", "/proc/self/ns/net", NULL};
    if (execvp(cmd, args) < 0) {
        fprintf(stderr, "Failed to exec \"%s\": %s\n", cmd, strerror(errno));
        exit(-1);
    }
    return EXIT_SUCCESS;
}

上記に nsexec.c という名前をつけてビルドする。

$ gcc -o nsexec.o nsexec.c 

実行すると、ちゃんと Network Namespace が切り替わっていることがわかる。

$ sudo ./nsexec.o 
/proc/self/ns/net: symbolic link to net:[4026532315]

いじょう。

参考

git.kernel.org

Python: Streamlit を使って手早く WebUI 付きのプロトタイプを作る

Streamlit は、ざっくり言うと主にデータサイエンス領域において WebUI 付きのアプリケーションを手早く作るためのソフトウェア。 使い所としては、ひとまず動くものを見せたかったり、少人数で試しに使うレベルのプロトタイプを作るフェーズに適していると思う。 たとえば、Jupyter で提供すると複数人で使うのに難があるし、かといって Flask や Django を使って真面目に作るほどではない、くらいのとき。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.3.1
BuildVersion:   20E241
$ python -V
Python 3.8.9

もくじ

下準備

まずは必要なパッケージをインストールしておく。 本当に必要なのは streamlit のみ。 watchdog はパフォーマンスのために入れる。 matplotlib についてはグラフを可視化するときに使うため入れておく。 click はスクリプトに引数を渡すサンプルのため。

$ pip install streamlit watchdog matplotlib click

インストールすると streamlit コマンドが使えるようになる。

$ streamlit version
Streamlit, version 0.81.0

必要に応じて Streamlit の設定ファイルを用意する。 以下は、初回の実行時に確認される e-mail アドレスのスキップと、利用に関する統計情報を送信しない場合の設定。 なお、これは別にやらなくても初回の実行時に案内が出る。

$ mkdir -p ~/.streamlit
$ cat << 'EOF' > ~/.streamlit/credentials.toml 
[general]
email = ""
EOF
$ cat << 'EOF' > ~/.streamlit/config.toml
[browser]
    gatherUsageStats = false
EOF

基本的な使い方

まずはもっとも基本的な使い方から見ていく。 以下は streamlit.write() 関数を使って任意のオブジェクトを WebUI に表示するサンプルコード。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    # Streamlit が対応している任意のオブジェクトを可視化する (ここでは文字列)
    st.write('Hello, World!')


if __name__ == '__main__':
    main()

上記を適当な名前で保存したら streamlit run サブコマンドで指定して実行する。

$ streamlit run example.py

すると、デフォルトでは 8501 ポートで Streamlit のアプリケーションサーバが起動する。 ブラウザで開いて結果を確認しよう。

$ open http://localhost:8501

すると、次のように「Hello, World!」という表示のある Web ページが確認できる。

f:id:momijiame:20210505001417p:plain

やっていることは静的な文字列を表示しているだけとはいえ、Pure Python なスクリプトをちょっと書くだけで Web ページが表示できた。

なお、Streamlit はデフォルトだと実行するホストの全 IP アドレスを Listen するので注意しよう。 ループバックアドレスだけに絞りたいときは以下のようにする。

$ streamlit run --server.address localhost example.py

ちなみに先ほど使った streamlit.write() 関数は色々なオブジェクトを可視化するのに使うことができる。 現時点で対応しているものをざっと書き出してみると次のとおり。

  • サードパーティー製パッケージ関連
    • Pandas の DataFrame オブジェクト
    • Keras の Model オブジェクト
    • SymPy の表現式 (LaTeX)
    • グラフ描画系
      • Matplotlib
      • Altair
      • Vega Lite
      • Plotly
      • Bokeh
      • PyDeck
      • Graphviz
  • 標準的な Python のオブジェクト
    • 例外オブジェクト
    • 関数オブジェクト
    • モジュールオブジェクト
    • 辞書オブジェクト

その他、任意のオブジェクトは str() 関数に渡したのと等価な結果が得られる。

基本的な書式

続いて、Streamlit に備わっている基本的な書式をいくつか試してみる。 アプリケーションのタイトルやヘッダ、マークダウンテキストや数式など。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    # タイトル
    st.title('Application title')
    # ヘッダ
    st.header('Header')
    # 純粋なテキスト
    st.text('Some text')
    # サブレベルヘッダ
    st.subheader('Sub header')
    # マークダウンテキスト
    st.markdown('**Markdown is available **')
    # LaTeX テキスト
    st.latex(r'\bar{X} = \frac{1}{N} \sum_{n=1}^{N} x_i')
    # コードスニペット
    st.code('print(\'Hello, World!\')')
    # エラーメッセージ
    st.error('Error message')
    # 警告メッセージ
    st.warning('Warning message')
    # 情報メッセージ
    st.info('Information message')
    # 成功メッセージ
    st.success('Success message')
    # 例外の出力
    st.exception(Exception('Oops!'))
    # 辞書の出力
    d = {
        'foo': 'bar',
        'users': [
            'alice',
            'bob',
        ],
    }
    st.json(d)


if __name__ == '__main__':
    main()

先ほどの Python ファイルに上書きすると、Streamlit はファイルの変更を検知して自動的に読み込み直してくれる。 アプリケーションを表示しているブラウザはリロードするか、変更が生じた際に自動で読み込むか問うボタンが右上に出てくる。

f:id:momijiame:20210506011250p:plain

プレースホルダー

続いて扱うのはプレースホルダーという機能。 かなり地味なので、この時点で紹介する点に違和感があるかもしれない。 とはいえ、地味なりに多用する機能なので先に説明しておく。

プレースホルダーは、任意のオブジェクトを表示するための入れ物みたいなオブジェクト。 言葉よりも実際に使った方が分かりやすいと思うので以下にサンプルを示す。 プレースホルダーを用意して、後からそこにオブジェクトを書き込む、みたいな使い方をする。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    # プレースホルダーを用意する
    placeholder1 = st.empty()
    # プレースホルダーに文字列を書き込む
    placeholder1.write('Hello, World')

    placeholder2 = st.empty()
    # コンテキストマネージャとして使えば出力先をプレースホルダーにできる
    with placeholder2:
        # 複数回書き込むと上書きされる
        st.write(1)
        st.write(2)
        st.write(3)  # この場合は最後に書き込んだものだけ見える


if __name__ == '__main__':
    main()

上記を実行した結果は次のとおり。 プレースホルダーの内容は上書きされるので、特に何もしなければ最後に書きこまれた内容が見える。

f:id:momijiame:20210506013310p:plain

プレースホルダーを応用するとアニメーション的なこともできる。 以下のサンプルコードではスリープを挟みながらプレースホルダーの内容を書きかえることで動きのあるページを作っている。

# -*- coding: utf-8 -*-

import time

import streamlit as st


def main():
    status_area = st.empty()

    # カウントダウン
    count_down_sec = 5
    for i in range(count_down_sec):
        # プレースホルダーに残り秒数を書き込む
        status_area.write(f'{count_down_sec - i} sec left')
        # スリープ処理を入れる
        time.sleep(1)

    # 完了したときの表示
    status_area.write('Done!')
    # 風船飛ばす
    st.balloons()


if __name__ == '__main__':
    main()

上記を実行すると秒数のカウントダウンが確認できる。

f:id:momijiame:20210506013737p:plain

プログレスバーを使った処理の進捗の可視化

ちなみに先ほどのようなカウントダウンをするような処理だとプログレスバーを使うこともできる。 以下のサンプルコードでは 0.1 秒ごとにプログレスバーの数値を増やしていくページができる。

# -*- coding: utf-8 -*-

import time

import streamlit as st


def main():
    status_text = st.empty()
    # プログレスバー
    progress_bar = st.progress(0)

    for i in range(100):
        status_text.text(f'Progress: {i}%')
        # for ループ内でプログレスバーの状態を更新する
        progress_bar.progress(i + 1)
        time.sleep(0.1)

    status_text.text('Done!')
    st.balloons()


if __name__ == '__main__':
    main()

上記を実行すると、以下のようにプログレスバーが表示される。

f:id:momijiame:20210506014133p:plain

基本的な可視化

ここまでの内容だと、面白いけど何が便利なのかイマイチよく分からないと思う。 そこで、ここからはもう少し実用的な話に入っていく。 具体的には、いくつかグラフなどを可視化する方法について見ていこう。

組み込みのグラフ描画機能

Streamlit には組み込みのグラフ描画機能がある。 この機能を使うと NumPy の配列や Pandas のデータフレームなどをサクッとグラフにできる。 以下のサンプルコードでは折れ線グラフ、エリアチャート、バーチャートの 3 種類を試している。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


def main():
    # ランダムな値でデータフレームを初期化する
    data = {
        'x': np.random.random(20),
        'y': np.random.random(20) - 0.5,
        'z': np.random.random(20) - 1.0,
    }
    df = pd.DataFrame(data)
    # 折れ線グラフ
    st.subheader('Line Chart')
    st.line_chart(df)
    # エリアチャート
    st.subheader('Area Chart')
    st.area_chart(df)
    # バーチャート
    st.subheader('Bar Chart')
    st.bar_chart(df)


if __name__ == '__main__':
    main()

上記からは次のようなグラフが得られる。

f:id:momijiame:20210506012655p:plain

グラフにデータを動的に追加することもできる。 これにはグラフを描画する関数を実行して得られるオブジェクトに add_rows() メソッドを使えば良い。 以下のサンプルコードでは、折れ線グラフに 0.5 秒間隔で 10 回までデータを追加している。

# -*- coding: utf-8 -*-

import time

import streamlit as st
import numpy as np


def main():
    # 折れ線グラフ (初期状態)
    x = np.random.random(size=(10, 2))
    line_chart = st.line_chart(x)

    for i in range(10):
        # 折れ線グラフに 0.5 秒間隔で 10 回データを追加する
        additional_data = np.random.random(size=(5, 2))
        line_chart.add_rows(additional_data)
        time.sleep(0.5)


if __name__ == '__main__':
    main()

上記を確認すると、0.5 秒間隔でグラフにデータが追加されていく様子が確認できる。 こういったアニメーション効果を手軽に導入できるのは Streamlit の強みだと思う。

f:id:momijiame:20210506012934p:plain

ちなみに気づいたかもしれないけどブラウザをリロードするごとにプロットされる結果は変わる。 これは Streamlit がページを表示するときに、スクリプトを上から順に実行するように処理しているため。 つまり、ブラウザをリロードする毎にスクリプトのコードが評価され直しているように考えれば良い。

Matplotlib

続いては Matplotlib のグラフを描画してみよう。 Streamlit では Matplotlib の Figure オブジェクトを書き出すことでグラフを描画できる。 以下のサンプルコードではランダムに生成した値をヒストグラムにプロットしている。

# -*- coding: utf-8 -*-

import streamlit as st
import numpy as np
from matplotlib import pyplot as plt


def main():
    # 描画領域を用意する
    fig = plt.figure()
    ax = fig.add_subplot()
    # ランダムな値をヒストグラムとしてプロットする
    x = np.random.normal(loc=.0, scale=1., size=(100,))
    ax.hist(x, bins=20)
    # Matplotlib の Figure を指定して可視化する
    st.pyplot(fig)


if __name__ == '__main__':
    main()

上記からは次のような画面が得られる。

f:id:momijiame:20210509171628p:plain

先ほどと同じように、データを更新しながらグラフを描画し直すサンプルも書いてみる。 以下のサンプルコードでは、プレースホルダを使って描画されるグラフの内容を更新している。

# -*- coding: utf-8 -*-

import time

import streamlit as st
import numpy as np
from matplotlib import pyplot as plt


def main():
    # グラフを書き出すためのプレースホルダを用意する
    plot_area = st.empty()
    fig = plt.figure()
    ax = fig.add_subplot()
    x = np.random.normal(loc=.0, scale=1., size=(100,))
    ax.plot(x)
    # プレースホルダにグラフを書き込む
    plot_area.pyplot(fig)

    # 折れ線グラフに 0.5 秒間隔で 10 回データを追加する
    for i in range(10):
        # グラフを消去する
        ax.clear()
        # データを追加する
        additional_data = np.random.normal(loc=.0, scale=1., size=(10,))
        x = np.concatenate([x, additional_data])
        # グラフを描画し直す
        ax.plot(x)
        # プレースホルダに書き出す
        plot_area.pyplot(fig)
        time.sleep(0.5)


if __name__ == '__main__':
    main()

上記を実行すると、一定間隔でデータが追加されながらグラフの描画も更新される。

f:id:momijiame:20210509171822p:plain

Pandas

グラフではないけど Pandas のデータフレームを Jupyter で可視化するときと同じように表示できる。 データフレームを出力するときは streamlit.dataframe()streamlit.table() という 2 種類の関数がある。 前者は行や列の要素が多いときにスクロールバーを使って表示する一方で、後者はすべてをいっぺんに表示する。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


def main():
    # Pandas のデータフレームを可視化してみる
    data = {
        # ランダムな値で初期化する
        'x': np.random.random(20),
        'y': np.random.random(20),
    }
    df = pd.DataFrame(data)
    # データフレームを書き出す
    st.dataframe(df)
    # st.write(df)  でも良い
    # スクロールバーを使わず一度に表示したいとき
    st.table(df)


if __name__ == '__main__':
    main()

上記からは以下のような表示が得られる。

f:id:momijiame:20210509172138p:plain

画像

画像を表示するときは streamlit.image() 関数を使う。 以下のサンプルコードではランダムに生成した NumPy 配列を、カラー画像として可視化している。

# -*- coding: utf-8 -*-

import streamlit as st
import numpy as np


def main():
    x = np.random.random(size=(400, 400, 3))
    # NumPy 配列をカラー画像として可視化する
    st.image(x)


if __name__ == '__main__':
    main()

上記からは以下のような表示が得られる。

f:id:momijiame:20210509172356p:plain

地図

地図上にプロットすることもできる。 地図に散布図を描きたいときは streamlit.map() 関数を使えば良い。 以下のサンプルコードでは、東京を中心とした地図にランダムな点をプロットしている。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


def main():
    # 東京のランダムな経度・緯度を生成する
    data = {
        'lat': np.random.randn(100) / 100 + 35.68,
        'lon': np.random.randn(100) / 100 + 139.75,
    }
    map_data = pd.DataFrame(data)
    # 地図に散布図を描く
    st.map(map_data)


if __name__ == '__main__':
    main()

上記からは以下のような表示が得られる。

f:id:momijiame:20210509172513p:plain

Streamlit がサポートしている可視化の機能は他にも色々とあるけど、とりあえず一旦はここまでで切り上げる。

キャッシュ機構

ここまでのサンプルコードは、ブラウザをリロードすると表示される内容が変わるものが多かった。 それはスクリプトの内容が毎回、評価し直されているのと同じ状態のため。 ただ、それだと困る場面も多い。 たとえば、時間のかかる処理が毎回評価され直すと、パフォーマンスに深刻な影響がある。 そんなときは Streamlit のキャッシュ機構を使うと良い。

キャッシュ機構を使うには streamlit.cache デコレータを使えば良い。 以下のサンプルコードでは、cached_data() 関数をデコレータで修飾している。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


# 関数の出力をキャッシュする
@st.cache
def cached_data():
    data = {
        'x': np.random.random(20),
        'y': np.random.random(20),
    }
    df = pd.DataFrame(data)
    return df


def main():
    # リロードしても同じ結果が得られる
    df = cached_data()
    st.dataframe(df)


if __name__ == '__main__':
    main()

上記はオンメモリで結果がキャッシュされるため、ブラウザをリロードしても表示が変わることがない。 その他、キャッシュ機構の詳しい解説は以下のドキュメントに記載されている。

docs.streamlit.io

ウィジェット

ここまでのサンプルには、ユーザからの入力を受け付けるものがなかった。 ここからは、ウィジェットを使ってインタラクティブなページを作る方法について書く。

ボタン

まずは最も基本的なウィジェットとしてボタンを扱う。 このボタン、Streamlit のウィジェットの考え方が、他の UI フレームワークと違うことがよく分かって面白い。

ボタンは streamlit.button() 関数を使って配置できる。 以下のサンプルコードは、ボタンを押すことで表示される内容が変わるものとなっている。 興味深いのは、ボタンにイベントハンドラなどの類が一切設定されていないこと。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


def main():
    # データフレームを書き出す
    data = np.random.randn(20, 3)
    df = pd.DataFrame(data, columns=['x', 'y', 'z'])
    st.dataframe(df)
    # リロードボタン
    st.button('Reload')


if __name__ == '__main__':
    main()

上記を実行すると以下のような表示が得られる。 実際、ボタンを押すと表示内容が変わるはず。

f:id:momijiame:20210509173746p:plain

ポイントは、Streamlit は毎回スクリプトを評価し直すように動作するところ。 つまり、ウィジェットで何らかのイベントが起こったら、Streamlit はページの内容を丸ごと評価し直すと考えれば良い。 上記のサンプルコードは、ボタンが押されるイベントによって、表示が丸ごと変わったわけだ。

ウィジェットは、一番最後の試行 (評価) のときに、ウィジェットがどのような状態になったかを返す場合がある。 ボタンも同様で、最後の試行でボタンが押されたか・押されていないかを真偽値 (bool) で返す。

ウィジェットの特性を利用すると、ウィジェットを設置する関数から返ってくる値を使ってインタラクティブな画面が作れる。 以下のサンプルコードでは、2 つのボタンを設置して、押されたボタンに対応するメッセージを表示している。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    if st.button('Top button'):
        # 最後の試行で上のボタンがクリックされた
        st.write('Clicked')
    else:
        # クリックされなかった
        st.write('Not clicked')

    if st.button('Bottom button'):
        # 最後の試行で下のボタンがクリックされた
        st.write('Clicked')
    else:
        # クリックされなかった
        st.write('Not clicked')


if __name__ == '__main__':
    main()

上記を実行すると、以下のような表示が得られる。 ボタンを押すと、表示が更新されて、押されたボタンに対応するメッセージが表示されるはず。

f:id:momijiame:20210509174659p:plain

チェックボックス

チェックボックスは、最後の試行でチェックされたか・されなかったかを元に処理を分岐できる。 以下のサンプルコードでは、チェックされたときだけデータフレームを表示している。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


def main():
    # チェックボックスにチェックが入っているかで処理を分岐する
    if st.checkbox('Show'):
        # チェックが入っているときはデータフレームを書き出す
        data = np.random.randn(20, 3)
        df = pd.DataFrame(data, columns=['x', 'y', 'z'])
        st.dataframe(df)


if __name__ == '__main__':
    main()

上記を実行すると、以下のような表示が得られる。 チェックボックスをチェックしたときだけデータフレームが表示される。

f:id:momijiame:20210509175353p:plain

ラジオボタン

同様に、最後の試行でチェックされたアイテムを元に処理をできるラジオボタン。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    selected_item = st.radio('Which do you like?',
                             ['Dog', 'Cat'])
    if selected_item == 'Dog':
        st.write('Wan wan')
    else:
        st.write('Nya- nya-')


if __name__ == '__main__':
    main()

上記を実行して得られる表示は以下のとおり。

f:id:momijiame:20210511182308p:plain

セレクトボックス

できることは基本的にラジオボタンと変わらないセレクトボックス。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    selected_item = st.selectbox('Which do you like?',
                                 ['Dog', 'Cat'])
    st.write(f'Selected: {selected_item}')


if __name__ == '__main__':
    main()

上記を実行して得られる表示は以下のとおり。

f:id:momijiame:20210511182423p:plain

単一のアイテムを選択するセレクトボックスの他に、複数のアイテムを選択できるマルチセレクトもある。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    selected_items = st.multiselect('What are your favorite characters?',
                                    ['Miho Nishizumi',
                                     'Saori Takebe',
                                     'Hana Isuzu',
                                     'Yukari Akiyama',
                                     'Mako Reizen',
                                     ])
    st.write(f'Selected: {selected_items}')


if __name__ == '__main__':
    main()

上記から得られる表示は以下のとおり。

f:id:momijiame:20210511182536p:plain

スライダー

スライダーは特定の範囲の中から値を選択するのに使える。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    age = st.slider(label='Your age',
                    min_value=0,
                    max_value=130,
                    value=30,
                    )
    st.write(f'Selected: {age}')


if __name__ == '__main__':
    main()

f:id:momijiame:20210511182710p:plain

デフォルトの値にタプルなどで 2 つの要素を指定すると、レンジを入力できるようになる。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    min_value, max_value = st.slider(label='Range selected',
                                     min_value=0,
                                     max_value=100,
                                     value=(40, 60),
                                     )
    st.write(f'Selected: {min_value} ~ {max_value}')


if __name__ == '__main__':
    main()

f:id:momijiame:20210511182804p:plain

ちなみに整数以外にも日付とかを指定するのにも使える。 ただ、そんなに使いやすいとは思えない。 日付とか時間は後述する専用のウィジェットを使った方が良いと思う。

# -*- coding: utf-8 -*-

from datetime import date

import streamlit as st


def main():
    birthday = st.slider('When is your birthday?',
                         min_value=date(1900, 1, 1),
                         max_value=date.today(),
                         value=date(2000, 1, 1),
                         format='YYYY-MM-DD',
                         )
    st.write('Birthday: ', birthday)


if __name__ == '__main__':
    main()

f:id:momijiame:20210511182923p:plain

Date / Time インプット

日付や時間を扱う専用のウィジェットが続いて紹介する Date / Time インプット。

まずは Date インプットから。

# -*- coding: utf-8 -*-

from datetime import date

import streamlit as st


def main():
    birthday = st.date_input('When is your birthday?',
                             min_value=date(1900, 1, 1),
                             max_value=date.today(),
                             value=date(2000, 1, 1),
                             )
    st.write('Birthday: ', birthday)


if __name__ == '__main__':
    main()

ウィジェットをクリックするとカレンダーで日付を指定できるので使いやすい。

f:id:momijiame:20210511183139p:plain

Time インプットは一日の中の時間を指定できる。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    time = st.time_input(label='Your input:')
    st.write('input: ', time)


if __name__ == '__main__':
    main()

こちらもウィジェットをクリックすると時間のセレクタが表示されて使いやすい。

f:id:momijiame:20210511183244p:plain

文字列入力

一行の文字列の入力にはテキストインプットが使える。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    text = st.text_input(label='Message', value='Hello, World!')
    st.write('input: ', text)


if __name__ == '__main__':
    main()

f:id:momijiame:20210511183447p:plain

同様に、複数行に渡る文字列を入力するときはテキストエリアを用いる。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    text = st.text_area(label='Multi-line message', value='Hello, World!')
    st.write('input: ', text)


if __name__ == '__main__':
    main()

f:id:momijiame:20210511183536p:plain

数字入力

数字を入力するときはナンバーインプットを使う。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    n = st.number_input(label='What is your favorite number?',
                        value=42,
                        )
    st.write('input: ', n)


if __name__ == '__main__':
    main()

f:id:momijiame:20210511183634p:plain

デフォルト値を浮動小数点型にすれば、小数を入力できる。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    n = st.number_input(label='What is your favorite number?',
                        value=3.14,
                        )
    st.write('input: ', n)


if __name__ == '__main__':
    main()

f:id:momijiame:20210511183726p:plain

ファイルアップローダ

ファイルアップローダを使うと、クライアントのファイルをアプリケーションに渡すことができる。 以下のサンプルコードでは、渡されたファイルに含まれるテキストを UTF-8 として表示している。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    f = st.file_uploader(label='Upload file:')
    st.write('input: ', f)

    if f is not None:
        # XXX: 信頼できないファイルは安易に評価しないこと
        data = f.getvalue()
        text = data.decode('utf-8')
        st.write('contents: ', text)


if __name__ == '__main__':
    main()

適当なテキストファイルを使って動作確認してみよう。

$ echo "Hello, World" > ~/Downloads/greet.txt

ウィジェットをクリックしてファイルを選択すると、以下のように中身が表示される。

f:id:momijiame:20210511183950p:plain

受け取れるオブジェクトは streamlit.UploadedFile という、オープン済みのファイルライクオブジェクトになる。

カラーピッカー

ちょっと変わり種だけどカラーピッカーも用意されている。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    c = st.color_picker(label='Select color:')
    st.write('input: ', c)


if __name__ == '__main__':
    main()

f:id:momijiame:20210511184147p:plain

フロー制御

ウィジェットが色々とあると、ユーザの入力のバリデーションも考えることになる。 ここではフロー制御をするための機能を紹介する。

特定の条件に満たないときに処理を停止するサンプルコードを以下に示す。 このサンプルではテキストインプットに何か文字列が入っていないときに警告メッセージを出して処理を停止している。 処理の停止には streamlit.stop() 関数を使う。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    name = st.text_input(label='your name:')

    # バリデーション処理
    if len(name) < 1:
        st.warning('Please input your name')
        # 条件を満たないときは処理を停止する
        st.stop()

    st.write('Hello,', name, '!')


if __name__ == '__main__':
    main()

テキストインプットに何も入力されていない状態では、以下のように警告メッセージだけが表示されることになる。

f:id:momijiame:20210511184433p:plain

テキストインプットに文字列を入力すると、警告メッセージが消えて正常系の表示に切り替わる。

f:id:momijiame:20210511184601p:plain

レイアウトを調整する

ここからは画面のレイアウトを調整するための機能を見ていく。

カラム

はじめに紹介するのはカラム。 これは、ようするに画面を縦方向に分割して異なる内容を表示できるもの。

カラムを作るには streamlit.beta_columns() 関数を使う。 以下のサンプルコードでは画面を 3 列に分割している。 関数の返り値をコンテキストマネージャとして使うとデフォルトの出力先として使うこともできるし、オブジェクトに直接書き込むこともできる。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    # カラムを追加する
    col1, col2, col3 = st.beta_columns(3)

    # コンテキストマネージャとして使う
    with col1:
        st.header('col1')

    with col2:
        st.header('col2')

    with col3:
        st.header('col3')

    # カラムに直接書き込むこともできる
    col1.write('This is column 1')
    col2.write('This is column 2')
    col3.write('This is column 3')


if __name__ == '__main__':
    main()

上記を実行して得られる表示は以下のとおり。

f:id:momijiame:20210511184758p:plain

コンテナ

続いて扱うのはコンテナ。 これは、不可視な仕切りみたいなもの。

以下のサンプルコードではコンテナの内と外にオブジェクトを書き込んで、結果を確認している。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    # コンテナを追加する
    container = st.beta_container()

    # コンテキストマネージャとして使うことで出力先になる
    with container:
        st.write('This is inside the container')
    # これはコンテナの外への書き込み
    st.write('This is outside the container')

    # コンテナに直接書き込むこともできる
    container = st.beta_container()
    container.write('1')
    st.write('2')
    # 出力順は後だがレイアウト的にはこちらが先に現れる
    container.write('3')


if __name__ == '__main__':
    main()

f:id:momijiame:20210511185155p:plain

入れ子にすることもできて、たとえば以下のサンプルコードではプレースホルダにコンテナを追加して、さらにそこにカラムを追加している。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    placeholder = st.empty()
    # プレースホルダにコンテナを追加する
    container = placeholder.beta_container()
    # コンテナにカラムを追加する
    col1, col2 = container.beta_columns(2)
    # それぞれのカラムに書き込む
    with col1:
        st.write('Hello, World')
    with col2:
        st.write('Konnichiwa, Sekai')


if __name__ == '__main__':
    main()

f:id:momijiame:20210511185307p:plain

エキスパンダ

デフォルトでは折りたたまれて非表示な領域を作るのにエキスパンダが使える。

# -*- coding: utf-8 -*-

import streamlit as st


def main():
    with st.beta_expander('See details'):
        st.write('Hidden item')


if __name__ == '__main__':
    main()

上記を実行して、以下はエキスパンダを展開した状態。

f:id:momijiame:20210511185400p:plain

サイドバー

ウィジェットやオブジェクトの表示をサイドバーに配置することもできる。 使い方は単純で、サイドバーに置きたいなと思ったら sidebar をつけて API を呼び出す。

以下のサンプルコードでは、サイドバーにボタンを配置している。 前述したとおり、streamlit.button()streamlit.sidebar.button() に変えるだけ。 同様に、streamlit.sidebar.dataframe() のように間に sidebar をはさむことで大体の要素はサイドバーに置ける。

# -*- coding: utf-8 -*-

import streamlit as st
import pandas as pd
import numpy as np


def main():
    # サイドバーにリロードボタンをつける
    st.sidebar.button('Reload')
    # サイドバーにデータフレームを書き込む
    data = np.random.randn(20, 3)
    df = pd.DataFrame(data, columns=['x', 'y', 'z'])
    st.sidebar.dataframe(df)


if __name__ == '__main__':
    main()

上記を実行すると、以下のようにサイドバーに要素が設置されることが確認できる。

f:id:momijiame:20210512222116p:plain

オブジェクトの docstring を表示する

Streamlit はスクリプトの変更を検出して自動でリロードしてくれるため、基本的には WebUI を見ながら開発していくことになる。 そんなとき、この関数またはメソッドの使い方なんだっけ?みたいな場面では streamlit.help() を使うと良い。 オブジェクトの docstring を表示してくれる。

# -*- coding: utf-8 -*-

import pandas as pd

import streamlit as st


def main():
    st.help(pd.DataFrame)


if __name__ == '__main__':
    main()

まあ自動補完とかドキュメント表示をサポートしてる IDE なんかで開発するときは、あんまり使わないかもしれないけど。

f:id:momijiame:20210512222544p:plain

単一のスクリプトで複数のアプリケーションを扱う

Streamlit は基本的に複数のページから成るアプリケーションを作ることができない。 では、複数のアプリケーションを単一のスクリプトで扱うことができないか、というとそうではない。 これは、ウィジェットの状態に応じて表示するアプリケーションを切り替えてやることで実現できる。

以下のサンプルコードでは、セレクトボックスの状態に応じて実行する関数を切り替えている。 それぞれの関数が、それぞれのアプリケーションになっていると考えてもらえれば良い。

# -*- coding: utf-8 -*-

import streamlit as st


def render_gup():
    """GuP のアプリケーションを処理する関数"""
    character_and_quotes = {
        'Miho Nishizumi': 'パンツァーフォー',
        'Saori Takebe': 'やだもー',
        'Hana Isuzu': '私この試合絶対勝ちたいです',
        'Yukari Akiyama': '最高だぜ!',
        'Mako Reizen': '以上だ',
    }
    selected_items = st.multiselect('What are your favorite characters?',
                                    list(character_and_quotes.keys()))
    for selected_item in selected_items:
        st.write(character_and_quotes[selected_item])


def render_aim_for_the_top():
    """トップ!のアプリケーションを処理する関数"""
    selected_item = st.selectbox('Which do you like more in the series?',
                                 [1, 2])
    if selected_item == 1:
        st.write('me too!')
    else:
        st.write('2 mo ii yo ne =)')


def main():
    # アプリケーション名と対応する関数のマッピング
    apps = {
        '-': None,
        'GIRLS und PANZER': render_gup,
        'Aim for the Top! GunBuster': render_aim_for_the_top,
    }
    selected_app_name = st.sidebar.selectbox(label='apps',
                                             options=list(apps.keys()))

    if selected_app_name == '-':
        st.info('Please select the app')
        st.stop()

    # 選択されたアプリケーションを処理する関数を呼び出す
    render_func = apps[selected_app_name]
    render_func()


if __name__ == '__main__':
    main()

上記を実行して得られる表示を以下に示す。

f:id:momijiame:20210512223221p:plain

f:id:momijiame:20210512223230p:plain

f:id:momijiame:20210512223239p:plain

ちなみに、呼び出す関数も 1 つのスクリプトにまとまっている必要はない。 別のモジュールに切り出して、スクリプトではそれをインポートして使うこともできる。 それならコードの見通しもさほど悪くはならないはず。

スクリプトでコマンドライン引数を受け取る

Streamlit のスクリプトにコマンドライン引数を渡したいときもある。 ここでは、そのやり方を紹介する。

Argparse

まずは Python の標準ライブラリにある Argparse を使う場合。 スクリプトを書く時点では特に Streamlit かどうかを意識する必要はない。 一般的な使い方と同じように引数を設定してパースして使うだけ。

# -*- coding: utf-8 -*-

import argparse

import streamlit as st


def main():
    parser = argparse.ArgumentParser(description='parse argument example')
    # --message または -m オプションで文字列を受け取る
    parser.add_argument('--message', '-m', type=str, default='World')
    # 引数をパースする
    args = parser.parse_args()
    # パースした引数を表示する
    st.write(f'Hello, {args.message}!')


if __name__ == '__main__':
    main()

ただ、使う時点ではちょっと注意点がある。 スクリプトの後ろにオプションをつけると Streamlit の引数として認識されてしまう。

$ streamlit run example.py -m Sekai
Usage: streamlit run [OPTIONS] TARGET [ARGS]...
Try 'streamlit run --help' for help.

Error: no such option: -m

そこで -- を使って区切って、スクリプトに対する引数であることを明示的に示す。

$ streamlit run example.py -- -m Sekai

Click

続いてサードパーティ製のパッケージである Click を使う場合。 Click は純粋なコマンドラインパーサ以外の機能もあることから、スクリプトを記述する時点から注意点がある。 具体的には、デコレータで修飾したオブジェクトを呼び出すときに standalone_modeFalse に指定する。 こうすると、デフォルトでは実行が完了したときに exit() してしまう振る舞いを抑制できる。

# -*- coding: utf-8 -*-

import streamlit as st
import click


@click.command()
@click.option('--message', '-m', type=str, default='World')
def main(message):
    # パースした引数を表示する
    st.write(f'Hello, {message}!')


if __name__ == '__main__':
    # click.BaseCommand.main() メソッドが呼ばれる
    # デフォルトの動作では返り値を戻さずに exit してしまう
    # スタンドアロンモードを無効にすることで純粋なコマンドラインパーサとして動作する
    main(standalone_mode=False)

実行するときに Streamlit のオプションとの間に -- で区切りが必要なのは Argparse のときと同じ。

$ streamlit run example.py -- -m Sekai

参考

docs.streamlit.io

click.palletsprojects.com